reSIProcate/rutil  9694
AbstractFifo.hxx
Go to the documentation of this file.
00001 #ifndef RESIP_AbstractFifo_hxx
00002 #define RESIP_AbstractFifo_hxx 
00003 
00004 #include <cassert>
00005 #include <deque>
00006 
00007 #include "rutil/Mutex.hxx"
00008 #include "rutil/Condition.hxx"
00009 #include "rutil/Lock.hxx"
00010 #include "rutil/CongestionManager.hxx"
00011 
00012 #include "rutil/compat.hxx"
00013 #include "rutil/Timer.hxx"
00014 
00015 namespace resip
00016 {
00029 class FifoStatsInterface
00030 {
00031    public:
00032          
00033      FifoStatsInterface();
00034      virtual ~FifoStatsInterface();
00035      
00040      virtual time_t expectedWaitTimeMilliSec() const =0;
00041 
00046       virtual time_t getTimeDepth() const = 0;
00047      
00051       virtual size_t getCountDepth() const = 0;
00052 
00057      virtual time_t averageServiceTimeMicroSec() const = 0;
00058 
00069       inline UInt8 getRole() const {return mRole;}
00070 
00076       inline void setRole(UInt8 role) {mRole=role;}
00077 
00084       inline void setDescription(const resip::Data& description)
00085       {
00086          mDescription=description;
00087       }
00088 
00093       virtual const resip::Data& getDescription() const {return mDescription;}
00094 
00095    protected:
00096       Data mDescription;
00097       UInt8 mRole;
00098 };
00099 
/**
   Special values for the @c ms timeout argument of the timed
   AbstractFifo::getNext() / AbstractFifo::getMultiple() overloads.

   - RESIP_FIFO_NOWAIT  (negative): do not block; fail immediately if the
     fifo is empty.
   - RESIP_FIFO_FOREVER (zero): block until a message is available.

   The negative value is parenthesised so the macro expands safely inside
   any surrounding expression.
*/
#define RESIP_FIFO_NOWAIT       (-1)
#define RESIP_FIFO_FOREVER      0
00112 
00124 template <typename T>
00125 class AbstractFifo : public FifoStatsInterface
00126 {
00127    public:
00132       AbstractFifo()
00133          : FifoStatsInterface(),
00134             mLastSampleTakenMicroSec(0),
00135             mCounter(0),
00136             mAverageServiceTimeMicroSec(0),
00137             mSize(0)
00138       {}
00139 
00140       virtual ~AbstractFifo()
00141       {
00142       }
00143 
00148       bool empty() const
00149       {
00150          Lock lock(mMutex); (void)lock;
00151          return mFifo.empty();
00152       }
00153 
00161       virtual unsigned int size() const
00162       {
00163          Lock lock(mMutex); (void)lock;
00164          return (unsigned int)mFifo.size();
00165       }
00166 
00172       bool messageAvailable() const
00173       {
00174          Lock lock(mMutex); (void)lock;
00175          return !mFifo.empty();
00176       }
00177 
00183       virtual time_t getTimeDepth() const
00184       {
00185          return 0;
00186       }
00187 
00188       virtual size_t getCountDepth() const
00189       {
00190          return mSize;
00191       }
00192 
00193       virtual time_t expectedWaitTimeMilliSec() const
00194       {
00195          return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
00196       }
00197 
00198       virtual time_t averageServiceTimeMicroSec() const
00199       {
00200          return mAverageServiceTimeMicroSec;
00201       }
00202 
00204       virtual void clear() {};
00205 
00206    protected:
00216       T getNext()
00217       {
00218          Lock lock(mMutex); (void)lock;
00219          onFifoPolled();
00220 
00221          // Wait util there are messages available.
00222          while (mFifo.empty())
00223          {
00224             mCondition.wait(mMutex);
00225          }
00226 
00227          // Return the first message on the fifo.
00228          //
00229          T firstMessage(mFifo.front());
00230          mFifo.pop_front();
00231          onMessagePopped();
00232          return firstMessage;
00233       }
00234 
00235 
00245       bool getNext(int ms, T& toReturn)
00246       {
00247          if(ms == 0) 
00248          {
00249             toReturn = getNext();
00250             return true;
00251          }
00252 
00253          if(ms < 0)
00254          {
00255             Lock lock(mMutex); (void)lock;
00256             onFifoPolled();
00257             if (mFifo.empty())  // WATCHOUT: Do not test mSize instead
00258               return false;
00259             toReturn = mFifo.front();
00260             mFifo.pop_front();
00261             return true;
00262          }
00263 
00264          const UInt64 begin(Timer::getTimeMs());
00265          const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
00266          Lock lock(mMutex); (void)lock;
00267          onFifoPolled();
00268 
00269          // Wait until there are messages available
00270          while (mFifo.empty())
00271          {
00272             if(ms==0)
00273             {
00274                return false;
00275             }
00276             const UInt64 now(Timer::getTimeMs());
00277             if(now >= end)
00278             {
00279                 return false;
00280             }
00281       
00282             unsigned int timeout((unsigned int)(end - now));
00283                     
00284             // bail if total wait time exceeds limit
00285             bool signaled = mCondition.wait(mMutex, timeout);
00286             if (!signaled)
00287             {
00288                return false;
00289             }
00290          }
00291       
00292          // Return the first message on the fifo.
00293          //
00294          toReturn=mFifo.front();
00295          mFifo.pop_front();
00296          onMessagePopped();
00297          return true;
00298       }
00299 
00300       typedef std::deque<T> Messages;
00301 
00302       void getMultiple(Messages& other, unsigned int max)
00303       {
00304          Lock lock(mMutex); (void)lock;
00305          onFifoPolled();
00306          assert(other.empty());
00307          while (mFifo.empty())
00308          {
00309             mCondition.wait(mMutex);
00310          }
00311 
00312          if(mFifo.size() <= max)
00313          {
00314             std::swap(mFifo, other);
00315             onMessagePopped(mSize);
00316          }
00317          else
00318          {
00319             size_t num=max;
00320             while( 0 != max-- )
00321             {
00322                other.push_back(mFifo.front());
00323                mFifo.pop_front();
00324             }
00325             onMessagePopped((unsigned int)num);
00326          }
00327       }
00328 
00329       bool getMultiple(int ms, Messages& other, unsigned int max)
00330       {
00331          if(ms==0)
00332          {
00333             getMultiple(other,max);
00334             return true;
00335          }
00336 
00337          assert(other.empty());
00338          const UInt64 begin(Timer::getTimeMs());
00339          const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
00340          Lock lock(mMutex); (void)lock;
00341          onFifoPolled();
00342 
00343          // Wait until there are messages available
00344          while (mFifo.empty())
00345          {
00346             if(ms < 0)
00347             {
00348                return false;
00349             }
00350             const UInt64 now(Timer::getTimeMs());
00351             if(now >= end)
00352             {
00353                 return false;
00354             }
00355 
00356             unsigned int timeout((unsigned int)(end - now));
00357                     
00358             // bail if total wait time exceeds limit
00359             bool signaled = mCondition.wait(mMutex, timeout);
00360             if (!signaled)
00361             {
00362                return false;
00363             }
00364          }
00365 
00366          if(mFifo.size() <= max)
00367          {
00368             std::swap(mFifo, other);
00369             onMessagePopped(mSize);
00370          }
00371          else
00372          {
00373             size_t num=max;
00374             while( 0 != max-- )
00375             {
00376                other.push_back(mFifo.front());
00377                mFifo.pop_front();
00378             }
00379             onMessagePopped((unsigned int)num);
00380          }
00381          return true;
00382       }
00383 
00384       size_t add(const T& item)
00385       {
00386          Lock lock(mMutex); (void)lock;
00387          mFifo.push_back(item);
00388          mCondition.signal();
00389          onMessagePushed(1);
00390          return mFifo.size();
00391       }
00392 
00393       size_t addMultiple(Messages& items)
00394       {
00395          Lock lock(mMutex); (void)lock;
00396          size_t size=items.size();
00397          if(mFifo.empty())
00398          {
00399             std::swap(mFifo, items);
00400          }
00401          else
00402          {
00403             // I suppose it is possible to optimize this as a push_front() from
00404             // mFifo to items, and then do a swap, if items is larger.
00405             while(!items.empty())
00406             {
00407                mFifo.push_back(items.front());
00408                items.pop_front();
00409             }
00410          }
00411          mCondition.signal();
00412          onMessagePushed((int)size);
00413          return mFifo.size();
00414       }
00415 
00417       Messages mFifo;
00419       mutable Mutex mMutex;
00421       Condition mCondition;
00422 
00423       mutable UInt64 mLastSampleTakenMicroSec;
00424       mutable UInt32 mCounter;
00425       mutable UInt32 mAverageServiceTimeMicroSec;
00426       // std::deque has to perform some amount of traversal to calculate its 
00427       // size; we maintain this count so that it can be queried without locking, 
00428       // in situations where it being off by a small amount is ok.
00429       UInt32 mSize;
00430 
00431       virtual void onFifoPolled()
00432       {
00433          // !bwc! TODO allow this sampling frequency to be tweaked
00434          if(mLastSampleTakenMicroSec &&
00435             mCounter &&
00436             (mCounter >= 64 || mFifo.empty()))
00437          {
00438             UInt64 now(Timer::getTimeMicroSec());
00439             UInt64 diff = now-mLastSampleTakenMicroSec;
00440 
00441             if(mCounter >= 4096)
00442             {
00443                mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter);
00444             }
00445             else // fifo got emptied; merge into a rolling average
00446             {
00447                // .bwc. This is a moving average with period 64, round to 
00448                // nearest int.
00449                mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(
00450                      diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
00451                      4096U);
00452             }
00453             mCounter=0;
00454             if(mFifo.empty())
00455             {
00456                mLastSampleTakenMicroSec=0;
00457             }
00458             else
00459             {
00460                mLastSampleTakenMicroSec=now;
00461             }
00462          }
00463       }
00464 
00469       virtual void onMessagePopped(unsigned int num=1)
00470       {
00471          mCounter+=num;
00472          mSize-=num;
00473       }
00474 
00475       virtual void onMessagePushed(int num)
00476       {
00477          if(mSize==0)
00478          {
00479             // Fifo went from empty to non-empty. Take a timestamp, and record
00480             // how long it takes to process some messages.
00481             mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
00482          }
00483          mSize+=num;
00484       }
00485    private:
00486       // no value semantics
00487       AbstractFifo(const AbstractFifo&);
00488       AbstractFifo& operator=(const AbstractFifo&);
00489 };
00490 
00491 } // namespace resip
00492 
00493 #endif
00494 
00495 /* ====================================================================
00496  * The Vovida Software License, Version 1.0 
00497  * 
00498  * Redistribution and use in source and binary forms, with or without
00499  * modification, are permitted provided that the following conditions
00500  * are met:
00501  * 
00502  * 1. Redistributions of source code must retain the above copyright
00503  *    notice, this list of conditions and the following disclaimer.
00504  * 
00505  * 2. Redistributions in binary form must reproduce the above copyright
00506  *    notice, this list of conditions and the following disclaimer in
00507  *    the documentation and/or other materials provided with the
00508  *    distribution.
00509  * 
00510  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00511  *    and "Vovida Open Communication Application Library (VOCAL)" must
00512  *    not be used to endorse or promote products derived from this
00513  *    software without prior written permission. For written
00514  *    permission, please contact vocal@vovida.org.
00515  *
00516  * 4. Products derived from this software may not be called "VOCAL", nor
00517  *    may "VOCAL" appear in their name, without prior written
00518  *    permission of Vovida Networks, Inc.
00519  * 
00520  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00521  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00522  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00523  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00524  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00525  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00526  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00527  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00528  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00529  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00530  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00531  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00532  * DAMAGE.
00533  * 
00534  * ====================================================================
00535  * 
00536  * This software consists of voluntary contributions made by Vovida
00537  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00538  * Inc.  For more information on Vovida Networks, Inc., please see
00539  * <http://www.vovida.org/>.
00540  *
00541  */