Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

rep_util.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2001-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: rep_util.c,v 12.42 2005/10/27 01:26:02 mjc Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023 
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #endif
00027 
00028 #include "db_int.h"
00029 #include "dbinc/log.h"
00030 #include "dbinc/txn.h"
00031 #ifdef REP_DIAGNOSTIC
00032 #include "dbinc/db_page.h"
00033 #include "dbinc/fop.h"
00034 #include "dbinc/btree.h"
00035 #include "dbinc/hash.h"
00036 #include "dbinc/qam.h"
00037 #endif
00038 
00039 /*
00040  * rep_util.c:
00041  *      Miscellaneous replication-related utility functions, including
00042  *      those called by other subsystems.
00043  */
00044 
/*
 * TIMESTAMP_CHECK --
 *      If the environment's op_timestamp is set and is more than
 * DB_REGENV_TIMEOUT older than ts, clear DB_REGENV_REPLOCKED and reset
 * op_timestamp to 0.  The update is done while holding the replication
 * system lock; the initial test is made without it.
 *
 * The ts and renv arguments are parenthesized in the expansion so that
 * arbitrary expressions (e.g. "&env_struct") may be passed safely.
 */
#define TIMESTAMP_CHECK(dbenv, ts, renv) do {                           \
        if ((renv)->op_timestamp != 0 &&                                \
            (renv)->op_timestamp + DB_REGENV_TIMEOUT < (ts)) {          \
                REP_SYSTEM_LOCK(dbenv);                                 \
                F_CLR((renv), DB_REGENV_REPLOCKED);                     \
                (renv)->op_timestamp = 0;                               \
                REP_SYSTEM_UNLOCK(dbenv);                               \
        }                                                               \
} while (0)
00054 
00055 #ifdef REP_DIAGNOSTIC
00056 static void __rep_print_logmsg __P((DB_ENV *, const DBT *, DB_LSN *));
00057 #endif
00058 
00059 /*
00060  * __rep_bulk_message --
00061  *      This is a wrapper for putting a record into a bulk buffer.  Since
00062  * we have different bulk buffers, the caller must hand us the information
00063  * we need to put the record into the correct buffer.  All bulk buffers
00064  * are protected by the REP->mtx_clientdb.
00065  *
00066  * PUBLIC: int __rep_bulk_message __P((DB_ENV *, REP_BULK *, REP_THROTTLE *,
00067  * PUBLIC:     DB_LSN *, const DBT *, u_int32_t));
00068  */
00069 int
00070 __rep_bulk_message(dbenv, bulk, repth, lsn, dbt, flags)
00071         DB_ENV *dbenv;
00072         REP_BULK *bulk;
00073         REP_THROTTLE *repth;
00074         DB_LSN *lsn;
00075         const DBT *dbt;
00076         u_int32_t flags;
00077 {
00078         DB_REP *db_rep;
00079         REP *rep;
00080         int ret;
00081         u_int32_t recsize, typemore;
00082         u_int8_t *p;
00083 #ifdef DIAGNOSTIC
00084         DB_MSGBUF mb;
00085 #endif
00086 
00087         db_rep = dbenv->rep_handle;
00088         rep = db_rep->region;
00089         ret = 0;
00090 
00091         /*
00092          * Figure out the total number of bytes needed for this record.
00093          */
00094         recsize = dbt->size + sizeof(DB_LSN) + sizeof(dbt->size);
00095 
00096         /*
00097          * If *this* buffer is actively being transmitted, wait until
00098          * we can use it.
00099          */
00100         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00101         while (FLD_ISSET(*(bulk->flagsp), BULK_XMIT)) {
00102                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00103                 __os_sleep(dbenv, 1, 0);
00104                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00105         }
00106 
00107         /*
00108          * If the record is bigger than the buffer entirely, send the
00109          * current buffer and then return DB_REP_BULKOVF so that this
00110          * record is sent as a singleton.  Do we have enough info to
00111          * do that here?  XXX
00112          */
00113         if (recsize > bulk->len) {
00114                 RPRINT(dbenv, rep, (dbenv, &mb,
00115                     "bulk_msg: Record %d (0x%x) larger than entire buffer 0x%x",
00116                     recsize, recsize, bulk->len));
00117                 rep->stat.st_bulk_overflows++;
00118                 (void)__rep_send_bulk(dbenv, bulk, flags);
00119                 /*
00120                  * XXX __rep_send_message...
00121                  */
00122                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00123                 return (DB_REP_BULKOVF);
00124         }
00125         /*
00126          * If this record doesn't fit, send the current buffer.
00127          * Sending the buffer will reset the offset, but we will
00128          * drop the mutex while sending so we need to keep checking
00129          * if we're racing.
00130          */
00131         while (recsize + *(bulk->offp) > bulk->len) {
00132                 RPRINT(dbenv, rep, (dbenv, &mb,
00133             "bulk_msg: Record %lu (%#lx) doesn't fit.  Send %lu (%#lx) now.",
00134                     (u_long)recsize, (u_long)recsize,
00135                     (u_long)bulk->len, (u_long)bulk->len));
00136                 rep->stat.st_bulk_fills++;
00137                 if ((ret = __rep_send_bulk(dbenv, bulk, flags)) != 0)
00138                         break;
00139         }
00140 
00141         /*
00142          * If we're using throttling, see if we are at the throttling
00143          * limit before we do any more work here, by checking if the
00144          * call to rep_send_throttle changed the repth->type to the
00145          * *_MORE message type.  If the throttling code hits the limit
00146          * then we're done here.
00147          */
00148         if (bulk->type == REP_BULK_LOG)
00149                 typemore = REP_LOG_MORE;
00150         else
00151                 typemore = REP_PAGE_MORE;
00152         if (repth != NULL &&
00153             (ret = __rep_send_throttle(dbenv, bulk->eid, repth,
00154             REP_THROTTLE_ONLY)) == 0 && repth->type == typemore) {
00155                 RPRINT(dbenv, rep, (dbenv, &mb,
00156                     "bulk_msg: Record %d (0x%x) hit throttle limit.",
00157                     recsize, recsize));
00158                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00159                 return (ret);
00160         }
00161 
00162         /*
00163          * Now we own the buffer, and we know our record fits into it.
00164          * The buffer is structured with the len, LSN and then the record.
00165          * Copy the record into the buffer.  Then if we need to,
00166          * send the buffer.
00167          */
00168         /*
00169          * First thing is the length of the dbt record.
00170          */
00171         p = bulk->addr + *(bulk->offp);
00172         memcpy(p, &dbt->size, sizeof(dbt->size));
00173         p += sizeof(dbt->size);
00174         /*
00175          * The next thing is the LSN.  We need LSNs for both pages and
00176          * log records.  For log records, this is obviously, the LSN of
00177          * this record.  For pages, the LSN is used by the internal init code.
00178          */
00179         memcpy(p, lsn, sizeof(DB_LSN));
00180         RPRINT(dbenv, rep, (dbenv, &mb,
00181             "bulk_msg: Copying LSN [%lu][%lu] of %lu bytes to %#lx",
00182             (u_long)lsn->file, (u_long)lsn->offset, (u_long)dbt->size,
00183             P_TO_ULONG(p)));
00184         p += sizeof(DB_LSN);
00185         /*
00186          * If we're the first record, we need to save the first
00187          * LSN in the bulk structure.
00188          */
00189         if (*(bulk->offp) == 0)
00190                 bulk->lsn = *lsn;
00191         /*
00192          * Now copy the record and finally adjust the offset.
00193          */
00194         memcpy(p, dbt->data, dbt->size);
00195         p += dbt->size;
00196         *(bulk->offp) = (uintptr_t)p - (uintptr_t)bulk->addr;
00197         rep->stat.st_bulk_records++;
00198         /*
00199          * Send the buffer if it is a perm record or a force.
00200          */
00201         if (LF_ISSET(DB_LOG_PERM) || FLD_ISSET(*(bulk->flagsp), BULK_FORCE)) {
00202                 RPRINT(dbenv, rep, (dbenv, &mb,
00203                     "bulk_msg: Send buffer after copy due to %s",
00204                     LF_ISSET(DB_LOG_PERM) ? "PERM" : "FORCE"));
00205                 ret = __rep_send_bulk(dbenv, bulk, flags);
00206         }
00207         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00208         return (ret);
00209 
00210 }
00211 
00212 /*
00213  * __rep_send_bulk --
00214  *      This function transmits the bulk buffer given.  It assumes the
00215  * caller holds the REP->mtx_clientdb.  We may release it and reacquire
00216  * it during this call.  We will return with it held.
00217  *
00218  * PUBLIC: int __rep_send_bulk __P((DB_ENV *, REP_BULK *, u_int32_t));
00219  */
00220 int
00221 __rep_send_bulk(dbenv, bulkp, flags)
00222         DB_ENV *dbenv;
00223         REP_BULK *bulkp;
00224         u_int32_t flags;
00225 {
00226         DB_REP *db_rep;
00227         REP *rep;
00228         DBT dbt;
00229         int ret;
00230 #ifdef DIAGNOSTIC
00231         DB_MSGBUF mb;
00232 #endif
00233 
00234         /*
00235          * If the offset is 0, we're done.  There is nothing to send.
00236          */
00237         if (*(bulkp->offp) == 0)
00238                 return (0);
00239 
00240         db_rep = dbenv->rep_handle;
00241         rep = db_rep->region;
00242 
00243         memset(&dbt, 0, sizeof(dbt));
00244         /*
00245          * Set that this buffer is being actively transmitted.
00246          */
00247         FLD_SET(*(bulkp->flagsp), BULK_XMIT);
00248         dbt.data = bulkp->addr;
00249         dbt.size = (u_int32_t)*(bulkp->offp);
00250         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00251         RPRINT(dbenv, rep, (dbenv, &mb,
00252             "send_bulk: Send %d (0x%x) bulk buffer bytes", dbt.size, dbt.size));
00253         /*
00254          * Unlocked the mutex and now send the message.
00255          */
00256         rep->stat.st_bulk_transfers++;
00257         ret = __rep_send_message(dbenv, bulkp->eid, bulkp->type, &bulkp->lsn,
00258             &dbt, flags, 0);
00259 
00260         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00261         /*
00262          * If we're successful, reset the offset pointer to 0.
00263          * Clear the transmit flag regardless.
00264          */
00265         if (ret == 0)
00266                 *(bulkp->offp) = 0;
00267         FLD_CLR(*(bulkp->flagsp), BULK_XMIT);
00268         return (ret);
00269 }
00270 
00271 /*
00272  * __rep_bulk_alloc --
00273  *      This function allocates and initializes an internal bulk buffer.
00274  * This is used by the master when fulfilling a request for a chunk of
00275  * log records or a bunch of pages.
00276  *
00277  * PUBLIC: int __rep_bulk_alloc __P((DB_ENV *, REP_BULK *, int, uintptr_t *,
00278  * PUBLIC:    u_int32_t *, u_int32_t));
00279  */
00280 int
00281 __rep_bulk_alloc(dbenv, bulkp, eid, offp, flagsp, type)
00282         DB_ENV *dbenv;
00283         REP_BULK *bulkp;
00284         int eid;
00285         uintptr_t *offp;
00286         u_int32_t *flagsp, type;
00287 {
00288         int ret;
00289 
00290         memset(bulkp, 0, sizeof(REP_BULK));
00291         *offp = *flagsp = 0;
00292         bulkp->len = MEGABYTE;
00293         if ((ret = __os_malloc(dbenv, bulkp->len, &bulkp->addr)) != 0)
00294                 return (ret);
00295         bulkp->offp = offp;
00296         bulkp->type = type;
00297         bulkp->eid = eid;
00298         bulkp->flagsp = flagsp;
00299         return (ret);
00300 }
00301 
00302 /*
00303  * __rep_bulk_free --
00304  *      This function sends the remainder of the bulk buffer and frees it.
00305  *
00306  * PUBLIC: int __rep_bulk_free __P((DB_ENV *, REP_BULK *, u_int32_t));
00307  */
00308 int
00309 __rep_bulk_free(dbenv, bulkp, flags)
00310         DB_ENV *dbenv;
00311         REP_BULK *bulkp;
00312         u_int32_t flags;
00313 {
00314         DB_REP *db_rep;
00315         int ret;
00316 
00317         db_rep = dbenv->rep_handle;
00318 
00319         MUTEX_LOCK(dbenv, db_rep->region->mtx_clientdb);
00320         ret = __rep_send_bulk(dbenv, bulkp, flags);
00321         MUTEX_UNLOCK(dbenv, db_rep->region->mtx_clientdb);
00322         __os_free(dbenv, bulkp->addr);
00323         return (ret);
00324 }
00325 
00326 /*
00327  * __rep_send_message --
00328  *      This is a wrapper for sending a message.  It takes care of constructing
00329  * the REP_CONTROL structure and calling the user's specified send function.
00330  *
00331  * PUBLIC: int __rep_send_message __P((DB_ENV *, int,
00332  * PUBLIC:     u_int32_t, DB_LSN *, const DBT *, u_int32_t, u_int32_t));
00333  */
00334 int
00335 __rep_send_message(dbenv, eid, rtype, lsnp, dbt, logflags, repflags)
00336         DB_ENV *dbenv;
00337         int eid;
00338         u_int32_t rtype;
00339         DB_LSN *lsnp;
00340         const DBT *dbt;
00341         u_int32_t logflags, repflags;
00342 {
00343         DB_REP *db_rep;
00344         REP *rep;
00345         DBT cdbt, scrap_dbt;
00346         REP_CONTROL cntrl;
00347         int ret;
00348         u_int32_t myflags, rectype;
00349 #ifdef DIAGNOSTIC
00350         DB_MSGBUF mb;
00351 #endif
00352 
00353         db_rep = dbenv->rep_handle;
00354         rep = db_rep->region;
00355 
00356         /* Set up control structure. */
00357         memset(&cntrl, 0, sizeof(cntrl));
00358         if (lsnp == NULL)
00359                 ZERO_LSN(cntrl.lsn);
00360         else
00361                 cntrl.lsn = *lsnp;
00362         cntrl.rectype = rtype;
00363         cntrl.flags = logflags;
00364         cntrl.rep_version = DB_REPVERSION;
00365         cntrl.log_version = DB_LOGVERSION;
00366         cntrl.gen = rep->gen;
00367 
00368         memset(&cdbt, 0, sizeof(cdbt));
00369         cdbt.data = &cntrl;
00370         cdbt.size = sizeof(cntrl);
00371 
00372         /* Don't assume the send function will be tolerant of NULL records. */
00373         if (dbt == NULL) {
00374                 memset(&scrap_dbt, 0, sizeof(DBT));
00375                 dbt = &scrap_dbt;
00376         }
00377 
00378         REP_PRINT_MESSAGE(dbenv, eid, &cntrl, "rep_send_message");
00379 #ifdef REP_DIAGNOSTIC
00380         if (FLD_ISSET(dbenv->verbose, DB_VERB_REPLICATION) && rtype == REP_LOG)
00381                 __rep_print_logmsg(dbenv, dbt, lsnp);
00382 #endif
00383         /*
00384          * There are several types of records: commit and checkpoint records
00385          * that affect database durability, regular log records that might
00386          * be buffered on the master before being transmitted, and control
00387          * messages which don't require the guarantees of permanency, but
00388          * should not be buffered.
00389          *
00390          * There are request records that can be sent anywhere, and there
00391          * are rerequest records that the app might want to send to the master.
00392          */
00393         myflags = repflags;
00394         if (FLD_ISSET(logflags, DB_LOG_PERM))
00395                 myflags |= DB_REP_PERMANENT;
00396         else if (rtype != REP_LOG || FLD_ISSET(logflags, DB_LOG_RESEND))
00397                 myflags |= DB_REP_NOBUFFER;
00398         if (rtype == REP_LOG && !FLD_ISSET(logflags, DB_LOG_PERM)) {
00399                 /*
00400                  * Check if this is a log record we just read that
00401                  * may need a DB_LOG_PERM.  This is of type REP_LOG,
00402                  * so we know that dbt is a log record.
00403                  */
00404                 memcpy(&rectype, dbt->data, sizeof(rectype));
00405                 if (rectype == DB___txn_regop || rectype == DB___txn_ckp)
00406                         F_SET(&cntrl, DB_LOG_PERM);
00407          }
00408 
00409         /*
00410          * We set the LSN above to something valid.  Give the master the
00411          * actual LSN so that they can coordinate with permanent records from
00412          * the client if they want to.
00413          */
00414         ret = dbenv->rep_send(dbenv, &cdbt, dbt, &cntrl.lsn, eid, myflags);
00415 
00416         /*
00417          * We don't hold the rep lock, so this could miscount if we race.
00418          * I don't think it's worth grabbing the mutex for that bit of
00419          * extra accuracy.
00420          */
00421         if (ret == 0)
00422                 rep->stat.st_msgs_sent++;
00423         else {
00424                 rep->stat.st_msgs_send_failures++;
00425                 RPRINT(dbenv, rep, (dbenv, &mb,
00426                     "rep_send_function returned: %d", ret));
00427         }
00428         return (ret);
00429 }
00430 
#ifdef REP_DIAGNOSTIC
/*
 * __rep_print_logmsg --
 *      Debugging routine that prints a log record we are about to
 * transmit to a client, by dispatching it through the table of
 * per-access-method print functions with DB_TXN_PRINT.
 *
 * The dispatch table is built once, on the first call, and kept for
 * later calls.  Both the table pointer and its size must therefore have
 * static storage duration: if the size were an automatic variable it
 * would be 0 on every call and the table would be re-registered each
 * time.
 */
