|
reSIProcate/rutil
9694
|
00001 #ifndef RESIP_AbstractFifo_hxx 00002 #define RESIP_AbstractFifo_hxx 00003 00004 #include <cassert> 00005 #include <deque> 00006 00007 #include "rutil/Mutex.hxx" 00008 #include "rutil/Condition.hxx" 00009 #include "rutil/Lock.hxx" 00010 #include "rutil/CongestionManager.hxx" 00011 00012 #include "rutil/compat.hxx" 00013 #include "rutil/Timer.hxx" 00014 00015 namespace resip 00016 { 00029 class FifoStatsInterface 00030 { 00031 public: 00032 00033 FifoStatsInterface(); 00034 virtual ~FifoStatsInterface(); 00035 00040 virtual time_t expectedWaitTimeMilliSec() const =0; 00041 00046 virtual time_t getTimeDepth() const = 0; 00047 00051 virtual size_t getCountDepth() const = 0; 00052 00057 virtual time_t averageServiceTimeMicroSec() const = 0; 00058 00069 inline UInt8 getRole() const {return mRole;} 00070 00076 inline void setRole(UInt8 role) {mRole=role;} 00077 00084 inline void setDescription(const resip::Data& description) 00085 { 00086 mDescription=description; 00087 } 00088 00093 virtual const resip::Data& getDescription() const {return mDescription;} 00094 00095 protected: 00096 Data mDescription; 00097 UInt8 mRole; 00098 }; 00099 00110 #define RESIP_FIFO_NOWAIT -1 00111 #define RESIP_FIFO_FOREVER 0 00112 00124 template <typename T> 00125 class AbstractFifo : public FifoStatsInterface 00126 { 00127 public: 00132 AbstractFifo() 00133 : FifoStatsInterface(), 00134 mLastSampleTakenMicroSec(0), 00135 mCounter(0), 00136 mAverageServiceTimeMicroSec(0), 00137 mSize(0) 00138 {} 00139 00140 virtual ~AbstractFifo() 00141 { 00142 } 00143 00148 bool empty() const 00149 { 00150 Lock lock(mMutex); (void)lock; 00151 return mFifo.empty(); 00152 } 00153 00161 virtual unsigned int size() const 00162 { 00163 Lock lock(mMutex); (void)lock; 00164 return (unsigned int)mFifo.size(); 00165 } 00166 00172 bool messageAvailable() const 00173 { 00174 Lock lock(mMutex); (void)lock; 00175 return !mFifo.empty(); 00176 } 00177 00183 virtual time_t getTimeDepth() const 00184 { 00185 return 0; 00186 } 00187 00188 virtual size_t getCountDepth() const 00189 { 00190 return mSize; 00191 } 00192 00193 virtual time_t expectedWaitTimeMilliSec() const 00194 { 00195 return ((mAverageServiceTimeMicroSec*mSize)+500)/1000; 00196 } 00197 00198 virtual time_t averageServiceTimeMicroSec() const 00199 { 00200 return mAverageServiceTimeMicroSec; 00201 } 00202 00204 virtual void clear() {}; 00205 00206 protected: 00216 T getNext() 00217 { 00218 Lock lock(mMutex); (void)lock; 00219 onFifoPolled(); 00220 00221 // Wait util there are messages available. 00222 while (mFifo.empty()) 00223 { 00224 mCondition.wait(mMutex); 00225 } 00226 00227 // Return the first message on the fifo. 00228 // 00229 T firstMessage(mFifo.front()); 00230 mFifo.pop_front(); 00231 onMessagePopped(); 00232 return firstMessage; 00233 } 00234 00235 00245 bool getNext(int ms, T& toReturn) 00246 { 00247 if(ms == 0) 00248 { 00249 toReturn = getNext(); 00250 return true; 00251 } 00252 00253 if(ms < 0) 00254 { 00255 Lock lock(mMutex); (void)lock; 00256 onFifoPolled(); 00257 if (mFifo.empty()) // WATCHOUT: Do not test mSize instead 00258 return false; 00259 toReturn = mFifo.front(); 00260 mFifo.pop_front(); 00261 return true; 00262 } 00263 00264 const UInt64 begin(Timer::getTimeMs()); 00265 const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :( 00266 Lock lock(mMutex); (void)lock; 00267 onFifoPolled(); 00268 00269 // Wait until there are messages available 00270 while (mFifo.empty()) 00271 { 00272 if(ms==0) 00273 { 00274 return false; 00275 } 00276 const UInt64 now(Timer::getTimeMs()); 00277 if(now >= end) 00278 { 00279 return false; 00280 } 00281 00282 unsigned int timeout((unsigned int)(end - now)); 00283 00284 // bail if total wait time exceeds limit 00285 bool signaled = mCondition.wait(mMutex, timeout); 00286 if (!signaled) 00287 { 00288 return false; 00289 } 00290 } 00291 00292 // Return the first message on the fifo. 00293 // 00294 toReturn=mFifo.front(); 00295 mFifo.pop_front(); 00296 onMessagePopped(); 00297 return true; 00298 } 00299 00300 typedef std::deque<T> Messages; 00301 00302 void getMultiple(Messages& other, unsigned int max) 00303 { 00304 Lock lock(mMutex); (void)lock; 00305 onFifoPolled(); 00306 assert(other.empty()); 00307 while (mFifo.empty()) 00308 { 00309 mCondition.wait(mMutex); 00310 } 00311 00312 if(mFifo.size() <= max) 00313 { 00314 std::swap(mFifo, other); 00315 onMessagePopped(mSize); 00316 } 00317 else 00318 { 00319 size_t num=max; 00320 while( 0 != max-- ) 00321 { 00322 other.push_back(mFifo.front()); 00323 mFifo.pop_front(); 00324 } 00325 onMessagePopped((unsigned int)num); 00326 } 00327 } 00328 00329 bool getMultiple(int ms, Messages& other, unsigned int max) 00330 { 00331 if(ms==0) 00332 { 00333 getMultiple(other,max); 00334 return true; 00335 } 00336 00337 assert(other.empty()); 00338 const UInt64 begin(Timer::getTimeMs()); 00339 const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :( 00340 Lock lock(mMutex); (void)lock; 00341 onFifoPolled(); 00342 00343 // Wait until there are messages available 00344 while (mFifo.empty()) 00345 { 00346 if(ms < 0) 00347 { 00348 return false; 00349 } 00350 const UInt64 now(Timer::getTimeMs()); 00351 if(now >= end) 00352 { 00353 return false; 00354 } 00355 00356 unsigned int timeout((unsigned int)(end - now)); 00357 00358 // bail if total wait time exceeds limit 00359 bool signaled = mCondition.wait(mMutex, timeout); 00360 if (!signaled) 00361 { 00362 return false; 00363 } 00364 } 00365 00366 if(mFifo.size() <= max) 00367 { 00368 std::swap(mFifo, other); 00369 onMessagePopped(mSize); 00370 } 00371 else 00372 { 00373 size_t num=max; 00374 while( 0 != max-- ) 00375 { 00376 other.push_back(mFifo.front()); 00377 mFifo.pop_front(); 00378 } 00379 onMessagePopped((unsigned int)num); 00380 } 00381 return true; 00382 } 00383 00384 size_t add(const T& item) 00385 { 00386 Lock lock(mMutex); (void)lock; 00387 mFifo.push_back(item); 00388 mCondition.signal(); 00389 onMessagePushed(1); 00390 return mFifo.size(); 00391 } 00392 00393 size_t addMultiple(Messages& items) 00394 { 00395 Lock lock(mMutex); (void)lock; 00396 size_t size=items.size(); 00397 if(mFifo.empty()) 00398 { 00399 std::swap(mFifo, items); 00400 } 00401 else 00402 { 00403 // I suppose it is possible to optimize this as a push_front() from 00404 // mFifo to items, and then do a swap, if items is larger. 00405 while(!items.empty()) 00406 { 00407 mFifo.push_back(items.front()); 00408 items.pop_front(); 00409 } 00410 } 00411 mCondition.signal(); 00412 onMessagePushed((int)size); 00413 return mFifo.size(); 00414 } 00415 00417 Messages mFifo; 00419 mutable Mutex mMutex; 00421 Condition mCondition; 00422 00423 mutable UInt64 mLastSampleTakenMicroSec; 00424 mutable UInt32 mCounter; 00425 mutable UInt32 mAverageServiceTimeMicroSec; 00426 // std::deque has to perform some amount of traversal to calculate its 00427 // size; we maintain this count so that it can be queried without locking, 00428 // in situations where it being off by a small amount is ok. 00429 UInt32 mSize; 00430 00431 virtual void onFifoPolled() 00432 { 00433 // !bwc! TODO allow this sampling frequency to be tweaked 00434 if(mLastSampleTakenMicroSec && 00435 mCounter && 00436 (mCounter >= 64 || mFifo.empty())) 00437 { 00438 UInt64 now(Timer::getTimeMicroSec()); 00439 UInt64 diff = now-mLastSampleTakenMicroSec; 00440 00441 if(mCounter >= 4096) 00442 { 00443 mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter); 00444 } 00445 else // fifo got emptied; merge into a rolling average 00446 { 00447 // .bwc. This is a moving average with period 64, round to 00448 // nearest int. 00449 mAverageServiceTimeMicroSec=(UInt32)resipIntDiv( 00450 diff+((4096-mCounter)*mAverageServiceTimeMicroSec), 00451 4096U); 00452 } 00453 mCounter=0; 00454 if(mFifo.empty()) 00455 { 00456 mLastSampleTakenMicroSec=0; 00457 } 00458 else 00459 { 00460 mLastSampleTakenMicroSec=now; 00461 } 00462 } 00463 } 00464 00469 virtual void onMessagePopped(unsigned int num=1) 00470 { 00471 mCounter+=num; 00472 mSize-=num; 00473 } 00474 00475 virtual void onMessagePushed(int num) 00476 { 00477 if(mSize==0) 00478 { 00479 // Fifo went from empty to non-empty. Take a timestamp, and record 00480 // how long it takes to process some messages. 00481 mLastSampleTakenMicroSec=Timer::getTimeMicroSec(); 00482 } 00483 mSize+=num; 00484 } 00485 private: 00486 // no value semantics 00487 AbstractFifo(const AbstractFifo&); 00488 AbstractFifo& operator=(const AbstractFifo&); 00489 }; 00490 00491 } // namespace resip 00492 00493 #endif 00494 00495 /* ==================================================================== 00496 * The Vovida Software License, Version 1.0 00497 * 00498 * Redistribution and use in source and binary forms, with or without 00499 * modification, are permitted provided that the following conditions 00500 * are met: 00501 * 00502 * 1. Redistributions of source code must retain the above copyright 00503 * notice, this list of conditions and the following disclaimer. 00504 * 00505 * 2. Redistributions in binary form must reproduce the above copyright 00506 * notice, this list of conditions and the following disclaimer in 00507 * the documentation and/or other materials provided with the 00508 * distribution. 00509 * 00510 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00511 * and "Vovida Open Communication Application Library (VOCAL)" must 00512 * not be used to endorse or promote products derived from this 00513 * software without prior written permission. For written 00514 * permission, please contact vocal@vovida.org. 00515 * 00516 * 4. Products derived from this software may not be called "VOCAL", nor 00517 * may "VOCAL" appear in their name, without prior written 00518 * permission of Vovida Networks, Inc. 00519 * 00520 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00521 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00522 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00523 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00524 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00525 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00526 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00527 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00528 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00529 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00530 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00531 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00532 * DAMAGE. 00533 * 00534 * ==================================================================== 00535 * 00536 * This software consists of voluntary contributions made by Vovida 00537 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00538 * Inc. For more information on Vovida Networks, Inc., please see 00539 * <http://www.vovida.org/>. 00540 * 00541 */
1.7.5.1