00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "postgres.h"
00019 #include "access/transam.h"
00020 #include "access/twophase.h"
00021 #include "access/xact.h"
00022 #include "access/xlog.h"
00023 #include "miscadmin.h"
00024 #include "storage/bufmgr.h"
00025 #include "storage/lmgr.h"
00026 #include "storage/proc.h"
00027 #include "storage/procarray.h"
00028 #include "storage/sinvaladt.h"
00029 #include "storage/standby.h"
00030 #include "utils/ps_status.h"
00031 #include "utils/timeout.h"
00032 #include "utils/timestamp.h"
00033
00034
/*
 * Tunable settings (presumably exposed as GUC variables; the registration is
 * not visible here).  The two max_standby_*_delay values are milliseconds
 * added to the WAL receipt time in GetStandbyLimitTime(); a negative value
 * makes that function report "no limit" (0).  The default is 30 * 1000 ms.
 * vacuum_defer_cleanup_age is not read anywhere in this file as shown.
 */
int vacuum_defer_cleanup_age;
int max_standby_archive_delay = 30 * 1000;
int max_standby_streaming_delay = 30 * 1000;

/*
 * Recovery locks held on behalf of replayed transactions: a List of palloc'd
 * xl_standby_lock entries, appended by StandbyAcquireAccessExclusiveLock()
 * and removed (with the lock released) by the StandbyRelease*() functions.
 */
static List *RecoveryLockList;

/* File-local helpers, defined below. */
static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
	ProcSignalReason reason);
static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 void
00062 InitRecoveryTransactionEnvironment(void)
00063 {
00064 VirtualTransactionId vxid;
00065
00066
00067
00068
00069
00070
00071
00072 SharedInvalBackendInit(true);
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 vxid.backendId = MyBackendId;
00087 vxid.localTransactionId = GetNextLocalTransactionId();
00088 VirtualXactLockTableInsert(vxid);
00089
00090 standbyState = STANDBY_INITIALIZED;
00091 }
00092
00093
00094
00095
00096
00097
00098
00099
/*
 * ShutdownRecoveryTransactionEnvironment
 *		Tear down the state used to track transactions during recovery.
 *
 * In order: expire every known-assigned transaction id, release every lock
 * recorded in RecoveryLockList, and clean up this process's virtual
 * transaction lock (presumably the one taken in
 * InitRecoveryTransactionEnvironment()).
 */
void
ShutdownRecoveryTransactionEnvironment(void)
{
	/* Mark all tracked in-progress transactions as finished. */
	ExpireAllKnownAssignedTransactionIds();

	/* Release all locks held on behalf of those transactions. */
	StandbyReleaseAllLocks();

	/* Clean up our own virtual transaction lock. */
	VirtualXactLockTableCleanup();
}
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 static TimestampTz
00126 GetStandbyLimitTime(void)
00127 {
00128 TimestampTz rtime;
00129 bool fromStream;
00130
00131
00132
00133
00134
00135 GetXLogReceiptTime(&rtime, &fromStream);
00136 if (fromStream)
00137 {
00138 if (max_standby_streaming_delay < 0)
00139 return 0;
00140 return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
00141 }
00142 else
00143 {
00144 if (max_standby_archive_delay < 0)
00145 return 0;
00146 return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
00147 }
00148 }
00149
/*
 * Backoff sleep used by WaitExceedsMaxStandbyDelay(), in microseconds (it is
 * passed to pg_usleep()).  ResolveRecoveryConflictWithVirtualXIDs() resets it
 * to STANDBY_INITIAL_WAIT_US before waiting on each transaction.  Each retry
 * doubles it, up to a cap of 1000000 us (1 s).
 */
