Header And Logo

PostgreSQL
| The world's most advanced open source database.

logtape.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * logtape.c
00004  *    Management of "logical tapes" within temporary files.
00005  *
00006  * This module exists to support sorting via multiple merge passes (see
00007  * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
00008  * we implement it on disk by creating a separate file for each "tape",
00009  * there is an annoying problem: the peak space usage is at least twice
00010  * the volume of actual data to be sorted.  (This must be so because each
00011  * datum will appear in both the input and output tapes of the final
00012  * merge pass.  For seven-tape polyphase merge, which is otherwise a
00013  * pretty good algorithm, peak usage is more like 4x actual data volume.)
00014  *
00015  * We can work around this problem by recognizing that any one tape
00016  * dataset (with the possible exception of the final output) is written
00017  * and read exactly once in a perfectly sequential manner.  Therefore,
00018  * a datum once read will not be required again, and we can recycle its
00019  * space for use by the new tape dataset(s) being generated.  In this way,
00020  * the total space usage is essentially just the actual data volume, plus
00021  * insignificant bookkeeping and start/stop overhead.
00022  *
00023  * Few OSes allow arbitrary parts of a file to be released back to the OS,
00024  * so we have to implement this space-recycling ourselves within a single
00025  * logical file.  logtape.c exists to perform this bookkeeping and provide
00026  * the illusion of N independent tape devices to tuplesort.c.  Note that
00027  * logtape.c itself depends on buffile.c to provide a "logical file" of
00028  * larger size than the underlying OS may support.
00029  *
00030  * For simplicity, we allocate and release space in the underlying file
00031  * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
00032  * of which blocks in the underlying file belong to which logical tape,
00033  * plus any blocks that are free (recycled and not yet reused).
00034  * The blocks in each logical tape are remembered using a method borrowed
00035  * from the Unix HFS filesystem: we store data block numbers in an
00036  * "indirect block".  If an indirect block fills up, we write it out to
00037  * the underlying file and remember its location in a second-level indirect
00038  * block.  In the same way second-level blocks are remembered in third-
00039  * level blocks, and so on if necessary (of course we're talking huge
00040  * amounts of data here).  The topmost indirect block of a given logical
00041  * tape is never actually written out to the physical file, but all lower-
00042  * level indirect blocks will be.
00043  *
00044  * The initial write pass is guaranteed to fill the underlying file
00045  * perfectly sequentially, no matter how data is divided into logical tapes.
00046  * Once we begin merge passes, the access pattern becomes considerably
00047  * less predictable --- but the seeking involved should be comparable to
00048  * what would happen if we kept each logical tape in a separate file,
00049  * so there's no serious performance penalty paid to obtain the space
00050  * savings of recycling.  We try to localize the write accesses by always
00051  * writing to the lowest-numbered free block when we have a choice; it's
00052  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
00053  * policy for free blocks would be better?)
00054  *
00055  * To support the above policy of writing to the lowest free block,
00056  * ltsGetFreeBlock sorts the list of free block numbers into decreasing
00057  * order each time it is asked for a block and the list isn't currently
00058  * sorted.  This is an efficient way to handle it because we expect cycles
00059  * of releasing many blocks followed by re-using many blocks, due to
00060  * tuplesort.c's "preread" behavior.
00061  *
00062  * Since all the bookkeeping and buffer memory is allocated with palloc(),
00063  * and the underlying file(s) are made with OpenTemporaryFile, all resources
00064  * for a logical tape set are certain to be cleaned up even if processing
00065  * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
00066  * care that all calls for a single LogicalTapeSet are made in the same
00067  * palloc context.
00068  *
00069  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00070  * Portions Copyright (c) 1994, Regents of the University of California
00071  *
00072  * IDENTIFICATION
00073  *    src/backend/utils/sort/logtape.c
00074  *
00075  *-------------------------------------------------------------------------
00076  */
00077 
00078 #include "postgres.h"
00079 
00080 #include "storage/buffile.h"
00081 #include "utils/logtape.h"
00082 
00083 /*
00084  * Block indexes are "long"s, so we can fit this many per indirect block.
00085  * NB: we assume this is an exact fit!
00086  */
00087 #define BLOCKS_PER_INDIR_BLOCK  ((int) (BLCKSZ / sizeof(long)))
00088 
00089 /*
00090  * We use a struct like this for each active indirection level of each
00091  * logical tape.  If the indirect block is not the highest level of its
00092  * tape, the "nextup" link points to the next higher level.  Only the
00093  * "ptrs" array is written out if we have to dump the indirect block to
00094  * disk.  If "ptrs" is not completely full, we store -1L in the first
00095  * unused slot at completion of the write phase for the logical tape.
00096  */
00097 typedef struct IndirectBlock
00098 {
00099     int         nextSlot;       /* next pointer slot to write or read */
00100     struct IndirectBlock *nextup;       /* parent indirect level, or NULL if
00101                                          * top */
00102     long        ptrs[BLOCKS_PER_INDIR_BLOCK];   /* indexes of contained blocks */
00103 } IndirectBlock;
00104 
00105 /*
00106  * This data structure represents a single "logical tape" within the set
00107  * of logical tapes stored in the same file.  We must keep track of the
00108  * current partially-read-or-written data block as well as the active
00109  * indirect block level(s).
00110  */
00111 typedef struct LogicalTape
00112 {
00113     IndirectBlock *indirect;    /* bottom of my indirect-block hierarchy */
00114     bool        writing;        /* T while in write phase */
00115     bool        frozen;         /* T if blocks should not be freed when read */
00116     bool        dirty;          /* does buffer need to be written? */
00117 
00118     /*
00119      * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
00120      * lastBlockBytes.  BUT: we do not update lastBlockBytes during writing,
00121      * only at completion of a write phase.
00122      */
00123     long        numFullBlocks;  /* number of complete blocks in log tape */
00124     int         lastBlockBytes; /* valid bytes in last (incomplete) block */
00125 
00126     /*
00127      * Buffer for current data block.  Note we don't bother to store the
00128      * actual file block number of the data block (during the write phase it
00129      * hasn't been assigned yet, and during read we don't care anymore). But
00130      * we do need the relative block number so we can detect end-of-tape while
00131      * reading.
00132      */
00133     char       *buffer;         /* physical buffer (separately palloc'd) */
00134     long        curBlockNumber; /* this block's logical blk# within tape */
00135     int         pos;            /* next read/write position in buffer */
00136     int         nbytes;         /* total # of valid bytes in buffer */
00137 } LogicalTape;
00138 
00139 /*
00140  * This data structure represents a set of related "logical tapes" sharing
00141  * space in a single underlying file.  (But that "file" may be multiple files
00142  * if needed to escape OS limits on file size; buffile.c handles that for us.)
00143  * The number of tapes is fixed at creation.
00144  */
00145 struct LogicalTapeSet
00146 {
00147     BufFile    *pfile;          /* underlying file for whole tape set */
00148     long        nFileBlocks;    /* # of blocks used in underlying file */
00149 
00150     /*
00151      * We store the numbers of recycled-and-available blocks in freeBlocks[].
00152      * When there are no such blocks, we extend the underlying file.
00153      *
00154      * If forgetFreeSpace is true then any freed blocks are simply forgotten
00155      * rather than being remembered in freeBlocks[].  See notes for
00156      * LogicalTapeSetForgetFreeSpace().
00157      *
00158      * If blocksSorted is true then the block numbers in freeBlocks are in
00159      * *decreasing* order, so that removing the last entry gives us the lowest
00160      * free block.  We re-sort the blocks whenever a block is demanded; this
00161      * should be reasonably efficient given the expected usage pattern.
00162      */
00163     bool        forgetFreeSpace;    /* are we remembering free blocks? */
00164     bool        blocksSorted;   /* is freeBlocks[] currently in order? */
00165     long       *freeBlocks;     /* resizable array */
00166     int         nFreeBlocks;    /* # of currently free blocks */
00167     int         freeBlocksLen;  /* current allocated length of freeBlocks[] */
00168 
00169     /*
00170      * tapes[] is declared size 1 since C wants a fixed size, but actually it
00171      * is of length nTapes.
00172      */
00173     int         nTapes;         /* # of logical tapes in set */
00174     LogicalTape tapes[1];       /* must be last in struct! */
00175 };
00176 
00177 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
00178 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
00179 static long ltsGetFreeBlock(LogicalTapeSet *lts);
00180 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
00181 static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
00182                   long blocknum);
00183 static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
00184                        IndirectBlock *indirect,
00185                        bool freezing);
00186 static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
00187                              IndirectBlock *indirect);
00188 static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
00189                       IndirectBlock *indirect,
00190                       bool frozen);
00191 static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
00192                       IndirectBlock *indirect);
00193 static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
00194 
00195 
00196 /*
00197  * Write a block-sized buffer to the specified block of the underlying file.
00198  *
00199  * NB: should not attempt to write beyond current end of file (ie, create
00200  * "holes" in file), since BufFile doesn't allow that.  The first write pass
00201  * must write blocks sequentially.
00202  *
00203  * No need for an error return convention; we ereport() on any error.
00204  */
00205 static void
00206 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
00207 {
00208     if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
00209         BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
00210         ereport(ERROR,
00211         /* XXX is it okay to assume errno is correct? */
00212                 (errcode_for_file_access(),
00213                  errmsg("could not write block %ld of temporary file: %m",
00214                         blocknum),
00215                  errhint("Perhaps out of disk space?")));
00216 }
00217 
00218 /*
00219  * Read a block-sized buffer from the specified block of the underlying file.
00220  *
00221  * No need for an error return convention; we ereport() on any error.   This
00222  * module should never attempt to read a block it doesn't know is there.
00223  */
00224 static void
00225 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
00226 {
00227     if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
00228         BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
00229         ereport(ERROR,
00230         /* XXX is it okay to assume errno is correct? */
00231                 (errcode_for_file_access(),
00232                  errmsg("could not read block %ld of temporary file: %m",
00233                         blocknum)));
00234 }
00235 
00236 /*
00237  * qsort comparator for sorting freeBlocks[] into decreasing order.
00238  */
00239 static int
00240 freeBlocks_cmp(const void *a, const void *b)
00241 {
00242     long        ablk = *((const long *) a);
00243     long        bblk = *((const long *) b);
00244 
00245     /* can't just subtract because long might be wider than int */
00246     if (ablk < bblk)
00247         return 1;
00248     if (ablk > bblk)
00249         return -1;
00250     return 0;
00251 }
00252 
00253 /*
00254  * Select a currently unused block for writing to.
00255  *
00256  * NB: should only be called when writer is ready to write immediately,
00257  * to ensure that first write pass is sequential.
00258  */
00259 static long
00260 ltsGetFreeBlock(LogicalTapeSet *lts)
00261 {
00262     /*
00263      * If there are multiple free blocks, we select the one appearing last in
00264      * freeBlocks[] (after sorting the array if needed).  If there are none,
00265      * assign the next block at the end of the file.
00266      */
00267     if (lts->nFreeBlocks > 0)
00268     {
00269         if (!lts->blocksSorted)
00270         {
00271             qsort((void *) lts->freeBlocks, lts->nFreeBlocks,
00272                   sizeof(long), freeBlocks_cmp);
00273             lts->blocksSorted = true;
00274         }
00275         return lts->freeBlocks[--lts->nFreeBlocks];
00276     }
00277     else
00278         return lts->nFileBlocks++;
00279 }
00280 
00281 /*
00282  * Return a block# to the freelist.
00283  */
00284 static void
00285 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
00286 {
00287     int         ndx;
00288 
00289     /*
00290      * Do nothing if we're no longer interested in remembering free space.
00291      */
00292     if (lts->forgetFreeSpace)
00293         return;
00294 
00295     /*
00296      * Enlarge freeBlocks array if full.
00297      */
00298     if (lts->nFreeBlocks >= lts->freeBlocksLen)
00299     {
00300         lts->freeBlocksLen *= 2;
00301         lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
00302                                           lts->freeBlocksLen * sizeof(long));
00303     }
00304 
00305     /*
00306      * Add blocknum to array, and mark the array unsorted if it's no longer in
00307      * decreasing order.
00308      */
00309     ndx = lts->nFreeBlocks++;
00310     lts->freeBlocks[ndx] = blocknum;
00311     if (ndx > 0 && lts->freeBlocks[ndx - 1] < blocknum)
00312         lts->blocksSorted = false;
00313 }
00314 
00315 /*
00316  * These routines manipulate indirect-block hierarchies.  All are recursive
00317  * so that they don't have any specific limit on the depth of hierarchy.
00318  */
00319 
00320 /*
00321  * Record a data block number in a logical tape's lowest indirect block,
00322  * or record an indirect block's number in the next higher indirect level.
00323  */
00324 static void
00325 ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
00326                   long blocknum)
00327 {
00328     if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
00329     {
00330         /*
00331          * This indirect block is full, so dump it out and recursively save
00332          * its address in the next indirection level.  Create a new
00333          * indirection level if there wasn't one before.
00334          */
00335         long        indirblock = ltsGetFreeBlock(lts);
00336 
00337         ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
00338         if (indirect->nextup == NULL)
00339         {
00340             indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
00341             indirect->nextup->nextSlot = 0;
00342             indirect->nextup->nextup = NULL;
00343         }
00344         ltsRecordBlockNum(lts, indirect->nextup, indirblock);
00345 
00346         /*
00347          * Reset to fill another indirect block at this level.
00348          */
00349         indirect->nextSlot = 0;
00350     }
00351     indirect->ptrs[indirect->nextSlot++] = blocknum;
00352 }
00353 
00354 /*
00355  * Reset a logical tape's indirect-block hierarchy after a write pass
00356  * to prepare for reading.  We dump out partly-filled blocks except
00357  * at the top of the hierarchy, and we rewind each level to the start.
00358  * This call returns the first data block number, or -1L if the tape
00359  * is empty.
00360  *
00361  * Unless 'freezing' is true, release indirect blocks to the free pool after
00362  * reading them.
00363  */
00364 static long
00365 ltsRewindIndirectBlock(LogicalTapeSet *lts,
00366                        IndirectBlock *indirect,
00367                        bool freezing)
00368 {
00369     /* Handle case of never-written-to tape */
00370     if (indirect == NULL)
00371         return -1L;
00372 
00373     /* Insert sentinel if block is not full */
00374     if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
00375         indirect->ptrs[indirect->nextSlot] = -1L;
00376 
00377     /*
00378      * If block is not topmost, write it out, and recurse to obtain address of
00379      * first block in this hierarchy level.  Read that one in.
00380      */
00381     if (indirect->nextup != NULL)
00382     {
00383         long        indirblock = ltsGetFreeBlock(lts);
00384 
00385         ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
00386         ltsRecordBlockNum(lts, indirect->nextup, indirblock);
00387         indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
00388         Assert(indirblock != -1L);
00389         ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
00390         if (!freezing)
00391             ltsReleaseBlock(lts, indirblock);
00392     }
00393 
00394     /*
00395      * Reset my next-block pointer, and then fetch a block number if any.
00396      */
00397     indirect->nextSlot = 0;
00398     if (indirect->ptrs[0] == -1L)
00399         return -1L;
00400     return indirect->ptrs[indirect->nextSlot++];
00401 }
00402 
00403 /*
00404  * Rewind a previously-frozen indirect-block hierarchy for another read pass.
00405  * This call returns the first data block number, or -1L if the tape
00406  * is empty.
00407  */
00408 static long
00409 ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
00410                              IndirectBlock *indirect)
00411 {
00412     /* Handle case of never-written-to tape */
00413     if (indirect == NULL)
00414         return -1L;
00415 
00416     /*
00417      * If block is not topmost, recurse to obtain address of first block in
00418      * this hierarchy level.  Read that one in.
00419      */
00420     if (indirect->nextup != NULL)
00421     {
00422         long        indirblock;
00423 
00424         indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
00425         Assert(indirblock != -1L);
00426         ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
00427     }
00428 
00429     /*
00430      * Reset my next-block pointer, and then fetch a block number if any.
00431      */
00432     indirect->nextSlot = 0;
00433     if (indirect->ptrs[0] == -1L)
00434         return -1L;
00435     return indirect->ptrs[indirect->nextSlot++];
00436 }
00437 
00438 /*
00439  * Obtain next data block number in the forward direction, or -1L if no more.
00440  *
00441  * Unless 'frozen' is true, release indirect blocks to the free pool after
00442  * reading them.
00443  */
00444 static long
00445 ltsRecallNextBlockNum(LogicalTapeSet *lts,
00446                       IndirectBlock *indirect,
00447                       bool frozen)
00448 {
00449     /* Handle case of never-written-to tape */
00450     if (indirect == NULL)
00451         return -1L;
00452 
00453     if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
00454         indirect->ptrs[indirect->nextSlot] == -1L)
00455     {
00456         long        indirblock;
00457 
00458         if (indirect->nextup == NULL)
00459             return -1L;         /* nothing left at this level */
00460         indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
00461         if (indirblock == -1L)
00462             return -1L;         /* nothing left at this level */
00463         ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
00464         if (!frozen)
00465             ltsReleaseBlock(lts, indirblock);
00466         indirect->nextSlot = 0;
00467     }
00468     if (indirect->ptrs[indirect->nextSlot] == -1L)
00469         return -1L;
00470     return indirect->ptrs[indirect->nextSlot++];
00471 }
00472 
00473 /*
00474  * Obtain next data block number in the reverse direction, or -1L if no more.
00475  *
00476  * Note this fetches the block# before the one last returned, no matter which
00477  * direction of call returned that one.  If we fail, no change in state.
00478  *
00479  * This routine can only be used in 'frozen' state, so there's no need to
00480  * pass a parameter telling whether to release blocks ... we never do.
00481  */
00482 static long
00483 ltsRecallPrevBlockNum(LogicalTapeSet *lts,
00484                       IndirectBlock *indirect)
00485 {
00486     /* Handle case of never-written-to tape */
00487     if (indirect == NULL)
00488         return -1L;
00489 
00490     if (indirect->nextSlot <= 1)
00491     {
00492         long        indirblock;
00493 
00494         if (indirect->nextup == NULL)
00495             return -1L;         /* nothing left at this level */
00496         indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
00497         if (indirblock == -1L)
00498             return -1L;         /* nothing left at this level */
00499         ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
00500 
00501         /*
00502          * The previous block would only have been written out if full, so we
00503          * need not search it for a -1 sentinel.
00504          */
00505         indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
00506     }
00507     indirect->nextSlot--;
00508     return indirect->ptrs[indirect->nextSlot - 1];
00509 }
00510 
00511 
00512 /*
00513  * Create a set of logical tapes in a temporary underlying file.
00514  *
00515  * Each tape is initialized in write state.
00516  */
00517 LogicalTapeSet *
00518 LogicalTapeSetCreate(int ntapes)
00519 {
00520     LogicalTapeSet *lts;
00521     LogicalTape *lt;
00522     int         i;
00523 
00524     /*
00525      * Create top-level struct including per-tape LogicalTape structs. First
00526      * LogicalTape struct is already counted in sizeof(LogicalTapeSet).
00527      */
00528     Assert(ntapes > 0);
00529     lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
00530                                     (ntapes - 1) *sizeof(LogicalTape));
00531     lts->pfile = BufFileCreateTemp(false);
00532     lts->nFileBlocks = 0L;
00533     lts->forgetFreeSpace = false;
00534     lts->blocksSorted = true;   /* a zero-length array is sorted ... */
00535     lts->freeBlocksLen = 32;    /* reasonable initial guess */
00536     lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
00537     lts->nFreeBlocks = 0;
00538     lts->nTapes = ntapes;
00539 
00540     /*
00541      * Initialize per-tape structs.  Note we allocate the I/O buffer and
00542      * first-level indirect block for a tape only when it is first actually
00543      * written to.  This avoids wasting memory space when tuplesort.c
00544      * overestimates the number of tapes needed.
00545      */
00546     for (i = 0; i < ntapes; i++)
00547     {
00548         lt = &lts->tapes[i];
00549         lt->indirect = NULL;
00550         lt->writing = true;
00551         lt->frozen = false;
00552         lt->dirty = false;
00553         lt->numFullBlocks = 0L;
00554         lt->lastBlockBytes = 0;
00555         lt->buffer = NULL;
00556         lt->curBlockNumber = 0L;
00557         lt->pos = 0;
00558         lt->nbytes = 0;
00559     }
00560     return lts;
00561 }
00562 
00563 /*
00564  * Close a logical tape set and release all resources.
00565  */
00566 void
00567 LogicalTapeSetClose(LogicalTapeSet *lts)
00568 {
00569     LogicalTape *lt;
00570     IndirectBlock *ib,
00571                *nextib;
00572     int         i;
00573 
00574     BufFileClose(lts->pfile);
00575     for (i = 0; i < lts->nTapes; i++)
00576     {
00577         lt = &lts->tapes[i];
00578         for (ib = lt->indirect; ib != NULL; ib = nextib)
00579         {
00580             nextib = ib->nextup;
00581             pfree(ib);
00582         }
00583         if (lt->buffer)
00584             pfree(lt->buffer);
00585     }
00586     pfree(lts->freeBlocks);
00587     pfree(lts);
00588 }
00589 
00590 /*
00591  * Mark a logical tape set as not needing management of free space anymore.
00592  *
00593  * This should be called if the caller does not intend to write any more data
00594  * into the tape set, but is reading from un-frozen tapes.  Since no more
00595  * writes are planned, remembering free blocks is no longer useful.  Setting
00596  * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
00597  * is not designed to handle large numbers of free blocks.
00598  */
00599 void
00600 LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
00601 {
00602     lts->forgetFreeSpace = true;
00603 }
00604 
00605 /*
00606  * Dump the dirty buffer of a logical tape.
00607  */
00608 static void
00609 ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
00610 {
00611     long        datablock = ltsGetFreeBlock(lts);
00612 
00613     Assert(lt->dirty);
00614     ltsWriteBlock(lts, datablock, (void *) lt->buffer);
00615     ltsRecordBlockNum(lts, lt->indirect, datablock);
00616     lt->dirty = false;
00617     /* Caller must do other state update as needed */
00618 }
00619 
00620 /*
00621  * Write to a logical tape.
00622  *
00623  * There are no error returns; we ereport() on failure.
00624  */
00625 void
00626 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
00627                  void *ptr, size_t size)
00628 {
00629     LogicalTape *lt;
00630     size_t      nthistime;
00631 
00632     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00633     lt = &lts->tapes[tapenum];
00634     Assert(lt->writing);
00635 
00636     /* Allocate data buffer and first indirect block on first write */
00637     if (lt->buffer == NULL)
00638         lt->buffer = (char *) palloc(BLCKSZ);
00639     if (lt->indirect == NULL)
00640     {
00641         lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
00642         lt->indirect->nextSlot = 0;
00643         lt->indirect->nextup = NULL;
00644     }
00645 
00646     while (size > 0)
00647     {
00648         if (lt->pos >= BLCKSZ)
00649         {
00650             /* Buffer full, dump it out */
00651             if (lt->dirty)
00652                 ltsDumpBuffer(lts, lt);
00653             else
00654             {
00655                 /* Hmm, went directly from reading to writing? */
00656                 elog(ERROR, "invalid logtape state: should be dirty");
00657             }
00658             lt->numFullBlocks++;
00659             lt->curBlockNumber++;
00660             lt->pos = 0;
00661             lt->nbytes = 0;
00662         }
00663 
00664         nthistime = BLCKSZ - lt->pos;
00665         if (nthistime > size)
00666             nthistime = size;
00667         Assert(nthistime > 0);
00668 
00669         memcpy(lt->buffer + lt->pos, ptr, nthistime);
00670 
00671         lt->dirty = true;
00672         lt->pos += nthistime;
00673         if (lt->nbytes < lt->pos)
00674             lt->nbytes = lt->pos;
00675         ptr = (void *) ((char *) ptr + nthistime);
00676         size -= nthistime;
00677     }
00678 }
00679 
00680 /*
00681  * Rewind logical tape and switch from writing to reading or vice versa.
00682  *
00683  * Unless the tape has been "frozen" in read state, forWrite must be the
00684  * opposite of the previous tape state.
00685  */
00686 void
00687 LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
00688 {
00689     LogicalTape *lt;
00690     long        datablocknum;
00691 
00692     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00693     lt = &lts->tapes[tapenum];
00694 
00695     if (!forWrite)
00696     {
00697         if (lt->writing)
00698         {
00699             /*
00700              * Completion of a write phase.  Flush last partial data block,
00701              * flush any partial indirect blocks, rewind for normal
00702              * (destructive) read.
00703              */
00704             if (lt->dirty)
00705                 ltsDumpBuffer(lts, lt);
00706             lt->lastBlockBytes = lt->nbytes;
00707             lt->writing = false;
00708             datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
00709         }
00710         else
00711         {
00712             /*
00713              * This is only OK if tape is frozen; we rewind for (another) read
00714              * pass.
00715              */
00716             Assert(lt->frozen);
00717             datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
00718         }
00719         /* Read the first block, or reset if tape is empty */
00720         lt->curBlockNumber = 0L;
00721         lt->pos = 0;
00722         lt->nbytes = 0;
00723         if (datablocknum != -1L)
00724         {
00725             ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00726             if (!lt->frozen)
00727                 ltsReleaseBlock(lts, datablocknum);
00728             lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
00729                 BLCKSZ : lt->lastBlockBytes;
00730         }
00731     }
00732     else
00733     {
00734         /*
00735          * Completion of a read phase.  Rewind and prepare for write.
00736          *
00737          * NOTE: we assume the caller has read the tape to the end; otherwise
00738          * untouched data and indirect blocks will not have been freed. We
00739          * could add more code to free any unread blocks, but in current usage
00740          * of this module it'd be useless code.
00741          */
00742         IndirectBlock *ib,
00743                    *nextib;
00744 
00745         Assert(!lt->writing && !lt->frozen);
00746         /* Must truncate the indirect-block hierarchy down to one level. */
00747         if (lt->indirect)
00748         {
00749             for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
00750             {
00751                 nextib = ib->nextup;
00752                 pfree(ib);
00753             }
00754             lt->indirect->nextSlot = 0;
00755             lt->indirect->nextup = NULL;
00756         }
00757         lt->writing = true;
00758         lt->dirty = false;
00759         lt->numFullBlocks = 0L;
00760         lt->lastBlockBytes = 0;
00761         lt->curBlockNumber = 0L;
00762         lt->pos = 0;
00763         lt->nbytes = 0;
00764     }
00765 }
00766 
00767 /*
00768  * Read from a logical tape.
00769  *
00770  * Early EOF is indicated by return value less than #bytes requested.
00771  */
00772 size_t
00773 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
00774                 void *ptr, size_t size)
00775 {
00776     LogicalTape *lt;
00777     size_t      nread = 0;
00778     size_t      nthistime;
00779 
00780     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00781     lt = &lts->tapes[tapenum];
00782     Assert(!lt->writing);
00783 
00784     while (size > 0)
00785     {
00786         if (lt->pos >= lt->nbytes)
00787         {
00788             /* Try to load more data into buffer. */
00789             long        datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
00790                                                              lt->frozen);
00791 
00792             if (datablocknum == -1L)
00793                 break;          /* EOF */
00794             lt->curBlockNumber++;
00795             lt->pos = 0;
00796             ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00797             if (!lt->frozen)
00798                 ltsReleaseBlock(lts, datablocknum);
00799             lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
00800                 BLCKSZ : lt->lastBlockBytes;
00801             if (lt->nbytes <= 0)
00802                 break;          /* EOF (possible here?) */
00803         }
00804 
00805         nthistime = lt->nbytes - lt->pos;
00806         if (nthistime > size)
00807             nthistime = size;
00808         Assert(nthistime > 0);
00809 
00810         memcpy(ptr, lt->buffer + lt->pos, nthistime);
00811 
00812         lt->pos += nthistime;
00813         ptr = (void *) ((char *) ptr + nthistime);
00814         size -= nthistime;
00815         nread += nthistime;
00816     }
00817 
00818     return nread;
00819 }
00820 
00821 /*
00822  * "Freeze" the contents of a tape so that it can be read multiple times
00823  * and/or read backwards.  Once a tape is frozen, its contents will not
00824  * be released until the LogicalTapeSet is destroyed.  This is expected
00825  * to be used only for the final output pass of a merge.
00826  *
00827  * This *must* be called just at the end of a write pass, before the
00828  * tape is rewound (after rewind is too late!).  It performs a rewind
00829  * and switch to read mode "for free".  An immediately following rewind-
00830  * for-read call is OK but not necessary.
00831  */
00832 void
00833 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
00834 {
00835     LogicalTape *lt;
00836     long        datablocknum;
00837 
00838     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00839     lt = &lts->tapes[tapenum];
00840     Assert(lt->writing);
00841 
00842     /*
00843      * Completion of a write phase.  Flush last partial data block, flush any
00844      * partial indirect blocks, rewind for nondestructive read.
00845      */
00846     if (lt->dirty)
00847         ltsDumpBuffer(lts, lt);
00848     lt->lastBlockBytes = lt->nbytes;
00849     lt->writing = false;
00850     lt->frozen = true;
00851     datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
00852     /* Read the first block, or reset if tape is empty */
00853     lt->curBlockNumber = 0L;
00854     lt->pos = 0;
00855     lt->nbytes = 0;
00856     if (datablocknum != -1L)
00857     {
00858         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00859         lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
00860             BLCKSZ : lt->lastBlockBytes;
00861     }
00862 }
00863 
00864 /*
00865  * Backspace the tape a given number of bytes.  (We also support a more
00866  * general seek interface, see below.)
00867  *
00868  * *Only* a frozen-for-read tape can be backed up; we don't support
00869  * random access during write, and an unfrozen read tape may have
00870  * already discarded the desired data!
00871  *
00872  * Return value is TRUE if seek successful, FALSE if there isn't that much
00873  * data before the current point (in which case there's no state change).
00874  */
00875 bool
00876 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
00877 {
00878     LogicalTape *lt;
00879     long        nblocks;
00880     int         newpos;
00881 
00882     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00883     lt = &lts->tapes[tapenum];
00884     Assert(lt->frozen);
00885 
00886     /*
00887      * Easy case for seek within current block.
00888      */
00889     if (size <= (size_t) lt->pos)
00890     {
00891         lt->pos -= (int) size;
00892         return true;
00893     }
00894 
00895     /*
00896      * Not-so-easy case.  Figure out whether it's possible at all.
00897      */
00898     size -= (size_t) lt->pos;   /* part within this block */
00899     nblocks = size / BLCKSZ;
00900     size = size % BLCKSZ;
00901     if (size)
00902     {
00903         nblocks++;
00904         newpos = (int) (BLCKSZ - size);
00905     }
00906     else
00907         newpos = 0;
00908     if (nblocks > lt->curBlockNumber)
00909         return false;           /* a seek too far... */
00910 
00911     /*
00912      * OK, we need to back up nblocks blocks.  This implementation would be
00913      * pretty inefficient for long seeks, but we really aren't expecting that
00914      * (a seek over one tuple is typical).
00915      */
00916     while (nblocks-- > 0)
00917     {
00918         long        datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
00919 
00920         if (datablocknum == -1L)
00921             elog(ERROR, "unexpected end of tape");
00922         lt->curBlockNumber--;
00923         if (nblocks == 0)
00924         {
00925             ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00926             lt->nbytes = BLCKSZ;
00927         }
00928     }
00929     lt->pos = newpos;
00930     return true;
00931 }
00932 
00933 /*
00934  * Seek to an arbitrary position in a logical tape.
00935  *
00936  * *Only* a frozen-for-read tape can be seeked.
00937  *
00938  * Return value is TRUE if seek successful, FALSE if there isn't that much
00939  * data in the tape (in which case there's no state change).
00940  */
00941 bool
00942 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
00943                 long blocknum, int offset)
00944 {
00945     LogicalTape *lt;
00946 
00947     Assert(tapenum >= 0 && tapenum < lts->nTapes);
00948     lt = &lts->tapes[tapenum];
00949     Assert(lt->frozen);
00950     Assert(offset >= 0 && offset <= BLCKSZ);
00951 
00952     /*
00953      * Easy case for seek within current block.
00954      */
00955     if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
00956     {
00957         lt->pos = offset;
00958         return true;
00959     }
00960 
00961     /*
00962      * Not-so-easy case.  Figure out whether it's possible at all.
00963      */
00964     if (blocknum < 0 || blocknum > lt->numFullBlocks ||
00965         (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
00966         return false;
00967 
00968     /*
00969      * OK, advance or back up to the target block.  This implementation would
00970      * be pretty inefficient for long seeks, but we really aren't expecting
00971      * that (a seek over one tuple is typical).
00972      */
00973     while (lt->curBlockNumber > blocknum)
00974     {
00975         long        datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
00976 
00977         if (datablocknum == -1L)
00978             elog(ERROR, "unexpected end of tape");
00979         if (--lt->curBlockNumber == blocknum)
00980             ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00981     }
00982     while (lt->curBlockNumber < blocknum)
00983     {
00984         long        datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
00985                                                          lt->frozen);
00986 
00987         if (datablocknum == -1L)
00988             elog(ERROR, "unexpected end of tape");
00989         if (++lt->curBlockNumber == blocknum)
00990             ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
00991     }
00992     lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
00993         BLCKSZ : lt->lastBlockBytes;
00994     lt->pos = offset;
00995     return true;
00996 }
00997 
00998 /*
00999  * Obtain current position in a form suitable for a later LogicalTapeSeek.
01000  *
01001  * NOTE: it'd be OK to do this during write phase with intention of using
01002  * the position for a seek after freezing.  Not clear if anyone needs that.
01003  */
01004 void
01005 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
01006                 long *blocknum, int *offset)
01007 {
01008     LogicalTape *lt;
01009 
01010     Assert(tapenum >= 0 && tapenum < lts->nTapes);
01011     lt = &lts->tapes[tapenum];
01012     *blocknum = lt->curBlockNumber;
01013     *offset = lt->pos;
01014 }
01015 
01016 /*
01017  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
01018  */
01019 long
01020 LogicalTapeSetBlocks(LogicalTapeSet *lts)
01021 {
01022     return lts->nFileBlocks;
01023 }