Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

txn_guide.c

00001 /* File: txn_guide.c */
00002 
00003 /* We assume an ANSI-compatible compiler */
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <string.h>
00007 #include <db.h>
00008 
/*
 * Minimal portability layer: thread_t/mutex_t plus thread_create(),
 * thread_join(), mutex_init(), mutex_lock() and mutex_unlock(), each
 * yielding 0 on success and non-zero on failure. PATHD is the
 * platform's path separator character.
 */
#ifdef _WIN32
#include <windows.h>
#define PATHD '\\'
extern int getopt(int, char * const *, const char *);
extern char *optarg;

/* The attr argument is accepted for symmetry with pthreads and ignored. */
typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
/*
 * Wait for the thread to finish, fetch its exit code only when the
 * caller supplied somewhere to store it (GetExitCodeThread requires a
 * valid output pointer), then release the thread handle.
 */
#define thread_join(thr, statusp)                                          \
    (((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&           \
    ((statusp) == NULL ||                                                  \
        GetExitCodeThread((thr), (LPDWORD)(statusp))) &&                   \
    CloseHandle(thr)) ? 0 : -1)

/* The attr argument is accepted for symmetry with pthreads and ignored. */
typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock((m))
#define mutex_unlock(m)         pthread_mutex_unlock((m))
#endif
00044 
00045 /* Run 5 writers threads at a time. */
00046 #define NUMWRITERS 5
00047 
00048 /*
00049  * Printing of a thread_t is implementation-specific, so we
00050  * create our own thread IDs for reporting purposes.
00051  */
00052 int global_thread_num;
00053 mutex_t thread_num_lock;
00054 
00055 /* Forward declarations */
00056 int count_records(DB *, DB_TXN *);
00057 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
00058 int usage(void);
00059 int writer_thread(void *);
00060 
/*
 * Print the command-line synopsis to stderr.
 *
 * Returns EXIT_FAILURE so that callers can write "return (usage());"
 * when they reject the command line.
 */
int
usage(void)
{
    fprintf(stderr,
        "Usage: txn_guide [-h <database_home_directory>]\n");
    return (EXIT_FAILURE);
}
00068 
00069 
00070 int
00071 main(int argc, char *argv[])
00072 {
00073     /* Initialize our handles */
00074     DB *dbp = NULL;
00075     DB_ENV *envp = NULL;
00076 
00077     thread_t writer_threads[NUMWRITERS];
00078     int ch, i, ret, ret_t;
00079     u_int32_t env_flags;
00080     char *db_home_dir;
00081     /* Application name */
00082     const char *prog_name = "txn_guide";
00083     /* Database file name */
00084     const char *file_name = "mydb.db";
00085 
00086     /* Parse the command line arguments */
00087 #ifdef _WIN32
00088     db_home_dir = ".\\";
00089 #else
00090     db_home_dir = "./";
00091 #endif
00092     while ((ch = getopt(argc, argv, "h:")) != EOF)
00093         switch (ch) {
00094         case 'h':
00095             db_home_dir = optarg;
00096             break;
00097         case '?':
00098         default:
00099             return (usage());
00100         }
00101 
00102     /* Create the environment */
00103     ret = db_env_create(&envp, 0);
00104     if (ret != 0) {
00105         fprintf(stderr, "Error creating environment handle: %s\n",
00106             db_strerror(ret));
00107         goto err;
00108     }
00109 
00110      /*
00111      * Indicate that we want db to perform lock detection internally.
00112      * Also indicate that the transaction with the fewest number of
00113      * write locks will receive the deadlock notification in 
00114      * the event of a deadlock.
00115      */  
00116     ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
00117     if (ret != 0) {
00118         fprintf(stderr, "Error setting lock detect: %s\n",
00119             db_strerror(ret));
00120         goto err;
00121     }
00122 
00123 
00124     env_flags =
00125       DB_CREATE     |  /* Create the environment if it does not exist */ 
00126       DB_RECOVER    |  /* Run normal recovery. */
00127       DB_INIT_LOCK  |  /* Initialize the locking subsystem */
00128       DB_INIT_LOG   |  /* Initialize the logging subsystem */
00129       DB_INIT_TXN   |  /* Initialize the transactional subsystem. This
00130                         * also turns on logging. */
00131       DB_INIT_MPOOL |  /* Initialize the memory pool (in-memory cache) */
00132       DB_THREAD;       /* Cause the environment to be free-threaded */
00133 
00134     /* Now actually open the environment */
00135     ret = envp->open(envp, db_home_dir, env_flags, 0);
00136     if (ret != 0) {
00137         fprintf(stderr, "Error opening environment: %s\n",
00138             db_strerror(ret));
00139         goto err;
00140     }
00141 
00142     /*
00143      * If we had utility threads (for running checkpoints or 
00144      * deadlock detection, for example) we would spawn those
00145      * here. However, for a simple example such as this,
00146      * that is not required.
00147      */
00148 
00149     /* Open the database */
00150     ret = open_db(&dbp, prog_name, file_name, 
00151       envp, DB_DUPSORT);
00152     if (ret != 0)
00153         goto err;
00154 
00155     /* Initialize a mutex. Used to help provide thread ids. */
00156     (void)mutex_init(&thread_num_lock, NULL);
00157 
00158     /* Start the writer threads. */
00159     for (i = 0; i < NUMWRITERS; i++)
00160         (void)thread_create(&writer_threads[i], NULL, 
00161             (void *)writer_thread, (void *)dbp);
00162 
00163     /* Join the writers */
00164     for (i = 0; i < NUMWRITERS; i++)
00165         (void)thread_join(writer_threads[i], NULL);
00166 
00167 err:
00168     /* Close our database handle, if it was opened. */
00169     if (dbp != NULL) {
00170         ret_t = dbp->close(dbp, 0);
00171         if (ret_t != 0) {
00172             fprintf(stderr, "%s database close failed: %s\n",
00173                 file_name, db_strerror(ret_t));
00174             ret = ret_t;
00175         }
00176     }
00177 
00178     /* Close our environment, if it was opened. */
00179     if (envp != NULL) {
00180         ret_t = envp->close(envp, 0);
00181         if (ret_t != 0) {
00182             fprintf(stderr, "environment close failed: %s\n",
00183                 db_strerror(ret_t));
00184                 ret = ret_t;
00185         }
00186     }
00187 
00188     /* Final status message and return. */
00189     printf("I'm all done.\n");
00190     return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
00191 }
00192 
00193 /* 
00194  * A function that performs a series of writes to a
00195  * Berkeley DB database. The information written
00196  * to the database is largely nonsensical, but the
00197  * mechanism of transactional commit/abort and
00198  * deadlock detection is illustrated here.
00199  */
00200 int
00201 writer_thread(void *args)
00202 {
00203     DB *dbp;
00204     DB_ENV *envp;
00205 
00206     DBT key, value;
00207     DB_TXN *txn;
00208     int i, j, payload, ret, thread_num;
00209     int retry_count, max_retries = 20;   /* Max retry on a deadlock */
00210     char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00211                            "key 5", "key 6", "key 7", "key 8",
00212                            "key 9", "key 10"};
00213 
00214     dbp = (DB *)args;
00215     envp = dbp->get_env(dbp);
00216 
00217     /* Get the thread number */
00218     (void)mutex_lock(&thread_num_lock);
00219     global_thread_num++;
00220     thread_num = global_thread_num;
00221     (void)mutex_unlock(&thread_num_lock);
00222 
00223     /* Initialize the random number generator */
00224     srand(thread_num);
00225 
00226 
00227     /* Write 50 times and then quit */
00228     for (i = 0; i < 50; i++) {
00229         retry_count = 0; /* Used for deadlock retries */
00230 
00231         /*
00232          * Some think it is bad form to loop with a goto statement, but 
00233          * we do it anyway because it is the simplest and clearest way
00234          * to achieve our abort/retry operation.
00235          */
00236 retry:
00237         /* Begin our transaction. We group multiple writes in
00238          * this thread under a single transaction so as to
00239          * (1) show that you can atomically perform multiple writes 
00240          * at a time, and (2) to increase the chances of a 
00241          * deadlock occurring so that we can observe our 
00242          * deadlock detection at work.
00243          *
00244          * Normally we would want to avoid the potential for deadlocks,
00245          * so for this workload the correct thing would be to perform our 
00246          * puts with autocommit. But that would excessively simplify our 
00247          * example, so we do the "wrong" thing here instead.
00248          */
00249         ret = envp->txn_begin(envp, NULL, &txn, 0);
00250         if (ret != 0) {
00251             envp->err(envp, ret, "txn_begin failed");
00252             return (EXIT_FAILURE);
00253         }
00254         for (j = 0; j < 10; j++) {
00255             /* Set up our key and values DBTs */
00256             memset(&key, 0, sizeof(DBT));
00257             key.data = key_strings[j];
00258             key.size = (u_int32_t)strlen(key_strings[j]) + 1;
00259 
00260             memset(&value, 0, sizeof(DBT));
00261             payload = rand() + i;
00262             value.data = &payload;
00263             value.size = sizeof(int);
00264 
00265             /* Perform the database put. */
00266             switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
00267                 case 0:
00268                     break;
00269                 /* 
00270                  * Our database is configured for sorted duplicates, 
00271                  * so there is a potential for a KEYEXIST error return. 
00272                  * If we get one, simply ignore it and continue on.
00273                  *
00274                  * Note that you will see KEYEXIST errors only after you
00275                  * have run this program at least once.
00276                  */
00277                 case DB_KEYEXIST:
00278                     printf("Got keyexists.\n");
00279                     break;
00280                 /*
00281                  * Here's where we perform deadlock detection. If 
00282                  * DB_LOCK_DEADLOCK is returned by the put operation, 
00283                  * then this thread has been chosen to break a deadlock.
00284                  * It must abort its operation, and optionally retry the
00285                  * put.
00286                  */
00287                 case DB_LOCK_DEADLOCK:
00288                     /* 
00289                      * First thing that we MUST do is abort the 
00290                      * transaction.
00291                      */
00292                     (void)txn->abort(txn);
00293                     /*
00294                      * Now we decide if we want to retry the operation.
00295                      * If we have retried less than max_retries,
00296                      * increment the retry count and goto retry.
00297                      */
00298                     if (retry_count < max_retries) {
00299                         printf("Writer %i: Got DB_LOCK_DEADLOCK.\n", 
00300                             thread_num);
00301                         printf("Writer %i: Retrying write operation.\n",
00302                             thread_num);
00303                         retry_count++;
00304                         goto retry;
00305                     }
00306                     /*
00307                      * Otherwise, just give up.
00308                      */
00309                     printf("Writer %i: ", thread_num);
00310                     printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
00311                     printf("Writer %i: Giving up.\n", thread_num);
00312                     return (EXIT_FAILURE);
00313                 /* 
00314                  * If a generic error occurs, we simply abort the 
00315                  * transaction and exit the thread completely.
00316                  */
00317                 default:
00318                     envp->err(envp, ret, "db put failed");
00319                     ret = txn->abort(txn);
00320                     if (ret != 0)
00321                         envp->err(envp, ret, 
00322                             "txn abort failed");
00323                     return (EXIT_FAILURE);
00324              } 
00326         }   
00328         /* 
00329          * print the number of records found in the database. 
00330          * See count_records() for usage information.
00331          */
00332         printf("Thread %i. Record count: %i\n", thread_num, 
00333             count_records(dbp, NULL));
00334 
00335         /* 
00336          * If all goes well, we can commit the transaction and
00337          * exit the thread.
00338          */
00339         ret = txn->commit(txn, 0);
00340         if (ret != 0) {
00341             envp->err(envp, ret, "txn commit failed");
00342             return (EXIT_FAILURE);
00343         }
00344     }
00345     return (EXIT_SUCCESS);
00346 }
00347 
00348 /* 
00349  * This simply counts the number of records contained in the
00350  * database and returns the result. You can use this function 
00351  * in three ways:
00352  *
00353  * First call it with an active txn handle.
00354  * Secondly, configure the cursor for uncommitted reads (this
00355  *    is what the example currently does).
00356  * Third, call count_records AFTER the writer has committed
00357  *    its transaction.
00358  *
00359  * If you do none of these things, the writer thread will 
00360  * self-deadlock.
00361  *
00362  * Note that this function exists only for illustrative purposes.
00363  * A more straight-forward way to count the number of records in
00364  * a database is to use DB->stat() or DB->stat_print().
00365  */
00366 
00367 int
00368 count_records(DB *dbp, DB_TXN *txn)
00369 {
00370 
00371     DBT key, value;
00372     DBC *cursorp;
00373     int count, ret;
00374 
00375     cursorp = NULL;
00376     count = 0;
00377 
00378     /* Get the cursor */
00379     ret = dbp->cursor(dbp, txn, &cursorp, 
00380         DB_READ_UNCOMMITTED);
00381     if (ret != 0) {
00382         dbp->err(dbp, ret,
00383           "count_records: cursor open failed.");
00384         goto cursor_err;
00385     }
00386 
00387     /* Get the key DBT used for the database read */
00388     memset(&key, 0, sizeof(DBT));
00389     memset(&value, 0, sizeof(DBT));
00390     do {
00391         ret = cursorp->c_get(cursorp, &key, &value, DB_NEXT);
00392         switch (ret) {
00393             case 0:
00394                 count++;
00395                 break;
00396             case DB_NOTFOUND:
00397                 break;
00398             default:
00399                 dbp->err(dbp, ret, 
00400                     "Count records unspecified error");
00401                 goto cursor_err;
00402         }
00403     } while (ret == 0);
00404 
00405 cursor_err:
00406     if (cursorp != NULL) {
00407         ret = cursorp->c_close(cursorp);
00408         if (ret != 0) {
00409             dbp->err(dbp, ret,
00410                 "count_records: cursor close failed.");
00411         }
00412     }
00413 
00414     return (count);
00415 }
00416 
00417 
00418 /* Open a Berkeley DB database */
00419 int
00420 open_db(DB **dbpp, const char *progname, const char *file_name,
00421   DB_ENV *envp, u_int32_t extra_flags)
00422 {
00423     int ret;
00424     u_int32_t open_flags;
00425     DB *dbp;
00426 
00427     /* Initialize the DB handle */
00428     ret = db_create(&dbp, envp, 0);
00429     if (ret != 0) {
00430         fprintf(stderr, "%s: %s\n", progname,
00431                 db_strerror(ret));
00432         return (EXIT_FAILURE);
00433     }
00434 
00435     /* Point to the memory malloc'd by db_create() */
00436     *dbpp = dbp;
00437 
00438     if (extra_flags != 0) {
00439         ret = dbp->set_flags(dbp, extra_flags);
00440         if (ret != 0) {
00441             dbp->err(dbp, ret, 
00442                 "open_db: Attempt to set extra flags failed.");
00443             return (EXIT_FAILURE);
00444         }
00445     }
00446 
00447     /* Now open the database */
00448     open_flags = DB_CREATE              | /* Allow database creation */ 
00449                  DB_READ_UNCOMMITTED    | /* Allow dirty reads */
00450                  DB_AUTO_COMMIT;          /* Allow autocommit */
00451 
00452     ret = dbp->open(dbp,        /* Pointer to the database */
00453                     NULL,       /* Txn pointer */
00454                     file_name,  /* File name */
00455                     NULL,       /* Logical db name */
00456                     DB_BTREE,   /* Database type (using btree) */
00457                     open_flags, /* Open flags */
00458                     0);         /* File mode. Using defaults */
00459     if (ret != 0) {
00460         dbp->err(dbp, ret, "Database '%s' open failed",
00461             file_name);
00462         return (EXIT_FAILURE);
00463     }
00464     return (EXIT_SUCCESS);
00465 }

Generated on Sun Dec 25 12:14:25 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2