00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "postgres.h"
00023
00024 #include "access/clog.h"
00025 #include "access/multixact.h"
00026 #include "access/subtrans.h"
00027 #include "commands/async.h"
00028 #include "miscadmin.h"
00029 #include "pg_trace.h"
00030 #include "storage/ipc.h"
00031 #include "storage/predicate.h"
00032 #include "storage/proc.h"
00033 #include "storage/spin.h"
00034
00035
00036
00037 extern slock_t *ShmemLock;
00038
00039
00040 typedef struct LWLock
00041 {
00042 slock_t mutex;
00043 bool releaseOK;
00044 char exclusive;
00045 int shared;
00046 PGPROC *head;
00047 PGPROC *tail;
00048
00049 } LWLock;
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 #define LWLOCK_PADDED_SIZE (sizeof(LWLock) <= 16 ? 16 : 32)
00064
00065 typedef union LWLockPadded
00066 {
00067 LWLock lock;
00068 char pad[LWLOCK_PADDED_SIZE];
00069 } LWLockPadded;
00070
00071
00072
00073
00074
00075
00076 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
00077
00078
00079
00080
00081
00082
00083
00084
00085 #define MAX_SIMUL_LWLOCKS 100
00086
00087 static int num_held_lwlocks = 0;
00088 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
00089
00090 static int lock_addin_request = 0;
00091 static bool lock_addin_request_allowed = true;
00092
#ifdef LWLOCK_STATS
/*
 * Per-lock counters, indexed by lock id and allocated by
 * init_lwlock_stats().  counts_for_pid records the PID the arrays were set
 * up for; a mismatch with MyProcPid triggers (re)initialization.
 */
static int *sh_acquire_counts;	/* shared-mode LWLockAcquire calls */
static int *ex_acquire_counts;	/* exclusive-mode LWLockAcquire calls */
static int *spin_delay_counts;	/* sum of SpinLockAcquire return values */
static int *block_counts;		/* times the process had to sleep */
static int	counts_for_pid = 0;
#endif
00100
#ifdef LOCK_DEBUG
/* Run-time switch for the trace output produced by the helpers below. */
bool		Trace_lwlocks = false;

/*
 * Log the state of "lock" (exclusive/shared counts, queue head pointer and
 * releaseOK flag) at LOG level, tagged with the caller name and lock id.
 * Does nothing unless Trace_lwlocks is set.
 */
static inline void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (!Trace_lwlocks)
		return;

	elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
		 where, (int) lockid,
		 (int) lock->exclusive, lock->shared, lock->head,
		 (int) lock->releaseOK);
}

/*
 * Log a free-form message about lock "lockid" at LOG level.  Does nothing
 * unless Trace_lwlocks is set.
 */
static inline void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (!Trace_lwlocks)
		return;

	elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}
