00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "db_config.h"
00011
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #endif
00027
00028 #include "db_int.h"
00029 #include "dbinc/log.h"
00030 #include "dbinc/txn.h"
00031 #ifdef REP_DIAGNOSTIC
00032 #include "dbinc/db_page.h"
00033 #include "dbinc/fop.h"
00034 #include "dbinc/btree.h"
00035 #include "dbinc/hash.h"
00036 #include "dbinc/qam.h"
00037 #endif
00038
00039
00040
00041
00042
00043
00044
/*
 * TIMESTAMP_CHECK --
 *	If a replication lockout timestamp is recorded (op_timestamp != 0)
 *	and ts is past op_timestamp + DB_REGENV_TIMEOUT, clear the
 *	DB_REGENV_REPLOCKED flag and reset op_timestamp, doing the update
 *	under the replication region lock.
 *
 *	Arguments are parenthesized in the expansion so that any expression
 *	may be passed safely.
 */
#define	TIMESTAMP_CHECK(dbenv, ts, renv) do {				\
	if ((renv)->op_timestamp != 0 &&				\
	    (renv)->op_timestamp + DB_REGENV_TIMEOUT < (ts)) {		\
		REP_SYSTEM_LOCK(dbenv);					\
		F_CLR((renv), DB_REGENV_REPLOCKED);			\
		(renv)->op_timestamp = 0;				\
		REP_SYSTEM_UNLOCK(dbenv);				\
	}								\
} while (0)

