Header And Logo

PostgreSQL
| The world's most advanced open source database.

buffile.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * buffile.c
00004  *    Management of large buffered files, primarily temporary files.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  * IDENTIFICATION
00010  *    src/backend/storage/file/buffile.c
00011  *
00012  * NOTES:
00013  *
00014  * BufFiles provide a very incomplete emulation of stdio atop virtual Files
00015  * (as managed by fd.c).  Currently, we only support the buffered-I/O
00016  * aspect of stdio: a read or write of the low-level File occurs only
00017  * when the buffer is filled or emptied.  This is an even bigger win
00018  * for virtual Files than for ordinary kernel files, since reducing the
00019  * frequency with which a virtual File is touched reduces "thrashing"
00020  * of opening/closing file descriptors.
00021  *
00022  * Note that BufFile structs are allocated with palloc(), and therefore
00023  * will go away automatically at transaction end.  If the underlying
00024  * virtual File is made with OpenTemporaryFile, then all resources for
00025  * the file are certain to be cleaned up even if processing is aborted
00026  * by ereport(ERROR).   To avoid confusion, the caller should take care that
00027  * all calls for a single BufFile are made in the same palloc context.
00028  *
00029  * BufFile also supports temporary files that exceed the OS file size limit
00030  * (by opening multiple fd.c temporary files).  This is an essential feature
00031  * for sorts and hashjoins on large amounts of data.
00032  *-------------------------------------------------------------------------
00033  */
00034 
00035 #include "postgres.h"
00036 
00037 #include "executor/instrument.h"
00038 #include "storage/fd.h"
00039 #include "storage/buffile.h"
00040 #include "storage/buf_internals.h"
00041 
/*
 * Segment sizing for BufFiles.
 *
 * Each physical component file is capped at one gigabyte, independently of
 * RELSEG_SIZE, so that a large temporary BufFile can be spread across
 * several tablespaces when more than one is available.
 *
 * MAX_PHYSICAL_FILESIZE is the cap in bytes; BUFFILE_SEG_SIZE is the same
 * cap expressed in BLCKSZ-sized blocks.
 */
