Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

ex_thread.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1997-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: ex_thread.c,v 12.2 2005/10/21 17:52:33 bostic Exp $
00008  */
00009 
00010 #include <sys/types.h>
00011 #include <sys/time.h>
00012 
00013 #include <errno.h>
00014 #include <pthread.h>
00015 #include <signal.h>
00016 #include <stdio.h>
00017 #include <stdlib.h>
00018 #include <string.h>
00019 #include <time.h>
00020 
00021 #ifdef _WIN32
00022 extern int getopt(int, char * const *, const char *);
00023 #else
00024 #include <unistd.h>
00025 #endif
00026 
00027 #include <db.h>
00028 
00029 /*
00030  * NB: This application is written using POSIX 1003.1b-1993 pthreads
00031  * interfaces, which may not be portable to your system.
00032  */
00033 extern int sched_yield __P((void));             /* Pthread yield function. */
00034 
00035 int     db_init __P((const char *));
00036 void   *deadlock __P((void *));
00037 void    fatal __P((const char *, int, int));
00038 void    onint __P((int));
00039 int     main __P((int, char *[]));
00040 int     reader __P((int));
00041 void    stats __P((void));
00042 void   *trickle __P((void *));
00043 void   *tstart __P((void *));
00044 int     usage __P((void));
00045 void    word __P((void));
00046 int     writer __P((int));
00047 
/*
 * Interrupt handling flag.  It is stored to from the SIGINT handler and
 * polled by every thread loop, so it has to be a volatile sig_atomic_t:
 * that is the only kind of static object a signal handler may assign to.
 */
volatile sig_atomic_t quit;
00049 
/*
 * Per-thread counters.  perf points at an array indexed by the 1-based
 * thread id; each worker updates only its own slot and stats() reads
 * them all.
 */
struct _statistics {
        int aborted;            /* Writer: aborts since its last commit. */
        int aborts;             /* Reader/writer: total deadlocks seen. */
        int adds;               /* Writer: successful puts. */
        int deletes;            /* Writer: successful deletes. */
        int txns;               /* Writer: committed transactions. */
        int found;              /* Reader: lookups that hit. */
        int notfound;           /* Reader: lookups that missed. */
};

struct _statistics *perf;
00059 
/* Program name, used as the prefix of every diagnostic. */
const char *progname = "ex_thread";