#ifdef REP_DIAGNOSTIC
static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
#endif
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 int
00070 __rep_bulk_message(dbenv, bulk, repth, lsn, dbt, flags)
00071 DB_ENV *dbenv;
00072 REP_BULK *bulk;
00073 REP_THROTTLE *repth;
00074 DB_LSN *lsn;
00075 const DBT *dbt;
00076 u_int32_t flags;
00077 {
00078 DB_REP *db_rep;
00079 REP *rep;
00080 int ret;
00081 u_int32_t recsize, typemore;
00082 u_int8_t *p;
00083 #ifdef DIAGNOSTIC
00084 DB_MSGBUF mb;
00085 #endif
00086
00087 db_rep = dbenv->rep_handle;
00088 rep = db_rep->region;
00089 ret = 0;
00090
00091
00092
00093
00094 recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);
00095
00096
00097
00098
00099
00100 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00101 while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
00102 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00103 __os_sleep(dbenv, 1, 0);
00104 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00105 }
00106
00107
00108
00109
00110
00111
00112
00113 if (recsize > bulk->len) {
00114 RPRINT(dbenv, rep, (dbenv, &mb,
00115 "bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x",
00116 recsize, recsize, bulk->len));
00117 rep->stat.st_bulk_overflows++;
00118 (void)__rep_send_bulk(dbenv, bulk, flags);
00119
00120
00121
00122 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00123 return (DB_REP_BULKOVF);
00124 }
00125
00126
00127
00128
00129
00130
00131 while (recsize + *(bulk->offp) > bulk->len) {
00132 RPRINT(dbenv, rep, (dbenv, &mb,
00133 "bulk_msg: Record %lu (%#lx) doesn't fit. Send %lu (%#lx) now.",
00134 (u_long)recsize, (u_long)recsize,
00135 (u_long)bulk->len, (u_long)bulk->len));
00136 rep->stat.st_bulk_fills++;
00137 if ((ret = __rep_send_bulk(dbenv, bulk, flags)) != 0)
00138 break;
00139 }
00140
00141
00142
00143
00144
00145
00146
00147
00148 if (bulk->type == REP_BULK_LOG)
00149 typemore = REP_LOG_MORE;
00150 else
00151 typemore = REP_PAGE_MORE;
00152 if (repth != NULL &&
00153 (ret = __rep_send_throttle(dbenv, bulk->eid, repth,
00154 REP_THROTTLE_ONLY)) == 0 && repth->type == typemore) {
00155 RPRINT(dbenv, rep, (dbenv, &mb,
00156 "bulk_msg: Record %d (0x%x) hit throttle limit.",
00157 recsize, recsize));
00158 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00159 return (ret);
00160 }
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 p = bulk->addr + *(bulk->offp);
00172 memcpy(p, &dbt->size, sizeof(dbt->size));
00173 p += sizeof(dbt->size);
00174
00175
00176
00177
00178
00179 memcpy(p, lsn, sizeof(DB_LSN));
00180 RPRINT(dbenv, rep, (dbenv, &mb,
00181 "bulk_msg: Copying LSN [%lu][%lu] of %lu bytes to %#lx",
00182 (u_long)lsn->file, (u_long)lsn->offset, (u_long)dbt->size,
00183 P_TO_ULONG(p)));
00184 p += sizeof(DB_LSN);
00185
00186
00187
00188
00189 if (*(bulk->offp) == 0)
00190 bulk->lsn = *lsn;
00191
00192
00193
00194 memcpy(p, dbt->data, dbt->size);
00195 p += dbt->size;
00196 *(bulk->offp) = (uintptr_t)p - (uintptr_t)bulk->addr;
00197 rep->stat.st_bulk_records++;
00198
00199
00200
00201 if (LF_ISSET(DB_LOG_PERM) || FLD_ISSET(*(bulk->flagsp), BULK_FORCE)) {
00202 RPRINT(dbenv, rep, (dbenv, &mb,
00203 "bulk_msg: Send buffer after copy due to %s",
00204 LF_ISSET(DB_LOG_PERM) ? "PERM" : "FORCE"));
00205 ret = __rep_send_bulk(dbenv, bulk, flags);
00206 }
00207 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00208 return (ret);
00209
00210 }
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220 int
00221 __rep_send_bulk(dbenv, bulkp, flags)
00222 DB_ENV *dbenv;
00223 REP_BULK *bulkp;
00224 u_int32_t flags;
00225 {
00226 DB_REP *db_rep;
00227 REP *rep;
00228 DBT dbt;
00229 int ret;
00230 #ifdef DIAGNOSTIC
00231 DB_MSGBUF mb;
00232 #endif
00233
00234
00235
00236
00237 if (*(bulkp->offp) == 0)
00238 return (0);
00239
00240 db_rep = dbenv->rep_handle;
00241 rep = db_rep->region;
00242
00243 memset(&dbt, 0, sizeof(dbt));
00244
00245
00246
00247 FLD_SET(*(bulkp->flagsp), BULK_XMIT);
00248 dbt.data = bulkp->addr;
00249 dbt.size = (u_int32_t)*(bulkp->offp);
00250 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00251 RPRINT(dbenv, rep, (dbenv, &mb,
00252 "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size));
00253
00254
00255
00256 rep->stat.st_bulk_transfers++;
00257 ret = __rep_send_message(dbenv, bulkp->eid, bulkp->type, &bulkp->lsn,
00258 &dbt, flags, 0);
00259
00260 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00261
00262
00263
00264
00265 if (ret == 0)
00266 *(bulkp->offp) = 0;
00267 FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
00268 return (ret);
00269 }
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280 int
00281 __rep_bulk_alloc(dbenv, bulkp, eid, offp, flagsp, type)
00282 DB_ENV *dbenv;
00283 REP_BULK *bulkp;
00284 int eid;
00285 uintptr_t *offp;
00286 u_int32_t *flagsp, type;
00287 {
00288 int ret;
00289
00290 memset(bulkp, 0, sizeof(REP_BULK));
00291 *offp = *flagsp = 0;
00292 bulkp->len = MEGABYTE;
00293 if ((ret = __os_malloc(dbenv, bulkp->len, &bulkp->addr)) != 0)
00294 return (ret);
00295 bulkp->offp = offp;
00296 bulkp->type = type;
00297 bulkp->eid = eid;
00298 bulkp->flagsp = flagsp;
00299 return (ret);
00300 }
00301
00302
00303
00304
00305
00306
00307
00308 int
00309 __rep_bulk_free(dbenv, bulkp, flags)
00310 DB_ENV *dbenv;
00311 REP_BULK *bulkp;
00312 u_int32_t flags;
00313 {
00314 DB_REP *db_rep;
00315 int ret;
00316
00317 db_rep = dbenv->rep_handle;
00318
00319 MUTEX_LOCK(dbenv, db_rep->region->mtx_clientdb);
00320 ret = __rep_send_bulk(dbenv, bulkp, flags);
00321 MUTEX_UNLOCK(dbenv, db_rep->region->mtx_clientdb);
00322 __os_free(dbenv, bulkp->addr);
00323 return (ret);
00324 }
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334 int
00335 __rep_send_message(dbenv, eid, rtype, lsnp, dbt, logflags, repflags)
00336 DB_ENV *dbenv;
00337 int eid;
00338 u_int32_t rtype;
00339 DB_LSN *lsnp;
00340 const DBT *dbt;
00341 u_int32_t logflags, repflags;
00342 {
00343 DB_REP *db_rep;
00344 REP *rep;
00345 DBT cdbt, scrap_dbt;
00346 REP_CONTROL cntrl;
00347 int ret;
00348 u_int32_t myflags, rectype;
00349 #ifdef DIAGNOSTIC
00350 DB_MSGBUF mb;
00351 #endif
00352
00353 db_rep = dbenv->rep_handle;
00354 rep = db_rep->region;
00355
00356
00357 memset(&cntrl, 0, sizeof(cntrl));
00358 if (lsnp == NULL)
00359 ZERO_LSN(cntrl.lsn);
00360 else
00361 cntrl.lsn = *lsnp;
00362 cntrl.rectype = rtype;
00363 cntrl.flags = logflags;
00364 cntrl.rep_version = DB_REPVERSION;
00365 cntrl.log_version = DB_LOGVERSION;
00366 cntrl.gen = rep->gen;
00367
00368 memset(&cdbt, 0, sizeof(cdbt));
00369 cdbt.data = &cntrl;
00370 cdbt.size = sizeof(cntrl);
00371
00372
00373 if (dbt == NULL) {
00374 memset(&scrap_dbt, 0, sizeof(DBT));
00375 dbt = &scrap_dbt;
00376 }
00377
00378 REP_PRINT_MESSAGE(dbenv, eid, &cntrl, "rep_send_message");
00379 #ifdef REP_DIAGNOSTIC
00380 if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) && rtype == REP_LOG)
00381 __rep_print_logmsg(dbenv, dbt, lsnp);
00382 #endif
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393 myflags = repflags;
00394 if (FLD_ISSET(logflags, DB_LOG_PERM))
00395 myflags |= DB_REP_PERMANENT;
00396 else if (rtype != REP_LOG || FLD_ISSET(logflags, DB_LOG_RESEND))
00397 myflags |= DB_REP_NOBUFFER;
00398 if (rtype == REP_LOG && !FLD_ISSET(logflags, DB_LOG_PERM)) {
00399
00400
00401
00402
00403
00404 memcpy(&rectype, dbt->data, sizeof(rectype));
00405 if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
00406 F_SET(&cntrl, DB_LOG_PERM);
00407 }
00408
00409
00410
00411
00412
00413
00414 ret = dbenv->rep_send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);
00415
00416
00417
00418
00419
00420
00421 if (ret == 0)
00422 rep->stat.st_msgs_sent++;
00423 else {
00424 rep->stat.st_msgs_send_failures++;
00425 RPRINT(dbenv, rep, (dbenv, &mb,
00426 "rep_send_function returned: %d", ret));
00427 }
00428 return (ret);
00429 }
00430
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *	Print the log record in logdbt (at *lsnp) by dispatching it with
 *	DB_TXN_PRINT through a table of per-subsystem print functions.  The
 *	table is built on first use and cached.
 *
 *	NOTE(review): the cached table is function-static with no locking,
 *	so first use is not thread-safe.  This is acceptable only because
 *	this is a REP_DIAGNOSTIC aid.
 */
