00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include <sys/types.h>
00011 #include <sys/time.h>
00012
00013 #include <errno.h>
00014 #include <pthread.h>
00015 #include <signal.h>
00016 #include <stdio.h>
00017 #include <stdlib.h>
00018 #include <string.h>
00019 #include <time.h>
00020
00021 #ifdef _WIN32
00022 extern int getopt(int, char * const *, const char *);
00023 #else
00024 #include <unistd.h>
00025 #endif
00026
00027 #include <db.h>
00028
00029
00030
00031
00032
/*
 * Forward declarations.  __P is not defined in this file; presumably it is
 * the prototype-wrapping macro supplied by <db.h> -- TODO confirm.
 */
extern int sched_yield __P((void));	/* Handed to db_env_set_func_yield in db_init. */

int db_init __P((const char *));	/* Create and open the environment. */
void *deadlock __P((void *));		/* Deadlock-detector thread body. */
void fatal __P((const char *, int, int)); /* Print a message and exit. */
void onint __P((int));			/* SIGINT handler: sets quit. */
int main __P((int, char *[]));
int reader __P((int));			/* Reader worker loop. */
void stats __P((void));			/* Dump per-thread counters. */
void *trickle __P((void *));		/* Cache-trickle thread body. */
void *tstart __P((void *));		/* Worker thread entry; picks reader/writer. */
int usage __P((void));			/* Print usage, return EXIT_FAILURE. */
void word __P((void));			/* Load the word list into list[]. */
int writer __P((int));			/* Writer worker loop. */
00047
/*
 * Shutdown flag: set to 1 by the SIGINT handler (onint) and polled by every
 * worker loop.  An object written from a signal handler must be of type
 * volatile sig_atomic_t.
 */
volatile sig_atomic_t quit;
00049
/*
 * Per-thread counters.  perf is allocated in main and indexed by the
 * 1-based thread id that tstart hands to reader/writer.
 */
struct _statistics {
	int aborted;	/* Writer: aborts since its last committed txn. */
	int aborts;	/* Deadlock returns seen (reader) / txns aborted (writer). */
	int adds;	/* Writer: successful puts. */
	int deletes;	/* Writer: successful dels. */
	int txns;	/* Writer: committed transactions. */
	int found;	/* Reader: gets that returned 0. */
	int notfound;	/* Reader: gets that returned DB_NOTFOUND. */
} *perf;
00059
/* Program name used as the prefix of error and usage messages. */
const char
	*progname = "ex_thread";

#define DATABASE "access.db"		/* Database file opened (and first removed) by main. */
#define WORDLIST "../test/wordlist"	/* File word() reads the keys from. */
00065
00066
00067
00068
00069
00070
int punish;		/* -p: db_init turns on DB_YIELDCPU and a yield function. */
int nlist;		/* -n: words requested; word() resets it to the count read. */
int nreaders;		/* -r: number of reader threads. */
int verbose;		/* -v: per-operation trace output. */
int nwriters;		/* -w: number of writer threads. */

