Fixed BSD unblockPollSocket(). Was signaling to break out of

poll() but there was a race condition where the thread trying
to unblock poll() could send the signal before the polling
thread had entered poll().  Now using a pipe and polling on
that and the client's sockets, and just writing a byte into
the pipe to unblock poll.  This persists until the next call
to poll() so we might force poll() to return once unnecessarily
but that's not a problem.  This change makes the BSD code
similar to the winsock code, which uses a winsock event instead
of a pipe.
This commit is contained in:
crs 2004-02-29 16:48:22 +00:00
parent d6ec331b09
commit 1ccb92b888
4 changed files with 203 additions and 110 deletions

View File

@ -69,6 +69,7 @@ public:
bool m_cancelling; bool m_cancelling;
bool m_exited; bool m_exited;
void* m_result; void* m_result;
void* m_networkData;
}; };
CArchThreadImpl::CArchThreadImpl() : CArchThreadImpl::CArchThreadImpl() :
@ -79,7 +80,8 @@ CArchThreadImpl::CArchThreadImpl() :
m_cancel(false), m_cancel(false),
m_cancelling(false), m_cancelling(false),
m_exited(false), m_exited(false),
m_result(NULL) m_result(NULL),
m_networkData(NULL)
{ {
// do nothing // do nothing
} }
@ -149,9 +151,21 @@ CArchMultithreadPosix::~CArchMultithreadPosix()
} }
void void
CArchMultithreadPosix::unblockThread(CArchThread thread) CArchMultithreadPosix::setNetworkDataForCurrentThread(void* data)
{ {
pthread_kill(thread->m_thread, SIGWAKEUP); lockMutex(m_threadMutex);
CArchThreadImpl* thread = find(pthread_self());
thread->m_networkData = data;
unlockMutex(m_threadMutex);
}
void*
CArchMultithreadPosix::getNetworkDataForThread(CArchThread thread)
{
lockMutex(m_threadMutex);
void* data = thread->m_networkData;
unlockMutex(m_threadMutex);
return data;
} }
CArchMultithreadPosix* CArchMultithreadPosix*
@ -579,7 +593,7 @@ CArchMultithreadPosix::raiseSignal(ESignal signal)
lockMutex(m_threadMutex); lockMutex(m_threadMutex);
if (m_signalFunc[signal] != NULL) { if (m_signalFunc[signal] != NULL) {
m_signalFunc[signal](signal, m_signalUserData[signal]); m_signalFunc[signal](signal, m_signalUserData[signal]);
unblockThread(m_mainThread); pthread_kill(m_mainThread->m_thread, SIGWAKEUP);
} }
else if (signal == kINTERRUPT || signal == kTERMINATE) { else if (signal == kINTERRUPT || signal == kTERMINATE) {
ARCH->cancelThread(m_mainThread); ARCH->cancelThread(m_mainThread);

View File

@ -40,12 +40,14 @@ public:
//! @name manipulators //! @name manipulators
//@{ //@{
void unblockThread(CArchThread thread); void setNetworkDataForCurrentThread(void*);
//@} //@}
//! @name accessors //! @name accessors
//@{ //@{
void* getNetworkDataForThread(CArchThread);
static CArchMultithreadPosix* getInstance(); static CArchMultithreadPosix* getInstance();
//@} //@}

View File

@ -257,7 +257,7 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout)
// allocate space for translated query // allocate space for translated query
struct pollfd* pfd = reinterpret_cast<struct pollfd*>( struct pollfd* pfd = reinterpret_cast<struct pollfd*>(
alloca(num * sizeof(struct pollfd))); alloca((1 + num) * sizeof(struct pollfd)));
// translate query // translate query
for (int i = 0; i < num; ++i) { for (int i = 0; i < num; ++i) {
@ -270,21 +270,43 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout)
pfd[i].events |= POLLOUT; pfd[i].events |= POLLOUT;
} }
} }
int n = num;
// add the unblock pipe
const int* unblockPipe = getUnblockPipe();
if (unblockPipe != NULL) {
pfd[n].fd = unblockPipe[0];
pfd[n].events = POLLIN;
++n;
}
// prepare timeout
int t = (timeout < 0.0) ? -1 : static_cast<int>(1000.0 * timeout);
// do the poll // do the poll
int t = (timeout < 0.0) ? -1 : static_cast<int>(1000.0 * timeout); n = poll(pfd, n, t);
int n;
do { // reset the unblock pipe
n = poll(pfd, num, t); if (unblockPipe != NULL && (pfd[num].revents & POLLIN) != 0) {
if (n == -1) { // the unblock event was signalled. flush the pipe.
if (errno == EINTR) { char dummy[100];
// interrupted system call do {
ARCH->testCancelThread(); read(unblockPipe[0], dummy, sizeof(dummy));
return 0; } while (errno != EAGAIN);
}
throwError(errno); // don't count this unblock pipe in return value
--n;
}
// handle results
if (n == -1) {
if (errno == EINTR) {
// interrupted system call
ARCH->testCancelThread();
return 0;
} }
} while (false); throwError(errno);
}
// translate back // translate back
for (int i = 0; i < num; ++i) { for (int i = 0; i < num; ++i) {
@ -313,102 +335,119 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout)
{ {
int i, n; int i, n;
do { // prepare sets for select
// prepare sets for select n = 0;
n = 0; fd_set readSet, writeSet, errSet;
fd_set readSet, writeSet, errSet; fd_set* readSetP = NULL;
fd_set* readSetP = NULL; fd_set* writeSetP = NULL;
fd_set* writeSetP = NULL; fd_set* errSetP = NULL;
fd_set* errSetP = NULL; FD_ZERO(&readSet);
FD_ZERO(&readSet); FD_ZERO(&writeSet);
FD_ZERO(&writeSet); FD_ZERO(&errSet);
FD_ZERO(&errSet); for (i = 0; i < num; ++i) {
for (i = 0; i < num; ++i) { // reset return flags
// reset return flags pe[i].m_revents = 0;
pe[i].m_revents = 0;
// set invalid flag if socket is bogus then go to next socket // set invalid flag if socket is bogus then go to next socket
if (pe[i].m_socket == NULL) { if (pe[i].m_socket == NULL) {
pe[i].m_revents |= kPOLLNVAL; pe[i].m_revents |= kPOLLNVAL;
continue; continue;
}
int fdi = pe[i].m_socket->m_fd;
if (pe[i].m_events & kPOLLIN) {
FD_SET(pe[i].m_socket->m_fd, &readSet);
readSetP = &readSet;
if (fdi > n) {
n = fdi;
}
}
if (pe[i].m_events & kPOLLOUT) {
FD_SET(pe[i].m_socket->m_fd, &writeSet);
writeSetP = &writeSet;
if (fdi > n) {
n = fdi;
}
}
if (true) {
FD_SET(pe[i].m_socket->m_fd, &errSet);
errSetP = &errSet;
if (fdi > n) {
n = fdi;
}
}
} }
// if there are no sockets then don't block forever int fdi = pe[i].m_socket->m_fd;
if (n == 0 && timeout < 0.0) { if (pe[i].m_events & kPOLLIN) {
timeout = 0.0; FD_SET(pe[i].m_socket->m_fd, &readSet);
} readSetP = &readSet;
if (fdi > n) {
// prepare timeout for select n = fdi;
struct timeval timeout2;
struct timeval* timeout2P;
if (timeout < 0.0) {
timeout2P = NULL;
}
else {
timeout2P = &timeout2;
timeout2.tv_sec = static_cast<int>(timeout);
timeout2.tv_usec = static_cast<int>(1.0e+6 *
(timeout - timeout2.tv_sec));
}
// do the select
n = select((SELECT_TYPE_ARG1) n + 1,
SELECT_TYPE_ARG234 readSetP,
SELECT_TYPE_ARG234 writeSetP,
SELECT_TYPE_ARG234 errSetP,
SELECT_TYPE_ARG5 timeout2P);
// handle results
if (n == -1) {
if (errno == EINTR) {
// interrupted system call
ARCH->testCancelThread();
return 0;
}
throwError(errno);
}
n = 0;
for (i = 0; i < num; ++i) {
if (pe[i].m_socket != NULL) {
if (FD_ISSET(pe[i].m_socket->m_fd, &readSet)) {
pe[i].m_revents |= kPOLLIN;
}
if (FD_ISSET(pe[i].m_socket->m_fd, &writeSet)) {
pe[i].m_revents |= kPOLLOUT;
}
if (FD_ISSET(pe[i].m_socket->m_fd, &errSet)) {
pe[i].m_revents |= kPOLLERR;
}
}
if (pe[i].m_revents != 0) {
++n;
} }
} }
} while (false); if (pe[i].m_events & kPOLLOUT) {
FD_SET(pe[i].m_socket->m_fd, &writeSet);
writeSetP = &writeSet;
if (fdi > n) {
n = fdi;
}
}
if (true) {
FD_SET(pe[i].m_socket->m_fd, &errSet);
errSetP = &errSet;
if (fdi > n) {
n = fdi;
}
}
}
// add the unblock pipe
const int* unblockPipe = getUnblockPipe();
if (unblockPipe != NULL) {
FD_SET(unblockPipe[0], &readSet);
readSetP = &readSet;
if (unblockPipe[0] > n) {
n = unblockPipe[0];
}
}
// if there are no sockets then don't block forever
if (n == 0 && timeout < 0.0) {
timeout = 0.0;
}
// prepare timeout for select
struct timeval timeout2;
struct timeval* timeout2P;
if (timeout < 0.0) {
timeout2P = NULL;
}
else {
timeout2P = &timeout2;
timeout2.tv_sec = static_cast<int>(timeout);
timeout2.tv_usec = static_cast<int>(1.0e+6 *
(timeout - timeout2.tv_sec));
}
// do the select
n = select((SELECT_TYPE_ARG1) n + 1,
SELECT_TYPE_ARG234 readSetP,
SELECT_TYPE_ARG234 writeSetP,
SELECT_TYPE_ARG234 errSetP,
SELECT_TYPE_ARG5 timeout2P);
// reset the unblock pipe
if (unblockPipe != NULL && FD_ISSET(unblockPipe[0], &readSet)) {
// the unblock event was signalled. flush the pipe.
char dummy[100];
do {
read(unblockPipe[0], dummy, sizeof(dummy));
} while (errno != EAGAIN);
}
// handle results
if (n == -1) {
if (errno == EINTR) {
// interrupted system call
ARCH->testCancelThread();
return 0;
}
throwError(errno);
}
n = 0;
for (i = 0; i < num; ++i) {
if (pe[i].m_socket != NULL) {
if (FD_ISSET(pe[i].m_socket->m_fd, &readSet)) {
pe[i].m_revents |= kPOLLIN;
}
if (FD_ISSET(pe[i].m_socket->m_fd, &writeSet)) {
pe[i].m_revents |= kPOLLOUT;
}
if (FD_ISSET(pe[i].m_socket->m_fd, &errSet)) {
pe[i].m_revents |= kPOLLERR;
}
}
if (pe[i].m_revents != 0) {
++n;
}
}
return n; return n;
} }
@ -418,7 +457,11 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout)
void void
CArchNetworkBSD::unblockPollSocket(CArchThread thread) CArchNetworkBSD::unblockPollSocket(CArchThread thread)
{ {
CArchMultithreadPosix::getInstance()->unblockThread(thread); const int* unblockPipe = getUnblockPipeForThread(thread);
if (unblockPipe != NULL) {
char dummy = 0;
write(unblockPipe[1], &dummy, 1);
}
} }
size_t size_t
@ -739,6 +782,38 @@ CArchNetworkBSD::isEqualAddr(CArchNetAddress a, CArchNetAddress b)
memcmp(&a->m_addr, &b->m_addr, a->m_len) == 0); memcmp(&a->m_addr, &b->m_addr, a->m_len) == 0);
} }
const int*
CArchNetworkBSD::getUnblockPipe()
{
CArchMultithreadPosix* mt = CArchMultithreadPosix::getInstance();
return getUnblockPipeForThread(mt->newCurrentThread());
}
const int*
CArchNetworkBSD::getUnblockPipeForThread(CArchThread thread)
{
CArchMultithreadPosix* mt = CArchMultithreadPosix::getInstance();
int* unblockPipe = (int*)mt->getNetworkDataForThread(thread);
if (unblockPipe == NULL) {
unblockPipe = new int[2];
if (pipe(unblockPipe) != -1) {
try {
setBlockingOnSocket(unblockPipe[0], false);
mt->setNetworkDataForCurrentThread(unblockPipe);
}
catch (...) {
delete[] unblockPipe;
unblockPipe = NULL;
}
}
else {
delete[] unblockPipe;
unblockPipe = NULL;
}
}
return unblockPipe;
}
void void
CArchNetworkBSD::throwError(int err) CArchNetworkBSD::throwError(int err)
{ {

View File

@ -79,6 +79,8 @@ public:
virtual bool isEqualAddr(CArchNetAddress, CArchNetAddress); virtual bool isEqualAddr(CArchNetAddress, CArchNetAddress);
private: private:
const int* getUnblockPipe();
const int* getUnblockPipeForThread(CArchThread);
void setBlockingOnSocket(int fd, bool blocking); void setBlockingOnSocket(int fd, bool blocking);
void throwError(int); void throwError(int);
void throwNameError(int); void throwNameError(int);