reSIProcate/stack  9694
ConnectionManager.cxx
Go to the documentation of this file.
00001 #if defined(HAVE_CONFIG_H)
00002 #include "config.h"
00003 #endif
00004 
00005 #include "resip/stack/ConnectionManager.hxx"
00006 #include "resip/stack/InteropHelper.hxx"
00007 #include "rutil/Logger.hxx"
00008 #include "rutil/Inserter.hxx"
00009 
00010 #include <vector>
00011 
00012 using namespace resip;
00013 using namespace std;
00014 
00015 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT
00016 
00017 UInt64 ConnectionManager::MinimumGcAge = 1;  // in milliseconds
00018 bool ConnectionManager::EnableAgressiveGc = false;
00019 
00020 ConnectionManager::ConnectionManager() : 
00021    mHead(0,Tuple(),0,Compression::Disabled),
00022    mWriteHead(ConnectionWriteList::makeList(&mHead)),
00023    mReadHead(ConnectionReadList::makeList(&mHead)),
00024    mLRUHead(ConnectionLruList::makeList(&mHead)),
00025    mFlowTimerLRUHead(FlowTimerLruList::makeList(&mHead)),
00026    mPollGrp(0)
00027 {
00028    DebugLog(<<"ConnectionManager::ConnectionManager() called ");
00029 }
00030 
00031 ConnectionManager::~ConnectionManager()
00032 {
00033    closeConnections();
00034    assert(mReadHead->empty());
00035    assert(mWriteHead->empty());
00036    assert(mLRUHead->empty());
00037    assert(mFlowTimerLRUHead->empty());
00038 }
00039 
00040 void 
00041 ConnectionManager::closeConnections()
00042 {
00043    while (!mAddrMap.empty())
00044    {
00045       delete mAddrMap.begin()->second;
00046    }
00047 }
00048 
00049 Connection*
00050 ConnectionManager::findConnection(const Tuple& addr)
00051 {
00052    if (addr.mFlowKey != 0)
00053    {
00054       IdMap::iterator i = mIdMap.find(addr.mFlowKey);
00055       if (i != mIdMap.end())
00056       {
00057          if(i->second->who()==addr)
00058          {
00059             DebugLog(<<"Found fd " << addr.mFlowKey);
00060             return i->second;
00061          }
00062          else
00063          {
00064             DebugLog(<<"fd " << addr.mFlowKey 
00065                      << " exists, but does not match the destination. FD -> "
00066                      << i->second->who() << ", tuple -> " << addr);
00067          }
00068       }
00069       else
00070       {
00071          DebugLog(<<"fd " << addr.mFlowKey << " does not exist.");
00072       }
00073 
00074       if(addr.onlyUseExistingConnection)
00075       {
00076          return 0;
00077       }
00078    }
00079    
00080    AddrMap::iterator i = mAddrMap.find(addr);
00081    if (i != mAddrMap.end())
00082    {
00083       DebugLog(<<"Found connection for tuple "<< addr );
00084       return i->second;
00085    }
00086 
00087    
00088    DebugLog(<<"Could not find a connection for " << addr);
00089    return 0;
00090 }
00091 
00092 const Connection* 
00093 ConnectionManager::findConnection(const Tuple& addr) const
00094 {
00095    if (addr.mFlowKey != 0)
00096    {
00097       IdMap::const_iterator i = mIdMap.find(addr.mFlowKey);
00098       if (i != mIdMap.end())
00099       {
00100          if(i->second->who()==addr)
00101          {
00102             DebugLog(<<"Found fd " << addr.mFlowKey);
00103             return i->second;
00104          }
00105          else
00106          {
00107             DebugLog(<<"fd " << addr.mFlowKey 
00108                      << " exists, but does not match the destination. FD -> "
00109                      << i->second->who() << ", tuple -> " << addr);
00110          }
00111       }
00112       else
00113       {
00114          DebugLog(<<"fd " << addr.mFlowKey << " does not exist.");
00115       }
00116    }
00117    
00118    AddrMap::const_iterator i = mAddrMap.find(addr);
00119    if (i != mAddrMap.end())
00120    {
00121       DebugLog(<<"Found connection for tuple "<< addr );
00122       return i->second;
00123    }
00124 
00125    
00126    DebugLog(<<"Could not find a connection for " << addr);
00127    return 0;
00128 }
00129 
00130 void
00131 ConnectionManager::buildFdSet(FdSet& fdset)
00132 {
00133    // If using PollGrp, caller shouldn't call this
00134    assert( mPollGrp==0 );
00135 
00136    for (ConnectionReadList::iterator i = mReadHead->begin(); 
00137         i != mReadHead->end(); ++i)
00138    {
00139       fdset.setRead((*i)->getSocket());
00140       fdset.setExcept((*i)->getSocket());
00141    }
00142 
00143    for (ConnectionWriteList::iterator i = mWriteHead->begin(); 
00144         i != mWriteHead->end(); ++i)
00145    {
00146       fdset.setWrite((*i)->getSocket());
00147       fdset.setExcept((*i)->getSocket());
00148    }
00149 }
00150 
00151 void
00152 ConnectionManager::addToWritable(Connection* conn)
00153 {
00154    if ( mPollGrp ) 
00155    {
00156       mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read|FPEM_Write);
00157    } 
00158    else 
00159    {
00160       mWriteHead->push_back(conn);
00161    }
00162 }
00163 
00164 void
00165 ConnectionManager::removeFromWritable(Connection* conn)
00166 {
00167    if ( mPollGrp ) 
00168    {
00169       mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read);
00170    }
00171    else
00172    {
00173       assert(!mWriteHead->empty());
00174       conn->ConnectionWriteList::remove();
00175    }
00176 }
00177 
00178 void
00179 ConnectionManager::addConnection(Connection* connection)
00180 {
00181    assert(mAddrMap.find(connection->who())==mAddrMap.end());
00182 
00183    //DebugLog (<< "ConnectionManager::addConnection() " << connection->mWho.mFlowKey  << ":" << connection->mSocket);
00184    
00185    
00186    mAddrMap[connection->who()] = connection;
00187    mIdMap[connection->who().mFlowKey] = connection;
00188 
00189    if ( mPollGrp ) 
00190    {
00191       connection->mPollItemHandle = mPollGrp->addPollItem(
00192          connection->getSocket(), FPEM_Read, connection);
00193    }
00194    else 
00195    {
00196       mReadHead->push_back(connection);
00197    }
00198    mLRUHead->push_back(connection);
00199 
00200    // Garbage collect old connections if agressive is enabled
00201    if(EnableAgressiveGc)
00202    {
00203       gc(MinimumGcAge, 0);  // cleanup all connections that haven't seen data in last x ms
00204    }
00205 
00206    //DebugLog (<< "count=" << mAddrMap.count(connection->who()) << "who=" << connection->who() << " mAddrMap=" << Inserter(mAddrMap));
00207    //assert(mAddrMap.begin()->first == connection->who());
00208    assert(mAddrMap.count(connection->who()) == 1);
00209 }
00210 
00211 void
00212 ConnectionManager::removeConnection(Connection* connection)
00213 {
00214    //DebugLog (<< "ConnectionManager::removeConnection()");
00215 
00216    mIdMap.erase(connection->mWho.mFlowKey);
00217    mAddrMap.erase(connection->mWho);
00218 
00219    if ( mPollGrp ) 
00220    {
00221       mPollGrp->delPollItem(connection->mPollItemHandle);
00222    }
00223    else
00224    {
00225       assert(!mReadHead->empty());
00226       connection->ConnectionReadList::remove();
00227       connection->ConnectionWriteList::remove();
00228       if(connection->isFlowTimerEnabled())
00229       {
00230          connection->FlowTimerLruList::remove();
00231       }
00232       else
00233       {
00234          connection->ConnectionLruList::remove();
00235       }
00236    }
00237 }
00238 
00239 // release excessively old connections (free up file descriptors)
00240 void
00241 ConnectionManager::gc(UInt64 relThreshold, unsigned int maxToRemove)
00242 {
00243    UInt64 curTimeMs = Timer::getTimeMs();
00244    UInt64 threshold = curTimeMs - relThreshold;
00245    DebugLog(<< "recycling connections not used in last " << relThreshold/1000.0 << " seconds");
00246 
00247    // Look through non-flow-timer connections and close those using the passed in relThreshold
00248    unsigned int numRemoved = 0;
00249    for (ConnectionLruList::iterator i = mLRUHead->begin();
00250         i != mLRUHead->end() &&
00251         (maxToRemove == 0 || numRemoved != maxToRemove);)
00252    {
00253       if ((*i)->whenLastUsed() < threshold)
00254       {
00255          Connection* discard = *i;
00256          InfoLog(<< "recycling connection: " << discard << " " << discard->getSocket());
00257          // iterate before removing
00258          ++i;
00259          delete discard;
00260          numRemoved++;
00261       }
00262       else
00263       {
00264          break;
00265       }
00266    }
00267 
00268    // Look through flow-timer connections and close those using the configured FlowTimer 
00269    // value + the configured grace period as a threshold
00270    if(mFlowTimerLRUHead->begin() != mFlowTimerLRUHead->end())
00271    {
00272       threshold = curTimeMs - ((InteropHelper::getFlowTimerSeconds() + InteropHelper::getFlowTimerGracePeriodSeconds()) * 1000);
00273       for (FlowTimerLruList::iterator i2 = mFlowTimerLRUHead->begin();
00274          i2 != mFlowTimerLRUHead->end() &&
00275          (maxToRemove == 0 || numRemoved != maxToRemove);)
00276       {  
00277          if ((*i2)->whenLastUsed() < threshold)
00278          {
00279             Connection* discard = *i2;
00280             InfoLog(<< "recycling flow-timer enabled connection: " << discard << " " << discard->getSocket());
00281             // iterate before removing
00282             ++i2;
00283             delete discard;
00284             numRemoved++;
00285          }
00286          else
00287          {
00288             break;
00289          }
00290       }
00291    }
00292 }
00293 
00294 // move to youngest
00295 void
00296 ConnectionManager::touch(Connection* connection)
00297 {
00298    connection->resetLastUsed();
00299    if(connection->isFlowTimerEnabled())
00300    {
00301       connection->FlowTimerLruList::remove();
00302       mFlowTimerLRUHead->push_back(connection);
00303    }
00304    else
00305    {
00306       connection->ConnectionLruList::remove();
00307       mLRUHead->push_back(connection);
00308    }
00309 }
00310 
00311 void 
00312 ConnectionManager::moveToFlowTimerLru(Connection *connection)
00313 {
00314    connection->ConnectionLruList::remove();
00315    mFlowTimerLRUHead->push_back(connection);
00316 }
00317 
00318 void
00319 ConnectionManager::process(FdSet& fdset)
00320 {
00321    assert( mPollGrp==NULL );    // owner shouldn't call this if polling
00322 
00323    // process the write list
00324    for (ConnectionWriteList::iterator writeIter = mWriteHead->begin();
00325         writeIter != mWriteHead->end(); )
00326    {
00327       Connection* currConnection = *writeIter;
00328 
00329       // update iterator to next first so that it can traverse safely
00330       // even if current one is removed from the list later
00331       ++writeIter;
00332 
00333       if (!currConnection)
00334          continue;
00335 
00336       if (fdset.readyToWrite(currConnection->getSocket()))
00337       {
00338          currConnection->performWrites();
00339       }
00340       else if (fdset.hasException(currConnection->getSocket()))
00341       {
00342          int errNum = 0;
00343          int errNumSize = sizeof(errNum);
00344          getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize);
00345          InfoLog(<< "Exception writing to socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection");
00346          delete currConnection;
00347       }
00348    }
00349 
00350    // process the read list
00351    for (ConnectionReadList::iterator readIter = mReadHead->begin();
00352         readIter != mReadHead->end(); )
00353    {
00354       Connection* currConnection = *readIter; 
00355 
00356       // update iterator to next first so that it can traverse safely
00357       // even if current one is removed from the list later
00358       ++readIter;
00359 
00360       if (!currConnection)
00361          continue;
00362 
00363       if ( fdset.readyToRead(currConnection->getSocket()) ||
00364            currConnection->hasDataToRead() )
00365       {
00366          fdset.clear(currConnection->getSocket());
00367          currConnection->performReads();
00368       }
00369       else if (fdset.hasException(currConnection->getSocket()))
00370       {
00371          int errNum = 0;
00372          int errNumSize = sizeof(errNum);
00373          getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize);
00374          InfoLog(<< "Exception reading from socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection");
00375          delete currConnection;
00376       }
00377    }
00378 }
00379 
00380 void
00381 ConnectionManager::setPollGrp(FdPollGrp *grp) 
00382 {
00383     // .bwc. We could have all the connections detach and re-attach, but the 
00384     // only place we call this when connections exist is when tearing down 
00385     // anyway.
00386     closeConnections();
00387     mPollGrp = grp;
00388 }
00389 
00390 /* ====================================================================
00391  * The Vovida Software License, Version 1.0 
00392  * 
00393  * Copyright (c)
00394  * 
00395  * Redistribution and use in source and binary forms, with or without
00396  * modification, are permitted provided that the following conditions
00397  * are met:
00398  * 
00399  * 1. Redistributions of source code must retain the above copyright
00400  *    notice, this list of conditions and the following disclaimer.
00401  * 
00402  * 2. Redistributions in binary form must reproduce the above copyright
00403  *    notice, this list of conditions and the following disclaimer in
00404  *    the documentation and/or other materials provided with the
00405  *    distribution.
00406  * 
00407  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00408  *    and "Vovida Open Communication Application Library (VOCAL)" must
00409  *    not be used to endorse or promote products derived from this
00410  *    software without prior written permission. For written
00411  *    permission, please contact vocal@vovida.org.
00412  *
00413  * 4. Products derived from this software may not be called "VOCAL", nor
00414  *    may "VOCAL" appear in their name, without prior written
00415  *    permission of Vovida Networks, Inc.
00416  * 
00417  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00418  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00419  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00420  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00421  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00422  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00423  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00424  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00425  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00426  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00427  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00428  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00429  * DAMAGE.
00430  * 
00431  * ====================================================================
00432  * 
00433  * This software consists of voluntary contributions made by Vovida
00434  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00435  * Inc.  For more information on Vovida Networks, Inc., please see
00436  * <http://www.vovida.org/>.
00437  *
00438  */