|
reSIProcate/repro
9694
|
00001 #include "repro/monkeys/QValueTargetHandler.hxx" 00002 00003 00004 #include "repro/RequestContext.hxx" 00005 #include "repro/ResponseContext.hxx" 00006 #include "repro/Proxy.hxx" 00007 #include "repro/ForkControlMessage.hxx" 00008 #include "repro/QValueTarget.hxx" 00009 00010 #include "rutil/Data.hxx" 00011 #include "rutil/Logger.hxx" 00012 #include "repro/ProxyConfig.hxx" 00013 #include "rutil/WinLeakCheck.hxx" 00014 00015 #define RESIPROCATE_SUBSYSTEM resip::Subsystem::REPRO 00016 00017 namespace repro 00018 { 00019 00020 QValueTargetHandler::QValueTargetHandler(ProxyConfig& config) : 00021 Processor("QValueTargetHandler") 00022 { 00023 mForkBehavior=QValueTargetHandler::EQUAL_Q_PARALLEL; 00024 00025 if(config.getConfigData("QValueBehavior", "") =="FULL_SEQUENTIAL") 00026 { 00027 mForkBehavior=QValueTargetHandler::FULL_SEQUENTIAL; 00028 } 00029 else if(config.getConfigData("QValueBehavior", "") == "FULL_PARALLEL") 00030 { 00031 mForkBehavior=QValueTargetHandler::FULL_PARALLEL; 00032 } 00033 00034 mCancelBetweenForkGroups=config.getConfigBool("QValueCancelBetweenForkGroups", true); 00035 mWaitForTerminate=config.getConfigBool("QValueWaitForTerminateBetweenForkGroups", true); 00036 mDelayBetweenForkGroups=config.getConfigInt("QValueMsBetweenForkGroups", 3000); 00037 mCancellationDelay=config.getConfigInt("QValueMsBeforeCancel", 30000); 00038 } 00039 00040 QValueTargetHandler::~QValueTargetHandler() 00041 { 00042 } 00043 00044 Processor::processor_action_t 00045 QValueTargetHandler::process(RequestContext &rc) 00046 { 00047 std::vector<resip::Data> nextCancelTids; 00048 std::vector<resip::Data> nextBeginTids; 00049 00050 Proxy* proxy=&(rc.getProxy()); 00051 00052 resip::Data tid = rc.getTransactionId(); 00053 ResponseContext& rsp=rc.getResponseContext(); 00054 bool shouldContinue=true; 00055 00056 //Use this as a loop invariant. 00057 bool bail=false; 00058 00059 resip::Message* msg = rc.getCurrentEvent(); 00060 assert(msg); 00061 00062 if(msg) 00063 { 00064 repro::ForkControlMessage* fc = dynamic_cast<repro::ForkControlMessage*>(msg); 00065 00066 if(fc) 00067 { 00068 shouldContinue=false; 00069 std::vector<resip::Data>::iterator i; 00070 00071 //Might we have scheduled cancellations? If so, is there anything else 00072 //worth trying? (We won't cancel stuff if there is nothing else left 00073 //to try) 00074 if(mCancelBetweenForkGroups && rsp.hasCandidateTransactions()) 00075 { 00076 std::vector<resip::Data>& cancelTids=fc->mTransactionsToCancel; 00077 for(i=cancelTids.begin();i!=cancelTids.end();i++) 00078 { 00079 //Calling cancelClientTransaction on an already cancelled 00080 //target is safe, and usually more efficient than checking 00081 //beforehand. 00082 rsp.cancelClientTransaction(*i); 00083 } 00084 } 00085 00086 //Might we have scheduled some transactions to start? 00087 //(and if we did, should we now schedule them for cancellation later?) 00088 if(!mWaitForTerminate) 00089 { 00090 std::vector<resip::Data>& beginTids=fc->mTransactionsToProcess; 00091 for(i=beginTids.begin();i!=beginTids.end();i++) 00092 { 00093 //Calling beginClientTransaction on an already active 00094 // (or Terminated) transaction is safe, and more 00095 // efficient than checking beforehand. 00096 rsp.beginClientTransaction(*i); 00097 if(mCancelBetweenForkGroups) 00098 { 00099 nextCancelTids.push_back(*i); 00100 } 00101 } 00102 } 00103 } 00104 else 00105 { 00106 DebugLog(<<"No ForkControlMessage for me."); 00107 } 00108 } 00109 00110 std::list<std::list<resip::Data> >& targetCollection = 00111 rsp.mTransactionQueueCollection; 00112 std::list<std::list<resip::Data> >::iterator outer; 00113 std::list<std::list<resip::Data> >::iterator temp; 00114 std::list<resip::Data>::iterator inner; 00115 bool activeTargets=false; 00116 bool startedTargets=false; 00117 00118 for(outer=targetCollection.begin(); 00119 outer!=targetCollection.end() && !activeTargets;) 00120 { 00121 inner=outer->begin(); 00122 00123 if(inner!=outer->end() && isMyType(rsp.getTarget(*inner))) 00124 { 00125 DebugLog(<<"QValueTargetHandler: " 00126 <<"Found a queue of QValueTargets. Are any of them active?"); 00127 //Are there active targets in this queue already? 00128 bail=false; 00129 for(;inner!=outer->end() && !bail;inner++) 00130 { 00131 if(rsp.isActive(*inner)) 00132 { 00133 activeTargets=true; 00134 DebugLog(<<"There are active targets. " 00135 <<"I don't need to do anything else yet."); 00136 bail=true; 00137 } 00138 } 00139 00140 //If no active targets, we need to start some new ones. 00141 //(and schedule their cancellation if configured to) 00142 //However, calling beginClientTransaction does not guarantee that a 00143 //target will start, since that target may be a duplicate. 00144 //So, we keep firing up target groups until something sticks, or we hit 00145 //the end of this queue. 00146 bail=false; 00147 while(!activeTargets && !bail) 00148 { 00149 DebugLog(<<"There are no active targets here. " 00150 <<"Looking for a group to start."); 00151 std::vector<resip::Data> beginTargets; 00152 00153 fillNextTargetGroup(beginTargets,*outer,rsp); 00154 00155 if(beginTargets.empty()) 00156 { 00157 DebugLog(<<"There are no more targets to start in this queue." 00158 <<" Trying to find another queue to work on."); 00159 bail=true; 00160 } 00161 else 00162 { 00163 DebugLog(<<"This queue has a group of targets in it. " 00164 <<"Trying to start this group."); 00165 std::vector<resip::Data>::iterator i; 00166 for(i=beginTargets.begin();i!=beginTargets.end();i++) 00167 { 00168 bool success = rsp.beginClientTransaction(*i); 00169 if(success && mCancelBetweenForkGroups) 00170 { 00171 nextCancelTids.push_back(*i); 00172 } 00173 00174 activeTargets |= success; 00175 startedTargets |= success; 00176 } 00177 if(startedTargets) 00178 { 00179 DebugLog(<<"Successfully started some targets."); 00180 } 00181 else 00182 { 00183 DebugLog(<<"None of these Targets were valid!" 00184 << " Moving on to another group."); 00185 } 00186 } 00187 } 00188 00189 if(!startedTargets) 00190 { 00191 DebugLog(<< "There weren't any valid Targets in this queue!"); 00192 } 00193 //If we just started some targets, and we are not supposed to wait 00194 //for these targets to terminate before beginning the next group 00195 //in this queue, we should schedule the next group now. 00196 //If there is no next group, nextBeginTids will be empty. 00197 if(startedTargets && !mWaitForTerminate) 00198 { 00199 DebugLog(<<"Now I need to schedule the next group of Targets."); 00200 fillNextTargetGroup(nextBeginTids,*outer,rsp); 00201 } 00202 00203 //Clean up the queue we just tried 00204 removeTerminated(*outer,rsp); 00205 } 00206 00207 if(outer->empty()) 00208 { 00209 temp=outer; 00210 outer++; 00211 targetCollection.erase(temp); 00212 } 00213 else 00214 { 00215 outer++; 00216 } 00217 00218 assert(activeTargets || !startedTargets); 00219 } 00220 00221 //Do we have anything to schedule for later? 00222 if(!nextCancelTids.empty() || !nextBeginTids.empty()) 00223 { 00224 // If the delays are equal, then put both cancel and nextBeginTids in one ForkControlMessage 00225 if(mCancellationDelay == mDelayBetweenForkGroups) 00226 { 00227 ForkControlMessage* fork = new ForkControlMessage(*this,tid,proxy); 00228 fork->mTransactionsToProcess=nextBeginTids; 00229 fork->mTransactionsToCancel=nextCancelTids; 00230 00231 resip::ApplicationMessage* app= 00232 dynamic_cast<resip::ApplicationMessage*>(fork); 00233 00234 proxy->postMS(std::auto_ptr<resip::ApplicationMessage>(app), 00235 mDelayBetweenForkGroups); 00236 } 00237 else // Issue two seperate ForkControlMessages 00238 { 00239 if(!nextCancelTids.empty()) 00240 { 00241 ForkControlMessage* cancel = new ForkControlMessage(*this,tid,proxy); 00242 cancel->mTransactionsToCancel=nextCancelTids; 00243 00244 resip::ApplicationMessage* app= 00245 dynamic_cast<resip::ApplicationMessage*>(cancel); 00246 00247 rc.getProxy().postMS(std::auto_ptr<resip::ApplicationMessage>(app), 00248 mCancellationDelay); 00249 } 00250 if(!nextBeginTids.empty()) 00251 { 00252 ForkControlMessage* begin = new ForkControlMessage(*this,tid,proxy); 00253 begin->mTransactionsToProcess=nextBeginTids; 00254 00255 resip::ApplicationMessage* app 00256 =dynamic_cast<resip::ApplicationMessage*>(begin); 00257 00258 proxy->postMS(std::auto_ptr<resip::ApplicationMessage>(app), 00259 mDelayBetweenForkGroups); 00260 } 00261 } 00262 } 00263 00264 //We should not pass control on to the rest of the chain until all 00265 //of the QValueTargets have been taken care of. Also, we should not 00266 //pass control to the rest of the chain if we received a 00267 //ForkControlMessage explicitly intended for us. 00268 //(this could confuse the other Target Processors) 00269 if(!activeTargets && shouldContinue) 00270 { 00271 return Processor::Continue; 00272 } 00273 else 00274 { 00275 return Processor::SkipAllChains; 00276 } 00277 } 00278 00279 void 00280 QValueTargetHandler::fillNextTargetGroup(std::vector<resip::Data>& fillHere, 00281 const std::list<resip::Data> & queue, 00282 const ResponseContext& rsp) const 00283 { 00284 if(queue.empty()) 00285 { 00286 return; 00287 } 00288 00289 std::list<resip::Data>::const_iterator i = queue.begin(); 00290 int currentQ=0; 00291 00292 //Find the first Candidate target in the queue. 00293 for(i=queue.begin();i!=queue.end();i++) 00294 { 00295 if(rsp.isCandidate(*i)) 00296 { 00297 currentQ=rsp.getTarget(*i)->getPriority(); 00298 break; 00299 } 00300 } 00301 00302 switch(mForkBehavior) 00303 { 00304 case FULL_SEQUENTIAL: 00305 if(i!=queue.end()) 00306 { 00307 fillHere.push_back(*i); 00308 } 00309 break; 00310 00311 case EQUAL_Q_PARALLEL: 00312 while(i!=queue.end() && rsp.getTarget(*i)->getPriority()==currentQ) 00313 { 00314 fillHere.push_back(*i); 00315 i++; 00316 } 00317 break; 00318 00319 case FULL_PARALLEL: 00320 while(i!=queue.end()) 00321 { 00322 fillHere.push_back(*i); 00323 i++; 00324 } 00325 break; 00326 00327 default: 00328 ErrLog(<<"mForkBehavior is not defined! How did this happen?"); 00329 } 00330 } 00331 00332 bool 00333 QValueTargetHandler::isMyType( Target* target) const 00334 { 00335 QValueTarget* qt = dynamic_cast<QValueTarget*>(target); 00336 if(qt) 00337 { 00338 return true; 00339 } 00340 else 00341 { 00342 return false; 00343 } 00344 } 00345 00346 void 00347 QValueTargetHandler::removeTerminated(std::list<resip::Data> & queue, 00348 const ResponseContext& rsp) const 00349 { 00350 std::list<resip::Data>::iterator i = queue.begin(); 00351 00352 while(i!=queue.end()) 00353 { 00354 if(rsp.isTerminated(*i)) 00355 { 00356 std::list<resip::Data>::iterator temp=i; 00357 i++; 00358 queue.erase(temp); 00359 } 00360 else 00361 { 00362 i++; 00363 } 00364 } 00365 } 00366 00367 } 00368 00369 /* ==================================================================== 00370 * The Vovida Software License, Version 1.0 00371 * 00372 * Copyright (c) 2000 Vovida Networks, Inc. All rights reserved. 00373 * 00374 * Redistribution and use in source and binary forms, with or without 00375 * modification, are permitted provided that the following conditions 00376 * are met: 00377 * 00378 * 1. Redistributions of source code must retain the above copyright 00379 * notice, this list of conditions and the following disclaimer. 00380 * 00381 * 2. Redistributions in binary form must reproduce the above copyright 00382 * notice, this list of conditions and the following disclaimer in 00383 * the documentation and/or other materials provided with the 00384 * distribution. 00385 * 00386 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00387 * and "Vovida Open Communication Application Library (VOCAL)" must 00388 * not be used to endorse or promote products derived from this 00389 * software without prior written permission. For written 00390 * permission, please contact vocal@vovida.org. 00391 * 00392 * 4. Products derived from this software may not be called "VOCAL", nor 00393 * may "VOCAL" appear in their name, without prior written 00394 * permission of Vovida Networks, Inc. 00395 * 00396 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00397 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00398 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00399 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00400 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00401 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00402 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00403 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00404 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00405 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00406 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00407 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00408 * DAMAGE. 00409 * 00410 * ==================================================================== 00411 */
1.7.5.1