Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

rep_record.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2001-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: rep_record.c,v 12.25 2005/10/20 18:57:13 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #if TIME_WITH_SYS_TIME
00014 #include <sys/time.h>
00015 #include <time.h>
00016 #else
00017 #if HAVE_SYS_TIME_H
00018 #include <sys/time.h>
00019 #else
00020 #include <time.h>
00021 #endif
00022 #endif
00023 
00024 #include <stdlib.h>
00025 #include <string.h>
00026 #endif
00027 
00028 #include "db_int.h"
00029 #include "dbinc/db_page.h"
00030 #include "dbinc/db_shash.h"
00031 #include "dbinc/db_am.h"
00032 #include "dbinc/lock.h"
00033 #include "dbinc/log.h"
00034 #include "dbinc/mp.h"
00035 #include "dbinc/txn.h"
00036 
      /*
       * Prototypes for the static (file-local) helper routines used by the
       * message-processing and log-apply code in this file.
       */
00037 static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
00038 static int __rep_do_ckp __P((DB_ENV *, DBT *, REP_CONTROL *));
00039 static int __rep_getnext __P((DB_ENV *));
00040 static int __rep_lsn_cmp __P((const void *, const void *));
00041 static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DB_LSN *));
00042 static int __rep_process_rec __P((DB_ENV *,
00043     REP_CONTROL *, DBT *, u_int32_t *, DB_LSN *));
00044 static int __rep_remfirst __P((DB_ENV *, DBT *, DBT *));
00045 static int __rep_resend_req __P((DB_ENV *, int));
00046 static int __rep_skip_msg __P((DB_ENV *, REP *, int, u_int32_t));
00047 
00048 /*
       * Used to consistently designate which messages ought to be received where.
       *
       * MASTER_ONLY --
       *      Guard for message types that only a master may process.  When
       *      REP_F_MASTER is not set on rep, the message is reported through
       *      RPRINT and REP_PRINT_MESSAGE, ret is set to EINVAL and control
       *      jumps to the errlock label.
       *
       * The expansion relies on names from the enclosing function: dbenv,
       * eidp, ret, the errlock label, and mb (named only in the RPRINT
       * arguments).
       */
00049 
00050 #define MASTER_ONLY(rep, rp) do {                                       \
00051         if (!F_ISSET(rep, REP_F_MASTER)) {                              \
00052                 RPRINT(dbenv, rep,                                      \
00053                 (dbenv, &mb, "Master record received on client"));      \
00054                 REP_PRINT_MESSAGE(dbenv,                                \
00055                     *eidp, rp, "rep_process_message");                  \
00056                 ret = EINVAL;                                           \
00057                 goto errlock;                                           \
00058         }                                                               \
00059 } while (0)
00060 
      /*
       * CLIENT_ONLY --
       *      Guard for message types that only a client may process.  When
       *      REP_F_CLIENT is not set on rep, the message is reported through
       *      RPRINT and REP_PRINT_MESSAGE, a REP_DUPMASTER message is broadcast
       *      (the send result is deliberately ignored), ret is set to
       *      DB_REP_DUPMASTER and control jumps to the errlock label.
       *
       * The expansion relies on names from the enclosing function: dbenv,
       * eidp, ret, the errlock label, and mb (named only in the RPRINT
       * arguments).
       */
00061 #define CLIENT_ONLY(rep, rp) do {                                       \
00062         if (!F_ISSET(rep, REP_F_CLIENT)) {                              \
00063                 RPRINT(dbenv, rep,                                      \
00064                     (dbenv, &mb, "Client record received on master"));  \
00065                 REP_PRINT_MESSAGE(dbenv,                                \
00066                     *eidp, rp, "rep_process_message");                  \
00067                 (void)__rep_send_message(dbenv,                         \
00068                     DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0, 0); \
00069                 ret = DB_REP_DUPMASTER;                                 \
00070                 goto errlock;                                           \
00071         }                                                               \
00072 } while (0)
00073 
00074 /*
00075  * If a client is attempting to service a request it does not have,
00076  * call rep_skip_msg to skip this message and force a rerequest to the
00077  * sender.  We don't hold the mutex for the stats and may miscount.
       *
       * Meant to follow the call that serviced the request: on a client
       * (REP_F_CLIENT set) every use counts one st_client_svc_req, and only
       * a ret of DB_NOTFOUND counts a st_client_svc_miss and replaces ret
       * with the return value of __rep_skip_msg.  On a non-client the macro
       * does nothing.
       *
       * Takes no arguments; it uses rep, ret, rp, eidp and dbenv from the
       * enclosing function.
00078  */
00079 #define CLIENT_REREQ do {                                               \
00080         if (F_ISSET(rep, REP_F_CLIENT)) {                               \
00081                 rep->stat.st_client_svc_req++;                          \
00082                 if (ret == DB_NOTFOUND) {                               \
00083                         rep->stat.st_client_svc_miss++;                 \
00084                         ret = __rep_skip_msg(dbenv, rep, *eidp, rp->rectype);\
00085                 }                                                       \
00086         }                                                               \
00087 } while (0)
00088 
      /*
       * MASTER_UPDATE --
       *      While holding the replication system lock, set the
       *      DB_REGENV_REPLOCKED flag on the environment region (renv) and
       *      store the current time() in renv->op_timestamp.
       *
       * NOTE(review): presumably the flag/timestamp pair records that the
       * region is busy servicing a client's update or page request; confirm
       * against the code that tests DB_REGENV_REPLOCKED.
       */
00089 #define MASTER_UPDATE(dbenv, renv) do {                                 \
00090         REP_SYSTEM_LOCK(dbenv);                                         \
00091         F_SET((renv), DB_REGENV_REPLOCKED);                             \
00092         (void)time(&(renv)->op_timestamp);                              \
00093         REP_SYSTEM_UNLOCK(dbenv);                                       \
00094 } while (0)
00095 
      /*
       * RECOVERING_SKIP --
       *      If the enclosing function's "recovering" flag is non-zero, count
       *      the message in st_msgs_recover, set ret to the return value of
       *      __rep_skip_msg and jump to the errlock label; otherwise do nothing.
       *
       * Takes no arguments; it uses recovering, rep, ret, rp, eidp, dbenv and
       * the errlock label from the enclosing function.
       */
00096 #define RECOVERING_SKIP do {                                            \
00097         if (recovering) {                                               \
00098                 /* Not holding region mutex, may miscount */            \
00099                 rep->stat.st_msgs_recover++;                            \
00100                 ret = __rep_skip_msg(dbenv, rep, *eidp, rp->rectype);   \
00101                 goto errlock;                                           \
00102         }                                                               \
00103 } while (0)
00104 
00105 /*
00106  * If we're recovering the log we only want log records that are in the
00107  * range we need to recover.  Otherwise we can end up storing a huge
00108  * number of "new" records, only to truncate the temp database later after
00109  * we run recovery.  If we are actively delaying a sync-up, we also skip
00110  * all incoming log records until the application requests sync-up.
       *
       * Concretely, the message is skipped when REP_F_DELAY is set, or when
       * "recovering" is non-zero and either REP_F_RECOVER_LOG is not set or
       * the message LSN compares greater than rep->last_lsn.  A skipped
       * message is counted in st_msgs_recover, ret is set to the return
       * value of __rep_skip_msg, and control jumps to the errlock label.
       *
       * Takes no arguments; it uses recovering, rep, ret, rp, eidp, dbenv and
       * the errlock label from the enclosing function.
00111  */
00112 #define RECOVERING_LOG_SKIP do {                                        \
00113         if (F_ISSET(rep, REP_F_DELAY) ||                                \
00114             (recovering &&                                              \
00115             (!F_ISSET(rep, REP_F_RECOVER_LOG) ||                        \
00116              log_compare(&rp->lsn, &rep->last_lsn) > 0))) {             \
00117                 /* Not holding region mutex, may miscount */            \
00118                 rep->stat.st_msgs_recover++;                            \
00119                 ret = __rep_skip_msg(dbenv, rep, *eidp, rp->rectype);   \
00120                 goto errlock;                                           \
00121         }                                                               \
00122 } while (0)
00123 
      /*
       * ANYSITE --
       *      Expands to nothing.  It is a marker for message types that carry
       *      neither a MASTER_ONLY nor a CLIENT_ONLY restriction, so the
       *      omission of those checks reads as deliberate.
       */
00124 #define ANYSITE(rep)
00125 
00126 /*
00127  * __rep_process_message --
00128  *
00129  * This routine takes an incoming message and processes it.
00130  *
00131  * control: contains the control fields from the record
00132  * rec: contains the actual record
00133  * eidp: contains the machine id of the sender of the message;
00134  *      in the case of a DB_NEWMASTER message, returns the eid
00135  *      of the new master.
00136  * ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
00137  *      lsn of the maximum permanent or current not permanent log record
00138  *      (respectively).
00139  *
00140  * PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *,
00141  * PUBLIC:     DB_LSN *));
00142  */
00143 int
00144 __rep_process_message(dbenv, control, rec, eidp, ret_lsnp)
00145         DB_ENV *dbenv;
00146         DBT *control, *rec;
00147         int *eidp;
00148         DB_LSN *ret_lsnp;
00149 {
00150         DB_LOG *dblp;
00151         DB_LSN lsn;
00152         DB_REP *db_rep;
00153         DBT data_dbt;
00154         LOG *lp;
00155         REGENV *renv;
00156         REGINFO *infop;
00157         REP *rep;
00158         REP_CONTROL *rp;
00159         u_int32_t egen, gen;
00160         int cmp, recovering, ret;
00161         time_t savetime;
00162 #ifdef DIAGNOSTIC
00163         DB_MSGBUF mb;
00164 #endif
00165 
00166         PANIC_CHECK(dbenv);
00167         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message",
00168             DB_INIT_REP);
00169 
00170         /* Control argument must be non-Null. */
00171         if (control == NULL || control->size == 0) {
00172                 __db_err(dbenv,
00173         "DB_ENV->rep_process_message: control argument must be specified");
00174                 return (EINVAL);
00175         }
00176 
00177         if (!IS_REP_MASTER(dbenv) && !IS_REP_CLIENT(dbenv)) {
00178                 __db_err(dbenv,
00179         "Environment not configured as replication master or client");
00180                 return (EINVAL);
00181         }
00182 
00183         ret = 0;
00184         db_rep = dbenv->rep_handle;
00185         rep = db_rep->region;
00186         dblp = dbenv->lg_handle;
00187         lp = dblp->reginfo.primary;
00188         infop = dbenv->reginfo;
00189         renv = infop->primary;
00190         rp = (REP_CONTROL *)control->data;
00191         if (ret_lsnp != NULL)
00192                 ZERO_LSN(*ret_lsnp);
00193 
00194         /*
00195          * Acquire the replication lock.
00196          */
00197         REP_SYSTEM_LOCK(dbenv);
00198         if (rep->start_th != 0) {
00199                 /*
00200                  * If we're racing with a thread in rep_start, then
00201                  * just ignore the message and return.
00202                  */
00203                 RPRINT(dbenv, rep, (dbenv, &mb,
00204                     "Racing rep_start, ignore message."));
00205                 if (F_ISSET(rp, DB_LOG_PERM))
00206                         ret = DB_REP_IGNORE;
00207                 REP_SYSTEM_UNLOCK(dbenv);
00208                 goto out;
00209         }
00210         rep->msg_th++;
00211         gen = rep->gen;
00212         recovering = rep->in_recovery || F_ISSET(rep, REP_F_RECOVER_MASK);
00213         savetime = renv->rep_timestamp;
00214 
00215         rep->stat.st_msgs_processed++;
00216         REP_SYSTEM_UNLOCK(dbenv);
00217 
00218         REP_PRINT_MESSAGE(dbenv, *eidp, rp, "rep_process_message");
00219 
00220         /* Complain if we see an improper version number. */
00221         if (rp->rep_version != DB_REPVERSION) {
00222                 __db_err(dbenv,
00223                     "unexpected replication message version %lu, expected %d",
00224                     (u_long)rp->rep_version, DB_REPVERSION);
00225                 ret = EINVAL;
00226                 goto errlock;
00227         }
00228         if (rp->log_version != DB_LOGVERSION) {
00229                 __db_err(dbenv,
00230                     "unexpected log record version %lu, expected %d",
00231                     (u_long)rp->log_version, DB_LOGVERSION);
00232                 ret = EINVAL;
00233                 goto errlock;
00234         }
00235 
00236         /*
00237          * Check for generation number matching.  Ignore any old messages
00238          * except requests that are indicative of a new client that needs
00239          * to get in sync.
00240          */
00241         if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
00242             rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
00243             rp->rectype != REP_DUPMASTER) {
00244                 /*
00245                  * We don't hold the rep mutex, and could miscount if we race.
00246                  */
00247                 rep->stat.st_msgs_badgen++;
00248                 if (F_ISSET(rp, DB_LOG_PERM))
00249                         ret = DB_REP_IGNORE;
00250                 goto errlock;
00251         }
00252 
00253         if (rp->gen > gen) {
00254                 /*
00255                  * If I am a master and am out of date with a lower generation
00256                  * number, I am in bad shape and should downgrade.
00257                  */
00258                 if (F_ISSET(rep, REP_F_MASTER)) {
00259                         rep->stat.st_dupmasters++;
00260                         ret = DB_REP_DUPMASTER;
00261                         if (rp->rectype != REP_DUPMASTER)
00262                                 (void)__rep_send_message(dbenv,
00263                                     DB_EID_BROADCAST, REP_DUPMASTER,
00264                                     NULL, NULL, 0, 0);
00265                         goto errlock;
00266                 }
00267 
00268                 /*
00269                  * I am a client and am out of date.  If this is an election,
00270                  * or a response from the first site I contacted, then I can
00271                  * accept the generation number and participate in future
00272                  * elections and communication. Otherwise, I need to hear about
00273                  * a new master and sync up.
00274                  */
00275                 if (rp->rectype == REP_ALIVE ||
00276                     rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
00277                         REP_SYSTEM_LOCK(dbenv);
00278                         RPRINT(dbenv, rep, (dbenv, &mb,
00279                             "Updating gen from %lu to %lu",
00280                             (u_long)gen, (u_long)rp->gen));
00281                         rep->master_id = DB_EID_INVALID;
00282                         gen = rep->gen = rp->gen;
00283                         /*
00284                          * Updating of egen will happen when we process the
00285                          * message below for each message type.
00286                          */
00287                         REP_SYSTEM_UNLOCK(dbenv);
00288                         if (rp->rectype == REP_ALIVE)
00289                                 (void)__rep_send_message(dbenv,
00290                                     DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
00291                                     NULL, 0, 0);
00292                 } else if (rp->rectype != REP_NEWMASTER) {
00293                         /*
00294                          * Ignore this message, retransmit if needed.
00295                          */
00296                         if (__rep_check_doreq(dbenv, rep))
00297                                 (void)__rep_send_message(dbenv,
00298                                     DB_EID_BROADCAST, REP_MASTER_REQ,
00299                                     NULL, NULL, 0, 0);
00300                         goto errlock;
00301                 }
00302                 /*
00303                  * If you get here, then you're a client and either you're
00304                  * in an election or you have a NEWMASTER or an ALIVE message
00305                  * whose processing will do the right thing below.
00306                  */
00307         }
00308 
00309         /*
00310          * We need to check if we're in recovery and if we are
00311          * then we need to ignore any messages except VERIFY*, VOTE*,
00312          * NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
00313          * PAGE* and FILE*.  We need to also accept LOG messages
00314          * if we're copying the log for recovery/backup.
00315          */
00316         switch (rp->rectype) {
00317         case REP_ALIVE:
00318                 /*
00319                  * Handle even if we're recovering.
00320                  */
00321                 ANYSITE(rep);
00322                 egen = *(u_int32_t *)rec->data;
00323                 REP_SYSTEM_LOCK(dbenv);
00324                 RPRINT(dbenv, rep, (dbenv, &mb,
00325                     "Received ALIVE egen of %lu, mine %lu",
00326                     (u_long)egen, (u_long)rep->egen));
00327                 if (egen > rep->egen) {
00328                         /*
00329                          * We're changing egen, need to clear out any old
00330                          * election information.
00331                          */
00332                         __rep_elect_done(dbenv, rep);
00333                         rep->egen = egen;
00334                 }
00335                 REP_SYSTEM_UNLOCK(dbenv);
00336                 break;
00337         case REP_ALIVE_REQ:
00338                 /*
00339                  * Handle even if we're recovering.
00340                  */
00341                 ANYSITE(rep);
00342                 dblp = dbenv->lg_handle;
00343                 LOG_SYSTEM_LOCK(dbenv);
00344                 lsn = ((LOG *)dblp->reginfo.primary)->lsn;
00345                 LOG_SYSTEM_UNLOCK(dbenv);
00346                 REP_SYSTEM_LOCK(dbenv);
00347                 egen = rep->egen;
00348                 REP_SYSTEM_UNLOCK(dbenv);
00349                 data_dbt.data = &egen;
00350                 data_dbt.size = sizeof(egen);
00351                 (void)__rep_send_message(dbenv,
00352                     *eidp, REP_ALIVE, &lsn, &data_dbt, 0, 0);
00353                 break;
00354         case REP_ALL_REQ:
00355                 RECOVERING_SKIP;
00356                 ret = __rep_allreq(dbenv, rp, *eidp);
00357                 CLIENT_REREQ;
00358                 break;
00359         case REP_BULK_LOG:
00360                 RECOVERING_LOG_SKIP;
00361                 CLIENT_ONLY(rep, rp);
00362                 ret = __rep_bulk_log(dbenv, rp, rec, savetime, ret_lsnp);
00363                 break;
00364         case REP_BULK_PAGE:
00365                 /*
00366                  * Handle even if we're recovering.
00367                  */
00368                 CLIENT_ONLY(rep, rp);
00369                 ret = __rep_bulk_page(dbenv, *eidp, rp, rec);
00370                 break;
00371         case REP_DUPMASTER:
00372                 /*
00373                  * Handle even if we're recovering.
00374                  */
00375                 if (F_ISSET(rep, REP_F_MASTER))
00376                         ret = DB_REP_DUPMASTER;
00377                 break;
00378 #ifdef NOTYET
00379         case REP_FILE: /* TODO */
00380                 CLIENT_ONLY(rep, rp);
00381                 break;
00382         case REP_FILE_REQ:
00383                 ret = __rep_send_file(dbenv, rec, *eidp);
00384                 break;
00385 #endif
00386         case REP_FILE_FAIL:
00387                 /*
00388                  * Handle even if we're recovering.
00389                  */
00390                 CLIENT_ONLY(rep, rp);
00391                 /*
00392                  * XXX
00393                  */
00394                 break;
00395         case REP_LOG:
00396         case REP_LOG_MORE:
00397                 RECOVERING_LOG_SKIP;
00398                 CLIENT_ONLY(rep, rp);
00399                 ret = __rep_log(dbenv, rp, rec, savetime, ret_lsnp);
00400                 break;
00401         case REP_LOG_REQ:
00402                 RECOVERING_SKIP;
00403                 ret = __rep_logreq(dbenv, rp, rec, *eidp);
00404                 CLIENT_REREQ;
00405                 break;
00406         case REP_NEWSITE:
00407                 /*
00408                  * Handle even if we're recovering.
00409                  */
00410                 /* We don't hold the rep mutex, and may miscount. */
00411                 rep->stat.st_newsites++;
00412 
00413                 /* This is a rebroadcast; simply tell the application. */
00414                 if (F_ISSET(rep, REP_F_MASTER)) {
00415                         dblp = dbenv->lg_handle;
00416                         lp = dblp->reginfo.primary;
00417                         LOG_SYSTEM_LOCK(dbenv);
00418                         lsn = lp->lsn;
00419                         LOG_SYSTEM_UNLOCK(dbenv);
00420                         (void)__rep_send_message(dbenv,
00421                             *eidp, REP_NEWMASTER, &lsn, NULL, 0, 0);
00422                 }
00423                 ret = DB_REP_NEWSITE;
00424                 break;
00425         case REP_NEWCLIENT:
00426                 /*
00427                  * Handle even if we're recovering.
00428                  */
00429                 /*
00430                  * This message was received and should have resulted in the
00431                  * application entering the machine ID in its machine table.
00432                  * We respond to this with an ALIVE to send relevant information
00433                  * to the new client (if we are a master, we'll send a
00434                  * NEWMASTER, so we only need to send the ALIVE if we're a
00435                  * client).  But first, broadcast the new client's record to
00436                  * all the clients.
00437                  */
00438                 (void)__rep_send_message(dbenv,
00439                     DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0, 0);
00440 
00441                 ret = DB_REP_NEWSITE;
00442 
00443                 if (F_ISSET(rep, REP_F_CLIENT)) {
00444                         REP_SYSTEM_LOCK(dbenv);
00445                         egen = rep->egen;
00446                         if (*eidp == rep->master_id)
00447                                 rep->master_id = DB_EID_INVALID;
00448                         REP_SYSTEM_UNLOCK(dbenv);
00449                         data_dbt.data = &egen;
00450                         data_dbt.size = sizeof(egen);
00451                         (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
00452                             REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
00453                         break;
00454                 }
00455                 /* FALLTHROUGH */
00456         case REP_MASTER_REQ:
00457                 RECOVERING_SKIP;
00458                 if (F_ISSET(rep, REP_F_MASTER)) {
00459                         LOG_SYSTEM_LOCK(dbenv);
00460                         lsn = lp->lsn;
00461                         LOG_SYSTEM_UNLOCK(dbenv);
00462                         (void)__rep_send_message(dbenv,
00463                             DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
00464                 }
00465                 /*
00466                  * If there is no master, then we could get into a state
00467                  * where an old client lost the initial ALIVE message and
00468                  * is calling an election under an old gen and can
00469                  * never get to the current gen.
00470                  */
00471                 if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
00472                         REP_SYSTEM_LOCK(dbenv);
00473                         egen = rep->egen;
00474                         if (*eidp == rep->master_id)
00475                                 rep->master_id = DB_EID_INVALID;
00476                         REP_SYSTEM_UNLOCK(dbenv);
00477                         data_dbt.data = &egen;
00478                         data_dbt.size = sizeof(egen);
00479                         (void)__rep_send_message(dbenv, *eidp,
00480                             REP_ALIVE, &rp->lsn, &data_dbt, 0, 0);
00481                 }
00482                 break;
00483         case REP_NEWFILE:
00484                 RECOVERING_LOG_SKIP;
00485                 CLIENT_ONLY(rep, rp);
00486                 ret = __rep_apply(dbenv, rp, rec, ret_lsnp, NULL);
00487                 break;
00488         case REP_NEWMASTER:
00489                 /*
00490                  * Handle even if we're recovering.
00491                  */
00492                 ANYSITE(rep);
00493                 if (F_ISSET(rep, REP_F_MASTER) &&
00494                     *eidp != dbenv->rep_eid) {
00495                         /* We don't hold the rep mutex, and may miscount. */
00496                         rep->stat.st_dupmasters++;
00497                         ret = DB_REP_DUPMASTER;
00498                         (void)__rep_send_message(dbenv,
00499                             DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0, 0);
00500                         break;
00501                 }
00502                 ret = __rep_new_master(dbenv, rp, *eidp);
00503                 break;
00504         case REP_PAGE:
00505         case REP_PAGE_MORE:
00506                 /*
00507                  * Handle even if we're recovering.
00508                  */
00509                 CLIENT_ONLY(rep, rp);
00510                 ret = __rep_page(dbenv, *eidp, rp, rec);
00511                 break;
00512         case REP_PAGE_FAIL:
00513                 /*
00514                  * Handle even if we're recovering.
00515                  */
00516                 CLIENT_ONLY(rep, rp);
00517                 ret = __rep_page_fail(dbenv, *eidp, rec);
00518                 break;
00519         case REP_PAGE_REQ:
00520                 /*
00521                  * Handle even if we're recovering.
00522                  */
00523                 MASTER_UPDATE(dbenv, renv);
00524                 ret = __rep_page_req(dbenv, *eidp, rec);
00525                 CLIENT_REREQ;
00526                 break;
00527         case REP_REREQUEST:
00528                 /*
00529                  * Handle even if we're recovering.  Don't do a master
00530                  * check.
00531                  */
00532                 CLIENT_ONLY(rep, rp);
00533                 /*
00534                  * Don't hold any mutex, may miscount.
00535                  */
00536                 rep->stat.st_client_rerequests++;
00537                 ret = __rep_resend_req(dbenv, 1);
00538                 break;
00539         case REP_UPDATE:
00540                 /*
00541                  * Handle even if we're recovering.
00542                  */
00543                 CLIENT_ONLY(rep, rp);
00544                 ret = __rep_update_setup(dbenv, *eidp, rp, rec);
00545                 break;
00546         case REP_UPDATE_REQ:
00547                 /*
00548                  * Handle even if we're recovering.
00549                  */
00550                 MASTER_ONLY(rep, rp);
00551                 infop = dbenv->reginfo;
00552                 renv = infop->primary;
00553                 MASTER_UPDATE(dbenv, renv);
00554                 ret = __rep_update_req(dbenv, *eidp);
00555                 break;
00556         case REP_VERIFY:
00557                 if (recovering) {
00558                         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00559                         cmp = log_compare(&lp->verify_lsn, &rp->lsn);
00560                         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00561                         /*
00562                          * If this is not the verify record I want, skip it.
00563                          */
00564                         if (cmp != 0) {
00565                                 ret = __rep_skip_msg(
00566                                     dbenv, rep, *eidp, rp->rectype);
00567                                 break;
00568                         }
00569                 }
00570                 CLIENT_ONLY(rep, rp);
00571                 ret = __rep_verify(dbenv, rp, rec, *eidp, savetime);
00572                 break;
00573         case REP_VERIFY_FAIL:
00574                 /*
00575                  * Handle even if we're recovering.
00576                  */
00577                 CLIENT_ONLY(rep, rp);
00578                 ret = __rep_verify_fail(dbenv, rp, *eidp);
00579                 break;
00580         case REP_VERIFY_REQ:
00581                 RECOVERING_SKIP;
00582                 ret = __rep_verify_req(dbenv, rp, *eidp);
00583                 CLIENT_REREQ;
00584                 break;
00585         case REP_VOTE1:
00586                 /*
00587                  * Handle even if we're recovering.
00588                  */
00589                 ret = __rep_vote1(dbenv, rp, rec, *eidp);
00590                 break;
00591         case REP_VOTE2:
00592                 /*
00593                  * Handle even if we're recovering.
00594                  */
00595                 ret = __rep_vote2(dbenv, rec, eidp);
00596                 break;
00597         default:
00598                 __db_err(dbenv,
00599         "DB_ENV->rep_process_message: unknown replication message: type %lu",
00600                    (u_long)rp->rectype);
00601                 ret = EINVAL;
00602                 break;
00603         }
00604 
00605 errlock:
00606         REP_SYSTEM_LOCK(dbenv);
00607         rep->msg_th--;
00608         REP_SYSTEM_UNLOCK(dbenv);
00609 out:
00610         if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
00611                 if (ret_lsnp != NULL)
00612                         *ret_lsnp = rp->lsn;
00613                 ret = DB_REP_NOTPERM;
00614         }
00615         return (ret);
00616 }
00617 
00618 /*
00619  * __rep_apply --
00620  *
00621  * Handle incoming log records on a client, applying when possible and
00622  * entering into the bookkeeping table otherwise.  This routine manages
00623  * the state of the incoming message stream -- processing records, via
00624  * __rep_process_rec, when possible and enqueuing in the __db.rep.db
00625  * when necessary.  As gaps in the stream are filled in, this is where
00626  * we try to process as much as possible from __db.rep.db to catch up.
00627  *
00628  * PUBLIC: int __rep_apply __P((DB_ENV *, REP_CONTROL *,
00629  * PUBLIC:     DBT *, DB_LSN *, int *));
00630  */
00631 int
00632 __rep_apply(dbenv, rp, rec, ret_lsnp, is_dupp)
00633         DB_ENV *dbenv;
00634         REP_CONTROL *rp;
00635         DBT *rec;
00636         DB_LSN *ret_lsnp;
00637         int *is_dupp;
00638 {
00639         DB_REP *db_rep;
00640         DBT control_dbt, key_dbt;
00641         DBT rec_dbt;
00642         DB *dbp;
00643         DB_LOG *dblp;
00644         DB_LSN max_lsn;
00645         LOG *lp;
00646         REP *rep;
00647         u_int32_t rectype;
00648         int cmp, ret;
00649 #ifdef DIAGNOSTIC
00650         DB_MSGBUF mb;
00651 #endif
00652 
00653         db_rep = dbenv->rep_handle;
00654         rep = db_rep->region;
00655         dbp = db_rep->rep_db;
00656         rectype = 0;
00657         ret = 0;
00658         memset(&control_dbt, 0, sizeof(control_dbt));
00659         memset(&rec_dbt, 0, sizeof(rec_dbt));
00660         ZERO_LSN(max_lsn);
00661 
00662         dblp = dbenv->lg_handle;
00663         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00664         lp = dblp->reginfo.primary;
00665         REP_SYSTEM_LOCK(dbenv);
00666         if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
00667             log_compare(&lp->ready_lsn, &rep->first_lsn) < 0)
00668                 lp->ready_lsn = rep->first_lsn;
00669         REP_SYSTEM_UNLOCK(dbenv);
00670         cmp = log_compare(&rp->lsn, &lp->ready_lsn);
00671 
00672         if (cmp == 0) {
00673                 if ((ret =
00674                     __rep_process_rec(dbenv, rp, rec, &rectype, &max_lsn)) != 0)
00675                         goto err;
00676                 /*
00677                  * If we get the record we are expecting, reset
00678                  * the count of records we've received and are applying
00679                  * towards the request interval.
00680                  */
00681                 lp->rcvd_recs = 0;
00682                 ZERO_LSN(lp->max_wait_lsn);
00683 
00684                 while (ret == 0 &&
00685                     log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
00686                         /*
00687                          * We just filled in a gap in the log record stream.
00688                          * Write subsequent records to the log.
00689                          */
00690 gap_check:
00691                         if ((ret =
00692                             __rep_remfirst(dbenv, &control_dbt, &rec_dbt)) != 0)
00693                                 goto err;
00694 
00695                         rp = (REP_CONTROL *)control_dbt.data;
00696                         rec = &rec_dbt;
00697                         if ((ret = __rep_process_rec(dbenv,
00698                             rp, rec, &rectype, &max_lsn)) != 0)
00699                                 goto err;
00700 
00701                         /*
00702                          * We may miscount, as we don't hold the rep mutex.
00703                          */
00704                         --rep->stat.st_log_queued;
00705 
00706                         /*
00707                          * Since we just filled a gap in the log stream, and
00708                          * we're writing subsequent records to the log, we want
00709                          * to use rcvd_recs and wait_recs so that we will
00710                          * request the next gap if we end up with a gap and
00711                          * a lot of records still in the temp db, but not
00712                          * request if it is near the end of the temp db and
00713                          * likely to arrive on its own shortly.  We want to
00714                          * avoid requesting the record in that case.  Also
00715                          * reset max_wait_lsn because the next gap is a
00716                          * fresh gap.
00717                          */
00718                         lp->rcvd_recs = rep->stat.st_log_queued;
00719                         lp->wait_recs = rep->request_gap;
00720 
00721                         if ((ret = __rep_getnext(dbenv)) == DB_NOTFOUND) {
00722                                 lp->rcvd_recs = 0;
00723                                 ret = 0;
00724                                 break;
00725                         } else if (ret != 0)
00726                                 goto err;
00727                 }
00728 
00729                 /*
00730                  * Check if we're at a gap in the table and if so, whether we
00731                  * need to ask for any records.
00732                  */
00733                 if (!IS_ZERO_LSN(lp->waiting_lsn) &&
00734                     log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
00735                         /*
00736                          * We got a record and processed it, but we may
00737                          * still be waiting for more records.  If we
00738                          * filled a gap we keep a count of how many other
00739                          * records are in the temp database and if we should
00740                          * request the next gap at this time.
00741                          */
00742                         if (__rep_check_doreq(dbenv, rep) && (ret =
00743                             __rep_loggap_req(dbenv, rep, &rp->lsn, 0)) != 0)
00744                                 goto err;
00745                 } else {
00746                         lp->wait_recs = 0;
00747                         ZERO_LSN(lp->max_wait_lsn);
00748                 }
00749 
00750         } else if (cmp > 0) {
00751                 /*
00752                  * The LSN is higher than the one we were waiting for.
00753                  * This record isn't in sequence; add it to the temporary
00754                  * database, update waiting_lsn if necessary, and perform
00755                  * calculations to determine if we should issue requests
00756                  * for new records.
00757                  */
00758                 memset(&key_dbt, 0, sizeof(key_dbt));
00759                 key_dbt.data = rp;
00760                 key_dbt.size = sizeof(*rp);
00761                 if (lp->wait_recs == 0) {
00762                         /*
00763                          * This is a new gap. Initialize the number of
00764                          * records that we should wait before requesting
00765                          * that it be resent.  We grab the limits out of
00766                          * the rep without the mutex.
00767                          */
00768                         lp->wait_recs = rep->request_gap;
00769                         lp->rcvd_recs = 0;
00770                         ZERO_LSN(lp->max_wait_lsn);
00771                 }
00772                 if (__rep_check_doreq(dbenv, rep) &&
00773                     (ret = __rep_loggap_req(dbenv, rep, &rp->lsn, 0) != 0))
00774                         goto err;
00775 
00776                 ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
00777                 rep->stat.st_log_queued++;
00778                 rep->stat.st_log_queued_total++;
00779                 if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
00780                         rep->stat.st_log_queued_max = rep->stat.st_log_queued;
00781 
00782                 if (ret == DB_KEYEXIST)
00783                         ret = 0;
00784                 if (ret != 0)
00785                         goto done;
00786 
00787                 if (IS_ZERO_LSN(lp->waiting_lsn) ||
00788                     log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
00789                         lp->waiting_lsn = rp->lsn;
00790 
00791                 /*
00792                  * If this is permanent; let the caller know that we have
00793                  * not yet written it to disk, but we've accepted it.
00794                  */
00795                 if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
00796                         max_lsn = rp->lsn;
00797                         ret = DB_REP_NOTPERM;
00798                 }
00799                 goto done;
00800         } else {
00801                 /*
00802                  * We may miscount if we race, since we
00803                  * don't currently hold the rep mutex.
00804                  */
00805                 rep->stat.st_log_duplicated++;
00806                 if (is_dupp != NULL)
00807                         *is_dupp = 1;
00808                 if (F_ISSET(rp, DB_LOG_PERM))
00809                         max_lsn = lp->max_perm_lsn;
00810                 goto done;
00811         }
00812 
00813         /* Check if we need to go back into the table. */
00814         if (ret == 0 && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
00815                 goto gap_check;
00816 
00817 done:
00818 err:    /* Check if we need to go back into the table. */
00819         REP_SYSTEM_LOCK(dbenv);
00820         if (ret == 0 &&
00821             F_ISSET(rep, REP_F_RECOVER_LOG) &&
00822             log_compare(&lp->ready_lsn, &rep->last_lsn) >= 0) {
00823                 rep->last_lsn = max_lsn;
00824                 ZERO_LSN(max_lsn);
00825                 ret = DB_REP_LOGREADY;
00826         }
00827         REP_SYSTEM_UNLOCK(dbenv);
00828 
00829         if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
00830             !IS_ZERO_LSN(max_lsn)) {
00831                 if (ret_lsnp != NULL)
00832                         *ret_lsnp = max_lsn;
00833                 ret = DB_REP_ISPERM;
00834                 DB_ASSERT(log_compare(&max_lsn, &lp->max_perm_lsn) >= 0);
00835                 lp->max_perm_lsn = max_lsn;
00836         }
00837         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00838 
00839         /*
00840          * Startup is complete when we process our first live record.  However,
00841          * we want to return DB_REP_STARTUPDONE on the first record we can --
00842          * but other return values trump this one.  We know we've processed at
00843          * least one record when rectype is non-zero.
00844          */
00845         if (ret == 0 && !F_ISSET(rp, DB_LOG_RESEND) &&
00846             rectype != 0 && rep->stat.st_startup_complete == 0) {
00847                 rep->stat.st_startup_complete = 1;
00848                 ret = DB_REP_STARTUPDONE;
00849         }
00850         if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove)
00851                 __log_autoremove(dbenv);
00852         if (control_dbt.data != NULL)
00853                 __os_ufree(dbenv, control_dbt.data);
00854         if (rec_dbt.data != NULL)
00855                 __os_ufree(dbenv, rec_dbt.data);
00856 
00857         if (ret == DB_REP_NOTPERM && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
00858             !IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
00859                 *ret_lsnp = max_lsn;
00860 
00861 #ifdef DIAGNOSTIC
00862         if (ret == DB_REP_ISPERM)
00863                 RPRINT(dbenv, rep, (dbenv, &mb, "Returning ISPERM [%lu][%lu]",
00864                     (u_long)max_lsn.file, (u_long)max_lsn.offset));
00865         else if (ret == DB_REP_LOGREADY)
00866                 RPRINT(dbenv, rep, (dbenv, &mb,
00867                     "Returning LOGREADY up to [%lu][%lu]",
00868                     (u_long)rep->last_lsn.file,
00869                     (u_long)rep->last_lsn.offset));
00870         else if (ret == DB_REP_NOTPERM)
00871                 RPRINT(dbenv, rep, (dbenv, &mb, "Returning NOTPERM [%lu][%lu]",
00872                     (u_long)max_lsn.file, (u_long)max_lsn.offset));
00873         else if (ret == DB_REP_STARTUPDONE)
00874                 RPRINT(dbenv, rep, (dbenv, &mb,
00875                     "Returning STARTUPDONE [%lu][%lu]",
00876                     (u_long)rp->lsn.file, (u_long)rp->lsn.offset));
00877         else if (ret != 0)
00878                 RPRINT(dbenv, rep, (dbenv, &mb, "Returning %d [%lu][%lu]", ret,
00879                     (u_long)max_lsn.file, (u_long)max_lsn.offset));
00880 #endif
00881         return (ret);
00882 }
00883 
00884 /*
00885  * __rep_process_txn --
00886  *
00887  * This is the routine that actually gets a transaction ready for
00888  * processing.
00889  *
00890  * PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *));
00891  */
00892 int
00893 __rep_process_txn(dbenv, rec)
00894         DB_ENV *dbenv;
00895         DBT *rec;
00896 {
00897         DBT data_dbt, *lock_dbt;
00898         DB_LOCKREQ req, *lvp;
00899         DB_LOGC *logc;
00900         DB_LSN prev_lsn, *lsnp;
00901         DB_REP *db_rep;
00902         DB_TXNHEAD *txninfo;
00903         LSN_COLLECTION lc;
00904         REP *rep;
00905         __txn_regop_args *txn_args;
00906         __txn_xa_regop_args *prep_args;
00907         u_int32_t lockid, rectype;
00908         u_int i;
00909         int ret, t_ret;
00910 
00911         db_rep = dbenv->rep_handle;
00912         rep = db_rep->region;
00913         logc = NULL;
00914         txn_args = NULL;
00915         prep_args = NULL;
00916         txninfo = NULL;
00917 
00918         memset(&data_dbt, 0, sizeof(data_dbt));
00919         if (F_ISSET(dbenv, DB_ENV_THREAD))
00920                 F_SET(&data_dbt, DB_DBT_REALLOC);
00921 
00922         /*
00923          * There are two phases:  First, we have to traverse backwards through
00924          * the log records gathering the list of all LSNs in the transaction.
00925          * Once we have this information, we can loop through and then apply it.
00926          *
00927          * We may be passed a prepare (if we're restoring a prepare on upgrade)
00928          * instead of a commit (the common case).  Check which it is and behave
00929          * appropriately.
00930          */
00931         memcpy(&rectype, rec->data, sizeof(rectype));
00932         memset(&lc, 0, sizeof(lc));
00933         if (rectype == DB___txn_regop) {
00934                 /*
00935                  * We're the end of a transaction.  Make sure this is
00936                  * really a commit and not an abort!
00937                  */
00938                 if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
00939                         return (ret);
00940                 if (txn_args->opcode != TXN_COMMIT) {
00941                         __os_free(dbenv, txn_args);
00942                         return (0);
00943                 }
00944                 prev_lsn = txn_args->prev_lsn;
00945                 lock_dbt = &txn_args->locks;
00946         } else {
00947                 /* We're a prepare. */
00948                 DB_ASSERT(rectype == DB___txn_xa_regop);
00949 
00950                 if ((ret =
00951                     __txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
00952                         return (ret);
00953                 prev_lsn = prep_args->prev_lsn;
00954                 lock_dbt = &prep_args->locks;
00955         }
00956 
00957         /* Get locks. */
00958         if ((ret = __lock_id(dbenv, &lockid, NULL)) != 0)
00959                 goto err1;
00960 
00961         if ((ret =
00962               __lock_get_list(dbenv, lockid, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
00963                 goto err;
00964 
00965         /* Phase 1.  Get a list of the LSNs in this transaction, and sort it. */
00966         if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
00967                 goto err;
00968         qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
00969 
00970         /*
00971          * The set of records for a transaction may include dbreg_register
00972          * records.  Create a txnlist so that they can keep track of file
00973          * state between records.
00974          */
00975         if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
00976                 goto err;
00977 
00978         /* Phase 2: Apply updates. */
00979         if ((ret = __log_cursor(dbenv, &logc)) != 0)
00980                 goto err;
00981         for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
00982                 if ((ret = __log_c_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
00983                         __db_err(dbenv, "failed to read the log at [%lu][%lu]",
00984                             (u_long)lsnp->file, (u_long)lsnp->offset);
00985                         goto err;
00986                 }
00987                 if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
00988                     dbenv->recover_dtab_size, &data_dbt, lsnp,
00989                     DB_TXN_APPLY, txninfo)) != 0) {
00990                         __db_err(dbenv, "transaction failed at [%lu][%lu]",
00991                             (u_long)lsnp->file, (u_long)lsnp->offset);
00992                         goto err;
00993                 }
00994         }
00995 
00996 err:    memset(&req, 0, sizeof(req));
00997         req.op = DB_LOCK_PUT_ALL;
00998         if ((t_ret =
00999              __lock_vec(dbenv, lockid, 0, &req, 1, &lvp)) != 0 && ret == 0)
01000                 ret = t_ret;
01001 
01002         if ((t_ret = __lock_id_free(dbenv, lockid)) != 0 && ret == 0)
01003                 ret = t_ret;
01004 
01005 err1:   if (txn_args != NULL)
01006                 __os_free(dbenv, txn_args);
01007         if (prep_args != NULL)
01008                 __os_free(dbenv, prep_args);
01009         if (lc.array != NULL)
01010                 __os_free(dbenv, lc.array);
01011 
01012         if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
01013                 ret = t_ret;
01014 
01015         if (txninfo != NULL)
01016                 __db_txnlist_end(dbenv, txninfo);
01017 
01018         if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
01019                 __os_ufree(dbenv, data_dbt.data);
01020 
01021         if (ret == 0)
01022                 /*
01023                  * We don't hold the rep mutex, and could miscount if we race.
01024                  */
01025                 rep->stat.st_txns_applied++;
01026 
01027         return (ret);
01028 }
01029 
01030 /*
01031  * __rep_collect_txn
01032  *      Recursive function that will let us visit every entry in a transaction
01033  *      chain including all child transactions so that we can then apply
01034  *      the entire transaction family at once.
01035  */
01036 static int
01037 __rep_collect_txn(dbenv, lsnp, lc)
01038         DB_ENV *dbenv;
01039         DB_LSN *lsnp;
01040         LSN_COLLECTION *lc;
01041 {
01042         __txn_child_args *argp;
01043         DB_LOGC *logc;
01044         DB_LSN c_lsn;
01045         DBT data;
01046         u_int32_t rectype;
01047         u_int nalloc;
01048         int ret, t_ret;
01049 
01050         memset(&data, 0, sizeof(data));
01051         F_SET(&data, DB_DBT_REALLOC);
01052 
01053         if ((ret = __log_cursor(dbenv, &logc)) != 0)
01054                 return (ret);
01055 
01056         while (!IS_ZERO_LSN(*lsnp) &&
01057             (ret = __log_c_get(logc, lsnp, &data, DB_SET)) == 0) {
01058                 memcpy(&rectype, data.data, sizeof(rectype));
01059                 if (rectype == DB___txn_child) {
01060                         if ((ret = __txn_child_read(dbenv,
01061                             data.data, &argp)) != 0)
01062                                 goto err;
01063                         c_lsn = argp->c_lsn;
01064                         *lsnp = argp->prev_lsn;
01065                         __os_free(dbenv, argp);
01066                         ret = __rep_collect_txn(dbenv, &c_lsn, lc);
01067                 } else {
01068                         if (lc->nalloc < lc->nlsns + 1) {
01069                                 nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
01070                                 if ((ret = __os_realloc(dbenv,
01071                                     nalloc * sizeof(DB_LSN), &lc->array)) != 0)
01072                                         goto err;
01073                                 lc->nalloc = nalloc;
01074                         }
01075                         lc->array[lc->nlsns++] = *lsnp;
01076 
01077                         /*
01078                          * Explicitly copy the previous lsn.  The record
01079                          * starts with a u_int32_t record type, a u_int32_t
01080                          * txn id, and then the DB_LSN (prev_lsn) that we
01081                          * want.  We copy explicitly because we have no idea
01082                          * what kind of record this is.
01083                          */
01084                         memcpy(lsnp, (u_int8_t *)data.data +
01085                             sizeof(u_int32_t) + sizeof(u_int32_t),
01086                             sizeof(DB_LSN));
01087                 }
01088 
01089                 if (ret != 0)
01090                         goto err;
01091         }
01092         if (ret != 0)
01093                 __db_err(dbenv, "collect failed at: [%lu][%lu]",
01094                     (u_long)lsnp->file, (u_long)lsnp->offset);
01095 
01096 err:    if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
01097                 ret = t_ret;
01098         if (data.data != NULL)
01099                 __os_ufree(dbenv, data.data);
01100         return (ret);
01101 }
01102 
01103 /*
01104  * __rep_lsn_cmp --
01105  *      qsort-type-compatible wrapper for log_compare.
01106  */
01107 static int
01108 __rep_lsn_cmp(lsn1, lsn2)
01109         const void *lsn1, *lsn2;
01110 {
01111 
01112         return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
01113 }
01114 
01115 /*
01116  * __rep_newfile --
01117  *      NEWFILE messages have the LSN of the last record in the previous
01118  * log file.  When applying a NEWFILE message, make sure we haven't already
01119  * swapped files.
01120  */
01121 static int
01122 __rep_newfile(dbenv, rc, lsnp)
01123         DB_ENV *dbenv;
01124         REP_CONTROL *rc;
01125         DB_LSN *lsnp;
01126 {
01127         DB_LOG *dblp;
01128         LOG *lp;
01129 
01130         dblp = dbenv->lg_handle;
01131         lp = dblp->reginfo.primary;
01132 
01133         if (rc->lsn.file + 1 > lp->lsn.file)
01134                 return (__log_newfile(dblp, lsnp, 0));
01135         else {
01136                 /* We've already applied this NEWFILE.  Just ignore it. */
01137                 *lsnp = lp->lsn;
01138                 return (0);
01139         }
01140 }
01141 
01142 /*
01143  * __rep_do_ckp --
01144  * Perform the memp_sync necessary for this checkpoint without holding the
01145  * REP->mtx_clientdb.  Callers of this function must hold REP->mtx_clientdb
01146  * and must not be holding the region mutex.
01147  */
01148 static int
01149 __rep_do_ckp(dbenv, rec, rp)
01150         DB_ENV *dbenv;
01151         DBT *rec;
01152         REP_CONTROL *rp;
01153 {
01154         DB_LSN ckp_lsn;
01155         DB_REP *db_rep;
01156         int ret;
01157 
01158         db_rep = dbenv->rep_handle;
01159 
01160         MUTEX_UNLOCK(dbenv, db_rep->region->mtx_clientdb);
01161 
01162         DB_TEST_WAIT(dbenv, dbenv->test_check);
01163 
01164         /* Sync the memory pool. */
01165         memcpy(&ckp_lsn, (u_int8_t *)rec->data +
01166             SSZ(__txn_ckp_args, ckp_lsn), sizeof(DB_LSN));
01167         ret = __memp_sync(dbenv, &ckp_lsn);
01168 
01169         /* Update the last_ckp in the txn region. */
01170         if (ret == 0)
01171                 ret = __txn_updateckp(dbenv, &rp->lsn);
01172         else {
01173                 __db_err(dbenv, "Error syncing ckp [%lu][%lu]",
01174                     (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
01175                 ret = __db_panic(dbenv, ret);
01176         }
01177         MUTEX_LOCK(dbenv, db_rep->region->mtx_clientdb);
01178 
01179         return (ret);
01180 }
01181 
01182 /*
01183  * __rep_remfirst --
01184  * Remove the first entry from the __db.rep.db
01185  */
01186 static int
01187 __rep_remfirst(dbenv, cntrl, rec)
01188         DB_ENV *dbenv;
01189         DBT *cntrl;
01190         DBT *rec;
01191 {
01192         DB *dbp;
01193         DBC *dbc;
01194         DB_REP *db_rep;
01195         int ret, t_ret;
01196 
01197         db_rep = dbenv->rep_handle;
01198         dbp = db_rep->rep_db;
01199 
01200         if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
01201                 return (ret);
01202 
01203         /* The DBTs need to persist through another call. */
01204         F_SET(cntrl, DB_DBT_REALLOC);
01205         F_SET(rec, DB_DBT_REALLOC);
01206         if ((ret = __db_c_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0)
01207                 ret = __db_c_del(dbc, 0);
01208         if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
01209                 ret = t_ret;
01210 
01211         return (ret);
01212 }
01213 
01214 /*
01215  * __rep_getnext --
01216  * Get the next record out of the __db.rep.db table.
01217  */
01218 static int
01219 __rep_getnext(dbenv)
01220         DB_ENV *dbenv;
01221 {
01222         DB *dbp;
01223         DB_REP *db_rep;
01224         DB_LOG *dblp;
01225         DBC *dbc;
01226         DBT lsn_dbt, nextrec_dbt;
01227         LOG *lp;
01228         REP_CONTROL *rp;
01229         int ret, t_ret;
01230 
01231         dblp = dbenv->lg_handle;
01232         lp = dblp->reginfo.primary;
01233 
01234         db_rep = dbenv->rep_handle;
01235         dbp = db_rep->rep_db;
01236 
01237         if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
01238                 return (ret);
01239 
01240         /*
01241          * Update waiting_lsn.  We need to move it
01242          * forward to the LSN of the next record
01243          * in the queue.
01244          *
01245          * If the next item in the database is a log
01246          * record--the common case--we're not
01247          * interested in its contents, just in its LSN.
01248          * Optimize by doing a partial get of the data item.
01249          */
01250         memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
01251         F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
01252         nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
01253 
01254         memset(&lsn_dbt, 0, sizeof(lsn_dbt));
01255         ret = __db_c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
01256         if (ret != DB_NOTFOUND && ret != 0)
01257                 goto err;
01258 
01259         if (ret == DB_NOTFOUND) {
01260                 ZERO_LSN(lp->waiting_lsn);
01261                 /*
01262                  * Whether or not the current record is
01263                  * simple, there's no next one, and
01264                  * therefore we haven't got anything
01265                  * else to do right now.  Break out.
01266                  */
01267                 goto err;
01268         }
01269         rp = (REP_CONTROL *)lsn_dbt.data;
01270         lp->waiting_lsn = rp->lsn;
01271 
01272 err:    if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
01273                 ret = t_ret;
01274         return (ret);
01275 }
01276 
01277 /*
01278  * __rep_process_rec --
01279  *
01280  * Given a record in 'rp', process it.  In the case of a NEWFILE, that means
01281  * potentially switching files.  In the case of a checkpoint, it means doing
01282  * the checkpoint, and in other cases, it means simply writing the record into
01283  * the log.
01284  */
01285 static int
01286 __rep_process_rec(dbenv, rp, rec, typep, ret_lsnp)
01287         DB_ENV *dbenv;
01288         REP_CONTROL *rp;
01289         DBT *rec;
01290         u_int32_t *typep;
01291         DB_LSN *ret_lsnp;
01292 {
01293         DB *dbp;
01294         DB_LOG *dblp;
01295         DB_REP *db_rep;
01296         DBT control_dbt, key_dbt, rec_dbt;
01297         LOG *lp;
01298         REP *rep;
01299         u_int32_t txnid;
01300         int ret, t_ret;
01301 
01302         db_rep = dbenv->rep_handle;
01303         rep = db_rep->region;
01304         dbp = db_rep->rep_db;
01305         dblp = dbenv->lg_handle;
01306         lp = dblp->reginfo.primary;
01307         ret = 0;
01308 
01309         if (rp->rectype == REP_NEWFILE) {
01310                 ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);
01311 
01312                 /* Make this evaluate to a simple rectype. */
01313                 *typep = 0;
01314                 return (0);
01315         }
01316 
01317         memcpy(typep, rec->data, sizeof(*typep));
01318         memset(&control_dbt, 0, sizeof(control_dbt));
01319         memset(&rec_dbt, 0, sizeof(rec_dbt));
01320 
01321         /*
01322          * We write all records except for checkpoint records here.
01323          * All non-checkpoint records need to appear in the log before
01324          * we take action upon them (i.e., we enforce write-ahead logging).
01325          * However, we can't write the checkpoint record here until the
01326          * data buffers are actually written to disk, else we are creating
01327          * an invalid log -- one that says all data before a certain point
01328          * has been written to disk.
01329          *
01330          * If two threads are both processing the same checkpoint record
01331          * (because, for example, it was resent and the original finally
01332          * arrived), we handle that below by checking for the existence of
01333          * the log record when we add it to the replication database.
01334          *
01335          * Any log records that arrive while we are processing the checkpoint
01336          * are added to the bookkeeping database because ready_lsn is not yet
01337          * updated to point after the checkpoint record.
01338          */
01339         if (*typep != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
01340                 if ((ret = __log_rep_put(dbenv, &rp->lsn, rec)) != 0)
01341                         return (ret);
01342                 rep->stat.st_log_records++;
01343                 if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
01344                         *ret_lsnp = rp->lsn;
01345                         goto out;
01346                 }
01347         }
01348 
01349         switch (*typep) {
01350         case DB___dbreg_register:
01351                 /*
01352                  * DB opens occur in the context of a transaction, so we can
01353                  * simply handle them when we process the transaction.  Closes,
01354                  * however, are not transaction-protected, so we have to
01355                  * handle them here.
01356                  *
01357                  * Note that it should be unsafe for the master to do a close
01358                  * of a file that was opened in an active transaction, so we
01359                  * should be guaranteed to get the ordering right.
01360                  */
01361                 memcpy(&txnid, (u_int8_t *)rec->data +
01362                     SSZ(__dbreg_register_args, txnid), sizeof(u_int32_t));
01363                 if (txnid == TXN_INVALID)
01364                         ret = __db_dispatch(dbenv, dbenv->recover_dtab,
01365                             dbenv->recover_dtab_size, rec, &rp->lsn,
01366                             DB_TXN_APPLY, NULL);
01367                 break;
01368         case DB___txn_regop:
01369                 /*
01370                  * If an application is doing app-specific recovery
01371                  * and acquires locks while applying a transaction,
01372                  * it can deadlock.  Any other locks held by this
01373                  * thread should have been discarded in the
01374                  * __rep_process_txn error path, so if we simply
01375                  * retry, we should eventually succeed.
01376                  */
01377                 do {
01378                         ret = 0;
01379                         if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
01380                                 ret = __txn_openfiles(dbenv, NULL, 1);
01381                                 F_SET(db_rep, DBREP_OPENFILES);
01382                         }
01383                         if (ret == 0)
01384                                 ret = __rep_process_txn(dbenv, rec);
01385                 } while (ret == DB_LOCK_DEADLOCK);
01386 
01387                 /* Now flush the log unless we're running TXN_NOSYNC. */
01388                 if (ret == 0 && !F_ISSET(dbenv, DB_ENV_TXN_NOSYNC))
01389                         ret = __log_flush(dbenv, NULL);
01390                 if (ret != 0) {
01391                         __db_err(dbenv, "Error processing txn [%lu][%lu]",
01392                             (u_long)rp->lsn.file, (u_long)rp->lsn.offset);
01393                         ret = __db_panic(dbenv, ret);
01394                 }
01395                 break;
01396         case DB___txn_xa_regop:
01397                 ret = __log_flush(dbenv, NULL);
01398                 break;
01399         case DB___txn_ckp:
01400                 /*
01401                  * We do not want to hold the REP->mtx_clientdb mutex while
01402                  * syncing the mpool, so if we get a checkpoint record we are
01403                  * supposed to process, add it to the __db.rep.db, do the
01404                  * memp_sync and then go back and process it later, when the
01405                  * sync has finished.  If this record is already in the table,
01406                  * then some other thread will process it, so simply return
01407                  * REP_NOTPERM.
01408                  */
01409                 memset(&key_dbt, 0, sizeof(key_dbt));
01410                 key_dbt.data = rp;
01411                 key_dbt.size = sizeof(*rp);
01412 
01413                 /*
01414                  * We want to put this record into the tmp DB only if
01415                  * it doesn't exist, so use DB_NOOVERWRITE.
01416                  */
01417                 ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
01418                 if (ret == DB_KEYEXIST) {
01419                         if (ret_lsnp != NULL)
01420                                 *ret_lsnp = rp->lsn;
01421                         ret = DB_REP_NOTPERM;
01422                 }
01423                 if (ret != 0)
01424                         break;
01425 
01426                 /*
01427                  * Now, do the checkpoint.  Regardless of
01428                  * whether the checkpoint succeeds or not,
01429                  * we need to remove the record we just put
01430                  * in the temporary database.  If the
01431                  * checkpoint failed, return an error.  We
01432                  * will act like we never received the
01433                  * checkpoint.
01434                  */
01435                 if ((ret = __rep_do_ckp(dbenv, rec, rp)) == 0)
01436                         ret = __log_rep_put(dbenv, &rp->lsn, rec);
01437                 if ((t_ret = __rep_remfirst(dbenv,
01438                     &control_dbt, &rec_dbt)) != 0 && ret == 0)
01439                         ret = t_ret;
01440                 break;
01441         default:
01442                 break;
01443         }
01444 
01445 out:
01446         if (ret == 0 && F_ISSET(rp, DB_LOG_PERM))
01447                 *ret_lsnp = rp->lsn;
01448         if (control_dbt.data != NULL)
01449                 __os_ufree(dbenv, control_dbt.data);
01450         if (rec_dbt.data != NULL)
01451                 __os_ufree(dbenv, rec_dbt.data);
01452 
01453         return (ret);
01454 }
01455 
01456 /*
01457  * __rep_resend_req --
01458  *      A message may have been lost, so send our request again; which one
01459  *      depends on the recovery state in rep->flags.  A non-zero rereq marks
01460  *      page/log gap requests REP_GAP_REREQUEST.  The caller holds no locks.
01461  */
01462 static int
01463 __rep_resend_req(dbenv, rereq)
01464         DB_ENV *dbenv;
01465         int rereq;
01466 {
01467         DB_LOG *dblp;
01468         DB_LSN lsn;
01469         DB_REP *db_rep;
01470         LOG *lp;
01471         REP *rep;
01472         u_int32_t gapflags, repflags;
01473         int ret;
01474 
01475         dblp = dbenv->lg_handle;
01476         lp = dblp->reginfo.primary;
01477         db_rep = dbenv->rep_handle;
01478         rep = db_rep->region;
01479         /* Test one snapshot of the flags; a delayed client sends nothing. */
01480         repflags = rep->flags;
01481         if (FLD_ISSET(repflags, REP_F_DELAY))
01482                 return (0);
01483         if (rereq)
01484                 gapflags = REP_GAP_REREQUEST;
01485         else
01486                 gapflags = 0;
01487 
01488         if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
01489                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01490                 lsn = lp->verify_lsn;
01491                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01492                 if (IS_ZERO_LSN(lsn))
01493                         return (0);
01494                 (void)__rep_send_message(dbenv, rep->master_id,
01495                     REP_VERIFY_REQ, &lsn, NULL, 0, DB_REP_REREQUEST);
01496                 return (0);
01497         }
01498         if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
01499                 /* UPDATE_REQ is sent only to the master. */
01500                 (void)__rep_send_message(dbenv, rep->master_id,
01501                     REP_UPDATE_REQ, NULL, NULL, 0, 0);
01502                 return (0);
01503         }
01504         if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
01505                 REP_SYSTEM_LOCK(dbenv);
01506                 ret = __rep_pggap_req(dbenv, rep, NULL, gapflags);
01507                 REP_SYSTEM_UNLOCK(dbenv);
01508         } else {
01509                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01510                 ret = __rep_loggap_req(dbenv, rep, NULL, gapflags);
01511                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01512         }
01513         return (ret);
01514 }
01515 
01516 /*
01517  * __rep_check_doreq --
01518  * PUBLIC: int __rep_check_doreq __P((DB_ENV *, REP *));
01519  *
01520  * Count one more received record and report whether another request is
01521  * due: 1 once lp->rcvd_recs reaches lp->wait_recs, else 0.  On 1 the count
01522  * restarts and the threshold doubles, capped at rep->max_gap.  The caller
01523  * is assumed to hold the REP->mtx_clientdb mutex.
01524  */
01525 int
01526 __rep_check_doreq(dbenv, rep)
01527         DB_ENV *dbenv;
01528         REP *rep;
01529 {
01530         DB_LOG *dblp;
01531         LOG *lp;
01532 
01533         dblp = dbenv->lg_handle;
01534         lp = dblp->reginfo.primary;
01535 
01536         /* Count this record; below the threshold there is nothing to do. */
01537         if (++lp->rcvd_recs < lp->wait_recs)
01538                 return (0);
01539         /* Back off: double the threshold up to max_gap, restart the count. */
01540         lp->wait_recs *= 2;
01541         if (lp->wait_recs > rep->max_gap)
01542                 lp->wait_recs = rep->max_gap;
01543         lp->rcvd_recs = 0;
01544         return (1);
01545 }
01546 
01547 /*
01548  * __rep_skip_msg --
01549  *
01550  *      The message from eid is being ignored (for example, because we
01551  *      are in recovery), but skipping it may still oblige us to ask for
01552  *      a retransmission.  When a response is due, one of three things
01553  *      is sent:
01554  *      1.  Master unknown: broadcast a REP_MASTER_REQ.
01555  *      2.  Skipped message came from the master: re-send our own request
01556  *          via __rep_resend_req.
01557  *      3.  Skipped message came from another client: send that client a
01558  *          REP_REREQUEST so it can re-request elsewhere.
01559  *      Nothing is sent in response to a REP_MASTER_REQ.  Returns 0, or
01560  *      the result of __rep_resend_req in case 2.
01561  */
01562 static int
01563 __rep_skip_msg(dbenv, rep, eid, rectype)
01564         DB_ENV *dbenv;
01565         REP *rep;
01566         int eid;
01567         u_int32_t rectype;
01568 {
01569         int do_req;
01570 
01571         if (rep->master_id != DB_EID_INVALID && eid != rep->master_id) {
01572                 /* The sender is not the master: always respond. */
01573                 do_req = 1;
01574         } else {
01575                 /* Let __rep_check_doreq decide, under mtx_clientdb. */
01576                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01577                 do_req = __rep_check_doreq(dbenv, rep);
01578                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01579         }
01580 
01581         /* Don't respond to a MASTER_REQ with a MASTER_REQ or REREQUEST. */
01582         if (!do_req || rectype == REP_MASTER_REQ)
01583                 return (0);
01584 
01585         /* Case 1: we don't know who the master is. */
01586         if (rep->master_id == DB_EID_INVALID) {
01587                 (void)__rep_send_message(dbenv,
01588                     DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0, 0);
01589                 return (0);
01590         }
01591 
01592         /* Case 2: the skipped message came from the master. */
01593         if (eid == rep->master_id)
01594                 return (__rep_resend_req(dbenv, 0));
01595 
01596         /* Case 3: client to client. */
01597         (void)__rep_send_message(dbenv, eid, REP_REREQUEST, NULL, NULL, 0, 0);
01598         return (0);
01599 }

Generated on Sun Dec 25 12:14:45 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2