lwlock.c

/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *    Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/lmgr/lwlock.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/spin.h"


/* We use the ShmemLock spinlock to protect LWLockAssign */
extern slock_t *ShmemLock;


typedef struct LWLock
{
    slock_t     mutex;          /* Protects LWLock and queue of PGPROCs */
    bool        releaseOK;      /* T if ok to release waiters */
    char        exclusive;      /* # of exclusive holders (0 or 1) */
    int         shared;         /* # of shared holders (0..MaxBackends) */
    PGPROC     *head;           /* head of list of waiting PGPROCs */
    PGPROC     *tail;           /* tail of list of waiting PGPROCs */
    /* tail is undefined when head is NULL */
} LWLock;

/*
 * All the LWLock structs are allocated as an array in shared memory.
 * (LWLockIds are indexes into the array.)  We force the array stride to
 * be a power of 2, which saves a few cycles in indexing, but more
 * importantly also ensures that individual LWLocks don't cross cache line
 * boundaries.  This reduces cache contention problems, especially on AMD
 * Opterons.  (Of course, we have to also ensure that the array start
 * address is suitably aligned.)
 *
 * LWLock is between 16 and 32 bytes on all known platforms, so these two
 * cases are sufficient.
 */
#define LWLOCK_PADDED_SIZE  (sizeof(LWLock) <= 16 ? 16 : 32)

typedef union LWLockPadded
{
    LWLock      lock;
    char        pad[LWLOCK_PADDED_SIZE];
} LWLockPadded;
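/*
 * Worked example (a sketch added for illustration, not in the original
 * file): because the stride is a power of 2, lock i lives at
 *
 *     (char *) LWLockArray + i * LWLOCK_PADDED_SIZE
 *
 * and the multiply compiles to a shift.  With a 32-byte stride and a
 * 32-byte-aligned base, lock i occupies bytes [32*i, 32*i + 31], a region
 * that fits entirely within one 32-byte-aligned block, so no individual
 * lock straddles a cache line boundary on machines whose lines are 32
 * bytes or larger.
 */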

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;


/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS   100

static int  num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];

static int  lock_addin_request = 0;
static bool lock_addin_request_allowed = true;

#ifdef LWLOCK_STATS
static int  counts_for_pid = 0;
static int *sh_acquire_counts;
static int *ex_acquire_counts;
static int *block_counts;
static int *spin_delay_counts;
#endif

#ifdef LOCK_DEBUG
bool        Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
    if (Trace_lwlocks)
        elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
             where, (int) lockid,
             (int) lock->exclusive, lock->shared, lock->head,
             (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
    if (Trace_lwlocks)
        elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}
#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);

static void
init_lwlock_stats(void)
{
    int        *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
    int         numLocks = LWLockCounter[1];

    sh_acquire_counts = calloc(numLocks, sizeof(int));
    ex_acquire_counts = calloc(numLocks, sizeof(int));
    spin_delay_counts = calloc(numLocks, sizeof(int));
    block_counts = calloc(numLocks, sizeof(int));
    counts_for_pid = MyProcPid;
    on_shmem_exit(print_lwlock_stats, 0);
}

static void
print_lwlock_stats(int code, Datum arg)
{
    int         i;
    int        *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
    int         numLocks = LWLockCounter[1];

    /* Grab an LWLock to keep different backends from mixing reports */
    LWLockAcquire(0, LW_EXCLUSIVE);

    for (i = 0; i < numLocks; i++)
    {
        if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i])
            fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n",
                    MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
                    block_counts[i], spin_delay_counts[i]);
    }

    LWLockRelease(0);
}
#endif   /* LWLOCK_STATS */


/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
    int         numLocks;

    /*
     * Possibly this logic should be spread out among the affected modules,
     * the same way that shmem space estimation is done.  But for now, there
     * are few enough users of LWLocks that we can get away with just keeping
     * the knowledge here.
     */

    /* Predefined LWLocks */
    numLocks = (int) NumFixedLWLocks;

    /* bufmgr.c needs two for each shared buffer */
    numLocks += 2 * NBuffers;

    /* proc.c needs one for each backend or auxiliary process */
    numLocks += MaxBackends + NUM_AUXILIARY_PROCS;

    /* clog.c needs one per CLOG buffer */
    numLocks += CLOGShmemBuffers();

    /* subtrans.c needs one per SubTrans buffer */
    numLocks += NUM_SUBTRANS_BUFFERS;

    /* multixact.c needs two SLRU areas */
    numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;

    /* async.c needs one per Async buffer */
    numLocks += NUM_ASYNC_BUFFERS;

    /* predicate.c needs one per old serializable xid buffer */
    numLocks += NUM_OLDSERXID_BUFFERS;

    /*
     * Add any requested by loadable modules; for backwards-compatibility
     * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
     * there are no explicit requests.
     */
    lock_addin_request_allowed = false;
    numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);

    return numLocks;
}


/*
 * RequestAddinLWLocks
 *      Request that extra LWLocks be allocated for use by
 *      a loadable module.
 *
 * This is only useful if called from the _PG_init hook of a library that
 * is loaded into the postmaster via shared_preload_libraries.  Once
 * shared memory has been allocated, calls will be ignored.  (We could
 * raise an error, but it seems better to make it a no-op, so that
 * libraries containing such calls can be reloaded if needed.)
 */
void
RequestAddinLWLocks(int n)
{
    if (IsUnderPostmaster || !lock_addin_request_allowed)
        return;                 /* too late */
    lock_addin_request += n;
}
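/*
 * Usage sketch (added for illustration, not part of the original file): a
 * module loaded via shared_preload_libraries reserves its LWLocks from its
 * _PG_init, which runs in the postmaster before shared memory is sized:
 *
 *     void
 *     _PG_init(void)
 *     {
 *         RequestAddinLWLocks(1);
 *     }
 *
 * The reserved lock is fetched later with LWLockAssign(); see the sketch
 * following that function below.
 */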


/*
 * Compute shmem space needed for LWLocks.
 */
Size
LWLockShmemSize(void)
{
    Size        size;
    int         numLocks = NumLWLocks();

    /* Space for the LWLock array. */
    size = mul_size(numLocks, sizeof(LWLockPadded));

    /* Space for dynamic allocation counter, plus room for alignment. */
    size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);

    return size;
}


/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
    int         numLocks = NumLWLocks();
    Size        spaceLocks = LWLockShmemSize();
    LWLockPadded *lock;
    int        *LWLockCounter;
    char       *ptr;
    int         id;

    /* Allocate space */
    ptr = (char *) ShmemAlloc(spaceLocks);

    /* Leave room for dynamic allocation counter */
    ptr += 2 * sizeof(int);

    /* Ensure desired alignment of LWLock array */
    ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
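    /*
     * Worked example (illustrative, not in the original): with a 32-byte
     * LWLOCK_PADDED_SIZE, suppose ShmemAlloc returned 0x1000, so ptr is
     * now 0x1008 after skipping the two counter ints.  0x1008 % 32 == 8,
     * so we advance 32 - 8 = 24 bytes to the aligned address 0x1020.  If
     * ptr were already aligned we would advance a full 32 bytes; that is
     * harmless because LWLockShmemSize reserved LWLOCK_PADDED_SIZE bytes
     * of slack for exactly this purpose.
     */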

    LWLockArray = (LWLockPadded *) ptr;

    /*
     * Initialize all LWLocks to "unlocked" state
     */
    for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
    {
        SpinLockInit(&lock->lock.mutex);
        lock->lock.releaseOK = true;
        lock->lock.exclusive = 0;
        lock->lock.shared = 0;
        lock->lock.head = NULL;
        lock->lock.tail = NULL;
    }

    /*
     * Initialize the dynamic-allocation counter, which is stored just before
     * the first LWLock.
     */
    LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
    LWLockCounter[0] = (int) NumFixedLWLocks;
    LWLockCounter[1] = numLocks;
}


/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * We interlock this using the same spinlock that is used to protect
 * ShmemAlloc().  Interlocking is not really necessary during postmaster
 * startup, but it is needed if any user-defined code tries to allocate
 * LWLocks after startup.
 */
LWLockId
LWLockAssign(void)
{
    LWLockId    result;

    /* use volatile pointer to prevent code rearrangement */
    volatile int *LWLockCounter;

    LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
    SpinLockAcquire(ShmemLock);
    if (LWLockCounter[0] >= LWLockCounter[1])
    {
        SpinLockRelease(ShmemLock);
        elog(ERROR, "no more LWLockIds available");
    }
    result = (LWLockId) (LWLockCounter[0]++);
    SpinLockRelease(ShmemLock);
    return result;
}
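/*
 * Continuing the sketch after RequestAddinLWLocks (illustrative only;
 * my_ext_state and found are hypothetical names, and the code would run
 * from the module's shmem_startup_hook): the module assigns its reserved
 * lock exactly once, when its shared state is first created, and stores
 * the LWLockId there for every backend to use:
 *
 *     my_ext_state = ShmemInitStruct("my_ext", sizeof(*my_ext_state),
 *                                    &found);
 *     if (!found)
 *         my_ext_state->lockid = LWLockAssign();
 */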


/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
    volatile LWLock *lock = &(LWLockArray[lockid].lock);
    PGPROC     *proc = MyProc;
    bool        retry = false;
    int         extraWaits = 0;

    PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

#ifdef LWLOCK_STATS
    /* Set up local count state first time through in a given process */
    if (counts_for_pid != MyProcPid)
        init_lwlock_stats();
    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE)
        ex_acquire_counts[lockid]++;
    else
        sh_acquire_counts[lockid]++;
#endif   /* LWLOCK_STATS */

    /*
     * We can't wait if we haven't got a PGPROC.  This should only occur
     * during bootstrap or shared memory initialization.  Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to try to acquire lock after each time we are signaled by
     * LWLockRelease.
     *
     * NOTE: it might seem better to have LWLockRelease actually grant us the
     * lock, rather than retrying and possibly having to go back to sleep. But
     * in practice that is no good because it means a process swap for every
     * lock acquisition when two or more processes are contending for the same
     * lock.  Since LWLocks are normally used to protect not-very-long
     * sections of computation, a process needs to be able to acquire and
     * release the same lock many times during a single CPU time slice, even
     * in the presence of contention.  The efficiency of being able to do that
     * outweighs the inefficiency of sometimes wasting a process dispatch
     * cycle because the lock is not free when a released waiter finally gets
     * to run.  See pgsql-hackers archives for 29-Dec-01.
     */
    for (;;)
    {
        bool        mustwait;

        /* Acquire mutex.  Time spent holding mutex should be short! */
#ifdef LWLOCK_STATS
        spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
#else
        SpinLockAcquire(&lock->mutex);
#endif

        /* If retrying, allow LWLockRelease to release waiters again */
        if (retry)
            lock->releaseOK = true;

        /* If I can get the lock, do so quickly. */
        if (mode == LW_EXCLUSIVE)
        {
            if (lock->exclusive == 0 && lock->shared == 0)
            {
                lock->exclusive++;
                mustwait = false;
            }
            else
                mustwait = true;
        }
        else
        {
            if (lock->exclusive == 0)
            {
                lock->shared++;
                mustwait = false;
            }
            else
                mustwait = true;
        }

        if (!mustwait)
            break;              /* got the lock */

        /*
         * Add myself to wait queue.
         *
         * If we don't have a PGPROC structure, there's no way to wait. This
         * should never occur, since MyProc should only be null during shared
         * memory initialization.
         */
        if (proc == NULL)
            elog(PANIC, "cannot wait without a PGPROC structure");

        proc->lwWaiting = true;
        proc->lwWaitMode = mode;
        proc->lwWaitLink = NULL;
        if (lock->head == NULL)
            lock->head = proc;
        else
            lock->tail->lwWaitLink = proc;
        lock->tail = proc;

        /* Can release the mutex now */
        SpinLockRelease(&lock->mutex);

        /*
         * Wait until awakened.
         *
         * Since we share the process wait semaphore with the regular lock
         * manager and ProcWaitForSignal, and we may need to acquire an LWLock
         * while one of those is pending, it is possible that we get awakened
         * for a reason other than being signaled by LWLockRelease. If so,
         * loop back and wait again.  Once we've gotten the LWLock,
         * re-increment the sema by the number of additional signals received,
         * so that the lock manager or signal manager will see the received
         * signal when it next waits.
         */
        LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

#ifdef LWLOCK_STATS
        block_counts[lockid]++;
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);

        for (;;)
        {
            /* "false" means cannot accept cancel/die interrupt here. */
            PGSemaphoreLock(&proc->sem, false);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);

        LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

        /* Now loop back and try to acquire lock again. */
        retry = true;
    }

    /* We are done updating shared state of the lock itself. */
    SpinLockRelease(&lock->mutex);

    TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);

    /* Add lock to list of locks held by this backend */
    held_lwlocks[num_held_lwlocks++] = lockid;

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(&proc->sem);
}
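/*
 * Typical usage sketch (illustrative; my_ext_state is the hypothetical
 * shared struct from the sketches above):
 *
 *     LWLockAcquire(my_ext_state->lockid, LW_EXCLUSIVE);
 *     my_ext_state->counter++;
 *     LWLockRelease(my_ext_state->lockid);
 *
 * A backend that only reads the protected state would pass LW_SHARED
 * instead, permitting concurrent readers while still excluding writers.
 */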

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
    volatile LWLock *lock = &(LWLockArray[lockid].lock);
    bool        mustwait;

    PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire(&lock->mutex);

    /* If I can get the lock, do so quickly. */
    if (mode == LW_EXCLUSIVE)
    {
        if (lock->exclusive == 0 && lock->shared == 0)
        {
            lock->exclusive++;
            mustwait = false;
        }
        else
            mustwait = true;
    }
    else
    {
        if (lock->exclusive == 0)
        {
            lock->shared++;
            mustwait = false;
        }
        else
            mustwait = true;
    }

    /* We are done updating shared state of the lock itself. */
    SpinLockRelease(&lock->mutex);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks++] = lockid;
        TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
    }

    return !mustwait;
}
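/*
 * Try-lock sketch (illustrative, same hypothetical names as above): do
 * optional work only when the lock is free right now, without blocking:
 *
 *     if (LWLockConditionalAcquire(my_ext_state->lockid, LW_EXCLUSIVE))
 *     {
 *         my_ext_state->counter++;
 *         LWLockRelease(my_ext_state->lockid);
 *     }
 *
 * On failure nothing was acquired and interrupts were not held off, so
 * the caller can simply skip the work or fall back to another strategy.
 */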

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
{
    volatile LWLock *lock = &(LWLockArray[lockid].lock);
    PGPROC     *proc = MyProc;
    bool        mustwait;
    int         extraWaits = 0;

    PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);

#ifdef LWLOCK_STATS
    /* Set up local count state first time through in a given process */
    if (counts_for_pid != MyProcPid)
        init_lwlock_stats();
#endif

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire(&lock->mutex);

    /* If I can get the lock, do so quickly. */
    if (mode == LW_EXCLUSIVE)
    {
        if (lock->exclusive == 0 && lock->shared == 0)
        {
            lock->exclusive++;
            mustwait = false;
        }
        else
            mustwait = true;
    }
    else
    {
        if (lock->exclusive == 0)
        {
            lock->shared++;
            mustwait = false;
        }
        else
            mustwait = true;
    }

    if (mustwait)
    {
        /*
         * Add myself to wait queue.
         *
         * If we don't have a PGPROC structure, there's no way to wait.  This
         * should never occur, since MyProc should only be null during shared
         * memory initialization.
         */
        if (proc == NULL)
            elog(PANIC, "cannot wait without a PGPROC structure");

        proc->lwWaiting = true;
        proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
        proc->lwWaitLink = NULL;
        if (lock->head == NULL)
            lock->head = proc;
        else
            lock->tail->lwWaitLink = proc;
        lock->tail = proc;

        /* Can release the mutex now */
        SpinLockRelease(&lock->mutex);

        /*
         * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
         * wakeups, because we share the semaphore with ProcWaitForSignal.
         */
        LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");

#ifdef LWLOCK_STATS
        block_counts[lockid]++;
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);

        for (;;)
        {
            /* "false" means cannot accept cancel/die interrupt here. */
            PGSemaphoreLock(&proc->sem, false);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);

        LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
    }
    else
    {
        /* We are done updating shared state of the lock itself. */
        SpinLockRelease(&lock->mutex);
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(&proc->sem);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
        TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks++] = lockid;
        TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
    }

    return !mustwait;
}
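/*
 * Usage sketch mirroring the WALWriteLock case described above
 * (illustrative; FlushedUpTo and my_target are hypothetical names for
 * "how far the WAL has been flushed" and "how far I need it flushed"):
 *
 *     if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *     {
 *         ... perform the flush ourselves ...
 *         LWLockRelease(WALWriteLock);
 *     }
 *     else if (FlushedUpTo < my_target)
 *         ... the flush that just finished did not cover us; retry ...
 */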

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
    volatile LWLock *lock = &(LWLockArray[lockid].lock);
    PGPROC     *head;
    PGPROC     *proc;
    int         i;

    PRINT_LWDEBUG("LWLockRelease", lockid, lock);

    /*
     * Remove lock from list of locks held.  Usually, but not always, it will
     * be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
    {
        if (lockid == held_lwlocks[i])
            break;
    }
    if (i < 0)
        elog(ERROR, "lock %d is not held", (int) lockid);
    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire(&lock->mutex);

    /* Release my hold on lock */
    if (lock->exclusive > 0)
        lock->exclusive--;
    else
    {
        Assert(lock->shared > 0);
        lock->shared--;
    }

    /*
     * See if I need to awaken any waiters.  If I released a non-last shared
     * hold, there cannot be anything to do.  Also, do not awaken any waiters
     * if someone has already awakened waiters that haven't yet acquired the
     * lock.
     */
    head = lock->head;
    if (head != NULL)
    {
        if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
        {
            /*
             * Remove the to-be-awakened PGPROCs from the queue.
             */
            bool        releaseOK = true;

            proc = head;

            /*
             * First wake up any backends that want to be woken up without
             * acquiring the lock.
             */
            while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
                proc = proc->lwWaitLink;

            /*
             * If the front waiter wants exclusive lock, awaken him only.
             * Otherwise awaken as many waiters as want shared access.
             */
            if (proc->lwWaitMode != LW_EXCLUSIVE)
            {
                while (proc->lwWaitLink != NULL &&
                       proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
                {
                    if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
                        releaseOK = false;
                    proc = proc->lwWaitLink;
                }
            }
            /* proc is now the last PGPROC to be released */
            lock->head = proc->lwWaitLink;
            proc->lwWaitLink = NULL;

            /*
             * Prevent additional wakeups until retryer gets to run. Backends
             * that are just waiting for the lock to become free don't retry
             * automatically.
             */
            if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
                releaseOK = false;

            lock->releaseOK = releaseOK;
        }
        else
        {
            /* lock is still held, can't awaken anything */
            head = NULL;
        }
    }

    /* We are done updating shared state of the lock itself. */
    SpinLockRelease(&lock->mutex);

    TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);

    /*
     * Awaken any waiters I removed from the queue.
     */
    while (head != NULL)
    {
        LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
        proc = head;
        head = proc->lwWaitLink;
        proc->lwWaitLink = NULL;
        proc->lwWaiting = false;
        PGSemaphoreUnlock(&proc->sem);
    }

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR). An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery. We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
    }
}


/*
 * LWLockHeldByMe - test whether my process currently holds a lock
 *
 * This is meant as debug support only.  We do not distinguish whether the
 * lock is held shared or exclusive.
 */
bool
LWLockHeldByMe(LWLockId lockid)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i] == lockid)
            return true;
    }
    return false;
}
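/*
 * Sketch (illustrative): callers typically use this in assertions to
 * document a locking protocol.  A routine that requires its caller to
 * hold the hypothetical lock from the earlier sketches might begin with:
 *
 *     Assert(LWLockHeldByMe(my_ext_state->lockid));
 */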