static void
__rep_print_logmsg(dbenv, logdbt, lsnp)
	DB_ENV *dbenv;
	const DBT *logdbt;
	DB_LSN *lsnp;
{
	/*
	 * Both the table and its size must be static.  With a non-static
	 * size, the "ptabsize == 0" test below was always true and the
	 * table was rebuilt on every call.
	 */
	static int (**ptab)__P((DB_ENV *,
	    DBT *, DB_LSN *, db_recops, void *)) = NULL;
	static size_t ptabsize = 0;

	if (ptabsize == 0) {
		/* Populate the print dispatch table once. */
		(void)__bam_init_print(dbenv, &ptab, &ptabsize);
		(void)__crdel_init_print(dbenv, &ptab, &ptabsize);
		(void)__db_init_print(dbenv, &ptab, &ptabsize);
		(void)__dbreg_init_print(dbenv, &ptab, &ptabsize);
		(void)__fop_init_print(dbenv, &ptab, &ptabsize);
		(void)__ham_init_print(dbenv, &ptab, &ptabsize);
		(void)__qam_init_print(dbenv, &ptab, &ptabsize);
		(void)__txn_init_print(dbenv, &ptab, &ptabsize);
	}

	(void)__db_dispatch(dbenv,
	    ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
/*
 * __rep_new_master --
 *	Handle a message identifying eid as master, with the master's
 *	generation and LSN in cntrl.
 *
 *	If the generation or master id changed:
 *	  - record the new master;
 *	  - abandon any recovery in progress;
 *	  - set REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY;
 *	  - find a local LSN to verify against the master and send a
 *	    REP_VERIFY_REQ, or a REP_ALL_REQ when the local log is empty or
 *	    has been truncated.
 *	Requests are skipped while REP_F_DELAY is set.
 *
 *	Returns DB_REP_NEWMASTER when the master changed, 0 when nothing
 *	changed, or an error.  The errors include DB_REP_JOIN_FAILURE when
 *	the master's LSN file precedes the first file in the local log.
 */
int
__rep_new_master(dbenv, cntrl, eid)
	DB_ENV *dbenv;
	REP_CONTROL *cntrl;
	int eid;
{
	DB_LOG *dblp;
	DB_LOGC *logc;
	DB_LSN first_lsn, lsn;
	DB_REP *db_rep;
	DBT dbt;
	LOG *lp;
	REGENV *renv;
	REGINFO *infop;
	REP *rep;
	int change, do_req, ret, t_ret;
#ifdef DIAGNOSTIC
	DB_MSGBUF mb;
#endif

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	ret = 0;
	logc = NULL;
	REP_SYSTEM_LOCK(dbenv);
	__rep_elect_done(dbenv, rep);
	change = rep->gen != cntrl->gen || rep->master_id != eid;
	if (change) {
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "Updating gen from %lu to %lu from master %d",
		    (u_long)rep->gen, (u_long)cntrl->gen, eid));
		rep->gen = cntrl->gen;
		/* Keep the election generation ahead of the new generation. */
		if (rep->egen <= rep->gen)
			rep->egen = rep->gen + 1;
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "Egen is %lu", (u_long)rep->egen));
		rep->master_id = eid;
		rep->stat.st_master_changes++;
		rep->stat.st_startup_complete = 0;

		/*
		 * With REP_C_DELAYCLIENT configured, mark this client as
		 * delaying.  The REP_F_DELAY checks below then suppress the
		 * sync-up requests.
		 */
		if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
			F_SET(rep, REP_F_DELAY);

		/*
		 * Abandon any recovery/initialization that was in progress,
		 * presumably with the previous master.
		 */
		if (rep->in_recovery || F_ISSET(rep, REP_F_READY)) {
			(void)__rep_init_cleanup(dbenv, rep, DB_FORCE);
			F_CLR(rep, REP_F_RECOVER_MASK);
			rep->in_recovery = 0;
			F_CLR(rep, REP_F_READY);
		}
		/* Block log archiving and start the verification phase. */
		F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY);
	}
	REP_SYSTEM_UNLOCK(dbenv);

	/* Snapshot lp->lsn (presumably the current end of the local log). */
	dblp = dbenv->lg_handle;
	lp = dblp->reginfo.primary;
	LOG_SYSTEM_LOCK(dbenv);
	lsn = lp->lsn;
	LOG_SYSTEM_UNLOCK(dbenv);

	if (!change) {
		/*
		 * Same master and generation: nudge along any sync-up that is
		 * still pending, if __rep_check_doreq allows a request now.
		 */
		ret = 0;
		MUTEX_LOCK(dbenv, rep->mtx_clientdb);
		do_req = __rep_check_doreq(dbenv, rep);
		if (F_ISSET(rep, REP_F_RECOVER_VERIFY)) {
			/* Still verifying: re-request the pending verify LSN. */
			lsn = lp->verify_lsn;
			MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
			if (!F_ISSET(rep, REP_F_DELAY) &&
			    !IS_ZERO_LSN(lsn) && do_req)
				(void)__rep_send_message(dbenv, eid,
				    REP_VERIFY_REQ, &lsn, NULL, 0,
				    DB_REP_ANYWHERE);
		} else {
			/*
			 * Verified already: if behind the master, ask for all
			 * records from our LSN on.  Archiving is re-enabled.
			 */
			MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
			if (log_compare(&lsn, &cntrl->lsn) < 0 && do_req)
				(void)__rep_send_message(dbenv, eid,
				    REP_ALL_REQ, &lsn, NULL,
				    0, DB_REP_ANYWHERE);
			REP_SYSTEM_LOCK(dbenv);
			F_CLR(rep, REP_F_NOARCHIVE);
			REP_SYSTEM_UNLOCK(dbenv);
		}
		return (ret);
	}

	/*
	 * The master changed.  If the local log is empty there is nothing
	 * to verify.  This section is also reached (via "goto empty" below)
	 * after the log has been truncated because no verifiable record was
	 * found.
	 */
	if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
