00001
00002
00003
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <string.h>
00007 #include <db.h>
00008
/*
 * Portability shim: a minimal thread and mutex API mapped onto either the
 * Win32 primitives or POSIX threads.  Each macro evaluates to 0 on success
 * and to a non-zero value on failure.
 */
#ifdef _WIN32
#include <windows.h>
#define PATHD '\\'
extern int getopt(int, char * const *, const char *);
extern char *optarg;

typedef HANDLE thread_t;
/* The attr argument exists for source compatibility only; it is ignored. */
#define thread_create(thrp, attr, func, arg) \
	(((*(thrp) = CreateThread(NULL, 0, \
	    (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
/*
 * Wait for the thread to finish.  The exit code is fetched only when the
 * caller supplies a place to store it, so that passing NULL (as with
 * pthread_join) does not hand a NULL output pointer to GetExitCodeThread
 * and turn a successful join into a reported failure.
 */
#define thread_join(thr, statusp) \
	(((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
	    ((statusp) == NULL || \
	    GetExitCodeThread((thr), (LPDWORD)(statusp)))) ? 0 : -1)

typedef HANDLE mutex_t;
/* The attr argument exists for source compatibility only; it is ignored. */
#define mutex_init(m, attr) \
	(((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m) \
	((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg) \
	pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr) pthread_mutex_init((m), (attr))
#define mutex_lock(m) pthread_mutex_lock(m)
#define mutex_unlock(m) pthread_mutex_unlock(m)
#endif
00044
00045
00046 #define NUMWRITERS 5
00047
00048
00049
00050
00051
00052 int global_thread_num;
00053 mutex_t thread_num_lock;
00054
00055
00056 int count_records(DB *, DB_TXN *);
00057 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
00058 int usage(void);
00059 int writer_thread(void *);
00060
00061
/*
 * usage --
 *	Write the command-line synopsis to stderr.
 *
 * Always returns EXIT_FAILURE, so a caller can "return (usage());".
 */
int
usage(void)
{
	static const char synopsis[] = " [-h <database_home_directory>]\n";

	(void)fputs(synopsis, stderr);
	return (EXIT_FAILURE);
}
00068
00069
00070 int
00071 main(int argc, char *argv[])
00072 {
00073
00074 DB *dbp = NULL;
00075 DB_ENV *envp = NULL;
00076
00077 thread_t writer_threads[NUMWRITERS];
00078 int ch, i, ret, ret_t;
00079 u_int32_t env_flags;
00080 char *db_home_dir;
00081
00082 const char *prog_name = "txn_guide";
00083
00084 const char *file_name = "mydb.db";
00085
00086
00087 #ifdef _WIN32
00088 db_home_dir = ".\\";
00089 #else
00090 db_home_dir = "./";
00091 #endif
00092 while ((ch = getopt(argc, argv, "h:")) != EOF)
00093 switch (ch) {
00094 case 'h':
00095 db_home_dir = optarg;
00096 break;
00097 case '?':
00098 default:
00099 return (usage());
00100 }
00101
00102
00103 ret = db_env_create(&envp, 0);
00104 if (ret != 0) {
00105 fprintf(stderr, "Error creating environment handle: %s\n",
00106 db_strerror(ret));
00107 goto err;
00108 }
00109
00110
00111
00112
00113
00114
00115
00116 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
00117 if (ret != 0) {
00118 fprintf(stderr, "Error setting lock detect: %s\n",
00119 db_strerror(ret));
00120 goto err;
00121 }
00122
00123
00124 env_flags =
00125 DB_CREATE |
00126 DB_RECOVER |
00127 DB_INIT_LOCK |
00128 DB_INIT_LOG |
00129 DB_INIT_TXN |
00130
00131 DB_INIT_MPOOL |
00132 DB_THREAD;
00133
00134
00135 ret = envp->open(envp, db_home_dir, env_flags, 0);
00136 if (ret != 0) {
00137 fprintf(stderr, "Error opening environment: %s\n",
00138 db_strerror(ret));
00139 goto err;
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 ret = open_db(&dbp, prog_name, file_name,
00151 envp, DB_DUPSORT);
00152 if (ret != 0)
00153 goto err;
00154
00155
00156 (void)mutex_init(&thread_num_lock, NULL);
00157
00158
00159 for (i = 0; i < NUMWRITERS; i++)
00160 (void)thread_create(&writer_threads[i], NULL,
00161 (void *)writer_thread, (void *)dbp);
00162
00163
00164 for (i = 0; i < NUMWRITERS; i++)
00165 (void)thread_join(writer_threads[i], NULL);
00166
00167 err:
00168
00169 if (dbp != NULL) {
00170 ret_t = dbp->close(dbp, 0);
00171 if (ret_t != 0) {
00172 fprintf(stderr, "%s database close failed: %s\n",
00173 file_name, db_strerror(ret_t));
00174 ret = ret_t;
00175 }
00176 }
00177
00178
00179 if (envp != NULL) {
00180 ret_t = envp->close(envp, 0);
00181 if (ret_t != 0) {
00182 fprintf(stderr, "environment close failed: %s\n",
00183 db_strerror(ret_t));
00184 ret = ret_t;
00185 }
00186 }
00187
00188
00189 printf("I'm all done.\n");
00190 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
00191 }
00192
00193
00194
00195
00196
00197
00198
00199
00200 int
00201 writer_thread(void *args)
00202 {
00203 DB *dbp;
00204 DB_ENV *envp;
00205
00206 DBT key, value;
00207 DB_TXN *txn;
00208 int i, j, payload, ret, thread_num;
00209 int retry_count, max_retries = 20;
00210 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00211 "key 5", "key 6", "key 7", "key 8",
00212 "key 9", "key 10"};
00213
00214 dbp = (DB *)args;
00215 envp = dbp->get_env(dbp);
00216
00217
00218 (void)mutex_lock(&thread_num_lock);
00219 global_thread_num++;
00220 thread_num = global_thread_num;
00221 (void)mutex_unlock(&thread_num_lock);
00222
00223
00224 srand(thread_num);
00225
00226
00227
00228 for (i = 0; i < 50; i++) {
00229 retry_count = 0;
00230
00231
00232
00233
00234
00235
00236 retry:
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249 ret = envp->txn_begin(envp, NULL, &txn, 0);
00250 if (ret != 0) {
00251 envp->err(envp, ret, "txn_begin failed");
00252 return (EXIT_FAILURE);
00253 }
00254 for (j = 0; j < 10; j++) {
00255
00256 memset(&key, 0, sizeof(DBT));
00257 key.data = key_strings[j];
00258 key.size = (u_int32_t)strlen(key_strings[j]) + 1;
00259
00260 memset(&value, 0, sizeof(DBT));
00261 payload = rand() + i;
00262 value.data = &payload;
00263 value.size = sizeof(int);
00264
00265
00266 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
00267 case 0:
00268 break;
00269
00270
00271
00272
00273
00274
00275
00276
00277 case DB_KEYEXIST:
00278 printf("Got keyexists.\n");
00279 break;
00280
00281
00282
00283
00284
00285
00286
00287 case DB_LOCK_DEADLOCK:
00288
00289
00290
00291
00292 (void)txn->abort(txn);
00293
00294
00295
00296
00297
00298 if (retry_count < max_retries) {
00299 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n",
00300 thread_num);
00301 printf("Writer %i: Retrying write operation.\n",
00302 thread_num);
00303 retry_count++;
00304 goto retry;
00305 }
00306
00307
00308
00309 printf("Writer %i: ", thread_num);
00310 printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
00311 printf("Writer %i: Giving up.\n", thread_num);
00312 return (EXIT_FAILURE);
00313
00314
00315
00316
00317 default:
00318 envp->err(envp, ret, "db put failed");
00319 ret = txn->abort(txn);
00320 if (ret != 0)
00321 envp->err(envp, ret,
00322 "txn abort failed");
00323 return (EXIT_FAILURE);
00324 }
00326 }
00328
00329
00330
00331
00332 printf("Thread %i. Record count: %i\n", thread_num,
00333 count_records(dbp, NULL));
00334
00335
00336
00337
00338
00339 ret = txn->commit(txn, 0);
00340 if (ret != 0) {
00341 envp->err(envp, ret, "txn commit failed");
00342 return (EXIT_FAILURE);
00343 }
00344 }
00345 return (EXIT_SUCCESS);
00346 }
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367 int
00368 count_records(DB *dbp, DB_TXN *txn)
00369 {
00370
00371 DBT key, value;
00372 DBC *cursorp;
00373 int count, ret;
00374
00375 cursorp = NULL;
00376 count = 0;
00377
00378
00379 ret = dbp->cursor(dbp, txn, &cursorp,
00380 DB_READ_UNCOMMITTED);
00381 if (ret != 0) {
00382 dbp->err(dbp, ret,
00383 "count_records: cursor open failed.");
00384 goto cursor_err;
00385 }
00386
00387
00388 memset(&key, 0, sizeof(DBT));
00389 memset(&value, 0, sizeof(DBT));
00390 do {
00391 ret = cursorp->c_get(cursorp, &key, &value, DB_NEXT);
00392 switch (ret) {
00393 case 0:
00394 count++;
00395 break;
00396 case DB_NOTFOUND:
00397 break;
00398 default:
00399 dbp->err(dbp, ret,
00400 "Count records unspecified error");
00401 goto cursor_err;
00402 }
00403 } while (ret == 0);
00404
00405 cursor_err:
00406 if (cursorp != NULL) {
00407 ret = cursorp->c_close(cursorp);
00408 if (ret != 0) {
00409 dbp->err(dbp, ret,
00410 "count_records: cursor close failed.");
00411 }
00412 }
00413
00414 return (count);
00415 }
00416
00417
00418
00419 int
00420 open_db(DB **dbpp, const char *progname, const char *file_name,
00421 DB_ENV *envp, u_int32_t extra_flags)
00422 {
00423 int ret;
00424 u_int32_t open_flags;
00425 DB *dbp;
00426
00427
00428 ret = db_create(&dbp, envp, 0);
00429 if (ret != 0) {
00430 fprintf(stderr, "%s: %s\n", progname,
00431 db_strerror(ret));
00432 return (EXIT_FAILURE);
00433 }
00434
00435
00436 *dbpp = dbp;
00437
00438 if (extra_flags != 0) {
00439 ret = dbp->set_flags(dbp, extra_flags);
00440 if (ret != 0) {
00441 dbp->err(dbp, ret,
00442 "open_db: Attempt to set extra flags failed.");
00443 return (EXIT_FAILURE);
00444 }
00445 }
00446
00447
00448 open_flags = DB_CREATE |
00449 DB_READ_UNCOMMITTED |
00450 DB_AUTO_COMMIT;
00451
00452 ret = dbp->open(dbp,
00453 NULL,
00454 file_name,
00455 NULL,
00456 DB_BTREE,
00457 open_flags,
00458 0);
00459 if (ret != 0) {
00460 dbp->err(dbp, ret, "Database '%s' open failed",
00461 file_name);
00462 return (EXIT_FAILURE);
00463 }
00464 return (EXIT_SUCCESS);
00465 }