|
reSIProcate/stack
9694
|
Collection of Connection per Transport. More...
#include <ConnectionManager.hxx>

Public Member Functions | |
| ConnectionManager () | |
| ~ConnectionManager () | |
| Connection * | findConnection (const Tuple &tuple) |
| may return 0 | |
| const Connection * | findConnection (const Tuple &tuple) const |
| void | setPollGrp (FdPollGrp *grp) |
| populate the fdset againt the read and write lists | |
| void | buildFdSet (FdSet &fdset) |
| void | process (FdSet &fdset) |
Static Public Attributes | |
| static UInt64 | MinimumGcAge = 1 |
| connection must have no inbound traffic for greater than this time (in ms) before it is garbage collected | |
| static bool | EnableAgressiveGc = false |
| Enable Agressive Connection Garbage Collection to have resip perform garbage collection on every new connection. | |
Private Types | |
| typedef std::map< Tuple, Connection * > | AddrMap |
| typedef std::map< Socket, Connection * > | IdMap |
Private Member Functions | |
| void | addToWritable (Connection *conn) |
| void | removeFromWritable (Connection *conn) |
| void | addConnection (Connection *connection) |
| void | removeConnection (Connection *connection) |
| void | closeConnections () |
| void | gc (UInt64 threshold, unsigned int maxToRemove) |
| release excessively old connections (free up file descriptors) set maxToRemove to 0 for no-max | |
| void | touch (Connection *connection) |
| move to youngest | |
| void | moveToFlowTimerLru (Connection *connection) |
Private Attributes | |
| AddrMap | mAddrMap |
| IdMap | mIdMap |
| Connection | mHead |
| all intrusive lists based on the same element type | |
| ConnectionWriteList * | mWriteHead |
| ready to write list | |
| ConnectionReadList * | mReadHead |
| ready to read list | |
| ConnectionLruList * | mLRUHead |
| least recently used list | |
| FlowTimerLruList * | mFlowTimerLRUHead |
| least recently used list for flow timer enabled connections | |
| FdPollGrp * | mPollGrp |
| collection for epoll | |
Friends | |
| class | Connection |
| class | TcpBaseTransport |
Collection of Connection per Transport.
Maintains round-robin orders for read and write. Maintains least-recently-used connections list for garbage collection.
Maintains mapping from Tuple to Connection.
Definition at line 19 of file ConnectionManager.hxx.
typedef std::map<Tuple, Connection*> resip::ConnectionManager::AddrMap [private] |
Definition at line 47 of file ConnectionManager.hxx.
typedef std::map<Socket, Connection*> resip::ConnectionManager::IdMap [private] |
Definition at line 48 of file ConnectionManager.hxx.
| ConnectionManager::ConnectionManager | ( | ) |
Definition at line 20 of file ConnectionManager.cxx.
References DebugLog.
: mHead(0,Tuple(),0,Compression::Disabled), mWriteHead(ConnectionWriteList::makeList(&mHead)), mReadHead(ConnectionReadList::makeList(&mHead)), mLRUHead(ConnectionLruList::makeList(&mHead)), mFlowTimerLRUHead(FlowTimerLruList::makeList(&mHead)), mPollGrp(0) { DebugLog(<<"ConnectionManager::ConnectionManager() called "); }
| ConnectionManager::~ConnectionManager | ( | ) |
Definition at line 31 of file ConnectionManager.cxx.
References closeConnections(), resip::IntrusiveListElement2< P >::empty(), resip::IntrusiveListElement3< P >::empty(), resip::IntrusiveListElement< P >::empty(), resip::IntrusiveListElement1< P >::empty(), mFlowTimerLRUHead, mLRUHead, mReadHead, and mWriteHead.
{
closeConnections();
assert(mReadHead->empty());
assert(mWriteHead->empty());
assert(mLRUHead->empty());
assert(mFlowTimerLRUHead->empty());
}

