|
reSIProcate/stack
9694
|
Connection implements, via sockets, ConnectionBase for managed connections. More...
#include <Connection.hxx>


Public Member Functions | |
| Connection (Transport *transport, const Tuple &who, Socket socket, Compression &compression) | |
| virtual | ~Connection () |
| Socket | getSocket () const |
| virtual bool | hasDataToRead () |
| always true -- always add to fdset as read ready | |
| virtual bool | isGood () |
| has valid connection | |
| virtual bool | isWritable () |
| virtual bool | transportWrite () |
| void | requestWrite (SendData *sendData) |
| queue data to write and add this to writable list | |
| int | performWrite () |
| send some or all of a queued data; remove from writable if completely written | |
| bool | performWrites (unsigned int max=0) |
| Call performWrite() repeatedly, until either the send queue is exhausted, the write() call fails (probably because the fd is no longer ready to write), or a set number of writes has been performed. | |
| void | ensureWritable () |
| ensure that we are on the writeable list if required | |
| int | read () |
| move data from the connection to the buffer; move this to front of least recently used list. | |
| bool | performReads (unsigned int max=0) |
| Call read() repeatedly, until an error occurs (because the fd is not ready to read, most likely). | |
| void | enableFlowTimer () |
| Ensures this connection is in the FlowTimer LRU list in the connection manager. | |
| bool | isFlowTimerEnabled () |
Static Public Member Functions | |
| static void | setEnablePostConnectSocketFuncCall (bool enabled=true) |
Public Attributes | |
| bool | mRequestPostConnectSocketFuncCall |
Static Public Attributes | |
| static volatile bool | mEnablePostConnectSocketFuncCall = false |
Protected Member Functions | |
| virtual int | read (char *, const int) |
| pure virtual, but need concrete Connection for book-ends of lists | |
| virtual int | write (const char *, const int) |
| pure virtual, but need concrete Connection for book-ends of lists | |
| virtual void | onDoubleCRLF () |
| virtual void | onSingleCRLF () |
| virtual void | processPollEvent (FdPollEventMask mask) |
| Virtual function of FdPollItemIf, called to process io events. | |
Private Member Functions | |
| ConnectionManager & | getConnectionManager () const |
| void | removeFrontOutstandingSend () |
| Connection () | |
| no default c'tor | |
| Connection (const Connection &) | |
| no value semantics | |
| Connection & | operator= (const Connection &) |
Private Attributes | |
| bool | mInWritable |
| bool | mFlowTimerEnabled |
| FdPollItemHandle | mPollItemHandle |
Friends | |
| class | ConnectionManager |
| EncodeStream & | operator<< (EncodeStream &strm, const resip::Connection &c) |
Connection implements, via sockets, ConnectionBase for managed connections.
Connections are managed for approximate fairness and least recently used garbage collection. Connection inherits three different instantiations of intrusive lists.
Definition at line 35 of file Connection.hxx.
| Connection::Connection | ( | Transport * | transport, |
| const Tuple & | who, | ||
| Socket | socket, | ||
| Compression & | compression | ||
| ) |
Definition at line 29 of file Connection.cxx.
References resip::ConnectionManager::addConnection(), getConnectionManager(), InfoLog, resip::Tuple::mFlowKey, resip::ConnectionBase::mWho, and resip::ConnectionBase::transport().
: ConnectionBase(transport,who,compression), mRequestPostConnectSocketFuncCall(false), mInWritable(false), mFlowTimerEnabled(false), mPollItemHandle(0) { mWho.mFlowKey=(FlowKey)socket; InfoLog (<< "Connection::Connection: new connection created to who: " << mWho); if(mWho.mFlowKey && ConnectionBase::transport()) { getConnectionManager().addConnection(this); } }

| Connection::~Connection | ( | ) | [virtual] |
Definition at line 46 of file Connection.cxx.
References resip::closeSocket(), getConnectionManager(), resip::Tuple::mFlowKey, resip::ConnectionBase::mWho, resip::ConnectionManager::removeConnection(), and resip::ConnectionBase::transport().
{
if(mWho.mFlowKey && ConnectionBase::transport())
{
getConnectionManager().removeConnection(this);
// remove first then close, since conn manager may need socket
closeSocket(mWho.mFlowKey);
}
}

