/[resiprocate]/main/resip/stack/TcpBaseTransport.cxx
ViewVC logotype

Contents of /main/resip/stack/TcpBaseTransport.cxx

Parent Directory Parent Directory | Revision Log Revision Log


Revision 11235 - (show annotations) (download)
Mon Sep 15 17:20:11 2014 UTC (5 years, 2 months ago) by dpocock
File MIME type: text/plain
File size: 15337 byte(s)
resip/stack: TcpBaseTransport: generalize support for SO_NOSIGPIPE
1 #if defined(HAVE_CONFIG_H)
2 #include "config.h"
3 #endif
4
5 #include <memory>
6 #include "rutil/compat.hxx"
7 #include "rutil/Socket.hxx"
8 #include "rutil/Data.hxx"
9 #include "rutil/DnsUtil.hxx"
10 #include "rutil/Logger.hxx"
11 #include "rutil/NetNs.hxx"
12 #include "resip/stack/TcpBaseTransport.hxx"
13
14 #define RESIPROCATE_SUBSYSTEM Subsystem::TRANSPORT
15
16 using namespace std;
17 using namespace resip;
18
19
20 const size_t TcpBaseTransport::MaxWriteSize = 4096;
21 const size_t TcpBaseTransport::MaxReadSize = 4096;
22
23 TcpBaseTransport::TcpBaseTransport(Fifo<TransactionMessage>& fifo,
24 int portNum, IpVersion version,
25 const Data& pinterface,
26 AfterSocketCreationFuncPtr socketFunc,
27 Compression &compression,
28 unsigned transportFlags,
29 const Data& netNs)
30 : InternalTransport(fifo, portNum, version, pinterface, socketFunc, compression, transportFlags, netNs)
31 {
32 if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)==0 )
33 {
34 #ifdef USE_NETNS
35 DebugLog(<< "TcpBaseTransport: " << this << " netns: " << netNs);
36 // setns here
37 NetNs::setNs(netNs);
38 #endif
39 mFd = InternalTransport::socket(TCP, version);
40 }
41 }
42
43
44 TcpBaseTransport::~TcpBaseTransport()
45 {
46 //DebugLog (<< "Shutting down TCP Transport " << this << " " << mFd << " " << mInterface << ":" << port());
47
48 // !jf! this is not right. should drain the sends before
49 while (mTxFifoOutBuffer.messageAvailable())
50 {
51 SendData* data = mTxFifoOutBuffer.getNext();
52 InfoLog (<< "Throwing away queued data for " << data->destination);
53
54 fail(data->transactionId, TransportFailure::TransportShutdown);
55 delete data;
56 }
57 DebugLog (<< "Shutting down " << mTuple);
58 //mSendRoundRobin.clear(); // clear before we delete the connections
59 if(mPollGrp && mPollItemHandle)
60 {
61 mPollGrp->delPollItem(mPollItemHandle);
62 mPollItemHandle=0;
63 }
64 }
65
66 // called from constructor of TcpTransport
67 void
68 TcpBaseTransport::init()
69 {
70 if ( (mTransportFlags & RESIP_TRANSPORT_FLAG_NOBIND)!=0 )
71 {
72 return;
73 }
74
75 //DebugLog (<< "Opening TCP " << mFd << " : " << this);
76
77 int on = 1;
78 #if !defined(WIN32)
79 if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) )
80 #else
81 if ( ::setsockopt ( mFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on)) )
82 #endif
83 {
84 int e = getErrno();
85 InfoLog (<< "Couldn't set sockoptions SO_REUSEPORT | SO_REUSEADDR: " << strerror(e));
86 error(e);
87 throw Exception("Failed setsockopt", __FILE__,__LINE__);
88 }
89
90 bind();
91 makeSocketNonBlocking(mFd);
92
93 // do the listen, seting the maximum queue size for compeletly established
94 // sockets -- on linux, tcp_max_syn_backlog should be used for the incomplete
95 // queue size(see man listen)
96 int e = listen(mFd,64 );
97
98 if (e != 0 )
99 {
100 int e = getErrno();
101 InfoLog (<< "Failed listen " << strerror(e));
102 error(e);
103 // !cj! deal with errors
104 throw Transport::Exception("Address already in use", __FILE__,__LINE__);
105 }
106 }
107
108 // ?kw?: when should this be called relative to init() above? merge?
109 void
110 TcpBaseTransport::setPollGrp(FdPollGrp *grp)
111 {
112 if(mPollGrp && mPollItemHandle)
113 {
114 mPollGrp->delPollItem(mPollItemHandle);
115 mPollItemHandle=0;
116 }
117
118 if ( mFd!=INVALID_SOCKET && grp)
119 {
120 mPollItemHandle = grp->addPollItem(mFd, FPEM_Read|FPEM_Edge, this);
121 // above released by InternalTransport destructor
122 // ?bwc? Is this really a good idea? If the InternalTransport d'tor is
123 // freeing this, shouldn't InternalTransport::setPollGrp() handle
124 // creating it?
125 }
126 mConnectionManager.setPollGrp(grp);
127
128 InternalTransport::setPollGrp(grp);
129 }
130
131 void
132 TcpBaseTransport::buildFdSet( FdSet& fdset)
133 {
134 assert( mPollGrp==NULL );
135 mConnectionManager.buildFdSet(fdset);
136 if ( mFd!=INVALID_SOCKET )
137 {
138 fdset.setRead(mFd); // for the transport itself (accept)
139 }
140 if(!shareStackProcessAndSelect())
141 {
142 mSelectInterruptor.buildFdSet(fdset);
143 }
144 }
145
146 /**
147 Returns 1 if created new connection, -1 if "bad" error,
148 and 0 if nothing to do (EWOULDBLOCK)
149 **/
150 int
151 TcpBaseTransport::processListen()
152 {
153 if (1)
154 {
155 Tuple tuple(mTuple);
156 struct sockaddr& peer = tuple.getMutableSockaddr();
157 socklen_t peerLen = tuple.length();
158 Socket sock = accept( mFd, &peer, &peerLen);
159 if ( sock == SOCKET_ERROR )
160 {
161 int e = getErrno();
162 switch (e)
163 {
164 case EAGAIN:
165 #if EAGAIN != EWOULDBLOCK
166 case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
167 #endif
168 // !jf! this can not be ready in some cases
169 // !kw! this will happen every epoll cycle
170 return 0;
171 default:
172 Transport::error(e);
173 }
174 return -1;
175 }
176 if(!configureConnectedSocket(sock))
177 {
178 throw Exception("Failed to configure connected socket", __FILE__,__LINE__);
179 }
180 makeSocketNonBlocking(sock);
181
182 DebugLog (<< this << " Received TCP connection from: " << tuple << " mTuple: " << mTuple << " as fd=" << sock);
183
184 if (mSocketFunc)
185 {
186 mSocketFunc(sock, transport(), __FILE__, __LINE__);
187 }
188
189 if(!mConnectionManager.findConnection(tuple))
190 {
191 createConnection(tuple, sock, true);
192 }
193 else
194 {
195 InfoLog(<<"Someone probably sent a reciprocal SYN at us.");
196 // ?bwc? Can we call this right after calling accept()?
197 closeSocket(sock);
198 }
199 }
200 return 1;
201 }
202
203 Connection*
204 TcpBaseTransport::makeOutgoingConnection(const Tuple &dest,
205 TransportFailure::FailureReason &failReason, int &failSubCode)
206 {
207 // attempt to open
208 #ifdef USE_NETNS
209 NetNs::setNs(netNs());
210 #endif
211 Socket sock = InternalTransport::socket( TCP, ipVersion());
212 // fdset.clear(sock); !kw! removed as part of epoll impl
213
214 if ( sock == INVALID_SOCKET ) // no socket found - try to free one up and try again
215 {
216 int err = getErrno();
217 InfoLog (<< "Failed to create a socket " << strerror(err));
218 error(err);
219 if(mConnectionManager.gc(ConnectionManager::MinimumGcAge, 1) == 0)
220 {
221 mConnectionManager.gcWithTarget(1); // free one up
222 }
223
224 #ifdef USE_NETNS
225 NetNs::setNs(netNs());
226 #endif
227 sock = InternalTransport::socket( TCP, ipVersion());
228 if ( sock == INVALID_SOCKET )
229 {
230 err = getErrno();
231 WarningLog( << "Error in finding free filedescriptor to use. " << strerror(err));
232 error(err);
233 failReason = TransportFailure::TransportNoSocket;
234 failSubCode = err;
235 return NULL;
236 }
237 }
238
239 assert(sock != INVALID_SOCKET);
240
241 DebugLog (<<"Opening new connection to " << dest);
242 char _sa[RESIP_MAX_SOCKADDR_SIZE];
243 sockaddr *sa = reinterpret_cast<sockaddr*>(_sa);
244 assert(RESIP_MAX_SOCKADDR_SIZE >= mTuple.length());
245 mTuple.copySockaddrAnyPort(sa);
246 #ifdef USE_NETNS
247 NetNs::setNs(netNs());
248 #endif
249 if(::bind(sock, sa, mTuple.length()) != 0)
250 {
251 WarningLog( << "Error in binding to source interface address. " << strerror(errno));
252 failReason = TransportFailure::Failure;
253 failSubCode = errno;
254 return NULL;
255 }
256 if(!configureConnectedSocket(sock))
257 {
258 throw Exception("Failed to configure connected socket", __FILE__,__LINE__);
259 }
260 makeSocketNonBlocking(sock);
261 if (mSocketFunc)
262 {
263 mSocketFunc(sock, transport(), __FILE__, __LINE__);
264 }
265 const sockaddr& servaddr = dest.getSockaddr();
266 int ret = connect( sock, &servaddr, dest.length() );
267
268 // See Chapter 15.3 of Stevens, Unix Network Programming Vol. 1 2nd Edition
269 if (ret == SOCKET_ERROR)
270 {
271 int err = getErrno();
272
273 switch (err)
274 {
275 case EINPROGRESS:
276 case EAGAIN:
277 #if EAGAIN != EWOULDBLOCK
278 case EWOULDBLOCK: // Treat EGAIN and EWOULDBLOCK as the same: http://stackoverflow.com/questions/7003234/which-systems-define-eagain-and-ewouldblock-as-different-values
279 #endif
280 break;
281 default:
282 {
283 // !jf! this has failed
284 InfoLog( << "Error on TCP connect to " << dest << ", err=" << err << ": " << strerror(err));
285 error(err);
286 //fdset.clear(sock);
287 closeSocket(sock);
288 failReason = TransportFailure::TransportBadConnect;
289 failSubCode = err;
290 return NULL;
291 }
292 }
293 }
294
295 // This will add the connection to the manager
296 Connection *conn = createConnection(dest, sock, false);
297 assert(conn);
298 conn->mRequestPostConnectSocketFuncCall = true;
299 return conn;
300 }
301
302 void
303 TcpBaseTransport::processAllWriteRequests()
304 {
305 while (mTxFifoOutBuffer.messageAvailable())
306 {
307 SendData* data = mTxFifoOutBuffer.getNext();
308 DebugLog (<< "Processing write for " << data->destination);
309
310 // this will check by connectionId first, then by address
311 Connection* conn = mConnectionManager.findConnection(data->destination);
312
313 //DebugLog (<< "TcpBaseTransport::processAllWriteRequests() using " << conn);
314
315 #ifdef WIN32
316 if(conn && mPollGrp && mPollGrp->getImplType() == FdPollGrp::PollImpl)
317 {
318 // Workaround for bug in WSAPoll implementation: see
319 // http://daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
320 // http://social.msdn.microsoft.com/Forums/windowsdesktop/en-US/18769abd-fca0-4d3c-9884-1a38ce27ae90/wsapoll-and-nonblocking-connects-to-nonexistent-ports?forum=wsk
321 // Note: This is not an ideal solution - since we won't cleanup the connection until
322 // after the connect has timedout and someone else tries to write to the same
323 // destination. However the only impact to users is that requests will take the
324 // full 32 seconds transaction timeout to get an error vs the 21s connect timeout
325 // observered when using the select implemention (vs Poll). This does save us from
326 // having to use some form of timer to periodically check the connect state though.
327 if(conn->checkConnectionTimedout())
328 {
329 // If checkConnectionTimedout returns true, then connection is no longer available.
330 // Clear conn so that we create a new connection below.
331 conn = 0;
332 }
333 }
334 #endif
335
336 // There is no connection yet, so make a client connection
337 if (conn == 0 &&
338 !data->destination.onlyUseExistingConnection &&
339 data->command == 0) // SendData commands (ie. close connection and enable flow timers) shouldn't cause new connections to form
340 {
341 TransportFailure::FailureReason failCode = TransportFailure::Failure;
342 int subCode = 0;
343 if((conn = makeOutgoingConnection(data->destination, failCode, subCode)) == 0)
344 {
345 DebugLog (<< "Failed to create connection: " << data->destination);
346 fail(data->transactionId, failCode, subCode);
347 delete data;
348 // NOTE: We fail this one but don't give up on others in queue
349 return;
350 }
351 assert(conn->getSocket() != INVALID_SOCKET);
352 data->destination.mFlowKey = conn->getSocket();
353 }
354
355 if (conn == 0)
356 {
357 DebugLog (<< "Failed to find connection: " << data->destination);
358 fail(data->transactionId, TransportFailure::TransportNoExistConn, 0);
359 delete data;
360 // NOTE: We fail this one but don't give up on others in queue
361 }
362 else // have a connection
363 {
364 conn->requestWrite(data);
365 }
366 }
367 }
368
369 void
370 TcpBaseTransport::process()
371 {
372 mStateMachineFifo.flush();
373
374 // called within SipStack's thread. There is some risk of
375 // recursion here if connection starts doing anything fancy.
376 // For backward-compat when not-epoll, don't handle transmit synchronously
377 // now, but rather wait for the process() call
378 if (mPollGrp)
379 {
380 processAllWriteRequests();
381 }
382 }
383
384 void
385 TcpBaseTransport::process(FdSet& fdSet)
386 {
387 assert( mPollGrp==NULL );
388
389 processAllWriteRequests();
390
391 // process the connections in ConnectionManager
392 mConnectionManager.process(fdSet);
393
394 mStateMachineFifo.flush();
395
396 // process our own listen/accept socket for incoming connections
397 if (mFd!=INVALID_SOCKET && fdSet.readyToRead(mFd))
398 {
399 processListen();
400 }
401 }
402
403 void
404 TcpBaseTransport::processPollEvent(FdPollEventMask mask) {
405 if ( mask & FPEM_Read )
406 {
407 while ( processListen() > 0 )
408 ;
409 }
410 }
411
412 void
413 TcpBaseTransport::setRcvBufLen(int buflen)
414 {
415 assert(0); // not implemented yet
416 // need to store away the length and use when setting up new connections
417 }
418
419
420
421 /* ====================================================================
422 * The Vovida Software License, Version 1.0
423 *
424 * Copyright (c) 2000 Vovida Networks, Inc. All rights reserved.
425 *
426 * Redistribution and use in source and binary forms, with or without
427 * modification, are permitted provided that the following conditions
428 * are met:
429 *
430 * 1. Redistributions of source code must retain the above copyright
431 * notice, this list of conditions and the following disclaimer.
432 *
433 * 2. Redistributions in binary form must reproduce the above copyright
434 * notice, this list of conditions and the following disclaimer in
435 * the documentation and/or other materials provided with the
436 * distribution.
437 *
438 * 3. The names "VOCAL", "Vovida Open Communication Application Library",
439 * and "Vovida Open Communication Application Library (VOCAL)" must
440 * not be used to endorse or promote products derived from this
441 * software without prior written permission. For written
442 * permission, please contact vocal@vovida.org.
443 *
444 * 4. Products derived from this software may not be called "VOCAL", nor
445 * may "VOCAL" appear in their name, without prior written
446 * permission of Vovida Networks, Inc.
447 *
448 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
449 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
450 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
451 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA
452 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
453 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
454 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
455 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
456 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
457 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
458 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
459 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
460 * DAMAGE.
461 *
462 * ====================================================================
463 *
464 * This software consists of voluntary contributions made by Vovida
465 * Networks, Inc. and many individuals on behalf of Vovida Networks,
466 * Inc. For more information on Vovida Networks, Inc., please see
467 * <http://www.vovida.org/>.
468 *
469 * vi: set shiftwidth=3 expandtab:
470 */

Properties

Name Value
svn:eol-style native
svn:mime-type text/plain

webmaster AT resiprocate DOT org
ViewVC Help
Powered by ViewVC 1.1.27