/* * synergy -- mouse and keyboard sharing utility * Copyright (C) 2002 Chris Schoeneman * * This package is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * found in the file COPYING that should have accompanied this file. * * This package is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ #include "CThreadRep.h" #include "CLock.h" #include "CMutex.h" #include "CThread.h" #include "XMT.h" #include "XThread.h" #include "CLog.h" #include "IJob.h" #if HAVE_PTHREAD # include # define SIGWAKEUP SIGUSR1 #elif WINDOWS_LIKE # if !defined(_MT) # error multithreading compile option is required # endif # include #else #error unsupported platform for multithreading #endif // // CThreadRep // CMutex* CThreadRep::s_mutex = NULL; CThreadRep* CThreadRep::s_head = NULL; #if HAVE_PTHREAD pthread_t CThreadRep::s_signalThread; #endif CThreadRep::CThreadRep() : m_prev(NULL), m_next(NULL), m_refCount(1), m_job(NULL), m_userData(NULL) { // note -- s_mutex must be locked on entry assert(s_mutex != NULL); // initialize stuff init(); #if HAVE_PTHREAD // get main thread id m_thread = pthread_self(); #elif WINDOWS_LIKE // get main thread id m_thread = NULL; m_id = GetCurrentThreadId(); #endif // insert ourself into linked list if (s_head != NULL) { s_head->m_prev = this; m_next = s_head; } s_head = this; } CThreadRep::CThreadRep(IJob* job, void* userData) : m_prev(NULL), m_next(NULL), m_refCount(2), // 1 for us, 1 for thread m_job(job), m_userData(userData) { assert(m_job != NULL); assert(s_mutex != NULL); // create a thread rep for the main thread if the current thread // is unknown. note that this might cause multiple "main" threads // if threads are created external to this library. getCurrentThreadRep()->unref(); // initialize init(); // hold mutex while we create the thread CLock lock(s_mutex); // start the thread. throw if it doesn't start. #if HAVE_PTHREAD // mask some signals in all threads except the main thread sigset_t sigset, oldsigset; sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset); // pthread_create() RedHat 7.2 smp fails with a NULL attr. pthread_attr_t attr; int status = pthread_attr_init(&attr); if (status == 0) { status = pthread_create(&m_thread, &attr, threadFunc, (void*)this); pthread_attr_destroy(&attr); } pthread_sigmask(SIG_SETMASK, &oldsigset, NULL); if (status != 0) { throw XMTThreadUnavailable(); } #elif WINDOWS_LIKE unsigned int id; m_thread = reinterpret_cast(_beginthreadex(NULL, 0, threadFunc, (void*)this, 0, &id)); m_id = static_cast(id); if (m_thread == 0) { throw XMTThreadUnavailable(); } #endif // insert ourself into linked list if (s_head != NULL) { s_head->m_prev = this; m_next = s_head; } s_head = this; // returning releases the locks, allowing the child thread to run } CThreadRep::~CThreadRep() { // note -- s_mutex must be locked on entry // remove ourself from linked list if (m_prev != NULL) { m_prev->m_next = m_next; } if (m_next != NULL) { m_next->m_prev = m_prev; } if (s_head == this) { s_head = m_next; } // clean up fini(); } void CThreadRep::initThreads() { if (s_mutex == NULL) { s_mutex = new CMutex; #if HAVE_PTHREAD // install SIGWAKEUP handler struct sigaction act; sigemptyset(&act.sa_mask); # if defined(SA_INTERRUPT) act.sa_flags = SA_INTERRUPT; # else act.sa_flags = 0; # endif act.sa_handler = &threadCancel; sigaction(SIGWAKEUP, &act, NULL); // set signal mask sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGWAKEUP); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); sigemptyset(&sigset); sigaddset(&sigset, SIGPIPE); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_BLOCK, &sigset, NULL); // fire up the INT and TERM signal handler thread. we could // instead arrange to catch and handle these signals but // we'd be unable to cancel the main thread since no pthread // calls are allowed in a signal handler. pthread_attr_t attr; int status = pthread_attr_init(&attr); if (status == 0) { status = pthread_create(&s_signalThread, &attr, &CThreadRep::threadSignalHandler, getCurrentThreadRep()); pthread_attr_destroy(&attr); } if (status != 0) { // can't create thread to wait for signal so don't block // the signals. sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); pthread_sigmask(SIG_UNBLOCK, &sigset, NULL); } #endif // HAVE_PTHREAD } } void CThreadRep::ref() { CLock lock(s_mutex); ++m_refCount; } void CThreadRep::unref() { CLock lock(s_mutex); if (--m_refCount == 0) { delete this; } } bool CThreadRep::enableCancel(bool enable) { CLock lock(s_mutex); const bool old = m_cancellable; m_cancellable = enable; return old; } bool CThreadRep::isCancellable() const { CLock lock(s_mutex); return (m_cancellable && !m_cancelling); } void* CThreadRep::getResult() const { // no lock necessary since thread isn't running return m_result; } void* CThreadRep::getUserData() const { // no lock necessary because the value never changes return m_userData; } CThreadRep* CThreadRep::getCurrentThreadRep() { assert(s_mutex != NULL); #if HAVE_PTHREAD const pthread_t thread = pthread_self(); #elif WINDOWS_LIKE const DWORD id = GetCurrentThreadId(); #endif // lock list while we search CLock lock(s_mutex); // search CThreadRep* scan = s_head; while (scan != NULL) { #if HAVE_PTHREAD if (scan->m_thread == thread) { break; } #elif WINDOWS_LIKE if (scan->m_id == id) { break; } #endif scan = scan->m_next; } // create and use main thread rep if thread not found if (scan == NULL) { scan = new CThreadRep(); } // ref for caller ++scan->m_refCount; return scan; } void CThreadRep::doThreadFunc() { // default priority is slightly below normal setPriority(1); // wait for parent to initialize this object { CLock lock(s_mutex); } void* result = NULL; try { // go LOG((CLOG_DEBUG1 "thread %p entry", this)); m_job->run(); LOG((CLOG_DEBUG1 "thread %p exit", this)); } catch (XThreadCancel&) { // client called cancel() LOG((CLOG_DEBUG1 "caught cancel on thread %p", this)); } catch (XThreadExit& e) { // client called exit() result = e.m_result; LOG((CLOG_DEBUG1 "caught exit on thread %p", this)); } catch (...) { LOG((CLOG_DEBUG1 "exception on thread %p", this)); // note -- don't catch (...) to avoid masking bugs delete m_job; throw; } // done with job delete m_job; // store exit result (no lock necessary because the result will // not be accessed until m_exit is set) m_result = result; } #if HAVE_PTHREAD #include "CStopwatch.h" #if TIME_WITH_SYS_TIME # include # include #else # if HAVE_SYS_TIME_H # include # else # include # endif #endif void CThreadRep::init() { m_result = NULL; m_cancellable = true; m_cancelling = false; m_cancel = false; m_exit = false; } void CThreadRep::fini() { // main thread has NULL job if (m_job != NULL) { pthread_detach(m_thread); } } void CThreadRep::sleep( double timeout) { if (timeout < 0.0) { return; } struct timespec t; t.tv_sec = (long)timeout; t.tv_nsec = (long)(1000000000.0 * (timeout - (double)t.tv_sec)); while (nanosleep(&t, &t) < 0) testCancel(); } void CThreadRep::cancel() { CLock lock(s_mutex); if (!m_exit && m_cancellable && !m_cancelling) { m_cancel = true; } else { return; } // break out of system calls LOG((CLOG_DEBUG1 "cancel thread %p", this)); pthread_kill(m_thread, SIGWAKEUP); } void CThreadRep::testCancel() { { CLock lock(s_mutex); // done if not cancelled, not cancellable, or already cancelling if (!m_cancel || !m_cancellable || m_cancelling) { return; } // update state for cancel m_cancel = false; m_cancelling = true; } // start cancel LOG((CLOG_DEBUG1 "throw cancel on thread %p", this)); throw XThreadCancel(); } bool CThreadRep::wait(CThreadRep* target, double timeout) { if (target == this) { return false; } testCancel(); if (target->isExited()) { return true; } if (timeout != 0.0) { CStopwatch timer; do { sleep(0.05); testCancel(); if (target->isExited()) { return true; } } while (timeout < 0.0 || timer.getTime() <= timeout); } return false; } void CThreadRep::setPriority(int) { // FIXME } bool CThreadRep::isExited() const { CLock lock(s_mutex); return m_exit; } void* CThreadRep::threadFunc(void* arg) { CThreadRep* rep = (CThreadRep*)arg; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); // run thread rep->doThreadFunc(); { // mark as terminated CLock lock(s_mutex); rep->m_exit = true; } // unref thread rep->unref(); // terminate the thread return NULL; } void CThreadRep::threadCancel(int) { // do nothing } void* CThreadRep::threadSignalHandler(void* vrep) { CThreadRep* mainThreadRep = reinterpret_cast(vrep); // detach pthread_detach(pthread_self()); // add signal to mask sigset_t sigset; sigemptyset(&sigset); sigaddset(&sigset, SIGINT); sigaddset(&sigset, SIGTERM); // also wait on SIGABRT. on linux (others?) this thread (process) // will persist after all the other threads evaporate due to an // assert unless we wait on SIGABRT. that means our resources (like // the socket we're listening on) are not released and never will be // until the lingering thread is killed. i don't know why sigwait() // should protect the thread from being killed. note that sigwait() // doesn't actually return if we receive SIGABRT and, for some // reason, we don't have to block SIGABRT. sigaddset(&sigset, SIGABRT); // we exit the loop via thread cancellation in sigwait() for (;;) { // wait int signal; sigwait(&sigset, &signal); // if we get here then the signal was raised. cancel the thread. mainThreadRep->cancel(); } } #endif // HAVE_PTHREAD #if WINDOWS_LIKE void CThreadRep::init() { m_result = NULL; m_cancellable = true; m_cancelling = false; m_exit = CreateEvent(NULL, TRUE, FALSE, NULL); m_cancel = CreateEvent(NULL, TRUE, FALSE, NULL); } void CThreadRep::fini() { // destroy the events CloseHandle(m_cancel); CloseHandle(m_exit); // close the handle (main thread has a NULL handle) if (m_thread != NULL) { CloseHandle(m_thread); } } void CThreadRep::sleep( double timeout) { if (isCancellable()) { WaitForSingleObject(m_cancel, (DWORD)(1000.0 * timeout)); } else { Sleep((DWORD)(1000.0 * timeout)); } } void CThreadRep::cancel() { LOG((CLOG_DEBUG1 "cancel thread %p", this)); SetEvent(m_cancel); } void CThreadRep::testCancel() { // poll cancel event. return if not set. const DWORD result = WaitForSingleObject(getCancelEvent(), 0); if (result != WAIT_OBJECT_0) { return; } { // ignore if disabled or already cancelling CLock lock(s_mutex); if (!m_cancellable || m_cancelling) return; // update state for cancel m_cancelling = true; ResetEvent(m_cancel); } // start cancel LOG((CLOG_DEBUG1 "throw cancel on thread %p", this)); throw XThreadCancel(); } bool CThreadRep::wait(CThreadRep* target, double timeout) { // get the current thread. if it's the same as the target thread // then the thread is waiting on itself. CThreadPtr currentRep(CThreadRep::getCurrentThreadRep()); if (target == this) { return false; } // is cancellation enabled? const DWORD n = (isCancellable() ? 2 : 1); // convert timeout DWORD t; if (timeout < 0.0) { t = INFINITE; } else { t = (DWORD)(1000.0 * timeout); } // wait for this thread to be cancelled or for the target thread to // terminate. HANDLE handles[2]; handles[0] = target->getExitEvent(); handles[1] = m_cancel; DWORD result = WaitForMultipleObjects(n, handles, FALSE, t); // cancel takes priority if (n == 2 && result != WAIT_OBJECT_0 + 1 && WaitForSingleObject(handles[1], 0) == WAIT_OBJECT_0) { result = WAIT_OBJECT_0 + 1; } // handle result switch (result) { case WAIT_OBJECT_0 + 0: // target thread terminated return true; case WAIT_OBJECT_0 + 1: // this thread was cancelled. does not return. testCancel(); default: // timeout or error return false; } } bool CThreadRep::waitForEvent(double timeout) { // check if messages are available first. if we don't do this then // MsgWaitForMultipleObjects() will block even if the queue isn't // empty if the messages in the queue were there before the last // call to GetMessage()/PeekMessage(). if (HIWORD(GetQueueStatus(QS_ALLINPUT)) != 0) { return true; } // is cancellation enabled? const DWORD n = (isCancellable() ? 1 : 0); // convert timeout DWORD t; if (timeout < 0.0) { t = INFINITE; } else { t = (DWORD)(1000.0 * timeout); } // wait for this thread to be cancelled or for the target thread to // terminate. HANDLE handles[1]; handles[0] = m_cancel; DWORD result = MsgWaitForMultipleObjects(n, handles, FALSE, t, QS_ALLINPUT); // handle result switch (result) { case WAIT_OBJECT_0 + 1: // message is available return true; case WAIT_OBJECT_0 + 0: // this thread was cancelled. does not return. testCancel(); default: // timeout or error return false; } } void CThreadRep::setPriority(int n) { DWORD pClass = NORMAL_PRIORITY_CLASS; if (n < 0) { switch (-n) { case 1: n = THREAD_PRIORITY_ABOVE_NORMAL; break; case 2: n = THREAD_PRIORITY_HIGHEST; break; default: pClass = HIGH_PRIORITY_CLASS; switch (-n - 3) { case 0: n = THREAD_PRIORITY_LOWEST; break; case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break; case 2: n = THREAD_PRIORITY_NORMAL; break; case 3: n = THREAD_PRIORITY_ABOVE_NORMAL; break; default: n = THREAD_PRIORITY_HIGHEST; break; } break; } } else { switch (n) { case 0: n = THREAD_PRIORITY_NORMAL; break; case 1: n = THREAD_PRIORITY_BELOW_NORMAL; break; case 2: n = THREAD_PRIORITY_LOWEST; break; default: n = THREAD_PRIORITY_IDLE; break; } } SetPriorityClass(m_thread, pClass); SetThreadPriority(m_thread, n); } HANDLE CThreadRep::getExitEvent() const { // no lock necessary because the value never changes return m_exit; } HANDLE CThreadRep::getCancelEvent() const { // no lock necessary because the value never changes return m_cancel; } unsigned int __stdcall CThreadRep::threadFunc(void* arg) { CThreadRep* rep = (CThreadRep*)arg; // run thread rep->doThreadFunc(); // signal termination SetEvent(rep->m_exit); // unref thread rep->unref(); // terminate the thread return 0; } #endif // WINDOWS_LIKE