| resip::Connection::Connection | ( | ) | [private] |
no default c'tor
| resip::Connection::Connection | ( | const Connection & | ) | [private] |
no value semantics
| void Connection::enableFlowTimer | ( | ) |
Ensures this connection is in the FlowTimer LRU list in the connection manager.
Definition at line 311 of file Connection.cxx.
References getConnectionManager(), mFlowTimerEnabled, and resip::ConnectionManager::moveToFlowTimerLru().
Referenced by performWrite().
{
if(!mFlowTimerEnabled)
{
mFlowTimerEnabled = true;
// ensure connection is in a FlowTimer LRU list on the connection manager
getConnectionManager().moveToFlowTimerLru(this);
}
}

| void Connection::ensureWritable | ( | ) |
ensure that we are on the writeable list if required
Definition at line 208 of file Connection.cxx.
References resip::ConnectionManager::addToWritable(), getConnectionManager(), mInWritable, and resip::ConnectionBase::mOutstandingSends.
Referenced by resip::TlsConnection::checkState(), and requestWrite().
{
if(!mInWritable)
{
assert(!mOutstandingSends.empty());
getConnectionManager().addToWritable(this);
mInWritable = true;
}
}

| ConnectionManager & Connection::getConnectionManager | ( | ) | const [private] |
Definition at line 219 of file Connection.cxx.
References resip::TcpBaseTransport::getConnectionManager(), and resip::ConnectionBase::transport().
Referenced by Connection(), enableFlowTimer(), ensureWritable(), performWrite(), read(), removeFrontOutstandingSend(), and ~Connection().
{
TcpBaseTransport* transport = static_cast<TcpBaseTransport*>(ConnectionBase::transport());
return transport->getConnectionManager();
}

| Socket resip::Connection::getSocket | ( | ) | const [inline] |
Definition at line 56 of file Connection.hxx.
References resip::Tuple::mFlowKey, and resip::ConnectionBase::mWho.
Referenced by resip::ConnectionManager::addConnection(), resip::ConnectionManager::gc(), performWrite(), resip::ConnectionManager::process(), resip::TcpBaseTransport::processAllWriteRequests(), processPollEvent(), and resip::TcpConnection::read().
{return mWho.mFlowKey;}
| bool Connection::hasDataToRead | ( | ) | [virtual] |
always true -- always add to fdset as read ready
Reimplemented in resip::TlsConnection, and resip::TcpConnection.
Definition at line 342 of file Connection.cxx.
Referenced by resip::ConnectionManager::process().
{
return true;
}
| bool resip::Connection::isFlowTimerEnabled | ( | ) | [inline] |
Definition at line 100 of file Connection.hxx.
References mFlowTimerEnabled.
Referenced by resip::ConnectionManager::removeConnection(), and resip::ConnectionManager::touch().
{ return mFlowTimerEnabled; }
| bool Connection::isGood | ( | ) | [virtual] |
has valid connection
Reimplemented in resip::TlsConnection, and resip::TcpConnection.
Definition at line 348 of file Connection.cxx.
{
return true;
}
| bool Connection::isWritable | ( | ) | [virtual] |
Reimplemented in resip::TlsConnection, and resip::TcpConnection.
Definition at line 354 of file Connection.cxx.
Referenced by requestWrite().
{
return true;
}
| void Connection::onDoubleCRLF | ( | ) | [protected, virtual] |
Reimplemented from resip::ConnectionBase.
Definition at line 323 of file Connection.cxx.
References resip::Symbols::CRLF, DebugLog, resip::Data::Empty, resip::InteropHelper::getOutboundVersion(), resip::ConnectionBase::mWho, and requestWrite().
{
// !bwc! TODO might need to make this more efficient.
// !bwc! Need to make this sigcomp-friendly
if(InteropHelper::getOutboundVersion()>=8)
{
DebugLog(<<"Sending response CRLF (aka pong).");
requestWrite(new SendData(mWho,Symbols::CRLF,Data::Empty,Data::Empty));
}
}

