Header And Logo

PostgreSQL
| The world's most advanced open source database.

standby.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * standby.c
00004  *    Misc functions used in Hot Standby mode.
00005  *
00006  *  All functions for handling RM_STANDBY_ID, which relate to
00007  *  AccessExclusiveLocks and starting snapshots for Hot Standby mode.
00008  *  Plus conflict recovery processing.
00009  *
00010  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00011  * Portions Copyright (c) 1994, Regents of the University of California
00012  *
00013  * IDENTIFICATION
00014  *    src/backend/storage/ipc/standby.c
00015  *
00016  *-------------------------------------------------------------------------
00017  */
00018 #include "postgres.h"
00019 #include "access/transam.h"
00020 #include "access/twophase.h"
00021 #include "access/xact.h"
00022 #include "access/xlog.h"
00023 #include "miscadmin.h"
00024 #include "storage/bufmgr.h"
00025 #include "storage/lmgr.h"
00026 #include "storage/proc.h"
00027 #include "storage/procarray.h"
00028 #include "storage/sinvaladt.h"
00029 #include "storage/standby.h"
00030 #include "utils/ps_status.h"
00031 #include "utils/timeout.h"
00032 #include "utils/timestamp.h"
00033 
/*
 * User-settable GUC parameters.
 *
 * The two max_standby_*_delay settings are consumed by GetStandbyLimitTime(),
 * which adds them to the WAL receipt time as milliseconds; the initializers
 * below are therefore 30 seconds.  A negative value means "wait forever".
 */
int         vacuum_defer_cleanup_age;
int         max_standby_archive_delay = 30 * 1000;
int         max_standby_streaming_delay = 30 * 1000;

/*
 * Locks taken by the Startup process during replay.  Each element is a
 * palloc'd xl_standby_lock appended by StandbyAcquireAccessExclusiveLock()
 * and freed again by the StandbyRelease*() functions.
 */
static List *RecoveryLockList;

/* Forward declarations for file-local functions */
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
                                       ProcSignalReason reason);
static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
00047 
00048 
00049 /*
00050  * InitRecoveryTransactionEnvironment
00051  *      Initialize tracking of in-progress transactions in master
00052  *
00053  * We need to issue shared invalidations and hold locks. Holding locks
00054  * means others may want to wait on us, so we need to make a lock table
00055  * vxact entry like a real transaction. We could create and delete
00056  * lock table entries for each transaction but its simpler just to create
00057  * one permanent entry and leave it there all the time. Locks are then
00058  * acquired and released as needed. Yes, this means you can see the
00059  * Startup process in pg_locks once we have run this.
00060  */
00061 void
00062 InitRecoveryTransactionEnvironment(void)
00063 {
00064     VirtualTransactionId vxid;
00065 
00066     /*
00067      * Initialize shared invalidation management for Startup process, being
00068      * careful to register ourselves as a sendOnly process so we don't need to
00069      * read messages, nor will we get signalled when the queue starts filling
00070      * up.
00071      */
00072     SharedInvalBackendInit(true);
00073 
00074     /*
00075      * Lock a virtual transaction id for Startup process.
00076      *
00077      * We need to do GetNextLocalTransactionId() because
00078      * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
00079      * manager doesn't like that at all.
00080      *
00081      * Note that we don't need to run XactLockTableInsert() because nobody
00082      * needs to wait on xids. That sounds a little strange, but table locks
00083      * are held by vxids and row level locks are held by xids. All queries
00084      * hold AccessShareLocks so never block while we write or lock new rows.
00085      */
00086     vxid.backendId = MyBackendId;
00087     vxid.localTransactionId = GetNextLocalTransactionId();
00088     VirtualXactLockTableInsert(vxid);
00089 
00090     standbyState = STANDBY_INITIALIZED;
00091 }
00092 
/*
 * ShutdownRecoveryTransactionEnvironment
 *      Shut down transaction tracking
 *
 * Prepare to switch from hot standby mode to normal operation. Shut down
 * recovery-time transaction tracking.
 *
 * Takes no arguments and returns nothing.  It makes three calls in a fixed
 * order: expire the known-assigned xids, release every lock recorded in
 * RecoveryLockList, and finally clean up the virtual transaction lock that
 * InitRecoveryTransactionEnvironment() inserted.
 */
