reSIProcate/stack  9694
resip::Connection Class Reference

Connection implements, via sockets, ConnectionBase for managed connections.

#include <Connection.hxx>

Public Member Functions

 Connection (Transport *transport, const Tuple &who, Socket socket, Compression &compression)
virtual ~Connection ()
Socket getSocket () const
virtual bool hasDataToRead ()
 always true -- always add to fdset as read ready
virtual bool isGood ()
 has valid connection
virtual bool isWritable ()
virtual bool transportWrite ()
void requestWrite (SendData *sendData)
 queue data to write and add this to writable list
int performWrite ()
 send some or all of the queued data; remove from writable if completely written
bool performWrites (unsigned int max=0)
 Call performWrite() repeatedly, until either the send queue is exhausted, the write() call fails (probably because the fd is no longer ready to write), or a set number of writes has been performed.
void ensureWritable ()
 ensure that we are on the writable list if required
int read ()
 move data from the connection to the buffer; move this to front of least recently used list.
bool performReads (unsigned int max=0)
 Call read() repeatedly, until an error occurs (because the fd is not ready to read, most likely).
void enableFlowTimer ()
 Ensures this connection is in the FlowTimer LRU list in the connection manager.
bool isFlowTimerEnabled ()

Static Public Member Functions

static void setEnablePostConnectSocketFuncCall (bool enabled=true)

Public Attributes

bool mRequestPostConnectSocketFuncCall

Static Public Attributes

static volatile bool mEnablePostConnectSocketFuncCall = false

Protected Member Functions

virtual int read (char *, const int)
 would be pure virtual, but a concrete Connection is needed for the book-ends of the lists
virtual int write (const char *, const int)
 would be pure virtual, but a concrete Connection is needed for the book-ends of the lists
virtual void onDoubleCRLF ()
virtual void onSingleCRLF ()
virtual void processPollEvent (FdPollEventMask mask)
 Virtual function of FdPollItemIf, called to process io events.

Private Member Functions

ConnectionManager & getConnectionManager () const
void removeFrontOutstandingSend ()
 Connection ()
 no default c'tor
 Connection (const Connection &)
 no value semantics
Connection & operator= (const Connection &)

Private Attributes

bool mInWritable
bool mFlowTimerEnabled
FdPollItemHandle mPollItemHandle

Friends

class ConnectionManager
EncodeStream & operator<< (EncodeStream &strm, const resip::Connection &c)

Detailed Description

Connection implements, via sockets, ConnectionBase for managed connections.

Connections are managed for approximate fairness and least recently used garbage collection. Connection inherits three different instantiations of intrusive lists.

Definition at line 35 of file Connection.hxx.
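
The least-recently-used bookkeeping described above can be illustrated with a minimal sketch. It assumes a non-intrusive std::list plus an index map in place of resip's intrusive lists, integer ids in place of Connection pointers, and an ordering where the oldest entry sits at the front. LruTracker and its members are hypothetical names, not part of reSIProcate.

```cpp
#include <cassert>
#include <iterator>
#include <list>
#include <unordered_map>

// Hypothetical LRU tracker; ids are assumed to be unique.
class LruTracker
{
public:
   // Register a new id as the most recently used entry.
   void add(int id)
   {
      mOrder.push_back(id);
      mPos[id] = std::prev(mOrder.end());
   }

   // Mark an id as recently used by moving it to the back of the list.
   // splice() relinks the node, so the stored iterator stays valid.
   void touch(int id)
   {
      auto it = mPos.find(id);
      if (it == mPos.end())
      {
         return;
      }
      mOrder.splice(mOrder.end(), mOrder, it->second);
   }

   // Remove and return the least recently used id, or -1 if empty.
   int evictOldest()
   {
      if (mOrder.empty())
      {
         return -1;
      }
      int id = mOrder.front();
      mOrder.pop_front();
      mPos.erase(id);
      return id;
   }

private:
   std::list<int> mOrder;                                   // front = oldest
   std::unordered_map<int, std::list<int>::iterator> mPos;  // id -> list node
};
```

An intrusive list avoids the separate index map because each element carries its own links, which is presumably why Connection inherits its list nodes directly.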


Constructor & Destructor Documentation

Connection::Connection ( Transport * transport,
const Tuple & who,
Socket  socket,
Compression & compression 
)

Definition at line 29 of file Connection.cxx.

References resip::ConnectionManager::addConnection(), getConnectionManager(), InfoLog, resip::Tuple::mFlowKey, resip::ConnectionBase::mWho, and resip::ConnectionBase::transport().

   : ConnectionBase(transport,who,compression),
     mRequestPostConnectSocketFuncCall(false),
     mInWritable(false),
     mFlowTimerEnabled(false),
     mPollItemHandle(0)
{
   mWho.mFlowKey=(FlowKey)socket;
   InfoLog (<< "Connection::Connection: new connection created to who: " << mWho);

   if(mWho.mFlowKey && ConnectionBase::transport())
   {
      getConnectionManager().addConnection(this);
   }
}

Connection::~Connection ( ) [virtual]

Definition at line 46 of file Connection.cxx.

References resip::closeSocket(), getConnectionManager(), resip::Tuple::mFlowKey, resip::ConnectionBase::mWho, resip::ConnectionManager::removeConnection(), and resip::ConnectionBase::transport().

{
   if(mWho.mFlowKey && ConnectionBase::transport())
   {
      getConnectionManager().removeConnection(this);
      // remove first then close, since conn manager may need socket
      closeSocket(mWho.mFlowKey);
   }
}

resip::Connection::Connection ( ) [private]

no default c'tor

resip::Connection::Connection ( const Connection & ) [private]

no value semantics


Member Function Documentation

void Connection::enableFlowTimer ( )

Ensures this connection is in the FlowTimer LRU list in the connection manager.

Definition at line 311 of file Connection.cxx.

References getConnectionManager(), mFlowTimerEnabled, and resip::ConnectionManager::moveToFlowTimerLru().

Referenced by performWrite().

{
   if(!mFlowTimerEnabled)
   {
      mFlowTimerEnabled = true;

      // ensure connection is in a FlowTimer LRU list on the connection manager
      getConnectionManager().moveToFlowTimerLru(this);
   }
}

void Connection::ensureWritable ( )

ensure that we are on the writable list if required

Definition at line 208 of file Connection.cxx.

References resip::ConnectionManager::addToWritable(), getConnectionManager(), mInWritable, and resip::ConnectionBase::mOutstandingSends.

Referenced by resip::TlsConnection::checkState(), and requestWrite().

{
   if(!mInWritable)
   {
      assert(!mOutstandingSends.empty());
      getConnectionManager().addToWritable(this);
      mInWritable = true;
   }
}

ConnectionManager & Connection::getConnectionManager ( ) const [private]

Socket resip::Connection::getSocket ( ) const [inline]

Note:
Right now, Connection is assumed to be a TCP connection. This means that there is a 1-1 correspondence between FD and Connection. If this changes down the line (say, if we use this class to do SCTP connections), we can just remove this function.

Definition at line 56 of file Connection.hxx.

References resip::Tuple::mFlowKey, and resip::ConnectionBase::mWho.

Referenced by resip::ConnectionManager::addConnection(), resip::ConnectionManager::gc(), performWrite(), resip::ConnectionManager::process(), resip::TcpBaseTransport::processAllWriteRequests(), processPollEvent(), and resip::TcpConnection::read().

{return mWho.mFlowKey;}

bool Connection::hasDataToRead ( ) [virtual]

always true -- always add to fdset as read ready

Reimplemented in resip::TlsConnection, and resip::TcpConnection.

Definition at line 342 of file Connection.cxx.

Referenced by resip::ConnectionManager::process().

{
   return true;
}

bool resip::Connection::isFlowTimerEnabled ( ) [inline]

bool Connection::isGood ( ) [virtual]

has valid connection

Reimplemented in resip::TlsConnection, and resip::TcpConnection.

Definition at line 348 of file Connection.cxx.

{
   return true;
}

bool Connection::isWritable ( ) [virtual]

Reimplemented in resip::TlsConnection, and resip::TcpConnection.

Definition at line 354 of file Connection.cxx.

Referenced by requestWrite().

{
   return true;
}

void Connection::onDoubleCRLF ( ) [protected, virtual]

Reimplemented from resip::ConnectionBase.

Definition at line 323 of file Connection.cxx.

References resip::Symbols::CRLF, DebugLog, resip::Data::Empty, resip::InteropHelper::getOutboundVersion(), resip::ConnectionBase::mWho, and requestWrite().

{
   // !bwc! TODO might need to make this more efficient.
   // !bwc! Need to make this sigcomp-friendly
   if(InteropHelper::getOutboundVersion()>=8)
   {
      DebugLog(<<"Sending response CRLF (aka pong).");
      requestWrite(new SendData(mWho,Symbols::CRLF,Data::Empty,Data::Empty));
   }
}

void Connection::onSingleCRLF ( ) [protected, virtual]

Reimplemented from resip::ConnectionBase.

Definition at line 335 of file Connection.cxx.

References DebugLog, resip::Transport::keepAlivePong(), resip::ConnectionBase::mTransport, and resip::ConnectionBase::mWho.

{
   DebugLog(<<"Received response CRLF (aka pong).");
   mTransport->keepAlivePong(mWho);
}

Connection& resip::Connection::operator= ( const Connection & ) [private]

Reimplemented from resip::ConnectionBase.

bool Connection::performReads ( unsigned int  max = 0)

Call read() repeatedly, until an error occurs (because the fd is not ready to read, most likely).

Parameters:
max  The maximum number of reads to perform. 0 indicates that there is no limit.
Returns:
false iff this connection has deleted itself.

Definition at line 291 of file Connection.cxx.

References DebugLog, and read().

Referenced by resip::ConnectionManager::process(), and processPollEvent().

{
   int bytesRead;

   // if max==0, we will overflow into UINT_MAX. This is intentional.
   while((bytesRead = read())>0 && --max!=0)
   {
      DebugLog(<< "Connection::performReads() " << " read=" << bytesRead);
   }

   if ( bytesRead < 0 ) 
   {
      DebugLog(<< "Closing connection bytesRead=" << bytesRead);
      delete this;
      return false;
   }
   return true;
}
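
The "0 means no limit" convention in the loop above relies on unsigned wraparound: decrementing an unsigned 0 yields UINT_MAX, which is well-defined in C++. The following is a minimal sketch of that loop shape only. FakeSource and drain are hypothetical names standing in for the connection and performReads(), and read() here simply hands out a fixed number of successful results.

```cpp
#include <cassert>

// Hypothetical data source: read() succeeds `remaining` times, then returns 0.
struct FakeSource
{
   explicit FakeSource(unsigned int n) : remaining(n), calls(0) {}

   int read()
   {
      ++calls;
      if (remaining == 0)
      {
         return 0;   // nothing left, like a socket that is not ready
      }
      --remaining;
      return 1;      // pretend one byte was read
   }

   unsigned int remaining;
   unsigned int calls;
};

// Same loop shape as performReads(): stop when read() fails or the limit
// is reached. With max == 0, --max wraps to UINT_MAX, so the limit is
// effectively never hit. Returns how many times read() was called.
unsigned int drain(FakeSource& src, unsigned int max = 0)
{
   while (src.read() > 0 && --max != 0)
   {
   }
   return src.calls;
}
```

With five items available, drain(src) calls read() six times (five successes plus the one that returns 0), while drain(src, 3) stops after exactly three calls.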

int Connection::performWrite ( )

send some or all of the queued data; remove from writable if completely written

Definition at line 81 of file Connection.cxx.

References resip::Transport::callSocketFunc(), resip::SendData::CloseConnection, resip::ConnectionBase::Compressed, resip::Data::data(), DebugLog, resip::SendData::EnableFlowTimer, enableFlowTimer(), errno, getConnectionManager(), getSocket(), InfoLog, resip::Compression::isEnabled(), resip::ConnectionBase::mCompression, mEnablePostConnectSocketFuncCall, mInWritable, resip::ConnectionBase::mOutstandingSends, mRequestPostConnectSocketFuncCall, resip::ConnectionBase::mSendingTransmissionFormat, resip::ConnectionBase::mSendPos, resip::ConnectionBase::mSigcompStack, resip::ConnectionBase::mTransport, resip::ConnectionManager::removeFromWritable(), removeFrontOutstandingSend(), resip::Data::size(), transportWrite(), resip::ConnectionBase::Uncompressed, resip::ConnectionBase::Unknown, and write().

Referenced by performWrites().

{
   if(transportWrite())
   {
      assert(mInWritable);
      getConnectionManager().removeFromWritable(this);
      mInWritable = false;
      return 0; // What does this transportWrite() mean?
   }

   assert(!mOutstandingSends.empty());
   switch(mOutstandingSends.front()->command)
   {
   case SendData::CloseConnection:
      // .bwc. Close this connection.
      return -1;
      break;
   case SendData::EnableFlowTimer:
      enableFlowTimer();
      removeFrontOutstandingSend();
      return 0;
      break;
   default:
      // do nothing
      break;
   }

   const Data& sigcompId = mOutstandingSends.front()->sigcompId;

   if(mSendingTransmissionFormat == Unknown)
   {
      if (sigcompId.size() > 0 && mCompression.isEnabled())
      {
         mSendingTransmissionFormat = Compressed;
      }
      else
      {
         mSendingTransmissionFormat = Uncompressed;
      }
   }


#ifdef USE_SIGCOMP
   // Perform compression here, if appropriate
   if (mSendingTransmissionFormat == Compressed
       && !(mOutstandingSends.front()->isAlreadyCompressed))
   {
      const Data& uncompressed = mOutstandingSends.front()->data;
      osc::SigcompMessage *sm = 
        mSigcompStack->compressMessage(uncompressed.data(), uncompressed.size(),
                                       sigcompId.data(), sigcompId.size(),
                                       true);
      DebugLog (<< "Compressed message from "
                << uncompressed.size() << " bytes to " 
                << sm->getStreamLength() << " bytes");

      SendData *oldSd = mOutstandingSends.front();
      SendData *newSd = new SendData(oldSd->destination,
                                     Data(sm->getStreamMessage(),
                                          sm->getStreamLength()),
                                     oldSd->transactionId,
                                     oldSd->sigcompId,
                                     true);
      mOutstandingSends.front() = newSd;
      delete oldSd;
      delete sm;
   }
#endif

   if(mEnablePostConnectSocketFuncCall && mRequestPostConnectSocketFuncCall)
   {
       // Note:  The first time the socket is available for write, is when the TCP connect call is completed
      mRequestPostConnectSocketFuncCall = false;
      mTransport->callSocketFunc(getSocket());
   }

   const Data& data = mOutstandingSends.front()->data;

   int nBytes = write(data.data() + mSendPos,int(data.size() - mSendPos));

   //DebugLog (<< "Tried to send " << data.size() - mSendPos << " bytes, sent " << nBytes << " bytes");

   if (nBytes < 0)
   {
      if(errno!=EAGAIN)
      {
         //fail(data.transactionId);
         InfoLog(<< "Write failed on socket: " << this->getSocket() << ", closing connection");
         return -1;
      }
      else
      {
         return 0;
      }
   }
   else
   {
      // Safe because of the conditional above ( < 0 ).
      Data::size_type bytesWritten = static_cast<Data::size_type>(nBytes);
      mSendPos += bytesWritten;
      if (mSendPos == data.size())
      {
         mSendPos = 0;
         removeFrontOutstandingSend();
      }
      return bytesWritten;
   }
}
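
The tail of performWrite() tracks partial writes with a send position: a message stays at the front of the queue until every byte has been accepted. Below is a minimal sketch of that bookkeeping alone, assuming std::string payloads and a capacity argument that simulates how many bytes the socket accepts per call. SendQueue and writeSome are hypothetical names; the real code also handles commands, SigComp, and errno.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical stand-in for the outstanding-send queue and mSendPos.
class SendQueue
{
public:
   void push(const std::string& msg) { mQueue.push_back(msg); }
   bool empty() const { return mQueue.empty(); }

   // Write at most `capacity` bytes of the front message into `sink`
   // (which plays the role of the socket). Returns the bytes written.
   std::size_t writeSome(std::size_t capacity, std::string& sink)
   {
      if (mQueue.empty())
      {
         return 0;
      }
      const std::string& data = mQueue.front();
      const std::size_t n = std::min(capacity, data.size() - mSendPos);
      sink.append(data, mSendPos, n);
      mSendPos += n;
      if (mSendPos == data.size())
      {
         // Front message fully sent: reset the offset and drop it.
         mSendPos = 0;
         mQueue.pop_front();
      }
      return n;
   }

private:
   std::deque<std::string> mQueue;
   std::size_t mSendPos = 0;   // offset into the front message
};
```

Resetting the offset before removing the front element keeps the invariant that the send position always refers to the current front message.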

bool Connection::performWrites ( unsigned int  max = 0)

Call performWrite() repeatedly, until either the send queue is exhausted, the write() call fails (probably because the fd is no longer ready to write), or a set number of writes has been performed.

Parameters:
max  The maximum number of writes to perform. 0 indicates that there is no limit.
Returns:
false iff this connection has deleted itself.

Definition at line 192 of file Connection.cxx.

References resip::ConnectionBase::mOutstandingSends, and performWrite().

Referenced by resip::ConnectionManager::process(), and processPollEvent().

{
   int res;
   // if max==0, we will overflow into UINT_MAX. This is intentional.
   while((res=performWrite())>0 && !mOutstandingSends.empty() && --max!=0)
   {;}

   if(res<0)
   {
      delete this;
      return false;
   }
   return true;
}

void Connection::processPollEvent ( FdPollEventMask  mask) [protected, virtual]

Virtual function of FdPollItemIf, called to process io events.

Implements resip::FdPollItemIf.

Definition at line 363 of file Connection.cxx.

References resip::TransportFailure::ConnectionException, FPEM_Error, FPEM_Read, FPEM_Write, getSocket(), resip::getSocketError(), InfoLog, performReads(), performWrites(), and resip::ConnectionBase::setFailureReason().

{
   /* The original code in ConnectionManager.cxx didn't check
    * for error events unless no writable event. (e.g., writable
    * masked error. Why?)
    */
   if ( mask & FPEM_Error ) 
   {
      Socket fd = getSocket();
      int errNum = getSocketError(fd);
      InfoLog(<< "Exception on socket " << fd << " code: " << errNum << "; closing connection");
      setFailureReason(TransportFailure::ConnectionException, errNum);
      delete this;
      return;
   }
   if ( mask & FPEM_Write ) 
   {
      if(!performWrites())
      {
         // Just deleted self
         return;
      }
   }
   if ( mask & FPEM_Read ) 
   {
      performReads();
   }
}
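
The dispatch order above (error first, then write, then read) and the early returns matter because the object may have deleted itself, after which no member may be touched. A minimal sketch of the same control flow follows, assuming hypothetical mask values (the real FPEM_* constants are not shown here) and a trace string in place of the actual I/O.

```cpp
#include <cassert>
#include <string>

// Hypothetical event bits; the real FPEM_* values may differ.
enum : unsigned { EvRead = 1u, EvWrite = 2u, EvError = 4u };

// Returns a trace of the handlers that ran: 'E' error, 'W' write, 'R' read.
// `writeSucceeds` models performWrites() returning true (object still alive).
std::string dispatch(unsigned mask, bool writeSucceeds)
{
   std::string trace;
   if (mask & EvError)
   {
      trace += 'E';
      return trace;      // object is gone after an error; do nothing else
   }
   if (mask & EvWrite)
   {
      trace += 'W';
      if (!writeSucceeds)
      {
         return trace;   // write path deleted the object; skip the read
      }
   }
   if (mask & EvRead)
   {
      trace += 'R';
   }
   return trace;
}
```

For example, a mask with both read and write bits produces "WR" when the write succeeds and "W" when it does not.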

int Connection::read ( )

move data from the connection to the buffer; move this to front of least recently used list.

when the message is complete, it is delivered via mTransport->pushRxMsgUp() which generally puts it on a fifo

Definition at line 234 of file Connection.cxx.

References resip::ConnectionBase::ChunkSize, resip::ConnectionBase::Compressed, resip::ConnectionBase::decompressNewBytes(), getConnectionManager(), resip::ConnectionBase::getCurrentWriteBuffer(), resip::ConnectionBase::getWriteBuffer(), resip::Compression::isEnabled(), resip::ConnectionBase::mCompression, resip::ConnectionBase::mReceivingTransmissionFormat, resip::ConnectionBase::preparseNewBytes(), resip::resipMin(), resip::ConnectionManager::touch(), resip::ConnectionBase::Uncompressed, and resip::ConnectionBase::Unknown.

Referenced by performReads(), and resip::TcpConnection::read().

{
   std::pair<char*, size_t> writePair = getWriteBuffer();
   size_t bytesToRead = resipMin(writePair.second, 
                                 static_cast<size_t>(Connection::ChunkSize));
         
   assert(bytesToRead > 0);

   int bytesRead = read(writePair.first, (int)bytesToRead);
   if (bytesRead <= 0)
   {
      return bytesRead;
   }  
   // mBuffer might have been reallocated inside read()
   writePair = getCurrentWriteBuffer();

   getConnectionManager().touch(this);

#ifdef USE_SIGCOMP
   // If this is the first data we read, determine whether the
   // connection is compressed.
   if(mReceivingTransmissionFormat == Unknown)
   {
     if (((writePair.first[0] & 0xf8) == 0xf8) && mCompression.isEnabled())
     {
       mReceivingTransmissionFormat = Compressed;
     }
     else
     {
       mReceivingTransmissionFormat = Uncompressed;
     }
   }

   // SigComp compressed messages are handed very differently
   // than non-compressed messages: they are guaranteed to
   // be framed within SigComp, and each frame contains
   // *exactly* one SIP message. Processing looks a lot like
   // it does for Datagram-oriented transports.

   if (mReceivingTransmissionFormat == Compressed)
   {
     decompressNewBytes(bytesRead);
   }
   else
#endif
   {
     if(!preparseNewBytes(bytesRead))
     {
        // Iffy; only way we have right now to indicate that this connection has
        // gone away.
        bytesRead=-1;
     }
   }
   return bytesRead;
}
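
The format detection inside the USE_SIGCOMP block tests whether the top five bits of the first received byte are all set, which is how a SigComp message begins. A minimal sketch of that check in isolation follows, assuming a free function and a local enum. classifyFirstByte and TransmissionFormat are hypothetical names that mirror, but are not, the ConnectionBase members.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirror of the Unknown/Uncompressed/Compressed states.
enum class TransmissionFormat { Unknown, Uncompressed, Compressed };

// Decide the receive format from the first byte of a new connection.
// A stream is treated as compressed only if the byte has its five high
// bits set (mask 0xf8) and compression is enabled locally.
TransmissionFormat classifyFirstByte(std::uint8_t firstByte,
                                     bool compressionEnabled)
{
   const bool looksLikeSigcomp = (firstByte & 0xf8) == 0xf8;
   return (looksLikeSigcomp && compressionEnabled)
             ? TransmissionFormat::Compressed
             : TransmissionFormat::Uncompressed;
}
```

A plain SIP request starts with an ASCII letter such as 'I' (0x49), so it never matches the mask and is classified as uncompressed.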

virtual int resip::Connection::read ( char *  ,
const int   
) [inline, protected, virtual]

would be pure virtual, but a concrete Connection is needed for the book-ends of the lists

Reimplemented in resip::TlsConnection, and resip::TcpConnection.

Definition at line 108 of file Connection.hxx.

{ return 0; }

void Connection::removeFrontOutstandingSend ( ) [private]

Definition at line 67 of file Connection.cxx.

References getConnectionManager(), mInWritable, resip::ConnectionBase::mOutstandingSends, and resip::ConnectionManager::removeFromWritable().

Referenced by performWrite().

{
   delete mOutstandingSends.front();
   mOutstandingSends.pop_front();

   if (mOutstandingSends.empty())
   {
      assert(mInWritable);
      getConnectionManager().removeFromWritable(this);
      mInWritable = false;
   }
}

void Connection::requestWrite ( SendData * sendData)

queue data to write and add this to writable list

Definition at line 57 of file Connection.cxx.

References ensureWritable(), isWritable(), and resip::ConnectionBase::mOutstandingSends.

Referenced by onDoubleCRLF(), and resip::TcpBaseTransport::processAllWriteRequests().

{
   mOutstandingSends.push_back(sendData);
   if (isWritable())
   {
      ensureWritable();
   }
}
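
requestWrite(), ensureWritable(), and removeFrontOutstandingSend() together keep a single flag in sync with membership in the manager's writable list: the connection is registered once when the queue becomes non-empty and removed when it drains. The sketch below shows that invariant under simplifying assumptions: a std::set stands in for the writable list, strings stand in for SendData, and the isWritable() check is omitted. WritableSet and Conn are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <string>

// Hypothetical stand-in for the connection manager's writable list.
struct WritableSet
{
   std::set<const void*> members;
};

class Conn
{
public:
   explicit Conn(WritableSet& ws) : mWritable(ws), mInWritable(false) {}

   // Queue data, then make sure we are registered as writable.
   void requestWrite(const std::string& data)
   {
      mOutstanding.push_back(data);
      ensureWritable();
   }

   // Idempotent: registers at most once while the queue is non-empty.
   void ensureWritable()
   {
      if (!mInWritable)
      {
         assert(!mOutstanding.empty());
         mWritable.members.insert(this);
         mInWritable = true;
      }
   }

   // Drop the front item; deregister once the queue is empty.
   void removeFront()
   {
      mOutstanding.pop_front();
      if (mOutstanding.empty())
      {
         mWritable.members.erase(this);
         mInWritable = false;
      }
   }

   bool inWritable() const { return mInWritable; }

private:
   WritableSet& mWritable;
   std::deque<std::string> mOutstanding;
   bool mInWritable;
};
```

Keeping the flag on the connection avoids searching the manager's list on every queued send.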

static void resip::Connection::setEnablePostConnectSocketFuncCall ( bool  enabled = true) [inline, static]

Definition at line 104 of file Connection.hxx.

References mEnablePostConnectSocketFuncCall.

virtual bool resip::Connection::transportWrite ( ) [inline, virtual]

Reimplemented in resip::TlsConnection.

Definition at line 63 of file Connection.hxx.

Referenced by performWrite().

{return false;}

virtual int resip::Connection::write ( const char *  ,
const int   
) [inline, protected, virtual]

would be pure virtual, but a concrete Connection is needed for the book-ends of the lists

Reimplemented in resip::TlsConnection, and resip::TcpConnection.

Definition at line 110 of file Connection.hxx.

Referenced by performWrite().

{ return 0; }

Friends And Related Function Documentation

friend class ConnectionManager [friend]

Definition at line 42 of file Connection.hxx.

EncodeStream& operator<< ( EncodeStream & strm,
const resip::Connection & c 
) [friend]

Member Data Documentation

volatile bool Connection::mEnablePostConnectSocketFuncCall = false [static]

Definition at line 103 of file Connection.hxx.

Referenced by performWrite(), and setEnablePostConnectSocketFuncCall().

bool resip::Connection::mFlowTimerEnabled [private]

Definition at line 121 of file Connection.hxx.

Referenced by enableFlowTimer(), and isFlowTimerEnabled().

bool resip::Connection::mInWritable [private]

Definition at line 120 of file Connection.hxx.

Referenced by ensureWritable(), performWrite(), and removeFrontOutstandingSend().


The documentation for this class was generated from the following files: