|
reSIProcate/stack
9694
|
00001 #if defined(HAVE_CONFIG_H) 00002 #include "config.h" 00003 #endif 00004 00005 #include "resip/stack/ConnectionManager.hxx" 00006 #include "resip/stack/InteropHelper.hxx" 00007 #include "rutil/Logger.hxx" 00008 #include "rutil/Inserter.hxx" 00009 00010 #include <vector> 00011 00012 using namespace resip; 00013 using namespace std; 00014 00015 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT 00016 00017 UInt64 ConnectionManager::MinimumGcAge = 1; // in milliseconds 00018 bool ConnectionManager::EnableAgressiveGc = false; 00019 00020 ConnectionManager::ConnectionManager() : 00021 mHead(0,Tuple(),0,Compression::Disabled), 00022 mWriteHead(ConnectionWriteList::makeList(&mHead)), 00023 mReadHead(ConnectionReadList::makeList(&mHead)), 00024 mLRUHead(ConnectionLruList::makeList(&mHead)), 00025 mFlowTimerLRUHead(FlowTimerLruList::makeList(&mHead)), 00026 mPollGrp(0) 00027 { 00028 DebugLog(<<"ConnectionManager::ConnectionManager() called "); 00029 } 00030 00031 ConnectionManager::~ConnectionManager() 00032 { 00033 closeConnections(); 00034 assert(mReadHead->empty()); 00035 assert(mWriteHead->empty()); 00036 assert(mLRUHead->empty()); 00037 assert(mFlowTimerLRUHead->empty()); 00038 } 00039 00040 void 00041 ConnectionManager::closeConnections() 00042 { 00043 while (!mAddrMap.empty()) 00044 { 00045 delete mAddrMap.begin()->second; 00046 } 00047 } 00048 00049 Connection* 00050 ConnectionManager::findConnection(const Tuple& addr) 00051 { 00052 if (addr.mFlowKey != 0) 00053 { 00054 IdMap::iterator i = mIdMap.find(addr.mFlowKey); 00055 if (i != mIdMap.end()) 00056 { 00057 if(i->second->who()==addr) 00058 { 00059 DebugLog(<<"Found fd " << addr.mFlowKey); 00060 return i->second; 00061 } 00062 else 00063 { 00064 DebugLog(<<"fd " << addr.mFlowKey 00065 << " exists, but does not match the destination. FD -> " 00066 << i->second->who() << ", tuple -> " << addr); 00067 } 00068 } 00069 else 00070 { 00071 DebugLog(<<"fd " << addr.mFlowKey << " does not exist."); 00072 } 00073 00074 if(addr.onlyUseExistingConnection) 00075 { 00076 return 0; 00077 } 00078 } 00079 00080 AddrMap::iterator i = mAddrMap.find(addr); 00081 if (i != mAddrMap.end()) 00082 { 00083 DebugLog(<<"Found connection for tuple "<< addr ); 00084 return i->second; 00085 } 00086 00087 00088 DebugLog(<<"Could not find a connection for " << addr); 00089 return 0; 00090 } 00091 00092 const Connection* 00093 ConnectionManager::findConnection(const Tuple& addr) const 00094 { 00095 if (addr.mFlowKey != 0) 00096 { 00097 IdMap::const_iterator i = mIdMap.find(addr.mFlowKey); 00098 if (i != mIdMap.end()) 00099 { 00100 if(i->second->who()==addr) 00101 { 00102 DebugLog(<<"Found fd " << addr.mFlowKey); 00103 return i->second; 00104 } 00105 else 00106 { 00107 DebugLog(<<"fd " << addr.mFlowKey 00108 << " exists, but does not match the destination. FD -> " 00109 << i->second->who() << ", tuple -> " << addr); 00110 } 00111 } 00112 else 00113 { 00114 DebugLog(<<"fd " << addr.mFlowKey << " does not exist."); 00115 } 00116 } 00117 00118 AddrMap::const_iterator i = mAddrMap.find(addr); 00119 if (i != mAddrMap.end()) 00120 { 00121 DebugLog(<<"Found connection for tuple "<< addr ); 00122 return i->second; 00123 } 00124 00125 00126 DebugLog(<<"Could not find a connection for " << addr); 00127 return 0; 00128 } 00129 00130 void 00131 ConnectionManager::buildFdSet(FdSet& fdset) 00132 { 00133 // If using PollGrp, caller shouldn't call this 00134 assert( mPollGrp==0 ); 00135 00136 for (ConnectionReadList::iterator i = mReadHead->begin(); 00137 i != mReadHead->end(); ++i) 00138 { 00139 fdset.setRead((*i)->getSocket()); 00140 fdset.setExcept((*i)->getSocket()); 00141 } 00142 00143 for (ConnectionWriteList::iterator i = mWriteHead->begin(); 00144 i != mWriteHead->end(); ++i) 00145 { 00146 fdset.setWrite((*i)->getSocket()); 00147 fdset.setExcept((*i)->getSocket()); 00148 } 00149 } 00150 00151 void 00152 ConnectionManager::addToWritable(Connection* conn) 00153 { 00154 if ( mPollGrp ) 00155 { 00156 mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read|FPEM_Write); 00157 } 00158 else 00159 { 00160 mWriteHead->push_back(conn); 00161 } 00162 } 00163 00164 void 00165 ConnectionManager::removeFromWritable(Connection* conn) 00166 { 00167 if ( mPollGrp ) 00168 { 00169 mPollGrp->modPollItem(conn->mPollItemHandle, FPEM_Read); 00170 } 00171 else 00172 { 00173 assert(!mWriteHead->empty()); 00174 conn->ConnectionWriteList::remove(); 00175 } 00176 } 00177 00178 void 00179 ConnectionManager::addConnection(Connection* connection) 00180 { 00181 assert(mAddrMap.find(connection->who())==mAddrMap.end()); 00182 00183 //DebugLog (<< "ConnectionManager::addConnection() " << connection->mWho.mFlowKey << ":" << connection->mSocket); 00184 00185 00186 mAddrMap[connection->who()] = connection; 00187 mIdMap[connection->who().mFlowKey] = connection; 00188 00189 if ( mPollGrp ) 00190 { 00191 connection->mPollItemHandle = mPollGrp->addPollItem( 00192 connection->getSocket(), FPEM_Read, connection); 00193 } 00194 else 00195 { 00196 mReadHead->push_back(connection); 00197 } 00198 mLRUHead->push_back(connection); 00199 00200 // Garbage collect old connections if agressive is enabled 00201 if(EnableAgressiveGc) 00202 { 00203 gc(MinimumGcAge, 0); // cleanup all connections that haven't seen data in last x ms 00204 } 00205 00206 //DebugLog (<< "count=" << mAddrMap.count(connection->who()) << "who=" << connection->who() << " mAddrMap=" << Inserter(mAddrMap)); 00207 //assert(mAddrMap.begin()->first == connection->who()); 00208 assert(mAddrMap.count(connection->who()) == 1); 00209 } 00210 00211 void 00212 ConnectionManager::removeConnection(Connection* connection) 00213 { 00214 //DebugLog (<< "ConnectionManager::removeConnection()"); 00215 00216 mIdMap.erase(connection->mWho.mFlowKey); 00217 mAddrMap.erase(connection->mWho); 00218 00219 if ( mPollGrp ) 00220 { 00221 mPollGrp->delPollItem(connection->mPollItemHandle); 00222 } 00223 else 00224 { 00225 assert(!mReadHead->empty()); 00226 connection->ConnectionReadList::remove(); 00227 connection->ConnectionWriteList::remove(); 00228 if(connection->isFlowTimerEnabled()) 00229 { 00230 connection->FlowTimerLruList::remove(); 00231 } 00232 else 00233 { 00234 connection->ConnectionLruList::remove(); 00235 } 00236 } 00237 } 00238 00239 // release excessively old connections (free up file descriptors) 00240 void 00241 ConnectionManager::gc(UInt64 relThreshold, unsigned int maxToRemove) 00242 { 00243 UInt64 curTimeMs = Timer::getTimeMs(); 00244 UInt64 threshold = curTimeMs - relThreshold; 00245 DebugLog(<< "recycling connections not used in last " << relThreshold/1000.0 << " seconds"); 00246 00247 // Look through non-flow-timer connections and close those using the passed in relThreshold 00248 unsigned int numRemoved = 0; 00249 for (ConnectionLruList::iterator i = mLRUHead->begin(); 00250 i != mLRUHead->end() && 00251 (maxToRemove == 0 || numRemoved != maxToRemove);) 00252 { 00253 if ((*i)->whenLastUsed() < threshold) 00254 { 00255 Connection* discard = *i; 00256 InfoLog(<< "recycling connection: " << discard << " " << discard->getSocket()); 00257 // iterate before removing 00258 ++i; 00259 delete discard; 00260 numRemoved++; 00261 } 00262 else 00263 { 00264 break; 00265 } 00266 } 00267 00268 // Look through flow-timer connections and close those using the configured FlowTimer 00269 // value + the configured grace period as a threshold 00270 if(mFlowTimerLRUHead->begin() != mFlowTimerLRUHead->end()) 00271 { 00272 threshold = curTimeMs - ((InteropHelper::getFlowTimerSeconds() + InteropHelper::getFlowTimerGracePeriodSeconds()) * 1000); 00273 for (FlowTimerLruList::iterator i2 = mFlowTimerLRUHead->begin(); 00274 i2 != mFlowTimerLRUHead->end() && 00275 (maxToRemove == 0 || numRemoved != maxToRemove);) 00276 { 00277 if ((*i2)->whenLastUsed() < threshold) 00278 { 00279 Connection* discard = *i2; 00280 InfoLog(<< "recycling flow-timer enabled connection: " << discard << " " << discard->getSocket()); 00281 // iterate before removing 00282 ++i2; 00283 delete discard; 00284 numRemoved++; 00285 } 00286 else 00287 { 00288 break; 00289 } 00290 } 00291 } 00292 } 00293 00294 // move to youngest 00295 void 00296 ConnectionManager::touch(Connection* connection) 00297 { 00298 connection->resetLastUsed(); 00299 if(connection->isFlowTimerEnabled()) 00300 { 00301 connection->FlowTimerLruList::remove(); 00302 mFlowTimerLRUHead->push_back(connection); 00303 } 00304 else 00305 { 00306 connection->ConnectionLruList::remove(); 00307 mLRUHead->push_back(connection); 00308 } 00309 } 00310 00311 void 00312 ConnectionManager::moveToFlowTimerLru(Connection *connection) 00313 { 00314 connection->ConnectionLruList::remove(); 00315 mFlowTimerLRUHead->push_back(connection); 00316 } 00317 00318 void 00319 ConnectionManager::process(FdSet& fdset) 00320 { 00321 assert( mPollGrp==NULL ); // owner shouldn't call this if polling 00322 00323 // process the write list 00324 for (ConnectionWriteList::iterator writeIter = mWriteHead->begin(); 00325 writeIter != mWriteHead->end(); ) 00326 { 00327 Connection* currConnection = *writeIter; 00328 00329 // update iterator to next first so that it can traverse safely 00330 // even if current one is removed from the list later 00331 ++writeIter; 00332 00333 if (!currConnection) 00334 continue; 00335 00336 if (fdset.readyToWrite(currConnection->getSocket())) 00337 { 00338 currConnection->performWrites(); 00339 } 00340 else if (fdset.hasException(currConnection->getSocket())) 00341 { 00342 int errNum = 0; 00343 int errNumSize = sizeof(errNum); 00344 getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize); 00345 InfoLog(<< "Exception writing to socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection"); 00346 delete currConnection; 00347 } 00348 } 00349 00350 // process the read list 00351 for (ConnectionReadList::iterator readIter = mReadHead->begin(); 00352 readIter != mReadHead->end(); ) 00353 { 00354 Connection* currConnection = *readIter; 00355 00356 // update iterator to next first so that it can traverse safely 00357 // even if current one is removed from the list later 00358 ++readIter; 00359 00360 if (!currConnection) 00361 continue; 00362 00363 if ( fdset.readyToRead(currConnection->getSocket()) || 00364 currConnection->hasDataToRead() ) 00365 { 00366 fdset.clear(currConnection->getSocket()); 00367 currConnection->performReads(); 00368 } 00369 else if (fdset.hasException(currConnection->getSocket())) 00370 { 00371 int errNum = 0; 00372 int errNumSize = sizeof(errNum); 00373 getsockopt(currConnection->getSocket(), SOL_SOCKET, SO_ERROR, (char *)&errNum, (socklen_t *)&errNumSize); 00374 InfoLog(<< "Exception reading from socket " << currConnection->getSocket() << " code: " << errNum << "; closing connection"); 00375 delete currConnection; 00376 } 00377 } 00378 } 00379 00380 void 00381 ConnectionManager::setPollGrp(FdPollGrp *grp) 00382 { 00383 // .bwc. We could have all the connections detach and re-attach, but the 00384 // only place we call this when connections exist is when tearing down 00385 // anyway. 00386 closeConnections(); 00387 mPollGrp = grp; 00388 } 00389 00390 /* ==================================================================== 00391 * The Vovida Software License, Version 1.0 00392 * 00393 * Copyright (c) 00394 * 00395 * Redistribution and use in source and binary forms, with or without 00396 * modification, are permitted provided that the following conditions 00397 * are met: 00398 * 00399 * 1. Redistributions of source code must retain the above copyright 00400 * notice, this list of conditions and the following disclaimer. 00401 * 00402 * 2. Redistributions in binary form must reproduce the above copyright 00403 * notice, this list of conditions and the following disclaimer in 00404 * the documentation and/or other materials provided with the 00405 * distribution. 00406 * 00407 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00408 * and "Vovida Open Communication Application Library (VOCAL)" must 00409 * not be used to endorse or promote products derived from this 00410 * software without prior written permission. For written 00411 * permission, please contact vocal@vovida.org. 00412 * 00413 * 4. Products derived from this software may not be called "VOCAL", nor 00414 * may "VOCAL" appear in their name, without prior written 00415 * permission of Vovida Networks, Inc. 00416 * 00417 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00418 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00419 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00420 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00421 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00422 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00423 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00424 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00425 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00426 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00427 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00428 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00429 * DAMAGE. 00430 * 00431 * ==================================================================== 00432 * 00433 * This software consists of voluntary contributions made by Vovida 00434 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00435 * Inc. For more information on Vovida Networks, Inc., please see 00436 * <http://www.vovida.org/>. 00437 * 00438 */
1.7.5.1