| void ConnectionManager::addConnection | ( | Connection * | connection | ) | [private] |
Definition at line 179 of file ConnectionManager.cxx.
References resip::FdPollGrp::addPollItem(), EnableAgressiveGc, FPEM_Read, gc(), resip::Connection::getSocket(), mAddrMap, resip::Tuple::mFlowKey, mIdMap, MinimumGcAge, mLRUHead, mPollGrp, resip::Connection::mPollItemHandle, mReadHead, resip::IntrusiveListElement1< P >::push_back(), resip::IntrusiveListElement< P >::push_back(), and resip::ConnectionBase::who().
Referenced by resip::Connection::Connection().
{
assert(mAddrMap.find(connection->who())==mAddrMap.end());
//DebugLog (<< "ConnectionManager::addConnection() " << connection->mWho.mFlowKey << ":" << connection->mSocket);
mAddrMap[connection->who()] = connection;
mIdMap[connection->who().mFlowKey] = connection;
if ( mPollGrp )
{
connection->mPollItemHandle = mPollGrp->addPollItem(
connection->getSocket(), FPEM_Read, connection);
}
else
{
mReadHead->push_back(connection);
}
mLRUHead->push_back(connection);
// Garbage collect old connections if agressive is enabled
if(EnableAgressiveGc)
{
gc(MinimumGcAge, 0); // cleanup all connections that haven't seen data in last x ms
}
//DebugLog (<< "count=" << mAddrMap.count(connection->who()) << "who=" << connection->who() << " mAddrMap=" << Inserter(mAddrMap));
//assert(mAddrMap.begin()->first == connection->who());
assert(mAddrMap.count(connection->who()) == 1);
}

| void ConnectionManager::addToWritable | ( | Connection * | conn | ) | [private] |
Definition at line 152 of file ConnectionManager.cxx.
References FPEM_Read, FPEM_Write, resip::FdPollGrp::modPollItem(), mPollGrp, resip::Connection::mPollItemHandle, mWriteHead, and resip::IntrusiveListElement2< P >::push_back().
Referenced by resip::Connection::ensureWritable().
{
if ( mPollGrp )
{
mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read|FPEM_Write);
}
else
{
mWriteHead->push_back(conn);
}
}

| void ConnectionManager::buildFdSet | ( | FdSet & | fdset | ) |
Definition at line 131 of file ConnectionManager.cxx.
References resip::IntrusiveListElement1< P >::begin(), resip::IntrusiveListElement2< P >::begin(), resip::IntrusiveListElement1< P >::end(), resip::IntrusiveListElement2< P >::end(), mPollGrp, mReadHead, mWriteHead, resip::FdSet::setExcept(), resip::FdSet::setRead(), and resip::FdSet::setWrite().
Referenced by resip::TcpBaseTransport::buildFdSet().
{
// If using PollGrp, caller shouldn't call this
assert( mPollGrp==0 );
for (ConnectionReadList::iterator i = mReadHead->begin();
i != mReadHead->end(); ++i)
{
fdset.setRead((*i)->getSocket());
fdset.setExcept((*i)->getSocket());
}
for (ConnectionWriteList::iterator i = mWriteHead->begin();
i != mWriteHead->end(); ++i)
{
fdset.setWrite((*i)->getSocket());
fdset.setExcept((*i)->getSocket());
}
}