| void Connection::onSingleCRLF | ( | ) | [protected, virtual] |
Reimplemented from resip::ConnectionBase.
Definition at line 335 of file Connection.cxx.
References DebugLog, resip::Transport::keepAlivePong(), resip::ConnectionBase::mTransport, and resip::ConnectionBase::mWho.
{
DebugLog(<<"Received response CRLF (aka pong).");
mTransport->keepAlivePong(mWho);
}

| Connection& resip::Connection::operator= | ( | const Connection & | ) | [private] |
Reimplemented from resip::ConnectionBase.
| bool Connection::performReads | ( | unsigned int | max = 0 | ) |
Call read() repeatedly, until an error occurs (because the fd is not ready to read, most likely).
| max | The maximum number of reads to perform. 0 indicates that there is no limit. |
Definition at line 291 of file Connection.cxx.
References DebugLog, and read().
Referenced by resip::ConnectionManager::process(), and processPollEvent().
{
int bytesRead;
// if max==0, we will overflow into UINT_MAX. This is intentional.
while((bytesRead = read())>0 && --max!=0)
{
DebugLog(<< "Connection::performReads() " << " read=" << bytesRead);
}
if ( bytesRead < 0 )
{
DebugLog(<< "Closing connection bytesRead=" << bytesRead);
delete this;
return false;
}
return true;
}

| int Connection::performWrite | ( | ) |
send some or all of a queued data; remove from writable if completely written
Definition at line 81 of file Connection.cxx.
References resip::Transport::callSocketFunc(), resip::SendData::CloseConnection, resip::ConnectionBase::Compressed, resip::Data::data(), DebugLog, resip::SendData::EnableFlowTimer, enableFlowTimer(), errno, getConnectionManager(), getSocket(), InfoLog, resip::Compression::isEnabled(), resip::ConnectionBase::mCompression, mEnablePostConnectSocketFuncCall, mInWritable, resip::ConnectionBase::mOutstandingSends, mRequestPostConnectSocketFuncCall, resip::ConnectionBase::mSendingTransmissionFormat, resip::ConnectionBase::mSendPos, resip::ConnectionBase::mSigcompStack, resip::ConnectionBase::mTransport, resip::ConnectionManager::removeFromWritable(), removeFrontOutstandingSend(), resip::Data::size(), transportWrite(), resip::ConnectionBase::Uncompressed, resip::ConnectionBase::Unknown, and write().
Referenced by performWrites().
{
if(transportWrite())
{
assert(mInWritable);
getConnectionManager().removeFromWritable(this);
mInWritable = false;
return 0; // What does this transportWrite() mean?
}
assert(!mOutstandingSends.empty());
switch(mOutstandingSends.front()->command)
{
case SendData::CloseConnection:
// .bwc. Close this connection.
return -1;
break;
case SendData::EnableFlowTimer:
enableFlowTimer();
removeFrontOutstandingSend();
return 0;
break;
default:
// do nothing
break;
}
const Data& sigcompId = mOutstandingSends.front()->sigcompId;
if(mSendingTransmissionFormat == Unknown)
{
if (sigcompId.size() > 0 && mCompression.isEnabled())
{
mSendingTransmissionFormat = Compressed;
}
else
{
mSendingTransmissionFormat = Uncompressed;
}
}
#ifdef USE_SIGCOMP
// Perform compression here, if appropriate
if (mSendingTransmissionFormat == Compressed
&& !(mOutstandingSends.front()->isAlreadyCompressed))
{
const Data& uncompressed = mOutstandingSends.front()->data;
osc::SigcompMessage *sm =
mSigcompStack->compressMessage(uncompressed.data(), uncompressed.size(),
sigcompId.data(), sigcompId.size(),
true);
DebugLog (<< "Compressed message from "
<< uncompressed.size() << " bytes to "
<< sm->getStreamLength() << " bytes");
SendData *oldSd = mOutstandingSends.front();
SendData *newSd = new SendData(oldSd->destination,
Data(sm->getStreamMessage(),
sm->getStreamLength()),
oldSd->transactionId,
oldSd->sigcompId,
true);
mOutstandingSends.front() = newSd;
delete oldSd;
delete sm;
}
#endif
if(mEnablePostConnectSocketFuncCall && mRequestPostConnectSocketFuncCall)
{
// Note: The first time the socket is available for write, is when the TCP connect call is completed
mRequestPostConnectSocketFuncCall = false;
mTransport->callSocketFunc(getSocket());
}
const Data& data = mOutstandingSends.front()->data;
int nBytes = write(data.data() + mSendPos,int(data.size() - mSendPos));
//DebugLog (<< "Tried to send " << data.size() - mSendPos << " bytes, sent " << nBytes << " bytes");
if (nBytes < 0)
{
if(errno!=EAGAIN)
{
//fail(data.transactionId);
InfoLog(<< "Write failed on socket: " << this->getSocket() << ", closing connection");
return -1;
}
else
{
return 0;
}
}
else
{
// Safe because of the conditional above ( < 0 ).
Data::size_type bytesWritten = static_cast<Data::size_type>(nBytes);
mSendPos += bytesWritten;
if (mSendPos == data.size())
{
mSendPos = 0;
removeFrontOutstandingSend();
}
return bytesWritten;
}
}

