/[resiprocate]/main/reflow/Flow.cxx
ViewVC logotype

Contents of /main/reflow/Flow.cxx

Parent Directory Parent Directory | Revision Log Revision Log


Revision 11159 - (show annotations) (download)
Fri Apr 25 14:03:32 2014 UTC (5 years, 8 months ago) by sgodin
File MIME type: text/plain
File size: 30525 byte(s)
-modified asio and boost include file ordering to avoid multiply defined symbol errors on linking
 -in preparation for upgrade of asio drop and support for latest boost 
 -include asio/ssl.hpp everywhere asio.hpp is included
 -include boost headers before others, to ensure we are not redefining stdint definitions in some includes
  and not others

1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4
5 #include <asio.hpp>
6 #ifdef USE_SSL
7 #include <asio/ssl.hpp>
8 #endif
9 #include <boost/function.hpp>
10
11 #include <rutil/Log.hxx>
12 #include <rutil/Logger.hxx>
13 #include <rutil/Timer.hxx>
14 #include <rutil/Lock.hxx>
15
16 #include "FlowManagerSubsystem.hxx"
17 #include "ErrorCode.hxx"
18 #include "Flow.hxx"
19 #include "MediaStream.hxx"
20 #include "FlowDtlsSocketContext.hxx"
21
22 using namespace flowmanager;
23 using namespace resip;
24
25 #ifdef USE_SSL
26 using namespace dtls;
27 #endif
28
29 using namespace std;
30
31 #define MAX_RECEIVE_FIFO_DURATION 10 // seconds
32 #define MAX_RECEIVE_FIFO_SIZE (100 * MAX_RECEIVE_FIFO_DURATION) // 1000 = 1 message every 10 ms for 10 seconds - appropriate for RTP
33
34 #define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER
35
36 const char* srtp_error_string(err_status_t error)
37 {
38 switch(error)
39 {
40 case err_status_ok:
41 return "nothing to report";
42 break;
43 case err_status_fail:
44 return "unspecified failure";
45 break;
46 case err_status_bad_param:
47 return "unsupported parameter";
48 break;
49 case err_status_alloc_fail:
50 return "couldn't allocate memory";
51 break;
52 case err_status_dealloc_fail:
53 return "couldn't deallocate properly";
54 break;
55 case err_status_init_fail:
56 return "couldn't initialize";
57 break;
58 case err_status_terminus:
59 return "can't process as much data as requested";
60 break;
61 case err_status_auth_fail:
62 return "authentication failure";
63 break;
64 case err_status_cipher_fail:
65 return "cipher failure";
66 break;
67 case err_status_replay_fail:
68 return "replay check failed (bad index)";
69 break;
70 case err_status_replay_old:
71 return "replay check failed (index too old)";
72 break;
73 case err_status_algo_fail:
74 return "algorithm failed test routine";
75 break;
76 case err_status_no_such_op:
77 return "unsupported operation";
78 break;
79 case err_status_no_ctx:
80 return "no appropriate context found";
81 break;
82 case err_status_cant_check:
83 return "unable to perform desired validation";
84 break;
85 case err_status_key_expired:
86 return "can't use key any more";
87 break;
88 case err_status_socket_err:
89 return "error in use of socket";
90 break;
91 case err_status_signal_err:
92 return "error in use POSIX signals";
93 break;
94 case err_status_nonce_bad:
95 return "nonce check failed";
96 break;
97 case err_status_read_fail:
98 return "couldn't read data";
99 break;
100 case err_status_write_fail:
101 return "couldn't write data";
102 break;
103 case err_status_parse_err:
104 return "error pasring data";
105 break;
106 case err_status_encode_err:
107 return "error encoding data";
108 break;
109 case err_status_semaphore_err:
110 return "error while using semaphores";
111 break;
112 case err_status_pfkey_err:
113 return "error while using pfkey";
114 break;
115 default:
116 return "unrecognized error";
117 }
118 }
119
120 Flow::Flow(asio::io_service& ioService,
121 #ifdef USE_SSL
122 asio::ssl::context& sslContext,
123 #endif
124 unsigned int componentId,
125 const StunTuple& localBinding,
126 MediaStream& mediaStream)
127 : mIOService(ioService),
128 #ifdef USE_SSL
129 mSslContext(sslContext),
130 #endif
131 mComponentId(componentId),
132 mLocalBinding(localBinding),
133 mMediaStream(mediaStream),
134 mAllocationProps(StunMessage::PropsNone),
135 mReservationToken(0),
136 mFlowState(Unconnected),
137 mReceivedDataFifo(MAX_RECEIVE_FIFO_DURATION,MAX_RECEIVE_FIFO_SIZE)
138 {
139 InfoLog(<< "Flow: flow created for " << mLocalBinding << " ComponentId=" << mComponentId);
140
141 switch(mLocalBinding.getTransportType())
142 {
143 case StunTuple::UDP:
144 mTurnSocket.reset(new TurnAsyncUdpSocket(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort()));
145 break;
146 case StunTuple::TCP:
147 mTurnSocket.reset(new TurnAsyncTcpSocket(mIOService, this, mLocalBinding.getAddress(), mLocalBinding.getPort()));
148 break;
149 #ifdef USE_SSL
150 case StunTuple::TLS:
151 mTurnSocket.reset(new TurnAsyncTlsSocket(mIOService,
152 mSslContext,
153 false, // validateServerCertificateHostname - TODO - make this configurable
154 this,
155 mLocalBinding.getAddress(),
156 mLocalBinding.getPort()));
157 #endif
158 break;
159 default:
160 // Bad Transport type!
161 assert(false);
162 }
163
164 if(mTurnSocket.get() &&
165 mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
166 !mMediaStream.mStunUsername.empty() &&
167 !mMediaStream.mStunPassword.empty())
168 {
169 mTurnSocket->setUsernameAndPassword(mMediaStream.mStunUsername.c_str(), mMediaStream.mStunPassword.c_str(), false);
170 }
171 }
172
173 Flow::~Flow()
174 {
175 InfoLog(<< "Flow: flow destroyed for " << mLocalBinding << " ComponentId=" << mComponentId);
176
177
178 #ifdef USE_SSL
179 // Cleanup DtlsSockets
180 {
181 Lock lock(mMutex);
182 std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
183 for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
184 {
185 delete it->second;
186 }
187 }
188 #endif //USE_SSL
189
190 // Cleanup TurnSocket
191 if(mTurnSocket.get())
192 {
193 mTurnSocket->disableTurnAsyncHandler();
194 mTurnSocket->close();
195 }
196 }
197
198 void
199 Flow::activateFlow(UInt64 reservationToken)
200 {
201 mReservationToken = reservationToken;
202 activateFlow(StunMessage::PropsNone);
203 }
204
205 void
206 Flow::activateFlow(UInt8 allocationProps)
207 {
208 mAllocationProps = allocationProps;
209
210 if(mTurnSocket.get())
211 {
212 if(mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal &&
213 !mMediaStream.mNatTraversalServerHostname.empty())
214 {
215 changeFlowState(ConnectingServer);
216 mTurnSocket->connect(mMediaStream.mNatTraversalServerHostname.c_str(),
217 mMediaStream.mNatTraversalServerPort);
218 }
219 else
220 {
221 changeFlowState(Ready);
222 mMediaStream.onFlowReady(mComponentId);
223 }
224 }
225 }
226
227 unsigned int
228 Flow::getSelectSocketDescriptor()
229 {
230 return mFakeSelectSocketDescriptor.getSocketDescriptor();
231 }
232
233 unsigned int
234 Flow::getSocketDescriptor()
235 {
236 if(mTurnSocket.get() != 0)
237 {
238 return mTurnSocket->getSocketDescriptor();
239 }
240 else
241 {
242 return 0;
243 }
244 }
245
246 // Turn Send Methods
247 void
248 Flow::send(char* buffer, unsigned int size)
249 {
250 assert(mTurnSocket.get());
251 if(isReady())
252 {
253 if(processSendData(buffer, size, mTurnSocket->getConnectedAddress(), mTurnSocket->getConnectedPort()))
254 {
255 mTurnSocket->send(buffer, size);
256 }
257 }
258 else
259 {
260 onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
261 }
262 }
263
264 void
265 Flow::sendTo(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int size)
266 {
267 assert(mTurnSocket.get());
268 if(isReady())
269 {
270 if(processSendData(buffer, size, address, port))
271 {
272 mTurnSocket->sendTo(address, port, buffer, size);
273 }
274 }
275 else
276 {
277 onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
278 }
279 }
280
281 // Note: this fn is used to send raw data to the far end, without attempting to SRTP encrypt it - ie. used for sending DTLS traffic
282 void
283 Flow::rawSendTo(const asio::ip::address& address, unsigned short port, const char* buffer, unsigned int size)
284 {
285 assert(mTurnSocket.get());
286 mTurnSocket->sendTo(address, port, buffer, size);
287 }
288
289
290 bool
291 Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& address, unsigned short port)
292 {
293 if(mMediaStream.mSRTPSessionOutCreated)
294 {
295 err_status_t status = mMediaStream.srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
296 if(status != err_status_ok)
297 {
298 ErrLog(<< "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ") ComponentId=" << mComponentId);
299 onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
300 return false;
301 }
302 }
303 #ifdef USE_SSL
304 else
305 {
306 Lock lock(mMutex);
307 DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), address, port));
308 if(dtlsSocket)
309 {
310 if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
311 {
312 err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID);
313 if(status != err_status_ok)
314 {
315 ErrLog(<< "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ") ComponentId=" << mComponentId);
316 onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::SRTPError, asio::error::misc_category));
317 return false;
318 }
319 }
320 else
321 {
322 //WarningLog(<< "Unable to send packet yet - handshake is not completed yet, ComponentId=" << mComponentId);
323 onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category));
324 return false;
325 }
326 }
327 }
328 #endif //USE_SSL
329
330 return true;
331 }
332
333
334
335 // Receive Methods
336 asio::error_code
337 Flow::receiveFrom(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int& size, unsigned int timeout)
338 {
339 bool done = false;
340 asio::error_code errorCode;
341
342 UInt64 startTime = Timer::getTimeMs();
343 unsigned int recvTimeout;
344 while(!done)
345 {
346 // We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
347 if(timeout == 0 && mReceivedDataFifo.empty())
348 {
349 // timeout
350 return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
351 }
352
353 recvTimeout = timeout ? (unsigned int)(timeout - (Timer::getTimeMs() - startTime)) : 0;
354 if(timeout != 0 && recvTimeout <= 0)
355 {
356 // timeout
357 return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
358 }
359 ReceivedData* receivedData = mReceivedDataFifo.getNext(recvTimeout);
360 if(receivedData)
361 {
362 mFakeSelectSocketDescriptor.receive();
363
364 // discard any data not from address/port requested
365 if(address == receivedData->mAddress && port == receivedData->mPort)
366 {
367 errorCode = processReceivedData(buffer, size, receivedData);
368 done = true;
369 }
370 delete receivedData;
371 }
372 else
373 {
374 // timeout
375 errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
376 done = true;
377 }
378 }
379 return errorCode;
380 }
381
382 asio::error_code
383 Flow::receive(char* buffer, unsigned int& size, unsigned int timeout, asio::ip::address* sourceAddress, unsigned short* sourcePort)
384 {
385 asio::error_code errorCode;
386
387 //InfoLog(<< "Flow::receive called with buffer size=" << size << ", timeout=" << timeout);
388 // We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all
389 if(timeout == 0 && mReceivedDataFifo.empty())
390 {
391 // timeout
392 InfoLog(<< "Receive timeout (timeout==0 and fifo empty)!");
393 return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
394 }
395 if(mReceivedDataFifo.empty())
396 {
397 WarningLog(<< "Receive called when there is no data available! ComponentId=" << mComponentId);
398 }
399
400 ReceivedData* receivedData = mReceivedDataFifo.getNext(timeout);
401 if(receivedData)
402 {
403 mFakeSelectSocketDescriptor.receive();
404 errorCode = processReceivedData(buffer, size, receivedData, sourceAddress, sourcePort);
405 delete receivedData;
406 }
407 else
408 {
409 // timeout
410 InfoLog(<< "Receive timeout! ComponentId=" << mComponentId);
411 errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category);
412 }
413 return errorCode;
414 }
415
416
417 asio::error_code
418 Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receivedData, asio::ip::address* sourceAddress, unsigned short* sourcePort)
419 {
420 asio::error_code errorCode;
421 unsigned int receivedsize = receivedData->mData->size();
422
423 // SRTP Unprotect (if required)
424 if(mMediaStream.mSRTPSessionInCreated)
425 {
426 err_status_t status = mMediaStream.srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
427 if(status != err_status_ok)
428 {
429 ErrLog(<< "Unable to SRTP unprotect the packet (componentid=" << mComponentId << "), error code=" << status << "(" << srtp_error_string(status) << ")");
430 //errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
431 }
432 }
433 #ifdef USE_SSL
434 else
435 {
436 Lock lock(mMutex);
437 DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), receivedData->mAddress, receivedData->mPort));
438 if(dtlsSocket)
439 {
440 if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized())
441 {
442 err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID);
443 if(status != err_status_ok)
444 {
445 ErrLog(<< "Unable to SRTP unprotect the packet (componentid=" << mComponentId << "), error code=" << status << "(" << srtp_error_string(status) << ")");
446 //errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category);
447 }
448 }
449 else
450 {
451 //WarningLog(<< "Unable to send packet yet - handshake is not completed yet, ComponentId=" << mComponentId);
452 errorCode = asio::error_code(flowmanager::InvalidState, asio::error::misc_category);
453 }
454 }
455 }
456 #endif //USE_SSL
457 if(!errorCode)
458 {
459 if(size > receivedsize)
460 {
461 size = receivedsize;
462 memcpy(buffer, receivedData->mData->data(), size);
463 //InfoLog(<< "Received a buffer of size=" << receivedData->mData.size());
464 }
465 else
466 {
467 // Receive buffer too small
468 InfoLog(<< "Receive buffer too small for data size=" << receivedsize << " ComponentId=" << mComponentId);
469 errorCode = asio::error_code(flowmanager::BufferTooSmall, asio::error::misc_category);
470 }
471 if(sourceAddress)
472 {
473 *sourceAddress = receivedData->mAddress;
474 }
475 if(sourcePort)
476 {
477 *sourcePort = receivedData->mPort;
478 }
479 }
480 return errorCode;
481 }
482
483 void
484 Flow::setActiveDestination(const char* address, unsigned short port)
485 {
486 if(mTurnSocket.get())
487 {
488 if(mMediaStream.mNatTraversalMode != MediaStream::TurnAllocation)
489 {
490 changeFlowState(Connecting);
491 mTurnSocket->connect(address, port);
492 }
493 else
494 {
495 mTurnSocket->setActiveDestination(asio::ip::address::from_string(address), port);
496
497 }
498 }
499 else
500 WarningLog(<<"No TURN Socket, can't send media to destination");
501 }
502
503 #ifdef USE_SSL
504 void
505 Flow::startDtlsClient(const char* address, unsigned short port)
506 {
507 Lock lock(mMutex);
508 createDtlsSocketClient(StunTuple(mLocalBinding.getTransportType(), asio::ip::address::from_string(address), port));
509 }
510 #endif
511
512 void
513 Flow::setRemoteSDPFingerprint(const resip::Data& fingerprint)
514 {
515 Lock lock(mMutex);
516 mRemoteSDPFingerprint = fingerprint;
517
518 #ifdef USE_SSL
519 // Check all existing DtlsSockets and tear down those that don't match
520 std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it;
521 for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++)
522 {
523 if(it->second->handshakeCompleted() &&
524 !it->second->checkFingerprint(fingerprint.c_str(), fingerprint.size()))
525 {
526 InfoLog(<< "Marking Dtls socket bad with non-matching fingerprint!");
527 ((FlowDtlsSocketContext*)it->second->getSocketContext())->fingerprintMismatch();
528 }
529 }
530 #endif //USE_SSL
531 }
532
533 const resip::Data
534 Flow::getRemoteSDPFingerprint()
535 {
536 Lock lock(mMutex);
537 return mRemoteSDPFingerprint;
538 }
539
540 const StunTuple&
541 Flow::getLocalTuple()
542 {
543 return mLocalBinding;
544 }
545
546 StunTuple
547 Flow::getSessionTuple()
548 {
549 assert(mFlowState == Ready);
550 Lock lock(mMutex);
551
552 if(mMediaStream.mNatTraversalMode == MediaStream::TurnAllocation)
553 {
554 return mRelayTuple;
555 }
556 else if(mMediaStream.mNatTraversalMode == MediaStream::StunBindDiscovery)
557 {
558 return mReflexiveTuple;
559 }
560 return mLocalBinding;
561 }
562
563 StunTuple
564 Flow::getRelayTuple()
565 {
566 assert(mFlowState == Ready);
567 Lock lock(mMutex);
568 return mRelayTuple;
569 }
570
571 StunTuple
572 Flow::getReflexiveTuple()
573 {
574 assert(mFlowState == Ready);
575 Lock lock(mMutex);
576 return mReflexiveTuple;
577 }
578
579 UInt64
580 Flow::getReservationToken()
581 {
582 assert(mFlowState == Ready);
583 Lock lock(mMutex);
584 return mReservationToken;
585 }
586
587 void
588 Flow::onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port)
589 {
590 InfoLog(<< "Flow::onConnectSuccess: socketDesc=" << socketDesc << ", address=" << address.to_string() << ", port=" << port << ", componentId=" << mComponentId);
591
592 // Start candidate discovery
593 switch(mMediaStream.mNatTraversalMode)
594 {
595 case MediaStream::StunBindDiscovery:
596 if(mFlowState == ConnectingServer)
597 {
598 changeFlowState(Binding);
599 mTurnSocket->bindRequest();
600 }
601 else
602 {
603 changeFlowState(Ready);
604 mMediaStream.onFlowReady(mComponentId);
605 }
606 break;
607 case MediaStream::TurnAllocation:
608 changeFlowState(Allocating);
609 mTurnSocket->createAllocation(TurnAsyncSocket::UnspecifiedLifetime,
610 TurnAsyncSocket::UnspecifiedBandwidth,
611 mAllocationProps,
612 mReservationToken != 0 ? mReservationToken : TurnAsyncSocket::UnspecifiedToken,
613 StunTuple::UDP); // Always relay as UDP
614 break;
615 case MediaStream::NoNatTraversal:
616 default:
617 changeFlowState(Ready);
618 mMediaStream.onFlowReady(mComponentId);
619 break;
620 }
621 }
622
623 void
624 Flow::onConnectFailure(unsigned int socketDesc, const asio::error_code& e)
625 {
626 WarningLog(<< "Flow::onConnectFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << ", componentId=" << mComponentId);
627 changeFlowState(Unconnected);
628 mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
629 }
630
631
632 void
633 Flow::onSharedSecretSuccess(unsigned int socketDesc, const char* username, unsigned int usernameSize, const char* password, unsigned int passwordSize)
634 {
635 InfoLog(<< "Flow::onSharedSecretSuccess: socketDesc=" << socketDesc << ", username=" << username << ", password=" << password << ", componentId=" << mComponentId);
636 }
637
638 void
639 Flow::onSharedSecretFailure(unsigned int socketDesc, const asio::error_code& e)
640 {
641 WarningLog(<< "Flow::onSharedSecretFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
642 }
643
644 void
645 Flow::onBindSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& stunServerTuple)
646 {
647 InfoLog(<< "Flow::onBindingSuccess: socketDesc=" << socketDesc << ", reflexive=" << reflexiveTuple << ", componentId=" << mComponentId);
648 {
649 Lock lock(mMutex);
650 mReflexiveTuple = reflexiveTuple;
651 }
652 changeFlowState(Ready);
653 mMediaStream.onFlowReady(mComponentId);
654 }
655 void
656 Flow::onBindFailure(unsigned int socketDesc, const asio::error_code& e, const StunTuple& stunServerTuple)
657 {
658 WarningLog(<< "Flow::onBindingFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
659 changeFlowState(Connected);
660 mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
661 }
662
663 void
664 Flow::onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& relayTuple, unsigned int lifetime, unsigned int bandwidth, UInt64 reservationToken)
665 {
666 InfoLog(<< "Flow::onAllocationSuccess: socketDesc=" << socketDesc <<
667 ", reflexive=" << reflexiveTuple <<
668 ", relay=" << relayTuple <<
669 ", lifetime=" << lifetime <<
670 ", bandwidth=" << bandwidth <<
671 ", reservationToken=" << reservationToken <<
672 ", componentId=" << mComponentId);
673 {
674 Lock lock(mMutex);
675 mReflexiveTuple = reflexiveTuple;
676 mRelayTuple = relayTuple;
677 mReservationToken = reservationToken;
678 }
679 changeFlowState(Ready);
680 mMediaStream.onFlowReady(mComponentId);
681 }
682
683 void
684 Flow::onAllocationFailure(unsigned int socketDesc, const asio::error_code& e)
685 {
686 WarningLog(<< "Flow::onAllocationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
687 changeFlowState(Connected);
688 mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code?
689 }
690
691 void
692 Flow::onRefreshSuccess(unsigned int socketDesc, unsigned int lifetime)
693 {
694 InfoLog(<< "Flow::onRefreshSuccess: socketDesc=" << socketDesc << ", lifetime=" << lifetime << ", componentId=" << mComponentId);
695 if(lifetime == 0)
696 {
697 changeFlowState(Connected);
698 }
699 }
700
701 void
702 Flow::onRefreshFailure(unsigned int socketDesc, const asio::error_code& e)
703 {
704 WarningLog(<< "Flow::onRefreshFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
705 }
706
707 void
708 Flow::onSetActiveDestinationSuccess(unsigned int socketDesc)
709 {
710 InfoLog(<< "Flow::onSetActiveDestinationSuccess: socketDesc=" << socketDesc << ", componentId=" << mComponentId);
711 }
712
713 void
714 Flow::onSetActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
715 {
716 WarningLog(<< "Flow::onSetActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
717 }
718
719 void
720 Flow::onClearActiveDestinationSuccess(unsigned int socketDesc)
721 {
722 InfoLog(<< "Flow::onClearActiveDestinationSuccess: socketDesc=" << socketDesc << ", componentId=" << mComponentId);
723 }
724
725 void
726 Flow::onClearActiveDestinationFailure(unsigned int socketDesc, const asio::error_code& e)
727 {
728 WarningLog(<< "Flow::onClearActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
729 }
730
731 void
732 Flow::onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber)
733 {
734 InfoLog(<< "Flow::onChannelBindRequestSent: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber << ", componentId=" << mComponentId);
735 }
736
737 void
738 Flow::onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber)
739 {
740 InfoLog(<< "Flow::onChannelBindSuccess: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber << ", componentId=" << mComponentId);
741 }
742
743 void
744 Flow::onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e)
745 {
746 WarningLog(<< "Flow::onChannelBindFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
747 }
748
749 void
750 Flow::onSendSuccess(unsigned int socketDesc)
751 {
752 //InfoLog(<< "Flow::onSendSuccess: socketDesc=" << socketDesc);
753 }
754
755 void
756 Flow::onSendFailure(unsigned int socketDesc, const asio::error_code& e)
757 {
758 if(e.value() == InvalidState)
759 {
760 // Note: if setActiveDestination is called it can take some time to "connect" the socket to the destination
761 // and send requests during this time, will be discarded - this can be considered normal
762 InfoLog(<< "Flow::onSendFailure: socketDesc=" << socketDesc << " socket is not in correct state to send yet, componentId=" << mComponentId );
763 }
764 else
765 {
766 WarningLog(<< "Flow::onSendFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId );
767 }
768 }
769
770 void
771 Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, boost::shared_ptr<reTurn::DataBuffer>& data)
772 {
773 DebugLog(<< "Flow::onReceiveSuccess: socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size() << ", componentId=" << mComponentId);
774
775 #ifdef USE_SSL
776 // Check if packet is a dtls packet - if so then process it
777 // Note: Stun messaging should be picked off by the reTurn library - so we only need to tell the difference between DTLS and SRTP here
778 if(DtlsFactory::demuxPacket((const unsigned char*) data->data(), data->size()) == DtlsFactory::dtls)
779 {
780 Lock lock(mMutex);
781
782 StunTuple endpoint(mLocalBinding.getTransportType(), address, port);
783 DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
784 if(!dtlsSocket)
785 {
786 // If don't have a socket already for this endpoint and we are receiving data, then assume we are the server side of the DTLS connection
787 dtlsSocket = createDtlsSocketServer(endpoint);
788 }
789 if(dtlsSocket)
790 {
791 dtlsSocket->handlePacketMaybe((const unsigned char*) data->data(), data->size());
792 }
793
794 // Packet was a DTLS packet - do not queue for app
795 return;
796 }
797 #endif
798
799 if(!mReceivedDataFifo.add(new ReceivedData(address, port, data), ReceivedDataFifo::EnforceTimeDepth))
800 {
801 WarningLog(<< "Flow::onReceiveSuccess: TimeLimitFifo is full - discarding data! componentId=" << mComponentId);
802 }
803 else
804 {
805 mFakeSelectSocketDescriptor.send();
806 }
807 }
808
809 void
810 Flow::onReceiveFailure(unsigned int socketDesc, const asio::error_code& e)
811 {
812 WarningLog(<< "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId);
813
814 // Make sure we keep receiving if we get an ICMP error on a UDP socket
815 if(e.value() == asio::error::connection_reset && mLocalBinding.getTransportType() == StunTuple::UDP)
816 {
817 assert(mTurnSocket.get());
818 mTurnSocket->turnReceive();
819 }
820 }
821
822 void
823 Flow::onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple)
824 {
825 InfoLog(<< "Flow::onIncomingBindRequestProcessed: socketDesc=" << socketDesc << ", sourceTuple=" << sourceTuple );
826 // TODO - handle
827 }
828
829 void
830 Flow::changeFlowState(FlowState newState)
831 {
832 InfoLog(<< "Flow::changeState: oldState=" << flowStateToString(mFlowState) << ", newState=" << flowStateToString(newState) << ", componentId=" << mComponentId);
833 mFlowState = newState;
834 }
835
836 const char*
837 Flow::flowStateToString(FlowState state)
838 {
839 switch(state)
840 {
841 case Unconnected:
842 return "Unconnected";
843 case ConnectingServer:
844 return "ConnectingServer";
845 case Connecting:
846 return "Connecting";
847 case Binding:
848 return "Binding";
849 case Allocating:
850 return "Allocating";
851 case Connected:
852 return "Connected";
853 case Ready:
854 return "Ready";
855 default:
856 assert(false);
857 return "Unknown";
858 }
859 }
860
861 #ifdef USE_SSL
862 DtlsSocket*
863 Flow::getDtlsSocket(const StunTuple& endpoint)
864 {
865 std::map<reTurn::StunTuple, dtls::DtlsSocket*>::iterator it = mDtlsSockets.find(endpoint);
866 if(it != mDtlsSockets.end())
867 {
868 return it->second;
869 }
870 return 0;
871 }
872
873 DtlsSocket*
874 Flow::createDtlsSocketClient(const StunTuple& endpoint)
875 {
876 DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
877 if(!dtlsSocket && mMediaStream.mDtlsFactory)
878 {
879 InfoLog(<< "Creating DTLS Client socket, componentId=" << mComponentId);
880 std::auto_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
881 dtlsSocket = mMediaStream.mDtlsFactory->createClient(socketContext);
882 dtlsSocket->startClient();
883 mDtlsSockets[endpoint] = dtlsSocket;
884 }
885
886 return dtlsSocket;
887 }
888
889 DtlsSocket*
890 Flow::createDtlsSocketServer(const StunTuple& endpoint)
891 {
892 DtlsSocket* dtlsSocket = getDtlsSocket(endpoint);
893 if(!dtlsSocket && mMediaStream.mDtlsFactory)
894 {
895 InfoLog(<< "Creating DTLS Server socket, componentId=" << mComponentId);
896 std::auto_ptr<DtlsSocketContext> socketContext(new FlowDtlsSocketContext(*this, endpoint.getAddress(), endpoint.getPort()));
897 dtlsSocket = mMediaStream.mDtlsFactory->createServer(socketContext);
898 mDtlsSockets[endpoint] = dtlsSocket;
899 }
900
901 return dtlsSocket;
902 }
903
904 #endif
905
906 /* ====================================================================
907
908 Copyright (c) 2007-2008, Plantronics, Inc.
909 All rights reserved.
910
911 Redistribution and use in source and binary forms, with or without
912 modification, are permitted provided that the following conditions are
913 met:
914
915 1. Redistributions of source code must retain the above copyright
916 notice, this list of conditions and the following disclaimer.
917
918 2. Redistributions in binary form must reproduce the above copyright
919 notice, this list of conditions and the following disclaimer in the
920 documentation and/or other materials provided with the distribution.
921
922 3. Neither the name of Plantronics nor the names of its contributors
923 may be used to endorse or promote products derived from this
924 software without specific prior written permission.
925
926 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
927 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
928 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
929 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
930 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
931 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
932 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
933 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
934 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
935 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
936 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
937
938 ==================================================================== */

Properties

Name Value
svn:eol-style native
svn:mime-type text/plain

webmaster AT resiprocate DOT org
ViewVC Help
Powered by ViewVC 1.1.27