|
reSIProcate/repro
9694
|
00001 #include <cassert> 00002 #include <sstream> 00003 00004 #include <resip/stack/Symbols.hxx> 00005 #include <resip/stack/Tuple.hxx> 00006 #include <rutil/Data.hxx> 00007 #include <rutil/DnsUtil.hxx> 00008 #include <rutil/Logger.hxx> 00009 #include <rutil/ParseBuffer.hxx> 00010 #include <rutil/Socket.hxx> 00011 #include <rutil/TransportType.hxx> 00012 #include <rutil/Timer.hxx> 00013 00014 #include "repro/RegSyncClient.hxx" 00015 #include "repro/RegSyncServer.hxx" 00016 00017 using namespace repro; 00018 using namespace resip; 00019 using namespace std; 00020 00021 #define RESIPROCATE_SUBSYSTEM Subsystem::REPRO 00022 00023 RegSyncClient::RegSyncClient(InMemorySyncRegDb* regDb, 00024 Data address, 00025 unsigned short port) : 00026 mRegDb(regDb), 00027 mAddress(address), 00028 mPort(port), 00029 mSocketDesc(0) 00030 { 00031 assert(mRegDb); 00032 } 00033 00034 void 00035 RegSyncClient::delaySeconds(unsigned int seconds) 00036 { 00037 // Delay for requested number of seconds - but check every second if we are shutdown or not 00038 for(unsigned int i = 0; i < seconds && !mShutdown; i++) 00039 { 00040 #ifdef WIN32 00041 Sleep(1000); 00042 #else 00043 sleep(1); 00044 #endif 00045 } 00046 } 00047 00048 void 00049 RegSyncClient::shutdown() 00050 { 00051 ThreadIf::shutdown(); 00052 if(mSocketDesc) 00053 { 00054 #ifdef WIN32 00055 closesocket(mSocketDesc); 00056 mSocketDesc = 0; 00057 #else 00058 ::shutdown(mSocketDesc, SHUT_RDWR); 00059 #endif 00060 } 00061 } 00062 00063 void 00064 RegSyncClient::thread() 00065 { 00066 int rc; 00067 00068 addrinfo* results; 00069 addrinfo hint; 00070 memset(&hint, 0, sizeof(hint)); 00071 hint.ai_family = AF_UNSPEC; 00072 hint.ai_flags = AI_PASSIVE; 00073 hint.ai_socktype = SOCK_STREAM; 00074 00075 rc = getaddrinfo(mAddress.c_str(), 0, &hint, &results); 00076 if(rc != 0) 00077 { 00078 ErrLog(<< "RegSyncClient: unknown host " << mAddress); 00079 return; 00080 } 00081 00082 // Use first address resolved if there are more than one. 00083 Tuple servAddr(*results->ai_addr, TCP); 00084 servAddr.setPort(mPort); 00085 Tuple localAddr(Data::Empty /* all interfaces */, 0, servAddr.ipVersion(), TCP); 00086 //InfoLog(<< "**********" << servAddr << " " << localAddr << " " << localAddr.isAnyInterface()); 00087 00088 freeaddrinfo(results); 00089 00090 while(!mShutdown) 00091 { 00092 // Create TCP Socket 00093 mSocketDesc = (int)socket(servAddr.ipVersion() == V6 ? PF_INET6 : PF_INET , SOCK_STREAM, 0); 00094 if(mSocketDesc < 0) 00095 { 00096 ErrLog(<< "RegSyncClient: cannot open socket"); 00097 mSocketDesc = 0; 00098 return; 00099 } 00100 00101 // bind to any local interface/port 00102 rc = ::bind(mSocketDesc, &localAddr.getMutableSockaddr(), localAddr.length()); 00103 if(rc < 0) 00104 { 00105 ErrLog(<<"RegSyncClient: error binding locally"); 00106 closeSocket(mSocketDesc); 00107 mSocketDesc = 0; 00108 return; 00109 } 00110 00111 // Connect to server 00112 rc = ::connect(mSocketDesc, &servAddr.getMutableSockaddr(), servAddr.length()); 00113 if(rc < 0) 00114 { 00115 if(!mShutdown) ErrLog(<< "RegSyncClient: error connecting to " << mAddress << ":" << mPort); 00116 closeSocket(mSocketDesc); 00117 mSocketDesc = 0; 00118 delaySeconds(30); 00119 continue; 00120 } 00121 00122 Data request( 00123 "<InitialSync>\r\n" 00124 " <Request>\r\n" 00125 " <Version>" + Data(REGSYNC_VERSION) + "</Version>\r\n" // For use in detecting if client/server are a compatible version 00126 " </Request>\r\n" 00127 "</InitialSync>\r\n"); 00128 rc = ::send(mSocketDesc, request.c_str(), (int)request.size(), 0); 00129 if(rc < 0) 00130 { 00131 if(!mShutdown) ErrLog(<< "RegSyncClient: error sending"); 00132 closeSocket(mSocketDesc); 00133 mSocketDesc = 0; 00134 continue; 00135 } 00136 00137 while(rc > 0) 00138 { 00139 rc = ::recv(mSocketDesc, (char*)&mRxBuffer, sizeof(mRxBuffer), 0); 00140 if(rc < 0) 00141 { 00142 if(!mShutdown) ErrLog(<< "RegSyncClient: error receiving"); 00143 closeSocket(mSocketDesc); 00144 mSocketDesc = 0; 00145 break; 00146 } 00147 00148 if(rc > 0) 00149 { 00150 mRxDataBuffer += Data(Data::Borrow, (const char*)&mRxBuffer, rc); 00151 while(tryParse()); 00152 } 00153 } 00154 } // end while 00155 00156 if(mSocketDesc) closeSocket(mSocketDesc); 00157 } 00158 00159 bool 00160 RegSyncClient::tryParse() 00161 { 00162 ParseBuffer pb(mRxDataBuffer); 00163 Data initialTag; 00164 const char* start = pb.position(); 00165 pb.skipWhitespace(); 00166 pb.skipToChar('<'); 00167 if(!pb.eof()) 00168 { 00169 pb.skipChar(); 00170 const char* anchor = pb.position(); 00171 pb.skipToChar('>'); 00172 if(!pb.eof()) 00173 { 00174 initialTag = pb.data(anchor); 00175 // Find end of initial tag 00176 pb.skipToChars("</" + initialTag + ">"); 00177 if (!pb.eof()) 00178 { 00179 pb.skipN((int)initialTag.size() + 3); // Skip past </InitialTag> 00180 handleXml(pb.data(start)); 00181 00182 // Remove processed data from RxBuffer 00183 pb.skipWhitespace(); 00184 if(!pb.eof()) 00185 { 00186 anchor = pb.position(); 00187 pb.skipToEnd(); 00188 mRxDataBuffer = pb.data(anchor); 00189 return true; 00190 } 00191 else 00192 { 00193 mRxDataBuffer.clear(); 00194 } 00195 } 00196 } 00197 } 00198 return false; 00199 } 00200 00201 void 00202 RegSyncClient::handleXml(const Data& xmlData) 00203 { 00204 //InfoLog(<< "RegSyncClient::handleXml received: " << xmlData); 00205 00206 try 00207 { 00208 ParseBuffer pb(xmlData); 00209 XMLCursor xml(pb); 00210 00211 if(isEqualNoCase(xml.getTag(), "InitialSync")) 00212 { 00213 // Must be an InitialSync response 00214 InfoLog(<< "RegSyncClient::handleXml: InitialSync complete."); 00215 } 00216 else if(isEqualNoCase(xml.getTag(), "reginfo")) 00217 { 00218 try 00219 { 00220 handleRegInfoEvent(xml); 00221 } 00222 catch(BaseException& e) 00223 { 00224 ErrLog(<< "RegSyncClient::handleXml: exception: " << e); 00225 } 00226 } 00227 else 00228 { 00229 WarningLog(<< "RegSyncClient::handleXml: Ignoring XML message with unknown method: " << xml.getTag()); 00230 } 00231 } 00232 catch(resip::BaseException& e) 00233 { 00234 WarningLog(<< "RegSyncClient::handleXml: Ignoring XML message due to ParseException: " << e); 00235 } 00236 } 00237 00238 void 00239 RegSyncClient::handleRegInfoEvent(resip::XMLCursor& xml) 00240 { 00241 UInt64 now = Timer::getTimeSecs(); 00242 Uri aor; 00243 ContactList contacts; 00244 DebugLog(<< "RegSyncClient::handleRegInfoEvent"); 00245 if(xml.firstChild()) 00246 { 00247 do 00248 { 00249 if(isEqualNoCase(xml.getTag(), "aor")) 00250 { 00251 if(xml.firstChild()) 00252 { 00253 aor = Uri(xml.getValue().xmlCharDataDecode()); 00254 xml.parent(); 00255 } 00256 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: aor=" << aor); 00257 } 00258 else if(isEqualNoCase(xml.getTag(), "contactinfo")) 00259 { 00260 if(xml.firstChild()) 00261 { 00262 ContactInstanceRecord rec; 00263 do 00264 { 00265 if(isEqualNoCase(xml.getTag(), "contacturi")) 00266 { 00267 if(xml.firstChild()) 00268 { 00269 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: contacturi=" << xml.getValue()); 00270 rec.mContact = NameAddr(xml.getValue().xmlCharDataDecode()); 00271 xml.parent(); 00272 } 00273 } 00274 else if(isEqualNoCase(xml.getTag(), "expires")) 00275 { 00276 if(xml.firstChild()) 00277 { 00278 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: expires=" << xml.getValue()); 00279 UInt64 expires = xml.getValue().convertUInt64(); 00280 rec.mRegExpires = (expires == 0 ? 0 : now+expires); 00281 xml.parent(); 00282 } 00283 } 00284 else if(isEqualNoCase(xml.getTag(), "lastupdate")) 00285 { 00286 if(xml.firstChild()) 00287 { 00288 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: lastupdate=" << xml.getValue()); 00289 rec.mLastUpdated = now-xml.getValue().convertUInt64(); 00290 xml.parent(); 00291 } 00292 } 00293 else if(isEqualNoCase(xml.getTag(), "receivedfrom")) 00294 { 00295 if(xml.firstChild()) 00296 { 00297 rec.mReceivedFrom = Tuple::makeTupleFromBinaryToken(xml.getValue().base64decode()); 00298 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: receivedfrom=" << xml.getValue() << " tuple=" << rec.mReceivedFrom); 00299 xml.parent(); 00300 } 00301 } 00302 else if(isEqualNoCase(xml.getTag(), "publicaddress")) 00303 { 00304 if(xml.firstChild()) 00305 { 00306 rec.mPublicAddress = Tuple::makeTupleFromBinaryToken(xml.getValue().base64decode()); 00307 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: publicaddress=" << xml.getValue() << " tuple=" << rec.mPublicAddress); 00308 xml.parent(); 00309 } 00310 } 00311 else if(isEqualNoCase(xml.getTag(), "sippath")) 00312 { 00313 if(xml.firstChild()) 00314 { 00315 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: sippath=" << xml.getValue()); 00316 rec.mSipPath.push_back(NameAddr(xml.getValue().xmlCharDataDecode())); 00317 xml.parent(); 00318 } 00319 } 00320 else if(isEqualNoCase(xml.getTag(), "instance")) 00321 { 00322 if(xml.firstChild()) 00323 { 00324 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: instance=" << xml.getValue()); 00325 rec.mInstance = xml.getValue().xmlCharDataDecode(); 00326 xml.parent(); 00327 } 00328 } 00329 else if(isEqualNoCase(xml.getTag(), "regid")) 00330 { 00331 if(xml.firstChild()) 00332 { 00333 //InfoLog(<< "RegSyncClient::handleRegInfoEvent: regid=" << xml.getValue()); 00334 rec.mRegId = xml.getValue().convertUnsignedLong(); 00335 xml.parent(); 00336 } 00337 } 00338 } while(xml.nextSibling()); 00339 xml.parent(); 00340 00341 // Add record to list 00342 rec.mSyncContact = true; // This ContactInstanceRecord came from registration sync process 00343 contacts.push_back(rec); 00344 } 00345 } 00346 } while(xml.nextSibling()); 00347 xml.parent(); 00348 } 00349 xml.parent(); 00350 00351 processModify(aor, contacts); 00352 } 00353 00354 void 00355 RegSyncClient::processModify(const resip::Uri& aor, ContactList& syncContacts) 00356 { 00357 ContactList currentContacts; 00358 00359 mRegDb->lockRecord(aor); 00360 mRegDb->getContacts(aor, currentContacts); 00361 00362 InfoLog(<< "RegSyncClient::processModify: for aor=" << aor << 00363 ", numSyncContacts=" << syncContacts.size() << 00364 ", numCurrentContacts=" << currentContacts.size()); 00365 00366 // Iteratate through new syncContact List 00367 ContactList::iterator itSync = syncContacts.begin(); 00368 ContactList::iterator itCurrent; 00369 bool found; 00370 for(; itSync != syncContacts.end(); itSync++) 00371 { 00372 // See if contact already exists in currentContacts 00373 found = false; 00374 for(itCurrent = currentContacts.begin(); itCurrent != currentContacts.end(); itCurrent++) 00375 { 00376 if(*itSync == *itCurrent) 00377 { 00378 found = true; 00379 // We found a match - check if sycnContacts LastUpdated time is newer 00380 if(itSync->mLastUpdated > itCurrent->mLastUpdated) 00381 { 00382 // Replace current contact with Sync contact 00383 mRegDb->updateContact(aor, *itSync); 00384 } 00385 } 00386 } 00387 if(!found) 00388 { 00389 mRegDb->updateContact(aor, *itSync); 00390 } 00391 } 00392 mRegDb->unlockRecord(aor); 00393 } 00394 00395 /* ==================================================================== 00396 * The Vovida Software License, Version 1.0 00397 * 00398 * Copyright (c) 2000 Vovida Networks, Inc. All rights reserved. 00399 * Copyright (c) 2010 SIP Spectrum, Inc. All rights reserved. 00400 * 00401 * Redistribution and use in source and binary forms, with or without 00402 * modification, are permitted provided that the following conditions 00403 * are met: 00404 * 00405 * 1. Redistributions of source code must retain the above copyright 00406 * notice, this list of conditions and the following disclaimer. 00407 * 00408 * 2. Redistributions in binary form must reproduce the above copyright 00409 * notice, this list of conditions and the following disclaimer in 00410 * the documentation and/or other materials provided with the 00411 * distribution. 00412 * 00413 * 3. The names "VOCAL", "Vovida Open Communication Application Library", 00414 * and "Vovida Open Communication Application Library (VOCAL)" must 00415 * not be used to endorse or promote products derived from this 00416 * software without prior written permission. For written 00417 * permission, please contact vocal@vovida.org. 00418 * 00419 * 4. Products derived from this software may not be called "VOCAL", nor 00420 * may "VOCAL" appear in their name, without prior written 00421 * permission of Vovida Networks, Inc. 00422 * 00423 * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED 00424 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 00425 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND 00426 * NON-INFRINGEMENT ARE DISCLAIMED. IN NO EVENT SHALL VOVIDA 00427 * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES 00428 * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL, 00429 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 00430 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 00431 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY 00432 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00433 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 00434 * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 00435 * DAMAGE. 00436 * 00437 * ==================================================================== 00438 * 00439 * This software consists of voluntary contributions made by Vovida 00440 * Networks, Inc. and many individuals on behalf of Vovida Networks, 00441 * Inc. For more information on Vovida Networks, Inc., please see 00442 * <http://www.vovida.org/>. 00443 * 00444 */ 00445
1.7.5.1