empty:		MUTEX_LOCK(dbenv, rep->mtx_clientdb);
		F_SET(db_rep, DBREP_OPENFILES);
		ZERO_LSN(lp->verify_lsn);
		REP_SYSTEM_LOCK(dbenv);
		F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
		REP_SYSTEM_UNLOCK(dbenv);

		if (!IS_INIT_LSN(cntrl->lsn)) {
			/*
			 * The master has records: we will request all of them.
			 * wait_recs is raised to rep->max_gap first.  This is
			 * presumably so that gap detection does not re-request
			 * records while the ALL_REQ stream is starting.
			 * TODO confirm.
			 */
			lp->wait_recs = rep->max_gap;
			MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);

			/* Skip the request while delaying sync-up. */
			if (!F_ISSET(rep, REP_F_DELAY))
				(void)__rep_send_message(dbenv, eid,
				    REP_ALL_REQ, &lsn, NULL,
				    0, DB_REP_ANYWHERE);
		} else
			MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);

		return (DB_REP_NEWMASTER);
	}

	memset(&dbt, 0, sizeof(dbt));

	/*
	 * If our log extends into a later file than the master's LSN, make
	 * sure our log still reaches back to the master's file.  If our
	 * first record is in a later file than the master's LSN, we cannot
	 * join.
	 */
	if (cntrl->lsn.file < lsn.file) {
		if ((ret = __log_cursor(dbenv, &logc)) != 0)
			goto err;
		if ((ret = __log_c_get(logc, &first_lsn, &dbt, DB_FIRST)) != 0)
			goto err;
		if (cntrl->lsn.file < first_lsn.file) {
			__db_err(dbenv,
    "Client too far ahead of master; unable to join replication group");
			ret = DB_REP_JOIN_FAILURE;
			goto err;
		}
		ret = __log_c_close(logc);
		logc = NULL;
		if (ret != 0)
			goto err;
	}

	/*
	 * Walk back from lsn to the record to verify against the master.
	 * Per the DB_NOTFOUND message below, __rep_log_backup looks for a
	 * commit or checkpoint record.
	 */
	if ((ret = __log_cursor(dbenv, &logc)) != 0)
		goto err;
	ret = __rep_log_backup(logc, &lsn);