DB *dbp;		/* Database handle shared by all threads. */
DB_ENV *dbenv;		/* Environment handle shared by all threads. */
int nthreads;		/* Readers + writers + trickle + deadlock threads. */
char **list;		/* Word list used as keys; filled in by word(). */
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 int
00093 main(argc, argv)
00094 int argc;
00095 char *argv[];
00096 {
00097 extern char *optarg;
00098 extern int errno, optind;
00099 DB_TXN *txnp;
00100 pthread_t *tids;
00101 int ch, i, ret;
00102 const char *home;
00103 void *retp;
00104
00105 txnp = NULL;
00106 nlist = 1000;
00107 nreaders = nwriters = 4;
00108 home = "TESTDIR";
00109 while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
00110 switch (ch) {
00111 case 'h':
00112 home = optarg;
00113 break;
00114 case 'p':
00115 punish = 1;
00116 break;
00117 case 'n':
00118 nlist = atoi(optarg);
00119 break;
00120 case 'r':
00121 nreaders = atoi(optarg);
00122 break;
00123 case 'v':
00124 verbose = 1;
00125 break;
00126 case 'w':
00127 nwriters = atoi(optarg);
00128 break;
00129 case '?':
00130 default:
00131 return (usage());
00132 }
00133 argc -= optind;
00134 argv += optind;
00135
00136
00137 srand(getpid() | time(NULL));
00138
00139
00140 (void)signal(SIGINT, onint);
00141
00142
00143 word();
00144
00145
00146 (void)remove(DATABASE);
00147
00148
00149 if ((ret = db_init(home)) != 0)
00150 return (ret);
00151
00152
00153 if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
00154 dbenv->err(dbenv, ret, "db_create");
00155 (void)dbenv->close(dbenv, 0);
00156 return (EXIT_FAILURE);
00157 }
00158 if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
00159 dbp->err(dbp, ret, "set_pagesize");
00160 goto err;
00161 }
00162
00163 if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)
00164 fatal("txn_begin", ret, 1);
00165 if ((ret = dbp->open(dbp, txnp,
00166 DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
00167 dbp->err(dbp, ret, "%s: open", DATABASE);
00168 goto err;
00169 } else {
00170 ret = txnp->commit(txnp, 0);
00171 txnp = NULL;
00172 if (ret != 0)
00173 goto err;
00174 }
00175
00176 nthreads = nreaders + nwriters + 2;
00177 printf("Running: readers %d, writers %d\n", nreaders, nwriters);
00178 fflush(stdout);
00179
00180
00181 if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
00182 fatal(NULL, errno, 1);
00183
00184
00185 if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
00186 fatal(NULL, errno, 1);
00187
00188
00189 for (i = 0; i < nreaders + nwriters; ++i)
00190 if ((ret = pthread_create(
00191 &tids[i], NULL, tstart, (void *)(uintptr_t)i)) != 0)
00192 fatal("pthread_create", ret > 0 ? ret : errno, 1);
00193
00194
00195 if (pthread_create(&tids[i], NULL, trickle, &i))
00196 fatal("pthread_create", errno, 1);
00197 ++i;
00198
00199
00200 if (pthread_create(&tids[i], NULL, deadlock, &i))
00201 fatal("pthread_create", errno, 1);
00202
00203
00204 for (i = 0; i < nthreads; ++i)
00205 (void)pthread_join(tids[i], &retp);
00206
00207 printf("Exiting\n");
00208 stats();
00209
00210 err: if (txnp != NULL)
00211 (void)txnp->abort(txnp);
00212 (void)dbp->close(dbp, 0);
00213 (void)dbenv->close(dbenv, 0);
00214
00215 return (EXIT_SUCCESS);
00216 }
00217
00218 int
00219 reader(id)
00220 int id;
00221 {
00222 DBT key, data;
00223 int n, ret;
00224 char buf[64];
00225
00226
00227
00228
00229
00230 memset(&key, 0, sizeof(DBT));
00231 memset(&data, 0, sizeof(DBT));
00232 data.flags = DB_DBT_MALLOC;
00233
00234
00235
00236
00237
00238 while (!quit) {
00239
00240 n = rand() % nlist;
00241 key.data = list[n];
00242 key.size = strlen(key.data);
00243
00244 if (verbose) {
00245 sprintf(buf, "reader: %d: list entry %d\n", id, n);
00246 write(STDOUT_FILENO, buf, strlen(buf));
00247 }
00248
00249 switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
00250 case DB_LOCK_DEADLOCK:
00251 ++perf[id].aborts;
00252 break;
00253 case 0:
00254 ++perf[id].found;
00255 free(data.data);
00256 break;
00257 case DB_NOTFOUND:
00258 ++perf[id].notfound;
00259 break;
00260 default:
00261 sprintf(buf,
00262 "reader %d: dbp->get: %s", id, (char *)key.data);
00263 fatal(buf, ret, 0);
00264 }
00265 }
00266 return (0);
00267 }
00268
00269 int
00270 writer(id)
00271 int id;
00272 {
00273 DBT key, data;
00274 DB_TXN *tid;
00275 time_t now, then;
00276 int n, ret;
00277 char buf[256], dbuf[10000];
00278
00279 time(&now);
00280 then = now;
00281
00282
00283
00284
00285
00286 memset(&key, 0, sizeof(DBT));
00287 memset(&data, 0, sizeof(DBT));
00288 data.data = dbuf;
00289 data.ulen = sizeof(dbuf);
00290 data.flags = DB_DBT_USERMEM;
00291
00292 while (!quit) {
00293
00294 n = rand() % nlist;
00295 key.data = list[n];
00296 key.size = strlen(key.data);
00297
00298 if (verbose) {
00299 sprintf(buf, "writer: %d: list entry %d\n", id, n);
00300 write(STDOUT_FILENO, buf, strlen(buf));
00301 }
00302
00303
00304 if (0) {
00305 retry: if ((ret = tid->abort(tid)) != 0)
00306 fatal("DB_TXN->abort", ret, 1);
00307 ++perf[id].aborts;
00308 ++perf[id].aborted;
00309 }
00310
00311
00312 if (id == 1) {
00313 time(&now);
00314 if (now - then >= 20) {
00315 stats();
00316 then = now;
00317 }
00318 }
00319
00320
00321 if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0)
00322 fatal("txn_begin", ret, 1);
00323
00324
00325
00326
00327
00328 switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
00329 case DB_LOCK_DEADLOCK:
00330 goto retry;
00331 case 0:
00332 goto delete;
00333 case DB_NOTFOUND:
00334 goto add;
00335 }
00336
00337 sprintf(buf, "writer: %d: dbp->get", id);
00338 fatal(buf, ret, 1);
00339
00340
00341 delete:
00342 switch (ret = dbp->del(dbp, tid, &key, 0)) {
00343 case DB_LOCK_DEADLOCK:
00344 goto retry;
00345 case 0:
00346 ++perf[id].deletes;
00347 goto commit;
00348 }
00349
00350 sprintf(buf, "writer: %d: dbp->del", id);
00351 fatal(buf, ret, 1);
00352
00353
00354 add:
00355 data.size = 20 + rand() % 128;
00356 if (rand() % 30 == 0)
00357 data.size += 8192;
00358
00359 switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
00360 case DB_LOCK_DEADLOCK:
00361 goto retry;
00362 case 0:
00363 ++perf[id].adds;
00364 goto commit;
00365 default:
00366 sprintf(buf, "writer: %d: dbp->put", id);
00367 fatal(buf, ret, 1);
00368 }
00369
00370 commit:
00371 if ((ret = tid->commit(tid, 0)) != 0)
00372 fatal("DB_TXN->commit", ret, 1);
00373
00374
00375
00376
00377
00378 if (++perf[id].txns % 20 == 0) {
00379 sprintf(buf,
00380 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
00381 id, perf[id].adds, perf[id].deletes,
00382 perf[id].aborts, perf[id].txns);
00383 write(STDOUT_FILENO, buf, strlen(buf));
00384 }
00385
00386
00387
00388
00389
00390 if (perf[id].aborted > 5) {
00391 sprintf(buf,
00392 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n",
00393 id, perf[id].adds, perf[id].deletes,
00394 perf[id].aborts, perf[id].txns, perf[id].aborted);
00395 write(STDOUT_FILENO, buf, strlen(buf));
00396 }
00397 perf[id].aborted = 0;
00398 }
00399 return (0);
00400 }
00401
00402
00403
00404
00405
00406
00407 void
00408 stats()
00409 {
00410 int id;
00411 char *p, buf[8192];
00412
00413 p = buf + sprintf(buf, "-------------\n");
00414 for (id = 0; id < nreaders + nwriters;)
00415 if (id++ < nwriters)
00416 p += sprintf(p,
00417 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
00418 id, perf[id].adds,
00419 perf[id].deletes, perf[id].aborts, perf[id].txns);
00420 else
00421 p += sprintf(p,
00422 "reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n",
00423 id, perf[id].found,
00424 perf[id].notfound, perf[id].aborts);
00425 p += sprintf(p, "-------------\n");
00426
00427 write(STDOUT_FILENO, buf, p - buf);
00428 }
00429
00430
00431
00432
00433
00434 int
00435 db_init(home)
00436 const char *home;
00437 {
00438 int ret;
00439
00440 if ((ret = db_env_create(&dbenv, 0)) != 0) {
00441 fprintf(stderr,
00442 "%s: db_env_create: %s\n", progname, db_strerror(ret));
00443 return (EXIT_FAILURE);
00444 }
00445 if (punish) {
00446 (void)dbenv->set_flags(dbenv, DB_YIELDCPU, 1);
00447 (void)db_env_set_func_yield(sched_yield);
00448 }
00449
00450 dbenv->set_errfile(dbenv, stderr);
00451 dbenv->set_errpfx(dbenv, progname);
00452 (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
00453 (void)dbenv->set_lg_max(dbenv, 200000);
00454
00455 if ((ret = dbenv->open(dbenv, home,
00456 DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
00457 DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
00458 dbenv->err(dbenv, ret, NULL);
00459 (void)dbenv->close(dbenv, 0);
00460 return (EXIT_FAILURE);
00461 }
00462
00463 return (0);
00464 }
00465
00466
00467
00468
00469
00470 void *
00471 tstart(arg)
00472 void *arg;
00473 {
00474 pthread_t tid;
00475 u_int id;
00476
00477 id = (uintptr_t)arg + 1;
00478
00479 tid = pthread_self();
00480
00481 if (id <= (u_int)nwriters) {
00482 printf("write thread %d starting: tid: %lu\n", id, (u_long)tid);
00483 fflush(stdout);
00484 writer(id);
00485 } else {
00486 printf("read thread %d starting: tid: %lu\n", id, (u_long)tid);
00487 fflush(stdout);
00488 reader(id);
00489 }
00490
00491
00492 return (NULL);
00493 }
00494
00495
00496
00497
00498
00499 void *
00500 deadlock(arg)
00501 void *arg;
00502 {
00503 struct timeval t;
00504 pthread_t tid;
00505
00506 arg = arg;
00507 tid = pthread_self();
00508
00509 printf("deadlock thread starting: tid: %lu\n", (u_long)tid);
00510 fflush(stdout);
00511
00512 t.tv_sec = 0;
00513 t.tv_usec = 100000;
00514 while (!quit) {
00515 (void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL);
00516
00517
00518 (void)select(0, NULL, NULL, NULL, &t);
00519 }
00520
00521 return (NULL);
00522 }
00523
00524
00525
00526
00527
00528 void *
00529 trickle(arg)
00530 void *arg;
00531 {
00532 pthread_t tid;
00533 int wrote;
00534 char buf[64];
00535
00536 arg = arg;
00537 tid = pthread_self();
00538
00539 printf("trickle thread starting: tid: %lu\n", (u_long)tid);
00540 fflush(stdout);
00541
00542 while (!quit) {
00543 (void)dbenv->memp_trickle(dbenv, 10, &wrote);
00544 if (verbose) {
00545 sprintf(buf, "trickle: wrote %d\n", wrote);
00546 write(STDOUT_FILENO, buf, strlen(buf));
00547 }
00548 if (wrote == 0) {
00549 sleep(1);
00550 sched_yield();
00551 }
00552 }
00553
00554 return (NULL);
00555 }
00556
00557
00558
00559
00560
00561 void
00562 word()
00563 {
00564 FILE *fp;
00565 int cnt;
00566 char buf[256];
00567
00568 if ((fp = fopen(WORDLIST, "r")) == NULL)
00569 fatal(WORDLIST, errno, 1);
00570
00571 if ((list = malloc(nlist * sizeof(char *))) == NULL)
00572 fatal(NULL, errno, 1);
00573
00574 for (cnt = 0; cnt < nlist; ++cnt) {
00575 if (fgets(buf, sizeof(buf), fp) == NULL)
00576 break;
00577 if ((list[cnt] = strdup(buf)) == NULL)
00578 fatal(NULL, errno, 1);
00579 }
00580 nlist = cnt;
00581 }
00582
00583
00584
00585
00586
00587 void
00588 fatal(msg, err, syserr)
00589 const char *msg;
00590 int err, syserr;
00591 {
00592 fprintf(stderr, "%s: ", progname);
00593 if (msg != NULL) {
00594 fprintf(stderr, "%s", msg);
00595 if (syserr)
00596 fprintf(stderr, ": ");
00597 }
00598 if (syserr)
00599 fprintf(stderr, "%s", strerror(err));
00600 fprintf(stderr, "\n");
00601 exit(EXIT_FAILURE);
00602
00603
00604 }
00605
00606
00607
00608
00609
00610 int
00611 usage()
00612 {
00613 (void)fprintf(stderr,
00614 "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n",
00615 progname);
00616 return (EXIT_FAILURE);
00617 }
00618
00619
00620
00621
00622
00623 void
00624 onint(signo)
00625 int signo;
00626 {
00627 signo = 0;
00628 quit = 1;
00629 }