#define STANDBY_INITIAL_WAIT_US 1000
static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
00152
00153
00154
00155
00156
00157
00158 static bool
00159 WaitExceedsMaxStandbyDelay(void)
00160 {
00161 TimestampTz ltime;
00162
00163
00164 ltime = GetStandbyLimitTime();
00165 if (ltime && GetCurrentTimestamp() >= ltime)
00166 return true;
00167
00168
00169
00170
00171 pg_usleep(standbyWait_us);
00172
00173
00174
00175
00176
00177 standbyWait_us *= 2;
00178 if (standbyWait_us > 1000000)
00179 standbyWait_us = 1000000;
00180
00181 return false;
00182 }
00183
00184
00185
00186
00187
00188
00189
/*
 * ResolveRecoveryConflictWithVirtualXIDs
 *		Wait, one at a time, for each virtual transaction in "waitlist" to
 *		finish.  Once WaitExceedsMaxStandbyDelay() reports the limit has
 *		passed, cancel the one still being waited on.
 *
 * waitlist: array of VirtualTransactionId, terminated by the first entry for
 *		which VirtualTransactionIdIsValid() is false.
 * reason: passed to CancelVirtualTransaction() when cancelling.
 *
 * If the total wait exceeds 500 (presumably ms) and update_process_title is
 * set, " waiting" is appended to the ps display.  The original display is
 * restored before returning.
 */
static void
ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
									   ProcSignalReason reason)
{
	TimestampTz waitStart;
	char	   *new_status;

	/* Fast exit if there is nothing to wait for. */
	if (!VirtualTransactionIdIsValid(*waitlist))
		return;

	waitStart = GetCurrentTimestamp();
	new_status = NULL;			/* stays NULL until we change the ps display */

	while (VirtualTransactionIdIsValid(*waitlist))
	{
		/* Restart the backoff sleep for each transaction we wait for. */
		standbyWait_us = STANDBY_INITIAL_WAIT_US;

		/*
		 * Poll until this virtual transaction is gone.  The "false" argument
		 * presumably makes VirtualXactLock() a non-blocking check.
		 */
		while (!VirtualXactLock(*waitlist, false))
		{
			/* Once the wait gets long, append " waiting" to ps (only once). */
			if (update_process_title && new_status == NULL &&
				TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
										   500))
			{
				const char *old_status;
				int			len;

				old_status = get_ps_display(&len);
				/* old title + " waiting" (8 chars) + terminating NUL */
				new_status = (char *) palloc(len + 8 + 1);
				memcpy(new_status, old_status, len);
				strcpy(new_status + len, " waiting");
				set_ps_display(new_status, false);
				/* Cut our copy back to the old title, for restoring later. */
				new_status[len] = '\0';
			}

			/* Past the limit?  (If not, this call has just slept briefly.) */
			if (WaitExceedsMaxStandbyDelay())
			{
				pid_t		pid;

				Assert(VirtualTransactionIdIsValid(*waitlist));
				pid = CancelVirtualTransaction(*waitlist, reason);

				/*
				 * If a backend was found (presumably pid != 0 means it was
				 * signalled), pause 5 ms before polling again so we don't
				 * re-signal it in a tight loop.
				 */
				if (pid != 0)
					pg_usleep(5000L);
			}
		}

		/* This transaction is gone; move on to the next one. */
		waitlist++;
	}

	/* Restore the original ps display if we changed it. */
	if (new_status)
	{
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
00262
00263 void
00264 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
00265 {
00266 VirtualTransactionId *backends;
00267
00268
00269
00270
00271
00272
00273
00274
00275 if (!TransactionIdIsValid(latestRemovedXid))
00276 return;
00277
00278 backends = GetConflictingVirtualXIDs(latestRemovedXid,
00279 node.dbNode);
00280
00281 ResolveRecoveryConflictWithVirtualXIDs(backends,
00282 PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
00283 }
00284
00285 void
00286 ResolveRecoveryConflictWithTablespace(Oid tsid)
00287 {
00288 VirtualTransactionId *temp_file_users;
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307 temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
00308 InvalidOid);
00309 ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
00310 PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
00311 }
00312
00313 void
00314 ResolveRecoveryConflictWithDatabase(Oid dbid)
00315 {
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327 while (CountDBBackends(dbid) > 0)
00328 {
00329 CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
00330
00331
00332
00333
00334
00335 pg_usleep(10000);
00336 }
00337 }
00338
00339 static void
00340 ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
00341 {
00342 VirtualTransactionId *backends;
00343 bool lock_acquired = false;
00344 int num_attempts = 0;
00345 LOCKTAG locktag;
00346
00347 SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357 while (!lock_acquired)
00358 {
00359 if (++num_attempts < 3)
00360 backends = GetLockConflicts(&locktag, AccessExclusiveLock);
00361 else
00362 backends = GetConflictingVirtualXIDs(InvalidTransactionId,
00363 InvalidOid);
00364
00365 ResolveRecoveryConflictWithVirtualXIDs(backends,
00366 PROCSIG_RECOVERY_CONFLICT_LOCK);
00367
00368 if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
00369 != LOCKACQUIRE_NOT_AVAIL)
00370 lock_acquired = true;
00371 }
00372 }
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
/*
 * ResolveRecoveryConflictWithBufferPin
 *		Wait for a buffer-pin conflict to clear.  Timeouts are armed so that
 *		backends get signalled if the wait lasts too long.
 *
 * Behaviour depends on GetStandbyLimitTime():
 *	- 0 (no limit): arm only STANDBY_DEADLOCK_TIMEOUT after DeadlockTimeout;
 *	- limit already passed: signal PROCSIG_RECOVERY_CONFLICT_BUFFERPIN now;
 *	- otherwise: arm STANDBY_TIMEOUT at the limit and
 *	  STANDBY_DEADLOCK_TIMEOUT after DeadlockTimeout.
 * In every case it then sleeps in ProcWaitForSignal() and disables all
 * timeouts on wake-up.
 *
 * Only valid in hot standby (asserted).  NOTE(review): the timeouts
 * presumably fire StandbyTimeoutHandler()/StandbyDeadLockHandler() in this
 * file; the registration is not visible here.
 */