void
ShutdownRecoveryTransactionEnvironment(void)
{
    /* Mark all tracked in-progress transactions as finished. */
    ExpireAllKnownAssignedTransactionIds();

    /* Release all locks the tracked transactions were holding */
    StandbyReleaseAllLocks();

    /* Cleanup our VirtualTransaction */
    VirtualXactLockTableCleanup();
}
00112 
00113 
00114 /*
00115  * -----------------------------------------------------
00116  *      Standby wait timers and backend cancel logic
00117  * -----------------------------------------------------
00118  */
00119 
00120 /*
00121  * Determine the cutoff time at which we want to start canceling conflicting
00122  * transactions.  Returns zero (a time safely in the past) if we are willing
00123  * to wait forever.
00124  */
00125 static TimestampTz
00126 GetStandbyLimitTime(void)
00127 {
00128     TimestampTz rtime;
00129     bool        fromStream;
00130 
00131     /*
00132      * The cutoff time is the last WAL data receipt time plus the appropriate
00133      * delay variable.  Delay of -1 means wait forever.
00134      */
00135     GetXLogReceiptTime(&rtime, &fromStream);
00136     if (fromStream)
00137     {
00138         if (max_standby_streaming_delay < 0)
00139             return 0;           /* wait forever */
00140         return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
00141     }
00142     else
00143     {
00144         if (max_standby_archive_delay < 0)
00145             return 0;           /* wait forever */
00146         return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
00147     }
00148 }
00149 
00150 #define STANDBY_INITIAL_WAIT_US  1000
00151 static int  standbyWait_us = STANDBY_INITIAL_WAIT_US;
00152 
00153 /*
00154  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
00155  * We wait here for a while then return. If we decide we can't wait any
00156  * more then we return true, if we can wait some more return false.
00157  */
00158 static bool
00159 WaitExceedsMaxStandbyDelay(void)
00160 {
00161     TimestampTz ltime;
00162 
00163     /* Are we past the limit time? */
00164     ltime = GetStandbyLimitTime();
00165     if (ltime && GetCurrentTimestamp() >= ltime)
00166         return true;
00167 
00168     /*
00169      * Sleep a bit (this is essential to avoid busy-waiting).
00170      */
00171     pg_usleep(standbyWait_us);
00172 
00173     /*
00174      * Progressively increase the sleep times, but not to more than 1s, since
00175      * pg_usleep isn't interruptable on some platforms.
00176      */
00177     standbyWait_us *= 2;
00178     if (standbyWait_us > 1000000)
00179         standbyWait_us = 1000000;
00180 
00181     return false;
00182 }
00183 
00184 /*
00185  * This is the main executioner for any query backend that conflicts with
00186  * recovery processing. Judgement has already been passed on it within
00187  * a specific rmgr. Here we just issue the orders to the procs. The procs
00188  * then throw the required error as instructed.
00189  */
00190 static void
00191 ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
00192                                        ProcSignalReason reason)
00193 {
00194     TimestampTz waitStart;
00195     char       *new_status;
00196 
00197     /* Fast exit, to avoid a kernel call if there's no work to be done. */
00198     if (!VirtualTransactionIdIsValid(*waitlist))
00199         return;
00200 
00201     waitStart = GetCurrentTimestamp();
00202     new_status = NULL;          /* we haven't changed the ps display */
00203 
00204     while (VirtualTransactionIdIsValid(*waitlist))
00205     {
00206         /* reset standbyWait_us for each xact we wait for */
00207         standbyWait_us = STANDBY_INITIAL_WAIT_US;
00208 
00209         /* wait until the virtual xid is gone */
00210         while (!VirtualXactLock(*waitlist, false))
00211         {
00212             /*
00213              * Report via ps if we have been waiting for more than 500 msec
00214              * (should that be configurable?)
00215              */
00216             if (update_process_title && new_status == NULL &&
00217                 TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
00218                                            500))
00219             {
00220                 const char *old_status;
00221                 int         len;
00222 
00223                 old_status = get_ps_display(&len);
00224                 new_status = (char *) palloc(len + 8 + 1);
00225                 memcpy(new_status, old_status, len);
00226                 strcpy(new_status + len, " waiting");
00227                 set_ps_display(new_status, false);
00228                 new_status[len] = '\0'; /* truncate off " waiting" */
00229             }
00230 
00231             /* Is it time to kill it? */
00232             if (WaitExceedsMaxStandbyDelay())
00233             {
00234                 pid_t       pid;
00235 
00236                 /*
00237                  * Now find out who to throw out of the balloon.
00238                  */
00239                 Assert(VirtualTransactionIdIsValid(*waitlist));
00240                 pid = CancelVirtualTransaction(*waitlist, reason);
00241 
00242                 /*
00243                  * Wait a little bit for it to die so that we avoid flooding
00244                  * an unresponsive backend when system is heavily loaded.
00245                  */
00246                 if (pid != 0)
00247                     pg_usleep(5000L);
00248             }
00249         }
00250 
00251         /* The virtual transaction is gone now, wait for the next one */
00252         waitlist++;
00253     }
00254 
00255     /* Reset ps display if we changed it */
00256     if (new_status)
00257     {
00258         set_ps_display(new_status, false);
00259         pfree(new_status);
00260     }
00261 }
00262 
00263 void
00264 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
00265 {
00266     VirtualTransactionId *backends;
00267 
00268     /*
00269      * If we get passed InvalidTransactionId then we are a little surprised,
00270      * but it is theoretically possible in normal running. It also happens
00271      * when replaying already applied WAL records after a standby crash or
00272      * restart. If latestRemovedXid is invalid then there is no conflict. That
00273      * rule applies across all record types that suffer from this conflict.
00274      */
00275     if (!TransactionIdIsValid(latestRemovedXid))
00276         return;
00277 
00278     backends = GetConflictingVirtualXIDs(latestRemovedXid,
00279                                          node.dbNode);
00280 
00281     ResolveRecoveryConflictWithVirtualXIDs(backends,
00282                                          PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
00283 }
00284 
00285 void
00286 ResolveRecoveryConflictWithTablespace(Oid tsid)
00287 {
00288     VirtualTransactionId *temp_file_users;
00289 
00290     /*
00291      * Standby users may be currently using this tablespace for their
00292      * temporary files. We only care about current users because
00293      * temp_tablespace parameter will just ignore tablespaces that no longer
00294      * exist.
00295      *
00296      * Ask everybody to cancel their queries immediately so we can ensure no
00297      * temp files remain and we can remove the tablespace. Nuke the entire
00298      * site from orbit, it's the only way to be sure.
00299      *
00300      * XXX: We could work out the pids of active backends using this
00301      * tablespace by examining the temp filenames in the directory. We would
00302      * then convert the pids into VirtualXIDs before attempting to cancel
00303      * them.
00304      *
00305      * We don't wait for commit because drop tablespace is non-transactional.
00306      */
00307     temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
00308                                                 InvalidOid);
00309     ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
00310                                        PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
00311 }
00312 
00313 void
00314 ResolveRecoveryConflictWithDatabase(Oid dbid)
00315 {
00316     /*
00317      * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
00318      * only waits for transactions and completely idle sessions would block
00319      * us. This is rare enough that we do this as simply as possible: no wait,
00320      * just force them off immediately.
00321      *
00322      * No locking is required here because we already acquired
00323      * AccessExclusiveLock. Anybody trying to connect while we do this will
00324      * block during InitPostgres() and then disconnect when they see the
00325      * database has been removed.
00326      */
00327     while (CountDBBackends(dbid) > 0)
00328     {
00329         CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
00330 
00331         /*
00332          * Wait awhile for them to die so that we avoid flooding an
00333          * unresponsive backend when system is heavily loaded.
00334          */
00335         pg_usleep(10000);
00336     }
00337 }
00338 
00339 static void
00340 ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
00341 {
00342     VirtualTransactionId *backends;
00343     bool        lock_acquired = false;
00344     int         num_attempts = 0;
00345     LOCKTAG     locktag;
00346 
00347     SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
00348 
00349     /*
00350      * If blowing away everybody with conflicting locks doesn't work, after
00351      * the first two attempts then we just start blowing everybody away until
00352      * it does work. We do this because its likely that we either have too
00353      * many locks and we just can't get one at all, or that there are many
00354      * people crowding for the same table. Recovery must win; the end
00355      * justifies the means.
00356      */
00357     while (!lock_acquired)
00358     {
00359         if (++num_attempts < 3)
00360             backends = GetLockConflicts(&locktag, AccessExclusiveLock);
00361         else
00362             backends = GetConflictingVirtualXIDs(InvalidTransactionId,
00363                                                  InvalidOid);
00364 
00365         ResolveRecoveryConflictWithVirtualXIDs(backends,
00366                                              PROCSIG_RECOVERY_CONFLICT_LOCK);
00367 
00368         if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
00369             != LOCKACQUIRE_NOT_AVAIL)
00370             lock_acquired = true;
00371     }
00372 }
00373 
00374 /*
00375  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
00376  * to resolve conflicts with other backends holding buffer pins.
00377  *
00378  * The ProcWaitForSignal() sleep normally done in LockBufferForCleanup()
00379  * (when not InHotStandby) is performed here, for code clarity.
00380  *
00381  * We either resolve conflicts immediately or set a timeout to wake us at
00382  * the limit of our patience.
00383  *
00384  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
00385  * they hold one of the buffer pins that is blocking Startup process. If so,
00386  * those backends will take an appropriate error action, ERROR or FATAL.
00387  *
00388  * We also must check for deadlocks.  Deadlocks occur because if queries
00389  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
00390  * be cleared if the Startup process replays a transaction completion record.
00391  * If Startup process is also waiting then that is a deadlock. The deadlock
00392  * can occur if the query is waiting and then the Startup sleeps, or if
00393  * Startup is sleeping and the query waits on a lock. We protect against
00394  * only the former sequence here, the latter sequence is checked prior to
00395  * the query sleeping, in CheckRecoveryConflictDeadlock().
00396  *
00397  * Deadlocks are extremely rare, and relatively expensive to check for,
00398  * so we don't do a deadlock check right away ... only if we have had to wait
00399  * at least deadlock_timeout.
00400  */
00401 void
00402 ResolveRecoveryConflictWithBufferPin(void)
00403 {
00404     TimestampTz ltime;
00405 
00406     Assert(InHotStandby);
00407 
00408     ltime = GetStandbyLimitTime();
00409 
00410     if (ltime == 0)
00411     {
00412         /*
00413          * We're willing to wait forever for conflicts, so set timeout for
00414          * deadlock check only
00415          */
00416         enable_timeout_after(STANDBY_DEADLOCK_TIMEOUT, DeadlockTimeout);
00417     }
00418     else if (GetCurrentTimestamp() >= ltime)
00419     {
00420         /*
00421          * We're already behind, so clear a path as quickly as possible.
00422          */
00423         SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
00424     }
00425     else
00426     {
00427         /*
00428          * Wake up at ltime, and check for deadlocks as well if we will be
00429          * waiting longer than deadlock_timeout
00430          */
00431         EnableTimeoutParams timeouts[2];
00432 
00433         timeouts[0].id = STANDBY_TIMEOUT;
00434         timeouts[0].type = TMPARAM_AT;
00435         timeouts[0].fin_time = ltime;
00436         timeouts[1].id = STANDBY_DEADLOCK_TIMEOUT;
00437         timeouts[1].type = TMPARAM_AFTER;
00438         timeouts[1].delay_ms = DeadlockTimeout;
00439         enable_timeouts(timeouts, 2);
00440     }
00441 
00442     /* Wait to be signaled by UnpinBuffer() */
00443     ProcWaitForSignal();
00444 
00445     /*
00446      * Clear any timeout requests established above.  We assume here that
00447      * the Startup process doesn't have any other timeouts than what this
00448      * function uses.  If that stops being true, we could cancel the
00449      * timeouts individually, but that'd be slower.
00450      */
00451     disable_all_timeouts(false);
00452 }
00453 
/*
 * Ask backends to check whether they hold a buffer pin that is delaying the
 * Startup process.
 *
 * reason must be PROCSIG_RECOVERY_CONFLICT_BUFFERPIN or
 * PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK (asserted below); it is passed
 * through unchanged to CancelDBBackends().
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
    Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
           reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

    /*
     * We send signal to all backends to ask them if they are holding the
     * buffer pin which is delaying the Startup process. We must not set the
     * conflict flag yet, since most backends will be innocent. Let the
     * SIGUSR1 handling in each backend decide their own fate.
     */
    CancelDBBackends(InvalidOid, reason, false);
}
00468 
00469 /*
00470  * In Hot Standby perform early deadlock detection.  We abort the lock
00471  * wait if we are about to sleep while holding the buffer pin that Startup
00472  * process is waiting for.
00473  *
00474  * Note: this code is pessimistic, because there is no way for it to
00475  * determine whether an actual deadlock condition is present: the lock we
00476  * need to wait for might be unrelated to any held by the Startup process.
00477  * Sooner or later, this mechanism should get ripped out in favor of somehow
00478  * accounting for buffer locks in DeadLockCheck().  However, errors here
00479  * seem to be very low-probability in practice, so for now it's not worth
00480  * the trouble.
00481  */
00482 void
00483 CheckRecoveryConflictDeadlock(void)
00484 {
00485     Assert(!InRecovery);        /* do not call in Startup process */
00486 
00487     if (!HoldingBufferPinThatDelaysRecovery())
00488         return;
00489 
00490     /*
00491      * Error message should match ProcessInterrupts() but we avoid calling
00492      * that because we aren't handling an interrupt at this point. Note that
00493      * we only cancel the current transaction here, so if we are in a
00494      * subtransaction and the pin is held by a parent, then the Startup
00495      * process will continue to wait even though we have avoided deadlock.
00496      */
00497     ereport(ERROR,
00498             (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
00499              errmsg("canceling statement due to conflict with recovery"),
00500        errdetail("User transaction caused buffer deadlock with recovery.")));
00501 }
00502 
00503 
00504 /* --------------------------------
00505  *      timeout handler routines
00506  * --------------------------------
00507  */
00508 
/*
 * StandbyDeadLockHandler() will be called if STANDBY_DEADLOCK_TIMEOUT
 * occurs before STANDBY_TIMEOUT.  Send out a request for hot-standby
 * backends to check themselves for deadlocks.
 *
 * The request is sent with reason PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK
 * through SendRecoveryConflictWithBufferPin(); nothing else is done here.
 */