err:	if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
		ret = t_ret;
	if (ret == DB_NOTFOUND) {
		/*
		 * Nothing to verify against: truncate the local log back to
		 * the initial LSN.  Update the environment's rep_timestamp
		 * (which __db_rep_enter compares against open handles).  Then
		 * proceed as if the log were empty.
		 */
		INIT_LSN(lsn);
		RPRINT(dbenv, rep, (dbenv, &mb,
		    "No commit or ckp found. Truncate log."));
		(void)__log_vtruncate(dbenv, &lsn, &lsn, NULL);
		infop = dbenv->reginfo;
		renv = infop->primary;
		REP_SYSTEM_LOCK(dbenv);
		(void)time(&renv->rep_timestamp);
		REP_SYSTEM_UNLOCK(dbenv);
		goto empty;
	}

	/*
	 * Any other failure: undo the recovery/delay flags set above, since
	 * no verify LSN will be recorded.
	 */
	if (ret != 0) {
		REP_SYSTEM_LOCK(dbenv);
		F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY);
		REP_SYSTEM_UNLOCK(dbenv);
		return (ret);
	}

	/* Record the LSN to verify and ask the master for its copy. */
	MUTEX_LOCK(dbenv, rep->mtx_clientdb);
	lp->verify_lsn = lsn;
	lp->rcvd_recs = 0;
	lp->wait_recs = rep->request_gap;
	MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
	if (!F_ISSET(rep, REP_F_DELAY))
		(void)__rep_send_message(dbenv,
		    eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);

	return (DB_REP_NEWMASTER);
}
00704
00705
00706
00707
00708
00709
00710
00711
00712 int
00713 __rep_is_client(dbenv)
00714 DB_ENV *dbenv;
00715 {
00716 DB_REP *db_rep;
00717 REP *rep;
00718
00719 if (!REP_ON(dbenv))
00720 return (0);
00721
00722 db_rep = dbenv->rep_handle;
00723 rep = db_rep->region;
00724
00725
00726
00727
00728
00729 return (F_ISSET(rep, REP_F_CLIENT) ? 1 : 0);
00730 }
00731
00732
00733
00734
00735
00736
00737
00738
00739 int
00740 __rep_noarchive(dbenv)
00741 DB_ENV *dbenv;
00742 {
00743 DB_REP *db_rep;
00744 REGENV *renv;
00745 REGINFO *infop;
00746 REP *rep;
00747 time_t timestamp;
00748
00749 infop = dbenv->reginfo;
00750 renv = infop->primary;
00751
00752
00753
00754
00755
00756
00757 if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
00758 (void)time(×tamp);
00759 TIMESTAMP_CHECK(dbenv, timestamp, renv);
00760
00761
00762
00763
00764 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
00765 return (EINVAL);
00766 }
00767
00768 if (!REP_ON(dbenv))
00769 return (0);
00770 db_rep = dbenv->rep_handle;
00771 rep = db_rep->region;
00772 if (F_ISSET(rep, REP_F_NOARCHIVE))
00773 return (1);
00774 return (0);
00775 }
00776
00777
00778
00779
00780
00781
00782
00783
00784 void
00785 __rep_send_vote(dbenv, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype)
00786 DB_ENV *dbenv;
00787 DB_LSN *lsnp;
00788 int eid, nsites, nvotes, pri;
00789 u_int32_t egen, tie, vtype;
00790 {
00791 DBT vote_dbt;
00792 REP_VOTE_INFO vi;
00793
00794 memset(&vi, 0, sizeof(vi));
00795
00796 vi.egen = egen;
00797 vi.priority = pri;
00798 vi.nsites = nsites;
00799 vi.nvotes = nvotes;
00800 vi.tiebreaker = tie;
00801
00802 memset(&vote_dbt, 0, sizeof(vote_dbt));
00803 vote_dbt.data = &vi;
00804 vote_dbt.size = sizeof(vi);
00805
00806 (void)__rep_send_message(dbenv, eid, vtype, lsnp, &vote_dbt, 0, 0);
00807 }
00808
00809
00810
00811
00812
00813
00814
00815
00816 void
00817 __rep_elect_done(dbenv, rep)
00818 DB_ENV *dbenv;
00819 REP *rep;
00820 {
00821 int inelect;
00822 u_int32_t endsec, endusec;
00823 #ifdef DIAGNOSTIC
00824 DB_MSGBUF mb;
00825 #else
00826 COMPQUIET(dbenv, NULL);
00827 #endif
00828 inelect = IN_ELECTION_TALLY(rep);
00829 F_CLR(rep, REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY);
00830 rep->sites = 0;
00831 rep->votes = 0;
00832 if (inelect) {
00833 if (rep->esec != 0) {
00834 __os_clock(dbenv, &endsec, &endusec);
00835 __db_difftime(rep->esec, endsec, rep->eusec, endusec,
00836 &rep->stat.st_election_sec,
00837 &rep->stat.st_election_usec);
00838 RPRINT(dbenv, rep, (dbenv, &mb,
00839 "Election finished in %u.%06u sec",
00840 rep->stat.st_election_sec,
00841 rep->stat.st_election_usec));
00842 rep->esec = 0;
00843 rep->eusec = 0;
00844 }
00845 rep->egen++;
00846 }
00847 RPRINT(dbenv, rep, (dbenv, &mb,
00848 "Election done; egen %lu", (u_long)rep->egen));
00849 }
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859
00860 int
00861 __rep_grow_sites(dbenv, nsites)
00862 DB_ENV *dbenv;
00863 int nsites;
00864 {
00865 REGENV *renv;
00866 REGINFO *infop;
00867 REP *rep;
00868 int nalloc, ret, *tally;
00869
00870 rep = ((DB_REP *)dbenv->rep_handle)->region;
00871
00872
00873
00874
00875
00876 nalloc = 2 * rep->asites;
00877 if (nalloc < nsites)
00878 nalloc = nsites;
00879
00880 infop = dbenv->reginfo;
00881 renv = infop->primary;
00882 MUTEX_LOCK(dbenv, renv->mtx_regenv);
00883
00884
00885
00886
00887
00888
00889 if ((ret = __db_shalloc(infop,
00890 (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY),
00891 &tally)) == 0) {
00892 if (rep->tally_off != INVALID_ROFF)
00893 __db_shalloc_free(
00894 infop, R_ADDR(infop, rep->tally_off));
00895 rep->tally_off = R_OFFSET(infop, tally);
00896 if ((ret = __db_shalloc(infop,
00897 (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY),
00898 &tally)) == 0) {
00899
00900 if (rep->v2tally_off != INVALID_ROFF)
00901 __db_shalloc_free(infop,
00902 R_ADDR(infop, rep->v2tally_off));
00903 rep->v2tally_off = R_OFFSET(infop, tally);
00904 rep->asites = nalloc;
00905 rep->nsites = nsites;
00906 } else {
00907
00908
00909
00910
00911
00912
00913
00914 if (rep->v2tally_off != INVALID_ROFF)
00915 __db_shalloc_free(infop,
00916 R_ADDR(infop, rep->v2tally_off));
00917 __db_shalloc_free(infop,
00918 R_ADDR(infop, rep->tally_off));
00919 rep->v2tally_off = rep->tally_off = INVALID_ROFF;
00920 rep->asites = 0;
00921 rep->nsites = 0;
00922 }
00923 }
00924 MUTEX_UNLOCK(dbenv, renv->mtx_regenv);
00925 return (ret);
00926 }
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938 int
00939 __env_rep_enter(dbenv, checklock)
00940 DB_ENV *dbenv;
00941 int checklock;
00942 {
00943 DB_REP *db_rep;
00944 REGENV *renv;
00945 REGINFO *infop;
00946 REP *rep;
00947 int cnt;
00948 time_t timestamp;
00949
00950
00951 if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
00952 return (0);
00953
00954 db_rep = dbenv->rep_handle;
00955 rep = db_rep->region;
00956
00957 infop = dbenv->reginfo;
00958 renv = infop->primary;
00959 if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
00960 (void)time(×tamp);
00961 TIMESTAMP_CHECK(dbenv, timestamp, renv);
00962
00963
00964
00965
00966 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
00967 return (EINVAL);
00968 }
00969
00970 REP_SYSTEM_LOCK(dbenv);
00971 for (cnt = 0; rep->in_recovery;) {
00972 REP_SYSTEM_UNLOCK(dbenv);
00973 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
00974 __db_err(dbenv,
00975 "Operation locked out. Waiting for replication recovery to complete");
00976 return (DB_REP_LOCKOUT);
00977 }
00978 __os_sleep(dbenv, 1, 0);
00979 REP_SYSTEM_LOCK(dbenv);
00980 if (++cnt % 60 == 0)
00981 __db_err(dbenv,
00982 "DB_ENV handle waiting %d minutes for replication recovery to complete",
00983 cnt / 60);
00984 }
00985 rep->handle_cnt++;
00986 REP_SYSTEM_UNLOCK(dbenv);
00987
00988 return (0);
00989 }
00990
00991
00992
00993
00994
00995
00996
00997
00998 int
00999 __env_db_rep_exit(dbenv)
01000 DB_ENV *dbenv;
01001 {
01002 DB_REP *db_rep;
01003 REP *rep;
01004
01005
01006 if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01007 return (0);
01008
01009 db_rep = dbenv->rep_handle;
01010 rep = db_rep->region;
01011
01012 REP_SYSTEM_LOCK(dbenv);
01013 rep->handle_cnt--;
01014 REP_SYSTEM_UNLOCK(dbenv);
01015
01016 return (0);
01017 }
01018
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033 int
01034 __db_rep_enter(dbp, checkgen, checklock, return_now)
01035 DB *dbp;
01036 int checkgen, checklock, return_now;
01037 {
01038 DB_ENV *dbenv;
01039 DB_REP *db_rep;
01040 REGENV *renv;
01041 REGINFO *infop;
01042 REP *rep;
01043 time_t timestamp;
01044
01045 dbenv = dbp->dbenv;
01046
01047 if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01048 return (0);
01049
01050 db_rep = dbenv->rep_handle;
01051 rep = db_rep->region;
01052 infop = dbenv->reginfo;
01053 renv = infop->primary;
01054
01055 if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
01056 (void)time(×tamp);
01057 TIMESTAMP_CHECK(dbenv, timestamp, renv);
01058
01059
01060
01061
01062 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
01063 return (EINVAL);
01064 }
01065 REP_SYSTEM_LOCK(dbenv);
01066 if (F_ISSET(rep, REP_F_READY)) {
01067 REP_SYSTEM_UNLOCK(dbenv);
01068 if (!return_now)
01069 __os_sleep(dbenv, 5, 0);
01070 return (DB_LOCK_DEADLOCK);
01071 }
01072
01073 if (checkgen && dbp->timestamp != renv->rep_timestamp) {
01074 REP_SYSTEM_UNLOCK(dbenv);
01075 __db_err(dbenv, "%s %s",
01076 "replication recovery unrolled committed transactions;",
01077 "open DB and DBcursor handles must be closed");
01078 return (DB_REP_HANDLE_DEAD);
01079 }
01080 rep->handle_cnt++;
01081 REP_SYSTEM_UNLOCK(dbenv);
01082
01083 return (0);
01084 }
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097 int
01098 __op_rep_enter(dbenv)
01099 DB_ENV *dbenv;
01100 {
01101 DB_REP *db_rep;
01102 REP *rep;
01103 int cnt;
01104
01105
01106 if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01107 return (0);
01108
01109 db_rep = dbenv->rep_handle;
01110 rep = db_rep->region;
01111
01112 REP_SYSTEM_LOCK(dbenv);
01113 for (cnt = 0; F_ISSET(rep, REP_F_READY);) {
01114 REP_SYSTEM_UNLOCK(dbenv);
01115 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
01116 __db_err(dbenv,
01117 "Operation locked out. Waiting for replication recovery to complete");
01118 return (DB_REP_LOCKOUT);
01119 }
01120 __os_sleep(dbenv, 5, 0);
01121 cnt += 5;
01122 REP_SYSTEM_LOCK(dbenv);
01123 if (cnt % 60 == 0)
01124 __db_err(dbenv,
01125 "__op_rep_enter waiting %d minutes for op count to drain",
01126 cnt / 60);
01127 }
01128 rep->op_cnt++;
01129 REP_SYSTEM_UNLOCK(dbenv);
01130
01131 return (0);
01132 }
01133
01134
01135
01136
01137
01138
01139
01140
01141
01142 int
01143 __op_rep_exit(dbenv)
01144 DB_ENV *dbenv;
01145 {
01146 DB_REP *db_rep;
01147 REP *rep;
01148
01149
01150 if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01151 return (0);
01152
01153 db_rep = dbenv->rep_handle;
01154 rep = db_rep->region;
01155
01156 REP_SYSTEM_LOCK(dbenv);
01157 DB_ASSERT(rep->op_cnt > 0);
01158 rep->op_cnt--;
01159 REP_SYSTEM_UNLOCK(dbenv);
01160
01161 return (0);
01162 }
01163
01164
01165
01166
01167
01168
01169
01170
01171 int
01172 __rep_get_gen(dbenv, genp)
01173 DB_ENV *dbenv;
01174 u_int32_t *genp;
01175 {
01176 DB_REP *db_rep;
01177 REP *rep;
01178
01179 db_rep = dbenv->rep_handle;
01180 rep = db_rep->region;
01181
01182 REP_SYSTEM_LOCK(dbenv);
01183 if (rep->recover_gen > rep->gen)
01184 *genp = rep->recover_gen;
01185 else
01186 *genp = rep->gen;
01187 REP_SYSTEM_UNLOCK(dbenv);
01188
01189 return (0);
01190 }
01191
01192
01193
01194
01195
01196
01197
01198
01199
01200 int
01201 __rep_lockout(dbenv, rep, msg_th)
01202 DB_ENV *dbenv;
01203 REP *rep;
01204 u_int32_t msg_th;
01205 {
01206 int wait_cnt;
01207
01208
01209 F_SET(rep, REP_F_READY);
01210 for (wait_cnt = 0; rep->op_cnt != 0;) {
01211 REP_SYSTEM_UNLOCK(dbenv);
01212 __os_sleep(dbenv, 1, 0);
01213 #if defined(DIAGNOSTIC) || defined(CONFIG_TEST)
01214 if (++wait_cnt % 60 == 0)
01215 __db_err(dbenv,
01216 "Waiting for txn_cnt to run replication recovery/backup for %d minutes",
01217 wait_cnt / 60);
01218 #endif
01219 REP_SYSTEM_LOCK(dbenv);
01220 }
01221
01222
01223
01224
01225
01226
01227 rep->in_recovery = 1;
01228 for (wait_cnt = 0; rep->handle_cnt != 0 || rep->msg_th > msg_th;) {
01229 REP_SYSTEM_UNLOCK(dbenv);
01230 __os_sleep(dbenv, 1, 0);
01231 #ifdef DIAGNOSTIC
01232 if (++wait_cnt % 60 == 0)
01233 __db_err(dbenv,
01234 "Waiting for handle count to run replication recovery/backup for %d minutes",
01235 wait_cnt / 60);
01236 #endif
01237 REP_SYSTEM_LOCK(dbenv);
01238 }
01239
01240 return (0);
01241 }
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253
/*
 * __rep_send_throttle --
 *	Send the record described by repth, charging its size plus the
 *	REP_CONTROL header against the byte budget in repth->gbytes /
 *	repth->bytes.  A budget of zero in both means no limit.
 *
 *	When the budget cannot cover the record, repth->type is switched to
 *	REP_LOG_MORE or REP_PAGE_MORE (matching REP_LOG / REP_PAGE),
 *	st_nthrottles is bumped, and that *_MORE message is sent instead.
 *
 *	With REP_THROTTLE_ONLY in flags, only the accounting is done.  A
 *	message is sent only if the type was switched to *_MORE.
 *
 *	Returns 1 if __rep_send_message fails, 0 otherwise.
 */
