|
reSIProcate/stack
9694
|
#include <TcpBaseTransport.hxx>


Public Member Functions | |
| TcpBaseTransport (Fifo< TransactionMessage > &fifo, int portNum, IpVersion version, const Data &interfaceName, AfterSocketCreationFuncPtr socketFunc, Compression &compression, unsigned transportFlags=0) | |
| virtual | ~TcpBaseTransport () |
| virtual void | processPollEvent (FdPollEventMask mask) |
| virtual void | process (FdSet &fdset) |
| If there is work to do, this is the method that does it. | |
| virtual void | buildFdSet (FdSet &fdset) |
| Adds the Transport's socket FD to the appropriate read or write sets as applicable. | |
| virtual bool | isReliable () const |
| virtual bool | isDatagram () const |
| virtual void | process () |
| Version of process to be invoked periodically when using callback-based IO (via FdPollGrp). | |
| virtual void | setPollGrp (FdPollGrp *grp) |
| virtual void | setRcvBufLen (int buflen) |
| ConnectionManager & | getConnectionManager () |
| const ConnectionManager & | getConnectionManager () const |
Protected Member Functions | |
| virtual void | init () |
| Performs constructor activities that depend on virtual functions specified by derived classes. | |
| virtual Connection * | createConnection (const Tuple &who, Socket fd, bool server=false)=0 |
| Makes new Connection using provided socket. | |
| void | processAllWriteRequests () |
| Forms a connection if one doesn't exist, moves requests to the appropriate connection's fifo. | |
| int | processListen () |
| This doesn't exist anywhere that I can find? !kw! void sendFromRoundRobin(FdSet& fdset);. | |
| Connection * | makeOutgoingConnection (const Tuple &dest, TransportFailure::FailureReason &failCode, int &subCode) |
Static Protected Attributes | |
| static const size_t | MaxWriteSize = 4096 |
| static const size_t | MaxReadSize = 4096 |
Private Attributes | |
| ConnectionManager | mConnectionManager |
Static Private Attributes | |
| static const int | MaxBufferSize |
Definition at line 13 of file TcpBaseTransport.hxx.
| TcpBaseTransport::TcpBaseTransport | ( | Fifo< TransactionMessage > & | fifo, |
| int | portNum, | ||
| IpVersion | version, | ||
| const Data & | interfaceName, | ||
| AfterSocketCreationFuncPtr | socketFunc, | ||
| Compression & | compression, | ||
| unsigned | transportFlags = 0 |
||
| ) |
Definition at line 21 of file TcpBaseTransport.cxx.
References resip::InternalTransport::mFd, resip::Transport::mTransportFlags, RESIP_TRANSPORT_FLAG_NOBIND, resip::InternalTransport::socket(), and resip::TCP.
: InternalTransport(fifo, portNum, version, pinterface, socketFunc, compression, transportFlags) { if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)==0 ) { mFd = InternalTransport::socket(TCP, version); } }

| TcpBaseTransport::~TcpBaseTransport | ( | ) | [virtual] |
Definition at line 36 of file TcpBaseTransport.cxx.
References DebugLog, resip::FdPollGrp::delPollItem(), resip::SendData::destination, resip::Transport::fail(), resip::ConsumerFifoBuffer< class >::getNext(), InfoLog, resip::ConsumerFifoBuffer< class >::messageAvailable(), resip::InternalTransport::mPollGrp, resip::InternalTransport::mPollItemHandle, resip::Transport::mTuple, resip::InternalTransport::mTxFifoOutBuffer, resip::SendData::transactionId, and resip::TransportFailure::TransportShutdown.
{
//DebugLog (<< "Shutting down TCP Transport " << this << " " << mFd << " " << mInterface << ":" << port());
// !jf! this is not right. should drain the sends before
while (mTxFifoOutBuffer.messageAvailable())
{
SendData* data = mTxFifoOutBuffer.getNext();
InfoLog (<< "Throwing away queued data for " << data->destination);
fail(data->transactionId, TransportFailure::TransportShutdown);
delete data;
}
DebugLog (<< "Shutting down " << mTuple);
//mSendRoundRobin.clear(); // clear before we delete the connections
if(mPollGrp && mPollItemHandle)
{
mPollGrp->delPollItem(mPollItemHandle);
mPollItemHandle=0;
}
}