| bool Connection::performWrites | ( | unsigned int | max = 0 | ) |
Call performWrite() repeatedly, until either the send queue is exhausted, the write() call fails (probably because the fd is no longer ready to write), or a set number of writes has been performed.
| max | The maximum number of writes to perform. 0 indicates that there is no limit. |
Definition at line 192 of file Connection.cxx.
References resip::ConnectionBase::mOutstandingSends, and performWrite().
Referenced by resip::ConnectionManager::process(), and processPollEvent().
{
int res;
// if max==0, we will overflow into UINT_MAX. This is intentional.
while((res=performWrite())>0 && !mOutstandingSends.empty() && --max!=0)
{;}
if(res<0)
{
delete this;
return false;
}
return true;
}

| void Connection::processPollEvent | ( | FdPollEventMask | mask | ) | [protected, virtual] |
Virtual function of FdPollItemIf, called to process io events.
Implements resip::FdPollItemIf.
Definition at line 363 of file Connection.cxx.
References resip::TransportFailure::ConnectionException, FPEM_Error, FPEM_Read, FPEM_Write, getSocket(), resip::getSocketError(), InfoLog, performReads(), performWrites(), and resip::ConnectionBase::setFailureReason().
{
/* The original code in ConnectionManager.cxx didn't check
* for error events unless no writable event. (e.g., writable
* masked error. Why?)
*/
if ( mask & FPEM_Error )
{
Socket fd = getSocket();
int errNum = getSocketError(fd);
InfoLog(<< "Exception on socket " << fd << " code: " << errNum << "; closing connection");
setFailureReason(TransportFailure::ConnectionException, errNum);
delete this;
return;
}
if ( mask & FPEM_Write )
{
if(!performWrites())
{
// Just deleted self
return;
}
}
if ( mask & FPEM_Read )
{
performReads();
}
}

| int Connection::read | ( | ) |
move data from the connection to the buffer; move this to front of least recently used list.
when the message is complete, it is delivered via mTransport->pushRxMsgUp() which generally puts it on a fifo
Definition at line 234 of file Connection.cxx.
References resip::ConnectionBase::ChunkSize, resip::ConnectionBase::Compressed, resip::ConnectionBase::decompressNewBytes(), getConnectionManager(), resip::ConnectionBase::getCurrentWriteBuffer(), resip::ConnectionBase::getWriteBuffer(), resip::Compression::isEnabled(), resip::ConnectionBase::mCompression, resip::ConnectionBase::mReceivingTransmissionFormat, resip::ConnectionBase::preparseNewBytes(), resip::resipMin(), resip::ConnectionManager::touch(), resip::ConnectionBase::Uncompressed, and resip::ConnectionBase::Unknown.
Referenced by performReads(), and resip::TcpConnection::read().
{
std::pair<char*, size_t> writePair = getWriteBuffer();
size_t bytesToRead = resipMin(writePair.second,
static_cast<size_t>(Connection::ChunkSize));
assert(bytesToRead > 0);
int bytesRead = read(writePair.first, (int)bytesToRead);
if (bytesRead <= 0)
{
return bytesRead;
}
// mBuffer might have been reallocated inside read()
writePair = getCurrentWriteBuffer();
getConnectionManager().touch(this);
#ifdef USE_SIGCOMP
// If this is the first data we read, determine whether the
// connection is compressed.
if(mReceivingTransmissionFormat == Unknown)
{
if (((writePair.first[0] & 0xf8) == 0xf8) && mCompression.isEnabled())
{
mReceivingTransmissionFormat = Compressed;
}
else
{
mReceivingTransmissionFormat = Uncompressed;
}
}
// SigComp compressed messages are handed very differently
// than non-compressed messages: they are guaranteed to
// be framed within SigComp, and each frame contains
// *exactly* one SIP message. Processing looks a lot like
// it does for Datagram-oriented transports.
if (mReceivingTransmissionFormat == Compressed)
{
decompressNewBytes(bytesRead);
}
else
#endif
{
if(!preparseNewBytes(bytesRead))
{
// Iffy; only way we have right now to indicate that this connection has
// gone away.
bytesRead=-1;
}
}
return bytesRead;
}

