|
reSIProcate/rutil
9694
|
00001 #include <cassert> 00002 #include <climits> 00003 00004 #ifndef WIN32 00005 # include <pthread.h> 00006 # include <errno.h> 00007 # include <sys/time.h> 00008 # include <sys/syscall.h> 00009 # include <unistd.h> 00010 #endif 00011 00012 #include "rutil/compat.hxx" 00013 #include "rutil/Condition.hxx" 00014 #include "rutil/Mutex.hxx" 00015 #include "rutil/Timer.hxx" 00016 00017 #ifdef _RESIP_MONOTONIC_CLOCK 00018 #ifdef __APPLE__ 00019 #undef _RESIP_MONOTONIC_CLOCK 00020 #warning Mac OS X does not support POSIX monotonic timers. 00021 #endif 00022 #endif 00023 00024 using namespace resip; 00025 00026 Condition::Condition() 00027 { 00028 //std::cerr << this << " Condition::Condition" << std::endl; 00029 00030 #ifdef WIN32 00031 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00032 m_blocked = 0; 00033 m_gone = 0; 00034 m_waiting = 0; 00035 m_gate = reinterpret_cast<void*>(CreateSemaphore(0, 1, 1, 0)); 00036 m_queue = reinterpret_cast<void*>(CreateSemaphore(0, 0, LONG_MAX, 0)); 00037 m_mutex = reinterpret_cast<void*>(CreateMutex(0, 0, 0)); 00038 00039 if (!m_gate || !m_queue || !m_mutex) 00040 { 00041 int res = 0; 00042 if (m_gate) 00043 { 00044 res = CloseHandle(reinterpret_cast<HANDLE>(m_gate)); 00045 assert(res); 00046 } 00047 if (m_queue) 00048 { 00049 res = CloseHandle(reinterpret_cast<HANDLE>(m_queue)); 00050 assert(res); 00051 } 00052 if (m_mutex) 00053 { 00054 res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex)); 00055 assert(res); 00056 } 00057 00058 assert(0); 00059 } 00060 # else 00061 mId = CreateEvent( 00062 NULL, //LPSECURITY_ATTRIBUTES lpEventAttributes, 00063 // pointer to security attributes 00064 FALSE, // BOOL bManualReset, // flag for manual-reset event 00065 FALSE, //BOOL bInitialState, // flag for initial state 00066 NULL //LPCTSTR lpName // pointer to event-object name 00067 ); 00068 assert(mId); 00069 # endif 00070 #else 00071 #ifdef _RESIP_MONOTONIC_CLOCK 00072 pthread_condattr_t attr; 00073 struct timespec dummy; 00074 int ret = pthread_condattr_init( &attr ); 00075 assert( ret == 0 ); 00076 00077 // if((syscall( __NR_clock_getres, CLOCK_MONOTONIC, &dummy ) == 0) && 00078 if((clock_getres( CLOCK_MONOTONIC, &dummy ) == 0) && 00079 (pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ) == 0)) 00080 { 00081 ret = pthread_cond_init( &mId, &attr ); 00082 assert( ret == 0 ); 00083 pthread_condattr_destroy( &attr ); 00084 return; 00085 } 00086 pthread_condattr_destroy( &attr ); 00087 #endif 00088 int rc = pthread_cond_init(&mId,0); 00089 (void)rc; 00090 assert( rc == 0 ); 00091 #endif 00092 } 00093 00094 00095 Condition::~Condition () 00096 { 00097 #ifdef WIN32 00098 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00099 int res = 0; 00100 res = CloseHandle(reinterpret_cast<HANDLE>(m_gate)); 00101 assert(res); 00102 res = CloseHandle(reinterpret_cast<HANDLE>(m_queue)); 00103 assert(res); 00104 res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex)); 00105 assert(res); 00106 # else 00107 BOOL ok = CloseHandle(mId); 00108 assert( ok ); 00109 # endif 00110 #else 00111 if (pthread_cond_destroy(&mId) == EBUSY) 00112 { 00113 assert(0); 00114 } 00115 #endif 00116 } 00117 00118 #if defined(WIN32) && defined(RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX) 00119 void 00120 Condition::enterWait () 00121 { 00122 int res = 0; 00123 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE); 00124 assert(res == WAIT_OBJECT_0); 00125 ++m_blocked; 00126 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00127 assert(res); 00128 } 00129 #endif 00130 00131 void 00132 Condition::wait (Mutex& mutex) 00133 { 00134 //std::cerr << "Condition::wait " << mutex << std::endl; 00135 #ifdef WIN32 00136 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00137 enterWait(); 00138 00139 // Release the mutex 00140 mutex.unlock(); 00141 00142 // do wait 00143 { 00144 int res = 0; 00145 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE); 00146 assert(res == WAIT_OBJECT_0); 00147 00148 unsigned was_waiting=0; 00149 unsigned was_gone=0; 00150 00151 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE); 00152 assert(res == WAIT_OBJECT_0); 00153 was_waiting = m_waiting; 00154 was_gone = m_gone; 00155 if (was_waiting != 0) 00156 { 00157 if (--m_waiting == 0) 00158 { 00159 if (m_blocked != 0) 00160 { 00161 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate 00162 assert(res); 00163 was_waiting = 0; 00164 } 00165 else if (m_gone != 0) 00166 m_gone = 0; 00167 } 00168 } 00169 else if (++m_gone == (ULONG_MAX / 2)) 00170 { 00171 // timeout occured, normalize the m_gone count 00172 // this may occur if many calls to wait with a timeout are made and 00173 // no call to notify_* is made 00174 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE); 00175 assert(res == WAIT_OBJECT_0); 00176 m_blocked -= m_gone; 00177 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00178 assert(res); 00179 m_gone = 0; 00180 } 00181 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00182 assert(res); 00183 00184 if (was_waiting == 1) 00185 { 00186 for (/* */ ; was_gone; --was_gone) 00187 { 00188 // better now than spurious later 00189 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), 00190 INFINITE); 00191 assert(res == WAIT_OBJECT_0); 00192 } 00193 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00194 assert(res); 00195 } 00196 } 00197 00198 // Reacquire the mutex 00199 mutex.lock(); 00200 00201 # else 00202 // FixMe: Race condition between time we get mId and when we 00203 // re-acquire the mutex. 00204 mutex.unlock(); 00205 WaitForSingleObject(mId,INFINITE); 00206 mutex.lock(); 00207 # endif 00208 #else 00209 int ret = pthread_cond_wait(&mId, mutex.getId()); 00210 (void)ret; 00211 assert( ret == 0 ); 00212 #endif 00213 } 00214 00215 void 00216 Condition::wait (Mutex* mutex) 00217 { 00218 this->wait(*mutex); 00219 } 00220 00221 bool 00222 Condition::wait(Mutex& mutex, 00223 unsigned int ms) 00224 { 00225 if (ms == 0) 00226 { 00227 wait(mutex); 00228 return true; 00229 } 00230 00231 #ifdef WIN32 00232 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00233 enterWait(); 00234 00235 // Release the mutex 00236 mutex.unlock(); 00237 00238 // do timed wait 00239 bool ret = false; 00240 unsigned int res = 0; 00241 00242 #if 0 /* unnecessary time stuff - used in BOOST implementation because expiry time is provided to do_timed_wait - we pass in an interval */ 00243 UInt64 start = Timer::getTimeMs(); 00244 00245 for (;;) 00246 { 00247 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), 00248 ms); 00249 assert(res != WAIT_FAILED && res != WAIT_ABANDONED); 00250 ret = (res == WAIT_OBJECT_0); 00251 if (res == WAIT_TIMEOUT) 00252 { 00253 UInt64 now = Timer::getTimeMs(); 00254 unsigned int elapsed = (unsigned int)(now - start); 00255 if (ms > elapsed) 00256 { 00257 ms -= elapsed; 00258 continue; 00259 } 00260 } 00261 00262 break; 00263 } 00264 #endif 00265 00266 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),ms); 00267 assert(res != WAIT_FAILED && res != WAIT_ABANDONED); 00268 ret = (res == WAIT_OBJECT_0); 00269 00270 unsigned was_waiting=0; 00271 unsigned was_gone=0; 00272 00273 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE); 00274 assert(res == WAIT_OBJECT_0); 00275 was_waiting = m_waiting; 00276 was_gone = m_gone; 00277 if (was_waiting != 0) 00278 { 00279 if (!ret) // timeout 00280 { 00281 if (m_blocked != 0) 00282 --m_blocked; 00283 else 00284 ++m_gone; // count spurious wakeups 00285 } 00286 if (--m_waiting == 0) 00287 { 00288 if (m_blocked != 0) 00289 { 00290 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); // open m_gate 00291 assert(res); 00292 was_waiting = 0; 00293 } 00294 else if (m_gone != 0) 00295 m_gone = 0; 00296 } 00297 } 00298 else if (++m_gone == (ULONG_MAX / 2)) 00299 { 00300 // timeout occured, normalize the m_gone count 00301 // this may occur if many calls to wait with a timeout are made and 00302 // no call to notify_* is made 00303 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE); 00304 assert(res == WAIT_OBJECT_0); 00305 m_blocked -= m_gone; 00306 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00307 assert(res); 00308 m_gone = 0; 00309 } 00310 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00311 assert(res); 00312 00313 if (was_waiting == 1) 00314 { 00315 for (/* */ ; was_gone; --was_gone) 00316 { 00317 // better now than spurious later 00318 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE); 00319 assert(res == WAIT_OBJECT_0); 00320 } 00321 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00322 assert(res); 00323 } 00324 00325 // Reacquire the mutex 00326 mutex.lock(); 00327 00328 return ret; 00329 00330 # else 00331 // FixMe: Race condition between time we get mId and when we 00332 // re-acquire the mutex. 00333 // 00334 // SLG: A Note about the Win32 Implementation of Conditions 00335 // 00336 // I have investigated a fix for this. A solution to this problem is 00337 // non-trivial. Please read http://www.cs.wustl.edu/~schmidt/win32-cv-1.html 00338 // for a full explanation. This is an implementation of the SetEvent solution 00339 // discussed in that article. This solution has the following issues: 00340 // 1. Unfairness - ie. First thread to call wait may not be first thread 00341 // to be released from condition. 00342 // 2. Incorrectness due to a race condition when a broadcast occurs 00343 // (see the link for more details on these issues) 00344 // 00345 // There is a solution that corrects these two problem, but also introduces 2 more. 00346 // This solution (also discussed in the link) requires the use of a primitive only 00347 // available in WinNT and above. It also requires that the Mutex passed in be 00348 // implemented using windows Mutexes instead of CriticalSections - they are less 00349 // efficient. Thus the problems with this SignalObjectAndWait solution are: 00350 // 1. Not portable to all versions of windows - ie. will not work with Win98/Me 00351 // 2. Less efficient than tthe SetEvent solution 00352 // 00353 // I have choosen to stick with the SetEvent Solution for the following reasons: 00354 // 1. Speed is important. 00355 // 2. The Unfairness issue is not really a big problem since the stack currently 00356 // does not call a wait function from two different threads. (assuming the 00357 // hosting application always calls process() from the same thread). The only 00358 // time multi-threading comes into the picture is when the transports queue 00359 // messages from the wire onto the stateMacFifo - but they are retrieved off the 00360 // Fifo by a single thread. 00361 // 3. The Incorrectness issue is also not a big problem, since the stack currently 00362 // doesn't use the broadcast member of this class. 00363 // 00364 // Note: The implementation of broadcast remains incomplete - since it is currently 00365 // unused and would require an additional CriticalSection Enter and Leave to 00366 // keep track of a counter (see the above link for more info). This can be 00367 // easily added in the future if required. 00368 mutex.unlock(); 00369 DWORD ret = WaitForSingleObject(mId, ms); 00370 mutex.lock(); 00371 assert(ret != WAIT_FAILED); 00372 return (ret == WAIT_OBJECT_0); 00373 # endif 00374 #else // WIN32 00375 UInt64 expires64 = Timer::getTimeMs() + ms; 00376 timespec expiresTS; 00377 expiresTS.tv_sec = expires64 / 1000; 00378 expiresTS.tv_nsec = (expires64 % 1000) * 1000000L; 00379 00380 assert( expiresTS.tv_nsec < 1000000000L ); 00381 00382 //std::cerr << "Condition::wait " << mutex << "ms=" << ms << " expire=" << expiresTS.tv_sec << " " << expiresTS.tv_nsec << std::endl; 00383 int ret = pthread_cond_timedwait(&mId, mutex.getId(), &expiresTS); 00384 00385 if (ret == EINTR || ret == ETIMEDOUT) 00386 { 00387 return false; 00388 } 00389 else 00390 { 00391 //std::cerr << this << " pthread_cond_timedwait failed " << ret << " mutex=" << mutex << std::endl; 00392 (void)ret; 00393 assert( ret == 0 ); 00394 return true; 00395 } 00396 #endif // not WIN32 00397 } 00398 00399 bool 00400 Condition::wait (Mutex* mutex, unsigned int ms) 00401 { 00402 return this->wait(*mutex, ms); 00403 } 00404 00405 void 00406 Condition::signal () 00407 { 00408 #ifdef WIN32 00409 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00410 unsigned signals = 0; 00411 00412 int res = 0; 00413 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE); 00414 assert(res == WAIT_OBJECT_0); 00415 00416 if (m_waiting != 0) // the m_gate is already closed 00417 { 00418 if (m_blocked == 0) 00419 { 00420 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00421 assert(res); 00422 return; 00423 } 00424 00425 ++m_waiting; 00426 --m_blocked; 00427 signals = 1; 00428 } 00429 else 00430 { 00431 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE); 00432 assert(res == WAIT_OBJECT_0); 00433 if (m_blocked > m_gone) 00434 { 00435 if (m_gone != 0) 00436 { 00437 m_blocked -= m_gone; 00438 m_gone = 0; 00439 } 00440 signals = m_waiting = 1; 00441 --m_blocked; 00442 } 00443 else 00444 { 00445 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00446 assert(res); 00447 } 00448 } 00449 00450 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00451 assert(res); 00452 00453 if (signals) 00454 { 00455 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0); 00456 assert(res); 00457 } 00458 # else 00459 BOOL ret = SetEvent( 00460 mId // HANDLE hEvent // handle to event object 00461 ); 00462 assert(ret); 00463 # endif 00464 #else 00465 int ret = pthread_cond_signal(&mId); 00466 (void)ret; 00467 assert( ret == 0 ); 00468 #endif 00469 } 00470 00471 00472 void 00473 Condition::broadcast() 00474 { 00475 #ifdef WIN32 00476 # ifdef RESIP_CONDITION_WIN32_CONFORMANCE_TO_POSIX 00477 unsigned signals = 0; 00478 00479 int res = 0; 00480 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE); 00481 assert(res == WAIT_OBJECT_0); 00482 00483 if (m_waiting != 0) // the m_gate is already closed 00484 { 00485 if (m_blocked == 0) 00486 { 00487 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00488 assert(res); 00489 return; 00490 } 00491 00492 m_waiting += (signals = m_blocked); 00493 m_blocked = 0; 00494 } 00495 else 00496 { 00497 res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE); 00498 assert(res == WAIT_OBJECT_0); 00499 if (m_blocked > m_gone) 00500 { 00501 if (m_gone != 0) 00502 { 00503 m_blocked -= m_gone; 00504 m_gone = 0; 00505 } 00506 signals = m_waiting = m_blocked; 00507 m_blocked = 0; 00508 } 00509 else 00510 { 00511 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0); 00512 assert(res); 00513 } 00514 } 00515 00516 res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex)); 00517 assert(res); 00518 00519 if (signals) 00520 { 00521 res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0); 00522 assert(res); 00523 } 00524 # else 00525 assert(0); 00526 # endif 00527 #else 00528 pthread_cond_broadcast(&mId); 00529 #endif 00530 } 00531 00532 /* ==================================================================== 00533 * The Vovida Software License, Version 1.0 00534 * 00535 * Copyright (c) 2000-2005 Vovida Networks, Inc. All rights reserved. 00536 * 00537 * Redistribution and use in source and binary forms, with or without 00538 * modification, are permitted provided that the following conditions 00539 * are met: 00540 * 00541 * 1. Redistributions of source code must retain the above copyright 00542 * notice, this list of conditions and the following disclaimer. 00543 * 00544 * 2. Redistributions in binary form must reproduce the above copyright 00545 * notice, this list of conditions and the following disclaimer in 00546 * the documentation and/or other materials provided with the 00547 * distribution. 00548 * 00549 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00550 * and "Vovida Open Communication Application Library (VOCAL)" must 00551 * not be used to endorse or promote products derived from this 00552 * software without prior written permission. For written 00553 * permission, please contact vocal@vovida.org. 00554 * 00555 * 4. Products derived from this software may not be called "VOCAL", nor 00556 * may "VOCAL" appear in their name, without prior written 00557 * permission of Vovida Networks, Inc. 00558 * 00559 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00560 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00561 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00562 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00563 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00564 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00565 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00566 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00567 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00568 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00569 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00570 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00571 * DAMAGE. 00572 * 00573 * ==================================================================== 00574 * 00575 * This software consists of voluntary contributions made by Vovida 00576 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00577 * Inc. For more information on Vovida Networks, Inc., please see 00578 * <http://www.vovida.org/>. 00579 * 00580 */
1.7.5.1