#define MAX_PHYSICAL_FILESIZE   0x40000000
#define BUFFILE_SEG_SIZE        (MAX_PHYSICAL_FILESIZE / BLCKSZ)
00049 
00050 /*
00051  * This data structure represents a buffered file that consists of one or
00052  * more physical files (each accessed through a virtual file descriptor
00053  * managed by fd.c).
00054  */
00055 struct BufFile
00056 {
00057     int         numFiles;       /* number of physical files in set */
00058     /* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
00059     File       *files;          /* palloc'd array with numFiles entries */
00060     off_t      *offsets;        /* palloc'd array with numFiles entries */
00061 
00062     /*
00063      * offsets[i] is the current seek position of files[i].  We use this to
00064      * avoid making redundant FileSeek calls.
00065      */
00066 
00067     bool        isTemp;         /* can only add files if this is TRUE */
00068     bool        isInterXact;    /* keep open over transactions? */
00069     bool        dirty;          /* does buffer need to be written? */
00070 
00071     /*
00072      * "current pos" is position of start of buffer within the logical file.
00073      * Position as seen by user of BufFile is (curFile, curOffset + pos).
00074      */
00075     int         curFile;        /* file index (0..n) part of current pos */
00076     off_t       curOffset;      /* offset part of current pos */
00077     int         pos;            /* next read/write position in buffer */
00078     int         nbytes;         /* total # of valid bytes in buffer */
00079     char        buffer[BLCKSZ];
00080 };
00081 
00082 static BufFile *makeBufFile(File firstfile);
00083 static void extendBufFile(BufFile *file);
00084 static void BufFileLoadBuffer(BufFile *file);
00085 static void BufFileDumpBuffer(BufFile *file);
00086 static int  BufFileFlush(BufFile *file);
00087 
00088 
00089 /*
00090  * Create a BufFile given the first underlying physical file.
00091  * NOTE: caller must set isTemp and isInterXact if appropriate.
00092  */
00093 static BufFile *
00094 makeBufFile(File firstfile)
00095 {
00096     BufFile    *file = (BufFile *) palloc(sizeof(BufFile));
00097 
00098     file->numFiles = 1;
00099     file->files = (File *) palloc(sizeof(File));
00100     file->files[0] = firstfile;
00101     file->offsets = (off_t *) palloc(sizeof(off_t));
00102     file->offsets[0] = 0L;
00103     file->isTemp = false;
00104     file->isInterXact = false;
00105     file->dirty = false;
00106     file->curFile = 0;
00107     file->curOffset = 0L;
00108     file->pos = 0;
00109     file->nbytes = 0;
00110 
00111     return file;
00112 }
00113 
00114 /*
00115  * Add another component temp file.
00116  */
00117 static void
00118 extendBufFile(BufFile *file)
00119 {
00120     File        pfile;
00121 
00122     Assert(file->isTemp);
00123     pfile = OpenTemporaryFile(file->isInterXact);
00124     Assert(pfile >= 0);
00125 
00126     file->files = (File *) repalloc(file->files,
00127                                     (file->numFiles + 1) * sizeof(File));
00128     file->offsets = (off_t *) repalloc(file->offsets,
00129                                        (file->numFiles + 1) * sizeof(off_t));
00130     file->files[file->numFiles] = pfile;
00131     file->offsets[file->numFiles] = 0L;
00132     file->numFiles++;
00133 }
00134 
00135 /*
00136  * Create a BufFile for a new temporary file (which will expand to become
00137  * multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are
00138  * written to it).
00139  *
00140  * If interXact is true, the temp file will not be automatically deleted
00141  * at end of transaction.
00142  *
00143  * Note: if interXact is true, the caller had better be calling us in a
00144  * memory context that will survive across transaction boundaries.
00145  */
00146 BufFile *
00147 BufFileCreateTemp(bool interXact)
00148 {
00149     BufFile    *file;
00150     File        pfile;
00151 
00152     pfile = OpenTemporaryFile(interXact);
00153     Assert(pfile >= 0);
00154 
00155     file = makeBufFile(pfile);
00156     file->isTemp = true;
00157     file->isInterXact = interXact;
00158 
00159     return file;
00160 }
00161 
#ifdef NOT_USED
/*
 * BufFileCreate
 *
 * Attach a BufFile to a virtual File that is already open, much as
 * fdopen() does in stdio.  At present this is the only route to a BufFile
 * over a non-temporary file.  A BufFile made this way is not marked isTemp
 * and therefore can NOT grow into multiple component files.
 */
