|
reSIProcate/repro
9694
|
00001 #include "repro/Worker.hxx" 00002 #include "repro/Dispatcher.hxx" 00003 00004 #include "resip/stack/SipStack.hxx" 00005 #include "resip/stack/StackThread.hxx" 00006 #include "resip/stack/TransactionUser.hxx" 00007 #include "resip/stack/ApplicationMessage.hxx" 00008 00009 #include "rutil/Data.hxx" 00010 00011 #include <iostream> 00012 #include <cassert> 00013 00014 //main is way down at the bottom. 00015 00016 namespace test 00017 { 00018 00019 class DummyWorkMessage : public resip::ApplicationMessage 00020 { 00021 public: 00022 DummyWorkMessage(resip::TransactionUser* passedTU, resip::Data string=resip::Data::Empty) 00023 { 00024 tu=passedTU; 00025 mTid=""; 00026 mQuery=string; 00027 wait=false; 00028 } 00029 00030 00031 DummyWorkMessage(const DummyWorkMessage& orig) 00032 { 00033 tu=orig.tu; 00034 mTid=orig.mTid; 00035 mQuery=orig.mQuery; 00036 mResult=orig.mResult; 00037 wait=orig.wait; 00038 } 00039 00040 virtual ~DummyWorkMessage(){} 00041 00042 resip::Data& result() 00043 { 00044 return mResult; 00045 } 00046 00047 resip::Data& query() 00048 { 00049 return mQuery; 00050 } 00051 00052 virtual DummyWorkMessage* clone() const {return new DummyWorkMessage(*this);}; 00053 virtual const resip::Data& getTransactionId() const {return mTid;}; 00054 00055 virtual std::ostream& encode(std::ostream& ostr) const { ostr << "DummyWorkMessage("<<mTid<<") "; return ostr; }; 00056 virtual std::ostream& encodeBrief(std::ostream& ostr) const{ ostr << "DummyWorkMessage("<<mTid<<") "; return ostr; }; 00057 00058 bool wait; 00059 00060 private: 00061 resip::Data mResult; 00062 resip::Data mQuery; 00063 resip::Data mTid; 00064 }; 00065 00066 class DummyWorker : public repro::Worker 00067 { 00068 public: 00069 DummyWorker(){} 00070 virtual ~DummyWorker(){} 00071 00072 virtual void process(resip::ApplicationMessage* app) 00073 { 00074 DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(app); 00075 if(dwm) 00076 { 00077 dwm->result()=" I got that thing you sent me."; 00078 if(dwm->wait) 00079 { 00080 sleep(1); 00081 } 00082 } 00083 00084 } 00085 00086 virtual DummyWorker* clone() const 00087 { 00088 return new DummyWorker; 00089 } 00090 }; 00091 00092 00093 00094 class DummyTU : public resip::TransactionUser 00095 { 00096 public: 00097 DummyTU(resip::SipStack* stack) 00098 { 00099 mName="DummyTU"; 00100 mStack=stack; 00101 mStack->registerTransactionUser(*this); 00102 DummyWorker* dw = new DummyWorker; 00103 std::auto_ptr<repro::Worker> worker(dw); 00104 mDispatcher = new repro::Dispatcher(worker,mStack,3,false); 00105 } 00106 00107 virtual ~DummyTU() 00108 { 00109 delete mDispatcher; 00110 } 00111 00112 virtual void go() 00113 { 00114 assert(mDispatcher); 00115 DummyWorkMessage* lost = new DummyWorkMessage(this); 00116 00117 //Dispatcher should not be accepting work yet 00118 assert(!mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost))); 00119 00120 mDispatcher->startAll(); 00121 00122 DummyWorkMessage* a = new DummyWorkMessage(this,"A"); 00123 DummyWorkMessage* b = new DummyWorkMessage(this,"B"); 00124 DummyWorkMessage* c = new DummyWorkMessage(this,"C"); 00125 DummyWorkMessage* d = new DummyWorkMessage(this,"D"); 00126 00127 std::auto_ptr<resip::ApplicationMessage> aa(a); 00128 std::auto_ptr<resip::ApplicationMessage> ab(b); 00129 std::auto_ptr<resip::ApplicationMessage> ac(c); 00130 std::auto_ptr<resip::ApplicationMessage> ad(d); 00131 00132 assert(mDispatcher->post(aa)); 00133 assert(mDispatcher->post(ab)); 00134 assert(mDispatcher->post(ac)); 00135 assert(mDispatcher->post(ad)); 00136 00137 bool gotA=false; 00138 bool gotB=false; 00139 bool gotC=false; 00140 bool gotD=false; 00141 int i=0; 00142 resip::Message* msg; 00143 00144 for( i=0; i<4 ;i++) 00145 { 00146 assert(msg = mFifo.getNext(1000)); 00147 00148 DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(msg); 00149 00150 assert(dwm); 00151 00152 if(dwm->result()==" I got that thing you sent me.") 00153 { 00154 if(dwm->query()=="A") gotA=true; 00155 if(dwm->query()=="B") gotB=true; 00156 if(dwm->query()=="C") gotC=true; 00157 if(dwm->query()=="D") gotD=true; 00158 } 00159 00160 delete dwm; 00161 00162 } 00163 00164 assert(i==4); 00165 assert(gotA && gotB && gotC && gotD); 00166 00167 mDispatcher->stop(); 00168 00169 lost = new DummyWorkMessage(this,""); 00170 00171 //We stopped the thread bank, it should not be accepting work. 00172 assert(!mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost))); 00173 00174 mDispatcher->resume(); 00175 00176 DummyWorkMessage* wasteTime1 = new DummyWorkMessage(this,"Take your time."); 00177 DummyWorkMessage* wasteTime2 = new DummyWorkMessage(this,"Take your time."); 00178 DummyWorkMessage* wasteTime3 = new DummyWorkMessage(this,"Take your time."); 00179 DummyWorkMessage* wasteTime4 = new DummyWorkMessage(this,"Take your time."); 00180 wasteTime1->wait=true; 00181 wasteTime2->wait=true; 00182 wasteTime3->wait=true; 00183 wasteTime4->wait=true; 00184 00185 assert(mDispatcher->fifoCountDepth()==0); 00186 00187 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime1)); 00188 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime2)); 00189 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime3)); 00190 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime4)); 00191 mDispatcher->stop(); 00192 00193 usleep(1000); 00194 //Three in the thread bank, one in the queue. 00195 assert(mDispatcher->fifoCountDepth()==1); 00196 00197 for( i=0; i<4 ;i++) 00198 { 00199 assert(msg = mFifo.getNext(2000)); 00200 00201 DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(msg); 00202 00203 assert(dwm); 00204 00205 assert(dwm->result()==" I got that thing you sent me."); 00206 00207 delete dwm; 00208 } 00209 00210 assert(!(msg=mFifo.getNext(1000))); 00211 00212 mDispatcher->resume(); 00213 00214 DummyWorkMessage* clog1 = new DummyWorkMessage(this); 00215 DummyWorkMessage* clog2 = new DummyWorkMessage(this); 00216 DummyWorkMessage* clog3 = new DummyWorkMessage(this); 00217 clog1->wait=true; 00218 clog2->wait=true; 00219 clog3->wait=true; 00220 00221 //Three threads in the bank, each waiting for 1 sec 00222 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog1)); 00223 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog2)); 00224 mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog3)); 00225 00226 usleep(100); 00227 00228 for(i=0;i<1000;i++) 00229 { 00230 DummyWorkMessage* pummel = new DummyWorkMessage(this); 00231 std::auto_ptr<resip::ApplicationMessage>batter(pummel); 00232 assert(mDispatcher->post(batter)); 00233 } 00234 00235 00236 usleep(100000); 00237 00238 assert(msg=mFifo.getNext(1000)); 00239 00240 delete msg; 00241 00242 00243 00244 for(i=0;i<1002;i++) 00245 { 00246 msg=mFifo.getNext(10); 00247 assert(msg); 00248 delete msg; 00249 } 00250 00251 mDispatcher->shutdownAll(); 00252 00253 00254 lost = new DummyWorkMessage(this); 00255 assert(!(mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost)))); 00256 00257 00258 00259 delete mDispatcher; 00260 00261 00262 DummyWorker* dw = new DummyWorker; 00263 std::auto_ptr<repro::Worker> worker(dw); 00264 //Stack ptr set to null. 00265 mDispatcher = new repro::Dispatcher(worker,0,3,true); 00266 00267 std::cout << "The following error message is intentional." << std::endl; 00268 00269 lost = new DummyWorkMessage(this); 00270 assert(mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost))); 00271 00272 assert(!(msg=mFifo.getNext(100))); 00273 00274 00275 00276 } 00277 00278 virtual const resip::Data& name() const 00279 { 00280 return mName; 00281 } 00282 00283 00284 private: 00285 repro::Dispatcher* mDispatcher; 00286 resip::SipStack* mStack; 00287 resip::Data mName; 00288 00289 }; 00290 00291 00292 } 00293 00294 00295 int 00296 main() 00297 { 00298 resip::SipStack mStack; 00299 resip::StackThread stackThread(mStack); 00300 test::DummyTU mTU(&mStack); 00301 stackThread.run(); 00302 00303 mTU.go(); 00304 std::cout << "PASSED" << std::endl; 00305 stackThread.shutdown(); 00306 stackThread.join(); 00307 mStack.shutdown(); 00308 } 00309
1.7.5.1