Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

rep_backup.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2004-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: rep_backup.c,v 12.38 2005/11/09 14:17:30 margo Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023 
00024 #include <string.h>
00025 #endif
00026 
00027 #include "db_int.h"
00028 #include "dbinc/db_page.h"
00029 #include "dbinc/db_shash.h"
00030 #include "dbinc/db_am.h"
00031 #include "dbinc/lock.h"
00032 #include "dbinc/log.h"
00033 #include "dbinc/mp.h"
00034 #include "dbinc/qam.h"
00035 #include "dbinc/txn.h"
00036 
00037 static int __rep_filedone __P((DB_ENV *, int, REP *, __rep_fileinfo_args *,
00038     u_int32_t));
00039 static int __rep_find_dbs __P((DB_ENV *, u_int8_t *, size_t *,
00040     size_t *, u_int32_t *));
00041 static int __rep_get_fileinfo __P((DB_ENV *, const char *,
00042     const char *, __rep_fileinfo_args *, u_int8_t *, u_int32_t *));
00043 static int __rep_log_setup __P((DB_ENV *, REP *));
00044 static int __rep_mpf_open __P((DB_ENV *, DB_MPOOLFILE **,
00045     __rep_fileinfo_args *, u_int32_t));
00046 static int __rep_page_gap __P((DB_ENV *, REP *, __rep_fileinfo_args *,
00047     u_int32_t));
00048 static int __rep_page_sendpages __P((DB_ENV *, int,
00049     __rep_fileinfo_args *, DB_MPOOLFILE *, DB *));
00050 static int __rep_queue_filedone __P((DB_ENV *, REP *, __rep_fileinfo_args *));
00051 static int __rep_walk_dir __P((DB_ENV *, const char *, u_int8_t *,
00052     size_t *, size_t *, u_int32_t *));
00053 static int __rep_write_page __P((DB_ENV *, REP *, __rep_fileinfo_args *));
00054 
00055 /*
00056  * __rep_update_req -
00057  *      Process an update_req and send the file information to the client.
00058  *
00059  * PUBLIC: int __rep_update_req __P((DB_ENV *, int));
00060  */
00061 int
00062 __rep_update_req(dbenv, eid)
00063         DB_ENV *dbenv;
00064         int eid;
00065 {
00066         DBT updbt;
00067         DB_LOG *dblp;
00068         DB_LSN lsn;
00069         size_t filelen, filesz, updlen;
00070         u_int32_t filecnt;
00071         u_int8_t *buf, *fp;
00072         int ret;
00073 
00074         /*
00075          * Allocate enough for all currently open files and then some.
00076          * Optimize for the common use of having most databases open.
00077          * Allocate dbentry_cnt * 2 plus an estimated 60 bytes per
00078          * file for the filename/path (or multiplied by 120).
00079          *
00080          * The data we send looks like this:
00081          *      __rep_update_args
00082          *      __rep_fileinfo_args
00083          *      __rep_fileinfo_args
00084          *      ...
00085          */
00086         dblp = dbenv->lg_handle;
00087         filecnt = 0;
00088         filelen = 0;
00089         updlen = 0;
00090         filesz = MEGABYTE;
00091         if ((ret = __os_calloc(dbenv, 1, filesz, &buf)) != 0)
00092                 return (ret);
00093 
00094         /*
00095          * First get our file information.  Get in-memory files first
00096          * then get on-disk files.
00097          */
00098         fp = buf + sizeof(__rep_update_args);
00099         if ((ret = __rep_find_dbs(dbenv, fp, &filesz, &filelen, &filecnt)) != 0)
00100                 goto err;
00101 
00102         /*
00103          * Now get our first LSN.  We send the lsn of the first
00104          * non-archivable log file.
00105          */
00106         if ((ret = __log_get_stable_lsn(dbenv, &lsn)) != 0)
00107                 goto err;
00108 
00109         /*
00110          * Package up the update information.
00111          */
00112         if ((ret = __rep_update_buf(buf, filesz, &updlen, &lsn, filecnt)) != 0)
00113                 goto err;
00114         /*
00115          * We have all the file information now.  Send it to the client.
00116          */
00117         memset(&updbt, 0, sizeof(updbt));
00118         updbt.data = buf;
00119         updbt.size = (u_int32_t)(filelen + updlen);
00120         LOG_SYSTEM_LOCK(dbenv);
00121         lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00122         LOG_SYSTEM_UNLOCK(dbenv);
00123         (void)__rep_send_message(dbenv, eid, REP_UPDATE, &lsn, &updbt, 0,
00124             DB_REP_ANYWHERE);
00125 
00126 err:
00127         __os_free(dbenv, buf);
00128         return (ret);
00129 }
00130 
00131 /*
00132  * __rep_find_dbs -
00133  *      Walk through all the named files/databases including those in the
00134  *      environment or data_dirs and those that in named and in-memory.  We
00135  *      need to open them, gather the necessary information and then close
00136  *      them. Then we need to figure out if they're already in the dbentry
00137  *      array.
00138  */
00139 static int
00140 __rep_find_dbs(dbenv, fp, fileszp, filelenp, filecntp)
00141         DB_ENV *dbenv;
00142         u_int8_t *fp;
00143         size_t *fileszp, *filelenp;
00144         u_int32_t *filecntp;
00145 {
00146         int ret;
00147         char **ddir;
00148 
00149         ret = 0;
00150         if (dbenv->db_data_dir == NULL) {
00151                 /*
00152                  * If we don't have a data dir, we have just the
00153                  * env home dir.
00154                  */
00155                 ret = __rep_walk_dir(dbenv, dbenv->db_home, fp,
00156                     fileszp, filelenp, filecntp);
00157         } else {
00158                 for (ddir = dbenv->db_data_dir; *ddir != NULL; ++ddir)
00159                         if ((ret = __rep_walk_dir(dbenv, *ddir, fp,
00160                             fileszp, filelenp, filecntp)) != 0)
00161                                 break;
00162         }
00163 
00164         /* Now, collect any in-memory named databases. */
00165         if (ret == 0)
00166                 ret = __rep_walk_dir(dbenv,
00167                     NULL, fp, fileszp, filelenp, filecntp);
00168 
00169         return (ret);
00170 }
00171 
00172 /*
00173  * __rep_walk_dir --
00174  *
00175  * This is the routine that walks a directory and fills in the structures
00176  * that we use to generate messages to the client telling it what files
00177  * files are available.  If the directory name is NULL, then we should
00178  * walk the list of in-memory named files.
00179  */
00180 int
00181 __rep_walk_dir(dbenv, dir, fp, fileszp, filelenp, filecntp)
00182         DB_ENV *dbenv;
00183         const char *dir;
00184         u_int8_t *fp;
00185         size_t *fileszp, *filelenp;
00186         u_int32_t *filecntp;
00187 {
00188         DBT namedbt, uiddbt;
00189         __rep_fileinfo_args tmpfp;
00190         size_t len, offset;
00191         int cnt, i, ret;
00192         u_int8_t *rfp, uid[DB_FILE_ID_LEN];
00193         char *file, **names, *subdb;
00194 #ifdef DIAGNOSTIC
00195         REP *rep;
00196         DB_MSGBUF mb;
00197         DB_REP *db_rep;
00198 
00199         db_rep = dbenv->rep_handle;
00200         rep = db_rep->region;
00201 #endif
00202         memset(&namedbt, 0, sizeof(namedbt));
00203         memset(&uiddbt, 0, sizeof(uiddbt));
00204         if (dir == NULL) {
00205                 RPRINT(dbenv, rep, (dbenv, &mb,
00206                     "Walk_dir: Getting info for in-memory named files"));
00207                 if ((ret = __memp_inmemlist(dbenv, &names, &cnt)) != 0)
00208                         return (ret);
00209         } else {
00210                 RPRINT(dbenv, rep, (dbenv, &mb,
00211                     "Walk_dir: Getting info for dir: %s", dir));
00212                 if ((ret = __os_dirlist(dbenv, dir, &names, &cnt)) != 0)
00213                         return (ret);
00214         }
00215         rfp = fp;
00216         RPRINT(dbenv, rep, (dbenv, &mb,
00217             "Walk_dir: Dir %s has %d files", dir, cnt));
00218         for (i = 0; i < cnt; i++) {
00219                 RPRINT(dbenv, rep, (dbenv, &mb,
00220                     "Walk_dir: File %d name: %s", i, names[i]));
00221                 /*
00222                  * Skip DB-owned files: ., ..,  __db*, DB_CONFIG, log*
00223                  */
00224                 if (strcmp(names[i], ".") == 0)
00225                         continue;
00226                 if (strcmp(names[i], "..") == 0)
00227                         continue;
00228                 if (strncmp(names[i], "__db", 4) == 0)
00229                         continue;
00230                 if (strncmp(names[i], "DB_CONFIG", 9) == 0)
00231                         continue;
00232                 if (strncmp(names[i], "log", 3) == 0)
00233                         continue;
00234                 /*
00235                  * We found a file to process.  Check if we need
00236                  * to allocate more space.
00237                  */
00238                 if (dir == NULL) {
00239                         file = NULL;
00240                         subdb = names[i];
00241                 } else {
00242                         file = names[i];
00243                         subdb = NULL;
00244                 }
00245                 if ((ret = __rep_get_fileinfo(dbenv,
00246                     file, subdb, &tmpfp, uid, filecntp)) != 0) {
00247                         /*
00248                          * If we find a file that isn't a database, skip it.
00249                          */
00250                         RPRINT(dbenv, rep, (dbenv, &mb,
00251                             "Walk_dir: File %d %s: returned error %s",
00252                             i, names[i], db_strerror(ret)));
00253                         ret = 0;
00254                         continue;
00255                 }
00256                 RPRINT(dbenv, rep, (dbenv, &mb,
00257                     "Walk_dir: File %d (of %d) %s: pgsize %lu, max_pgno %lu",
00258                     tmpfp.filenum, i, names[i],
00259                     (u_long)tmpfp.pgsize, (u_long)tmpfp.max_pgno));
00260                 namedbt.data = names[i];
00261                 namedbt.size = (u_int32_t)strlen(names[i]) + 1;
00262                 uiddbt.data = uid;
00263                 uiddbt.size = DB_FILE_ID_LEN;
00264 retry:
00265                 ret = __rep_fileinfo_buf(rfp, *fileszp, &len,
00266                     tmpfp.pgsize, tmpfp.pgno, tmpfp.max_pgno,
00267                     tmpfp.filenum, tmpfp.id, tmpfp.type,
00268                     tmpfp.flags, &uiddbt, &namedbt);
00269                 if (ret == ENOMEM) {
00270                         offset = (size_t)(rfp - fp);
00271                         *fileszp *= 2;
00272                         /*
00273                          * Need to account for update info on both sides
00274                          * of the allocation.
00275                          */
00276                         fp -= sizeof(__rep_update_args);
00277                         if ((ret = __os_realloc(dbenv, *fileszp, fp)) != 0)
00278                                 break;
00279                         fp += sizeof(__rep_update_args);
00280                         rfp = fp + offset;
00281                         /*
00282                          * Now that we've reallocated the space, try to
00283                          * store it again.
00284                          */
00285                         goto retry;
00286                 }
00287                 rfp += len;
00288                 *filelenp += len;
00289         }
00290         __os_dirfree(dbenv, names, cnt);
00291         return (ret);
00292 }
00293 
00294 static int
00295 __rep_get_fileinfo(dbenv, file, subdb, rfp, uid, filecntp)
00296         DB_ENV *dbenv;
00297         const char *file, *subdb;
00298         __rep_fileinfo_args *rfp;
00299         u_int8_t *uid;
00300         u_int32_t *filecntp;
00301 {
00302 
00303         DB *dbp, *entdbp;
00304         DB_LOCK lk;
00305         DB_LOG *dblp;
00306         DB_MPOOLFILE *mpf;
00307         DBC *dbc;
00308         DBMETA *dbmeta;
00309         PAGE *pagep;
00310         int i, ret, t_ret;
00311 
00312         dbp = NULL;
00313         dbc = NULL;
00314         pagep = NULL;
00315         mpf = NULL;
00316         LOCK_INIT(lk);
00317 
00318         if ((ret = db_create(&dbp, dbenv, 0)) != 0)
00319                 goto err;
00320         if ((ret = __db_open(dbp, NULL, file, subdb, DB_UNKNOWN,
00321             DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0),
00322             0, PGNO_BASE_MD)) != 0)
00323                 goto err;
00324 
00325         if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
00326                 goto err;
00327         if ((ret = __db_lget(
00328             dbc, 0, dbp->meta_pgno, DB_LOCK_READ, 0, &lk)) != 0)
00329                 goto err;
00330         if ((ret = __memp_fget(dbp->mpf, &dbp->meta_pgno, 0, &pagep)) != 0)
00331                 goto err;
00332         /*
00333          * We have the meta page.  Set up our information.
00334          */
00335         dbmeta = (DBMETA *)pagep;
00336         rfp->pgno = 0;
00337         /*
00338          * Queue is a special-case.  We need to set max_pgno to 0 so that
00339          * the client can compute the pages from the meta-data.
00340          */
00341         if (dbp->type == DB_QUEUE)
00342                 rfp->max_pgno = 0;
00343         else
00344                 rfp->max_pgno = dbmeta->last_pgno;
00345         rfp->pgsize = dbp->pgsize;
00346         memcpy(uid, dbp->fileid, DB_FILE_ID_LEN);
00347         rfp->filenum = (*filecntp)++;
00348         rfp->type = (u_int32_t)dbp->type;
00349         rfp->flags = dbp->flags;
00350         rfp->id = DB_LOGFILEID_INVALID;
00351         ret = __memp_fput(dbp->mpf, pagep, 0);
00352         pagep = NULL;
00353         if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
00354                 ret = t_ret;
00355         if (ret != 0)
00356                 goto err;
00357 err:
00358         if ((t_ret = __LPUT(dbc, lk)) != 0 && ret == 0)
00359                 ret = t_ret;
00360         if (dbc != NULL && (t_ret = __db_c_close(dbc)) != 0 && ret == 0)
00361                 ret = t_ret;
00362         if (pagep != NULL &&
00363             (t_ret = __memp_fput(mpf, pagep, 0)) != 0 && ret == 0)
00364                 ret = t_ret;
00365         if (dbp != NULL && (t_ret = __db_close(dbp, NULL, 0)) != 0 && ret == 0)
00366                 ret = t_ret;
00367         /*
00368          * We walk the entry table now, after closing the dbp because
00369          * otherwise we find the open from this function and the id
00370          * is useless in that case.
00371          */
00372         if (ret == 0) {
00373                 LOG_SYSTEM_LOCK(dbenv);
00374                 /*
00375                  * Walk entry table looking for this uid.
00376                  * If we find it, save the id.
00377                  */
00378                 for (dblp = dbenv->lg_handle,
00379                     i = 0; i < dblp->dbentry_cnt; i++) {
00380                         entdbp = dblp->dbentry[i].dbp;
00381                         if (entdbp == NULL)
00382                                 break;
00383                         DB_ASSERT(entdbp->log_filename != NULL);
00384                         if (memcmp(uid,
00385                             entdbp->log_filename->ufid,
00386                             DB_FILE_ID_LEN) == 0)
00387                                 rfp->id = i;
00388                 }
00389                 LOG_SYSTEM_UNLOCK(dbenv);
00390         }
00391         return (ret);
00392 }
00393 
00394 /*
00395  * __rep_page_req
00396  *      Process a page_req and send the page information to the client.
00397  *
00398  * PUBLIC: int __rep_page_req __P((DB_ENV *, int, DBT *));
00399  */
00400 int
00401 __rep_page_req(dbenv, eid, rec)
00402         DB_ENV *dbenv;
00403         int eid;
00404         DBT *rec;
00405 {
00406         __rep_fileinfo_args *msgfp;
00407         DB *dbp;
00408         DBT msgdbt;
00409         DB_LOG *dblp;
00410         DB_MPOOLFILE *mpf;
00411         DB_REP *db_rep;
00412         REP *rep;
00413         int ret, t_ret;
00414         void *next;
00415 #ifdef DIAGNOSTIC
00416         DB_MSGBUF mb;
00417 #endif
00418 
00419         db_rep = dbenv->rep_handle;
00420         rep = db_rep->region;
00421         dblp = dbenv->lg_handle;
00422 
00423         if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0)
00424                 return (ret);
00425 
00426         /*
00427          * See if we can find it already.  If so we can quickly access its
00428          * mpool and process.  Otherwise we have to open the file ourselves.
00429          */
00430         RPRINT(dbenv, rep, (dbenv, &mb, "page_req: file %d page %lu to %lu",
00431             msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
00432         LOG_SYSTEM_LOCK(dbenv);
00433         if (msgfp->id >= 0 && dblp->dbentry_cnt > msgfp->id) {
00434                 dbp = dblp->dbentry[msgfp->id].dbp;
00435                 if (dbp != NULL) {
00436                         DB_ASSERT(dbp->log_filename != NULL);
00437                         if (memcmp(msgfp->uid.data, dbp->log_filename->ufid,
00438                             DB_FILE_ID_LEN) == 0) {
00439                                 LOG_SYSTEM_UNLOCK(dbenv);
00440                                 RPRINT(dbenv, rep, (dbenv, &mb,
00441                                     "page_req: found %d in dbreg",
00442                                     msgfp->filenum));
00443                                 ret = __rep_page_sendpages(dbenv, eid,
00444                                     msgfp, dbp->mpf, dbp);
00445                                 goto err;
00446                         }
00447                 }
00448         }
00449         LOG_SYSTEM_UNLOCK(dbenv);
00450 
00451         /*
00452          * If we get here, we do not have the file open via dbreg.
00453          * We need to open the file and then send its pages.
00454          * If we cannot open the file, we send REP_FILE_FAIL.
00455          */
00456         RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d via mpf_open",
00457             msgfp->filenum));
00458         if ((ret = __rep_mpf_open(dbenv, &mpf, msgfp, 0)) != 0) {
00459                 memset(&msgdbt, 0, sizeof(msgdbt));
00460                 msgdbt.data = msgfp;
00461                 msgdbt.size = sizeof(*msgfp);
00462                 RPRINT(dbenv, rep, (dbenv, &mb, "page_req: Open %d failed",
00463                     msgfp->filenum));
00464                 if (F_ISSET(rep, REP_F_MASTER))
00465                         (void)__rep_send_message(dbenv, eid, REP_FILE_FAIL,
00466                             NULL, &msgdbt, 0, 0);
00467                 else
00468                         ret = DB_NOTFOUND;
00469                 goto err;
00470         }
00471 
00472         ret = __rep_page_sendpages(dbenv, eid, msgfp, mpf, NULL);
00473         t_ret = __memp_fclose(mpf, 0);
00474         if (ret == 0 && t_ret != 0)
00475                 ret = t_ret;
00476 err:
00477         __os_free(dbenv, msgfp);
00478         return (ret);
00479 }
00480 
00481 static int
00482 __rep_page_sendpages(dbenv, eid, msgfp, mpf, dbp)
00483         DB_ENV *dbenv;
00484         int eid;
00485         __rep_fileinfo_args *msgfp;
00486         DB_MPOOLFILE *mpf;
00487         DB *dbp;
00488 {
00489         DB *qdbp;
00490         DBT lockdbt, msgdbt, pgdbt;
00491         DB_LOCK lock;
00492         DB_LOCK_ILOCK lock_obj;
00493         DB_LOG *dblp;
00494         DB_LSN lsn;
00495         DB_MSGBUF mb;
00496         DB_REP *db_rep;
00497         PAGE *pagep;
00498         REP *rep;
00499         REP_BULK bulk;
00500         REP_THROTTLE repth;
00501         db_pgno_t p;
00502         uintptr_t bulkoff;
00503         size_t len, msgsz;
00504         u_int32_t bulkflags, lockid, use_bulk;
00505         int opened, ret, t_ret;
00506         u_int8_t *buf;
00507 
00508 #ifndef DIAGNOSTIC
00509         DB_MSGBUF_INIT(&mb);
00510 #endif
00511         db_rep = dbenv->rep_handle;
00512         rep = db_rep->region;
00513         lockid = DB_LOCK_INVALIDID;
00514         opened = 0;
00515         qdbp = NULL;
00516         buf = NULL;
00517         bulk.addr = NULL;
00518         use_bulk = FLD_ISSET(rep->config, REP_C_BULK);
00519         if (msgfp->type == (u_int32_t)DB_QUEUE) {
00520                 if (dbp == NULL) {
00521                         if ((ret = db_create(&qdbp, dbenv, 0)) != 0)
00522                                 goto err;
00523                         /*
00524                          * We need to check whether this is in-memory so that
00525                          * we pass the name correctly as either the file or
00526                          * the database name.
00527                          */
00528                         if ((ret = __db_open(qdbp, NULL,
00529                             FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
00530                             NULL : msgfp->info.data,
00531                             FLD_ISSET(msgfp->flags, DB_AM_INMEM) ?
00532                             msgfp->info.data : NULL,
00533                             DB_UNKNOWN,
00534                             DB_RDONLY | (F_ISSET(dbenv, DB_ENV_THREAD) ?
00535                             DB_THREAD : 0), 0, PGNO_BASE_MD)) != 0)
00536                                 goto err;
00537                         opened = 1;
00538                 } else
00539                         qdbp = dbp;
00540         }
00541         msgsz = sizeof(__rep_fileinfo_args) + DB_FILE_ID_LEN + msgfp->pgsize;
00542         if ((ret = __os_calloc(dbenv, 1, msgsz, &buf)) != 0)
00543                 goto err;
00544         memset(&msgdbt, 0, sizeof(msgdbt));
00545         memset(&pgdbt, 0, sizeof(pgdbt));
00546         RPRINT(dbenv, rep, (dbenv, &mb, "sendpages: file %d page %lu to %lu",
00547             msgfp->filenum, (u_long)msgfp->pgno, (u_long)msgfp->max_pgno));
00548         memset(&repth, 0, sizeof(repth));
00549         /*
00550          * If we're doing bulk transfer, allocate a bulk buffer to put our
00551          * pages in.  We still need to initialize the throttle info
00552          * because if we encounter a page larger than our entire bulk
00553          * buffer, we need to send it as a singleton.
00554          *
00555          * Use a local var so that we don't need to worry if someone else
00556          * turns on/off bulk in the middle of our call here.
00557          */
00558         if (use_bulk && (ret = __rep_bulk_alloc(dbenv, &bulk, eid,
00559             &bulkoff, &bulkflags, REP_BULK_PAGE)) != 0)
00560                 goto err;
00561         REP_SYSTEM_LOCK(dbenv);
00562         repth.gbytes = rep->gbytes;
00563         repth.bytes = rep->bytes;
00564         repth.type = REP_PAGE;
00565         repth.data_dbt = &msgdbt;
00566         REP_SYSTEM_UNLOCK(dbenv);
00567 
00568         /*
00569          * Set up locking.
00570          */
00571         LOCK_INIT(lock);
00572         memset(&lock_obj, 0, sizeof(lock_obj));
00573         if ((ret = __lock_id(dbenv, &lockid, NULL)) != 0)
00574                 goto err;
00575         memcpy(lock_obj.fileid, mpf->fileid, DB_FILE_ID_LEN);
00576         lock_obj.type = DB_PAGE_LOCK;
00577 
00578         memset(&lockdbt, 0, sizeof(lockdbt));
00579         lockdbt.data = &lock_obj;
00580         lockdbt.size = sizeof(lock_obj);
00581 
00582         for (p = msgfp->pgno; p <= msgfp->max_pgno; p++) {
00583                 /*
00584                  * We're not waiting for the lock, if we cannot get
00585                  * the lock for this page, skip it.  The gap
00586                  * code will rerequest it.
00587                  */
00588                 lock_obj.pgno = p;
00589                 if ((ret = __lock_get(dbenv, lockid, DB_LOCK_NOWAIT, &lockdbt,
00590                     DB_LOCK_READ, &lock)) != 0) {
00591                         /*
00592                          * Continue if we couldn't get the lock.
00593                          */
00594                         if (ret == DB_LOCK_NOTGRANTED)
00595                                 continue;
00596                         /*
00597                          * Otherwise we have an error.
00598                          */
00599                         goto err;
00600                 }
00601                 if (msgfp->type == (u_int32_t)DB_QUEUE && p != 0)
00602 #ifdef HAVE_QUEUE
00603                         ret = __qam_fget(qdbp, &p, DB_MPOOL_CREATE, &pagep);
00604 #else
00605                         ret = DB_PAGE_NOTFOUND;
00606 #endif
00607                 else
00608                         ret = __memp_fget(mpf, &p, DB_MPOOL_CREATE, &pagep);
00609                 if (ret == DB_PAGE_NOTFOUND) {
00610                         memset(&pgdbt, 0, sizeof(pgdbt));
00611                         ZERO_LSN(lsn);
00612                         msgfp->pgno = p;
00613                         if (F_ISSET(rep, REP_F_MASTER)) {
00614                                 ret = 0;
00615                                 RPRINT(dbenv, rep, (dbenv, &mb,
00616                                     "sendpages: PAGE_FAIL on page %lu",
00617                                     (u_long)p));
00618                                 (void)__rep_send_message(dbenv, eid,
00619                                     REP_PAGE_FAIL, &lsn, &msgdbt, 0, 0);
00620                         } else
00621                                 ret = DB_NOTFOUND;
00622                         goto lockerr;
00623                 } else if (ret != 0)
00624                         goto lockerr;
00625                 else {
00626                         pgdbt.data = pagep;
00627                         pgdbt.size = (u_int32_t)msgfp->pgsize;
00628                 }
00629                 len = 0;
00630                 ret = __rep_fileinfo_buf(buf, msgsz, &len,
00631                     msgfp->pgsize, p, msgfp->max_pgno,
00632                     msgfp->filenum, msgfp->id, msgfp->type,
00633                     msgfp->flags, &msgfp->uid, &pgdbt);
00634                 if (msgfp->type != (u_int32_t)DB_QUEUE || p == 0)
00635                         t_ret = __memp_fput(mpf, pagep, 0);
00636 #ifdef HAVE_QUEUE
00637                 else
00638                         /*
00639                          * We don't need an #else for HAVE_QUEUE here because if
00640                          * we're not compiled with queue, then we're guaranteed
00641                          * to have set REP_PAGE_FAIL above.
00642                          */
00643                         t_ret = __qam_fput(qdbp, p, pagep, 0);
00644 #endif
00645                 if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
00646                         ret = t_ret;
00647                 if (ret != 0)
00648                         goto err;
00649 
00650                 DB_ASSERT(len <= msgsz);
00651                 msgdbt.data = buf;
00652                 msgdbt.size = (u_int32_t)len;
00653 
00654                 dblp = dbenv->lg_handle;
00655                 LOG_SYSTEM_LOCK(dbenv);
00656                 repth.lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00657                 LOG_SYSTEM_UNLOCK(dbenv);
00658                 /*
00659                  * If we are configured for bulk, try to send this as a bulk
00660                  * request.  If not configured, or it is too big for bulk
00661                  * then just send normally.
00662                  */
00663                 if (use_bulk)
00664                         ret = __rep_bulk_message(dbenv, &bulk, &repth,
00665                             &repth.lsn, &msgdbt, 0);
00666                 if (!use_bulk || ret == DB_REP_BULKOVF)
00667                         ret = __rep_send_throttle(dbenv, eid, &repth, 0);
00668                 RPRINT(dbenv, rep, (dbenv, &mb,
00669                     "sendpages: %lu, lsn [%lu][%lu]", (u_long)p,
00670                     (u_long)repth.lsn.file, (u_long)repth.lsn.offset));
00671                 /*
00672                  * If we have REP_PAGE_MORE
00673                  * we need to break this loop after giving the page back
00674                  * to mpool.  Otherwise, with REP_PAGE, we keep going.
00675                  */
00676                 if (ret == 0)
00677                         ret = t_ret;
00678                 if (repth.type == REP_PAGE_MORE || ret != 0)
00679                         break;
00680         }
00681 
00682         if (0) {
00683 lockerr:        if ((t_ret = __ENV_LPUT(dbenv, lock)) != 0 && ret == 0)
00684                         ret = t_ret;
00685         }
00686 err:
00687         /*
00688          * We're done, force out whatever remains in the bulk buffer and
00689          * free it.
00690          */
00691         if (use_bulk && bulk.addr != NULL &&
00692             (t_ret = __rep_bulk_free(dbenv, &bulk, 0)) != 0 && ret == 0)
00693                 ret = t_ret;
00694         if (opened && (t_ret = __db_close(qdbp, NULL, DB_NOSYNC)) != 0 &&
00695             ret == 0)
00696                 ret = t_ret;
00697         if (buf != NULL)
00698                 __os_free(dbenv, buf);
00699         if (lockid != DB_LOCK_INVALIDID && (t_ret = __lock_id_free(dbenv,
00700             lockid)) != 0 && ret == 0)
00701                 ret = t_ret;
00702         return (ret);
00703 }
00704 
00705 /*
00706  * __rep_update_setup
00707  *      Process and setup with this file information.
00708  *
00709  * PUBLIC: int __rep_update_setup __P((DB_ENV *, int, REP_CONTROL *, DBT *));
00710  */
00711 int
00712 __rep_update_setup(dbenv, eid, rp, rec)
00713         DB_ENV *dbenv;
00714         int eid;
00715         REP_CONTROL *rp;
00716         DBT *rec;
00717 {
00718         DB_LOG *dblp;
00719         DB_REP *db_rep;
00720         DBT pagereq_dbt;
00721         LOG *lp;
00722         REGENV *renv;
00723         REGINFO *infop;
00724         REP *rep;
00725         __rep_update_args *rup;
00726         int ret;
00727         u_int32_t count, infolen;
00728         void *next;
00729 #ifdef DIAGNOSTIC
00730         __rep_fileinfo_args *msgfp;
00731         DB_MSGBUF mb;
00732 #endif
00733 
00734         db_rep = dbenv->rep_handle;
00735         rep = db_rep->region;
00736         dblp = dbenv->lg_handle;
00737         lp = dblp->reginfo.primary;
00738         ret = 0;
00739 
00740         REP_SYSTEM_LOCK(dbenv);
00741         if (!F_ISSET(rep, REP_F_RECOVER_UPDATE)) {
00742                 REP_SYSTEM_UNLOCK(dbenv);
00743                 return (0);
00744         }
00745         F_CLR(rep, REP_F_RECOVER_UPDATE);
00746         /*
00747          * We know we're the first to come in here due to the
00748          * REP_F_RECOVER_UPDATE flag.
00749          */
00750         F_SET(rep, REP_F_RECOVER_PAGE);
00751         /*
00752          * We do not clear REP_F_READY or rep->in_recovery in this code.
00753          * We'll eventually call the normal __rep_verify_match recovery
00754          * code and that will clear all the flags and allow others to
00755          * proceed.
00756          */
00757         if ((ret = __rep_lockout(dbenv, rep, 1)) != 0)
00758                 goto err;
00759         /*
00760          * We need to update the timestamp and kill any open handles
00761          * on this client.  The files are changing completely.
00762          */
00763         infop = dbenv->reginfo;
00764         renv = infop->primary;
00765         (void)time(&renv->rep_timestamp);
00766 
00767         REP_SYSTEM_UNLOCK(dbenv);
00768         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00769         lp->wait_recs = rep->request_gap;
00770         lp->rcvd_recs = 0;
00771         ZERO_LSN(lp->ready_lsn);
00772         ZERO_LSN(lp->waiting_lsn);
00773         ZERO_LSN(lp->max_wait_lsn);
00774         ZERO_LSN(lp->max_perm_lsn);
00775         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00776         if ((ret = __rep_update_read(dbenv, rec->data, &next, &rup)) != 0)
00777                 goto err_nolock;
00778 
00779         /*
00780          * We need to empty out any old log records that might be in the
00781          * temp database.
00782          */
00783         if ((ret = __db_truncate(db_rep->rep_db, NULL, &count)) != 0)
00784                 goto err_nolock;
00785 
00786         /*
00787          * We will remove all logs we have so we need to request
00788          * from the master's beginning.
00789          */
00790         REP_SYSTEM_LOCK(dbenv);
00791         rep->first_lsn = rup->first_lsn;
00792         rep->last_lsn = rp->lsn;
00793         rep->nfiles = rup->num_files;
00794         rep->curfile = 0;
00795         rep->ready_pg = 0;
00796         rep->npages = 0;
00797         rep->waiting_pg = PGNO_INVALID;
00798         rep->max_wait_pg = PGNO_INVALID;
00799 
00800         __os_free(dbenv, rup);
00801 
00802         RPRINT(dbenv, rep, (dbenv, &mb,
00803             "Update setup for %d files.", rep->nfiles));
00804         RPRINT(dbenv, rep, (dbenv, &mb, "Update setup:  First LSN [%lu][%lu].",
00805             (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset));
00806         RPRINT(dbenv, rep, (dbenv, &mb, "Update setup:  Last LSN [%lu][%lu]",
00807             (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
00808 
00809         infolen = rec->size - sizeof(__rep_update_args);
00810         if ((ret = __os_calloc(dbenv, 1, infolen, &rep->originfo)) != 0)
00811                 goto err;
00812         memcpy(rep->originfo, next, infolen);
00813         rep->finfo = rep->originfo;
00814         if ((ret = __rep_fileinfo_read(dbenv,
00815             rep->finfo, &next, &rep->curinfo)) != 0) {
00816                 RPRINT(dbenv, rep, (dbenv, &mb,
00817                     "Update setup: Fileinfo read: %s", db_strerror(ret)));
00818                 goto errmem1;
00819         }
00820         rep->nextinfo = next;
00821 
00822 #ifdef DIAGNOSTIC
00823         msgfp = rep->curinfo;
00824         DB_ASSERT(msgfp->pgno == 0);
00825 #endif
00826 
00827         /*
00828          * We want to create/open our dbp to the database
00829          * where we'll keep our page information.
00830          */
00831         if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0) {
00832                 RPRINT(dbenv, rep, (dbenv, &mb,
00833                     "Update setup: Client_dbinit %s", db_strerror(ret)));
00834                 goto errmem;
00835         }
00836 
00837         /*
00838          * We should get file info 'ready to go' to avoid data copies.
00839          */
00840         memset(&pagereq_dbt, 0, sizeof(pagereq_dbt));
00841         pagereq_dbt.data = rep->finfo;
00842         pagereq_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
00843             (u_int8_t *)rep->finfo);
00844 
00845         RPRINT(dbenv, rep, (dbenv, &mb,
00846             "Update PAGE_REQ file 0: pgsize %lu, maxpg %lu",
00847             (u_long)rep->curinfo->pgsize,
00848             (u_long)rep->curinfo->max_pgno));
00849         /*
00850          * We set up pagereq_dbt as we went along.  Send it now.
00851          */
00852         (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
00853             NULL, &pagereq_dbt, 0, DB_REP_ANYWHERE);
00854         if (0) {
00855 errmem:         __os_free(dbenv, rep->curinfo);
00856 errmem1:        __os_free(dbenv, rep->originfo);
00857                 rep->finfo = NULL;
00858                 rep->curinfo = NULL;
00859                 rep->originfo = NULL;
00860         }
00861 
00862         if (0) {
00863 err_nolock:     REP_SYSTEM_LOCK(dbenv);
00864         }
00865 
00866 err:    /*
00867          * If we get an error, we cannot leave ourselves in the RECOVER_PAGE
00868          * state because we have no file information.  That also means undo'ing
00869          * the rep_lockout.  We need to move back to the RECOVER_UPDATE stage.
00870          */
00871         if (ret != 0) {
00872                 RPRINT(dbenv, rep, (dbenv, &mb,
00873                     "Update_setup: Error: Clear PAGE, set UPDATE again. %s",
00874                     db_strerror(ret)));
00875                 F_CLR(rep, REP_F_RECOVER_PAGE | REP_F_READY);
00876                 rep->in_recovery = 0;
00877                 F_SET(rep, REP_F_RECOVER_UPDATE);
00878         }
00879         REP_SYSTEM_UNLOCK(dbenv);
00880         return (ret);
00881 }
00882 
00883 /*
00884  * __rep_bulk_page
00885  *      Process a bulk page message.
00886  *
00887  * PUBLIC: int __rep_bulk_page __P((DB_ENV *, int, REP_CONTROL *, DBT *));
00888  */
00889 int
00890 __rep_bulk_page(dbenv, eid, rp, rec)
00891         DB_ENV *dbenv;
00892         int eid;
00893         REP_CONTROL *rp;
00894         DBT *rec;
00895 {
00896         DB_REP *db_rep;
00897         DBT pgrec;
00898         REP *rep;
00899         REP_CONTROL tmprp;
00900         u_int32_t len;
00901         int ret;
00902         u_int8_t *p, *ep;
00903 #ifdef DIAGNOSTIC
00904         DB_MSGBUF mb;
00905 #endif
00906 
00907         memset(&pgrec, 0, sizeof(pgrec));
00908         /*
00909          * We're going to be modifying the rp LSN contents so make
00910          * our own private copy to play with.  We need to set the
00911          * rectype to REP_PAGE because we're calling through __rep_page
00912          * to process each page, and lower functions make decisions
00913          * based on the rectypes (for throttling/gap processing)
00914          */
00915         memcpy(&tmprp, rp, sizeof(tmprp));
00916         tmprp.rectype = REP_PAGE;
00917         ret = 0;
00918         db_rep = dbenv->rep_handle;
00919         rep = db_rep->region;
00920         for (ep = (u_int8_t *)rec->data + rec->size, p = (u_int8_t *)rec->data;
00921             p < ep; p += len) {
00922                 /*
00923                  * First thing in the buffer is the length.  Then the LSN
00924                  * of this page, then the page info itself.
00925                  */
00926                 memcpy(&len, p, sizeof(len));
00927                 p += sizeof(len);
00928                 memcpy(&tmprp.lsn, p, sizeof(DB_LSN));
00929                 p += sizeof(DB_LSN);
00930                 pgrec.data = p;
00931                 pgrec.size = len;
00932                 RPRINT(dbenv, rep, (dbenv, &mb,
00933                     "rep_bulk_page: Processing LSN [%lu][%lu]",
00934                     (u_long)tmprp.lsn.file, (u_long)tmprp.lsn.offset));
00935                 RPRINT(dbenv, rep, (dbenv, &mb,
00936     "rep_bulk_page: p %#lx ep %#lx pgrec data %#lx, size %lu (%#lx)",
00937                     P_TO_ULONG(p), P_TO_ULONG(ep), P_TO_ULONG(pgrec.data),
00938                     (u_long)pgrec.size, (u_long)pgrec.size));
00939                 /*
00940                  * Now send the page info DBT to the page processing function.
00941                  */
00942                 ret = __rep_page(dbenv, eid, &tmprp, &pgrec);
00943                 RPRINT(dbenv, rep, (dbenv, &mb,
00944                     "rep_bulk_page: rep_page ret %d", ret));
00945 
00946                 if (ret != 0)
00947                         break;
00948         }
00949         return (ret);
00950 }
00951 
00952 /*
00953  * __rep_page
00954  *      Process a page message.
00955  *
00956  * PUBLIC: int __rep_page __P((DB_ENV *, int, REP_CONTROL *, DBT *));
00957  */
00958 int
00959 __rep_page(dbenv, eid, rp, rec)
00960         DB_ENV *dbenv;
00961         int eid;
00962         REP_CONTROL *rp;
00963         DBT *rec;
00964 {
00965 
00966         DB_REP *db_rep;
00967         DBT key, data;
00968         REP *rep;
00969         __rep_fileinfo_args *msgfp;
00970         db_recno_t recno;
00971         int ret;
00972         void *next;
00973 #ifdef DIAGNOSTIC
00974         DB_MSGBUF mb;
00975 #endif
00976 
00977         ret = 0;
00978         db_rep = dbenv->rep_handle;
00979         rep = db_rep->region;
00980 
00981         REP_SYSTEM_LOCK(dbenv);
00982         if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
00983                 REP_SYSTEM_UNLOCK(dbenv);
00984                 return (0);
00985         }
00986         if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
00987                 REP_SYSTEM_UNLOCK(dbenv);
00988                 return (ret);
00989         }
00990         RPRINT(dbenv, rep, (dbenv, &mb,
00991             "PAGE: Received page %lu from file %d",
00992             (u_long)msgfp->pgno, msgfp->filenum));
00993         /*
00994          * Check if this page is from the file we're expecting.
00995          * This may be an old or delayed page message.
00996          */
00997         /*
00998          * !!!
00999          * If we allow dbrename/dbremove on the master while a client
01000          * is updating, then we'd have to verify the file's uid here too.
01001          */
01002         if (msgfp->filenum != rep->curfile) {
01003                 RPRINT(dbenv, rep,
01004                     (dbenv, &mb, "Msg file %d != curfile %d",
01005                     msgfp->filenum, rep->curfile));
01006                 goto err;
01007         }
01008         /*
01009          * We want to create/open our dbp to the database
01010          * where we'll keep our page information.
01011          */
01012         if ((ret = __rep_client_dbinit(dbenv, 1, REP_PG)) != 0)
01013                 goto err;
01014 
01015         REP_SYSTEM_UNLOCK(dbenv);
01016         memset(&key, 0, sizeof(key));
01017         memset(&data, 0, sizeof(data));
01018         recno = (db_recno_t)(msgfp->pgno + 1);
01019         key.data = &recno;
01020         key.ulen = key.size = sizeof(db_recno_t);
01021         key.flags = DB_DBT_USERMEM;
01022 
01023         /*
01024          * If we already have this page, then we don't want to bother
01025          * rewriting it into the file.  Otherwise, any other error
01026          * we want to return.
01027          */
01028         ret = __db_put(rep->file_dbp, NULL, &key, &data, DB_NOOVERWRITE);
01029         if (ret == DB_KEYEXIST) {
01030                 RPRINT(dbenv, rep, (dbenv, &mb,
01031                     "PAGE: Received duplicate page %lu from file %d",
01032                     (u_long)msgfp->pgno, msgfp->filenum));
01033                 rep->stat.st_pg_duplicated++;
01034                 ret = 0;
01035                 goto err_nolock;
01036         }
01037         if (ret != 0)
01038                 goto err_nolock;
01039 
01040         RPRINT(dbenv, rep, (dbenv, &mb,
01041             "PAGE: Write page %lu into mpool", (u_long)msgfp->pgno));
01042         REP_SYSTEM_LOCK(dbenv);
01043         /*
01044          * We put the page in the database file itself.
01045          */
01046         ret = __rep_write_page(dbenv, rep, msgfp);
01047         if (ret != 0) {
01048                 /*
01049                  * We got an error storing the page, therefore, we need
01050                  * remove this page marker from the page database too.
01051                  * !!!
01052                  * I'm ignoring errors from the delete because we want to
01053                  * return the original error.  If we cannot write the page
01054                  * and we cannot delete the item we just put, what should
01055                  * we do?  Panic the env and return DB_RUNRECOVERY?
01056                  */
01057                 (void)__db_del(rep->file_dbp, NULL, &key, 0);
01058                 goto err;
01059         }
01060         rep->stat.st_pg_records++;
01061         rep->npages++;
01062 
01063         /*
01064          * Now check the LSN on the page and save it if it is later
01065          * than the one we have.
01066          */
01067         if (log_compare(&rp->lsn, &rep->last_lsn) > 0)
01068                 rep->last_lsn = rp->lsn;
01069 
01070         /*
01071          * We've successfully written the page.  Now we need to see if
01072          * we're done with this file.  __rep_filedone will check if we
01073          * have all the pages expected and if so, set up for the next
01074          * file and send out a page request for the next file's pages.
01075          */
01076         ret = __rep_filedone(dbenv, eid, rep, msgfp, rp->rectype);
01077 
01078 err:    REP_SYSTEM_UNLOCK(dbenv);
01079 
01080 err_nolock:
01081         __os_free(dbenv, msgfp);
01082         return (ret);
01083 }
01084 
01085 /*
01086  * __rep_page_fail
01087  *      Process a page fail message.
01088  *
01089  * PUBLIC: int __rep_page_fail __P((DB_ENV *, int, DBT *));
01090  */
01091 int
01092 __rep_page_fail(dbenv, eid, rec)
01093         DB_ENV *dbenv;
01094         int eid;
01095         DBT *rec;
01096 {
01097 
01098         DB_REP *db_rep;
01099         REP *rep;
01100         __rep_fileinfo_args *msgfp, *rfp;
01101         int ret;
01102         void *next;
01103 #ifdef DIAGNOSTIC
01104         DB_MSGBUF mb;
01105 #endif
01106 
01107         ret = 0;
01108         db_rep = dbenv->rep_handle;
01109         rep = db_rep->region;
01110 
01111         REP_SYSTEM_LOCK(dbenv);
01112         if (!F_ISSET(rep, REP_F_RECOVER_PAGE)) {
01113                 REP_SYSTEM_UNLOCK(dbenv);
01114                 return (0);
01115         }
01116         if ((ret = __rep_fileinfo_read(dbenv, rec->data, &next, &msgfp)) != 0) {
01117                 REP_SYSTEM_UNLOCK(dbenv);
01118                 return (ret);
01119         }
01120         /*
01121          * Check if this page is from the file we're expecting.
01122          * This may be an old or delayed page message.
01123          */
01124         /*
01125          * !!!
01126          * If we allow dbrename/dbremove on the master while a client
01127          * is updating, then we'd have to verify the file's uid here too.
01128          */
01129         if (msgfp->filenum != rep->curfile) {
01130                 RPRINT(dbenv, rep, (dbenv, &mb, "Msg file %d != curfile %d",
01131                     msgfp->filenum, rep->curfile));
01132                 REP_SYSTEM_UNLOCK(dbenv);
01133                 return (0);
01134         }
01135         rfp = rep->curinfo;
01136         if (rfp->type != (u_int32_t)DB_QUEUE)
01137                 --rfp->max_pgno;
01138         else {
01139                 /*
01140                  * Queue is special.  Pages at the beginning of the queue
01141                  * may disappear, as well as at the end.  Use msgfp->pgno
01142                  * to adjust accordingly.
01143                  */
01144                 RPRINT(dbenv, rep, (dbenv, &mb,
01145             "page_fail: BEFORE page %lu failed. ready %lu, max %lu, npages %d",
01146                     (u_long)msgfp->pgno, (u_long)rep->ready_pg,
01147                     (u_long)rfp->max_pgno, rep->npages));
01148                 if (msgfp->pgno == rfp->max_pgno)
01149                         --rfp->max_pgno;
01150                 if (msgfp->pgno >= rep->ready_pg) {
01151                         rep->ready_pg = msgfp->pgno + 1;
01152                         rep->npages = rep->ready_pg;
01153                 }
01154                 RPRINT(dbenv, rep, (dbenv, &mb,
01155             "page_fail: AFTER page %lu failed. ready %lu, max %lu, npages %d",
01156                     (u_long)msgfp->pgno, (u_long)rep->ready_pg,
01157                     (u_long)rfp->max_pgno, rep->npages));
01158         }
01159         /*
01160          * We've lowered the number of pages expected.  It is possible that
01161          * this was the last page we were expecting.  Now we need to see if
01162          * we're done with this file.  __rep_filedone will check if we have
01163          * all the pages expected and if so, set up for the next file and
01164          * send out a page request for the next file's pages.
01165          */
01166         ret = __rep_filedone(dbenv, eid, rep, msgfp, REP_PAGE_FAIL);
01167         REP_SYSTEM_UNLOCK(dbenv);
01168         return (ret);
01169 }
01170 
01171 /*
01172  * __rep_write_page -
01173  *      Write this page into a database.
01174  */
01175 static int
01176 __rep_write_page(dbenv, rep, msgfp)
01177         DB_ENV *dbenv;
01178         REP *rep;
01179         __rep_fileinfo_args *msgfp;
01180 {
01181         __rep_fileinfo_args *rfp;
01182         DB_FH *rfh;
01183         int ret;
01184         void *dst;
01185         char *real_name;
01186 
01187         real_name = NULL;
01188         rfp = NULL;
01189 
01190         /*
01191          * If this is the first page we're putting in this database, we need
01192          * to create the mpool file.  Otherwise call memp_fget to create the
01193          * page in mpool.  Then copy the data to the page, and memp_fput the
01194          * page to give it back to mpool.
01195          *
01196          * We need to create the file, removing any existing file and associate
01197          * the correct file ID with the new one.
01198          */
01199         rfp = rep->curinfo;
01200         if (rep->file_mpf == NULL) {
01201                 if (!F_ISSET(rfp, DB_AM_INMEM)) {
01202                         if ((ret = __db_appname(dbenv, DB_APP_DATA,
01203                             rfp->info.data, 0, NULL, &real_name)) != 0)
01204                                 goto err;
01205                         /*
01206                          * Calling memp_nameop will both purge any matching
01207                          * fileid from mpool and unlink it on disk.
01208                          */
01209                         if ((ret = __memp_nameop(dbenv,
01210                             rfp->uid.data, NULL, real_name, NULL, 0)) != 0)
01211                                 goto err;
01212                         /*
01213                          * Create the file on disk.  We'll be putting the data
01214                          * into the file via mpool.
01215                          */
01216                         if ((ret = __os_open(dbenv, real_name,
01217                             DB_OSO_CREATE, dbenv->db_mode, &rfh)) == 0)
01218                                 ret = __os_closehandle(dbenv, rfh);
01219                         if (ret != 0)
01220                                 goto err;
01221                 }
01222 
01223                 if ((ret =
01224                     __rep_mpf_open(dbenv, &rep->file_mpf, rep->curinfo,
01225                     F_ISSET(rfp, DB_AM_INMEM) ? DB_CREATE : 0)) != 0)
01226                         goto err;
01227         }
01228         /*
01229          * Handle queue specially.  If we're a QUEUE database, we need to
01230          * use the __qam_fget/put calls.  We need to use rep->queue_dbp for
01231          * that.  That dbp is opened after getting the metapage for the
01232          * queue database.  Since the meta-page is always in the queue file,
01233          * we'll use the normal path for that first page.  After that we
01234          * can assume the dbp is opened.
01235          */
01236         if (msgfp->type == (u_int32_t)DB_QUEUE && msgfp->pgno != 0) {
01237 #ifdef HAVE_QUEUE
01238                 if ((ret = __qam_fget(
01239                     rep->queue_dbp, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
01240                         goto err;
01241 #else
01242                 /*
01243                  * This always returns an error.
01244                  */
01245                 ret = __db_no_queue_am(dbenv);
01246                 goto err;
01247 #endif
01248         } else if ((ret = __memp_fget(
01249                     rep->file_mpf, &msgfp->pgno, DB_MPOOL_CREATE, &dst)) != 0)
01250                         goto err;
01251 
01252         memcpy(dst, msgfp->info.data, msgfp->pgsize);
01253         if (msgfp->type != (u_int32_t)DB_QUEUE || msgfp->pgno == 0)
01254                 ret = __memp_fput(rep->file_mpf, dst, DB_MPOOL_DIRTY);
01255 #ifdef HAVE_QUEUE
01256         else
01257                 ret = __qam_fput(rep->queue_dbp, msgfp->pgno, dst,
01258                     DB_MPOOL_DIRTY);
01259 #endif
01260 
01261 err:    if (real_name != NULL)
01262                  __os_free(dbenv, real_name);
01263         return (ret);
01264 }
01265 
01266 /*
01267  * __rep_page_gap -
01268  *      After we've put the page into the database, we need to check if
01269  *      we have a page gap and whether we need to request pages.
01270  */
01271 static int
01272 __rep_page_gap(dbenv, rep, msgfp, type)
01273         DB_ENV *dbenv;
01274         REP *rep;
01275         __rep_fileinfo_args *msgfp;
01276         u_int32_t type;
01277 {
01278         DB_LOG *dblp;
01279         DBT data, key;
01280         LOG *lp;
01281         __rep_fileinfo_args *rfp;
01282         db_recno_t recno;
01283         int ret;
01284 #ifdef DIAGNOSTIC
01285         DB_MSGBUF mb;
01286 #endif
01287 
01288         dblp = dbenv->lg_handle;
01289         lp = dblp->reginfo.primary;
01290         ret = 0;
01291 
01292         /*
01293          * We've successfully put this page into our file.
01294          * Now we need to account for it and re-request new pages
01295          * if necessary.
01296          */
01297         /*
01298          * We already hold the rep mutex, but we also need the db mutex.
01299          * So we need to drop it, acquire both in the right order and
01300          * then recheck the state of the world.
01301          */
01302         REP_SYSTEM_UNLOCK(dbenv);
01303         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01304         REP_SYSTEM_LOCK(dbenv);
01305         rfp = rep->curinfo;
01306 
01307         /*
01308          * Make sure we're still talking about the same file.
01309          * If not, we're done here.
01310          */
01311         if (rfp->filenum != msgfp->filenum) {
01312                 ret = DB_REP_PAGEDONE;
01313                 goto err;
01314         }
01315 
01316         /*
01317          * We have 3 possible states:
01318          * 1.  We receive a page we already have.
01319          *      msg pgno < ready pgno
01320          * 2.  We receive a page that is beyond a gap.
01321          *      msg pgno > ready pgno
01322          * 3.  We receive the page we're expecting.
01323          *      msg pgno == ready pgno
01324          */
01325         /*
01326          * State 1.  This should not happen because this function
01327          * should only be called once per page received because we
01328          * check for DB_KEY_EXIST when we save the page information.
01329          */
01330         DB_ASSERT(msgfp->pgno >= rep->ready_pg);
01331 
01332         /*
01333          * State 2.  This page is beyond the page we're expecting.
01334          * We need to update waiting_pg if this page is less than
01335          * (earlier) the current waiting_pg.  There is nothing
01336          * to do but see if we need to request.
01337          */
01338         RPRINT(dbenv, rep, (dbenv, &mb,
01339     "PAGE_GAP: pgno %lu, max_pg %lu ready %lu, waiting %lu max_wait %lu",
01340             (u_long)msgfp->pgno, (u_long)rfp->max_pgno, (u_long)rep->ready_pg,
01341             (u_long)rep->waiting_pg, (u_long)rep->max_wait_pg));
01342         if (msgfp->pgno > rep->ready_pg) {
01343                 if (rep->waiting_pg == PGNO_INVALID ||
01344                     msgfp->pgno < rep->waiting_pg)
01345                         rep->waiting_pg = msgfp->pgno;
01346         } else {
01347                 /*
01348                  * We received the page we're expecting.
01349                  */
01350                 rep->ready_pg++;
01351                 lp->rcvd_recs = 0;
01352                 while (ret == 0 && rep->ready_pg == rep->waiting_pg) {
01353                         /*
01354                          * If we get here we know we just filled a gap.
01355                          */
01356                         lp->wait_recs = 0;
01357                         lp->rcvd_recs = 0;
01358                         rep->max_wait_pg = PGNO_INVALID;
01359                         /*
01360                          * We need to walk the recno database looking for the
01361                          * next page we need or expect.
01362                          */
01363                         memset(&key, 0, sizeof(key));
01364                         memset(&data, 0, sizeof(data));
01365                         recno = (db_recno_t)rep->ready_pg;
01366                         key.data = &recno;
01367                         key.ulen = key.size = sizeof(db_recno_t);
01368                         key.flags = DB_DBT_USERMEM;
01369                         ret = __db_get(rep->file_dbp, NULL, &key, &data, 0);
01370                         if (ret == DB_NOTFOUND || ret == DB_KEYEMPTY)
01371                                 break;
01372                         else if (ret != 0)
01373                                 goto err;
01374                         rep->ready_pg++;
01375                 }
01376         }
01377 
01378         /*
01379          * If we filled a gap and now have the entire file, there's
01380          * nothing to do.  We're done when ready_pg is > max_pgno
01381          * because ready_pg is larger than the last page we received.
01382          */
01383         if (rep->ready_pg > rfp->max_pgno)
01384                 goto err;
01385 
01386         /*
01387          * Check if we need to ask for more pages.
01388          */
01389         if ((rep->waiting_pg != PGNO_INVALID &&
01390             rep->ready_pg != rep->waiting_pg) || type == REP_PAGE_MORE) {
01391                 /*
01392                  * We got a page but we may still be waiting for more.
01393                  */
01394                 if (lp->wait_recs == 0) {
01395                         /*
01396                          * This is a new gap. Initialize the number of
01397                          * records that we should wait before requesting
01398                          * that it be resent.  We grab the limits out of
01399                          * the rep without the mutex.
01400                          */
01401                         lp->wait_recs = rep->request_gap;
01402                         lp->rcvd_recs = 0;
01403                         rep->max_wait_pg = PGNO_INVALID;
01404                 }
01405                 /*
01406                  * If we got REP_PAGE_MORE we always want to ask for more.
01407                  */
01408                 if ((__rep_check_doreq(dbenv, rep) || type == REP_PAGE_MORE) &&
01409                     ((ret = __rep_pggap_req(dbenv, rep, rfp,
01410                     (type == REP_PAGE_MORE) ? REP_GAP_FORCE : 0)) != 0))
01411                         goto err;
01412         } else {
01413                 lp->wait_recs = 0;
01414                 rep->max_wait_pg = PGNO_INVALID;
01415         }
01416 
01417 err:
01418         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01419         return (ret);
01420 }
01421 
01422 /*
01423  * __rep_init_cleanup -
01424  *      Clean up internal initialization pieces.
01425  *
01426  * PUBLIC: int __rep_init_cleanup __P((DB_ENV *, REP *, int));
01427  */
01428 int
01429 __rep_init_cleanup(dbenv, rep, force)
01430         DB_ENV *dbenv;
01431         REP *rep;
01432         int force;
01433 {
01434         int ret, t_ret;
01435 
01436         ret = 0;
01437         /*
01438          * 1.  Close up the file data pointer we used.
01439          * 2.  Close/reset the page database.
01440          * 3.  Close/reset the queue database if we're forcing a cleanup.
01441          * 4.  Free current file info.
01442          * 5.  If we have all files or need to force, free original file info.
01443          */
01444         if (rep->file_mpf != NULL) {
01445                 ret = __memp_fclose(rep->file_mpf, 0);
01446                 rep->file_mpf = NULL;
01447         }
01448         if (rep->file_dbp != NULL) {
01449                 t_ret = __db_close(rep->file_dbp, NULL, DB_NOSYNC);
01450                 rep->file_dbp = NULL;
01451                 if (t_ret != 0 && ret == 0)
01452                         ret = t_ret;
01453         }
01454         if (force && rep->queue_dbp != NULL) {
01455                 t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC);
01456                 rep->queue_dbp = NULL;
01457                 if (t_ret != 0 && ret == 0)
01458                         ret = t_ret;
01459         }
01460         if (rep->curinfo != NULL) {
01461                 __os_free(dbenv, rep->curinfo);
01462                 rep->curinfo = NULL;
01463         }
01464         if (rep->originfo != NULL &&
01465             (force || ++rep->curfile == rep->nfiles)) {
01466                 __os_free(dbenv, rep->originfo);
01467                 rep->originfo = NULL;
01468         }
01469         return (ret);
01470 }
01471 
01472 /*
01473  * __rep_filedone -
01474  *      We need to check if we're done with the current file after
01475  *      processing the current page.  Stat the database to see if
01476  *      we have all the pages.  If so, we need to clean up/close
01477  *      this one, set up for the next one, and ask for its pages,
01478  *      or if this is the last file, request the log records and
01479  *      move to the REP_RECOVER_LOG state.
01480  */
01481 static int
01482 __rep_filedone(dbenv, eid, rep, msgfp, type)
01483         DB_ENV *dbenv;
01484         int eid;
01485         REP *rep;
01486         __rep_fileinfo_args *msgfp;
01487         u_int32_t type;
01488 {
01489         DBT dbt;
01490         __rep_fileinfo_args *rfp;
01491         int ret;
01492 #ifdef DIAGNOSTIC
01493         DB_MSGBUF mb;
01494 #endif
01495 
01496         /*
01497          * We've put our page, now we need to do any gap processing
01498          * that might be needed to re-request pages.
01499          */
01500         ret = __rep_page_gap(dbenv, rep, msgfp, type);
01501         /*
01502          * The world changed while we were doing gap processing.
01503          * We're done here.
01504          */
01505         if (ret == DB_REP_PAGEDONE)
01506                 return (0);
01507 
01508         rfp = rep->curinfo;
01509         /*
01510          * max_pgno is 0-based and npages is 1-based, so we don't have
01511          * all the pages until npages is > max_pgno.
01512          */
01513         RPRINT(dbenv, rep, (dbenv, &mb, "FILEDONE: have %lu pages. Need %lu.",
01514             (u_long)rep->npages, (u_long)rfp->max_pgno + 1));
01515         if (rep->npages <= rfp->max_pgno)
01516                 return (0);
01517 
01518         /*
01519          * If we're queue and we think we have all the pages for this file,
01520          * we need to do special queue processing.  Queue is handled in
01521          * several stages.
01522          */
01523         if (rfp->type == (u_int32_t)DB_QUEUE &&
01524             ((ret = __rep_queue_filedone(dbenv, rep, rfp)) !=
01525             DB_REP_PAGEDONE))
01526                 return (ret);
01527         /*
01528          * We have all the pages for this file.  Clean up.
01529          */
01530         if ((ret = __rep_init_cleanup(dbenv, rep, 0)) != 0)
01531                 goto err;
01532         if (rep->curfile == rep->nfiles) {
01533                 RPRINT(dbenv, rep, (dbenv, &mb,
01534                     "FILEDONE: have %d files.  RECOVER_LOG now", rep->nfiles));
01535                 /*
01536                  * Move to REP_RECOVER_LOG state.
01537                  * Request logs.
01538                  */
01539                 /*
01540                  * We need to do a sync here so that any later opens
01541                  * can find the file and file id.  We need to do it
01542                  * before we clear REP_F_RECOVER_PAGE so that we do not
01543                  * try to flush the log.
01544                  */
01545                 if ((ret = __memp_sync(dbenv, NULL)) != 0)
01546                         goto err;
01547                 F_CLR(rep, REP_F_RECOVER_PAGE);
01548                 F_SET(rep, REP_F_RECOVER_LOG);
01549                 memset(&dbt, 0, sizeof(dbt));
01550                 dbt.data = &rep->last_lsn;
01551                 dbt.size = sizeof(rep->last_lsn);
01552                 REP_SYSTEM_UNLOCK(dbenv);
01553                 if ((ret = __rep_log_setup(dbenv, rep)) != 0)
01554                         goto err;
01555                 RPRINT(dbenv, rep, (dbenv, &mb,
01556                     "FILEDONE: LOG_REQ from LSN [%lu][%lu] to [%lu][%lu]",
01557                     (u_long)rep->first_lsn.file, (u_long)rep->first_lsn.offset,
01558                     (u_long)rep->last_lsn.file, (u_long)rep->last_lsn.offset));
01559                 (void)__rep_send_message(dbenv, eid,
01560                     REP_LOG_REQ, &rep->first_lsn, &dbt, 0, DB_REP_ANYWHERE);
01561                 REP_SYSTEM_LOCK(dbenv);
01562                 return (0);
01563         }
01564 
01565         /*
01566          * 4.  If not, set curinfo to next file and request its pages.
01567          */
01568         rep->finfo = rep->nextinfo;
01569         if ((ret = __rep_fileinfo_read(dbenv, rep->finfo, &rep->nextinfo,
01570             &rep->curinfo)) != 0)
01571                 goto err;
01572         DB_ASSERT(rep->curinfo->pgno == 0);
01573         rep->ready_pg = 0;
01574         rep->npages = 0;
01575         rep->waiting_pg = PGNO_INVALID;
01576         rep->max_wait_pg = PGNO_INVALID;
01577         memset(&dbt, 0, sizeof(dbt));
01578         RPRINT(dbenv, rep, (dbenv, &mb,
01579             "FILEDONE: Next file %d.  Request pages 0 to %lu",
01580             rep->curinfo->filenum, (u_long)rep->curinfo->max_pgno));
01581         dbt.data = rep->finfo;
01582         dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
01583             (u_int8_t *)rep->finfo);
01584         (void)__rep_send_message(dbenv, eid, REP_PAGE_REQ,
01585             NULL, &dbt, 0, DB_REP_ANYWHERE);
01586 err:
01587         return (ret);
01588 }
01589 
01590 /*
01591  * __rep_mpf_open -
01592  *      Create and open the mpool file for a database.
01593  *      Used by both master and client to bring files into mpool.
01594  */
01595 static int
01596 __rep_mpf_open(dbenv, mpfp, rfp, flags)
01597         DB_ENV *dbenv;
01598         DB_MPOOLFILE **mpfp;
01599         __rep_fileinfo_args *rfp;
01600         u_int32_t flags;
01601 {
01602         DB db;
01603         int ret;
01604 
01605         if ((ret = __memp_fcreate(dbenv, mpfp)) != 0)
01606                 return (ret);
01607 
01608         /*
01609          * We need a dbp to pass into to __db_dbenv_mpool.  Set up
01610          * only the parts that it needs.
01611          */
01612         db.dbenv = dbenv;
01613         db.type = (DBTYPE)rfp->type;
01614         db.pgsize = rfp->pgsize;
01615         memcpy(db.fileid, rfp->uid.data, DB_FILE_ID_LEN);
01616         db.flags = rfp->flags;
01617         /* We need to make sure the dbp isn't marked open. */
01618         F_CLR(&db, DB_AM_OPEN_CALLED);
01619         db.mpf = *mpfp;
01620         if (F_ISSET(&db, DB_AM_INMEM))
01621                 (void)__memp_set_flags(db.mpf, DB_MPOOL_NOFILE, 1);
01622         if ((ret = __db_dbenv_mpool(&db, rfp->info.data, flags)) != 0) {
01623                 (void)__memp_fclose(*mpfp, 0);
01624                 *mpfp = NULL;
01625         }
01626         return (ret);
01627 }
01628 
01629 /*
01630  * __rep_pggap_req -
01631  *      Request a page gap.  Assumes the caller holds the rep_mutex.
01632  *
01633  * PUBLIC: int __rep_pggap_req __P((DB_ENV *, REP *, __rep_fileinfo_args *,
01634  * PUBLIC:    u_int32_t));
01635  */
01636 int
01637 __rep_pggap_req(dbenv, rep, reqfp, gapflags)
01638         DB_ENV *dbenv;
01639         REP *rep;
01640         __rep_fileinfo_args *reqfp;
01641         u_int32_t gapflags;
01642 {
01643         DBT max_pg_dbt;
01644         __rep_fileinfo_args *tmpfp;
01645         size_t len;
01646         u_int32_t flags;
01647         int alloc, ret;
01648 
01649         ret = 0;
01650         alloc = 0;
01651         /*
01652          * There is a window where we have to set REP_RECOVER_PAGE when
01653          * we receive the update information to transition from getting
01654          * file information to getting page information.  However, that
01655          * thread does release and then reacquire mutexes.  So, we might
01656          * try re-requesting before the original thread can get curinfo
01657          * setup.  If curinfo isn't set up there is nothing to do.
01658          */
01659         if (rep->curinfo == NULL)
01660                 return (0);
01661         if (reqfp == NULL) {
01662                 if ((ret = __rep_finfo_alloc(dbenv, rep->curinfo, &tmpfp)) != 0)
01663                         return (ret);
01664                 alloc = 1;
01665         } else
01666                 tmpfp = reqfp;
01667 
01668         /*
01669          * If we've never requested this page, then
01670          * request everything between it and the first
01671          * page we have.  If we have requested this page
01672          * then only request this record, not the entire gap.
01673          */
01674         flags = 0;
01675         memset(&max_pg_dbt, 0, sizeof(max_pg_dbt));
01676         tmpfp->pgno = rep->ready_pg;
01677         max_pg_dbt.data = rep->finfo;
01678         max_pg_dbt.size = (u_int32_t)((u_int8_t *)rep->nextinfo -
01679             (u_int8_t *)rep->finfo);
01680         if (rep->max_wait_pg == PGNO_INVALID ||
01681             FLD_ISSET(gapflags, REP_GAP_FORCE | REP_GAP_REREQUEST)) {
01682                 /*
01683                  * Request the gap - set max to waiting_pg - 1 or if
01684                  * there is no waiting_pg, just ask for one.
01685                  */
01686                 if (rep->waiting_pg == PGNO_INVALID) {
01687                         if (FLD_ISSET(gapflags,
01688                             REP_GAP_FORCE | REP_GAP_REREQUEST))
01689                                 rep->max_wait_pg = rep->curinfo->max_pgno;
01690                         else
01691                                 rep->max_wait_pg = rep->ready_pg;
01692                 } else
01693                         rep->max_wait_pg = rep->waiting_pg - 1;
01694                 tmpfp->max_pgno = rep->max_wait_pg;
01695                 /*
01696                  * Gap requests are "new" and can go anywhere.
01697                  */
01698                 if (FLD_ISSET(gapflags, REP_GAP_REREQUEST))
01699                         flags = DB_REP_REREQUEST;
01700                 else
01701                         flags = DB_REP_ANYWHERE;
01702         } else {
01703                 /*
01704                  * Request 1 page - set max to ready_pg.
01705                  */
01706                 rep->max_wait_pg = rep->ready_pg;
01707                 tmpfp->max_pgno = rep->ready_pg;
01708                 /*
01709                  * If we're dropping to singletons, this is a rerequest.
01710                  */
01711                 flags = DB_REP_REREQUEST;
01712         }
01713         if (rep->master_id != DB_EID_INVALID) {
01714                 rep->stat.st_pg_requested++;
01715                 /*
01716                  * We need to request the pages, but we need to get the
01717                  * new info into rep->finfo.  Assert that the sizes never
01718                  * change.  The only thing this should do is change
01719                  * the pgno field.  Everything else remains the same.
01720                  */
01721                 ret = __rep_fileinfo_buf(rep->finfo, max_pg_dbt.size, &len,
01722                     tmpfp->pgsize, tmpfp->pgno, tmpfp->max_pgno,
01723                     tmpfp->filenum, tmpfp->id, tmpfp->type,
01724                     tmpfp->flags, &tmpfp->uid, &tmpfp->info);
01725                 DB_ASSERT(len == max_pg_dbt.size);
01726                 (void)__rep_send_message(dbenv, rep->master_id,
01727                     REP_PAGE_REQ, NULL, &max_pg_dbt, 0, flags);
01728         } else
01729                 (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
01730                     REP_MASTER_REQ, NULL, NULL, 0, 0);
01731 
01732         if (alloc)
01733                 __os_free(dbenv, tmpfp);
01734         return (ret);
01735 }
01736 
01737 /*
01738  * __rep_finfo_alloc -
01739  *      Allocate and initialize a fileinfo structure.
01740  *
01741  * PUBLIC: int __rep_finfo_alloc __P((DB_ENV *, __rep_fileinfo_args *,
01742  * PUBLIC:    __rep_fileinfo_args **));
01743  */
01744 int
01745 __rep_finfo_alloc(dbenv, rfpsrc, rfpp)
01746         DB_ENV *dbenv;
01747         __rep_fileinfo_args *rfpsrc, **rfpp;
01748 {
01749         __rep_fileinfo_args *rfp;
01750         size_t size;
01751         int ret;
01752         void *uidp, *infop;
01753 
01754         /*
01755          * Allocate enough for the structure and the two DBT data areas.
01756          */
01757         size = sizeof(__rep_fileinfo_args) + rfpsrc->uid.size +
01758             rfpsrc->info.size;
01759         if ((ret = __os_malloc(dbenv, size, &rfp)) != 0)
01760                 return (ret);
01761 
01762         /*
01763          * Copy the structure itself, and then set the DBT data pointers
01764          * to their space and copy the data itself as well.
01765          */
01766         memcpy(rfp, rfpsrc, sizeof(__rep_fileinfo_args));
01767         uidp = (u_int8_t *)rfp + sizeof(__rep_fileinfo_args);
01768         rfp->uid.data = uidp;
01769         memcpy(uidp, rfpsrc->uid.data, rfpsrc->uid.size);
01770 
01771         infop = (u_int8_t *)uidp + rfpsrc->uid.size;
01772         rfp->info.data = infop;
01773         memcpy(infop, rfpsrc->info.data, rfpsrc->info.size);
01774         *rfpp = rfp;
01775         return (ret);
01776 }
01777 
01778 /*
01779  * __rep_log_setup -
01780  *      We know our first LSN and need to reset the log subsystem
01781  *      to get our logs set up for the proper file.
01782  */
01783 static int
01784 __rep_log_setup(dbenv, rep)
01785         DB_ENV *dbenv;
01786         REP *rep;
01787 {
01788         DB_LOG *dblp;
01789         DB_LSN lsn;
01790         DB_TXNMGR *mgr;
01791         DB_TXNREGION *region;
01792         LOG *lp;
01793         u_int32_t fnum, lastfile;
01794         int ret;
01795         char *name;
01796 
01797         dblp = dbenv->lg_handle;
01798         lp = dblp->reginfo.primary;
01799         mgr = dbenv->tx_handle;
01800         region = mgr->reginfo.primary;
01801 
01802         /*
01803          * Forcibly remove *all* existing log files.
01804          */
01805         lastfile = lp->lsn.file;
01806         for (fnum = 1; fnum <= lastfile; fnum++) {
01807                 if ((ret = __log_name(dblp, fnum, &name, NULL, 0)) != 0)
01808                         goto err;
01809                 (void)__os_unlink(dbenv, name);
01810                 __os_free(dbenv, name);
01811         }
01812         /*
01813          * Set up the log starting at the file number of the first LSN we
01814          * need to get from the master.
01815          */
01816         ret = __log_newfile(dblp, &lsn, rep->first_lsn.file);
01817 
01818         /*
01819          * We reset first_lsn to the lp->lsn.  We were given the LSN of
01820          * the checkpoint and we now need the LSN for the beginning of
01821          * the file, which __log_newfile conveniently set up for us
01822          * in lp->lsn.
01823          */
01824         rep->first_lsn = lp->lsn;
01825         TXN_SYSTEM_LOCK(dbenv);
01826         ZERO_LSN(region->last_ckp);
01827         TXN_SYSTEM_UNLOCK(dbenv);
01828 err:
01829         return (ret);
01830 }
01831 
01832 /*
01833  * __rep_queue_filedone -
01834  *      Determine if we're really done getting the pages for a queue file.
01835  *      Queue is handled in several steps.
01836  *      1.  First we get the meta page only.
01837  *      2.  We use the meta-page information to figure out first and last
01838  *          page numbers (and if queue wraps, first can be > last.
01839  *      3.  If first < last, we do a REP_PAGE_REQ for all pages.
01840  *      4.  If first > last, we REP_PAGE_REQ from first -> max page number.
01841  *          Then we'll ask for page 1 -> last.
01842  *
01843  * This function can return several things:
01844  *      DB_REP_PAGEDONE - if we're done with this file.
01845  *      0 - if we're not doen with this file.
01846  *      error - if we get an error doing some operations.
01847  *
01848  * This function will open a dbp handle to the queue file.  This is needed
01849  * by most of the QAM macros.  We'll open it on the first pass through
01850  * here and we'll close it whenever we decide we're done.
01851  */
01852 static int
01853 __rep_queue_filedone(dbenv, rep, rfp)
01854         DB_ENV *dbenv;
01855         REP *rep;
01856         __rep_fileinfo_args *rfp;
01857 {
01858 #ifndef HAVE_QUEUE
01859         COMPQUIET(rep, NULL);
01860         COMPQUIET(rfp, NULL);
01861         return (__db_no_queue_am(dbenv));
01862 #else
01863         db_pgno_t first, last;
01864         u_int32_t flags;
01865         int empty, ret, t_ret;
01866 #ifdef DIAGNOSTIC
01867         DB_MSGBUF mb;
01868 #endif
01869 
01870         ret = 0;
01871         if (rep->queue_dbp == NULL) {
01872                 /*
01873                  * We need to do a sync here so that the open
01874                  * can find the file and file id.
01875                  */
01876                 if ((ret = __memp_sync(dbenv, NULL)) != 0)
01877                         goto out;
01878                 if ((ret = db_create(&rep->queue_dbp, dbenv, 0)) != 0)
01879                         goto out;
01880                 flags = DB_NO_AUTO_COMMIT |
01881                     (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);
01882                 /*
01883                  * We need to check whether this is in-memory so that we pass
01884                  * the name correctly as either the file or the database name.
01885                  */
01886                 if ((ret = __db_open(rep->queue_dbp, NULL,
01887                     FLD_ISSET(rfp->flags, DB_AM_INMEM) ? NULL : rfp->info.data,
01888                     FLD_ISSET(rfp->flags, DB_AM_INMEM) ? rfp->info.data : NULL,
01889                     DB_QUEUE, flags, 0, PGNO_BASE_MD)) != 0)
01890                         goto out;
01891         }
01892         if ((ret = __queue_pageinfo(rep->queue_dbp,
01893             &first, &last, &empty, 0, 0)) != 0)
01894                 goto out;
01895         RPRINT(dbenv, rep, (dbenv, &mb,
01896             "Queue fileinfo: first %lu, last %lu, empty %d",
01897             (u_long)first, (u_long)last, empty));
01898         /*
01899          * We can be at the end of 3 possible states.
01900          * 1.  We have received the meta-page and now need to get the
01901          *     rest of the pages in the database.
01902          * 2.  We have received from first -> max_pgno.  We might be done,
01903          *     or we might need to ask for wrapped pages.
01904          * 3.  We have received all pages in the file.  We're done.
01905          */
01906         if (rfp->max_pgno == 0) {
01907                 /*
01908                  * We have just received the meta page.  Set up the next
01909                  * pages to ask for and check if the file is empty.
01910                  */
01911                 if (empty)
01912                         goto out;
01913                 if (first > last) {
01914                         rfp->max_pgno =
01915                             QAM_RECNO_PAGE(rep->queue_dbp, UINT32_MAX);
01916                 } else
01917                         rfp->max_pgno = last;
01918                 RPRINT(dbenv, rep, (dbenv, &mb,
01919                     "Queue fileinfo: First req: first %lu, last %lu",
01920                     (u_long)first, (u_long)rfp->max_pgno));
01921                 goto req;
01922         } else if (rfp->max_pgno != last) {
01923                 /*
01924                  * If max_pgno != last that means we're dealing with a
01925                  * wrapped situation.  Request next batch of pages.
01926                  * Set npages to 1 because we already have page 0, the
01927                  * meta-page, now we need pages 1-max_pgno.
01928                  */
01929                 first = 1;
01930                 rfp->max_pgno = last;
01931                 RPRINT(dbenv, rep, (dbenv, &mb,
01932                     "Queue fileinfo: Wrap req: first %lu, last %lu",
01933                     (u_long)first, (u_long)last));
01934 req:
01935                 /*
01936                  * Since we're simulating a "gap" to resend new PAGE_REQ
01937                  * for this file, we need to set waiting page to last + 1
01938                  * so that we'll ask for all from ready_pg -> last.
01939                  */
01940                 rep->npages = first;
01941                 rep->ready_pg = first;
01942                 rep->waiting_pg = rfp->max_pgno + 1;
01943                 rep->max_wait_pg = PGNO_INVALID;
01944                 ret = __rep_pggap_req(dbenv, rep, rfp, 0);
01945                 return (ret);
01946         }
01947         /*
01948          * max_pgno == last
01949          * If we get here, we have all the pages we need.
01950          * Close the dbp and return.
01951          */
01952 out:
01953         if (rep->queue_dbp != NULL &&
01954             (t_ret = __db_close(rep->queue_dbp, NULL, DB_NOSYNC)) != 0 &&
01955             ret == 0)
01956                 ret = t_ret;
01957         rep->queue_dbp = NULL;
01958         if (ret == 0)
01959                 ret = DB_REP_PAGEDONE;
01960         return (ret);
01961 #endif
01962 }

Generated on Sun Dec 25 12:14:44 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2