|
reSIProcate/rutil
9694
|
00001 #include <iostream> 00002 #include "rutil/Log.hxx" 00003 #include "rutil/Fifo.hxx" 00004 #include "rutil/FiniteFifo.hxx" 00005 #include "rutil/TimeLimitFifo.hxx" 00006 #include "rutil/Data.hxx" 00007 #include "rutil/ThreadIf.hxx" 00008 #include "rutil/Timer.hxx" 00009 #ifndef WIN32 00010 #include <unistd.h> 00011 #endif 00012 00013 //#define VERBOSE 00014 00015 using namespace resip; 00016 using namespace std; 00017 00018 void sleepMS(unsigned int ms) 00019 { 00020 #ifdef WIN32 00021 Sleep(ms); 00022 #else 00023 usleep(ms*1000); 00024 #endif 00025 } 00026 00027 class Foo 00028 { 00029 public: 00030 Foo(const Data& val) 00031 : mVal(val) 00032 {} 00033 00034 Data mVal; 00035 }; 00036 00037 class Consumer: public ThreadIf 00038 { 00039 public: 00040 Consumer(TimeLimitFifo<Foo>&); 00041 virtual ~Consumer() 00042 { 00043 #ifdef VERBOSE 00044 cerr << "Consumer thread finishing..." << endl; 00045 #endif 00046 shutdown(); 00047 join(); 00048 #ifdef VERBOSE 00049 cerr << "Consumer thread finished" << endl; 00050 #endif 00051 }; 00052 00053 void thread(); 00054 00055 private: 00056 TimeLimitFifo<Foo>& mFifo; 00057 }; 00058 00059 class Producer: public ThreadIf 00060 { 00061 public: 00062 Producer(TimeLimitFifo<Foo>&); 00063 virtual ~Producer() 00064 { 00065 #ifdef VERBOSE 00066 cerr << "Producer thread finishing" << endl; 00067 #endif 00068 shutdown(); 00069 join(); 00070 #ifdef VERBOSE 00071 cerr << "Producer thread finished" << endl; 00072 #endif 00073 } 00074 00075 void thread(); 00076 00077 private: 00078 TimeLimitFifo<Foo>& mFifo; 00079 }; 00080 00081 Consumer::Consumer(TimeLimitFifo<Foo>& f) : 00082 mFifo(f) 00083 {} 00084 00085 void Consumer::thread() 00086 { 00087 static unsigned wakeups[6] = { 10, 20, 30, 0, 10, 30 }; 00088 unsigned int w = 0; 00089 00090 #ifdef VERBOSE 00091 cerr << "Consumer running..." << endl; 00092 #endif 00093 00094 while (!mShutdown) 00095 { 00096 if (mFifo.messageAvailable()) 00097 { 00098 delete mFifo.getNext(100); 00099 } 00100 else 00101 { 00102 unsigned wakeup = wakeups[w]; 00103 w = (w + 1) % 6; 00104 #ifdef VERBOSE 00105 cerr << "Consumer sleeping for " << wakeup << " ms with mSize " << mFifo.size() << endl; 00106 #endif 00107 if (wakeup > 0) 00108 { 00109 sleepMS(wakeup); 00110 } 00111 } 00112 } 00113 } 00114 00115 Producer::Producer(TimeLimitFifo<Foo>& f) : 00116 mFifo(f) 00117 {} 00118 00119 void Producer::thread() 00120 { 00121 static unsigned wakeups[6] = { 0, 10, 0, 20, 30, 10 }; 00122 unsigned int w = 0; 00123 00124 #ifdef VERBOSE 00125 cerr << "Producer running..." << endl; 00126 #endif 00127 00128 for (unsigned long n = 0; n < 0x1ffff; n++) 00129 { 00130 if (mFifo.wouldAccept(TimeLimitFifo<Foo>::EnforceTimeDepth)) 00131 { 00132 mFifo.add(new Foo(Data(n)), TimeLimitFifo<Foo>::EnforceTimeDepth); 00133 } 00134 else 00135 { 00136 unsigned wakeup = wakeups[w]; 00137 w = (w + 1) % 6; 00138 #ifdef VERBOSE 00139 cerr << "Producer sleeping for " << wakeup << " ms at " << n << " with mSize " << mFifo.size() << endl; 00140 #endif 00141 if (wakeup > 0) 00142 { 00143 sleepMS(wakeup); 00144 } 00145 } 00146 } 00147 } 00148 00149 bool 00150 isNear(int value, int reference, int epsilon=250) 00151 { 00152 int diff = ::abs(value-reference); 00153 return (diff < epsilon); 00154 } 00155 00156 int 00157 main() 00158 { 00159 Log::initialize(Log::Cout, Log::Debug, Data::Empty); 00160 00161 { 00162 cerr << "!! test getNext(ms) empty fifo timing" << endl; 00163 Fifo<Foo> fifo; 00164 UInt64 begin(Timer::getTimeMs()); 00165 fifo.getNext(2000); 00166 UInt64 end(Timer::getTimeMs()); 00167 cerr << begin << " " << end << " " << end-begin << endl; 00168 00169 int offMark = 2000 - (end - begin); 00170 00171 assert(abs(offMark) < 200); 00172 } 00173 00174 Fifo<Foo> f; 00175 FiniteFifo<Foo> ff(5); 00176 00177 { 00178 cerr << "!! test basic" << endl; 00179 00180 bool c; 00181 TimeLimitFifo<Foo> tlf(5, 10); // 5 seconds or 10 count limit 00182 00183 assert(tlf.empty()); 00184 assert(tlf.size() == 0); 00185 assert(tlf.timeDepth() == 0); 00186 00187 c = tlf.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00188 assert(c); 00189 00190 #ifdef VERBOSE 00191 cerr << __LINE__ << endl; 00192 #endif 00193 00194 assert(!tlf.empty()); 00195 assert(tlf.size() == 1); 00196 #ifdef VERBOSE 00197 cerr << tlf.timeDepth() << endl; 00198 #endif 00199 assert(tlf.timeDepth() == 0); 00200 00201 #ifdef VERBOSE 00202 cerr << __LINE__ << endl; 00203 #endif 00204 00205 sleepMS(2000); 00206 00207 assert(!tlf.empty()); 00208 assert(tlf.size() == 1); 00209 assert(tlf.timeDepth() > 1); 00210 00211 delete tlf.getNext(); 00212 00213 assert(tlf.empty()); 00214 assert(tlf.size() == 0); 00215 assert(tlf.timeDepth() == 0); 00216 00217 #ifdef VERBOSE 00218 cerr << __LINE__ << endl; 00219 #endif 00220 00221 c = tlf.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00222 assert(c); 00223 sleepMS(3000); 00224 c = tlf.add(new Foo("second"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00225 assert(c); 00226 sleepMS(3000); 00227 c = tlf.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00228 assert(!c); 00229 c = tlf.add(new Foo("yep"), TimeLimitFifo<Foo>::IgnoreTimeDepth); 00230 assert(c); 00231 c = tlf.add(new Foo("internal"), TimeLimitFifo<Foo>::InternalElement); 00232 assert(c); 00233 00234 cerr << __LINE__ << endl; 00235 00236 Foo* fp = tlf.getNext(); 00237 assert(fp->mVal == "first"); 00238 delete fp; 00239 c = tlf.add(new Foo("third"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00240 assert(c); 00241 } 00242 00243 { 00244 cerr << "!! Test time depth" << endl; 00245 00246 TimeLimitFifo<Foo> tlfNS(5, 0); // 5 seconds, no count limit 00247 bool c; 00248 00249 assert(tlfNS.empty()); 00250 assert(tlfNS.size() == 0); 00251 assert(tlfNS.timeDepth() == 0); 00252 00253 c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00254 assert(c); 00255 sleepMS(3000); 00256 c = tlfNS.add(new Foo("second"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00257 assert(c); 00258 sleepMS(3000); 00259 c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00260 assert(!c); 00261 Foo* fp = tlfNS.getNext(); 00262 assert(fp->mVal == "first"); 00263 delete fp; 00264 c = tlfNS.add(new Foo("third"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00265 assert(c); 00266 } 00267 00268 { 00269 TimeLimitFifo<Foo> tlfNS(5, 0); // 5 seconds, no count limit 00270 bool c; 00271 00272 assert(tlfNS.empty()); 00273 assert(tlfNS.size() == 0); 00274 assert(tlfNS.timeDepth() == 0); 00275 00276 for (int i = 0; i < 100; ++i) 00277 { 00278 c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth); 00279 assert(c); 00280 } 00281 00282 sleepMS(6000); 00283 c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00284 assert(!c); 00285 00286 c = tlfNS.add(new Foo("yep"), TimeLimitFifo<Foo>::IgnoreTimeDepth); 00287 assert(c); 00288 00289 assert(tlfNS.size() == 101); 00290 00291 while (!tlfNS.empty()) 00292 { 00293 delete tlfNS.getNext(); 00294 } 00295 00296 c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00297 assert(c); 00298 } 00299 00300 { 00301 cerr << "!! Test reserved" << endl; 00302 00303 TimeLimitFifo<Foo> tlfNS(5, 10); // 5 seconds, limit 10 (2 reserved) 00304 bool c; 00305 00306 assert(tlfNS.empty()); 00307 assert(tlfNS.size() == 0); 00308 assert(tlfNS.timeDepth() == 0); 00309 00310 for (int i = 0; i < 8; ++i) 00311 { 00312 c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth); 00313 assert(c); 00314 } 00315 00316 c = tlfNS.add(new Foo("nope"), TimeLimitFifo<Foo>::IgnoreTimeDepth); 00317 assert(!c); 00318 00319 assert(tlfNS.size() == 8); 00320 00321 c = tlfNS.add(new Foo("yep"), TimeLimitFifo<Foo>::InternalElement); 00322 assert(c); 00323 00324 c = tlfNS.add(new Foo("yepAgain"), TimeLimitFifo<Foo>::InternalElement); 00325 assert(c); 00326 00327 c = tlfNS.add(new Foo("hard nope!"), TimeLimitFifo<Foo>::InternalElement); 00328 assert(!c); 00329 00330 while (!tlfNS.empty()) 00331 { 00332 delete tlfNS.getNext(); 00333 } 00334 00335 c = tlfNS.add(new Foo("first"), TimeLimitFifo<Foo>::EnforceTimeDepth); 00336 assert(c); 00337 } 00338 00339 { 00340 cerr << "!! Test unlimited" << endl; 00341 00342 TimeLimitFifo<Foo> tlfNS(0, 0); // unlimited 00343 00344 bool c; 00345 00346 assert(tlfNS.empty()); 00347 assert(tlfNS.size() == 0); 00348 assert(tlfNS.timeDepth() == 0); 00349 00350 for (int i = 0; i < 100; ++i) 00351 { 00352 c = tlfNS.add(new Foo(Data("element") + Data(i)), TimeLimitFifo<Foo>::EnforceTimeDepth); 00353 assert(c); 00354 sleepMS(1000); 00355 } 00356 } 00357 00358 { 00359 cerr << "!! Test produce consumer" << endl; 00360 00361 TimeLimitFifo<Foo> tlfNS(20, 5000); 00362 Producer prod(tlfNS); 00363 Consumer cons(tlfNS); 00364 00365 cons.run(); 00366 prod.run(); 00367 #ifdef VERBOSE 00368 cerr << "Producer and consumer threads are running" << endl; 00369 #endif 00370 prod.join(); 00371 #ifdef VERBOSE 00372 cerr << "Producer thread finished" << endl; 00373 #endif 00374 cons.shutdown(); 00375 cons.join(); 00376 #ifdef VERBOSE 00377 cerr << "Consumer thread finished" << endl; 00378 #endif 00379 } 00380 00381 { 00382 cerr << "!! Test producers consumers" << endl; 00383 00384 TimeLimitFifo<Foo> tlfNS(20, 50000); 00385 00386 Producer prod1(tlfNS); 00387 Producer prod2(tlfNS); 00388 Producer prod3(tlfNS); 00389 Producer prod4(tlfNS); 00390 Producer prod5(tlfNS); 00391 Producer prod6(tlfNS); 00392 Producer prod7(tlfNS); 00393 Producer prod8(tlfNS); 00394 Producer prod9(tlfNS); 00395 Producer prod10(tlfNS); 00396 00397 Consumer cons1(tlfNS); 00398 Consumer cons2(tlfNS); 00399 Consumer cons3(tlfNS); 00400 Consumer cons4(tlfNS); 00401 Consumer cons5(tlfNS); 00402 Consumer cons6(tlfNS); 00403 Consumer cons7(tlfNS); 00404 Consumer cons8(tlfNS); 00405 Consumer cons9(tlfNS); 00406 Consumer cons10(tlfNS); 00407 00408 00409 cons1.run(); 00410 cons2.run(); 00411 cons3.run(); 00412 cons4.run(); 00413 cons5.run(); 00414 cons6.run(); 00415 cons7.run(); 00416 cons8.run(); 00417 cons9.run(); 00418 cons10.run(); 00419 00420 #ifdef VERBOSE 00421 cerr << "before getNext(1000) " << Timer::getTimeMs() << endl; 00422 #endif 00423 tlfNS.getNext(1000); 00424 #ifdef VERBOSE 00425 cerr << "after getNext(1000) " << Timer::getTimeMs() << endl; 00426 #endif 00427 prod1.run(); 00428 00429 #ifdef VERBOSE 00430 cerr << "before getNext(1000) " << Timer::getTimeMs() << endl; 00431 #endif 00432 tlfNS.getNext(1000); 00433 #ifdef VERBOSE 00434 cerr << "after getNext(1000) " << Timer::getTimeMs() << endl; 00435 #endif 00436 prod2.run(); 00437 prod3.run(); 00438 prod4.run(); 00439 prod5.run(); 00440 prod6.run(); 00441 prod7.run(); 00442 prod8.run(); 00443 prod9.run(); 00444 prod10.run(); 00445 00446 // Wait for producers to finish 00447 prod1.join(); 00448 prod2.join(); 00449 prod3.join(); 00450 prod4.join(); 00451 prod5.join(); 00452 prod6.join(); 00453 prod7.join(); 00454 prod8.join(); 00455 prod9.join(); 00456 prod10.join(); 00457 00458 // Give some time for consumers to finish consuming, before shutting down 00459 sleepMS(1000); 00460 } 00461 00462 cerr << "All OK" << endl; 00463 return 0; 00464 } 00465 00466 /* ==================================================================== 00467 * The Vovida Software License, Version 1.0 00468 * 00469 * Redistribution and use in source and binary forms, with or without 00470 * modification, are permitted provided that the following conditions 00471 * are met: 00472 * 00473 * 1. Redistributions of source code must retain the above copyright 00474 * notice, this list of conditions and the following disclaimer. 00475 * 00476 * 2. Redistributions in binary form must reproduce the above copyright 00477 * notice, this list of conditions and the following disclaimer in 00478 * the documentation and/or other materials provided with the 00479 * distribution. 00480 * 00481 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00482 * and "Vovida Open Communication Application Library (VOCAL)" must 00483 * not be used to endorse or promote products derived from this 00484 * software without prior written permission. For written 00485 * permission, please contact vocal@vovida.org. 00486 * 00487 * 4. Products derived from this software may not be called "VOCAL", nor 00488 * may "VOCAL" appear in their name, without prior written 00489 * permission of Vovida Networks, Inc. 00490 * 00491 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00492 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00493 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00494 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00495 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00496 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00497 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00498 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00499 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00500 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00501 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00502 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00503 * DAMAGE. 00504 * 00505 * ==================================================================== 00506 * 00507 * This software consists of voluntary contributions made by Vovida 00508 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00509 * Inc. For more information on Vovida Networks, Inc., please see 00510 * <http://www.vovida.org/>. 00511 * 00512 */
1.7.5.1