|
reSIProcate/rutil
9694
|
A condition variable that can be signaled or waited on, wraps POSIX/Windows implementations depending on environment. More...
#include <Condition.hxx>
Public Member Functions | |
| Condition () | |
| virtual | ~Condition () |
| void | wait (Mutex &mtx) |
| wait for the condition to be signaled | |
| bool | wait (Mutex &mutex, unsigned int ms) |
| wait for the condition to be signaled, giving up after ms milliseconds (ms == 0 waits with no timeout) | |
| void | wait (Mutex *mutex) |
| bool | wait (Mutex *mutex, unsigned int ms) |
| void | signal () |
| Signal one waiting thread. | |
| void | broadcast () |
| Signal all waiting threads. | |
Private Member Functions | |
| Condition (const Condition &) | |
| Condition & | operator= (const Condition &) |
Private Attributes | |
| pthread_cond_t | mId |
A condition variable that can be signaled or waited on, wraps POSIX/Windows implementations depending on environment.
Here's an example (from ThreadIf):
void ThreadIf::shutdown()
{
   Lock lock(mShutdownMutex);
   if (!mShutdown)
   {
      mShutdown = true;
      mShutdownCondition.signal();
   }
}

bool ThreadIf::waitForShutdown(int ms) const
{
   Lock lock(mShutdownMutex);
   mShutdownCondition.wait(mShutdownMutex, ms);
   return mShutdown;
}
Definition at line 51 of file Condition.hxx.
| Condition::Condition | ( | ) |
Definition at line 26 of file Condition.cxx.
References mId.
{
   // Creates the platform primitive(s) backing this condition variable.
   //  - WIN32 + RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX: three counters
   //    plus two semaphores (m_gate, m_queue) and one mutex (m_mutex).
   //  - WIN32 otherwise: a single auto-reset event stored in mId.
   //  - everything else: a pthread condition variable stored in mId.
   //std::cerr << this << " Condition::Condition" << std::endl;
#ifdef WIN32
#  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
   m_blocked = 0;
   m_gone = 0;
   m_waiting = 0;
   // m_gate: initial count 1, maximum 1.  m_queue: initial count 0.
   m_gate = reinterpret_cast<void*>(CreateSemaphore(0, 1, 1, 0));
   m_queue = reinterpret_cast<void*>(CreateSemaphore(0, 0, LONG_MAX, 0));
   m_mutex = reinterpret_cast<void*>(CreateMutex(0, 0, 0));
   if (!m_gate || !m_queue || !m_mutex)
   {
      // At least one object could not be created: release whatever was
      // created.  Each member is zeroed after its handle is closed so that,
      // when assert() is compiled out, the destructor does not call
      // CloseHandle() a second time on a stale (possibly recycled) value.
      int res = 0;
      if (m_gate)
      {
         res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
         assert(res);
         m_gate = 0;
      }
      if (m_queue)
      {
         res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
         assert(res);
         m_queue = 0;
      }
      if (m_mutex)
      {
         res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
         assert(res);
         m_mutex = 0;
      }
      (void)res;
      assert(0);
   }
#  else
   mId = CreateEvent(
      NULL,  //LPSECURITY_ATTRIBUTES lpEventAttributes,
             // pointer to security attributes
      FALSE, // BOOL bManualReset, // flag for manual-reset event
      FALSE, //BOOL bInitialState, // flag for initial state
      NULL   //LPCTSTR lpName // pointer to event-object name
      );
   assert(mId);
#  endif
#else
#ifdef _RESIP_MONOTONIC_CLOCK
   // Try to bind the condition variable to CLOCK_MONOTONIC; if the clock is
   // unavailable or cannot be selected, fall through to the default
   // initialisation below.
   pthread_condattr_t attr;
   struct timespec dummy;
   int ret = pthread_condattr_init( &attr );
   (void)ret; // only read by assert(); keeps NDEBUG builds warning-free
   assert( ret == 0 );
   // if((syscall( __NR_clock_getres, CLOCK_MONOTONIC, &dummy ) == 0) &&
   if((clock_getres( CLOCK_MONOTONIC, &dummy ) == 0) &&
      (pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ) == 0))
   {
      ret = pthread_cond_init( &mId, &attr );
      assert( ret == 0 );
      pthread_condattr_destroy( &attr );
      return;
   }
   pthread_condattr_destroy( &attr );
#endif
   // Default attributes (no explicit clock selection).
   int rc = pthread_cond_init(&mId,0);
   (void)rc;
   assert( rc == 0 );
#endif
}
| Condition::~Condition | ( | ) | [virtual] |
Definition at line 95 of file Condition.cxx.
References mId.
{
   // Tears down whichever primitive(s) the constructor created for the
   // active platform configuration.
#ifdef WIN32
#  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
   // Gate semaphore, then queue semaphore, then the internal mutex.
   int res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
   assert(res);

   res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
   assert(res);

   res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
   assert(res);
#  else
   // Single event object.
   BOOL ok = CloseHandle(mId);
   assert( ok );
#  endif
#else
   // Only EBUSY is treated as fatal here; any other result is ignored.
   const int destroyResult = pthread_cond_destroy(&mId);
   if (destroyResult == EBUSY)
   {
      assert(0);
   }
#endif
}
| resip::Condition::Condition | ( | const Condition & | ) | [private] |
| void Condition::broadcast | ( | ) |
Signal all waiting threads.
Definition at line 473 of file Condition.cxx.
References mId.
Referenced by resip::RWMutex::unlock().
{
   // Wakes every thread currently waiting on this condition.
   //  - WIN32 + RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX: counter/semaphore
   //    protocol below.
   //  - WIN32 otherwise: not implemented (assert(0)).
   //  - POSIX: pthread_cond_broadcast().
#ifdef WIN32
#  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
   unsigned signals = 0; // number of m_queue permits to release at the end
   int res = 0;

   // All counter updates happen while holding m_mutex.
   res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
   assert(res == WAIT_OBJECT_0);

   if (m_waiting != 0) // the m_gate is already closed
   {
      if (m_blocked == 0)
      {
         // Nothing blocked: nothing to wake.
         res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
         assert(res);
         return;
      }

      // Move every blocked thread into the waiting count.
      m_waiting += (signals = m_blocked);
      m_blocked = 0;
   }
   else
   {
      // Take the gate before looking at the blocked/gone counters.
      res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
      assert(res == WAIT_OBJECT_0);
      if (m_blocked > m_gone)
      {
         // Discount the m_gone entries before counting how many to wake.
         if (m_gone != 0)
         {
            m_blocked -= m_gone;
            m_gone = 0;
         }
         signals = m_waiting = m_blocked;
         m_blocked = 0;
      }
      else
      {
         // Nobody left to wake: give the gate back immediately.
         res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
         assert(res);
      }
   }

   res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
   assert(res);

   // The queue semaphore is released only after m_mutex has been dropped.
   if (signals)
   {
      res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
      assert(res);
   }
#  else
   assert(0);
#  endif
#else
   // Check the result the same way signal() checks pthread_cond_signal().
   int ret = pthread_cond_broadcast(&mId);
   (void)ret;
   assert( ret == 0 );
#endif
}
| void Condition::signal | ( | ) |
Signal one waiting thread.
Definition at line 406 of file Condition.cxx.
References mId.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::ThreadIf::shutdown(), and resip::RWMutex::unlock().
{
// Wakes one waiting thread.
//  - WIN32 + RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX: counter/semaphore
//    protocol below, releasing at most one m_queue permit.
//  - WIN32 otherwise: SetEvent() on the event stored in mId.
//  - POSIX: pthread_cond_signal().
#ifdef WIN32
# ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
// signals: number of m_queue permits to release at the end (0 or 1 here).
unsigned signals = 0;
int res = 0;
// All counter updates happen while holding m_mutex.
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
assert(res == WAIT_OBJECT_0);
if (m_waiting != 0) // the m_gate is already closed
{
if (m_blocked == 0)
{
// Nothing blocked: nothing to wake.
res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
assert(res);
return;
}
// Move exactly one thread from the blocked count to the waiting count.
++m_waiting;
--m_blocked;
signals = 1;
}
else
{
// Take the gate before looking at the blocked/gone counters.
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
assert(res == WAIT_OBJECT_0);
if (m_blocked > m_gone)
{
// Discount the m_gone entries before choosing a thread to wake.
if (m_gone != 0)
{
m_blocked -= m_gone;
m_gone = 0;
}
signals = m_waiting = 1;
--m_blocked;
}
else
{
// Nobody left to wake: give the gate back immediately.
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
assert(res);
}
}
res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
assert(res);
// The queue semaphore is released only after m_mutex has been dropped.
if (signals)
{
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
assert(res);
}
# else
BOOL ret = SetEvent(
mId // HANDLE hEvent // handle to event object
);
assert(ret);
# endif
#else
int ret = pthread_cond_signal(&mId);
// ret is only read by assert(); the cast keeps NDEBUG builds warning-free.
(void)ret;
assert( ret == 0 );
#endif
}
| void Condition::wait | ( | Mutex & | mtx | ) |
wait for the condition to be signaled
| mtx | The mutex associated with the condition variable |
Definition at line 132 of file Condition.cxx.
References resip::Mutex::getId(), resip::Mutex::lock(), mId, and resip::Mutex::unlock().
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::getNext(), resip::RWMutex::readlock(), wait(), resip::ThreadIf::waitForShutdown(), and resip::RWMutex::writelock().
{
// Blocks until the condition is signaled, with no timeout.
// The caller's mutex is released for the duration of the wait and
// re-acquired before returning (explicit unlock()/lock() on WIN32,
// pthread_cond_wait() otherwise).
// Note: this body names the parameter `mutex`.
//std::cerr << "Condition::wait " << mutex << std::endl;
#ifdef WIN32
# ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
// enterWait() is defined elsewhere; presumably it registers this thread as
// blocked -- TODO confirm against its definition.
enterWait();
// Release the mutex
mutex.unlock();
// do wait
{
int res = 0;
// Block until a signal()/broadcast() releases a permit on m_queue.
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
assert(res == WAIT_OBJECT_0);
// Snapshots of m_waiting / m_gone taken under m_mutex; they drive the
// clean-up performed after m_mutex is released.
unsigned was_waiting=0;
unsigned was_gone=0;
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
assert(res == WAIT_OBJECT_0);
was_waiting = m_waiting;
was_gone = m_gone;
if (was_waiting != 0)
{
// This thread consumes one of the pending wake-ups.
if (--m_waiting == 0)
{
if (m_blocked != 0)
{
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate
assert(res);
// Gate already reopened here, so skip the block after ReleaseMutex.
was_waiting = 0;
}
else if (m_gone != 0)
m_gone = 0;
}
}
else if (++m_gone == (ULONG_MAX / 2))
{
// timeout occurred, normalize the m_gone count
// this may occur if many calls to wait with a timeout are made and
// no call to notify_* is made
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
assert(res == WAIT_OBJECT_0);
m_blocked -= m_gone;
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
assert(res);
m_gone = 0;
}
res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
assert(res);
// was_waiting == 1 here means this thread took m_waiting to zero without
// reopening the gate above: consume was_gone permits from m_queue, then
// reopen the gate.
if (was_waiting == 1)
{
for (/* */ ; was_gone; --was_gone)
{
// better now than spurious later
res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
INFINITE);
assert(res == WAIT_OBJECT_0);
}
res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
assert(res);
}
}
// Reacquire the mutex
mutex.lock();
# else
// FixMe: Race condition between time we get mId and when we
// re-acquire the mutex.
mutex.unlock();
// The result of this wait is not checked.
WaitForSingleObject(mId,INFINITE);
mutex.lock();
# endif
#else
int ret = pthread_cond_wait(&mId, mutex.getId());
// ret is only read by assert(); the cast keeps NDEBUG builds warning-free.
(void)ret;
assert( ret == 0 );
#endif
}

| bool Condition::wait | ( | Mutex & | mutex, |
| unsigned int | ms | ||
| ) |
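wait for the condition to be signaled, or until the timeout expires
| mutex | The mutex associated with the condition variable |
| ms | Timeout in milliseconds; 0 waits with no timeout and always returns true |
| true | The condition was woken up by activity |
| false | Timeout or interrupt. |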
Definition at line 222 of file Condition.cxx.
References resip::Mutex::getId(), resip::Timer::getTimeMs(), resip::Mutex::lock(), mId, resip::Mutex::unlock(), and wait().
{
   // Timed wait: blocks until the condition is signaled or `ms` milliseconds
   // have passed.  Returns true when woken by a signal, false otherwise.
   // ms == 0 is special-cased: it performs an untimed wait and returns true.
   if (ms == 0)
   {
      wait(mutex);
      return true;
   }
#ifdef WIN32
#  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
   // enterWait() is defined elsewhere; presumably it registers this thread
   // as blocked -- TODO confirm against its definition.
   enterWait();
   // Release the mutex
   mutex.unlock();
   // do timed wait
   bool ret = false;
   unsigned int res = 0;
#if 0 /* unnecessary time stuff - used in BOOST implementation because expiry time is provided to do_timed_wait - we pass in an interval */
   UInt64 start = Timer::getTimeMs();
   for (;;)
   {
      res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
                                ms);
      assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
      ret = (res == WAIT_OBJECT_0);
      if (res == WAIT_TIMEOUT)
      {
         UInt64 now = Timer::getTimeMs();
         unsigned int elapsed = (unsigned int)(now - start);
         if (ms > elapsed)
         {
            ms -= elapsed;
            continue;
         }
      }
      break;
   }
#endif
   // ret is true only when a permit was obtained from m_queue in time.
   res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),ms);
   assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
   ret = (res == WAIT_OBJECT_0);
   // Snapshots of m_waiting / m_gone taken under m_mutex; they drive the
   // clean-up performed after m_mutex is released.
   unsigned was_waiting=0;
   unsigned was_gone=0;
   res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
   assert(res == WAIT_OBJECT_0);
   was_waiting = m_waiting;
   was_gone = m_gone;
   if (was_waiting != 0)
   {
      if (!ret) // timeout
      {
         if (m_blocked != 0)
            --m_blocked;
         else
            ++m_gone; // count spurious wakeups
      }
      if (--m_waiting == 0)
      {
         if (m_blocked != 0)
         {
            res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate
            assert(res);
            // Gate already reopened here, so skip the block after ReleaseMutex.
            was_waiting = 0;
         }
         else if (m_gone != 0)
            m_gone = 0;
      }
   }
   else if (++m_gone == (ULONG_MAX / 2))
   {
      // timeout occurred, normalize the m_gone count
      // this may occur if many calls to wait with a timeout are made and
      // no call to notify_* is made
      res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
      assert(res == WAIT_OBJECT_0);
      m_blocked -= m_gone;
      res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
      assert(res);
      m_gone = 0;
   }
   res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
   assert(res);
   // was_waiting == 1 here means this thread took m_waiting to zero without
   // reopening the gate above: consume was_gone permits from m_queue, then
   // reopen the gate.
   if (was_waiting == 1)
   {
      for (/* */ ; was_gone; --was_gone)
      {
         // better now than spurious later
         res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
         assert(res == WAIT_OBJECT_0);
      }
      res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
      assert(res);
   }
   // Reacquire the mutex
   mutex.lock();
   return ret;
#  else
   // FixMe: Race condition between time we get mId and when we
   // re-acquire the mutex.
   //
   // SLG: A Note about the Win32 Implementation of Conditions
   //
   // I have investigated a fix for this. A solution to this problem is
   // non-trivial. Please read http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
   // for a full explanation. This is an implementation of the SetEvent solution
   // discussed in that article. This solution has the following issues:
   // 1. Unfairness - ie. First thread to call wait may not be first thread
   //    to be released from condition.
   // 2. Incorrectness due to a race condition when a broadcast occurs
   // (see the link for more details on these issues)
   //
   // There is a solution that corrects these two problems, but also introduces 2 more.
   // This solution (also discussed in the link) requires the use of a primitive only
   // available in WinNT and above. It also requires that the Mutex passed in be
   // implemented using windows Mutexes instead of CriticalSections - they are less
   // efficient. Thus the problems with this SignalObjectAndWait solution are:
   // 1. Not portable to all versions of windows - ie. will not work with Win98/Me
   // 2. Less efficient than the SetEvent solution
   //
   // I have chosen to stick with the SetEvent Solution for the following reasons:
   // 1. Speed is important.
   // 2. The Unfairness issue is not really a big problem since the stack currently
   //    does not call a wait function from two different threads. (assuming the
   //    hosting application always calls process() from the same thread). The only
   //    time multi-threading comes into the picture is when the transports queue
   //    messages from the wire onto the stateMacFifo - but they are retrieved off the
   //    Fifo by a single thread.
   // 3. The Incorrectness issue is also not a big problem, since the stack currently
   //    doesn't use the broadcast member of this class.
   //
   // Note: The implementation of broadcast remains incomplete - since it is currently
   //       unused and would require an additional CriticalSection Enter and Leave to
   //       keep track of a counter (see the above link for more info). This can be
   //       easily added in the future if required.
   mutex.unlock();
   DWORD ret = WaitForSingleObject(mId, ms);
   mutex.lock();
   assert(ret != WAIT_FAILED);
   return (ret == WAIT_OBJECT_0);
#  endif
#else // WIN32
   // Absolute deadline = Timer::getTimeMs() + ms, split into whole seconds
   // and the sub-second remainder in nanoseconds.
   UInt64 expires64 = Timer::getTimeMs() + ms;
   timespec expiresTS;
   expiresTS.tv_sec = expires64 / 1000;
   expiresTS.tv_nsec = (expires64 % 1000) * 1000000L;
   assert( expiresTS.tv_nsec < 1000000000L );
   //std::cerr << "Condition::wait " << mutex << "ms=" << ms << " expire=" << expiresTS.tv_sec << " " << expiresTS.tv_nsec << std::endl;
   int ret = pthread_cond_timedwait(&mId, mutex.getId(), &expiresTS);
   if (ret == EINTR || ret == ETIMEDOUT)
   {
      return false;
   }
   //std::cerr << this << " pthread_cond_timedwait failed " << ret << " mutex=" << mutex << std::endl;
   // Any other non-zero code is unexpected.  Debug builds stop on the
   // assert; builds without asserts report "not signaled" rather than
   // claiming a wake-up, matching the WIN32 branches which return
   // (result == WAIT_OBJECT_0).
   assert( ret == 0 );
   return ret == 0;
#endif // not WIN32
}

| void Condition::wait | ( | Mutex * | mutex | ) |
Definition at line 216 of file Condition.cxx.
References wait().
{
   // Pointer convenience overload: forwards to wait(Mutex&).
   // The pointer is dereferenced unconditionally, so a null mutex is a
   // caller error; flag it in debug builds instead of dereferencing null.
   assert(mutex != 0);
   this->wait(*mutex);
}

| bool Condition::wait | ( | Mutex * | mutex, |
| unsigned int | ms | ||
| ) |
Definition at line 400 of file Condition.cxx.
References wait().
{
   // Pointer convenience overload: forwards to wait(Mutex&, unsigned int)
   // and returns its result unchanged.
   // The pointer is dereferenced unconditionally, so a null mutex is a
   // caller error; flag it in debug builds instead of dereferencing null.
   assert(mutex != 0);
   return this->wait(*mutex, ms);
}

pthread_cond_t resip::Condition::mId [mutable, private] |
Definition at line 108 of file Condition.hxx.
Referenced by broadcast(), Condition(), signal(), wait(), and ~Condition().
1.7.5.1