lock.c
/*-------------------------------------------------------------------------
 *
 * lock.c
 *    POSTGRES primary lock mechanism
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lock.c
 *
 * NOTES
 *    A lock table is a shared memory hash table.  When
 *    a process tries to acquire a lock of a type that conflicts
 *    with existing locks, it is put to sleep using the routines
 *    in storage/lmgr/proc.c.
 *
 *    For the most part, this code should be invoked via lmgr.c
 *    or another lock-management module, not directly.
 *
 *  Interface:
 *
 *  InitLocks(), GetLocksMethodTable(),
 *  LockAcquire(), LockRelease(), LockReleaseAll(),
 *  LockCheckConflicts(), GrantLock()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner_private.h"


/* This configuration variable is used to set the lock table size */
int         max_locks_per_xact; /* set by guc.c */

#define NLOCKENTS() \
    mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
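
/*
 * Illustrative sizing example (not from the original file): with the
 * defaults max_locks_per_xact = 64 and max_prepared_xacts = 0, and
 * MaxBackends = 100, NLOCKENTS() = 64 * (100 + 0) = 6400 shared lock-table
 * entries.  mul_size/add_size are the overflow-checked Size arithmetic
 * helpers used elsewhere in shared-memory sizing.
 */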


/*
 * Data structures defining the semantics of the standard lock methods.
 *
 * The conflict table defines the semantics of the various lock modes.
 */
static const LOCKMASK LockConflicts[] = {
    0,

    /* AccessShareLock */
    (1 << AccessExclusiveLock),

    /* RowShareLock */
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* RowExclusiveLock */
    (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* ShareUpdateExclusiveLock */
    (1 << ShareUpdateExclusiveLock) |
    (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* ShareLock */
    (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
    (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* ShareRowExclusiveLock */
    (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
    (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* ExclusiveLock */
    (1 << RowShareLock) |
    (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
    (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock),

    /* AccessExclusiveLock */
    (1 << AccessShareLock) | (1 << RowShareLock) |
    (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
    (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
    (1 << ExclusiveLock) | (1 << AccessExclusiveLock)

};
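
/*
 * Worked example of reading the table above: the entry for RowExclusiveLock
 * has bits set for ShareLock, ShareRowExclusiveLock, ExclusiveLock, and
 * AccessExclusiveLock, so an INSERT/UPDATE/DELETE (which takes
 * RowExclusiveLock) blocks behind, e.g., CREATE INDEX (ShareLock) on the
 * same relation; but two RowExclusiveLock holders coexist, because the
 * entry's own bit is not set.  Entries are indexed by lock mode number,
 * with the unused 0 slot first.
 */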

/* Names of lock modes, for debug printouts */
static const char *const lock_mode_names[] =
{
    "INVALID",
    "AccessShareLock",
    "RowShareLock",
    "RowExclusiveLock",
    "ShareUpdateExclusiveLock",
    "ShareLock",
    "ShareRowExclusiveLock",
    "ExclusiveLock",
    "AccessExclusiveLock"
};

#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif

static const LockMethodData default_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,
    lock_mode_names,
#ifdef LOCK_DEBUG
    &Trace_locks
#else
    &Dummy_trace
#endif
};

static const LockMethodData user_lockmethod = {
    AccessExclusiveLock,        /* highest valid lock mode number */
    LockConflicts,
    lock_mode_names,
#ifdef LOCK_DEBUG
    &Trace_userlocks
#else
    &Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 */
static const LockMethod LockMethods[] = {
    NULL,
    &default_lockmethod,
    &user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
    LOCKTAG     locktag;
    LOCKMODE    lockmode;
} TwoPhaseLockRecord;


/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int  FastPathLocalUseCount = 0;

/* Macros for manipulating proc->fpLockBits */
#define FAST_PATH_BITS_PER_SLOT         3
#define FAST_PATH_LOCKNUMBER_OFFSET     1
#define FAST_PATH_MASK                  ((1 << FAST_PATH_BITS_PER_SLOT) - 1)
#define FAST_PATH_GET_BITS(proc, n) \
    (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * (n))) & FAST_PATH_MASK)
#define FAST_PATH_BIT_POSITION(n, l) \
    (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
     AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
     ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
#define FAST_PATH_SET_LOCKMODE(proc, n, l) \
     (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
     (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
     ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))

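/*
 * Worked example of the bit layout (illustrative): each fast-path slot gets
 * FAST_PATH_BITS_PER_SLOT = 3 bits, one per fast-path-eligible lock mode
 * (AccessShareLock = 1 .. RowExclusiveLock = 3).  For slot n = 2 and
 * l = RowExclusiveLock, FAST_PATH_BIT_POSITION(2, RowExclusiveLock) =
 * (3 - 1) + 3 * 2 = 8, so FAST_PATH_SET_LOCKMODE sets bit 8 of fpLockBits.
 */
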
/*
 * The fast-path lock mechanism is concerned only with relation locks on
 * unshared relations by backends bound to a database.  The fast-path
 * mechanism exists mostly to accelerate acquisition and release of locks
 * that rarely conflict.  Because ShareUpdateExclusiveLock is
 * self-conflicting, it can't use the fast-path mechanism; but it also does
 * not conflict with any of the locks that do, so we can ignore it completely.
 */
#define EligibleForRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 == MyDatabaseId && \
    MyDatabaseId != InvalidOid && \
    (mode) < ShareUpdateExclusiveLock)
#define ConflictsWithRelationFastPath(locktag, mode) \
    ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
    (locktag)->locktag_type == LOCKTAG_RELATION && \
    (locktag)->locktag_field1 != InvalidOid && \
    (mode) > ShareUpdateExclusiveLock)

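/*
 * Illustrative consequence of the two macros above: "weak" modes
 * (AccessShareLock, RowShareLock, RowExclusiveLock) are eligible for the
 * fast path, "strong" modes (everything above ShareUpdateExclusiveLock,
 * i.e. ShareLock through AccessExclusiveLock) conflict with it, and
 * ShareUpdateExclusiveLock falls in neither set.  So a plain SELECT's
 * AccessShareLock can live in the per-backend array, while a DROP TABLE's
 * AccessExclusiveLock must first force any such entries into the main
 * lock table.
 */
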
static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
                              const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be
 * present.  We partition the locktag space into
 * FAST_PATH_STRONG_LOCK_HASH_PARTITIONS partitions, and maintain an
 * integer count of the number of "strong" lockers
 * in each partition.  When any "strong" lockers are present (which is
 * hopefully not very often), the fast-path mechanism can't be used, and we
 * must fall back to the slower method of pushing matching locks directly
 * into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
 */

#define FAST_PATH_STRONG_LOCK_HASH_BITS         10
#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
    (1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
#define FastPathStrongLockHashPartition(hashcode) \
    ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)

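/*
 * Example of the strong-lock accounting (illustrative): when a backend
 * acquires AccessExclusiveLock on relation R, it increments
 * FastPathStrongRelationLocks->count[FastPathStrongLockHashPartition(h)],
 * where h is R's locktag hash; with FAST_PATH_STRONG_LOCK_HASH_BITS = 10
 * that is h % 1024.  Any backend that later sees that counter nonzero must
 * bypass the fast path for every locktag hashing into that partition.
 */
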
typedef struct
{
    slock_t     mutex;
    uint32      count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
} FastPathStrongRelationLockData;

FastPathStrongRelationLockData *FastPathStrongRelationLocks;


/*
 * Pointers to hash tables containing lock state
 *
 * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
 * shared memory; LockMethodLocalHash is local to each backend.
 */
static HTAB *LockMethodLockHash;
static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;


/* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;

#ifdef LOCK_DEBUG

/*------
 * The following configuration options are available for lock debugging:
 *
 *     TRACE_LOCKS      -- give a bunch of output about what's going on in this file
 *     TRACE_USERLOCKS  -- same but for user locks
 *     TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
 *                          (use to avoid output on system tables)
 *     TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
 *     DEBUG_DEADLOCKS  -- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/lmgr/lwlock.c:
 *     TRACE_LWLOCKS    -- trace lightweight locks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */

int         Trace_lock_oidmin = FirstNormalObjectId;
bool        Trace_locks = false;
bool        Trace_userlocks = false;
int         Trace_lock_table = 0;
bool        Debug_deadlocks = false;


inline static bool
LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
{
    return
        (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
         ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
        || (Trace_lock_table &&
            (tag->locktag_field2 == Trace_lock_table));
}


inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
    if (LOCK_DEBUG_ENABLED(&lock->tag))
        elog(LOG,
             "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
             "req(%d,%d,%d,%d,%d,%d,%d)=%d "
             "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
             where, lock,
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3, lock->tag.locktag_field4,
             lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
             lock->grantMask,
             lock->requested[1], lock->requested[2], lock->requested[3],
             lock->requested[4], lock->requested[5], lock->requested[6],
             lock->requested[7], lock->nRequested,
             lock->granted[1], lock->granted[2], lock->granted[3],
             lock->granted[4], lock->granted[5], lock->granted[6],
             lock->granted[7], lock->nGranted,
             lock->waitProcs.size,
             LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
}


inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
    if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
        elog(LOG,
             "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
             where, proclockP, proclockP->tag.myLock,
             PROCLOCK_LOCKMETHOD(*(proclockP)),
             proclockP->tag.myProc, (int) proclockP->holdMask);
}
#else                           /* not LOCK_DEBUG */

#define LOCK_PRINT(where, lock, type)
#define PROCLOCK_PRINT(where, proclockP)
#endif   /* not LOCK_DEBUG */


static uint32 proclock_hash(const void *key, Size keysize);
static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
            PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
            LockMethod lockMethodTable, uint32 hashcode,
            bool wakeupNeeded);
static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
                     LOCKTAG *locktag, LOCKMODE lockmode,
                     bool decrement_strong_lock_count);


/*
 * InitLocks -- Initialize the lock manager's data structures.
 *
 * This is called from CreateSharedMemoryAndSemaphores(), which see for
 * more comments.  In the normal postmaster case, the shared hash tables
 * are created here, as well as a locallock hash table that will remain
 * unused and empty in the postmaster itself.  Backends inherit the pointers
 * to the shared tables via fork(), and also inherit an image of the locallock
 * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
 * backend re-executes this code to obtain pointers to the already existing
 * shared hash tables and to create its locallock hash table.
 */
void
InitLocks(void)
{
    HASHCTL     info;
    int         hash_flags;
    long        init_table_size,
                max_table_size;
    bool        found;

    /*
     * Compute init/max size to request for lock hashtables.  Note these
     * calculations must agree with LockShmemSize!
     */
    max_table_size = NLOCKENTS();
    init_table_size = max_table_size / 2;

    /*
     * Allocate hash table for LOCK structs.  This stores per-locked-object
     * information.
     */
    MemSet(&info, 0, sizeof(info));
    info.keysize = sizeof(LOCKTAG);
    info.entrysize = sizeof(LOCK);
    info.hash = tag_hash;
    info.num_partitions = NUM_LOCK_PARTITIONS;
    hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

    LockMethodLockHash = ShmemInitHash("LOCK hash",
                                       init_table_size,
                                       max_table_size,
                                       &info,
                                       hash_flags);

    /* Assume an average of 2 holders per lock */
    max_table_size *= 2;
    init_table_size *= 2;

    /*
     * Allocate hash table for PROCLOCK structs.  This stores
     * per-lock-per-holder information.
     */
    info.keysize = sizeof(PROCLOCKTAG);
    info.entrysize = sizeof(PROCLOCK);
    info.hash = proclock_hash;
    info.num_partitions = NUM_LOCK_PARTITIONS;
    hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

    LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
                                           init_table_size,
                                           max_table_size,
                                           &info,
                                           hash_flags);

    /*
     * Allocate fast-path structures.
     */
    FastPathStrongRelationLocks =
        ShmemInitStruct("Fast Path Strong Relation Lock Data",
                        sizeof(FastPathStrongRelationLockData), &found);
    if (!found)
        SpinLockInit(&FastPathStrongRelationLocks->mutex);

    /*
     * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
     * counts and resource owner information.
     *
     * The non-shared table could already exist in this process (this occurs
     * when the postmaster is recreating shared memory after a backend crash).
     * If so, delete and recreate it.  (We could simply leave it, since it
     * ought to be empty in the postmaster, but for safety let's zap it.)
     */
    if (LockMethodLocalHash)
        hash_destroy(LockMethodLocalHash);

    info.keysize = sizeof(LOCALLOCKTAG);
    info.entrysize = sizeof(LOCALLOCK);
    info.hash = tag_hash;
    hash_flags = (HASH_ELEM | HASH_FUNCTION);

    LockMethodLocalHash = hash_create("LOCALLOCK hash",
                                      16,
                                      &info,
                                      hash_flags);
}


/*
 * Fetch the lock method table associated with a given lock
 */
LockMethod
GetLocksMethodTable(const LOCK *lock)
{
    LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);

    Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
    return LockMethods[lockmethodid];
}


/*
 * Compute the hash code associated with a LOCKTAG.
 *
 * To avoid unnecessary recomputations of the hash code, we try to do this
 * just once per function, and then pass it around as needed.  Aside from
 * passing the hashcode to hash_search_with_hash_value(), we can extract
 * the lock partition number from the hashcode.
 */
uint32
LockTagHashCode(const LOCKTAG *locktag)
{
    return get_hash_value(LockMethodLockHash, (const void *) locktag);
}

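/*
 * Illustrative partition extraction (the macros live in storage/lock.h):
 * the partition number is just the low-order bits of the hashcode, i.e.
 * LockHashPartition(hashcode) == hashcode % NUM_LOCK_PARTITIONS, and
 * LockHashPartitionLock() maps that to the corresponding partition LWLock.
 * So two LOCKTAGs whose hashcodes agree in the low LOG2_NUM_LOCK_PARTITIONS
 * bits are protected by the same partition lock.
 */
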
/*
 * Compute the hash code associated with a PROCLOCKTAG.
 *
 * Because we want to use just one set of partition locks for both the
 * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
 * fall into the same partition number as their associated LOCKs.
 * dynahash.c expects the partition number to be the low-order bits of
 * the hash code, and therefore a PROCLOCKTAG's hash code must have the
 * same low-order bits as the associated LOCKTAG's hash code.  We achieve
 * this with this specialized hash function.
 */
static uint32
proclock_hash(const void *key, Size keysize)
{
    const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
    uint32      lockhash;
    Datum       procptr;

    Assert(keysize == sizeof(PROCLOCKTAG));

    /* Look into the associated LOCK object, and compute its hash code */
    lockhash = LockTagHashCode(&proclocktag->myLock->tag);

    /*
     * To make the hash code also depend on the PGPROC, we xor the proc
     * struct's address into the hash code, left-shifted so that the
     * partition-number bits don't change.  Since this is only a hash, we
     * don't care if we lose high-order bits of the address; use an
     * intermediate variable to suppress cast-pointer-to-int warnings.
     */
    procptr = PointerGetDatum(proclocktag->myProc);
    lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

    return lockhash;
}

/*
 * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
 * for its underlying LOCK.
 *
 * We use this just to avoid redundant calls of LockTagHashCode().
 */
static inline uint32
ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
{
    uint32      lockhash = hashcode;
    Datum       procptr;

    /*
     * This must match proclock_hash()!
     */
    procptr = PointerGetDatum(proclocktag->myProc);
    lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;

    return lockhash;
}

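/*
 * Worked example of why the XOR above is safe (illustrative): suppose
 * LOG2_NUM_LOCK_PARTITIONS = 4, i.e. 16 partitions.  The term
 * ((uint32) procptr) << 4 has zeroes in its low 4 bits, so XORing it into
 * lockhash leaves lockhash's low 4 bits --- and hence the partition
 * number --- unchanged; only the higher-order bits are perturbed
 * per-PGPROC.
 */
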
/*
 * Given two lock modes, return whether they would conflict.
 */
bool
DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
{
    LockMethod  lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];

    if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
        return true;

    return false;
}

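/*
 * Usage sketch (illustrative, not part of the original file): a caller
 * such as an extension can test mode compatibility directly, e.g.
 *
 *     if (DoLockModesConflict(RowExclusiveLock, ShareLock))
 *         elog(LOG, "writers block behind CREATE INDEX");
 *
 * which takes the true branch, since LockConflicts[RowExclusiveLock]
 * includes the ShareLock bit.
 */
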
/*
 * LockHasWaiters -- look up 'locktag' and check if releasing this
 *      lock would wake up other processes waiting for it.
 */
bool
LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    LWLockId    partitionLock;
    bool        hasWaiters = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /*
     * Find the LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag));     /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_FIND, NULL);

    /*
     * let the caller print its own error message, too. Do not ereport(ERROR).
     */
    if (!locallock || locallock->nLocks <= 0)
    {
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        return false;
    }

    /*
     * Check the shared lock table.
     */
    partitionLock = LockHashPartitionLock(locallock->hashcode);

    LWLockAcquire(partitionLock, LW_SHARED);

    /*
     * We don't need to re-find the lock or proclock, since we kept their
     * addresses in the locallock table, and they couldn't have been removed
     * while we were holding a lock on them.
     */
    lock = locallock->lock;
    LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
    proclock = locallock->proclock;
    PROCLOCK_PRINT("LockHasWaiters: found", proclock);

    /*
     * Double-check that we are actually holding a lock of the type we want to
     * release.
     */
    if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
    {
        PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
        LWLockRelease(partitionLock);
        elog(WARNING, "you don't own a lock of type %s",
             lockMethodTable->lockModeNames[lockmode]);
        RemoveLocalLock(locallock);
        return false;
    }

    /*
     * Do the checking.
     */
    if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
        hasWaiters = true;

    LWLockRelease(partitionLock);

    return hasWaiters;
}

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *      set lock if/when no conflicts.
 *
 * Inputs:
 *  locktag: unique identifier for the lockable object
 *  lockmode: lock mode to acquire
 *  sessionLock: if true, acquire lock for session not current transaction
 *  dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *      LOCKACQUIRE_NOT_AVAIL       lock not available, and dontWait=true
 *      LOCKACQUIRE_OK              lock successfully acquired
 *      LOCKACQUIRE_ALREADY_HELD    incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
            LOCKMODE lockmode,
            bool sessionLock,
            bool dontWait)
{
    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}

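/*
 * Usage sketch (illustrative, not part of the original file): a caller
 * acquiring a transaction-scoped relation lock without blocking might write
 *
 *     LOCKTAG     tag;
 *
 *     SET_LOCKTAG_RELATION(tag, MyDatabaseId, reloid);
 *     if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
 *         ereport(ERROR, (errmsg("could not obtain lock")));
 *
 * In-core code normally goes through the lmgr.c wrappers such as
 * LockRelationOid / ConditionalLockRelationOid instead of calling this
 * directly.
 */
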
/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * reportMemoryError specifies whether a lock request that fills the
 * lock table should generate an ERROR or not. This allows a priority
 * caller to note that the lock table is full and then begin taking
 * extreme action to reduce the number of other lock holders before
 * retrying the action.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError)
{
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
    LOCALLOCKTAG localtag;
    LOCALLOCK  *locallock;
    LOCK       *lock;
    PROCLOCK   *proclock;
    bool        found;
    ResourceOwner owner;
    uint32      hashcode;
    LWLockId    partitionLock;
    int         status;
    bool        log_lock = false;

    if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
        elog(ERROR, "unrecognized lock method: %d", lockmethodid);
    lockMethodTable = LockMethods[lockmethodid];
    if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
        elog(ERROR, "unrecognized lock mode: %d", lockmode);

    if (RecoveryInProgress() && !InRecovery &&
        (locktag->locktag_type == LOCKTAG_OBJECT ||
         locktag->locktag_type == LOCKTAG_RELATION) &&
        lockmode > RowExclusiveLock)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
                        lockMethodTable->lockModeNames[lockmode]),
                 errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));

#ifdef LOCK_DEBUG
    if (LOCK_DEBUG_ENABLED(locktag))
        elog(LOG, "LockAcquire: lock [%u,%u] %s",
             locktag->locktag_field1, locktag->locktag_field2,
             lockMethodTable->lockModeNames[lockmode]);
#endif

    /* Identify owner for lock */
    if (sessionLock)
        owner = NULL;
    else
        owner = CurrentResourceOwner;

    /*
     * Find or create a LOCALLOCK entry for this lock and lockmode
     */
    MemSet(&localtag, 0, sizeof(localtag));     /* must clear padding */
    localtag.lock = *locktag;
    localtag.mode = lockmode;

    locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
                                          (void *) &localtag,
                                          HASH_ENTER, &found);

    /*
     * if it's a new locallock object, initialize it
     */
    if (!found)
    {
        locallock->lock = NULL;
        locallock->proclock = NULL;
        locallock->hashcode = LockTagHashCode(&(localtag.lock));
        locallock->nLocks = 0;
        locallock->numLockOwners = 0;
        locallock->maxLockOwners = 8;
        locallock->holdsStrongLockCount = FALSE;
        locallock->lockOwners = NULL;
        locallock->lockOwners = (LOCALLOCKOWNER *)
            MemoryContextAlloc(TopMemoryContext,
                          locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
    }
    else
    {
        /* Make sure there will be room to remember the lock */
        if (locallock->numLockOwners >= locallock->maxLockOwners)
        {
            int         newsize = locallock->maxLockOwners * 2;

            locallock->lockOwners = (LOCALLOCKOWNER *)
                repalloc(locallock->lockOwners,
                         newsize * sizeof(LOCALLOCKOWNER));
            locallock->maxLockOwners = newsize;
        }
    }
    hashcode = locallock->hashcode;

    /*
     * If we already hold the lock, we can just increase the count locally.
     */
    if (locallock->nLocks > 0)
    {
        GrantLockLocal(locallock, owner);
        return LOCKACQUIRE_ALREADY_HELD;
    }

    /*
     * Emit a WAL record if acquisition of this lock needs to be replayed in a
     * standby server. Only AccessExclusiveLocks can conflict with lock types
     * that read-only transactions can acquire in a standby server.
     *
     * Make sure this definition matches the one in
     * GetRunningTransactionLocks().
     *
     * First we prepare to log, then after lock acquired we issue log record.
     */
    if (lockmode >= AccessExclusiveLock &&
        locktag->locktag_type == LOCKTAG_RELATION &&
        !RecoveryInProgress() &&
        XLogStandbyInfoActive())
    {
        LogAccessExclusiveLockPrepare();
        log_lock = true;
    }

    /*
     * Attempt to take lock via fast path, if eligible.  But if we remember
     * having filled up the fast path array, we don't attempt to make any
     * further use of it until we release some locks.  It's possible that some
     * other backend has transferred some of those locks to the shared hash
     * table, leaving space free, but it's not worth acquiring the LWLock just
     * to check.  It's also possible that we're acquiring a second or third
     * lock type on a relation we have already locked using the fast-path, but
     * for now we don't worry about that case either.
     */
    if (EligibleForRelationFastPath(locktag, lockmode)
        && FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
    {
        uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);
        bool        acquired;

        /*
         * LWLockAcquire acts as a memory sequencing point, so it's safe to
         * assume that any strong locker whose increment to
         * FastPathStrongRelationLocks->count becomes visible after we test
         * it has yet to begin to transfer fast-path locks.
         */
        LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
        if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
            acquired = false;
        else
            acquired = FastPathGrantRelationLock(locktag->locktag_field2,
                                                 lockmode);
        LWLockRelease(MyProc->backendLock);
        if (acquired)
        {
            GrantLockLocal(locallock, owner);
            return LOCKACQUIRE_OK;
        }
    }

    /*
     * If this lock could potentially have been taken via the fast-path by
     * some other backend, we must (temporarily) disable further use of the
     * fast-path for this lock tag, and migrate any locks already taken via
     * this method to the main lock table.
     */
    if (ConflictsWithRelationFastPath(locktag, lockmode))
    {
        uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);

        BeginStrongLockAcquire(locallock, fasthashcode);
        if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
                                           hashcode))
        {
            AbortStrongLockAcquire();
            if (reportMemoryError)
                ereport(ERROR,
                        (errcode(ERRCODE_OUT_OF_MEMORY),
                         errmsg("out of shared memory"),
                         errhint("You might need to increase max_locks_per_transaction.")));
            else
                return LOCKACQUIRE_NOT_AVAIL;
        }
    }

    /*
     * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
     * take it via the fast-path, either, so we've got to mess with the shared
     * lock table.
     */
    partitionLock = LockHashPartitionLock(hashcode);

    LWLockAcquire(partitionLock, LW_EXCLUSIVE);

    /*
     * Find or create a proclock entry with this tag
     */
    proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
                                hashcode, lockmode);
    if (!proclock)
    {
        AbortStrongLockAcquire();
        LWLockRelease(partitionLock);
        if (reportMemoryError)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory"),
                     errhint("You might need to increase max_locks_per_transaction.")));
        else
            return LOCKACQUIRE_NOT_AVAIL;
    }
    locallock->proclock = proclock;
    lock = proclock->tag.myLock;
    locallock->lock = lock;

    /*
     * If lock requested conflicts with locks requested by waiters, must join
     * wait queue.  Otherwise, check for conflict with already-held locks.
     * (That's last because it's the most complex check.)
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        status = STATUS_FOUND;
    else
        status = LockCheckConflicts(lockMethodTable, lockmode,
                                    lock, proclock, MyProc);

    if (status == STATUS_OK)
    {
        /* No conflict with held or previously requested locks */
        GrantLock(lock, proclock, lockmode);
        GrantLockLocal(locallock, owner);
    }
    else
    {
        Assert(status == STATUS_FOUND);

        /*
         * We can't acquire the lock immediately.  If caller specified no
         * blocking, remove useless table entries and return NOT_AVAIL without
         * waiting.
         */
        if (dontWait)
        {
            AbortStrongLockAcquire();
            if (proclock->holdMask == 0)
            {
                uint32      proclock_hashcode;

                proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
                SHMQueueDelete(&proclock->lockLink);
                SHMQueueDelete(&proclock->procLink);
                if (!hash_search_with_hash_value(LockMethodProcLockHash,
                                                 (void *) &(proclock->tag),
                                                 proclock_hashcode,
                                                 HASH_REMOVE,
                                                 NULL))
                    elog(PANIC, "proclock table corrupted");
            }
            else
                PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
            lock->nRequested--;
            lock->requested[lockmode]--;
            LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
            Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
            Assert(lock->nGranted <= lock->nRequested);
            LWLockRelease(partitionLock);
            if (locallock->nLocks == 0)
                RemoveLocalLock(locallock);
            return LOCKACQUIRE_NOT_AVAIL;
        }

        /*
         * Set bitmask of locks this process already holds on this object.
         */
        MyProc->heldLocks = proclock->holdMask;

        /*
         * Sleep till someone wakes me up.
         */

        TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
                                         locktag->locktag_field2,
                                         locktag->locktag_field3,
                                         locktag->locktag_field4,
                                         locktag->locktag_type,
                                         lockmode);

        WaitOnLock(locallock, owner);

        TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
                                        locktag->locktag_field2,
                                        locktag->locktag_field3,
                                        locktag->locktag_field4,
                                        locktag->locktag_type,
                                        lockmode);

        /*
         * NOTE: do not do any material change of state between here and
         * return.  All required changes in locktable state must have been
         * done when the lock was granted to us --- see notes in WaitOnLock.
         */

        /*
         * Check the proclock entry status, in case something in the ipc
         * communication doesn't work correctly.
         */
        if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
        {
            AbortStrongLockAcquire();
            PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
            LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
            /* Should we retry ? */
            LWLockRelease(partitionLock);
            elog(ERROR, "LockAcquire failed");
        }
        PROCLOCK_PRINT("LockAcquire: granted", proclock);
        LOCK_PRINT("LockAcquire: granted", lock, lockmode);
    }

    /*
     * Lock state is fully up-to-date now; if we error out after this, no
     * special error cleanup is required.
     */
    FinishStrongLockAcquire();

    LWLockRelease(partitionLock);

    /*
     * Emit a WAL record if acquisition of this lock needs to be replayed in a
     * standby server.
     */
    if (log_lock)
    {
        /*
         * Decode the locktag back to the original values, to avoid sending
         * lots of empty bytes with every message.  See lock.h to check how a
         * locktag is defined for LOCKTAG_RELATION
         */
        LogAccessExclusiveLock(locktag->locktag_field1,
                               locktag->locktag_field2);
    }

    return LOCKACQUIRE_OK;
}

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
 *
 * Returns the PROCLOCK object, or NULL if we failed to create the objects
 * for lack of shared memory.
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
                 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
    LOCK       *lock;
    PROCLOCK   *proclock;
    PROCLOCKTAG proclocktag;
    uint32      proclock_hashcode;
    bool        found;

    /*
     * Find or create a lock with this tag.
     *
     * Note: if the locallock object already existed, it might have a pointer
     * to the lock already ... but we probably should not assume that that
     * pointer is valid, since a lock object with no locks can go away
     * anytime.
     */
    lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
                                                (const void *) locktag,
                                                hashcode,
                                                HASH_ENTER_NULL,
                                                &found);
    if (!lock)
        return NULL;

    /*
     * if it's a new lock object, initialize it
     */
    if (!found)
    {
        lock->grantMask = 0;
        lock->waitMask = 0;
        SHMQueueInit(&(lock->procLocks));
        ProcQueueInit(&(lock->waitProcs));
        lock->nRequested = 0;
        lock->nGranted = 0;
        MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
        MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
        LOCK_PRINT("LockAcquire: new", lock, lockmode);
    }
    else
    {
        LOCK_PRINT("LockAcquire: found", lock, lockmode);
        Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
        Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
        Assert(lock->nGranted <= lock->nRequested);
    }

    /*
     * Create the hash key for the proclock table.
     */
    proclocktag.myLock = lock;
    proclocktag.myProc = proc;

    proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

    /*
     * Find or create a proclock entry with this tag
     */
    proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
                                                        (void *) &proclocktag,
                                                        proclock_hashcode,
                                                        HASH_ENTER_NULL,
                                                        &found);
    if (!proclock)
    {
        /* Ooops, not enough shmem for the proclock */
        if (lock->nRequested == 0)
        {
            /*
             * There are no other requestors of this lock, so garbage-collect
             * the lock object.  We *must* do this to avoid a permanent leak
             * of shared memory, because there won't be anything to cause
             * anyone to release the lock object later.
             */
            Assert(SHMQueueEmpty(&(lock->procLocks)));
            if (!hash_search_with_hash_value(LockMethodLockHash,
                                             (void *) &(lock->tag),
                                             hashcode,
                                             HASH_REMOVE,
                                             NULL))
                elog(PANIC, "lock table corrupted");
        }
        return NULL;
    }

    /*
     * If new, initialize the new entry
     */
    if (!found)
    {
        uint32      partition = LockHashPartition(hashcode);

        proclock->holdMask = 0;
        proclock->releaseMask = 0;
        /* Add proclock to appropriate lists */
        SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
        SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
                             &proclock->procLink);
        PROCLOCK_PRINT("LockAcquire: new", proclock);
    }
    else
    {
        PROCLOCK_PRINT("LockAcquire: found", proclock);
        Assert((proclock->holdMask & ~lock->grantMask) == 0);

#ifdef CHECK_DEADLOCK_RISK

        /*
         * Issue warning if we already hold a lower-level lock on this object
         * and do not hold a lock of the requested level or higher. This
         * indicates a deadlock-prone coding practice (eg, we'd have a
         * deadlock if another backend were following the same code path at
         * about the same time).
         *
         * This is not enabled by default, because it may generate log entries
         * about user-level coding practices that are in fact safe in context.
         * It can be enabled to help find system-level problems.
         *
         * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
         * better to use a table.  For now, though, this works.
         */
        {
            int         i;

            for (i = lockMethodTable->numLockModes; i > 0; i--)
            {
                if (proclock->holdMask & LOCKBIT_ON(i))
                {
                    if (i >= (int) lockmode)
                        break;  /* safe: we have a lock >= req level */
                    elog(LOG, "deadlock risk: raising lock level"
                         " from %s to %s on object %u/%u/%u",
                         lockMethodTable->lockModeNames[i],
                         lockMethodTable->lockModeNames[lockmode],
                         lock->tag.locktag_field1, lock->tag.locktag_field2,
                         lock->tag.locktag_field3);
                    break;
                }
            }
        }
#endif   /* CHECK_DEADLOCK_RISK */
    }

    /*
     * lock->nRequested and lock->requested[] count the total number of
     * requests, whether granted or waiting, so increment those immediately.
     * The other counts don't increment till we get the lock.
     */
    lock->nRequested++;
    lock->requested[lockmode]++;
    Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

    /*
     * We shouldn't already hold the desired lock; else locallock table is
     * broken.
     */
    if (proclock->holdMask & LOCKBIT_ON(lockmode))
        elog(ERROR, "lock %s on object %u/%u/%u is already held",
             lockMethodTable->lockModeNames[lockmode],
             lock->tag.locktag_field1, lock->tag.locktag_field2,
             lock->tag.locktag_field3);

    return proclock;
}

/*
 * Subroutine to free a locallock entry
 */
static void
RemoveLocalLock(LOCALLOCK *locallock)
{
    int         i;

    for (i = locallock->numLockOwners - 1; i >= 0; i--)
    {
        if (locallock->lockOwners[i].owner != NULL)
            ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
    }
    pfree(locallock->lockOwners);
    locallock->lockOwners = NULL;

    if (locallock->holdsStrongLockCount)
    {
        uint32      fasthashcode;

        fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);

        SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
        Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
        FastPathStrongRelationLocks->count[fasthashcode]--;
        locallock->holdsStrongLockCount = FALSE;
        SpinLockRelease(&FastPathStrongRelationLocks->mutex);
    }

    if (!hash_search(LockMethodLocalHash,
                     (void *) &(locallock->tag),
                     HASH_REMOVE, NULL))
        elog(WARNING, "locallock table corrupted");
}

/*
 * LockCheckConflicts -- test whether requested lock conflicts
 *      with those already granted
 *
 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
 *
 * NOTES:
 *      Here's what makes this complicated: one process's locks don't
 * conflict with one another, no matter what purpose they are held for
 * (eg, session and transaction locks do not conflict).
 * So, we must subtract off our own locks when determining whether the
 * requested new lock conflicts with those already held.
 */
int
LockCheckConflicts(LockMethod lockMethodTable,
                   LOCKMODE lockmode,
                   LOCK *lock,
                   PROCLOCK *proclock,
                   PGPROC *proc)
{
    int         numLockModes = lockMethodTable->numLockModes;
    LOCKMASK    myLocks;
    LOCKMASK    otherLocks;
    int         i;

    /*
     * first check for global conflicts: If no locks conflict with my request,
     * then I get the lock.
     *
     * Checking for conflict: lock->grantMask represents the types of
     * currently held locks.  conflictTab[lockmode] has a bit set for each
     * type of lock that conflicts with request.  Bitwise compare tells if
     * there is a conflict.
     */
    if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
    {
        PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
        return STATUS_OK;
    }

    /*
     * Rats.  Something conflicts.  But it could still be my own lock. We have
     * to construct a conflict mask that does not reflect our own locks, but
     * only lock types held by other processes.
     */
    myLocks = proclock->holdMask;
    otherLocks = 0;
    for (i = 1; i <= numLockModes; i++)
    {
        int         myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;

        if (lock->granted[i] > myHolding)
            otherLocks |= LOCKBIT_ON(i);
    }

    /*
     * now check again for conflicts.  'otherLocks' describes the types of
     * locks held by other processes.  If one of these conflicts with the kind
     * of lock that I want, there is a conflict and I have to sleep.
     */
    if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
    {
        /* no conflict. OK to get the lock */
        PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
        return STATUS_OK;
    }

    PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
    return STATUS_FOUND;
}

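/*
 * Worked example (illustrative): suppose we hold RowExclusiveLock on a
 * relation and now request ShareLock on it.  grantMask includes
 * RowExclusiveLock, which conflicts with ShareLock, so the first test
 * fails; but if no *other* process holds RowExclusiveLock, the loop leaves
 * that bit out of otherLocks and the request is granted --- our own locks
 * never block us.
 */
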
/*
 * GrantLock -- update the lock and proclock data structures to show
 *      the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
    lock->nGranted++;
    lock->granted[lockmode]++;
    lock->grantMask |= LOCKBIT_ON(lockmode);
    if (lock->granted[lockmode] == lock->requested[lockmode])
        lock->waitMask &= LOCKBIT_OFF(lockmode);
    proclock->holdMask |= LOCKBIT_ON(lockmode);
    LOCK_PRINT("GrantLock", lock, lockmode);
    Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
    Assert(lock->nGranted <= lock->nRequested);
}

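/*
 * Note on the waitMask update above (illustrative): if three processes
 * requested ShareLock and this grant makes granted[ShareLock] ==
 * requested[ShareLock] == 3, nobody is still waiting for ShareLock, so its
 * waitMask bit can be cleared; otherwise some waiter remains and the bit
 * must stay set.
 */
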
01338 /*
01339  * UnGrantLock -- opposite of GrantLock.
01340  *
01341  * Updates the lock and proclock data structures to show that the lock
01342  * is no longer held nor requested by the current holder.
01343  *
01344  * Returns true if there were any waiters waiting on the lock that
01345  * should now be woken up with ProcLockWakeup.
01346  */
01347 static bool
01348 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
01349             PROCLOCK *proclock, LockMethod lockMethodTable)
01350 {
01351     bool        wakeupNeeded = false;
01352 
01353     Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
01354     Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
01355     Assert(lock->nGranted <= lock->nRequested);
01356 
01357     /*
01358      * fix the general lock stats
01359      */
01360     lock->nRequested--;
01361     lock->requested[lockmode]--;
01362     lock->nGranted--;
01363     lock->granted[lockmode]--;
01364 
01365     if (lock->granted[lockmode] == 0)
01366     {
01367         /* change the conflict mask.  No more of this lock type. */
01368         lock->grantMask &= LOCKBIT_OFF(lockmode);
01369     }
01370 
01371     LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
01372 
01373     /*
01374      * We need only run ProcLockWakeup if the released lock conflicts with at
01375      * least one of the lock types requested by waiter(s).  Otherwise whatever
01376      * conflict made them wait must still exist.  NOTE: before MVCC, we could
01377      * skip wakeup if lock->granted[lockmode] was still positive. But that's
01378      * not true anymore, because the remaining granted locks might belong to
01379      * some waiter, which could now be awakened because a process never
01380      * conflicts with its own locks.
01381      */
01382     if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
01383         wakeupNeeded = true;
01384 
01385     /*
01386      * Now fix the per-proclock state.
01387      */
01388     proclock->holdMask &= LOCKBIT_OFF(lockmode);
01389     PROCLOCK_PRINT("UnGrantLock: updated", proclock);
01390 
01391     return wakeupNeeded;
01392 }
01393 
01394 /*
01395  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
01396  * proclock and lock objects if possible, and call ProcLockWakeup if there
01397  * are remaining requests and the caller says it's OK.  (Normally, this
01398  * should be called after UnGrantLock, and wakeupNeeded is the result from
01399  * UnGrantLock.)
01400  *
01401  * The appropriate partition lock must be held at entry, and will be
01402  * held at exit.
01403  */
01404 static void
01405 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
01406             LockMethod lockMethodTable, uint32 hashcode,
01407             bool wakeupNeeded)
01408 {
01409     /*
01410      * If this was my last hold on this lock, delete my entry in the proclock
01411      * table.
01412      */
01413     if (proclock->holdMask == 0)
01414     {
01415         uint32      proclock_hashcode;
01416 
01417         PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
01418         SHMQueueDelete(&proclock->lockLink);
01419         SHMQueueDelete(&proclock->procLink);
01420         proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
01421         if (!hash_search_with_hash_value(LockMethodProcLockHash,
01422                                          (void *) &(proclock->tag),
01423                                          proclock_hashcode,
01424                                          HASH_REMOVE,
01425                                          NULL))
01426             elog(PANIC, "proclock table corrupted");
01427     }
01428 
01429     if (lock->nRequested == 0)
01430     {
01431         /*
01432          * The caller just released the last lock, so garbage-collect the lock
01433          * object.
01434          */
01435         LOCK_PRINT("CleanUpLock: deleting", lock, 0);
01436         Assert(SHMQueueEmpty(&(lock->procLocks)));
01437         if (!hash_search_with_hash_value(LockMethodLockHash,
01438                                          (void *) &(lock->tag),
01439                                          hashcode,
01440                                          HASH_REMOVE,
01441                                          NULL))
01442             elog(PANIC, "lock table corrupted");
01443     }
01444     else if (wakeupNeeded)
01445     {
01446         /* There are waiters on this lock, so wake them up. */
01447         ProcLockWakeup(lockMethodTable, lock);
01448     }
01449 }
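
/*
 * The canonical pairing, as used by the release paths below (partition
 * lock held):
 *
 *     wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
 *     CleanUpLock(lock, proclock, lockMethodTable, hashcode, wakeupNeeded);
 */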
01450 
01451 /*
01452  * GrantLockLocal -- update the locallock data structures to show
01453  *      the lock request has been granted.
01454  *
01455  * We expect that LockAcquire made sure there is room to add a new
01456  * ResourceOwner entry.
01457  */
01458 static void
01459 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
01460 {
01461     LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
01462     int         i;
01463 
01464     Assert(locallock->numLockOwners < locallock->maxLockOwners);
01465     /* Count the total */
01466     locallock->nLocks++;
01467     /* Count the per-owner lock */
01468     for (i = 0; i < locallock->numLockOwners; i++)
01469     {
01470         if (lockOwners[i].owner == owner)
01471         {
01472             lockOwners[i].nLocks++;
01473             return;
01474         }
01475     }
01476     lockOwners[i].owner = owner;
01477     lockOwners[i].nLocks = 1;
01478     locallock->numLockOwners++;
01479     if (owner != NULL)
01480         ResourceOwnerRememberLock(owner, locallock);
01481 }
01482 
01483 /*
01484  * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
01485  * and arrange for error cleanup if it fails
01486  */
01487 static void
01488 BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
01489 {
01490     Assert(StrongLockInProgress == NULL);
01491     Assert(locallock->holdsStrongLockCount == FALSE);
01492 
01493     /*
01494      * Adding to a memory location is not atomic, so we take a spinlock to
01495      * ensure we don't collide with someone else trying to bump the count at
01496      * the same time.
01497      *
01498      * XXX: It might be worth considering using an atomic fetch-and-add
01499      * instruction here, on architectures where that is supported.
01500      */
01501 
01502     SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
01503     FastPathStrongRelationLocks->count[fasthashcode]++;
01504     locallock->holdsStrongLockCount = TRUE;
01505     StrongLockInProgress = locallock;
01506     SpinLockRelease(&FastPathStrongRelationLocks->mutex);
01507 }
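
/*
 * A sketch of the fetch-and-add alternative mentioned in the XXX above.
 * This assumes an atomic-ops facility such as the pg_atomic_fetch_add_u32()
 * primitive that later PostgreSQL releases provide in port/atomics.h (not
 * available in this tree), and would require the count[] elements to
 * become pg_atomic_uint32:
 *
 *     pg_atomic_fetch_add_u32(&FastPathStrongRelationLocks->count[fasthashcode], 1);
 *
 * The local bookkeeping (holdsStrongLockCount, StrongLockInProgress)
 * would still have to be ordered carefully against error cleanup.
 */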
01508 
01509 /*
01510  * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
01511  * acquisition once it's no longer needed
01512  */
01513 static void
01514 FinishStrongLockAcquire(void)
01515 {
01516     StrongLockInProgress = NULL;
01517 }
01518 
01519 /*
01520  * AbortStrongLockAcquire - undo strong lock state changes performed by
01521  * BeginStrongLockAcquire.
01522  */
01523 void
01524 AbortStrongLockAcquire(void)
01525 {
01526     uint32      fasthashcode;
01527     LOCALLOCK  *locallock = StrongLockInProgress;
01528 
01529     if (locallock == NULL)
01530         return;
01531 
01532     fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
01533     Assert(locallock->holdsStrongLockCount == TRUE);
01534     SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
01535     FastPathStrongRelationLocks->count[fasthashcode]--;
01536     locallock->holdsStrongLockCount = FALSE;
01537     StrongLockInProgress = NULL;
01538     SpinLockRelease(&FastPathStrongRelationLocks->mutex);
01539 }
01540 
01541 /*
01542  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
01543  *      WaitOnLock on.
01544  *
01545  * proc.c needs this for the case where we are booted off the lock by
01546  * timeout, but discover that someone granted us the lock anyway.
01547  *
01548  * We could just export GrantLockLocal, but that would require including
01549  * resowner.h in lock.h, which creates circularity.
01550  */
01551 void
01552 GrantAwaitedLock(void)
01553 {
01554     GrantLockLocal(awaitedLock, awaitedOwner);
01555 }
01556 
01557 /*
01558  * WaitOnLock -- wait to acquire a lock
01559  *
01560  * Caller must have set MyProc->heldLocks to reflect locks already held
01561  * on the lockable object by this process.
01562  *
01563  * The appropriate partition lock must be held at entry.
01564  */
01565 static void
01566 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
01567 {
01568     LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
01569     LockMethod  lockMethodTable = LockMethods[lockmethodid];
01570     char       *volatile new_status = NULL;
01571 
01572     LOCK_PRINT("WaitOnLock: sleeping on lock",
01573                locallock->lock, locallock->tag.mode);
01574 
01575     /* Report change to waiting status */
01576     if (update_process_title)
01577     {
01578         const char *old_status;
01579         int         len;
01580 
01581         old_status = get_ps_display(&len);
01582         new_status = (char *) palloc(len + 8 + 1);  /* 8 = strlen(" waiting") */
01583         memcpy(new_status, old_status, len);
01584         strcpy(new_status + len, " waiting");
01585         set_ps_display(new_status, false);
01586         new_status[len] = '\0'; /* truncate off " waiting" */
01587     }
01588     pgstat_report_waiting(true);
01589 
01590     awaitedLock = locallock;
01591     awaitedOwner = owner;
01592 
01593     /*
01594      * NOTE: Think not to put any shared-state cleanup after the call to
01595      * ProcSleep, in either the normal or failure path.  The lock state must
01596      * be fully set by the lock grantor, or by CheckDeadLock if we give up
01597      * waiting for the lock.  This is necessary because of the possibility
01598      * that a cancel/die interrupt will interrupt ProcSleep after someone else
01599      * grants us the lock, but before we've noticed it. Hence, after granting,
01600      * the locktable state must fully reflect the fact that we own the lock;
01601      * we can't do additional work on return.
01602      *
01603      * We can and do use a PG_TRY block to try to clean up after failure, but
01604      * this still has a major limitation: elog(FATAL) can occur while waiting
01605      * (eg, a "die" interrupt), and then control won't come back here. So all
01606      * cleanup of essential state should happen in LockErrorCleanup, not here.
01607      * We can use PG_TRY to clear the "waiting" status flags, since doing that
01608      * is unimportant if the process exits.
01609      */
01610     PG_TRY();
01611     {
01612         if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
01613         {
01614             /*
01615              * We failed as a result of a deadlock, see CheckDeadLock(). Quit
01616              * now.
01617              */
01618             awaitedLock = NULL;
01619             LOCK_PRINT("WaitOnLock: aborting on lock",
01620                        locallock->lock, locallock->tag.mode);
01621             LWLockRelease(LockHashPartitionLock(locallock->hashcode));
01622 
01623             /*
01624              * Now that we aren't holding the partition lock, we can give an
01625              * error report including details about the detected deadlock.
01626              */
01627             DeadLockReport();
01628             /* not reached */
01629         }
01630     }
01631     PG_CATCH();
01632     {
01633         /* In this path, awaitedLock remains set until LockErrorCleanup */
01634 
01635         /* Report change to non-waiting status */
01636         pgstat_report_waiting(false);
01637         if (update_process_title)
01638         {
01639             set_ps_display(new_status, false);
01640             pfree(new_status);
01641         }
01642 
01643         /* and propagate the error */
01644         PG_RE_THROW();
01645     }
01646     PG_END_TRY();
01647 
01648     awaitedLock = NULL;
01649 
01650     /* Report change to non-waiting status */
01651     pgstat_report_waiting(false);
01652     if (update_process_title)
01653     {
01654         set_ps_display(new_status, false);
01655         pfree(new_status);
01656     }
01657 
01658     LOCK_PRINT("WaitOnLock: wakeup on lock",
01659                locallock->lock, locallock->tag.mode);
01660 }
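
/*
 * For example (illustrative), while a backend is blocked here its process
 * title as reported by "ps" changes from something like
 *
 *     postgres: alice mydb [local] UPDATE
 * to
 *     postgres: alice mydb [local] UPDATE waiting
 *
 * and is truncated back to the original once the lock is granted.
 */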
01661 
01662 /*
01663  * Remove a proc from the wait-queue it is on (caller must know it is on one).
01664  * This is only used when the proc has failed to get the lock, so we set its
01665  * waitStatus to STATUS_ERROR.
01666  *
01667  * Appropriate partition lock must be held by caller.  Also, caller is
01668  * responsible for signaling the proc if needed.
01669  *
01670  * NB: this does not clean up any locallock object that may exist for the lock.
01671  */
01672 void
01673 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
01674 {
01675     LOCK       *waitLock = proc->waitLock;
01676     PROCLOCK   *proclock = proc->waitProcLock;
01677     LOCKMODE    lockmode = proc->waitLockMode;
01678     LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
01679 
01680     /* Make sure proc is waiting */
01681     Assert(proc->waitStatus == STATUS_WAITING);
01682     Assert(proc->links.next != NULL);
01683     Assert(waitLock);
01684     Assert(waitLock->waitProcs.size > 0);
01685     Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
01686 
01687     /* Remove proc from lock's wait queue */
01688     SHMQueueDelete(&(proc->links));
01689     waitLock->waitProcs.size--;
01690 
01691     /* Undo increments of request counts by waiting process */
01692     Assert(waitLock->nRequested > 0);
01693     Assert(waitLock->nRequested > proc->waitLock->nGranted);
01694     waitLock->nRequested--;
01695     Assert(waitLock->requested[lockmode] > 0);
01696     waitLock->requested[lockmode]--;
01697     /* don't forget to clear waitMask bit if appropriate */
01698     if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
01699         waitLock->waitMask &= LOCKBIT_OFF(lockmode);
01700 
01701     /* Clean up the proc's own state, and pass it the ok/fail signal */
01702     proc->waitLock = NULL;
01703     proc->waitProcLock = NULL;
01704     proc->waitStatus = STATUS_ERROR;
01705 
01706     /*
01707      * Delete the proclock immediately if it represents no already-held locks.
01708      * (This must happen now because if the owner of the lock decides to
01709      * release it, and the requested/granted counts then go to zero,
01710      * LockRelease expects there to be no remaining proclocks.) Then see if
01711      * any other waiters for the lock can be woken up now.
01712      */
01713     CleanUpLock(waitLock, proclock,
01714                 LockMethods[lockmethodid], hashcode,
01715                 true);
01716 }
01717 
01718 /*
01719  * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
01720  *      Release a session lock if 'sessionLock' is true, else release a
01721  *      regular transaction lock.
01722  *
01723  * Side Effects: find any waiting processes that are now wakable,
01724  *      grant them their requested locks and awaken them.
01725  *      (We have to grant the lock here to avoid a race between
01726  *      the waking process and any new process to
01727  *      come along and request the lock.)
01728  */
01729 bool
01730 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
01731 {
01732     LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
01733     LockMethod  lockMethodTable;
01734     LOCALLOCKTAG localtag;
01735     LOCALLOCK  *locallock;
01736     LOCK       *lock;
01737     PROCLOCK   *proclock;
01738     LWLockId    partitionLock;
01739     bool        wakeupNeeded;
01740 
01741     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
01742         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
01743     lockMethodTable = LockMethods[lockmethodid];
01744     if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
01745         elog(ERROR, "unrecognized lock mode: %d", lockmode);
01746 
01747 #ifdef LOCK_DEBUG
01748     if (LOCK_DEBUG_ENABLED(locktag))
01749         elog(LOG, "LockRelease: lock [%u,%u] %s",
01750              locktag->locktag_field1, locktag->locktag_field2,
01751              lockMethodTable->lockModeNames[lockmode]);
01752 #endif
01753 
01754     /*
01755      * Find the LOCALLOCK entry for this lock and lockmode
01756      */
01757     MemSet(&localtag, 0, sizeof(localtag));     /* must clear padding */
01758     localtag.lock = *locktag;
01759     localtag.mode = lockmode;
01760 
01761     locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
01762                                           (void *) &localtag,
01763                                           HASH_FIND, NULL);
01764 
01765     /*
01766      * let the caller print its own error message, too. Do not ereport(ERROR).
01767      */
01768     if (!locallock || locallock->nLocks <= 0)
01769     {
01770         elog(WARNING, "you don't own a lock of type %s",
01771              lockMethodTable->lockModeNames[lockmode]);
01772         return FALSE;
01773     }
01774 
01775     /*
01776      * Decrease the count for the resource owner.
01777      */
01778     {
01779         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
01780         ResourceOwner owner;
01781         int         i;
01782 
01783         /* Identify owner for lock */
01784         if (sessionLock)
01785             owner = NULL;
01786         else
01787             owner = CurrentResourceOwner;
01788 
01789         for (i = locallock->numLockOwners - 1; i >= 0; i--)
01790         {
01791             if (lockOwners[i].owner == owner)
01792             {
01793                 Assert(lockOwners[i].nLocks > 0);
01794                 if (--lockOwners[i].nLocks == 0)
01795                 {
01796                     if (owner != NULL)
01797                         ResourceOwnerForgetLock(owner, locallock);
01798                     /* compact out unused slot */
01799                     locallock->numLockOwners--;
01800                     if (i < locallock->numLockOwners)
01801                         lockOwners[i] = lockOwners[locallock->numLockOwners];
01802                 }
01803                 break;
01804             }
01805         }
01806         if (i < 0)
01807         {
01808             /* don't release a lock belonging to another owner */
01809             elog(WARNING, "you don't own a lock of type %s",
01810                  lockMethodTable->lockModeNames[lockmode]);
01811             return FALSE;
01812         }
01813     }
01814 
01815     /*
01816      * Decrease the total local count.  If we're still holding the lock, we're
01817      * done.
01818      */
01819     locallock->nLocks--;
01820 
01821     if (locallock->nLocks > 0)
01822         return TRUE;
01823 
01824     /* Attempt fast release of any lock eligible for the fast path. */
01825     if (EligibleForRelationFastPath(locktag, lockmode)
01826         && FastPathLocalUseCount > 0)
01827     {
01828         bool        released;
01829 
01830         /*
01831          * We might not find the lock here, even if we originally entered it
01832          * here.  Another backend may have moved it to the main table.
01833          */
01834         LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
01835         released = FastPathUnGrantRelationLock(locktag->locktag_field2,
01836                                                lockmode);
01837         LWLockRelease(MyProc->backendLock);
01838         if (released)
01839         {
01840             RemoveLocalLock(locallock);
01841             return TRUE;
01842         }
01843     }
01844 
01845     /*
01846      * Otherwise we've got to mess with the shared lock table.
01847      */
01848     partitionLock = LockHashPartitionLock(locallock->hashcode);
01849 
01850     LWLockAcquire(partitionLock, LW_EXCLUSIVE);
01851 
01852     /*
01853      * Normally, we don't need to re-find the lock or proclock, since we kept
01854      * their addresses in the locallock table, and they couldn't have been
01855      * removed while we were holding a lock on them.  But it's possible that
01856      * the locks have been moved to the main hash table by another backend, in
01857      * which case we might need to go look them up after all.
01858      */
01859     lock = locallock->lock;
01860     if (!lock)
01861     {
01862         PROCLOCKTAG proclocktag;
01863         bool        found;
01864 
01865         Assert(EligibleForRelationFastPath(locktag, lockmode));
01866         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
01867                                                     (const void *) locktag,
01868                                                     locallock->hashcode,
01869                                                     HASH_FIND,
01870                                                     &found);
01871         Assert(found && lock != NULL);
01872         locallock->lock = lock;
01873 
01874         proclocktag.myLock = lock;
01875         proclocktag.myProc = MyProc;
01876         locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
01877                                                        (void *) &proclocktag,
01878                                                        HASH_FIND, &found);
01879         Assert(found);
01880     }
01881     LOCK_PRINT("LockRelease: found", lock, lockmode);
01882     proclock = locallock->proclock;
01883     PROCLOCK_PRINT("LockRelease: found", proclock);
01884 
01885     /*
01886      * Double-check that we are actually holding a lock of the type we want to
01887      * release.
01888      */
01889     if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
01890     {
01891         PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
01892         LWLockRelease(partitionLock);
01893         elog(WARNING, "you don't own a lock of type %s",
01894              lockMethodTable->lockModeNames[lockmode]);
01895         RemoveLocalLock(locallock);
01896         return FALSE;
01897     }
01898 
01899     /*
01900      * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
01901      */
01902     wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
01903 
01904     CleanUpLock(lock, proclock,
01905                 lockMethodTable, locallock->hashcode,
01906                 wakeupNeeded);
01907 
01908     LWLockRelease(partitionLock);
01909 
01910     RemoveLocalLock(locallock);
01911     return TRUE;
01912 }
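
/*
 * Caller-side sketch (modeled on the unlock routines in lmgr.c; relid is
 * a relation assumed to have been locked earlier in the same mode):
 *
 *     LOCKTAG     tag;
 *
 *     SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *     LockRelease(&tag, AccessShareLock, false);
 */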
01913 
01914 /*
01915  * LockReleaseAll -- Release all locks of the specified lock method that
01916  *      are held by the current process.
01917  *
01918  * Well, not necessarily *all* locks.  The available behaviors are:
01919  *      allLocks == true: release all locks including session locks.
01920  *      allLocks == false: release all non-session locks.
01921  */
01922 void
01923 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
01924 {
01925     HASH_SEQ_STATUS status;
01926     LockMethod  lockMethodTable;
01927     int         i,
01928                 numLockModes;
01929     LOCALLOCK  *locallock;
01930     LOCK       *lock;
01931     PROCLOCK   *proclock;
01932     int         partition;
01933     bool        have_fast_path_lwlock = false;
01934 
01935     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
01936         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
01937     lockMethodTable = LockMethods[lockmethodid];
01938 
01939 #ifdef LOCK_DEBUG
01940     if (*(lockMethodTable->trace_flag))
01941         elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
01942 #endif
01943 
01944     /*
01945      * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
01946      * the only way that the lock we hold on our own VXID can ever get
01947      * released: it is always and only released when a toplevel transaction
01948      * ends.
01949      */
01950     if (lockmethodid == DEFAULT_LOCKMETHOD)
01951         VirtualXactLockTableCleanup();
01952 
01953     numLockModes = lockMethodTable->numLockModes;
01954 
01955     /*
01956      * First we run through the locallock table and get rid of unwanted
01957      * entries, then we scan the process's proclocks and get rid of those. We
01958      * do this separately because we may have multiple locallock entries
01959      * pointing to the same proclock, and we daren't end up with any dangling
01960      * pointers.
01961      */
01962     hash_seq_init(&status, LockMethodLocalHash);
01963 
01964     while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
01965     {
01966         /*
01967          * If the LOCALLOCK entry is unused, we must've run out of shared
01968          * memory while trying to set up this lock.  Just forget the local
01969          * entry.
01970          */
01971         if (locallock->nLocks == 0)
01972         {
01973             RemoveLocalLock(locallock);
01974             continue;
01975         }
01976 
01977         /* Ignore items that are not of the lockmethod to be removed */
01978         if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
01979             continue;
01980 
01981         /*
01982          * If we are asked to release all locks, we can just zap the entry.
01983          * Otherwise, must scan to see if there are session locks. We assume
01984          * there is at most one lockOwners entry for session locks.
01985          */
01986         if (!allLocks)
01987         {
01988             LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
01989 
01990             /* If session lock is above array position 0, move it down to 0 */
01991             for (i = 0; i < locallock->numLockOwners; i++)
01992             {
01993                 if (lockOwners[i].owner == NULL)
01994                     lockOwners[0] = lockOwners[i];
01995                 else
01996                     ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
01997             }
01998 
01999             if (locallock->numLockOwners > 0 &&
02000                 lockOwners[0].owner == NULL &&
02001                 lockOwners[0].nLocks > 0)
02002             {
02003                 /* Fix the locallock to show just the session locks */
02004                 locallock->nLocks = lockOwners[0].nLocks;
02005                 locallock->numLockOwners = 1;
02006                 /* We aren't deleting this locallock, so done */
02007                 continue;
02008             }
02009             else
02010                 locallock->numLockOwners = 0;
02011         }
02012 
02013         /*
02014          * If the lock or proclock pointers are NULL, this lock was taken via
02015          * the relation fast-path.
02016          */
02017         if (locallock->proclock == NULL || locallock->lock == NULL)
02018         {
02019             LOCKMODE    lockmode = locallock->tag.mode;
02020             Oid         relid;
02021 
02022             /* Verify that a fast-path lock is what we've got. */
02023             if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
02024                 elog(PANIC, "locallock table corrupted");
02025 
02026             /*
02027              * If we don't currently hold the LWLock that protects our
02028              * fast-path data structures, we must acquire it before attempting
02029              * to release the lock via the fast-path.
02030              */
02031             if (!have_fast_path_lwlock)
02032             {
02033                 LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
02034                 have_fast_path_lwlock = true;
02035             }
02036 
02037             /* Attempt fast-path release. */
02038             relid = locallock->tag.lock.locktag_field2;
02039             if (FastPathUnGrantRelationLock(relid, lockmode))
02040             {
02041                 RemoveLocalLock(locallock);
02042                 continue;
02043             }
02044 
02045             /*
02046              * Our lock, originally taken via the fast path, has been
02047              * transferred to the main lock table.  That's going to require
02048              * some extra work, so release our fast-path lock before starting.
02049              */
02050             LWLockRelease(MyProc->backendLock);
02051             have_fast_path_lwlock = false;
02052 
02053             /*
02054              * Now dump the lock.  We haven't got a pointer to the LOCK or
02055              * PROCLOCK in this case, so we have to handle this a bit
02056              * differently than a normal lock release.  Unfortunately, this
02057              * requires an extra LWLock acquire-and-release cycle on the
02058              * partitionLock, but hopefully it shouldn't happen often.
02059              */
02060             LockRefindAndRelease(lockMethodTable, MyProc,
02061                                  &locallock->tag.lock, lockmode, false);
02062             RemoveLocalLock(locallock);
02063             continue;
02064         }
02065 
02066         /* Mark the proclock to show we need to release this lockmode */
02067         if (locallock->nLocks > 0)
02068             locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
02069 
02070         /* And remove the locallock hashtable entry */
02071         RemoveLocalLock(locallock);
02072     }
02073 
02074     if (have_fast_path_lwlock)
02075         LWLockRelease(MyProc->backendLock);
02076 
02077     /*
02078      * Now, scan each lock partition separately.
02079      */
02080     for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
02081     {
02082         LWLockId    partitionLock = FirstLockMgrLock + partition;
02083         SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
02084 
02085         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
02086                                              offsetof(PROCLOCK, procLink));
02087 
02088         if (!proclock)
02089             continue;           /* needn't examine this partition */
02090 
02091         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02092 
02093         while (proclock)
02094         {
02095             bool        wakeupNeeded = false;
02096             PROCLOCK   *nextplock;
02097 
02098             /* Get link first, since we may unlink/delete this proclock */
02099             nextplock = (PROCLOCK *)
02100                 SHMQueueNext(procLocks, &proclock->procLink,
02101                              offsetof(PROCLOCK, procLink));
02102 
02103             Assert(proclock->tag.myProc == MyProc);
02104 
02105             lock = proclock->tag.myLock;
02106 
02107             /* Ignore items that are not of the lockmethod to be removed */
02108             if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
02109                 goto next_item;
02110 
02111             /*
02112              * In allLocks mode, force release of all locks even if locallock
02113              * table had problems
02114              */
02115             if (allLocks)
02116                 proclock->releaseMask = proclock->holdMask;
02117             else
02118                 Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
02119 
02120             /*
02121              * Ignore items that have nothing to be released, unless they have
02122              * holdMask == 0 and are therefore recyclable
02123              */
02124             if (proclock->releaseMask == 0 && proclock->holdMask != 0)
02125                 goto next_item;
02126 
02127             PROCLOCK_PRINT("LockReleaseAll", proclock);
02128             LOCK_PRINT("LockReleaseAll", lock, 0);
02129             Assert(lock->nRequested >= 0);
02130             Assert(lock->nGranted >= 0);
02131             Assert(lock->nGranted <= lock->nRequested);
02132             Assert((proclock->holdMask & ~lock->grantMask) == 0);
02133 
02134             /*
02135              * Release the previously-marked lock modes
02136              */
02137             for (i = 1; i <= numLockModes; i++)
02138             {
02139                 if (proclock->releaseMask & LOCKBIT_ON(i))
02140                     wakeupNeeded |= UnGrantLock(lock, i, proclock,
02141                                                 lockMethodTable);
02142             }
02143             Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
02144             Assert(lock->nGranted <= lock->nRequested);
02145             LOCK_PRINT("LockReleaseAll: updated", lock, 0);
02146 
02147             proclock->releaseMask = 0;
02148 
02149             /* CleanUpLock will wake up waiters if needed. */
02150             CleanUpLock(lock, proclock,
02151                         lockMethodTable,
02152                         LockTagHashCode(&lock->tag),
02153                         wakeupNeeded);
02154 
02155     next_item:
02156             proclock = nextplock;
02157         }                       /* loop over PROCLOCKs within this partition */
02158 
02159         LWLockRelease(partitionLock);
02160     }                           /* loop over partitions */
02161 
02162 #ifdef LOCK_DEBUG
02163     if (*(lockMethodTable->trace_flag))
02164         elog(LOG, "LockReleaseAll done");
02165 #endif
02166 }
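
/*
 * Usage sketch (modeled on the transaction-end path in proc.c): releasing
 * every non-session lock of the standard lock method looks like
 *
 *     LockReleaseAll(DEFAULT_LOCKMETHOD, false);
 *
 * while passing allLocks == true additionally drops session-level locks.
 */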
02167 
02168 /*
02169  * LockReleaseSession -- Release all session locks of the specified lock method
02170  *      that are held by the current process.
02171  */
02172 void
02173 LockReleaseSession(LOCKMETHODID lockmethodid)
02174 {
02175     HASH_SEQ_STATUS status;
02176     LOCALLOCK  *locallock;
02177 
02178     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
02179         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
02180 
02181     hash_seq_init(&status, LockMethodLocalHash);
02182 
02183     while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
02184     {
02185         /* Ignore items that are not of the specified lock method */
02186         if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
02187             continue;
02188 
02189         ReleaseLockIfHeld(locallock, true);
02190     }
02191 }
02192 
02193 /*
02194  * LockReleaseCurrentOwner
02195  *      Release all locks belonging to CurrentResourceOwner
02196  *
02197  * If the caller knows what those locks are, it can pass them as an array.
02198  * That speeds up the call significantly when a lot of locks are held.
02199  * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
02200  * table to find them.
02201  */
02202 void
02203 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
02204 {
02205     if (locallocks == NULL)
02206     {
02207         HASH_SEQ_STATUS status;
02208         LOCALLOCK  *locallock;
02209 
02210         hash_seq_init(&status, LockMethodLocalHash);
02211 
02212         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
02213             ReleaseLockIfHeld(locallock, false);
02214     }
02215     else
02216     {
02217         int i;
02218 
02219         for (i = nlocks - 1; i >= 0; i--)
02220             ReleaseLockIfHeld(locallocks[i], false);
02221     }
02222 }
02223 
02224 /*
02225  * ReleaseLockIfHeld
02226  *      Release any session-level locks on this lockable object if sessionLock
02227  *      is true; else, release any locks held by CurrentResourceOwner.
02228  *
02229  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
02230  * locks), but without refactoring LockRelease() we cannot support releasing
02231  * locks belonging to resource owners other than CurrentResourceOwner.
02232  * If we were to refactor, it'd be a good idea to fix it so we don't have to
02233  * do a hashtable lookup of the locallock, too.  However, currently this
02234  * function isn't used heavily enough to justify refactoring for its
02235  * convenience.
02236  */
02237 static void
02238 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
02239 {
02240     ResourceOwner owner;
02241     LOCALLOCKOWNER *lockOwners;
02242     int         i;
02243 
02244     /* Identify owner for lock (must match LockRelease!) */
02245     if (sessionLock)
02246         owner = NULL;
02247     else
02248         owner = CurrentResourceOwner;
02249 
02250     /* Scan to see if there are any locks belonging to the target owner */
02251     lockOwners = locallock->lockOwners;
02252     for (i = locallock->numLockOwners - 1; i >= 0; i--)
02253     {
02254         if (lockOwners[i].owner == owner)
02255         {
02256             Assert(lockOwners[i].nLocks > 0);
02257             if (lockOwners[i].nLocks < locallock->nLocks)
02258             {
02259                 /*
02260                  * We will still hold this lock after forgetting this
02261                  * ResourceOwner.
02262                  */
02263                 locallock->nLocks -= lockOwners[i].nLocks;
02264                 /* compact out unused slot */
02265                 locallock->numLockOwners--;
02266                 if (owner != NULL)
02267                     ResourceOwnerForgetLock(owner, locallock);
02268                 if (i < locallock->numLockOwners)
02269                     lockOwners[i] = lockOwners[locallock->numLockOwners];
02270             }
02271             else
02272             {
02273                 Assert(lockOwners[i].nLocks == locallock->nLocks);
02274                 /* We want to call LockRelease just once */
02275                 lockOwners[i].nLocks = 1;
02276                 locallock->nLocks = 1;
02277                 if (!LockRelease(&locallock->tag.lock,
02278                                  locallock->tag.mode,
02279                                  sessionLock))
02280                     elog(WARNING, "ReleaseLockIfHeld: failed??");
02281             }
02282             break;
02283         }
02284     }
02285 }
02286 
02287 /*
02288  * LockReassignCurrentOwner
02289  *      Reassign all locks belonging to CurrentResourceOwner to belong
02290  *      to its parent resource owner.
02291  *
02292  * If the caller knows what those locks are, it can pass them as an array.
02293  * That speeds up the call significantly when a lot of locks are held
02294  * (e.g. pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
02295  * and we'll traverse through our hash table to find them.
02296  */
02297 void
02298 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
02299 {
02300     ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
02301 
02302     Assert(parent != NULL);
02303 
02304     if (locallocks == NULL)
02305     {
02306         HASH_SEQ_STATUS status;
02307         LOCALLOCK  *locallock;
02308 
02309         hash_seq_init(&status, LockMethodLocalHash);
02310 
02311         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
02312             LockReassignOwner(locallock, parent);
02313     }
02314     else
02315     {
02316         int i;
02317 
02318         for (i = nlocks - 1; i >= 0; i--)
02319             LockReassignOwner(locallocks[i], parent);
02320     }
02321 }
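
/*
 * Usage sketch (modeled on resowner.c): at subtransaction commit the
 * resource-owner machinery transfers locks upward instead of releasing
 * them, either passing its cached array of LOCALLOCKs or
 *
 *     LockReassignCurrentOwner(NULL, 0);
 *
 * to have us scan the local hash table instead.
 */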
02322 
02323 /*
02324  * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
02325  * CurrentResourceOwner to its parent.
02326  */
02327 static void
02328 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
02329 {
02330     LOCALLOCKOWNER *lockOwners;
02331     int         i;
02332     int         ic = -1;    /* slot index of CurrentResourceOwner, if any */
02333     int         ip = -1;    /* slot index of parent owner, if any */
02334 
02335     /*
02336      * Scan to see if there are any locks belonging to current owner or
02337      * its parent
02338      */
02339     lockOwners = locallock->lockOwners;
02340     for (i = locallock->numLockOwners - 1; i >= 0; i--)
02341     {
02342         if (lockOwners[i].owner == CurrentResourceOwner)
02343             ic = i;
02344         else if (lockOwners[i].owner == parent)
02345             ip = i;
02346     }
02347 
02348     if (ic < 0)
02349         return;         /* no current locks */
02350 
02351     if (ip < 0)
02352     {
02353         /* Parent has no slot, so just give it the child's slot */
02354         lockOwners[ic].owner = parent;
02355         ResourceOwnerRememberLock(parent, locallock);
02356     }
02357     else
02358     {
02359         /* Merge child's count with parent's */
02360         lockOwners[ip].nLocks += lockOwners[ic].nLocks;
02361         /* compact out unused slot */
02362         locallock->numLockOwners--;
02363         if (ic < locallock->numLockOwners)
02364             lockOwners[ic] = lockOwners[locallock->numLockOwners];
02365     }
02366     ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
02367 }
02368 
02369 /*
02370  * FastPathGrantRelationLock
02371  *      Grant lock using per-backend fast-path array, if there is space.
02372  */
02373 static bool
02374 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
02375 {
02376     uint32      f;
02377     uint32      unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
02378 
02379     /* Scan for existing entry for this relid, remembering empty slot. */
02380     for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
02381     {
02382         if (FAST_PATH_GET_BITS(MyProc, f) == 0)
02383             unused_slot = f;
02384         else if (MyProc->fpRelId[f] == relid)
02385         {
02386             Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
02387             FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
02388             return true;
02389         }
02390     }
02391 
02392     /* If no existing entry, use any empty slot. */
02393     if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
02394     {
02395         MyProc->fpRelId[unused_slot] = relid;
02396         FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
02397         ++FastPathLocalUseCount;
02398         return true;
02399     }
02400 
02401     /* No existing entry, and no empty slot. */
02402     return false;
02403 }
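
/*
 * Worked example (illustrative; assumes FAST_PATH_BITS_PER_SLOT is 3 and
 * FAST_PATH_LOCKNUMBER_OFFSET is AccessShareLock, per the definitions
 * earlier in this file): a backend holding AccessShareLock and
 * RowExclusiveLock on the relation in slot 0 has
 *
 *     FAST_PATH_GET_BITS(MyProc, 0) == 0x5
 *
 * i.e. the low bit for AccessShareLock plus the third bit for
 * RowExclusiveLock.
 */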
02404 
02405 /*
02406  * FastPathUnGrantRelationLock
02407  *      Release fast-path lock, if present.  Update backend-private local
02408  *      use count, while we're at it.
02409  */
02410 static bool
02411 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
02412 {
02413     uint32      f;
02414     bool        result = false;
02415 
02416     FastPathLocalUseCount = 0;
02417     for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
02418     {
02419         if (MyProc->fpRelId[f] == relid
02420             && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
02421         {
02422             Assert(!result);
02423             FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
02424             result = true;
02425         }
02426         if (FAST_PATH_GET_BITS(MyProc, f) != 0)
02427             ++FastPathLocalUseCount;
02428     }
02429     return result;
02430 }
02431 
02432 /*
02433  * FastPathTransferRelationLocks
02434  *      Transfer locks matching the given lock tag from per-backend fast-path
02435  *      arrays to the shared hash table.
02436  *
02437  * Returns true if successful, false if ran out of shared memory.
02438  */
02439 static bool
02440 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
02441                               uint32 hashcode)
02442 {
02443     LWLockId    partitionLock = LockHashPartitionLock(hashcode);
02444     Oid         relid = locktag->locktag_field2;
02445     uint32      i;
02446 
02447     /*
02448      * Every PGPROC that can potentially hold a fast-path lock is present in
02449      * ProcGlobal->allProcs.  Prepared transactions are not, but any
02450      * outstanding fast-path locks held by prepared transactions are
02451      * transferred to the main lock table.
02452      */
02453     for (i = 0; i < ProcGlobal->allProcCount; i++)
02454     {
02455         PGPROC     *proc = &ProcGlobal->allProcs[i];
02456         uint32      f;
02457 
02458         LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);
02459 
02460         /*
02461          * If the target backend isn't referencing the same database as the
02462          * lock, then we needn't examine the individual relation IDs at all;
02463          * none of them can be relevant.
02464          *
02465          * proc->databaseId is set at backend startup time and never changes
02466          * thereafter, so it might be safe to perform this test before
02467          * acquiring proc->backendLock.  In particular, it's certainly safe to
02468          * assume that if the target backend holds any fast-path locks, it
02469          * must have performed a memory-fencing operation (in particular, an
02470          * LWLock acquisition) since setting proc->databaseId.  However, it's
02471          * less clear that our backend is certain to have performed a memory
02472          * fencing operation since the other backend set proc->databaseId.  So
02473          * for now, we test it after acquiring the LWLock just to be safe.
02474          */
02475         if (proc->databaseId != locktag->locktag_field1)
02476         {
02477             LWLockRelease(proc->backendLock);
02478             continue;
02479         }
02480 
02481         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
02482         {
02483             uint32      lockmode;
02484 
02485             /* Look for an allocated slot matching the given relid. */
02486             if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
02487                 continue;
02488 
02489             /* Find or create lock object. */
02490             LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02491             for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
02492                  lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
02493                  ++lockmode)
02494             {
02495                 PROCLOCK   *proclock;
02496 
02497                 if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
02498                     continue;
02499                 proclock = SetupLockInTable(lockMethodTable, proc, locktag,
02500                                             hashcode, lockmode);
02501                 if (!proclock)
02502                 {
02503                     LWLockRelease(partitionLock);
02504                     return false;
02505                 }
02506                 GrantLock(proclock->tag.myLock, proclock, lockmode);
02507                 FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
02508             }
02509             LWLockRelease(partitionLock);
02510         }
02511         LWLockRelease(proc->backendLock);
02512     }
02513     return true;
02514 }
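
/*
 * Caller-side sketch (modeled on the strong-lock branch of
 * LockAcquireExtended earlier in this file):
 *
 *     BeginStrongLockAcquire(locallock, fasthashcode);
 *     if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
 *                                        hashcode))
 *     {
 *         AbortStrongLockAcquire();
 *         ... report "out of shared memory" or return failure ...
 *     }
 */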
02515 
02516 /*
02517  * FastPathGetRelationLockEntry
02518  *      Return the PROCLOCK for a lock originally taken via the fast-path,
02519  *      transferring it to the primary lock table if necessary.
02520  */
02521 static PROCLOCK *
02522 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
02523 {
02524     LockMethod  lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
02525     LOCKTAG    *locktag = &locallock->tag.lock;
02526     PROCLOCK   *proclock = NULL;
02527     LWLockId    partitionLock = LockHashPartitionLock(locallock->hashcode);
02528     Oid         relid = locktag->locktag_field2;
02529     uint32      f;
02530 
02531     LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
02532 
02533     for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
02534     {
02535         uint32      lockmode;
02536 
02537         /* Look for an allocated slot matching the given relid. */
02538         if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
02539             continue;
02540 
02541         /* If we don't have a lock of the given mode, forget it! */
02542         lockmode = locallock->tag.mode;
02543         if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
02544             break;
02545 
02546         /* Find or create lock object. */
02547         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02548 
02549         proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
02550                                     locallock->hashcode, lockmode);
02551         if (!proclock)
02552         {
02553             LWLockRelease(partitionLock);
02554             ereport(ERROR,
02555                     (errcode(ERRCODE_OUT_OF_MEMORY),
02556                      errmsg("out of shared memory"),
02557                      errhint("You might need to increase max_locks_per_transaction.")));
02558         }
02559         GrantLock(proclock->tag.myLock, proclock, lockmode);
02560         FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
02561 
02562         LWLockRelease(partitionLock);
02563     }
02564 
02565     LWLockRelease(MyProc->backendLock);
02566 
02567     /* Lock may have already been transferred by some other backend. */
02568     if (proclock == NULL)
02569     {
02570         LOCK       *lock;
02571         PROCLOCKTAG proclocktag;
02572         uint32      proclock_hashcode;
02573 
02574         LWLockAcquire(partitionLock, LW_SHARED);
02575 
02576         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
02577                                                     (void *) locktag,
02578                                                     locallock->hashcode,
02579                                                     HASH_FIND,
02580                                                     NULL);
02581         if (!lock)
02582             elog(ERROR, "failed to re-find shared lock object");
02583 
02584         proclocktag.myLock = lock;
02585         proclocktag.myProc = MyProc;
02586 
02587         proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
02588         proclock = (PROCLOCK *)
02589             hash_search_with_hash_value(LockMethodProcLockHash,
02590                                         (void *) &proclocktag,
02591                                         proclock_hashcode,
02592                                         HASH_FIND,
02593                                         NULL);
02594         if (!proclock)
02595             elog(ERROR, "failed to re-find shared proclock object");
02596         LWLockRelease(partitionLock);
02597     }
02598 
02599     return proclock;
02600 }
02601 
02602 /*
02603  * GetLockConflicts
02604  *      Get an array of VirtualTransactionIds of xacts currently holding locks
02605  *      that would conflict with the specified lock/lockmode.
02606  *      xacts merely awaiting such a lock are NOT reported.
02607  *
02608  * The result array is palloc'd and is terminated with an invalid VXID.
02609  *
02610  * Of course, the result could be out of date by the time it's returned,
02611  * so use of this function has to be thought about carefully.
02612  *
02613  * Note we never include the current xact's vxid in the result array,
02614  * since an xact never blocks itself.  Also, prepared transactions are
02615  * ignored, which is a bit more debatable but is appropriate for current
02616  * uses of the result.
02617  */
02618 VirtualTransactionId *
02619 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
02620 {
02621     static VirtualTransactionId *vxids;
02622     LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
02623     LockMethod  lockMethodTable;
02624     LOCK       *lock;
02625     LOCKMASK    conflictMask;
02626     SHM_QUEUE  *procLocks;
02627     PROCLOCK   *proclock;
02628     uint32      hashcode;
02629     LWLockId    partitionLock;
02630     int         count = 0;
02631     int         fast_count = 0;
02632 
02633     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
02634         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
02635     lockMethodTable = LockMethods[lockmethodid];
02636     if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
02637         elog(ERROR, "unrecognized lock mode: %d", lockmode);
02638 
02639     /*
02640      * Allocate memory to store results, and fill with InvalidVXID.  We only
02641      * need enough space for MaxBackends + a terminator, since prepared xacts
02642      * don't count.  When InHotStandby, allocate once in TopMemoryContext.
02643      */
02644     if (InHotStandby)
02645     {
02646         if (vxids == NULL)
02647             vxids = (VirtualTransactionId *)
02648                 MemoryContextAlloc(TopMemoryContext,
02649                            sizeof(VirtualTransactionId) * (MaxBackends + 1));
02650     }
02651     else
02652         vxids = (VirtualTransactionId *)
02653             palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
02654 
02655     /* Compute hash code and partition lock, and look up conflicting modes. */
02656     hashcode = LockTagHashCode(locktag);
02657     partitionLock = LockHashPartitionLock(hashcode);
02658     conflictMask = lockMethodTable->conflictTab[lockmode];
02659 
02660     /*
02661      * Fast path locks might not have been entered in the primary lock table.
02662      * If the lock we're dealing with could conflict with such a lock, we must
02663      * examine each backend's fast-path array for conflicts.
02664      */
02665     if (ConflictsWithRelationFastPath(locktag, lockmode))
02666     {
02667         int         i;
02668         Oid         relid = locktag->locktag_field2;
02669         VirtualTransactionId vxid;
02670 
02671         /*
02672          * Iterate over relevant PGPROCs.  Anything held by a prepared
02673          * transaction will have been transferred to the primary lock table,
02674          * so we need not worry about those.  This is all a bit fuzzy, because
02675          * new locks could be taken after we've visited a particular
02676          * partition, but the callers had better be prepared to deal with that
02677          * anyway, since the locks could equally well be taken between the
02678          * time we return the value and the time the caller does something
02679          * with it.
02680          */
02681         for (i = 0; i < ProcGlobal->allProcCount; i++)
02682         {
02683             PGPROC     *proc = &ProcGlobal->allProcs[i];
02684             uint32      f;
02685 
02686             /* A backend never blocks itself */
02687             if (proc == MyProc)
02688                 continue;
02689 
02690             LWLockAcquire(proc->backendLock, LW_SHARED);
02691 
02692             /*
02693              * If the target backend isn't referencing the same database as the
02694              * lock, then we needn't examine the individual relation IDs at
02695              * all; none of them can be relevant.
02696              *
02697              * See FastPathTransferRelationLocks() for discussion of why we do this
02698              * test after acquiring the lock.
02699              */
02700             if (proc->databaseId != locktag->locktag_field1)
02701             {
02702                 LWLockRelease(proc->backendLock);
02703                 continue;
02704             }
02705 
02706             for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
02707             {
02708                 uint32      lockmask;
02709 
02710                 /* Look for an allocated slot matching the given relid. */
02711                 if (relid != proc->fpRelId[f])
02712                     continue;
02713                 lockmask = FAST_PATH_GET_BITS(proc, f);
02714                 if (!lockmask)
02715                     continue;
02716                 lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
02717 
02718                 /*
02719                  * There can only be one entry per relation, so if we found it
02720                  * and it doesn't conflict, we can skip the rest of the slots.
02721                  */
02722                 if ((lockmask & conflictMask) == 0)
02723                     break;
02724 
02725                 /* Conflict! */
02726                 GET_VXID_FROM_PGPROC(vxid, *proc);
02727 
02728                 /*
02729                  * If we see an invalid VXID, then either the xact has already
02730                  * committed (or aborted), or it's a prepared xact.  In either
02731                  * case we may ignore it.
02732                  */
02733                 if (VirtualTransactionIdIsValid(vxid))
02734                     vxids[count++] = vxid;
02735                 break;
02736             }
02737 
02738             LWLockRelease(proc->backendLock);
02739         }
02740     }
02741 
02742     /* Remember how many fast-path conflicts we found. */
02743     fast_count = count;
02744 
02745     /*
02746      * Look up the lock object matching the tag.
02747      */
02748     LWLockAcquire(partitionLock, LW_SHARED);
02749 
02750     lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
02751                                                 (const void *) locktag,
02752                                                 hashcode,
02753                                                 HASH_FIND,
02754                                                 NULL);
02755     if (!lock)
02756     {
02757         /*
02758          * If the lock object doesn't exist, there is nothing holding a lock
02759          * on this lockable object.
02760          */
02761         LWLockRelease(partitionLock);
02762         return vxids;
02763     }
02764 
02765     /*
02766      * Examine each existing holder (or awaiter) of the lock.
02767      */
02768 
02769     procLocks = &(lock->procLocks);
02770 
02771     proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
02772                                          offsetof(PROCLOCK, lockLink));
02773 
02774     while (proclock)
02775     {
02776         if (conflictMask & proclock->holdMask)
02777         {
02778             PGPROC     *proc = proclock->tag.myProc;
02779 
02780             /* A backend never blocks itself */
02781             if (proc != MyProc)
02782             {
02783                 VirtualTransactionId vxid;
02784 
02785                 GET_VXID_FROM_PGPROC(vxid, *proc);
02786 
02787                 /*
02788                  * If we see an invalid VXID, then either the xact has already
02789                  * committed (or aborted), or it's a prepared xact.  In either
02790                  * case we may ignore it.
02791                  */
02792                 if (VirtualTransactionIdIsValid(vxid))
02793                 {
02794                     int         i;
02795 
02796                     /* Avoid duplicate entries. */
02797                     for (i = 0; i < fast_count; ++i)
02798                         if (VirtualTransactionIdEquals(vxids[i], vxid))
02799                             break;
02800                     if (i >= fast_count)
02801                         vxids[count++] = vxid;
02802                 }
02803             }
02804         }
02805 
02806         proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
02807                                              offsetof(PROCLOCK, lockLink));
02808     }
02809 
02810     LWLockRelease(partitionLock);
02811 
02812     if (count > MaxBackends)    /* should never happen */
02813         elog(PANIC, "too many conflicting locks found");
02814 
02815     return vxids;
02816 }
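
/*
 * Illustrative sketch (not part of lock.c): callers of GetLockConflicts
 * typically wait out each returned vxid in turn.  The helper below is
 * hypothetical; it assumes the returned array is terminated by an invalid
 * (zeroed) vxid, per the palloc0'd allocation in GetLockConflicts.
 */
static void
WaitForConflictingVirtualXacts(const LOCKTAG *locktag, LOCKMODE lockmode)
{
    VirtualTransactionId *vxids = GetLockConflicts(locktag, lockmode);

    while (VirtualTransactionIdIsValid(*vxids))
    {
        /* Sleep until the conflicting transaction releases its vxid lock. */
        (void) VirtualXactLock(*vxids, true);
        vxids++;
    }
}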
02817 
02818 /*
02819  * Find a lock in the shared lock table and release it.  It is the caller's
02820  * responsibility to verify that this is a sane thing to do.  (For example, it
02821  * would be bad to release a lock here if there might still be a LOCALLOCK
02822  * object with pointers to it.)
02823  *
02824  * We currently use this in two situations: first, to release locks held by
02825  * prepared transactions on commit (see lock_twophase_postcommit); and second,
02826  * to release locks taken via the fast-path, transferred to the main hash
02827  * table, and then released (see LockReleaseAll).
02828  */
02829 static void
02830 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
02831                      LOCKTAG *locktag, LOCKMODE lockmode,
02832                      bool decrement_strong_lock_count)
02833 {
02834     LOCK       *lock;
02835     PROCLOCK   *proclock;
02836     PROCLOCKTAG proclocktag;
02837     uint32      hashcode;
02838     uint32      proclock_hashcode;
02839     LWLockId    partitionLock;
02840     bool        wakeupNeeded;
02841 
02842     hashcode = LockTagHashCode(locktag);
02843     partitionLock = LockHashPartitionLock(hashcode);
02844 
02845     LWLockAcquire(partitionLock, LW_EXCLUSIVE);
02846 
02847     /*
02848      * Re-find the lock object (it had better be there).
02849      */
02850     lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
02851                                                 (void *) locktag,
02852                                                 hashcode,
02853                                                 HASH_FIND,
02854                                                 NULL);
02855     if (!lock)
02856         elog(PANIC, "failed to re-find shared lock object");
02857 
02858     /*
02859      * Re-find the proclock object (ditto).
02860      */
02861     proclocktag.myLock = lock;
02862     proclocktag.myProc = proc;
02863 
02864     proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
02865 
02866     proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
02867                                                         (void *) &proclocktag,
02868                                                         proclock_hashcode,
02869                                                         HASH_FIND,
02870                                                         NULL);
02871     if (!proclock)
02872         elog(PANIC, "failed to re-find shared proclock object");
02873 
02874     /*
02875      * Double-check that we are actually holding a lock of the type we want to
02876      * release.
02877      */
02878     if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
02879     {
02880         PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
02881         LWLockRelease(partitionLock);
02882         elog(WARNING, "you don't own a lock of type %s",
02883              lockMethodTable->lockModeNames[lockmode]);
02884         return;
02885     }
02886 
02887     /*
02888      * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
02889      */
02890     wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
02891 
02892     CleanUpLock(lock, proclock,
02893                 lockMethodTable, hashcode,
02894                 wakeupNeeded);
02895 
02896     LWLockRelease(partitionLock);
02897 
02898     /*
02899      * Decrement strong lock count.  This logic is needed only for 2PC.
02900      */
02901     if (decrement_strong_lock_count
02902         && ConflictsWithRelationFastPath(&lock->tag, lockmode))
02903     {
02904         uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);
02905 
02906         SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
02907         FastPathStrongRelationLocks->count[fasthashcode]--;
02908         SpinLockRelease(&FastPathStrongRelationLocks->mutex);
02909     }
02910 }
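
/*
 * Illustrative sketch (not part of lock.c): the second use case described
 * above.  A hypothetical helper releasing a relation lock that was taken
 * via the fast-path and later transferred to the main table might look
 * like this; passing false leaves the strong-lock counts alone, since no
 * 2PC is involved.
 */
static void
ReleaseTransferredRelationLock(Oid dbid, Oid relid, LOCKMODE lockmode)
{
    LOCKTAG     locktag;

    SET_LOCKTAG_RELATION(locktag, dbid, relid);
    LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
                         &locktag, lockmode, false);
}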
02911 
02912 /*
02913  * AtPrepare_Locks
02914  *      Do the preparatory work for a PREPARE: make 2PC state file records
02915  *      for all locks currently held.
02916  *
02917  * Session-level locks are ignored, as are VXID locks.
02918  *
02919  * There are some special cases that we error out on: we can't be holding any
02920  * locks at both session and transaction level (since we must either keep or
02921  * give away the PROCLOCK object), and we can't be holding any locks on
02922  * temporary objects (since that would mess up the current backend if it tries
02923  * to exit before the prepared xact is committed).
02924  */
02925 void
02926 AtPrepare_Locks(void)
02927 {
02928     HASH_SEQ_STATUS status;
02929     LOCALLOCK  *locallock;
02930 
02931     /*
02932      * For the most part, we don't need to touch shared memory for this ---
02933      * all the necessary state information is in the locallock table.
02934      * Fast-path locks are an exception, however: we move any such locks to
02935      * the main table before allowing PREPARE TRANSACTION to succeed.
02936      */
02937     hash_seq_init(&status, LockMethodLocalHash);
02938 
02939     while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
02940     {
02941         TwoPhaseLockRecord record;
02942         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
02943         bool        haveSessionLock;
02944         bool        haveXactLock;
02945         int         i;
02946 
02947         /*
02948          * Ignore VXID locks.  We don't want those to be held by prepared
02949          * transactions, since they aren't meaningful after a restart.
02950          */
02951         if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
02952             continue;
02953 
02954         /* Ignore it if we don't actually hold the lock */
02955         if (locallock->nLocks <= 0)
02956             continue;
02957 
02958         /* Scan to see whether we hold it at session or transaction level */
02959         haveSessionLock = haveXactLock = false;
02960         for (i = locallock->numLockOwners - 1; i >= 0; i--)
02961         {
02962             if (lockOwners[i].owner == NULL)
02963                 haveSessionLock = true;
02964             else
02965                 haveXactLock = true;
02966         }
02967 
02968         /* Ignore it if we have only session lock */
02969         if (!haveXactLock)
02970             continue;
02971 
02972         /*
02973          * If we have both session- and transaction-level locks, fail.  This
02974          * should never happen with regular locks, since we only take those at
02975          * session level in some special operations like VACUUM.  It's
02976          * possible to hit this with advisory locks, though.
02977          *
02978          * It would be nice if we could keep the session hold and give away
02979          * the transactional hold to the prepared xact.  However, that would
02980          * require two PROCLOCK objects, and we cannot be sure that another
02981          * PROCLOCK will be available when it comes time for PostPrepare_Locks
02982          * to do the deed.  So for now, we error out while we can still do so
02983          * safely.
02984          */
02985         if (haveSessionLock)
02986             ereport(ERROR,
02987                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02988                      errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
02989 
02990         /*
02991          * If the local lock was taken via the fast-path, we need to move it
02992          * to the primary lock table, or just get a pointer to the existing
02993          * primary lock table entry if by chance it's already been
02994          * transferred.
02995          */
02996         if (locallock->proclock == NULL)
02997         {
02998             locallock->proclock = FastPathGetRelationLockEntry(locallock);
02999             locallock->lock = locallock->proclock->tag.myLock;
03000         }
03001 
03002         /*
03003          * Arrange to not release any strong lock count held by this lock
03004          * entry.  We must retain the count until the prepared transaction is
03005          * committed or rolled back.
03006          */
03007         locallock->holdsStrongLockCount = FALSE;
03008 
03009         /*
03010          * Create a 2PC record.
03011          */
03012         memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
03013         record.lockmode = locallock->tag.mode;
03014 
03015         RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
03016                                &record, sizeof(TwoPhaseLockRecord));
03017     }
03018 }
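
/*
 * For reference, the 2PC record registered above is simply a (locktag,
 * lockmode) pair; schematically (the actual typedef appears earlier in
 * this file):
 *
 *     typedef struct TwoPhaseLockRecord
 *     {
 *         LOCKTAG     locktag;
 *         LOCKMODE    lockmode;
 *     } TwoPhaseLockRecord;
 *
 * lock_twophase_recover() below reads these records back at startup and
 * re-acquires each lock on behalf of the prepared transaction's dummy
 * PGPROC.
 */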
03019 
03020 /*
03021  * PostPrepare_Locks
03022  *      Clean up after successful PREPARE
03023  *
03024  * Here, we want to transfer ownership of our locks to a dummy PGPROC
03025  * that's now associated with the prepared transaction, and we want to
03026  * clean out the corresponding entries in the LOCALLOCK table.
03027  *
03028  * Note: by removing the LOCALLOCK entries, we are leaving dangling
03029  * pointers in the transaction's resource owner.  This is OK at the
03030  * moment since resowner.c doesn't try to free locks retail at a toplevel
03031  * transaction commit or abort.  We could alternatively zero out nLocks
03032  * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
03033  * but that probably costs more cycles.
03034  */
03035 void
03036 PostPrepare_Locks(TransactionId xid)
03037 {
03038     PGPROC     *newproc = TwoPhaseGetDummyProc(xid);
03039     HASH_SEQ_STATUS status;
03040     LOCALLOCK  *locallock;
03041     LOCK       *lock;
03042     PROCLOCK   *proclock;
03043     PROCLOCKTAG proclocktag;
03044     int         partition;
03045 
03046     /* This is a critical section: any error means big trouble */
03047     START_CRIT_SECTION();
03048 
03049     /*
03050      * First we run through the locallock table and get rid of unwanted
03051      * entries, then we scan the process's proclocks and transfer them to the
03052      * target proc.
03053      *
03054      * We do this separately because we may have multiple locallock entries
03055      * pointing to the same proclock, and we daren't end up with any dangling
03056      * pointers.
03057      */
03058     hash_seq_init(&status, LockMethodLocalHash);
03059 
03060     while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
03061     {
03062         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
03063         bool        haveSessionLock;
03064         bool        haveXactLock;
03065         int         i;
03066 
03067         if (locallock->proclock == NULL || locallock->lock == NULL)
03068         {
03069             /*
03070              * We must've run out of shared memory while trying to set up this
03071              * lock.  Just forget the local entry.
03072              */
03073             Assert(locallock->nLocks == 0);
03074             RemoveLocalLock(locallock);
03075             continue;
03076         }
03077 
03078         /* Ignore VXID locks */
03079         if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
03080             continue;
03081 
03082         /* Scan to see whether we hold it at session or transaction level */
03083         haveSessionLock = haveXactLock = false;
03084         for (i = locallock->numLockOwners - 1; i >= 0; i--)
03085         {
03086             if (lockOwners[i].owner == NULL)
03087                 haveSessionLock = true;
03088             else
03089                 haveXactLock = true;
03090         }
03091 
03092         /* Ignore it if we have only session lock */
03093         if (!haveXactLock)
03094             continue;
03095 
03096         /* This can't happen, because AtPrepare_Locks already checked for it */
03097         if (haveSessionLock)
03098             ereport(PANIC,
03099                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
03100                      errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
03101 
03102         /* Mark the proclock to show we need to release this lockmode */
03103         if (locallock->nLocks > 0)
03104             locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
03105 
03106         /* And remove the locallock hashtable entry */
03107         RemoveLocalLock(locallock);
03108     }
03109 
03110     /*
03111      * Now, scan each lock partition separately.
03112      */
03113     for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
03114     {
03115         LWLockId    partitionLock = FirstLockMgrLock + partition;
03116         SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
03117 
03118         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
03119                                              offsetof(PROCLOCK, procLink));
03120 
03121         if (!proclock)
03122             continue;           /* needn't examine this partition */
03123 
03124         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03125 
03126         while (proclock)
03127         {
03128             PROCLOCK   *nextplock;
03129 
03130             /* Get link first, since we may unlink/relink this proclock */
03131             nextplock = (PROCLOCK *)
03132                 SHMQueueNext(procLocks, &proclock->procLink,
03133                              offsetof(PROCLOCK, procLink));
03134 
03135             Assert(proclock->tag.myProc == MyProc);
03136 
03137             lock = proclock->tag.myLock;
03138 
03139             /* Ignore VXID locks */
03140             if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
03141                 goto next_item;
03142 
03143             PROCLOCK_PRINT("PostPrepare_Locks", proclock);
03144             LOCK_PRINT("PostPrepare_Locks", lock, 0);
03145             Assert(lock->nRequested >= 0);
03146             Assert(lock->nGranted >= 0);
03147             Assert(lock->nGranted <= lock->nRequested);
03148             Assert((proclock->holdMask & ~lock->grantMask) == 0);
03149 
03150             /* Ignore it if nothing to release (must be a session lock) */
03151             if (proclock->releaseMask == 0)
03152                 goto next_item;
03153 
03154             /* Else we should be releasing all locks */
03155             if (proclock->releaseMask != proclock->holdMask)
03156                 elog(PANIC, "we seem to have dropped a bit somewhere");
03157 
03158             /*
03159              * We cannot simply modify proclock->tag.myProc to reassign
03160              * ownership of the lock, because that's part of the hash key and
03161              * the proclock would then be in the wrong hash chain.  Instead
03162              * use hash_update_hash_key.  (We used to create a new hash entry,
03163              * but that risks out-of-memory failure if other processes are
03164              * busy making proclocks too.)  We must unlink the proclock from
03165              * our procLink chain and put it into the new proc's chain, too.
03166              *
03167              * Note: the updated proclock hash key will still belong to the
03168              * same hash partition, cf proclock_hash().  So the partition
03169              * lock we already hold is sufficient for this.
03170              */
03171             SHMQueueDelete(&proclock->procLink);
03172 
03173             /*
03174              * Create the new hash key for the proclock.
03175              */
03176             proclocktag.myLock = lock;
03177             proclocktag.myProc = newproc;
03178 
03179             /*
03180              * Update the proclock.  We should not find any existing entry
03181              * for the same hash key, since there can be only one entry for
03182              * any given lock with my own proc.
03183              */
03184             if (!hash_update_hash_key(LockMethodProcLockHash,
03185                                       (void *) proclock,
03186                                       (void *) &proclocktag))
03187                 elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
03188 
03189             /* Re-link into the new proc's proclock list */
03190             SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
03191                                  &proclock->procLink);
03192 
03193             PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
03194 
03195     next_item:
03196             proclock = nextplock;
03197         }                       /* loop over PROCLOCKs within this partition */
03198 
03199         LWLockRelease(partitionLock);
03200     }                           /* loop over partitions */
03201 
03202     END_CRIT_SECTION();
03203 }
03204 
03205 
03206 /*
03207  * Estimate shared-memory space used for lock tables
03208  */
03209 Size
03210 LockShmemSize(void)
03211 {
03212     Size        size = 0;
03213     long        max_table_size;
03214 
03215     /* lock hash table */
03216     max_table_size = NLOCKENTS();
03217     size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
03218 
03219     /* proclock hash table */
03220     max_table_size *= 2;
03221     size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
03222 
03223     /*
03224      * Since NLOCKENTS is only an estimate, add 10% safety margin.
03225      */
03226     size = add_size(size, size / 10);
03227 
03228     return size;
03229 }
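
/*
 * Worked example (hypothetical settings, for illustration only): with
 * max_locks_per_xact = 64, MaxBackends = 100 and max_prepared_xacts = 10,
 * NLOCKENTS() comes out to 64 * (100 + 10) = 7040.  The estimate above is
 * then hash_estimate_size(7040, sizeof(LOCK)) for the lock table plus
 * hash_estimate_size(14080, sizeof(PROCLOCK)) for the proclock table,
 * with 10% of the byte total added as safety margin.
 */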
03230 
03231 /*
03232  * GetLockStatusData - Return a summary of the lock manager's internal
03233  * status, for use in a user-level reporting function.
03234  *
03235  * The return data consists of an array of PROCLOCK objects, with the
03236  * associated PGPROC and LOCK objects for each.  Note that multiple
03237  * copies of the same PGPROC and/or LOCK objects are likely to appear.
03238  * It is the caller's responsibility to match up duplicates if wanted.
03239  *
03240  * The design goal is to hold the LWLocks for as short a time as possible;
03241  * thus, this function simply makes a copy of the necessary data and releases
03242  * the locks, allowing the caller to contemplate and format the data for as
03243  * long as it pleases.
03244  */
03245 LockData *
03246 GetLockStatusData(void)
03247 {
03248     LockData   *data;
03249     PROCLOCK   *proclock;
03250     HASH_SEQ_STATUS seqstat;
03251     int         els;
03252     int         el;
03253     int         i;
03254 
03255     data = (LockData *) palloc(sizeof(LockData));
03256 
03257     /* Guess how much space we'll need. */
03258     els = MaxBackends;
03259     el = 0;
03260     data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
03261 
03262     /*
03263      * First, we iterate through the per-backend fast-path arrays, locking
03264      * them one at a time.  This might produce an inconsistent picture of the
03265      * system state, but taking all of those LWLocks at the same time seems
03266      * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
03267      * matter too much, because none of these locks can be involved in lock
02968      * conflicts anyway - anything that might be involved in a conflict
02969      * must be present in the main lock table.
03270      */
03271     for (i = 0; i < ProcGlobal->allProcCount; ++i)
03272     {
03273         PGPROC     *proc = &ProcGlobal->allProcs[i];
03274         uint32      f;
03275 
03276         LWLockAcquire(proc->backendLock, LW_SHARED);
03277 
03278         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
03279         {
03280             LockInstanceData *instance;
03281             uint32      lockbits = FAST_PATH_GET_BITS(proc, f);
03282 
03283             /* Skip unallocated slots. */
03284             if (!lockbits)
03285                 continue;
03286 
03287             if (el >= els)
03288             {
03289                 els += MaxBackends;
03290                 data->locks = (LockInstanceData *)
03291                     repalloc(data->locks, sizeof(LockInstanceData) * els);
03292             }
03293 
03294             instance = &data->locks[el];
03295             SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
03296                                  proc->fpRelId[f]);
03297             instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
03298             instance->waitLockMode = NoLock;
03299             instance->backend = proc->backendId;
03300             instance->lxid = proc->lxid;
03301             instance->pid = proc->pid;
03302             instance->fastpath = true;
03303 
03304             el++;
03305         }
03306 
03307         if (proc->fpVXIDLock)
03308         {
03309             VirtualTransactionId vxid;
03310             LockInstanceData *instance;
03311 
03312             if (el >= els)
03313             {
03314                 els += MaxBackends;
03315                 data->locks = (LockInstanceData *)
03316                     repalloc(data->locks, sizeof(LockInstanceData) * els);
03317             }
03318 
03319             vxid.backendId = proc->backendId;
03320             vxid.localTransactionId = proc->fpLocalTransactionId;
03321 
03322             instance = &data->locks[el];
03323             SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
03324             instance->holdMask = LOCKBIT_ON(ExclusiveLock);
03325             instance->waitLockMode = NoLock;
03326             instance->backend = proc->backendId;
03327             instance->lxid = proc->lxid;
03328             instance->pid = proc->pid;
03329             instance->fastpath = true;
03330 
03331             el++;
03332         }
03333 
03334         LWLockRelease(proc->backendLock);
03335     }
03336 
03337     /*
03338      * Next, acquire lock on the entire shared lock data structure.  We do
03339      * this so that, at least for locks in the primary lock table, the state
03340      * will be self-consistent.
03341      *
03342      * Since this is a read-only operation, we take shared instead of
03343      * exclusive lock.  There's not a whole lot of point to this, because all
03344      * the normal operations require exclusive lock, but it doesn't hurt
03345      * anything either. It will at least allow two backends to do
03346      * GetLockStatusData in parallel.
03347      *
03348      * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
03349      */
03350     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
03351         LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
03352 
03353     /* Now we can safely count the number of proclocks */
03354     data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
03355     if (data->nelements > els)
03356     {
03357         els = data->nelements;
03358         data->locks = (LockInstanceData *)
03359             repalloc(data->locks, sizeof(LockInstanceData) * els);
03360     }
03361 
03362     /* Now scan the tables to copy the data */
03363     hash_seq_init(&seqstat, LockMethodProcLockHash);
03364 
03365     while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
03366     {
03367         PGPROC     *proc = proclock->tag.myProc;
03368         LOCK       *lock = proclock->tag.myLock;
03369         LockInstanceData *instance = &data->locks[el];
03370 
03371         memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
03372         instance->holdMask = proclock->holdMask;
03373         if (proc->waitLock == proclock->tag.myLock)
03374             instance->waitLockMode = proc->waitLockMode;
03375         else
03376             instance->waitLockMode = NoLock;
03377         instance->backend = proc->backendId;
03378         instance->lxid = proc->lxid;
03379         instance->pid = proc->pid;
03380         instance->fastpath = false;
03381 
03382         el++;
03383     }
03384 
03385     /*
03386      * And release locks.  We do this in reverse order for two reasons: (1)
03387      * Anyone else who needs more than one of the locks will be trying to lock
03388      * them in increasing order; we don't want to release the other process
03389      * until it can get all the locks it needs. (2) This avoids O(N^2)
03390      * behavior inside LWLockRelease.
03391      */
03392     for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
03393         LWLockRelease(FirstLockMgrLock + i);
03394 
03395     Assert(el == data->nelements);
03396 
03397     return data;
03398 }
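
/*
 * Illustrative sketch (not part of lock.c): a hypothetical reporting
 * function might consume the snapshot like this, decoding each holdMask
 * bit with GetLockmodeName().
 */
static void
PrintLockStatusSummary(void)
{
    LockData   *data = GetLockStatusData();
    int         i;

    for (i = 0; i < data->nelements; i++)
    {
        LockInstanceData *instance = &data->locks[i];
        LOCKMETHODID lockmethodid = instance->locktag.locktag_lockmethodid;
        LOCKMODE    mode;

        for (mode = 1; mode <= LockMethods[lockmethodid]->numLockModes; mode++)
        {
            if (instance->holdMask & LOCKBIT_ON(mode))
                elog(LOG, "pid %d holds %s (fastpath: %d)",
                     instance->pid,
                     GetLockmodeName(lockmethodid, mode),
                     (int) instance->fastpath);
        }
    }
}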
03399 
03400 /*
03401  * Returns a list of currently held AccessExclusiveLocks, for use
03402  * by GetRunningTransactionData().
03403  */
03404 xl_standby_lock *
03405 GetRunningTransactionLocks(int *nlocks)
03406 {
03407     PROCLOCK   *proclock;
03408     HASH_SEQ_STATUS seqstat;
03409     int         i;
03410     int         index;
03411     int         els;
03412     xl_standby_lock *accessExclusiveLocks;
03413 
03414     /*
03415      * Acquire lock on the entire shared lock data structure.
03416      *
03417      * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
03418      */
03419     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
03420         LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
03421 
03422     /* Now we can safely count the number of proclocks */
03423     els = hash_get_num_entries(LockMethodProcLockHash);
03424 
03425     /*
03426      * Allocating enough space for all locks in the lock table is overkill,
03427      * but it's more convenient and faster than having to enlarge the array.
03428      */
03429     accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
03430 
03431     /* Now scan the tables to copy the data */
03432     hash_seq_init(&seqstat, LockMethodProcLockHash);
03433 
03434     /*
03435      * If a lock is a currently granted AccessExclusiveLock then it will have
03436      * just one proclock holder, so locks are never accessed twice in this
03437      * particular case. Don't copy this code for use elsewhere because in the
03438      * general case this will give you duplicate locks when looking at
03439      * non-exclusive lock types.
03440      */
03441     index = 0;
03442     while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
03443     {
03444         /* make sure this definition matches the one used in LockAcquire */
03445         if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
03446             proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
03447         {
03448             PGPROC     *proc = proclock->tag.myProc;
03449             PGXACT     *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
03450             LOCK       *lock = proclock->tag.myLock;
03451             TransactionId xid = pgxact->xid;
03452 
03453             /*
03454              * Don't record locks for transactions if we know they have
03455              * already issued their WAL record for commit but not yet released
03456      * the lock. It is still possible that we see locks held by already
03457              * complete transactions, if they haven't yet zeroed their xids.
03458              */
03459             if (!TransactionIdIsValid(xid))
03460                 continue;
03461 
03462             accessExclusiveLocks[index].xid = xid;
03463             accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
03464             accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
03465 
03466             index++;
03467         }
03468     }
03469 
03470     /*
03471      * And release locks.  We do this in reverse order for two reasons: (1)
03472      * Anyone else who needs more than one of the locks will be trying to lock
03473      * them in increasing order; we don't want to release the other process
03474      * until it can get all the locks it needs. (2) This avoids O(N^2)
03475      * behavior inside LWLockRelease.
03476      */
03477     for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
03478         LWLockRelease(FirstLockMgrLock + i);
03479 
03480     *nlocks = index;
03481     return accessExclusiveLocks;
03482 }
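
/*
 * Illustrative sketch (not part of lock.c): the caller gets a palloc'd
 * array plus its length, so consuming the result is straightforward.
 */
static void
LogHeldAccessExclusiveLocks(void)
{
    int         nlocks;
    xl_standby_lock *locks = GetRunningTransactionLocks(&nlocks);
    int         i;

    for (i = 0; i < nlocks; i++)
        elog(LOG, "xid %u holds AccessExclusiveLock on relation %u/%u",
             locks[i].xid, locks[i].dbOid, locks[i].relOid);

    pfree(locks);
}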
03483 
03484 /* Provide the textual name of any lock mode */
03485 const char *
03486 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
03487 {
03488     Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
03489     Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
03490     return LockMethods[lockmethodid]->lockModeNames[mode];
03491 }
03492 
03493 #ifdef LOCK_DEBUG
03494 /*
03495  * Dump all locks in the given proc's myProcLocks lists.
03496  *
03497  * Caller is responsible for having acquired appropriate LWLocks.
03498  */
03499 void
03500 DumpLocks(PGPROC *proc)
03501 {
03502     SHM_QUEUE  *procLocks;
03503     PROCLOCK   *proclock;
03504     LOCK       *lock;
03505     int         i;
03506 
03507     if (proc == NULL)
03508         return;
03509 
03510     if (proc->waitLock)
03511         LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
03512 
03513     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
03514     {
03515         procLocks = &(proc->myProcLocks[i]);
03516 
03517         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
03518                                              offsetof(PROCLOCK, procLink));
03519 
03520         while (proclock)
03521         {
03522             Assert(proclock->tag.myProc == proc);
03523 
03524             lock = proclock->tag.myLock;
03525 
03526             PROCLOCK_PRINT("DumpLocks", proclock);
03527             LOCK_PRINT("DumpLocks", lock, 0);
03528 
03529             proclock = (PROCLOCK *)
03530                 SHMQueueNext(procLocks, &proclock->procLink,
03531                              offsetof(PROCLOCK, procLink));
03532         }
03533     }
03534 }
03535 
03536 /*
03537  * Dump all lmgr locks.
03538  *
03539  * Caller is responsible for having acquired appropriate LWLocks.
03540  */
03541 void
03542 DumpAllLocks(void)
03543 {
03544     PGPROC     *proc;
03545     PROCLOCK   *proclock;
03546     LOCK       *lock;
03547     HASH_SEQ_STATUS status;
03548 
03549     proc = MyProc;
03550 
03551     if (proc && proc->waitLock)
03552         LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
03553 
03554     hash_seq_init(&status, LockMethodProcLockHash);
03555 
03556     while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
03557     {
03558         PROCLOCK_PRINT("DumpAllLocks", proclock);
03559 
03560         lock = proclock->tag.myLock;
03561         if (lock)
03562             LOCK_PRINT("DumpAllLocks", lock, 0);
03563         else
03564             elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
03565     }
03566 }
03567 #endif   /* LOCK_DEBUG */
03568 
03569 /*
03570  * LOCK 2PC resource manager's routines
03571  */
03572 
03573 /*
03574  * Re-acquire a lock belonging to a transaction that was prepared.
03575  *
03576  * Because this function is run at db startup, re-acquiring the locks should
03577  * never conflict with running transactions, since there are none.  We
03578  * assume that the lock state represented by the stored 2PC files is legal.
03579  *
03580  * When switching from Hot Standby mode to normal operation, the locks will
03581  * be already held by the startup process. The locks are acquired for the new
03582  * procs without checking for conflicts, so we don't get a conflict between the
03583  * startup process and the dummy procs, even though we will momentarily have
03584  * a situation where two procs are holding the same AccessExclusiveLock,
03585  * which isn't normally possible because of the lock conflict. If we're in standby
03586  * mode, but a recovery snapshot hasn't been established yet, it's possible
03587  * that some but not all of the locks are already held by the startup process.
03588  *
03589  * This approach is simple, but also a bit dangerous, because if there isn't
03590  * enough shared memory to acquire the locks, an error will be thrown, which
03591  * is promoted to FATAL and recovery will abort, bringing down the postmaster.
03592  * A safer approach would be to transfer the locks like we do in
03593  * AtPrepare_Locks, but then again, in hot standby mode it's possible for
03594  * read-only backends to use up all the shared lock memory anyway, so that
03595  * replaying the WAL record that needs to acquire a lock will throw an error
03596  * and PANIC regardless.
03597  */
03598 void
03599 lock_twophase_recover(TransactionId xid, uint16 info,
03600                       void *recdata, uint32 len)
03601 {
03602     TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
03603     PGPROC     *proc = TwoPhaseGetDummyProc(xid);
03604     LOCKTAG    *locktag;
03605     LOCKMODE    lockmode;
03606     LOCKMETHODID lockmethodid;
03607     LOCK       *lock;
03608     PROCLOCK   *proclock;
03609     PROCLOCKTAG proclocktag;
03610     bool        found;
03611     uint32      hashcode;
03612     uint32      proclock_hashcode;
03613     int         partition;
03614     LWLockId    partitionLock;
03615     LockMethod  lockMethodTable;
03616 
03617     Assert(len == sizeof(TwoPhaseLockRecord));
03618     locktag = &rec->locktag;
03619     lockmode = rec->lockmode;
03620     lockmethodid = locktag->locktag_lockmethodid;
03621 
03622     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
03623         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
03624     lockMethodTable = LockMethods[lockmethodid];
03625 
03626     hashcode = LockTagHashCode(locktag);
03627     partition = LockHashPartition(hashcode);
03628     partitionLock = LockHashPartitionLock(hashcode);
03629 
03630     LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03631 
03632     /*
03633      * Find or create a lock with this tag.
03634      */
03635     lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
03636                                                 (void *) locktag,
03637                                                 hashcode,
03638                                                 HASH_ENTER_NULL,
03639                                                 &found);
03640     if (!lock)
03641     {
03642         LWLockRelease(partitionLock);
03643         ereport(ERROR,
03644                 (errcode(ERRCODE_OUT_OF_MEMORY),
03645                  errmsg("out of shared memory"),
03646                  errhint("You might need to increase max_locks_per_transaction.")));
03647     }
03648 
03649     /*
03650      * if it's a new lock object, initialize it
03651      */
03652     if (!found)
03653     {
03654         lock->grantMask = 0;
03655         lock->waitMask = 0;
03656         SHMQueueInit(&(lock->procLocks));
03657         ProcQueueInit(&(lock->waitProcs));
03658         lock->nRequested = 0;
03659         lock->nGranted = 0;
03660         MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
03661         MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
03662         LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
03663     }
03664     else
03665     {
03666         LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
03667         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
03668         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
03669         Assert(lock->nGranted <= lock->nRequested);
03670     }
03671 
03672     /*
03673      * Create the hash key for the proclock table.
03674      */
03675     proclocktag.myLock = lock;
03676     proclocktag.myProc = proc;
03677 
03678     proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
03679 
03680     /*
03681      * Find or create a proclock entry with this tag
03682      */
03683     proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
03684                                                         (void *) &proclocktag,
03685                                                         proclock_hashcode,
03686                                                         HASH_ENTER_NULL,
03687                                                         &found);
03688     if (!proclock)
03689     {
03690         /* Ooops, not enough shmem for the proclock */
03691         if (lock->nRequested == 0)
03692         {
03693             /*
03694              * There are no other requestors of this lock, so garbage-collect
03695              * the lock object.  We *must* do this to avoid a permanent leak
03696              * of shared memory, because there won't be anything to cause
03697              * anyone to release the lock object later.
03698              */
03699             Assert(SHMQueueEmpty(&(lock->procLocks)));
03700             if (!hash_search_with_hash_value(LockMethodLockHash,
03701                                              (void *) &(lock->tag),
03702                                              hashcode,
03703                                              HASH_REMOVE,
03704                                              NULL))
03705                 elog(PANIC, "lock table corrupted");
03706         }
03707         LWLockRelease(partitionLock);
03708         ereport(ERROR,
03709                 (errcode(ERRCODE_OUT_OF_MEMORY),
03710                  errmsg("out of shared memory"),
03711                  errhint("You might need to increase max_locks_per_transaction.")));
03712     }
03713 
03714     /*
03715      * If new, initialize the new entry
03716      */
03717     if (!found)
03718     {
03719         proclock->holdMask = 0;
03720         proclock->releaseMask = 0;
03721         /* Add proclock to appropriate lists */
03722         SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
03723         SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
03724                              &proclock->procLink);
03725         PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
03726     }
03727     else
03728     {
03729         PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
03730         Assert((proclock->holdMask & ~lock->grantMask) == 0);
03731     }
03732 
03733     /*
03734      * lock->nRequested and lock->requested[] count the total number of
03735      * requests, whether granted or waiting, so increment those immediately.
03736      */
03737     lock->nRequested++;
03738     lock->requested[lockmode]++;
03739     Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
03740 
03741     /*
03742      * We shouldn't already hold the desired lock.
03743      */
03744     if (proclock->holdMask & LOCKBIT_ON(lockmode))
03745         elog(ERROR, "lock %s on object %u/%u/%u is already held",
03746              lockMethodTable->lockModeNames[lockmode],
03747              lock->tag.locktag_field1, lock->tag.locktag_field2,
03748              lock->tag.locktag_field3);
03749 
03750     /*
03751      * We ignore any possible conflicts and just grant ourselves the lock:
03752      * checking would be pointless here, and skipping it also avoids
03753      * deadlocks when switching from standby to normal mode. See function comment.
03754      */
03755     GrantLock(lock, proclock, lockmode);
03756 
03757     /*
03758      * Bump strong lock count, to make sure any fast-path lock requests won't
03759      * be granted without consulting the primary lock table.
03760      */
03761     if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
03762     {
03763         uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);
03764 
03765         SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
03766         FastPathStrongRelationLocks->count[fasthashcode]++;
03767         SpinLockRelease(&FastPathStrongRelationLocks->mutex);
03768     }
03769 
03770     LWLockRelease(partitionLock);
03771 }
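
/*
 * Illustrative sketch (not part of lock.c): the counter bumped above is
 * what forces later fast-path acquirers of this relation into the main
 * lock table.  A hypothetical (and simplified) eligibility test:
 */
static bool
FastPathEligible(uint32 hashcode)
{
    uint32      fasthashcode = FastPathStrongLockHashPartition(hashcode);
    bool        eligible;

    SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
    eligible = (FastPathStrongRelationLocks->count[fasthashcode] == 0);
    SpinLockRelease(&FastPathStrongRelationLocks->mutex);

    return eligible;
}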
03772 
03773 /*
03774  * Re-acquire a lock belonging to a transaction that was prepared, when
03775  * starting up into hot standby mode.
03776  */
03777 void
03778 lock_twophase_standby_recover(TransactionId xid, uint16 info,
03779                               void *recdata, uint32 len)
03780 {
03781     TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
03782     LOCKTAG    *locktag;
03783     LOCKMODE    lockmode;
03784     LOCKMETHODID lockmethodid;
03785 
03786     Assert(len == sizeof(TwoPhaseLockRecord));
03787     locktag = &rec->locktag;
03788     lockmode = rec->lockmode;
03789     lockmethodid = locktag->locktag_lockmethodid;
03790 
03791     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
03792         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
03793 
03794     if (lockmode == AccessExclusiveLock &&
03795         locktag->locktag_type == LOCKTAG_RELATION)
03796     {
03797         StandbyAcquireAccessExclusiveLock(xid,
03798                                         locktag->locktag_field1 /* dboid */ ,
03799                                       locktag->locktag_field2 /* reloid */ );
03800     }
03801 }
03802 
03803 
03804 /*
03805  * 2PC processing routine for COMMIT PREPARED case.
03806  *
03807  * Find and release the lock indicated by the 2PC record.
03808  */
03809 void
03810 lock_twophase_postcommit(TransactionId xid, uint16 info,
03811                          void *recdata, uint32 len)
03812 {
03813     TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
03814     PGPROC     *proc = TwoPhaseGetDummyProc(xid);
03815     LOCKTAG    *locktag;
03816     LOCKMETHODID lockmethodid;
03817     LockMethod  lockMethodTable;
03818 
03819     Assert(len == sizeof(TwoPhaseLockRecord));
03820     locktag = &rec->locktag;
03821     lockmethodid = locktag->locktag_lockmethodid;
03822 
03823     if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
03824         elog(ERROR, "unrecognized lock method: %d", lockmethodid);
03825     lockMethodTable = LockMethods[lockmethodid];
03826 
03827     LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
03828 }
03829 
03830 /*
03831  * 2PC processing routine for ROLLBACK PREPARED case.
03832  *
03833  * This is actually just the same as the COMMIT case.
03834  */
03835 void
03836 lock_twophase_postabort(TransactionId xid, uint16 info,
03837                         void *recdata, uint32 len)
03838 {
03839     lock_twophase_postcommit(xid, info, recdata, len);
03840 }
03841 
03842 /*
03843  *      VirtualXactLockTableInsert
03844  *
03845  *      Take vxid lock via the fast-path.  There can't be any pre-existing
03846  *      lockers, as we haven't advertised this vxid via the ProcArray yet.
03847  *
03848  *      Since MyProc->fpLocalTransactionId will normally contain the same data
03849  *      as MyProc->lxid, you might wonder if we really need both.  The
03850  *      difference is that MyProc->lxid is set and cleared unlocked, and
03851  *      examined by procarray.c, while fpLocalTransactionId is protected by
03852  *      backendLock and is used only by the locking subsystem.  Doing it this
03853  *      way makes it easier to verify that there are no funny race conditions.
03854  *
03855  *      We don't bother recording this lock in the local lock table, since it's
03856  *      only ever released at the end of a transaction.  Instead,
03857  *      LockReleaseAll() calls VirtualXactLockTableCleanup().
03858  */
03859 void
03860 VirtualXactLockTableInsert(VirtualTransactionId vxid)
03861 {
03862     Assert(VirtualTransactionIdIsValid(vxid));
03863 
03864     LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
03865 
03866     Assert(MyProc->backendId == vxid.backendId);
03867     Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
03868     Assert(MyProc->fpVXIDLock == false);
03869 
03870     MyProc->fpVXIDLock = true;
03871     MyProc->fpLocalTransactionId = vxid.localTransactionId;
03872 
03873     LWLockRelease(MyProc->backendLock);
03874 }
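
/*
 * Illustrative sketch (not part of lock.c): roughly how transaction start
 * advertises its vxid, assuming GetNextLocalTransactionId() as provided by
 * sinvaladt.c.
 */
static void
AdvertiseVirtualXact(void)
{
    VirtualTransactionId vxid;

    vxid.backendId = MyBackendId;
    vxid.localTransactionId = GetNextLocalTransactionId();

    /* Take the vxid lock via the fast-path before anyone can wait on it. */
    VirtualXactLockTableInsert(vxid);
}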
03875 
03876 /*
03877  *      VirtualXactLockTableCleanup
03878  *
03879  *      Check whether a VXID lock has been materialized; if so, release it,
03880  *      unblocking waiters.
03881  */
03882 void
03883 VirtualXactLockTableCleanup(void)
03884 {
03885     bool        fastpath;
03886     LocalTransactionId lxid;
03887 
03888     Assert(MyProc->backendId != InvalidBackendId);
03889 
03890     /*
03891      * Clean up shared memory state.
03892      */
03893     LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
03894 
03895     fastpath = MyProc->fpVXIDLock;
03896     lxid = MyProc->fpLocalTransactionId;
03897     MyProc->fpVXIDLock = false;
03898     MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
03899 
03900     LWLockRelease(MyProc->backendLock);
03901 
03902     /*
03903      * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
03904      * that means someone transferred the lock to the main lock table.
03905      */
03906     if (!fastpath && LocalTransactionIdIsValid(lxid))
03907     {
03908         VirtualTransactionId vxid;
03909         LOCKTAG     locktag;
03910 
03911         vxid.backendId = MyBackendId;
03912         vxid.localTransactionId = lxid;
03913         SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
03914 
03915         LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
03916                              &locktag, ExclusiveLock, false);
03917     }
03918 }
03919 
03920 /*
03921  *      VirtualXactLock
03922  *
03923  * If wait = true, wait until the given VXID has been released, and then
03924  * return true.
03925  *
03926  * If wait = false, just check whether the VXID is still running, and return
03927  * true or false.
03928  */
03929 bool
03930 VirtualXactLock(VirtualTransactionId vxid, bool wait)
03931 {
03932     LOCKTAG     tag;
03933     PGPROC     *proc;
03934 
03935     Assert(VirtualTransactionIdIsValid(vxid));
03936 
03937     SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
03938 
03939     /*
03940      * If a lock table entry must be made, this is the PGPROC on whose behalf
03941      * it must be done.  Note that the transaction might end or the PGPROC
03942      * might be reassigned to a new backend before we get around to examining
03943      * it, but it doesn't matter.  If we find upon examination that the
03944      * relevant lxid is no longer running here, that's enough to prove that
03945      * it's no longer running anywhere.
03946      */
03947     proc = BackendIdGetProc(vxid.backendId);
03948     if (proc == NULL)
03949         return true;
03950 
03951     /*
03952      * We must acquire this lock before checking the backendId and lxid
03953      * against the ones we're waiting for.  The target backend will only set
03954      * or clear lxid while holding this lock.
03955      */
03956     LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);
03957 
03958     /* If the transaction has ended, our work here is done. */
03959     if (proc->backendId != vxid.backendId
03960         || proc->fpLocalTransactionId != vxid.localTransactionId)
03961     {
03962         LWLockRelease(proc->backendLock);
03963         return true;
03964     }
03965 
03966     /*
03967      * If we aren't asked to wait, there's no need to set up a lock table
03968      * entry.  The transaction is still in progress, so just return false.
03969      */
03970     if (!wait)
03971     {
03972         LWLockRelease(proc->backendLock);
03973         return false;
03974     }
03975 
03976     /*
03977      * OK, we're going to need to sleep on the VXID.  But first, we must set
03978      * up the primary lock table entry, if needed (ie, convert the proc's
03979      * fast-path lock on its VXID to a regular lock).
03980      */
03981     if (proc->fpVXIDLock)
03982     {
03983         PROCLOCK   *proclock;
03984         uint32      hashcode;
03985         LWLockId    partitionLock;
03986 
03987         hashcode = LockTagHashCode(&tag);
03988 
03989         partitionLock = LockHashPartitionLock(hashcode);
03990         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
03991 
03992         proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
03993                                     &tag, hashcode, ExclusiveLock);
03994         if (!proclock)
03995         {
03996             LWLockRelease(partitionLock);
03997             ereport(ERROR,
03998                     (errcode(ERRCODE_OUT_OF_MEMORY),
03999                      errmsg("out of shared memory"),
04000                      errhint("You might need to increase max_locks_per_transaction.")));
04001         }
04002         GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
04003 
04004         LWLockRelease(partitionLock);
04005 
04006         proc->fpVXIDLock = false;
04007     }
04008 
04009     /* Done with proc->fpLockBits */
04010     LWLockRelease(proc->backendLock);
04011 
04012     /* Time to wait. */
04013     (void) LockAcquire(&tag, ShareLock, false, false);
04014 
04015     LockRelease(&tag, ShareLock, false);
04016     return true;
04017 }
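
/*
 * Illustrative sketch (not part of lock.c): with wait = false, the
 * function doubles as a non-blocking liveness test.
 */
static bool
VirtualXactStillRunning(VirtualTransactionId vxid)
{
    /* VirtualXactLock returns true once the vxid is no longer running */
    return !VirtualXactLock(vxid, false);
}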