00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044 #include "postgres.h"
00045
00046 #include <signal.h>
00047
00048 #include "access/clog.h"
00049 #include "access/subtrans.h"
00050 #include "access/transam.h"
00051 #include "access/xact.h"
00052 #include "access/twophase.h"
00053 #include "miscadmin.h"
00054 #include "storage/proc.h"
00055 #include "storage/procarray.h"
00056 #include "storage/spin.h"
00057 #include "utils/builtins.h"
00058 #include "utils/snapmgr.h"
00059
00060
00061 /* Header of the shared "Proc Array" segment; the pgprocnos[] slots are allocated directly behind it. */
00062 typedef struct ProcArrayStruct
00063 {
00064 int numProcs; /* number of pgprocnos[] entries currently in use */
00065 int maxProcs; /* capacity of pgprocnos[]; set to PROCARRAY_MAXPROCS at creation */
00066
00067 /* Bookkeeping for the KnownAssignedXids arrays, which are allocated only when EnableHotStandby is set. */
00068 /* NOTE(review): exact head/tail semantics are defined by the KnownAssignedXids* helpers, not visible here. */
00069
00070 int maxKnownAssignedXids; /* set to TOTAL_MAX_CACHED_SUBXIDS at creation */
00071 int numKnownAssignedXids; /* must be 0 before ProcArrayApplyRecoveryInfo() loads xids */
00072 int tailKnownAssignedXids; /* initialised to 0 */
00073 int headKnownAssignedXids; /* initialised to 0 */
00074 slock_t known_assigned_xids_lck; /* spinlock, initialised with SpinLockInit() at creation */
00075
00076
00077
00078 /* Overflow marker: xids at or below this value may have subxids missing from KnownAssignedXids. */
00079 /* Written while holding ProcArrayLock exclusively; read under the shared lock by */
00080 /* TransactionIdIsInProgress() and GetSnapshotData(). Starts as InvalidTransactionId. */
00081
00082
00083 TransactionId lastOverflowedXid;
00084
00085
00086 /* Declared with one element, but the segment is sized as */
00087 /* offsetof(ProcArrayStruct, pgprocnos) + PROCARRAY_MAXPROCS * sizeof(int). */
00088 /* Each used entry is an index into allProcs[] / allPgXact[]. */
00089 int pgprocnos[1];
00090 } ProcArrayStruct;
00091 /* Pointer to the shared proc array header; set by CreateSharedProcArray(). */
00092 static ProcArrayStruct *procArray;
00093 /* Cached copies of ProcGlobal->allProcs and ProcGlobal->allPgXact, indexed by pgprocno. */
00094 static PGPROC *allProcs;
00095 static PGXACT *allPgXact;
00096
00097
00098 /* Shared arrays of TOTAL_MAX_CACHED_SUBXIDS entries each; left unallocated unless EnableHotStandby. */
00099
00100 static TransactionId *KnownAssignedXids;
00101 static bool *KnownAssignedXidsValid;
00102 static TransactionId latestObservedXid = InvalidTransactionId; /* set to running->nextXid - 1 by ProcArrayApplyRecoveryInfo() */
00103
00104
00105
00106 /* While standbyState is STANDBY_SNAPSHOT_PENDING: snapshots become ready once a later */
00107 /* running-xacts record reports an oldestRunningXid that follows this value. */
00108
00109 static TransactionId standbySnapshotPendingXmin;
00110 /* Optional hit counters for TransactionIdIsInProgress(); compiled in only when XIDCACHE_DEBUG is defined. */
00111 #ifdef XIDCACHE_DEBUG
00112
00113 /* One counter per exit path of TransactionIdIsInProgress(); the *_inc() macros below bump them. */
00114 static long xc_by_recent_xmin = 0;
00115 static long xc_by_known_xact = 0;
00116 static long xc_by_my_xact = 0;
00117 static long xc_by_latest_xid = 0;
00118 static long xc_by_main_xid = 0;
00119 static long xc_by_child_xid = 0;
00120 static long xc_by_known_assigned = 0;
00121 static long xc_no_overflow = 0;
00122 static long xc_slow_answer = 0;
00123
00124 #define xc_by_recent_xmin_inc() (xc_by_recent_xmin++)
00125 #define xc_by_known_xact_inc() (xc_by_known_xact++)
00126 #define xc_by_my_xact_inc() (xc_by_my_xact++)
00127 #define xc_by_latest_xid_inc() (xc_by_latest_xid++)
00128 #define xc_by_main_xid_inc() (xc_by_main_xid++)
00129 #define xc_by_child_xid_inc() (xc_by_child_xid++)
00130 #define xc_by_known_assigned_inc() (xc_by_known_assigned++)
00131 #define xc_no_overflow_inc() (xc_no_overflow++)
00132 #define xc_slow_answer_inc() (xc_slow_answer++)
00133 /* Defined outside this view; called from ProcArrayRemove() when proc->pid != 0. */
00134 static void DisplayXidCache(void);
00135 #else
00136 /* Without XIDCACHE_DEBUG every increment macro is a no-op expression, so call sites need no #ifdef. */
00137 #define xc_by_recent_xmin_inc() ((void) 0)
00138 #define xc_by_known_xact_inc() ((void) 0)
00139 #define xc_by_my_xact_inc() ((void) 0)
00140 #define xc_by_latest_xid_inc() ((void) 0)
00141 #define xc_by_main_xid_inc() ((void) 0)
00142 #define xc_by_child_xid_inc() ((void) 0)
00143 #define xc_by_known_assigned_inc() ((void) 0)
00144 #define xc_no_overflow_inc() ((void) 0)
00145 #define xc_slow_answer_inc() ((void) 0)
00146 #endif
00147
00148 /* Forward declarations of the file-private KnownAssignedXids helpers (their definitions are outside this view). */
00149 static void KnownAssignedXidsCompress(bool force);
00150 static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
00151 bool exclusive_lock);
00152 static bool KnownAssignedXidsSearch(TransactionId xid, bool remove);
00153 static bool KnownAssignedXidExists(TransactionId xid);
00154 static void KnownAssignedXidsRemove(TransactionId xid);
00155 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
00156 TransactionId *subxids);
00157 static void KnownAssignedXidsRemovePreceding(TransactionId xid);
00158 static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
00159 static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
00160 TransactionId *xmin,
00161 TransactionId xmax);
00162 static TransactionId KnownAssignedXidsGetOldestXmin(void);
00163 static void KnownAssignedXidsDisplay(int trace_level);
00164 static void KnownAssignedXidsReset(void);
00165
00166
00167
00168
00169 Size
00170 ProcArrayShmemSize(void)
00171 {
00172 Size size;
00173
00174
00175 #define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
00176
00177 size = offsetof(ProcArrayStruct, pgprocnos);
00178 size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
00179
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 #define TOTAL_MAX_CACHED_SUBXIDS \
00194 ((PGPROC_MAX_CACHED_SUBXIDS + 1) * PROCARRAY_MAXPROCS)
00195
00196 if (EnableHotStandby)
00197 {
00198 size = add_size(size,
00199 mul_size(sizeof(TransactionId),
00200 TOTAL_MAX_CACHED_SUBXIDS));
00201 size = add_size(size,
00202 mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
00203 }
00204
00205 return size;
00206 }
00207
00208
00209
00210
00211 void
00212 CreateSharedProcArray(void)
00213 {
00214 bool found;
00215
00216
00217 procArray = (ProcArrayStruct *)
00218 ShmemInitStruct("Proc Array",
00219 add_size(offsetof(ProcArrayStruct, pgprocnos),
00220 mul_size(sizeof(int),
00221 PROCARRAY_MAXPROCS)),
00222 &found);
00223
00224 if (!found)
00225 {
00226
00227
00228
00229 procArray->numProcs = 0;
00230 procArray->maxProcs = PROCARRAY_MAXPROCS;
00231 procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
00232 procArray->numKnownAssignedXids = 0;
00233 procArray->tailKnownAssignedXids = 0;
00234 procArray->headKnownAssignedXids = 0;
00235 SpinLockInit(&procArray->known_assigned_xids_lck);
00236 procArray->lastOverflowedXid = InvalidTransactionId;
00237 }
00238
00239 allProcs = ProcGlobal->allProcs;
00240 allPgXact = ProcGlobal->allPgXact;
00241
00242
00243 if (EnableHotStandby)
00244 {
00245 KnownAssignedXids = (TransactionId *)
00246 ShmemInitStruct("KnownAssignedXids",
00247 mul_size(sizeof(TransactionId),
00248 TOTAL_MAX_CACHED_SUBXIDS),
00249 &found);
00250 KnownAssignedXidsValid = (bool *)
00251 ShmemInitStruct("KnownAssignedXidsValid",
00252 mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
00253 &found);
00254 }
00255 }
00256
00257
00258
00259
00260 void
00261 ProcArrayAdd(PGPROC *proc)
00262 {
00263 ProcArrayStruct *arrayP = procArray;
00264 int index;
00265
00266 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
00267
00268 if (arrayP->numProcs >= arrayP->maxProcs)
00269 {
00270
00271
00272
00273
00274
00275 LWLockRelease(ProcArrayLock);
00276 ereport(FATAL,
00277 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
00278 errmsg("sorry, too many clients already")));
00279 }
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 for (index = 0; index < arrayP->numProcs; index++)
00291 {
00292
00293
00294
00295
00296 if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
00297 break;
00298 }
00299
00300 memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
00301 (arrayP->numProcs - index) * sizeof(int));
00302 arrayP->pgprocnos[index] = proc->pgprocno;
00303 arrayP->numProcs++;
00304
00305 LWLockRelease(ProcArrayLock);
00306 }
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318 void
00319 ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
00320 {
00321 ProcArrayStruct *arrayP = procArray;
00322 int index;
00323
00324 #ifdef XIDCACHE_DEBUG
00325
00326 if (proc->pid != 0)
00327 DisplayXidCache();
00328 #endif
00329
00330 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
00331
00332 if (TransactionIdIsValid(latestXid))
00333 {
00334 Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
00335
00336
00337 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
00338 latestXid))
00339 ShmemVariableCache->latestCompletedXid = latestXid;
00340 }
00341 else
00342 {
00343
00344 Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
00345 }
00346
00347 for (index = 0; index < arrayP->numProcs; index++)
00348 {
00349 if (arrayP->pgprocnos[index] == proc->pgprocno)
00350 {
00351
00352 memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
00353 (arrayP->numProcs - index - 1) * sizeof(int));
00354 arrayP->pgprocnos[arrayP->numProcs - 1] = -1;
00355 arrayP->numProcs--;
00356 LWLockRelease(ProcArrayLock);
00357 return;
00358 }
00359 }
00360
00361
00362 LWLockRelease(ProcArrayLock);
00363
00364 elog(LOG, "failed to find proc %p in ProcArray", proc);
00365 }
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381 void
00382 ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
00383 {
00384 PGXACT *pgxact = &allPgXact[proc->pgprocno];
00385
00386 if (TransactionIdIsValid(latestXid))
00387 {
00388
00389
00390
00391
00392
00393
00394 Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
00395
00396 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
00397
00398 pgxact->xid = InvalidTransactionId;
00399 proc->lxid = InvalidLocalTransactionId;
00400 pgxact->xmin = InvalidTransactionId;
00401
00402 pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
00403 pgxact->delayChkpt = false;
00404 proc->recoveryConflictPending = false;
00405
00406
00407 pgxact->nxids = 0;
00408 pgxact->overflowed = false;
00409
00410
00411 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
00412 latestXid))
00413 ShmemVariableCache->latestCompletedXid = latestXid;
00414
00415 LWLockRelease(ProcArrayLock);
00416 }
00417 else
00418 {
00419
00420
00421
00422
00423
00424 Assert(!TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
00425
00426 proc->lxid = InvalidLocalTransactionId;
00427 pgxact->xmin = InvalidTransactionId;
00428
00429 pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
00430 pgxact->delayChkpt = false;
00431 proc->recoveryConflictPending = false;
00432
00433 Assert(pgxact->nxids == 0);
00434 Assert(pgxact->overflowed == false);
00435 }
00436 }
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447 void
00448 ProcArrayClearTransaction(PGPROC *proc)
00449 {
00450 PGXACT *pgxact = &allPgXact[proc->pgprocno];
00451
00452
00453
00454
00455
00456
00457
00458 pgxact->xid = InvalidTransactionId;
00459 proc->lxid = InvalidLocalTransactionId;
00460 pgxact->xmin = InvalidTransactionId;
00461 proc->recoveryConflictPending = false;
00462
00463
00464 pgxact->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
00465 pgxact->delayChkpt = false;
00466
00467
00468 pgxact->nxids = 0;
00469 pgxact->overflowed = false;
00470 }
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487 void
00488 ProcArrayApplyRecoveryInfo(RunningTransactions running)
00489 {
00490 TransactionId *xids;
00491 int nxids;
00492 TransactionId nextXid;
00493 int i;
00494
00495 Assert(standbyState >= STANDBY_INITIALIZED);
00496 Assert(TransactionIdIsValid(running->nextXid));
00497 Assert(TransactionIdIsValid(running->oldestRunningXid));
00498 Assert(TransactionIdIsNormal(running->latestCompletedXid));
00499
00500
00501
00502
00503 ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
00504
00505
00506
00507
00508
00509
00510
00511 StandbyReleaseOldLocks(running->xcnt, running->xids);
00512
00513
00514
00515
00516 if (standbyState == STANDBY_SNAPSHOT_READY)
00517 return;
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530 if (standbyState == STANDBY_SNAPSHOT_PENDING)
00531 {
00532
00533
00534
00535
00536 if (!running->subxid_overflow || running->xcnt == 0)
00537 {
00538
00539
00540
00541
00542 KnownAssignedXidsReset();
00543 standbyState = STANDBY_INITIALIZED;
00544 }
00545 else
00546 {
00547 if (TransactionIdPrecedes(standbySnapshotPendingXmin,
00548 running->oldestRunningXid))
00549 {
00550 standbyState = STANDBY_SNAPSHOT_READY;
00551 elog(trace_recovery(DEBUG1),
00552 "recovery snapshots are now enabled");
00553 }
00554 else
00555 elog(trace_recovery(DEBUG1),
00556 "recovery snapshot waiting for non-overflowed snapshot or "
00557 "until oldest active xid on standby is at least %u (now %u)",
00558 standbySnapshotPendingXmin,
00559 running->oldestRunningXid);
00560 return;
00561 }
00562 }
00563
00564 Assert(standbyState == STANDBY_INITIALIZED);
00565
00566
00567
00568
00569
00570
00571
00572
00573 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591 xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
00592
00593
00594
00595
00596 nxids = 0;
00597 for (i = 0; i < running->xcnt + running->subxcnt; i++)
00598 {
00599 TransactionId xid = running->xids[i];
00600
00601
00602
00603
00604
00605
00606
00607 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
00608 continue;
00609
00610 xids[nxids++] = xid;
00611 }
00612
00613 if (nxids > 0)
00614 {
00615 if (procArray->numKnownAssignedXids != 0)
00616 {
00617 LWLockRelease(ProcArrayLock);
00618 elog(ERROR, "KnownAssignedXids is not empty");
00619 }
00620
00621
00622
00623
00624
00625 qsort(xids, nxids, sizeof(TransactionId), xidComparator);
00626
00627
00628
00629
00630 for (i = 0; i < nxids; i++)
00631 KnownAssignedXidsAdd(xids[i], xids[i], true);
00632
00633 KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
00634 }
00635
00636 pfree(xids);
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650 latestObservedXid = running->nextXid;
00651 TransactionIdRetreat(latestObservedXid);
00652
00653 if (running->subxid_overflow)
00654 {
00655 standbyState = STANDBY_SNAPSHOT_PENDING;
00656
00657 standbySnapshotPendingXmin = latestObservedXid;
00658 procArray->lastOverflowedXid = latestObservedXid;
00659 }
00660 else
00661 {
00662 standbyState = STANDBY_SNAPSHOT_READY;
00663
00664 standbySnapshotPendingXmin = InvalidTransactionId;
00665 }
00666
00667
00668
00669
00670
00671
00672 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
00673 running->latestCompletedXid))
00674 ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
00675
00676 Assert(TransactionIdIsNormal(ShmemVariableCache->latestCompletedXid));
00677
00678 LWLockRelease(ProcArrayLock);
00679
00680
00681
00682
00683
00684
00685
00686
00687 nextXid = latestObservedXid;
00688 TransactionIdAdvance(nextXid);
00689 if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
00690 {
00691 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
00692 ShmemVariableCache->nextXid = nextXid;
00693 LWLockRelease(XidGenLock);
00694 }
00695
00696 Assert(TransactionIdIsValid(ShmemVariableCache->nextXid));
00697
00698 KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
00699 if (standbyState == STANDBY_SNAPSHOT_READY)
00700 elog(trace_recovery(DEBUG1), "recovery snapshots are now enabled");
00701 else
00702 elog(trace_recovery(DEBUG1),
00703 "recovery snapshot waiting for non-overflowed snapshot or "
00704 "until oldest active xid on standby is at least %u (now %u)",
00705 standbySnapshotPendingXmin,
00706 running->oldestRunningXid);
00707 }
00708
00709
00710
00711
00712
00713 void
00714 ProcArrayApplyXidAssignment(TransactionId topxid,
00715 int nsubxids, TransactionId *subxids)
00716 {
00717 TransactionId max_xid;
00718 int i;
00719
00720 Assert(standbyState >= STANDBY_INITIALIZED);
00721
00722 max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732 RecordKnownAssignedTransactionIds(max_xid);
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745 for (i = 0; i < nsubxids; i++)
00746 SubTransSetParent(subxids[i], topxid, false);
00747
00748
00749
00750
00751 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
00752
00753
00754
00755
00756 KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
00757
00758
00759
00760
00761 if (TransactionIdPrecedes(procArray->lastOverflowedXid, max_xid))
00762 procArray->lastOverflowedXid = max_xid;
00763
00764 LWLockRelease(ProcArrayLock);
00765 }
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793 bool
00794 TransactionIdIsInProgress(TransactionId xid)
00795 {
00796 static TransactionId *xids = NULL;
00797 int nxids = 0;
00798 ProcArrayStruct *arrayP = procArray;
00799 TransactionId topxid;
00800 int i,
00801 j;
00802
00803
00804
00805
00806
00807
00808
00809 if (TransactionIdPrecedes(xid, RecentXmin))
00810 {
00811 xc_by_recent_xmin_inc();
00812 return false;
00813 }
00814
00815
00816
00817
00818
00819
00820 if (TransactionIdIsKnownCompleted(xid))
00821 {
00822 xc_by_known_xact_inc();
00823 return false;
00824 }
00825
00826
00827
00828
00829
00830 if (TransactionIdIsCurrentTransactionId(xid))
00831 {
00832 xc_by_my_xact_inc();
00833 return true;
00834 }
00835
00836
00837
00838
00839
00840 if (xids == NULL)
00841 {
00842
00843
00844
00845
00846
00847 int maxxids = RecoveryInProgress() ? TOTAL_MAX_CACHED_SUBXIDS : arrayP->maxProcs;
00848
00849 xids = (TransactionId *) malloc(maxxids * sizeof(TransactionId));
00850 if (xids == NULL)
00851 ereport(ERROR,
00852 (errcode(ERRCODE_OUT_OF_MEMORY),
00853 errmsg("out of memory")));
00854 }
00855
00856 LWLockAcquire(ProcArrayLock, LW_SHARED);
00857
00858
00859
00860
00861
00862 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, xid))
00863 {
00864 LWLockRelease(ProcArrayLock);
00865 xc_by_latest_xid_inc();
00866 return true;
00867 }
00868
00869
00870 for (i = 0; i < arrayP->numProcs; i++)
00871 {
00872 int pgprocno = arrayP->pgprocnos[i];
00873 volatile PGPROC *proc = &allProcs[pgprocno];
00874 volatile PGXACT *pgxact = &allPgXact[pgprocno];
00875 TransactionId pxid;
00876
00877
00878 if (proc == MyProc)
00879 continue;
00880
00881
00882 pxid = pgxact->xid;
00883
00884 if (!TransactionIdIsValid(pxid))
00885 continue;
00886
00887
00888
00889
00890 if (TransactionIdEquals(pxid, xid))
00891 {
00892 LWLockRelease(ProcArrayLock);
00893 xc_by_main_xid_inc();
00894 return true;
00895 }
00896
00897
00898
00899
00900
00901 if (TransactionIdPrecedes(xid, pxid))
00902 continue;
00903
00904
00905
00906
00907 for (j = pgxact->nxids - 1; j >= 0; j--)
00908 {
00909
00910 TransactionId cxid = proc->subxids.xids[j];
00911
00912 if (TransactionIdEquals(cxid, xid))
00913 {
00914 LWLockRelease(ProcArrayLock);
00915 xc_by_child_xid_inc();
00916 return true;
00917 }
00918 }
00919
00920
00921
00922
00923
00924
00925
00926
00927 if (pgxact->overflowed)
00928 xids[nxids++] = pxid;
00929 }
00930
00931
00932
00933
00934
00935 if (RecoveryInProgress())
00936 {
00937
00938 Assert(nxids == 0);
00939
00940 if (KnownAssignedXidExists(xid))
00941 {
00942 LWLockRelease(ProcArrayLock);
00943 xc_by_known_assigned_inc();
00944 return true;
00945 }
00946
00947
00948
00949
00950
00951
00952
00953
00954 if (TransactionIdPrecedesOrEquals(xid, procArray->lastOverflowedXid))
00955 nxids = KnownAssignedXidsGet(xids, xid);
00956 }
00957
00958 LWLockRelease(ProcArrayLock);
00959
00960
00961
00962
00963
00964 if (nxids == 0)
00965 {
00966 xc_no_overflow_inc();
00967 return false;
00968 }
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978 xc_slow_answer_inc();
00979
00980 if (TransactionIdDidAbort(xid))
00981 return false;
00982
00983
00984
00985
00986
00987
00988 topxid = SubTransGetTopmostTransaction(xid);
00989 Assert(TransactionIdIsValid(topxid));
00990 if (!TransactionIdEquals(topxid, xid))
00991 {
00992 for (i = 0; i < nxids; i++)
00993 {
00994 if (TransactionIdEquals(xids[i], topxid))
00995 return true;
00996 }
00997 }
00998
00999 return false;
01000 }
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010 bool
01011 TransactionIdIsActive(TransactionId xid)
01012 {
01013 bool result = false;
01014 ProcArrayStruct *arrayP = procArray;
01015 int i;
01016
01017
01018
01019
01020
01021 if (TransactionIdPrecedes(xid, RecentXmin))
01022 return false;
01023
01024 LWLockAcquire(ProcArrayLock, LW_SHARED);
01025
01026 for (i = 0; i < arrayP->numProcs; i++)
01027 {
01028 int pgprocno = arrayP->pgprocnos[i];
01029 volatile PGPROC *proc = &allProcs[pgprocno];
01030 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01031 TransactionId pxid;
01032
01033
01034 pxid = pgxact->xid;
01035
01036 if (!TransactionIdIsValid(pxid))
01037 continue;
01038
01039 if (proc->pid == 0)
01040 continue;
01041
01042 if (TransactionIdEquals(pxid, xid))
01043 {
01044 result = true;
01045 break;
01046 }
01047 }
01048
01049 LWLockRelease(ProcArrayLock);
01050
01051 return result;
01052 }
01053
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102 TransactionId
01103 GetOldestXmin(bool allDbs, bool ignoreVacuum)
01104 {
01105 ProcArrayStruct *arrayP = procArray;
01106 TransactionId result;
01107 int index;
01108
01109
01110 Assert(allDbs || !RecoveryInProgress());
01111
01112 LWLockAcquire(ProcArrayLock, LW_SHARED);
01113
01114
01115
01116
01117
01118
01119
01120 result = ShmemVariableCache->latestCompletedXid;
01121 Assert(TransactionIdIsNormal(result));
01122 TransactionIdAdvance(result);
01123
01124 for (index = 0; index < arrayP->numProcs; index++)
01125 {
01126 int pgprocno = arrayP->pgprocnos[index];
01127 volatile PGPROC *proc = &allProcs[pgprocno];
01128 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01129
01130 if (ignoreVacuum && (pgxact->vacuumFlags & PROC_IN_VACUUM))
01131 continue;
01132
01133 if (allDbs ||
01134 proc->databaseId == MyDatabaseId ||
01135 proc->databaseId == 0)
01136 {
01137
01138 TransactionId xid = pgxact->xid;
01139
01140
01141 if (TransactionIdIsNormal(xid) &&
01142 TransactionIdPrecedes(xid, result))
01143 result = xid;
01144
01145
01146
01147
01148
01149
01150
01151
01152 xid = pgxact->xmin;
01153 if (TransactionIdIsNormal(xid) &&
01154 TransactionIdPrecedes(xid, result))
01155 result = xid;
01156 }
01157 }
01158
01159 if (RecoveryInProgress())
01160 {
01161
01162
01163
01164
01165 TransactionId kaxmin = KnownAssignedXidsGetOldestXmin();
01166
01167 LWLockRelease(ProcArrayLock);
01168
01169 if (TransactionIdIsNormal(kaxmin) &&
01170 TransactionIdPrecedes(kaxmin, result))
01171 result = kaxmin;
01172 }
01173 else
01174 {
01175
01176
01177
01178 LWLockRelease(ProcArrayLock);
01179
01180
01181
01182
01183
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193
01194 result -= vacuum_defer_cleanup_age;
01195 if (!TransactionIdIsNormal(result))
01196 result = FirstNormalTransactionId;
01197 }
01198
01199 return result;
01200 }
01201
01202
01203
01204
01205
01206
01207 int
01208 GetMaxSnapshotXidCount(void)
01209 {
01210 return procArray->maxProcs;
01211 }
01212
01213
01214
01215
01216
01217
01218 int
01219 GetMaxSnapshotSubxidCount(void)
01220 {
01221 return TOTAL_MAX_CACHED_SUBXIDS;
01222 }
01223
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234
01235
01236
01237
01238
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257 Snapshot
01258 GetSnapshotData(Snapshot snapshot)
01259 {
01260 ProcArrayStruct *arrayP = procArray;
01261 TransactionId xmin;
01262 TransactionId xmax;
01263 TransactionId globalxmin;
01264 int index;
01265 int count = 0;
01266 int subcount = 0;
01267 bool suboverflowed = false;
01268
01269 Assert(snapshot != NULL);
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280
01281
01282 if (snapshot->xip == NULL)
01283 {
01284
01285
01286
01287
01288 snapshot->xip = (TransactionId *)
01289 malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
01290 if (snapshot->xip == NULL)
01291 ereport(ERROR,
01292 (errcode(ERRCODE_OUT_OF_MEMORY),
01293 errmsg("out of memory")));
01294 Assert(snapshot->subxip == NULL);
01295 snapshot->subxip = (TransactionId *)
01296 malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
01297 if (snapshot->subxip == NULL)
01298 ereport(ERROR,
01299 (errcode(ERRCODE_OUT_OF_MEMORY),
01300 errmsg("out of memory")));
01301 }
01302
01303
01304
01305
01306
01307 LWLockAcquire(ProcArrayLock, LW_SHARED);
01308
01309
01310 xmax = ShmemVariableCache->latestCompletedXid;
01311 Assert(TransactionIdIsNormal(xmax));
01312 TransactionIdAdvance(xmax);
01313
01314
01315 globalxmin = xmin = xmax;
01316
01317 snapshot->takenDuringRecovery = RecoveryInProgress();
01318
01319 if (!snapshot->takenDuringRecovery)
01320 {
01321 int *pgprocnos = arrayP->pgprocnos;
01322 int numProcs;
01323
01324
01325
01326
01327
01328
01329 numProcs = arrayP->numProcs;
01330 for (index = 0; index < numProcs; index++)
01331 {
01332 int pgprocno = pgprocnos[index];
01333 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01334 TransactionId xid;
01335
01336
01337 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
01338 continue;
01339
01340
01341 xid = pgxact->xmin;
01342 if (TransactionIdIsNormal(xid) &&
01343 NormalTransactionIdPrecedes(xid, globalxmin))
01344 globalxmin = xid;
01345
01346
01347 xid = pgxact->xid;
01348
01349
01350
01351
01352
01353
01354
01355 if (!TransactionIdIsNormal(xid)
01356 || !NormalTransactionIdPrecedes(xid, xmax))
01357 continue;
01358
01359
01360
01361
01362
01363 if (NormalTransactionIdPrecedes(xid, xmin))
01364 xmin = xid;
01365 if (pgxact == MyPgXact)
01366 continue;
01367
01368
01369 snapshot->xip[count++] = xid;
01370
01371
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386 if (!suboverflowed)
01387 {
01388 if (pgxact->overflowed)
01389 suboverflowed = true;
01390 else
01391 {
01392 int nxids = pgxact->nxids;
01393
01394 if (nxids > 0)
01395 {
01396 volatile PGPROC *proc = &allProcs[pgprocno];
01397
01398 memcpy(snapshot->subxip + subcount,
01399 (void *) proc->subxids.xids,
01400 nxids * sizeof(TransactionId));
01401 subcount += nxids;
01402 }
01403 }
01404 }
01405 }
01406 }
01407 else
01408 {
01409
01410
01411
01412
01413
01414
01415
01416
01417
01418
01419
01420
01421
01422
01423
01424
01425
01426
01427
01428
01429
01430
01431
01432
01433
01434
01435
01436
01437
01438 subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
01439 xmax);
01440
01441 if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
01442 suboverflowed = true;
01443 }
01444
01445 if (!TransactionIdIsValid(MyPgXact->xmin))
01446 MyPgXact->xmin = TransactionXmin = xmin;
01447 LWLockRelease(ProcArrayLock);
01448
01449
01450
01451
01452
01453
01454 if (TransactionIdPrecedes(xmin, globalxmin))
01455 globalxmin = xmin;
01456
01457
01458 RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
01459 if (!TransactionIdIsNormal(RecentGlobalXmin))
01460 RecentGlobalXmin = FirstNormalTransactionId;
01461 RecentXmin = xmin;
01462
01463 snapshot->xmin = xmin;
01464 snapshot->xmax = xmax;
01465 snapshot->xcnt = count;
01466 snapshot->subxcnt = subcount;
01467 snapshot->suboverflowed = suboverflowed;
01468
01469 snapshot->curcid = GetCurrentCommandId(false);
01470
01471
01472
01473
01474
01475 snapshot->active_count = 0;
01476 snapshot->regd_count = 0;
01477 snapshot->copied = false;
01478
01479 return snapshot;
01480 }
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492 bool
01493 ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
01494 {
01495 bool result = false;
01496 ProcArrayStruct *arrayP = procArray;
01497 int index;
01498
01499 Assert(TransactionIdIsNormal(xmin));
01500 if (!TransactionIdIsNormal(sourcexid))
01501 return false;
01502
01503
01504 LWLockAcquire(ProcArrayLock, LW_SHARED);
01505
01506 for (index = 0; index < arrayP->numProcs; index++)
01507 {
01508 int pgprocno = arrayP->pgprocnos[index];
01509 volatile PGPROC *proc = &allProcs[pgprocno];
01510 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01511 TransactionId xid;
01512
01513
01514 if (pgxact->vacuumFlags & PROC_IN_VACUUM)
01515 continue;
01516
01517 xid = pgxact->xid;
01518 if (xid != sourcexid)
01519 continue;
01520
01521
01522
01523
01524
01525
01526
01527 if (proc->databaseId != MyDatabaseId)
01528 continue;
01529
01530
01531
01532
01533 xid = pgxact->xmin;
01534 if (!TransactionIdIsNormal(xid) ||
01535 !TransactionIdPrecedesOrEquals(xid, xmin))
01536 continue;
01537
01538
01539
01540
01541
01542
01543
01544 MyPgXact->xmin = TransactionXmin = xmin;
01545
01546 result = true;
01547 break;
01548 }
01549
01550 LWLockRelease(ProcArrayLock);
01551
01552 return result;
01553 }
01554
01555
01556
01557
01558
01559
01560
01561
01562
01563
01564
01565
01566
01567
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580 RunningTransactions
01581 GetRunningTransactionData(void)
01582 {
01583
01584 static RunningTransactionsData CurrentRunningXactsData;
01585
01586 ProcArrayStruct *arrayP = procArray;
01587 RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
01588 TransactionId latestCompletedXid;
01589 TransactionId oldestRunningXid;
01590 TransactionId *xids;
01591 int index;
01592 int count;
01593 int subcount;
01594 bool suboverflowed;
01595
01596 Assert(!RecoveryInProgress());
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607 if (CurrentRunningXacts->xids == NULL)
01608 {
01609
01610
01611
01612 CurrentRunningXacts->xids = (TransactionId *)
01613 malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
01614 if (CurrentRunningXacts->xids == NULL)
01615 ereport(ERROR,
01616 (errcode(ERRCODE_OUT_OF_MEMORY),
01617 errmsg("out of memory")));
01618 }
01619
01620 xids = CurrentRunningXacts->xids;
01621
01622 count = subcount = 0;
01623 suboverflowed = false;
01624
01625
01626
01627
01628
01629 LWLockAcquire(ProcArrayLock, LW_SHARED);
01630 LWLockAcquire(XidGenLock, LW_SHARED);
01631
01632 latestCompletedXid = ShmemVariableCache->latestCompletedXid;
01633
01634 oldestRunningXid = ShmemVariableCache->nextXid;
01635
01636
01637
01638
01639 for (index = 0; index < arrayP->numProcs; index++)
01640 {
01641 int pgprocno = arrayP->pgprocnos[index];
01642 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01643 TransactionId xid;
01644
01645
01646 xid = pgxact->xid;
01647
01648
01649
01650
01651
01652 if (!TransactionIdIsValid(xid))
01653 continue;
01654
01655 xids[count++] = xid;
01656
01657 if (TransactionIdPrecedes(xid, oldestRunningXid))
01658 oldestRunningXid = xid;
01659
01660 if (pgxact->overflowed)
01661 suboverflowed = true;
01662 }
01663
01664
01665
01666
01667
01668 if (!suboverflowed)
01669 {
01670 for (index = 0; index < arrayP->numProcs; index++)
01671 {
01672 int pgprocno = arrayP->pgprocnos[index];
01673 volatile PGPROC *proc = &allProcs[pgprocno];
01674 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01675 int nxids;
01676
01677
01678
01679
01680
01681 nxids = pgxact->nxids;
01682 if (nxids > 0)
01683 {
01684 memcpy(&xids[count], (void *) proc->subxids.xids,
01685 nxids * sizeof(TransactionId));
01686 count += nxids;
01687 subcount += nxids;
01688
01689
01690
01691
01692
01693
01694 }
01695 }
01696 }
01697
01698 CurrentRunningXacts->xcnt = count - subcount;
01699 CurrentRunningXacts->subxcnt = subcount;
01700 CurrentRunningXacts->subxid_overflow = suboverflowed;
01701 CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
01702 CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
01703 CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
01704
01705
01706 LWLockRelease(ProcArrayLock);
01707
01708 Assert(TransactionIdIsValid(CurrentRunningXacts->nextXid));
01709 Assert(TransactionIdIsValid(CurrentRunningXacts->oldestRunningXid));
01710 Assert(TransactionIdIsNormal(CurrentRunningXacts->latestCompletedXid));
01711
01712 return CurrentRunningXacts;
01713 }
01714
01715
01716
01717
01718
01719
01720
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730 TransactionId
01731 GetOldestActiveTransactionId(void)
01732 {
01733 ProcArrayStruct *arrayP = procArray;
01734 TransactionId oldestRunningXid;
01735 int index;
01736
01737 Assert(!RecoveryInProgress());
01738
01739 LWLockAcquire(ProcArrayLock, LW_SHARED);
01740
01741
01742
01743
01744
01745
01746
01747
01748 oldestRunningXid = ShmemVariableCache->nextXid;
01749
01750
01751
01752
01753 for (index = 0; index < arrayP->numProcs; index++)
01754 {
01755 int pgprocno = arrayP->pgprocnos[index];
01756 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01757 TransactionId xid;
01758
01759
01760 xid = pgxact->xid;
01761
01762 if (!TransactionIdIsNormal(xid))
01763 continue;
01764
01765 if (TransactionIdPrecedes(xid, oldestRunningXid))
01766 oldestRunningXid = xid;
01767
01768
01769
01770
01771
01772
01773 }
01774
01775 LWLockRelease(ProcArrayLock);
01776
01777 return oldestRunningXid;
01778 }
01779
01780
01781
01782
01783
01784
01785
01786
01787
01788
01789
01790
01791
01792
01793
01794
01795
01796
01797
01798 VirtualTransactionId *
01799 GetVirtualXIDsDelayingChkpt(int *nvxids)
01800 {
01801 VirtualTransactionId *vxids;
01802 ProcArrayStruct *arrayP = procArray;
01803 int count = 0;
01804 int index;
01805
01806
01807 vxids = (VirtualTransactionId *)
01808 palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
01809
01810 LWLockAcquire(ProcArrayLock, LW_SHARED);
01811
01812 for (index = 0; index < arrayP->numProcs; index++)
01813 {
01814 int pgprocno = arrayP->pgprocnos[index];
01815 volatile PGPROC *proc = &allProcs[pgprocno];
01816 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01817
01818 if (pgxact->delayChkpt)
01819 {
01820 VirtualTransactionId vxid;
01821
01822 GET_VXID_FROM_PGPROC(vxid, *proc);
01823 if (VirtualTransactionIdIsValid(vxid))
01824 vxids[count++] = vxid;
01825 }
01826 }
01827
01828 LWLockRelease(ProcArrayLock);
01829
01830 *nvxids = count;
01831 return vxids;
01832 }
01833
01834
01835
01836
01837
01838
01839
01840
01841
01842
01843 bool
01844 HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids)
01845 {
01846 bool result = false;
01847 ProcArrayStruct *arrayP = procArray;
01848 int index;
01849
01850 LWLockAcquire(ProcArrayLock, LW_SHARED);
01851
01852 while (VirtualTransactionIdIsValid(*vxids))
01853 {
01854 for (index = 0; index < arrayP->numProcs; index++)
01855 {
01856 int pgprocno = arrayP->pgprocnos[index];
01857 volatile PGPROC *proc = &allProcs[pgprocno];
01858 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01859 VirtualTransactionId vxid;
01860
01861 GET_VXID_FROM_PGPROC(vxid, *proc);
01862 if (VirtualTransactionIdIsValid(vxid))
01863 {
01864 if (VirtualTransactionIdEquals(vxid, *vxids) &&
01865 pgxact->delayChkpt)
01866 {
01867 result = true;
01868 break;
01869 }
01870 }
01871 }
01872
01873 if (result)
01874 break;
01875
01876
01877 vxids++;
01878 }
01879
01880 LWLockRelease(ProcArrayLock);
01881
01882 return result;
01883 }
01884
01885
01886
01887
01888
01889
01890
01891
01892 PGPROC *
01893 BackendPidGetProc(int pid)
01894 {
01895 PGPROC *result = NULL;
01896 ProcArrayStruct *arrayP = procArray;
01897 int index;
01898
01899 if (pid == 0)
01900 return NULL;
01901
01902 LWLockAcquire(ProcArrayLock, LW_SHARED);
01903
01904 for (index = 0; index < arrayP->numProcs; index++)
01905 {
01906 PGPROC *proc = &allProcs[arrayP->pgprocnos[index]];
01907
01908 if (proc->pid == pid)
01909 {
01910 result = proc;
01911 break;
01912 }
01913 }
01914
01915 LWLockRelease(ProcArrayLock);
01916
01917 return result;
01918 }
01919
01920
01921
01922
01923
01924
01925
01926
01927
01928
01929
01930
01931
01932
01933 int
01934 BackendXidGetPid(TransactionId xid)
01935 {
01936 int result = 0;
01937 ProcArrayStruct *arrayP = procArray;
01938 int index;
01939
01940 if (xid == InvalidTransactionId)
01941 return 0;
01942
01943 LWLockAcquire(ProcArrayLock, LW_SHARED);
01944
01945 for (index = 0; index < arrayP->numProcs; index++)
01946 {
01947 int pgprocno = arrayP->pgprocnos[index];
01948 volatile PGPROC *proc = &allProcs[pgprocno];
01949 volatile PGXACT *pgxact = &allPgXact[pgprocno];
01950
01951 if (pgxact->xid == xid)
01952 {
01953 result = proc->pid;
01954 break;
01955 }
01956 }
01957
01958 LWLockRelease(ProcArrayLock);
01959
01960 return result;
01961 }
01962
01963
01964
01965
01966
01967
01968 bool
01969 IsBackendPid(int pid)
01970 {
01971 return (BackendPidGetProc(pid) != NULL);
01972 }
01973
01974
01975
01976
01977
01978
01979
01980
01981
01982
01983
01984
01985
01986
01987
01988
01989
01990
01991
01992
01993
01994
01995
01996
01997
01998
01999
02000
02001 VirtualTransactionId *
02002 GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
02003 bool allDbs, int excludeVacuum,
02004 int *nvxids)
02005 {
02006 VirtualTransactionId *vxids;
02007 ProcArrayStruct *arrayP = procArray;
02008 int count = 0;
02009 int index;
02010
02011
02012 vxids = (VirtualTransactionId *)
02013 palloc(sizeof(VirtualTransactionId) * arrayP->maxProcs);
02014
02015 LWLockAcquire(ProcArrayLock, LW_SHARED);
02016
02017 for (index = 0; index < arrayP->numProcs; index++)
02018 {
02019 int pgprocno = arrayP->pgprocnos[index];
02020 volatile PGPROC *proc = &allProcs[pgprocno];
02021 volatile PGXACT *pgxact = &allPgXact[pgprocno];
02022
02023 if (proc == MyProc)
02024 continue;
02025
02026 if (excludeVacuum & pgxact->vacuumFlags)
02027 continue;
02028
02029 if (allDbs || proc->databaseId == MyDatabaseId)
02030 {
02031
02032 TransactionId pxmin = pgxact->xmin;
02033
02034 if (excludeXmin0 && !TransactionIdIsValid(pxmin))
02035 continue;
02036
02037
02038
02039
02040
02041 if (!TransactionIdIsValid(limitXmin) ||
02042 TransactionIdPrecedesOrEquals(pxmin, limitXmin))
02043 {
02044 VirtualTransactionId vxid;
02045
02046 GET_VXID_FROM_PGPROC(vxid, *proc);
02047 if (VirtualTransactionIdIsValid(vxid))
02048 vxids[count++] = vxid;
02049 }
02050 }
02051 }
02052
02053 LWLockRelease(ProcArrayLock);
02054
02055 *nvxids = count;
02056 return vxids;
02057 }
02058
02059
02060
02061
02062
02063
02064
02065
02066
02067
02068
02069
02070
02071
02072
02073
02074
02075
02076
02077
02078
02079
02080
02081
02082
02083
02084
02085
02086
02087
02088
02089 VirtualTransactionId *
02090 GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
02091 {
02092 static VirtualTransactionId *vxids;
02093 ProcArrayStruct *arrayP = procArray;
02094 int count = 0;
02095 int index;
02096
02097
02098
02099
02100
02101
02102 if (vxids == NULL)
02103 {
02104 vxids = (VirtualTransactionId *)
02105 malloc(sizeof(VirtualTransactionId) * (arrayP->maxProcs + 1));
02106 if (vxids == NULL)
02107 ereport(ERROR,
02108 (errcode(ERRCODE_OUT_OF_MEMORY),
02109 errmsg("out of memory")));
02110 }
02111
02112 LWLockAcquire(ProcArrayLock, LW_SHARED);
02113
02114 for (index = 0; index < arrayP->numProcs; index++)
02115 {
02116 int pgprocno = arrayP->pgprocnos[index];
02117 volatile PGPROC *proc = &allProcs[pgprocno];
02118 volatile PGXACT *pgxact = &allPgXact[pgprocno];
02119
02120
02121 if (proc->pid == 0)
02122 continue;
02123
02124 if (!OidIsValid(dbOid) ||
02125 proc->databaseId == dbOid)
02126 {
02127
02128 TransactionId pxmin = pgxact->xmin;
02129
02130
02131
02132
02133
02134
02135 if (!TransactionIdIsValid(limitXmin) ||
02136 (TransactionIdIsValid(pxmin) && !TransactionIdFollows(pxmin, limitXmin)))
02137 {
02138 VirtualTransactionId vxid;
02139
02140 GET_VXID_FROM_PGPROC(vxid, *proc);
02141 if (VirtualTransactionIdIsValid(vxid))
02142 vxids[count++] = vxid;
02143 }
02144 }
02145 }
02146
02147 LWLockRelease(ProcArrayLock);
02148
02149
02150 vxids[count].backendId = InvalidBackendId;
02151 vxids[count].localTransactionId = InvalidLocalTransactionId;
02152
02153 return vxids;
02154 }
02155
02156
02157
02158
02159
02160
02161 pid_t
02162 CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
02163 {
02164 ProcArrayStruct *arrayP = procArray;
02165 int index;
02166 pid_t pid = 0;
02167
02168 LWLockAcquire(ProcArrayLock, LW_SHARED);
02169
02170 for (index = 0; index < arrayP->numProcs; index++)
02171 {
02172 int pgprocno = arrayP->pgprocnos[index];
02173 volatile PGPROC *proc = &allProcs[pgprocno];
02174 VirtualTransactionId procvxid;
02175
02176 GET_VXID_FROM_PGPROC(procvxid, *proc);
02177
02178 if (procvxid.backendId == vxid.backendId &&
02179 procvxid.localTransactionId == vxid.localTransactionId)
02180 {
02181 proc->recoveryConflictPending = true;
02182 pid = proc->pid;
02183 if (pid != 0)
02184 {
02185
02186
02187
02188
02189 (void) SendProcSignal(pid, sigmode, vxid.backendId);
02190 }
02191 break;
02192 }
02193 }
02194
02195 LWLockRelease(ProcArrayLock);
02196
02197 return pid;
02198 }
02199
02200
02201
02202
02203
02204
02205
02206
02207
02208
02209 bool
02210 MinimumActiveBackends(int min)
02211 {
02212 ProcArrayStruct *arrayP = procArray;
02213 int count = 0;
02214 int index;
02215
02216
02217 if (min == 0)
02218 return true;
02219
02220
02221
02222
02223
02224
02225 for (index = 0; index < arrayP->numProcs; index++)
02226 {
02227 int pgprocno = arrayP->pgprocnos[index];
02228 volatile PGPROC *proc = &allProcs[pgprocno];
02229 volatile PGXACT *pgxact = &allPgXact[pgprocno];
02230
02231
02232
02233
02234
02235
02236
02237
02238
02239
02240
02241
02242 if (proc == NULL)
02243 continue;
02244
02245 if (proc == MyProc)
02246 continue;
02247 if (pgxact->xid == InvalidTransactionId)
02248 continue;
02249 if (proc->pid == 0)
02250 continue;
02251 if (proc->waitLock != NULL)
02252 continue;
02253 count++;
02254 if (count >= min)
02255 break;
02256 }
02257
02258 return count >= min;
02259 }
02260
02261
02262
02263
02264 int
02265 CountDBBackends(Oid databaseid)
02266 {
02267 ProcArrayStruct *arrayP = procArray;
02268 int count = 0;
02269 int index;
02270
02271 LWLockAcquire(ProcArrayLock, LW_SHARED);
02272
02273 for (index = 0; index < arrayP->numProcs; index++)
02274 {
02275 int pgprocno = arrayP->pgprocnos[index];
02276 volatile PGPROC *proc = &allProcs[pgprocno];
02277
02278 if (proc->pid == 0)
02279 continue;
02280 if (!OidIsValid(databaseid) ||
02281 proc->databaseId == databaseid)
02282 count++;
02283 }
02284
02285 LWLockRelease(ProcArrayLock);
02286
02287 return count;
02288 }
02289
02290
02291
02292
02293 void
02294 CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
02295 {
02296 ProcArrayStruct *arrayP = procArray;
02297 int index;
02298 pid_t pid = 0;
02299
02300
02301 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02302
02303 for (index = 0; index < arrayP->numProcs; index++)
02304 {
02305 int pgprocno = arrayP->pgprocnos[index];
02306 volatile PGPROC *proc = &allProcs[pgprocno];
02307
02308 if (databaseid == InvalidOid || proc->databaseId == databaseid)
02309 {
02310 VirtualTransactionId procvxid;
02311
02312 GET_VXID_FROM_PGPROC(procvxid, *proc);
02313
02314 proc->recoveryConflictPending = conflictPending;
02315 pid = proc->pid;
02316 if (pid != 0)
02317 {
02318
02319
02320
02321
02322 (void) SendProcSignal(pid, sigmode, procvxid.backendId);
02323 }
02324 }
02325 }
02326
02327 LWLockRelease(ProcArrayLock);
02328 }
02329
02330
02331
02332
02333 int
02334 CountUserBackends(Oid roleid)
02335 {
02336 ProcArrayStruct *arrayP = procArray;
02337 int count = 0;
02338 int index;
02339
02340 LWLockAcquire(ProcArrayLock, LW_SHARED);
02341
02342 for (index = 0; index < arrayP->numProcs; index++)
02343 {
02344 int pgprocno = arrayP->pgprocnos[index];
02345 volatile PGPROC *proc = &allProcs[pgprocno];
02346
02347 if (proc->pid == 0)
02348 continue;
02349 if (proc->roleId == roleid)
02350 count++;
02351 }
02352
02353 LWLockRelease(ProcArrayLock);
02354
02355 return count;
02356 }
02357
02358
02359
02360
02361
02362
02363
02364
02365
02366
02367
02368
02369
02370
02371
02372
02373
02374
02375
02376
02377
02378
02379
02380
02381 bool
02382 CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
02383 {
02384 ProcArrayStruct *arrayP = procArray;
02385
02386 #define MAXAUTOVACPIDS 10
02387 int autovac_pids[MAXAUTOVACPIDS];
02388 int tries;
02389
02390
02391 for (tries = 0; tries < 50; tries++)
02392 {
02393 int nautovacs = 0;
02394 bool found = false;
02395 int index;
02396
02397 CHECK_FOR_INTERRUPTS();
02398
02399 *nbackends = *nprepared = 0;
02400
02401 LWLockAcquire(ProcArrayLock, LW_SHARED);
02402
02403 for (index = 0; index < arrayP->numProcs; index++)
02404 {
02405 int pgprocno = arrayP->pgprocnos[index];
02406 volatile PGPROC *proc = &allProcs[pgprocno];
02407 volatile PGXACT *pgxact = &allPgXact[pgprocno];
02408
02409 if (proc->databaseId != databaseId)
02410 continue;
02411 if (proc == MyProc)
02412 continue;
02413
02414 found = true;
02415
02416 if (proc->pid == 0)
02417 (*nprepared)++;
02418 else
02419 {
02420 (*nbackends)++;
02421 if ((pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) &&
02422 nautovacs < MAXAUTOVACPIDS)
02423 autovac_pids[nautovacs++] = proc->pid;
02424 }
02425 }
02426
02427 LWLockRelease(ProcArrayLock);
02428
02429 if (!found)
02430 return false;
02431
02432
02433
02434
02435
02436
02437
02438 for (index = 0; index < nautovacs; index++)
02439 (void) kill(autovac_pids[index], SIGTERM);
02440
02441
02442 pg_usleep(100 * 1000L);
02443 }
02444
02445 return true;
02446 }
02447
02448
02449 #define XidCacheRemove(i) \
02450 do { \
02451 MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
02452 MyPgXact->nxids--; \
02453 } while (0)
02454
02455
02456
02457
02458
02459
02460
02461
02462
02463 void
02464 XidCacheRemoveRunningXids(TransactionId xid,
02465 int nxids, const TransactionId *xids,
02466 TransactionId latestXid)
02467 {
02468 int i,
02469 j;
02470
02471 Assert(TransactionIdIsValid(xid));
02472
02473
02474
02475
02476
02477
02478
02479
02480 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02481
02482
02483
02484
02485
02486
02487 for (i = nxids - 1; i >= 0; i--)
02488 {
02489 TransactionId anxid = xids[i];
02490
02491 for (j = MyPgXact->nxids - 1; j >= 0; j--)
02492 {
02493 if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
02494 {
02495 XidCacheRemove(j);
02496 break;
02497 }
02498 }
02499
02500
02501
02502
02503
02504
02505
02506
02507 if (j < 0 && !MyPgXact->overflowed)
02508 elog(WARNING, "did not find subXID %u in MyProc", anxid);
02509 }
02510
02511 for (j = MyPgXact->nxids - 1; j >= 0; j--)
02512 {
02513 if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
02514 {
02515 XidCacheRemove(j);
02516 break;
02517 }
02518 }
02519
02520 if (j < 0 && !MyPgXact->overflowed)
02521 elog(WARNING, "did not find subXID %u in MyProc", xid);
02522
02523
02524 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
02525 latestXid))
02526 ShmemVariableCache->latestCompletedXid = latestXid;
02527
02528 LWLockRelease(ProcArrayLock);
02529 }
02530
#ifdef XIDCACHE_DEBUG

