reSIProcate/rutil  9694
Condition.cxx
Go to the documentation of this file.
00001 #include <cassert>
00002 #include <climits>
00003 
00004 #ifndef WIN32
00005 #  include <pthread.h>
00006 #  include <errno.h>
00007 #  include <sys/time.h>
00008 #  include <sys/syscall.h>
00009 #  include <unistd.h>
00010 #endif
00011 
00012 #include "rutil/compat.hxx"
00013 #include "rutil/Condition.hxx"
00014 #include "rutil/Mutex.hxx"
00015 #include "rutil/Timer.hxx"
00016 
00017 #ifdef _RESIP_MONOTONIC_CLOCK
00018 #ifdef __APPLE__
00019 #undef _RESIP_MONOTONIC_CLOCK
00020 #warning Mac OS X does not support POSIX monotonic timers.
00021 #endif
00022 #endif
00023 
00024 using namespace resip;
00025 
00026 Condition::Condition()
00027 {
00028    //std::cerr << this << " Condition::Condition" << std::endl;
00029 
00030 #ifdef WIN32
00031 #  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00032    m_blocked = 0;
00033    m_gone = 0;
00034    m_waiting = 0;
00035    m_gate = reinterpret_cast<void*>(CreateSemaphore(0, 1, 1, 0));
00036    m_queue = reinterpret_cast<void*>(CreateSemaphore(0, 0, LONG_MAX, 0));
00037    m_mutex = reinterpret_cast<void*>(CreateMutex(0, 0, 0));
00038 
00039    if (!m_gate || !m_queue || !m_mutex)
00040    {
00041       int res = 0;
00042       if (m_gate)
00043       {
00044          res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
00045          assert(res);
00046       }
00047       if (m_queue)
00048       {
00049          res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
00050          assert(res);
00051       }
00052       if (m_mutex)
00053       {
00054          res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
00055          assert(res);
00056       }
00057 
00058       assert(0);
00059    }
00060 #  else
00061    mId =  CreateEvent(
00062       NULL, //LPSECURITY_ATTRIBUTES lpEventAttributes,
00063       // pointer to security attributes
00064       FALSE, // BOOL bManualReset,  // flag for manual-reset event
00065       FALSE, //BOOL bInitialState, // flag for initial state
00066       NULL //LPCTSTR lpName      // pointer to event-object name
00067       );
00068    assert(mId);
00069 #  endif
00070 #else
00071 #ifdef _RESIP_MONOTONIC_CLOCK
00072    pthread_condattr_t attr;
00073    struct timespec dummy;
00074    int ret = pthread_condattr_init( &attr );
00075    assert( ret == 0 );
00076 
00077 //   if((syscall( __NR_clock_getres, CLOCK_MONOTONIC, &dummy ) == 0) &&
00078      if((clock_getres( CLOCK_MONOTONIC, &dummy ) == 0) &&
00079        (pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ) == 0))
00080    {
00081       ret = pthread_cond_init( &mId, &attr );
00082       assert( ret == 0 );
00083       pthread_condattr_destroy( &attr );
00084       return;
00085    }
00086    pthread_condattr_destroy( &attr );
00087 #endif
00088    int  rc =  pthread_cond_init(&mId,0);
00089    (void)rc;
00090    assert( rc == 0 );
00091 #endif
00092 }
00093 
00094 
00095 Condition::~Condition ()
00096 {
00097 #ifdef WIN32
00098 #  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00099     int res = 0;
00100     res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
00101     assert(res);
00102     res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
00103     assert(res);
00104     res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
00105     assert(res);
00106 #  else
00107    BOOL ok = CloseHandle(mId);
00108    assert( ok );
00109 #  endif
00110 #else
00111    if (pthread_cond_destroy(&mId) == EBUSY)
00112    {
00113       assert(0);
00114    }
00115 #endif
00116 }
00117 
#if defined(WIN32) && defined(RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX)
// Register the calling thread as a blocked waiter: acquire the gate
// semaphore, increment m_blocked, then release the gate again.
void
Condition::enterWait ()
{
   HANDLE gate = reinterpret_cast<HANDLE>(m_gate);

   const DWORD acquired = WaitForSingleObject(gate, INFINITE);
   (void)acquired;
   assert(acquired == WAIT_OBJECT_0);

   ++m_blocked;

   const BOOL released = ReleaseSemaphore(gate, 1, 0);
   (void)released;
   assert(released);
}
#endif
00130 
00131 void
00132 Condition::wait (Mutex& mutex)
00133 {
00134    //std::cerr << "Condition::wait " << mutex << std::endl;
00135 #ifdef WIN32
00136 #  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00137    enterWait();
00138 
00139    // Release the mutex
00140    mutex.unlock();
00141 
00142    // do wait
00143    {
00144       int res = 0;
00145       res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
00146       assert(res == WAIT_OBJECT_0);
00147 
00148       unsigned was_waiting=0;
00149       unsigned was_gone=0;
00150 
00151       res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
00152       assert(res == WAIT_OBJECT_0);
00153       was_waiting = m_waiting;
00154       was_gone = m_gone;
00155       if (was_waiting != 0)
00156       {
00157          if (--m_waiting == 0)
00158          {
00159             if (m_blocked != 0)
00160             {
00161                res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate
00162                assert(res);
00163                was_waiting = 0;
00164             }
00165             else if (m_gone != 0)
00166                 m_gone = 0;
00167          }
00168       }
00169       else if (++m_gone == (ULONG_MAX / 2))
00170       {
00171          // timeout occured, normalize the m_gone count
00172          // this may occur if many calls to wait with a timeout are made and
00173          // no call to notify_* is made
00174          res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
00175          assert(res == WAIT_OBJECT_0);
00176          m_blocked -= m_gone;
00177          res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00178          assert(res);
00179          m_gone = 0;
00180       }
00181       res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00182       assert(res);
00183 
00184       if (was_waiting == 1)
00185       {
00186          for (/* */ ; was_gone; --was_gone)
00187          {
00188             // better now than spurious later
00189             res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
00190                   INFINITE);
00191             assert(res == WAIT_OBJECT_0);
00192          }
00193          res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00194          assert(res);
00195       }
00196    }
00197 
00198    // Reacquire the mutex
00199    mutex.lock();
00200 
00201 #   else
00202    // FixMe: Race condition between time we get mId and when we
00203    // re-acquire the mutex.
00204    mutex.unlock();
00205    WaitForSingleObject(mId,INFINITE);
00206    mutex.lock();
00207 #   endif
00208 #else
00209    int ret = pthread_cond_wait(&mId, mutex.getId());
00210    (void)ret;
00211    assert( ret == 0 );
00212 #endif
00213 }
00214 
00215 void
00216 Condition::wait (Mutex* mutex)
00217 {
00218    this->wait(*mutex);
00219 }
00220 
00221 bool
00222 Condition::wait(Mutex& mutex, 
00223                 unsigned int ms)
00224 {
00225    if (ms == 0)
00226    {
00227       wait(mutex);
00228       return true;
00229    }
00230 
00231 #ifdef WIN32
00232 #   ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00233    enterWait();
00234 
00235    // Release the mutex
00236    mutex.unlock();
00237 
00238    //  do timed wait
00239    bool ret = false;
00240    unsigned int res = 0;
00241 
00242 #if 0  /*  unnecessary time stuff - used in BOOST implementation because expiry time is provided to do_timed_wait - we pass in an interval */
00243    UInt64  start = Timer::getTimeMs();
00244 
00245    for (;;)
00246    {
00247        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
00248              ms);
00249        assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
00250        ret = (res == WAIT_OBJECT_0);
00251        if (res == WAIT_TIMEOUT)
00252        {
00253           UInt64  now = Timer::getTimeMs();
00254           unsigned int elapsed = (unsigned int)(now - start);
00255           if (ms > elapsed)
00256           {
00257              ms -= elapsed;
00258              continue;
00259           }
00260        }
00261 
00262        break;
00263    }
00264 #endif
00265 
00266    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),ms);
00267    assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
00268    ret = (res == WAIT_OBJECT_0);
00269 
00270    unsigned was_waiting=0;
00271    unsigned was_gone=0;
00272 
00273    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
00274    assert(res == WAIT_OBJECT_0);
00275    was_waiting = m_waiting;
00276    was_gone = m_gone;
00277    if (was_waiting != 0)
00278    {
00279       if (!ret) // timeout
00280       {
00281          if (m_blocked != 0)
00282             --m_blocked;
00283          else
00284             ++m_gone; // count spurious wakeups
00285       }
00286       if (--m_waiting == 0)
00287       {
00288          if (m_blocked != 0)
00289          {
00290             res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate
00291             assert(res);
00292             was_waiting = 0;
00293          }
00294          else if (m_gone != 0)
00295             m_gone = 0;
00296       }
00297    }
00298    else if (++m_gone == (ULONG_MAX / 2))
00299    {
00300       // timeout occured, normalize the m_gone count
00301       // this may occur if many calls to wait with a timeout are made and
00302       // no call to notify_* is made
00303       res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
00304       assert(res == WAIT_OBJECT_0);
00305       m_blocked -= m_gone;
00306       res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00307       assert(res);
00308       m_gone = 0;
00309    }
00310    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00311    assert(res);
00312 
00313    if (was_waiting == 1)
00314    {
00315       for (/* */ ; was_gone; --was_gone)
00316       {
00317          // better now than spurious later
00318          res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
00319          assert(res ==  WAIT_OBJECT_0);
00320       }
00321       res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00322       assert(res);
00323    }
00324 
00325    // Reacquire the mutex
00326    mutex.lock();
00327 
00328    return ret;
00329 
00330 #   else
00331    // FixMe: Race condition between time we get mId and when we
00332    // re-acquire the mutex.
00333    //
00334    // SLG: A Note about the Win32 Implementation of Conditions
00335    //
00336    // I have investigated a fix for this.  A solution to this problem is
00337    // non-trivial.  Please read http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
00338    // for a full explanation.  This is an implementation of the SetEvent solution
00339    // discussed in that article.  This solution has the following issues:
00340    // 1.  Unfairness - ie.  First thread to call wait may not be first thread
00341    //     to be released from condition.
00342    // 2.  Incorrectness due to a race condition when a broadcast occurs
00343    // (see the link for more details on these issues)
00344    //
00345    // There is a solution that corrects these two problem, but also introduces 2 more.
00346    // This solution (also discussed in the link) requires the use of a primitive only
00347    // available in WinNT and above.  It also requires that the Mutex passed in be
00348    // implemented using windows Mutexes instead of CriticalSections - they are less
00349    // efficient.  Thus the problems with this SignalObjectAndWait solution are:
00350    // 1.  Not portable to all versions of windows - ie.  will not work with Win98/Me
00351    // 2.  Less efficient than tthe SetEvent solution
00352    //
00353    // I have choosen to stick with the SetEvent Solution for the following reasons:
00354    // 1.  Speed is important.
00355    // 2.  The Unfairness issue is not really a big problem since the stack currently
00356    //     does not call a wait function from two different threads.  (assuming the
00357    //     hosting application always calls process() from the same thread).  The only
00358    //     time multi-threading comes into the picture is when the transports queue
00359    //     messages from the wire onto the stateMacFifo - but they are retrieved off the
00360    //     Fifo by a single thread.
00361    // 3.  The Incorrectness issue is also not a big problem, since the stack currently
00362    //     doesn't use the broadcast member of this class.
00363    //
00364    // Note:  The implementation of broadcast remains incomplete - since it is currently
00365    //        unused and would require an additional CriticalSection Enter and Leave to
00366    //        keep track of a counter (see the above link for more info).  This can be
00367    //        easily added in the future if required.
00368    mutex.unlock();
00369    DWORD ret = WaitForSingleObject(mId, ms);
00370    mutex.lock();
00371    assert(ret != WAIT_FAILED);
00372    return (ret == WAIT_OBJECT_0);
00373 #   endif
00374 #else   // WIN32
00375    UInt64 expires64 = Timer::getTimeMs() + ms;
00376    timespec expiresTS;
00377    expiresTS.tv_sec = expires64 / 1000;
00378    expiresTS.tv_nsec = (expires64 % 1000) * 1000000L;
00379 
00380    assert( expiresTS.tv_nsec < 1000000000L );
00381 
00382    //std::cerr << "Condition::wait " << mutex << "ms=" << ms << " expire=" << expiresTS.tv_sec << " " << expiresTS.tv_nsec << std::endl;
00383    int ret = pthread_cond_timedwait(&mId, mutex.getId(), &expiresTS);
00384 
00385    if (ret == EINTR || ret == ETIMEDOUT)
00386    {
00387       return false;
00388    }
00389    else
00390    {
00391       //std::cerr << this << " pthread_cond_timedwait failed " << ret << " mutex=" << mutex << std::endl;
00392       (void)ret;
00393       assert( ret == 0 );
00394       return true;
00395    }
00396 #endif  // not WIN32
00397 }
00398 
00399 bool
00400 Condition::wait (Mutex* mutex, unsigned int ms)
00401 {
00402    return this->wait(*mutex, ms);
00403 }
00404 
00405 void
00406 Condition::signal ()
00407 {
00408 #ifdef WIN32
00409 #  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00410     unsigned signals = 0;
00411 
00412    int res = 0;
00413    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
00414    assert(res == WAIT_OBJECT_0);
00415 
00416    if (m_waiting != 0) // the m_gate is already closed
00417    {
00418       if (m_blocked == 0)
00419       {
00420          res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00421          assert(res);
00422          return;
00423       }
00424 
00425       ++m_waiting;
00426       --m_blocked;
00427       signals = 1;
00428    }
00429    else
00430    {
00431       res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
00432       assert(res == WAIT_OBJECT_0);
00433       if (m_blocked > m_gone)
00434       {
00435          if (m_gone != 0)
00436          {
00437             m_blocked -= m_gone;
00438             m_gone = 0;
00439          }
00440          signals = m_waiting = 1;
00441          --m_blocked;
00442       }
00443       else
00444       {
00445          res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00446          assert(res);
00447       }
00448    }
00449 
00450    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00451    assert(res);
00452 
00453    if (signals)
00454    {
00455       res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
00456       assert(res);
00457    }
00458 #  else
00459    BOOL ret = SetEvent(
00460       mId // HANDLE hEvent   // handle to event object
00461       );
00462    assert(ret);
00463 #  endif
00464 #else
00465    int ret = pthread_cond_signal(&mId);
00466    (void)ret;
00467    assert( ret == 0 );
00468 #endif
00469 }
00470 
00471 
00472 void
00473 Condition::broadcast()
00474 {
00475 #ifdef WIN32
00476 #  ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX
00477    unsigned signals = 0;
00478 
00479    int res = 0;
00480    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
00481    assert(res == WAIT_OBJECT_0);
00482 
00483    if (m_waiting != 0) // the m_gate is already closed
00484    {
00485       if (m_blocked == 0)
00486       {
00487          res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00488          assert(res);
00489          return;
00490       }
00491 
00492       m_waiting += (signals = m_blocked);
00493       m_blocked = 0;
00494    }
00495    else
00496    {
00497       res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
00498       assert(res == WAIT_OBJECT_0);
00499       if (m_blocked > m_gone)
00500       {
00501          if (m_gone != 0)
00502          {
00503             m_blocked -= m_gone;
00504             m_gone = 0;
00505          }
00506          signals = m_waiting = m_blocked;
00507          m_blocked = 0;
00508       }
00509       else
00510       {
00511          res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
00512          assert(res);
00513       }
00514    }
00515 
00516    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
00517    assert(res);
00518 
00519    if (signals)
00520    {
00521       res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
00522       assert(res);
00523    }
00524 #  else
00525    assert(0);
00526 #  endif
00527 #else
00528    pthread_cond_broadcast(&mId);
00529 #endif
00530 }
00531 
00532 /* ====================================================================
00533  * The Vovida Software License, Version 1.0
00534  *
00535  * Copyright (c) 2000-2005 Vovida Networks, Inc.  All rights reserved.
00536  * 
00537  * Redistribution and use in source and binary forms, with or without
00538  * modification, are permitted provided that the following conditions
00539  * are met:
00540  *
00541  * 1. Redistributions of source code must retain the above copyright
00542  *    notice, this list of conditions and the following disclaimer.
00543  *
00544  * 2. Redistributions in binary form must reproduce the above copyright
00545  *    notice, this list of conditions and the following disclaimer in
00546  *    the documentation and/or other materials provided with the
00547  *    distribution.
00548  *
00549  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00550  *    and "Vovida Open Communication Application Library (VOCAL)" must
00551  *    not be used to endorse or promote products derived from this
00552  *    software without prior written permission. For written
00553  *    permission, please contact vocal@vovida.org.
00554  *
00555  * 4. Products derived from this software may not be called "VOCAL", nor
00556  *    may "VOCAL" appear in their name, without prior written
00557  *    permission of Vovida Networks, Inc.
00558  *
00559  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00560  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00561  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00562  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00563  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00564  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00565  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00566  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00567  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00568  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00569  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00570  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00571  * DAMAGE.
00572  *
00573  * ====================================================================
00574  *
00575  * This software consists of voluntary contributions made by Vovida
00576  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00577  * Inc.  For more information on Vovida Networks, Inc., please see
00578  * <http://www.vovida.org/>.
00579  *
00580  */