reSIProcate/stack  9694
Classes | Defines | Functions
testStack.cxx File Reference
#include <sys/types.h>
#include <iostream>
#include <memory>
#include "rutil/GeneralCongestionManager.hxx"
#include "rutil/DnsUtil.hxx"
#include "rutil/Inserter.hxx"
#include "rutil/Logger.hxx"
#include "resip/stack/DeprecatedDialog.hxx"
#include "resip/stack/Helper.hxx"
#include "resip/stack/SipMessage.hxx"
#include "resip/stack/SipStack.hxx"
#include "resip/stack/StackThread.hxx"
#include "rutil/SelectInterruptor.hxx"
#include "resip/stack/TransportThread.hxx"
#include "resip/stack/InterruptableStackThread.hxx"
#include "resip/stack/EventStackThread.hxx"
#include "resip/stack/Uri.hxx"
Include dependency graph for testStack.cxx:

Go to the source code of this file.

Classes

class  SharedAsyncNotify
class  SipStackAndThread
struct  StackThreadPair

Defines

#define RESIPROCATE_SUBSYSTEM   Subsystem::TEST

Functions

static void waitForTwoStacks (SipStackAndThread &receiver, SipStackAndThread &sender, SelectInterruptor *commonIntr, int &thisseltime, bool &isStrange)
static void performTest (int verbose, int runs, int window, int invite, Data &bindIfAddr, int numPorts, int senderPort, int registrarPort, const char *proto, int sendSleepUs, StackThreadPair &pair)
int main (int argc, char *argv[])

Define Documentation

#define RESIPROCATE_SUBSYSTEM   Subsystem::TEST

Definition at line 35 of file testStack.cxx.


Function Documentation

int main ( int  argc,
char *  argv[] 
)

Definition at line 573 of file testStack.cxx.

References context, SipStackAndThread::destroy(), SipStackAndThread::getStack(), increaseLimitFds(), SipStackAndThread::join(), StackThreadPair::mCommonIntr, StackThreadPair::mNoStackThread, StackThreadPair::mSeltime, performTest(), RESIP_TRANSPORT_FLAG_NOBIND, RESIP_TRANSPORT_FLAG_OWNTHREAD, SipStackAndThread::run(), SipStackAndThread::setCongestionManager(), resip::SipStack::setStatisticsInterval(), SipStackAndThread::shutdown(), resip::Data::size(), StunDisabled, TCP, resip::SecurityTypes::TLSv1, transports(), UDP, V4, and V6.