| virtual int resip::Connection::read | ( | char * | , |
| const int | |||
| ) | [inline, protected, virtual] |
pure virtual, but need concrete Connection for book-ends of lists
Reimplemented in resip::TlsConnection, and resip::TcpConnection.
Definition at line 108 of file Connection.hxx.
{ return 0; }
| void Connection::removeFrontOutstandingSend | ( | ) | [private] |
Definition at line 67 of file Connection.cxx.
References getConnectionManager(), mInWritable, resip::ConnectionBase::mOutstandingSends, and resip::ConnectionManager::removeFromWritable().
Referenced by performWrite().
{
delete mOutstandingSends.front();
mOutstandingSends.pop_front();
if (mOutstandingSends.empty())
{
assert(mInWritable);
getConnectionManager().removeFromWritable(this);
mInWritable = false;
}
}

| void Connection::requestWrite | ( | SendData * | sendData | ) |
queue data to write and add this to writable list
Definition at line 57 of file Connection.cxx.
References ensureWritable(), isWritable(), and resip::ConnectionBase::mOutstandingSends.
Referenced by onDoubleCRLF(), and resip::TcpBaseTransport::processAllWriteRequests().
{
mOutstandingSends.push_back(sendData);
if (isWritable())
{
ensureWritable();
}
}

| static void resip::Connection::setEnablePostConnectSocketFuncCall | ( | bool | enabled = true | ) | [inline, static] |
Definition at line 104 of file Connection.hxx.
References mEnablePostConnectSocketFuncCall.
{ mEnablePostConnectSocketFuncCall = enabled; }
| virtual bool resip::Connection::transportWrite | ( | ) | [inline, virtual] |
Reimplemented in resip::TlsConnection.
Definition at line 63 of file Connection.hxx.
Referenced by performWrite().
{return false;}
| virtual int resip::Connection::write | ( | const char * | , |
| const int | |||
| ) | [inline, protected, virtual] |
pure virtual, but need concrete Connection for book-ends of lists
Reimplemented in resip::TlsConnection, and resip::TcpConnection.
Definition at line 110 of file Connection.hxx.
Referenced by performWrite().
{ return 0; }
friend class ConnectionManager [friend] |
Definition at line 42 of file Connection.hxx.
| EncodeStream& operator<< | ( | EncodeStream & | strm, |
| const resip::Connection & | c | ||
| ) | [friend] |
volatile bool Connection::mEnablePostConnectSocketFuncCall = false [static] |
Definition at line 103 of file Connection.hxx.
Referenced by performWrite(), and setEnablePostConnectSocketFuncCall().
bool resip::Connection::mFlowTimerEnabled [private] |
Definition at line 121 of file Connection.hxx.
Referenced by enableFlowTimer(), and isFlowTimerEnabled().
bool resip::Connection::mInWritable [private] |
Definition at line 120 of file Connection.hxx.
Referenced by ensureWritable(), performWrite(), and removeFrontOutstandingSend().
Definition at line 122 of file Connection.hxx.
Referenced by resip::ConnectionManager::addConnection(), resip::ConnectionManager::addToWritable(), resip::ConnectionManager::removeConnection(), and resip::ConnectionManager::removeFromWritable().
Definition at line 102 of file Connection.hxx.
Referenced by resip::TcpBaseTransport::makeOutgoingConnection(), and performWrite().
1.7.5.1