bufmgr.c
00001 /*-------------------------------------------------------------------------
00002  *
00003  * bufmgr.c
00004  *    buffer manager interface routines
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/storage/buffer/bufmgr.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 /*
00016  * Principal entry points:
00017  *
00018  * ReadBuffer() -- find or create a buffer holding the requested page,
00019  *      and pin it so that no one can destroy it while this process
00020  *      is using it.
00021  *
00022  * ReleaseBuffer() -- unpin a buffer
00023  *
00024  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
00025  *      The disk write is delayed until buffer replacement or checkpoint.
00026  *
00027  * See also these files:
00028  *      freelist.c -- chooses victim for buffer replacement
00029  *      buf_table.c -- manages the buffer lookup table
00030  */
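
/*
 * A minimal usage sketch (assumptions: "rel" is an already-opened, suitably
 * locked relation and "blkno" is a valid block number; these names are
 * illustrative, not taken from this file):
 *
 *      Buffer      buf = ReadBuffer(rel, blkno);
 *
 *      LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *      ... read or modify the page via BufferGetPage(buf) ...
 *      MarkBufferDirty(buf);           only if the page was actually changed
 *      UnlockReleaseBuffer(buf);       drops the content lock and the pin
 */
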
00031 #include "postgres.h"
00032 
00033 #include <sys/file.h>
00034 #include <unistd.h>
00035 
00036 #include "catalog/catalog.h"
00037 #include "catalog/storage.h"
00038 #include "common/relpath.h"
00039 #include "executor/instrument.h"
00040 #include "miscadmin.h"
00041 #include "pg_trace.h"
00042 #include "pgstat.h"
00043 #include "postmaster/bgwriter.h"
00044 #include "storage/buf_internals.h"
00045 #include "storage/bufmgr.h"
00046 #include "storage/ipc.h"
00047 #include "storage/proc.h"
00048 #include "storage/smgr.h"
00049 #include "storage/standby.h"
00050 #include "utils/rel.h"
00051 #include "utils/resowner_private.h"
00052 #include "utils/timestamp.h"
00053 
00054 
00055 /* Note: these two macros only work on shared buffers, not local ones! */
00056 #define BufHdrGetBlock(bufHdr)  ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
00057 #define BufferGetLSN(bufHdr)    (PageGetLSN(BufHdrGetBlock(bufHdr)))
00058 
00059 /* Note: this macro only works on local buffers, not shared ones! */
00060 #define LocalBufHdrGetBlock(bufHdr) \
00061     LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
00062 
00063 /* Bits in SyncOneBuffer's return value */
00064 #define BUF_WRITTEN             0x01
00065 #define BUF_REUSABLE            0x02
00066 
00067 #define DROP_RELS_BSEARCH_THRESHOLD     20
00068 
00069 /* GUC variables */
00070 bool        zero_damaged_pages = false;
00071 int         bgwriter_lru_maxpages = 100;
00072 double      bgwriter_lru_multiplier = 2.0;
00073 bool        track_io_timing = false;
00074 
00075 /*
00076  * How many buffers PrefetchBuffer callers should try to stay ahead of their
00077  * ReadBuffer calls by.  This is maintained by the assign hook for
00078  * effective_io_concurrency.  Zero means "never prefetch".
00079  */
00080 int         target_prefetch_pages = 0;
00081 
00082 /* local state for StartBufferIO and related functions */
00083 static volatile BufferDesc *InProgressBuf = NULL;
00084 static bool IsForInput;
00085 
00086 /* local state for LockBufferForCleanup */
00087 static volatile BufferDesc *PinCountWaitBuf = NULL;
00088 
00089 
00090 static Buffer ReadBuffer_common(SMgrRelation reln, char relpersistence,
00091                   ForkNumber forkNum, BlockNumber blockNum,
00092                   ReadBufferMode mode, BufferAccessStrategy strategy,
00093                   bool *hit);
00094 static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
00095 static void PinBuffer_Locked(volatile BufferDesc *buf);
00096 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
00097 static void BufferSync(int flags);
00098 static int  SyncOneBuffer(int buf_id, bool skip_recently_used);
00099 static void WaitIO(volatile BufferDesc *buf);
00100 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
00101 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
00102                   int set_flag_bits);
00103 static void shared_buffer_write_error_callback(void *arg);
00104 static void local_buffer_write_error_callback(void *arg);
00105 static volatile BufferDesc *BufferAlloc(SMgrRelation smgr,
00106             char relpersistence,
00107             ForkNumber forkNum,
00108             BlockNumber blockNum,
00109             BufferAccessStrategy strategy,
00110             bool *foundPtr);
00111 static void FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln);
00112 static void AtProcExit_Buffers(int code, Datum arg);
00113 static int rnode_comparator(const void *p1, const void *p2);
00114 
00115 
00116 /*
00117  * PrefetchBuffer -- initiate asynchronous read of a block of a relation
00118  *
00119  * This is named by analogy to ReadBuffer but doesn't actually allocate a
00120  * buffer.  Instead it tries to ensure that a future ReadBuffer for the given
00121  * block will not be delayed by the I/O.  Prefetching is optional.
00122  * No-op if prefetching isn't compiled in.
00123  */
00124 void
00125 PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
00126 {
00127 #ifdef USE_PREFETCH
00128     Assert(RelationIsValid(reln));
00129     Assert(BlockNumberIsValid(blockNum));
00130 
00131     /* Open it at the smgr level if not already done */
00132     RelationOpenSmgr(reln);
00133 
00134     if (RelationUsesLocalBuffers(reln))
00135     {
00136         /* see comments in ReadBufferExtended */
00137         if (RELATION_IS_OTHER_TEMP(reln))
00138             ereport(ERROR,
00139                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00140                 errmsg("cannot access temporary tables of other sessions")));
00141 
00142         /* pass it off to localbuf.c */
00143         LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
00144     }
00145     else
00146     {
00147         BufferTag   newTag;     /* identity of requested block */
00148         uint32      newHash;    /* hash value for newTag */
00149         LWLockId    newPartitionLock;   /* buffer partition lock for it */
00150         int         buf_id;
00151 
00152         /* create a tag so we can lookup the buffer */
00153         INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode.node,
00154                        forkNum, blockNum);
00155 
00156         /* determine its hash code and partition lock ID */
00157         newHash = BufTableHashCode(&newTag);
00158         newPartitionLock = BufMappingPartitionLock(newHash);
00159 
00160         /* see if the block is in the buffer pool already */
00161         LWLockAcquire(newPartitionLock, LW_SHARED);
00162         buf_id = BufTableLookup(&newTag, newHash);
00163         LWLockRelease(newPartitionLock);
00164 
00165         /* If not in buffers, initiate prefetch */
00166         if (buf_id < 0)
00167             smgrprefetch(reln->rd_smgr, forkNum, blockNum);
00168 
00169         /*
00170          * If the block *is* in buffers, we do nothing.  This is not really
00171          * ideal: the block might be just about to be evicted, which would be
00172          * stupid since we know we are going to need it soon.  But the only
00173          * easy answer is to bump the usage_count, which does not seem like a
00174          * great solution: when the caller does ultimately touch the block,
00175          * usage_count would get bumped again, resulting in too much
00176          * favoritism for blocks that are involved in a prefetch sequence. A
00177          * real fix would involve some additional per-buffer state, and it's
00178          * not clear that there's enough of a problem to justify that.
00179          */
00180     }
00181 #endif   /* USE_PREFETCH */
00182 }
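
/*
 * A possible prefetch pattern (sketch only; "prefetch_distance" and "nblocks"
 * are caller-supplied values, e.g. a lookahead derived from
 * target_prefetch_pages and the relation's length in blocks):
 *
 *      if (blkno + prefetch_distance < nblocks)
 *          PrefetchBuffer(rel, MAIN_FORKNUM, blkno + prefetch_distance);
 *      buf = ReadBuffer(rel, blkno);
 */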
00183 
00184 
00185 /*
00186  * ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
00187  *      fork with RBM_NORMAL mode and default strategy.
00188  */
00189 Buffer
00190 ReadBuffer(Relation reln, BlockNumber blockNum)
00191 {
00192     return ReadBufferExtended(reln, MAIN_FORKNUM, blockNum, RBM_NORMAL, NULL);
00193 }
00194 
00195 /*
00196  * ReadBufferExtended -- returns a buffer containing the requested
00197  *      block of the requested relation.  If the blknum
00198  *      requested is P_NEW, extend the relation file and
00199  *      allocate a new block.  (Caller is responsible for
00200  *      ensuring that only one backend tries to extend a
00201  *      relation at the same time!)
00202  *
00203  * Returns: the buffer number for the buffer containing
00204  *      the block read.  The returned buffer has been pinned.
00205  *      Does not return on error --- elog's instead.
00206  *
00207  * Assumes that reln has already been opened when this function is called.
00208  *
00209  * In RBM_NORMAL mode, the page is read from disk, and the page header is
00210  * validated. An error is thrown if the page header is not valid.
00211  *
00212  * RBM_ZERO_ON_ERROR is like the normal mode, but if the page header is not
00213  * valid, the page is zeroed instead of throwing an error. This is intended
00214  * for non-critical data, where the caller is prepared to repair errors.
00215  *
00216  * In RBM_ZERO mode, if the page isn't in buffer cache already, it's filled
00217  * with zeros instead of reading it from disk.  Useful when the caller is
00218  * going to fill the page from scratch, since this saves I/O and avoids
00219  * unnecessary failure if the page-on-disk has corrupt page headers.
00220  * Caution: do not use this mode to read a page that is beyond the relation's
00221  * current physical EOF; that is likely to cause problems in md.c when
00222  * the page is modified and written out. P_NEW is OK, though.
00223  *
00224  * If strategy is not NULL, a nondefault buffer access strategy is used.
00225  * See buffer/README for details.
00226  */
00227 Buffer
00228 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
00229                    ReadBufferMode mode, BufferAccessStrategy strategy)
00230 {
00231     bool        hit;
00232     Buffer      buf;
00233 
00234     /* Open it at the smgr level if not already done */
00235     RelationOpenSmgr(reln);
00236 
00237     /*
00238      * Reject attempts to read non-local temporary relations; we would be
00239      * likely to get wrong data since we have no visibility into the owning
00240      * session's local buffers.
00241      */
00242     if (RELATION_IS_OTHER_TEMP(reln))
00243         ereport(ERROR,
00244                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00245                  errmsg("cannot access temporary tables of other sessions")));
00246 
00247     /*
00248      * Read the buffer, and update pgstat counters to reflect a cache hit or
00249      * miss.
00250      */
00251     pgstat_count_buffer_read(reln);
00252     buf = ReadBuffer_common(reln->rd_smgr, reln->rd_rel->relpersistence,
00253                             forkNum, blockNum, mode, strategy, &hit);
00254     if (hit)
00255         pgstat_count_buffer_hit(reln);
00256     return buf;
00257 }
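
/*
 * Sketch of a nondefault-strategy read for a large sequential scan of "rel"
 * (GetAccessStrategy/FreeAccessStrategy come from freelist.c; "blkno" is
 * assumed to be supplied by the scan):
 *
 *      BufferAccessStrategy strategy = GetAccessStrategy(BAS_BULKREAD);
 *
 *      buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, strategy);
 *      ...
 *      FreeAccessStrategy(strategy);
 */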
00258 
00259 
00260 /*
00261  * ReadBufferWithoutRelcache -- like ReadBufferExtended, but doesn't require
00262  *      a relcache entry for the relation.
00263  *
00264  * NB: At present, this function may only be used on permanent relations, which
00265  * is OK, because we only use it during XLOG replay.  If in the future we
00266  * want to use it on temporary or unlogged relations, we could pass additional
00267  * parameters.
00268  */
00269 Buffer
00270 ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum,
00271                           BlockNumber blockNum, ReadBufferMode mode,
00272                           BufferAccessStrategy strategy)
00273 {
00274     bool        hit;
00275 
00276     SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
00277 
00278     Assert(InRecovery);
00279 
00280     return ReadBuffer_common(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
00281                              mode, strategy, &hit);
00282 }
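
/*
 * Sketch of the recovery-time usage this is meant for (the actual callers
 * are in the XLOG replay code, e.g. xlogutils.c; the locking shown here is
 * only the usual convention, not something this function enforces):
 *
 *      Assert(InRecovery);
 *      buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
 *                                      RBM_ZERO_ON_ERROR, NULL);
 *      LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *      ... apply the WAL record to BufferGetPage(buf) ...
 *      UnlockReleaseBuffer(buf);
 */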
00283 
00284 
00285 /*
00286  * ReadBuffer_common -- common logic for all ReadBuffer variants
00287  *
00288  * *hit is set to true if the request was satisfied from shared buffer cache.
00289  */
00290 static Buffer
00291 ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
00292                   BlockNumber blockNum, ReadBufferMode mode,
00293                   BufferAccessStrategy strategy, bool *hit)
00294 {
00295     volatile BufferDesc *bufHdr;
00296     Block       bufBlock;
00297     bool        found;
00298     bool        isExtend;
00299     bool        isLocalBuf = SmgrIsTemp(smgr);
00300 
00301     *hit = false;
00302 
00303     /* Make sure we will have room to remember the buffer pin */
00304     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
00305 
00306     isExtend = (blockNum == P_NEW);
00307 
00308     TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
00309                                        smgr->smgr_rnode.node.spcNode,
00310                                        smgr->smgr_rnode.node.dbNode,
00311                                        smgr->smgr_rnode.node.relNode,
00312                                        smgr->smgr_rnode.backend,
00313                                        isExtend);
00314 
00315     /* Substitute proper block number if caller asked for P_NEW */
00316     if (isExtend)
00317         blockNum = smgrnblocks(smgr, forkNum);
00318 
00319     if (isLocalBuf)
00320     {
00321         bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
00322         if (found)
00323             pgBufferUsage.local_blks_hit++;
00324         else
00325             pgBufferUsage.local_blks_read++;
00326     }
00327     else
00328     {
00329         /*
00330          * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
00331          * not currently in memory.
00332          */
00333         bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
00334                              strategy, &found);
00335         if (found)
00336             pgBufferUsage.shared_blks_hit++;
00337         else
00338             pgBufferUsage.shared_blks_read++;
00339     }
00340 
00341     /* At this point we do NOT hold any locks. */
00342 
00343     /* if it was already in the buffer pool, we're done */
00344     if (found)
00345     {
00346         if (!isExtend)
00347         {
00348             /* Just need to update stats before we exit */
00349             *hit = true;
00350             VacuumPageHit++;
00351 
00352             if (VacuumCostActive)
00353                 VacuumCostBalance += VacuumCostPageHit;
00354 
00355             TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
00356                                               smgr->smgr_rnode.node.spcNode,
00357                                               smgr->smgr_rnode.node.dbNode,
00358                                               smgr->smgr_rnode.node.relNode,
00359                                               smgr->smgr_rnode.backend,
00360                                               isExtend,
00361                                               found);
00362 
00363             return BufferDescriptorGetBuffer(bufHdr);
00364         }
00365 
00366         /*
00367          * We get here only in the corner case where we are trying to extend
00368          * the relation but we found a pre-existing buffer marked BM_VALID.
00369          * This can happen because mdread doesn't complain about reads beyond
00370          * EOF (when zero_damaged_pages is ON) and so a previous attempt to
00371          * read a block beyond EOF could have left a "valid" zero-filled
00372          * buffer.  Unfortunately, we have also seen this case occurring
00373          * because of buggy Linux kernels that sometimes return an
00374          * lseek(SEEK_END) result that doesn't account for a recent write. In
00375          * that situation, the pre-existing buffer would contain valid data
00376          * that we don't want to overwrite.  Since the legitimate case should
00377          * always have left a zero-filled buffer, complain if not PageIsNew.
00378          */
00379         bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
00380         if (!PageIsNew((Page) bufBlock))
00381             ereport(ERROR,
00382              (errmsg("unexpected data beyond EOF in block %u of relation %s",
00383                      blockNum, relpath(smgr->smgr_rnode, forkNum)),
00384               errhint("This has been seen to occur with buggy kernels; consider updating your system.")));
00385 
00386         /*
00387          * We *must* do smgrextend before succeeding, else the page will not
00388          * be reserved by the kernel, and the next P_NEW call will decide to
00389          * return the same page.  Clear the BM_VALID bit, do the StartBufferIO
00390          * call that BufferAlloc didn't, and proceed.
00391          */
00392         if (isLocalBuf)
00393         {
00394             /* Only need to adjust flags */
00395             Assert(bufHdr->flags & BM_VALID);
00396             bufHdr->flags &= ~BM_VALID;
00397         }
00398         else
00399         {
00400             /*
00401              * Loop to handle the very small possibility that someone re-sets
00402              * BM_VALID between our clearing it and StartBufferIO inspecting
00403              * it.
00404              */
00405             do
00406             {
00407                 LockBufHdr(bufHdr);
00408                 Assert(bufHdr->flags & BM_VALID);
00409                 bufHdr->flags &= ~BM_VALID;
00410                 UnlockBufHdr(bufHdr);
00411             } while (!StartBufferIO(bufHdr, true));
00412         }
00413     }
00414 
00415     /*
00416      * if we have gotten to this point, we have allocated a buffer for the
00417      * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
00418      * if it's a shared buffer.
00419      *
00420      * Note: if smgrextend fails, we will end up with a buffer that is
00421      * allocated but not marked BM_VALID.  P_NEW will still select the same
00422      * block number (because the relation didn't get any longer on disk) and
00423      * so future attempts to extend the relation will find the same buffer (if
00424      * it's not been recycled) but come right back here to try smgrextend
00425      * again.
00426      */
00427     Assert(!(bufHdr->flags & BM_VALID));        /* spinlock not needed */
00428 
00429     bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
00430 
00431     if (isExtend)
00432     {
00433         /* new buffers are zero-filled */
00434         MemSet((char *) bufBlock, 0, BLCKSZ);
00435         /* don't set checksum for all-zero page */
00436         smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
00437     }
00438     else
00439     {
00440         /*
00441          * Read in the page, unless the caller intends to overwrite it and
00442          * just wants us to allocate a buffer.
00443          */
00444         if (mode == RBM_ZERO)
00445             MemSet((char *) bufBlock, 0, BLCKSZ);
00446         else
00447         {
00448             instr_time  io_start,
00449                         io_time;
00450 
00451             if (track_io_timing)
00452                 INSTR_TIME_SET_CURRENT(io_start);
00453 
00454             smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
00455 
00456             if (track_io_timing)
00457             {
00458                 INSTR_TIME_SET_CURRENT(io_time);
00459                 INSTR_TIME_SUBTRACT(io_time, io_start);
00460                 pgstat_count_buffer_read_time(INSTR_TIME_GET_MICROSEC(io_time));
00461                 INSTR_TIME_ADD(pgBufferUsage.blk_read_time, io_time);
00462             }
00463 
00464             /* check for garbage data */
00465             if (!PageIsVerified((Page) bufBlock, blockNum))
00466             {
00467                 if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
00468                 {
00469                     ereport(WARNING,
00470                             (errcode(ERRCODE_DATA_CORRUPTED),
00471                              errmsg("invalid page in block %u of relation %s; zeroing out page",
00472                                     blockNum,
00473                                     relpath(smgr->smgr_rnode, forkNum))));
00474                     MemSet((char *) bufBlock, 0, BLCKSZ);
00475                 }
00476                 else
00477                     ereport(ERROR,
00478                             (errcode(ERRCODE_DATA_CORRUPTED),
00479                      errmsg("invalid page in block %u of relation %s",
00480                             blockNum,
00481                             relpath(smgr->smgr_rnode, forkNum))));
00482             }
00483         }
00484     }
00485 
00486     if (isLocalBuf)
00487     {
00488         /* Only need to adjust flags */
00489         bufHdr->flags |= BM_VALID;
00490     }
00491     else
00492     {
00493         /* Set BM_VALID, terminate IO, and wake up any waiters */
00494         TerminateBufferIO(bufHdr, false, BM_VALID);
00495     }
00496 
00497     VacuumPageMiss++;
00498     if (VacuumCostActive)
00499         VacuumCostBalance += VacuumCostPageMiss;
00500 
00501     TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
00502                                       smgr->smgr_rnode.node.spcNode,
00503                                       smgr->smgr_rnode.node.dbNode,
00504                                       smgr->smgr_rnode.node.relNode,
00505                                       smgr->smgr_rnode.backend,
00506                                       isExtend,
00507                                       found);
00508 
00509     return BufferDescriptorGetBuffer(bufHdr);
00510 }
00511 
00512 /*
00513  * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
00514  *      buffer.  If no buffer exists already, selects a replacement
00515  *      victim and evicts the old page, but does NOT read in new page.
00516  *
00517  * "strategy" can be a buffer replacement strategy object, or NULL for
00518  * the default strategy.  The selected buffer's usage_count is advanced when
00519  * using the default strategy, but otherwise possibly not (see PinBuffer).
00520  *
00521  * The returned buffer is pinned and is already marked as holding the
00522  * desired page.  If it already did have the desired page, *foundPtr is
00523  * set TRUE.  Otherwise, *foundPtr is set FALSE and the buffer is marked
00524  * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
00525  *
00526  * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
00527  * we keep it for simplicity in ReadBuffer.
00528  *
00529  * No locks are held either at entry or exit.
00530  */
00531 static volatile BufferDesc *
00532 BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
00533             BlockNumber blockNum,
00534             BufferAccessStrategy strategy,
00535             bool *foundPtr)
00536 {
00537     BufferTag   newTag;         /* identity of requested block */
00538     uint32      newHash;        /* hash value for newTag */
00539     LWLockId    newPartitionLock;       /* buffer partition lock for it */
00540     BufferTag   oldTag;         /* previous identity of selected buffer */
00541     uint32      oldHash;        /* hash value for oldTag */
00542     LWLockId    oldPartitionLock;       /* buffer partition lock for it */
00543     BufFlags    oldFlags;
00544     int         buf_id;
00545     volatile BufferDesc *buf;
00546     bool        valid;
00547 
00548     /* create a tag so we can lookup the buffer */
00549     INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
00550 
00551     /* determine its hash code and partition lock ID */
00552     newHash = BufTableHashCode(&newTag);
00553     newPartitionLock = BufMappingPartitionLock(newHash);
00554 
00555     /* see if the block is in the buffer pool already */
00556     LWLockAcquire(newPartitionLock, LW_SHARED);
00557     buf_id = BufTableLookup(&newTag, newHash);
00558     if (buf_id >= 0)
00559     {
00560         /*
00561          * Found it.  Now, pin the buffer so no one can steal it from the
00562          * buffer pool, and check to see if the correct data has been loaded
00563          * into the buffer.
00564          */
00565         buf = &BufferDescriptors[buf_id];
00566 
00567         valid = PinBuffer(buf, strategy);
00568 
00569         /* Can release the mapping lock as soon as we've pinned it */
00570         LWLockRelease(newPartitionLock);
00571 
00572         *foundPtr = TRUE;
00573 
00574         if (!valid)
00575         {
00576             /*
00577              * We can only get here if (a) someone else is still reading in
00578              * the page, or (b) a previous read attempt failed.  We have to
00579              * wait for any active read attempt to finish, and then set up our
00580              * own read attempt if the page is still not BM_VALID.
00581              * StartBufferIO does it all.
00582              */
00583             if (StartBufferIO(buf, true))
00584             {
00585                 /*
00586                  * If we get here, previous attempts to read the buffer must
00587                  * have failed ... but we shall bravely try again.
00588                  */
00589                 *foundPtr = FALSE;
00590             }
00591         }
00592 
00593         return buf;
00594     }
00595 
00596     /*
00597      * Didn't find it in the buffer pool.  We'll have to initialize a new
00598      * buffer.  Remember to unlock the mapping lock while doing the work.
00599      */
00600     LWLockRelease(newPartitionLock);
00601 
00602     /* Loop here in case we have to try another victim buffer */
00603     for (;;)
00604     {
00605         bool        lock_held;
00606 
00607         /*
00608          * Select a victim buffer.  The buffer is returned with its header
00609          * spinlock still held!  Also (in most cases) the BufFreelistLock is
00610          * still held, since it would be bad to hold the spinlock while
00611          * possibly waking up other processes.
00612          */
00613         buf = StrategyGetBuffer(strategy, &lock_held);
00614 
00615         Assert(buf->refcount == 0);
00616 
00617         /* Must copy buffer flags while we still hold the spinlock */
00618         oldFlags = buf->flags;
00619 
00620         /* Pin the buffer and then release the buffer spinlock */
00621         PinBuffer_Locked(buf);
00622 
00623         /* Now it's safe to release the freelist lock */
00624         if (lock_held)
00625             LWLockRelease(BufFreelistLock);
00626 
00627         /*
00628          * If the buffer was dirty, try to write it out.  There is a race
00629          * condition here, in that someone might dirty it after we released it
00630          * above, or even while we are writing it out (since our share-lock
00631          * won't prevent hint-bit updates).  We will recheck the dirty bit
00632          * after re-locking the buffer header.
00633          */
00634         if (oldFlags & BM_DIRTY)
00635         {
00636             /*
00637              * We need a share-lock on the buffer contents to write it out
00638              * (else we might write invalid data, eg because someone else is
00639              * compacting the page contents while we write).  We must use a
00640              * conditional lock acquisition here to avoid deadlock.  Even
00641              * though the buffer was not pinned (and therefore surely not
00642              * locked) when StrategyGetBuffer returned it, someone else could
00643              * have pinned and exclusive-locked it by the time we get here. If
00644              * we try to get the lock unconditionally, we'd block waiting for
00645              * them; if they later block waiting for us, deadlock ensues.
00646              * (This has been observed to happen when two backends are both
00647              * trying to split btree index pages, and the second one just
00648              * happens to be trying to split the page the first one got from
00649              * StrategyGetBuffer.)
00650              */
00651             if (LWLockConditionalAcquire(buf->content_lock, LW_SHARED))
00652             {
00653                 /*
00654                  * If using a nondefault strategy, and writing the buffer
00655                  * would require a WAL flush, let the strategy decide whether
00656                  * to go ahead and write/reuse the buffer or to choose another
00657                  * victim.  We need lock to inspect the page LSN, so this
00658                  * can't be done inside StrategyGetBuffer.
00659                  */
00660                 if (strategy != NULL)
00661                 {
00662                     XLogRecPtr  lsn;
00663 
00664                     /* Read the LSN while holding buffer header lock */
00665                     LockBufHdr(buf);
00666                     lsn = BufferGetLSN(buf);
00667                     UnlockBufHdr(buf);
00668 
00669                     if (XLogNeedsFlush(lsn) &&
00670                         StrategyRejectBuffer(strategy, buf))
00671                     {
00672                         /* Drop lock/pin and loop around for another buffer */
00673                         LWLockRelease(buf->content_lock);
00674                         UnpinBuffer(buf, true);
00675                         continue;
00676                     }
00677                 }
00678 
00679                 /* OK, do the I/O */
00680                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
00681                                                smgr->smgr_rnode.node.spcNode,
00682                                                 smgr->smgr_rnode.node.dbNode,
00683                                               smgr->smgr_rnode.node.relNode);
00684 
00685                 FlushBuffer(buf, NULL);
00686                 LWLockRelease(buf->content_lock);
00687 
00688                 TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
00689                                                smgr->smgr_rnode.node.spcNode,
00690                                                 smgr->smgr_rnode.node.dbNode,
00691                                               smgr->smgr_rnode.node.relNode);
00692             }
00693             else
00694             {
00695                 /*
00696                  * Someone else has locked the buffer, so give it up and loop
00697                  * back to get another one.
00698                  */
00699                 UnpinBuffer(buf, true);
00700                 continue;
00701             }
00702         }
00703 
00704         /*
00705          * To change the association of a valid buffer, we'll need to have
00706          * exclusive lock on both the old and new mapping partitions.
00707          */
00708         if (oldFlags & BM_TAG_VALID)
00709         {
00710             /*
00711              * Need to compute the old tag's hashcode and partition lock ID.
00712              * XXX is it worth storing the hashcode in BufferDesc so we need
00713              * not recompute it here?  Probably not.
00714              */
00715             oldTag = buf->tag;
00716             oldHash = BufTableHashCode(&oldTag);
00717             oldPartitionLock = BufMappingPartitionLock(oldHash);
00718 
00719             /*
00720              * Must lock the lower-numbered partition first to avoid
00721              * deadlocks.
00722              */
00723             if (oldPartitionLock < newPartitionLock)
00724             {
00725                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
00726                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
00727             }
00728             else if (oldPartitionLock > newPartitionLock)
00729             {
00730                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
00731                 LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
00732             }
00733             else
00734             {
00735                 /* only one partition, only one lock */
00736                 LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
00737             }
00738         }
00739         else
00740         {
00741             /* if it wasn't valid, we need only the new partition */
00742             LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
00743             /* these just keep the compiler quiet about uninit variables */
00744             oldHash = 0;
00745             oldPartitionLock = 0;
00746         }
00747 
00748         /*
00749          * Try to make a hashtable entry for the buffer under its new tag.
00750          * This could fail because while we were writing someone else
00751          * allocated another buffer for the same block we want to read in.
00752          * Note that we have not yet removed the hashtable entry for the old
00753          * tag.
00754          */
00755         buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
00756 
00757         if (buf_id >= 0)
00758         {
00759             /*
00760              * Got a collision. Someone has already done what we were about to
00761              * do. We'll just handle this as if it were found in the buffer
00762              * pool in the first place.  First, give up the buffer we were
00763              * planning to use.
00764              */
00765             UnpinBuffer(buf, true);
00766 
00767             /* Can give up that buffer's mapping partition lock now */
00768             if ((oldFlags & BM_TAG_VALID) &&
00769                 oldPartitionLock != newPartitionLock)
00770                 LWLockRelease(oldPartitionLock);
00771 
00772             /* remaining code should match code at top of routine */
00773 
00774             buf = &BufferDescriptors[buf_id];
00775 
00776             valid = PinBuffer(buf, strategy);
00777 
00778             /* Can release the mapping lock as soon as we've pinned it */
00779             LWLockRelease(newPartitionLock);
00780 
00781             *foundPtr = TRUE;
00782 
00783             if (!valid)
00784             {
00785                 /*
00786                  * We can only get here if (a) someone else is still reading
00787                  * in the page, or (b) a previous read attempt failed.  We
00788                  * have to wait for any active read attempt to finish, and
00789                  * then set up our own read attempt if the page is still not
00790                  * BM_VALID.  StartBufferIO does it all.
00791                  */
00792                 if (StartBufferIO(buf, true))
00793                 {
00794                     /*
00795                      * If we get here, previous attempts to read the buffer
00796                      * must have failed ... but we shall bravely try again.
00797                      */
00798                     *foundPtr = FALSE;
00799                 }
00800             }
00801 
00802             return buf;
00803         }
00804 
00805         /*
00806          * Need to lock the buffer header too in order to change its tag.
00807          */
00808         LockBufHdr(buf);
00809 
00810         /*
00811          * Somebody could have pinned or re-dirtied the buffer while we were
00812          * doing the I/O and making the new hashtable entry.  If so, we can't
00813          * recycle this buffer; we must undo everything we've done and start
00814          * over with a new victim buffer.
00815          */
00816         oldFlags = buf->flags;
00817         if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
00818             break;
00819 
00820         UnlockBufHdr(buf);
00821         BufTableDelete(&newTag, newHash);
00822         if ((oldFlags & BM_TAG_VALID) &&
00823             oldPartitionLock != newPartitionLock)
00824             LWLockRelease(oldPartitionLock);
00825         LWLockRelease(newPartitionLock);
00826         UnpinBuffer(buf, true);
00827     }
00828 
00829     /*
00830      * Okay, it's finally safe to rename the buffer.
00831      *
00832      * Clearing BM_VALID here is necessary, clearing the dirtybits is just
00833      * paranoia.  We also reset the usage_count since any recency of use of
00834      * the old content is no longer relevant.  (The usage_count starts out at
00835      * 1 so that the buffer can survive one clock-sweep pass.)
00836      */
00837     buf->tag = newTag;
00838     buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
00839     if (relpersistence == RELPERSISTENCE_PERMANENT)
00840         buf->flags |= BM_TAG_VALID | BM_PERMANENT;
00841     else
00842         buf->flags |= BM_TAG_VALID;
00843     buf->usage_count = 1;
00844 
00845     UnlockBufHdr(buf);
00846 
00847     if (oldFlags & BM_TAG_VALID)
00848     {
00849         BufTableDelete(&oldTag, oldHash);
00850         if (oldPartitionLock != newPartitionLock)
00851             LWLockRelease(oldPartitionLock);
00852     }
00853 
00854     LWLockRelease(newPartitionLock);
00855 
00856     /*
00857      * Buffer contents are currently invalid.  Try to get the io_in_progress
00858      * lock.  If StartBufferIO returns false, then someone else managed to
00859      * read it before we did, so there's nothing left for BufferAlloc() to do.
00860      */
00861     if (StartBufferIO(buf, true))
00862         *foundPtr = FALSE;
00863     else
00864         *foundPtr = TRUE;
00865 
00866     return buf;
00867 }
00868 
00869 /*
00870  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
00871  * freelist.
00872  *
00873  * The buffer header spinlock must be held at entry.  We drop it before
00874  * returning.  (This is sane because the caller must have locked the
00875  * buffer in order to be sure it should be dropped.)
00876  *
00877  * This is used only in contexts such as dropping a relation.  We assume
00878  * that no other backend could possibly be interested in using the page,
00879  * so the only reason the buffer might be pinned is if someone else is
00880  * trying to write it out.  We have to let them finish before we can
00881  * reclaim the buffer.
00882  *
00883  * The buffer could get reclaimed by someone else while we are waiting
00884  * to acquire the necessary locks; if so, don't mess it up.
00885  */
00886 static void
00887 InvalidateBuffer(volatile BufferDesc *buf)
00888 {
00889     BufferTag   oldTag;
00890     uint32      oldHash;        /* hash value for oldTag */
00891     LWLockId    oldPartitionLock;       /* buffer partition lock for it */
00892     BufFlags    oldFlags;
00893 
00894     /* Save the original buffer tag before dropping the spinlock */
00895     oldTag = buf->tag;
00896 
00897     UnlockBufHdr(buf);
00898 
00899     /*
00900      * Need to compute the old tag's hashcode and partition lock ID. XXX is it
00901      * worth storing the hashcode in BufferDesc so we need not recompute it
00902      * here?  Probably not.
00903      */
00904     oldHash = BufTableHashCode(&oldTag);
00905     oldPartitionLock = BufMappingPartitionLock(oldHash);
00906 
00907 retry:
00908 
00909     /*
00910      * Acquire exclusive mapping lock in preparation for changing the buffer's
00911      * association.
00912      */
00913     LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
00914 
00915     /* Re-lock the buffer header */
00916     LockBufHdr(buf);
00917 
00918     /* If it's changed while we were waiting for lock, do nothing */
00919     if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
00920     {
00921         UnlockBufHdr(buf);
00922         LWLockRelease(oldPartitionLock);
00923         return;
00924     }
00925 
00926     /*
00927      * We assume the only reason for it to be pinned is that someone else is
00928      * flushing the page out.  Wait for them to finish.  (This could be an
00929      * infinite loop if the refcount is messed up... it would be nice to time
00930      * out after a while, but there seems no way to be sure how many loops may
00931      * be needed.  Note that if the other guy has pinned the buffer but not
00932      * yet done StartBufferIO, WaitIO will fall through and we'll effectively
00933      * be busy-looping here.)
00934      */
00935     if (buf->refcount != 0)
00936     {
00937         UnlockBufHdr(buf);
00938         LWLockRelease(oldPartitionLock);
00939         /* safety check: should definitely not be our *own* pin */
00940         if (PrivateRefCount[buf->buf_id] != 0)
00941             elog(ERROR, "buffer is pinned in InvalidateBuffer");
00942         WaitIO(buf);
00943         goto retry;
00944     }
00945 
00946     /*
00947      * Clear out the buffer's tag and flags.  We must do this to ensure that
00948      * linear scans of the buffer array don't think the buffer is valid.
00949      */
00950     oldFlags = buf->flags;
00951     CLEAR_BUFFERTAG(buf->tag);
00952     buf->flags = 0;
00953     buf->usage_count = 0;
00954 
00955     UnlockBufHdr(buf);
00956 
00957     /*
00958      * Remove the buffer from the lookup hashtable, if it was in there.
00959      */
00960     if (oldFlags & BM_TAG_VALID)
00961         BufTableDelete(&oldTag, oldHash);
00962 
00963     /*
00964      * Done with mapping lock.
00965      */
00966     LWLockRelease(oldPartitionLock);
00967 
00968     /*
00969      * Insert the buffer at the head of the list of free buffers.
00970      */
00971     StrategyFreeBuffer(buf);
00972 }
00973 
00974 /*
00975  * MarkBufferDirty
00976  *
00977  *      Marks buffer contents as dirty (actual write happens later).
00978  *
00979  * Buffer must be pinned and exclusive-locked.  (If caller does not hold
00980  * exclusive lock, then somebody could be in process of writing the buffer,
00981  * leading to risk of bad data written to disk.)
00982  */
00983 void
00984 MarkBufferDirty(Buffer buffer)
00985 {
00986     volatile BufferDesc *bufHdr;
00987 
00988     if (!BufferIsValid(buffer))
00989         elog(ERROR, "bad buffer ID: %d", buffer);
00990 
00991     if (BufferIsLocal(buffer))
00992     {
00993         MarkLocalBufferDirty(buffer);
00994         return;
00995     }
00996 
00997     bufHdr = &BufferDescriptors[buffer - 1];
00998 
00999     Assert(PrivateRefCount[buffer - 1] > 0);
01000     /* unfortunately we can't check if the lock is held exclusively */
01001     Assert(LWLockHeldByMe(bufHdr->content_lock));
01002 
01003     LockBufHdr(bufHdr);
01004 
01005     Assert(bufHdr->refcount > 0);
01006 
01007     /*
01008      * If the buffer was not dirty already, do vacuum accounting.
01009      */
01010     if (!(bufHdr->flags & BM_DIRTY))
01011     {
01012         VacuumPageDirty++;
01013         pgBufferUsage.shared_blks_dirtied++;
01014         if (VacuumCostActive)
01015             VacuumCostBalance += VacuumCostPageDirty;
01016     }
01017 
01018     bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
01019 
01020     UnlockBufHdr(bufHdr);
01021 }
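
/*
 * Where MarkBufferDirty fits in the usual WAL-logged update sequence (a
 * sketch of the convention only; XLogInsert's arguments are elided):
 *
 *      LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 *      START_CRIT_SECTION();
 *      ... modify BufferGetPage(buf) ...
 *      MarkBufferDirty(buf);
 *      recptr = XLogInsert(...);
 *      PageSetLSN(BufferGetPage(buf), recptr);
 *      END_CRIT_SECTION();
 *      UnlockReleaseBuffer(buf);
 */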
01022 
01023 /*
01024  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
01025  *
01026  * Formerly, this saved one cycle of acquiring/releasing the BufMgrLock
01027  * compared to calling the two routines separately.  Now it's mainly just
01028  * a convenience function.  However, if the passed buffer is valid and
01029  * already contains the desired block, we just return it as-is; and that
01030  * does save considerable work compared to a full release and reacquire.
01031  *
01032  * Note: it is OK to pass buffer == InvalidBuffer, indicating that no old
01033  * buffer actually needs to be released.  This case is the same as ReadBuffer,
01034  * but can save some tests in the caller.
01035  */
01036 Buffer
01037 ReleaseAndReadBuffer(Buffer buffer,
01038                      Relation relation,
01039                      BlockNumber blockNum)
01040 {
01041     ForkNumber  forkNum = MAIN_FORKNUM;
01042     volatile BufferDesc *bufHdr;
01043 
01044     if (BufferIsValid(buffer))
01045     {
01046         if (BufferIsLocal(buffer))
01047         {
01048             Assert(LocalRefCount[-buffer - 1] > 0);
01049             bufHdr = &LocalBufferDescriptors[-buffer - 1];
01050             if (bufHdr->tag.blockNum == blockNum &&
01051                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
01052                 bufHdr->tag.forkNum == forkNum)
01053                 return buffer;
01054             ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
01055             LocalRefCount[-buffer - 1]--;
01056         }
01057         else
01058         {
01059             Assert(PrivateRefCount[buffer - 1] > 0);
01060             bufHdr = &BufferDescriptors[buffer - 1];
01061             /* we have pin, so it's ok to examine tag without spinlock */
01062             if (bufHdr->tag.blockNum == blockNum &&
01063                 RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
01064                 bufHdr->tag.forkNum == forkNum)
01065                 return buffer;
01066             UnpinBuffer(bufHdr, true);
01067         }
01068     }
01069 
01070     return ReadBuffer(relation, blockNum);
01071 }
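
/*
 * Sketch of walking a chain of pages with ReleaseAndReadBuffer ("rel",
 * "next_blkno", and the lock/examine step are assumptions of the example):
 *
 *      buf = InvalidBuffer;
 *      while (BlockNumberIsValid(next_blkno))
 *      {
 *          buf = ReleaseAndReadBuffer(buf, rel, next_blkno);
 *          ... lock the page, read it to find the next next_blkno, unlock ...
 *      }
 *      if (BufferIsValid(buf))
 *          ReleaseBuffer(buf);
 */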
01072 
01073 /*
01074  * PinBuffer -- make buffer unavailable for replacement.
01075  *
01076  * For the default access strategy, the buffer's usage_count is incremented
01077  * when we first pin it; for other strategies we just make sure the usage_count
01078  * isn't zero.  (The idea of the latter is that we don't want synchronized
01079  * heap scans to inflate the count, but we need it to not be zero to discourage
01080  * other backends from stealing buffers from our ring.  As long as we cycle
01081  * through the ring faster than the global clock-sweep cycles, buffers in
01082  * our ring won't be chosen as victims for replacement by other backends.)
01083  *
01084  * This should be applied only to shared buffers, never local ones.
01085  *
01086  * Note that ResourceOwnerEnlargeBuffers must have been done already.
01087  *
01088  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
01089  * some callers to avoid an extra spinlock cycle.
01090  */
01091 static bool
01092 PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
01093 {
01094     int         b = buf->buf_id;
01095     bool        result;
01096 
01097     if (PrivateRefCount[b] == 0)
01098     {
01099         LockBufHdr(buf);
01100         buf->refcount++;
01101         if (strategy == NULL)
01102         {
01103             if (buf->usage_count < BM_MAX_USAGE_COUNT)
01104                 buf->usage_count++;
01105         }
01106         else
01107         {
01108             if (buf->usage_count == 0)
01109                 buf->usage_count = 1;
01110         }
01111         result = (buf->flags & BM_VALID) != 0;
01112         UnlockBufHdr(buf);
01113     }
01114     else
01115     {
01116         /* If we previously pinned the buffer, it must surely be valid */
01117         result = true;
01118     }
01119     PrivateRefCount[b]++;
01120     Assert(PrivateRefCount[b] > 0);
01121     ResourceOwnerRememberBuffer(CurrentResourceOwner,
01122                                 BufferDescriptorGetBuffer(buf));
01123     return result;
01124 }
01125 
01126 /*
01127  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
01128  * The spinlock is released before return.
01129  *
01130  * Currently, no callers of this function want to modify the buffer's
01131  * usage_count at all, so there's no need for a strategy parameter.
01132  * Also we don't bother with a BM_VALID test (the caller could check that for
01133  * itself).
01134  *
01135  * Note: use of this routine is frequently mandatory, not just an optimization
01136  * to save a spin lock/unlock cycle, because we need to pin a buffer before
01137  * its state can change under us.
01138  */
01139 static void
01140 PinBuffer_Locked(volatile BufferDesc *buf)
01141 {
01142     int         b = buf->buf_id;
01143 
01144     if (PrivateRefCount[b] == 0)
01145         buf->refcount++;
01146     UnlockBufHdr(buf);
01147     PrivateRefCount[b]++;
01148     Assert(PrivateRefCount[b] > 0);
01149     ResourceOwnerRememberBuffer(CurrentResourceOwner,
01150                                 BufferDescriptorGetBuffer(buf));
01151 }
01152 
01153 /*
01154  * UnpinBuffer -- make buffer available for replacement.
01155  *
01156  * This should be applied only to shared buffers, never local ones.
01157  *
01158  * Most but not all callers want CurrentResourceOwner to be adjusted.
01159  * Those that don't should pass fixOwner = FALSE.
01160  */
01161 static void
01162 UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
01163 {
01164     int         b = buf->buf_id;
01165 
01166     if (fixOwner)
01167         ResourceOwnerForgetBuffer(CurrentResourceOwner,
01168                                   BufferDescriptorGetBuffer(buf));
01169 
01170     Assert(PrivateRefCount[b] > 0);
01171     PrivateRefCount[b]--;
01172     if (PrivateRefCount[b] == 0)
01173     {
01174         /* I'd better not still hold any locks on the buffer */
01175         Assert(!LWLockHeldByMe(buf->content_lock));
01176         Assert(!LWLockHeldByMe(buf->io_in_progress_lock));
01177 
01178         LockBufHdr(buf);
01179 
01180         /* Decrement the shared reference count */
01181         Assert(buf->refcount > 0);
01182         buf->refcount--;
01183 
01184         /* Support LockBufferForCleanup() */
01185         if ((buf->flags & BM_PIN_COUNT_WAITER) &&
01186             buf->refcount == 1)
01187         {
01188             /* we just released the last pin other than the waiter's */
01189             int         wait_backend_pid = buf->wait_backend_pid;
01190 
01191             buf->flags &= ~BM_PIN_COUNT_WAITER;
01192             UnlockBufHdr(buf);
01193             ProcSendSignal(wait_backend_pid);
01194         }
01195         else
01196             UnlockBufHdr(buf);
01197     }
01198 }
01199 
01200 /*
01201  * BufferSync -- Write out all dirty buffers in the pool.
01202  *
01203  * This is called at checkpoint time to write out all dirty shared buffers.
01204  * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
01205  * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN is
01206  * set, we write even unlogged buffers, which are otherwise skipped.  The
01207  * remaining flags currently have no effect here.
01208  */
01209 static void
01210 BufferSync(int flags)
01211 {
01212     int         buf_id;
01213     int         num_to_scan;
01214     int         num_to_write;
01215     int         num_written;
01216     int         mask = BM_DIRTY;
01217 
01218     /* Make sure we can handle the pin inside SyncOneBuffer */
01219     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
01220 
01221     /*
01222      * Unless this is a shutdown checkpoint, we write only permanent, dirty
01223      * buffers.  But at shutdown or end of recovery, we write all dirty buffers.
01224      */
01225     if (!((flags & CHECKPOINT_IS_SHUTDOWN) || (flags & CHECKPOINT_END_OF_RECOVERY)))
01226         mask |= BM_PERMANENT;
01227 
01228     /*
01229      * Loop over all buffers, and mark the ones that need to be written with
01230      * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
01231      * can estimate how much work needs to be done.
01232      *
01233      * This allows us to write only those pages that were dirty when the
01234      * checkpoint began, and not those that get dirtied while it proceeds.
01235      * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
01236      * later in this function, or by normal backends or the bgwriter cleaning
01237      * scan, the flag is cleared.  Any buffer dirtied after this point won't
01238      * have the flag set.
01239      *
01240      * Note that if we fail to write some buffer, we may leave buffers with
01241      * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
01242      * certainly need to be written for the next checkpoint attempt, too.
01243      */
01244     num_to_write = 0;
01245     for (buf_id = 0; buf_id < NBuffers; buf_id++)
01246     {
01247         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
01248 
01249         /*
01250          * Header spinlock is enough to examine BM_DIRTY, see comment in
01251          * SyncOneBuffer.
01252          */
01253         LockBufHdr(bufHdr);
01254 
01255         if ((bufHdr->flags & mask) == mask)
01256         {
01257             bufHdr->flags |= BM_CHECKPOINT_NEEDED;
01258             num_to_write++;
01259         }
01260 
01261         UnlockBufHdr(bufHdr);
01262     }
01263 
01264     if (num_to_write == 0)
01265         return;                 /* nothing to do */
01266 
01267     TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
01268 
01269     /*
01270      * Loop over all buffers again, and write the ones (still) marked with
01271      * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
01272      * since we might as well dump soon-to-be-recycled buffers first.
01273      *
01274      * Note that we don't read the buffer alloc count here --- that should be
01275      * left untouched till the next BgBufferSync() call.
01276      */
01277     buf_id = StrategySyncStart(NULL, NULL);
01278     num_to_scan = NBuffers;
01279     num_written = 0;
01280     while (num_to_scan-- > 0)
01281     {
01282         volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
01283 
01284         /*
01285          * We don't need to acquire the lock here, because we're only looking
01286          * at a single bit. It's possible that someone else writes the buffer
01287          * and clears the flag right after we check, but that doesn't matter
01288          * since SyncOneBuffer will then do nothing.  However, there is a
01289          * further race condition: it's conceivable that between the time we
01290          * examine the bit here and the time SyncOneBuffer acquires lock,
01291          * someone else not only wrote the buffer but replaced it with another
01292          * page and dirtied it.  In that improbable case, SyncOneBuffer will
01293          * write the buffer though we didn't need to.  It doesn't seem worth
01294          * guarding against this, though.
01295          */
01296         if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
01297         {
01298             if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
01299             {
01300                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
01301                 BgWriterStats.m_buf_written_checkpoints++;
01302                 num_written++;
01303 
01304                 /*
01305                  * We know there are at most num_to_write buffers with
01306                  * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
01307                  * num_written reaches num_to_write.
01308                  *
01309                  * Note that num_written doesn't include buffers written by
01310                  * other backends, or by the bgwriter cleaning scan. That
01311                  * means that the estimate of how much progress we've made is
01312                  * conservative, and also that this test will often fail to
01313                  * trigger.  But it seems worth making anyway.
01314                  */
01315                 if (num_written >= num_to_write)
01316                     break;
01317 
01318                 /*
01319                  * Sleep to throttle our I/O rate.
01320                  */
01321                 CheckpointWriteDelay(flags, (double) num_written / num_to_write);
01322             }
01323         }
01324 
01325         if (++buf_id >= NBuffers)
01326             buf_id = 0;
01327     }
01328 
01329     /*
01330      * Update checkpoint statistics. As noted above, this doesn't include
01331      * buffers written by other backends or bgwriter scan.
01332      */
01333     CheckpointStats.ckpt_bufs_written += num_written;
01334 
01335     TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_write);
01336 }
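
/*
 * BufferSync is reached via CheckPointBuffers() (defined later in this
 * file), which the checkpoint code invokes; roughly:
 *
 *      CheckPointBuffers(flags);       calls BufferSync(flags) and then
 *                                      smgrsync() to flush pending writes
 */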
01337 
01338 /*
01339  * BgBufferSync -- Write out some dirty buffers in the pool.
01340  *
01341  * This is called periodically by the background writer process.
01342  *
01343  * Returns true if it's appropriate for the bgwriter process to go into
01344  * low-power hibernation mode.  (This happens if the strategy clock sweep
01345  * has been "lapped" and no buffer allocations have occurred recently,
01346  * or if the bgwriter has been effectively disabled by setting
01347  * bgwriter_lru_maxpages to 0.)
01348  */
01349 bool
01350 BgBufferSync(void)
01351 {
01352     /* info obtained from freelist.c */
01353     int         strategy_buf_id;
01354     uint32      strategy_passes;
01355     uint32      recent_alloc;
01356 
01357     /*
01358      * Information saved between calls so we can determine the strategy
01359      * point's advance rate and avoid scanning already-cleaned buffers.
01360      */
01361     static bool saved_info_valid = false;
01362     static int  prev_strategy_buf_id;
01363     static uint32 prev_strategy_passes;
01364     static int  next_to_clean;
01365     static uint32 next_passes;
01366 
01367     /* Moving averages of allocation rate and clean-buffer density */
01368     static float smoothed_alloc = 0;
01369     static float smoothed_density = 10.0;
01370 
01371     /* Potentially these could be tunables, but for now, not */
01372     float       smoothing_samples = 16;
01373     float       scan_whole_pool_milliseconds = 120000.0;
01374 
01375     /* Used to compute how far we scan ahead */
01376     long        strategy_delta;
01377     int         bufs_to_lap;
01378     int         bufs_ahead;
01379     float       scans_per_alloc;
01380     int         reusable_buffers_est;
01381     int         upcoming_alloc_est;
01382     int         min_scan_buffers;
01383 
01384     /* Variables for the scanning loop proper */
01385     int         num_to_scan;
01386     int         num_written;
01387     int         reusable_buffers;
01388 
01389     /* Variables for final smoothed_density update */
01390     long        new_strategy_delta;
01391     uint32      new_recent_alloc;
01392 
01393     /*
01394      * Find out where the freelist clock sweep currently is, and how many
01395      * buffer allocations have happened since our last call.
01396      */
01397     strategy_buf_id = StrategySyncStart(&strategy_passes, &recent_alloc);
01398 
01399     /* Report buffer alloc counts to pgstat */
01400     BgWriterStats.m_buf_alloc += recent_alloc;
01401 
01402     /*
01403      * If we're not running the LRU scan, just stop after doing the stats
01404      * stuff.  We mark the saved state invalid so that we can recover sanely
01405      * if LRU scan is turned back on later.
01406      */
01407     if (bgwriter_lru_maxpages <= 0)
01408     {
01409         saved_info_valid = false;
01410         return true;
01411     }
01412 
01413     /*
01414      * Compute strategy_delta = how many buffers have been scanned by the
01415      * clock sweep since last time.  If first time through, assume none. Then
01416      * see if we are still ahead of the clock sweep, and if so, how many
01417      * buffers we could scan before we'd catch up with it and "lap" it. Note:
01418      * the weird-looking coding of the xxx_passes comparisons is meant to avoid
01419      * bogus behavior when the passes counts wrap around.
01420      */
01421     if (saved_info_valid)
01422     {
01423         int32       passes_delta = strategy_passes - prev_strategy_passes;
01424 
01425         strategy_delta = strategy_buf_id - prev_strategy_buf_id;
01426         strategy_delta += (long) passes_delta * NBuffers;
01427 
01428         Assert(strategy_delta >= 0);
01429 
01430         if ((int32) (next_passes - strategy_passes) > 0)
01431         {
01432             /* we're one pass ahead of the strategy point */
01433             bufs_to_lap = strategy_buf_id - next_to_clean;
01434 #ifdef BGW_DEBUG
01435             elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
01436                  next_passes, next_to_clean,
01437                  strategy_passes, strategy_buf_id,
01438                  strategy_delta, bufs_to_lap);
01439 #endif
01440         }
01441         else if (next_passes == strategy_passes &&
01442                  next_to_clean >= strategy_buf_id)
01443         {
01444             /* on same pass, but ahead or at least not behind */
01445             bufs_to_lap = NBuffers - (next_to_clean - strategy_buf_id);
01446 #ifdef BGW_DEBUG
01447             elog(DEBUG2, "bgwriter ahead: bgw %u-%u strategy %u-%u delta=%ld lap=%d",
01448                  next_passes, next_to_clean,
01449                  strategy_passes, strategy_buf_id,
01450                  strategy_delta, bufs_to_lap);
01451 #endif
01452         }
01453         else
01454         {
01455             /*
01456              * We're behind, so skip forward to the strategy point and start
01457              * cleaning from there.
01458              */
01459 #ifdef BGW_DEBUG
01460             elog(DEBUG2, "bgwriter behind: bgw %u-%u strategy %u-%u delta=%ld",
01461                  next_passes, next_to_clean,
01462                  strategy_passes, strategy_buf_id,
01463                  strategy_delta);
01464 #endif
01465             next_to_clean = strategy_buf_id;
01466             next_passes = strategy_passes;
01467             bufs_to_lap = NBuffers;
01468         }
01469     }
01470     else
01471     {
01472         /*
01473          * Initializing at startup or after LRU scanning had been off. Always
01474          * start at the strategy point.
01475          */
01476 #ifdef BGW_DEBUG
01477         elog(DEBUG2, "bgwriter initializing: strategy %u-%u",
01478              strategy_passes, strategy_buf_id);
01479 #endif
01480         strategy_delta = 0;
01481         next_to_clean = strategy_buf_id;
01482         next_passes = strategy_passes;
01483         bufs_to_lap = NBuffers;
01484     }
01485 
01486     /* Update saved info for next time */
01487     prev_strategy_buf_id = strategy_buf_id;
01488     prev_strategy_passes = strategy_passes;
01489     saved_info_valid = true;
01490 
01491     /*
01492      * Compute how many buffers had to be scanned for each new allocation, ie,
01493      * 1/density of reusable buffers, and track a moving average of that.
01494      *
01495      * If the strategy point didn't move, we don't update the density estimate.
01496      */
01497     if (strategy_delta > 0 && recent_alloc > 0)
01498     {
01499         scans_per_alloc = (float) strategy_delta / (float) recent_alloc;
01500         smoothed_density += (scans_per_alloc - smoothed_density) /
01501             smoothing_samples;
01502     }
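    /*
     * Editor's illustration (not part of bufmgr.c): the update above is a
     * simple exponential moving average.  For example, with
     * smoothing_samples = 16, smoothed_density = 10.0, strategy_delta = 300
     * and recent_alloc = 100, scans_per_alloc is 3.0 and the new
     * smoothed_density becomes 10.0 + (3.0 - 10.0) / 16 = 9.5625; each call
     * moves the estimate 1/16 of the way toward the latest observation.
     */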
01503 
01504     /*
01505      * Estimate how many reusable buffers there are between the current
01506      * strategy point and where we've scanned ahead to, based on the smoothed
01507      * density estimate.
01508      */
01509     bufs_ahead = NBuffers - bufs_to_lap;
01510     reusable_buffers_est = (float) bufs_ahead / smoothed_density;
01511 
01512     /*
01513      * Track a moving average of recent buffer allocations.  Here, rather than
01514      * a true average we want a fast-attack, slow-decline behavior: we
01515      * immediately follow any increase.
01516      */
01517     if (smoothed_alloc <= (float) recent_alloc)
01518         smoothed_alloc = recent_alloc;
01519     else
01520         smoothed_alloc += ((float) recent_alloc - smoothed_alloc) /
01521             smoothing_samples;
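    /*
     * Editor's sketch (illustrative only, not part of bufmgr.c): the
     * asymmetric smoothing above, written as a standalone helper with
     * hypothetical names.
     *
     *      static float
     *      fast_attack_slow_decline(float smoothed, float sample, float nsamples)
     *      {
     *          if (smoothed <= sample)
     *              return sample;          -- follow any increase at once
     *          return smoothed + (sample - smoothed) / nsamples;
     *      }
     *
     * A burst of allocations therefore raises upcoming_alloc_est immediately,
     * while a quiet spell only lets it drift back down over roughly
     * smoothing_samples calls.
     */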
01522 
01523     /* Scale the estimate by a GUC to allow more aggressive tuning. */
01524     upcoming_alloc_est = (int) (smoothed_alloc * bgwriter_lru_multiplier);
01525 
01526     /*
01527      * If recent_alloc remains at zero for many cycles, smoothed_alloc will
01528      * eventually underflow to zero, and the underflows produce annoying
01529      * kernel warnings on some platforms.  Once upcoming_alloc_est has gone to
01530      * zero, there's no point in tracking smaller and smaller values of
01531      * smoothed_alloc, so just reset it to exactly zero to avoid this
01532      * syndrome.  It will pop back up as soon as recent_alloc increases.
01533      */
01534     if (upcoming_alloc_est == 0)
01535         smoothed_alloc = 0;
01536 
01537     /*
01538      * Even in cases where there's been little or no buffer allocation
01539      * activity, we want to make a small amount of progress through the buffer
01540      * cache so that as many reusable buffers as possible are clean after an
01541      * idle period.
01542      *
01543      * (scan_whole_pool_milliseconds / BgWriterDelay) computes how many times
01544      * the BGW will be called during the scan_whole_pool time; slice the
01545      * buffer pool into that many sections.
01546      */
01547     min_scan_buffers = (int) (NBuffers / (scan_whole_pool_milliseconds / BgWriterDelay));
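    /*
     * Editor's illustration (not part of bufmgr.c), assuming typical values:
     * with NBuffers = 16384 (128MB of shared_buffers), BgWriterDelay = 200ms
     * and scan_whole_pool_milliseconds = 120000, the bgwriter runs about 600
     * times per two minutes, so min_scan_buffers = 16384 / 600 = 27 buffers
     * per round -- enough to visit the whole pool roughly every two minutes
     * even when nothing is being allocated.
     */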
01548 
01549     if (upcoming_alloc_est < (min_scan_buffers + reusable_buffers_est))
01550     {
01551 #ifdef BGW_DEBUG
01552         elog(DEBUG2, "bgwriter: alloc_est=%d too small, using min=%d + reusable_est=%d",
01553              upcoming_alloc_est, min_scan_buffers, reusable_buffers_est);
01554 #endif
01555         upcoming_alloc_est = min_scan_buffers + reusable_buffers_est;
01556     }
01557 
01558     /*
01559      * Now write out dirty reusable buffers, working forward from the
01560      * next_to_clean point, until we have lapped the strategy scan, or cleaned
01561      * enough buffers to match our estimate of the next cycle's allocation
01562      * requirements, or hit the bgwriter_lru_maxpages limit.
01563      */
01564 
01565     /* Make sure we can handle the pin inside SyncOneBuffer */
01566     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
01567 
01568     num_to_scan = bufs_to_lap;
01569     num_written = 0;
01570     reusable_buffers = reusable_buffers_est;
01571 
01572     /* Execute the LRU scan */
01573     while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
01574     {
01575         int         buffer_state = SyncOneBuffer(next_to_clean, true);
01576 
01577         if (++next_to_clean >= NBuffers)
01578         {
01579             next_to_clean = 0;
01580             next_passes++;
01581         }
01582         num_to_scan--;
01583 
01584         if (buffer_state & BUF_WRITTEN)
01585         {
01586             reusable_buffers++;
01587             if (++num_written >= bgwriter_lru_maxpages)
01588             {
01589                 BgWriterStats.m_maxwritten_clean++;
01590                 break;
01591             }
01592         }
01593         else if (buffer_state & BUF_REUSABLE)
01594             reusable_buffers++;
01595     }
01596 
01597     BgWriterStats.m_buf_written_clean += num_written;
01598 
01599 #ifdef BGW_DEBUG
01600     elog(DEBUG1, "bgwriter: recent_alloc=%u smoothed=%.2f delta=%ld ahead=%d density=%.2f reusable_est=%d upcoming_est=%d scanned=%d wrote=%d reusable=%d",
01601          recent_alloc, smoothed_alloc, strategy_delta, bufs_ahead,
01602          smoothed_density, reusable_buffers_est, upcoming_alloc_est,
01603          bufs_to_lap - num_to_scan,
01604          num_written,
01605          reusable_buffers - reusable_buffers_est);
01606 #endif
01607 
01608     /*
01609      * Consider the above scan as being like a new allocation scan.
01610      * Characterize its density and update the smoothed one based on it. This
01611      * effectively halves the moving average period in cases where both the
01612      * strategy and the background writer are doing some useful scanning,
01613      * which is helpful because a long memory isn't as desirable on the
01614      * density estimates.
01615      */
01616     new_strategy_delta = bufs_to_lap - num_to_scan;
01617     new_recent_alloc = reusable_buffers - reusable_buffers_est;
01618     if (new_strategy_delta > 0 && new_recent_alloc > 0)
01619     {
01620         scans_per_alloc = (float) new_strategy_delta / (float) new_recent_alloc;
01621         smoothed_density += (scans_per_alloc - smoothed_density) /
01622             smoothing_samples;
01623 
01624 #ifdef BGW_DEBUG
01625         elog(DEBUG2, "bgwriter: cleaner density alloc=%u scan=%ld density=%.2f new smoothed=%.2f",
01626              new_recent_alloc, new_strategy_delta,
01627              scans_per_alloc, smoothed_density);
01628 #endif
01629     }
01630 
01631     /* Return true if OK to hibernate */
01632     return (bufs_to_lap == 0 && recent_alloc == 0);
01633 }
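/*
 * Editor's sketch (illustrative only; the real logic lives in
 * src/backend/postmaster/bgwriter.c): a caller might use the return value
 * roughly like this, with long_hibernation_delay being a hypothetical name.
 *
 *      bool    can_hibernate = BgBufferSync();
 *
 *      if (can_hibernate)
 *          sleep_ms = long_hibernation_delay;
 *      else
 *          sleep_ms = BgWriterDelay;
 *
 * The actual background writer layers additional conditions and latch-based
 * wakeups on top of this before it sleeps for longer.
 */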
01634 
01635 /*
01636  * SyncOneBuffer -- process a single buffer during syncing.
01637  *
01638  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
01639  * buffers marked recently used, as these are not replacement candidates.
01640  *
01641  * Returns a bitmask containing the following flag bits:
01642  *  BUF_WRITTEN: we wrote the buffer.
01643  *  BUF_REUSABLE: buffer is available for replacement, ie, it has
01644  *      pin count 0 and usage count 0.
01645  *
01646  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
01647  * after locking it, but we don't care all that much.)
01648  *
01649  * Note: caller must have done ResourceOwnerEnlargeBuffers.
01650  */
01651 static int
01652 SyncOneBuffer(int buf_id, bool skip_recently_used)
01653 {
01654     volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
01655     int         result = 0;
01656 
01657     /*
01658      * Check whether buffer needs writing.
01659      *
01660      * We can make this check without taking the buffer content lock so long
01661      * as we mark pages dirty in access methods *before* logging changes with
01662      * XLogInsert(): if someone marks the buffer dirty just after our check, we
01663      * need not worry, because our checkpoint.redo points before the log record
01664      * for the upcoming changes, and so we are not required to write that buffer.
01665      */
01666     LockBufHdr(bufHdr);
01667 
01668     if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
01669         result |= BUF_REUSABLE;
01670     else if (skip_recently_used)
01671     {
01672         /* Caller told us not to write recently-used buffers */
01673         UnlockBufHdr(bufHdr);
01674         return result;
01675     }
01676 
01677     if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
01678     {
01679         /* It's clean, so nothing to do */
01680         UnlockBufHdr(bufHdr);
01681         return result;
01682     }
01683 
01684     /*
01685      * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
01686      * buffer is clean by the time we've locked it.)
01687      */
01688     PinBuffer_Locked(bufHdr);
01689     LWLockAcquire(bufHdr->content_lock, LW_SHARED);
01690 
01691     FlushBuffer(bufHdr, NULL);
01692 
01693     LWLockRelease(bufHdr->content_lock);
01694     UnpinBuffer(bufHdr, true);
01695 
01696     return result | BUF_WRITTEN;
01697 }
01698 
01699 
01700 /*
01701  *      AtEOXact_Buffers - clean up at end of transaction.
01702  *
01703  *      As of PostgreSQL 8.0, buffer pins should get released by the
01704  *      ResourceOwner mechanism.  This routine is just a debugging
01705  *      cross-check that no pins remain.
01706  */
01707 void
01708 AtEOXact_Buffers(bool isCommit)
01709 {
01710 #ifdef USE_ASSERT_CHECKING
01711     if (assert_enabled)
01712     {
01713         int         RefCountErrors = 0;
01714         Buffer      b;
01715 
01716         for (b = 1; b <= NBuffers; b++)
01717         {
01718             if (PrivateRefCount[b - 1] != 0)
01719             {
01720                 PrintBufferLeakWarning(b);
01721                 RefCountErrors++;
01722             }
01723         }
01724         Assert(RefCountErrors == 0);
01725     }
01726 #endif
01727 
01728     AtEOXact_LocalBuffers(isCommit);
01729 }
01730 
01731 /*
01732  * InitBufferPoolBackend --- second-stage initialization of a new backend
01733  *
01734  * This is called after we have acquired a PGPROC and so can safely get
01735  * LWLocks.  We don't currently need to do anything at this stage ...
01736  * except register a shmem-exit callback.  AtProcExit_Buffers needs LWLock
01737  * access, and therefore has to be called at the corresponding phase of
01738  * backend shutdown.
01739  */
01740 void
01741 InitBufferPoolBackend(void)
01742 {
01743     on_shmem_exit(AtProcExit_Buffers, 0);
01744 }
01745 
01746 /*
01747  * During backend exit, ensure that we released all shared-buffer locks and
01748  * assert that we have no remaining pins.
01749  */
01750 static void
01751 AtProcExit_Buffers(int code, Datum arg)
01752 {
01753     AbortBufferIO();
01754     UnlockBuffers();
01755 
01756 #ifdef USE_ASSERT_CHECKING
01757     if (assert_enabled)
01758     {
01759         int         RefCountErrors = 0;
01760         Buffer      b;
01761 
01762         for (b = 1; b <= NBuffers; b++)
01763         {
01764             if (PrivateRefCount[b - 1] != 0)
01765             {
01766                 PrintBufferLeakWarning(b);
01767                 RefCountErrors++;
01768             }
01769         }
01770         Assert(RefCountErrors == 0);
01771     }
01772 #endif
01773 
01774     /* localbuf.c needs a chance too */
01775     AtProcExit_LocalBuffers();
01776 }
01777 
01778 /*
01779  * Helper routine to issue warnings when a buffer is unexpectedly pinned
01780  */
01781 void
01782 PrintBufferLeakWarning(Buffer buffer)
01783 {
01784     volatile BufferDesc *buf;
01785     int32       loccount;
01786     char       *path;
01787     BackendId   backend;
01788 
01789     Assert(BufferIsValid(buffer));
01790     if (BufferIsLocal(buffer))
01791     {
01792         buf = &LocalBufferDescriptors[-buffer - 1];
01793         loccount = LocalRefCount[-buffer - 1];
01794         backend = MyBackendId;
01795     }
01796     else
01797     {
01798         buf = &BufferDescriptors[buffer - 1];
01799         loccount = PrivateRefCount[buffer - 1];
01800         backend = InvalidBackendId;
01801     }
01802 
01803     /* theoretically we should lock the bufhdr here */
01804     path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
01805     elog(WARNING,
01806          "buffer refcount leak: [%03d] "
01807          "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
01808          buffer, path,
01809          buf->tag.blockNum, buf->flags,
01810          buf->refcount, loccount);
01811     pfree(path);
01812 }
01813 
01814 /*
01815  * CheckPointBuffers
01816  *
01817  * Flush all dirty blocks in buffer pool to disk at checkpoint time.
01818  *
01819  * Note: temporary relations do not participate in checkpoints, so they don't
01820  * need to be flushed.
01821  */
01822 void
01823 CheckPointBuffers(int flags)
01824 {
01825     TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
01826     CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
01827     BufferSync(flags);
01828     CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
01829     TRACE_POSTGRESQL_BUFFER_CHECKPOINT_SYNC_START();
01830     smgrsync();
01831     CheckpointStats.ckpt_sync_end_t = GetCurrentTimestamp();
01832     TRACE_POSTGRESQL_BUFFER_CHECKPOINT_DONE();
01833 }
01834 
01835 
01836 /*
01837  * Do whatever is needed to prepare for commit at the bufmgr and smgr levels
01838  */
01839 void
01840 BufmgrCommit(void)
01841 {
01842     /* Nothing to do in bufmgr anymore... */
01843 }
01844 
01845 /*
01846  * BufferGetBlockNumber
01847  *      Returns the block number associated with a buffer.
01848  *
01849  * Note:
01850  *      Assumes that the buffer is valid and pinned, else the
01851  *      value may be obsolete immediately...
01852  */
01853 BlockNumber
01854 BufferGetBlockNumber(Buffer buffer)
01855 {
01856     volatile BufferDesc *bufHdr;
01857 
01858     Assert(BufferIsPinned(buffer));
01859 
01860     if (BufferIsLocal(buffer))
01861         bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
01862     else
01863         bufHdr = &BufferDescriptors[buffer - 1];
01864 
01865     /* pinned, so OK to read tag without spinlock */
01866     return bufHdr->tag.blockNum;
01867 }
01868 
01869 /*
01870  * BufferGetTag
01871  *      Returns the relfilenode, fork number and block number associated with
01872  *      a buffer.
01873  */
01874 void
01875 BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
01876              BlockNumber *blknum)
01877 {
01878     volatile BufferDesc *bufHdr;
01879 
01880     /* Do the same checks as BufferGetBlockNumber. */
01881     Assert(BufferIsPinned(buffer));
01882 
01883     if (BufferIsLocal(buffer))
01884         bufHdr = &(LocalBufferDescriptors[-buffer - 1]);
01885     else
01886         bufHdr = &BufferDescriptors[buffer - 1];
01887 
01888     /* pinned, so OK to read tag without spinlock */
01889     *rnode = bufHdr->tag.rnode;
01890     *forknum = bufHdr->tag.forkNum;
01891     *blknum = bufHdr->tag.blockNum;
01892 }
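/*
 * Editor's sketch (illustrative only, not part of bufmgr.c): a typical use of
 * BufferGetTag is identifying a pinned page, e.g. for debug output.  The
 * surrounding variables are hypothetical.
 *
 *      RelFileNode rnode;
 *      ForkNumber  forknum;
 *      BlockNumber blkno;
 *
 *      BufferGetTag(buffer, &rnode, &forknum, &blkno);
 *      elog(DEBUG1, "block %u of relation %u/%u/%u, fork %d",
 *           blkno, rnode.spcNode, rnode.dbNode, rnode.relNode, forknum);
 */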
01893 
01894 /*
01895  * FlushBuffer
01896  *      Physically write out a shared buffer.
01897  *
01898  * NOTE: this actually just passes the buffer contents to the kernel; the
01899  * real write to disk won't happen until the kernel feels like it.  This
01900  * is okay from our point of view since we can redo the changes from WAL.
01901  * However, we will need to force the changes to disk via fsync before
01902  * we can checkpoint WAL.
01903  *
01904  * The caller must hold a pin on the buffer and have share-locked the
01905  * buffer contents.  (Note: a share-lock does not prevent updates of
01906  * hint bits in the buffer, so the page could change while the write
01907  * is in progress, but we assume that that will not invalidate the data
01908  * written.)
01909  *
01910  * If the caller has an smgr reference for the buffer's relation, pass it
01911  * as the second parameter.  If not, pass NULL.
01912  */
01913 static void
01914 FlushBuffer(volatile BufferDesc *buf, SMgrRelation reln)
01915 {
01916     XLogRecPtr  recptr;
01917     ErrorContextCallback errcallback;
01918     instr_time  io_start,
01919                 io_time;
01920     Block       bufBlock;
01921     char        *bufToWrite;
01922 
01923     /*
01924      * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
01925      * false, then someone else flushed the buffer before we could, so we need
01926      * not do anything.
01927      */
01928     if (!StartBufferIO(buf, false))
01929         return;
01930 
01931     /* Setup error traceback support for ereport() */
01932     errcallback.callback = shared_buffer_write_error_callback;
01933     errcallback.arg = (void *) buf;
01934     errcallback.previous = error_context_stack;
01935     error_context_stack = &errcallback;
01936 
01937     /* Find smgr relation for buffer */
01938     if (reln == NULL)
01939         reln = smgropen(buf->tag.rnode, InvalidBackendId);
01940 
01941     TRACE_POSTGRESQL_BUFFER_FLUSH_START(buf->tag.forkNum,
01942                                         buf->tag.blockNum,
01943                                         reln->smgr_rnode.node.spcNode,
01944                                         reln->smgr_rnode.node.dbNode,
01945                                         reln->smgr_rnode.node.relNode);
01946 
01947     LockBufHdr(buf);
01948 
01949     /*
01950      * Run PageGetLSN while holding header lock, since we don't have the
01951      * buffer locked exclusively in all cases.
01952      */
01953     recptr = BufferGetLSN(buf);
01954 
01955     /* To check if block content changes while flushing. - vadim 01/17/97 */
01956     buf->flags &= ~BM_JUST_DIRTIED;
01957     UnlockBufHdr(buf);
01958 
01959     /*
01960      * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
01961      * rule that log updates must hit disk before any of the data-file changes
01962      * they describe do.
01963      *
01964      * However, this rule does not apply to unlogged relations, which will be
01965      * lost after a crash anyway.  Most unlogged relation pages do not bear
01966      * LSNs since we never emit WAL records for them, and therefore flushing
01967      * up through the buffer LSN would be useless, but harmless.  However, GiST
01968      * indexes use LSNs internally to track page-splits, and therefore unlogged
01969      * GiST pages bear "fake" LSNs generated by GetFakeLSNForUnloggedRel.  It
01970      * is unlikely but possible that the fake LSN counter could advance past
01971      * the WAL insertion point; and if it did happen, attempting to flush WAL
01972      * through that location would fail, with disastrous system-wide
01973      * consequences.  To make sure that can't happen, skip the flush if the
01974      * buffer isn't permanent.
01975      */
01976     if (buf->flags & BM_PERMANENT)
01977         XLogFlush(recptr);
01978 
01979     /*
01980      * Now it's safe to write buffer to disk. Note that no one else should
01981      * have been able to write it while we were busy with log flushing because
01982      * we have the io_in_progress lock.
01983      */
01984 
01985     bufBlock = BufHdrGetBlock(buf);
01986 
01987     bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
01988 
01989     if (track_io_timing)
01990         INSTR_TIME_SET_CURRENT(io_start);
01991 
01992     /*
01993      * bufToWrite is either the shared buffer or a copy, as appropriate.
01994      */
01995     smgrwrite(reln,
01996               buf->tag.forkNum,
01997               buf->tag.blockNum,
01998               bufToWrite,
01999               false);
02000 
02001     if (track_io_timing)
02002     {
02003         INSTR_TIME_SET_CURRENT(io_time);
02004         INSTR_TIME_SUBTRACT(io_time, io_start);
02005         pgstat_count_buffer_write_time(INSTR_TIME_GET_MICROSEC(io_time));
02006         INSTR_TIME_ADD(pgBufferUsage.blk_write_time, io_time);
02007     }
02008 
02009     pgBufferUsage.shared_blks_written++;
02010 
02011     /*
02012      * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
02013      * end the io_in_progress state.
02014      */
02015     TerminateBufferIO(buf, true, 0);
02016 
02017     TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(buf->tag.forkNum,
02018                                        buf->tag.blockNum,
02019                                        reln->smgr_rnode.node.spcNode,
02020                                        reln->smgr_rnode.node.dbNode,
02021                                        reln->smgr_rnode.node.relNode);
02022 
02023     /* Pop the error context stack */
02024     error_context_stack = errcallback.previous;
02025 }
02026 
02027 /*
02028  * RelationGetNumberOfBlocksInFork
02029  *      Determines the current number of pages in the given fork of the relation.
02030  */
02031 BlockNumber
02032 RelationGetNumberOfBlocksInFork(Relation relation, ForkNumber forkNum)
02033 {
02034     /* Open it at the smgr level if not already done */
02035     RelationOpenSmgr(relation);
02036 
02037     return smgrnblocks(relation->rd_smgr, forkNum);
02038 }
02039 
02040 /*
02041  * BufferIsPermanent
02042  *      Determines whether a buffer will potentially still be around after
02043  *      a crash.  Caller must hold a buffer pin.
02044  */
02045 bool
02046 BufferIsPermanent(Buffer buffer)
02047 {
02048     volatile BufferDesc *bufHdr;
02049 
02050     /* Local buffers are used only for temp relations. */
02051     if (BufferIsLocal(buffer))
02052         return false;
02053 
02054     /* Make sure we've got a real buffer, and that we hold a pin on it. */
02055     Assert(BufferIsValid(buffer));
02056     Assert(BufferIsPinned(buffer));
02057 
02058     /*
02059      * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
02060      * need not bother with the buffer header spinlock.  Even if someone else
02061      * changes the buffer header flags while we're doing this, we assume that
02062      * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
02063      * old value or the new value, but not random garbage.
02064      */
02065     bufHdr = &BufferDescriptors[buffer - 1];
02066     return (bufHdr->flags & BM_PERMANENT) != 0;
02067 }
02068 
02069 /*
02070  * BufferGetLSNAtomic
02071  *      Retrieves the LSN of the buffer atomically using a buffer header lock.
02072  *      This is necessary for some callers who may not have an exclusive lock
02073  *      on the buffer.
02074  */
02075 XLogRecPtr
02076 BufferGetLSNAtomic(Buffer buffer)
02077 {
02078     volatile BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
02079     char                *page = BufferGetPage(buffer);
02080     XLogRecPtr           lsn;
02081 
02082     /*
02083      * If we don't need locking for correctness, fastpath out.
02084      */
02085     if (!DataChecksumsEnabled() || BufferIsLocal(buffer))
02086         return PageGetLSN(page);
02087 
02088     /* Make sure we've got a real buffer, and that we hold a pin on it. */
02089     Assert(BufferIsValid(buffer));
02090     Assert(BufferIsPinned(buffer));
02091 
02092     LockBufHdr(bufHdr);
02093     lsn = PageGetLSN(page);
02094     UnlockBufHdr(bufHdr);
02095 
02096     return lsn;
02097 }
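/*
 * Editor's note (illustrative only, not part of bufmgr.c): a caller holding
 * only a share lock should prefer this routine over a bare PageGetLSN(), e.g.
 *
 *      if (XLogNeedsFlush(BufferGetLSNAtomic(buf)))
 *          ... the page's WAL has not reached disk yet ...
 *
 * because with checksums enabled the page LSN can be updated under the buffer
 * header lock by MarkBufferDirtyHint() while only share locks are held.
 */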
02098 
02099 /* ---------------------------------------------------------------------
02100  *      DropRelFileNodeBuffers
02101  *
02102  *      This function removes from the buffer pool all the pages of the
02103  *      specified relation fork that have block numbers >= firstDelBlock.
02104  *      (In particular, with firstDelBlock = 0, all pages are removed.)
02105  *      Dirty pages are simply dropped, without bothering to write them
02106  *      out first.  Therefore, this is NOT rollback-able, and so should be
02107  *      used only with extreme caution!
02108  *
02109  *      Currently, this is called only from smgr.c when the underlying file
02110  *      is about to be deleted or truncated (firstDelBlock is needed for
02111  *      the truncation case).  The data in the affected pages would therefore
02112  *      be deleted momentarily anyway, and there is no point in writing it.
02113  *      It is the responsibility of higher-level code to ensure that the
02114  *      deletion or truncation does not lose any data that could be needed
02115  *      later.  It is also the responsibility of higher-level code to ensure
02116  *      that no other process could be trying to load more pages of the
02117  *      relation into buffers.
02118  *
02119  *      XXX currently it sequentially searches the buffer pool, should be
02120  *      changed to more clever ways of searching.  However, this routine
02121  *      is used only in code paths that aren't very performance-critical,
02122  *      and we shouldn't slow down the hot paths to make it faster ...
02123  * --------------------------------------------------------------------
02124  */
02125 void
02126 DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
02127                        BlockNumber firstDelBlock)
02128 {
02129     int         i;
02130 
02131     /* If it's a local relation, it's localbuf.c's problem. */
02132     if (RelFileNodeBackendIsTemp(rnode))
02133     {
02134         if (rnode.backend == MyBackendId)
02135             DropRelFileNodeLocalBuffers(rnode.node, forkNum, firstDelBlock);
02136         return;
02137     }
02138 
02139     for (i = 0; i < NBuffers; i++)
02140     {
02141         volatile BufferDesc *bufHdr = &BufferDescriptors[i];
02142 
02143         /*
02144          * We can make this a tad faster by prechecking the buffer tag before
02145          * we attempt to lock the buffer; this saves a lot of lock
02146          * acquisitions in typical cases.  It should be safe because the
02147          * caller must have AccessExclusiveLock on the relation, or some other
02148          * reason to be certain that no one is loading new pages of the rel
02149          * into the buffer pool.  (Otherwise we might well miss such pages
02150          * entirely.)  Therefore, while the tag might be changing while we
02151          * look at it, it can't be changing *to* a value we care about, only
02152          * *away* from such a value.  So false negatives are impossible, and
02153          * false positives are safe because we'll recheck after getting the
02154          * buffer lock.
02155          *
02156          * We could check forkNum and blockNum as well as the rnode, but the
02157          * incremental win from doing so seems small.
02158          */
02159         if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
02160             continue;
02161 
02162         LockBufHdr(bufHdr);
02163         if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
02164             bufHdr->tag.forkNum == forkNum &&
02165             bufHdr->tag.blockNum >= firstDelBlock)
02166             InvalidateBuffer(bufHdr);   /* releases spinlock */
02167         else
02168             UnlockBufHdr(bufHdr);
02169     }
02170 }
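/*
 * Editor's sketch (simplified; the real call sequence lives in smgr.c): a
 * truncation-time caller drops the soon-to-vanish buffers first and only then
 * shortens the file, so that no dirty buffer for a removed block can be
 * written back afterwards.
 *
 *      DropRelFileNodeBuffers(reln->smgr_rnode, forknum, nblocks);
 *      ... then truncate the underlying file at the storage-manager level ...
 */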
02171 
02172 /* ---------------------------------------------------------------------
02173  *      DropRelFileNodesAllBuffers
02174  *
02175  *      This function removes from the buffer pool all the pages of all
02176  *      forks of the specified relations.  It's equivalent to calling
02177  *      DropRelFileNodeBuffers once per fork per relation with
02178  *      firstDelBlock = 0.
02179  * --------------------------------------------------------------------
02180  */
02181 void
02182 DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
02183 {
02184     int         i,
02185                 n = 0;
02186     RelFileNode *nodes;
02187     bool        use_bsearch;
02188 
02189     if (nnodes == 0)
02190         return;
02191 
02192     nodes = palloc(sizeof(RelFileNode) * nnodes); /* non-local relations */
02193 
02194     /* If it's a local relation, it's localbuf.c's problem. */
02195     for (i = 0; i < nnodes; i++)
02196     {
02197         if (RelFileNodeBackendIsTemp(rnodes[i]))
02198         {
02199             if (rnodes[i].backend == MyBackendId)
02200                 DropRelFileNodeAllLocalBuffers(rnodes[i].node);
02201         }
02202         else
02203             nodes[n++] = rnodes[i].node;
02204     }
02205 
02206     /*
02207      * If there are no non-local relations, then we're done. Release the memory
02208      * and return.
02209      */
02210     if (n == 0)
02211     {
02212         pfree(nodes);
02213         return;
02214     }
02215 
02216     /*
02217      * For a low number of relations to drop, just use a simple walk-through to
02218      * save the bsearch overhead.  The threshold to use is more of a guess than
02219      * an exactly determined value, as it depends on many factors (CPU and RAM
02220      * speeds, amount of shared buffers, etc.).
02221      */
02222     use_bsearch = n > DROP_RELS_BSEARCH_THRESHOLD;
02223 
02224     /* sort the list of rnodes if necessary */
02225     if (use_bsearch)
02226         pg_qsort(nodes, n, sizeof(RelFileNode), rnode_comparator);
02227 
02228     for (i = 0; i < NBuffers; i++)
02229     {
02230         RelFileNode *rnode = NULL;
02231         volatile BufferDesc *bufHdr = &BufferDescriptors[i];
02232 
02233         /*
02234          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
02235          * and saves some cycles.
02236          */
02237 
02238         if (!use_bsearch)
02239         {
02240             int     j;
02241 
02242             for (j = 0; j < n; j++)
02243             {
02244                 if (RelFileNodeEquals(bufHdr->tag.rnode, nodes[j]))
02245                 {
02246                     rnode = &nodes[j];
02247                     break;
02248                 }
02249             }
02250         }
02251         else
02252         {
02253             rnode = bsearch((const void *) &(bufHdr->tag.rnode),
02254                             nodes, n, sizeof(RelFileNode),
02255                             rnode_comparator);
02256         }
02257 
02258         /* buffer doesn't belong to any of the given relfilenodes; skip it */
02259         if (rnode == NULL)
02260             continue;
02261 
02262         LockBufHdr(bufHdr);
02263         if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
02264             InvalidateBuffer(bufHdr);   /* releases spinlock */
02265         else
02266             UnlockBufHdr(bufHdr);
02267     }
02268 
02269     pfree(nodes);
02270 }
02271 
02272 /* ---------------------------------------------------------------------
02273  *      DropDatabaseBuffers
02274  *
02275  *      This function removes all the buffers in the buffer cache for a
02276  *      particular database.  Dirty pages are simply dropped, without
02277  *      bothering to write them out first.  This is used when we destroy a
02278  *      database, to avoid trying to flush data to disk when the directory
02279  *      tree no longer exists.  Implementation is pretty similar to
02280  *      DropRelFileNodeBuffers() which is for destroying just one relation.
02281  * --------------------------------------------------------------------
02282  */
02283 void
02284 DropDatabaseBuffers(Oid dbid)
02285 {
02286     int         i;
02287 
02288     /*
02289      * We needn't consider local buffers, since by assumption the target
02290      * database isn't our own.
02291      */
02292 
02293     for (i = 0; i < NBuffers; i++)
02294     {
02295         volatile BufferDesc *bufHdr = &BufferDescriptors[i];
02296 
02297         /*
02298          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
02299          * and saves some cycles.
02300          */
02301         if (bufHdr->tag.rnode.dbNode != dbid)
02302             continue;
02303 
02304         LockBufHdr(bufHdr);
02305         if (bufHdr->tag.rnode.dbNode == dbid)
02306             InvalidateBuffer(bufHdr);   /* releases spinlock */
02307         else
02308             UnlockBufHdr(bufHdr);
02309     }
02310 }
02311 
02312 /* -----------------------------------------------------------------
02313  *      PrintBufferDescs
02314  *
02315  *      this function prints all the buffer descriptors, for debugging
02316  *      use only.
02317  * -----------------------------------------------------------------
02318  */
02319 #ifdef NOT_USED
02320 void
02321 PrintBufferDescs(void)
02322 {
02323     int         i;
02324     volatile BufferDesc *buf = BufferDescriptors;
02325 
02326     for (i = 0; i < NBuffers; ++i, ++buf)
02327     {
02328         /* theoretically we should lock the bufhdr here */
02329         elog(LOG,
02330              "[%02d] (freeNext=%d, rel=%s, "
02331              "blockNum=%u, flags=0x%x, refcount=%u %d)",
02332              i, buf->freeNext,
02333           relpathbackend(buf->tag.rnode, InvalidBackendId, buf->tag.forkNum),
02334              buf->tag.blockNum, buf->flags,
02335              buf->refcount, PrivateRefCount[i]);
02336     }
02337 }
02338 #endif
02339 
02340 #ifdef NOT_USED
02341 void
02342 PrintPinnedBufs(void)
02343 {
02344     int         i;
02345     volatile BufferDesc *buf = BufferDescriptors;
02346 
02347     for (i = 0; i < NBuffers; ++i, ++buf)
02348     {
02349         if (PrivateRefCount[i] > 0)
02350         {
02351             /* theoretically we should lock the bufhdr here */
02352             elog(LOG,
02353                  "[%02d] (freeNext=%d, rel=%s, "
02354                  "blockNum=%u, flags=0x%x, refcount=%u %d)",
02355                  i, buf->freeNext,
02356                  relpath(buf->tag.rnode, buf->tag.forkNum),
02357                  buf->tag.blockNum, buf->flags,
02358                  buf->refcount, PrivateRefCount[i]);
02359         }
02360     }
02361 }
02362 #endif
02363 
02364 /* ---------------------------------------------------------------------
02365  *      FlushRelationBuffers
02366  *
02367  *      This function writes all dirty pages of a relation out to disk
02368  *      (or more accurately, out to kernel disk buffers), ensuring that the
02369  *      kernel has an up-to-date view of the relation.
02370  *
02371  *      Generally, the caller should be holding AccessExclusiveLock on the
02372  *      target relation to ensure that no other backend is busy dirtying
02373  *      more blocks of the relation; the effects can't be expected to last
02374  *      after the lock is released.
02375  *
02376  *      XXX currently it sequentially searches the buffer pool, should be
02377  *      changed to more clever ways of searching.  This routine is not
02378  *      used in any performance-critical code paths, so it's not worth
02379  *      adding additional overhead to normal paths to make it go faster;
02380  *      but see also DropRelFileNodeBuffers.
02381  * --------------------------------------------------------------------
02382  */
02383 void
02384 FlushRelationBuffers(Relation rel)
02385 {
02386     int         i;
02387     volatile BufferDesc *bufHdr;
02388 
02389     /* Open rel at the smgr level if not already done */
02390     RelationOpenSmgr(rel);
02391 
02392     if (RelationUsesLocalBuffers(rel))
02393     {
02394         for (i = 0; i < NLocBuffer; i++)
02395         {
02396             bufHdr = &LocalBufferDescriptors[i];
02397             if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
02398                 (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
02399             {
02400                 ErrorContextCallback    errcallback;
02401                 Page                    localpage;
02402 
02403                 localpage = (char *) LocalBufHdrGetBlock(bufHdr);
02404 
02405                 /* Setup error traceback support for ereport() */
02406                 errcallback.callback = local_buffer_write_error_callback;
02407                 errcallback.arg = (void *) bufHdr;
02408                 errcallback.previous = error_context_stack;
02409                 error_context_stack = &errcallback;
02410 
02411                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
02412 
02413                 smgrwrite(rel->rd_smgr,
02414                           bufHdr->tag.forkNum,
02415                           bufHdr->tag.blockNum,
02416                           localpage,
02417                           false);
02418 
02419                 bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
02420 
02421                 /* Pop the error context stack */
02422                 error_context_stack = errcallback.previous;
02423             }
02424         }
02425 
02426         return;
02427     }
02428 
02429     /* Make sure we can handle the pin inside the loop */
02430     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
02431 
02432     for (i = 0; i < NBuffers; i++)
02433     {
02434         bufHdr = &BufferDescriptors[i];
02435 
02436         /*
02437          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
02438          * and saves some cycles.
02439          */
02440         if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
02441             continue;
02442 
02443         LockBufHdr(bufHdr);
02444         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
02445             (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
02446         {
02447             PinBuffer_Locked(bufHdr);
02448             LWLockAcquire(bufHdr->content_lock, LW_SHARED);
02449             FlushBuffer(bufHdr, rel->rd_smgr);
02450             LWLockRelease(bufHdr->content_lock);
02451             UnpinBuffer(bufHdr, true);
02452         }
02453         else
02454             UnlockBufHdr(bufHdr);
02455     }
02456 }
02457 
02458 /* ---------------------------------------------------------------------
02459  *      FlushDatabaseBuffers
02460  *
02461  *      This function writes all dirty pages of a database out to disk
02462  *      (or more accurately, out to kernel disk buffers), ensuring that the
02463  *      kernel has an up-to-date view of the database.
02464  *
02465  *      Generally, the caller should be holding an appropriate lock to ensure
02466  *      no other backend is active in the target database; otherwise more
02467  *      pages could get dirtied.
02468  *
02469  *      Note we don't worry about flushing any pages of temporary relations.
02470  *      It's assumed these wouldn't be interesting.
02471  * --------------------------------------------------------------------
02472  */
02473 void
02474 FlushDatabaseBuffers(Oid dbid)
02475 {
02476     int         i;
02477     volatile BufferDesc *bufHdr;
02478 
02479     /* Make sure we can handle the pin inside the loop */
02480     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
02481 
02482     for (i = 0; i < NBuffers; i++)
02483     {
02484         bufHdr = &BufferDescriptors[i];
02485 
02486         /*
02487          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
02488          * and saves some cycles.
02489          */
02490         if (bufHdr->tag.rnode.dbNode != dbid)
02491             continue;
02492 
02493         LockBufHdr(bufHdr);
02494         if (bufHdr->tag.rnode.dbNode == dbid &&
02495             (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
02496         {
02497             PinBuffer_Locked(bufHdr);
02498             LWLockAcquire(bufHdr->content_lock, LW_SHARED);
02499             FlushBuffer(bufHdr, NULL);
02500             LWLockRelease(bufHdr->content_lock);
02501             UnpinBuffer(bufHdr, true);
02502         }
02503         else
02504             UnlockBufHdr(bufHdr);
02505     }
02506 }
02507 
02508 /*
02509  * ReleaseBuffer -- release the pin on a buffer
02510  */
02511 void
02512 ReleaseBuffer(Buffer buffer)
02513 {
02514     volatile BufferDesc *bufHdr;
02515 
02516     if (!BufferIsValid(buffer))
02517         elog(ERROR, "bad buffer ID: %d", buffer);
02518 
02519     ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
02520 
02521     if (BufferIsLocal(buffer))
02522     {
02523         Assert(LocalRefCount[-buffer - 1] > 0);
02524         LocalRefCount[-buffer - 1]--;
02525         return;
02526     }
02527 
02528     bufHdr = &BufferDescriptors[buffer - 1];
02529 
02530     Assert(PrivateRefCount[buffer - 1] > 0);
02531 
02532     if (PrivateRefCount[buffer - 1] > 1)
02533         PrivateRefCount[buffer - 1]--;
02534     else
02535         UnpinBuffer(bufHdr, false);
02536 }
02537 
02538 /*
02539  * UnlockReleaseBuffer -- release the content lock and pin on a buffer
02540  *
02541  * This is just a shorthand for a common combination.
02542  */
02543 void
02544 UnlockReleaseBuffer(Buffer buffer)
02545 {
02546     LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
02547     ReleaseBuffer(buffer);
02548 }
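/*
 * Editor's sketch (illustrative only, not part of bufmgr.c): the typical
 * read-only access pattern that these two routines close out.  "rel" and
 * "blkno" are hypothetical.
 *
 *      Buffer      buf = ReadBuffer(rel, blkno);
 *      Page        page;
 *
 *      LockBuffer(buf, BUFFER_LOCK_SHARE);
 *      page = BufferGetPage(buf);
 *      ... examine the page contents ...
 *      UnlockReleaseBuffer(buf);       -- drops the content lock, then the pin
 */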
02549 
02550 /*
02551  * IncrBufferRefCount
02552  *      Increment the pin count on a buffer that we have *already* pinned
02553  *      at least once.
02554  *
02555  *      This function cannot be used on a buffer we do not have pinned,
02556  *      because it doesn't change the shared buffer state.
02557  */
02558 void
02559 IncrBufferRefCount(Buffer buffer)
02560 {
02561     Assert(BufferIsPinned(buffer));
02562     ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
02563     ResourceOwnerRememberBuffer(CurrentResourceOwner, buffer);
02564     if (BufferIsLocal(buffer))
02565         LocalRefCount[-buffer - 1]++;
02566     else
02567         PrivateRefCount[buffer - 1]++;
02568 }
02569 
02570 /*
02571  * MarkBufferDirtyHint
02572  *
02573  *  Mark a buffer dirty for non-critical changes.
02574  *
02575  * This is essentially the same as MarkBufferDirty, except:
02576  *
02577  * 1. The caller does not write WAL; so if checksums are enabled, we may need
02578  *    to write an XLOG_HINT WAL record to protect against torn pages.
02579  * 2. The caller might have only share-lock instead of exclusive-lock on the
02580  *    buffer's content lock.
02581  * 3. This function does not guarantee that the buffer is always marked dirty
02582  *    (due to a race condition), so it cannot be used for important changes.
02583  */
02584 void
02585 MarkBufferDirtyHint(Buffer buffer)
02586 {
02587     volatile BufferDesc *bufHdr;
02588     Page    page = BufferGetPage(buffer);
02589 
02590     if (!BufferIsValid(buffer))
02591         elog(ERROR, "bad buffer ID: %d", buffer);
02592 
02593     if (BufferIsLocal(buffer))
02594     {
02595         MarkLocalBufferDirty(buffer);
02596         return;
02597     }
02598 
02599     bufHdr = &BufferDescriptors[buffer - 1];
02600 
02601     Assert(PrivateRefCount[buffer - 1] > 0);
02602     /* here, either share or exclusive lock is OK */
02603     Assert(LWLockHeldByMe(bufHdr->content_lock));
02604 
02605     /*
02606      * This routine might get called many times on the same page, if we are
02607      * making the first scan after commit of an xact that added/deleted many
02608      * tuples. So, be as quick as we can if the buffer is already dirty.  We do
02609      * this by not acquiring spinlock if it looks like the status bits are
02610      * already set.  Since we make this test unlocked, there's a chance we
02611      * might fail to notice that the flags have just been cleared, and fail to
02612      * reset them, due to memory-ordering issues.  But since this function
02613      * is only intended to be used in cases where failing to write out the data
02614      * would be harmless anyway, it doesn't really matter.
02615      */
02616     if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
02617         (BM_DIRTY | BM_JUST_DIRTIED))
02618     {
02619         XLogRecPtr  lsn = InvalidXLogRecPtr;
02620         bool        dirtied = false;
02621         bool        delayChkpt = false;
02622 
02623         /*
02624          * If checksums are enabled, and the buffer is permanent, then a full
02625          * page image may be required even for some hint bit updates to protect
02626          * against torn pages. This full page image is only necessary if the
02627          * hint bit update is the first change to the page since the last
02628          * checkpoint.
02629          *
02630          * We don't check full_page_writes here because that logic is
02631          * included when we call XLogInsert() since the value changes
02632          * dynamically.
02633          */
02634         if (DataChecksumsEnabled() && (bufHdr->flags & BM_PERMANENT))
02635         {
02636             /*
02637              * If we're in recovery we cannot dirty a page because of a hint.
02638              * We can set the hint, just not dirty the page as a result, so the
02639              * hint is lost when we evict the page or shut down.
02640              *
02641              * See src/backend/storage/page/README for longer discussion.
02642              */
02643             if (RecoveryInProgress())
02644                 return;
02645 
02646             /*
02647              * If the block is already dirty because we either made a change
02648              * or set a hint already, then we don't need to write a full page
02649              * image.  Note that aggressive cleaning of blocks
02650              * dirtied by hint bit setting would increase the call rate.
02651              * Bulk setting of hint bits would reduce the call rate...
02652              *
02653              * We must issue the WAL record before we mark the buffer dirty.
02654              * Otherwise we might write the page before we write the WAL.
02655              * That causes a race condition, since a checkpoint might occur
02656              * between writing the WAL record and marking the buffer dirty.
02657              * We solve that with a kluge, but one that is already in use
02658              * during transaction commit to prevent race conditions.
02659              * Basically, we simply prevent the checkpoint WAL record from
02660              * being written until we have marked the buffer dirty. We don't
02661              * start the checkpoint flush until we have marked dirty, so our
02662              * checkpoint must flush the change to disk successfully, or else the
02663              * checkpoint record never gets written and crash recovery will fix it.
02664              *
02665              * It's possible we may enter here without an xid, so it is
02666              * essential that CreateCheckpoint waits for virtual transactions
02667              * rather than full transactionids.
02668              */
02669             MyPgXact->delayChkpt = delayChkpt = true;
02670             lsn = XLogSaveBufferForHint(buffer);
02671         }
02672 
02673         LockBufHdr(bufHdr);
02674         Assert(bufHdr->refcount > 0);
02675         if (!(bufHdr->flags & BM_DIRTY))
02676         {
02677             dirtied = true;     /* Means "will be dirtied by this action" */
02678 
02679             /*
02680              * Set the page LSN if we wrote a backup block. We aren't
02681              * supposed to set this when only holding a share lock but
02682              * as long as we serialise it somehow we're OK. We choose to
02683              * set LSN while holding the buffer header lock, which causes
02684              * any reader of an LSN who holds only a share lock to also
02685              * obtain a buffer header lock before using PageGetLSN(),
02686              * which is enforced in BufferGetLSNAtomic().
02687              *
02688              * If checksums are enabled, you might think we should reset the
02689              * checksum here. That will happen when the page is written
02690              * sometime later in this checkpoint cycle.
02691              */
02692             if (!XLogRecPtrIsInvalid(lsn))
02693                 PageSetLSN(page, lsn);
02694         }
02695         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
02696         UnlockBufHdr(bufHdr);
02697 
02698         if (delayChkpt)
02699             MyPgXact->delayChkpt = false;
02700 
02701         if (dirtied)
02702         {
02703             VacuumPageDirty++;
02704             if (VacuumCostActive)
02705                 VacuumCostBalance += VacuumCostPageDirty;
02706         }
02707     }
02708 }
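/*
 * Editor's sketch (illustrative only, not part of bufmgr.c): the classic
 * caller is hint-bit setting done under a share lock.  The tuple-level
 * details below are hypothetical.
 *
 *      LockBuffer(buf, BUFFER_LOCK_SHARE);
 *      ... decide that a tuple's commit status can be hinted ...
 *      tuple->t_infomask |= HEAP_XMIN_COMMITTED;
 *      MarkBufferDirtyHint(buf);       -- best effort; the hint may be lost
 *      LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 */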
02709 
02710 /*
02711  * Release buffer content locks for shared buffers.
02712  *
02713  * Used to clean up after errors.
02714  *
02715  * Currently, we can expect that lwlock.c's LWLockReleaseAll() took care
02716  * of releasing buffer content locks per se; the only thing we need to deal
02717  * with here is clearing any PIN_COUNT request that was in progress.
02718  */
02719 void
02720 UnlockBuffers(void)
02721 {
02722     volatile BufferDesc *buf = PinCountWaitBuf;
02723 
02724     if (buf)
02725     {
02726         LockBufHdr(buf);
02727 
02728         /*
02729          * Don't complain if flag bit not set; it could have been reset but we
02730          * got a cancel/die interrupt before getting the signal.
02731          */
02732         if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
02733             buf->wait_backend_pid == MyProcPid)
02734             buf->flags &= ~BM_PIN_COUNT_WAITER;
02735 
02736         UnlockBufHdr(buf);
02737 
02738         PinCountWaitBuf = NULL;
02739     }
02740 }
02741 
02742 /*
02743  * Acquire or release the content_lock for the buffer.
02744  */
02745 void
02746 LockBuffer(Buffer buffer, int mode)
02747 {
02748     volatile BufferDesc *buf;
02749 
02750     Assert(BufferIsValid(buffer));
02751     if (BufferIsLocal(buffer))
02752         return;                 /* local buffers need no lock */
02753 
02754     buf = &(BufferDescriptors[buffer - 1]);
02755 
02756     if (mode == BUFFER_LOCK_UNLOCK)
02757         LWLockRelease(buf->content_lock);
02758     else if (mode == BUFFER_LOCK_SHARE)
02759         LWLockAcquire(buf->content_lock, LW_SHARED);
02760     else if (mode == BUFFER_LOCK_EXCLUSIVE)
02761         LWLockAcquire(buf->content_lock, LW_EXCLUSIVE);
02762     else
02763         elog(ERROR, "unrecognized buffer lock mode: %d", mode);
02764 }
02765 
02766 /*
02767  * Acquire the content_lock for the buffer, but only if we don't have to wait.
02768  *
02769  * This assumes the caller wants BUFFER_LOCK_EXCLUSIVE mode.
02770  */
02771 bool
02772 ConditionalLockBuffer(Buffer buffer)
02773 {
02774     volatile BufferDesc *buf;
02775 
02776     Assert(BufferIsValid(buffer));
02777     if (BufferIsLocal(buffer))
02778         return true;            /* act as though we got it */
02779 
02780     buf = &(BufferDescriptors[buffer - 1]);
02781 
02782     return LWLockConditionalAcquire(buf->content_lock, LW_EXCLUSIVE);
02783 }
02784 
02785 /*
02786  * LockBufferForCleanup - lock a buffer in preparation for deleting items
02787  *
02788  * Items may be deleted from a disk page only when the caller (a) holds an
02789  * exclusive lock on the buffer and (b) has observed that no other backend
02790  * holds a pin on the buffer.  If there is a pin, then the other backend
02791  * might have a pointer into the buffer (for example, a heapscan reference
02792  * to an item --- see README for more details).  It's OK if a pin is added
02793  * after the cleanup starts, however; the newly-arrived backend will be
02794  * unable to look at the page until we release the exclusive lock.
02795  *
02796  * To implement this protocol, a would-be deleter must pin the buffer and
02797  * then call LockBufferForCleanup().  LockBufferForCleanup() is similar to
02798  * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), except that it loops until
02799  * it has successfully observed pin count = 1.
02800  */
02801 void
02802 LockBufferForCleanup(Buffer buffer)
02803 {
02804     volatile BufferDesc *bufHdr;
02805 
02806     Assert(BufferIsValid(buffer));
02807     Assert(PinCountWaitBuf == NULL);
02808 
02809     if (BufferIsLocal(buffer))
02810     {
02811         /* There should be exactly one pin */
02812         if (LocalRefCount[-buffer - 1] != 1)
02813             elog(ERROR, "incorrect local pin count: %d",
02814                  LocalRefCount[-buffer - 1]);
02815         /* Nobody else to wait for */
02816         return;
02817     }
02818 
02819     /* There should be exactly one local pin */
02820     if (PrivateRefCount[buffer - 1] != 1)
02821         elog(ERROR, "incorrect local pin count: %d",
02822              PrivateRefCount[buffer - 1]);
02823 
02824     bufHdr = &BufferDescriptors[buffer - 1];
02825 
02826     for (;;)
02827     {
02828         /* Try to acquire lock */
02829         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
02830         LockBufHdr(bufHdr);
02831         Assert(bufHdr->refcount > 0);
02832         if (bufHdr->refcount == 1)
02833         {
02834             /* Successfully acquired exclusive lock with pincount 1 */
02835             UnlockBufHdr(bufHdr);
02836             return;
02837         }
02838         /* Failed, so mark myself as waiting for pincount 1 */
02839         if (bufHdr->flags & BM_PIN_COUNT_WAITER)
02840         {
02841             UnlockBufHdr(bufHdr);
02842             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
02843             elog(ERROR, "multiple backends attempting to wait for pincount 1");
02844         }
02845         bufHdr->wait_backend_pid = MyProcPid;
02846         bufHdr->flags |= BM_PIN_COUNT_WAITER;
02847         PinCountWaitBuf = bufHdr;
02848         UnlockBufHdr(bufHdr);
02849         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
02850 
02851         /* Wait to be signaled by UnpinBuffer() */
02852         if (InHotStandby)
02853         {
02854             /* Publish the bufid that Startup process waits on */
02855             SetStartupBufferPinWaitBufId(buffer - 1);
02856             /* Set alarm and then wait to be signaled by UnpinBuffer() */
02857             ResolveRecoveryConflictWithBufferPin();
02858             /* Reset the published bufid */
02859             SetStartupBufferPinWaitBufId(-1);
02860         }
02861         else
02862             ProcWaitForSignal();
02863 
02864         PinCountWaitBuf = NULL;
02865         /* Loop back and try again */
02866     }
02867 }
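
/*
 * Illustrative sketch of the cleanup-lock protocol described above
 * (hypothetical; loosely modeled on how vacuum-style callers drive this
 * function).  cleanup_page_sketch and its rel/blkno parameters are
 * placeholders.
 */
static void
cleanup_page_sketch(Relation rel, BlockNumber blkno)
{
    Buffer      buf;

    /* Step 1: pin the target page. */
    buf = ReadBuffer(rel, blkno);

    /* Step 2: block until we hold the exclusive lock with pin count 1. */
    LockBufferForCleanup(buf);

    /* ... items may now be deleted from the page safely ... */

    /* Step 3: release the content lock and the pin. */
    UnlockReleaseBuffer(buf);
}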
02868 
02869 /*
02870  * Check called from the RecoveryConflictInterrupt handler when the Startup
02871  * process requests cancellation of all pin holders that are blocking it.
02872  */
02873 bool
02874 HoldingBufferPinThatDelaysRecovery(void)
02875 {
02876     int         bufid = GetStartupBufferPinWaitBufId();
02877 
02878     /*
02879      * If we get woken slowly then it's possible that the Startup process was
02880      * already woken by other backends before we got here.  It is also
02881      * possible that we got here via multiple interrupts, or interrupts at
02882      * inappropriate times, so make sure we do nothing if the bufid is not set.
02883      */
02884     if (bufid < 0)
02885         return false;
02886 
02887     if (PrivateRefCount[bufid] > 0)
02888         return true;
02889 
02890     return false;
02891 }
02892 
02893 /*
02894  * ConditionalLockBufferForCleanup - as above, but don't wait to get the lock
02895  *
02896  * We won't loop, but just check once to see if the pin count is OK.  If
02897  * not, return FALSE with no lock held.
02898  */
02899 bool
02900 ConditionalLockBufferForCleanup(Buffer buffer)
02901 {
02902     volatile BufferDesc *bufHdr;
02903 
02904     Assert(BufferIsValid(buffer));
02905 
02906     if (BufferIsLocal(buffer))
02907     {
02908         /* There should be exactly one pin */
02909         Assert(LocalRefCount[-buffer - 1] > 0);
02910         if (LocalRefCount[-buffer - 1] != 1)
02911             return false;
02912         /* Nobody else to wait for */
02913         return true;
02914     }
02915 
02916     /* There should be exactly one local pin */
02917     Assert(PrivateRefCount[buffer - 1] > 0);
02918     if (PrivateRefCount[buffer - 1] != 1)
02919         return false;
02920 
02921     /* Try to acquire lock */
02922     if (!ConditionalLockBuffer(buffer))
02923         return false;
02924 
02925     bufHdr = &BufferDescriptors[buffer - 1];
02926     LockBufHdr(bufHdr);
02927     Assert(bufHdr->refcount > 0);
02928     if (bufHdr->refcount == 1)
02929     {
02930         /* Successfully acquired exclusive lock with pincount 1 */
02931         UnlockBufHdr(bufHdr);
02932         return true;
02933     }
02934 
02935     /* Failed, so release the lock */
02936     UnlockBufHdr(bufHdr);
02937     LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
02938     return false;
02939 }
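
/*
 * Illustrative sketch of the non-blocking variant (hypothetical; loosely
 * modeled on opportunistic cleanup, which simply skips pages it cannot
 * cleanup-lock immediately).  try_cleanup_page_sketch and its parameters
 * are placeholders.
 */
static bool
try_cleanup_page_sketch(Relation rel, BlockNumber blkno)
{
    Buffer      buf = ReadBuffer(rel, blkno);

    if (!ConditionalLockBufferForCleanup(buf))
    {
        /* Another backend holds a pin (or the lock); just skip this page. */
        ReleaseBuffer(buf);
        return false;
    }

    /* ... prune or delete items here ... */

    UnlockReleaseBuffer(buf);
    return true;
}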
02940 
02941 
02942 /*
02943  *  Functions for buffer I/O handling
02944  *
02945  *  Note: We assume that nested buffer I/O never occurs; i.e., at most
02946  *  one io_in_progress lock is held per process at any time.
02947  *
02948  *  Also note that these are used only for shared buffers, not local ones.
02949  */
02950 
02951 /*
02952  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
02953  */
02954 static void
02955 WaitIO(volatile BufferDesc *buf)
02956 {
02957     /*
02958      * Changed to wait until there's no IO - Inoue 01/13/2000
02959      *
02960      * Note this is *necessary* because an error abort in the process doing
02961      * I/O could release the io_in_progress_lock prematurely. See
02962      * AbortBufferIO.
02963      */
02964     for (;;)
02965     {
02966         BufFlags    sv_flags;
02967 
02968         /*
02969          * It may not be necessary to acquire the spinlock to check the flag
02970          * here, but since this test is essential for correctness, we'd better
02971          * play it safe.
02972          */
02973         LockBufHdr(buf);
02974         sv_flags = buf->flags;
02975         UnlockBufHdr(buf);
02976         if (!(sv_flags & BM_IO_IN_PROGRESS))
02977             break;
02978         LWLockAcquire(buf->io_in_progress_lock, LW_SHARED);
02979         LWLockRelease(buf->io_in_progress_lock);
02980     }
02981 }
02982 
02983 /*
02984  * StartBufferIO: begin I/O on this buffer
02985  *  (Assumptions)
02986  *  My process is executing no IO
02987  *  The buffer is Pinned
02988  *
02989  * In some scenarios there are race conditions in which multiple backends
02990  * could attempt the same I/O operation concurrently.  If someone else
02991  * has already started I/O on this buffer then we will block on the
02992  * io_in_progress lock until that backend is done.
02993  *
02994  * Input operations are only attempted on buffers that are not BM_VALID,
02995  * and output operations only on buffers that are BM_VALID and BM_DIRTY,
02996  * so we can always tell if the work is already done.
02997  *
02998  * Returns TRUE if we successfully marked the buffer as I/O busy,
02999  * FALSE if someone else already did the work.
03000  */
03001 static bool
03002 StartBufferIO(volatile BufferDesc *buf, bool forInput)
03003 {
03004     Assert(!InProgressBuf);
03005 
03006     for (;;)
03007     {
03008         /*
03009          * Grab the io_in_progress lock so that other processes can wait for
03010          * me to finish the I/O.
03011          */
03012         LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
03013 
03014         LockBufHdr(buf);
03015 
03016         if (!(buf->flags & BM_IO_IN_PROGRESS))
03017             break;
03018 
03019         /*
03020          * The only way BM_IO_IN_PROGRESS could be set when the io_in_progress
03021          * lock isn't held is if the process doing the I/O is recovering from
03022          * an error (see AbortBufferIO).  If that's the case, we must wait for
03023          * that process to get unwedged.
03024          */
03025         UnlockBufHdr(buf);
03026         LWLockRelease(buf->io_in_progress_lock);
03027         WaitIO(buf);
03028     }
03029 
03030     /* Once we get here, there is definitely no I/O active on this buffer */
03031 
03032     if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
03033     {
03034         /* someone else already did the I/O */
03035         UnlockBufHdr(buf);
03036         LWLockRelease(buf->io_in_progress_lock);
03037         return false;
03038     }
03039 
03040     buf->flags |= BM_IO_IN_PROGRESS;
03041 
03042     UnlockBufHdr(buf);
03043 
03044     InProgressBuf = buf;
03045     IsForInput = forInput;
03046 
03047     return true;
03048 }
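
/*
 * Illustrative sketch of the read-side I/O handshake (hypothetical; a
 * simplification of the read path in this file, omitting error checks,
 * zero_damaged_pages handling, and statistics).  fill_buffer_sketch and its
 * arguments are placeholders.
 */
static void
fill_buffer_sketch(volatile BufferDesc *bufHdr, SMgrRelation smgr,
                   ForkNumber forkNum, BlockNumber blockNum)
{
    /* false means another backend already read the page in; nothing to do */
    if (!StartBufferIO(bufHdr, true))
        return;

    /* We won the right to do the read; pull the block in from storage. */
    smgrread(smgr, forkNum, blockNum, (char *) BufHdrGetBlock(bufHdr));

    /* Mark the buffer valid and wake anyone blocked in WaitIO(). */
    TerminateBufferIO(bufHdr, false, BM_VALID);
}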
03049 
03050 /*
03051  * TerminateBufferIO: release a buffer we were doing I/O on
03052  *  (Assumptions)
03053  *  My process is executing IO for the buffer
03054  *  BM_IO_IN_PROGRESS bit is set for the buffer
03055  *  We hold the buffer's io_in_progress lock
03056  *  The buffer is Pinned
03057  *
03058  * If clear_dirty is TRUE and BM_JUST_DIRTIED is not set, we clear the
03059  * buffer's BM_DIRTY flag.  This is appropriate when terminating a
03060  * successful write.  The check on BM_JUST_DIRTIED is necessary to avoid
03061  * marking the buffer clean if it was re-dirtied while we were writing.
03062  *
03063  * set_flag_bits gets ORed into the buffer's flags.  It must include
03064  * BM_IO_ERROR in a failure case.  For successful completion it could
03065  * be 0, or BM_VALID if we just finished reading in the page.
03066  */
03067 static void
03068 TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
03069                   int set_flag_bits)
03070 {
03071     Assert(buf == InProgressBuf);
03072 
03073     LockBufHdr(buf);
03074 
03075     Assert(buf->flags & BM_IO_IN_PROGRESS);
03076     buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
03077     if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
03078         buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
03079     buf->flags |= set_flag_bits;
03080 
03081     UnlockBufHdr(buf);
03082 
03083     InProgressBuf = NULL;
03084 
03085     LWLockRelease(buf->io_in_progress_lock);
03086 }
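
/*
 * Illustrative sketch of the write-side pairing (hypothetical; a stripped
 * down version of what the flush code does, omitting the WAL flush,
 * checksum, error-context, and statistics work).  flush_buffer_sketch is a
 * placeholder name.
 */
static void
flush_buffer_sketch(volatile BufferDesc *bufHdr, SMgrRelation reln)
{
    /* false means the buffer is no longer dirty; someone else wrote it out */
    if (!StartBufferIO(bufHdr, false))
        return;

    /* Write the block; skipFsync = false registers it for checkpoint fsync. */
    smgrwrite(reln,
              bufHdr->tag.forkNum,
              bufHdr->tag.blockNum,
              (char *) BufHdrGetBlock(bufHdr),
              false);

    /* Clear BM_DIRTY unless the page was re-dirtied while we were writing. */
    TerminateBufferIO(bufHdr, true, 0);
}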
03087 
03088 /*
03089  * AbortBufferIO: Clean up any active buffer I/O after an error.
03090  *
03091  *  All LWLocks we might have held have been released,
03092  *  but we haven't yet released buffer pins, so the buffer is still pinned.
03093  *
03094  *  If I/O was in progress, we always set BM_IO_ERROR, even though it's
03095  *  possible the error condition wasn't related to the I/O.
03096  */
03097 void
03098 AbortBufferIO(void)
03099 {
03100     volatile BufferDesc *buf = InProgressBuf;
03101 
03102     if (buf)
03103     {
03104         /*
03105          * Since LWLockReleaseAll has already been called, we're not holding
03106          * the buffer's io_in_progress_lock. We have to re-acquire it so that
03107          * we can use TerminateBufferIO. Anyone who's executing WaitIO on the
03108          * buffer will be in a busy spin until we succeed in doing this.
03109          */
03110         LWLockAcquire(buf->io_in_progress_lock, LW_EXCLUSIVE);
03111 
03112         LockBufHdr(buf);
03113         Assert(buf->flags & BM_IO_IN_PROGRESS);
03114         if (IsForInput)
03115         {
03116             Assert(!(buf->flags & BM_DIRTY));
03117             /* We'd better not think buffer is valid yet */
03118             Assert(!(buf->flags & BM_VALID));
03119             UnlockBufHdr(buf);
03120         }
03121         else
03122         {
03123             BufFlags    sv_flags;
03124 
03125             sv_flags = buf->flags;
03126             Assert(sv_flags & BM_DIRTY);
03127             UnlockBufHdr(buf);
03128             /* Issue a warning if this is not the first failure... */
03129             if (sv_flags & BM_IO_ERROR)
03130             {
03131                 /* Buffer is pinned, so we can read tag without spinlock */
03132                 char       *path;
03133 
03134                 path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
03135                 ereport(WARNING,
03136                         (errcode(ERRCODE_IO_ERROR),
03137                          errmsg("could not write block %u of %s",
03138                                 buf->tag.blockNum, path),
03139                          errdetail("Multiple failures --- write error might be permanent.")));
03140                 pfree(path);
03141             }
03142         }
03143         TerminateBufferIO(buf, false, BM_IO_ERROR);
03144     }
03145 }
03146 
03147 /*
03148  * Error context callback for errors occurring during shared buffer writes.
03149  */
03150 static void
03151 shared_buffer_write_error_callback(void *arg)
03152 {
03153     volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
03154 
03155     /* Buffer is pinned, so we can read the tag without locking the spinlock */
03156     if (bufHdr != NULL)
03157     {
03158         char       *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
03159 
03160         errcontext("writing block %u of relation %s",
03161                    bufHdr->tag.blockNum, path);
03162         pfree(path);
03163     }
03164 }
03165 
03166 /*
03167  * Error context callback for errors occurring during local buffer writes.
03168  */
03169 static void
03170 local_buffer_write_error_callback(void *arg)
03171 {
03172     volatile BufferDesc *bufHdr = (volatile BufferDesc *) arg;
03173 
03174     if (bufHdr != NULL)
03175     {
03176         char       *path = relpathbackend(bufHdr->tag.rnode, MyBackendId,
03177                                           bufHdr->tag.forkNum);
03178 
03179         errcontext("writing block %u of relation %s",
03180                    bufHdr->tag.blockNum, path);
03181         pfree(path);
03182     }
03183 }
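
/*
 * Illustrative sketch of how a write path hooks one of the callbacks above
 * into elog.c's error-context stack (hypothetical; it mirrors the push/pop
 * sequence used around buffer writes in this file).  The function name
 * report_write_context_sketch is a placeholder.
 */
static void
report_write_context_sketch(volatile BufferDesc *bufHdr)
{
    ErrorContextCallback errcallback;

    /* Push: any ereport() raised below will append our context line. */
    errcallback.callback = shared_buffer_write_error_callback;
    errcallback.arg = (void *) bufHdr;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /* ... perform the risky write (e.g. smgrwrite) here ... */

    /* Pop the callback once the write has completed or failed cleanly. */
    error_context_stack = errcallback.previous;
}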
03184 
03185 /*
03186  * RelFileNode qsort/bsearch comparator; see RelFileNodeEquals.
03187  */
03188 static int
03189 rnode_comparator(const void *p1, const void *p2)
03190 {
03191     RelFileNode n1 = *(RelFileNode *) p1;
03192     RelFileNode n2 = *(RelFileNode *) p2;
03193 
03194     if (n1.relNode < n2.relNode)
03195         return -1;
03196     else if (n1.relNode > n2.relNode)
03197         return 1;
03198 
03199     if (n1.dbNode < n2.dbNode)
03200         return -1;
03201     else if (n1.dbNode > n2.dbNode)
03202         return 1;
03203 
03204     if (n1.spcNode < n2.spcNode)
03205         return -1;
03206     else if (n1.spcNode > n2.spcNode)
03207         return 1;
03208     else
03209         return 0;
03210 }
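
/*
 * Illustrative sketch of using the comparator with qsort/bsearch
 * (hypothetical; the drop-relation code in this file sorts its RelFileNode
 * array with this comparator before probing it with bsearch()).
 * rnode_in_set_sketch and its arguments are placeholders.
 */
static bool
rnode_in_set_sketch(RelFileNode *nodes, int nnodes, RelFileNode target)
{
    /* Sort once up front so each membership probe is O(log n). */
    qsort(nodes, nnodes, sizeof(RelFileNode), rnode_comparator);

    return bsearch(&target, nodes, nnodes,
                   sizeof(RelFileNode), rnode_comparator) != NULL;
}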