00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "db_config.h"
00011
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #endif
00027
00028 #include "db_int.h"
00029 #include "dbinc/db_page.h"
00030 #include "dbinc/db_am.h"
00031 #include "dbinc/log.h"
00032
00033
00034
00035
00036
00037
00038
00039 int
00040 __rep_allreq(dbenv, rp, eid)
00041 DB_ENV *dbenv;
00042 REP_CONTROL *rp;
00043 int eid;
00044 {
00045 DB_LOGC *logc;
00046 DB_LSN oldfilelsn;
00047 DB_REP *db_rep;
00048 DBT data_dbt;
00049 REP *rep;
00050 REP_BULK bulk;
00051 REP_THROTTLE repth;
00052 uintptr_t bulkoff;
00053 u_int32_t bulkflags, flags, use_bulk;
00054 int ret, t_ret;
00055
00056 ret = 0;
00057 db_rep = dbenv->rep_handle;
00058 rep = db_rep->region;
00059
00060 if ((ret = __log_cursor(dbenv, &logc)) != 0)
00061 return (ret);
00062 memset(&data_dbt, 0, sizeof(data_dbt));
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073 use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
00074 if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
00075 &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
00076 goto err;
00077 memset(&repth, 0, sizeof(repth));
00078 REP_SYSTEM_LOCK(dbenv);
00079 repth.gbytes = rep->gbytes;
00080 repth.bytes = rep->bytes;
00081 oldfilelsn = repth.lsn = rp->lsn;
00082 repth.type = REP_LOG;
00083 repth.data_dbt = &data_dbt;
00084 REP_SYSTEM_UNLOCK(dbenv);
00085 flags = IS_ZERO_LSN(rp->lsn) ||
00086 IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET;
00087
00088
00089
00090
00091
00092
00093
00094 ret = __log_c_get(logc, &repth.lsn, &data_dbt, flags);
00095 if (ret == DB_NOTFOUND) {
00096 if (F_ISSET(rep, REP_F_MASTER))
00097 ret = 0;
00098 goto err;
00099 }
00100
00101
00102
00103
00104
00105 for (;
00106 ret == 0 && repth.type != REP_LOG_MORE;
00107 ret = __log_c_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) {
00108
00109
00110
00111
00112
00113
00114 if (repth.lsn.file != 1 && flags == DB_FIRST) {
00115 (void)__rep_send_message(dbenv, eid,
00116 REP_VERIFY_FAIL, &repth.lsn, NULL, 0, 0);
00117 break;
00118 }
00119 if (repth.lsn.file != oldfilelsn.file)
00120 (void)__rep_send_message(dbenv,
00121 eid, REP_NEWFILE, &oldfilelsn, NULL, 0, 0);
00122
00123
00124
00125
00126
00127 if (use_bulk)
00128 ret = __rep_bulk_message(dbenv, &bulk, &repth,
00129 &repth.lsn, &data_dbt, DB_LOG_RESEND);
00130 if (!use_bulk || ret == DB_REP_BULKOVF)
00131 ret = __rep_send_throttle(dbenv, eid, &repth, 0);
00132 if (ret != 0)
00133 break;
00134
00135
00136
00137
00138 oldfilelsn = repth.lsn;
00139 oldfilelsn.offset += logc->c_len;
00140 }
00141
00142 if (ret == DB_NOTFOUND)
00143 ret = 0;
00144
00145
00146
00147
00148 if (use_bulk && (t_ret = __rep_bulk_free(dbenv, &bulk,
00149 DB_LOG_RESEND)) != 0 && ret == 0)
00150 ret = t_ret;
00151 err:
00152 if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
00153 ret = t_ret;
00154 return (ret);
00155 }
00156
00157
00158
00159
00160
00161
00162
00163
00164 int
00165 __rep_log(dbenv, rp, rec, savetime, ret_lsnp)
00166 DB_ENV *dbenv;
00167 REP_CONTROL *rp;
00168 DBT *rec;
00169 time_t savetime;
00170 DB_LSN *ret_lsnp;
00171 {
00172 DB_LOG *dblp;
00173 DB_LSN lsn;
00174 DB_REP *db_rep;
00175 LOG *lp;
00176 REP *rep;
00177 int is_dup, master, ret;
00178
00179 is_dup = ret = 0;
00180 db_rep = dbenv->rep_handle;
00181 rep = db_rep->region;
00182 dblp = dbenv->lg_handle;
00183 lp = dblp->reginfo.primary;
00184
00185 ret = __rep_apply(dbenv, rp, rec, ret_lsnp, &is_dup);
00186 switch (ret) {
00187
00188
00189
00190
00191 case DB_REP_LOGREADY:
00192 if ((ret = __log_flush(dbenv, NULL)) != 0)
00193 goto out;
00194 if ((ret = __rep_verify_match(dbenv, &rep->last_lsn,
00195 savetime)) == 0) {
00196 REP_SYSTEM_LOCK(dbenv);
00197 ZERO_LSN(rep->first_lsn);
00198 ZERO_LSN(rep->last_lsn);
00199 F_CLR(rep, REP_F_RECOVER_LOG);
00200 REP_SYSTEM_UNLOCK(dbenv);
00201 }
00202 break;
00203
00204
00205
00206
00207
00208
00209
00210
00211 case DB_REP_ISPERM:
00212 case DB_REP_NOTPERM:
00213 case 0:
00214 if (is_dup)
00215 goto out;
00216 else
00217 break;
00218
00219
00220
00221 default:
00222 goto out;
00223 }
00224 if (rp->rectype == REP_LOG_MORE) {
00225 REP_SYSTEM_LOCK(dbenv);
00226 master = rep->master_id;
00227 REP_SYSTEM_UNLOCK(dbenv);
00228 LOG_SYSTEM_LOCK(dbenv);
00229 lsn = lp->lsn;
00230 LOG_SYSTEM_UNLOCK(dbenv);
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00247 if (master == DB_EID_INVALID) {
00248 ret = 0;
00249 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00250 } else if (IS_ZERO_LSN(lp->waiting_lsn)) {
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262 lp->wait_recs = rep->max_gap;
00263 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00264 if (__rep_send_message(dbenv, master, REP_ALL_REQ,
00265 &lsn, NULL, 0, DB_REP_ANYWHERE) != 0)
00266 goto out;
00267 } else {
00268 ret = __rep_loggap_req(dbenv, rep, &lsn, REP_GAP_FORCE);
00269 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00270 }
00271 }
00272 out:
00273 return (ret);
00274 }
00275
00276
00277
00278
00279
00280
00281
00282
00283 int
00284 __rep_bulk_log(dbenv, rp, rec, savetime, ret_lsnp)
00285 DB_ENV *dbenv;
00286 REP_CONTROL *rp;
00287 DBT *rec;
00288 time_t savetime;
00289 DB_LSN *ret_lsnp;
00290 {
00291 DB_REP *db_rep;
00292 REP *rep;
00293 int ret;
00294
00295 db_rep = dbenv->rep_handle;
00296 rep = db_rep->region;
00297
00298 ret = __log_rep_split(dbenv, rp, rec, ret_lsnp);
00299 switch (ret) {
00300
00301
00302
00303
00304 case DB_REP_LOGREADY:
00305 if ((ret = __log_flush(dbenv, NULL)) != 0)
00306 goto out;
00307 if ((ret = __rep_verify_match(dbenv, &rep->last_lsn,
00308 savetime)) == 0) {
00309 REP_SYSTEM_LOCK(dbenv);
00310 ZERO_LSN(rep->first_lsn);
00311 ZERO_LSN(rep->last_lsn);
00312 F_CLR(rep, REP_F_RECOVER_LOG);
00313 REP_SYSTEM_UNLOCK(dbenv);
00314 }
00315 break;
00316
00317
00318
00319 default:
00320 break;
00321 }
00322 out:
00323 return (ret);
00324 }
00325
00326
00327
00328
00329
00330
00331
00332 int
00333 __rep_logreq(dbenv, rp, rec, eid)
00334 DB_ENV *dbenv;
00335 REP_CONTROL *rp;
00336 DBT *rec;
00337 int eid;
00338 {
00339 DB_LOG *dblp;
00340 DB_LOGC *logc;
00341 DB_LSN endlsn, lsn, oldfilelsn;
00342 DB_REP *db_rep;
00343 DBT data_dbt;
00344 LOG *lp;
00345 REP *rep;
00346 REP_BULK bulk;
00347 REP_THROTTLE repth;
00348 uintptr_t bulkoff;
00349 u_int32_t bulkflags, use_bulk;
00350 int ret, t_ret;
00351 #ifdef DIAGNOSTIC
00352 DB_MSGBUF mb;
00353 #endif
00354
00355 ret = 0;
00356 db_rep = dbenv->rep_handle;
00357 rep = db_rep->region;
00358 dblp = dbenv->lg_handle;
00359 lp = dblp->reginfo.primary;
00360
00361 if (rec != NULL && rec->size != 0) {
00362 RPRINT(dbenv, rep, (dbenv, &mb,
00363 "[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]",
00364 (u_long) rp->lsn.file, (u_long)rp->lsn.offset,
00365 (u_long)((DB_LSN *)rec->data)->file,
00366 (u_long)((DB_LSN *)rec->data)->offset));
00367 }
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382 memset(&data_dbt, 0, sizeof(data_dbt));
00383 oldfilelsn = lsn = rp->lsn;
00384 if ((ret = __log_cursor(dbenv, &logc)) != 0)
00385 return (ret);
00386 ret = __log_c_get(logc, &lsn, &data_dbt, DB_SET);
00387
00388 if (ret == 0)
00389 (void)__rep_send_message(dbenv,
00390 eid, REP_LOG, &lsn, &data_dbt, DB_LOG_RESEND, 0);
00391 else if (ret == DB_NOTFOUND) {
00392 LOG_SYSTEM_LOCK(dbenv);
00393 endlsn = lp->lsn;
00394 LOG_SYSTEM_UNLOCK(dbenv);
00395 if (endlsn.file > lsn.file) {
00396
00397
00398
00399
00400
00401
00402
00403
00404 endlsn.file = lsn.file + 1;
00405 endlsn.offset = 0;
00406 if ((ret = __log_c_get(logc,
00407 &endlsn, &data_dbt, DB_SET)) != 0 ||
00408 (ret = __log_c_get(logc,
00409 &endlsn, &data_dbt, DB_PREV)) != 0) {
00410 RPRINT(dbenv, rep, (dbenv, &mb,
00411 "Unable to get prev of [%lu][%lu]",
00412 (u_long)lsn.file,
00413 (u_long)lsn.offset));
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431 if (F_ISSET(rep, REP_F_MASTER)) {
00432 ret = 0;
00433 (void)__rep_send_message(dbenv, eid,
00434 REP_VERIFY_FAIL, &rp->lsn,
00435 NULL, 0, 0);
00436 } else
00437 ret = DB_NOTFOUND;
00438 } else {
00439 endlsn.offset += logc->c_len;
00440 (void)__rep_send_message(dbenv, eid,
00441 REP_NEWFILE, &endlsn, NULL, 0, 0);
00442 }
00443 } else {
00444
00445
00446
00447
00448
00449
00450 if (F_ISSET(rep, REP_F_MASTER)) {
00451 __db_err(dbenv,
00452 "Request for LSN [%lu][%lu] fails",
00453 (u_long)lsn.file, (u_long)lsn.offset);
00454 DB_ASSERT(0);
00455 ret = EINVAL;
00456 }
00457 }
00458 }
00459 if (ret != 0)
00460 goto err;
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475 use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
00476 if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
00477 &bulkoff, &bulkflags, REP_BULK_LOG)) != 0)
00478 goto err;
00479 memset(&repth, 0, sizeof(repth));
00480 REP_SYSTEM_LOCK(dbenv);
00481 repth.gbytes = rep->gbytes;
00482 repth.bytes = rep->bytes;
00483 repth.type = REP_LOG;
00484 repth.data_dbt = &data_dbt;
00485 REP_SYSTEM_UNLOCK(dbenv);
00486 while (ret == 0 && rec != NULL && rec->size != 0 &&
00487 repth.type == REP_LOG) {
00488 if ((ret =
00489 __log_c_get(logc, &repth.lsn, &data_dbt, DB_NEXT)) != 0) {
00490
00491
00492
00493
00494
00495 if (ret == DB_NOTFOUND && F_ISSET(rep, REP_F_MASTER))
00496 ret = 0;
00497 break;
00498 }
00499 if (log_compare(&repth.lsn, (DB_LSN *)rec->data) >= 0)
00500 break;
00501 if (repth.lsn.file != oldfilelsn.file)
00502 (void)__rep_send_message(dbenv,
00503 eid, REP_NEWFILE, &oldfilelsn, NULL, 0, 0);
00504
00505
00506
00507
00508
00509 if (use_bulk)
00510 ret = __rep_bulk_message(dbenv, &bulk, &repth,
00511 &repth.lsn, &data_dbt, DB_LOG_RESEND);
00512 if (!use_bulk || ret == DB_REP_BULKOVF)
00513 ret = __rep_send_throttle(dbenv, eid, &repth, 0);
00514 if (ret != 0)
00515 break;
00516
00517
00518
00519
00520 oldfilelsn = repth.lsn;
00521 oldfilelsn.offset += logc->c_len;
00522 }
00523
00524
00525
00526
00527
00528 if (use_bulk && (t_ret = __rep_bulk_free(dbenv, &bulk,
00529 DB_LOG_RESEND)) != 0 && ret == 0)
00530 ret = t_ret;
00531 err:
00532 if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
00533 ret = t_ret;
00534 return (ret);
00535 }
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548 int
00549 __rep_loggap_req(dbenv, rep, lsnp, gapflags)
00550 DB_ENV *dbenv;
00551 REP *rep;
00552 DB_LSN *lsnp;
00553 u_int32_t gapflags;
00554 {
00555 DB_LOG *dblp;
00556 DBT max_lsn_dbt, *max_lsn_dbtp;
00557 DB_LSN next_lsn;
00558 LOG *lp;
00559 u_int32_t flags, type;
00560
00561 dblp = dbenv->lg_handle;
00562 lp = dblp->reginfo.primary;
00563 LOG_SYSTEM_LOCK(dbenv);
00564 next_lsn = lp->lsn;
00565 LOG_SYSTEM_UNLOCK(dbenv);
00566 flags = 0;
00567 type = REP_LOG_REQ;
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581 if (FLD_ISSET(gapflags, (REP_GAP_FORCE | REP_GAP_REREQUEST)) ||
00582 IS_ZERO_LSN(lp->max_wait_lsn) ||
00583 (lsnp != NULL && log_compare(lsnp, &lp->max_wait_lsn) == 0)) {
00584 lp->max_wait_lsn = lp->waiting_lsn;
00585 if (IS_ZERO_LSN(lp->max_wait_lsn))
00586 type = REP_ALL_REQ;
00587 memset(&max_lsn_dbt, 0, sizeof(max_lsn_dbt));
00588 max_lsn_dbt.data = &lp->waiting_lsn;
00589 max_lsn_dbt.size = sizeof(lp->waiting_lsn);
00590 max_lsn_dbtp = &max_lsn_dbt;
00591
00592
00593
00594
00595 if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
00596 flags = DB_REP_REREQUEST;
00597 else
00598 flags = DB_REP_ANYWHERE;
00599 } else {
00600 max_lsn_dbtp = NULL;
00601 lp->max_wait_lsn = next_lsn;
00602
00603
00604
00605 flags = DB_REP_REREQUEST;
00606 }
00607 if (rep->master_id != DB_EID_INVALID) {
00608 rep->stat.st_log_requested++;
00609 (void)__rep_send_message(dbenv, rep->master_id,
00610 type, &next_lsn, max_lsn_dbtp, 0, flags);
00611 } else
00612 (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
00613 REP_MASTER_REQ, NULL, NULL, 0, 0);
00614
00615 return (0);
00616 }