|
reSIProcate/rutil
9694
|
The base class from which various templated Fifo classes are derived. More...
#include <AbstractFifo.hxx>


Public Member Functions | |
| AbstractFifo () | |
| Constructor. | |
| virtual | ~AbstractFifo () |
| bool | empty () const |
| is the queue empty? | |
| virtual unsigned int | size () const |
| get the current size of the fifo. | |
| bool | messageAvailable () const |
| is a message available? | |
| virtual time_t | getTimeDepth () const |
| computes the time delta between the oldest and newest queue members | |
| virtual size_t | getCountDepth () const |
| Returns the number of elements in the FIFO. | |
| virtual time_t | expectedWaitTimeMilliSec () const |
| Returns the expected time it will take to service all messages currently in the queue (in milli-seconds) | |
| virtual time_t | averageServiceTimeMicroSec () const |
| Returns the average time it takes for individual messages to be serviced (in micro-seconds) | |
| virtual void | clear () |
| remove all elements in the queue (or not) | |
Protected Types | |
| typedef std::deque< T > | Messages |
Protected Member Functions | |
| T | getNext () |
| Returns the first message available. | |
| bool | getNext (int ms, T &toReturn) |
| Returns the next message available. | |
| void | getMultiple (Messages &other, unsigned int max) |
| bool | getMultiple (int ms, Messages &other, unsigned int max) |
| size_t | add (const T &item) |
| size_t | addMultiple (Messages &items) |
| virtual void | onFifoPolled () |
| virtual void | onMessagePopped (unsigned int num=1) |
| Called when a message (or messages) are removed from this fifo. | |
| virtual void | onMessagePushed (int num) |
Protected Attributes | |
| Messages | mFifo |
| container for FIFO items | |
| Mutex | mMutex |
| access serialization lock | |
| Condition | mCondition |
| condition for waiting on new queue items | |
| UInt64 | mLastSampleTakenMicroSec |
| UInt32 | mCounter |
| UInt32 | mAverageServiceTimeMicroSec |
| UInt32 | mSize |
Private Member Functions | |
| AbstractFifo (const AbstractFifo &) | |
| AbstractFifo & | operator= (const AbstractFifo &) |
The base class from which various templated Fifo classes are derived.
(aka template hoist) AbstractFifo's get operations are all threadsafe; AbstractFifo does not define any put operations (these are defined in subclasses).
Definition at line 125 of file AbstractFifo.hxx.
typedef std::deque<T> resip::AbstractFifo< T >::Messages [protected] |
Reimplemented in resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 300 of file AbstractFifo.hxx.
| resip::AbstractFifo< T >::AbstractFifo | ( | ) | [inline] |
Constructor.
| maxSize | max number of messages to keep |
Definition at line 132 of file AbstractFifo.hxx.
: FifoStatsInterface(), mLastSampleTakenMicroSec(0), mCounter(0), mAverageServiceTimeMicroSec(0), mSize(0) {}
| virtual resip::AbstractFifo< T >::~AbstractFifo | ( | ) | [inline, virtual] |
Definition at line 140 of file AbstractFifo.hxx.
{
}
| resip::AbstractFifo< T >::AbstractFifo | ( | const AbstractFifo< T > & | ) | [private] |
| size_t resip::AbstractFifo< T >::add | ( | const T & | item | ) | [inline, protected] |
Definition at line 384 of file AbstractFifo.hxx.
Referenced by resip::Fifo< Msg >::add().
{
Lock lock(mMutex); (void)lock;
mFifo.push_back(item);
mCondition.signal();
onMessagePushed(1);
return mFifo.size();
}
| size_t resip::AbstractFifo< T >::addMultiple | ( | Messages & | items | ) | [inline, protected] |
Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 393 of file AbstractFifo.hxx.
Referenced by resip::Fifo< Msg >::addMultiple().
{
Lock lock(mMutex); (void)lock;
size_t size=items.size();
if(mFifo.empty())
{
std::swap(mFifo, items);
}
else
{
// I suppose it is possible to optimize this as a push_front() from
// mFifo to items, and then do a swap, if items is larger.
while(!items.empty())
{
mFifo.push_back(items.front());
items.pop_front();
}
}
mCondition.signal();
onMessagePushed((int)size);
return mFifo.size();
}
| virtual time_t resip::AbstractFifo< T >::averageServiceTimeMicroSec | ( | ) | const [inline, virtual] |
Returns the average time it takes for individual messages to be serviced (in micro-seconds)
Implements resip::FifoStatsInterface.
Definition at line 198 of file AbstractFifo.hxx.
{
return mAverageServiceTimeMicroSec;
}
| virtual void resip::AbstractFifo< T >::clear | ( | ) | [inline, virtual] |
remove all elements in the queue (or not)
Reimplemented in resip::TimeLimitFifo< Msg >, resip::TimeLimitFifo< Foo >, resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 204 of file AbstractFifo.hxx.
{};
| bool resip::AbstractFifo< T >::empty | ( | ) | const [inline] |
is the queue empty?
Definition at line 148 of file AbstractFifo.hxx.
Referenced by resip::ConsumerFifoBuffer< T >::getNext(), main(), and resip::ConsumerFifoBuffer< T >::messageAvailable().
| virtual time_t resip::AbstractFifo< T >::expectedWaitTimeMilliSec | ( | ) | const [inline, virtual] |
Returns the expected time it will take to service all messages currently in the queue (in milli-seconds)
Implements resip::FifoStatsInterface.
Definition at line 193 of file AbstractFifo.hxx.
{
return ((mAverageServiceTimeMicroSec*mSize)+500)/1000;
}
| virtual size_t resip::AbstractFifo< T >::getCountDepth | ( | ) | const [inline, virtual] |
Returns the number of elements in the FIFO.
Implements resip::FifoStatsInterface.
Reimplemented in resip::TimeLimitFifo< Msg >, and resip::TimeLimitFifo< Foo >.
Definition at line 188 of file AbstractFifo.hxx.
{
return mSize;
}
| void resip::AbstractFifo< T >::getMultiple | ( | Messages & | other, |
| unsigned int | max | ||
| ) | [inline, protected] |
Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 302 of file AbstractFifo.hxx.
Referenced by resip::Fifo< Msg >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getMultiple().
{
Lock lock(mMutex); (void)lock;
onFifoPolled();
assert(other.empty());
while (mFifo.empty())
{
mCondition.wait(mMutex);
}
if(mFifo.size() <= max)
{
std::swap(mFifo, other);
onMessagePopped(mSize);
}
else
{
size_t num=max;
while( 0 != max-- )
{
other.push_back(mFifo.front());
mFifo.pop_front();
}
onMessagePopped((unsigned int)num);
}
}
| bool resip::AbstractFifo< T >::getMultiple | ( | int | ms, |
| Messages & | other, | ||
| unsigned int | max | ||
| ) | [inline, protected] |
Reimplemented in resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 329 of file AbstractFifo.hxx.
{
if(ms==0)
{
getMultiple(other,max);
return true;
}
assert(other.empty());
const UInt64 begin(Timer::getTimeMs());
const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
Lock lock(mMutex); (void)lock;
onFifoPolled();
// Wait until there are messages available
while (mFifo.empty())
{
if(ms < 0)
{
return false;
}
const UInt64 now(Timer::getTimeMs());
if(now >= end)
{
return false;
}
unsigned int timeout((unsigned int)(end - now));
// bail if total wait time exceeds limit
bool signaled = mCondition.wait(mMutex, timeout);
if (!signaled)
{
return false;
}
}
if(mFifo.size() <= max)
{
std::swap(mFifo, other);
onMessagePopped(mSize);
}
else
{
size_t num=max;
while( 0 != max-- )
{
other.push_back(mFifo.front());
mFifo.pop_front();
}
onMessagePopped((unsigned int)num);
}
return true;
}
| T resip::AbstractFifo< T >::getNext | ( | ) | [inline, protected] |
Returns the first message available.
Returns the first message available. It will wait if no messages are available. If a signal interrupts the wait, it will retry the wait. Signals can therefore not be caught via getNext. If you need to detect a signal, use block prior to calling getNext.
Reimplemented in resip::TimeLimitFifo< Msg >, resip::TimeLimitFifo< Foo >, resip::FiniteFifo< Msg >, resip::Fifo< Msg >, resip::Fifo< Command >, and resip::Fifo< T >.
Definition at line 216 of file AbstractFifo.hxx.
Referenced by resip::Fifo< Msg >::getNext(), resip::FiniteFifo< Msg >::getNext(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().
{
Lock lock(mMutex); (void)lock;
onFifoPolled();
// Wait util there are messages available.
while (mFifo.empty())
{
mCondition.wait(mMutex);
}
// Return the first message on the fifo.
//
T firstMessage(mFifo.front());
mFifo.pop_front();
onMessagePopped();
return firstMessage;
}
| bool resip::AbstractFifo< T >::getNext | ( | int | ms, |
| T & | toReturn | ||
| ) | [inline, protected] |
Returns the next message available.
Returns the next message available. Will wait up to ms milliseconds if no information is available. If the specified time passes or a signal interrupts the wait, this method returns 0. This interface provides no mechanism to distinguish between timeout and interrupt.
Definition at line 245 of file AbstractFifo.hxx.
{
if(ms == 0)
{
toReturn = getNext();
return true;
}
if(ms < 0)
{
Lock lock(mMutex); (void)lock;
onFifoPolled();
if (mFifo.empty()) // WATCHOUT: Do not test mSize instead
return false;
toReturn = mFifo.front();
mFifo.pop_front();
return true;
}
const UInt64 begin(Timer::getTimeMs());
const UInt64 end(begin + (unsigned int)(ms)); // !kh! ms should've been unsigned :(
Lock lock(mMutex); (void)lock;
onFifoPolled();
// Wait until there are messages available
while (mFifo.empty())
{
if(ms==0)
{
return false;
}
const UInt64 now(Timer::getTimeMs());
if(now >= end)
{
return false;
}
unsigned int timeout((unsigned int)(end - now));
// bail if total wait time exceeds limit
bool signaled = mCondition.wait(mMutex, timeout);
if (!signaled)
{
return false;
}
}
// Return the first message on the fifo.
//
toReturn=mFifo.front();
mFifo.pop_front();
onMessagePopped();
return true;
}
| virtual time_t resip::AbstractFifo< T >::getTimeDepth | ( | ) | const [inline, virtual] |
computes the time delta between the oldest and newest queue members
Implements resip::FifoStatsInterface.
Reimplemented in resip::TimeLimitFifo< Msg >, and resip::TimeLimitFifo< Foo >.
Definition at line 183 of file AbstractFifo.hxx.
{
return 0;
}
| bool resip::AbstractFifo< T >::messageAvailable | ( | ) | const [inline] |
is a message available?
| true | if a message is available and false otherwise |
Definition at line 172 of file AbstractFifo.hxx.
Referenced by resip::ConsumerFifoBuffer< T >::messageAvailable(), resip::DnsStub::processFifo(), and Consumer::thread().
| virtual void resip::AbstractFifo< T >::onFifoPolled | ( | ) | [inline, protected, virtual] |
Definition at line 431 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().
{
// !bwc! TODO allow this sampling frequency to be tweaked
if(mLastSampleTakenMicroSec &&
mCounter &&
(mCounter >= 64 || mFifo.empty()))
{
UInt64 now(Timer::getTimeMicroSec());
UInt64 diff = now-mLastSampleTakenMicroSec;
if(mCounter >= 4096)
{
mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(diff, mCounter);
}
else // fifo got emptied; merge into a rolling average
{
// .bwc. This is a moving average with period 64, round to
// nearest int.
mAverageServiceTimeMicroSec=(UInt32)resipIntDiv(
diff+((4096-mCounter)*mAverageServiceTimeMicroSec),
4096U);
}
mCounter=0;
if(mFifo.empty())
{
mLastSampleTakenMicroSec=0;
}
else
{
mLastSampleTakenMicroSec=now;
}
}
}
| virtual void resip::AbstractFifo< T >::onMessagePopped | ( | unsigned int | num = 1 | ) | [inline, protected, virtual] |
Called when a message (or messages) are removed from this fifo.
Used to drive service time calculations.
Definition at line 469 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().
| virtual void resip::AbstractFifo< T >::onMessagePushed | ( | int | num | ) | [inline, protected, virtual] |
Definition at line 475 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), and resip::AbstractFifo< Timestamped< Msg * > >::addMultiple().
{
if(mSize==0)
{
// Fifo went from empty to non-empty. Take a timestamp, and record
// how long it takes to process some messages.
mLastSampleTakenMicroSec=Timer::getTimeMicroSec();
}
mSize+=num;
}
| AbstractFifo& resip::AbstractFifo< T >::operator= | ( | const AbstractFifo< T > & | ) | [private] |
| virtual unsigned int resip::AbstractFifo< T >::size | ( | ) | const [inline, virtual] |
get the current size of the fifo.
Definition at line 161 of file AbstractFifo.hxx.
Referenced by resip::ProducerFifoBuffer< T >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::DnsStub::getTimeTillNextProcessMS(), main(), resip::ProducerFifoBuffer< T >::setBufferSize(), resip::ConsumerFifoBuffer< T >::size(), Consumer::thread(), and Producer::thread().
UInt32 resip::AbstractFifo< T >::mAverageServiceTimeMicroSec [mutable, protected] |
Condition resip::AbstractFifo< T >::mCondition [protected] |
condition for waiting on new queue items
Definition at line 421 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), and resip::AbstractFifo< Timestamped< Msg * > >::getNext().
UInt32 resip::AbstractFifo< T >::mCounter [mutable, protected] |
Definition at line 424 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::onFifoPolled(), and resip::AbstractFifo< Timestamped< Msg * > >::onMessagePopped().
Messages resip::AbstractFifo< T >::mFifo [protected] |
container for FIFO items
Definition at line 417 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::empty(), resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::getNext(), resip::AbstractFifo< Timestamped< Msg * > >::messageAvailable(), resip::AbstractFifo< Timestamped< Msg * > >::onFifoPolled(), and resip::AbstractFifo< Timestamped< Msg * > >::size().
UInt64 resip::AbstractFifo< T >::mLastSampleTakenMicroSec [mutable, protected] |
Definition at line 423 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::onFifoPolled(), and resip::AbstractFifo< Timestamped< Msg * > >::onMessagePushed().
Mutex resip::AbstractFifo< T >::mMutex [mutable, protected] |
access serialization lock
Definition at line 419 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::add(), resip::AbstractFifo< Timestamped< Msg * > >::addMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::empty(), resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::getNext(), resip::AbstractFifo< Timestamped< Msg * > >::messageAvailable(), and resip::AbstractFifo< Timestamped< Msg * > >::size().
UInt32 resip::AbstractFifo< T >::mSize [protected] |
Definition at line 429 of file AbstractFifo.hxx.
Referenced by resip::AbstractFifo< Timestamped< Msg * > >::expectedWaitTimeMilliSec(), resip::AbstractFifo< Timestamped< Msg * > >::getCountDepth(), resip::AbstractFifo< Timestamped< Msg * > >::getMultiple(), resip::AbstractFifo< Timestamped< Msg * > >::onMessagePopped(), and resip::AbstractFifo< Timestamped< Msg * > >::onMessagePushed().
1.7.5.1