| void TcpBaseTransport::buildFdSet | ( | FdSet & | fdset | ) | [virtual] |
Adds the Transport's socket FD to the appropriate read or write sets as applicable.
Implements resip::Transport.
Definition at line 124 of file TcpBaseTransport.cxx.
References resip::SelectInterruptor::buildFdSet(), resip::ConnectionManager::buildFdSet(), INVALID_SOCKET, mConnectionManager, resip::InternalTransport::mFd, resip::InternalTransport::mPollGrp, resip::InternalTransport::mSelectInterruptor, resip::FdSet::setRead(), and resip::InternalTransport::shareStackProcessAndSelect().
Referenced by main().
{
assert( mPollGrp==NULL );
mConnectionManager.buildFdSet(fdset);
if ( mFd!=INVALID_SOCKET )
{
fdset.setRead(mFd); // for the transport itself (accept)
}
if(!shareStackProcessAndSelect())
{
mSelectInterruptor.buildFdSet(fdset);
}
}

| virtual Connection* resip::TcpBaseTransport::createConnection | ( | const Tuple & | who, |
| Socket | fd, | ||
| bool | server = false |
||
| ) | [protected, pure virtual] |
Makes new Connection using provided socket.
Implemented in resip::TlsTransport, and resip::TcpTransport.
Referenced by makeOutgoingConnection(), and processListen().
| ConnectionManager& resip::TcpBaseTransport::getConnectionManager | ( | ) | [inline] |
Definition at line 36 of file TcpBaseTransport.hxx.
References mConnectionManager.
Referenced by resip::Connection::getConnectionManager().
{return mConnectionManager;}
| const ConnectionManager& resip::TcpBaseTransport::getConnectionManager | ( | ) | const [inline] |
Definition at line 37 of file TcpBaseTransport.hxx.
References mConnectionManager.
{return mConnectionManager;}
| void TcpBaseTransport::init | ( | ) | [protected, virtual] |
Performs constructor activities that depend on virtual functions specified by derived classes.
Derived classes should call this in their constructors.
Definition at line 60 of file TcpBaseTransport.cxx.
References resip::InternalTransport::bind(), resip::Transport::error(), resip::getErrno(), InfoLog, resip::makeSocketNonBlocking(), resip::InternalTransport::mFd, resip::Transport::mTransportFlags, RESIP_TRANSPORT_FLAG_NOBIND, and strerror().
Referenced by resip::TcpTransport::TcpTransport().
{
if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)!=0 )
{
return;
}
//DebugLog (<< "Opening TCP " << mFd << " : " << this);
int on = 1;
#if !defined(WIN32)
if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) )
#else
if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) )
#endif
{
int e = getErrno();
InfoLog (<< "Couldn't set sockoptions SO_REUSEPORT | SO_REUSEADDR: " << strerror(e));
error(e);
throw Exception("Failed setsockopt", __FILE__,__LINE__);
}
bind();
makeSocketNonBlocking(mFd);
// do the listen, seting the maximum queue size for compeletly established
// sockets -- on linux, tcp_max_syn_backlog should be used for the incomplete
// queue size(see man listen)
int e = listen(mFd,64 );
if (e != 0 )
{
int e = getErrno();
InfoLog (<< "Failed listen " << strerror(e));
error(e);
// !cj! deal with errors
throw Transport::Exception("Address already in use", __FILE__,__LINE__);
}
}

