reSIProcate/rutil  9694
resip::AbstractFifo< T > Class Template Reference

The base class from which various templated Fifo classes are derived.

#include <AbstractFifo.hxx>


Public Member Functions

 AbstractFifo ()
 Constructor.
virtual ~AbstractFifo ()
bool empty () const
 is the queue empty?
virtual unsigned int size () const
 get the current size of the fifo.
bool messageAvailable () const
 is a message available?
virtual time_t getTimeDepth () const
 computes the time delta between the oldest and newest queue members
virtual size_t getCountDepth () const
 Returns the number of elements in the FIFO.
virtual time_t expectedWaitTimeMilliSec () const
 Returns the expected time it will take to service all messages currently in the queue (in milliseconds)
virtual time_t averageServiceTimeMicroSec () const
 Returns the average time it takes for individual messages to be serviced (in microseconds)
virtual void clear ()
 Removes all elements in the queue (a no-op in this base class; subclasses reimplement it)

Protected Types

typedef std::deque< T > Messages

Protected Member Functions

T getNext ()
 Returns the first message available.
bool getNext (int ms, T &toReturn)
 Returns the next message available.
void getMultiple (Messages &other, unsigned int max)
bool getMultiple (int ms, Messages &other, unsigned int max)
size_t add (const T &item)
size_t addMultiple (Messages &items)
virtual void onFifoPolled ()
virtual void onMessagePopped (unsigned int num=1)
 Called when one or more messages are removed from this fifo.
virtual void onMessagePushed (int num)

Protected Attributes

Messages mFifo
 container for FIFO items
Mutex mMutex
 access serialization lock
Condition mCondition
 condition for waiting on new queue items
UInt64 mLastSampleTakenMicroSec
UInt32 mCounter
UInt32 mAverageServiceTimeMicroSec
UInt32 mSize

Private Member Functions

 AbstractFifo (const AbstractFifo &)
AbstractFifo & operator= (const AbstractFifo &)

Detailed Description

template<typename T>
class resip::AbstractFifo< T >

The base class from which various templated Fifo classes are derived.

(aka template hoist) AbstractFifo's get operations are all thread-safe. AbstractFifo does not define any public put operations; the protected add() and addMultiple() are exposed by subclasses.

Note:
Users of the resip stack will not need to interact with this class directly in most cases. Look at Fifo and TimeLimitFifo instead.

Definition at line 125 of file AbstractFifo.hxx.


Member Typedef Documentation

template<typename T>
typedef std::deque<T> resip::AbstractFifo< T >::Messages [protected]

Reimplemented in resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 300 of file AbstractFifo.hxx.


Constructor & Destructor Documentation

template<typename T>
resip::AbstractFifo< T >::AbstractFifo ( ) [inline]

Constructor.

Parameters:
maxSize  max number of messages to keep

Definition at line 132 of file AbstractFifo.hxx.

template<typename T>
virtual resip::AbstractFifo< T >::~AbstractFifo ( ) [inline, virtual]

Definition at line 140 of file AbstractFifo.hxx.

      {
      }
template<typename T>
resip::AbstractFifo< T >::AbstractFifo ( const AbstractFifo< T > &  ) [private]

Member Function Documentation

template<typename T>
size_t resip::AbstractFifo< T >::add ( const T &  item) [inline, protected]

Definition at line 384 of file AbstractFifo.hxx.

Referenced by resip::Fifo< Msg >::add().

      {
         Lock lock(mMutex); (void)lock;
         mFifo.push_back(item);
         mCondition.signal();
         onMessagePushed(1);
         return mFifo.size();
      }
template<typename T>
size_t resip::AbstractFifo< T >::addMultiple ( Messages &  items) [inline, protected]

Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 393 of file AbstractFifo.hxx.

Referenced by resip::Fifo< Msg >::addMultiple().

      {
         Lock lock(mMutex); (void)lock;
         size_t size=items.size();
         if(mFifo.empty())
         {
            std::swap(mFifo, items);
         }
         else
         {
            // I suppose it is possible to optimize this as a push_front() from
            // mFifo to items, and then do a swap, if items is larger.
            while(!items.empty())
            {
               mFifo.push_back(items.front());
               items.pop_front();
            }
         }
         mCondition.signal();
         onMessagePushed((int)size);
         return mFifo.size();
      }
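
The swap-or-append strategy used by addMultiple() can be tried outside the class. The following is a minimal sketch, assuming a plain std::deque and a hypothetical free function appendAll() that is not part of rutil; locking, signalling, and the onMessagePushed() hook are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical helper, not part of rutil: moves every element of `items`
// to the back of `fifo` and returns the new size of `fifo`.
template <typename T>
std::size_t appendAll(std::deque<T>& fifo, std::deque<T>& items)
{
   if (fifo.empty())
   {
      // Constant-time hand-over: the containers trade contents.
      std::swap(fifo, items);
   }
   else
   {
      // Preserve FIFO order by draining `items` front to back.
      while (!items.empty())
      {
         fifo.push_back(items.front());
         items.pop_front();
      }
   }
   // Either path leaves the caller's container empty.
   assert(items.empty());
   return fifo.size();
}
```

In both branches the caller's container ends up empty, which is why addMultiple() takes it by non-const reference.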
template<typename T>
virtual time_t resip::AbstractFifo< T >::averageServiceTimeMicroSec ( ) const [inline, virtual]

Returns the average time it takes for individual messages to be serviced (in microseconds)

Implements resip::FifoStatsInterface.

Definition at line 198 of file AbstractFifo.hxx.

template<typename T>
virtual void resip::AbstractFifo< T >::clear ( ) [inline, virtual]

Removes all elements in the queue. The base implementation is a no-op; subclasses reimplement it.

Reimplemented in resip::TimeLimitFifo< Msg >, resip::TimeLimitFifo< Foo >, resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 204 of file AbstractFifo.hxx.

{};
template<typename T>
bool resip::AbstractFifo< T >::empty ( ) const [inline]

is the queue empty?

Returns:
true if the queue is empty and false otherwise

Definition at line 148 of file AbstractFifo.hxx.

Referenced by resip::ConsumerFifoBuffer< T >::getNext(), main(), and resip::ConsumerFifoBuffer< T >::messageAvailable().

      {
         Lock lock(mMutex); (void)lock;
         return mFifo.empty();
      }
template<typename T>
virtual time_t resip::AbstractFifo< T >::expectedWaitTimeMilliSec ( ) const [inline, virtual]

Returns the expected time it will take to service all messages currently in the queue (in milliseconds)

Implements resip::FifoStatsInterface.

Definition at line 193 of file AbstractFifo.hxx.

      {
         return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
      }
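
The expression above converts a total in microseconds to milliseconds, rounding to the nearest millisecond by adding 500 before the integer division. A small worked sketch follows; the free function expectedWaitMs() is hypothetical and only mirrors the arithmetic.

```cpp
#include <cstdint>

// Hypothetical stand-alone copy of the rounding arithmetic:
// (average service time in microseconds * queued messages), rounded to
// the nearest millisecond.
constexpr std::uint32_t expectedWaitMs(std::uint32_t avgServiceMicroSec,
                                       std::uint32_t queued)
{
   return ((avgServiceMicroSec * queued) + 500) / 1000;
}

// 10 messages at 250 us each: 2500 us, which rounds up to 3 ms.
static_assert(expectedWaitMs(250, 10) == 3, "2500 us rounds to 3 ms");
// 4 messages at 120 us each: 480 us, which rounds down to 0 ms.
static_assert(expectedWaitMs(120, 4) == 0, "480 us rounds to 0 ms");
```

As in the original expression, the product is formed in 32-bit unsigned arithmetic, so very large averages or queue depths would wrap around.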
template<typename T>
virtual size_t resip::AbstractFifo< T >::getCountDepth ( ) const [inline, virtual]

Returns the number of elements in the FIFO.

Implements resip::FifoStatsInterface.

Reimplemented in resip::TimeLimitFifo< Msg >, and resip::TimeLimitFifo< Foo >.

Definition at line 188 of file AbstractFifo.hxx.

      {
         return mSize;
      }
template<typename T>
void resip::AbstractFifo< T >::getMultiple ( Messages &  other,
unsigned int  max 
) [inline, protected]

Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 302 of file AbstractFifo.hxx.

Referenced by resip::Fifo< Msg >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getMultiple().

      {
         Lock lock(mMutex); (void)lock;
         onFifoPolled();
         assert(other.empty());
         while (mFifo.empty())
         {
            mCondition.wait(mMutex);
         }

         if(mFifo.size() <= max)
         {
            std::swap(mFifo, other);
            onMessagePopped(mSize);
         }
         else
         {
            size_t num=max;
            while( 0 != max-- )
            {
               other.push_back(mFifo.front());
               mFifo.pop_front();
            }
            onMessagePopped((unsigned int)num);
         }
      }
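
The batching rule in getMultiple() (swap the whole container when everything fits, otherwise pop exactly max items) can be isolated from the locking. This is a minimal sketch with a hypothetical helper takeUpTo(), not part of rutil, that also reports how many items were moved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical helper, not part of rutil: moves at most `max` elements
// from the front of `fifo` into the empty container `out` and returns
// how many elements were moved.
template <typename T>
std::size_t takeUpTo(std::deque<T>& fifo, std::deque<T>& out, std::size_t max)
{
   // Same precondition as getMultiple(): the output must start empty.
   assert(out.empty());
   if (fifo.size() <= max)
   {
      // Everything fits in one batch: swap instead of copying.
      std::swap(fifo, out);
      return out.size();
   }
   for (std::size_t i = 0; i < max; ++i)
   {
      out.push_back(fifo.front());
      fifo.pop_front();
   }
   return max;
}
```

The returned count corresponds to the value that getMultiple() passes to onMessagePopped().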
template<typename T>
bool resip::AbstractFifo< T >::getMultiple ( int  ms,
Messages &  other,
unsigned int  max 
) [inline, protected]

Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 329 of file AbstractFifo.hxx.

      {
         if(ms==0)
         {
            getMultiple(other,max);
            return true;
         }

         assert(other.empty());
         const UInt64 begin(Timer::getTimeMs());
         const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
         Lock lock(mMutex); (void)lock;
         onFifoPolled();

         // Wait until there are messages available
         while (mFifo.empty())
         {
            if(ms < 0)
            {
               return false;
            }
            const UInt64 now(Timer::getTimeMs());
            if(now >= end)
            {
                return false;
            }

            unsigned int timeout((unsigned int)(end - now));
                    
            // bail if total wait time exceeds limit
            bool signaled = mCondition.wait(mMutex, timeout);
            if (!signaled)
            {
               return false;
            }
         }

         if(mFifo.size() <= max)
         {
            std::swap(mFifo, other);
            onMessagePopped(mSize);
         }
         else
         {
            size_t num=max;
            while( 0 != max-- )
            {
               other.push_back(mFifo.front());
               mFifo.pop_front();
            }
            onMessagePopped((unsigned int)num);
         }
         return true;
      }
template<typename T>
T resip::AbstractFifo< T >::getNext ( ) [inline, protected]

Returns the first message available.

Returns the first message available. It will wait if no messages are available. If a signal interrupts the wait, it will retry the wait. Signals can therefore not be caught via getNext. If you need to detect a signal, use block prior to calling getNext.

Returns:
the first message available

Reimplemented in resip::TimeLimitFifo< Msg >, resip::TimeLimitFifo< Foo >, resip::FiniteFifo< Msg >, resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.

Definition at line 216 of file AbstractFifo.hxx.

Referenced by resip::Fifo< Msg >::getNext(), resip::FiniteFifo< Msg >::getNext(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().

      {
         Lock lock(mMutex); (void)lock;
         onFifoPolled();

         // Wait until there are messages available.
         while (mFifo.empty())
         {
            mCondition.wait(mMutex);
         }

         // Return the first message on the fifo.
         //
         T firstMessage(mFifo.front());
         mFifo.pop_front();
         onMessagePopped();
         return firstMessage;
      }
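
The wait loop in getNext() is the usual condition-variable pattern: hold the lock, re-check the predicate after every wake-up, then pop. The sketch below shows the same shape using only the C++ standard library; BlockingQueue is a hypothetical stand-in and does not use resip's Mutex, Condition, or Lock, nor the statistics hooks.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in for the pattern, built on the standard library.
template <typename T>
class BlockingQueue
{
public:
   void add(const T& item)
   {
      std::lock_guard<std::mutex> lock(mMutex);
      mFifo.push_back(item);
      mCondition.notify_one();
   }

   // Blocks until an item is available, then returns the oldest one.
   T getNext()
   {
      std::unique_lock<std::mutex> lock(mMutex);
      // Loop so that spurious or interrupted wake-ups re-check the queue.
      while (mFifo.empty())
      {
         mCondition.wait(lock);
      }
      assert(!mFifo.empty());
      T first(mFifo.front());
      mFifo.pop_front();
      return first;
   }

private:
   std::deque<T> mFifo;
   std::mutex mMutex;
   std::condition_variable mCondition;
};
```

A consumer thread would typically call getNext() in a loop while one or more producer threads call add(); because the wait is retried inside the loop, a caller never observes an interrupted wait.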
template<typename T>
bool resip::AbstractFifo< T >::getNext ( int  ms,
T &  toReturn 
) [inline, protected]

Returns the next message available.

Returns the next message available. Will wait up to ms milliseconds if no information is available. If the specified time passes or a signal interrupts the wait, this method returns 0. This interface provides no mechanism to distinguish between timeout and interrupt.

Definition at line 245 of file AbstractFifo.hxx.

      {
         if(ms == 0) 
         {
            toReturn = getNext();
            return true;
         }

         if(ms < 0)
         {
            Lock lock(mMutex); (void)lock;
            onFifoPolled();
            if (mFifo.empty())  // WATCHOUT: Do not test mSize instead
              return false;
            toReturn = mFifo.front();
            mFifo.pop_front();
            return true;
         }

         const UInt64 begin(Timer::getTimeMs());
         const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
         Lock lock(mMutex); (void)lock;
         onFifoPolled();

         // Wait until there are messages available
         while (mFifo.empty())
         {
            if(ms==0)
            {
               return false;
            }
            const UInt64 now(Timer::getTimeMs());
            if(now >= end)
            {
                return false;
            }
      
            unsigned int timeout((unsigned int)(end - now));
                    
            // bail if total wait time exceeds limit
            bool signaled = mCondition.wait(mMutex, timeout);
            if (!signaled)
            {
               return false;
            }
         }
      
         // Return the first message on the fifo.
         //
         toReturn=mFifo.front();
         mFifo.pop_front();
         onMessagePopped();
         return true;
      }
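
The listing above treats the ms argument in three ways: 0 blocks indefinitely, a negative value polls once, and a positive value waits until a deadline. The sketch below reproduces those three modes with the C++ standard library only. TimedQueue is a hypothetical stand-in, and unlike the original it keeps waiting after a spurious wake-up until the deadline passes.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical stand-in (standard library only) for the three `ms` modes:
//   ms == 0 : block until an item arrives
//   ms <  0 : poll once, never wait
//   ms >  0 : wait at most `ms` milliseconds
template <typename T>
class TimedQueue
{
public:
   void add(const T& item)
   {
      std::lock_guard<std::mutex> lock(mMutex);
      mFifo.push_back(item);
      mCondition.notify_one();
   }

   bool getNext(int ms, T& toReturn)
   {
      std::unique_lock<std::mutex> lock(mMutex);
      if (ms == 0)
      {
         mCondition.wait(lock, [this] { return !mFifo.empty(); });
      }
      else if (ms > 0)
      {
         // wait_for returns false if the queue is still empty at timeout.
         if (!mCondition.wait_for(lock, std::chrono::milliseconds(ms),
                                  [this] { return !mFifo.empty(); }))
         {
            return false;
         }
      }
      else if (mFifo.empty())
      {
         return false;
      }
      assert(!mFifo.empty());
      toReturn = mFifo.front();
      mFifo.pop_front();
      return true;
   }

private:
   std::deque<T> mFifo;
   std::mutex mMutex;
   std::condition_variable mCondition;
};
```

As with the documented interface, a false return does not tell the caller whether the queue was empty on a poll or the timeout expired.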
template<typename T>
virtual time_t resip::AbstractFifo< T >::getTimeDepth ( ) const [inline, virtual]

computes the time delta between the oldest and newest queue members

Note:
defaults to zero, overridden by TimeLimitFifo<T>
Returns:
the time delta between the oldest and newest queue members

Implements resip::FifoStatsInterface.

Reimplemented in resip::TimeLimitFifo< Msg >, and resip::TimeLimitFifo< Foo >.

Definition at line 183 of file AbstractFifo.hxx.

      {
         return 0;
      }
template<typename T>
bool resip::AbstractFifo< T >::messageAvailable ( ) const [inline]

is a message available?

Return values:
true if a message is available and false otherwise

Definition at line 172 of file AbstractFifo.hxx.

Referenced by resip::ConsumerFifoBuffer< T >::messageAvailable(), resip::DnsStub::processFifo(), and Consumer::thread().

      {
         Lock lock(mMutex); (void)lock;
         return !mFifo.empty();
      }
template<typename T>
virtual void resip::AbstractFifo< T >::onFifoPolled ( ) [inline, protected, virtual]

Definition at line 431 of file AbstractFifo.hxx.

Referenced by resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().

      {
         // !bwc! TODO allow this sampling frequency to be tweaked
         if(mLastSampleTakenMicroSec &&
            mCounter &&
            (mCounter >= 64 || mFifo.empty()))
         {
            UInt64 now(Timer::getTimeMicroSec());
            UInt64 diff = now-mLastSampleTakenMicroSec;

            if(mCounter >= 4096)
            {
               mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter);
            }
            else // fifo got emptied; merge into a rolling average
            {
               // .bwc. This is a moving average with period 64, round to 
               // nearest int.
               mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(
                     diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
                     4096U);
            }
            mCounter=0;
            if(mFifo.empty())
            {
               mLastSampleTakenMicroSec=0;
            }
            else
            {
               mLastSampleTakenMicroSec=now;
            }
         }
      }
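
The averaging step can be checked with concrete numbers. The sketch below restates it as a hypothetical free function updateAverage(). It assumes that resipIntDiv divides and rounds to the nearest integer (its definition is not shown on this page), so roundedDiv() is only a stand-in, and it widens the weighted term to 64 bits, which the original expression does not do.

```cpp
#include <cstdint>

// Hypothetical stand-in for resipIntDiv, assumed to round to nearest.
constexpr std::uint64_t roundedDiv(std::uint64_t numerator,
                                   std::uint64_t denominator)
{
   return (numerator + denominator / 2) / denominator;
}

// Hypothetical free-function version of the averaging step:
//   serviced >= 4096 : the new sample replaces the average outright
//   otherwise        : the sample is blended in with weight serviced/4096
constexpr std::uint32_t updateAverage(std::uint64_t elapsedMicroSec,
                                      std::uint32_t serviced,
                                      std::uint32_t previousAverage)
{
   return serviced >= 4096
      ? static_cast<std::uint32_t>(roundedDiv(elapsedMicroSec, serviced))
      : static_cast<std::uint32_t>(roundedDiv(
           elapsedMicroSec +
              static_cast<std::uint64_t>(4096 - serviced) * previousAverage,
           4096));
}

// 64 messages in 6400 us (100 us each) blended into an average of 200 us:
// (6400 + 4032 * 200) / 4096 = 198.4375, which rounds to 198.
static_assert(updateAverage(6400, 64, 200) == 198, "blended average");
// 4096 messages in 819200 us: the average becomes 200 us outright.
static_assert(updateAverage(819200, 4096, 50) == 200, "full replacement");
```

With these weights a burst of 64 fast messages moves the average only slightly, since the sample counts for 64/4096 of the result.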
template<typename T>
virtual void resip::AbstractFifo< T >::onMessagePopped ( unsigned int  num = 1) [inline, protected, virtual]

Called when one or more messages are removed from this fifo.

Used to drive service time calculations.

Definition at line 469 of file AbstractFifo.hxx.

Referenced by resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().

      {
         mCounter+=num;
         mSize-=num;
      }
template<typename T>
virtual void resip::AbstractFifo< T >::onMessagePushed ( int  num) [inline, protected, virtual]

Definition at line 475 of file AbstractFifo.hxx.

Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), and resip::AbstractFifo< Timestamped< Msg * > >::addMultiple().

      {
         if(mSize==0)
         {
            // Fifo went from empty to non-empty. Take a timestamp, and record
            // how long it takes to process some messages.
            mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
         }
         mSize+=num;
      }
template<typename T>
AbstractFifo& resip::AbstractFifo< T >::operator= ( const AbstractFifo< T > &  ) [private]
template<typename T>
virtual unsigned int resip::AbstractFifo< T >::size ( ) const [inline, virtual]

get the current size of the fifo.

Note:
Do not use this function to determine whether a call to getNext() will block. Use messageAvailable() instead.
Returns:
the number of messages in the queue

Definition at line 161 of file AbstractFifo.hxx.

Referenced by resip::ProducerFifoBuffer< T >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::DnsStub::getTimeTillNextProcessMS(), main(), resip::ProducerFifoBuffer< T >::setBufferSize(), resip::ConsumerFifoBuffer< T >::size(), Consumer::thread(), and Producer::thread().

      {
         Lock lock(mMutex); (void)lock;
         return (unsigned int)mFifo.size();
      }

Member Data Documentation

template<typename T>
UInt32 resip::AbstractFifo< T >::mAverageServiceTimeMicroSec [mutable, protected]
template<typename T>
Condition resip::AbstractFifo< T >::mCondition [protected]
template<typename T>
UInt32 resip::AbstractFifo< T >::mCounter [mutable, protected]
template<typename T>
Messages resip::AbstractFifo< T >::mFifo [protected]
template<typename T>
UInt64 resip::AbstractFifo< T >::mLastSampleTakenMicroSec [mutable, protected]
template<typename T>
Mutex resip::AbstractFifo< T >::mMutex [mutable, protected]
template<typename T>
UInt32 resip::AbstractFifo< T >::mSize [protected]

The documentation for this class was generated from the following file: