Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

txn_guide_inmemory.c

00001 /* File: txn_guide_inmemory.c */
00002 
00003 /* We assume an ANSI-compatible compiler */
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <string.h>
00007 #include <db.h>
00008 
00009 #ifdef _WIN32
00010 #include <windows.h>
00011 #define PATHD '\\'
00012 extern int getopt(int, char * const *, const char *);
00013 extern char *optarg;
00014 
00015 typedef HANDLE thread_t;
00016 #define thread_create(thrp, attr, func, arg)                               \
00017     (((*(thrp) = CreateThread(NULL, 0,                                     \
00018         (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
00019 #define thread_join(thr, statusp)                                          \
00020     ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&            \
00021     GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
00022 
00023 typedef HANDLE mutex_t;
00024 #define mutex_init(m, attr)                                                \
00025     (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
00026 #define mutex_lock(m)                                                      \
00027     ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
00028 #define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
00029 #else
00030 #include <pthread.h>
00031 #include <unistd.h>
00032 #define PATHD '/'
00033 
00034 typedef pthread_t thread_t;
00035 #define thread_create(thrp, attr, func, arg)                               \
00036     pthread_create((thrp), (attr), (func), (arg))
00037 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
00038 
00039 typedef pthread_mutex_t mutex_t;
00040 #define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
00041 #define mutex_lock(m)           pthread_mutex_lock(m)
00042 #define mutex_unlock(m)         pthread_mutex_unlock(m)
00043 #endif
00044 
00045 /* Run 5 writers threads at a time. */
00046 #define NUMWRITERS 5
00047 
00048 /*
00049  * Printing of a thread_t is implementation-specific, so we
00050  * create our own thread IDs for reporting purposes.
00051  */
00052 int global_thread_num;
00053 mutex_t thread_num_lock;
00054 
00055 /* Forward declarations */
00056 int count_records(DB *, DB_TXN *); // --> different
00057 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
00058 int writer_thread(void *);
00059 
00060 int
00061 main(void)
00062 {
00063     /* Initialize our handles */
00064     DB *dbp = NULL;
00065     DB_ENV *envp = NULL;
00066 
00067     thread_t writer_threads[NUMWRITERS];
00068     int i, ret, ret_t;
00069     u_int32_t env_flags;
00070 
00071     /* Application name */
00072     const char *prog_name = "txn_guide_inmemory";
00073     
00074     /* Create the environment */
00075     ret = db_env_create(&envp, 0);
00076     if (ret != 0) {
00077         fprintf(stderr, "Error creating environment handle: %s\n",
00078             db_strerror(ret));
00079         goto err;
00080     }
00081 
00082     env_flags =
00083       DB_CREATE     |  /* Create the environment if it does not exist */ 
00084       DB_INIT_LOCK  |  /* Initialize the locking subsystem */
00085       DB_INIT_LOG   |  /* Initialize the logging subsystem */
00086       DB_INIT_TXN   |  /* Initialize the transactional subsystem. This
00087                         * also turns on logging. */
00088       DB_INIT_MPOOL |  /* Initialize the memory pool (in-memory cache) */
00089       DB_PRIVATE    |  /* Region files are not backed by the filesystem. Instead, they
00090                         * are backed by heap memory.  */
00091       DB_THREAD;       /* Cause the environment to be free-threaded */
00092 
00093     /* Specify in-memory logging */
00094     ret = envp->set_flags(envp, DB_LOG_INMEMORY, 1);
00095     if (ret != 0) {
00096         fprintf(stderr, "Error setting log subsystem to in-memory: %s\n",
00097             db_strerror(ret));
00098         goto err;
00099     }
00100 
00101     /* 
00102      * Specify the size of the in-memory log buffer. 
00103      */
00104     ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024);
00105     if (ret != 0) {
00106         fprintf(stderr, "Error increasing the log buffer size: %s\n",
00107             db_strerror(ret));
00108         goto err;
00109     }
00110 
00111     /* 
00112      * Specify the size of the in-memory cache. 
00113      */
00114     ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1);
00115     if (ret != 0) {
00116         fprintf(stderr, "Error increasing the cache size: %s\n",
00117             db_strerror(ret));
00118         goto err;
00119     }
00120 
00121      /*
00122      * Indicate that we want db to perform lock detection internally.
00123      * Also indicate that the transaction with the fewest number of
00124      * write locks will receive the deadlock notification in 
00125      * the event of a deadlock.
00126      */  
00127     ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
00128     if (ret != 0) {
00129         fprintf(stderr, "Error setting lock detect: %s\n",
00130             db_strerror(ret));
00131         goto err;
00132     }
00133 
00134     /* Now actually open the environment */
00135     ret = envp->open(envp, NULL, env_flags, 0);
00136     if (ret != 0) {
00137         fprintf(stderr, "Error opening environment: %s\n",
00138             db_strerror(ret));
00139         goto err;
00140     }
00141 
00142     /*
00143      * If we had utility threads (for running checkpoints or 
00144      * deadlock detection, for example) we would spawn those
00145      * here. However, for a simple example such as this,
00146      * that is not required.
00147      */
00148 
00149     /* Open the database */
00150     ret = open_db(&dbp, prog_name, NULL, 
00151       envp, DB_DUPSORT);
00152     if (ret != 0)
00153         goto err;
00154 
00155     /* Initialize a mutex. Used to help provide thread ids. */
00156     (void)mutex_init(&thread_num_lock, NULL);
00157 
00158     /* Start the writer threads. */
00159     for (i = 0; i < NUMWRITERS; i++)
00160         (void)thread_create(
00161       &writer_threads[i], NULL, (void *)writer_thread, (void *)dbp);
00162 
00163     /* Join the writers */
00164     for (i = 0; i < NUMWRITERS; i++)
00165         (void)thread_join(writer_threads[i], NULL);
00166 
00167 err:
00168     /* Close our database handle, if it was opened. */
00169     if (dbp != NULL) {
00170         ret_t = dbp->close(dbp, 0);
00171         if (ret_t != 0) {
00172             fprintf(stderr, "%s database close failed.\n",
00173                 db_strerror(ret_t));
00174             ret = ret_t;
00175         }
00176     }
00177 
00178     /* Close our environment, if it was opened. */
00179     if (envp != NULL) {
00180         ret_t = envp->close(envp, 0);
00181         if (ret_t != 0) {
00182             fprintf(stderr, "environment close failed: %s\n",
00183                 db_strerror(ret_t));
00184                 ret = ret_t;
00185         }
00186     }
00187 
00188     /* Final status message and return. */
00189     printf("I'm all done.\n");
00190     return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
00191 }
00192 
00193 /* 
00194  * A function that performs a series of writes to a
00195  * Berkeley DB database. The information written
00196  * to the database is largely nonsensical, but the
00197  * mechanism of transactional commit/abort and
00198  * deadlock detection is illustrated here.
00199  */
00200 int
00201 writer_thread(void *args)
00202 {
00203     DB *dbp;
00204     DB_ENV *envp;
00205     DBT key, value;
00206     DB_TXN *txn;
00207     int i, j, payload, ret, thread_num;
00208     int retry_count, max_retries = 20;   /* Max retry on a deadlock */
00209     char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00210                            "key 5", "key 6", "key 7", "key 8",
00211                            "key 9", "key 10"};
00212 
00213     dbp = (DB *)args;
00214     envp = dbp->get_env(dbp);
00215 
00216     /* Get the thread number */
00217     (void)mutex_lock(&thread_num_lock);
00218     global_thread_num++;
00219     thread_num = global_thread_num;
00220     (void)mutex_unlock(&thread_num_lock);
00221 
00222     /* Initialize the random number generator */
00223     srand(thread_num);
00224 
00225     /* Write 50 times and then quit */
00226     for (i = 0; i < 50; i++) {
00227         retry_count = 0; /* Used for deadlock retries */
00228 
00229         /*
00230          * Some think it is bad form to loop with a goto statement, but 
00231          * we do it anyway because it is the simplest and clearest way
00232          * to achieve our abort/retry operation.
00233          */
00234 retry:
00235         /* Begin our transaction. We group multiple writes in
00236          * this thread under a single transaction so as to
00237          * (1) show that you can atomically perform multiple writes 
00238          * at a time, and (2) to increase the chances of a 
00239          * deadlock occurring so that we can observe our 
00240          * deadlock detection at work.
00241          *
00242          * Normally we would want to avoid the potential for deadlocks,
00243          * so for this workload the correct thing would be to perform our 
00244          * puts with autocommit. But that would excessively simplify our 
00245          * example, so we do the "wrong" thing here instead.
00246          */
00247         ret = envp->txn_begin(envp, NULL, &txn, 0);
00248         if (ret != 0) {
00249             envp->err(envp, ret, "txn_begin failed");
00250             return (EXIT_FAILURE);
00251         }
00252         for (j = 0; j < 10; j++) {
00253             /* Set up our key and values DBTs */
00254             memset(&key, 0, sizeof(DBT));
00255             key.data = key_strings[j];
00256             key.size = (u_int32_t)strlen(key_strings[j]) + 1;
00257 
00258             memset(&value, 0, sizeof(DBT));
00259             payload = rand() + i;
00260             value.data = &payload;
00261             value.size = sizeof(int);
00262 
00263             /* Perform the database put. */
00264             switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
00265                 case 0:
00266                     break;
00267 
00268                 /*
00269                  * Here's where we perform deadlock detection. If 
00270                  * DB_LOCK_DEADLOCK is returned by the put operation, 
00271                  * then this thread has been chosen to break a deadlock.
00272                  * It must abort its operation, and optionally retry the
00273                  * put.
00274                  */
00275                 case DB_LOCK_DEADLOCK:
00276                     /* 
00277                      * First thing that we MUST do is abort the 
00278                      * transaction.
00279                      */
00280                     (void)txn->abort(txn);
00281                     /*
00282                      * Now we decide if we want to retry the operation.
00283                      * If we have retried less than max_retries,
00284                      * increment the retry count and goto retry.
00285                      */
00286                     if (retry_count < max_retries) {
00287                         printf("Writer %i: Got DB_LOCK_DEADLOCK.\n", 
00288                             thread_num);
00289                         printf("Writer %i: Retrying write operation.\n",
00290                             thread_num);
00291                         retry_count++;
00292                         goto retry;
00293                     }
00294                     /*
00295                      * Otherwise, just give up.
00296                      */
00297                     printf("Writer %i: ", thread_num);
00298                     printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
00299                     printf("Writer %i: Giving up.\n", thread_num);
00300                     return (EXIT_FAILURE);
00301                 /* 
00302                  * If a generic error occurs, we simply abort the 
00303                  * transaction and exit the thread completely.
00304                  */
00305                 default:
00306                     envp->err(envp, ret, "db put failed");
00307                     ret = txn->abort(txn);
00308                     if (ret != 0)
00309                         envp->err(envp, ret, "txn abort failed");
00310                     return (EXIT_FAILURE);
00311              } 
00313         }   
00315         /* 
00316          * print the number of records found in the database. 
00317          * See count_records() for usage information.
00318          */
00319         printf("Thread %i. Record count: %i\n", thread_num, 
00320             count_records(dbp, txn));
00321 
00322         /* 
00323          * If all goes well, we can commit the transaction and
00324          * exit the thread.
00325          */
00326         ret = txn->commit(txn, 0);
00327         if (ret != 0) {
00328             envp->err(envp, ret, "txn commit failed");
00329             return (EXIT_FAILURE);
00330         }
00331     }
00332     return (EXIT_SUCCESS);
00333 }
00334 
00335 /* 
00336  * This simply counts the number of records contained in the
00337  * database and returns the result. You can use this function 
00338  * in three ways:
00339  *
00340  * First call it with an active txn handle (this is what the
00341  *  example currently does).
00342  *
00343  * Secondly, configure the cursor for uncommitted reads.
00344  *
00345  * Third, call count_records AFTER the writer has committed
00346  *    its transaction.
00347  *
00348  * If you do none of these things, the writer thread will 
00349  * self-deadlock.
00350  *
00351  * Note that this function exists only for illustrative purposes.
00352  * A more straight-forward way to count the number of records in
00353  * a database is to use DB->stat() or DB->stat_print().
00354  */
00355 
00356 int
00357 count_records(DB *dbp, DB_TXN *txn)
00358 {
00359     DBT key, value;
00360     DBC *cursorp;
00361     int count, ret;
00362 
00363     cursorp = NULL;
00364     count = 0;
00365 
00366     /* Get the cursor */
00367     ret = dbp->cursor(dbp, txn, &cursorp, 0);
00368     if (ret != 0) {
00369         dbp->err(dbp, ret,
00370           "count_records: cursor open failed.");
00371         goto cursor_err;
00372     }
00373 
00374     /* Get the key DBT used for the database read */
00375     memset(&key, 0, sizeof(DBT));
00376     memset(&value, 0, sizeof(DBT));
00377     do {
00378         ret = cursorp->c_get(cursorp, &key, &value, DB_NEXT);
00379         switch (ret) {
00380             case 0:
00381                 count++;
00382                 break;
00383             case DB_NOTFOUND:
00384                 break;
00385             default:
00386                 dbp->err(dbp, ret, 
00387                     "Count records unspecified error");
00388                 goto cursor_err;
00389         }
00390     } while (ret == 0);
00391 
00392 cursor_err:
00393     if (cursorp != NULL) {
00394         ret = cursorp->c_close(cursorp);
00395         if (ret != 0) {
00396             dbp->err(dbp, ret,
00397                 "count_records: cursor close failed.");
00398         }
00399     }
00400 
00401     return (count);
00402 }
00403 
00404 
00405 /* Open a Berkeley DB database */
00406 int
00407 open_db(DB **dbpp, const char *progname, const char *file_name,
00408   DB_ENV *envp, u_int32_t extra_flags)
00409 {
00410     int ret;
00411     u_int32_t open_flags;
00412     DB *dbp;
00413 
00414     /* Initialize the DB handle */
00415     ret = db_create(&dbp, envp, 0);
00416     if (ret != 0) {
00417         fprintf(stderr, "%s: %s\n", progname,
00418                 db_strerror(ret));
00419         return (EXIT_FAILURE);
00420     }
00421 
00422     /* Point to the memory malloc'd by db_create() */
00423     *dbpp = dbp;
00424 
00425     if (extra_flags != 0) {
00426         ret = dbp->set_flags(dbp, extra_flags);
00427         if (ret != 0) {
00428             dbp->err(dbp, ret, 
00429                 "open_db: Attempt to set extra flags failed.");
00430             return (EXIT_FAILURE);
00431         }
00432     }
00433 
00434     /* Now open the database */
00435     open_flags = DB_CREATE        | /* Allow database creation */ 
00436                  DB_THREAD        |
00437                  DB_AUTO_COMMIT;    /* Allow autocommit */
00438 
00439     ret = dbp->open(dbp,        /* Pointer to the database */
00440                     NULL,       /* Txn pointer */
00441                     file_name,  /* File name */
00442                     NULL,       /* Logical db name */
00443                     DB_BTREE,   /* Database type (using btree) */
00444                     open_flags, /* Open flags */
00445                     0);         /* File mode. Using defaults */
00446     
00447     if (ret != 0) {
00448         dbp->err(dbp, ret, "Database  open failed");
00449         return (EXIT_FAILURE);
00450     }
00451     return (EXIT_SUCCESS);
00452 }

Generated on Sun Dec 25 12:14:25 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2