| void ConnectionManager::closeConnections | ( | ) | [private] |
Definition at line 41 of file ConnectionManager.cxx.
References mAddrMap.
Referenced by setPollGrp(), and ~ConnectionManager().
| Connection * ConnectionManager::findConnection | ( | const Tuple & | tuple | ) |
may return 0
Definition at line 50 of file ConnectionManager.cxx.
References DebugLog, mAddrMap, resip::Tuple::mFlowKey, mIdMap, and resip::Tuple::onlyUseExistingConnection.
Referenced by resip::TcpBaseTransport::processAllWriteRequests(), and resip::TcpBaseTransport::processListen().
{
if (addr.mFlowKey != 0)
{
IdMap::iterator i = mIdMap.find(addr.mFlowKey);
if (i != mIdMap.end())
{
if(i->second->who()==addr)
{
DebugLog(<<"Found fd " << addr.mFlowKey);
return i->second;
}
else
{
DebugLog(<<"fd " << addr.mFlowKey
<< " exists, but does not match the destination. FD -> "
<< i->second->who() << ", tuple -> " << addr);
}
}
else
{
DebugLog(<<"fd " << addr.mFlowKey << " does not exist.");
}
if(addr.onlyUseExistingConnection)
{
return 0;
}
}
AddrMap::iterator i = mAddrMap.find(addr);
if (i != mAddrMap.end())
{
DebugLog(<<"Found connection for tuple "<< addr );
return i->second;
}
DebugLog(<<"Could not find a connection for " << addr);
return 0;
}
| const Connection * ConnectionManager::findConnection | ( | const Tuple & | tuple | ) | const |
Definition at line 93 of file ConnectionManager.cxx.
References DebugLog, mAddrMap, resip::Tuple::mFlowKey, and mIdMap.
{
if (addr.mFlowKey != 0)
{
IdMap::const_iterator i = mIdMap.find(addr.mFlowKey);
if (i != mIdMap.end())
{
if(i->second->who()==addr)
{
DebugLog(<<"Found fd " << addr.mFlowKey);
return i->second;
}
else
{
DebugLog(<<"fd " << addr.mFlowKey
<< " exists, but does not match the destination. FD -> "
<< i->second->who() << ", tuple -> " << addr);
}
}
else
{
DebugLog(<<"fd " << addr.mFlowKey << " does not exist.");
}
}
AddrMap::const_iterator i = mAddrMap.find(addr);
if (i != mAddrMap.end())
{
DebugLog(<<"Found connection for tuple "<< addr );
return i->second;
}
DebugLog(<<"Could not find a connection for " << addr);
return 0;
}
| void ConnectionManager::gc | ( | UInt64 | threshold, |
| unsigned int | maxToRemove | ||
| ) | [private] |
release excessively old connections (free up file descriptors) set maxToRemove to 0 for no-max
Definition at line 241 of file ConnectionManager.cxx.
References resip::IntrusiveListElement< P >::begin(), resip::IntrusiveListElement3< P >::begin(), DebugLog, resip::IntrusiveListElement< P >::end(), resip::IntrusiveListElement3< P >::end(), resip::InteropHelper::getFlowTimerGracePeriodSeconds(), resip::InteropHelper::getFlowTimerSeconds(), resip::Connection::getSocket(), resip::Timer::getTimeMs(), InfoLog, mFlowTimerLRUHead, and mLRUHead.
Referenced by addConnection(), and resip::TcpBaseTransport::makeOutgoingConnection().
{
UInt64 curTimeMs = Timer::getTimeMs();
UInt64 threshold = curTimeMs - relThreshold;
DebugLog(<< "recycling connections not used in last " << relThreshold/1000.0 << " seconds");
// Look through non-flow-timer connections and close those using the passed in relThreshold
unsigned int numRemoved = 0;
for (ConnectionLruList::iterator i = mLRUHead->begin();
i != mLRUHead->end() &&
(maxToRemove == 0 || numRemoved != maxToRemove);)
{
if ((*i)->whenLastUsed() < threshold)
{
Connection* discard = *i;
InfoLog(<< "recycling connection: " << discard << " " << discard->getSocket());
// iterate before removing
++i;
delete discard;
numRemoved++;
}
else
{
break;
}
}
// Look through flow-timer connections and close those using the configured FlowTimer
// value + the configured grace period as a threshold
if(mFlowTimerLRUHead->begin() != mFlowTimerLRUHead->end())
{
threshold = curTimeMs - ((InteropHelper::getFlowTimerSeconds() + InteropHelper::getFlowTimerGracePeriodSeconds()) * 1000);
for (FlowTimerLruList::iterator i2 = mFlowTimerLRUHead->begin();
i2 != mFlowTimerLRUHead->end() &&
(maxToRemove == 0 || numRemoved != maxToRemove);)
{
if ((*i2)->whenLastUsed() < threshold)
{
Connection* discard = *i2;
InfoLog(<< "recycling flow-timer enabled connection: " << discard << " " << discard->getSocket());
// iterate before removing
++i2;
delete discard;
numRemoved++;
}
else
{
break;
}
}
}
}