int
__rep_send_throttle(dbenv, eid, repth, flags)
	DB_ENV *dbenv;
	int eid;
	REP_THROTTLE *repth;
	u_int32_t flags;
{
	DB_REP *db_rep;
	REP *rep;
	u_int32_t size, typemore;
	int check_limit;

	check_limit = repth->gbytes != 0 || repth->bytes != 0;

	/* Accounting-only call with no limit configured: nothing to do. */
	if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY))
		return (0);

	db_rep = dbenv->rep_handle;
	rep = db_rep->region;
	typemore = 0;
	if (repth->type == REP_LOG)
		typemore = REP_LOG_MORE;
	if (repth->type == REP_PAGE)
		typemore = REP_PAGE_MORE;
	DB_ASSERT(typemore != 0);

	/*
	 * data_dbt->size covers only the record.  Add the control header so
	 * small records are not badly under-counted.
	 */
	size = repth->data_dbt->size + sizeof(REP_CONTROL);
	if (check_limit) {
		/*
		 * NOTE(review): unexplained magic number.  With a limit
		 * configured, a record whose LSN offset is 28 is always turned
		 * into a *_MORE message.  This ignores the remaining budget and
		 * does not count in st_nthrottles.  28 may be the offset of the
		 * first record in a log file (just past the file header).
		 * Confirm the intent and consider naming the constant.
		 */
		if (repth->lsn.offset == 28) {
			repth->type = typemore;
			goto send;
		}
		/* Borrow from the gigabyte budget until the record fits. */
		while (repth->bytes <= size) {
			if (repth->gbytes > 0) {
				repth->bytes += GIGABYTE;
				--(repth->gbytes);
				continue;
			}
			/*
			 * Budget exhausted: send the *_MORE message instead.
			 * The statistic is updated without the region lock.
			 */
			rep->stat.st_nthrottles++;
			repth->type = typemore;
			goto send;
		}
		repth->bytes -= size;
	}

	/*
	 * A *_MORE message is always sent.  Otherwise send only when this
	 * is not an accounting-only call.
	 */
