predicate.c

00001 /*-------------------------------------------------------------------------
00002  *
00003  * predicate.c
00004  *    POSTGRES predicate locking
00005  *    to support full serializable transaction isolation
00006  *
00007  *
00008  * The approach taken is to implement Serializable Snapshot Isolation (SSI)
00009  * as initially described in this paper:
00010  *
00011  *  Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
00012  *  Serializable isolation for snapshot databases.
00013  *  In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
00014  *  international conference on Management of data,
00015  *  pages 729-738, New York, NY, USA. ACM.
00016  *  http://doi.acm.org/10.1145/1376616.1376690
00017  *
00018  * and further elaborated in Cahill's doctoral thesis:
00019  *
00020  *  Michael James Cahill. 2009.
00021  *  Serializable Isolation for Snapshot Databases.
00022  *  Sydney Digital Theses.
00023  *  University of Sydney, School of Information Technologies.
00024  *  http://hdl.handle.net/2123/5353
00025  *
00026  *
00027  * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
00028  * locks, which are so different from normal locks that a distinct set of
00029  * structures is required to handle them.  They are needed to detect
00030  * rw-conflicts when the read happens before the write.  (When the write
00031  * occurs first, the reading transaction can check for a conflict by
00032  * examining the MVCC data.)
00033  *
00034  * (1)  Besides tuples actually read, they must cover ranges of tuples
00035  *      which would have been read based on the predicate.  This will
00036  *      require modelling the predicates through locks against database
00037  *      objects such as pages, index ranges, or entire tables.
00038  *
00039  * (2)  They must be kept in RAM for quick access.  Because of this, it
00040  *      isn't possible to always maintain tuple-level granularity -- when
00041  *      the space allocated to store these approaches exhaustion, a
00042  *      request for a lock may need to scan for situations where a single
00043  *      transaction holds many fine-grained locks which can be coalesced
00044  *      into a single coarser-grained lock.
00045  *
00046  * (3)  They never block anything; they are more like flags than locks
00047  *      in that regard, although they refer to database objects and are
00048  *      used to identify rw-conflicts with normal write locks.
00049  *
00050  * (4)  While they are associated with a transaction, they must survive
00051  *      a successful COMMIT of that transaction, and remain until all
00052  *      overlapping transactions complete.  This even means that they
00053  *      must survive termination of the transaction's process.  If a
00054  *      top level transaction is rolled back, however, it is immediately
00055  *      flagged so that it can be ignored, and its SIREAD locks can be
00056  *      released any time after that.
00057  *
00058  * (5)  The only transactions which create SIREAD locks or check for
00059  *      conflicts with them are serializable transactions.
00060  *
00061  * (6)  When a write lock for a top level transaction is found to cover
00062  *      an existing SIREAD lock for the same transaction, the SIREAD lock
00063  *      can be deleted.
00064  *
00065  * (7)  A write from a serializable transaction must ensure that a xact
00066  *      record exists for the transaction, with the same lifespan (until
00067  *      all concurrent transactions complete or the transaction is rolled
00068  *      back) so that rw-dependencies to that transaction can be
00069  *      detected.
00070  *
00071  * We use an optimization for read-only transactions. Under certain
00072  * circumstances, a read-only transaction's snapshot can be shown to
00073  * never have conflicts with other transactions.  This is referred to
00074  * as a "safe" snapshot (and one known not to be is "unsafe").
00075  * However, it can't be determined whether a snapshot is safe until
00076  * all concurrent read/write transactions complete.
00077  *
00078  * Once a read-only transaction is known to have a safe snapshot, it
00079  * can release its predicate locks and exempt itself from further
00080  * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
00081  * on safe snapshots, waiting as necessary for one to be available.
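 * For example, such a transaction can be requested at the SQL level with
 * BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE;
 * it then blocks while acquiring its snapshot until a safe one is available.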
00082  *
00083  *
00084  * Lightweight locks to manage access to the predicate locking shared
00085  * memory objects must be taken in this order, and should be released in
00086  * reverse order:
00087  *
00088  *  SerializableFinishedListLock
00089  *      - Protects the list of transactions which have completed but which
00090  *          may yet matter because they overlap still-active transactions.
00091  *
00092  *  SerializablePredicateLockListLock
00093  *      - Protects the linked list of locks held by a transaction.  Note
00094  *          that the locks themselves are also covered by the partition
00095  *          locks of their respective lock targets; this lock only affects
00096  *          the linked list connecting the locks related to a transaction.
00097  *      - All transactions share this single lock (with no partitioning).
00098  *      - There is never a need for a process other than the one running
00099  *          an active transaction to walk the list of locks held by that
00100  *          transaction.
00101  *      - It is relatively infrequent that another process needs to
00102  *          modify the list for a transaction, but it does happen for such
00103  *          things as index page splits for pages with predicate locks and
00104  *          freeing of predicate locked pages by a vacuum process.  When
00105  *          removing a lock in such cases, the lock itself contains the
00106  *          pointers needed to remove it from the list.  When adding a
00107  *          lock in such cases, the lock can be added using the anchor in
00108  *          the transaction structure.  Neither requires walking the list.
00109  *      - Cleaning up the list for a terminated transaction is sometimes
00110  *          not done on a retail basis, in which case no lock is required.
00111  *      - Due to the above, a process accessing its active transaction's
00112  *          list always uses a shared lock, regardless of whether it is
00113  *          walking or maintaining the list.  This improves concurrency
00114  *          for the common access patterns.
00115  *      - A process which needs to alter the list of a transaction other
00116  *          than its own active transaction must acquire an exclusive
00117  *          lock.
00118  *
00119  *  FirstPredicateLockMgrLock based partition locks
00120  *      - The same lock protects a target, all locks on that target, and
00121  *          the linked list of locks on the target.
00122  *      - When more than one is needed, acquire in ascending order.
00123  *
00124  *  SerializableXactHashLock
00125  *      - Protects both PredXact and SerializableXidHash.
00126  *
00127  *
00128  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00129  * Portions Copyright (c) 1994, Regents of the University of California
00130  *
00131  *
00132  * IDENTIFICATION
00133  *    src/backend/storage/lmgr/predicate.c
00134  *
00135  *-------------------------------------------------------------------------
00136  */
00137 /*
00138  * INTERFACE ROUTINES
00139  *
00140  * housekeeping for setting up shared memory predicate lock structures
00141  *      InitPredicateLocks(void)
00142  *      PredicateLockShmemSize(void)
00143  *
00144  * predicate lock reporting
00145  *      GetPredicateLockStatusData(void)
00146  *      PageIsPredicateLocked(Relation relation, BlockNumber blkno)
00147  *
00148  * predicate lock maintenance
00149  *      GetSerializableTransactionSnapshot(Snapshot snapshot)
00150  *      SetSerializableTransactionSnapshot(Snapshot snapshot,
00151  *                                         TransactionId sourcexid)
00152  *      RegisterPredicateLockingXid(void)
00153  *      PredicateLockRelation(Relation relation, Snapshot snapshot)
00154  *      PredicateLockPage(Relation relation, BlockNumber blkno,
00155  *                      Snapshot snapshot)
00156  *      PredicateLockTuple(Relation relation, HeapTuple tuple,
00157  *                      Snapshot snapshot)
00158  *      PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
00159  *                             BlockNumber newblkno);
00160  *      PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
00161  *                               BlockNumber newblkno);
00162  *      TransferPredicateLocksToHeapRelation(Relation relation)
00163  *      ReleasePredicateLocks(bool isCommit)
00164  *
00165  * conflict detection (may also trigger rollback)
00166  *      CheckForSerializableConflictOut(bool visible, Relation relation,
00167  *                                      HeapTupleData *tup, Buffer buffer,
00168  *                                      Snapshot snapshot)
00169  *      CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
00170  *                                     Buffer buffer)
00171  *      CheckTableForSerializableConflictIn(Relation relation)
00172  *
00173  * final rollback checking
00174  *      PreCommit_CheckForSerializationFailure(void)
00175  *
00176  * two-phase commit support
00177  *      AtPrepare_PredicateLocks(void);
00178  *      PostPrepare_PredicateLocks(TransactionId xid);
00179  *      PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
00180  *      predicatelock_twophase_recover(TransactionId xid, uint16 info,
00181  *                                     void *recdata, uint32 len);
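 *
 * As an illustration (not an exhaustive list of callers), the heap access
 * methods call PredicateLockTuple() and CheckForSerializableConflictOut()
 * when fetching tuples, and CheckForSerializableConflictIn() before
 * inserting, updating, or deleting them.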
00182  */
00183 
00184 #include "postgres.h"
00185 
00186 #include "access/htup_details.h"
00187 #include "access/slru.h"
00188 #include "access/subtrans.h"
00189 #include "access/transam.h"
00190 #include "access/twophase.h"
00191 #include "access/twophase_rmgr.h"
00192 #include "access/xact.h"
00193 #include "miscadmin.h"
00194 #include "storage/bufmgr.h"
00195 #include "storage/predicate.h"
00196 #include "storage/predicate_internals.h"
00197 #include "storage/proc.h"
00198 #include "storage/procarray.h"
00199 #include "utils/rel.h"
00200 #include "utils/snapmgr.h"
00201 #include "utils/tqual.h"
00202 
00203 /* Uncomment the next line to test the graceful degradation code. */
00204 /* #define TEST_OLDSERXID */
00205 
00206 /*
00207  * Test the most selective fields first, for performance.
00208  *
00209  * a is covered by b if all of the following hold:
00210  *  1) a.database = b.database
00211  *  2) a.relation = b.relation
00212  *  3) b.offset is invalid (b is page-granularity or higher)
00213  *  4) either of the following:
00214  *      4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
00215  *   or 4b) a.offset is invalid and b.page is invalid (a is
00216  *          page-granularity and b is relation-granularity)
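 *
 * Worked example (illustrative): a tuple-level tag on page 5, offset 2 of a
 * relation is covered by a page-level tag on page 5 of the same relation via
 * (4a), and that page-level tag is in turn covered by a relation-level tag
 * on the same relation via (4b).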
00217  */
00218 #define TargetTagIsCoveredBy(covered_target, covering_target)           \
00219     ((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == /* (2) */  \
00220       GET_PREDICATELOCKTARGETTAG_RELATION(covering_target))             \
00221      && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) ==          \
00222          InvalidOffsetNumber)                                /* (3) */  \
00223      && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) !=         \
00224            InvalidOffsetNumber)                              /* (4a) */ \
00225           && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==       \
00226               GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)))         \
00227          || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) ==       \
00228               InvalidBlockNumber)                            /* (4b) */ \
00229              && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target)        \
00230                  != InvalidBlockNumber)))                               \
00231      && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) ==    /* (1) */  \
00232          GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
00233 
00234 /*
00235  * The predicate locking target and lock shared hash tables are partitioned to
00236  * reduce contention.  To determine which partition a given target belongs to,
00237  * compute the tag's hash code with PredicateLockTargetTagHashCode(), then
00238  * apply one of these macros.
00239  * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
00240  */
00241 #define PredicateLockHashPartition(hashcode) \
00242     ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
00243 #define PredicateLockHashPartitionLock(hashcode) \
00244     ((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hashcode)))
00245 
00246 #define NPREDICATELOCKTARGETENTS() \
00247     mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
00248 
00249 #define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
00250 
00251 /*
00252  * Note that a sxact is marked "prepared" once it has passed
00253  * PreCommit_CheckForSerializationFailure, even if it isn't using
00254  * 2PC. This is the point at which it can no longer be aborted.
00255  *
00256  * The PREPARED flag remains set after commit, so SxactIsCommitted
00257  * implies SxactIsPrepared.
00258  */
00259 #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
00260 #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
00261 #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
00262 #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
00263 #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
00264 #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
00265 #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
00266 /*
00267  * The following macro actually means that the specified transaction has a
00268  * conflict out *to a transaction which committed ahead of it*.  It's hard
00269  * to get that into a name of a reasonable length.
00270  */
00271 #define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
00272 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
00273 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
00274 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
00275 
00276 /*
00277  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
00278  *
00279  * To avoid unnecessary recomputations of the hash code, we try to do this
00280  * just once per function, and then pass it around as needed.  Aside from
00281  * passing the hashcode to hash_search_with_hash_value(), we can extract
00282  * the lock partition number from the hashcode.
00283  */
00284 #define PredicateLockTargetTagHashCode(predicatelocktargettag) \
00285     (tag_hash((predicatelocktargettag), sizeof(PREDICATELOCKTARGETTAG)))
00286 
00287 /*
00288  * Given a predicate lock tag, and the hash for its target,
00289  * compute the lock hash.
00290  *
00291  * To make the hash code also depend on the transaction, we xor the sxid
00292  * struct's address into the hash code, left-shifted so that the
00293  * partition-number bits don't change.  Since this is only a hash, we
00294  * don't care if we lose high-order bits of the address; use an
00295  * intermediate variable to suppress cast-pointer-to-int warnings.
00296  */
00297 #define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
00298     ((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
00299      << LOG2_NUM_PREDICATELOCK_PARTITIONS)
00300 
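/*
 * Illustrative sketch (not part of the original file; the helper name is
 * hypothetical): the usual pattern is to compute the target tag's hash code
 * once, reuse it to pick the partition lock via
 * PredicateLockHashPartitionLock(), and combine it with the owning
 * transaction to derive the lock hash.
 */
#ifdef NOT_USED
static uint32
ExampleLockHashForTarget(const PREDICATELOCKTARGETTAG *targettag,
                         PREDICATELOCKTARGET *target,
                         SERIALIZABLEXACT *sxact)
{
    uint32      targettaghash = PredicateLockTargetTagHashCode(targettag);
    PREDICATELOCKTAG locktag;

    /* targettaghash also selects the partition lock for this target */
    locktag.myTarget = target;
    locktag.myXact = sxact;
    return PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash);
}
#endif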
00301 
00302 /*
00303  * The SLRU buffer area through which we access the old xids.
00304  */
00305 static SlruCtlData OldSerXidSlruCtlData;
00306 
00307 #define OldSerXidSlruCtl            (&OldSerXidSlruCtlData)
00308 
00309 #define OLDSERXID_PAGESIZE          BLCKSZ
00310 #define OLDSERXID_ENTRYSIZE         sizeof(SerCommitSeqNo)
00311 #define OLDSERXID_ENTRIESPERPAGE    (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
00312 
00313 /*
00314  * Set maximum pages based on the lesser of the number needed to track all
00315  * transactions and the maximum that SLRU supports.
00316  */
00317 #define OLDSERXID_MAX_PAGE          Min(SLRU_PAGES_PER_SEGMENT * 0x10000 - 1, \
00318                                         (MaxTransactionId) / OLDSERXID_ENTRIESPERPAGE)
00319 
00320 #define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
00321 
00322 #define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
00323     (OldSerXidSlruCtl->shared->page_buffer[slotno] + \
00324     ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
00325 
00326 #define OldSerXidPage(xid)  ((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
00327 #define OldSerXidSegment(page)  ((page) / SLRU_PAGES_PER_SEGMENT)
00328 
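/*
 * Worked example (illustrative, assuming the default 8 kB BLCKSZ): each SLRU
 * page then holds 8192 / sizeof(SerCommitSeqNo) = 8192 / 8 = 1024 entries,
 * so xid 2500 maps to page 2500 / 1024 = 2 (modulo OLDSERXID_MAX_PAGE + 1)
 * and to entry 2500 % 1024 = 452 within that page.
 */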
00329 typedef struct OldSerXidControlData
00330 {
00331     int         headPage;       /* newest initialized page */
00332     TransactionId headXid;      /* newest valid Xid in the SLRU */
00333     TransactionId tailXid;      /* oldest xmin we might be interested in */
00334     bool        warningIssued;  /* have we issued SLRU wrap-around warning? */
00335 }   OldSerXidControlData;
00336 
00337 typedef struct OldSerXidControlData *OldSerXidControl;
00338 
00339 static OldSerXidControl oldSerXidControl;
00340 
00341 /*
00342  * When the oldest committed transaction on the "finished" list is moved to
00343  * SLRU, its predicate locks will be moved to this "dummy" transaction,
00344  * collapsing duplicate targets.  When a duplicate is found, the later
00345  * commitSeqNo is used.
00346  */
00347 static SERIALIZABLEXACT *OldCommittedSxact;
00348 
00349 
00350 /* This configuration variable is used to set the predicate lock table size */
00351 int         max_predicate_locks_per_xact;       /* set by guc.c */
00352 
00353 /*
00354  * This provides a list of objects in order to track transactions
00355  * participating in predicate locking.  Entries in the list are fixed size,
00356  * and reside in shared memory.  The memory address of an entry must remain
00357  * fixed during its lifetime.  The list will be protected from concurrent
00358  * update externally; no provision is made in this code to manage that.  The
00359  * number of entries in the list, and the size allowed for each entry is
00360  * fixed upon creation.
00361  */
00362 static PredXactList PredXact;
00363 
00364 /*
00365  * This provides a pool of RWConflict data elements to use in conflict lists
00366  * between transactions.
00367  */
00368 static RWConflictPoolHeader RWConflictPool;
00369 
00370 /*
00371  * The predicate locking hash tables are in shared memory.
00372  * Each backend keeps pointers to them.
00373  */
00374 static HTAB *SerializableXidHash;
00375 static HTAB *PredicateLockTargetHash;
00376 static HTAB *PredicateLockHash;
00377 static SHM_QUEUE *FinishedSerializableTransactions;
00378 
00379 /*
00380  * Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing
00381  * this entry, you can ensure that there's enough scratch space available for
00382  * inserting one entry in the hash table. This is an otherwise-invalid tag.
00383  */
00384 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0, 0};
00385 static uint32 ScratchTargetTagHash;
00386 static int  ScratchPartitionLock;
00387 
00388 /*
00389  * The local hash table used to determine when to combine multiple fine-
00390  * grained locks into a single coarser-grained lock.
00391  */
00392 static HTAB *LocalPredicateLockHash = NULL;
00393 
00394 /*
00395  * Keep a pointer to the currently-running serializable transaction (if any)
00396  * for quick reference. Also, remember if we have written anything that could
00397  * cause a rw-conflict.
00398  */
00399 static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
00400 static bool MyXactDidWrite = false;
00401 
00402 /* local functions */
00403 
00404 static SERIALIZABLEXACT *CreatePredXact(void);
00405 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
00406 static SERIALIZABLEXACT *FirstPredXact(void);
00407 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
00408 
00409 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
00410 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
00411 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
00412 static void ReleaseRWConflict(RWConflict conflict);
00413 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
00414 
00415 static bool OldSerXidPagePrecedesLogically(int p, int q);
00416 static void OldSerXidInit(void);
00417 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
00418 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
00419 static void OldSerXidSetActiveSerXmin(TransactionId xid);
00420 
00421 static uint32 predicatelock_hash(const void *key, Size keysize);
00422 static void SummarizeOldestCommittedSxact(void);
00423 static Snapshot GetSafeSnapshot(Snapshot snapshot);
00424 static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot,
00425                                       TransactionId sourcexid);
00426 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
00427 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
00428                           PREDICATELOCKTARGETTAG *parent);
00429 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
00430 static void RemoveScratchTarget(bool lockheld);
00431 static void RestoreScratchTarget(bool lockheld);
00432 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
00433                            uint32 targettaghash);
00434 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
00435 static int  PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
00436 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
00437 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
00438 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
00439                     uint32 targettaghash,
00440                     SERIALIZABLEXACT *sxact);
00441 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
00442 static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
00443                                   PREDICATELOCKTARGETTAG newtargettag,
00444                                   bool removeOld);
00445 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
00446 static void DropAllPredicateLocksFromTable(Relation relation,
00447                                bool transfer);
00448 static void SetNewSxactGlobalXmin(void);
00449 static void ClearOldPredicateLocks(void);
00450 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
00451                            bool summarize);
00452 static bool XidIsConcurrent(TransactionId xid);
00453 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
00454 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
00455 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
00456                                         SERIALIZABLEXACT *writer);
00457 
00458 
00459 /*------------------------------------------------------------------------*/
00460 
00461 /*
00462  * Does this relation participate in predicate locking? Temporary and system
00463  * relations are exempt, as are materialized views.
00464  */
00465 static inline bool
00466 PredicateLockingNeededForRelation(Relation relation)
00467 {
00468     return !(relation->rd_id < FirstBootstrapObjectId ||
00469              RelationUsesLocalBuffers(relation) ||
00470              relation->rd_rel->relkind == RELKIND_MATVIEW);
00471 }
00472 
00473 /*
00474  * When a public interface method is called for a read, this is the test to
00475  * see if we should do a quick return.
00476  *
00477  * Note: this function has side-effects! If this transaction has been flagged
00478  * as RO-safe since the last call, we release all predicate locks and reset
00479  * MySerializableXact. That makes subsequent calls return quickly.
00480  *
00481  * This is marked as 'inline' to eliminate the function call overhead
00482  * in the common case that serialization is not needed.
00483  */
00484 static inline bool
00485 SerializationNeededForRead(Relation relation, Snapshot snapshot)
00486 {
00487     /* Nothing to do if this is not a serializable transaction */
00488     if (MySerializableXact == InvalidSerializableXact)
00489         return false;
00490 
00491     /*
00492      * Don't acquire locks or conflict when scanning with a special snapshot.
00493      * This excludes things like CLUSTER and REINDEX. They use the wholesale
00494      * functions TransferPredicateLocksToHeapRelation() and
00495      * CheckTableForSerializableConflictIn() to participate in serialization, but
00496      * the scans involved don't need serialization.
00497      */
00498     if (!IsMVCCSnapshot(snapshot))
00499         return false;
00500 
00501     /*
00502      * Check if we have just become "RO-safe". If we have, immediately release
00503      * all locks as they're not needed anymore. This also resets
00504      * MySerializableXact, so that subsequent calls to this function can exit
00505      * quickly.
00506      *
00507      * A transaction is flagged as RO_SAFE if all concurrent R/W transactions
00508      * commit without having conflicts out to an earlier snapshot, thus
00509      * ensuring that no conflicts are possible for this transaction.
00510      */
00511     if (SxactIsROSafe(MySerializableXact))
00512     {
00513         ReleasePredicateLocks(false);
00514         return false;
00515     }
00516 
00517     /* Check if the relation doesn't participate in predicate locking */
00518     if (!PredicateLockingNeededForRelation(relation))
00519         return false;
00520 
00521     return true;                /* no excuse to skip predicate locking */
00522 }
00523 
00524 /*
00525  * Like SerializationNeededForRead(), but called on writes.
00526  * The logic is the same, but there is no snapshot and we can't be RO-safe.
00527  */
00528 static inline bool
00529 SerializationNeededForWrite(Relation relation)
00530 {
00531     /* Nothing to do if this is not a serializable transaction */
00532     if (MySerializableXact == InvalidSerializableXact)
00533         return false;
00534 
00535     /* Check if the relation doesn't participate in predicate locking */
00536     if (!PredicateLockingNeededForRelation(relation))
00537         return false;
00538 
00539     return true;                /* no excuse to skip predicate locking */
00540 }
00541 
00542 
00543 /*------------------------------------------------------------------------*/
00544 
00545 /*
00546  * These functions are a simple implementation of a list for this specific
00547  * type of struct.  If there is ever a generalized shared memory list, we
00548  * should probably switch to that.
00549  */
00550 static SERIALIZABLEXACT *
00551 CreatePredXact(void)
00552 {
00553     PredXactListElement ptle;
00554 
00555     ptle = (PredXactListElement)
00556         SHMQueueNext(&PredXact->availableList,
00557                      &PredXact->availableList,
00558                      offsetof(PredXactListElementData, link));
00559     if (!ptle)
00560         return NULL;
00561 
00562     SHMQueueDelete(&ptle->link);
00563     SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
00564     return &ptle->sxact;
00565 }
00566 
00567 static void
00568 ReleasePredXact(SERIALIZABLEXACT *sxact)
00569 {
00570     PredXactListElement ptle;
00571 
00572     Assert(ShmemAddrIsValid(sxact));
00573 
00574     ptle = (PredXactListElement)
00575         (((char *) sxact)
00576          - offsetof(PredXactListElementData, sxact)
00577          + offsetof(PredXactListElementData, link));
00578     SHMQueueDelete(&ptle->link);
00579     SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
00580 }
00581 
00582 static SERIALIZABLEXACT *
00583 FirstPredXact(void)
00584 {
00585     PredXactListElement ptle;
00586 
00587     ptle = (PredXactListElement)
00588         SHMQueueNext(&PredXact->activeList,
00589                      &PredXact->activeList,
00590                      offsetof(PredXactListElementData, link));
00591     if (!ptle)
00592         return NULL;
00593 
00594     return &ptle->sxact;
00595 }
00596 
00597 static SERIALIZABLEXACT *
00598 NextPredXact(SERIALIZABLEXACT *sxact)
00599 {
00600     PredXactListElement ptle;
00601 
00602     Assert(ShmemAddrIsValid(sxact));
00603 
00604     ptle = (PredXactListElement)
00605         (((char *) sxact)
00606          - offsetof(PredXactListElementData, sxact)
00607          + offsetof(PredXactListElementData, link));
00608     ptle = (PredXactListElement)
00609         SHMQueueNext(&PredXact->activeList,
00610                      &ptle->link,
00611                      offsetof(PredXactListElementData, link));
00612     if (!ptle)
00613         return NULL;
00614 
00615     return &ptle->sxact;
00616 }
00617 
00618 /*------------------------------------------------------------------------*/
00619 
00620 /*
00621  * These functions manage primitive access to the RWConflict pool and lists.
00622  */
00623 static bool
00624 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
00625 {
00626     RWConflict  conflict;
00627 
00628     Assert(reader != writer);
00629 
00630     /* Check the ends of the purported conflict first. */
00631     if (SxactIsDoomed(reader)
00632         || SxactIsDoomed(writer)
00633         || SHMQueueEmpty(&reader->outConflicts)
00634         || SHMQueueEmpty(&writer->inConflicts))
00635         return false;
00636 
00637     /* A conflict is possible; walk the list to find out. */
00638     conflict = (RWConflict)
00639         SHMQueueNext(&reader->outConflicts,
00640                      &reader->outConflicts,
00641                      offsetof(RWConflictData, outLink));
00642     while (conflict)
00643     {
00644         if (conflict->sxactIn == writer)
00645             return true;
00646         conflict = (RWConflict)
00647             SHMQueueNext(&reader->outConflicts,
00648                          &conflict->outLink,
00649                          offsetof(RWConflictData, outLink));
00650     }
00651 
00652     /* No conflict found. */
00653     return false;
00654 }
00655 
00656 static void
00657 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
00658 {
00659     RWConflict  conflict;
00660 
00661     Assert(reader != writer);
00662     Assert(!RWConflictExists(reader, writer));
00663 
00664     conflict = (RWConflict)
00665         SHMQueueNext(&RWConflictPool->availableList,
00666                      &RWConflictPool->availableList,
00667                      offsetof(RWConflictData, outLink));
00668     if (!conflict)
00669         ereport(ERROR,
00670                 (errcode(ERRCODE_OUT_OF_MEMORY),
00671                  errmsg("not enough elements in RWConflictPool to record a read/write conflict"),
00672                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
00673 
00674     SHMQueueDelete(&conflict->outLink);
00675 
00676     conflict->sxactOut = reader;
00677     conflict->sxactIn = writer;
00678     SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
00679     SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
00680 }
00681 
00682 static void
00683 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
00684                           SERIALIZABLEXACT *activeXact)
00685 {
00686     RWConflict  conflict;
00687 
00688     Assert(roXact != activeXact);
00689     Assert(SxactIsReadOnly(roXact));
00690     Assert(!SxactIsReadOnly(activeXact));
00691 
00692     conflict = (RWConflict)
00693         SHMQueueNext(&RWConflictPool->availableList,
00694                      &RWConflictPool->availableList,
00695                      offsetof(RWConflictData, outLink));
00696     if (!conflict)
00697         ereport(ERROR,
00698                 (errcode(ERRCODE_OUT_OF_MEMORY),
00699                  errmsg("not enough elements in RWConflictPool to record a potential read/write conflict"),
00700                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
00701 
00702     SHMQueueDelete(&conflict->outLink);
00703 
00704     conflict->sxactOut = activeXact;
00705     conflict->sxactIn = roXact;
00706     SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
00707                          &conflict->outLink);
00708     SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
00709                          &conflict->inLink);
00710 }
00711 
00712 static void
00713 ReleaseRWConflict(RWConflict conflict)
00714 {
00715     SHMQueueDelete(&conflict->inLink);
00716     SHMQueueDelete(&conflict->outLink);
00717     SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
00718 }
00719 
00720 static void
00721 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
00722 {
00723     RWConflict  conflict,
00724                 nextConflict;
00725 
00726     Assert(SxactIsReadOnly(sxact));
00727     Assert(!SxactIsROSafe(sxact));
00728 
00729     sxact->flags |= SXACT_FLAG_RO_UNSAFE;
00730 
00731     /*
00732      * We know this isn't a safe snapshot, so we can stop looking for other
00733      * potential conflicts.
00734      */
00735     conflict = (RWConflict)
00736         SHMQueueNext(&sxact->possibleUnsafeConflicts,
00737                      &sxact->possibleUnsafeConflicts,
00738                      offsetof(RWConflictData, inLink));
00739     while (conflict)
00740     {
00741         nextConflict = (RWConflict)
00742             SHMQueueNext(&sxact->possibleUnsafeConflicts,
00743                          &conflict->inLink,
00744                          offsetof(RWConflictData, inLink));
00745 
00746         Assert(!SxactIsReadOnly(conflict->sxactOut));
00747         Assert(sxact == conflict->sxactIn);
00748 
00749         ReleaseRWConflict(conflict);
00750 
00751         conflict = nextConflict;
00752     }
00753 }
00754 
00755 /*------------------------------------------------------------------------*/
00756 
00757 /*
00758  * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
00759  * Compares using wraparound logic, as is required by slru.c.
00760  */
00761 static bool
00762 OldSerXidPagePrecedesLogically(int p, int q)
00763 {
00764     int         diff;
00765 
00766     /*
00767      * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2.  Both inputs should
00768      * be in the range 0..OLDSERXID_MAX_PAGE.
00769      */
00770     Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
00771     Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
00772 
00773     diff = p - q;
00774     if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
00775         diff -= OLDSERXID_MAX_PAGE + 1;
00776     else if (diff < -((int) (OLDSERXID_MAX_PAGE + 1) / 2))
00777         diff += OLDSERXID_MAX_PAGE + 1;
00778     return diff < 0;
00779 }
00780 
00781 /*
00782  * Initialize for the tracking of old serializable committed xids.
00783  */
00784 static void
00785 OldSerXidInit(void)
00786 {
00787     bool        found;
00788 
00789     /*
00790      * Set up SLRU management of the pg_serial data.
00791      */
00792     OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
00793     SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl",
00794                   NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial");
00795     /* Override default assumption that writes should be fsync'd */
00796     OldSerXidSlruCtl->do_fsync = false;
00797 
00798     /*
00799      * Create or attach to the OldSerXidControl structure.
00800      */
00801     oldSerXidControl = (OldSerXidControl)
00802         ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
00803 
00804     if (!found)
00805     {
00806         /*
00807          * Set control information to reflect empty SLRU.
00808          */
00809         oldSerXidControl->headPage = -1;
00810         oldSerXidControl->headXid = InvalidTransactionId;
00811         oldSerXidControl->tailXid = InvalidTransactionId;
00812         oldSerXidControl->warningIssued = false;
00813     }
00814 }
00815 
00816 /*
00817  * Record a committed read write serializable xid and the minimum
00818  * commitSeqNo of any transactions to which this xid had a rw-conflict out.
00819  * An invalid seqNo means that there were no conflicts out from xid.
00820  */
00821 static void
00822 OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
00823 {
00824     TransactionId tailXid;
00825     int         targetPage;
00826     int         slotno;
00827     int         firstZeroPage;
00828     bool        isNewPage;
00829 
00830     Assert(TransactionIdIsValid(xid));
00831 
00832     targetPage = OldSerXidPage(xid);
00833 
00834     LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
00835 
00836     /*
00837      * If no serializable transactions are active, there shouldn't be anything
00838      * to push out to the SLRU.  Hitting this assert would mean there's
00839      * something wrong with the earlier cleanup logic.
00840      */
00841     tailXid = oldSerXidControl->tailXid;
00842     Assert(TransactionIdIsValid(tailXid));
00843 
00844     /*
00845      * If the SLRU is currently unused, zero out the whole active region from
00846      * tailXid to headXid before taking it into use. Otherwise zero out only
00847      * any new pages that enter the tailXid-headXid range as we advance
00848      * headXid.
00849      */
00850     if (oldSerXidControl->headPage < 0)
00851     {
00852         firstZeroPage = OldSerXidPage(tailXid);
00853         isNewPage = true;
00854     }
00855     else
00856     {
00857         firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
00858         isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
00859                                                    targetPage);
00860     }
00861 
00862     if (!TransactionIdIsValid(oldSerXidControl->headXid)
00863         || TransactionIdFollows(xid, oldSerXidControl->headXid))
00864         oldSerXidControl->headXid = xid;
00865     if (isNewPage)
00866         oldSerXidControl->headPage = targetPage;
00867 
00868     /*
00869      * Give a warning if we're about to run out of SLRU pages.
00870      *
00871      * slru.c has a maximum of 64k segments, with 32 (SLRU_PAGES_PER_SEGMENT)
00872      * pages each. We need to store a 64-bit integer for each Xid, and with
00873      * default 8k block size, 65536*32 pages is only enough to cover 2^30
00874      * XIDs. If we're about to hit that limit and wrap around, warn the user.
00875      *
00876      * To avoid spamming the user, we only give one warning when we've used 1
00877      * billion XIDs, and stay silent until the situation is fixed and the
00878      * number of XIDs used falls below 800 million again.
00879      *
00880      * XXX: We have no safeguard to actually *prevent* the wrap-around,
00881      * though. All you get is a warning.
00882      */
00883     if (oldSerXidControl->warningIssued)
00884     {
00885         TransactionId lowWatermark;
00886 
00887         lowWatermark = tailXid + 800000000;
00888         if (lowWatermark < FirstNormalTransactionId)
00889             lowWatermark = FirstNormalTransactionId;
00890         if (TransactionIdPrecedes(xid, lowWatermark))
00891             oldSerXidControl->warningIssued = false;
00892     }
00893     else
00894     {
00895         TransactionId highWatermark;
00896 
00897         highWatermark = tailXid + 1000000000;
00898         if (highWatermark < FirstNormalTransactionId)
00899             highWatermark = FirstNormalTransactionId;
00900         if (TransactionIdFollows(xid, highWatermark))
00901         {
00902             oldSerXidControl->warningIssued = true;
00903             ereport(WARNING,
00904                     (errmsg("memory for serializable conflict tracking is nearly exhausted"),
00905                      errhint("There might be an idle transaction or a forgotten prepared transaction causing this.")));
00906         }
00907     }
00908 
00909     if (isNewPage)
00910     {
00911         /* Initialize intervening pages. */
00912         while (firstZeroPage != targetPage)
00913         {
00914             (void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
00915             firstZeroPage = OldSerXidNextPage(firstZeroPage);
00916         }
00917         slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
00918     }
00919     else
00920         slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
00921 
00922     OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
00923     OldSerXidSlruCtl->shared->page_dirty[slotno] = true;
00924 
00925     LWLockRelease(OldSerXidLock);
00926 }
00927 
00928 /*
00929  * Get the minimum commitSeqNo for any conflict out for the given xid.  For
00930  * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
00931  * will be returned.
00932  */
00933 static SerCommitSeqNo
00934 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
00935 {
00936     TransactionId headXid;
00937     TransactionId tailXid;
00938     SerCommitSeqNo val;
00939     int         slotno;
00940 
00941     Assert(TransactionIdIsValid(xid));
00942 
00943     LWLockAcquire(OldSerXidLock, LW_SHARED);
00944     headXid = oldSerXidControl->headXid;
00945     tailXid = oldSerXidControl->tailXid;
00946     LWLockRelease(OldSerXidLock);
00947 
00948     if (!TransactionIdIsValid(headXid))
00949         return 0;
00950 
00951     Assert(TransactionIdIsValid(tailXid));
00952 
00953     if (TransactionIdPrecedes(xid, tailXid)
00954         || TransactionIdFollows(xid, headXid))
00955         return 0;
00956 
00957     /*
00958      * The following function must be called without holding OldSerXidLock,
00959      * but will return with that lock held, which must then be released.
00960      */
00961     slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
00962                                         OldSerXidPage(xid), xid);
00963     val = OldSerXidValue(slotno, xid);
00964     LWLockRelease(OldSerXidLock);
00965     return val;
00966 }
00967 
00968 /*
00969  * Call this whenever there is a new xmin for active serializable
00970  * transactions.  We don't need to keep information on transactions which
00971  * precede that.  InvalidTransactionId means none active, so everything in
00972  * the SLRU can be discarded.
00973  */
00974 static void
00975 OldSerXidSetActiveSerXmin(TransactionId xid)
00976 {
00977     LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
00978 
00979     /*
00980      * When no sxacts are active, nothing overlaps, so set the xid values to
00981      * invalid to show that there are no valid entries.  Don't clear headPage,
00982      * though.  A new xmin might still land on that page, and we don't want to
00983      * repeatedly zero out the same page.
00984      */
00985     if (!TransactionIdIsValid(xid))
00986     {
00987         oldSerXidControl->tailXid = InvalidTransactionId;
00988         oldSerXidControl->headXid = InvalidTransactionId;
00989         LWLockRelease(OldSerXidLock);
00990         return;
00991     }
00992 
00993     /*
00994      * When we're recovering prepared transactions, the global xmin might move
00995      * backwards depending on the order they're recovered. Normally that's not
00996      * OK, but during recovery no serializable transactions will commit, so
00997      * the SLRU is empty and we can get away with it.
00998      */
00999     if (RecoveryInProgress())
01000     {
01001         Assert(oldSerXidControl->headPage < 0);
01002         if (!TransactionIdIsValid(oldSerXidControl->tailXid)
01003             || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
01004         {
01005             oldSerXidControl->tailXid = xid;
01006         }
01007         LWLockRelease(OldSerXidLock);
01008         return;
01009     }
01010 
01011     Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
01012            || TransactionIdFollows(xid, oldSerXidControl->tailXid));
01013 
01014     oldSerXidControl->tailXid = xid;
01015 
01016     LWLockRelease(OldSerXidLock);
01017 }
01018 
01019 /*
01020  * Perform a checkpoint --- either during shutdown, or on-the-fly
01021  *
01022  * We don't have any data that needs to survive a restart, but this is a
01023  * convenient place to truncate the SLRU.
01024  */
01025 void
01026 CheckPointPredicate(void)
01027 {
01028     int         tailPage;
01029 
01030     LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
01031 
01032     /* Exit quickly if the SLRU is currently not in use. */
01033     if (oldSerXidControl->headPage < 0)
01034     {
01035         LWLockRelease(OldSerXidLock);
01036         return;
01037     }
01038 
01039     if (TransactionIdIsValid(oldSerXidControl->tailXid))
01040     {
01041         /* We can truncate the SLRU up to the page containing tailXid */
01042         tailPage = OldSerXidPage(oldSerXidControl->tailXid);
01043     }
01044     else
01045     {
01046         /*
01047          * The SLRU is no longer needed. Truncate to head before we set head
01048          * invalid.
01049          *
01050          * XXX: It's possible that the SLRU is not needed again until XID
01051          * wrap-around has happened, so that the segment containing headPage
01052          * that we leave behind will appear to be new again. In that case it
01053          * won't be removed until XID horizon advances enough to make it
01054          * current again.
01055          */
01056         tailPage = oldSerXidControl->headPage;
01057         oldSerXidControl->headPage = -1;
01058     }
01059 
01060     LWLockRelease(OldSerXidLock);
01061 
01062     /* Truncate away pages that are no longer required */
01063     SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
01064 
01065     /*
01066      * Flush dirty SLRU pages to disk
01067      *
01068      * This is not actually necessary from a correctness point of view. We do
01069      * it merely as a debugging aid.
01070      *
01071      * We're doing this after the truncation to avoid writing pages right
01072      * before deleting the file in which they sit, which would be completely
01073      * pointless.
01074      */
01075     SimpleLruFlush(OldSerXidSlruCtl, true);
01076 }
01077 
01078 /*------------------------------------------------------------------------*/
01079 
01080 /*
01081  * InitPredicateLocks -- Initialize the predicate locking data structures.
01082  *
01083  * This is called from CreateSharedMemoryAndSemaphores(), which see for
01084  * more comments.  In the normal postmaster case, the shared hash tables
01085  * are created here.  Backends inherit the pointers
01086  * to the shared tables via fork().  In the EXEC_BACKEND case, each
01087  * backend re-executes this code to obtain pointers to the already existing
01088  * shared hash tables.
01089  */
01090 void
01091 InitPredicateLocks(void)
01092 {
01093     HASHCTL     info;
01094     int         hash_flags;
01095     long        max_table_size;
01096     Size        requestSize;
01097     bool        found;
01098 
01099     /*
01100      * Compute size of predicate lock target hashtable. Note these
01101      * calculations must agree with PredicateLockShmemSize!
01102      */
01103     max_table_size = NPREDICATELOCKTARGETENTS();
01104 
01105     /*
01106      * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
01107      * per-predicate-lock-target information.
01108      */
01109     MemSet(&info, 0, sizeof(info));
01110     info.keysize = sizeof(PREDICATELOCKTARGETTAG);
01111     info.entrysize = sizeof(PREDICATELOCKTARGET);
01112     info.hash = tag_hash;
01113     info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
01114     hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
01115 
01116     PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
01117                                             max_table_size,
01118                                             max_table_size,
01119                                             &info,
01120                                             hash_flags);
01121 
01122     /* Assume an average of 2 xacts per target */
01123     max_table_size *= 2;
01124 
01125     /*
01126      * Reserve a dummy entry in the hash table; we use it to make sure there's
01127      * always one entry available when we need to split or combine a page,
01128      * because running out of space there could mean aborting a
01129      * non-serializable transaction.
01130      */
01131     hash_search(PredicateLockTargetHash, &ScratchTargetTag, HASH_ENTER, NULL);
01132 
01133     /*
01134      * Allocate hash table for PREDICATELOCK structs.  This stores per
01135      * xact-lock-of-a-target information.
01136      */
01137     MemSet(&info, 0, sizeof(info));
01138     info.keysize = sizeof(PREDICATELOCKTAG);
01139     info.entrysize = sizeof(PREDICATELOCK);
01140     info.hash = predicatelock_hash;
01141     info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
01142     hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
01143 
01144     PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
01145                                       max_table_size,
01146                                       max_table_size,
01147                                       &info,
01148                                       hash_flags);
01149 
01150     /*
01151      * Compute size for serializable transaction hashtable. Note these
01152      * calculations must agree with PredicateLockShmemSize!
01153      */
01154     max_table_size = (MaxBackends + max_prepared_xacts);
01155 
01156     /*
01157      * Allocate a list to hold information on transactions participating in
01158      * predicate locking.
01159      *
01160      * Assume an average of 10 predicate locking transactions per backend.
01161      * This allows aggressive cleanup while detail is present before data must
01162      * be summarized for storage in SLRU and the "dummy" transaction.
01163      */
01164     max_table_size *= 10;
01165 
01166     PredXact = ShmemInitStruct("PredXactList",
01167                                PredXactListDataSize,
01168                                &found);
01169     if (!found)
01170     {
01171         int         i;
01172 
01173         SHMQueueInit(&PredXact->availableList);
01174         SHMQueueInit(&PredXact->activeList);
01175         PredXact->SxactGlobalXmin = InvalidTransactionId;
01176         PredXact->SxactGlobalXminCount = 0;
01177         PredXact->WritableSxactCount = 0;
01178         PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
01179         PredXact->CanPartialClearThrough = 0;
01180         PredXact->HavePartialClearedThrough = 0;
01181         requestSize = mul_size((Size) max_table_size,
01182                                PredXactListElementDataSize);
01183         PredXact->element = ShmemAlloc(requestSize);
01184         if (PredXact->element == NULL)
01185             ereport(ERROR,
01186                     (errcode(ERRCODE_OUT_OF_MEMORY),
01187              errmsg("not enough shared memory for elements of data structure"
01188                     " \"%s\" (%lu bytes requested)",
01189                     "PredXactList", (unsigned long) requestSize)));
01190         /* Add all elements to available list, clean. */
01191         memset(PredXact->element, 0, requestSize);
01192         for (i = 0; i < max_table_size; i++)
01193         {
01194             SHMQueueInsertBefore(&(PredXact->availableList),
01195                                  &(PredXact->element[i].link));
01196         }
01197         PredXact->OldCommittedSxact = CreatePredXact();
01198         SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
01199         PredXact->OldCommittedSxact->prepareSeqNo = 0;
01200         PredXact->OldCommittedSxact->commitSeqNo = 0;
01201         PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
01202         SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
01203         SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
01204         SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
01205         SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
01206         SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
01207         PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
01208         PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
01209         PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
01210         PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
01211         PredXact->OldCommittedSxact->pid = 0;
01212     }
01213     /* This never changes, so let's keep a local copy. */
01214     OldCommittedSxact = PredXact->OldCommittedSxact;
01215 
01216     /*
01217      * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
01218      * information for serializable transactions which have accessed data.
01219      */
01220     MemSet(&info, 0, sizeof(info));
01221     info.keysize = sizeof(SERIALIZABLEXIDTAG);
01222     info.entrysize = sizeof(SERIALIZABLEXID);
01223     info.hash = tag_hash;
01224     hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
01225 
01226     SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
01227                                         max_table_size,
01228                                         max_table_size,
01229                                         &info,
01230                                         hash_flags);
01231 
01232     /*
01233      * Allocate space for tracking rw-conflicts in lists attached to the
01234      * transactions.
01235      *
01236      * Assume an average of 5 conflicts per transaction.  Calculations suggest
01237      * that this will prevent resource exhaustion in even the most pessimal
01238      * loads up to max_connections = 200 with all 200 connections pounding the
01239      * database with serializable transactions.  Beyond that, there may be
01240      * occasional transactions canceled when trying to flag conflicts. That's
01241      * probably OK.
01242      */
01243     max_table_size *= 5;
01244 
01245     RWConflictPool = ShmemInitStruct("RWConflictPool",
01246                                      RWConflictPoolHeaderDataSize,
01247                                      &found);
01248     if (!found)
01249     {
01250         int         i;
01251 
01252         SHMQueueInit(&RWConflictPool->availableList);
01253         requestSize = mul_size((Size) max_table_size,
01254                                RWConflictDataSize);
01255         RWConflictPool->element = ShmemAlloc(requestSize);
01256         if (RWConflictPool->element == NULL)
01257             ereport(ERROR,
01258                     (errcode(ERRCODE_OUT_OF_MEMORY),
01259              errmsg("not enough shared memory for elements of data structure"
01260                     " \"%s\" (%lu bytes requested)",
01261                     "RWConflictPool", (unsigned long) requestSize)));
01262         /* Add all elements to available list, clean. */
01263         memset(RWConflictPool->element, 0, requestSize);
01264         for (i = 0; i < max_table_size; i++)
01265         {
01266             SHMQueueInsertBefore(&(RWConflictPool->availableList),
01267                                  &(RWConflictPool->element[i].outLink));
01268         }
01269     }
01270 
01271     /*
01272      * Create or attach to the header for the list of finished serializable
01273      * transactions.
01274      */
01275     FinishedSerializableTransactions = (SHM_QUEUE *)
01276         ShmemInitStruct("FinishedSerializableTransactions",
01277                         sizeof(SHM_QUEUE),
01278                         &found);
01279     if (!found)
01280         SHMQueueInit(FinishedSerializableTransactions);
01281 
01282     /*
01283      * Initialize the SLRU storage for old committed serializable
01284      * transactions.
01285      */
01286     OldSerXidInit();
01287 
01288     /* Pre-calculate the hash and partition lock of the scratch entry */
01289     ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
01290     ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);
01291 }
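
The initialization above follows a create-or-attach pattern: only the backend that creates the structure runs the "if (!found)" block, which threads every preallocated pool element onto an available list. Below is a minimal, heap-based sketch of such a free-list pool; the names (ConflictPool, pool_get, pool_put) are invented for the example and this is not PostgreSQL's shmem API.

#include <stdio.h>
#include <stdlib.h>

typedef struct PoolElement
{
    struct PoolElement *next;   /* link on the available list */
    int         payload;        /* stand-in for real conflict data */
} PoolElement;

typedef struct ConflictPool
{
    PoolElement *availableList; /* head of the free list */
    PoolElement *elements;      /* the preallocated array */
} ConflictPool;

/* Allocate the pool once and put every element on the available list. */
static void
pool_init(ConflictPool *pool, int nelements)
{
    pool->elements = calloc(nelements, sizeof(PoolElement));
    pool->availableList = NULL;
    for (int i = 0; i < nelements; i++)
    {
        pool->elements[i].next = pool->availableList;
        pool->availableList = &pool->elements[i];
    }
}

/* Take an element off the free list; NULL means the pool is exhausted. */
static PoolElement *
pool_get(ConflictPool *pool)
{
    PoolElement *e = pool->availableList;

    if (e)
        pool->availableList = e->next;
    return e;
}

/* Return an element to the free list. */
static void
pool_put(ConflictPool *pool, PoolElement *e)
{
    e->next = pool->availableList;
    pool->availableList = e;
}

int
main(void)
{
    ConflictPool pool;
    PoolElement *e;

    pool_init(&pool, 4);
    e = pool_get(&pool);
    e->payload = 42;
    pool_put(&pool, e);
    printf("pool element reused: %d\n", pool.elements[3].payload);
    return 0;
}

The fixed pool size is what makes summarization necessary later on: once every element is in use, something must be pushed out before a new request can be satisfied.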
01292 
01293 /*
01294  * Estimate shared-memory space used for predicate lock table
01295  */
01296 Size
01297 PredicateLockShmemSize(void)
01298 {
01299     Size        size = 0;
01300     long        max_table_size;
01301 
01302     /* predicate lock target hash table */
01303     max_table_size = NPREDICATELOCKTARGETENTS();
01304     size = add_size(size, hash_estimate_size(max_table_size,
01305                                              sizeof(PREDICATELOCKTARGET)));
01306 
01307     /* predicate lock hash table */
01308     max_table_size *= 2;
01309     size = add_size(size, hash_estimate_size(max_table_size,
01310                                              sizeof(PREDICATELOCK)));
01311 
01312     /*
01313      * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
01314      * margin.
01315      */
01316     size = add_size(size, size / 10);
01317 
01318     /* transaction list */
01319     max_table_size = MaxBackends + max_prepared_xacts;
01320     max_table_size *= 10;
01321     size = add_size(size, PredXactListDataSize);
01322     size = add_size(size, mul_size((Size) max_table_size,
01323                                    PredXactListElementDataSize));
01324 
01325     /* transaction xid table */
01326     size = add_size(size, hash_estimate_size(max_table_size,
01327                                              sizeof(SERIALIZABLEXID)));
01328 
01329     /* rw-conflict pool */
01330     max_table_size *= 5;
01331     size = add_size(size, RWConflictPoolHeaderDataSize);
01332     size = add_size(size, mul_size((Size) max_table_size,
01333                                    RWConflictDataSize));
01334 
01335     /* Head for list of finished serializable transactions. */
01336     size = add_size(size, sizeof(SHM_QUEUE));
01337 
01338     /* Shared memory structures for SLRU tracking of old committed xids. */
01339     size = add_size(size, sizeof(OldSerXidControlData));
01340     size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
01341 
01342     return size;
01343 }
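
The estimate above is built entirely from add_size and mul_size, PostgreSQL's overflow-checked Size arithmetic. A self-contained sketch of that idea follows; add_size_checked and mul_size_checked are hypothetical stand-ins, and plain fprintf/exit replaces ereport.

#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>

/* Overflow-checked addition of two size_t values. */
static size_t
add_size_checked(size_t a, size_t b)
{
    size_t      result = a + b;

    if (result < a)             /* wrapped around */
    {
        fprintf(stderr, "requested shared memory size overflows size_t\n");
        exit(1);
    }
    return result;
}

/* Overflow-checked multiplication of two size_t values. */
static size_t
mul_size_checked(size_t a, size_t b)
{
    size_t      result = a * b;

    if (a != 0 && result / a != b)      /* wrapped around */
    {
        fprintf(stderr, "requested shared memory size overflows size_t\n");
        exit(1);
    }
    return result;
}

int
main(void)
{
    size_t      size = 0;
    size_t      max_table_size = 1000;

    size = add_size_checked(size, mul_size_checked(max_table_size,
                                                   sizeof(double)));
    size = add_size_checked(size, size / 10);   /* 10% safety margin */
    printf("estimated size: %zu bytes\n", size);
    return 0;
}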
01344 
01345 
01346 /*
01347  * Compute the hash code associated with a PREDICATELOCKTAG.
01348  *
01349  * Because we want to use just one set of partition locks for both the
01350  * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
01351  * that PREDICATELOCKs fall into the same partition number as their
01352  * associated PREDICATELOCKTARGETs.  dynahash.c expects the partition number
01353  * to be the low-order bits of the hash code, and therefore a
01354  * PREDICATELOCKTAG's hash code must have the same low-order bits as the
01355  * associated PREDICATELOCKTARGETTAG's hash code.  We achieve this with this
01356  * specialized hash function.
01357  */
01358 static uint32
01359 predicatelock_hash(const void *key, Size keysize)
01360 {
01361     const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
01362     uint32      targethash;
01363 
01364     Assert(keysize == sizeof(PREDICATELOCKTAG));
01365 
01366     /* Look into the associated target object, and compute its hash code */
01367     targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
01368 
01369     return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
01370 }
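
The comment above states the key invariant: a derived key must keep the same low-order hash bits as its target so that both land in the same partition. The following standalone sketch demonstrates that property with a toy hash and a power-of-two partition count; NUM_PARTITIONS, mix32 and child_hash are invented names, not the real macros.

#include <stdio.h>
#include <stdint.h>

#define NUM_PARTITIONS 16       /* must be a power of two */

/* Toy 32-bit mixer standing in for the real hash function. */
static uint32_t
mix32(uint32_t x)
{
    x ^= x >> 16;
    x *= 0x7feb352dU;
    x ^= x >> 15;
    x *= 0x846ca68bU;
    x ^= x >> 16;
    return x;
}

/* Partition number is taken from the low-order bits of the hash code. */
static uint32_t
partition_of(uint32_t hash)
{
    return hash & (NUM_PARTITIONS - 1);
}

/*
 * Derive a child key's hash from its parent's hash so that the low-order
 * bits (and hence the partition) are preserved: the child-specific part
 * only touches the high-order bits.
 */
static uint32_t
child_hash(uint32_t parent_hash, uint32_t child_id)
{
    return parent_hash ^ (mix32(child_id) << 4);    /* 4 = log2(NUM_PARTITIONS) */
}

int
main(void)
{
    uint32_t    target = mix32(12345);
    uint32_t    lock = child_hash(target, 7);

    printf("target partition %u, lock partition %u\n",
           partition_of(target), partition_of(lock));
    return 0;
}

Both printed partition numbers are always equal, which is exactly what lets one partition lock protect a target together with all of its locks.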
01371 
01372 
01373 /*
01374  * GetPredicateLockStatusData
01375  *      Return a table containing the internal state of the predicate
01376  *      lock manager for use in pg_lock_status.
01377  *
01378  * Like GetLockStatusData, this function tries to hold the partition LWLocks
01379  * for as short a time as possible by returning two arrays that simply
01380  * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
01381  * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
01382  * SERIALIZABLEXACT will likely appear.
01383  */
01384 PredicateLockData *
01385 GetPredicateLockStatusData(void)
01386 {
01387     PredicateLockData *data;
01388     int         i;
01389     int         els,
01390                 el;
01391     HASH_SEQ_STATUS seqstat;
01392     PREDICATELOCK *predlock;
01393 
01394     data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
01395 
01396     /*
01397      * To ensure consistency, hold all partition locks simultaneously: acquire
01398      * them in ascending order, then take SerializableXactHashLock.

01399      */
01400     for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
01401         LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
01402     LWLockAcquire(SerializableXactHashLock, LW_SHARED);
01403 
01404     /* Get number of locks and allocate appropriately-sized arrays. */
01405     els = hash_get_num_entries(PredicateLockHash);
01406     data->nelements = els;
01407     data->locktags = (PREDICATELOCKTARGETTAG *)
01408         palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
01409     data->xacts = (SERIALIZABLEXACT *)
01410         palloc(sizeof(SERIALIZABLEXACT) * els);
01411 
01412 
01413     /* Scan through PredicateLockHash and copy contents */
01414     hash_seq_init(&seqstat, PredicateLockHash);
01415 
01416     el = 0;
01417 
01418     while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
01419     {
01420         data->locktags[el] = predlock->tag.myTarget->tag;
01421         data->xacts[el] = *predlock->tag.myXact;
01422         el++;
01423     }
01424 
01425     Assert(el == els);
01426 
01427     /* Release locks in reverse order */
01428     LWLockRelease(SerializableXactHashLock);
01429     for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
01430         LWLockRelease(FirstPredicateLockMgrLock + i);
01431 
01432     return data;
01433 }
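
The deadlock-avoidance rule used here (take every partition lock in ascending order, release in reverse) is easy to demonstrate outside the server. A minimal pthread sketch, with an ordinary mutex array standing in for the LWLock partitions:

#include <pthread.h>
#include <stdio.h>

#define NUM_PARTITIONS 8

static pthread_mutex_t partition_lock[NUM_PARTITIONS] =
{
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
};

/* Take every partition lock in ascending order... */
static void
lock_all_partitions(void)
{
    for (int i = 0; i < NUM_PARTITIONS; i++)
        pthread_mutex_lock(&partition_lock[i]);
}

/* ...and release them in the reverse order. */
static void
unlock_all_partitions(void)
{
    for (int i = NUM_PARTITIONS - 1; i >= 0; i--)
        pthread_mutex_unlock(&partition_lock[i]);
}

int
main(void)
{
    lock_all_partitions();
    printf("holding all %d partition locks; scan the shared table here\n",
           NUM_PARTITIONS);
    unlock_all_partitions();
    return 0;
}

Because every thread follows the same global ordering, no cycle of waiters can form, so no deadlock is possible no matter how many threads run this concurrently.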
01434 
01435 /*
01436  * Free up shared memory structures by pushing the oldest sxact (the one at
01437  * the front of the FinishedSerializableTransactions list) into summary form.
01438  * Each call will free exactly one SERIALIZABLEXACT structure and may also
01439  * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
01440  * PREDICATELOCKTARGET, RWConflictData.
01441  */
01442 static void
01443 SummarizeOldestCommittedSxact(void)
01444 {
01445     SERIALIZABLEXACT *sxact;
01446 
01447     LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
01448 
01449     /*
01450      * This function is only called if there are no sxact slots available.
01451      * Some of them must belong to old, already-finished transactions, so
01452      * there should be something in the FinishedSerializableTransactions list
01453      * that we can summarize. However, there's a race condition: while we were
01454      * not holding any locks, a transaction might have ended and cleaned up all
01455      * the finished sxact entries already, freeing up their sxact slots. In
01456      * that case, we have nothing to do here. The caller will find one of the
01457      * slots released by the other backend when it retries.
01458      */
01459     if (SHMQueueEmpty(FinishedSerializableTransactions))
01460     {
01461         LWLockRelease(SerializableFinishedListLock);
01462         return;
01463     }
01464 
01465     /*
01466      * Grab the first sxact off the finished list -- this will be the earliest
01467      * commit.  Remove it from the list.
01468      */
01469     sxact = (SERIALIZABLEXACT *)
01470         SHMQueueNext(FinishedSerializableTransactions,
01471                      FinishedSerializableTransactions,
01472                      offsetof(SERIALIZABLEXACT, finishedLink));
01473     SHMQueueDelete(&(sxact->finishedLink));
01474 
01475     /* Add to SLRU summary information. */
01476     if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
01477         OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
01478            ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
01479 
01480     /* Summarize and release the detail. */
01481     ReleaseOneSerializableXact(sxact, false, true);
01482 
01483     LWLockRelease(SerializableFinishedListLock);
01484 }
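
The empty-list check under the lock handles a benign race: another backend may have drained the finished list before we got here. A compact sketch of that check-then-pop pattern, using a plain mutex and a singly linked list with invented names:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stddef.h>

typedef struct Finished
{
    struct Finished *next;
    int         xid;
} Finished;

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static Finished *finished_head = NULL;

/*
 * Pop the oldest finished entry, or return false if another thread already
 * drained the list while we were not holding the lock.
 */
static bool
summarize_oldest(int *xid_out)
{
    Finished   *oldest;

    pthread_mutex_lock(&list_lock);
    if (finished_head == NULL)
    {
        pthread_mutex_unlock(&list_lock);
        return false;           /* nothing to do; the caller will retry */
    }
    oldest = finished_head;
    finished_head = oldest->next;       /* unlink the earliest commit */
    *xid_out = oldest->xid;             /* "summarize" it */
    pthread_mutex_unlock(&list_lock);
    return true;
}

int
main(void)
{
    static Finished f = {NULL, 100};
    int         xid;

    finished_head = &f;
    while (summarize_oldest(&xid))
        printf("summarized xid %d\n", xid);
    return 0;
}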
01485 
01486 /*
01487  * GetSafeSnapshot
01488  *      Obtain and register a snapshot for a READ ONLY DEFERRABLE
01489  *      transaction. Ensures that the snapshot is "safe", i.e. a
01490  *      read-only transaction running on it can execute serializably
01491  *      without further checks. This requires waiting for concurrent
01492  *      transactions to complete, and retrying with a new snapshot if
01493  *      one of them could possibly create a conflict.
01494  *
01495  *      As with GetSerializableTransactionSnapshot (which this is a subroutine
01496  *      for), the passed-in Snapshot pointer should reference a static data
01497  *      area that can safely be passed to GetSnapshotData.
01498  */
01499 static Snapshot
01500 GetSafeSnapshot(Snapshot origSnapshot)
01501 {
01502     Snapshot    snapshot;
01503 
01504     Assert(XactReadOnly && XactDeferrable);
01505 
01506     while (true)
01507     {
01508         /*
01509          * GetSerializableTransactionSnapshotInt is going to call
01510          * GetSnapshotData, so we need to provide it the static snapshot area
01511          * our caller passed to us.  The pointer returned is actually the same
01512          * one passed to it, but we avoid assuming that here.
01513          */
01514         snapshot = GetSerializableTransactionSnapshotInt(origSnapshot,
01515                                                        InvalidTransactionId);
01516 
01517         if (MySerializableXact == InvalidSerializableXact)
01518             return snapshot;    /* no concurrent r/w xacts; it's safe */
01519 
01520         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01521 
01522         /*
01523          * Wait for concurrent transactions to finish. Stop early if one of
01524          * them marked us as conflicted.
01525          */
01526         MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
01527         while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
01528                  SxactIsROUnsafe(MySerializableXact)))
01529         {
01530             LWLockRelease(SerializableXactHashLock);
01531             ProcWaitForSignal();
01532             LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01533         }
01534         MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
01535 
01536         if (!SxactIsROUnsafe(MySerializableXact))
01537         {
01538             LWLockRelease(SerializableXactHashLock);
01539             break;              /* success */
01540         }
01541 
01542         LWLockRelease(SerializableXactHashLock);
01543 
01544         /* else, need to retry... */
01545         ereport(DEBUG2,
01546                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
01547                  errmsg("deferrable snapshot was unsafe; trying a new one")));
01548         ReleasePredicateLocks(false);
01549     }
01550 
01551     /*
01552      * Now we have a safe snapshot, so we don't need to do any further checks.
01553      */
01554     Assert(SxactIsROSafe(MySerializableXact));
01555     ReleasePredicateLocks(false);
01556 
01557     return snapshot;
01558 }
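
GetSafeSnapshot is essentially a condition-wait loop: sleep until the set of possibly conflicting read-write transactions drains or we are flagged unsafe, then either keep the snapshot or discard it and retry. A minimal sketch of that control flow using a pthread condition variable; the flags and helper names are invented, and the real code waits on ProcWaitForSignal rather than a condvar.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wakeup = PTHREAD_COND_INITIALIZER;
static int  possible_conflicts;     /* concurrent read-write transactions */
static bool marked_unsafe;          /* set by a conflicting writer */

/*
 * Returns true once a "safe" snapshot has been obtained.  A writer that
 * finishes would decrement possible_conflicts (or set marked_unsafe) and
 * signal wakeup under the same mutex.
 */
static bool
get_safe_snapshot_once(void)
{
    bool        safe;

    pthread_mutex_lock(&lock);
    /* Wait until every possible conflict resolves, or we are doomed early. */
    while (possible_conflicts > 0 && !marked_unsafe)
        pthread_cond_wait(&wakeup, &lock);

    safe = !marked_unsafe;
    marked_unsafe = false;          /* reset for the next attempt */
    pthread_mutex_unlock(&lock);
    return safe;
}

int
main(void)
{
    /* In the real system the retry loop would take a fresh snapshot here. */
    while (!get_safe_snapshot_once())
        fprintf(stderr, "snapshot was unsafe; trying a new one\n");
    printf("safe snapshot obtained\n");
    return 0;
}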
01559 
01560 /*
01561  * Acquire a snapshot that can be used for the current transaction.
01562  *
01563  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
01564  * It should be current for this process and be contained in PredXact.
01565  *
01566  * The passed-in Snapshot pointer should reference a static data area that
01567  * can safely be passed to GetSnapshotData.  The return value is actually
01568  * always this same pointer; no new snapshot data structure is allocated
01569  * within this function.
01570  */
01571 Snapshot
01572 GetSerializableTransactionSnapshot(Snapshot snapshot)
01573 {
01574     Assert(IsolationIsSerializable());
01575 
01576     /*
01577      * Can't use serializable mode while recovery is still active, as it is,
01578      * for example, on a hot standby.  We could get here despite the check
01579      * in check_XactIsoLevel() if default_transaction_isolation is set to
01580      * serializable, so phrase the hint accordingly.
01581      */
01582     if (RecoveryInProgress())
01583         ereport(ERROR,
01584                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01585                  errmsg("cannot use serializable mode in a hot standby"),
01586                  errdetail("\"default_transaction_isolation\" is set to \"serializable\"."),
01587                  errhint("You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
01588 
01589     /*
01590      * A special optimization is available for SERIALIZABLE READ ONLY
01591      * DEFERRABLE transactions -- we can wait for a suitable snapshot and
01592      * thereby avoid all SSI overhead once it's running.
01593      */
01594     if (XactReadOnly && XactDeferrable)
01595         return GetSafeSnapshot(snapshot);
01596 
01597     return GetSerializableTransactionSnapshotInt(snapshot,
01598                                                  InvalidTransactionId);
01599 }
01600 
01601 /*
01602  * Import a snapshot to be used for the current transaction.
01603  *
01604  * This is nearly the same as GetSerializableTransactionSnapshot, except that
01605  * we don't take a new snapshot, but rather use the data we're handed.
01606  *
01607  * The caller must have verified that the snapshot came from a serializable
01608  * transaction; and if we're read-write, the source transaction must not be
01609  * read-only.
01610  */
01611 void
01612 SetSerializableTransactionSnapshot(Snapshot snapshot,
01613                                    TransactionId sourcexid)
01614 {
01615     Assert(IsolationIsSerializable());
01616 
01617     /*
01618      * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
01619      * import snapshots, since there's no way to wait for a safe snapshot when
01620      * we're using the snap we're told to.  (XXX instead of throwing an error,
01621      * we could just ignore the XactDeferrable flag?)
01622      */
01623     if (XactReadOnly && XactDeferrable)
01624         ereport(ERROR,
01625                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01626                  errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
01627 
01628     (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid);
01629 }
01630 
01631 /*
01632  * Guts of GetSerializableTransactionSnapshot
01633  *
01634  * If sourcexid is valid, this is actually an import operation and we should
01635  * skip calling GetSnapshotData, because the snapshot contents are already
01636  * loaded up.  HOWEVER: to avoid race conditions, we must check that the
01637  * source xact is still running after we acquire SerializableXactHashLock.
01638  * We do that by calling ProcArrayInstallImportedXmin.
01639  */
01640 static Snapshot
01641 GetSerializableTransactionSnapshotInt(Snapshot snapshot,
01642                                       TransactionId sourcexid)
01643 {
01644     PGPROC     *proc;
01645     VirtualTransactionId vxid;
01646     SERIALIZABLEXACT *sxact,
01647                *othersxact;
01648     HASHCTL     hash_ctl;
01649 
01650     /* We only do this for serializable transactions.  Once. */
01651     Assert(MySerializableXact == InvalidSerializableXact);
01652 
01653     Assert(!RecoveryInProgress());
01654 
01655     proc = MyProc;
01656     Assert(proc != NULL);
01657     GET_VXID_FROM_PGPROC(vxid, *proc);
01658 
01659     /*
01660      * First we get the sxact structure, which may involve looping and access
01661      * to the "finished" list to free a structure for use.
01662      *
01663      * We must hold SerializableXactHashLock when taking/checking the snapshot
01664      * to avoid race conditions, for much the same reasons that
01665      * GetSnapshotData takes the ProcArrayLock.  Since we might have to
01666      * release SerializableXactHashLock to call SummarizeOldestCommittedSxact,
01667      * this means we have to create the sxact first, which is a bit annoying
01668      * (in particular, an elog(ERROR) in procarray.c would cause us to leak
01669      * the sxact).  Consider refactoring to avoid this.
01670      */
01671 #ifdef TEST_OLDSERXID
01672     SummarizeOldestCommittedSxact();
01673 #endif
01674     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01675     do
01676     {
01677         sxact = CreatePredXact();
01678         /* If null, push out committed sxact to SLRU summary & retry. */
01679         if (!sxact)
01680         {
01681             LWLockRelease(SerializableXactHashLock);
01682             SummarizeOldestCommittedSxact();
01683             LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01684         }
01685     } while (!sxact);
01686 
01687     /* Get the snapshot, or check that it's safe to use */
01688     if (!TransactionIdIsValid(sourcexid))
01689         snapshot = GetSnapshotData(snapshot);
01690     else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid))
01691     {
01692         ReleasePredXact(sxact);
01693         LWLockRelease(SerializableXactHashLock);
01694         ereport(ERROR,
01695                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
01696                  errmsg("could not import the requested snapshot"),
01697                errdetail("The source transaction %u is not running anymore.",
01698                          sourcexid)));
01699     }
01700 
01701     /*
01702      * If there are no serializable transactions which are not read-only, we
01703      * can "opt out" of predicate locking and conflict checking for a
01704      * read-only transaction.
01705      *
01706      * The reason this is safe is that a read-only transaction can only become
01707      * part of a dangerous structure if it overlaps a writable transaction
01708      * which in turn overlaps a writable transaction which committed before
01709      * the read-only transaction started.  A new writable transaction can
01710      * overlap this one, but it can't meet the other condition of overlapping
01711      * a transaction which committed before this one started.
01712      */
01713     if (XactReadOnly && PredXact->WritableSxactCount == 0)
01714     {
01715         ReleasePredXact(sxact);
01716         LWLockRelease(SerializableXactHashLock);
01717         return snapshot;
01718     }
01719 
01720     /* Maintain serializable global xmin info. */
01721     if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
01722     {
01723         Assert(PredXact->SxactGlobalXminCount == 0);
01724         PredXact->SxactGlobalXmin = snapshot->xmin;
01725         PredXact->SxactGlobalXminCount = 1;
01726         OldSerXidSetActiveSerXmin(snapshot->xmin);
01727     }
01728     else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
01729     {
01730         Assert(PredXact->SxactGlobalXminCount > 0);
01731         PredXact->SxactGlobalXminCount++;
01732     }
01733     else
01734     {
01735         Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
01736     }
01737 
01738     /* Initialize the structure. */
01739     sxact->vxid = vxid;
01740     sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
01741     sxact->prepareSeqNo = InvalidSerCommitSeqNo;
01742     sxact->commitSeqNo = InvalidSerCommitSeqNo;
01743     SHMQueueInit(&(sxact->outConflicts));
01744     SHMQueueInit(&(sxact->inConflicts));
01745     SHMQueueInit(&(sxact->possibleUnsafeConflicts));
01746     sxact->topXid = GetTopTransactionIdIfAny();
01747     sxact->finishedBefore = InvalidTransactionId;
01748     sxact->xmin = snapshot->xmin;
01749     sxact->pid = MyProcPid;
01750     SHMQueueInit(&(sxact->predicateLocks));
01751     SHMQueueElemInit(&(sxact->finishedLink));
01752     sxact->flags = 0;
01753     if (XactReadOnly)
01754     {
01755         sxact->flags |= SXACT_FLAG_READ_ONLY;
01756 
01757         /*
01758          * Register all concurrent r/w transactions as possible conflicts; if
01759          * all of them commit without any outgoing conflicts to earlier
01760          * transactions then this snapshot can be deemed safe (and we can run
01761          * without tracking predicate locks).
01762          */
01763         for (othersxact = FirstPredXact();
01764              othersxact != NULL;
01765              othersxact = NextPredXact(othersxact))
01766         {
01767             if (!SxactIsCommitted(othersxact)
01768                 && !SxactIsDoomed(othersxact)
01769                 && !SxactIsReadOnly(othersxact))
01770             {
01771                 SetPossibleUnsafeConflict(sxact, othersxact);
01772             }
01773         }
01774     }
01775     else
01776     {
01777         ++(PredXact->WritableSxactCount);
01778         Assert(PredXact->WritableSxactCount <=
01779                (MaxBackends + max_prepared_xacts));
01780     }
01781 
01782     MySerializableXact = sxact;
01783     MyXactDidWrite = false;     /* haven't written anything yet */
01784 
01785     LWLockRelease(SerializableXactHashLock);
01786 
01787     /* Initialize the backend-local hash table of parent locks */
01788     Assert(LocalPredicateLockHash == NULL);
01789     MemSet(&hash_ctl, 0, sizeof(hash_ctl));
01790     hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
01791     hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
01792     hash_ctl.hash = tag_hash;
01793     LocalPredicateLockHash = hash_create("Local predicate lock",
01794                                          max_predicate_locks_per_xact,
01795                                          &hash_ctl,
01796                                          HASH_ELEM | HASH_FUNCTION);
01797 
01798     return snapshot;
01799 }
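
The do/while around CreatePredXact is an allocate-or-reclaim-and-retry loop: if the fixed-size sxact pool is exhausted, push an old committed entry out to the SLRU summary and try again. A stripped-down sketch of that retry shape follows; the lock release/reacquire around the reclaim step is omitted, and all names are hypothetical.

#include <stdio.h>

#define POOL_SIZE 2

static int  pool_used = 0;

/* Try to grab a slot from a fixed pool; -1 means the pool is full. */
static int
create_entry(void)
{
    if (pool_used >= POOL_SIZE)
        return -1;
    return pool_used++;
}

/* Stand-in for pushing old state out to secondary storage (the SLRU). */
static void
summarize_oldest_entry(void)
{
    if (pool_used > 0)
        pool_used--;            /* frees one slot for reuse */
}

int
main(void)
{
    int         slot;

    pool_used = POOL_SIZE;      /* pretend the pool is already full */

    /* Allocate; if that fails, reclaim something and retry until it works. */
    do
    {
        slot = create_entry();
        if (slot < 0)
            summarize_oldest_entry();
    } while (slot < 0);

    printf("got slot %d\n", slot);
    return 0;
}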
01800 
01801 /*
01802  * Register the top level XID in SerializableXidHash.
01803  * Also store it for easy reference in MySerializableXact.
01804  */
01805 void
01806 RegisterPredicateLockingXid(TransactionId xid)
01807 {
01808     SERIALIZABLEXIDTAG sxidtag;
01809     SERIALIZABLEXID *sxid;
01810     bool        found;
01811 
01812     /*
01813      * If we're not tracking predicate lock data for this transaction, we
01814      * should ignore the request and return quickly.
01815      */
01816     if (MySerializableXact == InvalidSerializableXact)
01817         return;
01818 
01819     /* We should have a valid XID and be at the top level. */
01820     Assert(TransactionIdIsValid(xid));
01821 
01822     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
01823 
01824     /* This should only be done once per transaction. */
01825     Assert(MySerializableXact->topXid == InvalidTransactionId);
01826 
01827     MySerializableXact->topXid = xid;
01828 
01829     sxidtag.xid = xid;
01830     sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
01831                                            &sxidtag,
01832                                            HASH_ENTER, &found);
01833     Assert(!found);
01834 
01835     /* Initialize the structure. */
01836     sxid->myXact = MySerializableXact;
01837     LWLockRelease(SerializableXactHashLock);
01838 }
01839 
01840 
01841 /*
01842  * Check whether there are any predicate locks held by any transaction
01843  * for the page at the given block number.
01844  *
01845  * Note that the transaction may be completed but not yet subject to
01846  * cleanup due to overlapping serializable transactions.  This must
01847  * return valid information regardless of transaction isolation level.
01848  *
01849  * Also note that this doesn't check for a conflicting relation lock,
01850  * just a lock specifically on the given page.
01851  *
01852  * One use is to support proper behavior during GiST index vacuum.
01853  */
01854 bool
01855 PageIsPredicateLocked(Relation relation, BlockNumber blkno)
01856 {
01857     PREDICATELOCKTARGETTAG targettag;
01858     uint32      targettaghash;
01859     LWLockId    partitionLock;
01860     PREDICATELOCKTARGET *target;
01861 
01862     SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
01863                                     relation->rd_node.dbNode,
01864                                     relation->rd_id,
01865                                     blkno);
01866 
01867     targettaghash = PredicateLockTargetTagHashCode(&targettag);
01868     partitionLock = PredicateLockHashPartitionLock(targettaghash);
01869     LWLockAcquire(partitionLock, LW_SHARED);
01870     target = (PREDICATELOCKTARGET *)
01871         hash_search_with_hash_value(PredicateLockTargetHash,
01872                                     &targettag, targettaghash,
01873                                     HASH_FIND, NULL);
01874     LWLockRelease(partitionLock);
01875 
01876     return (target != NULL);
01877 }
01878 
01879 
01880 /*
01881  * Check whether a particular lock is held by this transaction.
01882  *
01883  * Important note: this function may return false even if the lock is
01884  * being held, because it uses the local lock table which is not
01885  * updated if another transaction modifies our lock list (e.g. to
01886  * split an index page). It can also return true when a coarser
01887  * granularity lock that covers this target is being held. Be careful
01888  * to only use this function in circumstances where such errors are
01889  * acceptable!
01890  */
01891 static bool
01892 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
01893 {
01894     LOCALPREDICATELOCK *lock;
01895 
01896     /* check local hash table */
01897     lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
01898                                               targettag,
01899                                               HASH_FIND, NULL);
01900 
01901     if (!lock)
01902         return false;
01903 
01904     /*
01905      * Found entry in the table, but still need to check whether it's actually
01906      * held -- it could just be a parent of some held lock.
01907      */
01908     return lock->held;
01909 }
01910 
01911 /*
01912  * Return the parent lock tag in the lock hierarchy: the next coarser
01913  * lock that covers the provided tag.
01914  *
01915  * Returns true and sets *parent to the parent tag if one exists,
01916  * returns false if none exists.
01917  */
01918 static bool
01919 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
01920                           PREDICATELOCKTARGETTAG *parent)
01921 {
01922     switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
01923     {
01924         case PREDLOCKTAG_RELATION:
01925             /* relation locks have no parent lock */
01926             return false;
01927 
01928         case PREDLOCKTAG_PAGE:
01929             /* parent lock is relation lock */
01930             SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
01931                                          GET_PREDICATELOCKTARGETTAG_DB(*tag),
01932                                   GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
01933 
01934             return true;
01935 
01936         case PREDLOCKTAG_TUPLE:
01937             /* parent lock is page lock */
01938             SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
01939                                          GET_PREDICATELOCKTARGETTAG_DB(*tag),
01940                                    GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
01941                                       GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
01942             return true;
01943     }
01944 
01945     /* not reachable */
01946     Assert(false);
01947     return false;
01948 }
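
The switch above encodes the three-level granularity hierarchy: tuple -> page -> relation, with relation locks having no parent. The same walk in a self-contained form, using a simplified LockTag struct invented for the example; the loop in main mirrors how CoarserLockCovers climbs one level at a time.

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

typedef enum
{
    TAG_RELATION,
    TAG_PAGE,
    TAG_TUPLE
} TagType;

typedef struct
{
    TagType     type;
    uint32_t    dbid;
    uint32_t    relid;
    uint32_t    blkno;          /* valid for PAGE and TUPLE */
    uint32_t    offset;         /* valid for TUPLE only */
} LockTag;

/* Fill *parent with the next coarser tag; false if already coarsest. */
static bool
get_parent_tag(const LockTag *tag, LockTag *parent)
{
    switch (tag->type)
    {
        case TAG_RELATION:
            return false;       /* relation locks have no parent */
        case TAG_PAGE:
            *parent = (LockTag) {TAG_RELATION, tag->dbid, tag->relid, 0, 0};
            return true;
        case TAG_TUPLE:
            *parent = (LockTag) {TAG_PAGE, tag->dbid, tag->relid,
                                 tag->blkno, 0};
            return true;
    }
    return false;               /* not reachable */
}

int
main(void)
{
    LockTag     tag = {TAG_TUPLE, 1, 16384, 7, 3};

    /* Walk upward toward coarser granularities. */
    while (get_parent_tag(&tag, &tag))
        printf("parent level %d\n", (int) tag.type);
    return 0;
}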
01949 
01950 /*
01951  * Check whether the lock we are considering is already covered by a
01952  * coarser lock for our transaction.
01953  *
01954  * Like PredicateLockExists, this function might return a false
01955  * negative, but it will never return a false positive.
01956  */
01957 static bool
01958 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
01959 {
01960     PREDICATELOCKTARGETTAG targettag,
01961                 parenttag;
01962 
01963     targettag = *newtargettag;
01964 
01965     /* check parents iteratively until no more */
01966     while (GetParentPredicateLockTag(&targettag, &parenttag))
01967     {
01968         targettag = parenttag;
01969         if (PredicateLockExists(&targettag))
01970             return true;
01971     }
01972 
01973     /* no more parents to check; lock is not covered */
01974     return false;
01975 }
01976 
01977 /*
01978  * Remove the dummy entry from the predicate lock target hash, to free up some
01979  * scratch space. The caller must be holding SerializablePredicateLockListLock,
01980  * and must restore the entry with RestoreScratchTarget() before releasing the
01981  * lock.
01982  *
01983  * If lockheld is true, the caller is already holding the partition lock
01984  * of the partition containing the scratch entry.
01985  */
01986 static void
01987 RemoveScratchTarget(bool lockheld)
01988 {
01989     bool        found;
01990 
01991     Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
01992 
01993     if (!lockheld)
01994         LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
01995     hash_search_with_hash_value(PredicateLockTargetHash,
01996                                 &ScratchTargetTag,
01997                                 ScratchTargetTagHash,
01998                                 HASH_REMOVE, &found);
01999     Assert(found);
02000     if (!lockheld)
02001         LWLockRelease(ScratchPartitionLock);
02002 }
02003 
02004 /*
02005  * Re-insert the dummy entry in predicate lock target hash.
02006  */
02007 static void
02008 RestoreScratchTarget(bool lockheld)
02009 {
02010     bool        found;
02011 
02012     Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02013 
02014     if (!lockheld)
02015         LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
02016     hash_search_with_hash_value(PredicateLockTargetHash,
02017                                 &ScratchTargetTag,
02018                                 ScratchTargetTagHash,
02019                                 HASH_ENTER, &found);
02020     Assert(!found);
02021     if (!lockheld)
02022         LWLockRelease(ScratchPartitionLock);
02023 }
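
RemoveScratchTarget and RestoreScratchTarget implement a reservation trick: a dummy entry is kept in the hash table at all times so that, when an operation absolutely must insert a new entry, freeing the dummy first guarantees the insert cannot fail for lack of space. A tiny sketch of the same idea against a fixed-capacity table; all names are hypothetical.

#include <stdio.h>
#include <stdbool.h>

#define TABLE_CAPACITY 4

static int  entries_used = 0;

/* Insert an entry if there is room; false means the table is full. */
static bool
table_insert(void)
{
    if (entries_used >= TABLE_CAPACITY)
        return false;
    entries_used++;
    return true;
}

static void
table_delete(void)
{
    entries_used--;
}

int
main(void)
{
    entries_used = TABLE_CAPACITY;      /* the table is completely full */

    table_delete();                     /* analogous to RemoveScratchTarget() */
    if (table_insert())                 /* the insert that must not fail */
        printf("new entry created even though the table was full\n");
    table_delete();                     /* caller later removes the old entry */
    table_insert();                     /* analogous to RestoreScratchTarget() */

    printf("entries used: %d of %d\n", entries_used, TABLE_CAPACITY);
    return 0;
}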
02024 
02025 /*
02026  * Check whether the list of related predicate locks is empty for a
02027  * predicate lock target, and remove the target if it is.
02028  */
02029 static void
02030 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
02031 {
02032     PREDICATELOCKTARGET *rmtarget PG_USED_FOR_ASSERTS_ONLY;
02033 
02034     Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02035 
02036     /* Can't remove it until no locks at this target. */
02037     if (!SHMQueueEmpty(&target->predicateLocks))
02038         return;
02039 
02040     /* Actually remove the target. */
02041     rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02042                                            &target->tag,
02043                                            targettaghash,
02044                                            HASH_REMOVE, NULL);
02045     Assert(rmtarget == target);
02046 }
02047 
02048 /*
02049  * Delete child target locks owned by this process.
02050  * This implementation assumes that the usage of each target tag field is
02051  * uniform.  No need to make this hard if we don't have to.
02052  *
02053  * We aren't acquiring lightweight locks for the predicate lock or lock
02054  * target structures associated with this transaction unless we're going
02055  * to modify them, because no other process is permitted to modify our
02056  * locks.
02057  */
02058 static void
02059 DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
02060 {
02061     SERIALIZABLEXACT *sxact;
02062     PREDICATELOCK *predlock;
02063 
02064     LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
02065     sxact = MySerializableXact;
02066     predlock = (PREDICATELOCK *)
02067         SHMQueueNext(&(sxact->predicateLocks),
02068                      &(sxact->predicateLocks),
02069                      offsetof(PREDICATELOCK, xactLink));
02070     while (predlock)
02071     {
02072         SHM_QUEUE  *predlocksxactlink;
02073         PREDICATELOCK *nextpredlock;
02074         PREDICATELOCKTAG oldlocktag;
02075         PREDICATELOCKTARGET *oldtarget;
02076         PREDICATELOCKTARGETTAG oldtargettag;
02077 
02078         predlocksxactlink = &(predlock->xactLink);
02079         nextpredlock = (PREDICATELOCK *)
02080             SHMQueueNext(&(sxact->predicateLocks),
02081                          predlocksxactlink,
02082                          offsetof(PREDICATELOCK, xactLink));
02083 
02084         oldlocktag = predlock->tag;
02085         Assert(oldlocktag.myXact == sxact);
02086         oldtarget = oldlocktag.myTarget;
02087         oldtargettag = oldtarget->tag;
02088 
02089         if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
02090         {
02091             uint32      oldtargettaghash;
02092             LWLockId    partitionLock;
02093             PREDICATELOCK *rmpredlock PG_USED_FOR_ASSERTS_ONLY;
02094 
02095             oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
02096             partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
02097 
02098             LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02099 
02100             SHMQueueDelete(predlocksxactlink);
02101             SHMQueueDelete(&(predlock->targetLink));
02102             rmpredlock = hash_search_with_hash_value
02103                 (PredicateLockHash,
02104                  &oldlocktag,
02105                  PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
02106                                                          oldtargettaghash),
02107                  HASH_REMOVE, NULL);
02108             Assert(rmpredlock == predlock);
02109 
02110             RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
02111 
02112             LWLockRelease(partitionLock);
02113 
02114             DecrementParentLocks(&oldtargettag);
02115         }
02116 
02117         predlock = nextpredlock;
02118     }
02119     LWLockRelease(SerializablePredicateLockListLock);
02120 }
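
The traversal above illustrates a standard rule for intrusive lists: because the current element may be unlinked, the next pointer is captured before any deletion. A standalone doubly linked list version of the same pattern; Node, list_*, and delete_matching are invented names.

#include <stdio.h>

typedef struct Node
{
    struct Node *prev;
    struct Node *next;
    int         key;
} Node;

/* Head is a dummy node; an empty list points at itself. */
static void
list_init(Node *head)
{
    head->prev = head->next = head;
}

static void
list_insert(Node *head, Node *n)
{
    n->next = head;
    n->prev = head->prev;
    head->prev->next = n;
    head->prev = n;
}

static void
list_delete(Node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* Remove every node matching key, saving the next pointer before deleting. */
static void
delete_matching(Node *head, int key)
{
    Node       *cur = head->next;

    while (cur != head)
    {
        Node       *next = cur->next;   /* capture before unlinking */

        if (cur->key == key)
            list_delete(cur);
        cur = next;
    }
}

int
main(void)
{
    Node        head;
    Node        a = {.key = 1}, b = {.key = 2}, c = {.key = 1};

    list_init(&head);
    list_insert(&head, &a);
    list_insert(&head, &b);
    list_insert(&head, &c);
    delete_matching(&head, 1);
    for (Node *n = head.next; n != &head; n = n->next)
        printf("remaining key %d\n", n->key);
    return 0;
}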
02121 
02122 /*
02123  * Returns the promotion threshold for a given predicate lock
02124  * target. This is the number of descendant locks required to promote
02125  * to the specified tag. Note that the threshold includes non-direct
02126  * descendants, e.g. both tuples and pages for a relation lock.
02127  *
02128  * TODO SSI: We should do something more intelligent about what the
02129  * thresholds are, either making it proportional to the number of
02130  * tuples in a page & pages in a relation, or at least making it a
02131  * GUC. Currently the threshold is 3 for a page lock, and
02132  * max_pred_locks_per_transaction/2 for a relation lock, chosen
02133  * entirely arbitrarily (and without benchmarking).
02134  */
02135 static int
02136 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
02137 {
02138     switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
02139     {
02140         case PREDLOCKTAG_RELATION:
02141             return max_predicate_locks_per_xact / 2;
02142 
02143         case PREDLOCKTAG_PAGE:
02144             return 3;
02145 
02146         case PREDLOCKTAG_TUPLE:
02147 
02148             /*
02149              * not reachable: nothing is finer-granularity than a tuple, so we
02150              * should never try to promote to it.
02151              */
02152             Assert(false);
02153             return 0;
02154     }
02155 
02156     /* not reachable */
02157     Assert(false);
02158     return 0;
02159 }
02160 
02161 /*
02162  * For all ancestors of a newly-acquired predicate lock, increment
02163  * their child count in the parent hash table. If any of them have
02164  * more descendants than their promotion threshold, acquire the
02165  * coarsest such lock.
02166  *
02167  * Returns true if a parent lock was acquired and false otherwise.
02168  */
02169 static bool
02170 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
02171 {
02172     PREDICATELOCKTARGETTAG targettag,
02173                 nexttag,
02174                 promotiontag;
02175     LOCALPREDICATELOCK *parentlock;
02176     bool        found,
02177                 promote;
02178 
02179     promote = false;
02180 
02181     targettag = *reqtag;
02182 
02183     /* check parents iteratively */
02184     while (GetParentPredicateLockTag(&targettag, &nexttag))
02185     {
02186         targettag = nexttag;
02187         parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
02188                                                         &targettag,
02189                                                         HASH_ENTER,
02190                                                         &found);
02191         if (!found)
02192         {
02193             parentlock->held = false;
02194             parentlock->childLocks = 1;
02195         }
02196         else
02197             parentlock->childLocks++;
02198 
02199         if (parentlock->childLocks >=
02200             PredicateLockPromotionThreshold(&targettag))
02201         {
02202             /*
02203              * We should promote to this parent lock. Continue to check its
02204              * ancestors, however, both to get their child counts right and to
02205              * check whether we should just go ahead and promote to one of
02206              * them.
02207              */
02208             promotiontag = targettag;
02209             promote = true;
02210         }
02211     }
02212 
02213     if (promote)
02214     {
02215         /* acquire coarsest ancestor eligible for promotion */
02216         PredicateLockAcquire(&promotiontag);
02217         return true;
02218     }
02219     else
02220         return false;
02221 }
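
The promotion machinery boils down to counting children per ancestor and promoting once a count reaches that ancestor's threshold. A stripped-down sketch with a single coarse level and a plain array in place of LocalPredicateLockHash; the threshold constant and function names are invented.

#include <stdbool.h>
#include <stdio.h>

#define PROMOTION_THRESHOLD 3   /* cf. the page-lock threshold of 3 above */

/* childLocks[rel] counts fine-grained locks acquired under relation 'rel'. */
static int  childLocks[4];

/*
 * Record one more child lock under 'rel'; return true when the caller
 * should drop the fine-grained locks and take the coarser lock instead.
 */
static bool
note_child_lock(int rel)
{
    childLocks[rel]++;
    return childLocks[rel] >= PROMOTION_THRESHOLD;
}

int
main(void)
{
    for (int i = 1; i <= 4; i++)
    {
        if (note_child_lock(0))
        {
            printf("lock %d triggered promotion to the coarser lock\n", i);
            break;
        }
        printf("lock %d stays at fine granularity\n", i);
    }
    return 0;
}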
02222 
02223 /*
02224  * When releasing a lock, decrement the child count on all ancestor
02225  * locks.
02226  *
02227  * This is called only when releasing a lock via
02228  * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
02229  * we've acquired its parent, possibly due to promotion) or when a new
02230  * MVCC write lock makes the predicate lock unnecessary. There's no
02231  * point in calling it when locks are released at transaction end, as
02232  * this information is no longer needed.
02233  */
02234 static void
02235 DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
02236 {
02237     PREDICATELOCKTARGETTAG parenttag,
02238                 nexttag;
02239 
02240     parenttag = *targettag;
02241 
02242     while (GetParentPredicateLockTag(&parenttag, &nexttag))
02243     {
02244         uint32      targettaghash;
02245         LOCALPREDICATELOCK *parentlock,
02246                    *rmlock PG_USED_FOR_ASSERTS_ONLY;
02247 
02248         parenttag = nexttag;
02249         targettaghash = PredicateLockTargetTagHashCode(&parenttag);
02250         parentlock = (LOCALPREDICATELOCK *)
02251             hash_search_with_hash_value(LocalPredicateLockHash,
02252                                         &parenttag, targettaghash,
02253                                         HASH_FIND, NULL);
02254 
02255         /*
02256          * There's a small chance the parent lock doesn't exist in the lock
02257          * table. This can happen if we prematurely removed it because an
02258          * index split caused the child refcount to be off.
02259          */
02260         if (parentlock == NULL)
02261             continue;
02262 
02263         parentlock->childLocks--;
02264 
02265         /*
02266          * Under similar circumstances the parent lock's refcount might be
02267          * zero. This only happens if we're holding that lock (otherwise we
02268          * would have removed the entry).
02269          */
02270         if (parentlock->childLocks < 0)
02271         {
02272             Assert(parentlock->held);
02273             parentlock->childLocks = 0;
02274         }
02275 
02276         if ((parentlock->childLocks == 0) && (!parentlock->held))
02277         {
02278             rmlock = (LOCALPREDICATELOCK *)
02279                 hash_search_with_hash_value(LocalPredicateLockHash,
02280                                             &parenttag, targettaghash,
02281                                             HASH_REMOVE, NULL);
02282             Assert(rmlock == parentlock);
02283         }
02284     }
02285 }
02286 
02287 /*
02288  * Indicate that a predicate lock on the given target is held by the
02289  * specified transaction. Has no effect if the lock is already held.
02290  *
02291  * This updates the lock table and the sxact's lock list, and creates
02292  * the lock target if necessary, but does *not* do anything related to
02293  * granularity promotion or the local lock table. See
02294  * PredicateLockAcquire for that.
02295  */
02296 static void
02297 CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
02298                     uint32 targettaghash,
02299                     SERIALIZABLEXACT *sxact)
02300 {
02301     PREDICATELOCKTARGET *target;
02302     PREDICATELOCKTAG locktag;
02303     PREDICATELOCK *lock;
02304     LWLockId    partitionLock;
02305     bool        found;
02306 
02307     partitionLock = PredicateLockHashPartitionLock(targettaghash);
02308 
02309     LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
02310     LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02311 
02312     /* Make sure that the target is represented. */
02313     target = (PREDICATELOCKTARGET *)
02314         hash_search_with_hash_value(PredicateLockTargetHash,
02315                                     targettag, targettaghash,
02316                                     HASH_ENTER_NULL, &found);
02317     if (!target)
02318         ereport(ERROR,
02319                 (errcode(ERRCODE_OUT_OF_MEMORY),
02320                  errmsg("out of shared memory"),
02321                  errhint("You might need to increase max_pred_locks_per_transaction.")));
02322     if (!found)
02323         SHMQueueInit(&(target->predicateLocks));
02324 
02325     /* We've got the sxact and target, make sure they're joined. */
02326     locktag.myTarget = target;
02327     locktag.myXact = sxact;
02328     lock = (PREDICATELOCK *)
02329         hash_search_with_hash_value(PredicateLockHash, &locktag,
02330             PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
02331                                     HASH_ENTER_NULL, &found);
02332     if (!lock)
02333         ereport(ERROR,
02334                 (errcode(ERRCODE_OUT_OF_MEMORY),
02335                  errmsg("out of shared memory"),
02336                  errhint("You might need to increase max_pred_locks_per_transaction.")));
02337 
02338     if (!found)
02339     {
02340         SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
02341         SHMQueueInsertBefore(&(sxact->predicateLocks),
02342                              &(lock->xactLink));
02343         lock->commitSeqNo = InvalidSerCommitSeqNo;
02344     }
02345 
02346     LWLockRelease(partitionLock);
02347     LWLockRelease(SerializablePredicateLockListLock);
02348 }
02349 
02350 /*
02351  * Acquire a predicate lock on the specified target for the current
02352  * connection if not already held. This updates the local lock table
02353  * and uses it to implement granularity promotion. It will consolidate
02354  * multiple locks into a coarser lock if warranted, and will release
02355  * any finer-grained locks covered by the new one.
02356  */
02357 static void
02358 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
02359 {
02360     uint32      targettaghash;
02361     bool        found;
02362     LOCALPREDICATELOCK *locallock;
02363 
02364     /* Do we have the lock already, or a covering lock? */
02365     if (PredicateLockExists(targettag))
02366         return;
02367 
02368     if (CoarserLockCovers(targettag))
02369         return;
02370 
02371     /* the same hash and LW lock apply to the lock target and the local lock. */
02372     targettaghash = PredicateLockTargetTagHashCode(targettag);
02373 
02374     /* Acquire lock in local table */
02375     locallock = (LOCALPREDICATELOCK *)
02376         hash_search_with_hash_value(LocalPredicateLockHash,
02377                                     targettag, targettaghash,
02378                                     HASH_ENTER, &found);
02379     locallock->held = true;
02380     if (!found)
02381         locallock->childLocks = 0;
02382 
02383     /* Actually create the lock */
02384     CreatePredicateLock(targettag, targettaghash, MySerializableXact);
02385 
02386     /*
02387      * Lock has been acquired. Check whether it should be promoted to a
02388      * coarser granularity, or whether there are finer-granularity locks to
02389      * clean up.
02390      */
02391     if (CheckAndPromotePredicateLockRequest(targettag))
02392     {
02393         /*
02394          * Lock request was promoted to a coarser-granularity lock, and that
02395          * lock was acquired. It will delete this lock and any of its
02396          * children, so we're done.
02397          */
02398     }
02399     else
02400     {
02401         /* Clean up any finer-granularity locks */
02402         if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
02403             DeleteChildTargetLocks(targettag);
02404     }
02405 }
02406 
02407 
02408 /*
02409  *      PredicateLockRelation
02410  *
02411  * Gets a predicate lock at the relation level.
02412  * Skip if not in full serializable transaction isolation level.
02413  * Skip if this is a temporary table.
02414  * Clear any finer-grained predicate locks this session has on the relation.
02415  */
02416 void
02417 PredicateLockRelation(Relation relation, Snapshot snapshot)
02418 {
02419     PREDICATELOCKTARGETTAG tag;
02420 
02421     if (!SerializationNeededForRead(relation, snapshot))
02422         return;
02423 
02424     SET_PREDICATELOCKTARGETTAG_RELATION(tag,
02425                                         relation->rd_node.dbNode,
02426                                         relation->rd_id);
02427     PredicateLockAcquire(&tag);
02428 }
02429 
02430 /*
02431  *      PredicateLockPage
02432  *
02433  * Gets a predicate lock at the page level.
02434  * Skip if not in full serializable transaction isolation level.
02435  * Skip if this is a temporary table.
02436  * Skip if a coarser predicate lock already covers this page.
02437  * Clear any finer-grained predicate locks this session has on the relation.
02438  */
02439 void
02440 PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
02441 {
02442     PREDICATELOCKTARGETTAG tag;
02443 
02444     if (!SerializationNeededForRead(relation, snapshot))
02445         return;
02446 
02447     SET_PREDICATELOCKTARGETTAG_PAGE(tag,
02448                                     relation->rd_node.dbNode,
02449                                     relation->rd_id,
02450                                     blkno);
02451     PredicateLockAcquire(&tag);
02452 }
02453 
02454 /*
02455  *      PredicateLockTuple
02456  *
02457  * Gets a predicate lock at the tuple level.
02458  * Skip if not in full serializable transaction isolation level.
02459  * Skip if this is a temporary table.
02460  */
02461 void
02462 PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
02463 {
02464     PREDICATELOCKTARGETTAG tag;
02465     ItemPointer tid;
02466     TransactionId targetxmin;
02467 
02468     if (!SerializationNeededForRead(relation, snapshot))
02469         return;
02470 
02471     /*
02472      * If it's a heap tuple, return if this xact wrote it.
02473      */
02474     if (relation->rd_index == NULL)
02475     {
02476         TransactionId myxid;
02477 
02478         targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
02479 
02480         myxid = GetTopTransactionIdIfAny();
02481         if (TransactionIdIsValid(myxid))
02482         {
02483             if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
02484             {
02485                 TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
02486 
02487                 if (TransactionIdEquals(xid, myxid))
02488                 {
02489                     /* We wrote it; we already have a write lock. */
02490                     return;
02491                 }
02492             }
02493         }
02494     }
02495     else
02496         targetxmin = InvalidTransactionId;
02497 
02498     /*
02499      * Do quick-but-not-definitive test for a relation lock first.  This will
02500      * never cause a return when the relation is *not* locked, but will
02501      * occasionally let the check continue when there really *is* a relation
02502      * level lock.
02503      */
02504     SET_PREDICATELOCKTARGETTAG_RELATION(tag,
02505                                         relation->rd_node.dbNode,
02506                                         relation->rd_id);
02507     if (PredicateLockExists(&tag))
02508         return;
02509 
02510     tid = &(tuple->t_self);
02511     SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
02512                                      relation->rd_node.dbNode,
02513                                      relation->rd_id,
02514                                      ItemPointerGetBlockNumber(tid),
02515                                      ItemPointerGetOffsetNumber(tid),
02516                                      targetxmin);
02517     PredicateLockAcquire(&tag);
02518 }
02519 
02520 
02521 /*
02522  *      DeleteLockTarget
02523  *
02524  * Remove a predicate lock target along with any locks held for it.
02525  *
02526  * Caller must hold SerializablePredicateLockListLock and the
02527  * appropriate hash partition lock for the target.
02528  */
02529 static void
02530 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
02531 {
02532     PREDICATELOCK *predlock;
02533     SHM_QUEUE  *predlocktargetlink;
02534     PREDICATELOCK *nextpredlock;
02535     bool        found;
02536 
02537     Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02538     Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
02539 
02540     predlock = (PREDICATELOCK *)
02541         SHMQueueNext(&(target->predicateLocks),
02542                      &(target->predicateLocks),
02543                      offsetof(PREDICATELOCK, targetLink));
02544     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02545     while (predlock)
02546     {
02547         predlocktargetlink = &(predlock->targetLink);
02548         nextpredlock = (PREDICATELOCK *)
02549             SHMQueueNext(&(target->predicateLocks),
02550                          predlocktargetlink,
02551                          offsetof(PREDICATELOCK, targetLink));
02552 
02553         SHMQueueDelete(&(predlock->xactLink));
02554         SHMQueueDelete(&(predlock->targetLink));
02555 
02556         hash_search_with_hash_value
02557             (PredicateLockHash,
02558              &predlock->tag,
02559              PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
02560                                                      targettaghash),
02561              HASH_REMOVE, &found);
02562         Assert(found);
02563 
02564         predlock = nextpredlock;
02565     }
02566     LWLockRelease(SerializableXactHashLock);
02567 
02568     /* Remove the target itself, if possible. */
02569     RemoveTargetIfNoLongerUsed(target, targettaghash);
02570 }
02571 
02572 
02573 /*
02574  *      TransferPredicateLocksToNewTarget
02575  *
02576  * Move or copy all the predicate locks for a lock target, for use by
02577  * index page splits/combines and other things that create or replace
02578  * lock targets. If 'removeOld' is true, the old locks and the target
02579  * will be removed.
02580  *
02581  * Returns true on success, or false if we ran out of shared memory to
02582  * allocate the new target or locks. Guaranteed to always succeed if
02583  * removeOld is set (by using the scratch entry in PredicateLockTargetHash
02584  * for scratch space).
02585  *
02586  * Warning: the "removeOld" option should be used only with care,
02587  * because this function does not (indeed, can not) update other
02588  * backends' LocalPredicateLockHash. If we are only adding new
02589  * entries, this is not a problem: the local lock table is used only
02590  * as a hint, so missing entries for locks that are held are
02591  * OK. Having entries for locks that are no longer held, as can happen
02592  * when using "removeOld", is not in general OK. We can only use it
02593  * safely when replacing a lock with a coarser-granularity lock that
02594  * covers it, or if we are absolutely certain that no one will need to
02595  * refer to that lock in the future.
02596  *
02597  * Caller must hold SerializablePredicateLockListLock.
02598  */
02599 static bool
02600 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
02601                                   PREDICATELOCKTARGETTAG newtargettag,
02602                                   bool removeOld)
02603 {
02604     uint32      oldtargettaghash;
02605     LWLockId    oldpartitionLock;
02606     PREDICATELOCKTARGET *oldtarget;
02607     uint32      newtargettaghash;
02608     LWLockId    newpartitionLock;
02609     bool        found;
02610     bool        outOfShmem = false;
02611 
02612     Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
02613 
02614     oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
02615     newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
02616     oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
02617     newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
02618 
02619     if (removeOld)
02620     {
02621         /*
02622          * Remove the dummy entry to give us scratch space, so we know we'll
02623          * be able to create the new lock target.
02624          */
02625         RemoveScratchTarget(false);
02626     }
02627 
02628     /*
02629      * We must get the partition locks in ascending sequence to avoid
02630      * deadlocks. If old and new partitions are the same, we must request the
02631      * lock only once.
02632      */
02633     if (oldpartitionLock < newpartitionLock)
02634     {
02635         LWLockAcquire(oldpartitionLock,
02636                       (removeOld ? LW_EXCLUSIVE : LW_SHARED));
02637         LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02638     }
02639     else if (oldpartitionLock > newpartitionLock)
02640     {
02641         LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02642         LWLockAcquire(oldpartitionLock,
02643                       (removeOld ? LW_EXCLUSIVE : LW_SHARED));
02644     }
02645     else
02646         LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
02647 
02648     /*
02649      * Look for the old target.  If not found, that's OK; no predicate locks
02650      * are affected, so we can just clean up and return. If it does exist,
02651      * walk its list of predicate locks and move or copy them to the new
02652      * target.
02653      */
02654     oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02655                                             &oldtargettag,
02656                                             oldtargettaghash,
02657                                             HASH_FIND, NULL);
02658 
02659     if (oldtarget)
02660     {
02661         PREDICATELOCKTARGET *newtarget;
02662         PREDICATELOCK *oldpredlock;
02663         PREDICATELOCKTAG newpredlocktag;
02664 
02665         newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
02666                                                 &newtargettag,
02667                                                 newtargettaghash,
02668                                                 HASH_ENTER_NULL, &found);
02669 
02670         if (!newtarget)
02671         {
02672             /* Failed to allocate due to insufficient shmem */
02673             outOfShmem = true;
02674             goto exit;
02675         }
02676 
02677         /* If we created a new entry, initialize it */
02678         if (!found)
02679             SHMQueueInit(&(newtarget->predicateLocks));
02680 
02681         newpredlocktag.myTarget = newtarget;
02682 
02683         /*
02684          * Loop through all the locks on the old target, replacing them with
02685          * locks on the new target.
02686          */
02687         oldpredlock = (PREDICATELOCK *)
02688             SHMQueueNext(&(oldtarget->predicateLocks),
02689                          &(oldtarget->predicateLocks),
02690                          offsetof(PREDICATELOCK, targetLink));
02691         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02692         while (oldpredlock)
02693         {
02694             SHM_QUEUE  *predlocktargetlink;
02695             PREDICATELOCK *nextpredlock;
02696             PREDICATELOCK *newpredlock;
02697             SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
02698 
02699             predlocktargetlink = &(oldpredlock->targetLink);
02700             nextpredlock = (PREDICATELOCK *)
02701                 SHMQueueNext(&(oldtarget->predicateLocks),
02702                              predlocktargetlink,
02703                              offsetof(PREDICATELOCK, targetLink));
02704             newpredlocktag.myXact = oldpredlock->tag.myXact;
02705 
02706             if (removeOld)
02707             {
02708                 SHMQueueDelete(&(oldpredlock->xactLink));
02709                 SHMQueueDelete(&(oldpredlock->targetLink));
02710 
02711                 hash_search_with_hash_value
02712                     (PredicateLockHash,
02713                      &oldpredlock->tag,
02714                    PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
02715                                                            oldtargettaghash),
02716                      HASH_REMOVE, &found);
02717                 Assert(found);
02718             }
02719 
02720             newpredlock = (PREDICATELOCK *)
02721                 hash_search_with_hash_value(PredicateLockHash,
02722                                             &newpredlocktag,
02723                      PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
02724                                                            newtargettaghash),
02725                                             HASH_ENTER_NULL,
02726                                             &found);
02727             if (!newpredlock)
02728             {
02729                 /* Out of shared memory. Undo what we've done so far. */
02730                 LWLockRelease(SerializableXactHashLock);
02731                 DeleteLockTarget(newtarget, newtargettaghash);
02732                 outOfShmem = true;
02733                 goto exit;
02734             }
02735             if (!found)
02736             {
02737                 SHMQueueInsertBefore(&(newtarget->predicateLocks),
02738                                      &(newpredlock->targetLink));
02739                 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
02740                                      &(newpredlock->xactLink));
02741                 newpredlock->commitSeqNo = oldCommitSeqNo;
02742             }
02743             else
02744             {
02745                 if (newpredlock->commitSeqNo < oldCommitSeqNo)
02746                     newpredlock->commitSeqNo = oldCommitSeqNo;
02747             }
02748 
02749             Assert(newpredlock->commitSeqNo != 0);
02750             Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
02751                    || (newpredlock->tag.myXact == OldCommittedSxact));
02752 
02753             oldpredlock = nextpredlock;
02754         }
02755         LWLockRelease(SerializableXactHashLock);
02756 
02757         if (removeOld)
02758         {
02759             Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
02760             RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
02761         }
02762     }
02763 
02764 
02765 exit:
02766     /* Release partition locks in reverse order of acquisition. */
02767     if (oldpartitionLock < newpartitionLock)
02768     {
02769         LWLockRelease(newpartitionLock);
02770         LWLockRelease(oldpartitionLock);
02771     }
02772     else if (oldpartitionLock > newpartitionLock)
02773     {
02774         LWLockRelease(oldpartitionLock);
02775         LWLockRelease(newpartitionLock);
02776     }
02777     else
02778         LWLockRelease(newpartitionLock);
02779 
02780     if (removeOld)
02781     {
02782         /* We shouldn't run out of memory if we're moving locks */
02783         Assert(!outOfShmem);
02784 
02785         /* Put the scratch entry back */
02786         RestoreScratchTarget(false);
02787     }
02788 
02789     return !outOfShmem;
02790 }
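
To illustrate the deadlock-avoidance rule used in the function above (acquire the two hash-partition locks in ascending order, and only once when both targets hash to the same partition, then release in reverse order), here is a minimal standalone sketch. It is not PostgreSQL code: plain pthread mutexes stand in for LWLocks, and all names are hypothetical.

#include <pthread.h>

#define NUM_PARTITIONS 16

static pthread_mutex_t partition_lock[NUM_PARTITIONS];

static void
init_partition_locks(void)
{
    for (int i = 0; i < NUM_PARTITIONS; i++)
        pthread_mutex_init(&partition_lock[i], NULL);
}

static void
acquire_two_partitions(int oldpart, int newpart)
{
    /* Always lock in ascending order to avoid deadlock with other backends. */
    if (oldpart < newpart)
    {
        pthread_mutex_lock(&partition_lock[oldpart]);
        pthread_mutex_lock(&partition_lock[newpart]);
    }
    else if (oldpart > newpart)
    {
        pthread_mutex_lock(&partition_lock[newpart]);
        pthread_mutex_lock(&partition_lock[oldpart]);
    }
    else
        pthread_mutex_lock(&partition_lock[newpart]);   /* same partition: lock once */
}

static void
release_two_partitions(int oldpart, int newpart)
{
    /* Release in reverse order of acquisition. */
    if (oldpart < newpart)
    {
        pthread_mutex_unlock(&partition_lock[newpart]);
        pthread_mutex_unlock(&partition_lock[oldpart]);
    }
    else if (oldpart > newpart)
    {
        pthread_mutex_unlock(&partition_lock[oldpart]);
        pthread_mutex_unlock(&partition_lock[newpart]);
    }
    else
        pthread_mutex_unlock(&partition_lock[newpart]);
}

Ordering by partition number gives every backend the same total order over the locks, which is what makes the rule sufficient to prevent deadlock.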
02791 
02792 /*
02793  * Drop all predicate locks of any granularity from the specified relation,
02794  * which can be a heap relation or an index relation.  If 'transfer' is true,
02795  * acquire a relation lock on the heap for any transactions with any lock(s)
02796  * on the specified relation.
02797  *
02798  * This requires grabbing a lot of LW locks and scanning the entire lock
02799  * target table for matches.  That makes this more expensive than most
02800  * predicate lock management functions, but it will only be called for DDL
02801  * type commands that are expensive anyway, and there are fast returns when
02802  * no serializable transactions are active or the relation is temporary.
02803  *
02804  * We don't use the TransferPredicateLocksToNewTarget function because it
02805  * acquires its own locks on the partitions of the two targets involved,
02806  * and we'll already be holding all partition locks.
02807  *
02808  * We can't throw an error from here, because the call could be from a
02809  * transaction which is not serializable.
02810  *
02811  * NOTE: This is currently only called with transfer set to true, but that may
02812  * change.  If we decide to clean up the locks from a table on commit of a
02813  * transaction which executed DROP TABLE, the false condition will be useful.
02814  */
02815 static void
02816 DropAllPredicateLocksFromTable(Relation relation, bool transfer)
02817 {
02818     HASH_SEQ_STATUS seqstat;
02819     PREDICATELOCKTARGET *oldtarget;
02820     PREDICATELOCKTARGET *heaptarget;
02821     Oid         dbId;
02822     Oid         relId;
02823     Oid         heapId;
02824     int         i;
02825     bool        isIndex;
02826     bool        found;
02827     uint32      heaptargettaghash;
02828 
02829     /*
02830      * Bail out quickly if there are no serializable transactions running.
02831      * It's safe to check this without taking locks because the caller is
02832      * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
02833      * would matter here can be acquired while that is held.
02834      */
02835     if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
02836         return;
02837 
02838     if (!PredicateLockingNeededForRelation(relation))
02839         return;
02840 
02841     dbId = relation->rd_node.dbNode;
02842     relId = relation->rd_id;
02843     if (relation->rd_index == NULL)
02844     {
02845         isIndex = false;
02846         heapId = relId;
02847     }
02848     else
02849     {
02850         isIndex = true;
02851         heapId = relation->rd_index->indrelid;
02852     }
02853     Assert(heapId != InvalidOid);
02854     Assert(transfer || !isIndex);       /* index OID only makes sense with
02855                                          * transfer */
02856 
02857     /* Retrieve the heap target the first time it's needed, then keep it. */
02858     heaptargettaghash = 0;
02859     heaptarget = NULL;
02860 
02861     /* Acquire locks on all lock partitions */
02862     LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
02863     for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
02864         LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
02865     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
02866 
02867     /*
02868      * Remove the dummy entry to give us scratch space, so we know we'll be
02869      * able to create the new lock target.
02870      */
02871     if (transfer)
02872         RemoveScratchTarget(true);
02873 
02874     /* Scan through target map */
02875     hash_seq_init(&seqstat, PredicateLockTargetHash);
02876 
02877     while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
02878     {
02879         PREDICATELOCK *oldpredlock;
02880 
02881         /*
02882          * Check whether this is a target which needs attention.
02883          */
02884         if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
02885             continue;           /* wrong relation id */
02886         if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
02887             continue;           /* wrong database id */
02888         if (transfer && !isIndex
02889             && GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
02890             continue;           /* already the right lock */
02891 
02892         /*
02893          * If we made it here, we have work to do.  We make sure the heap
02894          * relation lock exists, then we walk the list of predicate locks for
02895          * the old target we found, moving all locks to the heap relation lock
02896          * -- unless they already hold that.
02897          */
02898 
02899         /*
02900          * First make sure we have the heap relation target.  We only need to
02901          * do this once.
02902          */
02903         if (transfer && heaptarget == NULL)
02904         {
02905             PREDICATELOCKTARGETTAG heaptargettag;
02906 
02907             SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
02908             heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
02909             heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
02910                                                      &heaptargettag,
02911                                                      heaptargettaghash,
02912                                                      HASH_ENTER, &found);
02913             if (!found)
02914                 SHMQueueInit(&heaptarget->predicateLocks);
02915         }
02916 
02917         /*
02918          * Loop through all the locks on the old target, replacing them with
02919          * locks on the new target.
02920          */
02921         oldpredlock = (PREDICATELOCK *)
02922             SHMQueueNext(&(oldtarget->predicateLocks),
02923                          &(oldtarget->predicateLocks),
02924                          offsetof(PREDICATELOCK, targetLink));
02925         while (oldpredlock)
02926         {
02927             PREDICATELOCK *nextpredlock;
02928             PREDICATELOCK *newpredlock;
02929             SerCommitSeqNo oldCommitSeqNo;
02930             SERIALIZABLEXACT *oldXact;
02931 
02932             nextpredlock = (PREDICATELOCK *)
02933                 SHMQueueNext(&(oldtarget->predicateLocks),
02934                              &(oldpredlock->targetLink),
02935                              offsetof(PREDICATELOCK, targetLink));
02936 
02937             /*
02938              * Remove the old lock first. This avoids the chance of running
02939              * out of lock structure entries for the hash table.
02940              */
02941             oldCommitSeqNo = oldpredlock->commitSeqNo;
02942             oldXact = oldpredlock->tag.myXact;
02943 
02944             SHMQueueDelete(&(oldpredlock->xactLink));
02945 
02946             /*
02947              * No need for retail delete from the oldtarget list; we're
02948              * removing the whole target anyway.
02949              */
02950             hash_search(PredicateLockHash,
02951                         &oldpredlock->tag,
02952                         HASH_REMOVE, &found);
02953             Assert(found);
02954 
02955             if (transfer)
02956             {
02957                 PREDICATELOCKTAG newpredlocktag;
02958 
02959                 newpredlocktag.myTarget = heaptarget;
02960                 newpredlocktag.myXact = oldXact;
02961                 newpredlock = (PREDICATELOCK *)
02962                     hash_search_with_hash_value(PredicateLockHash,
02963                                                 &newpredlocktag,
02964                      PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
02965                                                           heaptargettaghash),
02966                                                 HASH_ENTER,
02967                                                 &found);
02968                 if (!found)
02969                 {
02970                     SHMQueueInsertBefore(&(heaptarget->predicateLocks),
02971                                          &(newpredlock->targetLink));
02972                     SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
02973                                          &(newpredlock->xactLink));
02974                     newpredlock->commitSeqNo = oldCommitSeqNo;
02975                 }
02976                 else
02977                 {
02978                     if (newpredlock->commitSeqNo < oldCommitSeqNo)
02979                         newpredlock->commitSeqNo = oldCommitSeqNo;
02980                 }
02981 
02982                 Assert(newpredlock->commitSeqNo != 0);
02983                 Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
02984                        || (newpredlock->tag.myXact == OldCommittedSxact));
02985             }
02986 
02987             oldpredlock = nextpredlock;
02988         }
02989 
02990         hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
02991                     &found);
02992         Assert(found);
02993     }
02994 
02995     /* Put the scratch entry back */
02996     if (transfer)
02997         RestoreScratchTarget(true);
02998 
02999     /* Release locks in reverse order */
03000     LWLockRelease(SerializableXactHashLock);
03001     for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
03002         LWLockRelease(FirstPredicateLockMgrLock + i);
03003     LWLockRelease(SerializablePredicateLockListLock);
03004 }
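
The transfer loop above folds many fine-grained (page or tuple) locks into one relation-level lock per transaction, and on collision keeps the greater commitSeqNo. The following toy sketch shows that coalescing rule in isolation; it is not PostgreSQL code, and the data layout and names are hypothetical.

#include <stdint.h>

typedef struct ToyLock
{
    int         xact_id;        /* owning transaction */
    int         target_id;      /* locked object */
    uint64_t    commit_seqno;
} ToyLock;

#define MAX_LOCKS 64

static ToyLock coarse[MAX_LOCKS];
static int  ncoarse = 0;

/* Fold one fine-grained lock into the coarse target "heap_target". */
static void
fold_lock(int heap_target, const ToyLock *fine)
{
    int         i;

    for (i = 0; i < ncoarse; i++)
    {
        if (coarse[i].xact_id == fine->xact_id &&
            coarse[i].target_id == heap_target)
        {
            /* This transaction already holds the coarse lock: merge seqno. */
            if (coarse[i].commit_seqno < fine->commit_seqno)
                coarse[i].commit_seqno = fine->commit_seqno;
            return;
        }
    }

    /* First lock from this transaction on the coarse target. */
    if (ncoarse < MAX_LOCKS)
    {
        coarse[ncoarse].xact_id = fine->xact_id;
        coarse[ncoarse].target_id = heap_target;
        coarse[ncoarse].commit_seqno = fine->commit_seqno;
        ncoarse++;
    }
}

Keeping the latest commitSeqNo is the conservative choice: it can only cause extra (false positive) conflicts, never missed ones.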
03005 
03006 /*
03007  * TransferPredicateLocksToHeapRelation
03008  *      For all transactions, transfer all predicate locks for the given
03009  *      relation to a single relation lock on the heap.
03010  */
03011 void
03012 TransferPredicateLocksToHeapRelation(Relation relation)
03013 {
03014     DropAllPredicateLocksFromTable(relation, true);
03015 }
03016 
03017 
03018 /*
03019  *      PredicateLockPageSplit
03020  *
03021  * Copies any predicate locks for the old page to the new page.
03022  * Skip if this is a temporary table or toast table.
03023  *
03024  * NOTE: A page split (or overflow) affects all serializable transactions,
03025  * even if it occurs in the context of another transaction isolation level.
03026  *
03027  * NOTE: This currently leaves the local copy of the locks without
03028  * information on the new lock which is in shared memory.  This could cause
03029  * problems if enough page splits occur on locked pages without the backends
03030  * which hold the locks getting a chance to notice.
03031  */
03032 void
03033 PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
03034                        BlockNumber newblkno)
03035 {
03036     PREDICATELOCKTARGETTAG oldtargettag;
03037     PREDICATELOCKTARGETTAG newtargettag;
03038     bool        success;
03039 
03040     /*
03041      * Bail out quickly if there are no serializable transactions running.
03042      *
03043      * It's safe to do this check without taking any additional locks. Even if
03044      * a serializable transaction starts concurrently, we know it can't take
03045      * any SIREAD locks on the page being split because the caller is holding
03046      * the associated buffer page lock. Memory reordering isn't an issue; the
03047      * memory barrier in the LWLock acquisition guarantees that this read
03048      * occurs while the buffer page lock is held.
03049      */
03050     if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
03051         return;
03052 
03053     if (!PredicateLockingNeededForRelation(relation))
03054         return;
03055 
03056     Assert(oldblkno != newblkno);
03057     Assert(BlockNumberIsValid(oldblkno));
03058     Assert(BlockNumberIsValid(newblkno));
03059 
03060     SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
03061                                     relation->rd_node.dbNode,
03062                                     relation->rd_id,
03063                                     oldblkno);
03064     SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
03065                                     relation->rd_node.dbNode,
03066                                     relation->rd_id,
03067                                     newblkno);
03068 
03069     LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
03070 
03071     /*
03072      * Try copying the locks over to the new page's tag, creating it if
03073      * necessary.
03074      */
03075     success = TransferPredicateLocksToNewTarget(oldtargettag,
03076                                                 newtargettag,
03077                                                 false);
03078 
03079     if (!success)
03080     {
03081         /*
03082          * No more predicate lock entries are available. Failure isn't an
03083          * option here, so promote the page lock to a relation lock.
03084          */
03085 
03086         /* Get the parent relation lock's lock tag */
03087         success = GetParentPredicateLockTag(&oldtargettag,
03088                                             &newtargettag);
03089         Assert(success);
03090 
03091         /*
03092          * Move the locks to the parent. This shouldn't fail.
03093          *
03094          * Note that here we are removing locks held by other backends,
03095          * leading to a possible inconsistency in their local lock hash table.
03096          * This is OK because we're replacing it with a lock that covers the
03097          * old one.
03098          */
03099         success = TransferPredicateLocksToNewTarget(oldtargettag,
03100                                                     newtargettag,
03101                                                     true);
03102         Assert(success);
03103     }
03104 
03105     LWLockRelease(SerializablePredicateLockListLock);
03106 }
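
PredicateLockPageSplit demonstrates a try-then-promote pattern: attempt the fine-grained copy first, and if shared memory is exhausted, move the locks to the coarser parent (relation) target instead, which cannot fail because space for it is reserved. The self-contained sketch below mimics that control flow only; it is not PostgreSQL code, the capacity counter merely simulates running out of shared memory, and all names are hypothetical.

#include <assert.h>
#include <stdbool.h>

#define TOY_CAPACITY 4
static int  toy_used = 0;       /* simulated lock-table usage */

static bool
toy_transfer_locks(int oldtarget, int newtarget, bool remove_old)
{
    (void) oldtarget;
    (void) newtarget;

    if (remove_old)
        return true;            /* moving reuses the old entry's space */
    if (toy_used >= TOY_CAPACITY)
        return false;           /* out of space */
    toy_used++;                 /* copying consumes a new entry */
    return true;
}

static int
toy_parent_target(int page_target)
{
    /* Pretend the relation-level target id is page_target / 1000. */
    return page_target / 1000;
}

static void
toy_page_split(int oldpage_target, int newpage_target)
{
    if (!toy_transfer_locks(oldpage_target, newpage_target, false))
    {
        /*
         * No room to copy: promote to the coarser relation-level target,
         * moving (rather than copying) the old locks.  Moving cannot fail.
         */
        bool        ok = toy_transfer_locks(oldpage_target,
                                            toy_parent_target(oldpage_target),
                                            true);

        assert(ok);
        (void) ok;
    }
}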
03107 
03108 /*
03109  *      PredicateLockPageCombine
03110  *
03111  * Combines predicate locks for two existing pages.
03112  * Skip if this is a temporary table or toast table.
03113  *
03114  * NOTE: A page combine affects all serializable transactions, even if it
03115  * occurs in the context of another transaction isolation level.
03116  */
03117 void
03118 PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
03119                          BlockNumber newblkno)
03120 {
03121     /*
03122      * Page combines differ from page splits in that we ought to be able to
03123      * remove the locks on the old page after transferring them to the new
03124      * page, instead of duplicating them. However, because we can't edit other
03125      * backends' local lock tables, removing the old lock would leave them
03126      * with an entry in their LocalPredicateLockHash for a lock they're not
03127      * holding, which isn't acceptable. So we wind up having to do the same
03128      * work as a page split, acquiring a lock on the new page and keeping the
03129      * old page locked too. That can lead to some false positives, but should
03130      * be rare in practice.
03131      */
03132     PredicateLockPageSplit(relation, oldblkno, newblkno);
03133 }
03134 
03135 /*
03136  * Walk the list of in-progress serializable transactions and find the new
03137  * xmin.
03138  */
03139 static void
03140 SetNewSxactGlobalXmin(void)
03141 {
03142     SERIALIZABLEXACT *sxact;
03143 
03144     Assert(LWLockHeldByMe(SerializableXactHashLock));
03145 
03146     PredXact->SxactGlobalXmin = InvalidTransactionId;
03147     PredXact->SxactGlobalXminCount = 0;
03148 
03149     for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
03150     {
03151         if (!SxactIsRolledBack(sxact)
03152             && !SxactIsCommitted(sxact)
03153             && sxact != OldCommittedSxact)
03154         {
03155             Assert(sxact->xmin != InvalidTransactionId);
03156             if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
03157                 || TransactionIdPrecedes(sxact->xmin,
03158                                          PredXact->SxactGlobalXmin))
03159             {
03160                 PredXact->SxactGlobalXmin = sxact->xmin;
03161                 PredXact->SxactGlobalXminCount = 1;
03162             }
03163             else if (TransactionIdEquals(sxact->xmin,
03164                                          PredXact->SxactGlobalXmin))
03165                 PredXact->SxactGlobalXminCount++;
03166         }
03167     }
03168 
03169     OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
03170 }
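
SetNewSxactGlobalXmin tracks both the smallest xmin among active serializable transactions and how many transactions share it, so later code knows when the last holder of that xmin finishes. Here is a standalone sketch of that bookkeeping; it is not PostgreSQL code, and wraparound-aware XID comparison is deliberately omitted for brevity.

#include <stdint.h>

typedef uint32_t ToyXid;
#define TOY_INVALID_XID 0

typedef struct ToyXact
{
    ToyXid      xmin;
    int         active;        /* neither committed nor rolled back */
} ToyXact;

static void
toy_set_global_xmin(const ToyXact *xacts, int n,
                    ToyXid *global_xmin, int *global_xmin_count)
{
    int         i;

    *global_xmin = TOY_INVALID_XID;
    *global_xmin_count = 0;

    for (i = 0; i < n; i++)
    {
        if (!xacts[i].active)
            continue;
        if (*global_xmin == TOY_INVALID_XID || xacts[i].xmin < *global_xmin)
        {
            /* New minimum: reset the count. */
            *global_xmin = xacts[i].xmin;
            *global_xmin_count = 1;
        }
        else if (xacts[i].xmin == *global_xmin)
            (*global_xmin_count)++;
    }
}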
03171 
03172 /*
03173  *      ReleasePredicateLocks
03174  *
03175  * Releases predicate locks based on completion of the current transaction,
03176  * whether committed or rolled back.  It can also be called for a read only
03177  * transaction when it becomes impossible for the transaction to become
03178  * part of a dangerous structure.
03179  *
03180  * We do nothing unless this is a serializable transaction.
03181  *
03182  * This method must ensure that shared memory hash tables are cleaned
03183  * up in some relatively timely fashion.
03184  *
03185  * If this transaction is committing and is holding any predicate locks,
03186  * it must be added to a list of completed serializable transactions still
03187  * holding locks.
03188  */
03189 void
03190 ReleasePredicateLocks(bool isCommit)
03191 {
03192     bool        needToClear;
03193     RWConflict  conflict,
03194                 nextConflict,
03195                 possibleUnsafeConflict;
03196     SERIALIZABLEXACT *roXact;
03197 
03198     /*
03199      * We can't trust XactReadOnly here, because a transaction which started
03200      * as READ WRITE can show as READ ONLY later, e.g., within
03201      * subtransactions.  We want to flag a transaction as READ ONLY if it
03202      * commits without writing, so that de facto READ ONLY transactions get
03203      * the benefit of some RO optimizations; therefore we use this local
03204      * variable to record whether the transaction was declared READ ONLY at
03205      * the top level, since some of the cleanup logic below depends on that.
03206      */
03207     bool        topLevelIsDeclaredReadOnly;
03208 
03209     if (MySerializableXact == InvalidSerializableXact)
03210     {
03211         Assert(LocalPredicateLockHash == NULL);
03212         return;
03213     }
03214 
03215     Assert(!isCommit || SxactIsPrepared(MySerializableXact));
03216     Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
03217     Assert(!SxactIsCommitted(MySerializableXact));
03218     Assert(!SxactIsRolledBack(MySerializableXact));
03219 
03220     /* may not be serializable during COMMIT/ROLLBACK PREPARED */
03221     if (MySerializableXact->pid != 0)
03222         Assert(IsolationIsSerializable());
03223 
03224     /* We'd better not already be on the cleanup list. */
03225     Assert(!SxactIsOnFinishedList(MySerializableXact));
03226 
03227     topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
03228 
03229     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03230 
03231     /*
03232      * We don't hold XidGenLock lock here, assuming that TransactionId is
03233      * atomic!
03234      *
03235      * If this value is changing, we don't care that much whether we get the
03236      * old or new value -- it is just used to determine how far
03237      * SxactGlobalXmin must advance before this transaction can be fully
03238      * cleaned up.  The worst that could happen is we wait for one more
03239      * transaction to complete before freeing some RAM; correctness of visible
03240      * behavior is not affected.
03241      */
03242     MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
03243 
03244     /*
03245      * If it's not a commit it's a rollback, and we can clear our locks
03246      * immediately.
03247      */
03248     if (isCommit)
03249     {
03250         MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
03251         MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
03252         /* Recognize implicit read-only transaction (commit without write). */
03253         if (!MyXactDidWrite)
03254             MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
03255     }
03256     else
03257     {
03258         /*
03259          * The DOOMED flag indicates that we intend to roll back this
03260          * transaction and so it should not cause serialization failures for
03261          * other transactions that conflict with it. Note that this flag might
03262          * already be set, if another backend marked this transaction for
03263          * abort.
03264          *
03265          * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
03266          * has been called, and so the SerializableXact is eligible for
03267          * cleanup. This means it should not be considered when calculating
03268          * SxactGlobalXmin.
03269          */
03270         MySerializableXact->flags |= SXACT_FLAG_DOOMED;
03271         MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
03272 
03273         /*
03274          * If the transaction was previously prepared, but is now failing due
03275          * to a ROLLBACK PREPARED or (hopefully very rare) error after the
03276          * prepare, clear the prepared flag.  This simplifies conflict
03277          * checking.
03278          */
03279         MySerializableXact->flags &= ~SXACT_FLAG_PREPARED;
03280     }
03281 
03282     if (!topLevelIsDeclaredReadOnly)
03283     {
03284         Assert(PredXact->WritableSxactCount > 0);
03285         if (--(PredXact->WritableSxactCount) == 0)
03286         {
03287             /*
03288              * Release predicate locks and rw-conflicts in for all committed
03289              * transactions.  There are no longer any transactions which might
03290              * conflict with the locks and no chance for new transactions to
03291              * overlap.  Similarly, existing conflicts in can't cause pivots,
03292              * and any conflicts in which could have completed a dangerous
03293              * structure would already have caused a rollback, so any
03294              * remaining ones must be benign.
03295              */
03296             PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
03297         }
03298     }
03299     else
03300     {
03301         /*
03302          * Read-only transactions: clear the list of transactions that might
03303          * make us unsafe. Note that we use 'inLink' for the iteration as
03304          * opposed to 'outLink' for the r/w xacts.
03305          */
03306         possibleUnsafeConflict = (RWConflict)
03307             SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03308                          &MySerializableXact->possibleUnsafeConflicts,
03309                          offsetof(RWConflictData, inLink));
03310         while (possibleUnsafeConflict)
03311         {
03312             nextConflict = (RWConflict)
03313                 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03314                              &possibleUnsafeConflict->inLink,
03315                              offsetof(RWConflictData, inLink));
03316 
03317             Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
03318             Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
03319 
03320             ReleaseRWConflict(possibleUnsafeConflict);
03321 
03322             possibleUnsafeConflict = nextConflict;
03323         }
03324     }
03325 
03326     /* Check for conflict out to old committed transactions. */
03327     if (isCommit
03328         && !SxactIsReadOnly(MySerializableXact)
03329         && SxactHasSummaryConflictOut(MySerializableXact))
03330     {
03331         /*
03332          * we don't know which old committed transaction we conflicted with,
03333          * so be conservative and use FirstNormalSerCommitSeqNo here
03334          */
03335         MySerializableXact->SeqNo.earliestOutConflictCommit =
03336             FirstNormalSerCommitSeqNo;
03337         MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
03338     }
03339 
03340     /*
03341      * Release all outConflicts to committed transactions.  If we're rolling
03342      * back, clear them all.  Set SXACT_FLAG_CONFLICT_OUT if any point to
03343      * previously committed transactions.
03344      */
03345     conflict = (RWConflict)
03346         SHMQueueNext(&MySerializableXact->outConflicts,
03347                      &MySerializableXact->outConflicts,
03348                      offsetof(RWConflictData, outLink));
03349     while (conflict)
03350     {
03351         nextConflict = (RWConflict)
03352             SHMQueueNext(&MySerializableXact->outConflicts,
03353                          &conflict->outLink,
03354                          offsetof(RWConflictData, outLink));
03355 
03356         if (isCommit
03357             && !SxactIsReadOnly(MySerializableXact)
03358             && SxactIsCommitted(conflict->sxactIn))
03359         {
03360             if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
03361                 || conflict->sxactIn->prepareSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
03362                 MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->prepareSeqNo;
03363             MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
03364         }
03365 
03366         if (!isCommit
03367             || SxactIsCommitted(conflict->sxactIn)
03368             || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
03369             ReleaseRWConflict(conflict);
03370 
03371         conflict = nextConflict;
03372     }
03373 
03374     /*
03375      * Release all inConflicts from committed and read-only transactions. If
03376      * we're rolling back, clear them all.
03377      */
03378     conflict = (RWConflict)
03379         SHMQueueNext(&MySerializableXact->inConflicts,
03380                      &MySerializableXact->inConflicts,
03381                      offsetof(RWConflictData, inLink));
03382     while (conflict)
03383     {
03384         nextConflict = (RWConflict)
03385             SHMQueueNext(&MySerializableXact->inConflicts,
03386                          &conflict->inLink,
03387                          offsetof(RWConflictData, inLink));
03388 
03389         if (!isCommit
03390             || SxactIsCommitted(conflict->sxactOut)
03391             || SxactIsReadOnly(conflict->sxactOut))
03392             ReleaseRWConflict(conflict);
03393 
03394         conflict = nextConflict;
03395     }
03396 
03397     if (!topLevelIsDeclaredReadOnly)
03398     {
03399         /*
03400          * Remove ourselves from the list of possible conflicts for concurrent
03401          * READ ONLY transactions, flagging them as unsafe if we have a
03402          * conflict out. If any are waiting DEFERRABLE transactions, wake them
03403          * up if they are known safe or known unsafe.
03404          */
03405         possibleUnsafeConflict = (RWConflict)
03406             SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03407                          &MySerializableXact->possibleUnsafeConflicts,
03408                          offsetof(RWConflictData, outLink));
03409         while (possibleUnsafeConflict)
03410         {
03411             nextConflict = (RWConflict)
03412                 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
03413                              &possibleUnsafeConflict->outLink,
03414                              offsetof(RWConflictData, outLink));
03415 
03416             roXact = possibleUnsafeConflict->sxactIn;
03417             Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
03418             Assert(SxactIsReadOnly(roXact));
03419 
03420             /* Mark conflicted if necessary. */
03421             if (isCommit
03422                 && MyXactDidWrite
03423                 && SxactHasConflictOut(MySerializableXact)
03424                 && (MySerializableXact->SeqNo.earliestOutConflictCommit
03425                     <= roXact->SeqNo.lastCommitBeforeSnapshot))
03426             {
03427                 /*
03428                  * This releases possibleUnsafeConflict (as well as all other
03429                  * possible conflicts for roXact)
03430                  */
03431                 FlagSxactUnsafe(roXact);
03432             }
03433             else
03434             {
03435                 ReleaseRWConflict(possibleUnsafeConflict);
03436 
03437                 /*
03438                  * If we were the last possible conflict, flag it safe. The
03439                  * transaction can now safely release its predicate locks (but
03440                  * that transaction's backend has to do that itself).
03441                  */
03442                 if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
03443                     roXact->flags |= SXACT_FLAG_RO_SAFE;
03444             }
03445 
03446             /*
03447              * Wake up the process for a waiting DEFERRABLE transaction if we
03448              * now know it's either safe or conflicted.
03449              */
03450             if (SxactIsDeferrableWaiting(roXact) &&
03451                 (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
03452                 ProcSendSignal(roXact->pid);
03453 
03454             possibleUnsafeConflict = nextConflict;
03455         }
03456     }
03457 
03458     /*
03459      * Check whether it's time to clean up old transactions. This can only be
03460      * done when the last serializable transaction with the oldest xmin among
03461      * serializable transactions completes.  We then find the "new oldest"
03462      * xmin and purge any transactions which finished before this transaction
03463      * was launched.
03464      */
03465     needToClear = false;
03466     if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
03467     {
03468         Assert(PredXact->SxactGlobalXminCount > 0);
03469         if (--(PredXact->SxactGlobalXminCount) == 0)
03470         {
03471             SetNewSxactGlobalXmin();
03472             needToClear = true;
03473         }
03474     }
03475 
03476     LWLockRelease(SerializableXactHashLock);
03477 
03478     LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
03479 
03480     /* Add this to the list of transactions to check for later cleanup. */
03481     if (isCommit)
03482         SHMQueueInsertBefore(FinishedSerializableTransactions,
03483                              &MySerializableXact->finishedLink);
03484 
03485     if (!isCommit)
03486         ReleaseOneSerializableXact(MySerializableXact, false, false);
03487 
03488     LWLockRelease(SerializableFinishedListLock);
03489 
03490     if (needToClear)
03491         ClearOldPredicateLocks();
03492 
03493     MySerializableXact = InvalidSerializableXact;
03494     MyXactDidWrite = false;
03495 
03496     /* Delete per-transaction lock table */
03497     if (LocalPredicateLockHash != NULL)
03498     {
03499         hash_destroy(LocalPredicateLockHash);
03500         LocalPredicateLockHash = NULL;
03501     }
03502 }
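
The commit/rollback branch near the top of ReleasePredicateLocks is a small exercise in flag bookkeeping: on commit, mark the transaction committed and, if it never wrote, treat it as de facto read-only; on rollback, mark it doomed and rolled back and clear any prepared flag. The fragment below restates just those transitions; it is not PostgreSQL code, and the flag names and values merely mirror the SXACT_FLAG_* bits.

#include <stdbool.h>
#include <stdint.h>

#define TOY_FLAG_COMMITTED    0x01
#define TOY_FLAG_READ_ONLY    0x02
#define TOY_FLAG_DOOMED       0x04
#define TOY_FLAG_ROLLED_BACK  0x08
#define TOY_FLAG_PREPARED     0x10

static void
toy_finish_xact(uint32_t *flags, bool is_commit, bool did_write)
{
    if (is_commit)
    {
        *flags |= TOY_FLAG_COMMITTED;
        if (!did_write)
            *flags |= TOY_FLAG_READ_ONLY;   /* de facto read-only */
    }
    else
    {
        /* Doomed: ignore for conflicts; rolled back: eligible for cleanup. */
        *flags |= TOY_FLAG_DOOMED | TOY_FLAG_ROLLED_BACK;
        *flags &= ~TOY_FLAG_PREPARED;       /* simplify later conflict checks */
    }
}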
03503 
03504 /*
03505  * Clear old predicate locks, belonging to committed transactions that are no
03506  * longer interesting to any in-progress transaction.
03507  */
03508 static void
03509 ClearOldPredicateLocks(void)
03510 {
03511     SERIALIZABLEXACT *finishedSxact;
03512     PREDICATELOCK *predlock;
03513 
03514     /*
03515      * Loop through finished transactions. They are in commit order, so we can
03516      * stop as soon as we find one that's still interesting.
03517      */
03518     LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
03519     finishedSxact = (SERIALIZABLEXACT *)
03520         SHMQueueNext(FinishedSerializableTransactions,
03521                      FinishedSerializableTransactions,
03522                      offsetof(SERIALIZABLEXACT, finishedLink));
03523     LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03524     while (finishedSxact)
03525     {
03526         SERIALIZABLEXACT *nextSxact;
03527 
03528         nextSxact = (SERIALIZABLEXACT *)
03529             SHMQueueNext(FinishedSerializableTransactions,
03530                          &(finishedSxact->finishedLink),
03531                          offsetof(SERIALIZABLEXACT, finishedLink));
03532         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
03533             || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
03534                                              PredXact->SxactGlobalXmin))
03535         {
03536             /*
03537              * This transaction committed before any in-progress transaction
03538              * took its snapshot. It's no longer interesting.
03539              */
03540             LWLockRelease(SerializableXactHashLock);
03541             SHMQueueDelete(&(finishedSxact->finishedLink));
03542             ReleaseOneSerializableXact(finishedSxact, false, false);
03543             LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03544         }
03545         else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
03546            && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
03547         {
03548             /*
03549              * Any active transactions that took their snapshot before this
03550              * transaction committed are read-only, so we can clear part of
03551              * its state.
03552              */
03553             LWLockRelease(SerializableXactHashLock);
03554 
03555             if (SxactIsReadOnly(finishedSxact))
03556             {
03557                 /* A read-only transaction can be removed entirely */
03558                 SHMQueueDelete(&(finishedSxact->finishedLink));
03559                 ReleaseOneSerializableXact(finishedSxact, false, false);
03560             }
03561             else
03562             {
03563                 /*
03564                  * A read-write transaction can only be partially cleared. We
03565                  * need to keep the SERIALIZABLEXACT but can release the
03566                  * SIREAD locks and conflicts in.
03567                  */
03568                 ReleaseOneSerializableXact(finishedSxact, true, false);
03569             }
03570 
03571             PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
03572             LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03573         }
03574         else
03575         {
03576             /* Still interesting. */
03577             break;
03578         }
03579         finishedSxact = nextSxact;
03580     }
03581     LWLockRelease(SerializableXactHashLock);
03582 
03583     /*
03584      * Loop through predicate locks on dummy transaction for summarized data.
03585      */
03586     LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
03587     predlock = (PREDICATELOCK *)
03588         SHMQueueNext(&OldCommittedSxact->predicateLocks,
03589                      &OldCommittedSxact->predicateLocks,
03590                      offsetof(PREDICATELOCK, xactLink));
03591     while (predlock)
03592     {
03593         PREDICATELOCK *nextpredlock;
03594         bool        canDoPartialCleanup;
03595 
03596         nextpredlock = (PREDICATELOCK *)
03597             SHMQueueNext(&OldCommittedSxact->predicateLocks,
03598                          &predlock->xactLink,
03599                          offsetof(PREDICATELOCK, xactLink));
03600 
03601         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
03602         Assert(predlock->commitSeqNo != 0);
03603         Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
03604         canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
03605         LWLockRelease(SerializableXactHashLock);
03606 
03607         /*
03608          * If this lock originally belonged to an old enough transaction, we
03609          * can release it.
03610          */
03611         if (canDoPartialCleanup)
03612         {
03613             PREDICATELOCKTAG tag;
03614             PREDICATELOCKTARGET *target;
03615             PREDICATELOCKTARGETTAG targettag;
03616             uint32      targettaghash;
03617             LWLockId    partitionLock;
03618 
03619             tag = predlock->tag;
03620             target = tag.myTarget;
03621             targettag = target->tag;
03622             targettaghash = PredicateLockTargetTagHashCode(&targettag);
03623             partitionLock = PredicateLockHashPartitionLock(targettaghash);
03624 
03625             LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03626 
03627             SHMQueueDelete(&(predlock->targetLink));
03628             SHMQueueDelete(&(predlock->xactLink));
03629 
03630             hash_search_with_hash_value(PredicateLockHash, &tag,
03631                                 PredicateLockHashCodeFromTargetHashCode(&tag,
03632                                                               targettaghash),
03633                                         HASH_REMOVE, NULL);
03634             RemoveTargetIfNoLongerUsed(target, targettaghash);
03635 
03636             LWLockRelease(partitionLock);
03637         }
03638 
03639         predlock = nextpredlock;
03640     }
03641 
03642     LWLockRelease(SerializablePredicateLockListLock);
03643     LWLockRelease(SerializableFinishedListLock);
03644 }
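
Because FinishedSerializableTransactions is kept in commit order, ClearOldPredicateLocks can stop scanning at the first entry that is still interesting to some running transaction. The sketch below shows that early-exit scan over a hypothetical list; it is not PostgreSQL code, it ignores the invalid-xmin case and XID wraparound, and the names are made up.

#include <stdint.h>
#include <stdlib.h>

typedef struct ToyFinished
{
    uint32_t    finished_before;       /* XID horizon when it finished */
    struct ToyFinished *next;          /* next in commit order */
} ToyFinished;

static ToyFinished *
toy_clear_old(ToyFinished *head, uint32_t global_xmin)
{
    /* Entries are in commit order, so stop at the first "interesting" one. */
    while (head != NULL && head->finished_before <= global_xmin)
    {
        ToyFinished *next = head->next;

        free(head);             /* no longer interesting to anyone */
        head = next;
    }
    return head;
}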
03645 
03646 /*
03647  * This is the normal way to delete anything from any of the predicate
03648  * locking hash tables.  Given a transaction which we know can be deleted:
03649  * delete all predicate locks held by that transaction and any predicate
03650  * lock targets which are now unreferenced by a lock; delete all conflicts
03651  * for the transaction; delete all xid values for the transaction; then
03652  * delete the transaction.
03653  *
03654  * When the partial flag is set, we can release all predicate locks and
03655  * in-conflict information -- we've established that there are no longer
03656  * any overlapping read write transactions for which this transaction could
03657  * matter -- but keep the transaction entry itself and any outConflicts.
03658  *
03659  * When the summarize flag is set, we've run short of room for sxact data
03660  * and must summarize to the SLRU.  Predicate locks are transferred to a
03661  * dummy "old" transaction, with duplicate locks on a single target
03662  * collapsing to a single lock with the "latest" commitSeqNo from among
03663  * the conflicting locks.
03664  */
03665 static void
03666 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
03667                            bool summarize)
03668 {
03669     PREDICATELOCK *predlock;
03670     SERIALIZABLEXIDTAG sxidtag;
03671     RWConflict  conflict,
03672                 nextConflict;
03673 
03674     Assert(sxact != NULL);
03675     Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
03676     Assert(partial || !SxactIsOnFinishedList(sxact));
03677     Assert(LWLockHeldByMe(SerializableFinishedListLock));
03678 
03679     /*
03680      * First release all the predicate locks held by this xact (or transfer
03681      * them to OldCommittedSxact if summarize is true)
03682      */
03683     LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
03684     predlock = (PREDICATELOCK *)
03685         SHMQueueNext(&(sxact->predicateLocks),
03686                      &(sxact->predicateLocks),
03687                      offsetof(PREDICATELOCK, xactLink));
03688     while (predlock)
03689     {
03690         PREDICATELOCK *nextpredlock;
03691         PREDICATELOCKTAG tag;
03692         SHM_QUEUE  *targetLink;
03693         PREDICATELOCKTARGET *target;
03694         PREDICATELOCKTARGETTAG targettag;
03695         uint32      targettaghash;
03696         LWLockId    partitionLock;
03697 
03698         nextpredlock = (PREDICATELOCK *)
03699             SHMQueueNext(&(sxact->predicateLocks),
03700                          &(predlock->xactLink),
03701                          offsetof(PREDICATELOCK, xactLink));
03702 
03703         tag = predlock->tag;
03704         targetLink = &(predlock->targetLink);
03705         target = tag.myTarget;
03706         targettag = target->tag;
03707         targettaghash = PredicateLockTargetTagHashCode(&targettag);
03708         partitionLock = PredicateLockHashPartitionLock(targettaghash);
03709 
03710         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03711 
03712         SHMQueueDelete(targetLink);
03713 
03714         hash_search_with_hash_value(PredicateLockHash, &tag,
03715                                 PredicateLockHashCodeFromTargetHashCode(&tag,
03716                                                               targettaghash),
03717                                     HASH_REMOVE, NULL);
03718         if (summarize)
03719         {
03720             bool        found;
03721 
03722             /* Fold into dummy transaction list. */
03723             tag.myXact = OldCommittedSxact;
03724             predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
03725                                 PredicateLockHashCodeFromTargetHashCode(&tag,
03726                                                               targettaghash),
03727                                                    HASH_ENTER_NULL, &found);
03728             if (!predlock)
03729                 ereport(ERROR,
03730                         (errcode(ERRCODE_OUT_OF_MEMORY),
03731                          errmsg("out of shared memory"),
03732                          errhint("You might need to increase max_pred_locks_per_transaction.")));
03733             if (found)
03734             {
03735                 Assert(predlock->commitSeqNo != 0);
03736                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
03737                 if (predlock->commitSeqNo < sxact->commitSeqNo)
03738                     predlock->commitSeqNo = sxact->commitSeqNo;
03739             }
03740             else
03741             {
03742                 SHMQueueInsertBefore(&(target->predicateLocks),
03743                                      &(predlock->targetLink));
03744                 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
03745                                      &(predlock->xactLink));
03746                 predlock->commitSeqNo = sxact->commitSeqNo;
03747             }
03748         }
03749         else
03750             RemoveTargetIfNoLongerUsed(target, targettaghash);
03751 
03752         LWLockRelease(partitionLock);
03753 
03754         predlock = nextpredlock;
03755     }
03756 
03757     /*
03758      * Rather than retail removal, just re-init the head after we've run
03759      * through the list.
03760      */
03761     SHMQueueInit(&sxact->predicateLocks);
03762 
03763     LWLockRelease(SerializablePredicateLockListLock);
03764 
03765     sxidtag.xid = sxact->topXid;
03766     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03767 
03768     /* Release all outConflicts (unless 'partial' is true) */
03769     if (!partial)
03770     {
03771         conflict = (RWConflict)
03772             SHMQueueNext(&sxact->outConflicts,
03773                          &sxact->outConflicts,
03774                          offsetof(RWConflictData, outLink));
03775         while (conflict)
03776         {
03777             nextConflict = (RWConflict)
03778                 SHMQueueNext(&sxact->outConflicts,
03779                              &conflict->outLink,
03780                              offsetof(RWConflictData, outLink));
03781             if (summarize)
03782                 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
03783             ReleaseRWConflict(conflict);
03784             conflict = nextConflict;
03785         }
03786     }
03787 
03788     /* Release all inConflicts. */
03789     conflict = (RWConflict)
03790         SHMQueueNext(&sxact->inConflicts,
03791                      &sxact->inConflicts,
03792                      offsetof(RWConflictData, inLink));
03793     while (conflict)
03794     {
03795         nextConflict = (RWConflict)
03796             SHMQueueNext(&sxact->inConflicts,
03797                          &conflict->inLink,
03798                          offsetof(RWConflictData, inLink));
03799         if (summarize)
03800             conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
03801         ReleaseRWConflict(conflict);
03802         conflict = nextConflict;
03803     }
03804 
03805     /* Finally, get rid of the xid and the record of the transaction itself. */
03806     if (!partial)
03807     {
03808         if (sxidtag.xid != InvalidTransactionId)
03809             hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
03810         ReleasePredXact(sxact);
03811     }
03812 
03813     LWLockRelease(SerializableXactHashLock);
03814 }
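
ReleaseOneSerializableXact, like most loops in this file, deletes entries while walking an intrusive shared-memory list, which only works because the next pointer is captured before the current element is unlinked or freed. The standalone sketch below shows that pattern on an ordinary singly linked list; it is not PostgreSQL code.

#include <stdlib.h>

typedef struct ToyNode
{
    struct ToyNode *next;
    int         release_me;
} ToyNode;

static ToyNode *
toy_release_matching(ToyNode *head)
{
    ToyNode    *node = head;
    ToyNode    *prev = NULL;

    while (node != NULL)
    {
        ToyNode    *next = node->next;  /* capture before any unlink/free */

        if (node->release_me)
        {
            if (prev != NULL)
                prev->next = next;
            else
                head = next;
            free(node);
        }
        else
            prev = node;

        node = next;
    }
    return head;
}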
03815 
03816 /*
03817  * Tests whether the given top level transaction is concurrent with
03818  * (overlaps) our current transaction.
03819  *
03820  * We need to identify the top level transaction for SSI, anyway, so pass
03821  * that to this function to save the overhead of checking the snapshot's
03822  * subxip array.
03823  */
03824 static bool
03825 XidIsConcurrent(TransactionId xid)
03826 {
03827     Snapshot    snap;
03828     uint32      i;
03829 
03830     Assert(TransactionIdIsValid(xid));
03831     Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
03832 
03833     snap = GetTransactionSnapshot();
03834 
03835     if (TransactionIdPrecedes(xid, snap->xmin))
03836         return false;
03837 
03838     if (TransactionIdFollowsOrEquals(xid, snap->xmax))
03839         return true;
03840 
03841     for (i = 0; i < snap->xcnt; i++)
03842     {
03843         if (xid == snap->xip[i])
03844             return true;
03845     }
03846 
03847     return false;
03848 }
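
XidIsConcurrent applies the usual snapshot test: an xid overlaps our transaction if it is not older than the snapshot's xmin and was either still in progress (listed in xip[]) or not yet started (at or beyond xmax) when the snapshot was taken. A self-contained restatement of that decision logic follows; it is not PostgreSQL code, and wraparound-aware comparisons are omitted for brevity.

#include <stdbool.h>
#include <stdint.h>

typedef struct ToySnapshot
{
    uint32_t    xmin;           /* all xids before this had finished */
    uint32_t    xmax;           /* first xid not yet started */
    const uint32_t *xip;        /* in-progress xids, xmin <= xid < xmax */
    int         xcnt;
} ToySnapshot;

static bool
toy_xid_is_concurrent(uint32_t xid, const ToySnapshot *snap)
{
    int         i;

    if (xid < snap->xmin)
        return false;           /* finished before the snapshot was taken */
    if (xid >= snap->xmax)
        return true;            /* started after the snapshot was taken */

    for (i = 0; i < snap->xcnt; i++)
    {
        if (xid == snap->xip[i])
            return true;        /* was still running at snapshot time */
    }
    return false;
}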
03849 
03850 /*
03851  * CheckForSerializableConflictOut
03852  *      We are reading a tuple which has been modified.  If it is visible to
03853  *      us but has been deleted, that indicates a rw-conflict out.  If it's
03854  *      not visible and was created by a concurrent (overlapping)
03855  *      serializable transaction, that is also a rw-conflict out.
03856  *
03857  * We will determine the top level xid of the writing transaction with which
03858  * we may be in conflict, and check for overlap with our own transaction.
03859  * If the transactions overlap (i.e., they cannot see each other's writes),
03860  * then we have a conflict out.
03861  *
03862  * This function should be called just about anywhere in heapam.c where a
03863  * tuple has been read. The caller must hold at least a shared lock on the
03864  * buffer, because this function might set hint bits on the tuple. There is
03865  * currently no known reason to call this function from an index AM.
03866  */
03867 void
03868 CheckForSerializableConflictOut(bool visible, Relation relation,
03869                                 HeapTuple tuple, Buffer buffer,
03870                                 Snapshot snapshot)
03871 {
03872     TransactionId xid;
03873     SERIALIZABLEXIDTAG sxidtag;
03874     SERIALIZABLEXID *sxid;
03875     SERIALIZABLEXACT *sxact;
03876     HTSV_Result htsvResult;
03877 
03878     if (!SerializationNeededForRead(relation, snapshot))
03879         return;
03880 
03881     /* Check if someone else has already decided that we need to die */
03882     if (SxactIsDoomed(MySerializableXact))
03883     {
03884         ereport(ERROR,
03885                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03886                  errmsg("could not serialize access due to read/write dependencies among transactions"),
03887                  errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict out checking."),
03888                  errhint("The transaction might succeed if retried.")));
03889     }
03890 
03891     /*
03892      * Check to see whether the tuple has been written to by a concurrent
03893      * transaction, either to create it so that it is not visible to us, or
03894      * to delete it while it is visible to us.  The "visible" bool indicates
03895      * whether the tuple is visible to us, while HeapTupleSatisfiesVacuum
03896      * checks what else is going on with it.
03897      */
03898     htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
03899     switch (htsvResult)
03900     {
03901         case HEAPTUPLE_LIVE:
03902             if (visible)
03903                 return;
03904             xid = HeapTupleHeaderGetXmin(tuple->t_data);
03905             break;
03906         case HEAPTUPLE_RECENTLY_DEAD:
03907             if (!visible)
03908                 return;
03909             xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
03910             break;
03911         case HEAPTUPLE_DELETE_IN_PROGRESS:
03912             xid = HeapTupleHeaderGetUpdateXid(tuple->t_data);
03913             break;
03914         case HEAPTUPLE_INSERT_IN_PROGRESS:
03915             xid = HeapTupleHeaderGetXmin(tuple->t_data);
03916             break;
03917         case HEAPTUPLE_DEAD:
03918             return;
03919         default:
03920 
03921             /*
03922              * The only way to get to this default clause is if a new value is
03923              * added to the enum type without adding it to this switch
03924              * statement.  That's a bug, so elog.
03925              */
03926             elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
03927 
03928             /*
03929              * In spite of having all enum values covered and calling elog on
03930              * this default, some compilers think this is a code path which
03931              * allows xid to be used below without initialization. Silence
03932              * that warning.
03933              */
03934             xid = InvalidTransactionId;
03935     }
03936     Assert(TransactionIdIsValid(xid));
03937     Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
03938 
03939     /*
03940      * Find top level xid.  Bail out if xid is too early to be a conflict, or
03941      * if it's our own xid.
03942      */
03943     if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
03944         return;
03945     xid = SubTransGetTopmostTransaction(xid);
03946     if (TransactionIdPrecedes(xid, TransactionXmin))
03947         return;
03948     if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
03949         return;
03950 
03951     /*
03952      * Find sxact or summarized info for the top level xid.
03953      */
03954     sxidtag.xid = xid;
03955     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
03956     sxid = (SERIALIZABLEXID *)
03957         hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
03958     if (!sxid)
03959     {
03960         /*
03961          * Transaction not found in "normal" SSI structures.  Check whether it
03962          * got pushed out to SLRU storage for "old committed" transactions.
03963          */
03964         SerCommitSeqNo conflictCommitSeqNo;
03965 
03966         conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
03967         if (conflictCommitSeqNo != 0)
03968         {
03969             if (conflictCommitSeqNo != InvalidSerCommitSeqNo
03970                 && (!SxactIsReadOnly(MySerializableXact)
03971                     || conflictCommitSeqNo
03972                     <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
03973                 ereport(ERROR,
03974                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03975                          errmsg("could not serialize access due to read/write dependencies among transactions"),
03976                          errdetail_internal("Reason code: Canceled on conflict out to old pivot %u.", xid),
03977                       errhint("The transaction might succeed if retried.")));
03978 
03979             if (SxactHasSummaryConflictIn(MySerializableXact)
03980                 || !SHMQueueEmpty(&MySerializableXact->inConflicts))
03981                 ereport(ERROR,
03982                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
03983                          errmsg("could not serialize access due to read/write dependencies among transactions"),
03984                          errdetail_internal("Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
03985                       errhint("The transaction might succeed if retried.")));
03986 
03987             MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
03988         }
03989 
03990         /* It's not serializable or otherwise not important. */
03991         LWLockRelease(SerializableXactHashLock);
03992         return;
03993     }
03994     sxact = sxid->myXact;
03995     Assert(TransactionIdEquals(sxact->topXid, xid));
03996     if (sxact == MySerializableXact || SxactIsDoomed(sxact))
03997     {
03998         /* Can't conflict with ourself or a transaction that will roll back. */
03999         LWLockRelease(SerializableXactHashLock);
04000         return;
04001     }
04002 
04003     /*
04004      * We have a conflict out to a transaction which has a conflict out to a
04005      * summarized transaction.  That summarized transaction must have
04006      * committed first, and we can't tell when it committed in relation to our
04007      * snapshot acquisition, so something needs to be canceled.
04008      */
04009     if (SxactHasSummaryConflictOut(sxact))
04010     {
04011         if (!SxactIsPrepared(sxact))
04012         {
04013             sxact->flags |= SXACT_FLAG_DOOMED;
04014             LWLockRelease(SerializableXactHashLock);
04015             return;
04016         }
04017         else
04018         {
04019             LWLockRelease(SerializableXactHashLock);
04020             ereport(ERROR,
04021                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04022                      errmsg("could not serialize access due to read/write dependencies among transactions"),
04023                      errdetail_internal("Reason code: Canceled on conflict out to old pivot."),
04024                      errhint("The transaction might succeed if retried.")));
04025         }
04026     }
04027 
04028     /*
04029      * If this is a read-only transaction and the writing transaction has
04030      * committed, and it doesn't have a rw-conflict to a transaction which
04031      * committed before it, no conflict.
04032      */
04033     if (SxactIsReadOnly(MySerializableXact)
04034         && SxactIsCommitted(sxact)
04035         && !SxactHasSummaryConflictOut(sxact)
04036         && (!SxactHasConflictOut(sxact)
04037             || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
04038     {
04039         /* Read-only transaction will appear to run first.  No conflict. */
04040         LWLockRelease(SerializableXactHashLock);
04041         return;
04042     }
04043 
04044     if (!XidIsConcurrent(xid))
04045     {
04046         /* This write was already in our snapshot; no conflict. */
04047         LWLockRelease(SerializableXactHashLock);
04048         return;
04049     }
04050 
04051     if (RWConflictExists(MySerializableXact, sxact))
04052     {
04053         /* We don't want duplicate conflict records in the list. */
04054         LWLockRelease(SerializableXactHashLock);
04055         return;
04056     }
04057 
04058     /*
04059      * Flag the conflict.  But first, if this conflict creates a dangerous
04060      * structure, ereport an error.
04061      */
04062     FlagRWConflict(MySerializableXact, sxact);
04063     LWLockRelease(SerializableXactHashLock);
04064 }
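/*
 * Illustrative sketch, not part of predicate.c: the xid filtering performed
 * above, factored into a hypothetical helper for clarity.  It reports
 * whether a writer xid could possibly represent a rw-conflict out for us;
 * our own top level xid and xids older than TransactionXmin never can.
 * The name "example_xid_could_conflict_out" is an editorial invention.
 */
static bool
example_xid_could_conflict_out(TransactionId xid)
{
    /* A write by our own top level transaction is never a conflict. */
    if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
        return false;

    /* Map a subtransaction xid to its top level transaction. */
    xid = SubTransGetTopmostTransaction(xid);

    /* Too old to overlap any active serializable transaction. */
    if (TransactionIdPrecedes(xid, TransactionXmin))
        return false;

    /* The topmost xid might still turn out to be our own. */
    return !TransactionIdEquals(xid, GetTopTransactionIdIfAny());
}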
04065 
04066 /*
04067  * Check a particular target for rw-dependency conflict in. A subroutine of
04068  * CheckForSerializableConflictIn().
04069  */
04070 static void
04071 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
04072 {
04073     uint32      targettaghash;
04074     LWLockId    partitionLock;
04075     PREDICATELOCKTARGET *target;
04076     PREDICATELOCK *predlock;
04077     PREDICATELOCK *mypredlock = NULL;
04078     PREDICATELOCKTAG mypredlocktag;
04079 
04080     Assert(MySerializableXact != InvalidSerializableXact);
04081 
04082     /*
04083      * The same hash and LW lock apply to the lock target and the lock itself.
04084      */
04085     targettaghash = PredicateLockTargetTagHashCode(targettag);
04086     partitionLock = PredicateLockHashPartitionLock(targettaghash);
04087     LWLockAcquire(partitionLock, LW_SHARED);
04088     target = (PREDICATELOCKTARGET *)
04089         hash_search_with_hash_value(PredicateLockTargetHash,
04090                                     targettag, targettaghash,
04091                                     HASH_FIND, NULL);
04092     if (!target)
04093     {
04094         /* Nothing has this target locked; we're done here. */
04095         LWLockRelease(partitionLock);
04096         return;
04097     }
04098 
04099     /*
04100      * Each lock for an overlapping transaction represents a conflict: a
04101      * rw-dependency in to this transaction.
04102      */
04103     predlock = (PREDICATELOCK *)
04104         SHMQueueNext(&(target->predicateLocks),
04105                      &(target->predicateLocks),
04106                      offsetof(PREDICATELOCK, targetLink));
04107     LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04108     while (predlock)
04109     {
04110         SHM_QUEUE  *predlocktargetlink;
04111         PREDICATELOCK *nextpredlock;
04112         SERIALIZABLEXACT *sxact;
04113 
04114         predlocktargetlink = &(predlock->targetLink);
04115         nextpredlock = (PREDICATELOCK *)
04116             SHMQueueNext(&(target->predicateLocks),
04117                          predlocktargetlink,
04118                          offsetof(PREDICATELOCK, targetLink));
04119 
04120         sxact = predlock->tag.myXact;
04121         if (sxact == MySerializableXact)
04122         {
04123             /*
04124              * If we're getting a write lock on a tuple, we don't need a
04125              * predicate (SIREAD) lock on the same tuple. We can safely remove
04126              * our SIREAD lock, but we'll defer doing so until after the loop
04127              * because that requires upgrading to an exclusive partition lock.
04128              *
04129              * We can't use this optimization within a subtransaction because
04130              * the subtransaction could roll back, and we would be left
04131              * without any lock at the top level.
04132              */
04133             if (!IsSubTransaction()
04134                 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
04135             {
04136                 mypredlock = predlock;
04137                 mypredlocktag = predlock->tag;
04138             }
04139         }
04140         else if (!SxactIsDoomed(sxact)
04141                  && (!SxactIsCommitted(sxact)
04142                      || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
04143                                               sxact->finishedBefore))
04144                  && !RWConflictExists(sxact, MySerializableXact))
04145         {
04146             LWLockRelease(SerializableXactHashLock);
04147             LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04148 
04149             /*
04150              * Re-check after getting exclusive lock because the other
04151              * transaction may have flagged a conflict.
04152              */
04153             if (!SxactIsDoomed(sxact)
04154                 && (!SxactIsCommitted(sxact)
04155                     || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
04156                                              sxact->finishedBefore))
04157                 && !RWConflictExists(sxact, MySerializableXact))
04158             {
04159                 FlagRWConflict(sxact, MySerializableXact);
04160             }
04161 
04162             LWLockRelease(SerializableXactHashLock);
04163             LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04164         }
04165 
04166         predlock = nextpredlock;
04167     }
04168     LWLockRelease(SerializableXactHashLock);
04169     LWLockRelease(partitionLock);
04170 
04171     /*
04172      * If we found one of our own SIREAD locks to remove, remove it now.
04173      *
04174      * At this point our transaction already has a RowExclusiveLock on the
04175      * relation, so we are OK to drop the predicate lock on the tuple, if
04176      * found, without fearing that another write against the tuple will occur
04177      * before the MVCC information makes it to the buffer.
04178      */
04179     if (mypredlock != NULL)
04180     {
04181         uint32      predlockhashcode;
04182         PREDICATELOCK *rmpredlock;
04183 
04184         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
04185         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
04186         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04187 
04188         /*
04189          * Remove the predicate lock from shared memory, if it wasn't removed
04190          * while the locks were released.  One way that could happen is from
04191          * autovacuum cleaning up an index.
04192          */
04193         predlockhashcode = PredicateLockHashCodeFromTargetHashCode
04194             (&mypredlocktag, targettaghash);
04195         rmpredlock = (PREDICATELOCK *)
04196             hash_search_with_hash_value(PredicateLockHash,
04197                                         &mypredlocktag,
04198                                         predlockhashcode,
04199                                         HASH_FIND, NULL);
04200         if (rmpredlock != NULL)
04201         {
04202             Assert(rmpredlock == mypredlock);
04203 
04204             SHMQueueDelete(&(mypredlock->targetLink));
04205             SHMQueueDelete(&(mypredlock->xactLink));
04206 
04207             rmpredlock = (PREDICATELOCK *)
04208                 hash_search_with_hash_value(PredicateLockHash,
04209                                             &mypredlocktag,
04210                                             predlockhashcode,
04211                                             HASH_REMOVE, NULL);
04212             Assert(rmpredlock == mypredlock);
04213 
04214             RemoveTargetIfNoLongerUsed(target, targettaghash);
04215         }
04216 
04217         LWLockRelease(SerializableXactHashLock);
04218         LWLockRelease(partitionLock);
04219         LWLockRelease(SerializablePredicateLockListLock);
04220 
04221         if (rmpredlock != NULL)
04222         {
04223             /*
04224              * Remove entry in local lock table if it exists. It's OK if it
04225              * doesn't exist; that means the lock was transferred to a new
04226              * target by a different backend.
04227              */
04228             hash_search_with_hash_value(LocalPredicateLockHash,
04229                                         targettag, targettaghash,
04230                                         HASH_REMOVE, NULL);
04231 
04232             DecrementParentLocks(targettag);
04233         }
04234     }
04235 }
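/*
 * Illustrative sketch, not part of predicate.c: the shared-memory queue
 * traversal pattern used in CheckTargetForConflictsIn above.  The link to
 * the next entry is captured before the current entry is examined, so the
 * walk survives the current entry being unlinked while locks are briefly
 * released.  "example_walk_target_locks" and its callback are hypothetical.
 */
static void
example_walk_target_locks(PREDICATELOCKTARGET *target,
                          void (*process) (PREDICATELOCK *predlock))
{
    PREDICATELOCK *predlock;

    predlock = (PREDICATELOCK *)
        SHMQueueNext(&(target->predicateLocks),
                     &(target->predicateLocks),
                     offsetof(PREDICATELOCK, targetLink));
    while (predlock)
    {
        PREDICATELOCK *nextpredlock;

        /* Fetch the successor first; "process" may unlink predlock. */
        nextpredlock = (PREDICATELOCK *)
            SHMQueueNext(&(target->predicateLocks),
                         &(predlock->targetLink),
                         offsetof(PREDICATELOCK, targetLink));

        process(predlock);

        predlock = nextpredlock;
    }
}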
04236 
04237 /*
04238  * CheckForSerializableConflictIn
04239  *      We are writing the given tuple.  If that indicates a rw-conflict
04240  *      in from another serializable transaction, take appropriate action.
04241  *
04242  * Skip checking for any granularity for which a parameter is missing.
04243  *
04244  * A tuple update or delete is in conflict if we have a predicate lock
04245  * against the relation or page in which the tuple exists, or against the
04246  * tuple itself.
04247  */
04248 void
04249 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
04250                                Buffer buffer)
04251 {
04252     PREDICATELOCKTARGETTAG targettag;
04253 
04254     if (!SerializationNeededForWrite(relation))
04255         return;
04256 
04257     /* Check if someone else has already decided that we need to die */
04258     if (SxactIsDoomed(MySerializableXact))
04259         ereport(ERROR,
04260                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04261                  errmsg("could not serialize access due to read/write dependencies among transactions"),
04262                  errdetail_internal("Reason code: Canceled on identification as a pivot, during conflict in checking."),
04263                  errhint("The transaction might succeed if retried.")));
04264 
04265     /*
04266      * We're doing a write which might cause rw-conflicts now or later.
04267      * Memorize that fact.
04268      */
04269     MyXactDidWrite = true;
04270 
04271     /*
04272      * It is important that we check for locks from the finest granularity to
04273      * the coarsest granularity, so that granularity promotion doesn't cause
04274      * us to miss a lock.  The new (coarser) lock will be acquired before the
04275      * old (finer) locks are released.
04276      *
04277      * It is not possible to take and hold a lock across the checks for all
04278      * granularities because each target could be in a separate partition.
04279      */
04280     if (tuple != NULL)
04281     {
04282         SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
04283                                          relation->rd_node.dbNode,
04284                                          relation->rd_id,
04285                          ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
04286                         ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
04287                                       HeapTupleHeaderGetXmin(tuple->t_data));
04288         CheckTargetForConflictsIn(&targettag);
04289     }
04290 
04291     if (BufferIsValid(buffer))
04292     {
04293         SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
04294                                         relation->rd_node.dbNode,
04295                                         relation->rd_id,
04296                                         BufferGetBlockNumber(buffer));
04297         CheckTargetForConflictsIn(&targettag);
04298     }
04299 
04300     SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
04301                                         relation->rd_node.dbNode,
04302                                         relation->rd_id);
04303     CheckTargetForConflictsIn(&targettag);
04304 }
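/*
 * Illustrative sketch, not part of predicate.c: how a hypothetical heap
 * write path might invoke the check above.  An insert has no pre-existing
 * tuple or buffer to examine, so only the relation-level target is checked;
 * an update or delete passes the old tuple and its buffer so that all three
 * granularities are checked, finest first.  The helper name and parameters
 * are editorial inventions.
 */
static void
example_check_before_heap_write(Relation relation, HeapTuple oldtup,
                                Buffer buffer, bool is_insert)
{
    if (is_insert)
        CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
    else
        CheckForSerializableConflictIn(relation, oldtup, buffer);
}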
04305 
04306 /*
04307  * CheckTableForSerializableConflictIn
04308  *      The entire table is going through a DDL-style logical mass delete
04309  *      like TRUNCATE or DROP TABLE.  If that causes a rw-conflict in from
04310  *      another serializable transaction, take appropriate action.
04311  *
04312  * While these operations do not operate entirely within the bounds of
04313  * snapshot isolation, they can occur inside a serializable transaction, and
04314  * will logically occur after any reads which saw rows which were destroyed
04315  * by these operations, so we do what we can to serialize properly under
04316  * SSI.
04317  *
04318  * The relation passed in must be a heap relation. Any predicate lock of any
04319  * granularity on the heap will cause a rw-conflict in to this transaction.
04320  * Predicate locks on indexes do not matter because they only exist to guard
04321  * against conflicting inserts into the index, and this is a mass *delete*.
04322  * When a table is truncated or dropped, the index will also be truncated
04323  * or dropped, and we'll deal with locks on the index when that happens.
04324  *
04325  * Dropping or truncating a table also needs to drop any existing predicate
04326  * locks on heap tuples or pages, because they're about to go away.  That
04327  * cleanup should happen only after this conflict check, because the
04328  * transaction could be rolled back because of a conflict, in which case the
04329  * lock changes would not be needed.  (At the moment we don't actually bother
04330  * to drop the existing locks on a dropped or truncated table; that might
04331  * lead to some false positives, but it doesn't seem worth the trouble.)
04332  */
04333 void
04334 CheckTableForSerializableConflictIn(Relation relation)
04335 {
04336     HASH_SEQ_STATUS seqstat;
04337     PREDICATELOCKTARGET *target;
04338     Oid         dbId;
04339     Oid         heapId;
04340     int         i;
04341 
04342     /*
04343      * Bail out quickly if there are no serializable transactions running.
04344      * It's safe to check this without taking locks because the caller is
04345      * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
04346      * would matter here can be acquired while that is held.
04347      */
04348     if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
04349         return;
04350 
04351     if (!SerializationNeededForWrite(relation))
04352         return;
04353 
04354     /*
04355      * We're doing a write which might cause rw-conflicts now or later.
04356      * Memorize that fact.
04357      */
04358     MyXactDidWrite = true;
04359 
04360     Assert(relation->rd_index == NULL); /* not an index relation */
04361 
04362     dbId = relation->rd_node.dbNode;
04363     heapId = relation->rd_id;
04364 
04365     LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
04366     for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
04367         LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
04368     LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04369 
04370     /* Scan through target list */
04371     hash_seq_init(&seqstat, PredicateLockTargetHash);
04372 
04373     while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
04374     {
04375         PREDICATELOCK *predlock;
04376 
04377         /*
04378          * Check whether this is a target which needs attention.
04379          */
04380         if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
04381             continue;           /* wrong relation id */
04382         if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
04383             continue;           /* wrong database id */
04384 
04385         /*
04386          * Loop through locks for this target and flag conflicts.
04387          */
04388         predlock = (PREDICATELOCK *)
04389             SHMQueueNext(&(target->predicateLocks),
04390                          &(target->predicateLocks),
04391                          offsetof(PREDICATELOCK, targetLink));
04392         while (predlock)
04393         {
04394             PREDICATELOCK *nextpredlock;
04395 
04396             nextpredlock = (PREDICATELOCK *)
04397                 SHMQueueNext(&(target->predicateLocks),
04398                              &(predlock->targetLink),
04399                              offsetof(PREDICATELOCK, targetLink));
04400 
04401             if (predlock->tag.myXact != MySerializableXact
04402               && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
04403             {
04404                 FlagRWConflict(predlock->tag.myXact, MySerializableXact);
04405             }
04406 
04407             predlock = nextpredlock;
04408         }
04409     }
04410 
04411     /* Release locks in reverse order */
04412     LWLockRelease(SerializableXactHashLock);
04413     for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
04414         LWLockRelease(FirstPredicateLockMgrLock + i);
04415     LWLockRelease(SerializablePredicateLockListLock);
04416 }
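/*
 * Illustrative sketch, not part of predicate.c: the lock ordering used in
 * CheckTableForSerializableConflictIn above.  Every predicate lock partition
 * lock is taken in ascending order and released in descending order, so that
 * backends needing several partitions always acquire them in the same order
 * and cannot deadlock against each other.  The helper and its callback are
 * hypothetical.
 */
static void
example_with_all_partition_locks(void (*callback) (void))
{
    int         i;

    for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
        LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);

    callback();

    for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
        LWLockRelease(FirstPredicateLockMgrLock + i);
}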
04417 
04418 
04419 /*
04420  * Flag a rw-dependency between two serializable transactions.
04421  *
04422  * The caller is responsible for ensuring that we have a LW lock on
04423  * the transaction hash table.
04424  */
04425 static void
04426 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
04427 {
04428     Assert(reader != writer);
04429 
04430     /* First, see if this conflict causes failure. */
04431     OnConflict_CheckForSerializationFailure(reader, writer);
04432 
04433     /* Actually do the conflict flagging. */
04434     if (reader == OldCommittedSxact)
04435         writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
04436     else if (writer == OldCommittedSxact)
04437         reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
04438     else
04439         SetRWConflict(reader, writer);
04440 }
04441 
04442 /*----------------------------------------------------------------------------
04443  * We are about to add a RW-edge to the dependency graph - check that we don't
04444  * introduce a dangerous structure by doing so, and abort one of the
04445  * transactions if so.
04446  *
04447  * A serialization failure can only occur if there is a dangerous structure
04448  * in the dependency graph:
04449  *
04450  *      Tin ------> Tpivot ------> Tout
04451  *            rw             rw
04452  *
04453  * Furthermore, Tout must commit first.
04454  *
04455  * One more optimization is that if Tin is declared READ ONLY (or commits
04456  * without writing), we can only have a problem if Tout committed before Tin
04457  * acquired its snapshot.
04458  *----------------------------------------------------------------------------
04459  */
04460 static void
04461 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
04462                                         SERIALIZABLEXACT *writer)
04463 {
04464     bool        failure;
04465     RWConflict  conflict;
04466 
04467     Assert(LWLockHeldByMe(SerializableXactHashLock));
04468 
04469     failure = false;
04470 
04471     /*------------------------------------------------------------------------
04472      * Check for already-committed writer with rw-conflict out flagged
04473      * (conflict-flag on W means that T2 committed before W):
04474      *
04475      *      R ------> W ------> T2
04476      *          rw        rw
04477      *
04478      * That is a dangerous structure, so we must abort. (Since the writer
04479      * has already committed, we must be the reader.)
04480      *------------------------------------------------------------------------
04481      */
04482     if (SxactIsCommitted(writer)
04483       && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
04484         failure = true;
04485 
04486     /*------------------------------------------------------------------------
04487      * Check whether the writer has become a pivot with an out-conflict
04488      * committed transaction (T2), and T2 committed first:
04489      *
04490      *      R ------> W ------> T2
04491      *          rw        rw
04492      *
04493      * Because T2 must've committed first, there is no anomaly if:
04494      * - the reader committed before T2
04495      * - the writer committed before T2
04496      * - the reader is a READ ONLY transaction and the reader was concurrent
04497      *   with T2 (= reader acquired its snapshot before T2 committed)
04498      *
04499      * We also handle the case that T2 is prepared but not yet committed
04500      * here. In that case T2 has already checked for conflicts, so if it
04501      * commits first, making the above conflict real, it's too late for it
04502      * to abort.
04503      *------------------------------------------------------------------------
04504      */
04505     if (!failure)
04506     {
04507         if (SxactHasSummaryConflictOut(writer))
04508         {
04509             failure = true;
04510             conflict = NULL;
04511         }
04512         else
04513             conflict = (RWConflict)
04514                 SHMQueueNext(&writer->outConflicts,
04515                              &writer->outConflicts,
04516                              offsetof(RWConflictData, outLink));
04517         while (conflict)
04518         {
04519             SERIALIZABLEXACT *t2 = conflict->sxactIn;
04520 
04521             if (SxactIsPrepared(t2)
04522                 && (!SxactIsCommitted(reader)
04523                     || t2->prepareSeqNo <= reader->commitSeqNo)
04524                 && (!SxactIsCommitted(writer)
04525                     || t2->prepareSeqNo <= writer->commitSeqNo)
04526                 && (!SxactIsReadOnly(reader)
04527               || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
04528             {
04529                 failure = true;
04530                 break;
04531             }
04532             conflict = (RWConflict)
04533                 SHMQueueNext(&writer->outConflicts,
04534                              &conflict->outLink,
04535                              offsetof(RWConflictData, outLink));
04536         }
04537     }
04538 
04539     /*------------------------------------------------------------------------
04540      * Check whether the reader has become a pivot with a writer
04541      * that's committed (or prepared):
04542      *
04543      *      T0 ------> R ------> W
04544      *           rw        rw
04545      *
04546      * Because W must've committed first for an anomaly to occur, there is no
04547      * anomaly if:
04548      * - T0 committed before the writer
04549      * - T0 is READ ONLY, and overlaps the writer
04550      *------------------------------------------------------------------------
04551      */
04552     if (!failure && SxactIsPrepared(writer) && !SxactIsReadOnly(reader))
04553     {
04554         if (SxactHasSummaryConflictIn(reader))
04555         {
04556             failure = true;
04557             conflict = NULL;
04558         }
04559         else
04560             conflict = (RWConflict)
04561                 SHMQueueNext(&reader->inConflicts,
04562                              &reader->inConflicts,
04563                              offsetof(RWConflictData, inLink));
04564         while (conflict)
04565         {
04566             SERIALIZABLEXACT *t0 = conflict->sxactOut;
04567 
04568             if (!SxactIsDoomed(t0)
04569                 && (!SxactIsCommitted(t0)
04570                     || t0->commitSeqNo >= writer->prepareSeqNo)
04571                 && (!SxactIsReadOnly(t0)
04572               || t0->SeqNo.lastCommitBeforeSnapshot >= writer->prepareSeqNo))
04573             {
04574                 failure = true;
04575                 break;
04576             }
04577             conflict = (RWConflict)
04578                 SHMQueueNext(&reader->inConflicts,
04579                              &conflict->inLink,
04580                              offsetof(RWConflictData, inLink));
04581         }
04582     }
04583 
04584     if (failure)
04585     {
04586         /*
04587          * We have to kill a transaction to avoid a possible anomaly from
04588          * occurring. If the writer is us, we can just ereport() to cause a
04589          * transaction abort. Otherwise we flag the writer for termination,
04590          * causing it to abort when it tries to commit. However, if the writer
04591      * has already prepared, we can't abort it
04592          * anymore, so we have to kill the reader instead.
04593          */
04594         if (MySerializableXact == writer)
04595         {
04596             LWLockRelease(SerializableXactHashLock);
04597             ereport(ERROR,
04598                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04599                      errmsg("could not serialize access due to read/write dependencies among transactions"),
04600                      errdetail_internal("Reason code: Canceled on identification as a pivot, during write."),
04601                      errhint("The transaction might succeed if retried.")));
04602         }
04603         else if (SxactIsPrepared(writer))
04604         {
04605             LWLockRelease(SerializableXactHashLock);
04606 
04607             /* if we're not the writer, we have to be the reader */
04608             Assert(MySerializableXact == reader);
04609             ereport(ERROR,
04610                     (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04611                      errmsg("could not serialize access due to read/write dependencies among transactions"),
04612                      errdetail_internal("Reason code: Canceled on conflict out to pivot %u, during read.", writer->topXid),
04613                      errhint("The transaction might succeed if retried.")));
04614         }
04615         writer->flags |= SXACT_FLAG_DOOMED;
04616     }
04617 }
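/*
 * Illustrative sketch, not part of predicate.c: the essence of the second
 * dangerous-structure test above, written as a hypothetical predicate for a
 * single candidate out-conflict T2 of the writer.  The structure is only
 * dangerous if T2 prepared or committed first relative to both the reader
 * and the writer, with the further refinement for a READ ONLY reader that
 * T2 must have committed before the reader took its snapshot.
 */
static bool
example_writer_pivot_is_dangerous(const SERIALIZABLEXACT *reader,
                                  const SERIALIZABLEXACT *writer,
                                  const SERIALIZABLEXACT *t2)
{
    return SxactIsPrepared(t2)
        && (!SxactIsCommitted(reader)
            || t2->prepareSeqNo <= reader->commitSeqNo)
        && (!SxactIsCommitted(writer)
            || t2->prepareSeqNo <= writer->commitSeqNo)
        && (!SxactIsReadOnly(reader)
            || t2->prepareSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot);
}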
04618 
04619 /*
04620  * PreCommit_CheckForSerializationFailure
04621  *      Check for dangerous structures in a serializable transaction
04622  *      at commit.
04623  *
04624  * We're checking for a dangerous structure as each conflict is recorded.
04625  * The only way we could have a problem at commit is if this is the "out"
04626  * side of a pivot, and neither the "in" side nor the pivot has yet
04627  * committed.
04628  *
04629  * If a dangerous structure is found, the pivot (the near conflict) is
04630  * marked for death, because rolling back another transaction might mean
04631  * that we flail without ever making progress.  This transaction is
04632  * committing writes, so letting it commit ensures progress.  If we
04633  * canceled the far conflict, it might immediately fail again on retry.
04634  */
04635 void
04636 PreCommit_CheckForSerializationFailure(void)
04637 {
04638     RWConflict  nearConflict;
04639 
04640     if (MySerializableXact == InvalidSerializableXact)
04641         return;
04642 
04643     Assert(IsolationIsSerializable());
04644 
04645     LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04646 
04647     /* Check if someone else has already decided that we need to die */
04648     if (SxactIsDoomed(MySerializableXact))
04649     {
04650         LWLockRelease(SerializableXactHashLock);
04651         ereport(ERROR,
04652                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04653                  errmsg("could not serialize access due to read/write dependencies among transactions"),
04654                  errdetail_internal("Reason code: Canceled on identification as a pivot, during commit attempt."),
04655                  errhint("The transaction might succeed if retried.")));
04656     }
04657 
04658     nearConflict = (RWConflict)
04659         SHMQueueNext(&MySerializableXact->inConflicts,
04660                      &MySerializableXact->inConflicts,
04661                      offsetof(RWConflictData, inLink));
04662     while (nearConflict)
04663     {
04664         if (!SxactIsCommitted(nearConflict->sxactOut)
04665             && !SxactIsDoomed(nearConflict->sxactOut))
04666         {
04667             RWConflict  farConflict;
04668 
04669             farConflict = (RWConflict)
04670                 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
04671                              &nearConflict->sxactOut->inConflicts,
04672                              offsetof(RWConflictData, inLink));
04673             while (farConflict)
04674             {
04675                 if (farConflict->sxactOut == MySerializableXact
04676                     || (!SxactIsCommitted(farConflict->sxactOut)
04677                         && !SxactIsReadOnly(farConflict->sxactOut)
04678                         && !SxactIsDoomed(farConflict->sxactOut)))
04679                 {
04680                     /*
04681                      * Normally, we kill the pivot transaction to make sure we
04682                      * make progress if the failing transaction is retried.
04683                      * However, we can't kill it if it's already prepared, so
04684                      * in that case we commit suicide instead.
04685                      */
04686                     if (SxactIsPrepared(nearConflict->sxactOut))
04687                     {
04688                         LWLockRelease(SerializableXactHashLock);
04689                         ereport(ERROR,
04690                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
04691                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
04692                                  errdetail_internal("Reason code: Canceled on commit attempt with conflict in from prepared pivot."),
04693                                  errhint("The transaction might succeed if retried.")));
04694                     }
04695                     nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
04696                     break;
04697                 }
04698                 farConflict = (RWConflict)
04699                     SHMQueueNext(&nearConflict->sxactOut->inConflicts,
04700                                  &farConflict->inLink,
04701                                  offsetof(RWConflictData, inLink));
04702             }
04703         }
04704 
04705         nearConflict = (RWConflict)
04706             SHMQueueNext(&MySerializableXact->inConflicts,
04707                          &nearConflict->inLink,
04708                          offsetof(RWConflictData, inLink));
04709     }
04710 
04711     MySerializableXact->prepareSeqNo = ++(PredXact->LastSxactCommitSeqNo);
04712     MySerializableXact->flags |= SXACT_FLAG_PREPARED;
04713 
04714     LWLockRelease(SerializableXactHashLock);
04715 }
04716 
04717 /*------------------------------------------------------------------------*/
04718 
04719 /*
04720  * Two-phase commit support
04721  */
04722 
04723 /*
04724  * AtPrepare_PredicateLocks
04725  *      Do the preparatory work for a PREPARE: make 2PC state file
04726  *      records for all predicate locks currently held.
04727  */
04728 void
04729 AtPrepare_PredicateLocks(void)
04730 {
04731     PREDICATELOCK *predlock;
04732     SERIALIZABLEXACT *sxact;
04733     TwoPhasePredicateRecord record;
04734     TwoPhasePredicateXactRecord *xactRecord;
04735     TwoPhasePredicateLockRecord *lockRecord;
04736 
04737     sxact = MySerializableXact;
04738     xactRecord = &(record.data.xactRecord);
04739     lockRecord = &(record.data.lockRecord);
04740 
04741     if (MySerializableXact == InvalidSerializableXact)
04742         return;
04743 
04744     /* Generate a xact record for our SERIALIZABLEXACT */
04745     record.type = TWOPHASEPREDICATERECORD_XACT;
04746     xactRecord->xmin = MySerializableXact->xmin;
04747     xactRecord->flags = MySerializableXact->flags;
04748 
04749     /*
04750      * Note that we don't include the transaction's conflict in/out lists in
04751      * the statefile, because new conflicts can be added even after the
04752      * transaction prepares. We'll just make a conservative assumption during
04753      * recovery instead.
04754      */
04755 
04756     RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
04757                            &record, sizeof(record));
04758 
04759     /*
04760      * Generate a lock record for each lock.
04761      *
04762      * To do this, we need to walk the predicate lock list in our sxact rather
04763      * than using the local predicate lock table because the latter is not
04764      * guaranteed to be accurate.
04765      */
04766     LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
04767 
04768     predlock = (PREDICATELOCK *)
04769         SHMQueueNext(&(sxact->predicateLocks),
04770                      &(sxact->predicateLocks),
04771                      offsetof(PREDICATELOCK, xactLink));
04772 
04773     while (predlock != NULL)
04774     {
04775         record.type = TWOPHASEPREDICATERECORD_LOCK;
04776         lockRecord->target = predlock->tag.myTarget->tag;
04777 
04778         RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
04779                                &record, sizeof(record));
04780 
04781         predlock = (PREDICATELOCK *)
04782             SHMQueueNext(&(sxact->predicateLocks),
04783                          &(predlock->xactLink),
04784                          offsetof(PREDICATELOCK, xactLink));
04785     }
04786 
04787     LWLockRelease(SerializablePredicateLockListLock);
04788 }
04789 
04790 /*
04791  * PostPrepare_PredicateLocks
04792  *      Clean up after successful PREPARE. Unlike the non-predicate
04793  *      lock manager, we do not need to transfer locks to a dummy
04794  *      PGPROC because our SERIALIZABLEXACT will stay around
04795  *      anyway. We only need to clean up our local state.
04796  */
04797 void
04798 PostPrepare_PredicateLocks(TransactionId xid)
04799 {
04800     if (MySerializableXact == InvalidSerializableXact)
04801         return;
04802 
04803     Assert(SxactIsPrepared(MySerializableXact));
04804 
04805     MySerializableXact->pid = 0;
04806 
04807     hash_destroy(LocalPredicateLockHash);
04808     LocalPredicateLockHash = NULL;
04809 
04810     MySerializableXact = InvalidSerializableXact;
04811     MyXactDidWrite = false;
04812 }
04813 
04814 /*
04815  * PredicateLockTwoPhaseFinish
04816  *      Release a prepared transaction's predicate locks once it
04817  *      commits or aborts.
04818  */
04819 void
04820 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
04821 {
04822     SERIALIZABLEXID *sxid;
04823     SERIALIZABLEXIDTAG sxidtag;
04824 
04825     sxidtag.xid = xid;
04826 
04827     LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04828     sxid = (SERIALIZABLEXID *)
04829         hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
04830     LWLockRelease(SerializableXactHashLock);
04831 
04832     /* xid will not be found if it wasn't a serializable transaction */
04833     if (sxid == NULL)
04834         return;
04835 
04836     /* Release its locks */
04837     MySerializableXact = sxid->myXact;
04838     MyXactDidWrite = true;      /* conservatively assume that we wrote
04839                                  * something */
04840     ReleasePredicateLocks(isCommit);
04841 }
04842 
04843 /*
04844  * Re-acquire a predicate lock belonging to a transaction that was prepared.
04845  */
04846 void
04847 predicatelock_twophase_recover(TransactionId xid, uint16 info,
04848                                void *recdata, uint32 len)
04849 {
04850     TwoPhasePredicateRecord *record;
04851 
04852     Assert(len == sizeof(TwoPhasePredicateRecord));
04853 
04854     record = (TwoPhasePredicateRecord *) recdata;
04855 
04856     Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
04857            (record->type == TWOPHASEPREDICATERECORD_LOCK));
04858 
04859     if (record->type == TWOPHASEPREDICATERECORD_XACT)
04860     {
04861         /* Per-transaction record. Set up a SERIALIZABLEXACT. */
04862         TwoPhasePredicateXactRecord *xactRecord;
04863         SERIALIZABLEXACT *sxact;
04864         SERIALIZABLEXID *sxid;
04865         SERIALIZABLEXIDTAG sxidtag;
04866         bool        found;
04867 
04868         xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
04869 
04870         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
04871         sxact = CreatePredXact();
04872         if (!sxact)
04873             ereport(ERROR,
04874                     (errcode(ERRCODE_OUT_OF_MEMORY),
04875                      errmsg("out of shared memory")));
04876 
04877         /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
04878         sxact->vxid.backendId = InvalidBackendId;
04879         sxact->vxid.localTransactionId = (LocalTransactionId) xid;
04880         sxact->pid = 0;
04881 
04882         /* a prepared xact hasn't committed yet */
04883         sxact->prepareSeqNo = RecoverySerCommitSeqNo;
04884         sxact->commitSeqNo = InvalidSerCommitSeqNo;
04885         sxact->finishedBefore = InvalidTransactionId;
04886 
04887         sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
04888 
04889         /*
04890          * Don't need to track this; no transactions running at the time the
04891          * recovered xact started are still active, except possibly other
04892          * prepared xacts, and we don't care whether those are RO_SAFE or not.
04893          */
04894         SHMQueueInit(&(sxact->possibleUnsafeConflicts));
04895 
04896         SHMQueueInit(&(sxact->predicateLocks));
04897         SHMQueueElemInit(&(sxact->finishedLink));
04898 
04899         sxact->topXid = xid;
04900         sxact->xmin = xactRecord->xmin;
04901         sxact->flags = xactRecord->flags;
04902         Assert(SxactIsPrepared(sxact));
04903         if (!SxactIsReadOnly(sxact))
04904         {
04905             ++(PredXact->WritableSxactCount);
04906             Assert(PredXact->WritableSxactCount <=
04907                    (MaxBackends + max_prepared_xacts));
04908         }
04909 
04910         /*
04911          * We don't know whether the transaction had any conflicts or not, so
04912          * we'll conservatively assume that it had both a conflict in and a
04913          * conflict out, and represent that with the summary conflict flags.
04914          */
04915         SHMQueueInit(&(sxact->outConflicts));
04916         SHMQueueInit(&(sxact->inConflicts));
04917         sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
04918         sxact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
04919 
04920         /* Register the transaction's xid */
04921         sxidtag.xid = xid;
04922         sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
04923                                                &sxidtag,
04924                                                HASH_ENTER, &found);
04925         Assert(sxid != NULL);
04926         Assert(!found);
04927         sxid->myXact = (SERIALIZABLEXACT *) sxact;
04928 
04929         /*
04930          * Update global xmin. Note that this is a special case compared to
04931          * registering a normal transaction, because the global xmin might go
04932          * backwards. That's OK, because until recovery is over we're not
04933          * going to complete any transactions or create any non-prepared
04934          * transactions, so there's no danger of throwing away needed state.
04935          */
04936         if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
04937             (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
04938         {
04939             PredXact->SxactGlobalXmin = sxact->xmin;
04940             PredXact->SxactGlobalXminCount = 1;
04941             OldSerXidSetActiveSerXmin(sxact->xmin);
04942         }
04943         else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
04944         {
04945             Assert(PredXact->SxactGlobalXminCount > 0);
04946             PredXact->SxactGlobalXminCount++;
04947         }
04948 
04949         LWLockRelease(SerializableXactHashLock);
04950     }
04951     else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
04952     {
04953         /* Lock record. Recreate the PREDICATELOCK */
04954         TwoPhasePredicateLockRecord *lockRecord;
04955         SERIALIZABLEXID *sxid;
04956         SERIALIZABLEXACT *sxact;
04957         SERIALIZABLEXIDTAG sxidtag;
04958         uint32      targettaghash;
04959 
04960         lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
04961         targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
04962 
04963         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
04964         sxidtag.xid = xid;
04965         sxid = (SERIALIZABLEXID *)
04966             hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
04967         LWLockRelease(SerializableXactHashLock);
04968 
04969         Assert(sxid != NULL);
04970         sxact = sxid->myXact;
04971         Assert(sxact != InvalidSerializableXact);
04972 
04973         CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
04974     }
04975 }