| void ConnectionManager::moveToFlowTimerLru | ( | Connection * | connection | ) | [private] |
Definition at line 312 of file ConnectionManager.cxx.
References mFlowTimerLRUHead, and resip::IntrusiveListElement3< P >::push_back().
Referenced by resip::Connection::enableFlowTimer().
{
connection->ConnectionLruList::remove();
mFlowTimerLRUHead->push_back(connection);
}

| void ConnectionManager::process | ( | FdSet & | fdset | ) |
Definition at line 319 of file ConnectionManager.cxx.
References resip::IntrusiveListElement2< P >::begin(), resip::IntrusiveListElement1< P >::begin(), resip::FdSet::clear(), resip::IntrusiveListElement2< P >::end(), resip::IntrusiveListElement1< P >::end(), resip::Connection::getSocket(), resip::Connection::hasDataToRead(), resip::FdSet::hasException(), InfoLog, mPollGrp, mReadHead, mWriteHead, resip::Connection::performReads(), resip::Connection::performWrites(), resip::FdSet::readyToRead(), and resip::FdSet::readyToWrite().
Referenced by resip::TcpBaseTransport::process().
{
assert( mPollGrp==NULL ); // owner shouldn't call this if polling
// process the write list
for (ConnectionWriteList::iterator writeIter = mWriteHead->begin();
writeIter != mWriteHead->end(); )
{
Connection* currConnection = *writeIter;
// update iterator to next first so that it can traverse safely
// even if current one is removed from the list later
++writeIter;
if (!currConnection)
continue;
if (fdset.readyToWrite(currConnection->getSocket()))
{
currConnection->performWrites();
}
else if (fdset.hasException(currConnection->getSocket()))
{
int errNum = 0;
int errNumSize = sizeof(errNum);
getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize);
InfoLog(<< "Exception writing to socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection");
delete currConnection;
}
}
// process the read list
for (ConnectionReadList::iterator readIter = mReadHead->begin();
readIter != mReadHead->end(); )
{
Connection* currConnection = *readIter;
// update iterator to next first so that it can traverse safely
// even if current one is removed from the list later
++readIter;
if (!currConnection)
continue;
if ( fdset.readyToRead(currConnection->getSocket()) ||
currConnection->hasDataToRead() )
{
fdset.clear(currConnection->getSocket());
currConnection->performReads();
}
else if (fdset.hasException(currConnection->getSocket()))
{
int errNum = 0;
int errNumSize = sizeof(errNum);
getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize);
InfoLog(<< "Exception reading from socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection");
delete currConnection;
}
}
}

| void ConnectionManager::removeConnection | ( | Connection * | connection | ) | [private] |
Definition at line 212 of file ConnectionManager.cxx.
References resip::FdPollGrp::delPollItem(), resip::IntrusiveListElement1< P >::empty(), resip::Connection::isFlowTimerEnabled(), mAddrMap, resip::Tuple::mFlowKey, mIdMap, mPollGrp, resip::Connection::mPollItemHandle, mReadHead, and resip::ConnectionBase::mWho.
Referenced by resip::Connection::~Connection().
{
//DebugLog (<< "ConnectionManager::removeConnection()");
mIdMap.erase(connection->mWho.mFlowKey);
mAddrMap.erase(connection->mWho);
if ( mPollGrp )
{
mPollGrp->delPollItem(connection->mPollItemHandle);
}
else
{
assert(!mReadHead->empty());
connection->ConnectionReadList::remove();
connection->ConnectionWriteList::remove();
if(connection->isFlowTimerEnabled())
{
connection->FlowTimerLruList::remove();
}
else
{
connection->ConnectionLruList::remove();
}
}
}

