00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "db_config.h"
00011
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #endif
00027
00028 #include "db_int.h"
00029 #include "dbinc/db_page.h"
00030 #include "dbinc/db_shash.h"
00031 #include "dbinc/db_am.h"
00032 #include "dbinc/lock.h"
00033 #include "dbinc/log.h"
00034 #include "dbinc/mp.h"
00035 #include "dbinc/txn.h"
00036
00037 static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
00038 static int __rep_do_ckp __P((DB_ENV *, DBT *, REP_CONTROL *));
00039 static int __rep_getnext __P((DB_ENV *));
00040 static int __rep_lsn_cmp __P((const void *, const void *));
00041 static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DB_LSN *));
00042 static int __rep_process_rec __P((DB_ENV *,
00043 REP_CONTROL *, DBT *, u_int32_t *, DB_LSN *));
00044 static int __rep_remfirst __P((DB_ENV *, DBT *, DBT *));
00045 static int __rep_resend_req __P((DB_ENV *, int));
00046 static int __rep_skip_msg __P((DB_ENV *, REP *, int, u_int32_t));
00047
00048
00049
/*
 * MASTER_ONLY --
 *	Guard for messages that only a master may handle.  If REP_F_MASTER
 *	is not set on rep, the event is reported, ret is set to EINVAL and
 *	control jumps to the caller's errlock label.
 *
 *	The expansion uses dbenv, eidp, ret, the errlock label and (through
 *	RPRINT) mb from the enclosing function; they are not parameters.
 */
#define	MASTER_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_MASTER)) {				\
		RPRINT(dbenv, rep,					\
		    (dbenv, &mb, "Master record received on client"));	\
		REP_PRINT_MESSAGE(dbenv,				\
		    *eidp, rp, "rep_process_message");			\
		ret = EINVAL;						\
		goto errlock;						\
	}								\
} while (0)
00060
/*
 * CLIENT_ONLY --
 *	Guard for messages that only a client may handle.  If REP_F_CLIENT
 *	is not set on rep, the event is reported, a REP_DUPMASTER message is
 *	broadcast, ret is set to DB_REP_DUPMASTER and control jumps to the
 *	caller's errlock label.
 *
 *	The expansion uses dbenv, eidp, ret, the errlock label and (through
 *	RPRINT) mb from the enclosing function; they are not parameters.
 */
#define	CLIENT_ONLY(rep, rp) do {					\
	if (!F_ISSET(rep, REP_F_CLIENT)) {				\
		RPRINT(dbenv, rep,					\
		    (dbenv, &mb, "Client record received on master"));	\
		REP_PRINT_MESSAGE(dbenv,				\
		    *eidp, rp, "rep_process_message");			\
		(void)__rep_send_message(dbenv,				\
		    DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0, 0);	\
		ret = DB_REP_DUPMASTER;					\
		goto errlock;						\
	}								\
} while (0)
00073
00074
00075
00076
00077
00078
/*
 * CLIENT_REREQ --
 *	Post-processing for a request that was just serviced.  When this
 *	site is a client, count the serviced request; if the service
 *	attempt left ret at DB_NOTFOUND, count the miss as well and replace
 *	ret with the result of __rep_skip_msg().
 *
 *	The expansion uses rep, rp, dbenv, eidp and ret from the enclosing
 *	function.
 */
#define	CLIENT_REREQ do {						\
	if (F_ISSET(rep, REP_F_CLIENT)) {				\
		rep->stat.st_client_svc_req++;				\
		if (ret == DB_NOTFOUND) {				\
			rep->stat.st_client_svc_miss++;			\
			ret = __rep_skip_msg(				\
			    dbenv, rep, *eidp, rp->rectype);		\
		}							\
	}								\
} while (0)
00088
/*
 * MASTER_UPDATE --
 *	Under the replication region lock, set DB_REGENV_REPLOCKED on the
 *	environment region and record the current time in its op_timestamp
 *	field.
 */
#define	MASTER_UPDATE(dbenv, renv) do {					\
	REP_SYSTEM_LOCK(dbenv);						\
	F_SET((renv), DB_REGENV_REPLOCKED);				\
	(void)time(&(renv)->op_timestamp);				\
	REP_SYSTEM_UNLOCK(dbenv);					\
} while (0)
00095
/*
 * RECOVERING_SKIP --
 *	Skip the current message while the site is recovering: count it,
 *	let __rep_skip_msg() decide whether to re-request, and jump to the
 *	caller's errlock label.
 *
 *	The expansion uses recovering, rep, rp, dbenv, eidp, ret and the
 *	errlock label from the enclosing function.
 */
#define	RECOVERING_SKIP do {						\
	if (recovering) {						\
		/* The counter is bumped without taking a lock here. */	\
		rep->stat.st_msgs_recover++;				\
		ret = __rep_skip_msg(dbenv, rep, *eidp, rp->rectype);	\
		goto errlock;						\
	}								\
} while (0)
00104
00105
00106
00107
00108
00109
00110
00111
/*
 * RECOVERING_LOG_SKIP --
 *	Variant of RECOVERING_SKIP for log-record messages.  The message is
 *	skipped when REP_F_DELAY is set, or when the site is recovering and
 *	either REP_F_RECOVER_LOG is not set or the record's LSN is past
 *	rep->last_lsn.  In other words, while recovering, log records are
 *	only let through during REP_F_RECOVER_LOG and only up to last_lsn.
 *
 *	The expansion uses recovering, rep, rp, dbenv, eidp, ret and the
 *	errlock label from the enclosing function.
 */
#define	RECOVERING_LOG_SKIP do {					\
	if (F_ISSET(rep, REP_F_DELAY) ||				\
	    (recovering &&						\
	    (!F_ISSET(rep, REP_F_RECOVER_LOG) ||			\
	    log_compare(&rp->lsn, &rep->last_lsn) > 0))) {		\
		/* The counter is bumped without taking a lock here. */	\
		rep->stat.st_msgs_recover++;				\
		ret = __rep_skip_msg(dbenv, rep, *eidp, rp->rectype);	\
		goto errlock;						\
	}								\
} while (0)
00123
/*
 * ANYSITE --
 *	Expands to nothing.  It is used as a marker in the message switch
 *	on cases that perform no MASTER_ONLY/CLIENT_ONLY check.
 */
