|
reSIProcate/rutil
9694
|
00001 #include <assert.h> 00002 #include <string.h> 00003 00004 #include "rutil/FdPoll.hxx" 00005 #include "rutil/FdSetIOObserver.hxx" 00006 #include "rutil/Logger.hxx" 00007 #include "rutil/BaseException.hxx" 00008 00009 #include <vector> 00010 00011 #ifdef RESIP_POLL_IMPL_EPOLL 00012 # include <sys/epoll.h> 00013 #endif 00014 00015 using namespace resip; 00016 #define RESIPROCATE_SUBSYSTEM Subsystem::SIP 00017 00018 /***************************************************************** 00019 * 00020 * FdPollItemIf and FdPollItemBase impl 00021 * 00022 *****************************************************************/ 00023 00024 FdPollItemIf::~FdPollItemIf() 00025 { 00026 } 00027 00028 FdPollItemBase::FdPollItemBase(FdPollGrp *grp, Socket fd, FdPollEventMask mask) : 00029 mPollGrp(grp), mPollSocket(fd) 00030 { 00031 mPollHandle = mPollGrp->addPollItem(fd, mask, this); 00032 } 00033 00034 FdPollItemBase::~FdPollItemBase() 00035 { 00036 mPollGrp->delPollItem(mPollHandle); 00037 } 00038 00039 /***************************************************************** 00040 * 00041 * FdPollGrp 00042 * 00043 * Implementation for some of the base class methods. 00044 * While some of these are epoll-specific, we can (and do) implement 00045 * them at this level. 00046 * For now we use delegation for the impl data structures 00047 * rather than inhieritance. Long term, not sure which will 00048 * be cleaner. 00049 * 00050 *****************************************************************/ 00051 00052 FdPollGrp::FdPollGrp() 00053 { 00054 } 00055 00056 FdPollGrp::~FdPollGrp() 00057 { 00058 } 00059 00060 void 00061 FdPollGrp::processItem(FdPollItemIf *item, FdPollEventMask mask) 00062 { 00063 try 00064 { 00065 item->processPollEvent( mask ); 00066 } 00067 catch(BaseException& e) 00068 { 00069 // kill it or something? 00070 ErrLog(<<"Exception thrown for FdPollItem: " << e); 00071 } 00072 item = NULL; // WATCHOUT: item may have been deleted 00073 /* 00074 * If FPEM_Error was reported, should really make sure it was deleted 00075 * or disabled from polling. Otherwise were in stuck in an infinite loop. 00076 * But difficult to do that checking robustly until we serials the items. 00077 */ 00078 } 00079 00080 int 00081 FdPollGrp::getEPollFd() const 00082 { 00083 return -1; 00084 } 00085 00086 /***************************************************************** 00087 * 00088 * FdPollImplFdSet 00089 * 00090 *****************************************************************/ 00091 00098 namespace resip 00099 { 00100 00101 class FdPollItemFdSetInfo 00102 { 00103 public: 00104 FdPollItemFdSetInfo() 00105 : mSocketFd(INVALID_SOCKET), mItemObj(0), mEvMask(0), mNextIdx(-1) 00106 { 00107 } 00108 00109 Socket mSocketFd; // socket 00110 FdPollItemIf* mItemObj; // callback object 00111 FdPollEventMask mEvMask; // events the application wants 00112 int mNextIdx; // next link for live or free list 00113 }; 00114 00115 00116 00117 class FdPollImplFdSet : public FdPollGrp 00118 { 00119 public: 00120 FdPollImplFdSet(); 00121 ~FdPollImplFdSet(); 00122 00123 virtual const char* getImplName() const { return "fdset"; } 00124 00125 virtual FdPollItemHandle addPollItem(Socket fd, 00126 FdPollEventMask newMask, FdPollItemIf *item); 00127 virtual void modPollItem(FdPollItemHandle handle, 00128 FdPollEventMask newMask); 00129 virtual void delPollItem(FdPollItemHandle handle); 00130 00131 virtual void registerFdSetIOObserver(FdSetIOObserver& observer); 00132 virtual void unregisterFdSetIOObserver(FdSetIOObserver& observer); 00133 00134 virtual bool waitAndProcess(int ms=0); 00135 virtual void buildFdSet(FdSet& fdSet); 00136 virtual bool processFdSet(FdSet& fdset); 00137 00138 protected: 00139 virtual unsigned int buildFdSetForObservers(FdSet& fdSet); 00140 void killCache(Socket fd); 00141 00142 std::vector<FdPollItemFdSetInfo> mItems; 00143 std::vector<FdSetIOObserver*> mFdSetObservers; 00144 00145 /* 00146 * The ItemInfos are stored in a vector (above) that grows as needed. 00147 * Every Info is in one single-linked list, either the "Live" list 00148 * or the "Free" list. This is somewhat like using 00149 * boost::intrusive::slist, except we use indices not pointers 00150 * since the vector may reallocate and move around. 00151 */ 00152 int mLiveHeadIdx; 00153 int mFreeHeadIdx; 00154 00155 /* 00156 * This is temporary cache of poll events. It is a member (and 00157 * not on stack) for two reasons: (1) simpler memory management, 00158 * and (2) so delPollItem() can traverse it and clean up. 00159 */ 00160 FdSet mSelectSet; 00161 }; 00162 00163 }; // namespace 00164 00165 // NOTE: shift by one so that idx=0 doesn't have NULL handle 00166 #define IMPL_FDSET_IdxToHandle(idx) ((FdPollItemHandle)( ((char*)0) + ((idx)+1) )) 00167 #define IMPL_FDSET_HandleToIdx(handle) ( ((char*)(handle)) - ((char*)0) - 1) 00168 00169 FdPollImplFdSet::FdPollImplFdSet() 00170 : mLiveHeadIdx(-1), mFreeHeadIdx(-1) 00171 { 00172 } 00173 00174 FdPollImplFdSet::~FdPollImplFdSet() 00175 { 00176 // assert( mEvCacheLen == 0 ); // poll not active 00177 unsigned itemIdx; 00178 for (itemIdx=0; itemIdx < mItems.size(); itemIdx++) 00179 { 00180 FdPollItemFdSetInfo& info = mItems[itemIdx]; 00181 if (info.mItemObj) 00182 { 00183 CritLog(<<"FdPollItem idx="<<itemIdx 00184 <<" not deleted prior to destruction"); 00185 } 00186 } 00187 } 00188 00189 FdPollItemHandle 00190 FdPollImplFdSet::addPollItem(Socket fd, FdPollEventMask newMask, FdPollItemIf *item) 00191 { 00192 // if this isn't true then the linked lists will get messed up 00193 assert(item); 00194 assert(fd!=INVALID_SOCKET); 00195 00196 unsigned useIdx; 00197 if ( mFreeHeadIdx >= 0 ) 00198 { 00199 useIdx = mFreeHeadIdx; 00200 mFreeHeadIdx = mItems[useIdx].mNextIdx; 00201 } 00202 else 00203 { 00204 useIdx = mItems.size(); 00205 unsigned newsz = 10+useIdx + useIdx/3; // plus 30% margin 00206 // WATCHOUT: below may trigger re-allocation, invalidating any iters 00207 // We don't use iters (only indices), but need to watchout for 00208 // cached pointers 00209 mItems.resize(newsz); 00210 // push new items onto the free list 00211 unsigned itemIdx; 00212 for (itemIdx=useIdx+1; itemIdx < newsz; itemIdx++) 00213 { 00214 mItems[itemIdx].mNextIdx = mFreeHeadIdx; 00215 mFreeHeadIdx = itemIdx; 00216 } 00217 } 00218 FdPollItemFdSetInfo& info = mItems[useIdx]; 00219 info.mItemObj = item; 00220 info.mSocketFd = fd; 00221 info.mEvMask = newMask; 00222 info.mNextIdx = mLiveHeadIdx; 00223 mLiveHeadIdx = useIdx; 00224 00225 if ( info.mEvMask & FPEM_Read ) 00226 mSelectSet.setRead(info.mSocketFd); 00227 if ( info.mEvMask & FPEM_Write ) 00228 mSelectSet.setWrite(info.mSocketFd); 00229 if ( info.mEvMask & FPEM_Error ) 00230 mSelectSet.setExcept(info.mSocketFd); 00231 00232 return IMPL_FDSET_IdxToHandle(useIdx); 00233 } 00234 00235 void 00236 FdPollImplFdSet::modPollItem(const FdPollItemHandle handle, FdPollEventMask newMask) 00237 { 00238 int useIdx = IMPL_FDSET_HandleToIdx(handle); 00239 assert(useIdx>=0 && ((unsigned)useIdx) < mItems.size()); 00240 FdPollItemFdSetInfo& info = mItems[useIdx]; 00241 assert(info.mSocketFd!=INVALID_SOCKET); 00242 assert(info.mItemObj); 00243 info.mEvMask = newMask; 00244 00245 killCache(info.mSocketFd); 00246 if ( info.mEvMask & FPEM_Read ) 00247 mSelectSet.setRead(info.mSocketFd); 00248 if ( info.mEvMask & FPEM_Write ) 00249 mSelectSet.setWrite(info.mSocketFd); 00250 if ( info.mEvMask & FPEM_Error ) 00251 mSelectSet.setExcept(info.mSocketFd); 00252 } 00253 00254 void 00255 FdPollImplFdSet::delPollItem(FdPollItemHandle handle) 00256 { 00257 int useIdx = IMPL_FDSET_HandleToIdx(handle); 00258 //DebugLog(<<"deleting epoll item fd="<<fd); 00259 assert(useIdx>=0 && ((unsigned)useIdx) < mItems.size()); 00260 FdPollItemFdSetInfo& info = mItems[useIdx]; 00261 assert(info.mSocketFd!=INVALID_SOCKET); 00262 assert(info.mItemObj); 00263 killCache(info.mSocketFd); 00264 // we don't change the lists here since the select loop might 00265 // be iterating. Just mark it as dead and gc it later. 00266 info.mSocketFd = INVALID_SOCKET; 00267 info.mItemObj = NULL; 00268 info.mEvMask = 0; 00269 } 00270 00271 void 00272 FdPollImplFdSet::registerFdSetIOObserver(FdSetIOObserver& observer) 00273 { 00274 // .bwc. Could make this sorted. Probably not worth the trouble. 00275 mFdSetObservers.push_back(&observer); 00276 } 00277 00278 void 00279 FdPollImplFdSet::unregisterFdSetIOObserver(FdSetIOObserver& observer) 00280 { 00281 // .bwc. Could make this sorted. Probably not worth the trouble. 00282 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00283 o!=mFdSetObservers.end();++o) 00284 { 00285 if(*o==&observer) 00286 { 00287 mFdSetObservers.erase(o); 00288 return; 00289 } 00290 } 00291 } 00292 00293 00314 void 00315 FdPollImplFdSet::killCache(Socket fd) 00316 { 00317 mSelectSet.clear(fd); 00318 } 00319 00320 00321 bool 00322 FdPollImplFdSet::waitAndProcess(int ms) 00323 { 00324 if(ms<0) 00325 { 00326 // On Linux, passing a NULL timeout ptr to select() will wait 00327 // forever, but I don't want to trust that on all platforms. 00328 // So use 60sec as approximation of "forever". 00329 // Use 60sec b/c fits in short. 00330 ms = 60*1000; 00331 } 00332 00333 // Create copy; is cheaper than rebuilding from scratch every time. 00334 FdSet fdset(mSelectSet); 00335 ms = resipMin(buildFdSetForObservers(fdset), (unsigned int)ms); 00336 00337 // Step 2: Select on our built FdSet 00338 int numReady = fdset.selectMilliSeconds(ms); 00339 if ( numReady < 0 ) 00340 { 00341 int err = getErrno(); 00342 if ( err!=EINTR ) 00343 { 00344 CritLog(<<"select() failed: "<<strerror(err)); 00345 assert(0); // .kw. not sure correct behavior... 00346 } 00347 return false; 00348 } 00349 00350 if ( numReady==0 ) 00351 { 00352 return false; // timer expired 00353 } 00354 00355 return processFdSet(fdset); 00356 } 00357 00358 void 00359 FdPollImplFdSet::buildFdSet(FdSet& fdset) 00360 { 00361 int* prevIdxRef=&mLiveHeadIdx; 00362 int loopCnt = 0; 00363 int itemIdx; 00364 00365 // Step 1: build a new FdSet from the Items vector 00366 while ( (itemIdx = *prevIdxRef) != -1 ) 00367 { 00368 assert( ++loopCnt < 99123123 ); 00369 FdPollItemFdSetInfo& info = mItems[itemIdx]; 00370 if ( info.mItemObj==0 ) 00371 { 00372 // item was deleted, need to garbage collect 00373 assert( info.mEvMask==0 ); 00374 // unlink from live list 00375 *prevIdxRef = info.mNextIdx; 00376 // link into free list 00377 info.mNextIdx = mFreeHeadIdx; 00378 mFreeHeadIdx = itemIdx; 00379 continue; 00380 } 00381 if ( info.mEvMask!=0 ) 00382 { 00383 assert(info.mSocketFd!=INVALID_SOCKET); 00384 if ( info.mEvMask & FPEM_Read ) 00385 fdset.setRead(info.mSocketFd); 00386 if ( info.mEvMask & FPEM_Write ) 00387 fdset.setWrite(info.mSocketFd); 00388 if ( info.mEvMask & FPEM_Error ) 00389 fdset.setExcept(info.mSocketFd); 00390 } 00391 prevIdxRef = &info.mNextIdx; 00392 } 00393 00394 // Allow any FdSetIOObservers a crack at the FdSet; we can't really optimize 00395 // this part. 00396 buildFdSetForObservers(fdset); 00397 } 00398 00399 unsigned int 00400 FdPollImplFdSet::buildFdSetForObservers(FdSet& fdset) 00401 { 00402 unsigned int ms=INT_MAX; 00403 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00404 o!=mFdSetObservers.end();++o) 00405 { 00406 (*o)->buildFdSet(fdset); 00407 ms = resipMin(ms, (*o)->getTimeTillNextProcessMS()); 00408 } 00409 return ms; 00410 } 00411 00412 bool 00413 FdPollImplFdSet::processFdSet(FdSet& fdset) 00414 { 00415 bool didsomething = false; 00416 int itemIdx; 00417 int* prevIdxRef = &mLiveHeadIdx; 00418 int loopCnt = 0; 00419 00420 // Step 3: Invoke callbacks 00421 // Could take advantage of early via numReady, but book keeping 00422 // seems tedious especially if items are deleted during walk 00423 while ( (itemIdx = *prevIdxRef) != -1 ) 00424 { 00425 FdPollItemFdSetInfo& info = mItems[itemIdx]; 00426 assert( ++loopCnt < 99123123 ); 00427 if ( info.mEvMask!=0 && info.mItemObj!=0 ) 00428 { 00429 FdPollEventMask usrMask = 0; 00430 assert(info.mSocketFd!=INVALID_SOCKET); 00431 if ( fdset.readyToRead(info.mSocketFd) ) 00432 usrMask |= FPEM_Read; 00433 if ( fdset.readyToWrite(info.mSocketFd) ) 00434 usrMask |= FPEM_Write; 00435 if ( fdset.hasException(info.mSocketFd) ) 00436 usrMask |= FPEM_Error; 00437 00438 // items's mask may have changed since select occured, so mask it again 00439 usrMask &= info.mEvMask; 00440 if ( usrMask ) 00441 { 00442 processItem(info.mItemObj, usrMask); 00443 didsomething = true; 00444 } 00445 } 00446 // WATCHOUT: {info} may have moved due to add during processItem() 00447 // set pointer using index, not {info} 00448 prevIdxRef = &mItems[itemIdx].mNextIdx; 00449 } 00450 00451 // Step 3.1: Invoke callbacks on any FdSetIOObservers 00452 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00453 o!=mFdSetObservers.end();++o) 00454 { 00455 // This is not strictly correct; we do not know if this observer actually 00456 // put any FDs in the set, or if any of these FDs ended up being ready. 00457 // Eventually, it would be nice to have process() return whether any 00458 // actual IO was performed. 00459 didsomething=true; 00460 (*o)->process(fdset); 00461 } 00462 00463 return didsomething; 00464 } 00465 00466 00467 // end of ImplFdSet 00468 00469 00470 /***************************************************************** 00471 * 00472 * FdPollImplEpoll 00473 * 00474 *****************************************************************/ 00475 00476 #ifdef RESIP_POLL_IMPL_EPOLL 00477 00478 namespace resip 00479 { 00480 00481 00482 00483 class FdPollImplEpoll : public FdPollGrp 00484 { 00485 public: 00486 FdPollImplEpoll(); 00487 ~FdPollImplEpoll(); 00488 00489 virtual const char* getImplName() const { return "epoll"; } 00490 00491 virtual FdPollItemHandle addPollItem(Socket fd, 00492 FdPollEventMask newMask, FdPollItemIf *item); 00493 virtual void modPollItem(FdPollItemHandle handle, 00494 FdPollEventMask newMask); 00495 virtual void delPollItem(FdPollItemHandle handle); 00496 virtual void registerFdSetIOObserver(FdSetIOObserver& observer); 00497 virtual void unregisterFdSetIOObserver(FdSetIOObserver& observer); 00498 00499 virtual bool waitAndProcess(int ms=0); 00500 00502 virtual int getEPollFd() const { return mEPollFd; } 00503 virtual void buildFdSet(FdSet& fdSet); 00504 virtual bool processFdSet(FdSet& fdset); 00505 00506 protected: 00507 void killCache(Socket fd); 00508 bool epollWait(int ms); 00509 00510 std::vector<FdPollItemIf*> mItems; // indexed by fd 00511 std::vector<FdSetIOObserver*> mFdSetObservers; 00512 int mEPollFd; // from epoll_create() 00513 00514 /* 00515 * This is temporary cache of poll events. It is a member (and 00516 * not on stack) for two reasons: (1) simpler memory management, 00517 * and (2) so delPollItem() can traverse it and clean up. 00518 */ 00519 std::vector<struct epoll_event> mEvCache; 00520 int mEvCacheCur; 00521 int mEvCacheLen; 00522 }; 00523 00524 }; // namespace 00525 00526 // NOTE: shift by one so that fd=0 doesn't have NULL handle 00527 #define IMPL_EPOLL_FdToHandle(fd) ((FdPollItemHandle)( ((char*)0) + ((fd)+1) )) 00528 #define IMPL_EPOLL_HandleToFd(handle) ( ((char*)(handle)) - ((char*)0) - 1) 00529 00530 FdPollImplEpoll::FdPollImplEpoll() : 00531 mEPollFd(-1) 00532 { 00533 int sz = 200; // ignored 00534 if ( (mEPollFd = epoll_create(sz)) < 0 ) 00535 { 00536 CritLog(<<"epoll_create() failed: "<<strerror(errno)); 00537 abort(); 00538 } 00539 mEvCache.resize(sz); 00540 mEvCacheCur = mEvCacheLen = 0; 00541 } 00542 00543 FdPollImplEpoll::~FdPollImplEpoll() 00544 { 00545 assert( mEvCacheLen == 0 ); // poll not active 00546 unsigned itemIdx; 00547 for (itemIdx=0; itemIdx < mItems.size(); itemIdx++) 00548 { 00549 FdPollItemIf *item = mItems[itemIdx]; 00550 if (item) 00551 { 00552 CritLog(<<"FdPollItem idx="<<itemIdx 00553 <<" not deleted prior to destruction"); 00554 } 00555 } 00556 if (mEPollFd != -1) 00557 { 00558 close(mEPollFd); 00559 } 00560 } 00561 00562 static inline unsigned short 00563 CvtSysToUsrMask(unsigned long sysMask) 00564 { 00565 unsigned usrMask = 0; 00566 if ( sysMask & EPOLLIN ) 00567 usrMask |= FPEM_Read; 00568 if ( sysMask & EPOLLOUT ) 00569 usrMask |= FPEM_Write; 00570 if ( sysMask & EPOLLERR ) 00571 usrMask |= FPEM_Error|FPEM_Read|FPEM_Write; 00572 // NOTE: above, fake read and write if error to encourage 00573 // apps to actually do something about it 00574 return usrMask; 00575 } 00576 00577 static inline unsigned long 00578 CvtUsrToSysMask(unsigned short usrMask) 00579 { 00580 unsigned long sysMask = 0; 00581 if ( usrMask & FPEM_Read ) 00582 sysMask |= EPOLLIN; 00583 if ( usrMask & FPEM_Write ) 00584 sysMask |= EPOLLOUT; 00585 if ( usrMask & FPEM_Edge ) 00586 sysMask |= EPOLLET; 00587 return sysMask; 00588 } 00589 00590 FdPollItemHandle 00591 FdPollImplEpoll::addPollItem(Socket fd, FdPollEventMask newMask, FdPollItemIf *item) 00592 { 00593 assert(fd>=0); 00594 //DebugLog(<<"adding epoll item fd="<<fd); 00595 if (mItems.size() <= (unsigned)fd) 00596 { 00597 unsigned newsz = fd+1; 00598 newsz += newsz/3; // plus 30% margin 00599 // WATCHOUT: below may trigger re-allocation, invalidating any iters 00600 // Currently only iterator is destructor, so should be safe 00601 mItems.resize(newsz); 00602 } 00603 FdPollItemIf *olditem = mItems[fd]; 00604 assert(olditem == NULL); // what is right thing to do? 00605 mItems[fd] = item; 00606 struct epoll_event ev; 00607 memset(&ev, 0, sizeof(ev)); // make valgrind happy 00608 ev.events = CvtUsrToSysMask(newMask); 00609 ev.data.fd = fd; 00610 if (epoll_ctl(mEPollFd, EPOLL_CTL_ADD, fd, &ev) < 0) 00611 { 00612 CritLog(<<"epoll_ctl(ADD) failed: " << strerror(errno)); 00613 abort(); 00614 } 00615 return IMPL_EPOLL_FdToHandle(fd); 00616 } 00617 00618 void 00619 FdPollImplEpoll::modPollItem(const FdPollItemHandle handle, FdPollEventMask newMask) 00620 { 00621 int fd = IMPL_EPOLL_HandleToFd(handle); 00622 assert(fd>=0 && ((unsigned)fd) < mItems.size()); 00623 assert(mItems[fd] != NULL); 00624 00625 struct epoll_event ev; 00626 memset(&ev, 0, sizeof(ev)); // make valgrind happy 00627 ev.events = CvtUsrToSysMask(newMask); 00628 ev.data.fd = fd; 00629 if (epoll_ctl(mEPollFd, EPOLL_CTL_MOD, fd, &ev) < 0) 00630 { 00631 CritLog(<<"epoll_ctl(MOD) failed: "<<strerror(errno)); 00632 abort(); 00633 } 00634 } 00635 00636 void 00637 FdPollImplEpoll::delPollItem(FdPollItemHandle handle) 00638 { 00639 int fd = IMPL_EPOLL_HandleToFd(handle); 00640 //DebugLog(<<"deleting epoll item fd="<<fd); 00641 assert(fd>=0 && ((unsigned)fd) < mItems.size()); 00642 assert( mItems[fd] != NULL ); 00643 mItems[fd] = NULL; 00644 if (epoll_ctl(mEPollFd, EPOLL_CTL_DEL, fd, NULL) < 0) 00645 { 00646 CritLog(<<"epoll_ctl(DEL) fd="<<fd<<" failed: " << strerror(errno)); 00647 abort(); 00648 } 00649 killCache(fd); 00650 } 00651 00652 void 00653 FdPollImplEpoll::registerFdSetIOObserver(FdSetIOObserver& observer) 00654 { 00655 // .bwc. Could make this sorted. Probably not worth the trouble. 00656 mFdSetObservers.push_back(&observer); 00657 } 00658 00659 void 00660 FdPollImplEpoll::unregisterFdSetIOObserver(FdSetIOObserver& observer) 00661 { 00662 // .bwc. Could make this sorted. Probably not worth the trouble. 00663 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00664 o!=mFdSetObservers.end();++o) 00665 { 00666 if(*o==&observer) 00667 { 00668 mFdSetObservers.erase(o); 00669 return; 00670 } 00671 } 00672 } 00673 00674 00698 void 00699 FdPollImplEpoll::killCache(int fd) 00700 { 00701 int ne; 00702 for (ne=mEvCacheCur; ne < mEvCacheLen; ne++) 00703 { 00704 if ( mEvCache[ne].data.fd == fd ) 00705 { 00706 mEvCache[ne].data.fd = INVALID_SOCKET; 00707 } 00708 } 00709 } 00710 00711 00712 bool 00713 FdPollImplEpoll::waitAndProcess(int ms) 00714 { 00715 bool didSomething = false; 00716 int waitMs = ms; 00717 assert( mEvCache.size() > 0 ); 00718 00719 if(!mFdSetObservers.empty()) 00720 { 00721 if(ms < 0) 00722 { 00723 ms=INT_MAX; 00724 waitMs=INT_MAX; 00725 } 00726 00727 // Warning; big fat hack. This is likely to be a tad inefficient, and this 00728 // is why we want to move away from FdSetIOObserver, at least in 00729 // conjunction with stuff that uses epoll. The only holdout right now is 00730 // the cares DNS code. 00731 // Also, a fair bit of duplicated code here. 00732 00733 FdSet fdset; 00734 buildFdSet(fdset); // add our epoll fd, and fds from mFdSetObservers 00735 00736 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00737 o!=mFdSetObservers.end();++o) 00738 { 00739 ms = resipMin((unsigned int)ms, (*o)->getTimeTillNextProcessMS()); 00740 } 00741 00742 // Avoid waiting too much; this ends up overcompensating unless the 00743 // select() times out, but it is better than just setting to 0. We could 00744 // record the time taken by the select() call, but this would be more 00745 // expensive. 00746 waitMs -= ms; 00747 00748 int numReady = fdset.selectMilliSeconds(ms); 00749 00750 // Should we still do this? If our epoll fd is not marked ready, should we 00751 // do the epoll_wait below? I want to say no... 00752 if ( numReady < 0 ) 00753 { 00754 int err = getErrno(); 00755 if ( err!=EINTR ) 00756 { 00757 CritLog(<<"select() failed: "<<strerror(err)); 00758 assert(0); // .kw. not sure correct behavior... 00759 } 00760 return false; 00761 } 00762 if ( numReady==0 ) 00763 return false; // timer expired 00764 00765 didSomething |= processFdSet(fdset); 00766 } 00767 00768 didSomething |= epollWait(waitMs); 00769 return didSomething; 00770 } 00771 00772 void 00773 FdPollImplEpoll::buildFdSet(FdSet& fdset) 00774 { 00775 int fd = getEPollFd(); 00776 if (fd != -1) 00777 { 00778 fdset.setRead(fd); 00779 } 00780 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00781 o!=mFdSetObservers.end();++o) 00782 { 00783 (*o)->buildFdSet(fdset); 00784 } 00785 } 00786 00787 bool 00788 FdPollImplEpoll::processFdSet(FdSet& fdset) 00789 { 00790 bool didsomething=false; 00791 for(std::vector<FdSetIOObserver*>::iterator o=mFdSetObservers.begin(); 00792 o!=mFdSetObservers.end();++o) 00793 { 00794 // This is not strictly correct; we do not know if this observer 00795 // actually put any FDs in the set, or if any of these FDs ended up 00796 // being ready. 00797 // Eventually, it would be nice to have process() return whether any 00798 // actual IO was performed. 00799 didsomething=true; 00800 (*o)->process(fdset); 00801 } 00802 00803 int fd = getEPollFd(); 00804 if (fd !=- 1 && fdset.readyToRead(fd)) 00805 { 00806 epollWait(0); 00807 } 00808 return didsomething; 00809 } 00810 00811 bool 00812 FdPollImplEpoll::epollWait(int waitMs) 00813 { 00814 bool maybeMore; 00815 bool didsomething=false; 00816 do 00817 { 00818 int nfds = epoll_wait(mEPollFd, &mEvCache.front(), mEvCache.size(), waitMs); 00819 if (nfds < 0) 00820 { 00821 if (errno==EINTR) 00822 { 00823 // signal handler (like alarm) broke loop. generally ok 00824 DebugLog(<<"epoll_wait() broken by EINTR"); 00825 nfds = 0; // clean-up and return. could add return code 00826 // to indicate this, but not needed by us 00827 } 00828 else 00829 { 00830 CritLog(<<"epoll_wait() failed: " << strerror(errno)); 00831 abort(); // TBD: just throw instead? 00832 } 00833 } 00834 waitMs = 0; // don't wait anymore 00835 mEvCacheLen = nfds; // for killCache() 00836 maybeMore = ( ((unsigned)nfds)==mEvCache.size()) ? 1 : 0; 00837 int ne; 00838 for (ne=0; ne < nfds; ne++) 00839 { 00840 int fd = mEvCache[ne].data.fd; 00841 if (fd == INVALID_SOCKET) 00842 { 00843 continue; // was killed by killCache() 00844 } 00845 int sysEvtMask = mEvCache[ne].events; 00846 assert(fd>=0 && fd < (int)mItems.size()); 00847 FdPollItemIf *item = mItems[fd]; 00848 if (item == NULL) 00849 { 00850 /* this can happen if item was deleted after 00851 * event was generated in kernel, etc. */ 00852 continue; 00853 } 00854 mEvCacheCur = ne; // for killCache() 00855 processItem(item, CvtSysToUsrMask(sysEvtMask)); 00856 item = NULL; // WATCHOUT: item may not exist anymore 00857 didsomething = true; 00858 } 00859 mEvCacheLen = 0; 00860 } while (maybeMore); 00861 return didsomething; 00862 } 00863 00864 #endif // RESIP_POLL_IMPL_EPOLL 00865 00866 /***************************************************************** 00867 * 00868 * Factory 00869 * 00870 *****************************************************************/ 00871 00872 /*static*/FdPollGrp* 00873 FdPollGrp::create(const char *implName) 00874 { 00875 if ( implName==0 || implName[0]==0 || strcmp(implName,"event")==0 ) 00876 implName = 0; // pick the first (best) one supported 00877 #ifdef RESIP_POLL_IMPL_EPOLL 00878 if ( implName==0 || strcmp(implName,"epoll")==0 ) 00879 { 00880 return new FdPollImplEpoll(); 00881 } 00882 #endif 00883 if ( implName==0 || strcmp(implName,"fdset")==0 ) 00884 { 00885 return new FdPollImplFdSet(); 00886 } 00887 assert(0); 00888 return NULL; 00889 } 00890 00891 /*static*/const char* 00892 FdPollGrp::getImplList() 00893 { 00894 // .kw. this isn't really scalable approach if we get a lot of impls 00895 // but it works for now 00896 #ifdef RESIP_POLL_IMPL_EPOLL 00897 return "event|epoll|fdset"; 00898 #else 00899 return "event|fdset"; 00900 #endif 00901 } 00902 00903 /* ==================================================================== 00904 * The Vovida Software License, Version 1.0 00905 * 00906 * Copyright (c) 2000-2005 Jacob Butcher 00907 * 00908 * Redistribution and use in source and binary forms, with or without 00909 * modification, are permitted provided that the following conditions 00910 * are met: 00911 * 00912 * 1. Redistributions of source code must retain the above copyright 00913 * notice, this list of conditions and the following disclaimer. 00914 * 00915 * 2. Redistributions in binary form must reproduce the above copyright 00916 * notice, this list of conditions and the following disclaimer in 00917 * the documentation and/or other materials provided with the 00918 * distribution. 00919 * 00920 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00921 * and "Vovida Open Communication Application Library (VOCAL)" must 00922 * not be used to endorse or promote products derived from this 00923 * software without prior written permission. For written 00924 * permission, please contact vocal@vovida.org. 00925 * 00926 * 4. Products derived from this software may not be called "VOCAL", nor 00927 * may "VOCAL" appear in their name, without prior written 00928 * permission of Vovida Networks, Inc. 00929 * 00930 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00931 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00932 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00933 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00934 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00935 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00936 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00937 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00938 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00939 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00940 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00941 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00942 * DAMAGE. 00943 * 00944 * vi: set shiftwidth=3 expandtab: 00945 */
1.7.5.1