reSIProcate/stack  9694
TcpBaseTransport.cxx
Go to the documentation of this file.
00001 #if defined(HAVE_CONFIG_H)
00002 #include "config.h"
00003 #endif
00004 
00005 #include <memory>
00006 #include "rutil/Socket.hxx"
00007 #include "rutil/Data.hxx"
00008 #include "rutil/DnsUtil.hxx"
00009 #include "rutil/Logger.hxx"
00010 #include "resip/stack/TcpBaseTransport.hxx"
00011 
00012 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT
00013 
00014 using namespace std;
00015 using namespace resip;
00016 
00017 
00018 const size_t TcpBaseTransport::MaxWriteSize = 4096; // NOTE(review): presumably a byte cap per socket write; not referenced in this file -- confirm against users
00019 const size_t TcpBaseTransport::MaxReadSize = 4096; // NOTE(review): presumably a byte cap per socket read; not referenced in this file -- confirm against users
00020 
00021 TcpBaseTransport::TcpBaseTransport(Fifo<TransactionMessage>& fifo,
00022                                    int portNum, IpVersion version,
00023                                    const Data& pinterface,
00024                                    AfterSocketCreationFuncPtr socketFunc,
00025                                    Compression &compression,
00026                                    unsigned transportFlags)
00027    : InternalTransport(fifo, portNum, version, pinterface, socketFunc, compression, transportFlags)
00028 {
00029    if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)==0 )
00030    {
00031       mFd = InternalTransport::socket(TCP, version);
00032    }
00033 }
00034 
00035 
00036 TcpBaseTransport::~TcpBaseTransport()
00037 {
00038    //DebugLog (<< "Shutting down TCP Transport " << this << " " << mFd << " " << mInterface << ":" << port());
00039 
00040    // !jf! this is not right. should drain the sends before
00041    while (mTxFifoOutBuffer.messageAvailable())
00042    {
00043       SendData* data = mTxFifoOutBuffer.getNext();
00044       InfoLog (<< "Throwing away queued data for " << data->destination);
00045 
00046       fail(data->transactionId, TransportFailure::TransportShutdown);
00047       delete data;
00048    }
00049    DebugLog (<< "Shutting down " << mTuple);
00050    //mSendRoundRobin.clear(); // clear before we delete the connections
00051    if(mPollGrp && mPollItemHandle)
00052    {
00053       mPollGrp->delPollItem(mPollItemHandle);
00054       mPollItemHandle=0;
00055    }
00056 }
00057 
00058 // called from constructor of TcpTransport
00059 void
00060 TcpBaseTransport::init()
00061 {
00062    if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)!=0 )
00063    {
00064       return;
00065    }
00066 
00067    //DebugLog (<< "Opening TCP " << mFd << " : " << this);
00068 
00069    int on = 1;
00070 #if !defined(WIN32)
00071    if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) )
00072 #else
00073    if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) )
00074 #endif
00075    {
00076        int e = getErrno();
00077        InfoLog (<< "Couldn't set sockoptions SO_REUSEPORT | SO_REUSEADDR: " << strerror(e));
00078        error(e);
00079        throw Exception("Failed setsockopt", __FILE__,__LINE__);
00080    }
00081 
00082    bind();
00083    makeSocketNonBlocking(mFd);
00084 
00085    // do the listen, seting the maximum queue size for compeletly established
00086    // sockets -- on linux, tcp_max_syn_backlog should be used for the incomplete
00087    // queue size(see man listen)
00088    int e = listen(mFd,64 );
00089 
00090    if (e != 0 )
00091    {
00092       int e = getErrno();
00093       InfoLog (<< "Failed listen " << strerror(e));
00094       error(e);
00095       // !cj! deal with errors
00096       throw Transport::Exception("Address already in use", __FILE__,__LINE__);
00097    }
00098 }
00099 
00100 // ?kw?: when should this be called relative to init() above? merge?
00101 void
00102 TcpBaseTransport::setPollGrp(FdPollGrp *grp)
00103 {
00104    if(mPollGrp && mPollItemHandle)
00105    {
00106       mPollGrp->delPollItem(mPollItemHandle);
00107       mPollItemHandle=0;
00108    }
00109 
00110    if ( mFd!=INVALID_SOCKET && grp)
00111    {
00112       mPollItemHandle = grp->addPollItem(mFd, FPEM_Read|FPEM_Edge, this);
00113       // above released by InternalTransport destructor
00114       // ?bwc? Is this really a good idea? If the InternalTransport d'tor is
00115       // freeing this, shouldn't InternalTransport::setPollGrp() handle 
00116       // creating it?
00117    }
00118    mConnectionManager.setPollGrp(grp);
00119 
00120    InternalTransport::setPollGrp(grp);
00121 }
00122 
00123 void
00124 TcpBaseTransport::buildFdSet( FdSet& fdset)
00125 {
00126    assert( mPollGrp==NULL );
00127    mConnectionManager.buildFdSet(fdset);
00128    if ( mFd!=INVALID_SOCKET )
00129    {
00130       fdset.setRead(mFd); // for the transport itself (accept)
00131    }
00132    if(!shareStackProcessAndSelect())
00133    {
00134       mSelectInterruptor.buildFdSet(fdset);
00135    }
00136 }
00137 
00142 int
00143 TcpBaseTransport::processListen()
00144 {
00145    if (1)
00146    {
00147       Tuple tuple(mTuple);
00148       struct sockaddr& peer = tuple.getMutableSockaddr();
00149       socklen_t peerLen = tuple.length();
00150       Socket sock = accept( mFd, &peer, &peerLen);
00151       if ( sock == SOCKET_ERROR )
00152       {
00153          int e = getErrno();
00154          switch (e)
00155          {
00156             case EWOULDBLOCK:
00157                // !jf! this can not be ready in some cases
00158                // !kw! this will happen every epoll cycle
00159                return 0;
00160             default:
00161                Transport::error(e);
00162          }
00163          return -1;
00164       }
00165       makeSocketNonBlocking(sock);
00166 
00167       DebugLog (<< "Received TCP connection from: " << tuple << " as fd=" << sock);
00168 
00169       if (mSocketFunc)
00170       {
00171          mSocketFunc(sock, transport(), __FILE__, __LINE__);
00172       }
00173 
00174       if(!mConnectionManager.findConnection(tuple))
00175       {
00176          createConnection(tuple, sock, true);
00177       }
00178       else
00179       {
00180          InfoLog(<<"Someone probably sent a reciprocal SYN at us.");
00181          // ?bwc? Can we call this right after calling accept()?
00182          closeSocket(sock);
00183       }
00184    }
00185    return 1;
00186 }
00187 
00188 Connection*
00189 TcpBaseTransport::makeOutgoingConnection(const Tuple &dest,
00190       TransportFailure::FailureReason &failReason, int &failSubCode)
00191 {
00192    // attempt to open
00193    Socket sock = InternalTransport::socket( TCP, ipVersion());
00194    // fdset.clear(sock); !kw! removed as part of epoll impl
00195 
00196    if ( sock == INVALID_SOCKET ) // no socket found - try to free one up and try again
00197    {
00198       int err = getErrno();
00199       InfoLog (<< "Failed to create a socket " << strerror(err));
00200       error(err);
00201       mConnectionManager.gc(ConnectionManager::MinimumGcAge, 1); // free one up
00202 
00203       sock = InternalTransport::socket( TCP, ipVersion());
00204       if ( sock == INVALID_SOCKET )
00205       {
00206          err = getErrno();
00207          WarningLog( << "Error in finding free filedescriptor to use. " << strerror(err));
00208          error(err);
00209          failReason = TransportFailure::TransportNoSocket;
00210          failSubCode = err;
00211          return NULL;
00212       }
00213    }
00214 
00215    assert(sock != INVALID_SOCKET);
00216 
00217    DebugLog (<<"Opening new connection to " << dest);
00218    makeSocketNonBlocking(sock);
00219    if (mSocketFunc)
00220    {
00221       mSocketFunc(sock, transport(), __FILE__, __LINE__);
00222    }
00223    const sockaddr& servaddr = dest.getSockaddr();
00224    int ret = connect( sock, &servaddr, dest.length() );
00225 
00226    // See Chapter 15.3 of Stevens, Unix Network Programming Vol. 1 2nd Edition
00227    if (ret == SOCKET_ERROR)
00228    {
00229       int err = getErrno();
00230 
00231       switch (err)
00232       {
00233          case EINPROGRESS:
00234          case EWOULDBLOCK:
00235             break;
00236          default:
00237          {
00238             // !jf! this has failed
00239             InfoLog( << "Error on TCP connect to " <<  dest << ", err=" << err << ": " << strerror(err));
00240             error(err);
00241             //fdset.clear(sock);
00242             closeSocket(sock);
00243             failReason = TransportFailure::TransportBadConnect;
00244             failSubCode = err;
00245             return NULL;
00246          }
00247       }
00248    }
00249 
00250    // This will add the connection to the manager
00251    Connection *conn = createConnection(dest, sock, false);
00252    assert(conn);
00253    conn->mRequestPostConnectSocketFuncCall = true;
00254    return conn;
00255 }
00256 
00257 void
00258 TcpBaseTransport::processAllWriteRequests()
00259 {
00260    while (mTxFifoOutBuffer.messageAvailable())
00261    {
00262       SendData* data = mTxFifoOutBuffer.getNext();
00263       DebugLog (<< "Processing write for " << data->destination);
00264 
00265       // this will check by connectionId first, then by address
00266       Connection* conn = mConnectionManager.findConnection(data->destination);
00267 
00268       //DebugLog (<< "TcpBaseTransport::processAllWriteRequests() using " << conn);
00269 
00270       // There is no connection yet, so make a client connection
00271       if (conn == 0 && 
00272             !data->destination.onlyUseExistingConnection &&
00273             data->command == 0)  // SendData commands (ie. close connection and enable flow timers) shouldn't cause new connections to form
00274       {
00275          TransportFailure::FailureReason failCode = TransportFailure::Failure;
00276          int subCode = 0;
00277          if((conn=makeOutgoingConnection(data->destination, failCode, subCode)) == NULL)
00278          {
00279             fail(data->transactionId, failCode, subCode);
00280             delete data;
00281             return;     // .kw. WHY? What about messages left in queue?
00282          }
00283          assert(conn->getSocket() != INVALID_SOCKET);
00284          // .kw. why do below? We already have the conn, who uses key?
00285          data->destination.mFlowKey = conn->getSocket(); // !jf!
00286       }
00287 
00288       if (conn == 0)
00289       {
00290          DebugLog (<< "Failed to create/get connection: " << data->destination);
00291          fail(data->transactionId, TransportFailure::TransportNoExistConn, 0);
00292          delete data;
00293          // NOTE: We fail this one but don't give up on others in queue
00294       }
00295       else // have a connection
00296       {
00297          conn->requestWrite(data);
00298       }
00299    }
00300 }
00301 
00302 void
00303 TcpBaseTransport::process()
00304 {
00305    mStateMachineFifo.flush();
00306 
00307    // called within SipStack's thread. There is some risk of
00308    // recursion here if connection starts doing anything fancy.
00309    // For backward-compat when not-epoll, don't handle transmit synchronously
00310    // now, but rather wait for the process() call
00311    if (mPollGrp)
00312    {
00313       processAllWriteRequests();
00314    }
00315 }
00316 
00317 void
00318 TcpBaseTransport::process(FdSet& fdSet)
00319 {
00320    assert( mPollGrp==NULL );
00321 
00322    processAllWriteRequests();
00323 
00324    // process the connections in ConnectionManager
00325    mConnectionManager.process(fdSet);
00326 
00327    mStateMachineFifo.flush();
00328 
00329    // process our own listen/accept socket for incoming connections
00330    if (mFd!=INVALID_SOCKET && fdSet.readyToRead(mFd))
00331    {
00332       processListen();
00333    }
00334 }
00335 
00336 void
00337 TcpBaseTransport::processPollEvent(FdPollEventMask mask) {
00338    if ( mask & FPEM_Read )
00339    {
00340       while ( processListen() > 0 )
00341          ;
00342    }
00343 }
00344 
00345 void
00346 TcpBaseTransport::setRcvBufLen(int buflen)
00347 {
00348    assert(0);   // not implemented yet
00349    // need to store away the length and use when setting up new connections
00350 }
00351 
00352 
00353 
00354 /* ====================================================================
00355  * The Vovida Software License, Version 1.0
00356  *
00357  * Copyright (c) 2000 Vovida Networks, Inc.  All rights reserved.
00358  *
00359  * Redistribution and use in source and binary forms, with or without
00360  * modification, are permitted provided that the following conditions
00361  * are met:
00362  *
00363  * 1. Redistributions of source code must retain the above copyright
00364  *    notice, this list of conditions and the following disclaimer.
00365  *
00366  * 2. Redistributions in binary form must reproduce the above copyright
00367  *    notice, this list of conditions and the following disclaimer in
00368  *    the documentation and/or other materials provided with the
00369  *    distribution.
00370  *
00371  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00372  *    and "Vovida Open Communication Application Library (VOCAL)" must
00373  *    not be used to endorse or promote products derived from this
00374  *    software without prior written permission. For written
00375  *    permission, please contact vocal@vovida.org.
00376  *
00377  * 4. Products derived from this software may not be called "VOCAL", nor
00378  *    may "VOCAL" appear in their name, without prior written
00379  *    permission of Vovida Networks, Inc.
00380  *
00381  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00382  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00383  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00384  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00385  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00386  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00387  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00388  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00389  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00390  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00391  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00392  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00393  * DAMAGE.
00394  *
00395  * ====================================================================
00396  *
00397  * This software consists of voluntary contributions made by Vovida
00398  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00399  * Inc.  For more information on Vovida Networks, Inc., please see
00400  * <http://www.vovida.org/>.
00401  *
00402  * vi: set shiftwidth=3 expandtab:
00403  */