reSIProcate/rutil  9694
testFifo.cxx
Go to the documentation of this file.
00001 #include <iostream>
00002 #include "rutil/Log.hxx"
00003 #include "rutil/Fifo.hxx"
00004 #include "rutil/FiniteFifo.hxx"
00005 #include "rutil/TimeLimitFifo.hxx"
00006 #include "rutil/Data.hxx"
00007 #include "rutil/ThreadIf.hxx"
00008 #include "rutil/Timer.hxx"
00009 #ifndef WIN32
00010 #include <unistd.h>
00011 #endif
00012 
00013 //#define VERBOSE
00014 
00015 using namespace resip;
00016 using namespace std;
00017 
/// Block the calling thread for roughly @p ms milliseconds.
///
/// @param ms  Interval to sleep, in milliseconds. Zero returns immediately
///            on the POSIX path.
///
/// On POSIX the interval is split into whole seconds (::sleep) and a
/// sub-second remainder (::usleep): POSIX only requires usleep() to accept
/// values below 1,000,000 microseconds, and this test sleeps for several
/// seconds at a time. Splitting also avoids the unsigned wrap-around of
/// `ms * 1000` for very large intervals.
void sleepMS(unsigned int ms)
{
#ifdef WIN32
   Sleep(ms);
#else
   const unsigned int wholeSeconds = ms / 1000;
   const unsigned int leftoverMs = ms % 1000;

   if (wholeSeconds > 0)
   {
      ::sleep(wholeSeconds);
   }
   if (leftoverMs > 0)
   {
      // Always < 1,000,000 microseconds, which every usleep() must accept.
      ::usleep(leftoverMs * 1000);
   }
#endif
}
00026 
00027 class Foo
00028 {
00029    public:
00030       Foo(const Data& val)
00031          : mVal(val)
00032       {}
00033 
00034       Data mVal;
00035 };
00036 
00037 class Consumer: public ThreadIf
00038 {
00039   public: 
00040       Consumer(TimeLimitFifo<Foo>&);
00041       virtual ~Consumer() 
00042       {
00043 #ifdef VERBOSE
00044          cerr << "Consumer thread finishing..." << endl;
00045 #endif
00046          shutdown();
00047          join();
00048 #ifdef VERBOSE
00049          cerr << "Consumer thread finished" << endl;
00050 #endif
00051       };
00052 
00053       void thread();
00054 
00055    private:
00056       TimeLimitFifo<Foo>& mFifo;
00057 };
00058 
00059 class Producer: public ThreadIf
00060 {
00061   public: 
00062       Producer(TimeLimitFifo<Foo>&);
00063       virtual ~Producer() 
00064       {
00065 #ifdef VERBOSE
00066          cerr << "Producer thread finishing" << endl;
00067 #endif
00068          shutdown();
00069          join();
00070 #ifdef VERBOSE
00071          cerr << "Producer thread finished" << endl;
00072 #endif
00073       }
00074 
00075       void thread();
00076 
00077    private:
00078       TimeLimitFifo<Foo>& mFifo;
00079 };
00080 
00081 Consumer::Consumer(TimeLimitFifo<Foo>& f) :
00082    mFifo(f)
00083 {}
00084 
00085 void Consumer::thread()
00086 {
00087     static unsigned wakeups[6] = { 10, 20, 30, 0, 10, 30 };
00088     unsigned int w = 0;
00089 
00090 #ifdef VERBOSE
00091     cerr << "Consumer running..." << endl;
00092 #endif
00093 
00094     while (!mShutdown) 
00095     {
00096        if (mFifo.messageAvailable())
00097        {
00098           delete mFifo.getNext(100);
00099        }
00100        else
00101        {
00102           unsigned wakeup = wakeups[w];
00103           w = (w + 1) % 6;
00104 #ifdef VERBOSE
00105           cerr << "Consumer sleeping for " << wakeup << " ms with mSize " << mFifo.size() << endl;
00106 #endif
00107           if (wakeup > 0)
00108           {
00109              sleepMS(wakeup);
00110           }
00111        }
00112     }
00113 }
00114 
00115 Producer::Producer(TimeLimitFifo<Foo>& f) :
00116    mFifo(f)
00117 {}
00118 
00119 void Producer::thread()
00120 {
00121    static unsigned wakeups[6] = { 0, 10, 0, 20, 30, 10 };
00122    unsigned int w = 0;
00123 
00124 #ifdef VERBOSE
00125    cerr << "Producer running..." << endl;
00126 #endif
00127 
00128    for (unsigned long n = 0; n < 0x1ffff; n++) 
00129    {
00130       if (mFifo.wouldAccept(TimeLimitFifo<Foo>::EnforceTimeDepth))
00131       {
00132          mFifo.add(new Foo(Data(n)), TimeLimitFifo<Foo>::EnforceTimeDepth);
00133       }
00134       else
00135       {
00136          unsigned wakeup = wakeups[w];
00137          w = (w + 1) % 6;
00138 #ifdef VERBOSE
00139          cerr << "Producer sleeping for " << wakeup << " ms at " << n << " with mSize " << mFifo.size() << endl;
00140 #endif
00141          if (wakeup > 0)
00142          {
00143             sleepMS(wakeup);
00144          }
00145       }
00146    }
00147 }
00148 
/// Report whether @p value lies strictly within @p epsilon of @p reference.
///
/// @param value      Measured value.
/// @param reference  Value it is compared against.
/// @param epsilon    Exclusive tolerance; a zero or negative tolerance can
///                   never be satisfied.
/// @return true iff |value - reference| < epsilon.
///
/// The distance is computed in unsigned arithmetic so that widely separated
/// arguments (e.g. INT_MAX vs. INT_MIN) cannot overflow a signed int.
bool
isNear(int value, int reference, int epsilon=250)
{
   if (epsilon <= 0)
   {
      return false;
   }

   // Unsigned subtraction wraps modulo 2^N, which yields the exact
   // magnitude when the larger operand is on the left.
   const unsigned int hi = static_cast<unsigned int>(value >= reference ? value : reference);
   const unsigned int lo = static_cast<unsigned int>(value >= reference ? reference : value);
   const unsigned int diff = hi - lo;

   return diff < static_cast<unsigned int>(epsilon);
}
00155 
00156 int
00157 main()
00158 {
00159    Log::initialize(Log::Cout, Log::Debug, Data::Empty);
00160    
00161    {
00162       cerr << "!! test getNext(ms) empty fifo timing" << endl;
00163       Fifo<Foo> fifo;
00164       UInt64 begin(Timer::getTimeMs());
00165       fifo.getNext(2000);
00166       UInt64 end(Timer::getTimeMs());
00167       cerr << begin << " " << end << " " << end-begin << endl;      
00168       
00169       int offMark = 2000 - (end - begin);
00170       
00171       assert(abs(offMark) < 200);
00172    }
00173 
00174    Fifo<Foo> f;
00175    FiniteFifo<Foo> ff(5);
00176 
00177    {
00178       cerr << "!! test basic" << endl;
00179       
00180       bool c;
00181       TimeLimitFifo<Foo> tlf(5, 10); // 5 seconds or 10 count limit
00182 
00183       assert(tlf.empty());
00184       assert(tlf.size() == 0);
00185       assert(tlf.timeDepth() == 0);
00186 
00187       c = tlf.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00188       assert(c);
00189 
00190 #ifdef VERBOSE
00191       cerr << __LINE__ << endl;
00192 #endif
00193 
00194       assert(!tlf.empty());
00195       assert(tlf.size() == 1);
00196 #ifdef VERBOSE
00197       cerr << tlf.timeDepth() << endl;
00198 #endif
00199       assert(tlf.timeDepth() == 0);
00200 
00201 #ifdef VERBOSE
00202       cerr << __LINE__ << endl;
00203 #endif
00204 
00205       sleepMS(2000);
00206 
00207       assert(!tlf.empty());
00208       assert(tlf.size() == 1);
00209       assert(tlf.timeDepth() > 1);
00210 
00211       delete tlf.getNext();
00212 
00213       assert(tlf.empty());
00214       assert(tlf.size() == 0);
00215       assert(tlf.timeDepth() == 0);
00216 
00217 #ifdef VERBOSE
00218       cerr << __LINE__ << endl;
00219 #endif
00220 
00221       c = tlf.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00222       assert(c);
00223       sleepMS(3000);
00224       c = tlf.add(new Foo("second"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00225       assert(c);
00226       sleepMS(3000);
00227       c = tlf.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00228       assert(!c);
00229       c = tlf.add(new Foo("yep"), TimeLimitFifo<Foo>::IgnoreTimeDepth);
00230       assert(c);
00231       c = tlf.add(new Foo("internal"), TimeLimitFifo<Foo>::InternalElement);
00232       assert(c);
00233 
00234       cerr << __LINE__ << endl;
00235 
00236       Foo* fp = tlf.getNext();
00237       assert(fp->mVal == "first");
00238       delete fp;
00239       c = tlf.add(new Foo("third"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00240       assert(c);
00241    }
00242 
00243    {
00244       cerr << "!! Test time depth" << endl;
00245 
00246       TimeLimitFifo<Foo> tlfNS(5, 0); // 5 seconds, no count limit
00247       bool c;
00248 
00249       assert(tlfNS.empty());
00250       assert(tlfNS.size() == 0);
00251       assert(tlfNS.timeDepth() == 0);
00252 
00253       c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00254       assert(c);
00255       sleepMS(3000);
00256       c = tlfNS.add(new Foo("second"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00257       assert(c);
00258       sleepMS(3000);
00259       c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00260       assert(!c);
00261       Foo* fp = tlfNS.getNext();
00262       assert(fp->mVal == "first");
00263       delete fp;
00264       c = tlfNS.add(new Foo("third"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00265       assert(c);
00266    }
00267 
00268    {
00269       TimeLimitFifo<Foo> tlfNS(5, 0); // 5 seconds, no count limit
00270       bool c;
00271 
00272       assert(tlfNS.empty());
00273       assert(tlfNS.size() == 0);
00274       assert(tlfNS.timeDepth() == 0);
00275 
00276       for (int i = 0; i < 100; ++i)
00277       {
00278          c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth);
00279          assert(c);
00280       }
00281 
00282       sleepMS(6000);
00283       c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00284       assert(!c);
00285 
00286       c = tlfNS.add(new Foo("yep"), TimeLimitFifo<Foo>::IgnoreTimeDepth);
00287       assert(c);
00288 
00289       assert(tlfNS.size() == 101);
00290 
00291       while (!tlfNS.empty())
00292       {
00293          delete tlfNS.getNext();
00294       }
00295 
00296       c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00297       assert(c);
00298    }
00299 
00300    {
00301       cerr << "!! Test reserved" << endl;
00302 
00303       TimeLimitFifo<Foo> tlfNS(5, 10); // 5 seconds, limit 10 (2 reserved)
00304       bool c;
00305 
00306       assert(tlfNS.empty());
00307       assert(tlfNS.size() == 0);
00308       assert(tlfNS.timeDepth() == 0);
00309 
00310       for (int i = 0; i < 8; ++i)
00311       {
00312          c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth);
00313          assert(c);
00314       }
00315 
00316       c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::IgnoreTimeDepth);
00317       assert(!c);
00318 
00319       assert(tlfNS.size() == 8);
00320  
00321       c = tlfNS.add(new Foo("yep"), TimeLimitFifo<Foo>::InternalElement);
00322       assert(c);
00323 
00324       c = tlfNS.add(new Foo("yepAgain"), TimeLimitFifo<Foo>::InternalElement);
00325       assert(c);
00326 
00327       c = tlfNS.add(new Foo("hard nope!"), TimeLimitFifo<Foo>::InternalElement);
00328       assert(!c);
00329 
00330       while (!tlfNS.empty())
00331       {
00332          delete tlfNS.getNext();
00333       }
00334 
00335       c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth);
00336       assert(c);
00337    }
00338 
00339    {
00340       cerr << "!! Test unlimited" << endl;
00341 
00342       TimeLimitFifo<Foo> tlfNS(0, 0); // unlimited
00343 
00344       bool c;
00345 
00346       assert(tlfNS.empty());
00347       assert(tlfNS.size() == 0);
00348       assert(tlfNS.timeDepth() == 0);
00349 
00350       for (int i = 0; i < 100; ++i)
00351       {
00352          c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth);
00353          assert(c);
00354          sleepMS(1000);
00355       }
00356    }
00357    
00358    {
00359       cerr << "!! Test produce consumer" << endl;
00360 
00361        TimeLimitFifo<Foo> tlfNS(20, 5000);
00362        Producer prod(tlfNS);
00363        Consumer cons(tlfNS);
00364 
00365        cons.run();
00366        prod.run();
00367 #ifdef VERBOSE
00368        cerr << "Producer and consumer threads are running" << endl;
00369 #endif
00370        prod.join();
00371 #ifdef VERBOSE
00372        cerr << "Producer thread finished" << endl;
00373 #endif
00374        cons.shutdown();
00375        cons.join();
00376 #ifdef VERBOSE
00377        cerr << "Consumer thread finished" << endl;
00378 #endif
00379    }
00380 
00381    {
00382       cerr << "!! Test producers consumers" << endl;
00383 
00384       TimeLimitFifo<Foo> tlfNS(20, 50000);
00385        
00386       Producer prod1(tlfNS);
00387       Producer prod2(tlfNS);
00388       Producer prod3(tlfNS);
00389       Producer prod4(tlfNS);
00390       Producer prod5(tlfNS);
00391       Producer prod6(tlfNS);
00392       Producer prod7(tlfNS);
00393       Producer prod8(tlfNS);
00394       Producer prod9(tlfNS);
00395       Producer prod10(tlfNS);
00396 
00397       Consumer cons1(tlfNS);
00398       Consumer cons2(tlfNS);
00399       Consumer cons3(tlfNS);
00400       Consumer cons4(tlfNS);
00401       Consumer cons5(tlfNS);
00402       Consumer cons6(tlfNS);
00403       Consumer cons7(tlfNS);
00404       Consumer cons8(tlfNS);
00405       Consumer cons9(tlfNS);
00406       Consumer cons10(tlfNS);
00407 
00408 
00409       cons1.run();
00410       cons2.run();
00411       cons3.run();
00412       cons4.run();
00413       cons5.run();
00414       cons6.run();
00415       cons7.run();
00416       cons8.run();
00417       cons9.run();
00418       cons10.run();
00419 
00420 #ifdef VERBOSE
00421       cerr << "before getNext(1000) " << Timer::getTimeMs() << endl;
00422 #endif
00423       tlfNS.getNext(1000);
00424 #ifdef VERBOSE
00425       cerr << "after getNext(1000) " << Timer::getTimeMs() << endl;
00426 #endif
00427       prod1.run();
00428 
00429 #ifdef VERBOSE
00430       cerr << "before getNext(1000) " << Timer::getTimeMs() << endl;
00431 #endif
00432       tlfNS.getNext(1000);
00433 #ifdef VERBOSE
00434       cerr << "after getNext(1000) " << Timer::getTimeMs() << endl;
00435 #endif
00436       prod2.run();
00437       prod3.run();
00438       prod4.run();
00439       prod5.run();
00440       prod6.run();
00441       prod7.run();
00442       prod8.run();
00443       prod9.run();
00444       prod10.run();
00445 
00446       // Wait for producers to finish
00447       prod1.join();
00448       prod2.join();
00449       prod3.join();
00450       prod4.join();
00451       prod5.join();
00452       prod6.join();
00453       prod7.join();
00454       prod8.join();
00455       prod9.join();
00456       prod10.join();
00457 
00458       // Give some time for consumers to finish consuming, before shutting down
00459       sleepMS(1000);
00460    }
00461 
00462    cerr << "All OK" << endl;
00463    return 0;
00464 }
00465 
00466 /* ====================================================================
00467  * The Vovida Software License, Version 1.0 
00468  * 
00469  * Redistribution and use in source and binary forms, with or without
00470  * modification, are permitted provided that the following conditions
00471  * are met:
00472  * 
00473  * 1. Redistributions of source code must retain the above copyright
00474  *    notice, this list of conditions and the following disclaimer.
00475  * 
00476  * 2. Redistributions in binary form must reproduce the above copyright
00477  *    notice, this list of conditions and the following disclaimer in
00478  *    the documentation and/or other materials provided with the
00479  *    distribution.
00480  * 
00481  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00482  *    and "Vovida Open Communication Application Library (VOCAL)" must
00483  *    not be used to endorse or promote products derived from this
00484  *    software without prior written permission. For written
00485  *    permission, please contact vocal@vovida.org.
00486  *
00487  * 4. Products derived from this software may not be called "VOCAL", nor
00488  *    may "VOCAL" appear in their name, without prior written
00489  *    permission of Vovida Networks, Inc.
00490  * 
00491  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00492  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00493  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00494  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00495  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00496  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00497  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00498  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00499  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00500  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00501  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00502  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00503  * DAMAGE.
00504  * 
00505  * ====================================================================
00506  * 
00507  * This software consists of voluntary contributions made by Vovida
00508  * Networks, Inc. and many individuals on behalf of Vovida Networks,
00509  * Inc.  For more information on Vovida Networks, Inc., please see
00510  * <http://www.vovida.org/>.
00511  *
00512  */