Header And Logo

PostgreSQL
| The world's most advanced open source database.

slru.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * slru.c
00004  *      Simple LRU buffering for transaction status logfiles
00005  *
00006  * We use a simple least-recently-used scheme to manage a pool of page
00007  * buffers.  Under ordinary circumstances we expect that write
00008  * traffic will occur mostly to the latest page (and to the just-prior
00009  * page, soon after a page transition).  Read traffic will probably touch
00010  * a larger span of pages, but in any case a fairly small number of page
00011  * buffers should be sufficient.  So, we just search the buffers using plain
00012  * linear search; there's no need for a hashtable or anything fancy.
00013  * The management algorithm is straight LRU except that we will never swap
00014  * out the latest page (since we know it's going to be hit again eventually).
00015  *
00016  * We use a control LWLock to protect the shared data structures, plus
00017  * per-buffer LWLocks that synchronize I/O for each buffer.  The control lock
00018  * must be held to examine or modify any shared state.  A process that is
00019  * reading in or writing out a page buffer does not hold the control lock,
00020  * only the per-buffer lock for the buffer it is working on.
00021  *
00022  * "Holding the control lock" means exclusive lock in all cases except for
00023  * SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
00024  * the implications of that.
00025  *
00026  * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
00027  * before releasing the control lock.  The per-buffer lock is released after
00028  * completing the I/O, re-acquiring the control lock, and updating the shared
00029  * state.  (Deadlock is not possible here, because we never try to initiate
00030  * I/O when someone else is already doing I/O on the same buffer.)
00031  * To wait for I/O to complete, release the control lock, acquire the
00032  * per-buffer lock in shared mode, immediately release the per-buffer lock,
00033  * reacquire the control lock, and then recheck state (since arbitrary things
00034  * could have happened while we didn't have the lock).
00035  *
00036  * As with the regular buffer manager, it is possible for another process
00037  * to re-dirty a page that is currently being written out.  This is handled
00038  * by re-setting the page's page_dirty flag.
00039  *
00040  *
00041  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00042  * Portions Copyright (c) 1994, Regents of the University of California
00043  *
00044  * src/backend/access/transam/slru.c
00045  *
00046  *-------------------------------------------------------------------------
00047  */
00048 #include "postgres.h"
00049 
00050 #include <fcntl.h>
00051 #include <sys/stat.h>
00052 #include <unistd.h>
00053 
00054 #include "access/slru.h"
00055 #include "access/transam.h"
00056 #include "access/xlog.h"
00057 #include "storage/fd.h"
00058 #include "storage/shmem.h"
00059 #include "miscadmin.h"
00060 
00061 /* Build the path of segment file number seg into path, as <ctl->Dir>/<seg printed as at least four uppercase hex digits>; snprintf limits the result to MAXPGPATH bytes. */
00062 #define SlruFileName(ctl, path, seg) \
00063     snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir, seg)
00064 
00065 /*
00066  * During SimpleLruFlush(), we will usually not need to write/fsync more
00067  * than one or two physical files, but we may need to write several pages
00068  * per file.  We can consolidate the I/O requests by leaving files open
00069  * until control returns to SimpleLruFlush().  This data structure remembers
00070  * which files are open: fd[i] is the descriptor opened for segment segno[i], for i in 0 .. num_files-1.
00071  */
00072 #define MAX_FLUSH_BUFFERS   16  /* capacity of the fd[] and segno[] arrays below */
00073 
00074 typedef struct SlruFlushData
00075 {
00076     int         num_files;      /* # files actually open */
00077     int         fd[MAX_FLUSH_BUFFERS];  /* their FD's */
00078     int         segno[MAX_FLUSH_BUFFERS];       /* their log seg#s */
00079 } SlruFlushData;
00080 
00081 typedef struct SlruFlushData *SlruFlush;    /* pointer form handed to the page-write routines; NULL there means a standalone write */
00082 
00083 /*
00084  * Macro to mark a buffer slot "most recently used".  Note multiple evaluation
00085  * of arguments!  (Both shared and slotno appear more than once in the expansion.)
00086  *
00087  * The reason for the if-test is that there are often many consecutive
00088  * accesses to the same page (particularly the latest page).  By suppressing
00089  * useless increments of cur_lru_count, we reduce the probability that old
00090  * pages' counts will "wrap around" and make them appear recently used.
00091  *
00092  * We allow this code to be executed concurrently by multiple processes within
00093  * SimpleLruReadPage_ReadOnly().  As long as int reads and writes are atomic,
00094  * this should not cause any completely-bogus values to enter the computation.
00095  * However, it is possible for either cur_lru_count or individual
00096  * page_lru_count entries to be "reset" to lower values than they should have,
00097  * in case a process is delayed while it executes this macro.  With care in
00098  * SlruSelectLRUPage(), this does little harm, and in any case the absolute
00099  * worst possible consequence is a nonoptimal choice of page to evict.  The
00100  * gain from allowing concurrent reads of SLRU pages seems worth it.
00101  */
00102 #define SlruRecentlyUsed(shared, slotno)    \
00103     do { \
00104         int     new_lru_count = (shared)->cur_lru_count; /* local snapshot of the shared counter */ \
00105         if (new_lru_count != (shared)->page_lru_count[slotno]) { /* skip if the slot already carries the current count */ \
00106             (shared)->cur_lru_count = ++new_lru_count; /* advance the shared counter ... */ \
00107             (shared)->page_lru_count[slotno] = new_lru_count; /* ... and stamp the slot with the new value */ \
00108         } \
00109     } while (0)
00110 
00111 /* Saved info for SlruReportIOError: set by SlruPhysicalReadPage/SlruPhysicalWritePage just before they return false */
00112 typedef enum
00113 {
00114     SLRU_OPEN_FAILED,           /* OpenTransientFile() returned < 0 */
00115     SLRU_SEEK_FAILED,           /* lseek() returned < 0 */
00116     SLRU_READ_FAILED,           /* read() did not return BLCKSZ */
00117     SLRU_WRITE_FAILED,          /* write() did not return BLCKSZ */
00118     SLRU_FSYNC_FAILED,          /* pg_fsync() returned nonzero */
00119     SLRU_CLOSE_FAILED           /* CloseTransientFile() returned nonzero */
00120 } SlruErrorCause;
00121 
00122 static SlruErrorCause slru_errcause;    /* which step failed in the most recent failed physical read/write */
00123 static int  slru_errno;                 /* errno value captured at that failure */
00124 
00125 /* Prototypes for file-local (static) routines */
00126 static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
00127 static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
00128 static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);    /* fdata may be NULL */
00129 static bool SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno);          /* returns false on failure */
00130 static bool SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno,
00131                       SlruFlush fdata);                                         /* returns false on failure */
00132 static void SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid);
00133 static int  SlruSelectLRUPage(SlruCtl ctl, int pageno);
00134 
00135 static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
00136                           int segpage, void *data);
00137 
00138 /*
00139  * Initialization of shared memory
00140  */
00141 /* SimpleLruShmemSize: bytes of shared memory needed for the control struct, the per-slot arrays, the group_lsn[] array (only when nlsns > 0) and nslots page buffers of BLCKSZ bytes each. */
00142 Size
00143 SimpleLruShmemSize(int nslots, int nlsns)
00144 {
00145     Size        sz;
00146 
00147     /* we assume nslots isn't so large as to risk overflow; terms are added in the same order SimpleLruInit lays the arrays out */
00148     sz = MAXALIGN(sizeof(SlruSharedData));
00149     sz += MAXALIGN(nslots * sizeof(char *));    /* page_buffer[] */
00150     sz += MAXALIGN(nslots * sizeof(SlruPageStatus));    /* page_status[] */
00151     sz += MAXALIGN(nslots * sizeof(bool));      /* page_dirty[] */
00152     sz += MAXALIGN(nslots * sizeof(int));       /* page_number[] */
00153     sz += MAXALIGN(nslots * sizeof(int));       /* page_lru_count[] */
00154     sz += MAXALIGN(nslots * sizeof(LWLockId));  /* buffer_locks[] */
00155 
00156     if (nlsns > 0)
00157         sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));    /* group_lsn[] */
00158 
00159     return BUFFERALIGN(sz) + BLCKSZ * nslots;   /* the page buffers come last, starting on a BUFFERALIGN boundary */
00160 }
00161 /* SimpleLruInit: obtain the shared area called name via ShmemInitStruct(), lay it out and initialize it when !IsUnderPostmaster, then fill in the caller's unshared ctl struct (shared pointer, do_fsync, Dir). */
00162 void
00163 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
00164               LWLockId ctllock, const char *subdir)
00165 {
00166     SlruShared  shared;
00167     bool        found;
00168 
00169     shared = (SlruShared) ShmemInitStruct(name,
00170                                           SimpleLruShmemSize(nslots, nlsns),
00171                                           &found);
00172 
00173     if (!IsUnderPostmaster)
00174     {
00175         /* Initialize locks and shared memory area */
00176         char       *ptr;
00177         Size        offset;
00178         int         slotno;
00179 
00180         Assert(!found);         /* on this path the area is expected to be newly created */
00181 
00182         memset(shared, 0, sizeof(SlruSharedData));  /* zeroes only the fixed-size header; the arrays are set up below */
00183 
00184         shared->ControlLock = ctllock;
00185 
00186         shared->num_slots = nslots;
00187         shared->lsn_groups_per_page = nlsns;
00188 
00189         shared->cur_lru_count = 0;
00190 
00191         /* shared->latest_page_number will be set later */
00192 
00193         ptr = (char *) shared;  /* base address; the arrays are carved out at increasing offsets, in the order SimpleLruShmemSize sums them */
00194         offset = MAXALIGN(sizeof(SlruSharedData));
00195         shared->page_buffer = (char **) (ptr + offset);
00196         offset += MAXALIGN(nslots * sizeof(char *));
00197         shared->page_status = (SlruPageStatus *) (ptr + offset);
00198         offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
00199         shared->page_dirty = (bool *) (ptr + offset);
00200         offset += MAXALIGN(nslots * sizeof(bool));
00201         shared->page_number = (int *) (ptr + offset);
00202         offset += MAXALIGN(nslots * sizeof(int));
00203         shared->page_lru_count = (int *) (ptr + offset);
00204         offset += MAXALIGN(nslots * sizeof(int));
00205         shared->buffer_locks = (LWLockId *) (ptr + offset);
00206         offset += MAXALIGN(nslots * sizeof(LWLockId));
00207 
00208         if (nlsns > 0)          /* otherwise group_lsn keeps the zero value from the memset above; SlruPhysicalWritePage tests it against NULL */
00209         {
00210             shared->group_lsn = (XLogRecPtr *) (ptr + offset);
00211             offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
00212         }
00213 
00214         ptr += BUFFERALIGN(offset);     /* ptr now addresses the first page buffer */
00215         for (slotno = 0; slotno < nslots; slotno++)
00216         {
00217             shared->page_buffer[slotno] = ptr;
00218             shared->page_status[slotno] = SLRU_PAGE_EMPTY;
00219             shared->page_dirty[slotno] = false;
00220             shared->page_lru_count[slotno] = 0;
00221             shared->buffer_locks[slotno] = LWLockAssign();  /* one per-buffer I/O lock for each slot */
00222             ptr += BLCKSZ;      /* consecutive buffers are BLCKSZ bytes apart */
00223         }
00224     }
00225     else
00226         Assert(found);          /* under the postmaster the area is expected to exist already */
00227 
00228     /*
00229      * Initialize the unshared control struct, including directory path. We
00230      * assume caller set PagePrecedes.
00231      */
00232     ctl->shared = shared;
00233     ctl->do_fsync = true;       /* default behavior */
00234     StrNCpy(ctl->Dir, subdir, sizeof(ctl->Dir));    /* Dir is what SlruFileName prepends to segment file names */
00235 }
00236 
00237 /*
00238  * Initialize (or reinitialize) a page to zeroes.
00239  *
00240  * The page is not actually written, just set up in shared memory: the slot is marked VALID and dirty, its LRU count is bumped, and its buffer and LSN entries are zeroed.
00241  * The slot number of the new page is returned, and shared->latest_page_number is set to pageno.
00242  *
00243  * Control lock must be held at entry, and will be held at exit.
00244  */
00245 int
00246 SimpleLruZeroPage(SlruCtl ctl, int pageno)
00247 {
00248     SlruShared  shared = ctl->shared;
00249     int         slotno;
00250 
00251     /* Find a suitable buffer slot for the page */
00252     slotno = SlruSelectLRUPage(ctl, pageno);
00253     Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
00254            (shared->page_status[slotno] == SLRU_PAGE_VALID &&
00255             !shared->page_dirty[slotno]) ||
00256            shared->page_number[slotno] == pageno);  /* i.e. the slot is empty, valid-and-clean, or already assigned to this page */
00257 
00258     /* Mark the slot as containing this page */
00259     shared->page_number[slotno] = pageno;
00260     shared->page_status[slotno] = SLRU_PAGE_VALID;
00261     shared->page_dirty[slotno] = true;      /* dirty, since the zeroed page exists only in memory so far */
00262     SlruRecentlyUsed(shared, slotno);
00263 
00264     /* Set the buffer to zeroes */
00265     MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
00266 
00267     /* Set the LSNs for this new page to zero */
00268     SimpleLruZeroLSNs(ctl, slotno);
00269 
00270     /* Assume this page is now the latest active page */
00271     shared->latest_page_number = pageno;
00272 
00273     return slotno;
00274 }
00275 
00276 /*
00277  * Zero all the LSNs we store for this slru page.
00278  *
00279  * This should be called each time we create a new page, and each time we read
00280  * in a page from disk into an existing buffer.  (Such an old page cannot
00281  * have any interesting LSNs, since we'd have flushed them before writing
00282  * the page in the first place.)
00283  *
00284  * This assumes that InvalidXLogRecPtr is bitwise-all-0.  Does nothing unless lsn_groups_per_page > 0.
00285  */
00286 static void
00287 SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
00288 {
00289     SlruShared  shared = ctl->shared;
00290 
00291     if (shared->lsn_groups_per_page > 0)
00292         MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,    /* this slot's entries start at index slotno * lsn_groups_per_page */
00293                shared->lsn_groups_per_page * sizeof(XLogRecPtr));              /* and there are lsn_groups_per_page of them */
00294 }
00295 
00296 /*
00297  * Wait for any active I/O on a page slot to finish.  (This does not
00298  * guarantee that new I/O hasn't been started before we return, though.
00299  * In fact the slot might not even contain the same page anymore.)
00300  *
00301  * Control lock must be held at entry, and will be held at exit (it is released while waiting and re-taken in exclusive mode).
00302  */
00303 static void
00304 SimpleLruWaitIO(SlruCtl ctl, int slotno)
00305 {
00306     SlruShared  shared = ctl->shared;
00307 
00308     /* See notes at top of file */
00309     LWLockRelease(shared->ControlLock);
00310     LWLockAcquire(shared->buffer_locks[slotno], LW_SHARED);     /* the process doing I/O holds this lock exclusively, so this is the actual wait */
00311     LWLockRelease(shared->buffer_locks[slotno]);                /* we only wanted to wait, not to keep the lock */
00312     LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
00313 
00314     /*
00315      * If the slot is still in an io-in-progress state, then either someone
00316      * already started a new I/O on the slot, or a previous I/O failed and
00317      * neglected to reset the page state.  That shouldn't happen, really, but
00318      * it seems worth a few extra cycles to check and recover from it. We can
00319      * cheaply test for failure by seeing if the buffer lock is still held (we
00320      * assume that transaction abort would release the lock).
00321      */
00322     if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
00323         shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
00324     {
00325         if (LWLockConditionalAcquire(shared->buffer_locks[slotno], LW_SHARED))  /* obtained without waiting, so nobody holds it exclusively for I/O */
00326         {
00327             /* indeed, the I/O must have failed */
00328             if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
00329                 shared->page_status[slotno] = SLRU_PAGE_EMPTY;  /* an unfinished read leaves nothing usable in the buffer */
00330             else    /* write_in_progress */
00331             {
00332                 shared->page_status[slotno] = SLRU_PAGE_VALID;
00333                 shared->page_dirty[slotno] = true;              /* the write did not complete, so the page still needs writing */
00334             }
00335             LWLockRelease(shared->buffer_locks[slotno]);
00336         }
00337     }
00338 }
00339 
00340 /*
00341  * Find a page in a shared buffer, reading it in if necessary.
00342  * The page number must correspond to an already-initialized page.
00343  *
00344  * If write_ok is true then it is OK to return a page that is in
00345  * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
00346  * that modification of the page is safe.  If write_ok is false then we
00347  * will not return the page until it is not undergoing active I/O.
00348  *
00349  * The passed-in xid is used only for error reporting, and may be
00350  * InvalidTransactionId if no specific xid is associated with the action.
00351  *
00352  * Return value is the shared-buffer slot number now holding the page.
00353  * The buffer's LRU access info is updated.
00354  *
00355  * Control lock must be held at entry, and will be held at exit (it is released around the physical read and around waits for other I/O).
00356  */
00357 int
00358 SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
00359                   TransactionId xid)
00360 {
00361     SlruShared  shared = ctl->shared;
00362 
00363     /* Outer loop handles restart if we must wait for someone else's I/O */
00364     for (;;)
00365     {
00366         int         slotno;
00367         bool        ok;
00368 
00369         /* See if page already is in memory; if not, pick victim slot */
00370         slotno = SlruSelectLRUPage(ctl, pageno);
00371 
00372         /* Did we find the page in memory? */
00373         if (shared->page_number[slotno] == pageno &&
00374             shared->page_status[slotno] != SLRU_PAGE_EMPTY)
00375         {
00376             /*
00377              * If page is still being read in, we must wait for I/O.  Likewise
00378              * if the page is being written and the caller said that's not OK.
00379              */
00380             if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
00381                 (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
00382                  !write_ok))
00383             {
00384                 SimpleLruWaitIO(ctl, slotno);   /* drops and re-takes the control lock, so the slot may have changed */
00385                 /* Now we must recheck state from the top */
00386                 continue;
00387             }
00388             /* Otherwise, it's ready to use */
00389             SlruRecentlyUsed(shared, slotno);
00390             return slotno;
00391         }
00392 
00393         /* We found no match; assert we selected a freeable slot */
00394         Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
00395                (shared->page_status[slotno] == SLRU_PAGE_VALID &&
00396                 !shared->page_dirty[slotno]));
00397 
00398         /* Mark the slot read-busy */
00399         shared->page_number[slotno] = pageno;
00400         shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
00401         shared->page_dirty[slotno] = false;
00402 
00403         /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
00404         LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);
00405 
00406         /* Release control lock while doing I/O */
00407         LWLockRelease(shared->ControlLock);
00408 
00409         /* Do the read */
00410         ok = SlruPhysicalReadPage(ctl, pageno, slotno);    /* false means the failure details were saved in slru_errcause/slru_errno */
00411 
00412         /* Set the LSNs for this newly read-in page to zero */
00413         SimpleLruZeroLSNs(ctl, slotno);                     /* runs under the per-buffer lock only; the control lock is not held here */
00414 
00415         /* Re-acquire control lock and update page state */
00416         LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
00417 
00418         Assert(shared->page_number[slotno] == pageno &&
00419                shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
00420                !shared->page_dirty[slotno]);
00421 
00422         shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;   /* a failed read leaves the slot EMPTY */
00423 
00424         LWLockRelease(shared->buffer_locks[slotno]);        /* lets processes blocked in SimpleLruWaitIO() proceed */
00425 
00426         /* Now it's okay to ereport if we failed */
00427         if (!ok)
00428             SlruReportIOError(ctl, pageno, xid);            /* NOTE(review): the lines below presume this does not return; its body is not fully visible here - confirm */
00429 
00430         SlruRecentlyUsed(shared, slotno);
00431         return slotno;
00432     }
00433 }
00434 
00435 /*
00436  * Find a page in a shared buffer, reading it in if necessary.
00437  * The page number must correspond to an already-initialized page.
00438  * The caller must intend only read-only access to the page.
00439  *
00440  * The passed-in xid is used only for error reporting, and may be
00441  * InvalidTransactionId if no specific xid is associated with the action.
00442  *
00443  * Return value is the shared-buffer slot number now holding the page.
00444  * The buffer's LRU access info is updated.
00445  *
00446  * Control lock must NOT be held at entry, but will be held at exit.
00447  * It is unspecified whether the lock will be shared or exclusive (shared if the page was found by the initial scan, exclusive if SimpleLruReadPage had to be called).
00448  */
00449 int
00450 SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
00451 {
00452     SlruShared  shared = ctl->shared;
00453     int         slotno;
00454 
00455     /* Try to find the page while holding only shared lock */
00456     LWLockAcquire(shared->ControlLock, LW_SHARED);
00457 
00458     /* See if page is already in a buffer */
00459     for (slotno = 0; slotno < shared->num_slots; slotno++)
00460     {
00461         if (shared->page_number[slotno] == pageno &&
00462             shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
00463             shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)     /* VALID and WRITE_IN_PROGRESS pages both qualify */
00464         {
00465             /* See comments for SlruRecentlyUsed macro */
00466             SlruRecentlyUsed(shared, slotno);
00467             return slotno;      /* returns with the shared lock still held */
00468         }
00469     }
00470 
00471     /* No luck, so switch to normal exclusive lock and do regular read */
00472     LWLockRelease(shared->ControlLock);
00473     LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);  /* the lock is dropped in between, so SimpleLruReadPage searches again from scratch */
00474 
00475     return SimpleLruReadPage(ctl, pageno, true, xid);  /* write_ok = true, matching the scan above which also accepts WRITE_IN_PROGRESS pages */
00476 }
00477 
00478 /*
00479  * Write a page from a shared buffer, if necessary.
00480  * Does nothing if the specified slot is not dirty.
00481  *
00482  * NOTE: only one write attempt is made here.  Hence, it is possible that
00483  * the page is still dirty at exit (if someone else re-dirtied it during
00484  * the write).  However, we *do* attempt a fresh write even if the page
00485  * is already being written; this is for checkpoints.
00486  *
00487  * Control lock must be held at entry, and will be held at exit.  fdata is NULL for a standalone write, or the open-file bookkeeping of a flush in progress.
00488  */
00489 static void
00490 SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
00491 {
00492     SlruShared  shared = ctl->shared;
00493     int         pageno = shared->page_number[slotno];  /* remember which page we were asked about; the slot can change while we wait below */
00494     bool        ok;
00495 
00496     /* If a write is in progress, wait for it to finish */
00497     while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
00498            shared->page_number[slotno] == pageno)
00499     {
00500         SimpleLruWaitIO(ctl, slotno);
00501     }
00502 
00503     /*
00504      * Do nothing if page is not dirty, or if buffer no longer contains the
00505      * same page we were called for.
00506      */
00507     if (!shared->page_dirty[slotno] ||
00508         shared->page_status[slotno] != SLRU_PAGE_VALID ||
00509         shared->page_number[slotno] != pageno)
00510         return;
00511 
00512     /*
00513      * Mark the slot write-busy, and clear the dirtybit.  After this point, a
00514      * transaction status update on this page will mark it dirty again.
00515      */
00516     shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
00517     shared->page_dirty[slotno] = false;
00518 
00519     /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
00520     LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);
00521 
00522     /* Release control lock while doing I/O */
00523     LWLockRelease(shared->ControlLock);
00524 
00525     /* Do the write */
00526     ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);    /* false means the failure details were saved in slru_errcause/slru_errno */
00527 
00528     /* If we failed, and we're in a flush, better close the files */
00529     if (!ok && fdata)
00530     {
00531         int         i;
00532 
00533         for (i = 0; i < fdata->num_files; i++)
00534             CloseTransientFile(fdata->fd[i]);   /* return values ignored; num_files itself is left unchanged */
00535     }
00536 
00537     /* Re-acquire control lock and update page state */
00538     LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
00539 
00540     Assert(shared->page_number[slotno] == pageno &&
00541            shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
00542 
00543     /* If we failed to write, mark the page dirty again */
00544     if (!ok)
00545         shared->page_dirty[slotno] = true;
00546 
00547     shared->page_status[slotno] = SLRU_PAGE_VALID;      /* page_dirty is not cleared here, so a re-dirty that happened during the write is kept */
00548 
00549     LWLockRelease(shared->buffer_locks[slotno]);        /* lets processes blocked in SimpleLruWaitIO() proceed */
00550 
00551     /* Now it's okay to ereport if we failed */
00552     if (!ok)
00553         SlruReportIOError(ctl, pageno, InvalidTransactionId);  /* no particular xid is associated with a page write */
00554 }
00555 
00556 /*
00557  * Wrapper of SlruInternalWritePage, for external callers.
00558  * fdata is always passed a NULL here, so the write is handled as a standalone one (see SlruPhysicalWritePage).
00559  */
00560 void
00561 SimpleLruWritePage(SlruCtl ctl, int slotno)
00562 {
00563     SlruInternalWritePage(ctl, slotno, NULL);
00564 }
00565 
00566 
00567 /*
00568  * Physical read of a (previously existing) page into a buffer slot
00569  *
00570  * On failure, we cannot just ereport(ERROR) since caller has put state in
00571  * shared memory that must be undone.  So, we return FALSE and save enough
00572  * info in static variables (slru_errcause, slru_errno) to let SlruReportIOError make the report.
00573  *
00574  * For now, assume it's not worth keeping a file pointer open across
00575  * read/write operations.  We could cache one virtual file pointer ...
00576  */
00577 static bool
00578 SlruPhysicalReadPage(SlruCtl ctl, int pageno, int slotno)
00579 {
00580     SlruShared  shared = ctl->shared;
00581     int         segno = pageno / SLRU_PAGES_PER_SEGMENT;       /* segment file that holds the page */
00582     int         rpageno = pageno % SLRU_PAGES_PER_SEGMENT;     /* page index within that segment */
00583     int         offset = rpageno * BLCKSZ;                     /* byte offset of the page within the file */
00584     char        path[MAXPGPATH];
00585     int         fd;
00586 
00587     SlruFileName(ctl, path, segno);
00588 
00589     /*
00590      * In a crash-and-restart situation, it's possible for us to receive
00591      * commands to set the commit status of transactions whose bits are in
00592      * already-truncated segments of the commit log (see notes in
00593      * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
00594      * where the file doesn't exist, and return zeroes instead.
00595      */
00596     fd = OpenTransientFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
00597     if (fd < 0)
00598     {
00599         if (errno != ENOENT || !InRecovery)     /* only a missing file during recovery is tolerated */
00600         {
00601             slru_errcause = SLRU_OPEN_FAILED;
00602             slru_errno = errno;
00603             return false;
00604         }
00605 
00606         ereport(LOG,
00607                 (errmsg("file \"%s\" doesn't exist, reading as zeroes",
00608                         path)));
00609         MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
00610         return true;
00611     }
00612 
00613     if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
00614     {
00615         slru_errcause = SLRU_SEEK_FAILED;
00616         slru_errno = errno;     /* saved before the close below can change errno */
00617         CloseTransientFile(fd);
00618         return false;
00619     }
00620 
00621     errno = 0;                  /* so that a short read which sets no errno is recorded with slru_errno == 0 */
00622     if (read(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
00623     {
00624         slru_errcause = SLRU_READ_FAILED;
00625         slru_errno = errno;
00626         CloseTransientFile(fd);
00627         return false;
00628     }
00629 
00630     if (CloseTransientFile(fd))
00631     {
00632         slru_errcause = SLRU_CLOSE_FAILED;
00633         slru_errno = errno;
00634         return false;
00635     }
00636 
00637     return true;
00638 }
00639 
00640 /*
00641  * Physical write of a page from a buffer slot
00642  *
00643  * On failure, we cannot just ereport(ERROR) since caller has put state in
00644  * shared memory that must be undone.  So, we return FALSE and save enough
00645  * info in static variables (slru_errcause, slru_errno) to let SlruReportIOError make the report.
00646  *
00647  * For now, assume it's not worth keeping a file pointer open across
00648  * independent read/write operations.  We do batch operations during
00649  * SimpleLruFlush, though.
00650  *
00651  * fdata is NULL for a standalone write, pointer to open-file info during
00652  * SimpleLruFlush.  With fdata, the file is left open and is not fsync'd here; without it, the file is fsync'd (if ctl->do_fsync) and closed before returning.
00653  */
00654 static bool
00655 SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
00656 {
00657     SlruShared  shared = ctl->shared;
00658     int         segno = pageno / SLRU_PAGES_PER_SEGMENT;       /* segment file that holds the page */
00659     int         rpageno = pageno % SLRU_PAGES_PER_SEGMENT;     /* page index within that segment */
00660     int         offset = rpageno * BLCKSZ;                     /* byte offset of the page within the file */
00661     char        path[MAXPGPATH];
00662     int         fd = -1;                                       /* -1 means no usable descriptor found yet */
00663 
00664     /*
00665      * Honor the write-WAL-before-data rule, if appropriate, so that we do not
00666      * write out data before associated WAL records.  This is the same action
00667      * performed during FlushBuffer() in the main buffer manager.
00668      */
00669     if (shared->group_lsn != NULL)
00670     {
00671         /*
00672          * We must determine the largest async-commit LSN for the page. This
00673          * is a bit tedious, but since this entire function is a slow path
00674          * anyway, it seems better to do this here than to maintain a per-page
00675          * LSN variable (which'd need an extra comparison in the
00676          * transaction-commit path).
00677          */
00678         XLogRecPtr  max_lsn;
00679         int         lsnindex,
00680                     lsnoff;
00681 
00682         lsnindex = slotno * shared->lsn_groups_per_page;       /* first group_lsn[] entry belonging to this slot */
00683         max_lsn = shared->group_lsn[lsnindex++];               /* start from the first entry ... */
00684         for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)   /* ... and scan the remaining ones */
00685         {
00686             XLogRecPtr  this_lsn = shared->group_lsn[lsnindex++];
00687 
00688             if (max_lsn < this_lsn)
00689                 max_lsn = this_lsn;
00690         }
00691 
00692         if (!XLogRecPtrIsInvalid(max_lsn))
00693         {
00694             /*
00695              * As noted above, elog(ERROR) is not acceptable here, so if
00696              * XLogFlush were to fail, we must PANIC.  This isn't much of a
00697              * restriction because XLogFlush is just about all critical
00698              * section anyway, but let's make sure.
00699              */
00700             START_CRIT_SECTION();
00701             XLogFlush(max_lsn);
00702             END_CRIT_SECTION();
00703         }
00704     }
00705 
00706     /*
00707      * During a Flush, we may already have the desired file open.
00708      */
00709     if (fdata)
00710     {
00711         int         i;
00712 
00713         for (i = 0; i < fdata->num_files; i++)
00714         {
00715             if (fdata->segno[i] == segno)
00716             {
00717                 fd = fdata->fd[i];      /* reuse the descriptor opened earlier in this flush */
00718                 break;
00719             }
00720         }
00721     }
00722 
00723     if (fd < 0)
00724     {
00725         /*
00726          * If the file doesn't already exist, we should create it.  It is
00727          * possible for this to need to happen when writing a page that's not
00728          * first in its segment; we assume the OS can cope with that. (Note:
00729          * it might seem that it'd be okay to create files only when
00730          * SimpleLruZeroPage is called for the first page of a segment.
00731          * However, if after a crash and restart the REDO logic elects to
00732          * replay the log from a checkpoint before the latest one, then it's
00733          * possible that we will get commands to set transaction status of
00734          * transactions that have already been truncated from the commit log.
00735          * Easiest way to deal with that is to accept references to
00736          * nonexistent files here and in SlruPhysicalReadPage.)
00737          *
00738          * Note: it is possible for more than one backend to be executing this
00739          * code simultaneously for different pages of the same file. Hence,
00740          * don't use O_EXCL or O_TRUNC or anything like that.
00741          */
00742         SlruFileName(ctl, path, segno);
00743         fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY,
00744                                S_IRUSR | S_IWUSR);
00745         if (fd < 0)
00746         {
00747             slru_errcause = SLRU_OPEN_FAILED;
00748             slru_errno = errno;
00749             return false;
00750         }
00751 
00752         if (fdata)
00753         {
00754             if (fdata->num_files < MAX_FLUSH_BUFFERS)
00755             {
00756                 fdata->fd[fdata->num_files] = fd;       /* remember the new file so later pages of this segment can reuse it */
00757                 fdata->segno[fdata->num_files] = segno;
00758                 fdata->num_files++;
00759             }
00760             else
00761             {
00762                 /*
00763                  * In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
00764                  * fall back to treating it as a standalone write.
00765                  */
00766                 fdata = NULL;           /* changes only this function's copy of the pointer; the caller's SlruFlushData is untouched */
00767             }
00768         }
00769     }
00770 
00771     if (lseek(fd, (off_t) offset, SEEK_SET) < 0)
00772     {
00773         slru_errcause = SLRU_SEEK_FAILED;
00774         slru_errno = errno;
00775         if (!fdata)                     /* descriptors recorded in fdata are closed by the caller, not here */
00776             CloseTransientFile(fd);
00777         return false;
00778     }
00779 
00780     errno = 0;                          /* lets us detect a short write that sets no errno */
00781     if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ)
00782     {
00783         /* if write didn't set errno, assume problem is no disk space */
00784         if (errno == 0)
00785             errno = ENOSPC;
00786         slru_errcause = SLRU_WRITE_FAILED;
00787         slru_errno = errno;
00788         if (!fdata)                     /* same ownership rule as for the seek failure above */
00789             CloseTransientFile(fd);
00790         return false;
00791     }
00792 
00793     /*
00794      * If not part of Flush, need to fsync now.  We assume this happens
00795      * infrequently enough that it's not a performance issue.
00796      */
00797     if (!fdata)
00798     {
00799         if (ctl->do_fsync && pg_fsync(fd))      /* fsync is skipped entirely when do_fsync is false */
00800         {
00801             slru_errcause = SLRU_FSYNC_FAILED;
00802             slru_errno = errno;                 /* saved before the close below can change errno */
00803             CloseTransientFile(fd);
00804             return false;
00805         }
00806 
00807         if (CloseTransientFile(fd))
00808         {
00809             slru_errcause = SLRU_CLOSE_FAILED;
00810             slru_errno = errno;
00811             return false;
00812         }
00813     }
00814 
00815     return true;
00816 }
00817 
00818 /*
00819  * Issue the error message after failure of SlruPhysicalReadPage or
00820  * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
00821  */
00822 static void
00823 SlruReportIOError(SlruCtl ctl, int pageno, TransactionId xid)
00824 {
00825     int         segno = pageno / SLRU_PAGES_PER_SEGMENT;
00826     int         rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
00827     int         offset = rpageno * BLCKSZ;
00828     char        path[MAXPGPATH];
00829 
00830     SlruFileName(ctl, path, segno);
00831     errno = slru_errno;
00832     switch (slru_errcause)
00833     {
00834         case SLRU_OPEN_FAILED:
00835             ereport(ERROR,
00836                     (errcode_for_file_access(),
00837                      errmsg("could not access status of transaction %u", xid),
00838                      errdetail("Could not open file \"%s\": %m.", path)));
00839             break;
00840         case SLRU_SEEK_FAILED:
00841             ereport(ERROR,
00842                     (errcode_for_file_access(),
00843                      errmsg("could not access status of transaction %u", xid),
00844                  errdetail("Could not seek in file \"%s\" to offset %u: %m.",
00845                            path, offset)));
00846             break;
00847         case SLRU_READ_FAILED:
00848             ereport(ERROR,
00849                     (errcode_for_file_access(),
00850                      errmsg("could not access status of transaction %u", xid),
00851                errdetail("Could not read from file \"%s\" at offset %u: %m.",
00852                          path, offset)));
00853             break;
00854         case SLRU_WRITE_FAILED:
00855             ereport(ERROR,
00856                     (errcode_for_file_access(),
00857                      errmsg("could not access status of transaction %u", xid),
00858                 errdetail("Could not write to file \"%s\" at offset %u: %m.",
00859                           path, offset)));
00860             break;
00861         case SLRU_FSYNC_FAILED:
00862             ereport(ERROR,
00863                     (errcode_for_file_access(),
00864                      errmsg("could not access status of transaction %u", xid),
00865                      errdetail("Could not fsync file \"%s\": %m.",
00866                                path)));
00867             break;
00868         case SLRU_CLOSE_FAILED:
00869             ereport(ERROR,
00870                     (errcode_for_file_access(),
00871                      errmsg("could not access status of transaction %u", xid),
00872                      errdetail("Could not close file \"%s\": %m.",
00873                                path)));
00874             break;
00875         default:
00876             /* can't get here, we trust */
00877             elog(ERROR, "unrecognized SimpleLru error cause: %d",
00878                  (int) slru_errcause);
00879             break;
00880     }
00881 }
00882 
00883 /*
00884  * Select the slot to re-use when we need a free slot.
00885  *
00886  * The target page number is passed because we need to consider the
00887  * possibility that some other process reads in the target page while
00888  * we are doing I/O to free a slot.  Hence, check or recheck to see if
00889  * any slot already holds the target page, and return that slot if so.
00890  * Thus, the returned slot is *either* a slot already holding the pageno
00891  * (could be any state except EMPTY), *or* a freeable slot (state EMPTY
00892  * or CLEAN).
00893  *
00894  * Control lock must be held at entry, and will be held at exit.
00895  */
00896 static int
00897 SlruSelectLRUPage(SlruCtl ctl, int pageno)
00898 {
00899     SlruShared  shared = ctl->shared;
00900 
00901     /* Outer loop handles restart after I/O */
00902     for (;;)
00903     {
00904         int         slotno;
00905         int         cur_count;
00906         int         bestvalidslot = 0;  /* keep compiler quiet */
00907         int         best_valid_delta = -1;
00908         int         best_valid_page_number = 0; /* keep compiler quiet */
00909         int         bestinvalidslot = 0;        /* keep compiler quiet */
00910         int         best_invalid_delta = -1;
00911         int         best_invalid_page_number = 0;       /* keep compiler quiet */
00912 
00913         /* See if page already has a buffer assigned */
00914         for (slotno = 0; slotno < shared->num_slots; slotno++)
00915         {
00916             if (shared->page_number[slotno] == pageno &&
00917                 shared->page_status[slotno] != SLRU_PAGE_EMPTY)
00918                 return slotno;
00919         }
00920 
00921         /*
00922          * If we find any EMPTY slot, just select that one. Else choose a
00923          * victim page to replace.  We normally take the least recently used
00924          * valid page, but we will never take the slot containing
00925          * latest_page_number, even if it appears least recently used.  We
00926          * will select a slot that is already I/O busy only if there is no
00927          * other choice: a read-busy slot will not be least recently used once
00928          * the read finishes, and waiting for an I/O on a write-busy slot is
00929          * inferior to just picking some other slot.  Testing shows the slot
00930          * we pick instead will often be clean, allowing us to begin a read at
00931          * once.
00932          *
00933          * Normally the page_lru_count values will all be different and so
00934          * there will be a well-defined LRU page.  But since we allow
00935          * concurrent execution of SlruRecentlyUsed() within
00936          * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
00937          * acquire the same lru_count values.  In that case we break ties by
00938          * choosing the furthest-back page.
00939          *
00940          * Notice that this next line forcibly advances cur_lru_count to a
00941          * value that is certainly beyond any value that will be in the
00942          * page_lru_count array after the loop finishes.  This ensures that
00943          * the next execution of SlruRecentlyUsed will mark the page newly
00944          * used, even if it's for a page that has the current counter value.
00945          * That gets us back on the path to having good data when there are
00946          * multiple pages with the same lru_count.
00947          */
00948         cur_count = (shared->cur_lru_count)++;
00949         for (slotno = 0; slotno < shared->num_slots; slotno++)
00950         {
00951             int         this_delta;
00952             int         this_page_number;
00953 
00954             if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
00955                 return slotno;
00956             this_delta = cur_count - shared->page_lru_count[slotno];
00957             if (this_delta < 0)
00958             {
00959                 /*
00960                  * Clean up in case shared updates have caused cur_count
00961                  * increments to get "lost".  We back off the page counts,
00962                  * rather than trying to increase cur_count, to avoid any
00963                  * question of infinite loops or failure in the presence of
00964                  * wrapped-around counts.
00965                  */
00966                 shared->page_lru_count[slotno] = cur_count;
00967                 this_delta = 0;
00968             }
00969             this_page_number = shared->page_number[slotno];
00970             if (this_page_number == shared->latest_page_number)
00971                 continue;
00972             if (shared->page_status[slotno] == SLRU_PAGE_VALID)
00973             {
00974                 if (this_delta > best_valid_delta ||
00975                     (this_delta == best_valid_delta &&
00976                      ctl->PagePrecedes(this_page_number,
00977                                        best_valid_page_number)))
00978                 {
00979                     bestvalidslot = slotno;
00980                     best_valid_delta = this_delta;
00981                     best_valid_page_number = this_page_number;
00982                 }
00983             }
00984             else
00985             {
00986                 if (this_delta > best_invalid_delta ||
00987                     (this_delta == best_invalid_delta &&
00988                      ctl->PagePrecedes(this_page_number,
00989                                        best_invalid_page_number)))
00990                 {
00991                     bestinvalidslot = slotno;
00992                     best_invalid_delta = this_delta;
00993                     best_invalid_page_number = this_page_number;
00994                 }
00995             }
00996         }
00997 
00998         /*
00999          * If all pages (except possibly the latest one) are I/O busy, we'll
01000          * have to wait for an I/O to complete and then retry.  In that
01001          * unhappy case, we choose to wait for the I/O on the least recently
01002          * used slot, on the assumption that it was likely initiated first of
01003          * all the I/Os in progress and may therefore finish first.
01004          */
01005         if (best_valid_delta < 0)
01006         {
01007             SimpleLruWaitIO(ctl, bestinvalidslot);
01008             continue;
01009         }
01010 
01011         /*
01012          * If the selected page is clean, we're set.
01013          */
01014         if (!shared->page_dirty[bestvalidslot])
01015             return bestvalidslot;
01016 
01017         /*
01018          * Write the page.
01019          */
01020         SlruInternalWritePage(ctl, bestvalidslot, NULL);
01021 
01022         /*
01023          * Now loop back and try again.  This is the easiest way of dealing
01024          * with corner cases such as the victim page being re-dirtied while we
01025          * wrote it.
01026          */
01027     }
01028 }
01029 
01030 /*
01031  * Flush dirty pages to disk during checkpoint or database shutdown
01032  */
01033 void
01034 SimpleLruFlush(SlruCtl ctl, bool checkpoint)
01035 {
01036     SlruShared  shared = ctl->shared;
01037     SlruFlushData fdata;
01038     int         slotno;
01039     int         pageno = 0;
01040     int         i;
01041     bool        ok;
01042 
01043     /*
01044      * Find and write dirty pages
01045      */
01046     fdata.num_files = 0;
01047 
01048     LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
01049 
01050     for (slotno = 0; slotno < shared->num_slots; slotno++)
01051     {
01052         SlruInternalWritePage(ctl, slotno, &fdata);
01053 
01054         /*
01055          * When called during a checkpoint, we cannot assert that the slot is
01056          * clean now, since another process might have re-dirtied it already.
01057          * That's okay.
01058          */
01059         Assert(checkpoint ||
01060                shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
01061                (shared->page_status[slotno] == SLRU_PAGE_VALID &&
01062                 !shared->page_dirty[slotno]));
01063     }
01064 
01065     LWLockRelease(shared->ControlLock);
01066 
01067     /*
01068      * Now fsync and close any files that were open
01069      */
01070     ok = true;
01071     for (i = 0; i < fdata.num_files; i++)
01072     {
01073         if (ctl->do_fsync && pg_fsync(fdata.fd[i]))
01074         {
01075             slru_errcause = SLRU_FSYNC_FAILED;
01076             slru_errno = errno;
01077             pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
01078             ok = false;
01079         }
01080 
01081         if (CloseTransientFile(fdata.fd[i]))
01082         {
01083             slru_errcause = SLRU_CLOSE_FAILED;
01084             slru_errno = errno;
01085             pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
01086             ok = false;
01087         }
01088     }
01089     if (!ok)
01090         SlruReportIOError(ctl, pageno, InvalidTransactionId);
01091 }
01092 
01093 /*
01094  * Remove all segments before the one holding the passed page number
01095  */
01096 void
01097 SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
01098 {
01099     SlruShared  shared = ctl->shared;
01100     int         slotno;
01101 
01102     /*
01103      * The cutoff point is the start of the segment containing cutoffPage.
01104      */
01105     cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
01106 
01107     /*
01108      * Scan shared memory and remove any pages preceding the cutoff page, to
01109      * ensure we won't rewrite them later.  (Since this is normally called in
01110      * or just after a checkpoint, any dirty pages should have been flushed
01111      * already ... we're just being extra careful here.)
01112      */
01113     LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
01114 
01115 restart:;
01116 
01117     /*
01118      * While we are holding the lock, make an important safety check: the
01119      * planned cutoff point must be <= the current endpoint page. Otherwise we
01120      * have already wrapped around, and proceeding with the truncation would
01121      * risk removing the current segment.
01122      */
01123     if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
01124     {
01125         LWLockRelease(shared->ControlLock);
01126         ereport(LOG,
01127           (errmsg("could not truncate directory \"%s\": apparent wraparound",
01128                   ctl->Dir)));
01129         return;
01130     }
01131 
01132     for (slotno = 0; slotno < shared->num_slots; slotno++)
01133     {
01134         if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
01135             continue;
01136         if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
01137             continue;
01138 
01139         /*
01140          * If page is clean, just change state to EMPTY (expected case).
01141          */
01142         if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
01143             !shared->page_dirty[slotno])
01144         {
01145             shared->page_status[slotno] = SLRU_PAGE_EMPTY;
01146             continue;
01147         }
01148 
01149         /*
01150          * Hmm, we have (or may have) I/O operations acting on the page, so
01151          * we've got to wait for them to finish and then start again. This is
01152          * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
01153          * wouldn't it be OK to just discard it without writing it?  For now,
01154          * keep the logic the same as it was.)
01155          */
01156         if (shared->page_status[slotno] == SLRU_PAGE_VALID)
01157             SlruInternalWritePage(ctl, slotno, NULL);
01158         else
01159             SimpleLruWaitIO(ctl, slotno);
01160         goto restart;
01161     }
01162 
01163     LWLockRelease(shared->ControlLock);
01164 
01165     /* Now we can remove the old segment(s) */
01166     (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
01167 }
01168 
01169 /*
01170  * SlruScanDirectory callback
01171  *      This callback reports true if there's any segment prior to the one
01172  *      containing the page passed as "data".
01173  */
01174 bool
01175 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
01176 {
01177     int         cutoffPage = *(int *) data;
01178 
01179     cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
01180 
01181     if (ctl->PagePrecedes(segpage, cutoffPage))
01182         return true;            /* found one; don't iterate any more */
01183 
01184     return false;               /* keep going */
01185 }
01186 
01187 /*
01188  * SlruScanDirectory callback.
01189  *      This callback deletes segments prior to the one passed in as "data".
01190  */
01191 static bool
01192 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
01193 {
01194     char        path[MAXPGPATH];
01195     int         cutoffPage = *(int *) data;
01196 
01197     if (ctl->PagePrecedes(segpage, cutoffPage))
01198     {
01199         snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
01200         ereport(DEBUG2,
01201                 (errmsg("removing file \"%s\"", path)));
01202         unlink(path);
01203     }
01204 
01205     return false;               /* keep going */
01206 }
01207 
01208 /*
01209  * SlruScanDirectory callback.
01210  *      This callback deletes all segments.
01211  */
01212 bool
01213 SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data)
01214 {
01215     char        path[MAXPGPATH];
01216 
01217     snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
01218     ereport(DEBUG2,
01219             (errmsg("removing file \"%s\"", path)));
01220     unlink(path);
01221 
01222     return false;               /* keep going */
01223 }
01224 
01225 /*
01226  * Scan the SimpleLRU directory and apply a callback to each file found in it.
01227  *
01228  * If the callback returns true, the scan is stopped.  The last return value
01229  * from the callback is returned.
01230  *
01231  * Note that the ordering in which the directory is scanned is not guaranteed.
01232  *
01233  * Note that no locking is applied.
01234  */
01235 bool
01236 SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
01237 {
01238     bool        retval = false;
01239     DIR        *cldir;
01240     struct dirent *clde;
01241     int         segno;
01242     int         segpage;
01243 
01244     cldir = AllocateDir(ctl->Dir);
01245     while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
01246     {
01247         if (strlen(clde->d_name) == 4 &&
01248             strspn(clde->d_name, "0123456789ABCDEF") == 4)
01249         {
01250             segno = (int) strtol(clde->d_name, NULL, 16);
01251             segpage = segno * SLRU_PAGES_PER_SEGMENT;
01252 
01253             elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
01254                  ctl->Dir, clde->d_name);
01255             retval = callback(ctl, clde->d_name, segpage, data);
01256             if (retval)
01257                 break;
01258         }
01259     }
01260     FreeDir(cldir);
01261 
01262     return retval;
01263 }