#include "CThreadRep.h" #include "CThread.h" #include "CMutex.h" #include "CLock.h" #include "XThread.h" #include "CLog.h" #include "IJob.h" #include #if defined(CONFIG_PTHREADS) #include #define SIGWAKEUP SIGUSR1 #endif #if defined(CONFIG_PLATFORM_WIN32) # if !defined(_MT) # error multithreading compile option is required # endif #include #endif // FIXME -- temporary exception type class XThreadUnavailable { }; #if defined(CONFIG_PLATFORM_UNIX) && !defined(NDEBUG) #include #include #include #include static void threadDebug(int) { if (fork() == 0) abort(); else { wait(0); exit(1); } } #endif // // CThreadRep // CMutex* CThreadRep::s_mutex = NULL; CThreadRep* CThreadRep::s_head = NULL; #if defined(CONFIG_PTHREADS) pthread_t CThreadRep::s_signalThread; #endif CThreadRep::CThreadRep() : m_prev(NULL), m_next(NULL), m_refCount(1), m_job(NULL), m_userData(NULL) { // note -- s_mutex must be locked on entry assert(s_mutex != NULL); // initialize stuff init(); #if defined(CONFIG_PTHREADS) // get main thread id m_thread = pthread_self(); #elif defined(CONFIG_PLATFORM_WIN32) // get main thread id m_thread = NULL; m_id = GetCurrentThreadId(); #endif // insert ourself into linked list if (s_head != NULL) { s_head->m_prev = this; m_next = s_head; } s_head = this; } CThreadRep::CThreadRep(IJob* job, void* userData) : m_prev(NULL), m_next(NULL), m_refCount(2), // 1 for us, 1 for thread m_job(job), m_userData(userData) { assert(m_job != NULL); assert(s_mutex != NULL); // create a thread rep for the main thread if the current thread // is unknown. note that this might cause multiple "main" threads // if threads are created external to this library. getCurrentThreadRep()->unref(); // initialize init(); // hold mutex while we create the thread CLock lock(s_mutex); // start the thread. throw if it doesn't start. #if defined(CONFIG_PTHREADS) // mask some signals in all threads except the main thread sigset_t sigset, oldsigset; sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset); int status = pthread_create(&m_thread, NULL, threadFunc, (void*)this); pthread_sigmask(SIG_SETMASK, &oldsigset, NULL); if (status != 0) throw XThreadUnavailable(); #elif defined(CONFIG_PLATFORM_WIN32) unsigned int id; m_thread = reinterpret_cast(_beginthreadex(NULL, 0, threadFunc, (void*)this, 0, &id)); m_id = static_cast(id); if (m_thread == 0) throw XThreadUnavailable(); #endif // insert ourself into linked list if (s_head != NULL) { s_head->m_prev = this; m_next = s_head; } s_head = this; // returning releases the locks, allowing the child thread to run } CThreadRep::~CThreadRep() { // note -- s_mutex must be locked on entry // remove ourself from linked list if (m_prev != NULL) { m_prev->m_next = m_next; } if (m_next != NULL) { m_next->m_prev = m_prev; } if (s_head == this) { s_head = m_next; } // clean up fini(); } void CThreadRep::initThreads() { if (s_mutex == NULL) { s_mutex = new CMutex; #if defined(CONFIG_PTHREADS) // install SIGWAKEUP handler struct sigaction act; sigemptyset(&act.sa_mask); # if defined(SA_INTERRUPT) act.sa_flags = SA_INTERRUPT; # else act.sa_flags = 0; # endif act.sa_handler = &threadCancel; sigaction(SIGWAKEUP, &act, NULL); # ifndef NDEBUG act.sa_handler = &threadDebug; sigaction(SIGSEGV, &act, NULL); # endif // set signal mask sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGWAKEUP); # ifndef NDEBUG sigaddset(&sigset, SIGSEGV); # endif pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); sigemptyset(&sigset); sigaddset(&sigset, SIGPIPE); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sigset, NULL); // fire up the INT and TERM signal handler thread int status = pthread_create(&s_signalThread, NULL, &CThreadRep::threadSignalHandler, getCurrentThreadRep()); if (status != 0) { // can't create thread to wait for signal so don't block // the signals. sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); } #endif } } void CThreadRep::ref() { CLock lock(s_mutex); ++m_refCount; } void CThreadRep::unref() { CLock lock(s_mutex); if (--m_refCount == 0) { delete this; } } bool CThreadRep::enableCancel(bool enable) { CLock lock(s_mutex); const bool old = m_cancellable; m_cancellable = enable; return old; } bool CThreadRep::isCancellable() const { CLock lock(s_mutex); return (m_cancellable && !m_cancelling); } void* CThreadRep::getResult() const { // no lock necessary since thread isn't running return m_result; } void* CThreadRep::getUserData() const { // no lock necessary because the value never changes return m_userData; } CThreadRep* CThreadRep::getCurrentThreadRep() { assert(s_mutex != NULL); #if defined(CONFIG_PTHREADS) const pthread_t thread = pthread_self(); #elif defined(CONFIG_PLATFORM_WIN32) const DWORD id = GetCurrentThreadId(); #endif // lock list while we search CLock lock(s_mutex); // search CThreadRep* scan = s_head; while (scan != NULL) { #if defined(CONFIG_PTHREADS) if (scan->m_thread == thread) { break; } #elif defined(CONFIG_PLATFORM_WIN32) if (scan->m_id == id) { break; } #endif scan = scan->m_next; } // create and use main thread rep if thread not found if (scan == NULL) { scan = new CThreadRep(); } // ref for caller ++scan->m_refCount; return scan; } void CThreadRep::doThreadFunc() { // default priority is slightly below normal setPriority(1); // wait for parent to initialize this object { CLock lock(s_mutex); } void* result = NULL; try { // go m_job->run(); } catch (XThreadCancel&) { // client called cancel() log((CLOG_DEBUG1 "caught cancel on thread %p", this)); } catch (XThreadExit& e) { // client called exit() result = e.m_result; log((CLOG_DEBUG1 "caught exit on thread %p", this)); } catch (...) { log((CLOG_DEBUG1 "exception on thread %p", this)); // note -- don't catch (...) to avoid masking bugs delete m_job; throw; } // done with job delete m_job; // store exit result (no lock necessary because the result will // not be accessed until m_exit is set) m_result = result; } #if defined(CONFIG_PTHREADS) #include "CStopwatch.h" #include void CThreadRep::init() { m_result = NULL; m_cancellable = true; m_cancelling = false; m_cancel = false; m_exit = false; } void CThreadRep::fini() { // main thread has NULL job if (m_job != NULL) { pthread_detach(m_thread); } } void CThreadRep::sleep(double timeout) { if (timeout < 0.0) return; struct timespec t; t.tv_sec = (long)timeout; t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec)); while (nanosleep(&t, &t) < 0) testCancel(); } void CThreadRep::cancel() { CLock lock(s_mutex); if (m_cancellable && !m_cancelling) { m_cancel = true; } else { return; } // break out of system calls log((CLOG_DEBUG1 "cancel thread %p", this)); pthread_kill(m_thread, SIGWAKEUP); } void CThreadRep::testCancel() { { CLock lock(s_mutex); // done if not cancelled, not cancellable, or already cancelling if (!m_cancel || !m_cancellable || m_cancelling) return; // update state for cancel m_cancel = false; m_cancelling = true; } // start cancel log((CLOG_DEBUG1 "throw cancel on thread %p", this)); throw XThreadCancel(); } bool CThreadRep::wait(CThreadRep* target, double timeout) { if (target == this) return false; testCancel(); if (target->isExited()) return true; if (timeout != 0.0) { CStopwatch timer; do { sleep(0.05); testCancel(); if (target->isExited()) return true; } while (timeout < 0.0 || timer.getTime() <= timeout); } return false; } void CThreadRep::setPriority(int) { // FIXME } bool CThreadRep::isExited() const { CLock lock(s_mutex); return m_exit; } void* CThreadRep::threadFunc(void* arg) { CThreadRep* rep = (CThreadRep*)arg; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); // run thread rep->doThreadFunc(); { // mark as terminated CLock lock(s_mutex); rep->m_exit = true; } // unref thread rep->unref(); // terminate the thread return NULL; } void CThreadRep::threadCancel(int) { // do nothing } void* CThreadRep::threadSignalHandler(void* vrep) { CThreadRep* mainThreadRep = reinterpret_cast(vrep); // add signal to mask sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); // we exit the loop via thread cancellation in sigwait() for (;;) { // wait int signal; sigwait(&sigset, &signal); // if we get here then the signal was raised. cancel the thread. mainThreadRep->cancel(); } } #elif defined(CONFIG_PLATFORM_WIN32) void CThreadRep::init() { m_result = NULL; m_cancellable = true; m_cancelling = false; m_exit = CreateEvent(NULL, TRUE, FALSE, NULL); m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL); } void CThreadRep::fini() { // destroy the events CloseHandle(m_cancel); CloseHandle(m_exit); // close the handle (main thread has a NULL handle) if (m_thread != NULL) { CloseHandle(m_thread); } } void CThreadRep::sleep(double timeout) { if (isCancellable()) WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout)); else Sleep((DWORD)(1000.0 * timeout)); } void CThreadRep::cancel() { log((CLOG_DEBUG1 "cancel thread %p", this)); SetEvent(m_cancel); } void CThreadRep::testCancel() { // poll cancel event. return if not set. const DWORD result = WaitForSingleObject(getCancelEvent(), 0); if (result != WAIT_OBJECT_0) return; { // ignore if disabled or already cancelling CLock lock(s_mutex); if (!m_cancellable || m_cancelling) return; // update state for cancel m_cancelling = true; ResetEvent(m_cancel); } // start cancel log((CLOG_DEBUG1 "throw cancel on thread %p", this)); throw XThreadCancel(); } bool CThreadRep::wait(CThreadRep* target, double timeout) { // get the current thread. if it's the same as the target thread // then the thread is waiting on itself. CThreadPtr currentRep(CThreadRep::getCurrentThreadRep()); if (target == this) return false; // is cancellation enabled? const DWORD n = (isCancellable() ? 2 : 1); // convert timeout DWORD t; if (timeout < 0.0) t = INFINITE; else t = (DWORD)(1000.0 * timeout); // wait for this thread to be cancelled or for the target thread to // terminate. HANDLE handles[2]; handles[0] = target->getExitEvent(); handles[1] = m_cancel; DWORD result = WaitForMultipleObjects(n, handles, FALSE, t); // cancel takes priority if (n == 2 && result != WAIT_OBJECT_0 + 1 && WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) result = WAIT_OBJECT_0 + 1; // handle result switch (result) { case WAIT_OBJECT_0 + 0: // target thread terminated return true; case WAIT_OBJECT_0 + 1: // this thread was cancelled. does not return. testCancel(); default: // error return false; } } void CThreadRep::setPriority(int n) { DWORD pClass = NORMAL_PRIORITY_CLASS; if (n < 0) { switch (-n) { case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break; case 2: n = THREAD_PRIORITY_HIGHEST; break; default: pClass = HIGH_PRIORITY_CLASS; switch (-n - 3) { case 0: n = THREAD_PRIORITY_LOWEST; break; case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break; case 2: n = THREAD_PRIORITY_NORMAL; break; case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break; default: n = THREAD_PRIORITY_HIGHEST; break; } break; } } else { switch (n) { case 0: n = THREAD_PRIORITY_NORMAL; break; case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break; case 2: n = THREAD_PRIORITY_LOWEST; break; default: n = THREAD_PRIORITY_IDLE; break; } } SetPriorityClass(m_thread, pClass); SetThreadPriority(m_thread, n); } HANDLE CThreadRep::getExitEvent() const { // no lock necessary because the value never changes return m_exit; } HANDLE CThreadRep::getCancelEvent() const { // no lock necessary because the value never changes return m_cancel; } unsigned int __stdcall CThreadRep::threadFunc(void* arg) { CThreadRep* rep = (CThreadRep*)arg; // run thread rep->doThreadFunc(); // signal termination SetEvent(rep->m_exit); // unref thread rep->unref(); // terminate the thread return 0; } #endif