Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

TxnGuide.cpp

00001 // File TxnGuide.cpp
00002 
#include <cstdlib>
#include <cstring>
#include <iostream>

#include <db_cxx.h>
00005 
// Minimal portability layer over native threads and mutexes.
//
// Both branches provide the same names: PATHD, thread_t, thread_create(),
// thread_join(), mutex_t, mutex_init(), mutex_lock() and mutex_unlock().
// Every macro evaluates to 0 on success and to a non-zero value on failure.
#ifdef _WIN32
#include <windows.h>
// getopt()/optarg are declared by hand here (with C linkage) because
// <unistd.h> is only included on the non-Windows branch; the definitions
// have to come from elsewhere in the build.
extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}
#define PATHD '\\'

typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// Wait for the thread to finish. As with pthread_join(), statusp may be
// NULL when the caller does not want the exit status; in that case the
// exit code is not queried at all, because GetExitCodeThread() must be
// given a valid pointer to write through.
#define thread_join(thr, statusp)                                          \
    ((WaitForSingleObject((thr), INFINITE) != WAIT_OBJECT_0) ? -1 :        \
    (((statusp) == NULL) ? 0 :                                             \
    (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)))

typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

// On POSIX systems the macros map one-to-one onto the pthread calls.
typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock(m)
#define mutex_unlock(m)         pthread_mutex_unlock(m)
#endif
00043 
00044 // Run 5 writers threads at a time.
00045 #define NUMWRITERS 5
00046 
00047 // Printing of thread_t is implementation-specific, so we
00048 // create our own thread IDs for reporting purposes.
00049 int global_thread_num;
00050 mutex_t thread_num_lock;
00051 
00052 // Forward declarations
00053 int countRecords(Db *, DbTxn *); 
00054 int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
00055 int usage(void);
00056 void *writerThread(void *);
00057 
// Print the command line synopsis to stderr.
//
// Returns EXIT_FAILURE so that callers can write `return (usage());`.
int
usage()
{
    // The synopsis names the program; it previously started with a bare
    // space followed by the option list.
    std::cerr << "usage: TxnGuide [-h <database_home_directory>]"
              << std::endl;
    return (EXIT_FAILURE);
}
00065 
00066 int 
00067 main(int argc, char *argv[])
00068 {
00069     // Initialize our handles
00070     Db *dbp = NULL;
00071     DbEnv *envp = NULL;
00072 
00073     thread_t writerThreads[NUMWRITERS];
00074     int ch, i;
00075     u_int32_t envFlags;
00076     char *dbHomeDir;
00077     extern char *optarg;
00078 
00079     // Application name
00080     const char *progName = "TxnGuide";
00081 
00082     // Database file name
00083     const char *fileName = "mydb.db";
00084 
00085     // Parse the command line arguments
00086 #ifdef _WIN32
00087     dbHomeDir = ".\\";
00088 #else
00089     dbHomeDir = "./";
00090 #endif
00091     while ((ch = getopt(argc, argv, "h:")) != EOF)
00092         switch (ch) {
00093         case 'h':
00094             dbHomeDir = optarg;
00095             break;
00096         case '?':
00097         default:
00098             return (usage());
00099         }
00100 
00101 
00102     // Env open flags
00103     envFlags =
00104       DB_CREATE     |  // Create the environment if it does not exist
00105       DB_RECOVER    |  // Run normal recovery.
00106       DB_INIT_LOCK  |  // Initialize the locking subsystem
00107       DB_INIT_LOG   |  // Initialize the logging subsystem
00108       DB_INIT_TXN   |  // Initialize the transactional subsystem. This
00109                        // also turns on logging.
00110       DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
00111       DB_THREAD;       // Cause the environment to be free-threaded
00112 
00113     try {
00114         // Create and open the environment 
00115         envp = new DbEnv(0);
00116 
00117         // Indicate that we want db to internally perform deadlock 
00118         // detection.  Also indicate that the transaction with 
00119         // the fewest number of write locks will receive the 
00120         // deadlock notification in the event of a deadlock.
00121         envp->set_lk_detect(DB_LOCK_MINWRITE);
00122 
00123         envp->open(dbHomeDir, envFlags, 0);
00124 
00125 
00126         // If we had utility threads (for running checkpoints or 
00127         // deadlock detection, for example) we would spawn those
00128         // here. However, for a simple example such as this,
00129         // that is not required.
00130 
00131         // Open the database
00132         openDb(&dbp, progName, fileName,
00133             envp, DB_DUPSORT);
00134         
00135         // Initialize a mutex. Used to help provide thread ids.
00136         (void)mutex_init(&thread_num_lock, NULL);
00137 
00138         // Start the writer threads.
00139         for (i = 0; i < NUMWRITERS; i++)
00140             (void)thread_create(
00141                 &writerThreads[i], NULL, 
00142                 writerThread, (void *)dbp);
00143 
00144         // Join the writers
00145         for (i = 0; i < NUMWRITERS; i++)
00146             (void)thread_join(writerThreads[i], NULL);
00147 
00148     } catch(DbException &e) {
00149         std::cerr << "Error opening database environment: "
00150                   << dbHomeDir << std::endl;
00151         std::cerr << e.what() << std::endl;
00152         return (EXIT_FAILURE);
00153     }
00154 
00155     try {
00156         // Close our database handle if it was opened.
00157         if (dbp != NULL)
00158             dbp->close(0);
00159 
00160         // Close our environment if it was opened.
00161         if (envp != NULL)
00162             envp->close(0);
00163     } catch(DbException &e) {
00164         std::cerr << "Error closing database and environment." 
00165                   << std::endl;
00166         std::cerr << e.what() << std::endl;
00167         return (EXIT_FAILURE);
00168     }
00169 
00170     // Final status message and return.
00171 
00172     std::cout << "I'm all done." << std::endl;
00173     return (EXIT_SUCCESS);
00174 }
00175 
00176 // A function that performs a series of writes to a
00177 // Berkeley DB database. The information written
00178 // to the database is largely nonsensical, but the
00179 // mechanism of transactional commit/abort and
00180 // deadlock detection is illustrated here.
00181 void *
00182 writerThread(void *args)
00183 {
00184     int j, thread_num;
00185     int max_retries = 20;   // Max retry on a deadlock
00186     char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00187                            "key 5", "key 6", "key 7", "key 8",
00188                            "key 9", "key 10"};
00189 
00190     Db *dbp = (Db *)args;
00191     DbEnv *envp = dbp->get_env();
00192 
00193     // Get the thread number
00194     (void)mutex_lock(&thread_num_lock);
00195     global_thread_num++;
00196     thread_num = global_thread_num;
00197     (void)mutex_unlock(&thread_num_lock);
00198 
00199     // Initialize the random number generator 
00200     srand(thread_num);
00201 
00202     // Perform 50 transactions
00203     for (int i=0; i<50; i++) { 
00204         DbTxn *txn;
00205         bool retry = true;
00206         int retry_count = 0;
00207         // while loop is used for deadlock retries
00208         while (retry) {
00209             // try block used for deadlock detection and
00210             // general db exception handling
00211             try {
00212 
00213                 // Begin our transaction. We group multiple writes in
00214                 // this thread under a single transaction so as to
00215                 // (1) show that you can atomically perform multiple 
00216                 // writes at a time, and (2) to increase the chances 
00217                 // of a deadlock occurring so that we can observe our 
00218                 // deadlock detection at work.
00219          
00220                 // Normally we would want to avoid the potential for 
00221                 // deadlocks, so for this workload the correct thing 
00222                 // would be to perform our puts with autocommit. But 
00223                 // that would excessively simplify our example, so we 
00224                 // do the "wrong" thing here instead.
00225                 txn = NULL;
00226                 envp->txn_begin(NULL, &txn, 0);
00227 
00228                 // Perform the database write for this transaction.
00229                 for (j = 0; j < 10; j++) {
00230                     Dbt key, value;
00231                     key.set_data(key_strings[j]);
00232                     key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
00233 
00234                     int payload = rand() + i;
00235                     value.set_data(&payload);
00236                     value.set_size(sizeof(int));
00237 
00238                     // Perform the database put
00239                     dbp->put(txn, &key, &value, 0);
00240                 }
00241 
00242                 // countRecords runs a cursor over the entire database.
00243                 // We do this to illustrate issues of deadlocking
00244                 std::cout << thread_num <<  " : Found " 
00245                           <<  countRecords(dbp, NULL) 
00246                           << " records in the database." << std::endl;
00247 
00248                 std::cout << thread_num <<  " : committing txn : " << i 
00249                           << std::endl;
00250 
00251                 // commit
00252                 try {
00253                     txn->commit(0);
00254                     retry = false;
00255                     txn = NULL;
00256                 } catch (DbException &e) {
00257                     std::cout << "Error on txn commit: " 
00258                               << e.what() << std::endl;
00259                 }
00260             } catch (DbDeadlockException &) {
00261                 // First thing that we MUST do is abort the transaction.
00262                 if (txn != NULL)
00263                     (void)txn->abort();
00264 
00265                 // Now we decide if we want to retry the operation.
00266                 // If we have retried less than max_retries,
00267                 // increment the retry count and goto retry.
00268                 if (retry_count < max_retries) {
00269                     std::cout << "############### Writer " << thread_num 
00270                               << ": Got DB_LOCK_DEADLOCK.\n"
00271                               << "Retrying write operation."
00272                               << std::endl;
00273                     retry_count++;
00274                     retry = true;
00275                  } else {
00276                     // Otherwise, just give up.
00277                     std::cerr << "Writer " << thread_num 
00278                               << ": Got DeadLockException and out of "
00279                               << "retries. Giving up." << std::endl;
00280                     retry = false;
00281                  }
00282            } catch (DbException &e) {
00283                 std::cerr << "db put failed" << std::endl;
00284                 std::cerr << e.what() << std::endl;
00285                 if (txn != NULL)
00286                     txn->abort();
00287                 retry = false;
00288            } catch (std::exception &ee) {
00289             std::cerr << "Unknown exception: " << ee.what() << std::endl;
00290             return (0);
00291           }
00292         }
00293     }
00294     return (0);
00295 }
00296 
00297 
00298 // This simply counts the number of records contained in the
00299 // database and returns the result. You can use this method
00300 // in three ways:
00301 //
00302 // First call it with an active txn handle.
00303 // Secondly, configure the cursor for uncommitted reads
00304 //
00305 // Third, call countRecords AFTER the writer has committed
00306 //    its transaction.
00307 //
00308 // If you do none of these things, the writer thread will 
00309 // self-deadlock. 
00310 //
00311 // Note that this method exists only for illustrative purposes.
00312 // A more straight-forward way to count the number of records in
00313 // a database is to use the Database.getStats() method.
00314 int 
00315 countRecords(Db *dbp, DbTxn *txn) 
00316 {
00317 
00318     Dbc *cursorp = NULL;
00319     int count = 0;
00320 
00321     try {
00322         // Get the cursor
00323         dbp->cursor(txn, &cursorp, DB_READ_UNCOMMITTED);
00324 
00325         Dbt key, value;
00326         while (cursorp->get(&key, &value, DB_NEXT) == 0) {
00327             count++;
00328         }
00329     } catch (DbDeadlockException &de) {
00330         std::cerr << "countRecords: got deadlock" << std::endl;
00331         cursorp->close();
00332         throw de;
00333     } catch (DbException &e) {
00334         std::cerr << "countRecords error:" << std::endl;
00335         std::cerr << e.what() << std::endl;
00336     }
00337 
00338     if (cursorp != NULL) {
00339         try {
00340             cursorp->close();
00341         } catch (DbException &e) {
00342             std::cerr << "countRecords: cursor close failed:" << std::endl;
00343             std::cerr << e.what() << std::endl;
00344         }
00345     }
00346 
00347     return (count);
00348 }
00349 
00350 
00351 // Open a Berkeley DB database
00352 int
00353 openDb(Db **dbpp, const char *progname, const char *fileName,
00354   DbEnv *envp, u_int32_t extraFlags)
00355 {
00356     int ret;
00357     u_int32_t openFlags;
00358 
00359     try {
00360         Db *dbp = new Db(envp, 0);
00361 
00362         // Point to the new'd Db
00363         *dbpp = dbp;
00364 
00365         if (extraFlags != 0)
00366             ret = dbp->set_flags(extraFlags);
00367 
00368         // Now open the database */
00369         openFlags = DB_CREATE              | // Allow database creation
00370                     DB_READ_UNCOMMITTED    | // Allow uncommitted reads
00371                     DB_AUTO_COMMIT;          // Allow autocommit
00372 
00373         dbp->open(NULL,       // Txn pointer
00374                   fileName,   // File name
00375                   NULL,       // Logical db name
00376                   DB_BTREE,   // Database type (using btree)
00377                   openFlags,  // Open flags
00378                   0);         // File mode. Using defaults
00379     } catch (DbException &e) {
00380         std::cerr << progname << ": openDb: db open failed:" << std::endl;
00381         std::cerr << e.what() << std::endl;
00382         return (EXIT_FAILURE);
00383     }
00384 
00385     return (EXIT_SUCCESS);
00386 }
00387 

Generated on Sun Dec 25 12:14:26 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2