void
StandbyDeadLockHandler(void)
{
    SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
}
00519 
/*
 * StandbyTimeoutHandler() will be called if STANDBY_TIMEOUT is exceeded.
 * Send out a request to release conflicting buffer pins unconditionally,
 * so we can press ahead with applying changes in recovery.
 *
 * The request is sent with reason PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, after
 * first disabling the deadlock-check timeout.
 */
void
StandbyTimeoutHandler(void)
{
    /* forget any pending STANDBY_DEADLOCK_TIMEOUT request */
    disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

    SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
00533 
00534 
00535 /*
00536  * -----------------------------------------------------
00537  * Locking in Recovery Mode
00538  * -----------------------------------------------------
00539  *
00540  * All locks are held by the Startup process using a single virtual
00541  * transaction. This implementation is both simpler and in some senses,
00542  * more correct. The locks held mean "some original transaction held
00543  * this lock, so query access is not allowed at this time". So the Startup
00544  * process is the proxy by which the original locks are implemented.
00545  *
00546  * We only keep track of AccessExclusiveLocks, which are only ever held by
00547  * one transaction on one relation, and don't worry about lock queuing.
00548  *
00549  * We keep a single dynamically expandable list of locks in local memory,
00550  * RecoveryLockList, so we can keep track of the various entries made by
00551  * the Startup process's virtual xid in the shared lock table.
00552  *
00553  * We record the lock against the top-level xid, rather than individual
00554  * subtransaction xids. This means AccessExclusiveLocks held by aborted
00555  * subtransactions are not released as early as possible on standbys.
00556  *
00557  * List elements use type xl_standby_lock, since the WAL record type exactly
00558  * matches the information that we need to keep track of.
00559  *
00560  * We use session locks rather than normal locks so we don't need
00561  * ResourceOwners.
00562  */
00563 
00564 
00565 void
00566 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
00567 {
00568     xl_standby_lock *newlock;
00569     LOCKTAG     locktag;
00570 
00571     /* Already processed? */
00572     if (!TransactionIdIsValid(xid) ||
00573         TransactionIdDidCommit(xid) ||
00574         TransactionIdDidAbort(xid))
00575         return;
00576 
00577     elog(trace_recovery(DEBUG4),
00578          "adding recovery lock: db %u rel %u", dbOid, relOid);
00579 
00580     /* dbOid is InvalidOid when we are locking a shared relation. */
00581     Assert(OidIsValid(relOid));
00582 
00583     newlock = palloc(sizeof(xl_standby_lock));
00584     newlock->xid = xid;
00585     newlock->dbOid = dbOid;
00586     newlock->relOid = relOid;
00587     RecoveryLockList = lappend(RecoveryLockList, newlock);
00588 
00589     /*
00590      * Attempt to acquire the lock as requested, if not resolve conflict
00591      */
00592     SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
00593 
00594     if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
00595         == LOCKACQUIRE_NOT_AVAIL)
00596         ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
00597 }
00598 
00599 static void
00600 StandbyReleaseLocks(TransactionId xid)
00601 {
00602     ListCell   *cell,
00603                *prev,
00604                *next;
00605 
00606     /*
00607      * Release all matching locks and remove them from list
00608      */
00609     prev = NULL;
00610     for (cell = list_head(RecoveryLockList); cell; cell = next)
00611     {
00612         xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
00613 
00614         next = lnext(cell);
00615 
00616         if (!TransactionIdIsValid(xid) || lock->xid == xid)
00617         {
00618             LOCKTAG     locktag;
00619 
00620             elog(trace_recovery(DEBUG4),
00621                  "releasing recovery lock: xid %u db %u rel %u",
00622                  lock->xid, lock->dbOid, lock->relOid);
00623             SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
00624             if (!LockRelease(&locktag, AccessExclusiveLock, true))
00625                 elog(LOG,
00626                      "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
00627                      lock->xid, lock->dbOid, lock->relOid);
00628 
00629             RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
00630             pfree(lock);
00631         }
00632         else
00633             prev = cell;
00634     }
00635 }
00636 
00637 /*
00638  * Release locks for a transaction tree, starting at xid down, from
00639  * RecoveryLockList.
00640  *
00641  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
00642  * to remove any AccessExclusiveLocks requested by a transaction.
00643  */
00644 void
00645 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
00646 {
00647     int         i;
00648 
00649     StandbyReleaseLocks(xid);
00650 
00651     for (i = 0; i < nsubxids; i++)
00652         StandbyReleaseLocks(subxids[i]);
00653 }
00654 
00655 /*
00656  * Called at end of recovery and when we see a shutdown checkpoint.
00657  */
00658 void
00659 StandbyReleaseAllLocks(void)
00660 {
00661     ListCell   *cell,
00662                *prev,
00663                *next;
00664     LOCKTAG     locktag;
00665 
00666     elog(trace_recovery(DEBUG2), "release all standby locks");
00667 
00668     prev = NULL;
00669     for (cell = list_head(RecoveryLockList); cell; cell = next)
00670     {
00671         xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
00672 
00673         next = lnext(cell);
00674 
00675         elog(trace_recovery(DEBUG4),
00676              "releasing recovery lock: xid %u db %u rel %u",
00677              lock->xid, lock->dbOid, lock->relOid);
00678         SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
00679         if (!LockRelease(&locktag, AccessExclusiveLock, true))
00680             elog(LOG,
00681                  "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
00682                  lock->xid, lock->dbOid, lock->relOid);
00683         RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
00684         pfree(lock);
00685     }
00686 }
00687 
00688 /*
00689  * StandbyReleaseOldLocks
00690  *      Release standby locks held by top-level XIDs that aren't running,
00691  *      as long as they're not prepared transactions.
00692  */
00693 void
00694 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
00695 {
00696     ListCell   *cell,
00697                *prev,
00698                *next;
00699     LOCKTAG     locktag;
00700 
00701     prev = NULL;
00702     for (cell = list_head(RecoveryLockList); cell; cell = next)
00703     {
00704         xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
00705         bool        remove = false;
00706 
00707         next = lnext(cell);
00708 
00709         Assert(TransactionIdIsValid(lock->xid));
00710 
00711         if (StandbyTransactionIdIsPrepared(lock->xid))
00712             remove = false;
00713         else
00714         {
00715             int         i;
00716             bool        found = false;
00717 
00718             for (i = 0; i < nxids; i++)
00719             {
00720                 if (lock->xid == xids[i])
00721                 {
00722                     found = true;
00723                     break;
00724                 }
00725             }
00726 
00727             /*
00728              * If its not a running transaction, remove it.
00729              */
00730             if (!found)
00731                 remove = true;
00732         }
00733 
00734         if (remove)
00735         {
00736             elog(trace_recovery(DEBUG4),
00737                  "releasing recovery lock: xid %u db %u rel %u",
00738                  lock->xid, lock->dbOid, lock->relOid);
00739             SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
00740             if (!LockRelease(&locktag, AccessExclusiveLock, true))
00741                 elog(LOG,
00742                      "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
00743                      lock->xid, lock->dbOid, lock->relOid);
00744             RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
00745             pfree(lock);
00746         }
00747         else
00748             prev = cell;
00749     }
00750 }
00751 
00752 /*
00753  * --------------------------------------------------------------------
00754  *      Recovery handling for Rmgr RM_STANDBY_ID
00755  *
00756  * These record types will only be created if XLogStandbyInfoActive()
00757  * --------------------------------------------------------------------
00758  */
00759 
00760 void
00761 standby_redo(XLogRecPtr lsn, XLogRecord *record)
00762 {
00763     uint8       info = record->xl_info & ~XLR_INFO_MASK;
00764 
00765     /* Backup blocks are not used in standby records */
00766     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00767 
00768     /* Do nothing if we're not in hot standby mode */
00769     if (standbyState == STANDBY_DISABLED)
00770         return;
00771 
00772     if (info == XLOG_STANDBY_LOCK)
00773     {
00774         xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
00775         int         i;
00776 
00777         for (i = 0; i < xlrec->nlocks; i++)
00778             StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
00779                                               xlrec->locks[i].dbOid,
00780                                               xlrec->locks[i].relOid);
00781     }
00782     else if (info == XLOG_RUNNING_XACTS)
00783     {
00784         xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
00785         RunningTransactionsData running;
00786 
00787         running.xcnt = xlrec->xcnt;
00788         running.subxcnt = xlrec->subxcnt;
00789         running.subxid_overflow = xlrec->subxid_overflow;
00790         running.nextXid = xlrec->nextXid;
00791         running.latestCompletedXid = xlrec->latestCompletedXid;
00792         running.oldestRunningXid = xlrec->oldestRunningXid;
00793         running.xids = xlrec->xids;
00794 
00795         ProcArrayApplyRecoveryInfo(&running);
00796     }
00797     else
00798         elog(PANIC, "standby_redo: unknown op code %u", info);
00799 }
00800 
00801 /*
00802  * Log details of the current snapshot to WAL. This allows the snapshot state
00803  * to be reconstructed on the standby.
00804  *
00805  * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
00806  * start from a shutdown checkpoint because we know nothing was running
00807  * at that time and our recovery snapshot is known empty. In the more
00808  * typical case of an online checkpoint we need to jump through a few
00809  * hoops to get a correct recovery snapshot and this requires a two or
00810  * sometimes a three stage process.
00811  *
00812  * The initial snapshot must contain all running xids and all current
00813  * AccessExclusiveLocks at a point in time on the standby. Assembling
00814  * that information while the server is running requires many and
00815  * various LWLocks, so we choose to derive that information piece by
00816  * piece and then re-assemble that info on the standby. When that
00817  * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
00818  *
00819  * Since locking on the primary when we derive the information is not
00820  * strict, we note that there is a time window between the derivation and
00821  * writing to WAL of the derived information. That allows race conditions
00822  * that we must resolve, since xids and locks may enter or leave the
00823  * snapshot during that window. This creates the issue that an xid or
00824  * lock may start *after* the snapshot has been derived yet *before* the
00825  * snapshot is logged in the running xacts WAL record. We resolve this by
00826  * starting to accumulate changes at a point just prior to when we derive
00827  * the snapshot on the primary, then ignore duplicates when we later apply
00828  * the snapshot from the running xacts record. This is implemented during
00829  * CreateCheckpoint() where we use the logical checkpoint location as
00830  * our starting point and then write the running xacts record immediately
00831  * before writing the main checkpoint WAL record. Since we always start
00832  * up from a checkpoint and are immediately at our starting point, we
00833  * unconditionally move to STANDBY_INITIALIZED. After this point we
00834  * must do 4 things:
00835  *  * move shared nextXid forwards as we see new xids
00836  *  * extend the clog and subtrans with each new xid
00837  *  * keep track of uncommitted known assigned xids
00838  *  * keep track of uncommitted AccessExclusiveLocks
00839  *
00840  * When we see a commit/abort we must remove known assigned xids and locks
00841  * from the completing transaction. Attempted removals that cannot locate
00842  * an entry are expected and must not cause an error when we are in state
00843  * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
00844  * KnownAssignedXidsRemove().
00845  *
00846  * Later, when we apply the running xact data we must be careful to ignore
00847  * transactions already committed, since those commits raced ahead when
00848  * making WAL entries.
00849  *
00850  * The loose timing also means that locks may be recorded that have a
00851  * zero xid, since xids are removed from procs before locks are removed.
00852  * So we must prune the lock list down to ensure we hold locks only for
00853  * currently running xids, performed by StandbyReleaseOldLocks().
00854  * Zero xids should no longer be possible, but we may be replaying WAL
00855  * from a time when they were possible.
00856  */
00857 void
00858 LogStandbySnapshot(void)
00859 {
00860     RunningTransactions running;
00861     xl_standby_lock *locks;
00862     int         nlocks;
00863 
00864     Assert(XLogStandbyInfoActive());
00865 
00866     /*
00867      * Get details of any AccessExclusiveLocks being held at the moment.
00868      *
00869      * XXX GetRunningTransactionLocks() currently holds a lock on all
00870      * partitions though it is possible to further optimise the locking. By
00871      * reference counting locks and storing the value on the ProcArray entry
00872      * for each backend we can easily tell if any locks need recording without
00873      * trying to acquire the partition locks and scanning the lock table.
00874      */
00875     locks = GetRunningTransactionLocks(&nlocks);
00876     if (nlocks > 0)
00877         LogAccessExclusiveLocks(nlocks, locks);
00878 
00879     /*
00880      * Log details of all in-progress transactions. This should be the last
00881      * record we write, because standby will open up when it sees this.
00882      */
00883     running = GetRunningTransactionData();
00884     LogCurrentRunningXacts(running);
00885     /* GetRunningTransactionData() acquired XidGenLock, we must release it */
00886     LWLockRelease(XidGenLock);
00887 }
00888 
00889 /*
00890  * Record an enhanced snapshot of running transactions into WAL.
00891  *
00892  * The definitions of RunningTransactionsData and xl_xact_running_xacts
00893  * are similar. We keep them separate because xl_xact_running_xacts
00894  * is a contiguous chunk of memory and never exists fully until it is
00895  * assembled in WAL.
00896  */
00897 static void
00898 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
00899 {
00900     xl_running_xacts xlrec;
00901     XLogRecData rdata[2];
00902     int         lastrdata = 0;
00903     XLogRecPtr  recptr;
00904 
00905     xlrec.xcnt = CurrRunningXacts->xcnt;
00906     xlrec.subxcnt = CurrRunningXacts->subxcnt;
00907     xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
00908     xlrec.nextXid = CurrRunningXacts->nextXid;
00909     xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
00910     xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
00911 
00912     /* Header */
00913     rdata[0].data = (char *) (&xlrec);
00914     rdata[0].len = MinSizeOfXactRunningXacts;
00915     rdata[0].buffer = InvalidBuffer;
00916 
00917     /* array of TransactionIds */
00918     if (xlrec.xcnt > 0)
00919     {
00920         rdata[0].next = &(rdata[1]);
00921         rdata[1].data = (char *) CurrRunningXacts->xids;
00922         rdata[1].len = (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId);
00923         rdata[1].buffer = InvalidBuffer;
00924         lastrdata = 1;
00925     }
00926 
00927     rdata[lastrdata].next = NULL;
00928 
00929     recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
00930 
00931     if (CurrRunningXacts->subxid_overflow)
00932         elog(trace_recovery(DEBUG2),
00933              "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
00934              CurrRunningXacts->xcnt,
00935              (uint32) (recptr >> 32), (uint32) recptr,
00936              CurrRunningXacts->oldestRunningXid,
00937              CurrRunningXacts->latestCompletedXid,
00938              CurrRunningXacts->nextXid);
00939     else
00940         elog(trace_recovery(DEBUG2),
00941              "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
00942              CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
00943              (uint32) (recptr >> 32), (uint32) recptr,
00944              CurrRunningXacts->oldestRunningXid,
00945              CurrRunningXacts->latestCompletedXid,
00946              CurrRunningXacts->nextXid);
00947 }
00948 
00949 /*
00950  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
00951  * logged, as described in backend/storage/lmgr/README.
00952  */
00953 static void
00954 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
00955 {
00956     XLogRecData rdata[2];
00957     xl_standby_locks xlrec;
00958 
00959     xlrec.nlocks = nlocks;
00960 
00961     rdata[0].data = (char *) &xlrec;
00962     rdata[0].len = offsetof(xl_standby_locks, locks);
00963     rdata[0].buffer = InvalidBuffer;
00964     rdata[0].next = &rdata[1];
00965 
00966     rdata[1].data = (char *) locks;
00967     rdata[1].len = nlocks * sizeof(xl_standby_lock);
00968     rdata[1].buffer = InvalidBuffer;
00969     rdata[1].next = NULL;
00970 
00971     (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
00972 }
00973 
00974 /*
00975  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
00976  */
00977 void
00978 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
00979 {
00980     xl_standby_lock xlrec;
00981 
00982     xlrec.xid = GetTopTransactionId();
00983 
00984     /*
00985      * Decode the locktag back to the original values, to avoid sending lots
00986      * of empty bytes with every message.  See lock.h to check how a locktag
00987      * is defined for LOCKTAG_RELATION
00988      */
00989     xlrec.dbOid = dbOid;
00990     xlrec.relOid = relOid;
00991 
00992     LogAccessExclusiveLocks(1, &xlrec);
00993 }
00994 
/*
 * Get ready to log an AccessExclusiveLock; called during LockAcquire().
 *
 * All this does is force a TransactionId to be assigned to the current
 * transaction.  The result of GetTopTransactionId() is deliberately ignored.
 */
void
LogAccessExclusiveLockPrepare(void)
{
    /*
     * An xid is needed for two reasons, both tied to releasing the lock on
     * the standby:
     *
     * 1. Without an xid, RecordTransactionCommit() and
     * RecordTransactionAbort() would optimise away the transaction
     * completion record, and recovery relies on that record to release
     * locks.  This is a hack, but the corner case is not worth extra code in
     * the main commit path.
     *
     * 2. The xid must exist before the lock is recorded in shared memory.
     * Otherwise a concurrently executing GetRunningTransactionLocks() might
     * see a lock associated with InvalidTransactionId, which we later assert
     * cannot happen.
     */
    (void) GetTopTransactionId();
}