/* File names used by the example. */
#define DATABASE        "access.db"             /* Database file. */
#define WORDLIST        "../test/wordlist"      /* Dictionary of keys. */
00065 
00066 /*
00067  * We can seriously increase the number of collisions and transaction
00068  * aborts by yielding the scheduler after every DB call.  Specify the
00069  * -p option to do this.
00070  */
00071 int     punish;                                 /* -p */
00072 int     nlist;                                  /* -n */
00073 int     nreaders;                               /* -r */
00074 int     verbose;                                /* -v */
00075 int     nwriters;                               /* -w */
00076 
00077 DB     *dbp;                                    /* Database handle. */
00078 DB_ENV *dbenv;                                  /* Database environment. */
00079 int     nthreads;                               /* Total threads. */
00080 char  **list;                                   /* Word list. */
00081 
00082 /*
00083  * ex_thread --
00084  *      Run a simple threaded application of some numbers of readers and
00085  *      writers competing for a set of words.
00086  *
00087  * Example UNIX shell script to run this program:
00088  *      % rm -rf TESTDIR
00089  *      % mkdir TESTDIR
00090  *      % ex_thread -h TESTDIR
00091  */
00092 int
00093 main(argc, argv)
00094         int argc;
00095         char *argv[];
00096 {
00097         extern char *optarg;
00098         extern int errno, optind;
00099         DB_TXN *txnp;
00100         pthread_t *tids;
00101         int ch, i, ret;
00102         const char *home;
00103         void *retp;
00104 
00105         txnp = NULL;
00106         nlist = 1000;
00107         nreaders = nwriters = 4;
00108         home = "TESTDIR";
00109         while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
00110                 switch (ch) {
00111                 case 'h':
00112                         home = optarg;
00113                         break;
00114                 case 'p':
00115                         punish = 1;
00116                         break;
00117                 case 'n':
00118                         nlist = atoi(optarg);
00119                         break;
00120                 case 'r':
00121                         nreaders = atoi(optarg);
00122                         break;
00123                 case 'v':
00124                         verbose = 1;
00125                         break;
00126                 case 'w':
00127                         nwriters = atoi(optarg);
00128                         break;
00129                 case '?':
00130                 default:
00131                         return (usage());
00132                 }
00133         argc -= optind;
00134         argv += optind;
00135 
00136         /* Initialize the random number generator. */
00137         srand(getpid() | time(NULL));
00138 
00139         /* Register the signal handler. */
00140         (void)signal(SIGINT, onint);
00141 
00142         /* Build the key list. */
00143         word();
00144 
00145         /* Remove the previous database. */
00146         (void)remove(DATABASE);
00147 
00148         /* Initialize the database environment. */
00149         if ((ret = db_init(home)) != 0)
00150                 return (ret);
00151 
00152         /* Initialize the database. */
00153         if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
00154                 dbenv->err(dbenv, ret, "db_create");
00155                 (void)dbenv->close(dbenv, 0);
00156                 return (EXIT_FAILURE);
00157         }
00158         if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
00159                 dbp->err(dbp, ret, "set_pagesize");
00160                 goto err;
00161         }
00162 
00163         if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)
00164                 fatal("txn_begin", ret, 1);
00165         if ((ret = dbp->open(dbp, txnp,
00166              DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
00167                 dbp->err(dbp, ret, "%s: open", DATABASE);
00168                 goto err;
00169         } else {
00170                 ret = txnp->commit(txnp, 0);
00171                 txnp = NULL;
00172                 if (ret != 0)
00173                         goto err;
00174         }
00175 
00176         nthreads = nreaders + nwriters + 2;
00177         printf("Running: readers %d, writers %d\n", nreaders, nwriters);
00178         fflush(stdout);
00179 
00180         /* Create statistics structures, offset by 1. */
00181         if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
00182                 fatal(NULL, errno, 1);
00183 
00184         /* Create thread ID structures. */
00185         if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
00186                 fatal(NULL, errno, 1);
00187 
00188         /* Create reader/writer threads. */
00189         for (i = 0; i < nreaders + nwriters; ++i)
00190                 if ((ret = pthread_create(
00191                     &tids[i], NULL, tstart, (void *)(uintptr_t)i)) != 0)
00192                         fatal("pthread_create", ret > 0 ? ret : errno, 1);
00193 
00194         /* Create buffer pool trickle thread. */
00195         if (pthread_create(&tids[i], NULL, trickle, &i))
00196                 fatal("pthread_create", errno, 1);
00197         ++i;
00198 
00199         /* Create deadlock detector thread. */
00200         if (pthread_create(&tids[i], NULL, deadlock, &i))
00201                 fatal("pthread_create", errno, 1);
00202 
00203         /* Wait for the threads. */
00204         for (i = 0; i < nthreads; ++i)
00205                 (void)pthread_join(tids[i], &retp);
00206 
00207         printf("Exiting\n");
00208         stats();
00209 
00210 err:    if (txnp != NULL)
00211                 (void)txnp->abort(txnp);
00212         (void)dbp->close(dbp, 0);
00213         (void)dbenv->close(dbenv, 0);
00214 
00215         return (EXIT_SUCCESS);
00216 }
00217 
00218 int
00219 reader(id)
00220         int id;
00221 {
00222         DBT key, data;
00223         int n, ret;
00224         char buf[64];
00225 
00226         /*
00227          * DBT's must use local memory or malloc'd memory if the DB handle
00228          * is accessed in a threaded fashion.
00229          */
00230         memset(&key, 0, sizeof(DBT));
00231         memset(&data, 0, sizeof(DBT));
00232         data.flags = DB_DBT_MALLOC;
00233 
00234         /*
00235          * Read-only threads do not require transaction protection, unless
00236          * there's a need for repeatable reads.
00237          */
00238         while (!quit) {
00239                 /* Pick a key at random, and look it up. */
00240                 n = rand() % nlist;
00241                 key.data = list[n];
00242                 key.size = strlen(key.data);
00243 
00244                 if (verbose) {
00245                         sprintf(buf, "reader: %d: list entry %d\n", id, n);
00246                         write(STDOUT_FILENO, buf, strlen(buf));
00247                 }
00248 
00249                 switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
00250                 case DB_LOCK_DEADLOCK:          /* Deadlock. */
00251                         ++perf[id].aborts;
00252                         break;
00253                 case 0:                         /* Success. */
00254                         ++perf[id].found;
00255                         free(data.data);
00256                         break;
00257                 case DB_NOTFOUND:               /* Not found. */
00258                         ++perf[id].notfound;
00259                         break;
00260                 default:
00261                         sprintf(buf,
00262                             "reader %d: dbp->get: %s", id, (char *)key.data);
00263                         fatal(buf, ret, 0);
00264                 }
00265         }
00266         return (0);
00267 }
00268 
00269 int
00270 writer(id)
00271         int id;
00272 {
00273         DBT key, data;
00274         DB_TXN *tid;
00275         time_t now, then;
00276         int n, ret;
00277         char buf[256], dbuf[10000];
00278 
00279         time(&now);
00280         then = now;
00281 
00282         /*
00283          * DBT's must use local memory or malloc'd memory if the DB handle
00284          * is accessed in a threaded fashion.
00285          */
00286         memset(&key, 0, sizeof(DBT));
00287         memset(&data, 0, sizeof(DBT));
00288         data.data = dbuf;
00289         data.ulen = sizeof(dbuf);
00290         data.flags = DB_DBT_USERMEM;
00291 
00292         while (!quit) {
00293                 /* Pick a random key. */
00294                 n = rand() % nlist;
00295                 key.data = list[n];
00296                 key.size = strlen(key.data);
00297 
00298                 if (verbose) {
00299                         sprintf(buf, "writer: %d: list entry %d\n", id, n);
00300                         write(STDOUT_FILENO, buf, strlen(buf));
00301                 }
00302 
00303                 /* Abort and retry. */
00304                 if (0) {
00305 retry:                  if ((ret = tid->abort(tid)) != 0)
00306                                 fatal("DB_TXN->abort", ret, 1);
00307                         ++perf[id].aborts;
00308                         ++perf[id].aborted;
00309                 }
00310 
00311                 /* Thread #1 prints out the stats every 20 seconds. */
00312                 if (id == 1) {
00313                         time(&now);
00314                         if (now - then >= 20) {
00315                                 stats();
00316                                 then = now;
00317                         }
00318                 }
00319 
00320                 /* Begin the transaction. */
00321                 if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0)
00322                         fatal("txn_begin", ret, 1);
00323 
00324                 /*
00325                  * Get the key.  If it doesn't exist, add it.  If it does
00326                  * exist, delete it.
00327                  */
00328                 switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
00329                 case DB_LOCK_DEADLOCK:
00330                         goto retry;
00331                 case 0:
00332                         goto delete;
00333                 case DB_NOTFOUND:
00334                         goto add;
00335                 }
00336 
00337                 sprintf(buf, "writer: %d: dbp->get", id);
00338                 fatal(buf, ret, 1);
00339                 /* NOTREACHED */
00340 
00341 delete:         /* Delete the key. */
00342                 switch (ret = dbp->del(dbp, tid, &key, 0)) {
00343                 case DB_LOCK_DEADLOCK:
00344                         goto retry;
00345                 case 0:
00346                         ++perf[id].deletes;
00347                         goto commit;
00348                 }
00349 
00350                 sprintf(buf, "writer: %d: dbp->del", id);
00351                 fatal(buf, ret, 1);
00352                 /* NOTREACHED */
00353 
00354 add:            /* Add the key.  1 data item in 30 is an overflow item. */
00355                 data.size = 20 + rand() % 128;
00356                 if (rand() % 30 == 0)
00357                         data.size += 8192;
00358 
00359                 switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
00360                 case DB_LOCK_DEADLOCK:
00361                         goto retry;
00362                 case 0:
00363                         ++perf[id].adds;
00364                         goto commit;
00365                 default:
00366                         sprintf(buf, "writer: %d: dbp->put", id);
00367                         fatal(buf, ret, 1);
00368                 }
00369 
00370 commit:         /* The transaction finished, commit it. */
00371                 if ((ret = tid->commit(tid, 0)) != 0)
00372                         fatal("DB_TXN->commit", ret, 1);
00373 
00374                 /*
00375                  * Every time the thread completes 20 transactions, show
00376                  * our progress.
00377                  */
00378                 if (++perf[id].txns % 20 == 0) {
00379                         sprintf(buf,
00380 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
00381                             id, perf[id].adds, perf[id].deletes,
00382                             perf[id].aborts, perf[id].txns);
00383                         write(STDOUT_FILENO, buf, strlen(buf));
00384                 }
00385 
00386                 /*
00387                  * If this thread was aborted more than 5 times before
00388                  * the transaction finished, complain.
00389                  */
00390                 if (perf[id].aborted > 5) {
00391                         sprintf(buf,
00392 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n",
00393                             id, perf[id].adds, perf[id].deletes,
00394                             perf[id].aborts, perf[id].txns, perf[id].aborted);
00395                         write(STDOUT_FILENO, buf, strlen(buf));
00396                 }
00397                 perf[id].aborted = 0;
00398         }
00399         return (0);
00400 }
00401 
00402 /*
00403  * stats --
00404  *      Display reader/writer thread statistics.  To display the statistics
00405  *      for the mpool trickle or deadlock threads, use db_stat(1).
00406  */
00407 void
00408 stats()
00409 {
00410         int id;
00411         char *p, buf[8192];
00412 
00413         p = buf + sprintf(buf, "-------------\n");
00414         for (id = 0; id < nreaders + nwriters;)
00415                 if (id++ < nwriters)
00416                         p += sprintf(p,
00417         "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
00418                             id, perf[id].adds,
00419                             perf[id].deletes, perf[id].aborts, perf[id].txns);
00420                 else
00421                         p += sprintf(p,
00422         "reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n",
00423                             id, perf[id].found,
00424                             perf[id].notfound, perf[id].aborts);
00425         p += sprintf(p, "-------------\n");
00426 
00427         write(STDOUT_FILENO, buf, p - buf);
00428 }
00429 
00430 /*
00431  * db_init --
00432  *      Initialize the environment.
00433  */
00434 int
00435 db_init(home)
00436         const char *home;
00437 {
00438         int ret;
00439 
00440         if ((ret = db_env_create(&dbenv, 0)) != 0) {
00441                 fprintf(stderr,
00442                     "%s: db_env_create: %s\n", progname, db_strerror(ret));
00443                 return (EXIT_FAILURE);
00444         }
00445         if (punish) {
00446                 (void)dbenv->set_flags(dbenv, DB_YIELDCPU, 1);
00447                 (void)db_env_set_func_yield(sched_yield);
00448         }
00449 
00450         dbenv->set_errfile(dbenv, stderr);
00451         dbenv->set_errpfx(dbenv, progname);
00452         (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
00453         (void)dbenv->set_lg_max(dbenv, 200000);
00454 
00455         if ((ret = dbenv->open(dbenv, home,
00456             DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
00457             DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
00458                 dbenv->err(dbenv, ret, NULL);
00459                 (void)dbenv->close(dbenv, 0);
00460                 return (EXIT_FAILURE);
00461         }
00462 
00463         return (0);
00464 }
00465 
00466 /*
00467  * tstart --
00468  *      Thread start function for readers and writers.
00469  */
00470 void *
00471 tstart(arg)
00472         void *arg;
00473 {
00474         pthread_t tid;
00475         u_int id;
00476 
00477         id = (uintptr_t)arg + 1;
00478 
00479         tid = pthread_self();
00480 
00481         if (id <= (u_int)nwriters) {
00482                 printf("write thread %d starting: tid: %lu\n", id, (u_long)tid);
00483                 fflush(stdout);
00484                 writer(id);
00485         } else {
00486                 printf("read thread %d starting: tid: %lu\n", id, (u_long)tid);
00487                 fflush(stdout);
00488                 reader(id);
00489         }
00490 
00491         /* NOTREACHED */
00492         return (NULL);
00493 }
00494 
00495 /*
00496  * deadlock --
00497  *      Thread start function for DB_ENV->lock_detect.
00498  */
00499 void *
00500 deadlock(arg)
00501         void *arg;
00502 {
00503         struct timeval t;
00504         pthread_t tid;
00505 
00506         arg = arg;                              /* XXX: shut the compiler up. */
00507         tid = pthread_self();
00508 
00509         printf("deadlock thread starting: tid: %lu\n", (u_long)tid);
00510         fflush(stdout);
00511 
00512         t.tv_sec = 0;
00513         t.tv_usec = 100000;
00514         while (!quit) {
00515                 (void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL);
00516 
00517                 /* Check every 100ms. */
00518                 (void)select(0, NULL, NULL, NULL, &t);
00519         }
00520 
00521         return (NULL);
00522 }
00523 
00524 /*
00525  * trickle --
00526  *      Thread start function for memp_trickle.
00527  */
00528 void *
00529 trickle(arg)
00530         void *arg;
00531 {
00532         pthread_t tid;
00533         int wrote;
00534         char buf[64];
00535 
00536         arg = arg;                              /* XXX: shut the compiler up. */
00537         tid = pthread_self();
00538 
00539         printf("trickle thread starting: tid: %lu\n", (u_long)tid);
00540         fflush(stdout);
00541 
00542         while (!quit) {
00543                 (void)dbenv->memp_trickle(dbenv, 10, &wrote);
00544                 if (verbose) {
00545                         sprintf(buf, "trickle: wrote %d\n", wrote);
00546                         write(STDOUT_FILENO, buf, strlen(buf));
00547                 }
00548                 if (wrote == 0) {
00549                         sleep(1);
00550                         sched_yield();
00551                 }
00552         }
00553 
00554         return (NULL);
00555 }
00556 
00557 /*
00558  * word --
00559  *      Build the dictionary word list.
00560  */
00561 void
00562 word()
00563 {
00564         FILE *fp;
00565         int cnt;
00566         char buf[256];
00567 
00568         if ((fp = fopen(WORDLIST, "r")) == NULL)
00569                 fatal(WORDLIST, errno, 1);
00570 
00571         if ((list = malloc(nlist * sizeof(char *))) == NULL)
00572                 fatal(NULL, errno, 1);
00573 
00574         for (cnt = 0; cnt < nlist; ++cnt) {
00575                 if (fgets(buf, sizeof(buf), fp) == NULL)
00576                         break;
00577                 if ((list[cnt] = strdup(buf)) == NULL)
00578                         fatal(NULL, errno, 1);
00579         }
00580         nlist = cnt;            /* In case nlist was larger than possible. */
00581 }
00582 
00583 /*
00584  * fatal --
00585  *      Report a fatal error and quit.
00586  */
00587 void
00588 fatal(msg, err, syserr)
00589         const char *msg;
00590         int err, syserr;
00591 {
00592         fprintf(stderr, "%s: ", progname);
00593         if (msg != NULL) {
00594                 fprintf(stderr, "%s", msg);
00595                 if (syserr)
00596                         fprintf(stderr, ": ");
00597         }
00598         if (syserr)
00599                 fprintf(stderr, "%s", strerror(err));
00600         fprintf(stderr, "\n");
00601         exit(EXIT_FAILURE);
00602 
00603         /* NOTREACHED */
00604 }
00605 
00606 /*
00607  * usage --
00608  *      Usage message.
00609  */
00610 int
00611 usage()
00612 {
00613         (void)fprintf(stderr,
00614     "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n",
00615             progname);
00616         return (EXIT_FAILURE);
00617 }
00618 
00619 /*
00620  * onint --
00621  *      Interrupt signal handler.
00622  */
00623 void
00624 onint(signo)
00625         int signo;
00626 {
00627         signo = 0;              /* Quiet compiler. */
00628         quit = 1;
00629 }

Generated on Sun Dec 25 12:14:25 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2