reSIProcate/stack  9694
Connection.cxx
Go to the documentation of this file.
00001 #if defined(HAVE_CONFIG_H)
00002 #include "config.h"
00003 #endif
00004 
00005 #include "rutil/Socket.hxx"
00006 #include "rutil/Logger.hxx"
00007 #include "resip/stack/Connection.hxx"
00008 #include "resip/stack/ConnectionManager.hxx"
00009 #include "resip/stack/InteropHelper.hxx"
00010 #include "resip/stack/SipMessage.hxx"
00011 #include "resip/stack/TcpBaseTransport.hxx"
00012 #include "rutil/WinLeakCheck.hxx"
00013 
00014 #ifdef USE_SSL
00015 #include "resip/stack/ssl/Security.hxx"
00016 #endif
00017 
00018 #ifdef USE_SIGCOMP
00019 #include <osc/Stack.h>
00020 #include <osc/SigcompMessage.h>
00021 #endif
00022 
00023 using namespace resip;
00024 
// Class-wide switch, initially off. performWrite() only invokes the
// transport's callSocketFunc() when both this flag and the per-connection
// mRequestPostConnectSocketFuncCall flag are set.
00025 volatile bool Connection::mEnablePostConnectSocketFuncCall = false;
00026 
00027 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT
00028 
00029 Connection::Connection(Transport* transport,const Tuple& who, Socket socket,
00030                        Compression &compression)
00031    : ConnectionBase(transport,who,compression),
00032      mRequestPostConnectSocketFuncCall(false),
00033      mInWritable(false),
00034      mFlowTimerEnabled(false),
00035      mPollItemHandle(0)
00036 {
00037    mWho.mFlowKey=(FlowKey)socket;
00038    InfoLog (<< "Connection::Connection: new connection created to who: " << mWho);
00039 
00040    if(mWho.mFlowKey && ConnectionBase::transport())
00041    {
00042       getConnectionManager().addConnection(this);
00043    }
00044 }
00045 
00046 Connection::~Connection()
00047 {
00048    if(mWho.mFlowKey && ConnectionBase::transport())
00049    {
00050       getConnectionManager().removeConnection(this);
00051       // remove first then close, since conn manager may need socket
00052       closeSocket(mWho.mFlowKey);
00053    }
00054 }
00055 
00056 void
00057 Connection::requestWrite(SendData* sendData)
00058 {
00059    mOutstandingSends.push_back(sendData);
00060    if (isWritable())
00061    {
00062       ensureWritable();
00063    }
00064 }
00065 
00066 void 
00067 Connection::removeFrontOutstandingSend()
00068 {
00069    delete mOutstandingSends.front();
00070    mOutstandingSends.pop_front();
00071 
00072    if (mOutstandingSends.empty())
00073    {
00074       assert(mInWritable);
00075       getConnectionManager().removeFromWritable(this);
00076       mInWritable = false;
00077    }
00078 }
00079 
00080 int
00081 Connection::performWrite()
00082 {
00083    if(transportWrite())
00084    {
00085       assert(mInWritable);
00086       getConnectionManager().removeFromWritable(this);
00087       mInWritable = false;
00088       return 0; // What does this transportWrite() mean?
00089    }
00090 
00091    assert(!mOutstandingSends.empty());
00092    switch(mOutstandingSends.front()->command)
00093    {
00094    case SendData::CloseConnection:
00095       // .bwc. Close this connection.
00096       return -1;
00097       break;
00098    case SendData::EnableFlowTimer:
00099       enableFlowTimer();
00100       removeFrontOutstandingSend();
00101       return 0;
00102       break;
00103    default:
00104       // do nothing
00105       break;
00106    }
00107 
00108    const Data& sigcompId = mOutstandingSends.front()->sigcompId;
00109 
00110    if(mSendingTransmissionFormat == Unknown)
00111    {
00112       if (sigcompId.size() > 0 && mCompression.isEnabled())
00113       {
00114          mSendingTransmissionFormat = Compressed;
00115       }
00116       else
00117       {
00118          mSendingTransmissionFormat = Uncompressed;
00119       }
00120    }
00121 
00122 
00123 #ifdef USE_SIGCOMP
00124    // Perform compression here, if appropriate
00125    if (mSendingTransmissionFormat == Compressed
00126        && !(mOutstandingSends.front()->isAlreadyCompressed))
00127    {
00128       const Data& uncompressed = mOutstandingSends.front()->data;
00129       osc::SigcompMessage *sm = 
00130         mSigcompStack->compressMessage(uncompressed.data(), uncompressed.size(),
00131                                        sigcompId.data(), sigcompId.size(),
00132                                        true);
00133       DebugLog (<< "Compressed message from "
00134                 << uncompressed.size() << " bytes to " 
00135                 << sm->getStreamLength() << " bytes");
00136 
00137       SendData *oldSd = mOutstandingSends.front();
00138       SendData *newSd = new SendData(oldSd->destination,
00139                                      Data(sm->getStreamMessage(),
00140                                           sm->getStreamLength()),
00141                                      oldSd->transactionId,
00142                                      oldSd->sigcompId,
00143                                      true);
00144       mOutstandingSends.front() = newSd;
00145       delete oldSd;
00146       delete sm;
00147    }
00148 #endif
00149 
00150    if(mEnablePostConnectSocketFuncCall && mRequestPostConnectSocketFuncCall)
00151    {
00152        // Note:  The first time the socket is available for write, is when the TCP connect call is completed
00153       mRequestPostConnectSocketFuncCall = false;
00154       mTransport->callSocketFunc(getSocket());
00155    }
00156 
00157    const Data& data = mOutstandingSends.front()->data;
00158 
00159    int nBytes = write(data.data() + mSendPos,int(data.size() - mSendPos));
00160 
00161    //DebugLog (<< "Tried to send " << data.size() - mSendPos << " bytes, sent " << nBytes << " bytes");
00162 
00163    if (nBytes < 0)
00164    {
00165       if(errno!=EAGAIN)
00166       {
00167          //fail(data.transactionId);
00168          InfoLog(<< "Write failed on socket: " << this->getSocket() << ", closing connection");
00169          return -1;
00170       }
00171       else
00172       {
00173          return 0;
00174       }
00175    }
00176    else
00177    {
00178       // Safe because of the conditional above ( < 0 ).
00179       Data::size_type bytesWritten = static_cast<Data::size_type>(nBytes);
00180       mSendPos += bytesWritten;
00181       if (mSendPos == data.size())
00182       {
00183          mSendPos = 0;
00184          removeFrontOutstandingSend();
00185       }
00186       return bytesWritten;
00187    }
00188 }
00189 
00190 
00191 bool 
00192 Connection::performWrites(unsigned int max)
00193 {
00194    int res;
00195    // if max==0, we will overflow into UINT_MAX. This is intentional.
00196    while((res=performWrite())>0 && !mOutstandingSends.empty() && --max!=0)
00197    {;}
00198 
00199    if(res<0)
00200    {
00201       delete this;
00202       return false;
00203    }
00204    return true;
00205 }
00206 
00207 void 
00208 Connection::ensureWritable()
00209 {
00210    if(!mInWritable)
00211    {
00212       assert(!mOutstandingSends.empty());
00213       getConnectionManager().addToWritable(this);
00214       mInWritable = true;
00215    }
00216 }
00217 
00218 ConnectionManager&
00219 Connection::getConnectionManager() const
00220 {
00221    TcpBaseTransport* transport = static_cast<TcpBaseTransport*>(ConnectionBase::transport());
00222    
00223    return transport->getConnectionManager();
00224 }
00225             
00226 EncodeStream& 
00227 resip::operator<<(EncodeStream& strm, const resip::Connection& c)
00228 {
00229    strm << "CONN: " << &c << " " << int(c.getSocket()) << " " << c.mWho;
00230    return strm;
00231 }
00232 
00233 int
00234 Connection::read()
00235 {
00236    std::pair<char*, size_t> writePair = getWriteBuffer();
00237    size_t bytesToRead = resipMin(writePair.second, 
00238                                  static_cast<size_t>(Connection::ChunkSize));
00239          
00240    assert(bytesToRead > 0);
00241 
00242    int bytesRead = read(writePair.first, (int)bytesToRead);
00243    if (bytesRead <= 0)
00244    {
00245       return bytesRead;
00246    }  
00247    // mBuffer might have been reallocated inside read()
00248    writePair = getCurrentWriteBuffer();
00249 
00250    getConnectionManager().touch(this);
00251 
00252 #ifdef USE_SIGCOMP
00253    // If this is the first data we read, determine whether the
00254    // connection is compressed.
00255    if(mReceivingTransmissionFormat == Unknown)
00256    {
00257      if (((writePair.first[0] & 0xf8) == 0xf8) && mCompression.isEnabled())
00258      {
00259        mReceivingTransmissionFormat = Compressed;
00260      }
00261      else
00262      {
00263        mReceivingTransmissionFormat = Uncompressed;
00264      }
00265    }
00266 
00267    // SigComp compressed messages are handed very differently
00268    // than non-compressed messages: they are guaranteed to
00269    // be framed within SigComp, and each frame contains
00270    // *exactly* one SIP message. Processing looks a lot like
00271    // it does for Datagram-oriented transports.
00272 
00273    if (mReceivingTransmissionFormat == Compressed)
00274    {
00275      decompressNewBytes(bytesRead);
00276    }
00277    else
00278 #endif
00279    {
00280      if(!preparseNewBytes(bytesRead))
00281      {
00282         // Iffy; only way we have right now to indicate that this connection has
00283         // gone away.
00284         bytesRead=-1;
00285      }
00286    }
00287    return bytesRead;
00288 }
00289 
00290 bool 
00291 Connection::performReads(unsigned int max)
00292 {
00293    int bytesRead;
00294 
00295    // if max==0, we will overflow into UINT_MAX. This is intentional.
00296    while((bytesRead = read())>0 && --max!=0)
00297    {
00298       DebugLog(<< "Connection::performReads() " << " read=" << bytesRead);
00299    }
00300 
00301    if ( bytesRead < 0 ) 
00302    {
00303       DebugLog(<< "Closing connection bytesRead=" << bytesRead);
00304       delete this;
00305       return false;
00306    }
00307    return true;
00308 }
00309 
00310 void
00311 Connection::enableFlowTimer()
00312 {
00313    if(!mFlowTimerEnabled)
00314    {
00315       mFlowTimerEnabled = true;
00316 
00317       // ensure connection is in a FlowTimer LRU list on the connection manager
00318       getConnectionManager().moveToFlowTimerLru(this);
00319    }
00320 }
00321 
00322 void
00323 Connection::onDoubleCRLF()
00324 {
00325    // !bwc! TODO might need to make this more efficient.
00326    // !bwc! Need to make this sigcomp-friendly
00327    if(InteropHelper::getOutboundVersion()>=8)
00328    {
00329       DebugLog(<<"Sending response CRLF (aka pong).");
00330       requestWrite(new SendData(mWho,Symbols::CRLF,Data::Empty,Data::Empty));
00331    }
00332 }
00333 
00334 void
00335 Connection::onSingleCRLF()
00336 {
00337    DebugLog(<<"Received response CRLF (aka pong).");
00338    mTransport->keepAlivePong(mWho);
00339 }
00340 
00341 bool 
00342 Connection::hasDataToRead()
00343 {
00344    return true;
00345 }
00346 
00347 bool 
00348 Connection::isGood()
00349 {
00350    return true;
00351 }
00352 
00353 bool 
00354 Connection::isWritable()
00355 {
00356    return true;
00357 }
00358 
00362 void
00363 Connection::processPollEvent(FdPollEventMask mask) {
00364    /* The original code in ConnectionManager.cxx didn't check
00365     * for error events unless no writable event. (e.g., writable
00366     * masked error. Why?)
00367     */
00368    if ( mask & FPEM_Error ) 
00369    {
00370       Socket fd = getSocket();
00371       int errNum = getSocketError(fd);
00372       InfoLog(<< "Exception on socket " << fd << " code: " << errNum << "; closing connection");
00373       setFailureReason(TransportFailure::ConnectionException, errNum);
00374       delete this;
00375       return;
00376    }
00377    if ( mask & FPEM_Write ) 
00378    {
00379       if(!performWrites())
00380       {
00381          // Just deleted self
00382          return;
00383       }
00384    }
00385    if ( mask & FPEM_Read ) 
00386    {
00387       performReads();
00388    }
00389 }
00390 
00391 /* ====================================================================
00392  * The Vovida Software License, Version 1.0 
00393  * 
00394  * Copyright (c) 2000
00395  * 
00396  * Redistribution and use in source and binary forms, with or without
00397  * modification, are permitted provided that the following conditions
00398  * are met:
00399  * 
00400  * 1. Redistributions of source code must retain the above copyright
00401  *    notice, this list of conditions and the following disclaimer.
00402  * 
00403  * 2. Redistributions in binary form must reproduce the above copyright
00404  *    notice, this list of conditions and the following disclaimer in
00405  *    the documentation and/or other materials provided with the
00406  *    distribution.
00407  * 
00408  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00409  *    and "Vovida Open Communication Application Library (VOCAL)" must
00410  *    not be used to endorse or promote products derived from this
00411  *    software without prior written permission. For written
00412  *    permission, please contact vocal@vovida.org.
00413  *
00414  * 4. Products derived from this software may not be called "VOCAL", nor
00415  *    may "VOCAL" appear in their name, without prior written
00416  *    permission of Vovida Networks, Inc.
00417  * 
00418  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00419  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00420  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00421  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00422  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00423  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00424  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00425  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00426  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00427  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00428  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00429  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00430  * DAMAGE.
00431  * 
00432  * ====================================================================
00433  * 
00434  * This software consists of voluntary contributions made by Vovida
00435  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00436  * Inc.  For more information on Vovida Networks, Inc., please see
00437  * <http://www.vovida.org/>.
00438  *
00439  * vi: set shiftwidth=3 expandtab:
00440  */