static void
__rep_print_logmsg(dbenv, logdbt, lsnp)
        DB_ENV *dbenv;
        const DBT *logdbt;
        DB_LSN *lsnp;
{
        /* Static structures to hold the printing functions. */
        static int (**ptab)__P((DB_ENV *,
            DBT *, DB_LSN *, db_recops, void *)) = NULL;
        static size_t ptabsize = 0;

        if (ptabsize == 0) {
                /* Initialize the table. */
                (void)__bam_init_print(dbenv, &ptab, &ptabsize);
                (void)__crdel_init_print(dbenv, &ptab, &ptabsize);
                (void)__db_init_print(dbenv, &ptab, &ptabsize);
                (void)__dbreg_init_print(dbenv, &ptab, &ptabsize);
                (void)__fop_init_print(dbenv, &ptab, &ptabsize);
                (void)__ham_init_print(dbenv, &ptab, &ptabsize);
                (void)__qam_init_print(dbenv, &ptab, &ptabsize);
                (void)__txn_init_print(dbenv, &ptab, &ptabsize);
        }

        /* The dispatch interface takes a non-const DBT, hence the cast. */
        (void)__db_dispatch(dbenv,
            ptab, ptabsize, (DBT *)logdbt, lsnp, DB_TXN_PRINT, NULL);
}
#endif
00464 
00465 /*
00466  * __rep_new_master --
00467  *      Called after a master election to sync back up with a new master.
00468  * It's possible that we already know of this new master in which case
00469  * we don't need to do anything.
00470  *
00471  * This is written assuming that this message came from the master; we
00472  * need to enforce that in __rep_process_record, but right now, we have
00473  * no way to identify the master.
00474  *
       * Parameters:
       *      dbenv   environment handle
       *      cntrl   control structure of the incoming message; its gen and
       *              lsn fields are compared against our own state
       *      eid     environment id of the site claiming to be master
       *
       * Returns:
       *      0 when neither the generation nor the master id changed (a
       *        VERIFY_REQ or ALL_REQ may still have been sent),
       *      DB_REP_NEWMASTER when the master changed and sync-up was started
       *        (or none was needed because our log is empty),
       *      DB_REP_JOIN_FAILURE when our log is entirely ahead of the
       *        master's log file, or
       *      an error from the log cursor / log backup routines.
       *
00475  * PUBLIC: int __rep_new_master __P((DB_ENV *, REP_CONTROL *, int));
00476  */
00477 int
00478 __rep_new_master(dbenv, cntrl, eid)
00479         DB_ENV *dbenv;
00480         REP_CONTROL *cntrl;
00481         int eid;
00482 {
00483         DB_LOG *dblp;
00484         DB_LOGC *logc;
00485         DB_LSN first_lsn, lsn;
00486         DB_REP *db_rep;
00487         DBT dbt;
00488         LOG *lp;
00489         REGENV *renv;
00490         REGINFO *infop;
00491         REP *rep;
00492         int change, do_req, ret, t_ret;
00493 #ifdef DIAGNOSTIC
00494         DB_MSGBUF mb;
00495 #endif
00496 
00497         db_rep = dbenv->rep_handle;
00498         rep = db_rep->region;
00499         ret = 0;
00500         logc = NULL;
00501         REP_SYSTEM_LOCK(dbenv);
              /* Any election in progress is over: clear its state. */
00502         __rep_elect_done(dbenv, rep);
              /*
               * A "change" is either a different generation number or a
               * different master id than the one we currently record.
               */
00503         change = rep->gen != cntrl->gen || rep->master_id != eid;
00504         if (change) {
00505                 RPRINT(dbenv, rep, (dbenv, &mb,
00506                     "Updating gen from %lu to %lu from master %d",
00507                     (u_long)rep->gen, (u_long)cntrl->gen, eid));
00508                 rep->gen = cntrl->gen;
                      /* Keep the election generation ahead of the generation. */
00509                 if (rep->egen <= rep->gen)
00510                         rep->egen = rep->gen + 1;
00511                 RPRINT(dbenv, rep, (dbenv, &mb,
00512                     "Egen is %lu", (u_long)rep->egen));
00513                 rep->master_id = eid;
00514                 rep->stat.st_master_changes++;
00515                 rep->stat.st_startup_complete = 0;
00516                 /*
00517                  * If we're delaying client sync-up, we know we have a
00518                  * new/changed master now, set flag indicating we are
00519                  * actively delaying.
00520                  */
00521                 if (FLD_ISSET(rep->config, REP_C_DELAYCLIENT))
00522                         F_SET(rep, REP_F_DELAY);
00523                 /*
00524                  * If we are already locking out others, we're either
00525                  * in the middle of sync-up recovery or internal init
00526                  * when this newmaster comes in (we also lockout in
00527                  * rep_start, but we cannot be racing that because we
00528                  * don't allow rep_proc_msg when rep_start is going on).
00529                  *
00530                  * If we were in the middle of an internal initialization
00531                  * and we've discovered a new master instead, clean up
00532                  * our old internal init information.  We need to clean
00533                  * up any flags and unlock our lockout.
00534                  */
00535                 if (rep->in_recovery || F_ISSET(rep, REP_F_READY)) {
00536                         (void)__rep_init_cleanup(dbenv, rep, DB_FORCE);
00537                         F_CLR(rep, REP_F_RECOVER_MASK);
00538                         rep->in_recovery = 0;
00539                         F_CLR(rep, REP_F_READY);
00540                 }
                      /*
                       * Archiving is disabled and we enter the verify state
                       * until we have synchronized with the new master.
                       */
00541                 F_SET(rep, REP_F_NOARCHIVE | REP_F_RECOVER_VERIFY);
00542         }
00543         REP_SYSTEM_UNLOCK(dbenv);
00544 
              /* Snapshot lp->lsn while holding the log region lock. */
00545         dblp = dbenv->lg_handle;
00546         lp = dblp->reginfo.primary;
00547         LOG_SYSTEM_LOCK(dbenv);
00548         lsn = lp->lsn;
00549         LOG_SYSTEM_UNLOCK(dbenv);
00550 
00551         if (!change) {
00552                 /*
00553                  * If there wasn't a change, we might still have some
00554                  * catching up or verification to do.
00555                  */
00556                 ret = 0;
00557                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00558                 do_req = __rep_check_doreq(dbenv, rep);
00559                 if (F_ISSET(rep, REP_F_RECOVER_VERIFY)) {
                              /*
                               * Still verifying: re-request the record at
                               * verify_lsn unless we are delaying, there is
                               * no verify LSN, or do_req says not to.
                               */
00560                         lsn = lp->verify_lsn;
00561                         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00562                         if (!F_ISSET(rep, REP_F_DELAY) &&
00563                             !IS_ZERO_LSN(lsn) && do_req)
00564                                 (void)__rep_send_message(dbenv, eid,
00565                                     REP_VERIFY_REQ, &lsn, NULL, 0,
00566                                     DB_REP_ANYWHERE);
00567                 } else {
00568                         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
                              /*
                               * Not verifying: if our LSN is behind the one in
                               * the message, ask for everything from our LSN
                               * on, then re-enable archiving.
                               */
00569                         if (log_compare(&lsn, &cntrl->lsn) < 0 && do_req)
00570                                 (void)__rep_send_message(dbenv, eid,
00571                                     REP_ALL_REQ, &lsn, NULL,
00572                                     0, DB_REP_ANYWHERE);
00573                         REP_SYSTEM_LOCK(dbenv);
00574                         F_CLR(rep, REP_F_NOARCHIVE);
00575                         REP_SYSTEM_UNLOCK(dbenv);
00576                 }
00577                 return (ret);
00578         }
00579 
00580         /*
00581          * If the master changed, we need to start the process of
00582          * figuring out what our last valid log record is.  However,
00583          * if both the master and we agree that the max LSN is 0,0,
00584          * then there is no recovery to be done.  If we are at 0 and
00585          * the master is not, then we just need to request all the log
00586          * records from the master.
00587          */
00588         if (IS_INIT_LSN(lsn) || IS_ZERO_LSN(lsn)) {
00589                 /*
00590                  * If we have no log, then we have no files to open
00591                  * in recovery, but we've opened what we can, which
00592                  * is none.  Mark DBREP_OPENFILES here.
                       *
                       * NOTE: the "empty" label below is also the target of
                       * a goto from the DB_NOTFOUND case further down, after
                       * the log has been truncated and lsn reset with
                       * INIT_LSN.  The REP system lock is taken here while
                       * mtx_clientdb is held.
00593                  */
00594 empty:          MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00595                 F_SET(db_rep, DBREP_OPENFILES);
00596                 ZERO_LSN(lp->verify_lsn);
00597                 REP_SYSTEM_LOCK(dbenv);
00598                 F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
00599                 REP_SYSTEM_UNLOCK(dbenv);
00600 
00601                 if (!IS_INIT_LSN(cntrl->lsn)) {
00602                         /*
00603                          * We're making an ALL_REQ.  But now that we've
00604                          * cleared the flags, we're likely receiving new
00605                          * log records from the master, resulting in a gap
00606                          * immediately.  So to avoid multiple data streams,
00607                          * set the wait_recs value high now to give the master
00608                          * a chance to start sending us these records before
00609                          * the gap code re-requests the same gap.  Wait_recs
00610                          * will get reset once we start receiving these
00611                          * records.
00612                          */
00613                         lp->wait_recs = rep->max_gap;
00614                         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00615                         /*
00616                          * Don't send the ALL_REQ if we're delayed.  But we
00617                          * check here, after lp->wait_recs is set up so that
00618                          * when the app calls rep_sync, everything is ready
00619                          * to go.
00620                          */
00621                         if (!F_ISSET(rep, REP_F_DELAY))
00622                                 (void)__rep_send_message(dbenv, eid,
00623                                     REP_ALL_REQ, &lsn, NULL,
00624                                     0, DB_REP_ANYWHERE);
00625                 } else
00626                         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00627 
00628                 return (DB_REP_NEWMASTER);
00629         }
00630 
00631         memset(&dbt, 0, sizeof(dbt));
00632         /*
00633          * If this client is farther ahead on the log file than the master, see
00634          * if there is any overlap in the logs.  If not, the client is too
00635          * far ahead of the master and we cannot determine they're part of
00636          * the same replication group.
00637          */
00638         if (cntrl->lsn.file < lsn.file) {
00639                 if ((ret = __log_cursor(dbenv, &logc)) != 0)
00640                         goto err;
00641                 if ((ret = __log_c_get(logc, &first_lsn, &dbt, DB_FIRST)) != 0)
00642                         goto err;
00643                 if (cntrl->lsn.file < first_lsn.file) {
00644                         __db_err(dbenv,
00645     "Client too far ahead of master; unable to join replication group");
00646                         ret = DB_REP_JOIN_FAILURE;
00647                         goto err;
00648                 }
                      /*
                       * Close this cursor and NULL the pointer before checking
                       * the result, so the err path will not close it twice.
                       */
00649                 ret = __log_c_close(logc);
00650                 logc = NULL;
00651                 if (ret != 0)
00652                         goto err;
00653         }
00654         if ((ret = __log_cursor(dbenv, &logc)) != 0)
00655                 goto err;
              /*
               * Back lsn up to a record we can verify against the master.
               * A DB_NOTFOUND result is handled below as "no commit or ckp
               * found".  NOTE(review): exact search semantics live in
               * __rep_log_backup, which is not visible here.
               */
00656         ret = __rep_log_backup(logc, &lsn);
              /*
               * Common cleanup: close the cursor if one is open.  A close
               * error is only reported if no earlier error is pending.
               */
00657 err:    if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
00658                 ret = t_ret;
00659         if (ret == DB_NOTFOUND) {
00660                 /*
00661                  * If we don't have an identification record, we still might
00662                  * have some log records but we're discarding them to sync
00663                  * up with the master from the start.  Therefore,
00664                  * truncate our log and go to the no log case.
00665                  */
00666                 INIT_LSN(lsn);
00667                 RPRINT(dbenv, rep, (dbenv, &mb,
00668                     "No commit or ckp found.  Truncate log."));
00669                 (void)__log_vtruncate(dbenv, &lsn, &lsn, NULL);
                      /* Record the time of the truncation in the environment. */
00670                 infop = dbenv->reginfo;
00671                 renv = infop->primary;
00672                 REP_SYSTEM_LOCK(dbenv);
00673                 (void)time(&renv->rep_timestamp);
00674                 REP_SYSTEM_UNLOCK(dbenv);
00675                 goto empty;
00676         }
00677 
00678         /*
00679          * If we failed here, we need to clear the flags we may
00680          * have set above because we're not going to be setting
00681          * the verify_lsn.
00682          */
00683         if (ret != 0) {
00684                 REP_SYSTEM_LOCK(dbenv);
00685                 F_CLR(rep, REP_F_RECOVER_MASK | REP_F_DELAY);
00686                 REP_SYSTEM_UNLOCK(dbenv);
00687                 return (ret);
00688         }
00689 
00690         /*
00691          * Finally, we have a record to ask for.
               * Record it as the verify LSN and reset the request counters
               * under mtx_clientdb, then send the VERIFY_REQ unless we are
               * delaying sync-up.
00692          */
00693         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00694         lp->verify_lsn = lsn;
00695         lp->rcvd_recs = 0;
00696         lp->wait_recs = rep->request_gap;
00697         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00698         if (!F_ISSET(rep, REP_F_DELAY))
00699                 (void)__rep_send_message(dbenv,
00700                     eid, REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_ANYWHERE);
00701 
00702         return (DB_REP_NEWMASTER);
00703 }
00704 
00705 /*
00706  * __rep_is_client
00707  *      Used by other subsystems to figure out if this is a replication
00708  * client site.
00709  *
00710  * PUBLIC: int __rep_is_client __P((DB_ENV *));
00711  */
00712 int
00713 __rep_is_client(dbenv)
00714         DB_ENV *dbenv;
00715 {
00716         DB_REP *db_rep;
00717         REP *rep;
00718 
00719         if (!REP_ON(dbenv))
00720                 return (0);
00721 
00722         db_rep = dbenv->rep_handle;
00723         rep = db_rep->region;
00724 
00725         /*
00726          * Don't just return F_ISSET since that converts unsigned
00727          * into signed.
00728          */
00729         return (F_ISSET(rep, REP_F_CLIENT) ? 1 : 0);
00730 }
00731 
00732 /*
00733  * __rep_noarchive
00734  *      Used by log_archive to determine if it is okay to remove
00735  * log files.
00736  *
00737  * PUBLIC: int __rep_noarchive __P((DB_ENV *));
00738  */
00739 int
00740 __rep_noarchive(dbenv)
00741         DB_ENV *dbenv;
00742 {
00743         DB_REP *db_rep;
00744         REGENV *renv;
00745         REGINFO *infop;
00746         REP *rep;
00747         time_t timestamp;
00748 
00749         infop = dbenv->reginfo;
00750         renv = infop->primary;
00751 
00752         /*
00753          * This is tested before REP_ON below because we always need
00754          * to obey if any replication process has disabled archiving.
00755          * Everything is in the environment region that we need here.
00756          */
00757         if (F_ISSET(renv, DB_REGENV_REPLOCKED)) {
00758                 (void)time(&timestamp);
00759                 TIMESTAMP_CHECK(dbenv, timestamp, renv);
00760                 /*
00761                  * Check if we're still locked out after checking
00762                  * the timestamp.
00763                  */
00764                 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
00765                         return (EINVAL);
00766         }
00767 
00768         if (!REP_ON(dbenv))
00769                 return (0);
00770         db_rep = dbenv->rep_handle;
00771         rep = db_rep->region;
00772         if (F_ISSET(rep, REP_F_NOARCHIVE))
00773                 return (1);
00774         return (0);
00775 }
00776 
00777 /*
00778  * __rep_send_vote
00779  *      Send this site's vote for the election.
00780  *
00781  * PUBLIC: void __rep_send_vote __P((DB_ENV *, DB_LSN *, int, int, int,
00782  * PUBLIC:    u_int32_t, u_int32_t, int, u_int32_t));
00783  */
00784 void
00785 __rep_send_vote(dbenv, lsnp, nsites, nvotes, pri, tie, egen, eid, vtype)
00786         DB_ENV *dbenv;
00787         DB_LSN *lsnp;
00788         int eid, nsites, nvotes, pri;
00789         u_int32_t egen, tie, vtype;
00790 {
00791         DBT vote_dbt;
00792         REP_VOTE_INFO vi;
00793 
00794         memset(&vi, 0, sizeof(vi));
00795 
00796         vi.egen = egen;
00797         vi.priority = pri;
00798         vi.nsites = nsites;
00799         vi.nvotes = nvotes;
00800         vi.tiebreaker = tie;
00801 
00802         memset(&vote_dbt, 0, sizeof(vote_dbt));
00803         vote_dbt.data = &vi;
00804         vote_dbt.size = sizeof(vi);
00805 
00806         (void)__rep_send_message(dbenv, eid, vtype, lsnp, &vote_dbt, 0, 0);
00807 }
00808 
00809 /*
00810  * __rep_elect_done
00811  *      Clear all election information for this site.  Assumes the
00812  *      caller hold the region mutex.
00813  *
00814  * PUBLIC: void __rep_elect_done __P((DB_ENV *, REP *));
00815  */
00816 void
00817 __rep_elect_done(dbenv, rep)
00818         DB_ENV *dbenv;
00819         REP *rep;
00820 {
00821         int inelect;
00822         u_int32_t endsec, endusec;
00823 #ifdef DIAGNOSTIC
00824         DB_MSGBUF mb;
00825 #else
00826         COMPQUIET(dbenv, NULL);
00827 #endif
00828         inelect = IN_ELECTION_TALLY(rep);
00829         F_CLR(rep, REP_F_EPHASE1 | REP_F_EPHASE2 | REP_F_TALLY);
00830         rep->sites = 0;
00831         rep->votes = 0;
00832         if (inelect) {
00833                 if (rep->esec != 0) {
00834                         __os_clock(dbenv, &endsec, &endusec);
00835                         __db_difftime(rep->esec, endsec, rep->eusec, endusec,
00836                             &rep->stat.st_election_sec,
00837                             &rep->stat.st_election_usec);
00838                         RPRINT(dbenv, rep, (dbenv, &mb,
00839                             "Election finished in %u.%06u sec",
00840                             rep->stat.st_election_sec,
00841                             rep->stat.st_election_usec));
00842                         rep->esec = 0;
00843                         rep->eusec = 0;
00844                 }
00845                 rep->egen++;
00846         }
00847         RPRINT(dbenv, rep, (dbenv, &mb,
00848             "Election done; egen %lu", (u_long)rep->egen));
00849 }
00850 
00851 /*
00852  * __rep_grow_sites --
00853  *      Called to allocate more space in the election tally information.
00854  * Called with the rep mutex held.  We need to call the region mutex, so
00855  * we need to make sure that we *never* acquire those mutexes in the
00856  * opposite order.
00857  *
00858  * PUBLIC: int __rep_grow_sites __P((DB_ENV *dbenv, int nsites));
00859  */
00860 int
00861 __rep_grow_sites(dbenv, nsites)
00862         DB_ENV *dbenv;
00863         int nsites;
00864 {
00865         REGENV *renv;
00866         REGINFO *infop;
00867         REP *rep;
00868         int nalloc, ret, *tally;
00869 
00870         rep = ((DB_REP *)dbenv->rep_handle)->region;
00871 
00872         /*
00873          * Allocate either twice the current allocation or nsites,
00874          * whichever is more.
00875          */
00876         nalloc = 2 * rep->asites;
00877         if (nalloc < nsites)
00878                 nalloc = nsites;
00879 
00880         infop = dbenv->reginfo;
00881         renv = infop->primary;
00882         MUTEX_LOCK(dbenv, renv->mtx_regenv);
00883 
00884         /*
00885          * We allocate 2 tally regions, one for tallying VOTE1's and
00886          * one for VOTE2's.  Always grow them in tandem, because if we
00887          * get more VOTE1's we'll always expect more VOTE2's then too.
00888          */
00889         if ((ret = __db_shalloc(infop,
00890             (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY),
00891             &tally)) == 0) {
00892                 if (rep->tally_off != INVALID_ROFF)
00893                          __db_shalloc_free(
00894                              infop, R_ADDR(infop, rep->tally_off));
00895                 rep->tally_off = R_OFFSET(infop, tally);
00896                 if ((ret = __db_shalloc(infop,
00897                     (size_t)nalloc * sizeof(REP_VTALLY), sizeof(REP_VTALLY),
00898                     &tally)) == 0) {
00899                         /* Success */
00900                         if (rep->v2tally_off != INVALID_ROFF)
00901                                  __db_shalloc_free(infop,
00902                                     R_ADDR(infop, rep->v2tally_off));
00903                         rep->v2tally_off = R_OFFSET(infop, tally);
00904                         rep->asites = nalloc;
00905                         rep->nsites = nsites;
00906                 } else {
00907                         /*
00908                          * We were unable to allocate both.  So, we must
00909                          * free the first one and reinitialize.  If
00910                          * v2tally_off is valid, it is from an old
00911                          * allocation and we are clearing it all out due
00912                          * to the error.
00913                          */
00914                         if (rep->v2tally_off != INVALID_ROFF)
00915                                  __db_shalloc_free(infop,
00916                                     R_ADDR(infop, rep->v2tally_off));
00917                         __db_shalloc_free(infop,
00918                             R_ADDR(infop, rep->tally_off));
00919                         rep->v2tally_off = rep->tally_off = INVALID_ROFF;
00920                         rep->asites = 0;
00921                         rep->nsites = 0;
00922                 }
00923         }
00924         MUTEX_UNLOCK(dbenv, renv->mtx_regenv);
00925         return (ret);
00926 }
00927 
00928 /*
00929  * __env_rep_enter --
00930  *
00931  * Check if we are in the middle of replication initialization and/or
00932  * recovery, and if so, disallow operations.  If operations are allowed,
00933  * increment handle-counts, so that we do not start recovery while we
00934  * are operating in the library.
00935  *
00936  * PUBLIC: int __env_rep_enter __P((DB_ENV *, int));
00937  */
00938 int
00939 __env_rep_enter(dbenv, checklock)
00940         DB_ENV *dbenv;
00941         int checklock;
00942 {
00943         DB_REP *db_rep;
00944         REGENV *renv;
00945         REGINFO *infop;
00946         REP *rep;
00947         int cnt;
00948         time_t  timestamp;
00949 
00950         /* Check if locks have been globally turned off. */
00951         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
00952                 return (0);
00953 
00954         db_rep = dbenv->rep_handle;
00955         rep = db_rep->region;
00956 
00957         infop = dbenv->reginfo;
00958         renv = infop->primary;
00959         if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
00960                 (void)time(&timestamp);
00961                 TIMESTAMP_CHECK(dbenv, timestamp, renv);
00962                 /*
00963                  * Check if we're still locked out after checking
00964                  * the timestamp.
00965                  */
00966                 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
00967                         return (EINVAL);
00968         }
00969 
00970         REP_SYSTEM_LOCK(dbenv);
00971         for (cnt = 0; rep->in_recovery;) {
00972                 REP_SYSTEM_UNLOCK(dbenv);
00973                 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
00974                         __db_err(dbenv,
00975     "Operation locked out.  Waiting for replication recovery to complete");
00976                         return (DB_REP_LOCKOUT);
00977                 }
00978                 __os_sleep(dbenv, 1, 0);
00979                 REP_SYSTEM_LOCK(dbenv);
00980                 if (++cnt % 60 == 0)
00981                         __db_err(dbenv,
00982     "DB_ENV handle waiting %d minutes for replication recovery to complete",
00983                             cnt / 60);
00984         }
00985         rep->handle_cnt++;
00986         REP_SYSTEM_UNLOCK(dbenv);
00987 
00988         return (0);
00989 }
00990 
00991 /*
00992  * __env_db_rep_exit --
00993  *
00994  *      Decrement handle count upon routine exit.
00995  *
00996  * PUBLIC: int __env_db_rep_exit __P((DB_ENV *));
00997  */
00998 int
00999 __env_db_rep_exit(dbenv)
01000         DB_ENV *dbenv;
01001 {
01002         DB_REP *db_rep;
01003         REP *rep;
01004 
01005         /* Check if locks have been globally turned off. */
01006         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01007                 return (0);
01008 
01009         db_rep = dbenv->rep_handle;
01010         rep = db_rep->region;
01011 
01012         REP_SYSTEM_LOCK(dbenv);
01013         rep->handle_cnt--;
01014         REP_SYSTEM_UNLOCK(dbenv);
01015 
01016         return (0);
01017 }
01018 
01019 /*
01020  * __db_rep_enter --
01021  *      Called in replicated environments to keep track of in-use handles
01022  * and prevent any concurrent operation during recovery.  If checkgen is
01023  * non-zero, then we verify that the dbp has the same handle as the env.
01024  *
01025  * If return_now is non-zero, we'll return DB_DEADLOCK immediately, else we'll
01026  * sleep before returning DB_DEADLOCK.  Without the sleep, it is likely
01027  * the application will immediately try again and could reach a retry
01028  * limit before replication has a chance to finish.  The sleep increases
01029  * the probability that an application retry will succeed.
01030  *
01031  * PUBLIC: int __db_rep_enter __P((DB *, int, int, int));
01032  */
01033 int
01034 __db_rep_enter(dbp, checkgen, checklock, return_now)
01035         DB *dbp;
01036         int checkgen, checklock, return_now;
01037 {
01038         DB_ENV *dbenv;
01039         DB_REP *db_rep;
01040         REGENV *renv;
01041         REGINFO *infop;
01042         REP *rep;
01043         time_t  timestamp;
01044 
01045         dbenv = dbp->dbenv;
01046         /* Check if locks have been globally turned off. */
01047         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01048                 return (0);
01049 
01050         db_rep = dbenv->rep_handle;
01051         rep = db_rep->region;
01052         infop = dbenv->reginfo;
01053         renv = infop->primary;
01054 
01055         if (checklock && F_ISSET(renv, DB_REGENV_REPLOCKED)) {
01056                 (void)time(&timestamp);
01057                 TIMESTAMP_CHECK(dbenv, timestamp, renv);
01058                 /*
01059                  * Check if we're still locked out after checking
01060                  * the timestamp.
01061                  */
01062                 if (F_ISSET(renv, DB_REGENV_REPLOCKED))
01063                         return (EINVAL);
01064         }
01065         REP_SYSTEM_LOCK(dbenv);
01066         if (F_ISSET(rep, REP_F_READY)) {
01067                 REP_SYSTEM_UNLOCK(dbenv);
01068                 if (!return_now)
01069                         __os_sleep(dbenv, 5, 0);
01070                 return (DB_LOCK_DEADLOCK);
01071         }
01072 
01073         if (checkgen && dbp->timestamp != renv->rep_timestamp) {
01074                 REP_SYSTEM_UNLOCK(dbenv);
01075                 __db_err(dbenv, "%s %s",
01076                     "replication recovery unrolled committed transactions;",
01077                     "open DB and DBcursor handles must be closed");
01078                 return (DB_REP_HANDLE_DEAD);
01079         }
01080         rep->handle_cnt++;
01081         REP_SYSTEM_UNLOCK(dbenv);
01082 
01083         return (0);
01084 }
01085 
01086 /*
01087  * __op_rep_enter --
01088  *
01089  *      Check if we are in the middle of replication initialization and/or
01090  * recovery, and if so, disallow new multi-step operations, such as
01091  * transaction and memp gets.  If operations are allowed,
01092  * increment the op_cnt, so that we do not start recovery while we have
01093  * active operations.
01094  *
01095  * PUBLIC: int __op_rep_enter __P((DB_ENV *));
01096  */
01097 int
01098 __op_rep_enter(dbenv)
01099         DB_ENV *dbenv;
01100 {
01101         DB_REP *db_rep;
01102         REP *rep;
01103         int cnt;
01104 
01105         /* Check if locks have been globally turned off. */
01106         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01107                 return (0);
01108 
01109         db_rep = dbenv->rep_handle;
01110         rep = db_rep->region;
01111 
01112         REP_SYSTEM_LOCK(dbenv);
01113         for (cnt = 0; F_ISSET(rep, REP_F_READY);) {
01114                 REP_SYSTEM_UNLOCK(dbenv);
01115                 if (FLD_ISSET(rep->config, REP_C_NOWAIT)) {
01116                         __db_err(dbenv,
01117     "Operation locked out.  Waiting for replication recovery to complete");
01118                         return (DB_REP_LOCKOUT);
01119                 }
01120                 __os_sleep(dbenv, 5, 0);
01121                 cnt += 5;
01122                 REP_SYSTEM_LOCK(dbenv);
01123                 if (cnt % 60 == 0)
01124                         __db_err(dbenv,
01125         "__op_rep_enter waiting %d minutes for op count to drain",
01126                             cnt / 60);
01127         }
01128         rep->op_cnt++;
01129         REP_SYSTEM_UNLOCK(dbenv);
01130 
01131         return (0);
01132 }
01133 
01134 /*
01135  * __op_rep_exit --
01136  *
01137  *      Decrement op count upon transaction commit/abort/discard or
01138  *      memp_fput.
01139  *
01140  * PUBLIC: int __op_rep_exit __P((DB_ENV *));
01141  */
01142 int
01143 __op_rep_exit(dbenv)
01144         DB_ENV *dbenv;
01145 {
01146         DB_REP *db_rep;
01147         REP *rep;
01148 
01149         /* Check if locks have been globally turned off. */
01150         if (F_ISSET(dbenv, DB_ENV_NOLOCKING))
01151                 return (0);
01152 
01153         db_rep = dbenv->rep_handle;
01154         rep = db_rep->region;
01155 
01156         REP_SYSTEM_LOCK(dbenv);
01157         DB_ASSERT(rep->op_cnt > 0);
01158         rep->op_cnt--;
01159         REP_SYSTEM_UNLOCK(dbenv);
01160 
01161         return (0);
01162 }
01163 
01164 /*
01165  * __rep_get_gen --
01166  *
01167  *      Get the generation number from a replicated environment.
01168  *
01169  * PUBLIC: int __rep_get_gen __P((DB_ENV *, u_int32_t *));
01170  */
01171 int
01172 __rep_get_gen(dbenv, genp)
01173         DB_ENV *dbenv;
01174         u_int32_t *genp;
01175 {
01176         DB_REP *db_rep;
01177         REP *rep;
01178 
01179         db_rep = dbenv->rep_handle;
01180         rep = db_rep->region;
01181 
01182         REP_SYSTEM_LOCK(dbenv);
01183         if (rep->recover_gen > rep->gen)
01184                 *genp = rep->recover_gen;
01185         else
01186                 *genp = rep->gen;
01187         REP_SYSTEM_UNLOCK(dbenv);
01188 
01189         return (0);
01190 }
01191 
01192 /*
01193  * __rep_lockout --
01194  *      Coordinate with other threads in the library and active txns so
01195  *      that we can run single-threaded, for recovery or internal backup.
01196  *      Assumes the caller holds the region mutex.
01197  *
01198  * PUBLIC: int __rep_lockout __P((DB_ENV *, REP *, u_int32_t));
01199  */
01200 int
01201 __rep_lockout(dbenv, rep, msg_th)
01202         DB_ENV *dbenv;
01203         REP *rep;
01204         u_int32_t msg_th;
01205 {
01206         int wait_cnt;
01207 
01208         /* Phase 1: set REP_F_READY and wait for op_cnt to go to 0. */
01209         F_SET(rep, REP_F_READY);
01210         for (wait_cnt = 0; rep->op_cnt != 0;) {
01211                 REP_SYSTEM_UNLOCK(dbenv);
01212                 __os_sleep(dbenv, 1, 0);
01213 #if defined(DIAGNOSTIC) || defined(CONFIG_TEST)
01214                 if (++wait_cnt % 60 == 0)
01215                         __db_err(dbenv,
01216         "Waiting for txn_cnt to run replication recovery/backup for %d minutes",
01217                         wait_cnt / 60);
01218 #endif
01219                 REP_SYSTEM_LOCK(dbenv);
01220         }
01221 
01222         /*
01223          * Phase 2: set in_recovery and wait for handle count to go
01224          * to 0 and for the number of threads in __rep_process_message
01225          * to go to 1 (us).
01226          */
01227         rep->in_recovery = 1;
01228         for (wait_cnt = 0; rep->handle_cnt != 0 || rep->msg_th > msg_th;) {
01229                 REP_SYSTEM_UNLOCK(dbenv);
01230                 __os_sleep(dbenv, 1, 0);
01231 #ifdef DIAGNOSTIC
01232                 if (++wait_cnt % 60 == 0)
01233                         __db_err(dbenv,
01234 "Waiting for handle count to run replication recovery/backup for %d minutes",
01235                         wait_cnt / 60);
01236 #endif
01237                 REP_SYSTEM_LOCK(dbenv);
01238         }
01239 
01240         return (0);
01241 }
01242 
01243 /*
01244  * __rep_send_throttle -
01245  *      Send a record, throttling if necessary.  Callers of this function
01246  * will throttle - breaking out of their loop, if the repth->type field
01247  * changes from the normal message type to the *_MORE message type.
01248  * This function will send the normal type unless throttling gets invoked.
01249  * Then it sets the type field and sends the _MORE message.
01250  *
01251  * PUBLIC: int __rep_send_throttle __P((DB_ENV *, int, REP_THROTTLE *,
01252  * PUBLIC:    u_int32_t));
01253  */
01254 int
01255 __rep_send_throttle(dbenv, eid, repth, flags)
01256         DB_ENV *dbenv;
01257         int eid;
01258         REP_THROTTLE *repth;
01259         u_int32_t flags;
01260 {
01261         DB_REP *db_rep;
01262         REP *rep;
01263         u_int32_t size, typemore;
01264         int check_limit;
01265 
01266         check_limit = repth->gbytes != 0 || repth->bytes != 0;
01267         /*
01268          * If we only want to do throttle processing and we don't have it
01269          * turned on, return immediately.
01270          */
01271         if (!check_limit && LF_ISSET(REP_THROTTLE_ONLY))
01272                 return (0);
01273 
01274         db_rep = dbenv->rep_handle;
01275         rep = db_rep->region;
01276         typemore = 0;
01277         if (repth->type == REP_LOG)
01278                 typemore = REP_LOG_MORE;
01279         if (repth->type == REP_PAGE)
01280                 typemore = REP_PAGE_MORE;
01281         DB_ASSERT(typemore != 0);
01282 
01283         /*
01284          * data_dbt.size is only the size of the log
01285          * record;  it doesn't count the size of the
01286          * control structure. Factor that in as well
01287          * so we're not off by a lot if our log records
01288          * are small.
01289          */
01290         size = repth->data_dbt->size + sizeof(REP_CONTROL);
01291         if (check_limit) {
01292                 if (repth->lsn.offset == 28) {
01293                         repth->type = typemore;
01294                         goto send;
01295                 }
01296                 while (repth->bytes <= size) {
01297                         if (repth->gbytes > 0) {
01298                                 repth->bytes += GIGABYTE;
01299                                 --(repth->gbytes);
01300                                 continue;
01301                         }
01302                         /*
01303                          * We don't hold the rep mutex,
01304                          * and may miscount.
01305                          */
01306                         rep->stat.st_nthrottles++;
01307                         repth->type = typemore;
01308                         goto send;
01309                 }
01310                 repth->bytes -= size;
01311         }
01312         /*
01313          * Always send if it is typemore, otherwise send only if
01314          * REP_THROTTLE_ONLY is not set.
01315          */
01316 send:   if ((repth->type == typemore || !LF_ISSET(REP_THROTTLE_ONLY)) &&
01317             (__rep_send_message(dbenv, eid, repth->type,
01318             &repth->lsn, repth->data_dbt, DB_LOG_RESEND, 0) != 0))
01319                 return (1);
01320         return (0);
01321 }
01322 
#ifdef DIAGNOSTIC
/*
 * __rep_msgtype_name --
 *	Map the rectype of a control structure to the lower-case name used
 *	in diagnostic output; unrecognized values yield "NOTYPE".
 */