void
ResolveRecoveryConflictWithBufferPin(void)
{
	TimestampTz ltime;

	Assert(InHotStandby);

	ltime = GetStandbyLimitTime();

	if (ltime == 0)
	{
		/* No limit: only arm the deadlock check. */
		enable_timeout_after(STANDBY_DEADLOCK_TIMEOUT, DeadlockTimeout);
	}
	else if (GetCurrentTimestamp() >= ltime)
	{
		/* Already past the limit: signal backends straight away. */
		SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
	}
	else
	{
		/*
		 * Arm both the standby limit (absolute time) and the deadlock check
		 * (relative delay).
		 */
		EnableTimeoutParams timeouts[2];

		timeouts[0].id = STANDBY_TIMEOUT;
		timeouts[0].type = TMPARAM_AT;
		timeouts[0].fin_time = ltime;
		timeouts[1].id = STANDBY_DEADLOCK_TIMEOUT;
		timeouts[1].type = TMPARAM_AFTER;
		timeouts[1].delay_ms = DeadlockTimeout;
		enable_timeouts(timeouts, 2);
	}

	/* Sleep until signalled (e.g. by a timeout handler or pin release). */
	ProcWaitForSignal();

	/* Whatever woke us, no timeout armed above should outlive this call. */
	disable_all_timeouts(false);
}
00453
/*
 * SendRecoveryConflictWithBufferPin
 *		Signal backends with a buffer-pin related recovery conflict.
 *
 * reason must be PROCSIG_RECOVERY_CONFLICT_BUFFERPIN or
 * PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK (asserted).
 */