#else
/* Without LOCK_DEBUG both helpers expand to nothing. */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif
00124
00125 #ifdef LWLOCK_STATS
00126
00127 static void init_lwlock_stats(void);
00128 static void print_lwlock_stats(int code, Datum arg);
00129
00130 static void
00131 init_lwlock_stats(void)
00132 {
00133 int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
00134 int numLocks = LWLockCounter[1];
00135
00136 sh_acquire_counts = calloc(numLocks, sizeof(int));
00137 ex_acquire_counts = calloc(numLocks, sizeof(int));
00138 spin_delay_counts = calloc(numLocks, sizeof(int));
00139 block_counts = calloc(numLocks, sizeof(int));
00140 counts_for_pid = MyProcPid;
00141 on_shmem_exit(print_lwlock_stats, 0);
00142 }
00143
00144 static void
00145 print_lwlock_stats(int code, Datum arg)
00146 {
00147 int i;
00148 int *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
00149 int numLocks = LWLockCounter[1];
00150
00151
00152 LWLockAcquire(0, LW_EXCLUSIVE);
00153
00154 for (i = 0; i < numLocks; i++)
00155 {
00156 if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i] || spin_delay_counts[i])
00157 fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u spindelay %u\n",
00158 MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
00159 block_counts[i], spin_delay_counts[i]);
00160 }
00161
00162 LWLockRelease(0);
00163 }
00164 #endif
00165
00166
00167
00168
00169
00170 int
00171 NumLWLocks(void)
00172 {
00173 int numLocks;
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 numLocks = (int) NumFixedLWLocks;
00184
00185
00186 numLocks += 2 * NBuffers;
00187
00188
00189 numLocks += MaxBackends + NUM_AUXILIARY_PROCS;
00190
00191
00192 numLocks += CLOGShmemBuffers();
00193
00194
00195 numLocks += NUM_SUBTRANS_BUFFERS;
00196
00197
00198 numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
00199
00200
00201 numLocks += NUM_ASYNC_BUFFERS;
00202
00203
00204 numLocks += NUM_OLDSERXID_BUFFERS;
00205
00206
00207
00208
00209
00210
00211 lock_addin_request_allowed = false;
00212 numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);
00213
00214 return numLocks;
00215 }
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229 void
00230 RequestAddinLWLocks(int n)
00231 {
00232 if (IsUnderPostmaster || !lock_addin_request_allowed)
00233 return;
00234 lock_addin_request += n;
00235 }
00236
00237
00238
00239
00240
00241 Size
00242 LWLockShmemSize(void)
00243 {
00244 Size size;
00245 int numLocks = NumLWLocks();
00246
00247
00248 size = mul_size(numLocks, sizeof(LWLockPadded));
00249
00250
00251 size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
00252
00253 return size;
00254 }
00255
00256
00257
00258
00259
00260 void
00261 CreateLWLocks(void)
00262 {
00263 int numLocks = NumLWLocks();
00264 Size spaceLocks = LWLockShmemSize();
00265 LWLockPadded *lock;
00266 int *LWLockCounter;
00267 char *ptr;
00268 int id;
00269
00270
00271 ptr = (char *) ShmemAlloc(spaceLocks);
00272
00273
00274 ptr += 2 * sizeof(int);
00275
00276
00277 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
00278
00279 LWLockArray = (LWLockPadded *) ptr;
00280
00281
00282
00283
00284 for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
00285 {
00286 SpinLockInit(&lock->lock.mutex);
00287 lock->lock.releaseOK = true;
00288 lock->lock.exclusive = 0;
00289 lock->lock.shared = 0;
00290 lock->lock.head = NULL;
00291 lock->lock.tail = NULL;
00292 }
00293
00294
00295
00296
00297
00298 LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
00299 LWLockCounter[0] = (int) NumFixedLWLocks;
00300 LWLockCounter[1] = numLocks;
00301 }
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312 LWLockId
00313 LWLockAssign(void)
00314 {
00315 LWLockId result;
00316
00317
00318 volatile int *LWLockCounter;
00319
00320 LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
00321 SpinLockAcquire(ShmemLock);
00322 if (LWLockCounter[0] >= LWLockCounter[1])
00323 {
00324 SpinLockRelease(ShmemLock);
00325 elog(ERROR, "no more LWLockIds available");
00326 }
00327 result = (LWLockId) (LWLockCounter[0]++);
00328 SpinLockRelease(ShmemLock);
00329 return result;
00330 }
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340 void
00341 LWLockAcquire(LWLockId lockid, LWLockMode mode)
00342 {
00343 volatile LWLock *lock = &(LWLockArray[lockid].lock);
00344 PGPROC *proc = MyProc;
00345 bool retry = false;
00346 int extraWaits = 0;
00347
00348 PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
00349
00350 #ifdef LWLOCK_STATS
00351
00352 if (counts_for_pid != MyProcPid)
00353 init_lwlock_stats();
00354
00355 if (mode == LW_EXCLUSIVE)
00356 ex_acquire_counts[lockid]++;
00357 else
00358 sh_acquire_counts[lockid]++;
00359 #endif
00360
00361
00362
00363
00364
00365
00366 Assert(!(proc == NULL && IsUnderPostmaster));
00367
00368
00369 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
00370 elog(ERROR, "too many LWLocks taken");
00371
00372
00373
00374
00375
00376
00377 HOLD_INTERRUPTS();
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395 for (;;)
00396 {
00397 bool mustwait;
00398
00399
00400 #ifdef LWLOCK_STATS
00401 spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
00402 #else
00403 SpinLockAcquire(&lock->mutex);
00404 #endif
00405
00406
00407 if (retry)
00408 lock->releaseOK = true;
00409
00410
00411 if (mode == LW_EXCLUSIVE)
00412 {
00413 if (lock->exclusive == 0 && lock->shared == 0)
00414 {
00415 lock->exclusive++;
00416 mustwait = false;
00417 }
00418 else
00419 mustwait = true;
00420 }
00421 else
00422 {
00423 if (lock->exclusive == 0)
00424 {
00425 lock->shared++;
00426 mustwait = false;
00427 }
00428 else
00429 mustwait = true;
00430 }
00431
00432 if (!mustwait)
00433 break;
00434
00435
00436
00437
00438
00439
00440
00441
00442 if (proc == NULL)
00443 elog(PANIC, "cannot wait without a PGPROC structure");
00444
00445 proc->lwWaiting = true;
00446 proc->lwWaitMode = mode;
00447 proc->lwWaitLink = NULL;
00448 if (lock->head == NULL)
00449 lock->head = proc;
00450 else
00451 lock->tail->lwWaitLink = proc;
00452 lock->tail = proc;
00453
00454
00455 SpinLockRelease(&lock->mutex);
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469 LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
00470
00471 #ifdef LWLOCK_STATS
00472 block_counts[lockid]++;
00473 #endif
00474
00475 TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
00476
00477 for (;;)
00478 {
00479
00480 PGSemaphoreLock(&proc->sem, false);
00481 if (!proc->lwWaiting)
00482 break;
00483 extraWaits++;
00484 }
00485
00486 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
00487
00488 LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
00489
00490
00491 retry = true;
00492 }
00493
00494
00495 SpinLockRelease(&lock->mutex);
00496
00497 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
00498
00499
00500 held_lwlocks[num_held_lwlocks++] = lockid;
00501
00502
00503
00504
00505 while (extraWaits-- > 0)
00506 PGSemaphoreUnlock(&proc->sem);
00507 }
00508
00509
00510
00511
00512
00513
00514
00515
00516 bool
00517 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
00518 {
00519 volatile LWLock *lock = &(LWLockArray[lockid].lock);
00520 bool mustwait;
00521
00522 PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
00523
00524
00525 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
00526 elog(ERROR, "too many LWLocks taken");
00527
00528
00529
00530
00531
00532
00533 HOLD_INTERRUPTS();
00534
00535
00536 SpinLockAcquire(&lock->mutex);
00537
00538
00539 if (mode == LW_EXCLUSIVE)
00540 {
00541 if (lock->exclusive == 0 && lock->shared == 0)
00542 {
00543 lock->exclusive++;
00544 mustwait = false;
00545 }
00546 else
00547 mustwait = true;
00548 }
00549 else
00550 {
00551 if (lock->exclusive == 0)
00552 {
00553 lock->shared++;
00554 mustwait = false;
00555 }
00556 else
00557 mustwait = true;
00558 }
00559
00560
00561 SpinLockRelease(&lock->mutex);
00562
00563 if (mustwait)
00564 {
00565
00566 RESUME_INTERRUPTS();
00567 LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
00568 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
00569 }
00570 else
00571 {
00572
00573 held_lwlocks[num_held_lwlocks++] = lockid;
00574 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
00575 }
00576
00577 return !mustwait;
00578 }
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594 bool
00595 LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
00596 {
00597 volatile LWLock *lock = &(LWLockArray[lockid].lock);
00598 PGPROC *proc = MyProc;
00599 bool mustwait;
00600 int extraWaits = 0;
00601
00602 PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
00603
00604 #ifdef LWLOCK_STATS
00605
00606 if (counts_for_pid != MyProcPid)
00607 init_lwlock_stats();
00608 #endif
00609
00610
00611 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
00612 elog(ERROR, "too many LWLocks taken");
00613
00614
00615
00616
00617
00618
00619 HOLD_INTERRUPTS();
00620
00621
00622 SpinLockAcquire(&lock->mutex);
00623
00624
00625 if (mode == LW_EXCLUSIVE)
00626 {
00627 if (lock->exclusive == 0 && lock->shared == 0)
00628 {
00629 lock->exclusive++;
00630 mustwait = false;
00631 }
00632 else
00633 mustwait = true;
00634 }
00635 else
00636 {
00637 if (lock->exclusive == 0)
00638 {
00639 lock->shared++;
00640 mustwait = false;
00641 }
00642 else
00643 mustwait = true;
00644 }
00645
00646 if (mustwait)
00647 {
00648
00649
00650
00651
00652
00653
00654
00655 if (proc == NULL)
00656 elog(PANIC, "cannot wait without a PGPROC structure");
00657
00658 proc->lwWaiting = true;
00659 proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
00660 proc->lwWaitLink = NULL;
00661 if (lock->head == NULL)
00662 lock->head = proc;
00663 else
00664 lock->tail->lwWaitLink = proc;
00665 lock->tail = proc;
00666
00667
00668 SpinLockRelease(&lock->mutex);
00669
00670
00671
00672
00673
00674 LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting");
00675
00676 #ifdef LWLOCK_STATS
00677 block_counts[lockid]++;
00678 #endif
00679
00680 TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
00681
00682 for (;;)
00683 {
00684
00685 PGSemaphoreLock(&proc->sem, false);
00686 if (!proc->lwWaiting)
00687 break;
00688 extraWaits++;
00689 }
00690
00691 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
00692
00693 LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened");
00694 }
00695 else
00696 {
00697
00698 SpinLockRelease(&lock->mutex);
00699 }
00700
00701
00702
00703
00704 while (extraWaits-- > 0)
00705 PGSemaphoreUnlock(&proc->sem);
00706
00707 if (mustwait)
00708 {
00709
00710 RESUME_INTERRUPTS();
00711 LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed");
00712 TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
00713 }
00714 else
00715 {
00716
00717 held_lwlocks[num_held_lwlocks++] = lockid;
00718 TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
00719 }
00720
00721 return !mustwait;
00722 }
00723
00724
00725
00726
00727 void
00728 LWLockRelease(LWLockId lockid)
00729 {
00730 volatile LWLock *lock = &(LWLockArray[lockid].lock);
00731 PGPROC *head;
00732 PGPROC *proc;
00733 int i;
00734
00735 PRINT_LWDEBUG("LWLockRelease", lockid, lock);
00736
00737
00738
00739
00740
00741 for (i = num_held_lwlocks; --i >= 0;)
00742 {
00743 if (lockid == held_lwlocks[i])
00744 break;
00745 }
00746 if (i < 0)
00747 elog(ERROR, "lock %d is not held", (int) lockid);
00748 num_held_lwlocks--;
00749 for (; i < num_held_lwlocks; i++)
00750 held_lwlocks[i] = held_lwlocks[i + 1];
00751
00752
00753 SpinLockAcquire(&lock->mutex);
00754
00755
00756 if (lock->exclusive > 0)
00757 lock->exclusive--;
00758 else
00759 {
00760 Assert(lock->shared > 0);
00761 lock->shared--;
00762 }
00763
00764
00765
00766
00767
00768
00769
00770 head = lock->head;
00771 if (head != NULL)
00772 {
00773 if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
00774 {
00775
00776
00777
00778 bool releaseOK = true;
00779
00780 proc = head;
00781
00782
00783
00784
00785
00786 while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
00787 proc = proc->lwWaitLink;
00788
00789
00790
00791
00792
00793 if (proc->lwWaitMode != LW_EXCLUSIVE)
00794 {
00795 while (proc->lwWaitLink != NULL &&
00796 proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
00797 {
00798 if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
00799 releaseOK = false;
00800 proc = proc->lwWaitLink;
00801 }
00802 }
00803
00804 lock->head = proc->lwWaitLink;
00805 proc->lwWaitLink = NULL;
00806
00807
00808
00809
00810
00811
00812 if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
00813 releaseOK = false;
00814
00815 lock->releaseOK = releaseOK;
00816 }
00817 else
00818 {
00819
00820 head = NULL;
00821 }
00822 }
00823
00824
00825 SpinLockRelease(&lock->mutex);
00826
00827 TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
00828
00829
00830
00831
00832 while (head != NULL)
00833 {
00834 LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
00835 proc = head;
00836 head = proc->lwWaitLink;
00837 proc->lwWaitLink = NULL;
00838 proc->lwWaiting = false;
00839 PGSemaphoreUnlock(&proc->sem);
00840 }
00841
00842
00843
00844
00845 RESUME_INTERRUPTS();
00846 }
00847
00848
00849
00850
00851
00852
00853
00854
00855
00856
00857
00858 void
00859 LWLockReleaseAll(void)
00860 {
00861 while (num_held_lwlocks > 0)
00862 {
00863 HOLD_INTERRUPTS();
00864
00865 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
00866 }
00867 }
00868
00869
00870
00871
00872
00873
00874
00875
00876 bool
00877 LWLockHeldByMe(LWLockId lockid)
00878 {
00879 int i;
00880
00881 for (i = 0; i < num_held_lwlocks; i++)
00882 {
00883 if (held_lwlocks[i] == lockid)
00884 return true;
00885 }
00886 return false;
00887 }