reSIProcate/repro  9694
QValueTargetHandler.cxx
Go to the documentation of this file.
00001 #include "repro/monkeys/QValueTargetHandler.hxx"
00002 
00003 
00004 #include "repro/RequestContext.hxx"
00005 #include "repro/ResponseContext.hxx"
00006 #include "repro/Proxy.hxx"
00007 #include "repro/ForkControlMessage.hxx"
00008 #include "repro/QValueTarget.hxx"
00009 
00010 #include "rutil/Data.hxx"
00011 #include "rutil/Logger.hxx"
00012 #include "repro/ProxyConfig.hxx"
00013 #include "rutil/WinLeakCheck.hxx"
00014 
00015 #define RESIPROCATE_SUBSYSTEM resip::Subsystem::REPRO
00016 
00017 namespace repro
00018 {
00019 
00020 QValueTargetHandler::QValueTargetHandler(ProxyConfig& config) : 
00021    Processor("QValueTargetHandler")
00022 {
00023    mForkBehavior=QValueTargetHandler::EQUAL_Q_PARALLEL;
00024       
00025    if(config.getConfigData("QValueBehavior", "") =="FULL_SEQUENTIAL")
00026    {
00027       mForkBehavior=QValueTargetHandler::FULL_SEQUENTIAL;
00028    }
00029    else if(config.getConfigData("QValueBehavior", "") == "FULL_PARALLEL")
00030    {
00031       mForkBehavior=QValueTargetHandler::FULL_PARALLEL;
00032    }
00033 
00034    mCancelBetweenForkGroups=config.getConfigBool("QValueCancelBetweenForkGroups", true);
00035    mWaitForTerminate=config.getConfigBool("QValueWaitForTerminateBetweenForkGroups", true);
00036    mDelayBetweenForkGroups=config.getConfigInt("QValueMsBetweenForkGroups", 3000);
00037    mCancellationDelay=config.getConfigInt("QValueMsBeforeCancel", 30000);
00038 }
00039 
00040 QValueTargetHandler::~QValueTargetHandler()
00041 {
00042 }
00043 
00044 Processor::processor_action_t
00045 QValueTargetHandler::process(RequestContext &rc)
00046 {
00047    std::vector<resip::Data> nextCancelTids;
00048    std::vector<resip::Data> nextBeginTids;
00049 
00050    Proxy* proxy=&(rc.getProxy());
00051 
00052    resip::Data tid = rc.getTransactionId();
00053    ResponseContext& rsp=rc.getResponseContext();
00054    bool shouldContinue=true;
00055    
00056    //Use this as a loop invariant.
00057    bool bail=false;
00058 
00059    resip::Message* msg = rc.getCurrentEvent();
00060    assert(msg);
00061 
00062    if(msg)
00063    {
00064       repro::ForkControlMessage* fc = dynamic_cast<repro::ForkControlMessage*>(msg);
00065       
00066       if(fc)
00067       {
00068          shouldContinue=false;
00069          std::vector<resip::Data>::iterator i;
00070          
00071          //Might we have scheduled cancellations? If so, is there anything else
00072          //worth trying? (We won't cancel stuff if there is nothing else left 
00073          //to try)
00074          if(mCancelBetweenForkGroups && rsp.hasCandidateTransactions())
00075          {
00076             std::vector<resip::Data>& cancelTids=fc->mTransactionsToCancel;
00077             for(i=cancelTids.begin();i!=cancelTids.end();i++)
00078             {
00079                //Calling cancelClientTransaction on an already cancelled
00080                //target is safe, and usually more efficient than checking
00081                //beforehand.
00082                rsp.cancelClientTransaction(*i);
00083             }
00084          }
00085          
00086          //Might we have scheduled some transactions to start?
00087          //(and if we did, should we now schedule them for cancellation later?)
00088          if(!mWaitForTerminate)
00089          {
00090             std::vector<resip::Data>& beginTids=fc->mTransactionsToProcess;
00091             for(i=beginTids.begin();i!=beginTids.end();i++)
00092             {
00093                //Calling beginClientTransaction on an already active
00094                // (or Terminated) transaction is safe, and more
00095                // efficient than checking beforehand.
00096                rsp.beginClientTransaction(*i);
00097                if(mCancelBetweenForkGroups)
00098                {            
00099                   nextCancelTids.push_back(*i);
00100                }
00101             }
00102          }
00103       }
00104       else
00105       {
00106          DebugLog(<<"No ForkControlMessage for me.");
00107       }
00108    }
00109 
00110    std::list<std::list<resip::Data> >& targetCollection = 
00111          rsp.mTransactionQueueCollection;
00112    std::list<std::list<resip::Data> >::iterator outer;
00113    std::list<std::list<resip::Data> >::iterator temp;
00114    std::list<resip::Data>::iterator inner;
00115    bool activeTargets=false;
00116    bool startedTargets=false;
00117 
00118    for(outer=targetCollection.begin(); 
00119       outer!=targetCollection.end() && !activeTargets;)
00120    {
00121       inner=outer->begin();
00122       
00123       if(inner!=outer->end() && isMyType(rsp.getTarget(*inner)))
00124       {
00125          DebugLog(<<"QValueTargetHandler: "
00126                   <<"Found a queue of QValueTargets. Are any of them active?");
00127          //Are there active targets in this queue already?
00128          bail=false;
00129          for(;inner!=outer->end() && !bail;inner++)
00130          {
00131             if(rsp.isActive(*inner))
00132             {
00133                activeTargets=true;
00134                DebugLog(<<"There are active targets. "
00135                         <<"I don't need to do anything else yet.");
00136                bail=true;
00137             }
00138          }
00139 
00140          //If no active targets, we need to start some new ones.
00141          //(and schedule their cancellation if configured to)
00142          //However, calling beginClientTransaction does not guarantee that a
00143          //target will start, since that target may be a duplicate.
00144          //So, we keep firing up target groups until something sticks, or we hit
00145          //the end of this queue.
00146          bail=false;
00147          while(!activeTargets && !bail)
00148          {
00149             DebugLog(<<"There are no active targets here. "
00150                      <<"Looking for a group to start.");
00151             std::vector<resip::Data> beginTargets;
00152             
00153             fillNextTargetGroup(beginTargets,*outer,rsp);
00154             
00155             if(beginTargets.empty())
00156             {
00157                DebugLog(<<"There are no more targets to start in this queue."
00158                         <<" Trying to find another queue to work on.");
00159                bail=true;
00160             }
00161             else
00162             {
00163                DebugLog(<<"This queue has a group of targets in it. "
00164                         <<"Trying to start this group.");
00165                std::vector<resip::Data>::iterator i;
00166                for(i=beginTargets.begin();i!=beginTargets.end();i++)
00167                {
00168                   bool success = rsp.beginClientTransaction(*i);
00169                   if(success && mCancelBetweenForkGroups)
00170                   {
00171                      nextCancelTids.push_back(*i);
00172                   }
00173                   
00174                   activeTargets |= success;
00175                   startedTargets |= success;
00176                }
00177                if(startedTargets)
00178                {
00179                   DebugLog(<<"Successfully started some targets.");
00180                }
00181                else
00182                {
00183                   DebugLog(<<"None of these Targets were valid!"
00184                            << " Moving on to another group.");
00185                }
00186             }
00187          }
00188             
00189          if(!startedTargets)
00190          {
00191             DebugLog(<< "There weren't any valid Targets in this queue!");
00192          }
00193          //If we just started some targets, and we are not supposed to wait
00194          //for these targets to terminate before beginning the next group
00195          //in this queue, we should schedule the next group now.
00196          //If there is no next group, nextBeginTids will be empty.
00197          if(startedTargets && !mWaitForTerminate)
00198          {
00199             DebugLog(<<"Now I need to schedule the next group of Targets.");
00200             fillNextTargetGroup(nextBeginTids,*outer,rsp);
00201          }
00202          
00203          //Clean up the queue we just tried
00204          removeTerminated(*outer,rsp);
00205       }
00206       
00207       if(outer->empty())
00208       {
00209          temp=outer;
00210          outer++;
00211          targetCollection.erase(temp);
00212       }
00213       else
00214       {
00215          outer++;
00216       }
00217    
00218       assert(activeTargets || !startedTargets);
00219    }
00220    
00221    //Do we have anything to schedule for later?
00222    if(!nextCancelTids.empty() || !nextBeginTids.empty())
00223    {
00224       // If the delays are equal, then put both cancel and nextBeginTids in one ForkControlMessage
00225       if(mCancellationDelay == mDelayBetweenForkGroups)  
00226       {
00227          ForkControlMessage* fork = new ForkControlMessage(*this,tid,proxy);
00228          fork->mTransactionsToProcess=nextBeginTids;
00229          fork->mTransactionsToCancel=nextCancelTids;
00230 
00231          resip::ApplicationMessage* app=
00232                            dynamic_cast<resip::ApplicationMessage*>(fork);
00233                            
00234          proxy->postMS(std::auto_ptr<resip::ApplicationMessage>(app),
00235                               mDelayBetweenForkGroups);
00236       }
00237       else // Issue two seperate ForkControlMessages
00238       {
00239          if(!nextCancelTids.empty())
00240          {
00241             ForkControlMessage* cancel = new ForkControlMessage(*this,tid,proxy);
00242             cancel->mTransactionsToCancel=nextCancelTids;
00243 
00244             resip::ApplicationMessage* app=
00245                               dynamic_cast<resip::ApplicationMessage*>(cancel);
00246 
00247             rc.getProxy().postMS(std::auto_ptr<resip::ApplicationMessage>(app),
00248                                  mCancellationDelay);
00249          }
00250          if(!nextBeginTids.empty())
00251          {
00252             ForkControlMessage* begin = new ForkControlMessage(*this,tid,proxy);
00253             begin->mTransactionsToProcess=nextBeginTids;
00254 
00255             resip::ApplicationMessage* app
00256                               =dynamic_cast<resip::ApplicationMessage*>(begin);
00257 
00258             proxy->postMS(std::auto_ptr<resip::ApplicationMessage>(app),
00259                                  mDelayBetweenForkGroups);
00260          }
00261       }
00262    }
00263 
00264    //We should not pass control on to the rest of the chain until all
00265    //of the QValueTargets have been taken care of. Also, we should not
00266    //pass control to the rest of the chain if we received a 
00267    //ForkControlMessage explicitly intended for us.
00268    //(this could confuse the other Target Processors)
00269    if(!activeTargets && shouldContinue)
00270    {
00271       return Processor::Continue;
00272    }
00273    else
00274    {
00275       return Processor::SkipAllChains;
00276    }
00277 }
00278 
00279 void
00280 QValueTargetHandler::fillNextTargetGroup(std::vector<resip::Data>& fillHere,
00281                         const std::list<resip::Data> & queue,
00282                         const ResponseContext& rsp) const
00283 {
00284    if(queue.empty())
00285    {
00286       return;
00287    }
00288 
00289    std::list<resip::Data>::const_iterator i = queue.begin();
00290    int currentQ=0;
00291    
00292    //Find the first Candidate target in the queue.
00293    for(i=queue.begin();i!=queue.end();i++)
00294    {
00295       if(rsp.isCandidate(*i))
00296       {
00297          currentQ=rsp.getTarget(*i)->getPriority();
00298          break;
00299       }
00300    }
00301 
00302    switch(mForkBehavior)
00303    {
00304       case FULL_SEQUENTIAL:
00305          if(i!=queue.end())
00306          {
00307             fillHere.push_back(*i);
00308          }
00309          break;
00310          
00311       case EQUAL_Q_PARALLEL:   
00312          while(i!=queue.end() && rsp.getTarget(*i)->getPriority()==currentQ)
00313          {
00314             fillHere.push_back(*i);
00315             i++;
00316          }
00317          break;
00318          
00319       case FULL_PARALLEL:
00320          while(i!=queue.end())
00321          {
00322             fillHere.push_back(*i);
00323             i++;
00324          }
00325          break;
00326          
00327       default:
00328          ErrLog(<<"mForkBehavior is not defined! How did this happen?");
00329    }
00330 }
00331 
00332 bool
00333 QValueTargetHandler::isMyType( Target* target) const
00334 {
00335    QValueTarget* qt = dynamic_cast<QValueTarget*>(target);
00336    if(qt)
00337    {
00338       return true;
00339    }
00340    else
00341    {
00342       return false;
00343    }
00344 }
00345 
00346 void
00347 QValueTargetHandler::removeTerminated(std::list<resip::Data> & queue,
00348                                       const ResponseContext& rsp) const
00349 {
00350    std::list<resip::Data>::iterator i = queue.begin();
00351 
00352    while(i!=queue.end())
00353    {
00354       if(rsp.isTerminated(*i))
00355       {
00356          std::list<resip::Data>::iterator temp=i;
00357          i++;
00358          queue.erase(temp);
00359       }
00360       else
00361       {
00362          i++;
00363       }
00364    }
00365 }
00366 
00367 }
00368 
00369 /* ====================================================================
00370  * The Vovida Software License, Version 1.0 
00371  * 
00372  * Copyright (c) 2000 Vovida Networks, Inc.  All rights reserved.
00373  * 
00374  * Redistribution and use in source and binary forms, with or without
00375  * modification, are permitted provided that the following conditions
00376  * are met:
00377  * 
00378  * 1. Redistributions of source code must retain the above copyright
00379  *    notice, this list of conditions and the following disclaimer.
00380  * 
00381  * 2. Redistributions in binary form must reproduce the above copyright
00382  *    notice, this list of conditions and the following disclaimer in
00383  *    the documentation and/or other materials provided with the
00384  *    distribution.
00385  * 
00386  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00387  *    and "Vovida Open Communication Application Library (VOCAL)" must
00388  *    not be used to endorse or promote products derived from this
00389  *    software without prior written permission. For written
00390  *    permission, please contact vocal@vovida.org.
00391  *
00392  * 4. Products derived from this software may not be called "VOCAL", nor
00393  *    may "VOCAL" appear in their name, without prior written
00394  *    permission of Vovida Networks, Inc.
00395  * 
00396  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00397  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00398  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00399  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00400  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00401  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00402  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00403  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00404  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00405  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00406  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00407  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00408  * DAMAGE.
00409  * 
00410  * ====================================================================
00411  */