00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #include "db_config.h"
00011
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023
00024 #include <string.h>
00025 #endif
00026
00027 #include "db_int.h"
00028 #include "dbinc/db_page.h"
00029 #include "dbinc/db_shash.h"
00030 #include "dbinc/db_am.h"
00031 #include "dbinc/lock.h"
00032 #include "dbinc/log.h"
00033 #include "dbinc/mp.h"
00034 #include "dbinc/qam.h"
00035 #include "dbinc/txn.h"
00036
00037 static int __rep_filedone __P((DB_ENV *, int, REP *, __rep_fileinfo_args *,
00038 u_int32_t));
00039 static int __rep_find_dbs __P((DB_ENV *, u_int8_t *, size_t *,
00040 size_t *, u_int32_t *));
00041 static int __rep_get_fileinfo __P((DB_ENV *, const char *,
00042 const char *, __rep_fileinfo_args *, u_int8_t *, u_int32_t *));
00043 static int __rep_log_setup __P((DB_ENV *, REP *));
00044 static int __rep_mpf_open __P((DB_ENV *, DB_MPOOLFILE **,
00045 __rep_fileinfo_args *, u_int32_t));
00046 static int __rep_page_gap __P((DB_ENV *, REP *, __rep_fileinfo_args *,
00047 u_int32_t));
00048 static int __rep_page_sendpages __P((DB_ENV *, int,
00049 __rep_fileinfo_args *, DB_MPOOLFILE *, DB *));
00050 static int __rep_queue_filedone __P((DB_ENV *, REP *, __rep_fileinfo_args *));
00051 static int __rep_walk_dir __P((DB_ENV *, const char *, u_int8_t *,
00052 size_t *, size_t *, u_int32_t *));
00053 static int __rep_write_page __P((DB_ENV *, REP *, __rep_fileinfo_args *));
00054
00055
00056
00057
00058
00059
00060
00061 int
00062 __rep_update_req(dbenv, eid)
00063 DB_ENV *dbenv;
00064 int eid;
00065 {
00066 DBT updbt;
00067 DB_LOG *dblp;
00068 DB_LSN lsn;
00069 size_t filelen, filesz, updlen;
00070 u_int32_t filecnt;
00071 u_int8_t *buf, *fp;
00072 int ret;
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 dblp = dbenv->lg_handle;
00087 filecnt = 0;
00088 filelen = 0;
00089 updlen = 0;
00090 filesz = MEGABYTE;
00091 if ((ret = __os_calloc(dbenv, 1, filesz, &buf)) != 0)
00092 return (ret);
00093
00094
00095
00096
00097
00098 fp = buf + sizeof(__rep_update_args);
00099 if ((ret = __rep_find_dbs(dbenv, fp, &filesz, &filelen, &filecnt)) != 0)
00100 goto err;
00101
00102
00103
00104
00105
00106 if ((ret = __log_get_stable_lsn(dbenv, &lsn)) != 0)
00107 goto err;
00108
00109
00110
00111
00112 if ((ret = __rep_update_buf(buf, filesz, &updlen, &lsn, filecnt)) != 0)
00113 goto err;
00114
00115
00116
00117 memset(&updbt, 0, sizeof(updbt));
00118 updbt.data = buf;
00119 updbt.size = (u_int32_t)(filelen + updlen);
00120 LOG_SYSTEM_LOCK(dbenv);
00121 lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00122 LOG_SYSTEM_UNLOCK(dbenv);
00123 (void)__rep_send_message(dbenv, eid, REP_UPDATE, &lsn, &updbt, 0,
00124 DB_REP_ANYWHERE);
00125
00126 err:
00127 __os_free(dbenv, buf);
00128 return (ret);
00129 }
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139 static int
00140 __rep_find_dbs(dbenv, fp, fileszp, filelenp, filecntp)
00141 DB_ENV *dbenv;
00142 u_int8_t *fp;
00143 size_t *fileszp, *filelenp;
00144 u_int32_t *filecntp;
00145 {
00146 int ret;
00147 char **ddir;
00148
00149 ret = 0;
00150 if (dbenv->db_data_dir == NULL) {
00151
00152
00153
00154
00155 ret = __rep_walk_dir(dbenv, dbenv->db_home, fp,
00156 fileszp, filelenp, filecntp);
00157 } else {
00158 for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir)
00159 if ((ret = __rep_walk_dir(dbenv, *ddir, fp,
00160 fileszp, filelenp, filecntp)) != 0)
00161 break;
00162 }
00163
00164
00165 if (ret == 0)
00166 ret = __rep_walk_dir(dbenv,
00167 NULL, fp, fileszp, filelenp, filecntp);
00168
00169 return (ret);
00170 }
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180 int
00181 __rep_walk_dir(dbenv, dir, fp, fileszp, filelenp, filecntp)
00182 DB_ENV *dbenv;
00183 const char *dir;
00184 u_int8_t *fp;
00185 size_t *fileszp, *filelenp;
00186 u_int32_t *filecntp;
00187 {
00188 DBT namedbt, uiddbt;
00189 __rep_fileinfo_args tmpfp;
00190 size_t len, offset;
00191 int cnt, i, ret;
00192 u_int8_t *rfp, uid[DB_FILE_ID_LEN];
00193 char *file, **names, *subdb;
00194 #ifdef DIAGNOSTIC
00195 REP *rep;
00196 DB_MSGBUF mb;
00197 DB_REP *db_rep;
00198
00199 db_rep = dbenv->rep_handle;
00200 rep = db_rep->region;
00201 #endif
00202 memset(&namedbt, 0, sizeof(namedbt));
00203 memset(&uiddbt, 0, sizeof(uiddbt));
00204 if (dir == NULL) {
00205 RPRINT(dbenv, rep, (dbenv, &mb,
00206 "Walk_dir: Getting info for in-memory named files"));
00207 if ((ret = __memp_inmemlist(dbenv, &names, &cnt)) != 0)
00208 return (ret);
00209 } else {
00210 RPRINT(dbenv, rep, (dbenv, &mb,
00211 "Walk_dir: Getting info for dir: %s", dir));
00212 if ((ret = __os_dirlist(dbenv, dir, &names, &cnt)) != 0)
00213 return (ret);
00214 }
00215 rfp = fp;
00216 RPRINT(dbenv, rep, (dbenv, &mb,
00217 "Walk_dir: Dir %s has %d files", dir, cnt));
00218 for (i = 0; i < cnt; i++) {
00219 RPRINT(dbenv, rep, (dbenv, &mb,
00220 "Walk_dir: File %d name: %s", i, names[i]));
00221
00222
00223
00224 if (strcmp(names[i], ".") == 0)
00225 continue;
00226 if (strcmp(names[i], "..") == 0)
00227 continue;
00228 if (strncmp(names[i], "__db", 4) == 0)
00229 continue;
00230 if (strncmp(names[i], "DB_CONFIG", 9) == 0)
00231 continue;
00232 if (strncmp(names[i], "log", 3) == 0)
00233 continue;
00234
00235
00236
00237
00238 if (dir == NULL) {
00239 file = NULL;
00240 subdb = names[i];
00241 } else {
00242 file = names[i];
00243 subdb = NULL;
00244 }
00245 if ((ret = __rep_get_fileinfo(dbenv,
00246 file, subdb, &tmpfp, uid, filecntp)) != 0) {
00247
00248
00249
00250 RPRINT(dbenv, rep, (dbenv, &mb,
00251 "Walk_dir: File %d %s: returned error %s",
00252 i, names[i], db_strerror(ret)));
00253 ret = 0;
00254 continue;
00255 }
00256 RPRINT(dbenv, rep, (dbenv, &mb,
00257 "Walk_dir: File %d (of %d) %s: pgsize %lu, max_pgno %lu",
00258 tmpfp.filenum, i, names[i],
00259 (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno));
00260 namedbt.data = names[i];
00261 namedbt.size = (u_int32_t)strlen(names[i]) + 1;
00262 uiddbt.data = uid;
00263 uiddbt.size = DB_FILE_ID_LEN;
00264 retry:
00265 ret = __rep_fileinfo_buf(rfp, *fileszp, &len,
00266 tmpfp.pgsize, tmpfp.pgno, tmpfp.max_pgno,
00267 tmpfp.filenum, tmpfp.id, tmpfp.type,
00268 tmpfp.flags, &uiddbt, &namedbt);
00269 if (ret == ENOMEM) {
00270 offset = (size_t)(rfp - fp);
00271 *fileszp *= 2;
00272
00273
00274
00275
00276 fp -= sizeof(__rep_update_args);
00277 if ((ret = __os_realloc(dbenv, *fileszp, fp)) != 0)
00278 break;
00279 fp += sizeof(__rep_update_args);
00280 rfp = fp + offset;
00281
00282
00283
00284
00285 goto retry;
00286 }
00287 rfp += len;
00288 *filelenp += len;
00289 }
00290 __os_dirfree(dbenv, names, cnt);
00291 return (ret);
00292 }
00293
00294 static int
00295 __rep_get_fileinfo(dbenv, file, subdb, rfp, uid, filecntp)
00296 DB_ENV *dbenv;
00297 const char *file, *subdb;
00298 __rep_fileinfo_args *rfp;
00299 u_int8_t *uid;
00300 u_int32_t *filecntp;
00301 {
00302
00303 DB *dbp, *entdbp;
00304 DB_LOCK lk;
00305 DB_LOG *dblp;
00306 DB_MPOOLFILE *mpf;
00307 DBC *dbc;
00308 DBMETA *dbmeta;
00309 PAGE *pagep;
00310 int i, ret, t_ret;
00311
00312 dbp = NULL;
00313 dbc = NULL;
00314 pagep = NULL;
00315 mpf = NULL;
00316 LOCK_INIT(lk);
00317
00318 if ((ret = db_create(&dbp, dbenv, 0)) != 0)
00319 goto err;
00320 if ((ret = __db_open(dbp, NULL, file, subdb, DB_UNKNOWN,
00321 DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0),
00322 0, PGNO_BASE_MD)) != 0)
00323 goto err;
00324
00325 if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
00326 goto err;
00327 if ((ret = __db_lget(
00328 dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0)
00329 goto err;
00330 if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, 0, &pagep)) != 0)
00331 goto err;
00332
00333
00334
00335 dbmeta = (DBMETA *)pagep;
00336 rfp->pgno = 0;
00337
00338
00339
00340
00341 if (dbp->type == DB_QUEUE)
00342 rfp->max_pgno = 0;
00343 else
00344 rfp->max_pgno = dbmeta->last_pgno;
00345 rfp->pgsize = dbp->pgsize;
00346 memcpy(uid, dbp->fileid, DB_FILE_ID_LEN);
00347 rfp->filenum = (*filecntp)++;
00348 rfp->type = (u_int32_t)dbp->type;
00349 rfp->flags = dbp->flags;
00350 rfp->id = DB_LOGFILEID_INVALID;
00351 ret = __memp_fput(dbp->mpf, pagep, 0);
00352 pagep = NULL;
00353 if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
00354 ret = t_ret;
00355 if (ret != 0)
00356 goto err;
00357 err:
00358 if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
00359 ret = t_ret;
00360 if (dbc != NULL && (t_ret = __db_c_close(dbc)) != 0 && ret == 0)
00361 ret = t_ret;
00362 if (pagep != NULL &&
00363 (t_ret = __memp_fput(mpf, pagep, 0)) != 0 && ret == 0)
00364 ret = t_ret;
00365 if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
00366 ret = t_ret;
00367
00368
00369
00370
00371
00372 if (ret == 0) {
00373 LOG_SYSTEM_LOCK(dbenv);
00374
00375
00376
00377
00378 for (dblp = dbenv->lg_handle,
00379 i = 0; i < dblp->dbentry_cnt; i++) {
00380 entdbp = dblp->dbentry[i].dbp;
00381 if (entdbp == NULL)
00382 break;
00383 DB_ASSERT(entdbp->log_filename != NULL);
00384 if (memcmp(uid,
00385 entdbp->log_filename->ufid,
00386 DB_FILE_ID_LEN) == 0)
00387 rfp->id = i;
00388 }
00389 LOG_SYSTEM_UNLOCK(dbenv);
00390 }
00391 return (ret);
00392 }
00393
00394
00395
00396
00397
00398
00399
00400 int
00401 __rep_page_req(dbenv, eid, rec)
00402 DB_ENV *dbenv;
00403 int eid;
00404 DBT *rec;
00405 {
00406 __rep_fileinfo_args *msgfp;
00407 DB *dbp;
00408 DBT msgdbt;
00409 DB_LOG *dblp;
00410 DB_MPOOLFILE *mpf;
00411 DB_REP *db_rep;
00412 REP *rep;
00413 int ret, t_ret;
00414 void *next;
00415 #ifdef DIAGNOSTIC
00416 DB_MSGBUF mb;
00417 #endif
00418
00419 db_rep = dbenv->rep_handle;
00420 rep = db_rep->region;
00421 dblp = dbenv->lg_handle;
00422
00423 if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
00424 return (ret);
00425
00426
00427
00428
00429
00430 RPRINT(dbenv, rep, (dbenv, &mb, "page_req: file %d page %lu to %lu",
00431 msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
00432 LOG_SYSTEM_LOCK(dbenv);
00433 if (msgfp->id >= 0 && dblp->dbentry_cnt > msgfp->id) {
00434 dbp = dblp->dbentry[msgfp->id].dbp;
00435 if (dbp != NULL) {
00436 DB_ASSERT(dbp->log_filename != NULL);
00437 if (memcmp(msgfp->uid.data, dbp->log_filename->ufid,
00438 DB_FILE_ID_LEN) == 0) {
00439 LOG_SYSTEM_UNLOCK(dbenv);
00440 RPRINT(dbenv, rep, (dbenv, &mb,
00441 "page_req: found %d in dbreg",
00442 msgfp->filenum));
00443 ret = __rep_page_sendpages(dbenv, eid,
00444 msgfp, dbp->mpf, dbp);
00445 goto err;
00446 }
00447 }
00448 }
00449 LOG_SYSTEM_UNLOCK(dbenv);
00450
00451
00452
00453
00454
00455
00456 RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d via mpf_open",
00457 msgfp->filenum));
00458 if ((ret = __rep_mpf_open(dbenv, &mpf, msgfp, 0)) != 0) {
00459 memset(&msgdbt, 0, sizeof(msgdbt));
00460 msgdbt.data = msgfp;
00461 msgdbt.size = sizeof(*msgfp);
00462 RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d failed",
00463 msgfp->filenum));
00464 if (F_ISSET(rep, REP_F_MASTER))
00465 (void)__rep_send_message(dbenv, eid, REP_FILE_FAIL,
00466 NULL, &msgdbt, 0, 0);
00467 else
00468 ret = DB_NOTFOUND;
00469 goto err;
00470 }
00471
00472 ret = __rep_page_sendpages(dbenv, eid, msgfp, mpf, NULL);
00473 t_ret = __memp_fclose(mpf, 0);
00474 if (ret == 0 && t_ret != 0)
00475 ret = t_ret;
00476 err:
00477 __os_free(dbenv, msgfp);
00478 return (ret);
00479 }
00480
00481 static int
00482 __rep_page_sendpages(dbenv, eid, msgfp, mpf, dbp)
00483 DB_ENV *dbenv;
00484 int eid;
00485 __rep_fileinfo_args *msgfp;
00486 DB_MPOOLFILE *mpf;
00487 DB *dbp;
00488 {
00489 DB *qdbp;
00490 DBT lockdbt, msgdbt, pgdbt;
00491 DB_LOCK lock;
00492 DB_LOCK_ILOCK lock_obj;
00493 DB_LOG *dblp;
00494 DB_LSN lsn;
00495 DB_MSGBUF mb;
00496 DB_REP *db_rep;
00497 PAGE *pagep;
00498 REP *rep;
00499 REP_BULK bulk;
00500 REP_THROTTLE repth;
00501 db_pgno_t p;
00502 uintptr_t bulkoff;
00503 size_t len, msgsz;
00504 u_int32_t bulkflags, lockid, use_bulk;
00505 int opened, ret, t_ret;
00506 u_int8_t *buf;
00507
00508 #ifndef DIAGNOSTIC
00509 DB_MSGBUF_INIT(&mb);
00510 #endif
00511 db_rep = dbenv->rep_handle;
00512 rep = db_rep->region;
00513 lockid = DB_LOCK_INVALIDID;
00514 opened = 0;
00515 qdbp = NULL;
00516 buf = NULL;
00517 bulk.addr = NULL;
00518 use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
00519 if (msgfp->type == (u_int32_t)DB_QUEUE) {
00520 if (dbp == NULL) {
00521 if ((ret = db_create(&qdbp, dbenv, 0)) != 0)
00522 goto err;
00523
00524
00525
00526
00527
00528 if ((ret = __db_open(qdbp, NULL,
00529 FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
00530 NULL : msgfp->info.data,
00531 FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
00532 msgfp->info.data : NULL,
00533 DB_UNKNOWN,
00534 DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ?
00535 DB_THREAD : 0), 0, PGNO_BASE_MD)) != 0)
00536 goto err;
00537 opened = 1;
00538 } else
00539 qdbp = dbp;
00540 }
00541 msgsz = sizeof(__rep_fileinfo_args) + DB_FILE_ID_LEN + msgfp->pgsize;
00542 if ((ret = __os_calloc(dbenv, 1, msgsz, &buf)) != 0)
00543 goto err;
00544 memset(&msgdbt, 0, sizeof(msgdbt));
00545 memset(&pgdbt, 0, sizeof(pgdbt));
00546 RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: file %d page %lu to %lu",
00547 msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
00548 memset(&repth, 0, sizeof(repth));
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558 if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
00559 &bulkoff, &bulkflags, REP_BULK_PAGE)) != 0)
00560 goto err;
00561 REP_SYSTEM_LOCK(dbenv);
00562 repth.gbytes = rep->gbytes;
00563 repth.bytes = rep->bytes;
00564 repth.type = REP_PAGE;
00565 repth.data_dbt = &msgdbt;
00566 REP_SYSTEM_UNLOCK(dbenv);
00567
00568
00569
00570
00571 LOCK_INIT(lock);
00572 memset(&lock_obj, 0, sizeof(lock_obj));
00573 if ((ret = __lock_id(dbenv, &lockid, NULL)) != 0)
00574 goto err;
00575 memcpy(lock_obj.fileid, mpf->fileid, DB_FILE_ID_LEN);
00576 lock_obj.type = DB_PAGE_LOCK;
00577
00578 memset(&lockdbt, 0, sizeof(lockdbt));
00579 lockdbt.data = &lock_obj;
00580 lockdbt.size = sizeof(lock_obj);
00581
00582 for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) {
00583
00584
00585
00586
00587
00588 lock_obj.pgno = p;
00589 if ((ret = __lock_get(dbenv, lockid, DB_LOCK_NOWAIT, &lockdbt,
00590 DB_LOCK_READ, &lock)) != 0) {
00591
00592
00593
00594 if (ret == DB_LOCK_NOTGRANTED)
00595 continue;
00596
00597
00598
00599 goto err;
00600 }
00601 if (msgfp->type == (u_int32_t)DB_QUEUE && p != 0)
00602 #ifdef HAVE_QUEUE
00603 ret = __qam_fget(qdbp, &p, DB_MPOOL_CREATE, &pagep);
00604 #else
00605 ret = DB_PAGE_NOTFOUND;
00606 #endif
00607 else
00608 ret = __memp_fget(mpf, &p, DB_MPOOL_CREATE, &pagep);
00609 if (ret == DB_PAGE_NOTFOUND) {
00610 memset(&pgdbt, 0, sizeof(pgdbt));
00611 ZERO_LSN(lsn);
00612 msgfp->pgno = p;
00613 if (F_ISSET(rep, REP_F_MASTER)) {
00614 ret = 0;
00615 RPRINT(dbenv, rep, (dbenv, &mb,
00616 "sendpages: PAGE_FAIL on page %lu",
00617 (u_long)p));
00618 (void)__rep_send_message(dbenv, eid,
00619 REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
00620 } else
00621 ret = DB_NOTFOUND;
00622 goto lockerr;
00623 } else if (ret != 0)
00624 goto lockerr;
00625 else {
00626 pgdbt.data = pagep;
00627 pgdbt.size = (u_int32_t)msgfp->pgsize;
00628 }
00629 len = 0;
00630 ret = __rep_fileinfo_buf(buf, msgsz, &len,
00631 msgfp->pgsize, p, msgfp->max_pgno,
00632 msgfp->filenum, msgfp->id, msgfp->type,
00633 msgfp->flags, &msgfp->uid, &pgdbt);
00634 if (msgfp->type != (u_int32_t)DB_QUEUE || p == 0)
00635 t_ret = __memp_fput(mpf, pagep, 0);
00636 #ifdef HAVE_QUEUE
00637 else
00638
00639
00640
00641
00642
00643 t_ret = __qam_fput(qdbp, p, pagep, 0);
00644 #endif
00645 if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
00646 ret = t_ret;
00647 if (ret != 0)
00648 goto err;
00649
00650 DB_ASSERT(len <= msgsz);
00651 msgdbt.data = buf;
00652 msgdbt.size = (u_int32_t)len;
00653
00654 dblp = dbenv->lg_handle;
00655 LOG_SYSTEM_LOCK(dbenv);
00656 repth.lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00657 LOG_SYSTEM_UNLOCK(dbenv);
00658
00659
00660
00661
00662
00663 if (use_bulk)
00664 ret = __rep_bulk_message(dbenv, &bulk, &repth,
00665 &repth.lsn, &msgdbt, 0);
00666 if (!use_bulk || ret == DB_REP_BULKOVF)
00667 ret = __rep_send_throttle(dbenv, eid, &repth, 0);
00668 RPRINT(dbenv, rep, (dbenv, &mb,
00669 "sendpages: %lu, lsn [%lu][%lu]", (u_long)p,
00670 (u_long)repth.lsn.file, (u_long)repth.lsn.offset));
00671
00672
00673
00674
00675
00676 if (ret == 0)
00677 ret = t_ret;
00678 if (repth.type == REP_PAGE_MORE || ret != 0)
00679 break;
00680 }
00681
00682 if (0) {
00683 lockerr: if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
00684 ret = t_ret;
00685 }
00686 err:
00687
00688
00689
00690
00691 if (use_bulk && bulk.addr != NULL &&
00692 (t_ret = __rep_bulk_free(dbenv, &bulk, 0)) != 0 && ret == 0)
00693 ret = t_ret;
00694 if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 &&
00695 ret == 0)
00696 ret = t_ret;
00697 if (buf != NULL)
00698 __os_free(dbenv, buf);
00699 if (lockid != DB_LOCK_INVALIDID && (t_ret = __lock_id_free(dbenv,
00700 lockid)) != 0 && ret == 0)
00701 ret = t_ret;
00702 return (ret);
00703 }
00704
00705
00706
00707
00708
00709
00710
00711 int
00712 __rep_update_setup(dbenv, eid, rp, rec)
00713 DB_ENV *dbenv;
00714 int eid;
00715 REP_CONTROL *rp;
00716 DBT *rec;
00717 {
00718 DB_LOG *dblp;
00719 DB_REP *db_rep;
00720 DBT pagereq_dbt;
00721 LOG *lp;
00722 REGENV *renv;
00723 REGINFO *infop;
00724 REP *rep;
00725 __rep_update_args *rup;
00726 int ret;
00727 u_int32_t count, infolen;
00728 void *next;
00729 #ifdef DIAGNOSTIC
00730 __rep_fileinfo_args *msgfp;
00731 DB_MSGBUF mb;
00732 #endif
00733
00734 db_rep = dbenv->rep_handle;
00735 rep = db_rep->region;
00736 dblp = dbenv->lg_handle;
00737 lp = dblp->reginfo.primary;
00738 ret = 0;
00739
00740 REP_SYSTEM_LOCK(dbenv);
00741 if (!F_ISSET(rep, REP_F_RECOVER_UPDATE)) {
00742 REP_SYSTEM_UNLOCK(dbenv);
00743 return (0);
00744 }
00745 F_CLR(rep, REP_F_RECOVER_UPDATE);
00746
00747
00748
00749
00750 F_SET(rep, REP_F_RECOVER_PAGE);
00751
00752
00753
00754
00755
00756
00757 if ((ret = __rep_lockout(dbenv, rep, 1)) != 0)
00758 goto err;
00759
00760
00761
00762
00763 infop = dbenv->reginfo;
00764 renv = infop->primary;
00765 (void)time(&renv->rep_timestamp);
00766
00767 REP_SYSTEM_UNLOCK(dbenv);
00768 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00769 lp->wait_recs = rep->request_gap;
00770 lp->rcvd_recs = 0;
00771 ZERO_LSN(lp->ready_lsn);
00772 ZERO_LSN(lp->waiting_lsn);
00773 ZERO_LSN(lp->max_wait_lsn);
00774 ZERO_LSN(lp->max_perm_lsn);
00775 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00776 if ((ret = __rep_update_read(dbenv, rec->data, &next, &rup)) != 0)
00777 goto err_nolock;
00778
00779
00780
00781
00782
00783 if ((ret = __db_truncate(db_rep->rep_db, NULL, &count)) != 0)
00784 goto err_nolock;
00785
00786
00787
00788
00789
00790 REP_SYSTEM_LOCK(dbenv);
00791 rep->first_lsn = rup->first_lsn;
00792 rep->last_lsn = rp->lsn;
00793 rep->nfiles = rup->num_files;
00794 rep->curfile = 0;
00795 rep->ready_pg = 0;
00796 rep->npages = 0;
00797 rep->waiting_pg = PGNO_INVALID;
00798 rep->max_wait_pg = PGNO_INVALID;
00799
00800 __os_free(dbenv, rup);
00801
00802 RPRINT(dbenv, rep, (dbenv, &mb,
00803 "Update setup for %d files.", rep->nfiles));
00804 RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: First LSN [%lu][%lu].",
00805 (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset));
00806 RPRINT(dbenv, rep, (dbenv, &mb, "Update setup: Last LSN [%lu][%lu]",
00807 (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
00808
00809 infolen = rec->size - sizeof(__rep_update_args);
00810 if ((ret = __os_calloc(dbenv, 1, infolen, &rep->originfo)) != 0)
00811 goto err;
00812 memcpy(rep->originfo, next, infolen);
00813 rep->finfo = rep->originfo;
00814 if ((ret = __rep_fileinfo_read(dbenv,
00815 rep->finfo, &next, &rep->curinfo)) != 0) {
00816 RPRINT(dbenv, rep, (dbenv, &mb,
00817 "Update setup: Fileinfo read: %s", db_strerror(ret)));
00818 goto errmem1;
00819 }
00820 rep->nextinfo = next;
00821
00822 #ifdef DIAGNOSTIC
00823 msgfp = rep->curinfo;
00824 DB_ASSERT(msgfp->pgno == 0);
00825 #endif
00826
00827
00828
00829
00830
00831 if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) {
00832 RPRINT(dbenv, rep, (dbenv, &mb,
00833 "Update setup: Client_dbinit %s", db_strerror(ret)));
00834 goto errmem;
00835 }
00836
00837
00838
00839
00840 memset(&pagereq_dbt, 0, sizeof(pagereq_dbt));
00841 pagereq_dbt.data = rep->finfo;
00842 pagereq_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
00843 (u_int8_t *)rep->finfo);
00844
00845 RPRINT(dbenv, rep, (dbenv, &mb,
00846 "Update PAGE_REQ file 0: pgsize %lu, maxpg %lu",
00847 (u_long)rep->curinfo->pgsize,
00848 (u_long)rep->curinfo->max_pgno));
00849
00850
00851
00852 (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
00853 NULL, &pagereq_dbt, 0, DB_REP_ANYWHERE);
00854 if (0) {
00855 errmem: __os_free(dbenv, rep->curinfo);
00856 errmem1: __os_free(dbenv, rep->originfo);
00857 rep->finfo = NULL;
00858 rep->curinfo = NULL;
00859 rep->originfo = NULL;
00860 }
00861
00862 if (0) {
00863 err_nolock: REP_SYSTEM_LOCK(dbenv);
00864 }
00865
00866 err:
00867
00868
00869
00870
00871 if (ret != 0) {
00872 RPRINT(dbenv, rep, (dbenv, &mb,
00873 "Update_setup: Error: Clear PAGE, set UPDATE again. %s",
00874 db_strerror(ret)));
00875 F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY);
00876 rep->in_recovery = 0;
00877 F_SET(rep, REP_F_RECOVER_UPDATE);
00878 }
00879 REP_SYSTEM_UNLOCK(dbenv);
00880 return (ret);
00881 }
00882
00883
00884
00885
00886
00887
00888
00889 int
00890 __rep_bulk_page(dbenv, eid, rp, rec)
00891 DB_ENV *dbenv;
00892 int eid;
00893 REP_CONTROL *rp;
00894 DBT *rec;
00895 {
00896 DB_REP *db_rep;
00897 DBT pgrec;
00898 REP *rep;
00899 REP_CONTROL tmprp;
00900 u_int32_t len;
00901 int ret;
00902 u_int8_t *p, *ep;
00903 #ifdef DIAGNOSTIC
00904 DB_MSGBUF mb;
00905 #endif
00906
00907 memset(&pgrec, 0, sizeof(pgrec));
00908
00909
00910
00911
00912
00913
00914
00915 memcpy(&tmprp, rp, sizeof(tmprp));
00916 tmprp.rectype = REP_PAGE;
00917 ret = 0;
00918 db_rep = dbenv->rep_handle;
00919 rep = db_rep->region;
00920 for (ep = (u_int8_t *)rec->data + rec->size, p = (u_int8_t *)rec->data;
00921 p < ep; p += len) {
00922
00923
00924
00925
00926 memcpy(&len, p, sizeof(len));
00927 p += sizeof(len);
00928 memcpy(&tmprp.lsn, p, sizeof(DB_LSN));
00929 p += sizeof(DB_LSN);
00930 pgrec.data = p;
00931 pgrec.size = len;
00932 RPRINT(dbenv, rep, (dbenv, &mb,
00933 "rep_bulk_page: Processing LSN [%lu][%lu]",
00934 (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset));
00935 RPRINT(dbenv, rep, (dbenv, &mb,
00936 "rep_bulk_page: p %#lx ep %#lx pgrec data %#lx, size %lu (%#lx)",
00937 P_TO_ULONG(p), P_TO_ULONG(ep), P_TO_ULONG(pgrec.data),
00938 (u_long)pgrec.size, (u_long)pgrec.size));
00939
00940
00941
00942 ret = __rep_page(dbenv, eid, &tmprp, &pgrec);
00943 RPRINT(dbenv, rep, (dbenv, &mb,
00944 "rep_bulk_page: rep_page ret %d", ret));
00945
00946 if (ret != 0)
00947 break;
00948 }
00949 return (ret);
00950 }
00951
00952
00953
00954
00955
00956
00957
00958 int
00959 __rep_page(dbenv, eid, rp, rec)
00960 DB_ENV *dbenv;
00961 int eid;
00962 REP_CONTROL *rp;
00963 DBT *rec;
00964 {
00965
00966 DB_REP *db_rep;
00967 DBT key, data;
00968 REP *rep;
00969 __rep_fileinfo_args *msgfp;
00970 db_recno_t recno;
00971 int ret;
00972 void *next;
00973 #ifdef DIAGNOSTIC
00974 DB_MSGBUF mb;
00975 #endif
00976
00977 ret = 0;
00978 db_rep = dbenv->rep_handle;
00979 rep = db_rep->region;
00980
00981 REP_SYSTEM_LOCK(dbenv);
00982 if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
00983 REP_SYSTEM_UNLOCK(dbenv);
00984 return (0);
00985 }
00986 if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
00987 REP_SYSTEM_UNLOCK(dbenv);
00988 return (ret);
00989 }
00990 RPRINT(dbenv, rep, (dbenv, &mb,
00991 "PAGE: Received page %lu from file %d",
00992 (u_long)msgfp->pgno, msgfp->filenum));
00993
00994
00995
00996
00997
00998
00999
01000
01001
01002 if (msgfp->filenum != rep->curfile) {
01003 RPRINT(dbenv, rep,
01004 (dbenv, &mb, "Msg file %d != curfile %d",
01005 msgfp->filenum, rep->curfile));
01006 goto err;
01007 }
01008
01009
01010
01011
01012 if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0)
01013 goto err;
01014
01015 REP_SYSTEM_UNLOCK(dbenv);
01016 memset(&key, 0, sizeof(key));
01017 memset(&data, 0, sizeof(data));
01018 recno = (db_recno_t)(msgfp->pgno + 1);
01019 key.data = &recno;
01020 key.ulen = key.size = sizeof(db_recno_t);
01021 key.flags = DB_DBT_USERMEM;
01022
01023
01024
01025
01026
01027
01028 ret = __db_put(rep->file_dbp, NULL, &key, &data, DB_NOOVERWRITE);
01029 if (ret == DB_KEYEXIST) {
01030 RPRINT(dbenv, rep, (dbenv, &mb,
01031 "PAGE: Received duplicate page %lu from file %d",
01032 (u_long)msgfp->pgno, msgfp->filenum));
01033 rep->stat.st_pg_duplicated++;
01034 ret = 0;
01035 goto err_nolock;
01036 }
01037 if (ret != 0)
01038 goto err_nolock;
01039
01040 RPRINT(dbenv, rep, (dbenv, &mb,
01041 "PAGE: Write page %lu into mpool", (u_long)msgfp->pgno));
01042 REP_SYSTEM_LOCK(dbenv);
01043
01044
01045
01046 ret = __rep_write_page(dbenv, rep, msgfp);
01047 if (ret != 0) {
01048
01049
01050
01051
01052
01053
01054
01055
01056
01057 (void)__db_del(rep->file_dbp, NULL, &key, 0);
01058 goto err;
01059 }
01060 rep->stat.st_pg_records++;
01061 rep->npages++;
01062
01063
01064
01065
01066
01067 if (log_compare(&rp->lsn, &rep->last_lsn) > 0)
01068 rep->last_lsn = rp->lsn;
01069
01070
01071
01072
01073
01074
01075
01076 ret = __rep_filedone(dbenv, eid, rep, msgfp, rp->rectype);
01077
01078 err: REP_SYSTEM_UNLOCK(dbenv);
01079
01080 err_nolock:
01081 __os_free(dbenv, msgfp);
01082 return (ret);
01083 }
01084
01085
01086
01087
01088
01089
01090
01091 int
01092 __rep_page_fail(dbenv, eid, rec)
01093 DB_ENV *dbenv;
01094 int eid;
01095 DBT *rec;
01096 {
01097
01098 DB_REP *db_rep;
01099 REP *rep;
01100 __rep_fileinfo_args *msgfp, *rfp;
01101 int ret;
01102 void *next;
01103 #ifdef DIAGNOSTIC
01104 DB_MSGBUF mb;
01105 #endif
01106
01107 ret = 0;
01108 db_rep = dbenv->rep_handle;
01109 rep = db_rep->region;
01110
01111 REP_SYSTEM_LOCK(dbenv);
01112 if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
01113 REP_SYSTEM_UNLOCK(dbenv);
01114 return (0);
01115 }
01116 if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
01117 REP_SYSTEM_UNLOCK(dbenv);
01118 return (ret);
01119 }
01120
01121
01122
01123
01124
01125
01126
01127
01128
01129 if (msgfp->filenum != rep->curfile) {
01130 RPRINT(dbenv, rep, (dbenv, &mb, "Msg file %d != curfile %d",
01131 msgfp->filenum, rep->curfile));
01132 REP_SYSTEM_UNLOCK(dbenv);
01133 return (0);
01134 }
01135 rfp = rep->curinfo;
01136 if (rfp->type != (u_int32_t)DB_QUEUE)
01137 --rfp->max_pgno;
01138 else {
01139
01140
01141
01142
01143
01144 RPRINT(dbenv, rep, (dbenv, &mb,
01145 "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d",
01146 (u_long)msgfp->pgno, (u_long)rep->ready_pg,
01147 (u_long)rfp->max_pgno, rep->npages));
01148 if (msgfp->pgno == rfp->max_pgno)
01149 --rfp->max_pgno;
01150 if (msgfp->pgno >= rep->ready_pg) {
01151 rep->ready_pg = msgfp->pgno + 1;
01152 rep->npages = rep->ready_pg;
01153 }
01154 RPRINT(dbenv, rep, (dbenv, &mb,
01155 "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d",
01156 (u_long)msgfp->pgno, (u_long)rep->ready_pg,
01157 (u_long)rfp->max_pgno, rep->npages));
01158 }
01159
01160
01161
01162
01163
01164
01165
01166 ret = __rep_filedone(dbenv, eid, rep, msgfp, REP_PAGE_FAIL);
01167 REP_SYSTEM_UNLOCK(dbenv);
01168 return (ret);
01169 }
01170
01171
01172
01173
01174
01175 static int
01176 __rep_write_page(dbenv, rep, msgfp)
01177 DB_ENV *dbenv;
01178 REP *rep;
01179 __rep_fileinfo_args *msgfp;
01180 {
01181 __rep_fileinfo_args *rfp;
01182 DB_FH *rfh;
01183 int ret;
01184 void *dst;
01185 char *real_name;
01186
01187 real_name = NULL;
01188 rfp = NULL;
01189
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199 rfp = rep->curinfo;
01200 if (rep->file_mpf == NULL) {
01201 if (!F_ISSET(rfp, DB_AM_INMEM)) {
01202 if ((ret = __db_appname(dbenv, DB_APP_DATA,
01203 rfp->info.data, 0, NULL, &real_name)) != 0)
01204 goto err;
01205
01206
01207
01208
01209 if ((ret = __memp_nameop(dbenv,
01210 rfp->uid.data, NULL, real_name, NULL, 0)) != 0)
01211 goto err;
01212
01213
01214
01215
01216 if ((ret = __os_open(dbenv, real_name,
01217 DB_OSO_CREATE, dbenv->db_mode, &rfh)) == 0)
01218 ret = __os_closehandle(dbenv, rfh);
01219 if (ret != 0)
01220 goto err;
01221 }
01222
01223 if ((ret =
01224 __rep_mpf_open(dbenv, &rep->file_mpf, rep->curinfo,
01225 F_ISSET(rfp, DB_AM_INMEM) ? DB_CREATE : 0)) != 0)
01226 goto err;
01227 }
01228
01229
01230
01231
01232
01233
01234
01235
01236 if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) {
01237 #ifdef HAVE_QUEUE
01238 if ((ret = __qam_fget(
01239 rep->queue_dbp, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
01240 goto err;
01241 #else
01242
01243
01244
01245 ret = __db_no_queue_am(dbenv);
01246 goto err;
01247 #endif
01248 } else if ((ret = __memp_fget(
01249 rep->file_mpf, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
01250 goto err;
01251
01252 memcpy(dst, msgfp->info.data, msgfp->pgsize);
01253 if (msgfp->type != (u_int32_t)DB_QUEUE || msgfp->pgno == 0)
01254 ret = __memp_fput(rep->file_mpf, dst, DB_MPOOL_DIRTY);
01255 #ifdef HAVE_QUEUE
01256 else
01257 ret = __qam_fput(rep->queue_dbp, msgfp->pgno, dst,
01258 DB_MPOOL_DIRTY);
01259 #endif
01260
01261 err: if (real_name != NULL)
01262 __os_free(dbenv, real_name);
01263 return (ret);
01264 }
01265
01266
01267
01268
01269
01270
01271 static int
01272 __rep_page_gap(dbenv, rep, msgfp, type)
01273 DB_ENV *dbenv;
01274 REP *rep;
01275 __rep_fileinfo_args *msgfp;
01276 u_int32_t type;
01277 {
01278 DB_LOG *dblp;
01279 DBT data, key;
01280 LOG *lp;
01281 __rep_fileinfo_args *rfp;
01282 db_recno_t recno;
01283 int ret;
01284 #ifdef DIAGNOSTIC
01285 DB_MSGBUF mb;
01286 #endif
01287
01288 dblp = dbenv->lg_handle;
01289 lp = dblp->reginfo.primary;
01290 ret = 0;
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302 REP_SYSTEM_UNLOCK(dbenv);
01303 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01304 REP_SYSTEM_LOCK(dbenv);
01305 rfp = rep->curinfo;
01306
01307
01308
01309
01310
01311 if (rfp->filenum != msgfp->filenum) {
01312 ret = DB_REP_PAGEDONE;
01313 goto err;
01314 }
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330 DB_ASSERT(msgfp->pgno >= rep->ready_pg);
01331
01332
01333
01334
01335
01336
01337
01338 RPRINT(dbenv, rep, (dbenv, &mb,
01339 "PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu",
01340 (u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg,
01341 (u_long)rep->waiting_pg, (u_long)rep->max_wait_pg));
01342 if (msgfp->pgno > rep->ready_pg) {
01343 if (rep->waiting_pg == PGNO_INVALID ||
01344 msgfp->pgno < rep->waiting_pg)
01345 rep->waiting_pg = msgfp->pgno;
01346 } else {
01347
01348
01349
01350 rep->ready_pg++;
01351 lp->rcvd_recs = 0;
01352 while (ret == 0 && rep->ready_pg == rep->waiting_pg) {
01353
01354
01355
01356 lp->wait_recs = 0;
01357 lp->rcvd_recs = 0;
01358 rep->max_wait_pg = PGNO_INVALID;
01359
01360
01361
01362
01363 memset(&key, 0, sizeof(key));
01364 memset(&data, 0, sizeof(data));
01365 recno = (db_recno_t)rep->ready_pg;
01366 key.data = &recno;
01367 key.ulen = key.size = sizeof(db_recno_t);
01368 key.flags = DB_DBT_USERMEM;
01369 ret = __db_get(rep->file_dbp, NULL, &key, &data, 0);
01370 if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY)
01371 break;
01372 else if (ret != 0)
01373 goto err;
01374 rep->ready_pg++;
01375 }
01376 }
01377
01378
01379
01380
01381
01382
01383 if (rep->ready_pg > rfp->max_pgno)
01384 goto err;
01385
01386
01387
01388
01389 if ((rep->waiting_pg != PGNO_INVALID &&
01390 rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) {
01391
01392
01393
01394 if (lp->wait_recs == 0) {
01395
01396
01397
01398
01399
01400
01401 lp->wait_recs = rep->request_gap;
01402 lp->rcvd_recs = 0;
01403 rep->max_wait_pg = PGNO_INVALID;
01404 }
01405
01406
01407
01408 if ((__rep_check_doreq(dbenv, rep) || type == REP_PAGE_MORE) &&
01409 ((ret = __rep_pggap_req(dbenv, rep, rfp,
01410 (type == REP_PAGE_MORE) ? REP_GAP_FORCE : 0)) != 0))
01411 goto err;
01412 } else {
01413 lp->wait_recs = 0;
01414 rep->max_wait_pg = PGNO_INVALID;
01415 }
01416
01417 err:
01418 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01419 return (ret);
01420 }
01421
01422
01423
01424
01425
01426
01427
01428 int
01429 __rep_init_cleanup(dbenv, rep, force)
01430 DB_ENV *dbenv;
01431 REP *rep;
01432 int force;
01433 {
01434 int ret, t_ret;
01435
01436 ret = 0;
01437
01438
01439
01440
01441
01442
01443
01444 if (rep->file_mpf != NULL) {
01445 ret = __memp_fclose(rep->file_mpf, 0);
01446 rep->file_mpf = NULL;
01447 }
01448 if (rep->file_dbp != NULL) {
01449 t_ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC);
01450 rep->file_dbp = NULL;
01451 if (t_ret != 0 && ret == 0)
01452 ret = t_ret;
01453 }
01454 if (force && rep->queue_dbp != NULL) {
01455 t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC);
01456 rep->queue_dbp = NULL;
01457 if (t_ret != 0 && ret == 0)
01458 ret = t_ret;
01459 }
01460 if (rep->curinfo != NULL) {
01461 __os_free(dbenv, rep->curinfo);
01462 rep->curinfo = NULL;
01463 }
01464 if (rep->originfo != NULL &&
01465 (force || ++rep->curfile == rep->nfiles)) {
01466 __os_free(dbenv, rep->originfo);
01467 rep->originfo = NULL;
01468 }
01469 return (ret);
01470 }
01471
01472
01473
01474
01475
01476
01477
01478
01479
01480
01481 static int
01482 __rep_filedone(dbenv, eid, rep, msgfp, type)
01483 DB_ENV *dbenv;
01484 int eid;
01485 REP *rep;
01486 __rep_fileinfo_args *msgfp;
01487 u_int32_t type;
01488 {
01489 DBT dbt;
01490 __rep_fileinfo_args *rfp;
01491 int ret;
01492 #ifdef DIAGNOSTIC
01493 DB_MSGBUF mb;
01494 #endif
01495
01496
01497
01498
01499
01500 ret = __rep_page_gap(dbenv, rep, msgfp, type);
01501
01502
01503
01504
01505 if (ret == DB_REP_PAGEDONE)
01506 return (0);
01507
01508 rfp = rep->curinfo;
01509
01510
01511
01512
01513 RPRINT(dbenv, rep, (dbenv, &mb, "FILEDONE: have %lu pages. Need %lu.",
01514 (u_long)rep->npages, (u_long)rfp->max_pgno + 1));
01515 if (rep->npages <= rfp->max_pgno)
01516 return (0);
01517
01518
01519
01520
01521
01522
01523 if (rfp->type == (u_int32_t)DB_QUEUE &&
01524 ((ret = __rep_queue_filedone(dbenv, rep, rfp)) !=
01525 DB_REP_PAGEDONE))
01526 return (ret);
01527
01528
01529
01530 if ((ret = __rep_init_cleanup(dbenv, rep, 0)) != 0)
01531 goto err;
01532 if (rep->curfile == rep->nfiles) {
01533 RPRINT(dbenv, rep, (dbenv, &mb,
01534 "FILEDONE: have %d files. RECOVER_LOG now", rep->nfiles));
01535
01536
01537
01538
01539
01540
01541
01542
01543
01544
01545 if ((ret = __memp_sync(dbenv, NULL)) != 0)
01546 goto err;
01547 F_CLR(rep, REP_F_RECOVER_PAGE);
01548 F_SET(rep, REP_F_RECOVER_LOG);
01549 memset(&dbt, 0, sizeof(dbt));
01550 dbt.data = &rep->last_lsn;
01551 dbt.size = sizeof(rep->last_lsn);
01552 REP_SYSTEM_UNLOCK(dbenv);
01553 if ((ret = __rep_log_setup(dbenv, rep)) != 0)
01554 goto err;
01555 RPRINT(dbenv, rep, (dbenv, &mb,
01556 "FILEDONE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]",
01557 (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset,
01558 (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
01559 (void)__rep_send_message(dbenv, eid,
01560 REP_LOG_REQ, &rep->first_lsn, &dbt, 0, DB_REP_ANYWHERE);
01561 REP_SYSTEM_LOCK(dbenv);
01562 return (0);
01563 }
01564
01565
01566
01567
01568 rep->finfo = rep->nextinfo;
01569 if ((ret = __rep_fileinfo_read(dbenv, rep->finfo, &rep->nextinfo,
01570 &rep->curinfo)) != 0)
01571 goto err;
01572 DB_ASSERT(rep->curinfo->pgno == 0);
01573 rep->ready_pg = 0;
01574 rep->npages = 0;
01575 rep->waiting_pg = PGNO_INVALID;
01576 rep->max_wait_pg = PGNO_INVALID;
01577 memset(&dbt, 0, sizeof(dbt));
01578 RPRINT(dbenv, rep, (dbenv, &mb,
01579 "FILEDONE: Next file %d. Request pages 0 to %lu",
01580 rep->curinfo->filenum, (u_long)rep->curinfo->max_pgno));
01581 dbt.data = rep->finfo;
01582 dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
01583 (u_int8_t *)rep->finfo);
01584 (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
01585 NULL, &dbt, 0, DB_REP_ANYWHERE);
01586 err:
01587 return (ret);
01588 }
01589
01590
01591
01592
01593
01594
01595 static int
01596 __rep_mpf_open(dbenv, mpfp, rfp, flags)
01597 DB_ENV *dbenv;
01598 DB_MPOOLFILE **mpfp;
01599 __rep_fileinfo_args *rfp;
01600 u_int32_t flags;
01601 {
01602 DB db;
01603 int ret;
01604
01605 if ((ret = __memp_fcreate(dbenv, mpfp)) != 0)
01606 return (ret);
01607
01608
01609
01610
01611
01612 db.dbenv = dbenv;
01613 db.type = (DBTYPE)rfp->type;
01614 db.pgsize = rfp->pgsize;
01615 memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN);
01616 db.flags = rfp->flags;
01617
01618 F_CLR(&db, DB_AM_OPEN_CALLED);
01619 db.mpf = *mpfp;
01620 if (F_ISSET(&db, DB_AM_INMEM))
01621 (void)__memp_set_flags(db.mpf, DB_MPOOL_NOFILE, 1);
01622 if ((ret = __db_dbenv_mpool(&db, rfp->info.data, flags)) != 0) {
01623 (void)__memp_fclose(*mpfp, 0);
01624 *mpfp = NULL;
01625 }
01626 return (ret);
01627 }
01628
01629
01630
01631
01632
01633
01634
01635
01636 int
01637 __rep_pggap_req(dbenv, rep, reqfp, gapflags)
01638 DB_ENV *dbenv;
01639 REP *rep;
01640 __rep_fileinfo_args *reqfp;
01641 u_int32_t gapflags;
01642 {
01643 DBT max_pg_dbt;
01644 __rep_fileinfo_args *tmpfp;
01645 size_t len;
01646 u_int32_t flags;
01647 int alloc, ret;
01648
01649 ret = 0;
01650 alloc = 0;
01651
01652
01653
01654
01655
01656
01657
01658
01659 if (rep->curinfo == NULL)
01660 return (0);
01661 if (reqfp == NULL) {
01662 if ((ret = __rep_finfo_alloc(dbenv, rep->curinfo, &tmpfp)) != 0)
01663 return (ret);
01664 alloc = 1;
01665 } else
01666 tmpfp = reqfp;
01667
01668
01669
01670
01671
01672
01673
01674 flags = 0;
01675 memset(&max_pg_dbt, 0, sizeof(max_pg_dbt));
01676 tmpfp->pgno = rep->ready_pg;
01677 max_pg_dbt.data = rep->finfo;
01678 max_pg_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
01679 (u_int8_t *)rep->finfo);
01680 if (rep->max_wait_pg == PGNO_INVALID ||
01681 FLD_ISSET(gapflags, REP_GAP_FORCE | REP_GAP_REREQUEST)) {
01682
01683
01684
01685
01686 if (rep->waiting_pg == PGNO_INVALID) {
01687 if (FLD_ISSET(gapflags,
01688 REP_GAP_FORCE | REP_GAP_REREQUEST))
01689 rep->max_wait_pg = rep->curinfo->max_pgno;
01690 else
01691 rep->max_wait_pg = rep->ready_pg;
01692 } else
01693 rep->max_wait_pg = rep->waiting_pg - 1;
01694 tmpfp->max_pgno = rep->max_wait_pg;
01695
01696
01697
01698 if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
01699 flags = DB_REP_REREQUEST;
01700 else
01701 flags = DB_REP_ANYWHERE;
01702 } else {
01703
01704
01705
01706 rep->max_wait_pg = rep->ready_pg;
01707 tmpfp->max_pgno = rep->ready_pg;
01708
01709
01710
01711 flags = DB_REP_REREQUEST;
01712 }
01713 if (rep->master_id != DB_EID_INVALID) {
01714 rep->stat.st_pg_requested++;
01715
01716
01717
01718
01719
01720
01721 ret = __rep_fileinfo_buf(rep->finfo, max_pg_dbt.size, &len,
01722 tmpfp->pgsize, tmpfp->pgno, tmpfp->max_pgno,
01723 tmpfp->filenum, tmpfp->id, tmpfp->type,
01724 tmpfp->flags, &tmpfp->uid, &tmpfp->info);
01725 DB_ASSERT(len == max_pg_dbt.size);
01726 (void)__rep_send_message(dbenv, rep->master_id,
01727 REP_PAGE_REQ, NULL, &max_pg_dbt, 0, flags);
01728 } else
01729 (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
01730 REP_MASTER_REQ, NULL, NULL, 0, 0);
01731
01732 if (alloc)
01733 __os_free(dbenv, tmpfp);
01734 return (ret);
01735 }
01736
01737
01738
01739
01740
01741
01742
01743
01744 int
01745 __rep_finfo_alloc(dbenv, rfpsrc, rfpp)
01746 DB_ENV *dbenv;
01747 __rep_fileinfo_args *rfpsrc, **rfpp;
01748 {
01749 __rep_fileinfo_args *rfp;
01750 size_t size;
01751 int ret;
01752 void *uidp, *infop;
01753
01754
01755
01756
01757 size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size +
01758 rfpsrc->info.size;
01759 if ((ret = __os_malloc(dbenv, size, &rfp)) != 0)
01760 return (ret);
01761
01762
01763
01764
01765
01766 memcpy(rfp, rfpsrc, sizeof(__rep_fileinfo_args));
01767 uidp = (u_int8_t *)rfp + sizeof(__rep_fileinfo_args);
01768 rfp->uid.data = uidp;
01769 memcpy(uidp, rfpsrc->uid.data, rfpsrc->uid.size);
01770
01771 infop = (u_int8_t *)uidp + rfpsrc->uid.size;
01772 rfp->info.data = infop;
01773 memcpy(infop, rfpsrc->info.data, rfpsrc->info.size);
01774 *rfpp = rfp;
01775 return (ret);
01776 }
01777
01778
01779
01780
01781
01782
01783 static int
01784 __rep_log_setup(dbenv, rep)
01785 DB_ENV *dbenv;
01786 REP *rep;
01787 {
01788 DB_LOG *dblp;
01789 DB_LSN lsn;
01790 DB_TXNMGR *mgr;
01791 DB_TXNREGION *region;
01792 LOG *lp;
01793 u_int32_t fnum, lastfile;
01794 int ret;
01795 char *name;
01796
01797 dblp = dbenv->lg_handle;
01798 lp = dblp->reginfo.primary;
01799 mgr = dbenv->tx_handle;
01800 region = mgr->reginfo.primary;
01801
01802
01803
01804
01805 lastfile = lp->lsn.file;
01806 for (fnum = 1; fnum <= lastfile; fnum++) {
01807 if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0)
01808 goto err;
01809 (void)__os_unlink(dbenv, name);
01810 __os_free(dbenv, name);
01811 }
01812
01813
01814
01815
01816 ret = __log_newfile(dblp, &lsn, rep->first_lsn.file);
01817
01818
01819
01820
01821
01822
01823
01824 rep->first_lsn = lp->lsn;
01825 TXN_SYSTEM_LOCK(dbenv);
01826 ZERO_LSN(region->last_ckp);
01827 TXN_SYSTEM_UNLOCK(dbenv);
01828 err:
01829 return (ret);
01830 }
01831
01832
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843
01844
01845
01846
01847
01848
01849
01850
01851
01852 static int
01853 __rep_queue_filedone(dbenv, rep, rfp)
01854 DB_ENV *dbenv;
01855 REP *rep;
01856 __rep_fileinfo_args *rfp;
01857 {
01858 #ifndef HAVE_QUEUE
01859 COMPQUIET(rep, NULL);
01860 COMPQUIET(rfp, NULL);
01861 return (__db_no_queue_am(dbenv));
01862 #else
01863 db_pgno_t first, last;
01864 u_int32_t flags;
01865 int empty, ret, t_ret;
01866 #ifdef DIAGNOSTIC
01867 DB_MSGBUF mb;
01868 #endif
01869
01870 ret = 0;
01871 if (rep->queue_dbp == NULL) {
01872
01873
01874
01875
01876 if ((ret = __memp_sync(dbenv, NULL)) != 0)
01877 goto out;
01878 if ((ret = db_create(&rep->queue_dbp, dbenv, 0)) != 0)
01879 goto out;
01880 flags = DB_NO_AUTO_COMMIT |
01881 (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);
01882
01883
01884
01885
01886 if ((ret = __db_open(rep->queue_dbp, NULL,
01887 FLD_ISSET(rfp->flags, DB_AM_INMEM) ? NULL : rfp->info.data,
01888 FLD_ISSET(rfp->flags, DB_AM_INMEM) ? rfp->info.data : NULL,
01889 DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0)
01890 goto out;
01891 }
01892 if ((ret = __queue_pageinfo(rep->queue_dbp,
01893 &first, &last, &empty, 0, 0)) != 0)
01894 goto out;
01895 RPRINT(dbenv, rep, (dbenv, &mb,
01896 "Queue fileinfo: first %lu, last %lu, empty %d",
01897 (u_long)first, (u_long)last, empty));
01898
01899
01900
01901
01902
01903
01904
01905
01906 if (rfp->max_pgno == 0) {
01907
01908
01909
01910
01911 if (empty)
01912 goto out;
01913 if (first > last) {
01914 rfp->max_pgno =
01915 QAM_RECNO_PAGE(rep->queue_dbp, UINT32_MAX);
01916 } else
01917 rfp->max_pgno = last;
01918 RPRINT(dbenv, rep, (dbenv, &mb,
01919 "Queue fileinfo: First req: first %lu, last %lu",
01920 (u_long)first, (u_long)rfp->max_pgno));
01921 goto req;
01922 } else if (rfp->max_pgno != last) {
01923
01924
01925
01926
01927
01928
01929 first = 1;
01930 rfp->max_pgno = last;
01931 RPRINT(dbenv, rep, (dbenv, &mb,
01932 "Queue fileinfo: Wrap req: first %lu, last %lu",
01933 (u_long)first, (u_long)last));
01934 req:
01935
01936
01937
01938
01939
01940 rep->npages = first;
01941 rep->ready_pg = first;
01942 rep->waiting_pg = rfp->max_pgno + 1;
01943 rep->max_wait_pg = PGNO_INVALID;
01944 ret = __rep_pggap_req(dbenv, rep, rfp, 0);
01945 return (ret);
01946 }
01947
01948
01949
01950
01951
01952 out:
01953 if (rep->queue_dbp != NULL &&
01954 (t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC)) != 0 &&
01955 ret == 0)
01956 ret = t_ret;
01957 rep->queue_dbp = NULL;
01958 if (ret == 0)
01959 ret = DB_REP_PAGEDONE;
01960 return (ret);
01961 #endif
01962 }