| virtual bool resip::TcpBaseTransport::isDatagram | ( | ) | const [inline, virtual] |
| virtual bool resip::TcpBaseTransport::isReliable | ( | ) | const [inline, virtual] |
| Connection * TcpBaseTransport::makeOutgoingConnection | ( | const Tuple & | dest, |
| TransportFailure::FailureReason & | failCode, | ||
| int & | subCode | ||
| ) | [protected] |
Definition at line 189 of file TcpBaseTransport.cxx.
References resip::closeSocket(), createConnection(), DebugLog, resip::Transport::error(), resip::ConnectionManager::gc(), resip::getErrno(), resip::Tuple::getSockaddr(), InfoLog, INVALID_SOCKET, resip::Transport::ipVersion(), resip::Tuple::length(), resip::makeSocketNonBlocking(), mConnectionManager, resip::ConnectionManager::MinimumGcAge, resip::Connection::mRequestPostConnectSocketFuncCall, resip::Transport::mSocketFunc, resip::InternalTransport::socket(), SOCKET_ERROR, strerror(), resip::TCP, resip::Transport::transport(), resip::TransportFailure::TransportBadConnect, resip::TransportFailure::TransportNoSocket, and WarningLog.
Referenced by processAllWriteRequests().
{
// attempt to open
Socket sock = InternalTransport::socket( TCP, ipVersion());
// fdset.clear(sock); !kw! removed as part of epoll impl
if ( sock == INVALID_SOCKET ) // no socket found - try to free one up and try again
{
int err = getErrno();
InfoLog (<< "Failed to create a socket " << strerror(err));
error(err);
mConnectionManager.gc(ConnectionManager::MinimumGcAge, 1); // free one up
sock = InternalTransport::socket( TCP, ipVersion());
if ( sock == INVALID_SOCKET )
{
err = getErrno();
WarningLog( << "Error in finding free filedescriptor to use. " << strerror(err));
error(err);
failReason = TransportFailure::TransportNoSocket;
failSubCode = err;
return NULL;
}
}
assert(sock != INVALID_SOCKET);
DebugLog (<<"Opening new connection to " << dest);
makeSocketNonBlocking(sock);
if (mSocketFunc)
{
mSocketFunc(sock, transport(), __FILE__, __LINE__);
}
const sockaddr& servaddr = dest.getSockaddr();
int ret = connect( sock, &servaddr, dest.length() );
// See Chapter 15.3 of Stevens, Unix Network Programming Vol. 1 2nd Edition
if (ret == SOCKET_ERROR)
{
int err = getErrno();
switch (err)
{
case EINPROGRESS:
case EWOULDBLOCK:
break;
default:
{
// !jf! this has failed
InfoLog( << "Error on TCP connect to " << dest << ", err=" << err << ": " << strerror(err));
error(err);
//fdset.clear(sock);
closeSocket(sock);
failReason = TransportFailure::TransportBadConnect;
failSubCode = err;
return NULL;
}
}
}
// This will add the connection to the manager
Connection *conn = createConnection(dest, sock, false);
assert(conn);
conn->mRequestPostConnectSocketFuncCall = true;
return conn;
}

| void TcpBaseTransport::process | ( | FdSet & | fdset | ) | [virtual] |
If there is work to do, this is the method that does it.
If the socket is readable, it is read. If the socket is writable and there are outgoing messages to be sent, they are sent.
Incoming messages are parsed and dispatched to the relevant entity. SIP messages will be posted to the TransactionMessage Fifo.
| fdset | is the FdSet after select() has been called. |
Implements resip::Transport.
Definition at line 318 of file TcpBaseTransport.cxx.
References resip::ProducerFifoBuffer< class >::flush(), INVALID_SOCKET, mConnectionManager, resip::InternalTransport::mFd, resip::InternalTransport::mPollGrp, resip::Transport::mStateMachineFifo, resip::ConnectionManager::process(), processAllWriteRequests(), processListen(), and resip::FdSet::readyToRead().
Referenced by main().
{
assert( mPollGrp==NULL );
processAllWriteRequests();
// process the connections in ConnectionManager
mConnectionManager.process(fdSet);
mStateMachineFifo.flush();
// process our own listen/accept socket for incoming connections
if (mFd!=INVALID_SOCKET && fdSet.readyToRead(mFd))
{
processListen();
}
}

