From 1ccb92b888be10e6931eae96ebd04f359548fb15 Mon Sep 17 00:00:00 2001 From: crs Date: Sun, 29 Feb 2004 16:48:22 +0000 Subject: [PATCH] Fixed BSD unblockPollSocket(). Was signaling to break out of poll() but there was a race condition where the thread trying to unblock poll() could send the signal before the polling thread had entered poll(). Now using a pipe and polling on that and the client's sockets, and just writing a byte into the pipe to unblock poll. This persists until the next call to poll() so we might force poll() to return once unnecessarily but that's not a problem. This change makes the BSD code similar to the winsock code, which uses a winsock event instead of a pipe. --- lib/arch/CArchMultithreadPosix.cpp | 22 ++- lib/arch/CArchMultithreadPosix.h | 4 +- lib/arch/CArchNetworkBSD.cpp | 285 ++++++++++++++++++----------- lib/arch/CArchNetworkBSD.h | 2 + 4 files changed, 203 insertions(+), 110 deletions(-) diff --git a/lib/arch/CArchMultithreadPosix.cpp b/lib/arch/CArchMultithreadPosix.cpp index b87a9b58..ea292412 100644 --- a/lib/arch/CArchMultithreadPosix.cpp +++ b/lib/arch/CArchMultithreadPosix.cpp @@ -69,6 +69,7 @@ public: bool m_cancelling; bool m_exited; void* m_result; + void* m_networkData; }; CArchThreadImpl::CArchThreadImpl() : @@ -79,7 +80,8 @@ CArchThreadImpl::CArchThreadImpl() : m_cancel(false), m_cancelling(false), m_exited(false), - m_result(NULL) + m_result(NULL), + m_networkData(NULL) { // do nothing } @@ -149,9 +151,21 @@ CArchMultithreadPosix::~CArchMultithreadPosix() } void -CArchMultithreadPosix::unblockThread(CArchThread thread) +CArchMultithreadPosix::setNetworkDataForCurrentThread(void* data) { - pthread_kill(thread->m_thread, SIGWAKEUP); + lockMutex(m_threadMutex); + CArchThreadImpl* thread = find(pthread_self()); + thread->m_networkData = data; + unlockMutex(m_threadMutex); +} + +void* +CArchMultithreadPosix::getNetworkDataForThread(CArchThread thread) +{ + lockMutex(m_threadMutex); + void* data = thread->m_networkData; + unlockMutex(m_threadMutex); + return data; } CArchMultithreadPosix* @@ -579,7 +593,7 @@ CArchMultithreadPosix::raiseSignal(ESignal signal) lockMutex(m_threadMutex); if (m_signalFunc[signal] != NULL) { m_signalFunc[signal](signal, m_signalUserData[signal]); - unblockThread(m_mainThread); + pthread_kill(m_mainThread->m_thread, SIGWAKEUP); } else if (signal == kINTERRUPT || signal == kTERMINATE) { ARCH->cancelThread(m_mainThread); diff --git a/lib/arch/CArchMultithreadPosix.h b/lib/arch/CArchMultithreadPosix.h index 2ac6183b..4e587cfc 100644 --- a/lib/arch/CArchMultithreadPosix.h +++ b/lib/arch/CArchMultithreadPosix.h @@ -40,12 +40,14 @@ public: //! @name manipulators //@{ - void unblockThread(CArchThread thread); + void setNetworkDataForCurrentThread(void*); //@} //! @name accessors //@{ + void* getNetworkDataForThread(CArchThread); + static CArchMultithreadPosix* getInstance(); //@} diff --git a/lib/arch/CArchNetworkBSD.cpp b/lib/arch/CArchNetworkBSD.cpp index e7f64a43..44b1a39d 100644 --- a/lib/arch/CArchNetworkBSD.cpp +++ b/lib/arch/CArchNetworkBSD.cpp @@ -257,7 +257,7 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout) // allocate space for translated query struct pollfd* pfd = reinterpret_cast( - alloca(num * sizeof(struct pollfd))); + alloca((1 + num) * sizeof(struct pollfd))); // translate query for (int i = 0; i < num; ++i) { @@ -270,21 +270,43 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout) pfd[i].events |= POLLOUT; } } + int n = num; + + // add the unblock pipe + const int* unblockPipe = getUnblockPipe(); + if (unblockPipe != NULL) { + pfd[n].fd = unblockPipe[0]; + pfd[n].events = POLLIN; + ++n; + } + + // prepare timeout + int t = (timeout < 0.0) ? -1 : static_cast(1000.0 * timeout); // do the poll - int t = (timeout < 0.0) ? -1 : static_cast(1000.0 * timeout); - int n; - do { - n = poll(pfd, num, t); - if (n == -1) { - if (errno == EINTR) { - // interrupted system call - ARCH->testCancelThread(); - return 0; - } - throwError(errno); + n = poll(pfd, n, t); + + // reset the unblock pipe + if (unblockPipe != NULL && (pfd[num].revents & POLLIN) != 0) { + // the unblock event was signalled. flush the pipe. + char dummy[100]; + do { + read(unblockPipe[0], dummy, sizeof(dummy)); + } while (errno != EAGAIN); + + // don't count this unblock pipe in return value + --n; + } + + // handle results + if (n == -1) { + if (errno == EINTR) { + // interrupted system call + ARCH->testCancelThread(); + return 0; } - } while (false); + throwError(errno); + } // translate back for (int i = 0; i < num; ++i) { @@ -313,102 +335,119 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout) { int i, n; - do { - // prepare sets for select - n = 0; - fd_set readSet, writeSet, errSet; - fd_set* readSetP = NULL; - fd_set* writeSetP = NULL; - fd_set* errSetP = NULL; - FD_ZERO(&readSet); - FD_ZERO(&writeSet); - FD_ZERO(&errSet); - for (i = 0; i < num; ++i) { - // reset return flags - pe[i].m_revents = 0; + // prepare sets for select + n = 0; + fd_set readSet, writeSet, errSet; + fd_set* readSetP = NULL; + fd_set* writeSetP = NULL; + fd_set* errSetP = NULL; + FD_ZERO(&readSet); + FD_ZERO(&writeSet); + FD_ZERO(&errSet); + for (i = 0; i < num; ++i) { + // reset return flags + pe[i].m_revents = 0; - // set invalid flag if socket is bogus then go to next socket - if (pe[i].m_socket == NULL) { - pe[i].m_revents |= kPOLLNVAL; - continue; - } - - int fdi = pe[i].m_socket->m_fd; - if (pe[i].m_events & kPOLLIN) { - FD_SET(pe[i].m_socket->m_fd, &readSet); - readSetP = &readSet; - if (fdi > n) { - n = fdi; - } - } - if (pe[i].m_events & kPOLLOUT) { - FD_SET(pe[i].m_socket->m_fd, &writeSet); - writeSetP = &writeSet; - if (fdi > n) { - n = fdi; - } - } - if (true) { - FD_SET(pe[i].m_socket->m_fd, &errSet); - errSetP = &errSet; - if (fdi > n) { - n = fdi; - } - } + // set invalid flag if socket is bogus then go to next socket + if (pe[i].m_socket == NULL) { + pe[i].m_revents |= kPOLLNVAL; + continue; } - // if there are no sockets then don't block forever - if (n == 0 && timeout < 0.0) { - timeout = 0.0; - } - - // prepare timeout for select - struct timeval timeout2; - struct timeval* timeout2P; - if (timeout < 0.0) { - timeout2P = NULL; - } - else { - timeout2P = &timeout2; - timeout2.tv_sec = static_cast(timeout); - timeout2.tv_usec = static_cast(1.0e+6 * - (timeout - timeout2.tv_sec)); - } - - // do the select - n = select((SELECT_TYPE_ARG1) n + 1, - SELECT_TYPE_ARG234 readSetP, - SELECT_TYPE_ARG234 writeSetP, - SELECT_TYPE_ARG234 errSetP, - SELECT_TYPE_ARG5 timeout2P); - - // handle results - if (n == -1) { - if (errno == EINTR) { - // interrupted system call - ARCH->testCancelThread(); - return 0; - } - throwError(errno); - } - n = 0; - for (i = 0; i < num; ++i) { - if (pe[i].m_socket != NULL) { - if (FD_ISSET(pe[i].m_socket->m_fd, &readSet)) { - pe[i].m_revents |= kPOLLIN; - } - if (FD_ISSET(pe[i].m_socket->m_fd, &writeSet)) { - pe[i].m_revents |= kPOLLOUT; - } - if (FD_ISSET(pe[i].m_socket->m_fd, &errSet)) { - pe[i].m_revents |= kPOLLERR; - } - } - if (pe[i].m_revents != 0) { - ++n; + int fdi = pe[i].m_socket->m_fd; + if (pe[i].m_events & kPOLLIN) { + FD_SET(pe[i].m_socket->m_fd, &readSet); + readSetP = &readSet; + if (fdi > n) { + n = fdi; } } - } while (false); + if (pe[i].m_events & kPOLLOUT) { + FD_SET(pe[i].m_socket->m_fd, &writeSet); + writeSetP = &writeSet; + if (fdi > n) { + n = fdi; + } + } + if (true) { + FD_SET(pe[i].m_socket->m_fd, &errSet); + errSetP = &errSet; + if (fdi > n) { + n = fdi; + } + } + } + + // add the unblock pipe + const int* unblockPipe = getUnblockPipe(); + if (unblockPipe != NULL) { + FD_SET(unblockPipe[0], &readSet); + readSetP = &readSet; + if (unblockPipe[0] > n) { + n = unblockPipe[0]; + } + } + + // if there are no sockets then don't block forever + if (n == 0 && timeout < 0.0) { + timeout = 0.0; + } + + // prepare timeout for select + struct timeval timeout2; + struct timeval* timeout2P; + if (timeout < 0.0) { + timeout2P = NULL; + } + else { + timeout2P = &timeout2; + timeout2.tv_sec = static_cast(timeout); + timeout2.tv_usec = static_cast(1.0e+6 * + (timeout - timeout2.tv_sec)); + } + + // do the select + n = select((SELECT_TYPE_ARG1) n + 1, + SELECT_TYPE_ARG234 readSetP, + SELECT_TYPE_ARG234 writeSetP, + SELECT_TYPE_ARG234 errSetP, + SELECT_TYPE_ARG5 timeout2P); + + // reset the unblock pipe + if (unblockPipe != NULL && FD_ISSET(unblockPipe[0], &readSet)) { + // the unblock event was signalled. flush the pipe. + char dummy[100]; + do { + read(unblockPipe[0], dummy, sizeof(dummy)); + } while (errno != EAGAIN); + } + + // handle results + if (n == -1) { + if (errno == EINTR) { + // interrupted system call + ARCH->testCancelThread(); + return 0; + } + throwError(errno); + } + n = 0; + for (i = 0; i < num; ++i) { + if (pe[i].m_socket != NULL) { + if (FD_ISSET(pe[i].m_socket->m_fd, &readSet)) { + pe[i].m_revents |= kPOLLIN; + } + if (FD_ISSET(pe[i].m_socket->m_fd, &writeSet)) { + pe[i].m_revents |= kPOLLOUT; + } + if (FD_ISSET(pe[i].m_socket->m_fd, &errSet)) { + pe[i].m_revents |= kPOLLERR; + } + } + if (pe[i].m_revents != 0) { + ++n; + } + } return n; } @@ -418,7 +457,11 @@ CArchNetworkBSD::pollSocket(CPollEntry pe[], int num, double timeout) void CArchNetworkBSD::unblockPollSocket(CArchThread thread) { - CArchMultithreadPosix::getInstance()->unblockThread(thread); + const int* unblockPipe = getUnblockPipeForThread(thread); + if (unblockPipe != NULL) { + char dummy = 0; + write(unblockPipe[1], &dummy, 1); + } } size_t @@ -739,6 +782,38 @@ CArchNetworkBSD::isEqualAddr(CArchNetAddress a, CArchNetAddress b) memcmp(&a->m_addr, &b->m_addr, a->m_len) == 0); } +const int* +CArchNetworkBSD::getUnblockPipe() +{ + CArchMultithreadPosix* mt = CArchMultithreadPosix::getInstance(); + return getUnblockPipeForThread(mt->newCurrentThread()); +} + +const int* +CArchNetworkBSD::getUnblockPipeForThread(CArchThread thread) +{ + CArchMultithreadPosix* mt = CArchMultithreadPosix::getInstance(); + int* unblockPipe = (int*)mt->getNetworkDataForThread(thread); + if (unblockPipe == NULL) { + unblockPipe = new int[2]; + if (pipe(unblockPipe) != -1) { + try { + setBlockingOnSocket(unblockPipe[0], false); + mt->setNetworkDataForCurrentThread(unblockPipe); + } + catch (...) { + delete[] unblockPipe; + unblockPipe = NULL; + } + } + else { + delete[] unblockPipe; + unblockPipe = NULL; + } + } + return unblockPipe; +} + void CArchNetworkBSD::throwError(int err) { diff --git a/lib/arch/CArchNetworkBSD.h b/lib/arch/CArchNetworkBSD.h index 06f7f507..2aaf75d7 100644 --- a/lib/arch/CArchNetworkBSD.h +++ b/lib/arch/CArchNetworkBSD.h @@ -79,6 +79,8 @@ public: virtual bool isEqualAddr(CArchNetAddress, CArchNetAddress); private: + const int* getUnblockPipe(); + const int* getUnblockPipeForThread(CArchThread); void setBlockingOnSocket(int fd, bool blocking); void throwError(int); void throwNameError(int);