BufFile *
BufFileCreate(File file)
{
    BufFile    *wrapped = makeBufFile(file);

    return wrapped;
}
#endif
00176 
00177 /*
00178  * Close a BufFile
00179  *
00180  * Like fclose(), this also implicitly FileCloses the underlying File.
00181  */
00182 void
00183 BufFileClose(BufFile *file)
00184 {
00185     int         i;
00186 
00187     /* flush any unwritten data */
00188     BufFileFlush(file);
00189     /* close the underlying file(s) (with delete if it's a temp file) */
00190     for (i = 0; i < file->numFiles; i++)
00191         FileClose(file->files[i]);
00192     /* release the buffer space */
00193     pfree(file->files);
00194     pfree(file->offsets);
00195     pfree(file);
00196 }
00197 
00198 /*
00199  * BufFileLoadBuffer
00200  *
00201  * Load some data into buffer, if possible, starting from curOffset.
00202  * At call, must have dirty = false, pos and nbytes = 0.
00203  * On exit, nbytes is number of bytes loaded.
00204  */
00205 static void
00206 BufFileLoadBuffer(BufFile *file)
00207 {
00208     File        thisfile;
00209 
00210     /*
00211      * Advance to next component file if necessary and possible.
00212      *
00213      * This path can only be taken if there is more than one component, so it
00214      * won't interfere with reading a non-temp file that is over
00215      * MAX_PHYSICAL_FILESIZE.
00216      */
00217     if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
00218         file->curFile + 1 < file->numFiles)
00219     {
00220         file->curFile++;
00221         file->curOffset = 0L;
00222     }
00223 
00224     /*
00225      * May need to reposition physical file.
00226      */
00227     thisfile = file->files[file->curFile];
00228     if (file->curOffset != file->offsets[file->curFile])
00229     {
00230         if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
00231             return;             /* seek failed, read nothing */
00232         file->offsets[file->curFile] = file->curOffset;
00233     }
00234 
00235     /*
00236      * Read whatever we can get, up to a full bufferload.
00237      */
00238     file->nbytes = FileRead(thisfile, file->buffer, sizeof(file->buffer));
00239     if (file->nbytes < 0)
00240         file->nbytes = 0;
00241     file->offsets[file->curFile] += file->nbytes;
00242     /* we choose not to advance curOffset here */
00243 
00244     pgBufferUsage.temp_blks_read++;
00245 }
00246 
00247 /*
00248  * BufFileDumpBuffer
00249  *
00250  * Dump buffer contents starting at curOffset.
00251  * At call, should have dirty = true, nbytes > 0.
00252  * On exit, dirty is cleared if successful write, and curOffset is advanced.
00253  */
00254 static void
00255 BufFileDumpBuffer(BufFile *file)
00256 {
00257     int         wpos = 0;
00258     int         bytestowrite;
00259     File        thisfile;
00260 
00261     /*
00262      * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
00263      * crosses a component-file boundary; so we need a loop.
00264      */
00265     while (wpos < file->nbytes)
00266     {
00267         /*
00268          * Advance to next component file if necessary and possible.
00269          */
00270         if (file->curOffset >= MAX_PHYSICAL_FILESIZE && file->isTemp)
00271         {
00272             while (file->curFile + 1 >= file->numFiles)
00273                 extendBufFile(file);
00274             file->curFile++;
00275             file->curOffset = 0L;
00276         }
00277 
00278         /*
00279          * Enforce per-file size limit only for temp files, else just try to
00280          * write as much as asked...
00281          */
00282         bytestowrite = file->nbytes - wpos;
00283         if (file->isTemp)
00284         {
00285             off_t       availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
00286 
00287             if ((off_t) bytestowrite > availbytes)
00288                 bytestowrite = (int) availbytes;
00289         }
00290 
00291         /*
00292          * May need to reposition physical file.
00293          */
00294         thisfile = file->files[file->curFile];
00295         if (file->curOffset != file->offsets[file->curFile])
00296         {
00297             if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
00298                 return;         /* seek failed, give up */
00299             file->offsets[file->curFile] = file->curOffset;
00300         }
00301         bytestowrite = FileWrite(thisfile, file->buffer + wpos, bytestowrite);
00302         if (bytestowrite <= 0)
00303             return;             /* failed to write */
00304         file->offsets[file->curFile] += bytestowrite;
00305         file->curOffset += bytestowrite;
00306         wpos += bytestowrite;
00307 
00308         pgBufferUsage.temp_blks_written++;
00309     }
00310     file->dirty = false;
00311 
00312     /*
00313      * At this point, curOffset has been advanced to the end of the buffer,
00314      * ie, its original value + nbytes.  We need to make it point to the
00315      * logical file position, ie, original value + pos, in case that is less
00316      * (as could happen due to a small backwards seek in a dirty buffer!)
00317      */
00318     file->curOffset -= (file->nbytes - file->pos);
00319     if (file->curOffset < 0)    /* handle possible segment crossing */
00320     {
00321         file->curFile--;
00322         Assert(file->curFile >= 0);
00323         file->curOffset += MAX_PHYSICAL_FILESIZE;
00324     }
00325 
00326     /*
00327      * Now we can set the buffer empty without changing the logical position
00328      */
00329     file->pos = 0;
00330     file->nbytes = 0;
00331 }
00332 
00333 /*
00334  * BufFileRead
00335  *
00336  * Like fread() except we assume 1-byte element size.
00337  */
00338 size_t
00339 BufFileRead(BufFile *file, void *ptr, size_t size)
00340 {
00341     size_t      nread = 0;
00342     size_t      nthistime;
00343 
00344     if (file->dirty)
00345     {
00346         if (BufFileFlush(file) != 0)
00347             return 0;           /* could not flush... */
00348         Assert(!file->dirty);
00349     }
00350 
00351     while (size > 0)
00352     {
00353         if (file->pos >= file->nbytes)
00354         {
00355             /* Try to load more data into buffer. */
00356             file->curOffset += file->pos;
00357             file->pos = 0;
00358             file->nbytes = 0;
00359             BufFileLoadBuffer(file);
00360             if (file->nbytes <= 0)
00361                 break;          /* no more data available */
00362         }
00363 
00364         nthistime = file->nbytes - file->pos;
00365         if (nthistime > size)
00366             nthistime = size;
00367         Assert(nthistime > 0);
00368 
00369         memcpy(ptr, file->buffer + file->pos, nthistime);
00370 
00371         file->pos += nthistime;
00372         ptr = (void *) ((char *) ptr + nthistime);
00373         size -= nthistime;
00374         nread += nthistime;
00375     }
00376 
00377     return nread;
00378 }
00379 
00380 /*
00381  * BufFileWrite
00382  *
00383  * Like fwrite() except we assume 1-byte element size.
00384  */
00385 size_t
00386 BufFileWrite(BufFile *file, void *ptr, size_t size)
00387 {
00388     size_t      nwritten = 0;
00389     size_t      nthistime;
00390 
00391     while (size > 0)
00392     {
00393         if (file->pos >= BLCKSZ)
00394         {
00395             /* Buffer full, dump it out */
00396             if (file->dirty)
00397             {
00398                 BufFileDumpBuffer(file);
00399                 if (file->dirty)
00400                     break;      /* I/O error */
00401             }
00402             else
00403             {
00404                 /* Hmm, went directly from reading to writing? */
00405                 file->curOffset += file->pos;
00406                 file->pos = 0;
00407                 file->nbytes = 0;
00408             }
00409         }
00410 
00411         nthistime = BLCKSZ - file->pos;
00412         if (nthistime > size)
00413             nthistime = size;
00414         Assert(nthistime > 0);
00415 
00416         memcpy(file->buffer + file->pos, ptr, nthistime);
00417 
00418         file->dirty = true;
00419         file->pos += nthistime;
00420         if (file->nbytes < file->pos)
00421             file->nbytes = file->pos;
00422         ptr = (void *) ((char *) ptr + nthistime);
00423         size -= nthistime;
00424         nwritten += nthistime;
00425     }
00426 
00427     return nwritten;
00428 }
00429 
00430 /*
00431  * BufFileFlush
00432  *
00433  * Like fflush()
00434  */
00435 static int
00436 BufFileFlush(BufFile *file)
00437 {
00438     if (file->dirty)
00439     {
00440         BufFileDumpBuffer(file);
00441         if (file->dirty)
00442             return EOF;
00443     }
00444 
00445     return 0;
00446 }
00447 
00448 /*
00449  * BufFileSeek
00450  *
00451  * Like fseek(), except that target position needs two values in order to
00452  * work when logical filesize exceeds maximum value representable by long.
00453  * We do not support relative seeks across more than LONG_MAX, however.
00454  *
00455  * Result is 0 if OK, EOF if not.  Logical position is not moved if an
00456  * impossible seek is attempted.
00457  */
00458 int
00459 BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
00460 {
00461     int         newFile;
00462     off_t       newOffset;
00463 
00464     switch (whence)
00465     {
00466         case SEEK_SET:
00467             if (fileno < 0)
00468                 return EOF;
00469             newFile = fileno;
00470             newOffset = offset;
00471             break;
00472         case SEEK_CUR:
00473 
00474             /*
00475              * Relative seek considers only the signed offset, ignoring
00476              * fileno. Note that large offsets (> 1 gig) risk overflow in this
00477              * add, unless we have 64-bit off_t.
00478              */
00479             newFile = file->curFile;
00480             newOffset = (file->curOffset + file->pos) + offset;
00481             break;
00482 #ifdef NOT_USED
00483         case SEEK_END:
00484             /* could be implemented, not needed currently */
00485             break;
00486 #endif
00487         default:
00488             elog(ERROR, "invalid whence: %d", whence);
00489             return EOF;
00490     }
00491     while (newOffset < 0)
00492     {
00493         if (--newFile < 0)
00494             return EOF;
00495         newOffset += MAX_PHYSICAL_FILESIZE;
00496     }
00497     if (newFile == file->curFile &&
00498         newOffset >= file->curOffset &&
00499         newOffset <= file->curOffset + file->nbytes)
00500     {
00501         /*
00502          * Seek is to a point within existing buffer; we can just adjust
00503          * pos-within-buffer, without flushing buffer.  Note this is OK
00504          * whether reading or writing, but buffer remains dirty if we were
00505          * writing.
00506          */
00507         file->pos = (int) (newOffset - file->curOffset);
00508         return 0;
00509     }
00510     /* Otherwise, must reposition buffer, so flush any dirty data */
00511     if (BufFileFlush(file) != 0)
00512         return EOF;
00513 
00514     /*
00515      * At this point and no sooner, check for seek past last segment. The
00516      * above flush could have created a new segment, so checking sooner would
00517      * not work (at least not with this code).
00518      */
00519     if (file->isTemp)
00520     {
00521         /* convert seek to "start of next seg" to "end of last seg" */
00522         if (newFile == file->numFiles && newOffset == 0)
00523         {
00524             newFile--;
00525             newOffset = MAX_PHYSICAL_FILESIZE;
00526         }
00527         while (newOffset > MAX_PHYSICAL_FILESIZE)
00528         {
00529             if (++newFile >= file->numFiles)
00530                 return EOF;
00531             newOffset -= MAX_PHYSICAL_FILESIZE;
00532         }
00533     }
00534     if (newFile >= file->numFiles)
00535         return EOF;
00536     /* Seek is OK! */
00537     file->curFile = newFile;
00538     file->curOffset = newOffset;
00539     file->pos = 0;
00540     file->nbytes = 0;
00541     return 0;
00542 }
00543 
00544 void
00545 BufFileTell(BufFile *file, int *fileno, off_t *offset)
00546 {
00547     *fileno = file->curFile;
00548     *offset = file->curOffset + file->pos;
00549 }
00550 
00551 /*
00552  * BufFileSeekBlock --- block-oriented seek
00553  *
00554  * Performs absolute seek to the start of the n'th BLCKSZ-sized block of
00555  * the file.  Note that users of this interface will fail if their files
00556  * exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work
00557  * with tables bigger than that, either...
00558  *
00559  * Result is 0 if OK, EOF if not.  Logical position is not moved if an
00560  * impossible seek is attempted.
00561  */
00562 int
00563 BufFileSeekBlock(BufFile *file, long blknum)
00564 {
00565     return BufFileSeek(file,
00566                        (int) (blknum / BUFFILE_SEG_SIZE),
00567                        (off_t) (blknum % BUFFILE_SEG_SIZE) * BLCKSZ,
00568                        SEEK_SET);
00569 }
00570 
#ifdef NOT_USED
/*
 * BufFileTellBlock --- block-oriented tell
 *
 * Returns the number of the BLCKSZ-sized block containing the current
 * logical position.  Any fractional part of a block in the current seek
 * position is ignored.
 */
long
BufFileTellBlock(BufFile *file)
{
    long        blknum;

    /* whole blocks preceding the position within the current segment */
    blknum = (long) ((file->curOffset + file->pos) / BLCKSZ);

    /*
     * Plus all blocks of the preceding segments.  Widen curFile to long
     * before multiplying: both operands are otherwise int, and the product
     * could overflow int for a sufficiently large segment index.
     */
    blknum += (long) file->curFile * BUFFILE_SEG_SIZE;
    return blknum;
}

#endif