|
reSIProcate/stack
9694
|
00001 #if defined(HAVE_CONFIG_H) 00002 #include "config.h" 00003 #endif 00004 00005 #include <memory> 00006 #include "rutil/Socket.hxx" 00007 #include "rutil/Data.hxx" 00008 #include "rutil/DnsUtil.hxx" 00009 #include "rutil/Logger.hxx" 00010 #include "resip/stack/TcpBaseTransport.hxx" 00011 00012 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT 00013 00014 using namespace std; 00015 using namespace resip; 00016 00017 00018 const size_t TcpBaseTransport::MaxWriteSize = 4096; 00019 const size_t TcpBaseTransport::MaxReadSize = 4096; 00020 00021 TcpBaseTransport::TcpBaseTransport(Fifo<TransactionMessage>& fifo, 00022 int portNum, IpVersion version, 00023 const Data& pinterface, 00024 AfterSocketCreationFuncPtr socketFunc, 00025 Compression &compression, 00026 unsigned transportFlags) 00027 : InternalTransport(fifo, portNum, version, pinterface, socketFunc, compression, transportFlags) 00028 { 00029 if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)==0 ) 00030 { 00031 mFd = InternalTransport::socket(TCP, version); 00032 } 00033 } 00034 00035 00036 TcpBaseTransport::~TcpBaseTransport() 00037 { 00038 //DebugLog (<< "Shutting down TCP Transport " << this << " " << mFd << " " << mInterface << ":" << port()); 00039 00040 // !jf! this is not right. should drain the sends before 00041 while (mTxFifoOutBuffer.messageAvailable()) 00042 { 00043 SendData* data = mTxFifoOutBuffer.getNext(); 00044 InfoLog (<< "Throwing away queued data for " << data->destination); 00045 00046 fail(data->transactionId, TransportFailure::TransportShutdown); 00047 delete data; 00048 } 00049 DebugLog (<< "Shutting down " << mTuple); 00050 //mSendRoundRobin.clear(); // clear before we delete the connections 00051 if(mPollGrp && mPollItemHandle) 00052 { 00053 mPollGrp->delPollItem(mPollItemHandle); 00054 mPollItemHandle=0; 00055 } 00056 } 00057 00058 // called from constructor of TcpTransport 00059 void 00060 TcpBaseTransport::init() 00061 { 00062 if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)!=0 ) 00063 { 00064 return; 00065 } 00066 00067 //DebugLog (<< "Opening TCP " << mFd << " : " << this); 00068 00069 int on = 1; 00070 #if !defined(WIN32) 00071 if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) ) 00072 #else 00073 if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) ) 00074 #endif 00075 { 00076 int e = getErrno(); 00077 InfoLog (<< "Couldn't set sockoptions SO_REUSEPORT | SO_REUSEADDR: " << strerror(e)); 00078 error(e); 00079 throw Exception("Failed setsockopt", __FILE__,__LINE__); 00080 } 00081 00082 bind(); 00083 makeSocketNonBlocking(mFd); 00084 00085 // do the listen, seting the maximum queue size for compeletly established 00086 // sockets -- on linux, tcp_max_syn_backlog should be used for the incomplete 00087 // queue size(see man listen) 00088 int e = listen(mFd,64 ); 00089 00090 if (e != 0 ) 00091 { 00092 int e = getErrno(); 00093 InfoLog (<< "Failed listen " << strerror(e)); 00094 error(e); 00095 // !cj! deal with errors 00096 throw Transport::Exception("Address already in use", __FILE__,__LINE__); 00097 } 00098 } 00099 00100 // ?kw?: when should this be called relative to init() above? merge? 00101 void 00102 TcpBaseTransport::setPollGrp(FdPollGrp *grp) 00103 { 00104 if(mPollGrp && mPollItemHandle) 00105 { 00106 mPollGrp->delPollItem(mPollItemHandle); 00107 mPollItemHandle=0; 00108 } 00109 00110 if ( mFd!=INVALID_SOCKET && grp) 00111 { 00112 mPollItemHandle = grp->addPollItem(mFd, FPEM_Read|FPEM_Edge, this); 00113 // above released by InternalTransport destructor 00114 // ?bwc? Is this really a good idea? If the InternalTransport d'tor is 00115 // freeing this, shouldn't InternalTransport::setPollGrp() handle 00116 // creating it? 00117 } 00118 mConnectionManager.setPollGrp(grp); 00119 00120 InternalTransport::setPollGrp(grp); 00121 } 00122 00123 void 00124 TcpBaseTransport::buildFdSet( FdSet& fdset) 00125 { 00126 assert( mPollGrp==NULL ); 00127 mConnectionManager.buildFdSet(fdset); 00128 if ( mFd!=INVALID_SOCKET ) 00129 { 00130 fdset.setRead(mFd); // for the transport itself (accept) 00131 } 00132 if(!shareStackProcessAndSelect()) 00133 { 00134 mSelectInterruptor.buildFdSet(fdset); 00135 } 00136 } 00137 00142 int 00143 TcpBaseTransport::processListen() 00144 { 00145 if (1) 00146 { 00147 Tuple tuple(mTuple); 00148 struct sockaddr& peer = tuple.getMutableSockaddr(); 00149 socklen_t peerLen = tuple.length(); 00150 Socket sock = accept( mFd, &peer, &peerLen); 00151 if ( sock == SOCKET_ERROR ) 00152 { 00153 int e = getErrno(); 00154 switch (e) 00155 { 00156 case EWOULDBLOCK: 00157 // !jf! this can not be ready in some cases 00158 // !kw! this will happen every epoll cycle 00159 return 0; 00160 default: 00161 Transport::error(e); 00162 } 00163 return -1; 00164 } 00165 makeSocketNonBlocking(sock); 00166 00167 DebugLog (<< "Received TCP connection from: " << tuple << " as fd=" << sock); 00168 00169 if (mSocketFunc) 00170 { 00171 mSocketFunc(sock, transport(), __FILE__, __LINE__); 00172 } 00173 00174 if(!mConnectionManager.findConnection(tuple)) 00175 { 00176 createConnection(tuple, sock, true); 00177 } 00178 else 00179 { 00180 InfoLog(<<"Someone probably sent a reciprocal SYN at us."); 00181 // ?bwc? Can we call this right after calling accept()? 00182 closeSocket(sock); 00183 } 00184 } 00185 return 1; 00186 } 00187 00188 Connection* 00189 TcpBaseTransport::makeOutgoingConnection(const Tuple &dest, 00190 TransportFailure::FailureReason &failReason, int &failSubCode) 00191 { 00192 // attempt to open 00193 Socket sock = InternalTransport::socket( TCP, ipVersion()); 00194 // fdset.clear(sock); !kw! removed as part of epoll impl 00195 00196 if ( sock == INVALID_SOCKET ) // no socket found - try to free one up and try again 00197 { 00198 int err = getErrno(); 00199 InfoLog (<< "Failed to create a socket " << strerror(err)); 00200 error(err); 00201 mConnectionManager.gc(ConnectionManager::MinimumGcAge, 1); // free one up 00202 00203 sock = InternalTransport::socket( TCP, ipVersion()); 00204 if ( sock == INVALID_SOCKET ) 00205 { 00206 err = getErrno(); 00207 WarningLog( << "Error in finding free filedescriptor to use. " << strerror(err)); 00208 error(err); 00209 failReason = TransportFailure::TransportNoSocket; 00210 failSubCode = err; 00211 return NULL; 00212 } 00213 } 00214 00215 assert(sock != INVALID_SOCKET); 00216 00217 DebugLog (<<"Opening new connection to " << dest); 00218 makeSocketNonBlocking(sock); 00219 if (mSocketFunc) 00220 { 00221 mSocketFunc(sock, transport(), __FILE__, __LINE__); 00222 } 00223 const sockaddr& servaddr = dest.getSockaddr(); 00224 int ret = connect( sock, &servaddr, dest.length() ); 00225 00226 // See Chapter 15.3 of Stevens, Unix Network Programming Vol. 1 2nd Edition 00227 if (ret == SOCKET_ERROR) 00228 { 00229 int err = getErrno(); 00230 00231 switch (err) 00232 { 00233 case EINPROGRESS: 00234 case EWOULDBLOCK: 00235 break; 00236 default: 00237 { 00238 // !jf! this has failed 00239 InfoLog( << "Error on TCP connect to " << dest << ", err=" << err << ": " << strerror(err)); 00240 error(err); 00241 //fdset.clear(sock); 00242 closeSocket(sock); 00243 failReason = TransportFailure::TransportBadConnect; 00244 failSubCode = err; 00245 return NULL; 00246 } 00247 } 00248 } 00249 00250 // This will add the connection to the manager 00251 Connection *conn = createConnection(dest, sock, false); 00252 assert(conn); 00253 conn->mRequestPostConnectSocketFuncCall = true; 00254 return conn; 00255 } 00256 00257 void 00258 TcpBaseTransport::processAllWriteRequests() 00259 { 00260 while (mTxFifoOutBuffer.messageAvailable()) 00261 { 00262 SendData* data = mTxFifoOutBuffer.getNext(); 00263 DebugLog (<< "Processing write for " << data->destination); 00264 00265 // this will check by connectionId first, then by address 00266 Connection* conn = mConnectionManager.findConnection(data->destination); 00267 00268 //DebugLog (<< "TcpBaseTransport::processAllWriteRequests() using " << conn); 00269 00270 // There is no connection yet, so make a client connection 00271 if (conn == 0 && 00272 !data->destination.onlyUseExistingConnection && 00273 data->command == 0) // SendData commands (ie. close connection and enable flow timers) shouldn't cause new connections to form 00274 { 00275 TransportFailure::FailureReason failCode = TransportFailure::Failure; 00276 int subCode = 0; 00277 if((conn=makeOutgoingConnection(data->destination, failCode, subCode)) == NULL) 00278 { 00279 fail(data->transactionId, failCode, subCode); 00280 delete data; 00281 return; // .kw. WHY? What about messages left in queue? 00282 } 00283 assert(conn->getSocket() != INVALID_SOCKET); 00284 // .kw. why do below? We already have the conn, who uses key? 00285 data->destination.mFlowKey = conn->getSocket(); // !jf! 00286 } 00287 00288 if (conn == 0) 00289 { 00290 DebugLog (<< "Failed to create/get connection: " << data->destination); 00291 fail(data->transactionId, TransportFailure::TransportNoExistConn, 0); 00292 delete data; 00293 // NOTE: We fail this one but don't give up on others in queue 00294 } 00295 else // have a connection 00296 { 00297 conn->requestWrite(data); 00298 } 00299 } 00300 } 00301 00302 void 00303 TcpBaseTransport::process() 00304 { 00305 mStateMachineFifo.flush(); 00306 00307 // called within SipStack's thread. There is some risk of 00308 // recursion here if connection starts doing anything fancy. 00309 // For backward-compat when not-epoll, don't handle transmit synchronously 00310 // now, but rather wait for the process() call 00311 if (mPollGrp) 00312 { 00313 processAllWriteRequests(); 00314 } 00315 } 00316 00317 void 00318 TcpBaseTransport::process(FdSet& fdSet) 00319 { 00320 assert( mPollGrp==NULL ); 00321 00322 processAllWriteRequests(); 00323 00324 // process the connections in ConnectionManager 00325 mConnectionManager.process(fdSet); 00326 00327 mStateMachineFifo.flush(); 00328 00329 // process our own listen/accept socket for incoming connections 00330 if (mFd!=INVALID_SOCKET && fdSet.readyToRead(mFd)) 00331 { 00332 processListen(); 00333 } 00334 } 00335 00336 void 00337 TcpBaseTransport::processPollEvent(FdPollEventMask mask) { 00338 if ( mask & FPEM_Read ) 00339 { 00340 while ( processListen() > 0 ) 00341 ; 00342 } 00343 } 00344 00345 void 00346 TcpBaseTransport::setRcvBufLen(int buflen) 00347 { 00348 assert(0); // not implemented yet 00349 // need to store away the length and use when setting up new connections 00350 } 00351 00352 00353 00354 /* ==================================================================== 00355 * The Vovida Software License, Version 1.0 00356 * 00357 * Copyright (c) 2000 Vovida Networks, Inc. All rights reserved. 00358 * 00359 * Redistribution and use in source and binary forms, with or without 00360 * modification, are permitted provided that the following conditions 00361 * are met: 00362 * 00363 * 1. Redistributions of source code must retain the above copyright 00364 * notice, this list of conditions and the following disclaimer. 00365 * 00366 * 2. Redistributions in binary form must reproduce the above copyright 00367 * notice, this list of conditions and the following disclaimer in 00368 * the documentation and/or other materials provided with the 00369 * distribution. 00370 * 00371 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00372 * and "Vovida Open Communication Application Library (VOCAL)" must 00373 * not be used to endorse or promote products derived from this 00374 * software without prior written permission. For written 00375 * permission, please contact vocal@vovida.org. 00376 * 00377 * 4. Products derived from this software may not be called "VOCAL", nor 00378 * may "VOCAL" appear in their name, without prior written 00379 * permission of Vovida Networks, Inc. 00380 * 00381 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00382 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00383 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00384 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00385 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00386 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00387 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00388 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00389 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00390 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00391 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00392 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00393 * DAMAGE. 00394 * 00395 * ==================================================================== 00396 * 00397 * This software consists of voluntary contributions made by Vovida 00398 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00399 * Inc. For more information on Vovida Networks, Inc., please see 00400 * <http://www.vovida.org/>. 00401 * 00402 * vi: set shiftwidth=3 expandtab: 00403 */
1.7.5.1