|
reSIProcate/stack
9694
|
00001 #if defined(HAVE_CONFIG_H) 00002 #include "config.h" 00003 #endif 00004 00005 #include "rutil/Socket.hxx" 00006 #include "rutil/Logger.hxx" 00007 #include "resip/stack/Connection.hxx" 00008 #include "resip/stack/ConnectionManager.hxx" 00009 #include "resip/stack/InteropHelper.hxx" 00010 #include "resip/stack/SipMessage.hxx" 00011 #include "resip/stack/TcpBaseTransport.hxx" 00012 #include "rutil/WinLeakCheck.hxx" 00013 00014 #ifdef USE_SSL 00015 #include "resip/stack/ssl/Security.hxx" 00016 #endif 00017 00018 #ifdef USE_SIGCOMP 00019 #include <osc/Stack.h> 00020 #include <osc/SigcompMessage.h> 00021 #endif 00022 00023 using namespace resip; 00024 00025 volatile bool Connection::mEnablePostConnectSocketFuncCall = false; 00026 00027 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT 00028 00029 Connection::Connection(Transport* transport,const Tuple& who, Socket socket, 00030 Compression &compression) 00031 : ConnectionBase(transport,who,compression), 00032 mRequestPostConnectSocketFuncCall(false), 00033 mInWritable(false), 00034 mFlowTimerEnabled(false), 00035 mPollItemHandle(0) 00036 { 00037 mWho.mFlowKey=(FlowKey)socket; 00038 InfoLog (<< "Connection::Connection: new connection created to who: " << mWho); 00039 00040 if(mWho.mFlowKey && ConnectionBase::transport()) 00041 { 00042 getConnectionManager().addConnection(this); 00043 } 00044 } 00045 00046 Connection::~Connection() 00047 { 00048 if(mWho.mFlowKey && ConnectionBase::transport()) 00049 { 00050 getConnectionManager().removeConnection(this); 00051 // remove first then close, since conn manager may need socket 00052 closeSocket(mWho.mFlowKey); 00053 } 00054 } 00055 00056 void 00057 Connection::requestWrite(SendData* sendData) 00058 { 00059 mOutstandingSends.push_back(sendData); 00060 if (isWritable()) 00061 { 00062 ensureWritable(); 00063 } 00064 } 00065 00066 void 00067 Connection::removeFrontOutstandingSend() 00068 { 00069 delete mOutstandingSends.front(); 00070 mOutstandingSends.pop_front(); 00071 00072 if (mOutstandingSends.empty()) 00073 { 00074 assert(mInWritable); 00075 getConnectionManager().removeFromWritable(this); 00076 mInWritable = false; 00077 } 00078 } 00079 00080 int 00081 Connection::performWrite() 00082 { 00083 if(transportWrite()) 00084 { 00085 assert(mInWritable); 00086 getConnectionManager().removeFromWritable(this); 00087 mInWritable = false; 00088 return 0; // What does this transportWrite() mean? 00089 } 00090 00091 assert(!mOutstandingSends.empty()); 00092 switch(mOutstandingSends.front()->command) 00093 { 00094 case SendData::CloseConnection: 00095 // .bwc. Close this connection. 00096 return -1; 00097 break; 00098 case SendData::EnableFlowTimer: 00099 enableFlowTimer(); 00100 removeFrontOutstandingSend(); 00101 return 0; 00102 break; 00103 default: 00104 // do nothing 00105 break; 00106 } 00107 00108 const Data& sigcompId = mOutstandingSends.front()->sigcompId; 00109 00110 if(mSendingTransmissionFormat == Unknown) 00111 { 00112 if (sigcompId.size() > 0 && mCompression.isEnabled()) 00113 { 00114 mSendingTransmissionFormat = Compressed; 00115 } 00116 else 00117 { 00118 mSendingTransmissionFormat = Uncompressed; 00119 } 00120 } 00121 00122 00123 #ifdef USE_SIGCOMP 00124 // Perform compression here, if appropriate 00125 if (mSendingTransmissionFormat == Compressed 00126 && !(mOutstandingSends.front()->isAlreadyCompressed)) 00127 { 00128 const Data& uncompressed = mOutstandingSends.front()->data; 00129 osc::SigcompMessage *sm = 00130 mSigcompStack->compressMessage(uncompressed.data(), uncompressed.size(), 00131 sigcompId.data(), sigcompId.size(), 00132 true); 00133 DebugLog (<< "Compressed message from " 00134 << uncompressed.size() << " bytes to " 00135 << sm->getStreamLength() << " bytes"); 00136 00137 SendData *oldSd = mOutstandingSends.front(); 00138 SendData *newSd = new SendData(oldSd->destination, 00139 Data(sm->getStreamMessage(), 00140 sm->getStreamLength()), 00141 oldSd->transactionId, 00142 oldSd->sigcompId, 00143 true); 00144 mOutstandingSends.front() = newSd; 00145 delete oldSd; 00146 delete sm; 00147 } 00148 #endif 00149 00150 if(mEnablePostConnectSocketFuncCall && mRequestPostConnectSocketFuncCall) 00151 { 00152 // Note: The first time the socket is available for write, is when the TCP connect call is completed 00153 mRequestPostConnectSocketFuncCall = false; 00154 mTransport->callSocketFunc(getSocket()); 00155 } 00156 00157 const Data& data = mOutstandingSends.front()->data; 00158 00159 int nBytes = write(data.data() + mSendPos,int(data.size() - mSendPos)); 00160 00161 //DebugLog (<< "Tried to send " << data.size() - mSendPos << " bytes, sent " << nBytes << " bytes"); 00162 00163 if (nBytes < 0) 00164 { 00165 if(errno!=EAGAIN) 00166 { 00167 //fail(data.transactionId); 00168 InfoLog(<< "Write failed on socket: " << this->getSocket() << ", closing connection"); 00169 return -1; 00170 } 00171 else 00172 { 00173 return 0; 00174 } 00175 } 00176 else 00177 { 00178 // Safe because of the conditional above ( < 0 ). 00179 Data::size_type bytesWritten = static_cast<Data::size_type>(nBytes); 00180 mSendPos += bytesWritten; 00181 if (mSendPos == data.size()) 00182 { 00183 mSendPos = 0; 00184 removeFrontOutstandingSend(); 00185 } 00186 return bytesWritten; 00187 } 00188 } 00189 00190 00191 bool 00192 Connection::performWrites(unsigned int max) 00193 { 00194 int res; 00195 // if max==0, we will overflow into UINT_MAX. This is intentional. 00196 while((res=performWrite())>0 && !mOutstandingSends.empty() && --max!=0) 00197 {;} 00198 00199 if(res<0) 00200 { 00201 delete this; 00202 return false; 00203 } 00204 return true; 00205 } 00206 00207 void 00208 Connection::ensureWritable() 00209 { 00210 if(!mInWritable) 00211 { 00212 assert(!mOutstandingSends.empty()); 00213 getConnectionManager().addToWritable(this); 00214 mInWritable = true; 00215 } 00216 } 00217 00218 ConnectionManager& 00219 Connection::getConnectionManager() const 00220 { 00221 TcpBaseTransport* transport = static_cast<TcpBaseTransport*>(ConnectionBase::transport()); 00222 00223 return transport->getConnectionManager(); 00224 } 00225 00226 EncodeStream& 00227 resip::operator<<(EncodeStream& strm, const resip::Connection& c) 00228 { 00229 strm << "CONN: " << &c << " " << int(c.getSocket()) << " " << c.mWho; 00230 return strm; 00231 } 00232 00233 int 00234 Connection::read() 00235 { 00236 std::pair<char*, size_t> writePair = getWriteBuffer(); 00237 size_t bytesToRead = resipMin(writePair.second, 00238 static_cast<size_t>(Connection::ChunkSize)); 00239 00240 assert(bytesToRead > 0); 00241 00242 int bytesRead = read(writePair.first, (int)bytesToRead); 00243 if (bytesRead <= 0) 00244 { 00245 return bytesRead; 00246 } 00247 // mBuffer might have been reallocated inside read() 00248 writePair = getCurrentWriteBuffer(); 00249 00250 getConnectionManager().touch(this); 00251 00252 #ifdef USE_SIGCOMP 00253 // If this is the first data we read, determine whether the 00254 // connection is compressed. 00255 if(mReceivingTransmissionFormat == Unknown) 00256 { 00257 if (((writePair.first[0] & 0xf8) == 0xf8) && mCompression.isEnabled()) 00258 { 00259 mReceivingTransmissionFormat = Compressed; 00260 } 00261 else 00262 { 00263 mReceivingTransmissionFormat = Uncompressed; 00264 } 00265 } 00266 00267 // SigComp compressed messages are handed very differently 00268 // than non-compressed messages: they are guaranteed to 00269 // be framed within SigComp, and each frame contains 00270 // *exactly* one SIP message. Processing looks a lot like 00271 // it does for Datagram-oriented transports. 00272 00273 if (mReceivingTransmissionFormat == Compressed) 00274 { 00275 decompressNewBytes(bytesRead); 00276 } 00277 else 00278 #endif 00279 { 00280 if(!preparseNewBytes(bytesRead)) 00281 { 00282 // Iffy; only way we have right now to indicate that this connection has 00283 // gone away. 00284 bytesRead=-1; 00285 } 00286 } 00287 return bytesRead; 00288 } 00289 00290 bool 00291 Connection::performReads(unsigned int max) 00292 { 00293 int bytesRead; 00294 00295 // if max==0, we will overflow into UINT_MAX. This is intentional. 00296 while((bytesRead = read())>0 && --max!=0) 00297 { 00298 DebugLog(<< "Connection::performReads() " << " read=" << bytesRead); 00299 } 00300 00301 if ( bytesRead < 0 ) 00302 { 00303 DebugLog(<< "Closing connection bytesRead=" << bytesRead); 00304 delete this; 00305 return false; 00306 } 00307 return true; 00308 } 00309 00310 void 00311 Connection::enableFlowTimer() 00312 { 00313 if(!mFlowTimerEnabled) 00314 { 00315 mFlowTimerEnabled = true; 00316 00317 // ensure connection is in a FlowTimer LRU list on the connection manager 00318 getConnectionManager().moveToFlowTimerLru(this); 00319 } 00320 } 00321 00322 void 00323 Connection::onDoubleCRLF() 00324 { 00325 // !bwc! TODO might need to make this more efficient. 00326 // !bwc! Need to make this sigcomp-friendly 00327 if(InteropHelper::getOutboundVersion()>=8) 00328 { 00329 DebugLog(<<"Sending response CRLF (aka pong)."); 00330 requestWrite(new SendData(mWho,Symbols::CRLF,Data::Empty,Data::Empty)); 00331 } 00332 } 00333 00334 void 00335 Connection::onSingleCRLF() 00336 { 00337 DebugLog(<<"Received response CRLF (aka pong)."); 00338 mTransport->keepAlivePong(mWho); 00339 } 00340 00341 bool 00342 Connection::hasDataToRead() 00343 { 00344 return true; 00345 } 00346 00347 bool 00348 Connection::isGood() 00349 { 00350 return true; 00351 } 00352 00353 bool 00354 Connection::isWritable() 00355 { 00356 return true; 00357 } 00358 00362 void 00363 Connection::processPollEvent(FdPollEventMask mask) { 00364 /* The original code in ConnectionManager.cxx didn't check 00365 * for error events unless no writable event. (e.g., writable 00366 * masked error. Why?) 00367 */ 00368 if ( mask & FPEM_Error ) 00369 { 00370 Socket fd = getSocket(); 00371 int errNum = getSocketError(fd); 00372 InfoLog(<< "Exception on socket " << fd << " code: " << errNum << "; closing connection"); 00373 setFailureReason(TransportFailure::ConnectionException, errNum); 00374 delete this; 00375 return; 00376 } 00377 if ( mask & FPEM_Write ) 00378 { 00379 if(!performWrites()) 00380 { 00381 // Just deleted self 00382 return; 00383 } 00384 } 00385 if ( mask & FPEM_Read ) 00386 { 00387 performReads(); 00388 } 00389 } 00390 00391 /* ==================================================================== 00392 * The Vovida Software License, Version 1.0 00393 * 00394 * Copyright (c) 2000 00395 * 00396 * Redistribution and use in source and binary forms, with or without 00397 * modification, are permitted provided that the following conditions 00398 * are met: 00399 * 00400 * 1. Redistributions of source code must retain the above copyright 00401 * notice, this list of conditions and the following disclaimer. 00402 * 00403 * 2. Redistributions in binary form must reproduce the above copyright 00404 * notice, this list of conditions and the following disclaimer in 00405 * the documentation and/or other materials provided with the 00406 * distribution. 00407 * 00408 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00409 * and "Vovida Open Communication Application Library (VOCAL)" must 00410 * not be used to endorse or promote products derived from this 00411 * software without prior written permission. For written 00412 * permission, please contact vocal@vovida.org. 00413 * 00414 * 4. Products derived from this software may not be called "VOCAL", nor 00415 * may "VOCAL" appear in their name, without prior written 00416 * permission of Vovida Networks, Inc. 00417 * 00418 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00419 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00420 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00421 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00422 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00423 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00424 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00425 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00426 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00427 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00428 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00429 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00430 * DAMAGE. 00431 * 00432 * ==================================================================== 00433 * 00434 * This software consists of voluntary contributions made by Vovida 00435 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00436 * Inc. For more information on Vovida Networks, Inc., please see 00437 * <http://www.vovida.org/>. 00438 * 00439 * vi: set shiftwidth=3 expandtab: 00440 */
1.7.5.1