00001
00002
#include <cstdlib>
#include <cstring>
#include <iostream>

#include <db_cxx.h>
00005
// Minimal portability layer mapping thread_* and mutex_* onto the native
// Win32 or POSIX primitives. Each macro evaluates to 0 on success and to a
// non-zero value on failure, so callers can test the result the same way on
// both platforms.
#ifdef _WIN32
#include <windows.h>
#define PATHD '\\'
extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}

typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// A NULL statusp means the caller does not want the thread's exit code, as
// with pthread_join(); GetExitCodeThread() is skipped in that case because it
// must not be handed a NULL output pointer.
#define thread_join(thr, statusp)                                          \
    (((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&           \
        ((statusp) == NULL ||                                              \
        GetExitCodeThread((thr), (LPDWORD)(statusp)))) ? 0 : -1)

typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)     (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock(m)
#define mutex_unlock(m)         pthread_mutex_unlock(m)
#endif
00043
00044
00045 #define NUMWRITERS 5
00046
00047
00048
00049 int global_thread_num;
00050 mutex_t thread_num_lock;
00051
00052
00053 int countRecords(Db *, DbTxn *);
00054 int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
00055 int usage(void);
00056 void *writerThread(void *);
00057
00058 int
00059 main(void)
00060 {
00061
00062 Db *dbp = NULL;
00063 DbEnv *envp = NULL;
00064
00065 thread_t writerThreads[NUMWRITERS];
00066 int i;
00067 u_int32_t envFlags;
00068
00069
00070 const char *progName = "TxnGuideInMemory";
00071
00072
00073 envFlags =
00074 DB_CREATE |
00075 DB_RECOVER |
00076 DB_INIT_LOCK |
00077 DB_INIT_LOG |
00078 DB_INIT_TXN |
00079
00080 DB_INIT_MPOOL |
00081 DB_PRIVATE |
00082
00083 DB_THREAD;
00084
00085 try {
00086
00087 envp = new DbEnv(0);
00088
00089
00090 envp->set_flags(DB_LOG_INMEMORY, 1);
00091
00092
00093 envp->set_lg_bsize(10 * 1024 * 1024);
00094
00095
00096 envp->set_cachesize(0, 10 * 1024 * 1024, 1);
00097
00098
00099
00100
00101
00102 envp->set_lk_detect(DB_LOCK_MINWRITE);
00103
00104
00105 envp->open(NULL, envFlags, 0);
00106
00107
00108
00109
00110
00111
00112
00113 openDb(&dbp, progName, NULL,
00114 envp, DB_DUPSORT);
00115
00116
00117 (void)mutex_init(&thread_num_lock, NULL);
00118
00119
00120 for (i = 0; i < NUMWRITERS; i++)
00121 (void)thread_create(
00122 &writerThreads[i], NULL,
00123 writerThread,
00124 (void *)dbp);
00125
00126
00127 for (i = 0; i < NUMWRITERS; i++)
00128 (void)thread_join(writerThreads[i], NULL);
00129
00130 } catch(DbException &e) {
00131 std::cerr << "Error opening database environment: "
00132 << std::endl;
00133 std::cerr << e.what() << std::endl;
00134 return (EXIT_FAILURE);
00135 }
00136
00137 try {
00138
00139 if (dbp != NULL)
00140 dbp->close(0);
00141
00142
00143 if (envp != NULL)
00144 envp->close(0);
00145 } catch(DbException &e) {
00146 std::cerr << "Error closing database and environment."
00147 << std::endl;
00148 std::cerr << e.what() << std::endl;
00149 return (EXIT_FAILURE);
00150 }
00151
00152
00153
00154 std::cout << "I'm all done." << std::endl;
00155 return (EXIT_SUCCESS);
00156 }
00157
00158
00159
00160
00161
00162
00163 void *
00164 writerThread(void *args)
00165 {
00166 int j, thread_num;
00167 int max_retries = 20;
00168 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
00169 "key 5", "key 6", "key 7", "key 8",
00170 "key 9", "key 10"};
00171
00172 Db *dbp = (Db *)args;
00173 DbEnv *envp = dbp->get_env();
00174
00175
00176 (void)mutex_lock(&thread_num_lock);
00177 global_thread_num++;
00178 thread_num = global_thread_num;
00179 (void)mutex_unlock(&thread_num_lock);
00180
00181
00182 srand(thread_num);
00183
00184
00185 for (int i=0; i<50; i++) {
00186 DbTxn *txn;
00187 bool retry = true;
00188 int retry_count = 0;
00189
00190 while (retry) {
00191
00192
00193 try {
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207 txn = NULL;
00208 envp->txn_begin(NULL, &txn, 0);
00209
00210
00211 for (j = 0; j < 10; j++) {
00212 Dbt key, value;
00213 key.set_data(key_strings[j]);
00214 key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
00215
00216 int payload = rand() + i;
00217 value.set_data(&payload);
00218 value.set_size(sizeof(int));
00219
00220
00221 dbp->put(txn, &key, &value, 0);
00222 }
00223
00224
00225
00226 std::cout << thread_num << " : Found "
00227 << countRecords(dbp, txn)
00228 << " records in the database." << std::endl;
00229
00230 std::cout << thread_num << " : committing txn : " << i
00231 << std::endl;
00232
00233
00234 try {
00235 txn->commit(0);
00236 retry = false;
00237 txn = NULL;
00238 } catch (DbException &e) {
00239 std::cout << "Error on txn commit: "
00240 << e.what() << std::endl;
00241 }
00242 } catch (DbDeadlockException &) {
00243
00244 if (txn != NULL)
00245 (void)txn->abort();
00246
00247
00248
00249
00250 if (retry_count < max_retries) {
00251 std::cerr << "############### Writer " << thread_num
00252 << ": Got DB_LOCK_DEADLOCK.\n"
00253 << "Retrying write operation." << std::endl;
00254 retry_count++;
00255 retry = true;
00256 } else {
00257
00258 std::cerr << "Writer " << thread_num
00259 << ": Got DeadLockException and out of "
00260 << "retries. Giving up." << std::endl;
00261 retry = false;
00262 }
00263 } catch (DbException &e) {
00264 std::cerr << "db put failed" << std::endl;
00265 std::cerr << e.what() << std::endl;
00266 if (txn != NULL)
00267 txn->abort();
00268 retry = false;
00269 } catch (std::exception &ee) {
00270 std::cerr << "Unknown exception: " << ee.what() << std::endl;
00271 return (0);
00272 }
00273 }
00274 }
00275 return (0);
00276 }
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296 int
00297 countRecords(Db *dbp, DbTxn *txn)
00298 {
00299
00300 Dbc *cursorp = NULL;
00301 int count = 0;
00302
00303 try {
00304
00305 dbp->cursor(txn, &cursorp, 0);
00306
00307 Dbt key, value;
00308 while (cursorp->get(&key, &value, DB_NEXT) == 0) {
00309 count++;
00310 }
00311 } catch (DbDeadlockException &de) {
00312 std::cerr << "countRecords: got deadlock" << std::endl;
00313 cursorp->close();
00314 throw de;
00315 } catch (DbException &e) {
00316 std::cerr << "countRecords error:" << std::endl;
00317 std::cerr << e.what() << std::endl;
00318 }
00319
00320 if (cursorp != NULL) {
00321 try {
00322 cursorp->close();
00323 } catch (DbException &e) {
00324 std::cerr << "countRecords: cursor close failed:" << std::endl;
00325 std::cerr << e.what() << std::endl;
00326 }
00327 }
00328
00329 return (count);
00330 }
00331
00332
00333
00334 int
00335 openDb(Db **dbpp, const char *progname, const char *fileName,
00336 DbEnv *envp, u_int32_t extraFlags)
00337 {
00338 int ret;
00339 u_int32_t openFlags;
00340
00341 try {
00342 Db *dbp = new Db(envp, 0);
00343
00344
00345 *dbpp = dbp;
00346
00347 if (extraFlags != 0)
00348 ret = dbp->set_flags(extraFlags);
00349
00350
00351 openFlags = DB_CREATE |
00352 DB_THREAD |
00353 DB_AUTO_COMMIT;
00354
00355 dbp->open(NULL,
00356 fileName,
00357 NULL,
00358 DB_BTREE,
00359 openFlags,
00360 0);
00361 } catch (DbException &e) {
00362 std::cerr << progname << ": openDb: db open failed:" << std::endl;
00363 std::cerr << e.what() << std::endl;
00364 return (EXIT_FAILURE);
00365 }
00366
00367 return (EXIT_SUCCESS);
00368 }
00369