| void TcpBaseTransport::process | ( | ) | [virtual] |
Version of process to be invoked periodically when using callback-based IO (via FdPollGrp).
Implements resip::Transport.
Definition at line 303 of file TcpBaseTransport.cxx.
References resip::ProducerFifoBuffer< class >::flush(), resip::InternalTransport::mPollGrp, resip::Transport::mStateMachineFifo, and processAllWriteRequests().
{
mStateMachineFifo.flush();
// called within SipStack's thread. There is some risk of
// recursion here if connection starts doing anything fancy.
// For backward-compat when not-epoll, don't handle transmit synchronously
// now, but rather wait for the process() call
if (mPollGrp)
{
processAllWriteRequests();
}
}

| void TcpBaseTransport::processAllWriteRequests | ( | ) | [protected] |
Forms a connection if one doesn't exist, moves requests to the appropriate connection's fifo.
Definition at line 258 of file TcpBaseTransport.cxx.
References resip::SendData::command, DebugLog, resip::SendData::destination, resip::Transport::fail(), resip::TransportFailure::Failure, resip::ConnectionManager::findConnection(), resip::ConsumerFifoBuffer< class >::getNext(), resip::Connection::getSocket(), INVALID_SOCKET, makeOutgoingConnection(), mConnectionManager, resip::ConsumerFifoBuffer< class >::messageAvailable(), resip::Tuple::mFlowKey, resip::InternalTransport::mTxFifoOutBuffer, resip::Tuple::onlyUseExistingConnection, resip::Connection::requestWrite(), resip::SendData::transactionId, and resip::TransportFailure::TransportNoExistConn.
Referenced by process().
{
while (mTxFifoOutBuffer.messageAvailable())
{
SendData* data = mTxFifoOutBuffer.getNext();
DebugLog (<< "Processing write for " << data->destination);
// this will check by connectionId first, then by address
Connection* conn = mConnectionManager.findConnection(data->destination);
//DebugLog (<< "TcpBaseTransport::processAllWriteRequests() using " << conn);
// There is no connection yet, so make a client connection
if (conn == 0 &&
!data->destination.onlyUseExistingConnection &&
data->command == 0) // SendData commands (ie. close connection and enable flow timers) shouldn't cause new connections to form
{
TransportFailure::FailureReason failCode = TransportFailure::Failure;
int subCode = 0;
if((conn=makeOutgoingConnection(data->destination, failCode, subCode)) == NULL)
{
fail(data->transactionId, failCode, subCode);
delete data;
return; // .kw. WHY? What about messages left in queue?
}
assert(conn->getSocket() != INVALID_SOCKET);
// .kw. why do below? We already have the conn, who uses key?
data->destination.mFlowKey = conn->getSocket(); // !jf!
}
if (conn == 0)
{
DebugLog (<< "Failed to create/get connection: " << data->destination);
fail(data->transactionId, TransportFailure::TransportNoExistConn, 0);
delete data;
// NOTE: We fail this one but don't give up on others in queue
}
else // have a connection
{
conn->requestWrite(data);
}
}
}