static const char *
__rep_msgtype_name(rp)
	const REP_CONTROL *rp;
{
	switch (rp->rectype) {
	case REP_ALIVE:
		return ("alive");
	case REP_ALIVE_REQ:
		return ("alive_req");
	case REP_ALL_REQ:
		return ("all_req");
	case REP_BULK_LOG:
		return ("bulk_log");
	case REP_BULK_PAGE:
		return ("bulk_page");
	case REP_DUPMASTER:
		return ("dupmaster");
	case REP_FILE:
		return ("file");
	case REP_FILE_FAIL:
		return ("file_fail");
	case REP_FILE_REQ:
		return ("file_req");
	case REP_LOG:
		return ("log");
	case REP_LOG_MORE:
		return ("log_more");
	case REP_LOG_REQ:
		return ("log_req");
	case REP_MASTER_REQ:
		return ("master_req");
	case REP_NEWCLIENT:
		return ("newclient");
	case REP_NEWFILE:
		return ("newfile");
	case REP_NEWMASTER:
		return ("newmaster");
	case REP_NEWSITE:
		return ("newsite");
	case REP_PAGE:
		return ("page");
	case REP_PAGE_FAIL:
		return ("page_fail");
	case REP_PAGE_MORE:
		return ("page_more");
	case REP_PAGE_REQ:
		return ("page_req");
	case REP_REREQUEST:
		return ("rerequest");
	case REP_UPDATE:
		return ("update");
	case REP_UPDATE_REQ:
		return ("update_req");
	case REP_VERIFY:
		return ("verify");
	case REP_VERIFY_FAIL:
		return ("verify_fail");
	case REP_VERIFY_REQ:
		return ("verify_req");
	case REP_VOTE1:
		return ("vote1");
	case REP_VOTE2:
		return ("vote2");
	default:
		return ("NOTYPE");
	}
}

/*
 * __rep_print_message --
 *	Diagnostic builds only: emit, through RPRINT, one line describing a
 *	replication message -- the environment home, the caller's tag str,
 *	the generation, the eid, the message type name and the LSN taken
 *	from the control structure rp.
 *
 * PUBLIC: void __rep_print_message __P((DB_ENV *, int, REP_CONTROL *, char *));
 */
void
__rep_print_message(dbenv, eid, rp, str)
	DB_ENV *dbenv;
	int eid;
	REP_CONTROL *rp;
	char *str;
{
	DB_MSGBUF mb;
	const char *type_name;

	type_name = __rep_msgtype_name(rp);

	RPRINT(dbenv, ((REP *)((DB_REP *)(dbenv)->rep_handle)->region),
	    (dbenv, &mb, "%s %s: gen = %lu eid %d, type %s, LSN [%lu][%lu]",
	    dbenv->db_home, str, (u_long)rp->gen,
	    eid, type_name, (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
}
#endif

Generated on Sun Dec 25 12:14:45 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2