{

   const char* logType = "cout";
   const char* logLevel = "WARNING";
   const char* proto = "tcp";
   const char* bindAddr = "127.0.0.1";
   int doListen = 1;

   int verbose = 0;
   int runs = 10000;
   int window = 100;
   int seltime = 0;
   int v6 = 0;
   int invite=0;
   int numPorts = 1;
   int portBase = 0;
   const char* threadType = "event";
   int tpFlags = 0;
   int sendSleepUs = 0;
   int cManager=0;
   int statisticsInterval=60;

#if defined(HAVE_POPT_H)

   char threadTypeDesc[200];
   strcpy(threadTypeDesc, "none|common|std|intr|multithreadedstack|");
   strcat(threadTypeDesc, FdPollGrp::getImplList());

   struct poptOption table[] = {
      {"log-type",    'l', POPT_ARG_STRING, &logType,   0, "where to send logging messages", "syslog|cerr|cout"},
      {"log-level",   'v', POPT_ARG_STRING, &logLevel,  0, "specify the default log level", "DEBUG|INFO|WARNING|ALERT"},
      {"num-runs",    'r', POPT_ARG_INT,    &runs,      0, "number of runs (SIP requests) in test", 0},
      {"window-size", 'w', POPT_ARG_INT,    &window,    0, "number of concurrent transactions", 0},
      {"select-time", 's', POPT_ARG_INT,    &seltime,   0, "polling interval (ms) for stack thread", 0},
      {"protocol",    'p', POPT_ARG_STRING, &proto,     0, "protocol to use (tcp | udp)", 0},
      {"bind",        'b', POPT_ARG_STRING, &bindAddr,  0, "interface address to bind to",0},
      {"listen",      0,   POPT_ARG_INT,    &doListen,  0, "do not bind/listen sender ports", 0},
      {"verbose",     0,   POPT_ARG_INT,    &verbose,   0, "verbose", 0},
      {"v6",          '6', POPT_ARG_NONE,   &v6     ,   0, "ipv6", 0},
      {"invite",      'i', POPT_ARG_NONE,   &invite ,   0, "send INVITE/BYE instead of REGISTER", 0},
      {"port",        0,   POPT_ARG_INT,    &portBase,  0, "first port to use", 0},
      {"numports",    'n', POPT_ARG_INT,    &numPorts,  0, "number of parallel sessions(ports)", 0},
      {"thread-type", 't', POPT_ARG_STRING, &threadType,0, "stack thread type", threadTypeDesc},
      {"tf",          0,   POPT_ARG_INT,    &tpFlags,   0, "bit encoding of transportFlags", 0},
      {"sleep",       0,   POPT_ARG_INT,    &sendSleepUs,0, "time (us) to sleep after each sent request", 0},
      {"use-congestion-manager",0, POPT_ARG_NONE, &cManager ,   0, "use a CongestionManager", 0},
      {"statistics-interval",       0,   POPT_ARG_INT,    &statisticsInterval,0, "time in seconds between statistics logging", 0},
      POPT_AUTOHELP
      { NULL, 0, 0, NULL, 0 }
   };

   poptContext context = poptGetContext(NULL, argc, const_cast<const char**>(argv), table, 0);
   int pret=poptGetNextOpt(context);
   assert(pret==-1);
   assert( poptGetArg(context)==NULL);
#endif  // popt
   Log::initialize(logType, logLevel, argv[0]);

   Data bindIfAddr(bindAddr);
   if ( bindIfAddr.size()==0 )
   {
      bindIfAddr = DnsUtil::getLocalHostName();
   }

   cout << "Performing " << runs << " runs with"
     <<" win="<<window
     <<" ip"<<(v6?"v4":"v4")
     <<" proto="<<proto
     <<" numports="<<numPorts
     <<" thread="<<threadType
     <<" bindIf="<<bindIfAddr
     <<" listen="<<doListen
     <<" tf="<<tpFlags
     <<"." << endl;

   const char *eachThreadType = threadType;
   SelectInterruptor *commonIntr = NULL;
   AsyncProcessHandler *notifyUp = NULL;
   bool noStackThread = false;
   SharedAsyncNotify sharedUp;
   if ( strcmp(eachThreadType,"none")==0 )
   {
      // Everything runs in single thread,
      // and we spin, just keep cycling thru looking for stuff to do.
      // Default seltime is zero, so no delay at all. This isn't
      // very useful for profiling because we spend all our time checking
      // stuff to do.
      noStackThread = true;
   }
   else if ( strcmp(eachThreadType,"common")==0 )
   {
      // Everything runs in single thread, but thread blocks
      // until there is something to do. When there is something
      // to do within the stack (notifyDn) or app (notifyUp) the
      // common interruptor is invoked and it breaks the select loop.
      seltime = 10*1000;
      commonIntr = new SelectInterruptor();
      notifyUp = commonIntr;
      noStackThread = true;
      eachThreadType = "none";
   }
   else
   {
      notifyUp = &sharedUp;
   }
   SipStackAndThread receiver(eachThreadType, commonIntr, notifyUp);
   SipStackAndThread sender(eachThreadType, commonIntr, notifyUp);
   receiver.getStack().setStatisticsInterval(statisticsInterval);
   sender.getStack().setStatisticsInterval(statisticsInterval);

   IpVersion version = (v6 ? V6 : V4);

   // estimate number of sockets we need:
   // 2x for sender and receiver
   // 3 for UDP (listen + select interruptor) 
   // 4 for TCP (listen + connection + select interruptor)
   // ~30 for misc (DNS, SelectInterruptors)
   int needFds = numPorts * 14 + 30;
   increaseLimitFds(needFds);

   /* On linux, the client TCP connection port range is controll by
    * /proc/sys/net/ipv4/ip_local_port_range, and defaults to [32768,61000].
    * To avoid conflicts when binding, the bound ports below should
    * stay out of the range (e.g., below 32768)
    */


   int senderPort = portBase;
   if ( senderPort==0 )
      senderPort = numPorts==1 ? 25060+(rand()&0x0fff) : 11000;
   int registrarPort = senderPort + numPorts;

   int idx;
   std::vector<Transport*> transports;
   for (idx=0; idx < numPorts; idx++)
   {
      transports.push_back(sender->addTransport(UDP, 
                           senderPort+idx, 
                           version, 
                           StunDisabled, 
                           bindIfAddr,
                           /*sipDomain*/Data::Empty, 
                           /*keypass*/Data::Empty, 
                           SecurityTypes::TLSv1,
                           tpFlags));

      // NOBIND doesn't make sense for UDP
      transports.push_back(sender->addTransport(TCP, 
                           senderPort+idx, 
                           version, 
                           StunDisabled, 
                           bindIfAddr,
                           /*sipDomain*/Data::Empty, 
                           /*keypass*/Data::Empty, 
                           SecurityTypes::TLSv1,
                           tpFlags|(doListen?0:RESIP_TRANSPORT_FLAG_NOBIND)));

      // NOTE: we could also bind receive to bindIfAddr, but existing code
      // doesn't do this. Responses are sent from here, so why don't we?
      transports.push_back(receiver->addTransport(UDP, 
                             registrarPort+idx, 
                             version, 
                             StunDisabled,
                             /*ipInterface*/Data::Empty,
                             /*sipDomain*/Data::Empty, 
                             /*keypass*/Data::Empty, 
                             SecurityTypes::TLSv1,
                             tpFlags));

      transports.push_back(receiver->addTransport(TCP, 
                             registrarPort+idx, 
                             version, 
                             StunDisabled,
                             /*ipInterface*/Data::Empty,
                             /*sipDomain*/Data::Empty, 
                             /*keypass*/Data::Empty, 
                             SecurityTypes::TLSv1,
                             tpFlags));
   }

   std::auto_ptr<CongestionManager> senderCongestionManager;
   std::auto_ptr<CongestionManager> receiverCongestionManager;
   if(cManager)
   {
      senderCongestionManager.reset(new GeneralCongestionManager(
                                    GeneralCongestionManager::WAIT_TIME,
                                    200));
      receiverCongestionManager.reset(new GeneralCongestionManager(
                                    GeneralCongestionManager::WAIT_TIME,
                                    200));
      sender.setCongestionManager(senderCongestionManager.get());
      receiver.setCongestionManager(receiverCongestionManager.get());
   }

   std::vector<TransportThread*> transportThreads;
   if(tpFlags & RESIP_TRANSPORT_FLAG_OWNTHREAD)
   {
      while(!transports.empty())
      {
         transportThreads.push_back(new TransportThread(*transports.back()));
         transportThreads.back()->run();
         transports.pop_back();
      }
   }

   sender.run();
   receiver.run();

   StackThreadPair pair(receiver, sender, sharedUp);
   pair.mSeltime = seltime;
   pair.mCommonIntr = commonIntr;
   pair.mNoStackThread = noStackThread;

   performTest(verbose, runs, window, invite,
      bindIfAddr, numPorts, senderPort, registrarPort, proto,
      sendSleepUs, pair);

   sender.shutdown();
   receiver.shutdown();

   sender.join();
   receiver.join();

   sender.setCongestionManager(0);
   receiver.setCongestionManager(0);

   if(tpFlags&RESIP_TRANSPORT_FLAG_OWNTHREAD)
   {
      while(!transportThreads.empty())
      {
         transportThreads.back()->shutdown();
         transportThreads.back()->join();
         delete transportThreads.back();
         transportThreads.pop_back();
      }
   }

   sender.destroy();
   receiver.destroy();

   if ( commonIntr )
   {
       delete commonIntr;
       commonIntr = NULL;
   }

#if defined(HAVE_POPT_H)
   poptFreeContext(context);
#endif
   return 0;
}