| void ConnectionManager::removeFromWritable | ( | Connection * | conn | ) | [private] |
Definition at line 165 of file ConnectionManager.cxx.
References resip::IntrusiveListElement2< P >::empty(), FPEM_Read, resip::FdPollGrp::modPollItem(), mPollGrp, resip::Connection::mPollItemHandle, and mWriteHead.
Referenced by resip::Connection::performWrite(), and resip::Connection::removeFrontOutstandingSend().
{
if ( mPollGrp )
{
mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read);
}
else
{
assert(!mWriteHead->empty());
conn->ConnectionWriteList::remove();
}
}

| void ConnectionManager::setPollGrp | ( | FdPollGrp * | grp | ) |
populate the fdset againt the read and write lists
Definition at line 381 of file ConnectionManager.cxx.
References closeConnections(), and mPollGrp.
Referenced by resip::TcpBaseTransport::setPollGrp().
{
// .bwc. We could have all the connections detach and re-attach, but the
// only place we call this when connections exist is when tearing down
// anyway.
closeConnections();
mPollGrp = grp;
}

| void ConnectionManager::touch | ( | Connection * | connection | ) | [private] |
move to youngest
Definition at line 296 of file ConnectionManager.cxx.
References resip::Connection::isFlowTimerEnabled(), mFlowTimerLRUHead, mLRUHead, resip::IntrusiveListElement3< P >::push_back(), resip::IntrusiveListElement< P >::push_back(), and resip::ConnectionBase::resetLastUsed().
Referenced by resip::Connection::read().
{
connection->resetLastUsed();
if(connection->isFlowTimerEnabled())
{
connection->FlowTimerLruList::remove();
mFlowTimerLRUHead->push_back(connection);
}
else
{
connection->ConnectionLruList::remove();
mLRUHead->push_back(connection);
}
}

friend class Connection [friend] |
Definition at line 21 of file ConnectionManager.hxx.
friend class TcpBaseTransport [friend] |
Definition at line 81 of file ConnectionManager.hxx.
bool ConnectionManager::EnableAgressiveGc = false [static] |
Enable Agressive Connection Garbage Collection to have resip perform garbage collection on every new connection.
If disabled then garbage collection is only performed if we run out of Fd's
Definition at line 29 of file ConnectionManager.hxx.
Referenced by addConnection().
AddrMap resip::ConnectionManager::mAddrMap [private] |
Definition at line 62 of file ConnectionManager.hxx.
Referenced by addConnection(), closeConnections(), findConnection(), and removeConnection().
least recently used list for flow timer enabled connections
Definition at line 75 of file ConnectionManager.hxx.
Referenced by gc(), moveToFlowTimerLru(), touch(), and ~ConnectionManager().
Connection resip::ConnectionManager::mHead [private] |
all intrusive lists based on the same element type
Definition at line 66 of file ConnectionManager.hxx.
IdMap resip::ConnectionManager::mIdMap [private] |
Definition at line 63 of file ConnectionManager.hxx.
Referenced by addConnection(), findConnection(), and removeConnection().
UInt64 ConnectionManager::MinimumGcAge = 1 [static] |
connection must have no inbound traffic for greater than this time (in ms) before it is garbage collected
Definition at line 25 of file ConnectionManager.hxx.
Referenced by addConnection(), and resip::TcpBaseTransport::makeOutgoingConnection().
least recently used list
Definition at line 73 of file ConnectionManager.hxx.
Referenced by addConnection(), gc(), touch(), and ~ConnectionManager().
FdPollGrp* resip::ConnectionManager::mPollGrp [private] |
collection for epoll
Definition at line 78 of file ConnectionManager.hxx.
Referenced by addConnection(), addToWritable(), buildFdSet(), process(), removeConnection(), removeFromWritable(), and setPollGrp().
ready to read list
Definition at line 71 of file ConnectionManager.hxx.
Referenced by addConnection(), buildFdSet(), process(), removeConnection(), and ~ConnectionManager().
ready to write list
Definition at line 69 of file ConnectionManager.hxx.
Referenced by addToWritable(), buildFdSet(), process(), removeFromWritable(), and ~ConnectionManager().
1.7.5.1