reSIProcate/repro  9694
BerkeleyDb.cxx
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include "config.h"
00003 #endif
00004 
00005 #include <fcntl.h>
00006 #include <cassert>
00007 #include <cstdlib>
00008 
00009 #include "rutil/Data.hxx"
00010 #include "rutil/DataStream.hxx"
00011 #include "rutil/Logger.hxx"
00012 
00013 #include "repro/AbstractDb.hxx"
00014 #include "repro/BerkeleyDb.hxx"
00015 #include "rutil/WinLeakCheck.hxx"
00016 
00017 using namespace resip;
00018 using namespace repro;
00019 using namespace std;
00020 
00021 #define RESIPROCATE_SUBSYSTEM Subsystem::REPRO
00022 //#define USE_DBENV   // Required for transaction support
00023 
00024 BerkeleyDb::BerkeleyDb()
00025 {
00026    init(Data::Empty, Data::Empty);
00027 }
00028 
00029 
00030 BerkeleyDb::BerkeleyDb( const Data& dbPath, const Data& dbName )
00031 {
00032    init(dbPath, dbName);
00033 }
00034 
00035 
00036 void
00037 BerkeleyDb::init( const Data& dbPath, const Data& dbName )
00038 { 
00039    Data filePath(dbPath);
00040 
00041    // An empty path is how you specify the current working directory as a path
00042    if ( !filePath.empty() )
00043    {
00044 #ifdef WIN32
00045       filePath += '\\';
00046 #else
00047       filePath += '/';
00048 #endif
00049    }
00050 
00051    if ( dbName.empty() )
00052    {
00053       DebugLog( << "No BerkeleyDb prefix specified - using default" );
00054       filePath += "repro";
00055    }
00056    else
00057    {
00058       filePath += dbName;
00059    }
00060 
00061    InfoLog( << "Using BerkeleyDb prefixed with " << filePath );
00062 
00063    mSane = true;
00064 
00065    // Create Environment
00066    int ret;
00067 #ifdef USE_DBENV
00068    mEnv = new DbEnv(DB_CXX_NO_EXCEPTIONS);
00069    assert(mEnv);
00070    ret = mEnv->open(0, DB_CREATE |     // If the env does not exist, then create it
00071                        DB_INIT_LOCK |  // Initialize Locking (needed for transactions)
00072                        DB_INIT_LOG |   // Initialize Logging (needed for transactions)
00073                        DB_INIT_MPOOL | // Initialize the cache (needed for transactions)
00074                        DB_INIT_TXN |   // Initialize transactions
00075                        DB_RECOVER |    // Run normal recovery
00076                        DB_THREAD,      // Free-thread the env handle
00077                        0 /* mode */);
00078    if(ret != 0)
00079    {
00080       ErrLog( <<"Could not open environment: " << db_strerror(ret));
00081       mSane = false;
00082       return;
00083    }
00084    mEnv->txn_checkpoint(0, 0, 0);  // Note:  a checkpoint is run when this last is created and when it is destroyed
00085 #else
00086    mEnv = 0;
00087 #endif
00088 
00089    bool enableTransactions = false;
00090    bool secondaryIndex = false;
00091    Data secondaryFileName;
00092    for (int i=0;i<MaxTable;i++)
00093    {
00094       enableTransactions = false;
00095       // if the line bellow seems wrong, you need to check which version 
00096       // of db you have - it is likely an very out of date version 
00097       // still trying to figure this out so email fluffy if you have 
00098       // problems and include your version the DB_VERSION_STRING found 
00099       // in your db4/db.h file. 
00100       Data fileName( filePath );
00101       switch (i)
00102       {
00103          case UserTable:
00104             fileName += "_user"; break;
00105          case RouteTable:
00106             fileName += "_route"; break;
00107          case AclTable:
00108             fileName += "_acl"; break;
00109          case ConfigTable:
00110             fileName += "_config"; break;
00111          case StaticRegTable:
00112             fileName += "_staticreg"; break;
00113          case FilterTable:
00114             fileName += "_filter"; break;
00115          case SiloTable:
00116             fileName += "_silo"; 
00117             enableTransactions = true;
00118             secondaryIndex = true;
00119             break;
00120          default:
00121             assert(0);
00122       }
00123 
00124       if(!secondaryIndex)
00125       {
00126          fileName += ".db";
00127       }
00128       else
00129       {
00130          secondaryFileName = fileName;
00131          fileName += ".db";
00132          secondaryFileName += "_idx1.db";
00133       }
00134 
00135       mTableInfo[i].mDb = new Db(mEnv, DB_CXX_NO_EXCEPTIONS);
00136       assert(mTableInfo[i].mDb);
00137       
00138       DebugLog( << "About to open Berkeley DB: " << fileName );
00139       ret = mTableInfo[i].mDb->open(0,
00140                          fileName.c_str(),
00141                          0,
00142                          DB_BTREE,
00143 #ifdef USE_DBENV
00144                          DB_CREATE | DB_THREAD | (enableTransactions ? DB_AUTO_COMMIT : 0),
00145 #else
00146                          DB_CREATE | DB_THREAD,
00147 #endif
00148                          0);
00149       if(ret != 0)
00150       {
00151          ErrLog( <<"Could not open database " << fileName << ": " << db_strerror(ret));
00152          mSane = false;
00153          return;
00154       }
00155 
00156       // Open a cursor on the database
00157       ret = mTableInfo[i].mDb->cursor(0, &mTableInfo[i].mCursor, 0);
00158       if(ret != 0)
00159       {
00160          ErrLog( <<"Could not cursor on database " << fileName << ": " << db_strerror(ret));
00161          mSane = false;
00162          return;
00163       }
00164       assert(mTableInfo[i].mCursor);
00165 
00166       DebugLog( << "Opened Berkeley DB: " << fileName );
00167 
00168 
00169       if(secondaryIndex)
00170       {
00171          mTableInfo[i].mSecondaryDb = new Db(mEnv, DB_CXX_NO_EXCEPTIONS);
00172          assert(mTableInfo[i].mSecondaryDb);
00173 
00174          ret = mTableInfo[i].mSecondaryDb->set_flags(DB_DUP);
00175          if(ret!=0)
00176          {
00177             ErrLog( <<"Could not set database " << secondaryFileName << " to allow duplicates: " << db_strerror(ret));
00178             mSane = false;
00179             return;
00180          }
00181       
00182          DebugLog( << "About to open secondary Berkeley DB: " << secondaryFileName );
00183          ret = mTableInfo[i].mSecondaryDb->open(0,
00184                             secondaryFileName.c_str(),
00185                             0,
00186                             DB_BTREE,
00187 #ifdef USE_DBENV
00188                             DB_CREATE | DB_THREAD | (enableTransactions ? DB_AUTO_COMMIT : 0),
00189 #else
00190                             DB_CREATE | DB_THREAD,
00191 #endif
00192                             0);
00193          if(ret != 0)
00194          {
00195             ErrLog( <<"Could not open secondary database " << secondaryFileName << ": " << db_strerror(ret));
00196             mSane = false;
00197             return;
00198          }
00199 
00200          // Associate Secondary Database with Primary
00201          mTableInfo[i].mSecondaryDb->set_app_private(this);  // retrievable from callback so we can have access to this BerkeleyDb instance
00202          ret = mTableInfo[i].mDb->associate(0, mTableInfo[i].mSecondaryDb, &getSecondaryKeyCallback, 0 /* flags */);
00203          if(ret != 0)
00204          {
00205             ErrLog( <<"Could not associate secondary database " << secondaryFileName << ": " << db_strerror(ret));
00206             mSane = false;
00207             return;
00208          }
00209          DebugLog( << "Opened secondary Berkeley DB: " << secondaryFileName );
00210 
00211          ret = mTableInfo[i].mSecondaryDb->cursor(0, &mTableInfo[i].mSecondaryCursor, 0);
00212          if(ret != 0)
00213          {
00214             ErrLog( <<"Could not secondary cursor on database " << secondaryFileName << ": " << db_strerror(ret));
00215             mSane = false;
00216             return;
00217          }
00218          assert(mTableInfo[i].mSecondaryCursor);
00219       }
00220    }
00221 }
00222 
00223 
00224 BerkeleyDb::~BerkeleyDb()
00225 {  
00226    for (int i=0;i<MaxTable;i++)
00227    {
00228       if(mTableInfo[i].mSecondaryCursor)
00229       {
00230          mTableInfo[i].mSecondaryCursor->close();
00231          mTableInfo[i].mSecondaryCursor = 0;
00232       }
00233 
00234       if(mTableInfo[i].mCursor)
00235       {
00236          mTableInfo[i].mCursor->close();
00237          mTableInfo[i].mCursor = 0;
00238       }
00239       
00240       if(mTableInfo[i].mTransaction)
00241       {
00242          dbRollbackTransaction((Table)i);
00243       }
00244 
00245       // Secondary DB should be closed before primary
00246       if(mTableInfo[i].mSecondaryDb)
00247       {
00248          mTableInfo[i].mSecondaryDb->close(0);
00249          delete mTableInfo[i].mSecondaryDb; 
00250          mTableInfo[i].mSecondaryDb = 0;
00251       }
00252 
00253       if(mTableInfo[i].mDb)
00254       {
00255          mTableInfo[i].mDb->close(0);
00256          delete mTableInfo[i].mDb; 
00257          mTableInfo[i].mDb = 0;
00258       }
00259    }
00260    if(mEnv)
00261    {
00262       mEnv->txn_checkpoint(0, 0, 0);  // Note:  a checkpoint is run when this last is created and when it is destroyed
00263       delete mEnv;
00264    }
00265 }
00266 
00267 
00268 int 
00269 BerkeleyDb::getSecondaryKeyCallback(Db *db, const Dbt *pkey, const Dbt *pdata, Dbt *skey)
00270 {
00271    BerkeleyDb* bdb = (BerkeleyDb*)db->get_app_private();
00272 
00273    // Find associated table using db pointer
00274    Table table = MaxTable;
00275    for (int i=MaxTable-1; i >= 0; i--)  // search backwards, since tables at the end have the secondary indexes
00276    {
00277       if(bdb->mTableInfo[i].mSecondaryDb == db)
00278       {
00279          table = (Table)i;
00280          break;
00281       }
00282    }
00283    assert(table != MaxTable);
00284 
00285    Data primaryKey(Data::Share, reinterpret_cast<const char*>(pkey->get_data()), pkey->get_size());
00286    Data primaryData(Data::Share, reinterpret_cast<const char*>(pdata->get_data()), pdata->get_size());
00287    void* secondaryKey;
00288    unsigned int secondaryKeyLen;
00289    int rc = bdb->getSecondaryKey(table, primaryKey, primaryData, &secondaryKey, &secondaryKeyLen);
00290    skey->set_data(secondaryKey);
00291    skey->set_size(secondaryKeyLen);
00292    return rc;
00293 }
00294 
00295 
00296 bool
00297 BerkeleyDb::dbWriteRecord(const Table table, 
00298                           const resip::Data& pKey, 
00299                           const resip::Data& pData )
00300 {
00301    Dbt key((void*)pKey.data(), (::u_int32_t)pKey.size());
00302    Dbt data((void*)pData.data(), (::u_int32_t)pData.size());
00303    int ret;
00304    
00305    assert(mTableInfo[table].mDb);
00306    ret = mTableInfo[table].mDb->put(mTableInfo[table].mTransaction, &key, &data, 0);
00307 
00308    if(ret == 0 && mTableInfo[table].mTransaction == 0)
00309    {
00310       // If we are in a transaction, then it will sync on commit
00311       mTableInfo[table].mDb->sync(0);
00312       if(mTableInfo[table].mSecondaryDb)
00313       {
00314          mTableInfo[table].mSecondaryDb->sync(0);
00315       }
00316    }
00317    return ret == 0;
00318 }
00319 
00320 
00321 bool 
00322 BerkeleyDb::dbReadRecord(const Table table, 
00323                          const resip::Data& pKey, 
00324                          resip::Data& pData) const
00325 { 
00326    Dbt key((void*)pKey.data(), (::u_int32_t)pKey.size());
00327    Dbt data;
00328    data.set_flags(DB_DBT_MALLOC);  // required for DB_THREAD flag use
00329 
00330    int ret;
00331    
00332    assert(mTableInfo[table].mDb);
00333    ret = mTableInfo[table].mDb->get(mTableInfo[table].mTransaction, &key, &data, 0);
00334 
00335    if (ret == DB_NOTFOUND)
00336    {
00337       // key not found 
00338       if (data.get_data())
00339       {
00340          free(data.get_data());
00341       }
00342       return false;
00343    }
00344    assert(ret != DB_KEYEMPTY);
00345    assert(ret == 0);
00346    pData.copy(reinterpret_cast<const char*>(data.get_data()), data.get_size());
00347    if (data.get_data())
00348    {
00349       free(data.get_data());
00350    }
00351    if(pData.empty())
00352    {
00353       // this should never happen
00354       return false;
00355    }
00356 
00357    return true;
00358 }
00359 
00360 
00361 void 
00362 BerkeleyDb::dbEraseRecord(const Table table,
00363                           const resip::Data& pKey,
00364                           bool isSecondaryKey) // allows deleting records from a table that supports secondary keying using a secondary key
00365 { 
00366    Dbt key((void*) pKey.data(), (::u_int32_t)pKey.size());
00367 
00368    Db* db = mTableInfo[table].mDb;
00369    if(isSecondaryKey && mTableInfo[table].mSecondaryDb)
00370    {
00371       db = mTableInfo[table].mSecondaryDb;
00372    }
00373    assert(db);
00374    db->del(mTableInfo[table].mTransaction, &key, 0);
00375    if(mTableInfo[table].mTransaction == 0)
00376    {
00377       // If we are in a transaction, then it will sync on commit
00378       mTableInfo[table].mDb->sync(0);
00379       if(mTableInfo[table].mSecondaryDb)
00380       {
00381          mTableInfo[table].mSecondaryDb->sync(0);
00382       }
00383    }
00384 }
00385 
00386 
00387 resip::Data 
00388 BerkeleyDb::dbNextKey(const Table table, 
00389                       bool first)
00390 { 
00391    Dbt key, data;
00392    int ret;
00393    
00394    assert(mTableInfo[table].mDb);
00395    ret = mTableInfo[table].mCursor->get(&key, &data, first ? DB_FIRST : DB_NEXT);
00396    if (ret == DB_NOTFOUND)
00397    {
00398       return Data::Empty;
00399    }
00400    assert(ret == 0);
00401    
00402    Data d(Data::Share, reinterpret_cast<const char*>(key.get_data()), key.get_size());
00403    return d;
00404 }
00405 
00406 
00407 bool 
00408 BerkeleyDb::dbNextRecord(const Table table,
00409                          const resip::Data& key,
00410                          resip::Data& data,
00411                          bool forUpdate,  // specifies to use DB_RMW flag to write lock reads
00412                          bool first)
00413 {
00414    Dbt dbkey((void*) key.data(), (::u_int32_t)key.size());
00415    Dbt dbdata;
00416    int ret;
00417 
00418    assert(mTableInfo[table].mSecondaryCursor);
00419    if(mTableInfo[table].mSecondaryCursor == 0)
00420    {
00421       // Iterating across multiple records with a common key is only 
00422       // supported on Seconday databases where duplicate keys exist
00423       return false;
00424    }
00425 
00426    unsigned int flags = 0;
00427    if(key.empty())
00428    {
00429       flags = first ? DB_FIRST : DB_NEXT;
00430    }
00431    else
00432    {
00433       flags = first ? DB_SET : DB_NEXT_DUP;
00434    }
00435 
00436 #ifdef USE_DBENV
00437    if(forUpdate)
00438    {
00439       flags |= DB_RMW;
00440    }
00441 #endif
00442 
00443    ret = mTableInfo[table].mSecondaryCursor->get(&dbkey, &dbdata, flags);
00444    if (ret == DB_NOTFOUND)
00445    {
00446       return false;
00447    }
00448    assert(ret == 0);
00449    data.copy(reinterpret_cast<const char*>(dbdata.get_data()), dbdata.get_size());
00450 
00451    return true;
00452 }
00453 
00454 bool 
00455 BerkeleyDb::dbBeginTransaction(const Table table)
00456 {
00457 #ifdef USE_DBENV
00458    // For now - we support transactions on the primary table only
00459    assert(mDb);
00460    assert(mTableInfo[table].mTransaction == 0);
00461    int ret = mTableInfo[table].mDb->get_env()->txn_begin(0 /* parent trans*/, &mTableInfo[table].mTransaction, 0);
00462    if(ret != 0)
00463    {
00464       ErrLog( <<"Could not begin transaction: " << db_strerror(ret));
00465       return false;
00466    }
00467 
00468    // Open new Cursors - since cursors used in a transaction must be opened and closed within the transation
00469    if(mTableInfo[table].mCursor)
00470    {
00471       mTableInfo[table].mCursor->close();
00472       mTableInfo[table].mCursor = 0;
00473    }
00474 
00475    ret = mTableInfo[table].mDb->cursor(mTableInfo[table].mTransaction, &mTableInfo[table].mCursor, 0);
00476    if(ret != 0)
00477    {
00478       ErrLog( <<"Could not open cursor for transaction: " << db_strerror(ret));
00479    }
00480 #endif
00481 
00482    return true;
00483 }
00484 
00485 bool 
00486 BerkeleyDb::dbCommitTransaction(const Table table)
00487 {
00488    bool success = true;
00489 #ifdef USE_DBENV
00490    assert(mDb);
00491    assert(mTableInfo[table].mTransaction);
00492 
00493    // Close the cursor - since cursors used in a transaction must be opened and closed within the transation
00494    if(mTableInfo[table].mCursor)
00495    {
00496       mTableInfo[table].mCursor->close();
00497       mTableInfo[table].mCursor = 0;
00498    }
00499 
00500    int ret = mTableInfo[table].mTransaction->commit(0);
00501    mTableInfo[table].mTransaction = 0;
00502    if(ret != 0)
00503    {
00504       ErrLog( <<"Could not commit transaction: " << db_strerror(ret));
00505       success = false;
00506    }
00507 
00508    // Reopen a cursor for general use
00509    mTableInfo[table].mDb->cursor(0, &mTableInfo[table].mCursor, 0);
00510 #endif
00511 
00512    return success;
00513 }
00514 
00515 bool 
00516 BerkeleyDb::dbRollbackTransaction(const Table table)
00517 {
00518    bool success = true;
00519 #ifdef USE_DBENV
00520    assert(mDb);
00521    assert(mTableInfo[table].mTransaction);
00522 
00523    // Close the cursor - since cursors used in a transaction must be opened and closed within the transation
00524    if(mTableInfo[table].mCursor)
00525    {
00526       mTableInfo[table].mCursor->close();
00527       mTableInfo[table].mCursor = 0;
00528    }
00529 
00530    int ret = mTableInfo[table].mTransaction->abort();
00531    mTableInfo[table].mTransaction = 0;
00532    if(ret != 0)
00533    {
00534       success = false;
00535    }
00536 
00537    // Reopen a cursor for general use
00538    mTableInfo[table].mDb->cursor(0, &mTableInfo[table].mCursor, 0);
00539 #endif
00540 
00541    return success;
00542 }
00543 
00544 
00545 /* ====================================================================
00546  * The Vovida Software License, Version 1.0 
00547  * 
00548  * Copyright (c) 2000 Vovida Networks, Inc.  All rights reserved.
00549  * 
00550  * Redistribution and use in source and binary forms, with or without
00551  * modification, are permitted provided that the following conditions
00552  * are met:
00553  * 
00554  * 1. Redistributions of source code must retain the above copyright
00555  *    notice, this list of conditions and the following disclaimer.
00556  * 
00557  * 2. Redistributions in binary form must reproduce the above copyright
00558  *    notice, this list of conditions and the following disclaimer in
00559  *    the documentation and/or other materials provided with the
00560  *    distribution.
00561  * 
00562  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
00563  *    and "Vovida Open Communication Application Library (VOCAL)" must
00564  *    not be used to endorse or promote products derived from this
00565  *    software without prior written permission. For written
00566  *    permission, please contact vocal@vovida.org.
00567  *
00568  * 4. Products derived from this software may not be called "VOCAL", nor
00569  *    may "VOCAL" appear in their name, without prior written
00570  *    permission of Vovida Networks, Inc.
00571  * 
00572  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
00573  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
00574  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
00575  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
00576  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
00577  * IN EXCESS OF $1,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
00578  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
00579  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
00580  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
00581  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00582  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
00583  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
00584  * DAMAGE.
00585  * 
00586  * ====================================================================
00587  */