/*
 * DisplayXidCache
 *
 * Write the current values of all xc_* debug counters to stderr on a
 * single line.  Only compiled when XIDCACHE_DEBUG is defined.
 */
static void
DisplayXidCache(void)
{
	/* Arguments appear in the same order as the labels in the format. */
	fprintf(stderr,
			"XidCache: xmin: %ld, known: %ld, myxact: %ld, latest: %ld, mainxid: %ld, childxid: %ld, knownassigned: %ld, nooflo: %ld, slow: %ld\n",
			xc_by_recent_xmin, xc_by_known_xact, xc_by_my_xact,
			xc_by_latest_xid, xc_by_main_xid, xc_by_child_xid,
			xc_by_known_assigned, xc_no_overflow, xc_slow_answer);
}
#endif   /* XIDCACHE_DEBUG */
02552
02553
02554
02555
02556
02557
02558
02559
02560
02561
02562
02563
02564
02565
02566
02567
02568
02569
02570
02571
02572
02573
02574
02575
02576
02577
02578
02579
02580
02581
02582
02583
02584
02585
02586
02587
02588
02589
02590
02591
02592
02593
02594
02595
02596
02597
02598
02599
02600
02601
02602
02603
02604
02605
02606
02607
02608
02609
02610
02611
02612
02613
02614
02615
02616
02617
02618
02619
02620 void
02621 RecordKnownAssignedTransactionIds(TransactionId xid)
02622 {
02623 Assert(standbyState >= STANDBY_INITIALIZED);
02624 Assert(TransactionIdIsValid(xid));
02625
02626 elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
02627 xid, latestObservedXid);
02628
02629
02630
02631
02632 if (standbyState <= STANDBY_INITIALIZED)
02633 return;
02634
02635 Assert(TransactionIdIsValid(latestObservedXid));
02636
02637
02638
02639
02640
02641
02642 if (TransactionIdFollows(xid, latestObservedXid))
02643 {
02644 TransactionId next_expected_xid;
02645
02646
02647
02648
02649
02650
02651 next_expected_xid = latestObservedXid;
02652 TransactionIdAdvance(next_expected_xid);
02653 while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
02654 {
02655 ExtendCLOG(next_expected_xid);
02656 ExtendSUBTRANS(next_expected_xid);
02657
02658 TransactionIdAdvance(next_expected_xid);
02659 }
02660
02661
02662
02663
02664 next_expected_xid = latestObservedXid;
02665 TransactionIdAdvance(next_expected_xid);
02666 KnownAssignedXidsAdd(next_expected_xid, xid, false);
02667
02668
02669
02670
02671 latestObservedXid = xid;
02672
02673
02674 next_expected_xid = latestObservedXid;
02675 TransactionIdAdvance(next_expected_xid);
02676 LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
02677 ShmemVariableCache->nextXid = next_expected_xid;
02678 LWLockRelease(XidGenLock);
02679 }
02680 }
02681
02682
02683
02684
02685
02686
02687
02688 void
02689 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
02690 TransactionId *subxids, TransactionId max_xid)
02691 {
02692 Assert(standbyState >= STANDBY_INITIALIZED);
02693
02694
02695
02696
02697 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02698
02699 KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
02700
02701
02702 if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
02703 max_xid))
02704 ShmemVariableCache->latestCompletedXid = max_xid;
02705
02706 LWLockRelease(ProcArrayLock);
02707 }
02708
02709
02710
02711
02712
02713 void
02714 ExpireAllKnownAssignedTransactionIds(void)
02715 {
02716 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02717 KnownAssignedXidsRemovePreceding(InvalidTransactionId);
02718 LWLockRelease(ProcArrayLock);
02719 }
02720
02721
02722
02723
02724
02725 void
02726 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
02727 {
02728 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02729 KnownAssignedXidsRemovePreceding(xid);
02730 LWLockRelease(ProcArrayLock);
02731 }
02732
02733
02734
02735
02736
02737
02738
02739
02740
02741
02742
02743
02744
02745
02746
02747
02748
02749
02750
02751
02752
02753
02754
02755
02756
02757
02758
02759
02760
02761
02762
02763
02764
02765
02766
02767
02768
02769
02770
02771
02772
02773
02774
02775
02776
02777
02778
02779
02780
02781
02782
02783
02784
02785
02786
02787
02788
02789
02790
02791
02792
02793
02794
02795
02796
02797
02798
02799
02800
02801
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817
02818
02819
02820
02821
02822
02823
02824
02825
02826
02827
02828 static void
02829 KnownAssignedXidsCompress(bool force)
02830 {
02831
02832 volatile ProcArrayStruct *pArray = procArray;
02833 int head,
02834 tail;
02835 int compress_index;
02836 int i;
02837
02838
02839 head = pArray->headKnownAssignedXids;
02840 tail = pArray->tailKnownAssignedXids;
02841
02842 if (!force)
02843 {
02844
02845
02846
02847
02848
02849
02850
02851
02852
02853
02854 int nelements = head - tail;
02855
02856 if (nelements < 4 * PROCARRAY_MAXPROCS ||
02857 nelements < 2 * pArray->numKnownAssignedXids)
02858 return;
02859 }
02860
02861
02862
02863
02864
02865 compress_index = 0;
02866 for (i = tail; i < head; i++)
02867 {
02868 if (KnownAssignedXidsValid[i])
02869 {
02870 KnownAssignedXids[compress_index] = KnownAssignedXids[i];
02871 KnownAssignedXidsValid[compress_index] = true;
02872 compress_index++;
02873 }
02874 }
02875
02876 pArray->tailKnownAssignedXids = 0;
02877 pArray->headKnownAssignedXids = compress_index;
02878 }
02879
02880
02881
02882
02883
02884
02885
02886
02887
02888
02889
02890
02891 static void
02892 KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
02893 bool exclusive_lock)
02894 {
02895
02896 volatile ProcArrayStruct *pArray = procArray;
02897 TransactionId next_xid;
02898 int head,
02899 tail;
02900 int nxids;
02901 int i;
02902
02903 Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
02904
02905
02906
02907
02908
02909
02910 if (to_xid >= from_xid)
02911 nxids = to_xid - from_xid + 1;
02912 else
02913 {
02914 nxids = 1;
02915 next_xid = from_xid;
02916 while (TransactionIdPrecedes(next_xid, to_xid))
02917 {
02918 nxids++;
02919 TransactionIdAdvance(next_xid);
02920 }
02921 }
02922
02923
02924
02925
02926
02927 head = pArray->headKnownAssignedXids;
02928 tail = pArray->tailKnownAssignedXids;
02929
02930 Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
02931 Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
02932
02933
02934
02935
02936
02937
02938 if (head > tail &&
02939 TransactionIdFollowsOrEquals(KnownAssignedXids[head - 1], from_xid))
02940 {
02941 KnownAssignedXidsDisplay(LOG);
02942 elog(ERROR, "out-of-order XID insertion in KnownAssignedXids");
02943 }
02944
02945
02946
02947
02948 if (head + nxids > pArray->maxKnownAssignedXids)
02949 {
02950
02951 if (!exclusive_lock)
02952 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
02953
02954 KnownAssignedXidsCompress(true);
02955
02956 head = pArray->headKnownAssignedXids;
02957
02958
02959 if (!exclusive_lock)
02960 LWLockRelease(ProcArrayLock);
02961
02962
02963
02964
02965 if (head + nxids > pArray->maxKnownAssignedXids)
02966 elog(ERROR, "too many KnownAssignedXids");
02967 }
02968
02969
02970 next_xid = from_xid;
02971 for (i = 0; i < nxids; i++)
02972 {
02973 KnownAssignedXids[head] = next_xid;
02974 KnownAssignedXidsValid[head] = true;
02975 TransactionIdAdvance(next_xid);
02976 head++;
02977 }
02978
02979
02980 pArray->numKnownAssignedXids += nxids;
02981
02982
02983
02984
02985
02986
02987
02988
02989
02990
02991 if (exclusive_lock)
02992 pArray->headKnownAssignedXids = head;
02993 else
02994 {
02995 SpinLockAcquire(&pArray->known_assigned_xids_lck);
02996 pArray->headKnownAssignedXids = head;
02997 SpinLockRelease(&pArray->known_assigned_xids_lck);
02998 }
02999 }
03000
03001
03002
03003
03004
03005
03006
03007
03008
03009
03010 static bool
03011 KnownAssignedXidsSearch(TransactionId xid, bool remove)
03012 {
03013
03014 volatile ProcArrayStruct *pArray = procArray;
03015 int first,
03016 last;
03017 int head;
03018 int tail;
03019 int result_index = -1;
03020
03021 if (remove)
03022 {
03023
03024 tail = pArray->tailKnownAssignedXids;
03025 head = pArray->headKnownAssignedXids;
03026 }
03027 else
03028 {
03029
03030 SpinLockAcquire(&pArray->known_assigned_xids_lck);
03031 tail = pArray->tailKnownAssignedXids;
03032 head = pArray->headKnownAssignedXids;
03033 SpinLockRelease(&pArray->known_assigned_xids_lck);
03034 }
03035
03036
03037
03038
03039
03040 first = tail;
03041 last = head - 1;
03042 while (first <= last)
03043 {
03044 int mid_index;
03045 TransactionId mid_xid;
03046
03047 mid_index = (first + last) / 2;
03048 mid_xid = KnownAssignedXids[mid_index];
03049
03050 if (xid == mid_xid)
03051 {
03052 result_index = mid_index;
03053 break;
03054 }
03055 else if (TransactionIdPrecedes(xid, mid_xid))
03056 last = mid_index - 1;
03057 else
03058 first = mid_index + 1;
03059 }
03060
03061 if (result_index < 0)
03062 return false;
03063
03064 if (!KnownAssignedXidsValid[result_index])
03065 return false;
03066
03067 if (remove)
03068 {
03069 KnownAssignedXidsValid[result_index] = false;
03070
03071 pArray->numKnownAssignedXids--;
03072 Assert(pArray->numKnownAssignedXids >= 0);
03073
03074
03075
03076
03077
03078 if (result_index == tail)
03079 {
03080 tail++;
03081 while (tail < head && !KnownAssignedXidsValid[tail])
03082 tail++;
03083 if (tail >= head)
03084 {
03085
03086 pArray->headKnownAssignedXids = 0;
03087 pArray->tailKnownAssignedXids = 0;
03088 }
03089 else
03090 {
03091 pArray->tailKnownAssignedXids = tail;
03092 }
03093 }
03094 }
03095
03096 return true;
03097 }
03098
03099
03100
03101
03102
03103
03104 static bool
03105 KnownAssignedXidExists(TransactionId xid)
03106 {
03107 Assert(TransactionIdIsValid(xid));
03108
03109 return KnownAssignedXidsSearch(xid, false);
03110 }
03111
03112
03113
03114
03115
03116
03117 static void
03118 KnownAssignedXidsRemove(TransactionId xid)
03119 {
03120 Assert(TransactionIdIsValid(xid));
03121
03122 elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
03123
03124
03125
03126
03127
03128
03129
03130
03131
03132
03133
03134 (void) KnownAssignedXidsSearch(xid, true);
03135 }
03136
03137
03138
03139
03140
03141
03142
03143 static void
03144 KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
03145 TransactionId *subxids)
03146 {
03147 int i;
03148
03149 if (TransactionIdIsValid(xid))
03150 KnownAssignedXidsRemove(xid);
03151
03152 for (i = 0; i < nsubxids; i++)
03153 KnownAssignedXidsRemove(subxids[i]);
03154
03155
03156 KnownAssignedXidsCompress(false);
03157 }
03158
03159
03160
03161
03162
03163
03164
03165 static void
03166 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
03167 {
03168
03169 volatile ProcArrayStruct *pArray = procArray;
03170 int count = 0;
03171 int head,
03172 tail,
03173 i;
03174
03175 if (!TransactionIdIsValid(removeXid))
03176 {
03177 elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
03178 pArray->numKnownAssignedXids = 0;
03179 pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
03180 return;
03181 }
03182
03183 elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
03184
03185
03186
03187
03188
03189 tail = pArray->tailKnownAssignedXids;
03190 head = pArray->headKnownAssignedXids;
03191
03192 for (i = tail; i < head; i++)
03193 {
03194 if (KnownAssignedXidsValid[i])
03195 {
03196 TransactionId knownXid = KnownAssignedXids[i];
03197
03198 if (TransactionIdFollowsOrEquals(knownXid, removeXid))
03199 break;
03200
03201 if (!StandbyTransactionIdIsPrepared(knownXid))
03202 {
03203 KnownAssignedXidsValid[i] = false;
03204 count++;
03205 }
03206 }
03207 }
03208
03209 pArray->numKnownAssignedXids -= count;
03210 Assert(pArray->numKnownAssignedXids >= 0);
03211
03212
03213
03214
03215 for (i = tail; i < head; i++)
03216 {
03217 if (KnownAssignedXidsValid[i])
03218 break;
03219 }
03220 if (i >= head)
03221 {
03222
03223 pArray->headKnownAssignedXids = 0;
03224 pArray->tailKnownAssignedXids = 0;
03225 }
03226 else
03227 {
03228 pArray->tailKnownAssignedXids = i;
03229 }
03230
03231
03232 KnownAssignedXidsCompress(false);
03233 }
03234
03235
03236
03237
03238
03239
03240
03241
03242
03243
03244 static int
03245 KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
03246 {
03247 TransactionId xtmp = InvalidTransactionId;
03248
03249 return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
03250 }
03251
03252
03253
03254
03255
03256
03257
03258 static int
03259 KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
03260 TransactionId xmax)
03261 {
03262
03263 volatile ProcArrayStruct *pArray = procArray;
03264 int count = 0;
03265 int head,
03266 tail;
03267 int i;
03268
03269
03270
03271
03272
03273
03274
03275
03276
03277
03278 SpinLockAcquire(&pArray->known_assigned_xids_lck);
03279 tail = pArray->tailKnownAssignedXids;
03280 head = pArray->headKnownAssignedXids;
03281 SpinLockRelease(&pArray->known_assigned_xids_lck);
03282
03283 for (i = tail; i < head; i++)
03284 {
03285
03286 if (KnownAssignedXidsValid[i])
03287 {
03288 TransactionId knownXid = KnownAssignedXids[i];
03289
03290
03291
03292
03293
03294 if (count == 0 &&
03295 TransactionIdPrecedes(knownXid, *xmin))
03296 *xmin = knownXid;
03297
03298
03299
03300
03301
03302 if (TransactionIdIsValid(xmax) &&
03303 TransactionIdFollowsOrEquals(knownXid, xmax))
03304 break;
03305
03306
03307 xarray[count++] = knownXid;
03308 }
03309 }
03310
03311 return count;
03312 }
03313
03314
03315
03316
03317
03318 static TransactionId
03319 KnownAssignedXidsGetOldestXmin(void)
03320 {
03321
03322 volatile ProcArrayStruct *pArray = procArray;
03323 int head,
03324 tail;
03325 int i;
03326
03327
03328
03329
03330 SpinLockAcquire(&pArray->known_assigned_xids_lck);
03331 tail = pArray->tailKnownAssignedXids;
03332 head = pArray->headKnownAssignedXids;
03333 SpinLockRelease(&pArray->known_assigned_xids_lck);
03334
03335 for (i = tail; i < head; i++)
03336 {
03337
03338 if (KnownAssignedXidsValid[i])
03339 return KnownAssignedXids[i];
03340 }
03341
03342 return InvalidTransactionId;
03343 }
03344
03345
03346
03347
03348
03349
03350
03351
03352
03353
03354
03355 static void
03356 KnownAssignedXidsDisplay(int trace_level)
03357 {
03358
03359 volatile ProcArrayStruct *pArray = procArray;
03360 StringInfoData buf;
03361 int head,
03362 tail,
03363 i;
03364 int nxids = 0;
03365
03366 tail = pArray->tailKnownAssignedXids;
03367 head = pArray->headKnownAssignedXids;
03368
03369 initStringInfo(&buf);
03370
03371 for (i = tail; i < head; i++)
03372 {
03373 if (KnownAssignedXidsValid[i])
03374 {
03375 nxids++;
03376 appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);
03377 }
03378 }
03379
03380 elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
03381 nxids,
03382 pArray->numKnownAssignedXids,
03383 pArray->tailKnownAssignedXids,
03384 pArray->headKnownAssignedXids,
03385 buf.data);
03386
03387 pfree(buf.data);
03388 }
03389
03390
03391
03392
03393
03394 static void
03395 KnownAssignedXidsReset(void)
03396 {
03397
03398 volatile ProcArrayStruct *pArray = procArray;
03399
03400 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
03401
03402 pArray->numKnownAssignedXids = 0;
03403 pArray->tailKnownAssignedXids = 0;
03404 pArray->headKnownAssignedXids = 0;
03405
03406 LWLockRelease(ProcArrayLock);
03407 }