00001
00002
00003
00004 #include <stdio.h>
00005 #include <stdlib.h>
00006 #include <string.h>
00007 #include <db.h>
00008
00009 #ifdef _WIN32
00010 #include <windows.h>
00011 #define PATHD '\\'
00012 extern int getopt(int, char * const *, const char *);
00013 extern char *optarg;
00014
00015 typedef HANDLE thread_t;
00016 #define thread_create(thrp, attr, func, arg) \
00017 (((*(thrp) = CreateThread(NULL, 0, \
00018 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
00019 #define thread_join(thr, statusp) \
00020 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
00021 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
00022
00023 typedef HANDLE mutex_t;
00024 #define mutex_init(m, attr) \
00025 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
00026 #define mutex_lock(m) \
00027 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
00028 #define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
00029 #else
00030 #include <pthread.h>
00031 #include <unistd.h>
00032 #define PATHD '/'
00033
00034 typedef pthread_t thread_t;
00035 #define thread_create(thrp, attr, func, arg) \
00036 pthread_create((thrp), (attr), (func), (arg))
00037 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
00038
00039 typedef pthread_mutex_t mutex_t;
00040 #define mutex_init(m, attr) pthread_mutex_init((m), (attr))
00041 #define mutex_lock(m) pthread_mutex_lock(m)
00042 #define mutex_unlock(m) pthread_mutex_unlock(m)
00043 #endif
00044
00045
00046 #define NUMWRITERS 5
00047
00048
00049
00050
00051
00052 int global_thread_num;
00053 mutex_t thread_num_lock;
00054
00055
00056 int count_records(DB *, DB_TXN *);
00057 int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t);
00058 int writer_thread(void *);
00059
00060 int
00061 main(void)
00062 {
00063
00064 DB *dbp = NULL;
00065 DB_ENV *envp = NULL;
00066
00067 thread_t writer_threads[NUMWRITERS];
00068 int i, ret, ret_t;
00069 u_int32_t env_flags;
00070
00071
00072 const char *prog_name = "txn_guide_inmemory";
00073
00074
00075 ret = db_env_create(&envp, 0);
00076 if (ret != 0) {
00077 fprintf(stderr, "Error creating environment handle: %s\n",
00078 db_strerror(ret));
00079 goto err;
00080 }
00081
00082 env_flags =
00083 DB_CREATE |
00084 DB_INIT_LOCK |
00085 DB_INIT_LOG |
00086 DB_INIT_TXN |
00087
00088 DB_INIT_MPOOL |
00089 DB_PRIVATE |
00090
00091 DB_THREAD;
00092
00093
00094 ret = envp->set_flags(envp, DB_LOG_INMEMORY, 1);
00095 if (ret != 0) {
00096 fprintf(stderr, "Error setting log subsystem to in-memory: %s\n",
00097 db_strerror(ret));
00098 goto err;
00099 }
00100
00101
00102
00103
00104 ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024);
00105 if (ret != 0) {
00106 fprintf(stderr, "Error increasing the log buffer size: %s\n",
00107 db_strerror(ret));
00108 goto err;
00109 }
00110
00111
00112
00113
00114 ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1);
00115 if (ret != 0) {
00116 fprintf(stderr, "Error increasing the cache size: %s\n",
00117 db_strerror(ret));
00118 goto err;
00119 }
00120
00121
00122
00123
00124
00125
00126
00127 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE);
00128 if (ret != 0) {
00129 fprintf(stderr, "Error setting lock detect: %s\n",
00130 db_strerror(ret));
00131 goto err;
00132 }
00133
00134
00135 ret = envp->open(envp, NULL, env_flags, 0);
00136 if (ret != 0) {
00137 fprintf(stderr, "Error opening environment: %s\n",
00138 db_strerror(ret));
00139 goto err;
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 ret = open_db(&dbp, prog_name, NULL,
00151 envp, DB_DUPSORT);
00152 if (ret != 0)
00153 goto err;
00154
00155
00156 (void)mutex_init(&thread_num_lock, NULL);
00157
00158
00159 for (i = 0; i < NUMWRITERS; i++)
00160 (void)thread_create(
00161 &writer_threads[i], NULL, (void *)writer_thread, (void *)dbp);
00162
00163
00164 for (i = 0; i < NUMWRITERS; i++)
00165 (void)thread_join(writer_threads[i], NULL);
00166
00167 err:
00168
00169 if (dbp != NULL) {
00170 ret_t = dbp->close(dbp, 0);
00171 if (ret_t != 0) {
00172 fprintf(stderr, "%s database close failed.\n",
00173 db_strerror(ret_t));
00174 ret = ret_t;
00175 }
00176 }
00177
00178
00179 if (envp != NULL) {
00180 ret_t = envp->close(envp, 0);
00181 if (ret_t != 0) {
00182 fprintf(stderr, "environment close failed: %s\n",
00183 db_strerror(ret_t));
00184 ret = ret_t;
00185 }
00186 }
00187
00188
00189 printf("I'm all done.\n");
00190 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
00191 }
00192
00193
00194
00195
00196
00197
00198
00199
00200 int
00201 writer_thread(void *args)
00202 {
00203 DB *dbp;
00204 DB_ENV *envp;
00205 DBT key, value;
00206 DB_TXN *txn;
00207 int i, j, payload, ret, thread_num;
00208 int retry_count, max_retries = 20;
00209 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00210 "key 5", "key 6", "key 7", "key 8",
00211 "key 9", "key 10"};
00212
00213 dbp = (DB *)args;
00214 envp = dbp->get_env(dbp);
00215
00216
00217 (void)mutex_lock(&thread_num_lock);
00218 global_thread_num++;
00219 thread_num = global_thread_num;
00220 (void)mutex_unlock(&thread_num_lock);
00221
00222
00223 srand(thread_num);
00224
00225
00226 for (i = 0; i < 50; i++) {
00227 retry_count = 0;
00228
00229
00230
00231
00232
00233
00234 retry:
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247 ret = envp->txn_begin(envp, NULL, &txn, 0);
00248 if (ret != 0) {
00249 envp->err(envp, ret, "txn_begin failed");
00250 return (EXIT_FAILURE);
00251 }
00252 for (j = 0; j < 10; j++) {
00253
00254 memset(&key, 0, sizeof(DBT));
00255 key.data = key_strings[j];
00256 key.size = (u_int32_t)strlen(key_strings[j]) + 1;
00257
00258 memset(&value, 0, sizeof(DBT));
00259 payload = rand() + i;
00260 value.data = &payload;
00261 value.size = sizeof(int);
00262
00263
00264 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) {
00265 case 0:
00266 break;
00267
00268
00269
00270
00271
00272
00273
00274
00275 case DB_LOCK_DEADLOCK:
00276
00277
00278
00279
00280 (void)txn->abort(txn);
00281
00282
00283
00284
00285
00286 if (retry_count < max_retries) {
00287 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n",
00288 thread_num);
00289 printf("Writer %i: Retrying write operation.\n",
00290 thread_num);
00291 retry_count++;
00292 goto retry;
00293 }
00294
00295
00296
00297 printf("Writer %i: ", thread_num);
00298 printf("Got DB_LOCK_DEADLOCK and out of retries.\n");
00299 printf("Writer %i: Giving up.\n", thread_num);
00300 return (EXIT_FAILURE);
00301
00302
00303
00304
00305 default:
00306 envp->err(envp, ret, "db put failed");
00307 ret = txn->abort(txn);
00308 if (ret != 0)
00309 envp->err(envp, ret, "txn abort failed");
00310 return (EXIT_FAILURE);
00311 }
00313 }
00315
00316
00317
00318
00319 printf("Thread %i. Record count: %i\n", thread_num,
00320 count_records(dbp, txn));
00321
00322
00323
00324
00325
00326 ret = txn->commit(txn, 0);
00327 if (ret != 0) {
00328 envp->err(envp, ret, "txn commit failed");
00329 return (EXIT_FAILURE);
00330 }
00331 }
00332 return (EXIT_SUCCESS);
00333 }
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356 int
00357 count_records(DB *dbp, DB_TXN *txn)
00358 {
00359 DBT key, value;
00360 DBC *cursorp;
00361 int count, ret;
00362
00363 cursorp = NULL;
00364 count = 0;
00365
00366
00367 ret = dbp->cursor(dbp, txn, &cursorp, 0);
00368 if (ret != 0) {
00369 dbp->err(dbp, ret,
00370 "count_records: cursor open failed.");
00371 goto cursor_err;
00372 }
00373
00374
00375 memset(&key, 0, sizeof(DBT));
00376 memset(&value, 0, sizeof(DBT));
00377 do {
00378 ret = cursorp->c_get(cursorp, &key, &value, DB_NEXT);
00379 switch (ret) {
00380 case 0:
00381 count++;
00382 break;
00383 case DB_NOTFOUND:
00384 break;
00385 default:
00386 dbp->err(dbp, ret,
00387 "Count records unspecified error");
00388 goto cursor_err;
00389 }
00390 } while (ret == 0);
00391
00392 cursor_err:
00393 if (cursorp != NULL) {
00394 ret = cursorp->c_close(cursorp);
00395 if (ret != 0) {
00396 dbp->err(dbp, ret,
00397 "count_records: cursor close failed.");
00398 }
00399 }
00400
00401 return (count);
00402 }
00403
00404
00405
00406 int
00407 open_db(DB **dbpp, const char *progname, const char *file_name,
00408 DB_ENV *envp, u_int32_t extra_flags)
00409 {
00410 int ret;
00411 u_int32_t open_flags;
00412 DB *dbp;
00413
00414
00415 ret = db_create(&dbp, envp, 0);
00416 if (ret != 0) {
00417 fprintf(stderr, "%s: %s\n", progname,
00418 db_strerror(ret));
00419 return (EXIT_FAILURE);
00420 }
00421
00422
00423 *dbpp = dbp;
00424
00425 if (extra_flags != 0) {
00426 ret = dbp->set_flags(dbp, extra_flags);
00427 if (ret != 0) {
00428 dbp->err(dbp, ret,
00429 "open_db: Attempt to set extra flags failed.");
00430 return (EXIT_FAILURE);
00431 }
00432 }
00433
00434
00435 open_flags = DB_CREATE |
00436 DB_THREAD |
00437 DB_AUTO_COMMIT;
00438
00439 ret = dbp->open(dbp,
00440 NULL,
00441 file_name,
00442 NULL,
00443 DB_BTREE,
00444 open_flags,
00445 0);
00446
00447 if (ret != 0) {
00448 dbp->err(dbp, ret, "Database open failed");
00449 return (EXIT_FAILURE);
00450 }
00451 return (EXIT_SUCCESS);
00452 }