reSIProcate/repro  9694
testDispatcher.cxx
Go to the documentation of this file.
00001 #include "repro/Worker.hxx"
00002 #include "repro/Dispatcher.hxx"
00003 
00004 #include "resip/stack/SipStack.hxx"
00005 #include "resip/stack/StackThread.hxx"
00006 #include "resip/stack/TransactionUser.hxx"
00007 #include "resip/stack/ApplicationMessage.hxx"
00008 
00009 #include "rutil/Data.hxx"
00010 
00011 #include <iostream>
00012 #include <cassert>
00013 
00014 //main is way down at the bottom.
00015 
00016 namespace test
00017 {
00018 
00019 class DummyWorkMessage : public resip::ApplicationMessage
00020 {
00021    public:
00022       DummyWorkMessage(resip::TransactionUser* passedTU, resip::Data string=resip::Data::Empty)
00023       {
00024          tu=passedTU;
00025          mTid="";
00026          mQuery=string;
00027          wait=false;
00028       }
00029       
00030       
00031       DummyWorkMessage(const DummyWorkMessage& orig)
00032       {
00033          tu=orig.tu;
00034          mTid=orig.mTid;
00035          mQuery=orig.mQuery;
00036          mResult=orig.mResult;
00037          wait=orig.wait;
00038       }
00039       
00040       virtual ~DummyWorkMessage(){}
00041       
00042       resip::Data& result()
00043       {
00044          return mResult;
00045       }
00046       
00047       resip::Data& query()
00048       {
00049          return mQuery;
00050       }
00051       
00052       virtual DummyWorkMessage* clone() const {return new DummyWorkMessage(*this);};
00053       virtual const resip::Data& getTransactionId() const {return mTid;};
00054 
00055       virtual std::ostream& encode(std::ostream& ostr) const { ostr << "DummyWorkMessage("<<mTid<<") "; return ostr; };
00056       virtual std::ostream& encodeBrief(std::ostream& ostr) const{ ostr << "DummyWorkMessage("<<mTid<<") "; return ostr; };
00057       
00058       bool wait;
00059 
00060    private:
00061       resip::Data mResult;
00062       resip::Data mQuery;
00063       resip::Data mTid;
00064 };
00065 
00066 class DummyWorker : public repro::Worker
00067 {
00068    public:
00069       DummyWorker(){}
00070       virtual ~DummyWorker(){}
00071       
00072       virtual void process(resip::ApplicationMessage* app)
00073       {
00074          DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(app);
00075          if(dwm)
00076          {
00077             dwm->result()=" I got that thing you sent me.";
00078             if(dwm->wait)
00079             {
00080                sleep(1);
00081             }
00082          }
00083          
00084       }
00085       
00086       virtual DummyWorker* clone() const
00087       {
00088          return new DummyWorker;
00089       }
00090 };
00091 
00092 
00093 
00094 class DummyTU : public resip::TransactionUser
00095 {
00096    public:
00097       DummyTU(resip::SipStack* stack)
00098       {
00099          mName="DummyTU";
00100          mStack=stack;
00101          mStack->registerTransactionUser(*this);
00102          DummyWorker* dw = new DummyWorker;
00103          std::auto_ptr<repro::Worker> worker(dw);
00104          mDispatcher = new repro::Dispatcher(worker,mStack,3,false);
00105       }
00106 
00107       virtual ~DummyTU()
00108       {
00109          delete mDispatcher;
00110       }
00111 
00112       virtual void go()
00113       {
00114          assert(mDispatcher);
00115          DummyWorkMessage* lost = new DummyWorkMessage(this);
00116          
00117          //Dispatcher should not be accepting work yet
00118          assert(!mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost)));
00119                   
00120          mDispatcher->startAll();
00121          
00122          DummyWorkMessage* a = new DummyWorkMessage(this,"A");
00123          DummyWorkMessage* b = new DummyWorkMessage(this,"B");
00124          DummyWorkMessage* c = new DummyWorkMessage(this,"C");
00125          DummyWorkMessage* d = new DummyWorkMessage(this,"D");
00126          
00127          std::auto_ptr<resip::ApplicationMessage> aa(a);
00128          std::auto_ptr<resip::ApplicationMessage> ab(b);
00129          std::auto_ptr<resip::ApplicationMessage> ac(c);
00130          std::auto_ptr<resip::ApplicationMessage> ad(d);
00131          
00132          assert(mDispatcher->post(aa));
00133          assert(mDispatcher->post(ab));
00134          assert(mDispatcher->post(ac));
00135          assert(mDispatcher->post(ad));
00136          
00137          bool gotA=false;
00138          bool gotB=false;
00139          bool gotC=false;
00140          bool gotD=false;
00141          int i=0;
00142          resip::Message* msg;
00143          
00144          for( i=0; i<4 ;i++)
00145          {
00146             assert(msg = mFifo.getNext(1000));
00147             
00148             DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(msg);
00149             
00150             assert(dwm);
00151 
00152             if(dwm->result()==" I got that thing you sent me.")
00153             {
00154                if(dwm->query()=="A")  gotA=true;
00155                if(dwm->query()=="B")  gotB=true;
00156                if(dwm->query()=="C")  gotC=true;
00157                if(dwm->query()=="D")  gotD=true;
00158             }
00159             
00160             delete dwm;
00161 
00162          }
00163          
00164          assert(i==4);
00165          assert(gotA && gotB && gotC && gotD);
00166          
00167          mDispatcher->stop();
00168          
00169          lost = new DummyWorkMessage(this,"");
00170          
00171          //We stopped the thread bank, it should not be accepting work.
00172          assert(!mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost)));
00173          
00174          mDispatcher->resume();
00175          
00176          DummyWorkMessage* wasteTime1 = new DummyWorkMessage(this,"Take your time.");
00177          DummyWorkMessage* wasteTime2 = new DummyWorkMessage(this,"Take your time.");
00178          DummyWorkMessage* wasteTime3 = new DummyWorkMessage(this,"Take your time.");
00179          DummyWorkMessage* wasteTime4 = new DummyWorkMessage(this,"Take your time.");
00180          wasteTime1->wait=true;
00181          wasteTime2->wait=true;
00182          wasteTime3->wait=true;
00183          wasteTime4->wait=true;
00184          
00185          assert(mDispatcher->fifoCountDepth()==0);
00186          
00187          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime1));
00188          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime2));
00189          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime3));
00190          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(wasteTime4));
00191          mDispatcher->stop();
00192          
00193          usleep(1000);
00194          //Three in the thread bank, one in the queue.
00195          assert(mDispatcher->fifoCountDepth()==1);
00196          
00197          for( i=0; i<4 ;i++)
00198          {
00199             assert(msg = mFifo.getNext(2000));
00200             
00201             DummyWorkMessage* dwm = dynamic_cast<DummyWorkMessage*>(msg);
00202             
00203             assert(dwm);
00204 
00205             assert(dwm->result()==" I got that thing you sent me.");
00206             
00207             delete dwm;
00208          }
00209          
00210          assert(!(msg=mFifo.getNext(1000)));
00211          
00212          mDispatcher->resume();
00213          
00214          DummyWorkMessage* clog1 = new DummyWorkMessage(this);
00215          DummyWorkMessage* clog2 = new DummyWorkMessage(this);
00216          DummyWorkMessage* clog3 = new DummyWorkMessage(this);
00217          clog1->wait=true;
00218          clog2->wait=true;
00219          clog3->wait=true;
00220          
00221          //Three threads in the bank, each waiting for 1 sec
00222          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog1));
00223          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog2));
00224          mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(clog3));
00225 
00226          usleep(100);
00227          
00228          for(i=0;i<1000;i++)
00229          {
00230             DummyWorkMessage* pummel = new DummyWorkMessage(this);
00231             std::auto_ptr<resip::ApplicationMessage>batter(pummel);
00232             assert(mDispatcher->post(batter));
00233          }
00234 
00235          
00236          usleep(100000);
00237          
00238          assert(msg=mFifo.getNext(1000));
00239          
00240          delete msg;
00241          
00242          
00243          
00244          for(i=0;i<1002;i++)
00245          {
00246             msg=mFifo.getNext(10);
00247             assert(msg);
00248             delete msg;
00249          }
00250 
00251          mDispatcher->shutdownAll();
00252          
00253          
00254          lost = new DummyWorkMessage(this);
00255          assert(!(mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost))));
00256          
00257          
00258          
00259          delete mDispatcher;
00260          
00261          
00262          DummyWorker* dw = new DummyWorker;
00263          std::auto_ptr<repro::Worker> worker(dw);
00264          //Stack ptr set to null.
00265          mDispatcher = new repro::Dispatcher(worker,0,3,true);
00266          
00267          std::cout << "The following error message is intentional." << std::endl;
00268          
00269          lost = new DummyWorkMessage(this);
00270          assert(mDispatcher->post(std::auto_ptr<resip::ApplicationMessage>(lost)));
00271          
00272          assert(!(msg=mFifo.getNext(100)));
00273          
00274          
00275          
00276       }
00277       
00278       virtual const resip::Data& name() const
00279       {
00280          return mName;
00281       }
00282 
00283       
00284    private:
00285       repro::Dispatcher* mDispatcher;
00286       resip::SipStack* mStack;
00287       resip::Data mName;
00288 
00289 };
00290 
00291 
00292 }
00293 
00294 
00295 int
00296 main()
00297 {
00298    resip::SipStack mStack;
00299    resip::StackThread stackThread(mStack);
00300    test::DummyTU mTU(&mStack);
00301    stackThread.run();
00302    
00303    mTU.go();
00304    std::cout << "PASSED" << std::endl;
00305    stackThread.shutdown();
00306    stackThread.join();
00307    mStack.shutdown();
00308 }
00309