static void
SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
{
	Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
		   reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);

	/*
	 * NOTE(review): InvalidOid presumably targets backends in all databases.
	 * The "false" flag presumably asks each backend to decide for itself
	 * whether it holds the pin, rather than marking them all as conflicting.
	 * Confirm against CancelDBBackends() in procarray.c.
	 */
	CancelDBBackends(InvalidOid, reason, false);
}
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482 void
00483 CheckRecoveryConflictDeadlock(void)
00484 {
00485 Assert(!InRecovery);
00486
00487 if (!HoldingBufferPinThatDelaysRecovery())
00488 return;
00489
00490
00491
00492
00493
00494
00495
00496
00497 ereport(ERROR,
00498 (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
00499 errmsg("canceling statement due to conflict with recovery"),
00500 errdetail("User transaction caused buffer deadlock with recovery.")));
00501 }
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
/*
 * StandbyDeadLockHandler
 *		Ask backends to check for a buffer-pin deadlock with the startup
 *		process, via PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK.
 *
 * NOTE(review): presumably the handler for STANDBY_DEADLOCK_TIMEOUT, armed in
 * ResolveRecoveryConflictWithBufferPin().
 */
void
StandbyDeadLockHandler(void)
{
	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
}
00519
00520
00521
00522
00523
00524
/*
 * StandbyTimeoutHandler
 *		The standby delay limit has been reached while waiting on a buffer
 *		pin: cancel the pending deadlock check and signal
 *		PROCSIG_RECOVERY_CONFLICT_BUFFERPIN.
 *
 * NOTE(review): presumably the handler for STANDBY_TIMEOUT, armed in
 * ResolveRecoveryConflictWithBufferPin().
 */
void
StandbyTimeoutHandler(void)
{
	/* The deadlock check is moot once we are cancelling anyway. */
	disable_timeout(STANDBY_DEADLOCK_TIMEOUT, false);

	SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565 void
00566 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
00567 {
00568 xl_standby_lock *newlock;
00569 LOCKTAG locktag;
00570
00571
00572 if (!TransactionIdIsValid(xid) ||
00573 TransactionIdDidCommit(xid) ||
00574 TransactionIdDidAbort(xid))
00575 return;
00576
00577 elog(trace_recovery(DEBUG4),
00578 "adding recovery lock: db %u rel %u", dbOid, relOid);
00579
00580
00581 Assert(OidIsValid(relOid));
00582
00583 newlock = palloc(sizeof(xl_standby_lock));
00584 newlock->xid = xid;
00585 newlock->dbOid = dbOid;
00586 newlock->relOid = relOid;
00587 RecoveryLockList = lappend(RecoveryLockList, newlock);
00588
00589
00590
00591
00592 SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
00593
00594 if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
00595 == LOCKACQUIRE_NOT_AVAIL)
00596 ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
00597 }
00598
/*
 * StandbyReleaseLocks
 *		Release and forget every recovery lock held for transaction xid.
 *		If xid is InvalidTransactionId, release every recovery lock.
 *
 * A lock that the lock manager no longer knows about is reported at LOG level
 * but is still removed from RecoveryLockList.
 */
