00001
00002
00003 #include <iostream>
00004 #include <db_cxx.h>
00005
00006 #ifdef _WIN32
00007 #include <windows.h>
00008 extern "C" {
00009 extern int getopt(int, char * const *, const char *);
00010 extern char *optarg;
00011 }
00012 #define PATHD '\\'
00013
00014 typedef HANDLE thread_t;
00015 #define thread_create(thrp, attr, func, arg) \
00016 (((*(thrp) = CreateThread(NULL, 0, \
00017 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
00018 #define thread_join(thr, statusp) \
00019 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
00020 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
00021
00022 typedef HANDLE mutex_t;
00023 #define mutex_init(m, attr) \
00024 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
00025 #define mutex_lock(m) \
00026 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
00027 #define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1)
00028 #else
00029 #include <pthread.h>
00030 #include <unistd.h>
00031 #define PATHD '/'
00032
00033 typedef pthread_t thread_t;
00034 #define thread_create(thrp, attr, func, arg) \
00035 pthread_create((thrp), (attr), (func), (arg))
00036 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
00037
00038 typedef pthread_mutex_t mutex_t;
00039 #define mutex_init(m, attr) pthread_mutex_init((m), (attr))
00040 #define mutex_lock(m) pthread_mutex_lock(m)
00041 #define mutex_unlock(m) pthread_mutex_unlock(m)
00042 #endif
00043
00044
00045 #define NUMWRITERS 5
00046
00047
00048
00049 int global_thread_num;
00050 mutex_t thread_num_lock;
00051
00052
00053 int countRecords(Db *, DbTxn *);
00054 int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
00055 int usage(void);
00056 void *writerThread(void *);
00057
00058
00059 int
00060 usage()
00061 {
00062 std::cerr << " [-h <database_home_directory>]" << std::endl;
00063 return (EXIT_FAILURE);
00064 }
00065
00066 int
00067 main(int argc, char *argv[])
00068 {
00069
00070 Db *dbp = NULL;
00071 DbEnv *envp = NULL;
00072
00073 thread_t writerThreads[NUMWRITERS];
00074 int ch, i;
00075 u_int32_t envFlags;
00076 char *dbHomeDir;
00077 extern char *optarg;
00078
00079
00080 const char *progName = "TxnGuide";
00081
00082
00083 const char *fileName = "mydb.db";
00084
00085
00086 #ifdef _WIN32
00087 dbHomeDir = ".\\";
00088 #else
00089 dbHomeDir = "./";
00090 #endif
00091 while ((ch = getopt(argc, argv, "h:")) != EOF)
00092 switch (ch) {
00093 case 'h':
00094 dbHomeDir = optarg;
00095 break;
00096 case '?':
00097 default:
00098 return (usage());
00099 }
00100
00101
00102
00103 envFlags =
00104 DB_CREATE |
00105 DB_RECOVER |
00106 DB_INIT_LOCK |
00107 DB_INIT_LOG |
00108 DB_INIT_TXN |
00109
00110 DB_INIT_MPOOL |
00111 DB_THREAD;
00112
00113 try {
00114
00115 envp = new DbEnv(0);
00116
00117
00118
00119
00120
00121 envp->set_lk_detect(DB_LOCK_MINWRITE);
00122
00123 envp->open(dbHomeDir, envFlags, 0);
00124
00125
00126
00127
00128
00129
00130
00131
00132 openDb(&dbp, progName, fileName,
00133 envp, DB_DUPSORT);
00134
00135
00136 (void)mutex_init(&thread_num_lock, NULL);
00137
00138
00139 for (i = 0; i < NUMWRITERS; i++)
00140 (void)thread_create(
00141 &writerThreads[i], NULL,
00142 writerThread, (void *)dbp);
00143
00144
00145 for (i = 0; i < NUMWRITERS; i++)
00146 (void)thread_join(writerThreads[i], NULL);
00147
00148 } catch(DbException &e) {
00149 std::cerr << "Error opening database environment: "
00150 << dbHomeDir << std::endl;
00151 std::cerr << e.what() << std::endl;
00152 return (EXIT_FAILURE);
00153 }
00154
00155 try {
00156
00157 if (dbp != NULL)
00158 dbp->close(0);
00159
00160
00161 if (envp != NULL)
00162 envp->close(0);
00163 } catch(DbException &e) {
00164 std::cerr << "Error closing database and environment."
00165 << std::endl;
00166 std::cerr << e.what() << std::endl;
00167 return (EXIT_FAILURE);
00168 }
00169
00170
00171
00172 std::cout << "I'm all done." << std::endl;
00173 return (EXIT_SUCCESS);
00174 }
00175
00176
00177
00178
00179
00180
00181 void *
00182 writerThread(void *args)
00183 {
00184 int j, thread_num;
00185 int max_retries = 20;
00186 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00187 "key 5", "key 6", "key 7", "key 8",
00188 "key 9", "key 10"};
00189
00190 Db *dbp = (Db *)args;
00191 DbEnv *envp = dbp->get_env();
00192
00193
00194 (void)mutex_lock(&thread_num_lock);
00195 global_thread_num++;
00196 thread_num = global_thread_num;
00197 (void)mutex_unlock(&thread_num_lock);
00198
00199
00200 srand(thread_num);
00201
00202
00203 for (int i=0; i<50; i++) {
00204 DbTxn *txn;
00205 bool retry = true;
00206 int retry_count = 0;
00207
00208 while (retry) {
00209
00210
00211 try {
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225 txn = NULL;
00226 envp->txn_begin(NULL, &txn, 0);
00227
00228
00229 for (j = 0; j < 10; j++) {
00230 Dbt key, value;
00231 key.set_data(key_strings[j]);
00232 key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
00233
00234 int payload = rand() + i;
00235 value.set_data(&payload);
00236 value.set_size(sizeof(int));
00237
00238
00239 dbp->put(txn, &key, &value, 0);
00240 }
00241
00242
00243
00244 std::cout << thread_num << " : Found "
00245 << countRecords(dbp, NULL)
00246 << " records in the database." << std::endl;
00247
00248 std::cout << thread_num << " : committing txn : " << i
00249 << std::endl;
00250
00251
00252 try {
00253 txn->commit(0);
00254 retry = false;
00255 txn = NULL;
00256 } catch (DbException &e) {
00257 std::cout << "Error on txn commit: "
00258 << e.what() << std::endl;
00259 }
00260 } catch (DbDeadlockException &) {
00261
00262 if (txn != NULL)
00263 (void)txn->abort();
00264
00265
00266
00267
00268 if (retry_count < max_retries) {
00269 std::cout << "############### Writer " << thread_num
00270 << ": Got DB_LOCK_DEADLOCK.\n"
00271 << "Retrying write operation."
00272 << std::endl;
00273 retry_count++;
00274 retry = true;
00275 } else {
00276
00277 std::cerr << "Writer " << thread_num
00278 << ": Got DeadLockException and out of "
00279 << "retries. Giving up." << std::endl;
00280 retry = false;
00281 }
00282 } catch (DbException &e) {
00283 std::cerr << "db put failed" << std::endl;
00284 std::cerr << e.what() << std::endl;
00285 if (txn != NULL)
00286 txn->abort();
00287 retry = false;
00288 } catch (std::exception &ee) {
00289 std::cerr << "Unknown exception: " << ee.what() << std::endl;
00290 return (0);
00291 }
00292 }
00293 }
00294 return (0);
00295 }
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314 int
00315 countRecords(Db *dbp, DbTxn *txn)
00316 {
00317
00318 Dbc *cursorp = NULL;
00319 int count = 0;
00320
00321 try {
00322
00323 dbp->cursor(txn, &cursorp, DB_READ_UNCOMMITTED);
00324
00325 Dbt key, value;
00326 while (cursorp->get(&key, &value, DB_NEXT) == 0) {
00327 count++;
00328 }
00329 } catch (DbDeadlockException &de) {
00330 std::cerr << "countRecords: got deadlock" << std::endl;
00331 cursorp->close();
00332 throw de;
00333 } catch (DbException &e) {
00334 std::cerr << "countRecords error:" << std::endl;
00335 std::cerr << e.what() << std::endl;
00336 }
00337
00338 if (cursorp != NULL) {
00339 try {
00340 cursorp->close();
00341 } catch (DbException &e) {
00342 std::cerr << "countRecords: cursor close failed:" << std::endl;
00343 std::cerr << e.what() << std::endl;
00344 }
00345 }
00346
00347 return (count);
00348 }
00349
00350
00351
00352 int
00353 openDb(Db **dbpp, const char *progname, const char *fileName,
00354 DbEnv *envp, u_int32_t extraFlags)
00355 {
00356 int ret;
00357 u_int32_t openFlags;
00358
00359 try {
00360 Db *dbp = new Db(envp, 0);
00361
00362
00363 *dbpp = dbp;
00364
00365 if (extraFlags != 0)
00366 ret = dbp->set_flags(extraFlags);
00367
00368
00369 openFlags = DB_CREATE |
00370 DB_READ_UNCOMMITTED |
00371 DB_AUTO_COMMIT;
00372
00373 dbp->open(NULL,
00374 fileName,
00375 NULL,
00376 DB_BTREE,
00377 openFlags,
00378 0);
00379 } catch (DbException &e) {
00380 std::cerr << progname << ": openDb: db open failed:" << std::endl;
00381 std::cerr << e.what() << std::endl;
00382 return (EXIT_FAILURE);
00383 }
00384
00385 return (EXIT_SUCCESS);
00386 }
00387