#define	ANYSITE(rep)
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143 int
00144 __rep_process_message(dbenv, control, rec, eidp, ret_lsnp)
00145 DB_ENV *dbenv;
00146 DBT *control, *rec;
00147 int *eidp;
00148 DB_LSN *ret_lsnp;
00149 {
00150 DB_LOG *dblp;
00151 DB_LSN lsn;
00152 DB_REP *db_rep;
00153 DBT data_dbt;
00154 LOG *lp;
00155 REGENV *renv;
00156 REGINFO *infop;
00157 REP *rep;
00158 REP_CONTROL *rp;
00159 u_int32_t egen, gen;
00160 int cmp, recovering, ret;
00161 time_t savetime;
00162 #ifdef DIAGNOSTIC
00163 DB_MSGBUF mb;
00164 #endif
00165
00166 PANIC_CHECK(dbenv);
00167 ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message",
00168 DB_INIT_REP);
00169
00170
00171 if (control == NULL || control->size == 0) {
00172 __db_err(dbenv,
00173 "DB_ENV->rep_process_message: control argument must be specified");
00174 return (EINVAL);
00175 }
00176
00177 if (!IS_REP_MASTER(dbenv) && !IS_REP_CLIENT(dbenv)) {
00178 __db_err(dbenv,
00179 "Environment not configured as replication master or client");
00180 return (EINVAL);
00181 }
00182
00183 ret = 0;
00184 db_rep = dbenv->rep_handle;
00185 rep = db_rep->region;
00186 dblp = dbenv->lg_handle;
00187 lp = dblp->reginfo.primary;
00188 infop = dbenv->reginfo;
00189 renv = infop->primary;
00190 rp = (REP_CONTROL *)control->data;
00191 if (ret_lsnp != NULL)
00192 ZERO_LSN(*ret_lsnp);
00193
00194
00195
00196
00197 REP_SYSTEM_LOCK(dbenv);
00198 if (rep->start_th != 0) {
00199
00200
00201
00202
00203 RPRINT(dbenv, rep, (dbenv, &mb,
00204 "Racing rep_start, ignore message."));
00205 if (F_ISSET(rp, DB_LOG_PERM))
00206 ret = DB_REP_IGNORE;
00207 REP_SYSTEM_UNLOCK(dbenv);
00208 goto out;
00209 }
00210 rep->msg_th++;
00211 gen = rep->gen;
00212 recovering = rep->in_recovery || F_ISSET(rep, REP_F_RECOVER_MASK);
00213 savetime = renv->rep_timestamp;
00214
00215 rep->stat.st_msgs_processed++;
00216 REP_SYSTEM_UNLOCK(dbenv);
00217
00218 REP_PRINT_MESSAGE(dbenv, *eidp, rp, "rep_process_message");
00219
00220
00221 if (rp->rep_version != DB_REPVERSION) {
00222 __db_err(dbenv,
00223 "unexpected replication message version %lu, expected %d",
00224 (u_long)rp->rep_version, DB_REPVERSION);
00225 ret = EINVAL;
00226 goto errlock;
00227 }
00228 if (rp->log_version != DB_LOGVERSION) {
00229 __db_err(dbenv,
00230 "unexpected log record version %lu, expected %d",
00231 (u_long)rp->log_version, DB_LOGVERSION);
00232 ret = EINVAL;
00233 goto errlock;
00234 }
00235
00236
00237
00238
00239
00240
00241 if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
00242 rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
00243 rp->rectype != REP_DUPMASTER) {
00244
00245
00246
00247 rep->stat.st_msgs_badgen++;
00248 if (F_ISSET(rp, DB_LOG_PERM))
00249 ret = DB_REP_IGNORE;
00250 goto errlock;
00251 }
00252
00253 if (rp->gen > gen) {
00254
00255
00256
00257
00258 if (F_ISSET(rep, REP_F_MASTER)) {
00259 rep->stat.st_dupmasters++;
00260 ret = DB_REP_DUPMASTER;
00261 if (rp->rectype != REP_DUPMASTER)
00262 (void)__rep_send_message(dbenv,
00263 DB_EID_BROADCAST, REP_DUPMASTER,
00264 NULL, NULL, 0, 0);
00265 goto errlock;
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275 if (rp->rectype == REP_ALIVE ||
00276 rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
00277 REP_SYSTEM_LOCK(dbenv);
00278 RPRINT(dbenv, rep, (dbenv, &mb,
00279 "Updating gen from %lu to %lu",
00280 (u_long)gen, (u_long)rp->gen));
00281 rep->master_id = DB_EID_INVALID;
00282 gen = rep->gen = rp->gen;
00283
00284
00285
00286
00287 REP_SYSTEM_UNLOCK(dbenv);
00288 if (rp->rectype == REP_ALIVE)
00289 (void)__rep_send_message(dbenv,
00290 DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
00291 NULL, 0, 0);
00292 } else if (rp->rectype != REP_NEWMASTER) {
00293
00294
00295
00296 if (__rep_check_doreq(dbenv, rep))
00297 (void)__rep_send_message(dbenv,
00298 DB_EID_BROADCAST, REP_MASTER_REQ,
00299 NULL, NULL, 0, 0);
00300 goto errlock;
00301 }
00302
00303
00304
00305
00306
00307 }
00308
00309
00310
00311
00312
00313
00314
00315
00316 switch (rp->rectype) {
00317 case REP_ALIVE:
00318
00319
00320
00321 ANYSITE(rep);
00322 egen = *(u_int32_t *)rec->data;
00323 REP_SYSTEM_LOCK(dbenv);
00324 RPRINT(dbenv, rep, (dbenv, &mb,
00325 "Received ALIVE egen of %lu, mine %lu",
00326 (u_long)egen, (u_long)rep->egen));
00327 if (egen > rep->egen) {
00328
00329
00330
00331
00332 __rep_elect_done(dbenv, rep);
00333 rep->egen = egen;
00334 }
00335 REP_SYSTEM_UNLOCK(dbenv);
00336 break;
00337 case REP_ALIVE_REQ:
00338
00339
00340
00341 ANYSITE(rep);
00342 dblp = dbenv->lg_handle;
00343 LOG_SYSTEM_LOCK(dbenv);
00344 lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00345 LOG_SYSTEM_UNLOCK(dbenv);
00346 REP_SYSTEM_LOCK(dbenv);
00347 egen = rep->egen;
00348 REP_SYSTEM_UNLOCK(dbenv);
00349 data_dbt.data = &egen;
00350 data_dbt.size = sizeof(egen);
00351 (void)__rep_send_message(dbenv,
00352 *eidp, REP_ALIVE, &lsn, &data_dbt, 0, 0);
00353 break;
00354 case REP_ALL_REQ:
00355 RECOVERING_SKIP;
00356 ret = __rep_allreq(dbenv, rp, *eidp);
00357 CLIENT_REREQ;
00358 break;
00359 case REP_BULK_LOG:
00360 RECOVERING_LOG_SKIP;
00361 CLIENT_ONLY(rep, rp);
00362 ret = __rep_bulk_log(dbenv, rp, rec, savetime, ret_lsnp);
00363 break;
00364 case REP_BULK_PAGE:
00365
00366
00367
00368 CLIENT_ONLY(rep, rp);
00369 ret = __rep_bulk_page(dbenv, *eidp, rp, rec);
00370 break;
00371 case REP_DUPMASTER:
00372
00373
00374
00375 if (F_ISSET(rep, REP_F_MASTER))
00376 ret = DB_REP_DUPMASTER;
00377 break;
00378 #ifdef NOTYET
00379 case REP_FILE:
00380 CLIENT_ONLY(rep, rp);
00381 break;
00382 case REP_FILE_REQ:
00383 ret = __rep_send_file(dbenv, rec, *eidp);
00384 break;
00385 #endif
00386 case REP_FILE_FAIL:
00387
00388
00389
00390 CLIENT_ONLY(rep, rp);
00391
00392
00393
00394 break;
00395 case REP_LOG:
00396 case REP_LOG_MORE:
00397 RECOVERING_LOG_SKIP;
00398 CLIENT_ONLY(rep, rp);
00399 ret = __rep_log(dbenv, rp, rec, savetime, ret_lsnp);
00400 break;
00401 case REP_LOG_REQ:
00402 RECOVERING_SKIP;
00403 ret = __rep_logreq(dbenv, rp, rec, *eidp);
00404 CLIENT_REREQ;
00405 break;
00406 case REP_NEWSITE:
00407
00408
00409
00410
00411 rep->stat.st_newsites++;
00412
00413
00414 if (F_ISSET(rep, REP_F_MASTER)) {
00415 dblp = dbenv->lg_handle;
00416 lp = dblp->reginfo.primary;
00417 LOG_SYSTEM_LOCK(dbenv);
00418 lsn = lp->lsn;
00419 LOG_SYSTEM_UNLOCK(dbenv);
00420 (void)__rep_send_message(dbenv,
00421 *eidp, REP_NEWMASTER, &lsn, NULL, 0, 0);
00422 }
00423 ret = DB_REP_NEWSITE;
00424 break;
00425 case REP_NEWCLIENT:
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438 (void)__rep_send_message(dbenv,
00439 DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
00440
00441 ret = DB_REP_NEWSITE;
00442
00443 if (F_ISSET(rep, REP_F_CLIENT)) {
00444 REP_SYSTEM_LOCK(dbenv);
00445 egen = rep->egen;
00446 if (*eidp == rep->master_id)
00447 rep->master_id = DB_EID_INVALID;
00448 REP_SYSTEM_UNLOCK(dbenv);
00449 data_dbt.data = &egen;
00450 data_dbt.size = sizeof(egen);
00451 (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
00452 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
00453 break;
00454 }
00455
00456 case REP_MASTER_REQ:
00457 RECOVERING_SKIP;
00458 if (F_ISSET(rep, REP_F_MASTER)) {
00459 LOG_SYSTEM_LOCK(dbenv);
00460 lsn = lp->lsn;
00461 LOG_SYSTEM_UNLOCK(dbenv);
00462 (void)__rep_send_message(dbenv,
00463 DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
00464 }
00465
00466
00467
00468
00469
00470
00471 if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
00472 REP_SYSTEM_LOCK(dbenv);
00473 egen = rep->egen;
00474 if (*eidp == rep->master_id)
00475 rep->master_id = DB_EID_INVALID;
00476 REP_SYSTEM_UNLOCK(dbenv);
00477 data_dbt.data = &egen;
00478 data_dbt.size = sizeof(egen);
00479 (void)__rep_send_message(dbenv, *eidp,
00480 REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
00481 }
00482 break;
00483 case REP_NEWFILE:
00484 RECOVERING_LOG_SKIP;
00485 CLIENT_ONLY(rep, rp);
00486 ret = __rep_apply(dbenv, rp, rec, ret_lsnp, NULL);
00487 break;
00488 case REP_NEWMASTER:
00489
00490
00491
00492 ANYSITE(rep);
00493 if (F_ISSET(rep, REP_F_MASTER) &&
00494 *eidp != dbenv->rep_eid) {
00495
00496 rep->stat.st_dupmasters++;
00497 ret = DB_REP_DUPMASTER;
00498 (void)__rep_send_message(dbenv,
00499 DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0, 0);
00500 break;
00501 }
00502 ret = __rep_new_master(dbenv, rp, *eidp);
00503 break;
00504 case REP_PAGE:
00505 case REP_PAGE_MORE:
00506
00507
00508
00509 CLIENT_ONLY(rep, rp);
00510 ret = __rep_page(dbenv, *eidp, rp, rec);
00511 break;
00512 case REP_PAGE_FAIL:
00513
00514
00515
00516 CLIENT_ONLY(rep, rp);
00517 ret = __rep_page_fail(dbenv, *eidp, rec);
00518 break;
00519 case REP_PAGE_REQ:
00520
00521
00522
00523 MASTER_UPDATE(dbenv, renv);
00524 ret = __rep_page_req(dbenv, *eidp, rec);
00525 CLIENT_REREQ;
00526 break;
00527 case REP_REREQUEST:
00528
00529
00530
00531
00532 CLIENT_ONLY(rep, rp);
00533
00534
00535
00536 rep->stat.st_client_rerequests++;
00537 ret = __rep_resend_req(dbenv, 1);
00538 break;
00539 case REP_UPDATE:
00540
00541
00542
00543 CLIENT_ONLY(rep, rp);
00544 ret = __rep_update_setup(dbenv, *eidp, rp, rec);
00545 break;
00546 case REP_UPDATE_REQ:
00547
00548
00549
00550 MASTER_ONLY(rep, rp);
00551 infop = dbenv->reginfo;
00552 renv = infop->primary;
00553 MASTER_UPDATE(dbenv, renv);
00554 ret = __rep_update_req(dbenv, *eidp);
00555 break;
00556 case REP_VERIFY:
00557 if (recovering) {
00558 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00559 cmp = log_compare(&lp->verify_lsn, &rp->lsn);
00560 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00561
00562
00563
00564 if (cmp != 0) {
00565 ret = __rep_skip_msg(
00566 dbenv, rep, *eidp, rp->rectype);
00567 break;
00568 }
00569 }
00570 CLIENT_ONLY(rep, rp);
00571 ret = __rep_verify(dbenv, rp, rec, *eidp, savetime);
00572 break;
00573 case REP_VERIFY_FAIL:
00574
00575
00576
00577 CLIENT_ONLY(rep, rp);
00578 ret = __rep_verify_fail(dbenv, rp, *eidp);
00579 break;
00580 case REP_VERIFY_REQ:
00581 RECOVERING_SKIP;
00582 ret = __rep_verify_req(dbenv, rp, *eidp);
00583 CLIENT_REREQ;
00584 break;
00585 case REP_VOTE1:
00586
00587
00588
00589 ret = __rep_vote1(dbenv, rp, rec, *eidp);
00590 break;
00591 case REP_VOTE2:
00592
00593
00594
00595 ret = __rep_vote2(dbenv, rec, eidp);
00596 break;
00597 default:
00598 __db_err(dbenv,
00599 "DB_ENV->rep_process_message: unknown replication message: type %lu",
00600 (u_long)rp->rectype);
00601 ret = EINVAL;
00602 break;
00603 }
00604
00605 errlock:
00606 REP_SYSTEM_LOCK(dbenv);
00607 rep->msg_th--;
00608 REP_SYSTEM_UNLOCK(dbenv);
00609 out:
00610 if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
00611 if (ret_lsnp != NULL)
00612 *ret_lsnp = rp->lsn;
00613 ret = DB_REP_NOTPERM;
00614 }
00615 return (ret);
00616 }
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631 int
00632 __rep_apply(dbenv, rp, rec, ret_lsnp, is_dupp)
00633 DB_ENV *dbenv;
00634 REP_CONTROL *rp;
00635 DBT *rec;
00636 DB_LSN *ret_lsnp;
00637 int *is_dupp;
00638 {
00639 DB_REP *db_rep;
00640 DBT control_dbt, key_dbt;
00641 DBT rec_dbt;
00642 DB *dbp;
00643 DB_LOG *dblp;
00644 DB_LSN max_lsn;
00645 LOG *lp;
00646 REP *rep;
00647 u_int32_t rectype;
00648 int cmp, ret;
00649 #ifdef DIAGNOSTIC
00650 DB_MSGBUF mb;
00651 #endif
00652
00653 db_rep = dbenv->rep_handle;
00654 rep = db_rep->region;
00655 dbp = db_rep->rep_db;
00656 rectype = 0;
00657 ret = 0;
00658 memset(&control_dbt, 0, sizeof(control_dbt));
00659 memset(&rec_dbt, 0, sizeof(rec_dbt));
00660 ZERO_LSN(max_lsn);
00661
00662 dblp = dbenv->lg_handle;
00663 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00664 lp = dblp->reginfo.primary;
00665 REP_SYSTEM_LOCK(dbenv);
00666 if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
00667 log_compare(&lp->ready_lsn, &rep->first_lsn) < 0)
00668 lp->ready_lsn = rep->first_lsn;
00669 REP_SYSTEM_UNLOCK(dbenv);
00670 cmp = log_compare(&rp->lsn, &lp->ready_lsn);
00671
00672 if (cmp == 0) {
00673 if ((ret =
00674 __rep_process_rec(dbenv, rp, rec, &rectype, &max_lsn)) != 0)
00675 goto err;
00676
00677
00678
00679
00680
00681 lp->rcvd_recs = 0;
00682 ZERO_LSN(lp->max_wait_lsn);
00683
00684 while (ret == 0 &&
00685 log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
00686
00687
00688
00689
00690 gap_check:
00691 if ((ret =
00692 __rep_remfirst(dbenv, &control_dbt, &rec_dbt)) != 0)
00693 goto err;
00694
00695 rp = (REP_CONTROL *)control_dbt.data;
00696 rec = &rec_dbt;
00697 if ((ret = __rep_process_rec(dbenv,
00698 rp, rec, &rectype, &max_lsn)) != 0)
00699 goto err;
00700
00701
00702
00703
00704 --rep->stat.st_log_queued;
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718 lp->rcvd_recs = rep->stat.st_log_queued;
00719 lp->wait_recs = rep->request_gap;
00720
00721 if ((ret = __rep_getnext(dbenv)) == DB_NOTFOUND) {
00722 lp->rcvd_recs = 0;
00723 ret = 0;
00724 break;
00725 } else if (ret != 0)
00726 goto err;
00727 }
00728
00729
00730
00731
00732
00733 if (!IS_ZERO_LSN(lp->waiting_lsn) &&
00734 log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
00735
00736
00737
00738
00739
00740
00741
00742 if (__rep_check_doreq(dbenv, rep) && (ret =
00743 __rep_loggap_req(dbenv, rep, &rp->lsn, 0)) != 0)
00744 goto err;
00745 } else {
00746 lp->wait_recs = 0;
00747 ZERO_LSN(lp->max_wait_lsn);
00748 }
00749
00750 } else if (cmp > 0) {
00751
00752
00753
00754
00755
00756
00757
00758 memset(&key_dbt, 0, sizeof(key_dbt));
00759 key_dbt.data = rp;
00760 key_dbt.size = sizeof(*rp);
00761 if (lp->wait_recs == 0) {
00762
00763
00764
00765
00766
00767
00768 lp->wait_recs = rep->request_gap;
00769 lp->rcvd_recs = 0;
00770 ZERO_LSN(lp->max_wait_lsn);
00771 }
00772 if (__rep_check_doreq(dbenv, rep) &&
00773 (ret = __rep_loggap_req(dbenv, rep, &rp->lsn, 0) != 0))
00774 goto err;
00775
00776 ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
00777 rep->stat.st_log_queued++;
00778 rep->stat.st_log_queued_total++;
00779 if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
00780 rep->stat.st_log_queued_max = rep->stat.st_log_queued;
00781
00782 if (ret == DB_KEYEXIST)
00783 ret = 0;
00784 if (ret != 0)
00785 goto done;
00786
00787 if (IS_ZERO_LSN(lp->waiting_lsn) ||
00788 log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
00789 lp->waiting_lsn = rp->lsn;
00790
00791
00792
00793
00794
00795 if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
00796 max_lsn = rp->lsn;
00797 ret = DB_REP_NOTPERM;
00798 }
00799 goto done;
00800 } else {
00801
00802
00803
00804
00805 rep->stat.st_log_duplicated++;
00806 if (is_dupp != NULL)
00807 *is_dupp = 1;
00808 if (F_ISSET(rp, DB_LOG_PERM))
00809 max_lsn = lp->max_perm_lsn;
00810 goto done;
00811 }
00812
00813
00814 if (ret == 0 && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
00815 goto gap_check;
00816
00817 done:
00818 err:
00819 REP_SYSTEM_LOCK(dbenv);
00820 if (ret == 0 &&
00821 F_ISSET(rep, REP_F_RECOVER_LOG) &&
00822 log_compare(&lp->ready_lsn, &rep->last_lsn) >= 0) {
00823 rep->last_lsn = max_lsn;
00824 ZERO_LSN(max_lsn);
00825 ret = DB_REP_LOGREADY;
00826 }
00827 REP_SYSTEM_UNLOCK(dbenv);
00828
00829 if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
00830 !IS_ZERO_LSN(max_lsn)) {
00831 if (ret_lsnp != NULL)
00832 *ret_lsnp = max_lsn;
00833 ret = DB_REP_ISPERM;
00834 DB_ASSERT(log_compare(&max_lsn, &lp->max_perm_lsn) >= 0);
00835 lp->max_perm_lsn = max_lsn;
00836 }
00837 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00838
00839
00840
00841
00842
00843
00844
00845 if (ret == 0 && !F_ISSET(rp, DB_LOG_RESEND) &&
00846 rectype != 0 && rep->stat.st_startup_complete == 0) {
00847 rep->stat.st_startup_complete = 1;
00848 ret = DB_REP_STARTUPDONE;
00849 }
00850 if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove)
00851 __log_autoremove(dbenv);
00852 if (control_dbt.data != NULL)
00853 __os_ufree(dbenv, control_dbt.data);
00854 if (rec_dbt.data != NULL)
00855 __os_ufree(dbenv, rec_dbt.data);
00856
00857 if (ret == DB_REP_NOTPERM && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
00858 !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
00859 *ret_lsnp = max_lsn;
00860
00861 #ifdef DIAGNOSTIC
00862 if (ret == DB_REP_ISPERM)
00863 RPRINT(dbenv, rep, (dbenv, &mb, "Returning ISPERM [%lu][%lu]",
00864 (u_long)max_lsn.file, (u_long)max_lsn.offset));
00865 else if (ret == DB_REP_LOGREADY)
00866 RPRINT(dbenv, rep, (dbenv, &mb,
00867 "Returning LOGREADY up to [%lu][%lu]",
00868 (u_long)rep->last_lsn.file,
00869 (u_long)rep->last_lsn.offset));
00870 else if (ret == DB_REP_NOTPERM)
00871 RPRINT(dbenv, rep, (dbenv, &mb, "Returning NOTPERM [%lu][%lu]",
00872 (u_long)max_lsn.file, (u_long)max_lsn.offset));
00873 else if (ret == DB_REP_STARTUPDONE)
00874 RPRINT(dbenv, rep, (dbenv, &mb,
00875 "Returning STARTUPDONE [%lu][%lu]",
00876 (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
00877 else if (ret != 0)
00878 RPRINT(dbenv, rep, (dbenv, &mb, "Returning %d [%lu][%lu]", ret,
00879 (u_long)max_lsn.file, (u_long)max_lsn.offset));
00880 #endif
00881 return (ret);
00882 }
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892 int
00893 __rep_process_txn(dbenv, rec)
00894 DB_ENV *dbenv;
00895 DBT *rec;
00896 {
00897 DBT data_dbt, *lock_dbt;
00898 DB_LOCKREQ req, *lvp;
00899 DB_LOGC *logc;
00900 DB_LSN prev_lsn, *lsnp;
00901 DB_REP *db_rep;
00902 DB_TXNHEAD *txninfo;
00903 LSN_COLLECTION lc;
00904 REP *rep;
00905 __txn_regop_args *txn_args;
00906 __txn_xa_regop_args *prep_args;
00907 u_int32_t lockid, rectype;
00908 u_int i;
00909 int ret, t_ret;
00910
00911 db_rep = dbenv->rep_handle;
00912 rep = db_rep->region;
00913 logc = NULL;
00914 txn_args = NULL;
00915 prep_args = NULL;
00916 txninfo = NULL;
00917
00918 memset(&data_dbt, 0, sizeof(data_dbt));
00919 if (F_ISSET(dbenv, DB_ENV_THREAD))
00920 F_SET(&data_dbt, DB_DBT_REALLOC);
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931 memcpy(&rectype, rec->data, sizeof(rectype));
00932 memset(&lc, 0, sizeof(lc));
00933 if (rectype == DB___txn_regop) {
00934
00935
00936
00937
00938 if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
00939 return (ret);
00940 if (txn_args->opcode != TXN_COMMIT) {
00941 __os_free(dbenv, txn_args);
00942 return (0);
00943 }
00944 prev_lsn = txn_args->prev_lsn;
00945 lock_dbt = &txn_args->locks;
00946 } else {
00947
00948 DB_ASSERT(rectype == DB___txn_xa_regop);
00949
00950 if ((ret =
00951 __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
00952 return (ret);
00953 prev_lsn = prep_args->prev_lsn;
00954 lock_dbt = &prep_args->locks;
00955 }
00956
00957
00958 if ((ret = __lock_id(dbenv, &lockid, NULL)) != 0)
00959 goto err1;
00960
00961 if ((ret =
00962 __lock_get_list(dbenv, lockid, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
00963 goto err;
00964
00965
00966 if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
00967 goto err;
00968 qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
00969
00970
00971
00972
00973
00974
00975 if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
00976 goto err;
00977
00978
00979 if ((ret = __log_cursor(dbenv, &logc)) != 0)
00980 goto err;
00981 for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
00982 if ((ret = __log_c_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
00983 __db_err(dbenv, "failed to read the log at [%lu][%lu]",
00984 (u_long)lsnp->file, (u_long)lsnp->offset);
00985 goto err;
00986 }
00987 if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
00988 dbenv->recover_dtab_size, &data_dbt, lsnp,
00989 DB_TXN_APPLY, txninfo)) != 0) {
00990 __db_err(dbenv, "transaction failed at [%lu][%lu]",
00991 (u_long)lsnp->file, (u_long)lsnp->offset);
00992 goto err;
00993 }
00994 }
00995
00996 err: memset(&req, 0, sizeof(req));
00997 req.op = DB_LOCK_PUT_ALL;
00998 if ((t_ret =
00999 __lock_vec(dbenv, lockid, 0, &req, 1, &lvp)) != 0 && ret == 0)
01000 ret = t_ret;
01001
01002 if ((t_ret = __lock_id_free(dbenv, lockid)) != 0 && ret == 0)
01003 ret = t_ret;
01004
01005 err1: if (txn_args != NULL)
01006 __os_free(dbenv, txn_args);
01007 if (prep_args != NULL)
01008 __os_free(dbenv, prep_args);
01009 if (lc.array != NULL)
01010 __os_free(dbenv, lc.array);
01011
01012 if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
01013 ret = t_ret;
01014
01015 if (txninfo != NULL)
01016 __db_txnlist_end(dbenv, txninfo);
01017
01018 if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
01019 __os_ufree(dbenv, data_dbt.data);
01020
01021 if (ret == 0)
01022
01023
01024
01025 rep->stat.st_txns_applied++;
01026
01027 return (ret);
01028 }
01029
01030
01031
01032
01033
01034
01035
01036 static int
01037 __rep_collect_txn(dbenv, lsnp, lc)
01038 DB_ENV *dbenv;
01039 DB_LSN *lsnp;
01040 LSN_COLLECTION *lc;
01041 {
01042 __txn_child_args *argp;
01043 DB_LOGC *logc;
01044 DB_LSN c_lsn;
01045 DBT data;
01046 u_int32_t rectype;
01047 u_int nalloc;
01048 int ret, t_ret;
01049
01050 memset(&data, 0, sizeof(data));
01051 F_SET(&data, DB_DBT_REALLOC);
01052
01053 if ((ret = __log_cursor(dbenv, &logc)) != 0)
01054 return (ret);
01055
01056 while (!IS_ZERO_LSN(*lsnp) &&
01057 (ret = __log_c_get(logc, lsnp, &data, DB_SET)) == 0) {
01058 memcpy(&rectype, data.data, sizeof(rectype));
01059 if (rectype == DB___txn_child) {
01060 if ((ret = __txn_child_read(dbenv,
01061 data.data, &argp)) != 0)
01062 goto err;
01063 c_lsn = argp->c_lsn;
01064 *lsnp = argp->prev_lsn;
01065 __os_free(dbenv, argp);
01066 ret = __rep_collect_txn(dbenv, &c_lsn, lc);
01067 } else {
01068 if (lc->nalloc < lc->nlsns + 1) {
01069 nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
01070 if ((ret = __os_realloc(dbenv,
01071 nalloc * sizeof(DB_LSN), &lc->array)) != 0)
01072 goto err;
01073 lc->nalloc = nalloc;
01074 }
01075 lc->array[lc->nlsns++] = *lsnp;
01076
01077
01078
01079
01080
01081
01082
01083
01084 memcpy(lsnp, (u_int8_t *)data.data +
01085 sizeof(u_int32_t) + sizeof(u_int32_t),
01086 sizeof(DB_LSN));
01087 }
01088
01089 if (ret != 0)
01090 goto err;
01091 }
01092 if (ret != 0)
01093 __db_err(dbenv, "collect failed at: [%lu][%lu]",
01094 (u_long)lsnp->file, (u_long)lsnp->offset);
01095
01096 err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
01097 ret = t_ret;
01098 if (data.data != NULL)
01099 __os_ufree(dbenv, data.data);
01100 return (ret);
01101 }
01102
01103
01104
01105
01106
01107 static int
01108 __rep_lsn_cmp(lsn1, lsn2)
01109 const void *lsn1, *lsn2;
01110 {
01111
01112 return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
01113 }
01114
01115
01116
01117
01118
01119
01120
01121 static int
01122 __rep_newfile(dbenv, rc, lsnp)
01123 DB_ENV *dbenv;
01124 REP_CONTROL *rc;
01125 DB_LSN *lsnp;
01126 {
01127 DB_LOG *dblp;
01128 LOG *lp;
01129
01130 dblp = dbenv->lg_handle;
01131 lp = dblp->reginfo.primary;
01132
01133 if (rc->lsn.file + 1 > lp->lsn.file)
01134 return (__log_newfile(dblp, lsnp, 0));
01135 else {
01136
01137 *lsnp = lp->lsn;
01138 return (0);
01139 }
01140 }
01141
01142
01143
01144
01145
01146
01147
01148 static int
01149 __rep_do_ckp(dbenv, rec, rp)
01150 DB_ENV *dbenv;
01151 DBT *rec;
01152 REP_CONTROL *rp;
01153 {
01154 DB_LSN ckp_lsn;
01155 DB_REP *db_rep;
01156 int ret;
01157
01158 db_rep = dbenv->rep_handle;
01159
01160 MUTEX_UNLOCK(dbenv, db_rep->region->mtx_clientdb);
01161
01162 DB_TEST_WAIT(dbenv, dbenv->test_check);
01163
01164
01165 memcpy(&ckp_lsn, (u_int8_t *)rec->data +
01166 SSZ(__txn_ckp_args, ckp_lsn), sizeof(DB_LSN));
01167 ret = __memp_sync(dbenv, &ckp_lsn);
01168
01169
01170 if (ret == 0)
01171 ret = __txn_updateckp(dbenv, &rp->lsn);
01172 else {
01173 __db_err(dbenv, "Error syncing ckp [%lu][%lu]",
01174 (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
01175 ret = __db_panic(dbenv, ret);
01176 }
01177 MUTEX_LOCK(dbenv, db_rep->region->mtx_clientdb);
01178
01179 return (ret);
01180 }
01181
01182
01183
01184
01185
01186 static int
01187 __rep_remfirst(dbenv, cntrl, rec)
01188 DB_ENV *dbenv;
01189 DBT *cntrl;
01190 DBT *rec;
01191 {
01192 DB *dbp;
01193 DBC *dbc;
01194 DB_REP *db_rep;
01195 int ret, t_ret;
01196
01197 db_rep = dbenv->rep_handle;
01198 dbp = db_rep->rep_db;
01199
01200 if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
01201 return (ret);
01202
01203
01204 F_SET(cntrl, DB_DBT_REALLOC);
01205 F_SET(rec, DB_DBT_REALLOC);
01206 if ((ret = __db_c_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
01207 ret = __db_c_del(dbc, 0);
01208 if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
01209 ret = t_ret;
01210
01211 return (ret);
01212 }
01213
01214
01215
01216
01217
01218 static int
01219 __rep_getnext(dbenv)
01220 DB_ENV *dbenv;
01221 {
01222 DB *dbp;
01223 DB_REP *db_rep;
01224 DB_LOG *dblp;
01225 DBC *dbc;
01226 DBT lsn_dbt, nextrec_dbt;
01227 LOG *lp;
01228 REP_CONTROL *rp;
01229 int ret, t_ret;
01230
01231 dblp = dbenv->lg_handle;
01232 lp = dblp->reginfo.primary;
01233
01234 db_rep = dbenv->rep_handle;
01235 dbp = db_rep->rep_db;
01236
01237 if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
01238 return (ret);
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250 memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
01251 F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
01252 nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
01253
01254 memset(&lsn_dbt, 0, sizeof(lsn_dbt));
01255 ret = __db_c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
01256 if (ret != DB_NOTFOUND && ret != 0)
01257 goto err;
01258
01259 if (ret == DB_NOTFOUND) {
01260 ZERO_LSN(lp->waiting_lsn);
01261
01262
01263
01264
01265
01266
01267 goto err;
01268 }
01269 rp = (REP_CONTROL *)lsn_dbt.data;
01270 lp->waiting_lsn = rp->lsn;
01271
01272 err: if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
01273 ret = t_ret;
01274 return (ret);
01275 }
01276
01277
01278
01279
01280
01281
01282
01283
01284
01285 static int
01286 __rep_process_rec(dbenv, rp, rec, typep, ret_lsnp)
01287 DB_ENV *dbenv;
01288 REP_CONTROL *rp;
01289 DBT *rec;
01290 u_int32_t *typep;
01291 DB_LSN *ret_lsnp;
01292 {
01293 DB *dbp;
01294 DB_LOG *dblp;
01295 DB_REP *db_rep;
01296 DBT control_dbt, key_dbt, rec_dbt;
01297 LOG *lp;
01298 REP *rep;
01299 u_int32_t txnid;
01300 int ret, t_ret;
01301
01302 db_rep = dbenv->rep_handle;
01303 rep = db_rep->region;
01304 dbp = db_rep->rep_db;
01305 dblp = dbenv->lg_handle;
01306 lp = dblp->reginfo.primary;
01307 ret = 0;
01308
01309 if (rp->rectype == REP_NEWFILE) {
01310 ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);
01311
01312
01313 *typep = 0;
01314 return (0);
01315 }
01316
01317 memcpy(typep, rec->data, sizeof(*typep));
01318 memset(&control_dbt, 0, sizeof(control_dbt));
01319 memset(&rec_dbt, 0, sizeof(rec_dbt));
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339 if (*typep != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
01340 if ((ret = __log_rep_put(dbenv, &rp->lsn, rec)) != 0)
01341 return (ret);
01342 rep->stat.st_log_records++;
01343 if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
01344 *ret_lsnp = rp->lsn;
01345 goto out;
01346 }
01347 }
01348
01349 switch (*typep) {
01350 case DB___dbreg_register:
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361 memcpy(&txnid, (u_int8_t *)rec->data +
01362 SSZ(__dbreg_register_args, txnid), sizeof(u_int32_t));
01363 if (txnid == TXN_INVALID)
01364 ret = __db_dispatch(dbenv, dbenv->recover_dtab,
01365 dbenv->recover_dtab_size, rec, &rp->lsn,
01366 DB_TXN_APPLY, NULL);
01367 break;
01368 case DB___txn_regop:
01369
01370
01371
01372
01373
01374
01375
01376
01377 do {
01378 ret = 0;
01379 if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
01380 ret = __txn_openfiles(dbenv, NULL, 1);
01381 F_SET(db_rep, DBREP_OPENFILES);
01382 }
01383 if (ret == 0)
01384 ret = __rep_process_txn(dbenv, rec);
01385 } while (ret == DB_LOCK_DEADLOCK);
01386
01387
01388 if (ret == 0 && !F_ISSET(dbenv, DB_ENV_TXN_NOSYNC))
01389 ret = __log_flush(dbenv, NULL);
01390 if (ret != 0) {
01391 __db_err(dbenv, "Error processing txn [%lu][%lu]",
01392 (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
01393 ret = __db_panic(dbenv, ret);
01394 }
01395 break;
01396 case DB___txn_xa_regop:
01397 ret = __log_flush(dbenv, NULL);
01398 break;
01399 case DB___txn_ckp:
01400
01401
01402
01403
01404
01405
01406
01407
01408
01409 memset(&key_dbt, 0, sizeof(key_dbt));
01410 key_dbt.data = rp;
01411 key_dbt.size = sizeof(*rp);
01412
01413
01414
01415
01416
01417 ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
01418 if (ret == DB_KEYEXIST) {
01419 if (ret_lsnp != NULL)
01420 *ret_lsnp = rp->lsn;
01421 ret = DB_REP_NOTPERM;
01422 }
01423 if (ret != 0)
01424 break;
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435 if ((ret = __rep_do_ckp(dbenv, rec, rp)) == 0)
01436 ret = __log_rep_put(dbenv, &rp->lsn, rec);
01437 if ((t_ret = __rep_remfirst(dbenv,
01438 &control_dbt, &rec_dbt)) != 0 && ret == 0)
01439 ret = t_ret;
01440 break;
01441 default:
01442 break;
01443 }
01444
01445 out:
01446 if (ret == 0 && F_ISSET(rp, DB_LOG_PERM))
01447 *ret_lsnp = rp->lsn;
01448 if (control_dbt.data != NULL)
01449 __os_ufree(dbenv, control_dbt.data);
01450 if (rec_dbt.data != NULL)
01451 __os_ufree(dbenv, rec_dbt.data);
01452
01453 return (ret);
01454 }
01455
01456
01457
01458
01459
01460
01461
01462 static int
01463 __rep_resend_req(dbenv, rereq)
01464 DB_ENV *dbenv;
01465 int rereq;
01466 {
01467
01468 DB_LOG *dblp;
01469 DB_LSN lsn;
01470 DB_REP *db_rep;
01471 LOG *lp;
01472 REP *rep;
01473 int ret;
01474 u_int32_t gapflags, repflags;
01475
01476 db_rep = dbenv->rep_handle;
01477 rep = db_rep->region;
01478 dblp = dbenv->lg_handle;
01479 lp = dblp->reginfo.primary;
01480 ret = 0;
01481
01482 repflags = rep->flags;
01483
01484
01485
01486 if (FLD_ISSET(repflags, REP_F_DELAY))
01487 return (ret);
01488 gapflags = rereq ? REP_GAP_REREQUEST : 0;
01489
01490 if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
01491 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01492 lsn = lp->verify_lsn;
01493 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01494 if (!IS_ZERO_LSN(lsn))
01495 (void)__rep_send_message(dbenv, rep->master_id,
01496 REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_REREQUEST);
01497 } else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
01498
01499
01500
01501 (void)__rep_send_message(dbenv, rep->master_id,
01502 REP_UPDATE_REQ, NULL, NULL, 0, 0);
01503 } else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
01504 REP_SYSTEM_LOCK(dbenv);
01505 ret = __rep_pggap_req(dbenv, rep, NULL, gapflags);
01506 REP_SYSTEM_UNLOCK(dbenv);
01507 } else {
01508 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01509 ret = __rep_loggap_req(dbenv, rep, NULL, gapflags);
01510 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01511 }
01512
01513 return (ret);
01514 }
01515
01516
01517
01518
01519
01520
01521
01522
01523
01524
01525 int
01526 __rep_check_doreq(dbenv, rep)
01527 DB_ENV *dbenv;
01528 REP *rep;
01529 {
01530
01531 DB_LOG *dblp;
01532 LOG *lp;
01533 int req;
01534
01535 dblp = dbenv->lg_handle;
01536 lp = dblp->reginfo.primary;
01537 req = ++lp->rcvd_recs >= lp->wait_recs;
01538 if (req) {
01539 lp->wait_recs *= 2;
01540 if (lp->wait_recs > rep->max_gap)
01541 lp->wait_recs = rep->max_gap;
01542 lp->rcvd_recs = 0;
01543 }
01544 return (req);
01545 }
01546
01547
01548
01549
01550
01551
01552
01553 static int
01554 __rep_skip_msg(dbenv, rep, eid, rectype)
01555 DB_ENV *dbenv;
01556 REP *rep;
01557 int eid;
01558 u_int32_t rectype;
01559 {
01560 int do_req, ret;
01561
01562 ret = 0;
01563
01564
01565
01566
01567 if (rep->master_id != DB_EID_INVALID && eid != rep->master_id)
01568 do_req = 1;
01569 else {
01570
01571 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01572 do_req = __rep_check_doreq(dbenv, rep);
01573 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01574 }
01575
01576
01577
01578
01579 if (do_req && rectype != REP_MASTER_REQ) {
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589 if (rep->master_id == DB_EID_INVALID)
01590 (void)__rep_send_message(dbenv,
01591 DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
01592 else if (eid == rep->master_id)
01593 ret = __rep_resend_req(dbenv, 0);
01594 else
01595 (void)__rep_send_message(dbenv,
01596 eid, REP_REREQUEST, NULL, NULL, 0, 0);
01597 }
01598 return (ret);
01599 }