send:	if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) &&
	    (__rep_send_message(dbenv, eid, repth->type,
	    &repth->lsn, repth->data_dbt, DB_LOG_RESEND, 0) != 0))
		return (1);
	return (0);
}
01322
#ifdef DIAGNOSTIC
/*
 * __rep_print_message --
 *	Verbose-replication trace of a control message.  Prints the
 *	environment home, the caller's str, the generation, eid, a readable
 *	message-type name and the LSN.  Unknown types print as "NOTYPE".
 */
void
__rep_print_message(dbenv, eid, rp, str)
	DB_ENV *dbenv;
	int eid;
	REP_CONTROL *rp;
	char *str;
{
	/* Message type to printable name. */
	static const struct {
		u_int32_t rectype;
		const char *name;
	} msgnames[] = {
		{ REP_ALIVE, "alive" },
		{ REP_ALIVE_REQ, "alive_req" },
		{ REP_ALL_REQ, "all_req" },
		{ REP_BULK_LOG, "bulk_log" },
		{ REP_BULK_PAGE, "bulk_page" },
		{ REP_DUPMASTER, "dupmaster" },
		{ REP_FILE, "file" },
		{ REP_FILE_FAIL, "file_fail" },
		{ REP_FILE_REQ, "file_req" },
		{ REP_LOG, "log" },
		{ REP_LOG_MORE, "log_more" },
		{ REP_LOG_REQ, "log_req" },
		{ REP_MASTER_REQ, "master_req" },
		{ REP_NEWCLIENT, "newclient" },
		{ REP_NEWFILE, "newfile" },
		{ REP_NEWMASTER, "newmaster" },
		{ REP_NEWSITE, "newsite" },
		{ REP_PAGE, "page" },
		{ REP_PAGE_FAIL, "page_fail" },
		{ REP_PAGE_MORE, "page_more" },
		{ REP_PAGE_REQ, "page_req" },
		{ REP_REREQUEST, "rerequest" },
		{ REP_UPDATE, "update" },
		{ REP_UPDATE_REQ, "update_req" },
		{ REP_VERIFY, "verify" },
		{ REP_VERIFY_FAIL, "verify_fail" },
		{ REP_VERIFY_REQ, "verify_req" },
		{ REP_VOTE1, "vote1" },
		{ REP_VOTE2, "vote2" }
	};
	DB_MSGBUF mb;
	const char *name;
	size_t i;

	name = "NOTYPE";
	for (i = 0; i < sizeof(msgnames) / sizeof(msgnames[0]); i++) {
		if (msgnames[i].rectype == rp->rectype) {
			name = msgnames[i].name;
			break;
		}
	}

	RPRINT(dbenv, ((REP *)((DB_REP *)(dbenv)->rep_handle)->region),
	    (dbenv, &mb, "%s %s: gen = %lu eid %d, type %s, LSN [%lu][%lu]",
	    dbenv->db_home, str, (u_long)rp->gen,
	    eid, name, (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
}
#endif