| int TcpBaseTransport::processListen | ( | ) | [protected] |
This doesn't exist anywhere that I can find? !kw! void sendFromRoundRobin(FdSet& fdset);.
Returns 1 if created new connection, -1 if "bad" error, and 0 if nothing to do (EWOULDBLOCK)
Definition at line 143 of file TcpBaseTransport.cxx.
References resip::closeSocket(), createConnection(), DebugLog, resip::Transport::error(), resip::ConnectionManager::findConnection(), resip::getErrno(), resip::Tuple::getMutableSockaddr(), InfoLog, resip::Tuple::length(), resip::makeSocketNonBlocking(), mConnectionManager, resip::InternalTransport::mFd, resip::Transport::mSocketFunc, resip::Transport::mTuple, SOCKET_ERROR, and resip::Transport::transport().
Referenced by process(), and processPollEvent().
{
if (1)
{
Tuple tuple(mTuple);
struct sockaddr& peer = tuple.getMutableSockaddr();
socklen_t peerLen = tuple.length();
Socket sock = accept( mFd, &peer, &peerLen);
if ( sock == SOCKET_ERROR )
{
int e = getErrno();
switch (e)
{
case EWOULDBLOCK:
// !jf! this can not be ready in some cases
// !kw! this will happen every epoll cycle
return 0;
default:
Transport::error(e);
}
return -1;
}
makeSocketNonBlocking(sock);
DebugLog (<< "Received TCP connection from: " << tuple << " as fd=" << sock);
if (mSocketFunc)
{
mSocketFunc(sock, transport(), __FILE__, __LINE__);
}
if(!mConnectionManager.findConnection(tuple))
{
createConnection(tuple, sock, true);
}
else
{
InfoLog(<<"Someone probably sent a reciprocal SYN at us.");
// ?bwc? Can we call this right after calling accept()?
closeSocket(sock);
}
}
return 1;
}

| void TcpBaseTransport::processPollEvent | ( | FdPollEventMask | mask | ) | [virtual] |
Implements resip::FdPollItemIf.
Definition at line 337 of file TcpBaseTransport.cxx.
References FPEM_Read, and processListen().
{
if ( mask & FPEM_Read )
{
while ( processListen() > 0 )
;
}
}

| void TcpBaseTransport::setPollGrp | ( | FdPollGrp * | grp | ) | [virtual] |
Reimplemented from resip::InternalTransport.
Definition at line 102 of file TcpBaseTransport.cxx.
References resip::FdPollGrp::addPollItem(), resip::FdPollGrp::delPollItem(), FPEM_Edge, FPEM_Read, INVALID_SOCKET, mConnectionManager, resip::InternalTransport::mFd, resip::InternalTransport::mPollGrp, resip::InternalTransport::mPollItemHandle, and resip::ConnectionManager::setPollGrp().
{
if(mPollGrp && mPollItemHandle)
{
mPollGrp->delPollItem(mPollItemHandle);
mPollItemHandle=0;
}
if ( mFd!=INVALID_SOCKET && grp)
{
mPollItemHandle = grp->addPollItem(mFd, FPEM_Read|FPEM_Edge, this);
// above released by InternalTransport destructor
// ?bwc? Is this really a good idea? If the InternalTransport d'tor is
// freeing this, shouldn't InternalTransport::setPollGrp() handle
// creating it?
}
mConnectionManager.setPollGrp(grp);
InternalTransport::setPollGrp(grp);
}

| void TcpBaseTransport::setRcvBufLen | ( | int | buflen | ) | [virtual] |
Reimplemented from resip::Transport.
Definition at line 346 of file TcpBaseTransport.cxx.
{
assert(0); // not implemented yet
// need to store away the length and use when setting up new connections
}
const int resip::TcpBaseTransport::MaxBufferSize [static, private] |
Definition at line 70 of file TcpBaseTransport.hxx.
const size_t TcpBaseTransport::MaxReadSize = 4096 [static, protected] |
Definition at line 67 of file TcpBaseTransport.hxx.
const size_t TcpBaseTransport::MaxWriteSize = 4096 [static, protected] |
Definition at line 66 of file TcpBaseTransport.hxx.
Definition at line 71 of file TcpBaseTransport.hxx.
Referenced by buildFdSet(), getConnectionManager(), makeOutgoingConnection(), process(), processAllWriteRequests(), processListen(), and setPollGrp().
1.7.5.1