Here is the call graph for this function:

static void performTest ( int  verbose,
int  runs,
int  window,
int  invite,
Data bindIfAddr,
int  numPorts,
int  senderPort,
int  registrarPort,
const char *  proto,
int  sendSleepUs,
StackThreadPair pair 
) [static]

Definition at line 374 of file testStack.cxx.

References resip::DeprecatedDialog::createDialogAsUAC(), DebugLog, resip::Data::empty(), resip::RequestLine::getMethod(), resip::SipMessage::header(), resip::Uri::host(), InfoLog, resip::SipMessage::isRequest(), resip::SipMessage::isResponse(), resip::DeprecatedDialog::makeAck(), resip::DeprecatedDialog::makeBye(), resip::DeprecatedDialog::makeResponse(), resip::RequestLine::method(), StackThreadPair::mReceiver, StackThreadPair::mSender, resip::ParserCategory::param(), resip::Uri::port(), resip::Uri::scheme(), resip::NameAddr::uri(), resip::Uri::user(), and StackThreadPair::wait().

Referenced by main().

{
   NameAddr target;
   target.uri().scheme() = "sip";
   target.uri().user() = "fluffy";
   target.uri().host() = bindIfAddr;
   target.uri().port() = registrarPort;
   target.uri().param(p_transport) = proto;

   NameAddr contact;
   contact.uri().scheme() = "sip";
   contact.uri().user() = "fluffy";

   NameAddr from = target;
   from.uri().port() = senderPort;

   UInt64 startTime = Timer::getTimeMs();
   int outstanding=0;
   int count = 0;
   int sent = 0;

   int rxReqTryCnt = 0, rxReqHitCnt = 0;
   int rxRspTryCnt = 0, rxRspHitCnt = 0;

   while (count < runs)
   {
      //InfoLog (<< "count=" << count << " messages=" << messages.size());

      // load up the send window
      for (int i=0; i<64 && sent < runs && outstanding < window; ++i)
      {
         DebugLog (<< "Sending " << count << " / " << runs << " (" << outstanding << ")");
         target.uri().port() = registrarPort + (sent%numPorts);

         SipMessage* next=0;
         if (invite)
         {
            next = Helper::makeInvite( target, from, contact);
         }
         else
         {
            next = Helper::makeRegister( target, from, contact);
         }

         // The Via header serves two purposes:
         // (1) tells the recipient where to send the response,
         // (2) selects which Transport we send from
         if (!bindIfAddr.empty() && numPorts > 1)
         {
             // currently TCP only honors Via if host is populated
             // the "numPorts>1" test is for backwards compat
             next->header(h_Vias).front().sentHost() = bindIfAddr;
         }
         next->header(h_Vias).front().sentPort() = senderPort + (sent%numPorts);
         pair.mSender->send(std::auto_ptr<SipMessage>(next));
         next = 0; // DON'T delete next; consumed by send above
         outstanding++;
         sent++;
#ifndef WIN32
         if (sendSleepUs>0)
            usleep(sendSleepUs);
#endif
      }

      int thisseltime = 0;
      bool isStrange = pair.wait(thisseltime);
      if (isStrange)
      {
         cout << "STRANGE: Stuck for long time: "
             <<" sent="<<sent
             <<" done="<<count
             <<" thisseltime="<<thisseltime
             <<endl;
      }

      for (int i=0;i<64;++i)
      {
         static NameAddr contact;

         ++rxReqTryCnt;
         SipMessage* request = pair.mReceiver->receive();
          if (request==NULL)
             break;
         ++rxReqHitCnt;
         assert(request->isRequest());
         SipMessage response;
         switch (request->header(h_RequestLine).getMethod())
         {
            case INVITE:
            {
               DeprecatedDialog dlg(contact);
               dlg.makeResponse(*request, response, 180);
               pair.mReceiver->send(response);
               dlg.makeResponse(*request, response, 200);
               pair.mReceiver->send(response);
               break;
            }

            case ACK:
               break;

            case BYE:
               Helper::makeResponse(response, *request, 200);
               pair.mReceiver->send(response);
               break;

            case REGISTER:
               Helper::makeResponse(response, *request, 200);
               pair.mReceiver->send(response);
               break;
            default:
               assert(0);
               break;
         }
         delete request;
      }

      for (int i=0;i<64;++i)
      {
         ++rxRspTryCnt;
         SipMessage* response = pair.mSender->receive();
         if (response==NULL)
            break;
         ++rxRspHitCnt;
         assert(response->isResponse());
         switch(response->header(h_CSeq).method())
         {
            case REGISTER:
               outstanding--;
               if (response->header(h_StatusLine).statusCode() == 200)
               {
                  count++;
               }
               else
               {
                  --sent;
               }
               break;

            case INVITE:
               if (response->header(h_StatusLine).statusCode() == 200)
               {
                  outstanding--;
                  count++;

                  DeprecatedDialog dlg(contact);
                  dlg.createDialogAsUAC(*response);
                  SipMessage* ack = dlg.makeAck();
                  pair.mSender->send(*ack);
                  delete ack;

                  SipMessage* bye = dlg.makeBye();
                  pair.mSender->send(*bye);
                  delete bye;
               }
               break;

            case BYE:
               break;

            default:
               assert(0);
               break;
         }

         delete response;
      }
   }
   InfoLog (<< "Finished " << count << " runs");

   UInt64 elapsed = Timer::getTimeMs() - startTime;
   if (!invite)
   {
      cout << runs << " registrations performed in " << elapsed << " ms, a rate of "
           << runs / ((float) elapsed / 1000.0) << " transactions per second." << endl;
   }
   else
   {
      cout << runs << " calls performed in " << elapsed << " ms, a rate of "
           << runs / ((float) elapsed / 1000.0) << " calls per second." << endl;
   }
   if ( verbose )
   {
      cout << "Note: this test runs both sides (client and server)" << endl;

      cout << "RxCnts: "
        <<" Req="<<rxReqHitCnt<<"/"<<rxReqTryCnt
        <<" ("<<(rxReqHitCnt*100/rxReqTryCnt)<<"%)"
        <<" Rsp="<<rxRspHitCnt<<"/"<<rxRspTryCnt
        <<" ("<<(rxRspHitCnt*100/rxRspTryCnt)<<"%)"
        << endl;
   }
}

Here is the call graph for this function:

static void waitForTwoStacks ( SipStackAndThread receiver,
SipStackAndThread sender,
SelectInterruptor commonIntr,
int &  thisseltime,
bool &  isStrange 
) [static]

Definition at line 300 of file testStack.cxx.

References resip::SelectInterruptor::buildFdSet(), resip::SelectInterruptor::process(), resipMin(), and resip::FdSet::selectMilliSeconds().

Referenced by StackThreadPair::wait().

{
   FdSet fdset;
   receiver->buildFdSet(fdset);
   sender->buildFdSet(fdset);
   if ( commonIntr )
   {
      commonIntr->buildFdSet(fdset);
   }

   if ( thisseltime > 0 )
   {
      unsigned int stackMs = resipMin(
              receiver->getTimeTillNextProcessMS(),
              sender->getTimeTillNextProcessMS());
      thisseltime = resipMin((unsigned)thisseltime, stackMs);
   }
   int numReady = fdset.selectMilliSeconds(thisseltime);

   isStrange = (thisseltime > 4000 && numReady==0);
   if ( commonIntr )
   {
      commonIntr->process(fdset);
   }
   receiver->process(fdset);
   sender->process(fdset);
}

Here is the call graph for this function: