Main Page | Class Hierarchy | Data Structures | Directories | File List | Data Fields | Related Pages

rep_method.c

00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 2001-2005
00005  *      Sleepycat Software.  All rights reserved.
00006  *
00007  * $Id: rep_method.c,v 12.18 2005/11/08 03:25:13 bostic Exp $
00008  */
00009 
00010 #include "db_config.h"
00011 
00012 #ifndef NO_SYSTEM_INCLUDES
00013 #include <sys/types.h>
00014 
00015 #include <stdlib.h>
00016 #include <string.h>
00017 #endif
00018 
00019 #include "db_int.h"
00020 #include "dbinc/db_page.h"
00021 #include "dbinc/btree.h"
00022 #include "dbinc/log.h"
00023 #include "dbinc/txn.h"
00024 
00025 static int  __rep_abort_prepared __P((DB_ENV *));  /* abort prepared txns restored by recovery */
00026 static int  __rep_bt_cmp __P((DB *, const DBT *, const DBT *));  /* LSN table comparison function */
00027 static void __rep_config_map __P((DB_ENV *, u_int32_t *, u_int32_t *));  /* DB_REP_CONF_* -> REP_C_* */
00028 static int  __rep_restore_prepared __P((DB_ENV *));  /* re-prepare unresolved txns on a new master */
00029 
00030 /*
00031  * __rep_open --
00032  *      Allocate the DB_REP handle for dbenv, then call __rep_region_init.
00033  *
00034  * PUBLIC: int __rep_open __P((DB_ENV *));
00035  */
00036 int
00037 __rep_open(dbenv)
00038         DB_ENV *dbenv;
00039 {
00040         DB_REP *handle;
00041         int ret;
00042 
00043         ret = __os_calloc(dbenv, 1, sizeof(DB_REP), &handle);
00044         if (ret != 0)
00045                 return (ret);
00046         dbenv->rep_handle = handle;
00047         return (__rep_region_init(dbenv));
00048 }
00049 
00050 /*
00051  * __rep_get_config --
00052  *      Report through onp whether a replication option is turned on.
00053  *
00054  * PUBLIC: int __rep_get_config __P((DB_ENV *, u_int32_t, int *));
00055  */
00056 int
00057 __rep_get_config(dbenv, which, onp)
00058         DB_ENV *dbenv;
00059         u_int32_t which;
00060         int *onp;
00061 {
00062         DB_REP *handle;
00063         REP *region;
00064         u_int32_t internal;
00065 
00066 #undef  OK_FLAGS
00067 #define OK_FLAGS                                                        \
00068 (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_NOAUTOINIT    \
00069     | DB_REP_CONF_NOWAIT)
00070 
00071         PANIC_CHECK(dbenv);
00072         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle,
00073             "rep_get_config", DB_INIT_REP);
00074         if (FLD_ISSET(which, ~OK_FLAGS))
00075                 return (__db_ferr(dbenv, "DB_ENV->rep_get_config", 0));
00076 
00077         /* Translate the public DB_REP_CONF_* bit(s) into REP_C_* bit(s). */
00078         internal = 0;
00079         __rep_config_map(dbenv, &which, &internal);
00080 
00081         handle = dbenv->rep_handle;
00082         region = handle->region;
00083 
00084         /* Hand back exactly 1 or 0, never the raw mask. */
00085         *onp = FLD_ISSET(region->config, internal) ? 1 : 0;
00086         return (0);
00087 }
00088 
00089 /*
00090  * __rep_set_config --
00091  *      Turn the replication option(s) selected by which on or off.
00092  *
00093  * PUBLIC: int __rep_set_config __P((DB_ENV *, u_int32_t, int));
00094  */
00095 int
00096 __rep_set_config(dbenv, which, on)
00097         DB_ENV *dbenv;
00098         u_int32_t which;
00099         int on;
00100 {
00101         DB_LOG *dblp;
00102         DB_REP *db_rep;
00103         LOG *lp;
00104         REP *rep;
00105         REP_BULK bulk;
00106         int ret;
00107         u_int32_t mapped, orig;
00108 
00109 #undef  OK_FLAGS
00110 #define OK_FLAGS                                                        \
00111 (DB_REP_CONF_BULK | DB_REP_CONF_DELAYCLIENT | DB_REP_CONF_NOAUTOINIT    \
00112     | DB_REP_CONF_NOWAIT)
00113 
00114         PANIC_CHECK(dbenv);
00115         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle,
00116             "rep_config", DB_INIT_REP);  /* NOTE(review): getter passes "rep_get_config"; confirm this label */
00117         if (FLD_ISSET(which, ~OK_FLAGS))  /* reject any bit outside OK_FLAGS */
00118                 return (__db_ferr(dbenv, "DB_ENV->rep_set_config", 0));
00119 
00120         dblp = dbenv->lg_handle;
00121         lp = dblp->reginfo.primary;
00122         db_rep = dbenv->rep_handle;
00123         rep = db_rep->region;
00124 
00125         mapped = ret = 0;
00126         __rep_config_map(dbenv, &which, &mapped);  /* DB_REP_CONF_* -> REP_C_* */
00127         MUTEX_LOCK(dbenv, rep->mtx_clientdb);  /* held until just before return */
00128         REP_SYSTEM_LOCK(dbenv);
00129         orig = rep->config;  /* remember the previous settings */
00130         if (on)
00131                 FLD_SET(rep->config, mapped);
00132         else
00133                 FLD_CLR(rep->config, mapped);
00134 
00135         /*
00136          * Bulk transfer was off and is now on: record in db_rep->bulk
00137          * the address R_ADDR yields for lp->bulk_buf.
00138          */
00139         if (FLD_ISSET(rep->config, REP_C_BULK) &&
00140             !FLD_ISSET(orig, REP_C_BULK))
00141                 db_rep->bulk = R_ADDR(&dblp->reginfo, lp->bulk_buf);
00142         REP_SYSTEM_UNLOCK(dbenv);
00143         /*
00144          * If turning bulk off and it was on, send out whatever is in the
00145          * buffer already (only when lp->bulk_off is non-zero).
00146          */
00147         if (FLD_ISSET(orig, REP_C_BULK) &&
00148             !FLD_ISSET(rep->config, REP_C_BULK) && lp->bulk_off != 0) {
00149                 memset(&bulk, 0, sizeof(bulk));
00150                 if (db_rep->bulk == NULL)  /* address not recorded yet in this handle */
00151                         bulk.addr = R_ADDR(&dblp->reginfo, lp->bulk_buf);
00152                 else
00153                         bulk.addr = db_rep->bulk;
00154                 bulk.offp = &lp->bulk_off;
00155                 bulk.len = lp->bulk_len;
00156                 bulk.type = REP_BULK_LOG;
00157                 bulk.eid = DB_EID_BROADCAST;
00158                 bulk.flagsp = &lp->bulk_flags;
00159                 ret = __rep_send_bulk(dbenv, &bulk, 0);  /* its result is this function's return value */
00160         }
00161         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00162         return (ret);
00163 }
00164 
00165 /*
00166  * __rep_config_map --
00167  *      Move DB_REP_CONF_* bits of *inflagsp to REP_C_* bits of *outflagsp.
00168  */
00169 static void
00170 __rep_config_map(dbenv, inflagsp, outflagsp)
00171         DB_ENV *dbenv;
00172         u_int32_t *inflagsp, *outflagsp;
00173 {
00174         static const u_int32_t map[][2] = {
00175                 { DB_REP_CONF_BULK, REP_C_BULK },
00176                 { DB_REP_CONF_DELAYCLIENT, REP_C_DELAYCLIENT },
00177                 { DB_REP_CONF_NOAUTOINIT, REP_C_NOAUTOINIT },
00178                 { DB_REP_CONF_NOWAIT, REP_C_NOWAIT }
00179         };
00180         size_t i;
00181 
00182         COMPQUIET(dbenv, NULL);
00183         for (i = 0; i < sizeof(map) / sizeof(map[0]); i++)
00184                 if (FLD_ISSET(*inflagsp, map[i][0])) {
00185                         FLD_SET(*outflagsp, map[i][1]);
00186                         FLD_CLR(*inflagsp, map[i][0]);
00187                 }
00188 }
00189 
00190 /*
00191  * __rep_start --
00192  *      Become a master or client, and start sending messages to participate
00193  * in the replication environment.  Must be called after the environment
00194  * is open.
00195  *
00196  * We must protect rep_start, which may change the world, with the rest
00197  * of the DB library.  Each API interface will count itself as it enters
00198  * the library.  Rep_start checks the following:
00199  *
00200  * rep->msg_th - this is the count of threads currently in rep_process_message
00201  * rep->start_th - this is set if a thread is in rep_start.
00202  * rep->handle_cnt - number of threads actively using a dbp in library.
00203  * rep->txn_cnt - number of active txns.
00204  * REP_F_READY - Replication flag that indicates that we wish to run
00205  * recovery, and want to prohibit new transactions from entering and cause
00206  * existing ones to return immediately (with a DB_LOCK_DEADLOCK error).
00207  *
00208  * There is also the renv->rep_timestamp which is updated whenever significant
00209  * events occur (e.g., new master, log rollback).  Upon creation, a handle
00210  * is associated with the current timestamp.  Each time a handle enters the
00211  * library it must check if the handle timestamp is the same as the one
00212  * stored in the replication region.  This prevents the use of handles on
00213  * clients that reference non-existent files whose creation was backed out
00214  * during a synchronizing recovery.
00215  *
00216  * PUBLIC: int __rep_start __P((DB_ENV *, DBT *, u_int32_t));
00217  */
00218 int
00219 __rep_start(dbenv, dbt, flags)
00220         DB_ENV *dbenv;
00221         DBT *dbt;
00222         u_int32_t flags;
00223 {
00224         DB_LOG *dblp;
00225         DB_LSN lsn;
00226         DB_REP *db_rep;
00227         REP *rep;
00228         u_int32_t repflags;
00229         int announce, init_db, redo_prepared, ret, role_chg;
00230         int sleep_cnt, t_ret;
00231 #ifdef DIAGNOSTIC
00232         DB_MSGBUF mb;
00233 #endif
00234 
00235         PANIC_CHECK(dbenv);
00236         ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_start");
00237         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_start", DB_INIT_REP);
00238 
00239         db_rep = dbenv->rep_handle;
00240         rep = db_rep->region;
00241 
00242         if ((ret = __db_fchk(dbenv, "DB_ENV->rep_start", flags,
00243             DB_REP_CLIENT | DB_REP_MASTER)) != 0)
00244                 return (ret);
00245 
00246         /* Exactly one of CLIENT and MASTER must be specified. */
00247         if ((ret = __db_fcchk(dbenv,
00248             "DB_ENV->rep_start", flags, DB_REP_CLIENT, DB_REP_MASTER)) != 0)
00249                 return (ret);
00250         if (!LF_ISSET(DB_REP_CLIENT | DB_REP_MASTER)) {
00251                 __db_err(dbenv,
00252         "DB_ENV->rep_start: replication mode must be specified");
00253                 return (EINVAL);
00254         }
00255 
00256         /* We need a transport function. */
00257         if (dbenv->rep_send == NULL) {
00258                 __db_err(dbenv,
00259     "DB_ENV->set_rep_transport must be called before DB_ENV->rep_start");
00260                 return (EINVAL);
00261         }
00262 
00263         /*
00264          * If we are about to become (or stay) a master, flush the log
00265          * to close any potential holes that might happen when upgrading from
00266          * client to master status.
00267          */
00268         if (LF_ISSET(DB_REP_MASTER) && (ret = __log_flush(dbenv, NULL)) != 0)
00269                 return (ret);
00270 
00271         REP_SYSTEM_LOCK(dbenv);
00272         /*
00273          * We only need one thread to start-up replication, so if
00274          * there is another thread in rep_start, we'll let it finish
00275          * its work and have this thread simply return.
00276          */
00277         if (rep->start_th != 0) {
00278                 /*
00279                  * There is already someone in rep_start.  Return.
00280                  */
00281                 RPRINT(dbenv, rep, (dbenv, &mb, "Thread already in rep_start"));
00282                 goto err;  /* ret is still 0 here, so this returns success */
00283         } else
00284                 rep->start_th = 1;
00285 
00286         role_chg = (!F_ISSET(rep, REP_F_MASTER) && LF_ISSET(DB_REP_MASTER)) ||
00287             (!F_ISSET(rep, REP_F_CLIENT) && LF_ISSET(DB_REP_CLIENT));
00288 
00289         /*
00290          * Wait for any active txns or mpool ops to complete, and
00291          * prevent any new ones from occurring, only if we're
00292          * changing roles.  If we are not changing roles, then we
00293          * only need to coordinate with msg_th.
00294          */
00295         if (role_chg) {
00296                 if ((ret = __rep_lockout(dbenv, rep, 0)) != 0)
00297                         goto errunlock;
00298         } else {
00299                 for (sleep_cnt = 0; rep->msg_th != 0;) {  /* poll once a second */
00300                         if (++sleep_cnt % 60 == 0)
00301                                 __db_err(dbenv,
00302         "DB_ENV->rep_start waiting %d minutes for replication message thread",
00303                                     sleep_cnt / 60);
00304                         REP_SYSTEM_UNLOCK(dbenv);  /* lock is dropped while sleeping */
00305                         __os_sleep(dbenv, 1, 0);
00306                         REP_SYSTEM_LOCK(dbenv);
00307                 }
00308         }
00309 
00310         if (rep->eid == DB_EID_INVALID)
00311                 rep->eid = dbenv->rep_eid;
00312 
00313         if (LF_ISSET(DB_REP_MASTER)) {
00314                 if (role_chg) {
00315                         /*
00316                          * If we're upgrading from having been a client,
00317                          * preclose, so that we close our temporary database
00318                          * and any files we opened while doing a rep_apply.
00319                          * If we don't we can infinitely leak file ids if
00320                          * the master crashed with files open (the likely
00321                          * case).  If we don't close them we can run into
00322                          * problems if we try to remove that file or long
00323                          * running applications end up with an unbounded
00324                          * number of used fileids, each getting written
00325                          * on checkpoint.  Just close them.
00326                          */
00327                         if ((ret = __rep_preclose(dbenv)) != 0)
00328                                 goto errunlock;
00329                 }
00330 
00331                 redo_prepared = 0;
00332                 if (!F_ISSET(rep, REP_F_MASTER)) {
00333                         /* Master is not yet set. */
00334                         if (role_chg) {
00335                                 if (rep->w_gen > rep->recover_gen)
00336                                         rep->gen = ++rep->w_gen;
00337                                 else if (rep->gen > rep->recover_gen)
00338                                         rep->gen++;
00339                                 else
00340                                         rep->gen = rep->recover_gen + 1;
00341                                 /*
00342                                  * There could have been any number of failed
00343                                  * elections, so jump the gen if we need to now.
00344                                  */
00345                                 if (rep->egen > rep->gen)
00346                                         rep->gen = rep->egen;
00347                                 redo_prepared = 1;
00348                         } else if (rep->gen == 0)
00349                                 rep->gen = rep->recover_gen + 1;
00350                         if (F_ISSET(rep, REP_F_MASTERELECT)) {
00351                                 __rep_elect_done(dbenv, rep);
00352                                 F_CLR(rep, REP_F_MASTERELECT);
00353                         }
00354                         if (rep->egen <= rep->gen)  /* keep egen strictly above gen */
00355                                 rep->egen = rep->gen + 1;
00356                         RPRINT(dbenv, rep, (dbenv, &mb,
00357                             "New master gen %lu, egen %lu",
00358                             (u_long)rep->gen, (u_long)rep->egen));
00359                 }
00360                 rep->master_id = rep->eid;
00361                 /*
00362                  * Note, setting flags below implicitly clears out
00363                  * REP_F_NOARCHIVE, REP_F_INIT and REP_F_READY.
00364                  */
00365                 rep->flags = REP_F_MASTER;
00366                 rep->start_th = 0;
00367                 REP_SYSTEM_UNLOCK(dbenv);
00368                 dblp = (DB_LOG *)dbenv->lg_handle;
00369                 LOG_SYSTEM_LOCK(dbenv);
00370                 lsn = ((LOG *)dblp->reginfo.primary)->lsn;  /* copy the log region's lsn under the log lock */
00371                 LOG_SYSTEM_UNLOCK(dbenv);
00372 
00373                 /*
00374                  * Send the NEWMASTER message first so that clients know
00375                  * subsequent messages are coming from the right master.
00376                  * We need to perform all actions below no matter what,
00377                  * regardless of errors.
00378                  */
00379                 (void)__rep_send_message(dbenv,
00380                     DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0, 0);
00381                 ret = 0;
00382                 if (role_chg) {
00383                         ret = __txn_reset(dbenv);
00384                         REP_SYSTEM_LOCK(dbenv);
00385                         F_CLR(rep, REP_F_READY);
00386                         rep->in_recovery = 0;
00387                         REP_SYSTEM_UNLOCK(dbenv);
00388                 }
00389                 /*
00390                  * Take a transaction checkpoint so that our new generation
00391                  * number gets written to the log.
00392                  */
00393                 if ((t_ret = __txn_checkpoint(dbenv, 0, 0, DB_FORCE)) != 0 &&
00394                     ret == 0)  /* the first error seen is the one returned */
00395                         ret = t_ret;
00396                 if (redo_prepared &&
00397                     (t_ret = __rep_restore_prepared(dbenv)) != 0 && ret == 0)
00398                         ret = t_ret;
00399         } else {
00400                 init_db = 0;
00401                 announce = role_chg || rep->master_id == DB_EID_INVALID;
00402 
00403                 /*
00404                  * If we're changing roles from master to client or if
00405                  * we never were any role at all, we need to init the db.
00406                  */
00407                 if (role_chg || !F_ISSET(rep, REP_F_CLIENT)) {
00408                         rep->master_id = DB_EID_INVALID;
00409                         init_db = 1;
00410                 }
00411                 /* Zero out everything except recovery and tally flags. */
00412                 repflags = F_ISSET(rep, REP_F_NOARCHIVE |
00413                     REP_F_RECOVER_MASK | REP_F_TALLY);
00414                 FLD_SET(repflags, REP_F_CLIENT);
00415 
00416                 rep->flags = repflags;
00417                 REP_SYSTEM_UNLOCK(dbenv);
00418 
00419                 /*
00420                  * Abort any prepared transactions that were restored
00421                  * by recovery.  We won't be able to create any txns of
00422                  * our own until they're resolved, but we can't resolve
00423                  * them ourselves;  the master has to.  If any get
00424                  * resolved as commits, we'll redo them when commit
00425                  * records come in.  Aborts will simply be ignored.
00426                  */
00427                 if ((ret = __rep_abort_prepared(dbenv)) != 0)
00428                         goto errlock;  /* rep mutex is not held here */
00429 
00430                 MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00431                 ret = __rep_client_dbinit(dbenv, init_db, REP_DB);
00432                 MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00433                 if (ret != 0)
00434                         goto errlock;
00435                 REP_SYSTEM_LOCK(dbenv);
00436                 rep->start_th = 0;
00437                 if (role_chg) {
00438                         F_CLR(rep, REP_F_READY);
00439                         rep->in_recovery = 0;
00440                 }
00441                 REP_SYSTEM_UNLOCK(dbenv);
00442 
00443                 /*
00444                  * If this client changed roles or has no known master
00445                  * (announce), broadcast REP_NEWCLIENT.  The master
00446                  * should respond with a message that will tell this client
00447                  * the current generation number and the current LSN.  This
00448                  * will allow the client to either perform recovery or
00449                  * simply join in.  Otherwise, broadcast REP_ALIVE_REQ.
00450                  */
00451                 if (announce)
00452                         (void)__rep_send_message(dbenv,
00453                             DB_EID_BROADCAST, REP_NEWCLIENT, NULL, dbt, 0, 0);
00454                 else
00455                         (void)__rep_send_message(dbenv,
00456                             DB_EID_BROADCAST, REP_ALIVE_REQ, NULL, NULL, 0, 0);
00457         }
00458 
00459         if (0) {  /* entered only through the labels below */
00460                 /*
00461                  * We have separate labels for errors.  If we're returning an
00462                  * error before we've set start_th, we use 'err'.  If
00463                  * we are erroring while holding the region mutex, then we use
00464                  * 'errunlock' label.  If we're erroring without holding the rep
00465                  * mutex we must use 'errlock'.
00466                  */
00467 errlock:        REP_SYSTEM_LOCK(dbenv);
00468 errunlock:      rep->start_th = 0;
00469                 if (role_chg) {
00470                         F_CLR(rep, REP_F_READY);
00471                         rep->in_recovery = 0;
00472                 }
00473 err:            REP_SYSTEM_UNLOCK(dbenv);
00474         }
00475         return (ret);
00476 }
00477 
00478 /*
00479  * __rep_client_dbinit --
00480  *
00481  * Open one of the client-side replication databases: REPDBNAME when which
00482  * is REP_DB, otherwise REPPAGENAME.  If the startup flag is set, any
00483  * existing file is force-removed first and the open uses DB_CREATE.  Returns
00484  * 0 at once if the handle is already set.  This routine must be called once
00485  * by each process acting as a client.
00486  *
00487  * Assumes caller holds appropriate mutex.
00488  *
00489  * PUBLIC: int __rep_client_dbinit __P((DB_ENV *, int, repdb_t));
00490  */
00491 int
00492 __rep_client_dbinit(dbenv, startup, which)
00493         DB_ENV *dbenv;
00494         int startup;
00495         repdb_t which;
00496 {
00497         DB_REP *db_rep;
00498         DB *dbp, **rdbpp;
00499         REP *rep;
00500         int ret, t_ret;
00501         u_int32_t flags;
00502         const char *name;
00503 
00504         PANIC_CHECK(dbenv);
00505         db_rep = dbenv->rep_handle;
00506         rep = db_rep->region;
00507         dbp = NULL;
00508 
00509 #define REPDBNAME       "__db.rep.db"
00510 #define REPPAGENAME     "__db.reppg.db"
00511 
00512         if (which == REP_DB) {
00513                 name = REPDBNAME;
00514                 rdbpp = &db_rep->rep_db;  /* handle slot in the DB_REP */
00515         } else {
00516                 name = REPPAGENAME;
00517                 rdbpp = &rep->file_dbp;  /* handle slot in the REP region */
00518         }
00519         /* Check if this has already been called on this environment. */
00520         if (*rdbpp != NULL)
00521                 return (0);
00522 
00523         if (startup) {
00524                 if ((ret = db_create(&dbp, dbenv, 0)) != 0)
00525                         goto err;
00526                 /*
00527                  * Ignore errors, because if the file doesn't exist, this
00528                  * is perfectly OK.
00529                  */
00530                 (void)__db_remove(dbp, NULL, name, NULL, DB_FORCE);  /* NOTE(review): dbp is reused below; presumably __db_remove disposes of this handle -- confirm, and consider dbp = NULL so err cannot close a stale handle */
00531         }
00532 
00533         if ((ret = db_create(&dbp, dbenv, 0)) != 0)
00534                 goto err;
00535         if (which == REP_DB &&
00536             (ret = __bam_set_bt_compare(dbp, __rep_bt_cmp)) != 0)  /* only REP_DB gets the LSN comparison */
00537                 goto err;
00538 
00539         /* Allow writes to this database on a client. */
00540         F_SET(dbp, DB_AM_CL_WRITER);
00541 
00542         flags = DB_NO_AUTO_COMMIT |
00543             (startup ? DB_CREATE : 0) |
00544             (F_ISSET(dbenv, DB_ENV_THREAD) ? DB_THREAD : 0);
00545 
00546         if ((ret = __db_open(dbp, NULL, name, NULL,
00547             (which == REP_DB ? DB_BTREE : DB_RECNO),  /* REP_DB is a btree, the other a recno */
00548             flags, 0, PGNO_BASE_MD)) != 0)
00549                 goto err;
00550 
00551         *rdbpp= dbp;  /* success: store the open handle in its slot */
00552 
00553         if (0) {  /* entered only through the err label */
00554 err:            if (dbp != NULL &&
00555                     (t_ret = __db_close(dbp, NULL, DB_NOSYNC)) != 0 && ret == 0)
00556                         ret = t_ret;
00557                 *rdbpp = NULL;
00558         }
00559 
00560         return (ret);
00561 }
00562 
00563 /*
00564  * __rep_bt_cmp --
00565  *
00566  * Btree comparison function installed on the LSN table (REP_DB) by
00567  * __rep_client_dbinit.  Each key is an entire REP_CONTROL structure,
00568  * which saves merging its other fields into the data item, but only
00569  * the LSN inside it takes part in the ordering: the file is compared
00570  * first and the offset breaks ties.
00571  *
00572  * Returns 1, -1 or 0 as dbt1's LSN is greater than, less than or
00573  * equal to dbt2's.
00574  */
00575 static int
00576 __rep_bt_cmp(dbp, dbt1, dbt2)
00577         DB *dbp;
00578         const DBT *dbt1, *dbt2;
00579 {
00580         DB_LSN left, right;
00581         REP_CONTROL *ctl;
00582 
00583         COMPQUIET(dbp, NULL);
00584 
00585         /* Pull each LSN out of its control structure with __ua_memcpy. */
00586         ctl = dbt1->data;
00587         (void)__ua_memcpy(&left, &ctl->lsn, sizeof(DB_LSN));
00588         ctl = dbt2->data;
00589         (void)__ua_memcpy(&right, &ctl->lsn, sizeof(DB_LSN));
00590 
00591         /* Major key: the file. */
00592         if (left.file != right.file)
00593                 return (left.file > right.file ? 1 : -1);
00594         /* Minor key: the offset. */
00595         if (left.offset != right.offset)
00596                 return (left.offset > right.offset ? 1 : -1);
00597 
00598         /* Identical LSNs. */
00599         return (0);
00600 }
00601 
00602 /*
00603  * __rep_abort_prepared --
00604  *      Abort every prepared transaction that recovery restored.
00605  *
00606  *      Used on the client path of __rep_start: a client cannot/should
00607  * not call txn_recover and resolve prepared transactions itself, so any
00608  * that recovery brought back are aborted, PREPLISTSIZE at a time.
00609  */
00610 static int
00611 __rep_abort_prepared(dbenv)
00612         DB_ENV *dbenv;
00613 {
00614 #define PREPLISTSIZE    50
00615         DB_PREPLIST prep[PREPLISTSIZE];
00616         DB_TXNMGR *mgr;
00617         DB_TXNREGION *region;
00618         int restored, ret;
00619         long count, i;
00620         u_int32_t op;
00621 
00622         mgr = dbenv->tx_handle;
00623         region = mgr->reginfo.primary;
00624 
00625         /* Sample st_nrestores while holding the txn system lock. */
00626         TXN_SYSTEM_LOCK(dbenv);
00627         restored = region->stat.st_nrestores != 0;
00628         TXN_SYSTEM_UNLOCK(dbenv);
00629         if (!restored)
00630                 return (0);
00631 
00632         /*
00633          * Fetch the prepared list a batch at a time; a batch that is
00634          * not full ends the walk.
00635          */
00636         for (op = DB_FIRST;; op = DB_NEXT) {
00637                 if ((ret = __txn_recover(dbenv,
00638                     prep, PREPLISTSIZE, &count, op)) != 0)
00639                         return (ret);
00640                 for (i = 0; i < count; i++)
00641                         if ((ret = __txn_abort(prep[i].txn)) != 0)
00642                                 return (ret);
00643                 if (count != PREPLISTSIZE)
00644                         break;
00645         }
00646         return (0);
00647 }
00648 
00649 /*
00650  * __rep_restore_prepared --
00651  *      Restore to a prepared state any prepared but not yet committed
00652  * transactions.
00653  *
00654  *      This performs, in effect, a "mini-recovery";  it is called from
00655  * __rep_start by newly upgraded masters.  There may be transactions that an
00656  * old master prepared but did not resolve, which we need to restore to an
00657  * active state.
00658  */
00659 static int
00660 __rep_restore_prepared(dbenv)
00661         DB_ENV *dbenv;
00662 {
00663         DB_LOGC *logc;
00664         DB_LSN ckp_lsn, lsn;
00665         DB_TXNHEAD *txninfo;
00666         DBT rec;
00667         __txn_ckp_args *ckp_args;
00668         __txn_regop_args *regop_args;
00669         __txn_xa_regop_args *prep_args;
00670         int ret, t_ret;
00671         u_int32_t hi_txn, low_txn, rectype, status;
00672 
00673         txninfo = NULL;
00674         ckp_args = NULL;
00675         prep_args = NULL;
00676         regop_args = NULL;
00677         ZERO_LSN(ckp_lsn);
00678         ZERO_LSN(lsn);
00679 
00680         if ((ret = __log_cursor(dbenv, &logc)) != 0)
00681                 return (ret);
00682 
00683         /*
00684          * We need to consider the set of records between the most recent
00685          * checkpoint LSN and the end of the log;  any txn in that
00686          * range, and only txns in that range, could still have been
00687          * active, and thus prepared but not yet committed (PBNYC),
00688          * when the old master died.
00689          *
00690          * Find the most recent checkpoint LSN, and get the record there.
00691          * If there is no checkpoint in the log, start off by getting
00692          * the very first record in the log instead.
00693          */
00694         memset(&rec, 0, sizeof(DBT));
00695         if ((ret = __txn_getckp(dbenv, &lsn)) == 0) {
00696                 if ((ret = __log_c_get(logc, &lsn, &rec, DB_SET)) != 0)  {
00697                         __db_err(dbenv,
00698                             "Checkpoint record at LSN [%lu][%lu] not found",
00699                             (u_long)lsn.file, (u_long)lsn.offset);
00700                         goto err;
00701                 }
00702 
00703                 if ((ret = __txn_ckp_read(dbenv, rec.data, &ckp_args)) != 0) {
00704                         __db_err(dbenv,
00705                             "Invalid checkpoint record at [%lu][%lu]",
00706                             (u_long)lsn.file, (u_long)lsn.offset);
00707                         goto err;
00708                 }
00709 
00710                 ckp_lsn = ckp_args->ckp_lsn;
00711                 __os_free(dbenv, ckp_args);
00712 
00713                 if ((ret = __log_c_get(logc, &ckp_lsn, &rec, DB_SET)) != 0) {
00714                         __db_err(dbenv,
00715                             "Checkpoint LSN record [%lu][%lu] not found",
00716                             (u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
00717                         goto err;
00718                 }
00719         } else if ((ret = __log_c_get(logc, &lsn, &rec, DB_FIRST)) != 0) {
00720                 if (ret == DB_NOTFOUND) {
00721                         /* An empty log means no PBNYC txns. */
00722                         ret = 0;
00723                         goto done;
00724                 }
00725                 __db_err(dbenv, "Attempt to get first log record failed");
00726                 goto err;
00727         }
00728 
00729         /*
00730          * We use the same txnlist infrastructure that recovery does;
00731          * it demands an estimate of the high and low txnids for
00732          * initialization.
00733          *
00734          * First, the low txnid.
00735          */
00736         do {
00737                 /* txnid is after rectype, which is a u_int32. */
00738                 memcpy(&low_txn,
00739                     (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(low_txn));
00740                 if (low_txn != 0)
00741                         break;
00742         } while ((ret = __log_c_get(logc, &lsn, &rec, DB_NEXT)) == 0);
00743 
00744         /* If there are no txns, there are no PBNYC txns. */
00745         if (ret == DB_NOTFOUND) {
00746                 ret = 0;
00747                 goto done;
00748         } else if (ret != 0)
00749                 goto err;
00750 
00751         /* Now, the high txnid. */
00752         if ((ret = __log_c_get(logc, &lsn, &rec, DB_LAST)) != 0) {
00753                 /*
00754                  * Note that DB_NOTFOUND is unacceptable here because we
00755                  * had to have looked at some log record to get this far.
00756                  */
00757                 __db_err(dbenv, "Final log record not found");
00758                 goto err;
00759         }
00760         do {
00761                 /* txnid is after rectype, which is a u_int32. */
00762                 memcpy(&hi_txn,
00763                     (u_int8_t *)rec.data + sizeof(u_int32_t), sizeof(hi_txn));
00764                 if (hi_txn != 0)
00765                         break;
00766         } while ((ret = __log_c_get(logc, &lsn, &rec, DB_PREV)) == 0);
00767         if (ret == DB_NOTFOUND) {
00768                 ret = 0;
00769                 goto done;
00770         } else if (ret != 0)
00771                 goto err;
00772 
00773         /* We have a high and low txnid.  Initialise the txn list. */
00774         if ((ret =
00775             __db_txnlist_init(dbenv, low_txn, hi_txn, NULL, &txninfo)) != 0)
00776                 goto err;
00777 
00778         /*
00779          * Now, walk backward from the end of the log to ckp_lsn.  Any
00780          * prepares that we hit without first hitting a commit or
00781          * abort belong to PBNYC txns, and we need to apply them and
00782          * restore them to a prepared state.
00783          *
00784          * Note that we wind up applying transactions out of order.
00785          * Since all PBNYC txns still held locks on the old master and
00786          * were isolated, this should be safe.
00787          */
00788         for (ret = __log_c_get(logc, &lsn, &rec, DB_LAST);
00789             ret == 0 && log_compare(&lsn, &ckp_lsn) > 0;
00790             ret = __log_c_get(logc, &lsn, &rec, DB_PREV)) {
00791                 memcpy(&rectype, rec.data, sizeof(rectype));
00792                 switch (rectype) {
00793                 case DB___txn_regop:
00794                         /*
00795                          * It's a commit or abort--but we don't care
00796                          * which!  Just add it to the list of txns
00797                          * that are resolved.
00798                          */
00799                         if ((ret = __txn_regop_read(dbenv, rec.data,
00800                             &regop_args)) != 0)
00801                                 goto err;
00802 
00803                         ret = __db_txnlist_find(dbenv,
00804                             txninfo, regop_args->txnid->txnid, &status);
00805                         if (ret == DB_NOTFOUND)
00806                                 ret = __db_txnlist_add(dbenv, txninfo,
00807                                     regop_args->txnid->txnid,
00808                                     regop_args->opcode, &lsn);
00809                         else if (ret != 0)
00810                                 goto err;
00811                         __os_free(dbenv, regop_args);
00812                         break;
00813                 case DB___txn_xa_regop:
00814                         /*
00815                          * It's a prepare.  If its not aborted and
00816                          * we haven't put the txn on our list yet, it
00817                          * hasn't been resolved, so apply and restore it.
00818                          */
00819                         if ((ret = __txn_xa_regop_read(dbenv, rec.data,
00820                             &prep_args)) != 0)
00821                                 goto err;
00822                         ret = __db_txnlist_find(dbenv, txninfo,
00823                             prep_args->txnid->txnid, &status);
00824                         if (ret == DB_NOTFOUND) {
00825                                 if (prep_args->opcode == TXN_ABORT)
00826                                         ret = __db_txnlist_add(dbenv, txninfo,
00827                                             prep_args->txnid->txnid,
00828                                             prep_args->opcode, &lsn);
00829                                 else if ((ret =
00830                                     __rep_process_txn(dbenv, &rec)) == 0)
00831                                         ret = __txn_restore_txn(dbenv,
00832                                             &lsn, prep_args);
00833                         } else if (ret != 0)
00834                                 goto err;
00835                         __os_free(dbenv, prep_args);
00836                         break;
00837                 default:
00838                         continue;
00839                 }
00840         }
00841 
00842         /* It's not an error to have hit the beginning of the log. */
00843         if (ret == DB_NOTFOUND)
00844                 ret = 0;
00845 
00846 done:
00847 err:    t_ret = __log_c_close(logc);
00848 
00849         if (txninfo != NULL)
00850                 __db_txnlist_end(dbenv, txninfo);
00851 
00852         return (ret == 0 ? t_ret : ret);
00853 }
00854 
00855 /*
00856  * PUBLIC: int __rep_get_limit __P((DB_ENV *, u_int32_t *, u_int32_t *));
00857  */
00858 int
00859 __rep_get_limit(dbenv, gbytesp, bytesp)
00860         DB_ENV *dbenv;
00861         u_int32_t *gbytesp, *bytesp;
00862 {
00863         DB_REP *db_rep;
00864         REP *rep;
00865 
00866         PANIC_CHECK(dbenv);
00867         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_get_limit",
00868             DB_INIT_REP);
00869 
00870         if (!REP_ON(dbenv)) {
00871                 __db_err(dbenv,
00872     "DB_ENV->get_rep_limit: database environment not properly initialized");
00873                 return (__db_panic(dbenv, EINVAL));
00874         }
00875         db_rep = dbenv->rep_handle;
00876         rep = db_rep->region;
00877 
00878         if (gbytesp != NULL)
00879                 *gbytesp = rep->gbytes;
00880         if (bytesp != NULL)
00881                 *bytesp = rep->bytes;
00882 
00883         return (0);
00884 }
00885 
00886 /*
00887  * __rep_set_limit --
00888  *      Set a limit on the amount of data that will be sent during a single
00889  * invocation of __rep_process_message.
00890  *
00891  * PUBLIC: int __rep_set_limit __P((DB_ENV *, u_int32_t, u_int32_t));
00892  */
00893 int
00894 __rep_set_limit(dbenv, gbytes, bytes)
00895         DB_ENV *dbenv;
00896         u_int32_t gbytes, bytes;
00897 {
00898         DB_REP *db_rep;
00899         REP *rep;
00900 
00901         PANIC_CHECK(dbenv);
00902         ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_limit");
00903         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_limit",
00904             DB_INIT_REP);
00905 
00906         if (!REP_ON(dbenv)) {
00907                 __db_err(dbenv,
00908     "DB_ENV->set_rep_limit: database environment not properly initialized");
00909                 return (__db_panic(dbenv, EINVAL));
00910         }
00911         db_rep = dbenv->rep_handle;
00912         rep = db_rep->region;
00913         REP_SYSTEM_LOCK(dbenv);
00914         if (bytes > GIGABYTE) {
00915                 gbytes += bytes / GIGABYTE;
00916                 bytes = bytes % GIGABYTE;
00917         }
00918         rep->gbytes = gbytes;
00919         rep->bytes = bytes;
00920         REP_SYSTEM_UNLOCK(dbenv);
00921 
00922         return (0);
00923 }
00924 
00925 /*
00926  * __rep_set_request --
00927  *      Set the minimum and maximum number of log records that we wait
00928  *      before retransmitting.
00929  *
00930  * !!!
00931  * UNDOCUMENTED.
00932  *
00933  * PUBLIC: int __rep_set_request __P((DB_ENV *, u_int32_t, u_int32_t));
00934  */
00935 int
00936 __rep_set_request(dbenv, min, max)
00937         DB_ENV *dbenv;
00938         u_int32_t min, max;
00939 {
00940         LOG *lp;
00941         DB_LOG *dblp;
00942         DB_REP *db_rep;
00943         REP *rep;
00944 
00945         PANIC_CHECK(dbenv);
00946         ENV_ILLEGAL_BEFORE_OPEN(dbenv, "DB_ENV->rep_set_request");
00947         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_set_request",
00948             DB_INIT_REP);
00949 
00950         if (!REP_ON(dbenv)) {
00951                 __db_err(dbenv,
00952     "DB_ENV->set_rep_request: database environment not properly initialized");
00953                 return (__db_panic(dbenv, EINVAL));
00954         }
00955         db_rep = dbenv->rep_handle;
00956         rep = db_rep->region;
00957 
00958         /*
00959          * We acquire the mtx_region or mtx_clientdb mutexes as needed.
00960          */
00961         REP_SYSTEM_LOCK(dbenv);
00962         rep->request_gap = min;
00963         rep->max_gap = max;
00964         REP_SYSTEM_UNLOCK(dbenv);
00965 
00966         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
00967         dblp = dbenv->lg_handle;
00968         if (dblp != NULL && (lp = dblp->reginfo.primary) != NULL) {
00969                 lp->wait_recs = 0;
00970                 lp->rcvd_recs = 0;
00971         }
00972         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
00973 
00974         return (0);
00975 }
00976 
00977 /*
00978  * __rep_set_transport --
00979  *      Set the transport function for replication.
00980  *
00981  * PUBLIC: int __rep_set_rep_transport __P((DB_ENV *, int,
00982  * PUBLIC:     int (*)(DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
00983  * PUBLIC:     int, u_int32_t)));
00984  */
00985 int
00986 __rep_set_rep_transport(dbenv, eid, f_send)
00987         DB_ENV *dbenv;
00988         int eid;
00989         int (*f_send) __P((DB_ENV *, const DBT *, const DBT *, const DB_LSN *,
00990             int, u_int32_t));
00991 {
00992         PANIC_CHECK(dbenv);
00993 
00994         if (f_send == NULL) {
00995                 __db_err(dbenv,
00996         "DB_ENV->set_rep_transport: no send function specified");
00997                 return (EINVAL);
00998         }
00999         if (eid < 0) {
01000                 __db_err(dbenv,
01001         "DB_ENV->set_rep_transport: eid must be greater than or equal to 0");
01002                 return (EINVAL);
01003         }
01004         dbenv->rep_send = f_send;
01005         dbenv->rep_eid = eid;
01006         return (0);
01007 }
01008 
01009 /*
01010  * __rep_flush --
01011  *      Re-push the last log record to all clients, in case they've lost
01012  *      messages and don't know it.
01013  *
01014  * PUBLIC: int __rep_flush __P((DB_ENV *));
01015  */
01016 int
01017 __rep_flush(dbenv)
01018         DB_ENV *dbenv;
01019 {
01020         DBT rec;
01021         DB_LOGC *logc;
01022         DB_LSN lsn;
01023         int ret, t_ret;
01024 
01025         PANIC_CHECK(dbenv);
01026         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_flush", DB_INIT_REP);
01027 
01028         if ((ret = __log_cursor(dbenv, &logc)) != 0)
01029                 return (ret);
01030 
01031         memset(&rec, 0, sizeof(rec));
01032         memset(&lsn, 0, sizeof(lsn));
01033 
01034         if ((ret = __log_c_get(logc, &lsn, &rec, DB_LAST)) != 0)
01035                 goto err;
01036 
01037         (void)__rep_send_message(dbenv,
01038             DB_EID_BROADCAST, REP_LOG, &lsn, &rec, 0, 0);
01039 
01040 err:    if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
01041                 ret = t_ret;
01042         return (ret);
01043 }
01044 
01045 /*
01046  * __rep_sync --
01047  *      Force a synchronization to occur between this client and the master.
01048  *      This is the other half of configuring DELAYCLIENT.
01049  *
01050  * PUBLIC: int __rep_sync __P((DB_ENV *, u_int32_t));
01051  */
01052 int
01053 __rep_sync(dbenv, flags)
01054         DB_ENV *dbenv;
01055         u_int32_t flags;
01056 {
01057         DB_LOG *dblp;
01058         DB_LSN lsn;
01059         DB_REP *db_rep;
01060         LOG *lp;
01061         REP *rep;
01062         int master;
01063         u_int32_t type;
01064 
01065         COMPQUIET(flags, 0);
01066         PANIC_CHECK(dbenv);
01067         ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle,
01068             "rep_sync", DB_INIT_REP);
01069 
01070         dblp = dbenv->lg_handle;
01071         lp = dblp->reginfo.primary;
01072         db_rep = dbenv->rep_handle;
01073         rep = db_rep->region;
01074 
01075         /*
01076          * Simple cases.  If we're not in the DELAY state we have nothing
01077          * to do.  If we don't know who the master is, send a MASTER_REQ.
01078          */
01079         MUTEX_LOCK(dbenv, rep->mtx_clientdb);
01080         lsn = lp->verify_lsn;
01081         MUTEX_UNLOCK(dbenv, rep->mtx_clientdb);
01082         REP_SYSTEM_LOCK(dbenv);
01083         master = rep->master_id;
01084         if (master == DB_EID_INVALID) {
01085                 REP_SYSTEM_UNLOCK(dbenv);
01086                 (void)__rep_send_message(dbenv, DB_EID_BROADCAST,
01087                     REP_MASTER_REQ, NULL, NULL, 0, 0);
01088                 return (0);
01089         }
01090         /*
01091          * We want to hold the rep mutex to test and then clear the
01092          * DELAY flag.  Racing threads in here could otherwise result
01093          * in dual data streams.
01094          */
01095         if (!F_ISSET(rep, REP_F_DELAY)) {
01096                 REP_SYSTEM_UNLOCK(dbenv);
01097                 return (0);
01098         }
01099 
01100         /*
01101          * If we get here, we clear the delay flag and kick off a
01102          * synchronization.  From this point forward, we will
01103          * synchronize until the next time the master changes.
01104          */
01105         F_CLR(rep, REP_F_DELAY);
01106         REP_SYSTEM_UNLOCK(dbenv);
01107         /*
01108          * When we set REP_F_DELAY, we set verify_lsn to the real verify
01109          * lsn if we need to verify, or we zeroed it out if this is a client
01110          * that needs to sync up from the beginning.  So, send the type
01111          * of message now that __rep_new_master delayed sending.
01112          */
01113         if (IS_ZERO_LSN(lsn))
01114                 type = REP_ALL_REQ;
01115         else
01116                 type = REP_VERIFY_REQ;
01117         (void)__rep_send_message(dbenv, master, type, &lsn, NULL, 0,
01118             DB_REP_ANYWHERE);
01119         return (0);
01120 }

Generated on Sun Dec 25 12:14:45 2005 for Berkeley DB 4.4.16 by  doxygen 1.4.2