/*-------------------------------------------------------------------------
 *
 * bufmgr.c
 *	  buffer manager interface routines
 *
 * Principal entry points:
 *
 * ReadBuffer() -- find or create a buffer holding the requested page,
 *		and pin it so that no one can destroy it while this process
 *		is using it.
 *
 * ReleaseBuffer() -- unpin a buffer
 *
 * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
 *		The disk write is delayed until buffer replacement or checkpoint.
 *
 * See also freelist.c, which chooses victim buffers for replacement,
 * and buf_table.c, which manages the buffer lookup table.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/file.h>
#include <unistd.h>

#include "catalog/catalog.h"
#include "catalog/storage.h"
#include "common/relpath.h"
#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "storage/standby.h"
#include "utils/rel.h"
#include "utils/resowner_private.h"
#include "utils/timestamp.h"

/* Note: these two macros only work on shared buffers, not local ones! */
#define BufHdrGetBlock(bufHdr)	((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
#define BufferGetLSN(bufHdr)	(PageGetLSN(BufHdrGetBlock(bufHdr)))

/* Note: this macro only works on local buffers, not shared ones! */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

/* Bits in SyncOneBuffer's return value */
#define BUF_WRITTEN				0x01
#define BUF_REUSABLE			0x02

#define DROP_RELS_BSEARCH_THRESHOLD		20

/* GUC variables */
bool		zero_damaged_pages = false;
int			bgwriter_lru_maxpages = 100;
double		bgwriter_lru_multiplier = 2.0;
bool		track_io_timing = false;

/*
 * How many buffers PrefetchBuffer callers should try to stay ahead of their
 * ReadBuffer calls by.  This is maintained by the assign hook for
 * effective_io_concurrency.  Zero means "never prefetch".
 */
int			target_prefetch_pages = 0;

/* local state for StartBufferIO and related functions */
static volatile BufferDesc *InProgressBuf = NULL;
static bool IsForInput;

/* local state for LockBufferForCleanup */
static volatile BufferDesc *PinCountWaitBuf = NULL;


static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
				  ForkNumber forkNum, BlockNumber blockNum,
				  ReadBufferMode mode, BufferAccessStrategy strategy,
				  bool *hit);
static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(volatile BufferDesc *buf);
static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
static void BufferSync(int flags);
static int	SyncOneBuffer(int buf_id, bool skip_recently_used);
static void WaitIO(volatile BufferDesc *buf);
static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
				  int set_flag_bits);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
			char relpersistence,
			ForkNumber forkNum,
			BlockNumber blockNum,
			BufferAccessStrategy strategy,
			bool *foundPtr);
static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg);
static int	rnode_comparator(const void *p1, const void *p2);

/*
 * PrefetchBuffer -- initiate asynchronous read of a block of a relation
 *
 * This is named by analogy to ReadBuffer but doesn't actually allocate a
 * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
 * block will find its data already in kernel memory.
 */
void
PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
{
#ifdef USE_PREFETCH
	Assert(RelationIsValid(reln));
	Assert(BlockNumberIsValid(blockNum));

	/* Open it at the smgr level if not already done */
	RelationOpenSmgr(reln);

	if (RelationUsesLocalBuffers(reln))
	{
		/* see comments in ReadBufferExtended */
		if (RELATION_IS_OTHER_TEMP(reln))
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot access temporary tables of other sessions")));

		/* pass it off to localbuf.c */
		LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
	}
	else
	{
		BufferTag	newTag;		/* identity of requested block */
		uint32		newHash;	/* hash value for newTag */
		LWLockId	newPartitionLock;	/* buffer partition lock for it */
		int			buf_id;

		/* create a tag so we can lookup the buffer */
		INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
					   forkNum, blockNum);

		/* determine its hash code and partition lock ID */
		newHash = BufTableHashCode(&newTag);
		newPartitionLock = BufMappingPartitionLock(newHash);

		/* see if the block is in the buffer pool already */
		LWLockAcquire(newPartitionLock, LW_SHARED);
		buf_id = BufTableLookup(&newTag, newHash);
		LWLockRelease(newPartitionLock);

		/* If not in buffers, initiate prefetch */
		if (buf_id < 0)
			smgrprefetch(reln->rd_smgr, forkNum, blockNum);

		/*
		 * If the block *is* in buffers, we do nothing.  This is not really
		 * ideal: the block might be just about to be evicted, in which case
		 * the prefetch would be wasted anyway; but there is no cheap way to
		 * know that, so leaving it alone is the best we can do.
		 */
	}
#endif   /* USE_PREFETCH */
}
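
#ifdef NOT_USED
/*
 * Illustrative sketch only (not part of the original file): a hypothetical
 * caller that keeps a fixed prefetch distance ahead of its ReadBuffer calls,
 * which is the intended usage pattern for PrefetchBuffer.  The function name
 * example_scan_ahead and the distance of 32 blocks are assumptions made for
 * this example, not established APIs or recommended values.
 */
static void
example_scan_ahead(Relation rel, BlockNumber nblocks)
{
	BlockNumber blkno;

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Buffer		buf;

		/* issue the prefetch a fixed distance before we actually read */
		if (blkno + 32 < nblocks)
			PrefetchBuffer(rel, MAIN_FORKNUM, blkno + 32);

		buf = ReadBuffer(rel, blkno);
		/* ... examine the page under a content lock as needed ... */
		ReleaseBuffer(buf);
	}
}
#endif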

/*
 * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from the
 *		main fork with RBM_NORMAL mode and default strategy.
 */
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
	return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
}

/*
 * ReadBufferExtended -- returns a buffer containing the requested
 *		block of the requested relation.  If the blknum
 *		requested is P_NEW, extend the relation file and
 *		allocate a new block.  (Caller is responsible for
 *		ensuring that only one backend tries to extend a
 *		relation at the same time!)
 *
 * Returns: the buffer number for the buffer containing
 *		the block read.  The returned buffer has been pinned.
 *		Does not return on error --- elog's instead.
 *
 * Assume when this function is called, that reln has been opened already.
 *
 * In RBM_NORMAL mode, the page is read from disk, and the page header is
 * validated.  An error is thrown if the page header is not valid.  (But
 * note that an all-zero page is considered "valid"; see PageIsVerified().)
 *
 * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
 * valid, the page is zeroed instead of throwing an error.  This is intended
 * for non-critical data, where the caller is prepared to repair errors.
 *
 * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
 * with zeros instead of being read from disk.  Useful when the caller is
 * going to fill the page from scratch, since this saves I/O and avoids
 * unnecessary failure if the page-on-disk has corrupt page headers.
 *
 * If strategy is not NULL, a nondefault buffer access strategy is used.
 */
Buffer
ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
				   ReadBufferMode mode, BufferAccessStrategy strategy)
{
	bool		hit;
	Buffer		buf;

	/* Open it at the smgr level if not already done */
	RelationOpenSmgr(reln);

	/*
	 * Reject attempts to read non-local temporary relations; we would be
	 * likely to get wrong data since we have no visibility into the owning
	 * session's local buffers.
	 */
	if (RELATION_IS_OTHER_TEMP(reln))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("cannot access temporary tables of other sessions")));

	/*
	 * Read the buffer, and update pgstat counters to reflect a cache hit or
	 * miss.
	 */
	pgstat_count_buffer_read(reln);
	buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
							forkNum, blockNum, mode, strategy, &hit);
	if (hit)
		pgstat_count_buffer_hit(reln);
	return buf;
}
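
#ifdef NOT_USED
/*
 * Illustrative sketch only (not part of the original file): the canonical
 * pin/lock/modify/release cycle against the buffer manager, tying together
 * ReadBuffer, LockBuffer, MarkBufferDirty, and UnlockReleaseBuffer below.
 * The function name example_touch_page is an assumption for this example.
 */
static void
example_touch_page(Relation rel, BlockNumber blkno)
{
	Buffer		buf;
	Page		page;

	/* pin the buffer; the pin lasts until ReleaseBuffer */
	buf = ReadBuffer(rel, blkno);

	/* take the content lock before examining or modifying the page */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
	page = BufferGetPage(buf);

	/* ... modify "page", writing WAL for the change as appropriate ... */

	/* mark dirty while still holding the exclusive content lock */
	MarkBufferDirty(buf);

	/* drop the content lock and the pin in one call */
	UnlockReleaseBuffer(buf);
}
#endif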

/*
 * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
 *		a relcache entry for the relation.
 *
 * NB: At present, this function may only be used on permanent relations,
 * which is OK, because we only use it during XLOG replay.
 */
Buffer
ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
						  BlockNumber blockNum, ReadBufferMode mode,
						  BufferAccessStrategy strategy)
{
	bool		hit;

	SMgrRelation smgr = smgropen(rnode, InvalidBackendId);

	Assert(InRecovery);

	return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
							 mode, strategy, &hit);
}

/*
 * ReadBuffer_common -- common logic for all ReadBuffer variants
 *
 * *hit is set to true if the request was satisfied from shared buffer cache.
 */
static Buffer
ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
				  BlockNumber blockNum, ReadBufferMode mode,
				  BufferAccessStrategy strategy, bool *hit)
{
	volatile BufferDesc *bufHdr;
	Block		bufBlock;
	bool		found;
	bool		isExtend;
	bool		isLocalBuf = SmgrIsTemp(smgr);

	*hit = false;

	/* Make sure we will have room to remember the buffer pin */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	isExtend = (blockNum == P_NEW);

	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
									   smgr->smgr_rnode.node.spcNode,
									   smgr->smgr_rnode.node.dbNode,
									   smgr->smgr_rnode.node.relNode,
									   smgr->smgr_rnode.backend,
									   isExtend);

	/* Substitute proper block number if caller asked for P_NEW */
	if (isExtend)
		blockNum = smgrnblocks(smgr, forkNum);

	if (isLocalBuf)
	{
		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
		if (found)
			pgBufferUsage.local_blks_hit++;
		else
			pgBufferUsage.local_blks_read++;
	}
	else
	{
		/*
		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block
		 * is not currently in memory.
		 */
		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
							 strategy, &found);
		if (found)
			pgBufferUsage.shared_blks_hit++;
		else
			pgBufferUsage.shared_blks_read++;
	}

	/* At this point we do NOT hold any locks. */

	/* if it was already in the buffer pool, we're done */
	if (found)
	{
		if (!isExtend)
		{
			/* Just need to update stats before we exit */
			*hit = true;
			VacuumPageHit++;

			if (VacuumCostActive)
				VacuumCostBalance += VacuumCostPageHit;

			TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
											  smgr->smgr_rnode.node.spcNode,
											  smgr->smgr_rnode.node.dbNode,
											  smgr->smgr_rnode.node.relNode,
											  smgr->smgr_rnode.backend,
											  isExtend,
											  found);

			return BufferDescriptorGetBuffer(bufHdr);
		}

		/*
		 * We get here only in the corner case where we are trying to extend
		 * the relation but found a pre-existing buffer marked BM_VALID for
		 * the target block.  A previous attempt to read past EOF can leave
		 * such a zero-filled buffer behind; buggy kernels returning a stale
		 * lseek(SEEK_END) result have also produced this situation.  The
		 * legitimate case always leaves an all-zero page, so complain if the
		 * page is not new.
		 */
		bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
		if (!PageIsNew((Page) bufBlock))
			ereport(ERROR,
					(errmsg("unexpected data beyond EOF in block %u of relation %s",
							blockNum, relpath(smgr->smgr_rnode, forkNum)),
					 errhint("This has been seen to occur with buggy kernels; consider updating your system.")));

		/*
		 * We *must* do smgrextend before succeeding, else the page will not
		 * be reserved by the kernel, and the next P_NEW call will decide to
		 * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
		 * call that BufferAlloc didn't, and proceed.
		 */
		if (isLocalBuf)
		{
			/* Only need to adjust flags */
			Assert(bufHdr->flags & BM_VALID);
			bufHdr->flags &= ~BM_VALID;
		}
		else
		{
			/*
			 * Loop to handle the very small possibility that someone re-sets
			 * BM_VALID between our clearing it and StartBufferIO inspecting
			 * it.
			 */
			do
			{
				LockBufHdr(bufHdr);
				Assert(bufHdr->flags & BM_VALID);
				bufHdr->flags &= ~BM_VALID;
				UnlockBufHdr(bufHdr);
			} while (!StartBufferIO(bufHdr, true));
		}
	}

	/*
	 * If we have gotten to this point, we have allocated a buffer for the
	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for
	 * it, if it's a shared buffer.
	 *
	 * Note: if smgrextend fails, we will end up with a buffer that is
	 * allocated but not marked BM_VALID.  P_NEW will still select the same
	 * block number (because the relation didn't get any longer on disk), so
	 * future attempts to extend the relation will find the same buffer and
	 * come right back here to try smgrextend again.
	 */
	Assert(!(bufHdr->flags & BM_VALID));		/* spinlock not needed */

	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);

	if (isExtend)
	{
		/* new buffers are zero-filled */
		MemSet((char *) bufBlock, 0, BLCKSZ);
		/* don't set checksum for all-zero page */
		smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
	}
	else
	{
		/*
		 * Read in the page, unless the caller intends to overwrite it and
		 * just wants us to allocate a buffer.
		 */
		if (mode == RBM_ZERO)
			MemSet((char *) bufBlock, 0, BLCKSZ);
		else
		{
			instr_time	io_start,
						io_time;

			if (track_io_timing)
				INSTR_TIME_SET_CURRENT(io_start);

			smgrread(smgr, forkNum, blockNum, (char *) bufBlock);

			if (track_io_timing)
			{
				INSTR_TIME_SET_CURRENT(io_time);
				INSTR_TIME_SUBTRACT(io_time, io_start);
				pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
				INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
			}

			/* check for garbage data */
			if (!PageIsVerified((Page) bufBlock, blockNum))
			{
				if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
				{
					ereport(WARNING,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("invalid page in block %u of relation %s; zeroing out page",
									blockNum,
									relpath(smgr->smgr_rnode, forkNum))));
					MemSet((char *) bufBlock, 0, BLCKSZ);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_DATA_CORRUPTED),
							 errmsg("invalid page in block %u of relation %s",
									blockNum,
									relpath(smgr->smgr_rnode, forkNum))));
			}
		}
	}

	if (isLocalBuf)
	{
		/* Only need to adjust flags */
		bufHdr->flags |= BM_VALID;
	}
	else
	{
		/* Set BM_VALID, terminate IO, and wake up any waiters */
		TerminateBufferIO(bufHdr, false, BM_VALID);
	}

	VacuumPageMiss++;
	if (VacuumCostActive)
		VacuumCostBalance += VacuumCostPageMiss;

	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
									  smgr->smgr_rnode.node.spcNode,
									  smgr->smgr_rnode.node.dbNode,
									  smgr->smgr_rnode.node.relNode,
									  smgr->smgr_rnode.backend,
									  isExtend,
									  found);

	return BufferDescriptorGetBuffer(bufHdr);
}

/*
 * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
 *		buffer.  If no buffer exists already, selects a replacement victim
 *		and evicts the old page, but does NOT read in the new page.
 *
 * "strategy" is a BufferAccessStrategy object, or NULL for default strategy.
 *
 * The returned buffer is pinned and is already marked as holding the desired
 * page.  If it already did have the desired page, *foundPtr is set TRUE.
 * Otherwise, *foundPtr is set FALSE and the buffer is marked as
 * IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
 *
 * *foundPtr is actually redundant with the buffer's BM_VALID flag, but we
 * report it to the caller to save an extra spinlock acquisition.
 *
 * No locks are held either at entry or exit.
 */
static volatile BufferDesc *
BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
			BlockNumber blockNum,
			BufferAccessStrategy strategy,
			bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	uint32		newHash;		/* hash value for newTag */
	LWLockId	newPartitionLock;		/* buffer partition lock for it */
	BufferTag	oldTag;			/* previous identity of selected buffer */
	uint32		oldHash;		/* hash value for oldTag */
	LWLockId	oldPartitionLock;		/* buffer partition lock for it */
	BufFlags	oldFlags;
	int			buf_id;
	volatile BufferDesc *buf;
	bool		valid;

	/* create a tag so we can lookup the buffer */
	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);

	/* determine its hash code and partition lock ID */
	newHash = BufTableHashCode(&newTag);
	newPartitionLock = BufMappingPartitionLock(newHash);

	/* see if the block is in the buffer pool already */
	LWLockAcquire(newPartitionLock, LW_SHARED);
	buf_id = BufTableLookup(&newTag, newHash);
	if (buf_id >= 0)
	{
		/*
		 * Found it.  Now, pin the buffer so no one can steal it from the
		 * buffer pool, and check to see if the correct data has been loaded
		 * into the buffer.
		 */
		buf = &BufferDescriptors[buf_id];

		valid = PinBuffer(buf, strategy);

		/* Can release the mapping lock as soon as we've pinned it */
		LWLockRelease(newPartitionLock);

		*foundPtr = TRUE;

		if (!valid)
		{
			/*
			 * We can only get here if (a) someone else is still reading in
			 * the page, or (b) a previous read attempt failed.  We have to
			 * wait for any active read attempt to finish, and then set up
			 * our own read attempt if the page is still not BM_VALID.
			 * StartBufferIO does it all.
			 */
			if (StartBufferIO(buf, true))
			{
				/*
				 * If we get here, previous attempts to read the buffer must
				 * have failed ... but we shall bravely try again.
				 */
				*foundPtr = FALSE;
			}
		}

		return buf;
	}

	/*
	 * Didn't find it in the buffer pool.  We'll have to initialize a new
	 * buffer.  Remember to unlock the mapping lock while doing the work.
	 */
	LWLockRelease(newPartitionLock);

	/* Loop here in case we have to try another victim buffer */
	for (;;)
	{
		bool		lock_held;

		/*
		 * Select a victim buffer.  The buffer is returned with its header
		 * spinlock still held!  Also (in most cases) the BufFreelistLock is
		 * still held, since it would be bad to hold the spinlock while
		 * possibly waking up other processes.
		 */
		buf = StrategyGetBuffer(strategy, &lock_held);

		Assert(buf->refcount == 0);

		/* Must copy buffer flags while we still hold the spinlock */
		oldFlags = buf->flags;

		/* Pin the buffer and then release the buffer spinlock */
		PinBuffer_Locked(buf);

		/* Now it's safe to release the freelist lock */
		if (lock_held)
			LWLockRelease(BufFreelistLock);

		/*
		 * If the buffer was dirty, try to write it out.  There is a race
		 * condition here, in that someone might dirty it after we released
		 * the spinlock above, or even while we are writing it out (since our
		 * share-lock won't prevent hint-bit updates).  We will recheck the
		 * dirty bit after re-locking the buffer header.
		 */
		if (oldFlags & BM_DIRTY)
		{
			/*
			 * We need a share-lock on the buffer contents to write it out
			 * (else we might write invalid data, eg because someone else is
			 * compacting the page contents while we write).  We must use a
			 * conditional lock acquisition here to avoid deadlock: someone
			 * else could have pinned and exclusive-locked the buffer by the
			 * time we get here, and waiting for them unconditionally while
			 * they might later wait for us would deadlock.
			 */
			if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
			{
				/*
				 * If using a nondefault strategy, and writing the buffer
				 * would require a WAL flush, let the strategy decide whether
				 * to go ahead and write/reuse the buffer or to choose another
				 * victim.  We need the content lock to inspect the page LSN,
				 * so this can't be done inside StrategyGetBuffer.
				 */
				if (strategy != NULL)
				{
					XLogRecPtr	lsn;

					/* Read the LSN while holding buffer header lock */
					LockBufHdr(buf);
					lsn = BufferGetLSN(buf);
					UnlockBufHdr(buf);

					if (XLogNeedsFlush(lsn) &&
						StrategyRejectBuffer(strategy, buf))
					{
						/* Drop lock/pin and loop around for another buffer */
						LWLockRelease(buf->content_lock);
						UnpinBuffer(buf, true);
						continue;
					}
				}

				/* OK, do the I/O */
				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
											smgr->smgr_rnode.node.spcNode,
											smgr->smgr_rnode.node.dbNode,
											smgr->smgr_rnode.node.relNode);

				FlushBuffer(buf, NULL);
				LWLockRelease(buf->content_lock);

				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
											smgr->smgr_rnode.node.spcNode,
											smgr->smgr_rnode.node.dbNode,
											smgr->smgr_rnode.node.relNode);
			}
			else
			{
				/*
				 * Someone else has locked the buffer, so give it up and loop
				 * back to get another one.
				 */
				UnpinBuffer(buf, true);
				continue;
			}
		}

		/*
		 * To change the association of a valid buffer, we'll need to have
		 * exclusive lock on both the old and new mapping partitions.
		 */
		if (oldFlags & BM_TAG_VALID)
		{
			/*
			 * Need to compute the old tag's hashcode and partition lock ID.
			 * XXX is it worth storing the hashcode in BufferDesc so we need
			 * not recompute it here?  Probably not.
			 */
			oldTag = buf->tag;
			oldHash = BufTableHashCode(&oldTag);
			oldPartitionLock = BufMappingPartitionLock(oldHash);

			/*
			 * Must lock the lower-numbered partition first to avoid
			 * deadlocks.
			 */
			if (oldPartitionLock < newPartitionLock)
			{
				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
			}
			else if (oldPartitionLock > newPartitionLock)
			{
				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
			}
			else
			{
				/* only one partition, only one lock */
				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
			}
		}
		else
		{
			/* if it wasn't valid, we need only the new partition */
			LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
			/* these just keep the compiler quiet about uninit variables */
			oldHash = 0;
			oldPartitionLock = 0;
		}

		/*
		 * Try to make a hashtable entry for the buffer under its new tag.
		 * This could fail because while we were writing, someone else
		 * allocated another buffer for the same block we want to read in.
		 * Note that we have not yet removed the hashtable entry for the old
		 * tag.
		 */
		buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);

		if (buf_id >= 0)
		{
			/*
			 * Got a collision.  Someone has already done what we were about
			 * to do.  We'll just handle this as if it were found in the
			 * buffer pool in the first place.  First, give up the buffer we
			 * were planning to use.
			 */
			UnpinBuffer(buf, true);

			/* Can give up that buffer's mapping partition lock now */
			if ((oldFlags & BM_TAG_VALID) &&
				oldPartitionLock != newPartitionLock)
				LWLockRelease(oldPartitionLock);

			/* remaining code should match code at top of routine */

			buf = &BufferDescriptors[buf_id];

			valid = PinBuffer(buf, strategy);

			/* Can release the mapping lock as soon as we've pinned it */
			LWLockRelease(newPartitionLock);

			*foundPtr = TRUE;

			if (!valid)
			{
				/*
				 * As above: wait for any active read attempt to finish, and
				 * set up our own read attempt if the page is still not
				 * BM_VALID.  StartBufferIO does it all.
				 */
				if (StartBufferIO(buf, true))
				{
					/*
					 * If we get here, previous attempts to read the buffer
					 * must have failed ... but we shall bravely try again.
					 */
					*foundPtr = FALSE;
				}
			}

			return buf;
		}

		/*
		 * Need to lock the buffer header too in order to change its tag.
		 */
		LockBufHdr(buf);

		/*
		 * Somebody could have pinned or re-dirtied the buffer while we were
		 * doing the I/O and making the new hashtable entry.  If so, we can't
		 * recycle this buffer; we must undo everything we've done and start
		 * over with a new victim buffer.
		 */
		oldFlags = buf->flags;
		if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
			break;

		UnlockBufHdr(buf);
		BufTableDelete(&newTag, newHash);
		if ((oldFlags & BM_TAG_VALID) &&
			oldPartitionLock != newPartitionLock)
			LWLockRelease(oldPartitionLock);
		LWLockRelease(newPartitionLock);
		UnpinBuffer(buf, true);
	}

	/*
	 * Okay, it's finally safe to rename the buffer.
	 *
	 * Clearing BM_VALID here is necessary; clearing the dirtybits is just
	 * paranoia.  We also reset the usage_count since any recency of use of
	 * the old content is no longer relevant.  (The usage_count starts out at
	 * 1 so that the buffer can survive one clock-sweep pass.)
	 */
	buf->tag = newTag;
	buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
	if (relpersistence == RELPERSISTENCE_PERMANENT)
		buf->flags |= BM_TAG_VALID | BM_PERMANENT;
	else
		buf->flags |= BM_TAG_VALID;
	buf->usage_count = 1;

	UnlockBufHdr(buf);

	if (oldFlags & BM_TAG_VALID)
	{
		BufTableDelete(&oldTag, oldHash);
		if (oldPartitionLock != newPartitionLock)
			LWLockRelease(oldPartitionLock);
	}

	LWLockRelease(newPartitionLock);

	/*
	 * Buffer contents are currently invalid.  Try to get the io_in_progress
	 * lock.  If StartBufferIO returns false, then someone else managed to
	 * read it before we did, so there's nothing left for BufferAlloc() to
	 * do.
	 */
	if (StartBufferIO(buf, true))
		*foundPtr = FALSE;
	else
		*foundPtr = TRUE;

	return buf;
}

/*
 * InvalidateBuffer -- mark a shared buffer invalid and return it to the
 * freelist.
 *
 * The buffer header spinlock must be held at entry.  We drop it before
 * returning.  (This is sane because the caller must have locked the buffer
 * in order to be sure it should be dropped.)
 *
 * This is used only in contexts such as dropping a relation.  We assume
 * that no other backend could possibly be interested in using the page, so
 * the only reason the buffer might be pinned is if someone else is trying
 * to write it out.  We have to let them finish before we can reclaim the
 * buffer.
 *
 * The buffer could get reclaimed by someone else while we are waiting to
 * acquire the necessary locks; if so, don't mess it up.
 */
static void
InvalidateBuffer(volatile BufferDesc *buf)
{
	BufferTag	oldTag;
	uint32		oldHash;		/* hash value for oldTag */
	LWLockId	oldPartitionLock;		/* buffer partition lock for it */
	BufFlags	oldFlags;

	/* Save the original buffer tag before dropping the spinlock */
	oldTag = buf->tag;

	UnlockBufHdr(buf);

	/*
	 * Need to compute the old tag's hashcode and partition lock ID.  XXX is
	 * it worth storing the hashcode in BufferDesc so we need not recompute
	 * it here?  Probably not.
	 */
	oldHash = BufTableHashCode(&oldTag);
	oldPartitionLock = BufMappingPartitionLock(oldHash);

retry:

	/*
	 * Acquire exclusive mapping lock in preparation for changing the
	 * buffer's association.
	 */
	LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);

	/* Re-lock the buffer header */
	LockBufHdr(buf);

	/* If it's changed while we were waiting for lock, do nothing */
	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
	{
		UnlockBufHdr(buf);
		LWLockRelease(oldPartitionLock);
		return;
	}

	/*
	 * We assume the only reason for it to be pinned is that someone else is
	 * flushing the page out.  Wait for them to finish.  (This could be an
	 * infinite loop if the refcount is messed up; it would be nice to time
	 * out after awhile, but there seems no way to be sure how many loops may
	 * be needed.  Note that if the other guy has pinned the buffer but not
	 * yet done StartBufferIO, WaitIO will fall through and we'll effectively
	 * be busy-looping here.)
	 */
	if (buf->refcount != 0)
	{
		UnlockBufHdr(buf);
		LWLockRelease(oldPartitionLock);
		/* safety check: should definitely not be our *own* pin */
		if (PrivateRefCount[buf->buf_id] != 0)
			elog(ERROR, "buffer is pinned in InvalidateBuffer");
		WaitIO(buf);
		goto retry;
	}

	/*
	 * Clear out the buffer's tag and flags.  We must do this to ensure that
	 * linear scans of the buffer array don't think the buffer is valid.
	 */
	oldFlags = buf->flags;
	CLEAR_BUFFERTAG(buf->tag);
	buf->flags = 0;
	buf->usage_count = 0;

	UnlockBufHdr(buf);

	/*
	 * Remove the buffer from the lookup hashtable, if it was in there.
	 */
	if (oldFlags & BM_TAG_VALID)
		BufTableDelete(&oldTag, oldHash);

	/*
	 * Done with mapping lock.
	 */
	LWLockRelease(oldPartitionLock);

	/*
	 * Insert the buffer at the head of the list of free buffers.
	 */
	StrategyFreeBuffer(buf);
}

/*
 * MarkBufferDirty
 *
 *		Marks buffer contents as dirty (actual write happens later).
 *
 * Buffer must be pinned and exclusive-locked.  (If caller does not hold
 * exclusive lock, then somebody could be in the process of writing the
 * buffer, leading to risk of bad data written to disk.)
 */
void
MarkBufferDirty(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer ID: %d", buffer);

	if (BufferIsLocal(buffer))
	{
		MarkLocalBufferDirty(buffer);
		return;
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);
	/* unfortunately we can't check if the lock is held exclusively */
	Assert(LWLockHeldByMe(bufHdr->content_lock));

	LockBufHdr(bufHdr);

	Assert(bufHdr->refcount > 0);

	/*
	 * If the buffer was not dirty already, do vacuum accounting.
	 */
	if (!(bufHdr->flags & BM_DIRTY))
	{
		VacuumPageDirty++;
		pgBufferUsage.shared_blks_dirtied++;
		if (VacuumCostActive)
			VacuumCostBalance += VacuumCostPageDirty;
	}

	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);

	UnlockBufHdr(bufHdr);
}

/*
 * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
 *
 * This is mainly a convenience function.  However, if the passed buffer is
 * valid and already contains the desired block, we just return it as-is,
 * which saves considerable work compared to a full release and reacquire.
 *
 * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
 * buffer actually needs to be released.  This case is the same as
 * ReadBuffer, but can save some tests in the caller.
 */
Buffer
ReleaseAndReadBuffer(Buffer buffer,
					 Relation relation,
					 BlockNumber blockNum)
{
	ForkNumber	forkNum = MAIN_FORKNUM;
	volatile BufferDesc *bufHdr;

	if (BufferIsValid(buffer))
	{
		if (BufferIsLocal(buffer))
		{
			Assert(LocalRefCount[-buffer - 1] > 0);
			bufHdr = &LocalBufferDescriptors[-buffer - 1];
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
				bufHdr->tag.forkNum == forkNum)
				return buffer;
			ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
			LocalRefCount[-buffer - 1]--;
		}
		else
		{
			Assert(PrivateRefCount[buffer - 1] > 0);
			bufHdr = &BufferDescriptors[buffer - 1];
			/* we have pin, so it's OK to examine tag without spinlock */
			if (bufHdr->tag.blockNum == blockNum &&
				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
				bufHdr->tag.forkNum == forkNum)
				return buffer;
			UnpinBuffer(bufHdr, true);
		}
	}

	return ReadBuffer(relation, blockNum);
}

/*
 * PinBuffer -- make buffer unavailable for replacement.
 *
 * For the default access strategy, the buffer's usage_count is incremented
 * when we first pin it; for other strategies we just make sure the
 * usage_count isn't zero.  (The idea of the latter is that we don't want
 * synchronized heap scans to inflate the count, but we need it to be
 * nonzero to discourage other backends from stealing buffers from our
 * ring.)
 *
 * This should be applied only to shared buffers, never local ones.
 *
 * Note that ResourceOwnerEnlargeBuffers must have been done already.
 *
 * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
 * some callers to avoid an extra spinlock cycle.
 */
static bool
PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
{
	int			b = buf->buf_id;
	bool		result;

	if (PrivateRefCount[b] == 0)
	{
		LockBufHdr(buf);
		buf->refcount++;
		if (strategy == NULL)
		{
			if (buf->usage_count < BM_MAX_USAGE_COUNT)
				buf->usage_count++;
		}
		else
		{
			if (buf->usage_count == 0)
				buf->usage_count = 1;
		}
		result = (buf->flags & BM_VALID) != 0;
		UnlockBufHdr(buf);
	}
	else
	{
		/* If we previously pinned the buffer, it must surely be valid */
		result = true;
	}
	PrivateRefCount[b]++;
	Assert(PrivateRefCount[b] > 0);
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf));
	return result;
}

/*
 * PinBuffer_Locked -- as above, but caller already holds the buffer header
 *		spinlock.  The spinlock is released before return.
 *
 * Currently, no callers of this function want to modify the buffer's
 * usage_count at all, so there's no need for a strategy parameter.
 */
static void
PinBuffer_Locked(volatile BufferDesc *buf)
{
	int			b = buf->buf_id;

	if (PrivateRefCount[b] == 0)
		buf->refcount++;
	UnlockBufHdr(buf);
	PrivateRefCount[b]++;
	Assert(PrivateRefCount[b] > 0);
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf));
}

/*
 * UnpinBuffer -- make buffer available for replacement.
 *
 * This should be applied only to shared buffers, never local ones.
 *
 * Most but not all callers want CurrentResourceOwner to be adjusted.
 * Those that don't should pass fixOwner = FALSE.
 */
static void
UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
{
	int			b = buf->buf_id;

	if (fixOwner)
		ResourceOwnerForgetBuffer(CurrentResourceOwner,
								  BufferDescriptorGetBuffer(buf));

	Assert(PrivateRefCount[b] > 0);
	PrivateRefCount[b]--;
	if (PrivateRefCount[b] == 0)
	{
		/* I'd better not still hold any locks on the buffer */
		Assert(!LWLockHeldByMe(buf->content_lock));
		Assert(!LWLockHeldByMe(buf->io_in_progress_lock));

		LockBufHdr(buf);

		/* Decrement the shared reference count */
		Assert(buf->refcount > 0);
		buf->refcount--;

		/* Support LockBufferForCleanup() */
		if ((buf->flags & BM_PIN_COUNT_WAITER) &&
			buf->refcount == 1)
		{
			/* we just released the last pin other than the waiter's */
			int			wait_backend_pid = buf->wait_backend_pid;

			buf->flags &= ~BM_PIN_COUNT_WAITER;
			UnlockBufHdr(buf);
			ProcSendSignal(wait_backend_pid);
		}
		else
			UnlockBufHdr(buf);
	}
}

/*
 * BufferSync -- Write out all dirty buffers in the pool.
 *
 * This is called at checkpoint time to write out all dirty shared buffers.
 * The checkpoint request flags should be passed in.  If CHECKPOINT_IS_SHUTDOWN
 * or CHECKPOINT_END_OF_RECOVERY is set, we write even unlogged buffers, which
 * are otherwise skipped.  The remaining flags currently have no effect here.
 */
static void
BufferSync(int flags)
{
	int			buf_id;
	int			num_to_scan;
	int			num_to_write;
	int			num_written;
	int			mask = BM_DIRTY;

	/* Make sure we can handle the pin inside SyncOneBuffer */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	/*
	 * Unless this is a shutdown or end-of-recovery checkpoint, we write only
	 * permanent, dirty buffers.
	 */
	if (!((flags & CHECKPOINT_IS_SHUTDOWN) || (flags & CHECKPOINT_END_OF_RECOVERY)))
		mask |= BM_PERMANENT;

	/*
	 * Loop over all buffers, and mark the ones that need to be written with
	 * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
	 * can estimate how much work needs to be done.
	 *
	 * This allows us to write only those pages that were dirty when the
	 * checkpoint began, and not those that get dirtied while it proceeds.
	 * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
	 * later in this function, or by normal backends or the bgwriter cleaning
	 * scan, the flag is cleared.  Any buffer dirtied after this point won't
	 * have the flag set.
	 *
	 * Note that if we fail to write some buffer, we may leave buffers with
	 * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer
	 * would certainly need to be written for the next checkpoint attempt,
	 * too.
	 */
	num_to_write = 0;
	for (buf_id = 0; buf_id < NBuffers; buf_id++)
	{
		volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];

		/*
		 * Header spinlock is enough to examine BM_DIRTY, see comment in
		 * SyncOneBuffer.
		 */
		LockBufHdr(bufHdr);

		if ((bufHdr->flags & mask) == mask)
		{
			bufHdr->flags |= BM_CHECKPOINT_NEEDED;
			num_to_write++;
		}

		UnlockBufHdr(bufHdr);
	}

	if (num_to_write == 0)
		return;					/* nothing to do */

	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);

	/*
	 * Loop over all buffers again, and write the ones (still) marked with
	 * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
	 * since we might as well dump soon-to-be-recycled buffers first.
	 *
	 * Note that we don't read the buffer alloc count here --- that should be
	 * left untouched till the next BgBufferSync() call.
	 */
	buf_id = StrategySyncStart(NULL, NULL);
	num_to_scan = NBuffers;
	num_written = 0;
	while (num_to_scan-- > 0)
	{
		volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];

		/*
		 * We don't need to acquire the lock here, because we're only looking
		 * at a single bit.  It's possible that someone else writes the
		 * buffer and clears the flag right after we check, but that doesn't
		 * matter since SyncOneBuffer will then do nothing.  In the
		 * improbable case that the buffer is replaced with another dirty
		 * page in between, SyncOneBuffer will write a buffer we didn't need
		 * to, which is harmless.
		 */
		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
		{
			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
			{
				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
				BgWriterStats.m_buf_written_checkpoints++;
				num_written++;

				/*
				 * We know there are at most num_to_write buffers with
				 * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
				 * num_written reaches num_to_write.
				 *
				 * Note that num_written doesn't include buffers written by
				 * other backends, or by the bgwriter cleaning scan.  That
				 * means the progress estimate is conservative, and this
				 * routine will exactly reach its target when no other
				 * process writes buffers.
				 */
				if (num_written >= num_to_write)
					break;

				/*
				 * Sleep to throttle our I/O rate.
				 */
				CheckpointWriteDelay(flags, (double) num_written / num_to_write);
			}
		}

		if (++buf_id >= NBuffers)
			buf_id = 0;
	}

	/*
	 * Update checkpoint statistics.  As noted above, this doesn't include
	 * buffers written by other backends or the bgwriter scan.
	 */
	CheckpointStats.ckpt_bufs_written += num_written;

	TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
}

/*
 * BgBufferSync -- Write out some dirty buffers in the pool.
 *
 * This is called periodically by the background writer process.
 *
 * Returns true if it's appropriate for the bgwriter process to go into
 * low-power hibernation mode.  (This happens if the strategy clock sweep
 * has been "lapped" and no buffer allocations have occurred recently, or
 * if the bgwriter has been effectively disabled by setting
 * bgwriter_lru_maxpages to 0.)
 */
bool
BgBufferSync(void)
{
	/* info obtained from freelist.c */
	int			strategy_buf_id;
	uint32		strategy_passes;
	uint32		recent_alloc;

	/*
	 * Information saved between calls so we can determine the strategy
	 * point's advance rate and avoid scanning already-cleaned buffers.
	 */
	static bool saved_info_valid = false;
	static int	prev_strategy_buf_id;
	static uint32 prev_strategy_passes;
	static int	next_to_clean;
	static uint32 next_passes;

	/* Moving averages of allocation rate and clean-buffer density */
	static float smoothed_alloc = 0;
	static float smoothed_density = 10.0;

	/* Potentially these could be tunables, but for now, not */
	float		smoothing_samples = 16;
	float		scan_whole_pool_milliseconds = 120000.0;

	/* Used to compute how far we scan ahead */
	long		strategy_delta;
	int			bufs_to_lap;
	int			bufs_ahead;
	float		scans_per_alloc;
	int			reusable_buffers_est;
	int			upcoming_alloc_est;
	int			min_scan_buffers;

	/* Variables for the scanning loop proper */
	int			num_to_scan;
	int			num_written;
	int			reusable_buffers;

	/* Variables for final smoothed_density update */
	long		new_strategy_delta;
	uint32		new_recent_alloc;

	/*
	 * Find out where the freelist clock sweep currently is, and how many
	 * buffer allocations have happened since our last call.
	 */
	strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);

	/* Report buffer alloc counts to pgstat */
	BgWriterStats.m_buf_alloc += recent_alloc;

	/*
	 * If we're not running the LRU scan, just stop after doing the stats
	 * stuff.  We mark the saved state invalid so that we can recover sanely
	 * if LRU scan is turned back on later.
	 */
	if (bgwriter_lru_maxpages <= 0)
	{
		saved_info_valid = false;
		return true;
	}

	/*
	 * Compute strategy_delta = how many buffers have been scanned by the
	 * clock sweep since last time.  If first time through, assume none.
	 * Then see if we are still ahead of the clock sweep, and if so, how many
	 * buffers we could scan before we'd catch up with it and "lap" it.
	 * Note: the weird-looking coding of xxx_passes comparisons is to avoid
	 * bogus behavior when the passes counts wrap around.
	 */
	if (saved_info_valid)
	{
		int32		passes_delta = strategy_passes - prev_strategy_passes;

		strategy_delta = strategy_buf_id - prev_strategy_buf_id;
		strategy_delta += (long) passes_delta * NBuffers;

		Assert(strategy_delta >= 0);

		if ((int32) (next_passes - strategy_passes) > 0)
		{
			/* we're one pass ahead of the strategy point */
			bufs_to_lap = strategy_buf_id - next_to_clean;
#ifdef BGW_DEBUG
			elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
				 next_passes, next_to_clean,
				 strategy_passes, strategy_buf_id,
				 strategy_delta, bufs_to_lap);
#endif
		}
		else if (next_passes == strategy_passes &&
				 next_to_clean >= strategy_buf_id)
		{
			/* on same pass, but ahead or at least not behind */
			bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
#ifdef BGW_DEBUG
			elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
				 next_passes, next_to_clean,
				 strategy_passes, strategy_buf_id,
				 strategy_delta, bufs_to_lap);
#endif
		}
		else
		{
			/*
			 * We're behind, so skip forward to the strategy point and start
			 * cleaning from there.
			 */
#ifdef BGW_DEBUG
			elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
				 next_passes, next_to_clean,
				 strategy_passes, strategy_buf_id,
				 strategy_delta);
#endif
			next_to_clean = strategy_buf_id;
			next_passes = strategy_passes;
			bufs_to_lap = NBuffers;
		}
	}
	else
	{
		/*
		 * Initializing at startup or after LRU scanning had been off. Always
		 * start at the strategy point.
		 */
#ifdef BGW_DEBUG
		elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
			 strategy_passes, strategy_buf_id);
#endif
		strategy_delta = 0;
		next_to_clean = strategy_buf_id;
		next_passes = strategy_passes;
		bufs_to_lap = NBuffers;
	}

	/* Update saved info for next time */
	prev_strategy_buf_id = strategy_buf_id;
	prev_strategy_passes = strategy_passes;
	saved_info_valid = true;

	/*
	 * Compute how many buffers had to be scanned for each new allocation,
	 * ie, 1/density of reusable buffers, and track a moving average of that.
	 *
	 * If the strategy point didn't move, we don't update the density
	 * estimate.
	 */
	if (strategy_delta > 0 && recent_alloc > 0)
	{
		scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
		smoothed_density += (scans_per_alloc - smoothed_density) /
			smoothing_samples;
	}

	/*
	 * Estimate how many reusable buffers there are between the current
	 * strategy point and where we've scanned ahead to, based on the
	 * smoothed density estimate.
	 */
	bufs_ahead = NBuffers - bufs_to_lap;
	reusable_buffers_est = (float) bufs_ahead / smoothed_density;

	/*
	 * Track a moving average of recent buffer allocations.  Here, rather
	 * than a true average we want a fast-attack, slow-decline behavior: we
	 * immediately follow any increase.
	 */
	if (smoothed_alloc <= (float) recent_alloc)
		smoothed_alloc = recent_alloc;
	else
		smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
			smoothing_samples;

	/* Scale the estimate by a GUC to allow more aggressive tuning */
	upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);

	/*
	 * If recent_alloc remains at zero for many cycles, smoothed_alloc will
	 * eventually underflow to zero, and the underflows produce annoying
	 * kernel warnings on some platforms.  Once upcoming_alloc_est has gone
	 * to zero, there's no point in tracking smaller and smaller values of
	 * smoothed_alloc, so just reset it to exactly zero to avoid this
	 * syndrome.  It will pop back up as soon as recent_alloc increases.
	 */
	if (upcoming_alloc_est == 0)
		smoothed_alloc = 0;

	/*
	 * Even in cases where there's been little or no buffer allocation
	 * activity, we want to make a small amount of progress through the
	 * buffer cache so that as many reusable buffers as possible are clean
	 * after an idle period.
	 *
	 * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
	 * the BGW will be called during the scan_whole_pool time; slice the
	 * buffer pool into that many sections.
	 */
	min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));

	if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
	{
#ifdef BGW_DEBUG
		elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
			 upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
#endif
		upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
	}

	/*
	 * Now write out dirty reusable buffers, working forward from the
	 * next_to_clean point, until we have lapped the strategy scan, or
	 * cleaned enough buffers to match our estimate of the next cycle's
	 * allocation requirements, or hit the bgwriter_lru_maxpages limit.
	 */

	/* Make sure we can handle the pin inside SyncOneBuffer */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	num_to_scan = bufs_to_lap;
	num_written = 0;
	reusable_buffers = reusable_buffers_est;

	/* Execute the LRU scan */
	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
	{
		int			buffer_state = SyncOneBuffer(next_to_clean, true);

		if (++next_to_clean >= NBuffers)
		{
			next_to_clean = 0;
			next_passes++;
		}
		num_to_scan--;

		if (buffer_state & BUF_WRITTEN)
		{
			reusable_buffers++;
			if (++num_written >= bgwriter_lru_maxpages)
			{
				BgWriterStats.m_maxwritten_clean++;
				break;
			}
		}
		else if (buffer_state & BUF_REUSABLE)
			reusable_buffers++;
	}

	BgWriterStats.m_buf_written_clean += num_written;

#ifdef BGW_DEBUG
	elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
		 recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
		 smoothed_density, reusable_buffers_est, upcoming_alloc_est,
		 bufs_to_lap - num_to_scan,
		 num_written,
		 reusable_buffers - reusable_buffers_est);
#endif

	/*
	 * Consider the above scan as being like a new allocation scan.
	 * Characterize its density and update the smoothed one based on it. This
	 * effectively halves the moving average period in cases where both the
	 * strategy and the background writer are doing some useful scanning,
	 * which is helpful because a long memory isn't as desirable on the
	 * density estimates.
	 */
	new_strategy_delta = bufs_to_lap - num_to_scan;
	new_recent_alloc = reusable_buffers - reusable_buffers_est;
	if (new_strategy_delta > 0 && new_recent_alloc > 0)
	{
		scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
		smoothed_density += (scans_per_alloc - smoothed_density) /
			smoothing_samples;

#ifdef BGW_DEBUG
		elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
			 new_recent_alloc, new_strategy_delta,
			 scans_per_alloc, smoothed_density);
#endif
	}

	/* Return true if OK to hibernate */
	return (bufs_to_lap == 0 && recent_alloc == 0);
}

/*
 * SyncOneBuffer -- process a single buffer during syncing.
 *
 * If skip_recently_used is true, we don't write currently-pinned buffers,
 * nor buffers marked recently used, as these are not replacement candidates.
 *
 * Returns a bitmask containing the following flag bits:
 *	BUF_WRITTEN: we wrote the buffer.
 *	BUF_REUSABLE: buffer is available for replacement, ie, it has
 *		pin count 0 and usage count 0.
 *
 * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
 * after locking it, but we don't care all that much.)
 *
 * Note: caller must have done ResourceOwnerEnlargeBuffers.
 */
static int
SyncOneBuffer(int buf_id, bool skip_recently_used)
{
	volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
	int			result = 0;

	/*
	 * Check whether the buffer needs writing.
	 *
	 * We can make this check without taking the buffer content lock so long
	 * as we mark pages dirty in access methods *before* logging changes with
	 * XLogInsert(): if someone marks the buffer dirty just after our check,
	 * we don't worry because our checkpoint.redo points before the log
	 * record for upcoming changes and so we are not required to write such
	 * a dirty buffer.
	 */
	LockBufHdr(bufHdr);

	if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
		result |= BUF_REUSABLE;
	else if (skip_recently_used)
	{
		/* Caller told us not to write recently-used buffers */
		UnlockBufHdr(bufHdr);
		return result;
	}

	if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
	{
		/* It's clean, so nothing to do */
		UnlockBufHdr(bufHdr);
		return result;
	}

	/*
	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
	 * buffer is clean by the time we've locked it.)
	 */
	PinBuffer_Locked(bufHdr);
	LWLockAcquire(bufHdr->content_lock, LW_SHARED);

	FlushBuffer(bufHdr, NULL);

	LWLockRelease(bufHdr->content_lock);
	UnpinBuffer(bufHdr, true);

	return result | BUF_WRITTEN;
}

/*
 *		AtEOXact_Buffers - clean up at end of transaction.
 *
 *		Buffer pins are released by the ResourceOwner mechanism; this
 *		routine is just a debugging cross-check that no pins remain.
 */
void
AtEOXact_Buffers(bool isCommit)
{
#ifdef USE_ASSERT_CHECKING
	if (assert_enabled)
	{
		int			RefCountErrors = 0;
		Buffer		b;

		for (b = 1; b <= NBuffers; b++)
		{
			if (PrivateRefCount[b - 1] != 0)
			{
				PrintBufferLeakWarning(b);
				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif

	AtEOXact_LocalBuffers(isCommit);
}

/*
 * InitBufferPoolBackend -- perform once-per-backend initialization of the
 * buffer pool
 *
 * This is called during backend startup (whether or not the backend will
 * ever use shared buffers at all), mainly so that we can arrange to clean
 * up at backend exit.
 */
void
InitBufferPoolBackend(void)
{
	on_shmem_exit(AtProcExit_Buffers, 0);
}

/*
 * AtProcExit_Buffers -- shmem-exit hook to verify that we dropped all
 * buffer pins during backend exit.
 */
static void
AtProcExit_Buffers(int code, Datum arg)
{
	AbortBufferIO();
	UnlockBuffers();

#ifdef USE_ASSERT_CHECKING
	if (assert_enabled)
	{
		int			RefCountErrors = 0;
		Buffer		b;

		for (b = 1; b <= NBuffers; b++)
		{
			if (PrivateRefCount[b - 1] != 0)
			{
				PrintBufferLeakWarning(b);
				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif

	/* localbuf.c needs a chance too */
	AtProcExit_LocalBuffers();
}

/*
 * Helper routine to issue warnings when a buffer is unexpectedly pinned
 */
void
PrintBufferLeakWarning(Buffer buffer)
{
	volatile BufferDesc *buf;
	int32		loccount;
	char	   *path;
	BackendId	backend;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
	{
		buf = &LocalBufferDescriptors[-buffer - 1];
		loccount = LocalRefCount[-buffer - 1];
		backend = MyBackendId;
	}
	else
	{
		buf = &BufferDescriptors[buffer - 1];
		loccount = PrivateRefCount[buffer - 1];
		backend = InvalidBackendId;
	}

	/* theoretically we should lock the bufhdr here */
	path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
	elog(WARNING,
		 "buffer refcount leak: [%03d] "
		 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
		 buffer, path,
		 buf->tag.blockNum, buf->flags,
		 buf->refcount, loccount);
	pfree(path);
}

/*
 * CheckPointBuffers -- flush all dirty buffers and sync relation files, as
 * part of a checkpoint.
 *
 * Note: temporary relations do not participate in checkpoints, so they
 * don't need to be flushed.
 */
void
CheckPointBuffers(int flags)
{
	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
	BufferSync(flags);
	CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
	smgrsync();
	CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
}

/*
 * BufmgrCommit -- in-commit processing related to buffer management.
 */
void
BufmgrCommit(void)
{
	/* Nothing to do in bufmgr anymore... */
}

/*
 * BufferGetBlockNumber
 *		Returns the block number associated with a buffer.
 *
 * Note:
 *		Assumes that the buffer is valid and pinned, else the value may be
 *		garbage.
 */
BlockNumber
BufferGetBlockNumber(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	Assert(BufferIsPinned(buffer));

	if (BufferIsLocal(buffer))
		bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
	else
		bufHdr = &BufferDescriptors[buffer - 1];

	/* pinned, so OK to read tag without spinlock */
	return bufHdr->tag.blockNum;
}

/*
 * BufferGetTag
 *		Returns the relfilenode, fork number and block number associated
 *		with a buffer.
 */
void
BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
			 BlockNumber *blknum)
{
	volatile BufferDesc *bufHdr;

	/* Do the same checks as BufferGetBlockNumber. */
	Assert(BufferIsPinned(buffer));

	if (BufferIsLocal(buffer))
		bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
	else
		bufHdr = &BufferDescriptors[buffer - 1];

	/* pinned, so OK to read tag without spinlock */
	*rnode = bufHdr->tag.rnode;
	*forknum = bufHdr->tag.forkNum;
	*blknum = bufHdr->tag.blockNum;
}

/*
 * FlushBuffer
 *		Physically write out a shared buffer.
 *
 * NOTE: this actually just passes the buffer contents to the kernel; the
 * real write to disk won't happen until the kernel feels like it.  This is
 * okay from our point of view since we can redo the changes from WAL.
 * However, we will need to force the changes to disk via fsync before we
 * can checkpoint WAL.
 *
 * The caller must hold a pin on the buffer and must have share-locked the
 * buffer contents.  (Note: a share-lock does not prevent updates of hint
 * bits in the buffer, so the page could change while the write is in
 * progress, but we assume that that will not invalidate the data written.)
 *
 * If the caller has an smgr reference for the buffer's relation, pass it
 * as the second parameter.  If not, pass NULL.
 */
static void
FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
{
	XLogRecPtr	recptr;
	ErrorContextCallback errcallback;
	instr_time	io_start,
				io_time;
	Block		bufBlock;
	char	   *bufToWrite;

	/*
	 * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
	 * false, then someone else flushed the buffer before we could, so we
	 * need not do anything.
	 */
	if (!StartBufferIO(buf, false))
		return;

	/* Setup error traceback support for ereport() */
	errcallback.callback = shared_buffer_write_error_callback;
	errcallback.arg = (void *) buf;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(buf->tag.rnode, InvalidBackendId);

	TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
										buf->tag.blockNum,
										reln->smgr_rnode.node.spcNode,
										reln->smgr_rnode.node.dbNode,
										reln->smgr_rnode.node.relNode);

	LockBufHdr(buf);

	/*
	 * Run PageGetLSN while holding the header lock, since we don't have the
	 * buffer locked exclusively in all cases.
	 */
	recptr = BufferGetLSN(buf);

	/* To check if block content changes while flushing. - vadim 01/17/97 */
	buf->flags &= ~BM_JUST_DIRTIED;
	UnlockBufHdr(buf);

	/*
	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
	 * rule that log updates must hit disk before any of the data-file
	 * changes they describe do.
	 *
	 * This rule does not apply to unlogged relations, which will be lost
	 * after a crash anyway.  Moreover, unlogged GiST pages can bear "fake"
	 * LSNs that could conceivably run past the real WAL insertion point, so
	 * attempting to flush WAL through such an LSN could fail badly.  To make
	 * sure that can't happen, skip the flush if the buffer isn't permanent.
	 */
	if (buf->flags & BM_PERMANENT)
		XLogFlush(recptr);

	/*
	 * Now it's safe to write the buffer to disk.  Note that no one else
	 * should have been able to write it while we were busy with log
	 * flushing, because we have the io_in_progress lock.
	 */
	bufBlock = BufHdrGetBlock(buf);

	/*
	 * Update page checksum if desired.  Since we have only a shared lock on
	 * the buffer, we compute the checksum on a copy of the page.
	 */
	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);

	if (track_io_timing)
		INSTR_TIME_SET_CURRENT(io_start);

	/*
	 * bufToWrite is either the shared buffer or a copy, as appropriate.
	 */
	smgrwrite(reln,
			  buf->tag.forkNum,
			  buf->tag.blockNum,
			  bufToWrite,
			  false);

	if (track_io_timing)
	{
		INSTR_TIME_SET_CURRENT(io_time);
		INSTR_TIME_SUBTRACT(io_time, io_start);
		pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
		INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
	}

	pgBufferUsage.shared_blks_written++;

	/*
	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
	 * end the io_in_progress state.
	 */
	TerminateBufferIO(buf, true, 0);

	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
									   buf->tag.blockNum,
									   reln->smgr_rnode.node.spcNode,
									   reln->smgr_rnode.node.dbNode,
									   reln->smgr_rnode.node.relNode);

	/* Pop the error context stack */
	error_context_stack = errcallback.previous;
}

/*
 * RelationGetNumberOfBlocksInFork
 *		Determines the current number of pages in the specified relation
 *		fork.
 */
BlockNumber
RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
{
	/* Open it at the smgr level if not already done */
	RelationOpenSmgr(relation);

	return smgrnblocks(relation->rd_smgr, forkNum);
}

/*
 * BufferIsPermanent
 *		Determines whether a buffer will potentially still be around after
 *		a crash.  Caller must hold a buffer pin.
 */
bool
BufferIsPermanent(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	/* Local buffers are used only for temp relations. */
	if (BufferIsLocal(buffer))
		return false;

	/* Make sure we've got a real buffer, and that we hold a pin on it. */
	Assert(BufferIsValid(buffer));
	Assert(BufferIsPinned(buffer));

	/*
	 * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
	 * need not bother with the buffer header spinlock.  Even if someone else
	 * changes the buffer header flags while we're doing this, we assume that
	 * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
	 * old value or the new value, but not random garbage.
	 */
	bufHdr = &BufferDescriptors[buffer - 1];
	return (bufHdr->flags & BM_PERMANENT) != 0;
}

/*
 * BufferGetLSNAtomic
 *		Retrieves the LSN of the buffer atomically using a buffer header
 *		lock.  This is necessary for some callers who may not have an
 *		exclusive lock on the buffer.
 */
XLogRecPtr
BufferGetLSNAtomic(Buffer buffer)
{
	volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
	char	   *page = BufferGetPage(buffer);
	XLogRecPtr	lsn;

	/*
	 * If we don't need locking for correctness, fastpath out.
	 */
	if (!DataChecksumsEnabled() || BufferIsLocal(buffer))
		return PageGetLSN(page);

	/* Make sure we've got a real buffer, and that we hold a pin on it. */
	Assert(BufferIsValid(buffer));
	Assert(BufferIsPinned(buffer));

	LockBufHdr(bufHdr);
	lsn = PageGetLSN(page);
	UnlockBufHdr(bufHdr);

	return lsn;
}

/* ---------------------------------------------------------------------
 *		DropRelFileNodeBuffers
 *
 *		This function removes from the buffer pool all the pages of the
 *		specified relation fork that have block numbers >= firstDelBlock.
 *		(In particular, with firstDelBlock = 0, all pages are removed.)
 *		Dirty pages are simply dropped, without bothering to write them
 *		out first.  Therefore, this is NOT rollback-able, and so should be
 *		used only with extreme caution!
 *
 *		Currently, this is called only from smgr.c when the underlying file
 *		is about to be deleted or truncated (firstDelBlock is needed for
 *		the truncation case).  It is the responsibility of higher-level
 *		code to ensure that the deletion or truncation does not lose any
 *		data that could be needed later, and that no other process could be
 *		trying to load more pages of the relation into buffers.
 *
 *		XXX currently it sequentially searches the buffer pool; this could
 *		be improved, but the routine is used only in code paths that aren't
 *		very performance-critical.
 * --------------------------------------------------------------------
 */
void
DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
					   BlockNumber firstDelBlock)
{
	int			i;

	/* If it's a local relation, it's localbuf.c's problem. */
	if (RelFileNodeBackendIsTemp(rnode))
	{
		if (rnode.backend == MyBackendId)
			DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
		return;
	}

	for (i = 0; i < NBuffers; i++)
	{
		volatile BufferDesc *bufHdr = &BufferDescriptors[i];

		/*
		 * We can make this a tad faster by prechecking the buffer tag before
		 * we attempt to lock the buffer; this saves a lot of lock
		 * acquisitions in typical cases.  It should be safe because the
		 * caller must have AccessExclusiveLock on the relation, or some
		 * other reason to be certain that no one is loading new pages of the
		 * rel into the buffer pool.  So while the tag might be changing
		 * while we look at it, it can't be changing *to* a value we care
		 * about, only *away* from such a value.  False negatives are
		 * therefore impossible, and false positives are safe because we'll
		 * recheck after getting the buffer lock.
		 *
		 * We could check forkNum and blockNum as well as the rnode, but the
		 * incremental win from doing so seems small.
		 */
		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
			continue;

		LockBufHdr(bufHdr);
		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
			bufHdr->tag.forkNum == forkNum &&
			bufHdr->tag.blockNum >= firstDelBlock)
			InvalidateBuffer(bufHdr);	/* releases spinlock */
		else
			UnlockBufHdr(bufHdr);
	}
}

/* ---------------------------------------------------------------------
 *		DropRelFileNodesAllBuffers
 *
 *		This function removes from the buffer pool all the pages of all
 *		forks of the specified relations.  It's equivalent to calling
 *		DropRelFileNodeBuffers once per fork per relation with
 *		firstDelBlock = 0.
 * --------------------------------------------------------------------
 */
void
DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
{
	int			i,
				n = 0;
	RelFileNode *nodes;
	bool		use_bsearch;

	if (nnodes == 0)
		return;

	nodes = palloc(sizeof(RelFileNode) * nnodes);		/* non-local relations */

	/* If it's a local relation, it's localbuf.c's problem. */
	for (i = 0; i < nnodes; i++)
	{
		if (RelFileNodeBackendIsTemp(rnodes[i]))
		{
			if (rnodes[i].backend == MyBackendId)
				DropRelFileNodeAllLocalBuffers(rnodes[i].node);
		}
		else
			nodes[n++] = rnodes[i].node;
	}

	/*
	 * If there are no non-local relations, then we're done.  Release the
	 * memory and return.
	 */
	if (n == 0)
	{
		pfree(nodes);
		return;
	}

	/*
	 * For a low number of relations to drop, just use a simple walk through,
	 * to save the bsearch overhead.  The threshold to use is rather a guess
	 * than an exactly determined value, as it depends on many factors (CPU
	 * and RAM speeds, amount of shared buffers etc.).
	 */
	use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;

	/* sort the list of rnodes if necessary */
	if (use_bsearch)
		pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);

	for (i = 0; i < NBuffers; i++)
	{
		RelFileNode *rnode = NULL;
		volatile BufferDesc *bufHdr = &BufferDescriptors[i];

		/*
		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
		 * and saves some cycles.
		 */
		if (!use_bsearch)
		{
			int			j;

			for (j = 0; j < n; j++)
			{
				if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
				{
					rnode = &nodes[j];
					break;
				}
			}
		}
		else
		{
			rnode = bsearch((const void *) &(bufHdr->tag.rnode),
							nodes, n, sizeof(RelFileNode),
							rnode_comparator);
		}

		/* buffer doesn't belong to any of the given relfilenodes; skip it */
		if (rnode == NULL)
			continue;

		LockBufHdr(bufHdr);
		if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
			InvalidateBuffer(bufHdr);	/* releases spinlock */
		else
			UnlockBufHdr(bufHdr);
	}

	pfree(nodes);
}

/* ---------------------------------------------------------------------
 *		DropDatabaseBuffers
 *
 *		This function removes all the buffers in the buffer cache for a
 *		particular database.  Dirty pages are simply dropped, without
 *		bothering to write them out first.  This is used when we destroy a
 *		database, to avoid trying to flush data to disk when the directory
 *		tree no longer exists.  The implementation is pretty similar to
 *		DropRelFileNodeBuffers(), which drops just one relation.
 * --------------------------------------------------------------------
 */
void
DropDatabaseBuffers(Oid dbid)
{
	int			i;

	/*
	 * We needn't consider local buffers, since by assumption the target
	 * database isn't our own.
	 */
	for (i = 0; i < NBuffers; i++)
	{
		volatile BufferDesc *bufHdr = &BufferDescriptors[i];

		/*
		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
		 * and saves some cycles.
		 */
		if (bufHdr->tag.rnode.dbNode != dbid)
			continue;

		LockBufHdr(bufHdr);
		if (bufHdr->tag.rnode.dbNode == dbid)
			InvalidateBuffer(bufHdr);	/* releases spinlock */
		else
			UnlockBufHdr(bufHdr);
	}
}

/* -----------------------------------------------------------------
 *		PrintBufferDescs
 *
 *		this function prints all the buffer descriptors, for debugging
 *		use only.
 * -----------------------------------------------------------------
 */
#ifdef NOT_USED
void
PrintBufferDescs(void)
{
	int			i;
	volatile BufferDesc *buf = BufferDescriptors;

	for (i = 0; i < NBuffers; ++i, ++buf)
	{
		/* theoretically we should lock the bufhdr here */
		elog(LOG,
			 "[%02d] (freeNext=%d, rel=%s, "
			 "blockNum=%u, flags=0x%x, refcount=%u %d)",
			 i, buf->freeNext,
			 relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
			 buf->tag.blockNum, buf->flags,
			 buf->refcount, PrivateRefCount[i]);
	}
}
#endif

#ifdef NOT_USED
void
PrintPinnedBufs(void)
{
	int			i;
	volatile BufferDesc *buf = BufferDescriptors;

	for (i = 0; i < NBuffers; ++i, ++buf)
	{
		if (PrivateRefCount[i] > 0)
		{
			/* theoretically we should lock the bufhdr here */
			elog(LOG,
				 "[%02d] (freeNext=%d, rel=%s, "
				 "blockNum=%u, flags=0x%x, refcount=%u %d)",
				 i, buf->freeNext,
				 relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
				 buf->tag.blockNum, buf->flags,
				 buf->refcount, PrivateRefCount[i]);
		}
	}
}
#endif

/* ---------------------------------------------------------------------
 *		FlushRelationBuffers
 *
 *		This function writes all dirty pages of a relation out to disk
 *		(or more accurately, out to kernel disk buffers), ensuring that
 *		the kernel has an up-to-date view of the relation.
 *
 *		Generally, the caller should be holding AccessExclusiveLock on the
 *		target relation to ensure that no other backend is busy dirtying
 *		more blocks of the relation; the effects can't be expected to last
 *		after the lock is released.
 *
 *		XXX currently it sequentially searches the buffer pool; this could
 *		be improved, but the routine is not used in any performance-critical
 *		code paths.
 * --------------------------------------------------------------------
 */
void
FlushRelationBuffers(Relation rel)
{
	int			i;
	volatile BufferDesc *bufHdr;

	/* Open rel at the smgr level if not already done */
	RelationOpenSmgr(rel);

	if (RelationUsesLocalBuffers(rel))
	{
		for (i = 0; i < NLocBuffer; i++)
		{
			bufHdr = &LocalBufferDescriptors[i];
			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
				(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
			{
				ErrorContextCallback errcallback;
				Page		localpage;

				localpage = (char *) LocalBufHdrGetBlock(bufHdr);

				/* Setup error traceback support for ereport() */
				errcallback.callback = local_buffer_write_error_callback;
				errcallback.arg = (void *) bufHdr;
				errcallback.previous = error_context_stack;
				error_context_stack = &errcallback;

				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

				smgrwrite(rel->rd_smgr,
						  bufHdr->tag.forkNum,
						  bufHdr->tag.blockNum,
						  localpage,
						  false);

				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);

				/* Pop the error context stack */
				error_context_stack = errcallback.previous;
			}
		}

		return;
	}

	/* Make sure we can handle the pin inside the loop */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	for (i = 0; i < NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i];

		/*
		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
		 * and saves some cycles.
		 */
		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
			continue;

		LockBufHdr(bufHdr);
		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
		{
			PinBuffer_Locked(bufHdr);
			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
			FlushBuffer(bufHdr, rel->rd_smgr);
			LWLockRelease(bufHdr->content_lock);
			UnpinBuffer(bufHdr, true);
		}
		else
			UnlockBufHdr(bufHdr);
	}
}

/* ---------------------------------------------------------------------
 *		FlushDatabaseBuffers
 *
 *		This function writes all dirty pages of a database out to disk
 *		(or more accurately, out to kernel disk buffers), ensuring that
 *		the kernel has an up-to-date view of the database.
 *
 *		Generally, the caller should be holding an appropriate lock to
 *		ensure no other backend is active in the target database; otherwise
 *		more pages could get dirtied while we work.
 * --------------------------------------------------------------------
 */
void
FlushDatabaseBuffers(Oid dbid)
{
	int			i;
	volatile BufferDesc *bufHdr;

	/* Make sure we can handle the pin inside the loop */
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);

	for (i = 0; i < NBuffers; i++)
	{
		bufHdr = &BufferDescriptors[i];

		/*
		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
		 * and saves some cycles.
		 */
		if (bufHdr->tag.rnode.dbNode != dbid)
			continue;

		LockBufHdr(bufHdr);
		if (bufHdr->tag.rnode.dbNode == dbid &&
			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
		{
			PinBuffer_Locked(bufHdr);
			LWLockAcquire(bufHdr->content_lock, LW_SHARED);
			FlushBuffer(bufHdr, NULL);
			LWLockRelease(bufHdr->content_lock);
			UnpinBuffer(bufHdr, true);
		}
		else
			UnlockBufHdr(bufHdr);
	}
}

/*
 * ReleaseBuffer -- release the pin on a buffer
 */
void
ReleaseBuffer(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer ID: %d", buffer);

	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);

	if (BufferIsLocal(buffer))
	{
		Assert(LocalRefCount[-buffer - 1] > 0);
		LocalRefCount[-buffer - 1]--;
		return;
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);

	if (PrivateRefCount[buffer - 1] > 1)
		PrivateRefCount[buffer - 1]--;
	else
		UnpinBuffer(bufHdr, false);
}

/*
 * UnlockReleaseBuffer -- release the content lock and pin on a buffer
 *
 * This is just a shorthand for a common combination.
 */
void
UnlockReleaseBuffer(Buffer buffer)
{
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);
}

/*
 * IncrBufferRefCount
 *		Increment the pin count on a buffer that we have *already* pinned
 *		at least once.
 *
 *		This function cannot be used on a buffer we do not have pinned,
 *		because it doesn't change the shared buffer state.
 */
void
IncrBufferRefCount(Buffer buffer)
{
	Assert(BufferIsPinned(buffer));
	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
	ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
	if (BufferIsLocal(buffer))
		LocalRefCount[-buffer - 1]++;
	else
		PrivateRefCount[buffer - 1]++;
}

/*
 * MarkBufferDirtyHint
 *
 *	Mark a buffer dirty for non-critical changes.
 *
 * This is essentially the same as MarkBufferDirty, except:
 *
 * 1. The caller does not write WAL; so if checksums are enabled, we may
 *	  need to write an XLOG_HINT WAL record to protect against torn pages.
 * 2. The caller might have only a share-lock instead of an exclusive lock
 *	  on the buffer's content lock.
 * 3. This function does not guarantee that the buffer is always marked
 *	  dirty (due to a race condition), so it cannot be used for important
 *	  changes.
 */
void
MarkBufferDirtyHint(Buffer buffer)
{
	volatile BufferDesc *bufHdr;
	Page		page = BufferGetPage(buffer);

	if (!BufferIsValid(buffer))
		elog(ERROR, "bad buffer ID: %d", buffer);

	if (BufferIsLocal(buffer))
	{
		MarkLocalBufferDirty(buffer);
		return;
	}

	bufHdr = &BufferDescriptors[buffer - 1];

	Assert(PrivateRefCount[buffer - 1] > 0);
	/* here, either share or exclusive lock is OK */
	Assert(LWLockHeldByMe(bufHdr->content_lock));

	/*
	 * This routine might get called many times on the same page, if we are
	 * making the first scan after commit of an xact that added/deleted many
	 * tuples.  So, be as quick as we can if the buffer is already dirty.  We
	 * do this by not acquiring the spinlock if it looks like the status bits
	 * are already set.  Since we make this test unlocked, there's a chance
	 * we might fail to notice that the flags have just been cleared, and
	 * fail to reset them, due to memory-ordering issues.  But since this
	 * function is only intended to be used in cases where failing to write
	 * out the data would be harmless anyway, it doesn't really matter.
	 */
	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
		(BM_DIRTY | BM_JUST_DIRTIED))
	{
		XLogRecPtr	lsn = InvalidXLogRecPtr;
		bool		dirtied = false;
		bool		delayChkpt = false;

		/*
		 * If we need to protect hint bit updates from torn writes, WAL-log a
		 * full page image of the page.  This full page image is only
		 * necessary if the hint bit update is the first change to the page
		 * since the last checkpoint.
		 */
		if (DataChecksumsEnabled() && (bufHdr->flags & BM_PERMANENT))
		{
			/*
			 * If we're in recovery we cannot dirty a page because of a hint.
			 * We can set the hint, just not dirty the page as a result, so
			 * the hint is lost when we evict the page or shut down.
			 */
			if (RecoveryInProgress())
				return;

			/*
			 * We must issue the WAL record before we mark the buffer dirty.
			 * Otherwise we might write the page before we write the WAL;
			 * that would be bad since the page change is not replayable if
			 * there's a crash.  Delay the checkpoint meanwhile so the
			 * torn-page protection provided by the full page image holds.
			 */
			MyPgXact->delayChkpt = delayChkpt = true;
			lsn = XLogSaveBufferForHint(buffer);
		}

		LockBufHdr(bufHdr);
		Assert(bufHdr->refcount > 0);
		if (!(bufHdr->flags & BM_DIRTY))
		{
			dirtied = true;		/* Means "will be dirtied by this action" */

			/*
			 * Set the page LSN if we wrote a backup block.  We aren't
			 * supposed to set this when only holding a share lock, but as
			 * long as we serialise it somehow we're OK.  We choose to set
			 * the LSN while holding the buffer header lock, which causes any
			 * reader of an LSN who holds only a share lock to also obtain a
			 * buffer header lock before using PageGetLSN(); that is enforced
			 * in BufferGetLSNAtomic().
			 *
			 * If checksums are enabled, you might think we should reset the
			 * checksum here.  That will happen when the page is written
			 * sometime later in this checkpoint cycle.
			 */
			if (!XLogRecPtrIsInvalid(lsn))
				PageSetLSN(page, lsn);
		}
		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
		UnlockBufHdr(bufHdr);

		if (delayChkpt)
			MyPgXact->delayChkpt = false;

		if (dirtied)
		{
			VacuumPageDirty++;
			if (VacuumCostActive)
				VacuumCostBalance += VacuumCostPageDirty;
		}
	}
}

/*
 * Release buffer content locks for shared buffers.
 *
 * Used to clean up after errors.
 *
 * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care of
 * releasing buffer content locks per se; the only thing we need to deal
 * with here is clearing any PIN_COUNT request that was in progress.
 */
void
UnlockBuffers(void)
{
	volatile BufferDesc *buf = PinCountWaitBuf;

	if (buf)
	{
		LockBufHdr(buf);

		/*
		 * Don't complain if flag bit not set; it could have been reset but
		 * we got a cancel/die interrupt before getting the signal.
		 */
		if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
			buf->wait_backend_pid == MyProcPid)
			buf->flags &= ~BM_PIN_COUNT_WAITER;

		UnlockBufHdr(buf);

		PinCountWaitBuf = NULL;
	}
}

/*
 * Acquire or release the content_lock for the buffer.
 */
void
LockBuffer(Buffer buffer, int mode)
{
	volatile BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return;					/* local buffers need no lock */

	buf = &(BufferDescriptors[buffer - 1]);

	if (mode == BUFFER_LOCK_UNLOCK)
		LWLockRelease(buf->content_lock);
	else if (mode == BUFFER_LOCK_SHARE)
		LWLockAcquire(buf->content_lock, LW_SHARED);
	else if (mode == BUFFER_LOCK_EXCLUSIVE)
		LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
	else
		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
}

/*
 * Acquire the content_lock for the buffer, but only if we don't have to
 * wait.
 *
 * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
 */
bool
ConditionalLockBuffer(Buffer buffer)
{
	volatile BufferDesc *buf;

	Assert(BufferIsValid(buffer));
	if (BufferIsLocal(buffer))
		return true;			/* act as though we got it */

	buf = &(BufferDescriptors[buffer - 1]);

	return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
}
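
#ifdef NOT_USED
/*
 * Illustrative sketch only (not part of the original file): using
 * ConditionalLockBuffer to do opportunistic work on a pinned buffer without
 * risking blocking behind another backend's content lock.  The function
 * name example_try_cleanup is an assumption for this example.
 */
static bool
example_try_cleanup(Buffer buf)
{
	/* give up immediately if someone else holds the content lock */
	if (!ConditionalLockBuffer(buf))
		return false;

	/* ... perform the optional work on BufferGetPage(buf) ... */

	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	return true;
}
#endif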

/*
 * LockBufferForCleanup - lock a buffer in preparation for deleting items
 *
 * Items may be deleted from a disk page only when the caller (a) holds an
 * exclusive lock on the buffer and (b) has observed that no other backend
 * holds a pin on the buffer.  If there is a pin, then the other backend
 * might have a pointer into the buffer (for example, a heapscan reference
 * to an item).  It's OK if a pin is added after the cleanup starts,
 * however; the newly-arrived backend will be unable to look at the page
 * until we release the exclusive lock.
 *
 * To implement this protocol, a would-be deleter must pin the buffer and
 * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
 * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until it
 * has successfully observed pin count = 1.
 */
void
LockBufferForCleanup(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	Assert(BufferIsValid(buffer));
	Assert(PinCountWaitBuf == NULL);

	if (BufferIsLocal(buffer))
	{
		/* There should be exactly one pin */
		if (LocalRefCount[-buffer - 1] != 1)
			elog(ERROR, "incorrect local pin count: %d",
				 LocalRefCount[-buffer - 1]);
		/* Nobody else to wait for */
		return;
	}

	/* There should be exactly one local pin */
	if (PrivateRefCount[buffer - 1] != 1)
		elog(ERROR, "incorrect local pin count: %d",
			 PrivateRefCount[buffer - 1]);

	bufHdr = &BufferDescriptors[buffer - 1];

	for (;;)
	{
		/* Try to acquire lock */
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		LockBufHdr(bufHdr);
		Assert(bufHdr->refcount > 0);
		if (bufHdr->refcount == 1)
		{
			/* Successfully acquired exclusive lock with pincount 1 */
			UnlockBufHdr(bufHdr);
			return;
		}
		/* Failed, so mark myself as waiting for pincount 1 */
		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
		{
			UnlockBufHdr(bufHdr);
			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
			elog(ERROR, "multiple backends attempting to wait for pincount 1");
		}
		bufHdr->wait_backend_pid = MyProcPid;
		bufHdr->flags |= BM_PIN_COUNT_WAITER;
		PinCountWaitBuf = bufHdr;
		UnlockBufHdr(bufHdr);
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

		/* Wait to be signaled by UnpinBuffer() */
		if (InHotStandby)
		{
			/* Publish the bufid that Startup process waits on */
			SetStartupBufferPinWaitBufId(buffer - 1);
			/* Set alarm and then wait to be signaled by UnpinBuffer() */
			ResolveRecoveryConflictWithBufferPin();
			/* Reset the published bufid */
			SetStartupBufferPinWaitBufId(-1);
		}
		else
			ProcWaitForSignal();

		PinCountWaitBuf = NULL;
		/* Loop back and try again */
	}
}

/*
 * Check called from RecoveryConflictInterrupt handler when Startup process
 * requests cancellation of all pin holders that are blocking it.
 */
bool
HoldingBufferPinThatDelaysRecovery(void)
{
	int			bufid = GetStartupBufferPinWaitBufId();

	/*
	 * If we get woken slowly then it's possible that the Startup process was
	 * already woken by other backends before we got here.  It's also
	 * possible that we get here by multiple interrupts or interrupts at
	 * inappropriate times, so make sure we do nothing if the bufid is not
	 * set.
	 */
	if (bufid < 0)
		return false;

	if (PrivateRefCount[bufid] > 0)
		return true;

	return false;
}

/*
 * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
 *
 * We won't loop, but just check once to see if the pin count is OK.  If
 * not, return FALSE with no lock held.
 */
bool
ConditionalLockBufferForCleanup(Buffer buffer)
{
	volatile BufferDesc *bufHdr;

	Assert(BufferIsValid(buffer));

	if (BufferIsLocal(buffer))
	{
		/* There should be exactly one pin */
		Assert(LocalRefCount[-buffer - 1] > 0);
		if (LocalRefCount[-buffer - 1] != 1)
			return false;
		/* Nobody else to wait for */
		return true;
	}

	/* There should be exactly one local pin */
	Assert(PrivateRefCount[buffer - 1] > 0);
	if (PrivateRefCount[buffer - 1] != 1)
		return false;

	/* Try to acquire lock */
	if (!ConditionalLockBuffer(buffer))
		return false;

	bufHdr = &BufferDescriptors[buffer - 1];
	LockBufHdr(bufHdr);
	Assert(bufHdr->refcount > 0);
	if (bufHdr->refcount == 1)
	{
		/* Successfully acquired exclusive lock with pincount 1 */
		UnlockBufHdr(bufHdr);
		return true;
	}

	/* Failed, so release the lock */
	UnlockBufHdr(bufHdr);
	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	return false;
}

/*
 *	Functions for buffer I/O handling
 *
 *	Note: We assume that nested buffer I/O never occurs, ie, at most one
 *	io_in_progress lock is held per proc.
 *
 *	Also note that these are used only for shared buffers, not local ones.
 */

/*
 * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
 */
static void
WaitIO(volatile BufferDesc *buf)
{
	/*
	 * Loop until there's no I/O in progress.  This is *necessary* because an
	 * error abort in the process doing the I/O could release the
	 * io_in_progress_lock prematurely; see AbortBufferIO.
	 */
	for (;;)
	{
		BufFlags	sv_flags;

		/*
		 * It may not be necessary to acquire the spinlock to check the flag
		 * here, but since this test is essential for correctness, we'd
		 * better play it safe.
		 */
		LockBufHdr(buf);
		sv_flags = buf->flags;
		UnlockBufHdr(buf);
		if (!(sv_flags & BM_IO_IN_PROGRESS))
			break;
		LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
		LWLockRelease(buf->io_in_progress_lock);
	}
}

/*
 * StartBufferIO: begin I/O on this buffer
 *	(Assumptions)
 *	My process is executing no IO
 *	The buffer is Pinned
 *
 * In some scenarios multiple backends could attempt the same I/O operation
 * concurrently.  If someone else has already started I/O on this buffer,
 * we will block on the io_in_progress lock until he's done.
 *
 * Input operations are only attempted on buffers that are not BM_VALID,
 * and output operations only on buffers that are BM_VALID and BM_DIRTY,
 * so we can always tell if the work is already done.
 *
 * Returns TRUE if we successfully marked the buffer as I/O busy,
 * FALSE if someone else already did the work.
 */
static bool
StartBufferIO(volatile BufferDesc *buf, bool forInput)
{
	Assert(!InProgressBuf);

	for (;;)
	{
		/*
		 * Grab the io_in_progress lock so that other processes can wait for
		 * me to finish the I/O.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		LockBufHdr(buf);

		if (!(buf->flags & BM_IO_IN_PROGRESS))
			break;

		/*
		 * The only way BM_IO_IN_PROGRESS could be set when the
		 * io_in_progress lock was not held is if the process doing the I/O
		 * is recovering from an error (see AbortBufferIO).  If that's the
		 * case, we must wait for him to get unwedged.
		 */
		UnlockBufHdr(buf);
		LWLockRelease(buf->io_in_progress_lock);
		WaitIO(buf);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
	{
		/* someone else already did the I/O */
		UnlockBufHdr(buf);
		LWLockRelease(buf->io_in_progress_lock);
		return false;
	}

	buf->flags |= BM_IO_IN_PROGRESS;

	UnlockBufHdr(buf);

	InProgressBuf = buf;
	IsForInput = forInput;

	return true;
}

/*
 * TerminateBufferIO: release a buffer we were doing I/O on
 *	(Assumptions)
 *	My process is executing IO for the buffer
 *	BM_IO_IN_PROGRESS bit is set for the buffer
 *	We hold the buffer's io_in_progress lock
 *	The buffer is Pinned
 *
 * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
 * buffer's BM_DIRTY flag.  This is appropriate when terminating a
 * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
 * marking the buffer clean if it was re-dirtied while we were writing.
 *
 * set_flag_bits gets ORed into the buffer's flags.  It must include
 * BM_IO_ERROR in a failure case.  For successful completion it could
 * be 0, or BM_VALID if we just finished reading in the page.
 */
static void
TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
				  int set_flag_bits)
{
	Assert(buf == InProgressBuf);

	LockBufHdr(buf);

	Assert(buf->flags & BM_IO_IN_PROGRESS);
	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
	if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
		buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
	buf->flags |= set_flag_bits;

	UnlockBufHdr(buf);

	InProgressBuf = NULL;

	LWLockRelease(buf->io_in_progress_lock);
}

/*
 * AbortBufferIO: Clean up any active buffer I/O after an error.
 *
 *	All LWLocks we might have held have been released, but we haven't yet
 *	released buffer pins, so the buffer is still pinned.
 *
 *	If I/O was in progress, we always set BM_IO_ERROR, even though it's
 *	possible the error condition wasn't related to the I/O.
 */
void
AbortBufferIO(void)
{
	volatile BufferDesc *buf = InProgressBuf;

	if (buf)
	{
		/*
		 * Since LWLockReleaseAll has already been called, we're not holding
		 * the buffer's io_in_progress_lock.  We have to re-acquire it so
		 * that we can use TerminateBufferIO.  Anyone who's executing WaitIO
		 * on the buffer will be in a busy spin until we succeed in doing
		 * this.
		 */
		LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);

		LockBufHdr(buf);
		Assert(buf->flags & BM_IO_IN_PROGRESS);
		if (IsForInput)
		{
			Assert(!(buf->flags & BM_DIRTY));
			/* We'd better not think buffer is valid yet */
			Assert(!(buf->flags & BM_VALID));
			UnlockBufHdr(buf);
		}
		else
		{
			BufFlags	sv_flags;

			sv_flags = buf->flags;
			Assert(sv_flags & BM_DIRTY);
			UnlockBufHdr(buf);
			/* Issue notice if this is not the first failure... */
			if (sv_flags & BM_IO_ERROR)
			{
				/* Buffer is pinned, so we can read tag without spinlock */
				char	   *path;

				path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
				ereport(WARNING,
						(errcode(ERRCODE_IO_ERROR),
						 errmsg("could not write block %u of %s",
								buf->tag.blockNum, path),
						 errdetail("Multiple failures --- write error might be permanent.")));
				pfree(path);
			}
		}
		TerminateBufferIO(buf, false, BM_IO_ERROR);
	}
}

/*
 * Error context callback for errors occurring during shared buffer writes.
 */
static void
shared_buffer_write_error_callback(void *arg)
{
	volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;

	/* Buffer is pinned, so we can read the tag without a spinlock */
	if (bufHdr != NULL)
	{
		char	   *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);

		errcontext("writing block %u of relation %s",
				   bufHdr->tag.blockNum, path);
		pfree(path);
	}
}

/*
 * Error context callback for errors occurring during local buffer writes.
 */
static void
local_buffer_write_error_callback(void *arg)
{
	volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;

	if (bufHdr != NULL)
	{
		char	   *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
										  bufHdr->tag.forkNum);

		errcontext("writing block %u of relation %s",
				   bufHdr->tag.blockNum, path);
		pfree(path);
	}
}

/*
 * RelFileNode qsort/bsearch comparator; used by DropRelFileNodesAllBuffers.
 */
static int
rnode_comparator(const void *p1, const void *p2)
{
	RelFileNode n1 = *(RelFileNode *) p1;
	RelFileNode n2 = *(RelFileNode *) p2;

	if (n1.relNode < n2.relNode)
		return -1;
	else if (n1.relNode > n2.relNode)
		return 1;

	if (n1.dbNode < n2.dbNode)
		return -1;
	else if (n1.dbNode > n2.dbNode)
		return 1;

	if (n1.spcNode < n2.spcNode)
		return -1;
	else if (n1.spcNode > n2.spcNode)
		return 1;
	else
		return 0;
}
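
#ifdef NOT_USED
/*
 * Illustrative sketch only (not part of the original file): how
 * rnode_comparator supports both pg_qsort and bsearch, mirroring its use in
 * DropRelFileNodesAllBuffers above.  The function name
 * example_contains_rnode is an assumption for this example; the caller is
 * assumed to have sorted the array first, e.g.:
 *
 *		pg_qsort(sorted_nodes, n, sizeof(RelFileNode), rnode_comparator);
 */
static bool
example_contains_rnode(RelFileNode *sorted_nodes, int n, RelFileNode key)
{
	/* binary search with the same comparator used for sorting */
	return bsearch(&key, sorted_nodes, n, sizeof(RelFileNode),
				   rnode_comparator) != NULL;
}
#endif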