Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

TxnGuideInMemory.cpp

00001 // File TxnGuideInMemory.cpp
00002 
00003 #include <iostream>
00004 #include <db_cxx.h>
00005 
00006 #ifdef _WIN32
00007 #include <windows.h>
00008 #define PATHD '\\'
00009 extern "C" {
00010     extern int getopt(int, char * const *, const char *);
00011     extern char *optarg;
00012 }
00013 
00014 typedef HANDLE thread_t;
00015 #define thread_create(thrp, attr, func, arg)                               \
00016     (((*(thrp) = CreateThread(NULL, 0,                                     \
00017         (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
00018 #define thread_join(thr, statusp)                                          \
00019     ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&            \
00020     GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
00021 
00022 typedef HANDLE mutex_t;
00023 #define mutex_init(m, attr)                                                \
00024     (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
00025 #define mutex_lock(m)                                                      \
00026     ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
00027 #define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
00028 #else
00029 #include <pthread.h>
00030 #include <unistd.h>
00031 #define PATHD '/'
00032 
00033 typedef pthread_t thread_t;
00034 #define thread_create(thrp, attr, func, arg)                               \
00035     pthread_create((thrp), (attr), (func), (arg))
00036 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
00037 
00038 typedef pthread_mutex_t mutex_t;
00039 #define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
00040 #define mutex_lock(m)           pthread_mutex_lock(m)
00041 #define mutex_unlock(m)         pthread_mutex_unlock(m)
00042 #endif
00043 
00044 // Run 5 writers threads at a time.
00045 #define NUMWRITERS 5
00046 
00047 // Printing of pthread_t is implementation-specific, so we
00048 // create our own thread IDs for reporting purposes.
00049 int global_thread_num;
00050 mutex_t thread_num_lock;
00051 
00052 // Forward declarations
00053 int countRecords(Db *, DbTxn *);
00054 int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
00055 int usage(void);
00056 void *writerThread(void *);
00057 
00058 int 
00059 main(void)
00060 {
00061     // Initialize our handles
00062     Db *dbp = NULL;
00063     DbEnv *envp = NULL;
00064 
00065     thread_t writerThreads[NUMWRITERS];
00066     int i;
00067     u_int32_t envFlags;
00068 
00069     // Application name
00070     const char *progName = "TxnGuideInMemory";
00071 
00072     // Env open flags
00073     envFlags =
00074       DB_CREATE     |  // Create the environment if it does not exist
00075       DB_RECOVER    |  // Run normal recovery.
00076       DB_INIT_LOCK  |  // Initialize the locking subsystem
00077       DB_INIT_LOG   |  // Initialize the logging subsystem
00078       DB_INIT_TXN   |  // Initialize the transactional subsystem. This
00079                        // also turns on logging.
00080       DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
00081       DB_PRIVATE    |  // Region files are not backed by the filesystem.
00082                        // Instead, they are backed by heap memory.
00083       DB_THREAD;       // Cause the environment to be free-threaded
00084 
00085     try {
00086         // Create the environment 
00087         envp = new DbEnv(0);
00088 
00089         // Specify in-memory logging
00090         envp->set_flags(DB_LOG_INMEMORY, 1);
00091 
00092         // Specify the size of the in-memory log buffer.
00093         envp->set_lg_bsize(10 * 1024 * 1024);
00094 
00095         // Specify the size of the in-memory cache
00096         envp->set_cachesize(0, 10 * 1024 * 1024, 1);
00097 
00098         // Indicate that we want db to internally perform deadlock 
00099         // detection.  Also indicate that the transaction with 
00100         // the fewest number of write locks will receive the 
00101         // deadlock notification in the event of a deadlock.
00102         envp->set_lk_detect(DB_LOCK_MINWRITE);
00103 
00104         // Open the environment
00105         envp->open(NULL, envFlags, 0);
00106 
00107         // If we had utility threads (for running checkpoints or 
00108         // deadlock detection, for example) we would spawn those
00109         // here. However, for a simple example such as this,
00110         // that is not required.
00111 
00112         // Open the database
00113         openDb(&dbp, progName, NULL,
00114             envp, DB_DUPSORT);
00115         
00116         // Initialize a mutex. Used to help provide thread ids.
00117         (void)mutex_init(&thread_num_lock, NULL);
00118 
00119         // Start the writer threads.
00120         for (i = 0; i < NUMWRITERS; i++)
00121             (void)thread_create(
00122                 &writerThreads[i], NULL, 
00123                 writerThread, 
00124                 (void *)dbp);
00125 
00126         // Join the writers
00127         for (i = 0; i < NUMWRITERS; i++)
00128             (void)thread_join(writerThreads[i], NULL);
00129 
00130     } catch(DbException &e) {
00131         std::cerr << "Error opening database environment: "
00132                   << std::endl;
00133         std::cerr << e.what() << std::endl;
00134         return (EXIT_FAILURE);
00135     }
00136 
00137     try {
00138         // Close our database handle if it was opened.
00139         if (dbp != NULL)
00140             dbp->close(0);
00141 
00142         // Close our environment if it was opened.
00143         if (envp != NULL)
00144             envp->close(0);
00145     } catch(DbException &e) {
00146         std::cerr << "Error closing database and environment." 
00147                   << std::endl;
00148         std::cerr << e.what() << std::endl;
00149         return (EXIT_FAILURE);
00150     }
00151 
00152     // Final status message and return.
00153 
00154     std::cout << "I'm all done." << std::endl;
00155     return (EXIT_SUCCESS);
00156 }
00157 
00158 // A function that performs a series of writes to a
00159 // Berkeley DB database. The information written
00160 // to the database is largely nonsensical, but the
00161 // mechanism of transactional commit/abort and
00162 // deadlock detection is illustrated here.
00163 void *
00164 writerThread(void *args)
00165 {
00166     int j, thread_num;
00167     int max_retries = 20;   // Max retry on a deadlock
00168     char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00169                            "key 5", "key 6", "key 7", "key 8",
00170                            "key 9", "key 10"};
00171 
00172     Db *dbp = (Db *)args;
00173     DbEnv *envp = dbp->get_env();
00174 
00175     // Get the thread number
00176     (void)mutex_lock(&thread_num_lock);
00177     global_thread_num++;
00178     thread_num = global_thread_num;
00179     (void)mutex_unlock(&thread_num_lock);
00180 
00181     // Initialize the random number generator 
00182     srand(thread_num);
00183 
00184     // Perform 50 transactions
00185     for (int i=0; i<50; i++) { 
00186         DbTxn *txn;
00187         bool retry = true;
00188         int retry_count = 0;
00189         // while loop is used for deadlock retries
00190         while (retry) {
00191             // try block used for deadlock detection and
00192             // general db exception handling
00193             try {
00194 
00195                 // Begin our transaction. We group multiple writes in
00196                 // this thread under a single transaction so as to
00197                 // (1) show that you can atomically perform multiple 
00198                 // writes at a time, and (2) to increase the chances 
00199                 // of a deadlock occurring so that we can observe our 
00200                 // deadlock detection at work.
00201          
00202                 // Normally we would want to avoid the potential for 
00203                 // deadlocks, so for this workload the correct thing 
00204                 // would be to perform our puts with autocommit. But 
00205                 // that would excessively simplify our example, so we 
00206                 // do the "wrong" thing here instead.
00207                 txn = NULL;
00208                 envp->txn_begin(NULL, &txn, 0);
00209 
00210                 // Perform the database write for this transaction.
00211                 for (j = 0; j < 10; j++) {
00212                     Dbt key, value;
00213                     key.set_data(key_strings[j]);
00214                     key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
00215 
00216                     int payload = rand() + i;
00217                     value.set_data(&payload);
00218                     value.set_size(sizeof(int));
00219 
00220                     // Perform the database put
00221                     dbp->put(txn, &key, &value, 0);
00222                 }
00223 
00224                 // countRecords runs a cursor over the entire database.
00225                 // We do this to illustrate issues of deadlocking
00226                 std::cout << thread_num <<  " : Found " 
00227                           <<  countRecords(dbp, txn) 
00228                           << " records in the database." << std::endl;
00229 
00230                 std::cout << thread_num <<  " : committing txn : " << i 
00231                           << std::endl;
00232 
00233                 // commit
00234                 try {
00235                     txn->commit(0);
00236                     retry = false;
00237                     txn = NULL;
00238                 } catch (DbException &e) {
00239                     std::cout << "Error on txn commit: " 
00240                               << e.what() << std::endl;
00241                 }
00242             } catch (DbDeadlockException &) {
00243                 // First thing that we MUST do is abort the transaction.
00244                 if (txn != NULL)
00245                     (void)txn->abort();
00246 
00247                 // Now we decide if we want to retry the operation.
00248                 // If we have retried less than max_retries,
00249                 // increment the retry count and goto retry.
00250                 if (retry_count < max_retries) {
00251                     std::cerr << "############### Writer " << thread_num 
00252                               << ": Got DB_LOCK_DEADLOCK.\n"
00253                               << "Retrying write operation." << std::endl;
00254                     retry_count++;
00255                     retry = true;
00256                  } else {
00257                     // Otherwise, just give up.
00258                     std::cerr << "Writer " << thread_num 
00259                               << ": Got DeadLockException and out of "
00260                               << "retries. Giving up." << std::endl;
00261                     retry = false;
00262                  }
00263            } catch (DbException &e) {
00264                 std::cerr << "db put failed" << std::endl;
00265                 std::cerr << e.what() << std::endl;
00266                 if (txn != NULL)
00267                     txn->abort();
00268                 retry = false;
00269            } catch (std::exception &ee) {
00270             std::cerr << "Unknown exception: " << ee.what() << std::endl;
00271             return (0);
00272           }
00273         }
00274     }
00275     return (0);
00276 }
00277 
00278 
00279 // This simply counts the number of records contained in the
00280 // database and returns the result. You can use this method
00281 // in three ways:
00282 //
00283 // First call it with an active txn handle.
00284 //
00285 // Secondly, configure the cursor for uncommitted reads
00286 //
00287 // Third, call countRecords AFTER the writer has committed
00288 //    its transaction.
00289 //
00290 // If you do none of these things, the writer thread will 
00291 // self-deadlock. 
00292 //
00293 // Note that this method exists only for illustrative purposes.
00294 // A more straight-forward way to count the number of records in
00295 // a database is to use the Database.getStats() method.
00296 int 
00297 countRecords(Db *dbp, DbTxn *txn) 
00298 {
00299 
00300     Dbc *cursorp = NULL;
00301     int count = 0;
00302 
00303     try {
00304         // Get the cursor
00305         dbp->cursor(txn, &cursorp, 0);
00306 
00307         Dbt key, value;
00308         while (cursorp->get(&key, &value, DB_NEXT) == 0) {
00309             count++;
00310         }
00311     } catch (DbDeadlockException &de) {
00312         std::cerr << "countRecords: got deadlock" << std::endl;
00313         cursorp->close();
00314         throw de;
00315     } catch (DbException &e) {
00316         std::cerr << "countRecords error:" << std::endl;
00317         std::cerr << e.what() << std::endl;
00318     }
00319 
00320     if (cursorp != NULL) {
00321         try {
00322             cursorp->close();
00323         } catch (DbException &e) {
00324             std::cerr << "countRecords: cursor close failed:" << std::endl;
00325             std::cerr << e.what() << std::endl;
00326         }
00327     }
00328 
00329     return (count);
00330 }
00331 
00332 
00333 // Open a Berkeley DB database
00334 int
00335 openDb(Db **dbpp, const char *progname, const char *fileName,
00336   DbEnv *envp, u_int32_t extraFlags)
00337 {
00338     int ret;
00339     u_int32_t openFlags;
00340 
00341     try {
00342         Db *dbp = new Db(envp, 0);
00343 
00344         // Point to the new'd Db
00345         *dbpp = dbp;
00346 
00347         if (extraFlags != 0)
00348             ret = dbp->set_flags(extraFlags);
00349 
00350         // Now open the database */
00351         openFlags = DB_CREATE        | // Allow database creation
00352                     DB_THREAD        |
00353                     DB_AUTO_COMMIT;    // Allow autocommit
00354 
00355         dbp->open(NULL,       // Txn pointer
00356                   fileName,   // File name
00357                   NULL,       // Logical db name
00358                   DB_BTREE,   // Database type (using btree)
00359                   openFlags,  // Open flags
00360                   0);         // File mode. Using defaults
00361     } catch (DbException &e) {
00362         std::cerr << progname << ": openDb: db open failed:" << std::endl;
00363         std::cerr << e.what() << std::endl;
00364         return (EXIT_FAILURE);
00365     }
00366 
00367     return (EXIT_SUCCESS);
00368 }
00369 

Generated on Sun Dec 25 12:14:26 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2