static void
StandbyReleaseLocks(TransactionId xid)
{
	ListCell   *cell,
			   *prev,
			   *next;

	/*
	 * Delete matching cells in place.  "next" is read before a possible
	 * deletion.  "prev" advances only past kept cells, because
	 * list_delete_cell() needs the predecessor of the cell being removed.
	 */
	prev = NULL;
	for (cell = list_head(RecoveryLockList); cell; cell = next)
	{
		xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);

		next = lnext(cell);

		if (!TransactionIdIsValid(xid) || lock->xid == xid)
		{
			LOCKTAG		locktag;

			elog(trace_recovery(DEBUG4),
				 "releasing recovery lock: xid %u db %u rel %u",
				 lock->xid, lock->dbOid, lock->relOid);
			SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
			if (!LockRelease(&locktag, AccessExclusiveLock, true))
				elog(LOG,
					 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
					 lock->xid, lock->dbOid, lock->relOid);

			RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
			pfree(lock);
		}
		else
			prev = cell;
	}
}
00636
00637
00638
00639
00640
00641
00642
00643
00644 void
00645 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
00646 {
00647 int i;
00648
00649 StandbyReleaseLocks(xid);
00650
00651 for (i = 0; i < nsubxids; i++)
00652 StandbyReleaseLocks(subxids[i]);
00653 }
00654
00655
00656
00657
00658 void
00659 StandbyReleaseAllLocks(void)
00660 {
00661 ListCell *cell,
00662 *prev,
00663 *next;
00664 LOCKTAG locktag;
00665
00666 elog(trace_recovery(DEBUG2), "release all standby locks");
00667
00668 prev = NULL;
00669 for (cell = list_head(RecoveryLockList); cell; cell = next)
00670 {
00671 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
00672
00673 next = lnext(cell);
00674
00675 elog(trace_recovery(DEBUG4),
00676 "releasing recovery lock: xid %u db %u rel %u",
00677 lock->xid, lock->dbOid, lock->relOid);
00678 SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
00679 if (!LockRelease(&locktag, AccessExclusiveLock, true))
00680 elog(LOG,
00681 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
00682 lock->xid, lock->dbOid, lock->relOid);
00683 RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
00684 pfree(lock);
00685 }
00686 }
00687
00688
00689
00690
00691
00692
00693 void
00694 StandbyReleaseOldLocks(int nxids, TransactionId *xids)
00695 {
00696 ListCell *cell,
00697 *prev,
00698 *next;
00699 LOCKTAG locktag;
00700
00701 prev = NULL;
00702 for (cell = list_head(RecoveryLockList); cell; cell = next)
00703 {
00704 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
00705 bool remove = false;
00706
00707 next = lnext(cell);
00708
00709 Assert(TransactionIdIsValid(lock->xid));
00710
00711 if (StandbyTransactionIdIsPrepared(lock->xid))
00712 remove = false;
00713 else
00714 {
00715 int i;
00716 bool found = false;
00717
00718 for (i = 0; i < nxids; i++)
00719 {
00720 if (lock->xid == xids[i])
00721 {
00722 found = true;
00723 break;
00724 }
00725 }
00726
00727
00728
00729
00730 if (!found)
00731 remove = true;
00732 }
00733
00734 if (remove)
00735 {
00736 elog(trace_recovery(DEBUG4),
00737 "releasing recovery lock: xid %u db %u rel %u",
00738 lock->xid, lock->dbOid, lock->relOid);
00739 SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
00740 if (!LockRelease(&locktag, AccessExclusiveLock, true))
00741 elog(LOG,
00742 "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
00743 lock->xid, lock->dbOid, lock->relOid);
00744 RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
00745 pfree(lock);
00746 }
00747 else
00748 prev = cell;
00749 }
00750 }
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760 void
00761 standby_redo(XLogRecPtr lsn, XLogRecord *record)
00762 {
00763 uint8 info = record->xl_info & ~XLR_INFO_MASK;
00764
00765
00766 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00767
00768
00769 if (standbyState == STANDBY_DISABLED)
00770 return;
00771
00772 if (info == XLOG_STANDBY_LOCK)
00773 {
00774 xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
00775 int i;
00776
00777 for (i = 0; i < xlrec->nlocks; i++)
00778 StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
00779 xlrec->locks[i].dbOid,
00780 xlrec->locks[i].relOid);
00781 }
00782 else if (info == XLOG_RUNNING_XACTS)
00783 {
00784 xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
00785 RunningTransactionsData running;
00786
00787 running.xcnt = xlrec->xcnt;
00788 running.subxcnt = xlrec->subxcnt;
00789 running.subxid_overflow = xlrec->subxid_overflow;
00790 running.nextXid = xlrec->nextXid;
00791 running.latestCompletedXid = xlrec->latestCompletedXid;
00792 running.oldestRunningXid = xlrec->oldestRunningXid;
00793 running.xids = xlrec->xids;
00794
00795 ProcArrayApplyRecoveryInfo(&running);
00796 }
00797 else
00798 elog(PANIC, "standby_redo: unknown op code %u", info);
00799 }
00800
00801
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850
00851
00852
00853
00854
00855
00856
00857 void
00858 LogStandbySnapshot(void)
00859 {
00860 RunningTransactions running;
00861 xl_standby_lock *locks;
00862 int nlocks;
00863
00864 Assert(XLogStandbyInfoActive());
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875 locks = GetRunningTransactionLocks(&nlocks);
00876 if (nlocks > 0)
00877 LogAccessExclusiveLocks(nlocks, locks);
00878
00879
00880
00881
00882
00883 running = GetRunningTransactionData();
00884 LogCurrentRunningXacts(running);
00885
00886 LWLockRelease(XidGenLock);
00887 }
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897 static void
00898 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
00899 {
00900 xl_running_xacts xlrec;
00901 XLogRecData rdata[2];
00902 int lastrdata = 0;
00903 XLogRecPtr recptr;
00904
00905 xlrec.xcnt = CurrRunningXacts->xcnt;
00906 xlrec.subxcnt = CurrRunningXacts->subxcnt;
00907 xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
00908 xlrec.nextXid = CurrRunningXacts->nextXid;
00909 xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
00910 xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
00911
00912
00913 rdata[0].data = (char *) (&xlrec);
00914 rdata[0].len = MinSizeOfXactRunningXacts;
00915 rdata[0].buffer = InvalidBuffer;
00916
00917
00918 if (xlrec.xcnt > 0)
00919 {
00920 rdata[0].next = &(rdata[1]);
00921 rdata[1].data = (char *) CurrRunningXacts->xids;
00922 rdata[1].len = (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId);
00923 rdata[1].buffer = InvalidBuffer;
00924 lastrdata = 1;
00925 }
00926
00927 rdata[lastrdata].next = NULL;
00928
00929 recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
00930
00931 if (CurrRunningXacts->subxid_overflow)
00932 elog(trace_recovery(DEBUG2),
00933 "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
00934 CurrRunningXacts->xcnt,
00935 (uint32) (recptr >> 32), (uint32) recptr,
00936 CurrRunningXacts->oldestRunningXid,
00937 CurrRunningXacts->latestCompletedXid,
00938 CurrRunningXacts->nextXid);
00939 else
00940 elog(trace_recovery(DEBUG2),
00941 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
00942 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
00943 (uint32) (recptr >> 32), (uint32) recptr,
00944 CurrRunningXacts->oldestRunningXid,
00945 CurrRunningXacts->latestCompletedXid,
00946 CurrRunningXacts->nextXid);
00947 }
00948
00949
00950
00951
00952
00953 static void
00954 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
00955 {
00956 XLogRecData rdata[2];
00957 xl_standby_locks xlrec;
00958
00959 xlrec.nlocks = nlocks;
00960
00961 rdata[0].data = (char *) &xlrec;
00962 rdata[0].len = offsetof(xl_standby_locks, locks);
00963 rdata[0].buffer = InvalidBuffer;
00964 rdata[0].next = &rdata[1];
00965
00966 rdata[1].data = (char *) locks;
00967 rdata[1].len = nlocks * sizeof(xl_standby_lock);
00968 rdata[1].buffer = InvalidBuffer;
00969 rdata[1].next = NULL;
00970
00971 (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
00972 }
00973
00974
00975
00976
00977 void
00978 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
00979 {
00980 xl_standby_lock xlrec;
00981
00982 xlrec.xid = GetTopTransactionId();
00983
00984
00985
00986
00987
00988
00989 xlrec.dbOid = dbOid;
00990 xlrec.relOid = relOid;
00991
00992 LogAccessExclusiveLocks(1, &xlrec);
00993 }
00994
00995
00996
00997
/*
 * LogAccessExclusiveLockPrepare
 *		Called only for its side effect: makes sure the current top-level
 *		transaction id has been obtained now.  The value itself is discarded.
 *
 * NOTE(review): presumably meant to run before an AccessExclusiveLock is
 * taken.  The later GetTopTransactionId() call in LogAccessExclusiveLock()
 * then does not have to assign an xid at that point.  Confirm with callers.
 */
void
LogAccessExclusiveLockPrepare(void)
{
	(void) GetTopTransactionId();
}