Header And Logo

PostgreSQL
| The world's most advanced open source database.

hio.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * hio.c
00004  *    POSTGRES heap access method input/output code.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/heap/hio.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/heapam.h"
00019 #include "access/hio.h"
00020 #include "access/htup_details.h"
00021 #include "access/visibilitymap.h"
00022 #include "storage/bufmgr.h"
00023 #include "storage/freespace.h"
00024 #include "storage/lmgr.h"
00025 #include "storage/smgr.h"
00026 
00027 
00028 /*
00029  * RelationPutHeapTuple - place tuple at specified page
00030  *
00031  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
00032  *
00033  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
00034  */
00035 void
00036 RelationPutHeapTuple(Relation relation,
00037                      Buffer buffer,
00038                      HeapTuple tuple)
00039 {
00040     Page        pageHeader;
00041     OffsetNumber offnum;
00042     ItemId      itemId;
00043     Item        item;
00044 
00045     /* Add the tuple to the page */
00046     pageHeader = BufferGetPage(buffer);
00047 
00048     offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
00049                          tuple->t_len, InvalidOffsetNumber, false, true);
00050 
00051     if (offnum == InvalidOffsetNumber)
00052         elog(PANIC, "failed to add tuple to page");
00053 
00054     /* Update tuple->t_self to the actual position where it was stored */
00055     ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
00056 
00057     /* Insert the correct position into CTID of the stored tuple, too */
00058     itemId = PageGetItemId(pageHeader, offnum);
00059     item = PageGetItem(pageHeader, itemId);
00060     ((HeapTupleHeader) item)->t_ctid = tuple->t_self;
00061 }
00062 
00063 /*
00064  * Read in a buffer, using bulk-insert strategy if bistate isn't NULL.
00065  */
00066 static Buffer
00067 ReadBufferBI(Relation relation, BlockNumber targetBlock,
00068              BulkInsertState bistate)
00069 {
00070     Buffer      buffer;
00071 
00072     /* If not bulk-insert, exactly like ReadBuffer */
00073     if (!bistate)
00074         return ReadBuffer(relation, targetBlock);
00075 
00076     /* If we have the desired block already pinned, re-pin and return it */
00077     if (bistate->current_buf != InvalidBuffer)
00078     {
00079         if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
00080         {
00081             IncrBufferRefCount(bistate->current_buf);
00082             return bistate->current_buf;
00083         }
00084         /* ... else drop the old buffer */
00085         ReleaseBuffer(bistate->current_buf);
00086         bistate->current_buf = InvalidBuffer;
00087     }
00088 
00089     /* Perform a read using the buffer strategy */
00090     buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
00091                                 RBM_NORMAL, bistate->strategy);
00092 
00093     /* Save the selected block as target for future inserts */
00094     IncrBufferRefCount(buffer);
00095     bistate->current_buf = buffer;
00096 
00097     return buffer;
00098 }
00099 
00100 /*
00101  * For each heap page which is all-visible, acquire a pin on the appropriate
00102  * visibility map page, if we haven't already got one.
00103  *
00104  * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
00105  * must not be InvalidBuffer.  If both buffers are specified, buffer1 must
00106  * be less than buffer2.
00107  */
00108 static void
00109 GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
00110                      BlockNumber block1, BlockNumber block2,
00111                      Buffer *vmbuffer1, Buffer *vmbuffer2)
00112 {
00113     bool        need_to_pin_buffer1;
00114     bool        need_to_pin_buffer2;
00115 
00116     Assert(BufferIsValid(buffer1));
00117     Assert(buffer2 == InvalidBuffer || buffer1 <= buffer2);
00118 
00119     while (1)
00120     {
00121         /* Figure out which pins we need but don't have. */
00122         need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
00123             && !visibilitymap_pin_ok(block1, *vmbuffer1);
00124         need_to_pin_buffer2 = buffer2 != InvalidBuffer
00125             && PageIsAllVisible(BufferGetPage(buffer2))
00126             && !visibilitymap_pin_ok(block2, *vmbuffer2);
00127         if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
00128             return;
00129 
00130         /* We must unlock both buffers before doing any I/O. */
00131         LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
00132         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
00133             LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
00134 
00135         /* Get pins. */
00136         if (need_to_pin_buffer1)
00137             visibilitymap_pin(relation, block1, vmbuffer1);
00138         if (need_to_pin_buffer2)
00139             visibilitymap_pin(relation, block2, vmbuffer2);
00140 
00141         /* Relock buffers. */
00142         LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
00143         if (buffer2 != InvalidBuffer && buffer2 != buffer1)
00144             LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
00145 
00146         /*
00147          * If there are two buffers involved and we pinned just one of them,
00148          * it's possible that the second one became all-visible while we were
00149          * busy pinning the first one.  If it looks like that's a possible
00150          * scenario, we'll need to make a second pass through this loop.
00151          */
00152         if (buffer2 == InvalidBuffer || buffer1 == buffer2
00153             || (need_to_pin_buffer1 && need_to_pin_buffer2))
00154             break;
00155     }
00156 }
00157 
00158 /*
00159  * RelationGetBufferForTuple
00160  *
00161  *  Returns pinned and exclusive-locked buffer of a page in given relation
00162  *  with free space >= given len.
00163  *
00164  *  If otherBuffer is not InvalidBuffer, then it references a previously
00165  *  pinned buffer of another page in the same relation; on return, this
00166  *  buffer will also be exclusive-locked.  (This case is used by heap_update;
00167  *  the otherBuffer contains the tuple being updated.)
00168  *
00169  *  The reason for passing otherBuffer is that if two backends are doing
00170  *  concurrent heap_update operations, a deadlock could occur if they try
00171  *  to lock the same two buffers in opposite orders.  To ensure that this
00172  *  can't happen, we impose the rule that buffers of a relation must be
00173  *  locked in increasing page number order.  This is most conveniently done
00174  *  by having RelationGetBufferForTuple lock them both, with suitable care
00175  *  for ordering.
00176  *
00177  *  NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
00178  *  same buffer we select for insertion of the new tuple (this could only
00179  *  happen if space is freed in that page after heap_update finds there's not
00180  *  enough there).  In that case, the page will be pinned and locked only once.
00181  *
00182  *  For the vmbuffer and vmbuffer_other arguments, we avoid deadlock by
00183  *  locking them only after locking the corresponding heap page, and taking
00184  *  no further lwlocks while they are locked.
00185  *
00186  *  We normally use FSM to help us find free space.  However,
00187  *  if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
00188  *  the end of the relation if the tuple won't fit on the current target page.
00189  *  This can save some cycles when we know the relation is new and doesn't
00190  *  contain useful amounts of free space.
00191  *
00192  *  HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
00193  *  relation, if the caller holds exclusive lock and is careful to invalidate
00194  *  relation's smgr_targblock before the first insertion --- that ensures that
00195  *  all insertions will occur into newly added pages and not be intermixed
00196  *  with tuples from other transactions.  That way, a crash can't risk losing
00197  *  any committed data of other transactions.  (See heap_insert's comments
00198  *  for additional constraints needed for safe usage of this behavior.)
00199  *
00200  *  The caller can also provide a BulkInsertState object to optimize many
00201  *  insertions into the same relation.  This keeps a pin on the current
00202  *  insertion target page (to save pin/unpin cycles) and also passes a
00203  *  BULKWRITE buffer selection strategy object to the buffer manager.
00204  *  Passing NULL for bistate selects the default behavior.
00205  *
00206  *  We always try to avoid filling existing pages further than the fillfactor.
00207  *  This is OK since this routine is not consulted when updating a tuple and
00208  *  keeping it on the same page, which is the scenario fillfactor is meant
00209  *  to reserve space for.
00210  *
00211  *  ereport(ERROR) is allowed here, so this routine *must* be called
00212  *  before any (unlogged) changes are made in buffer pool.
00213  */
00214 Buffer
00215 RelationGetBufferForTuple(Relation relation, Size len,
00216                           Buffer otherBuffer, int options,
00217                           BulkInsertState bistate,
00218                           Buffer *vmbuffer, Buffer *vmbuffer_other)
00219 {
00220     bool        use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
00221     Buffer      buffer = InvalidBuffer;
00222     Page        page;
00223     Size        pageFreeSpace,
00224                 saveFreeSpace;
00225     BlockNumber targetBlock,
00226                 otherBlock;
00227     bool        needLock;
00228 
00229     len = MAXALIGN(len);        /* be conservative */
00230 
00231     /* Bulk insert is not supported for updates, only inserts. */
00232     Assert(otherBuffer == InvalidBuffer || !bistate);
00233 
00234     /*
00235      * If we're gonna fail for oversize tuple, do it right away
00236      */
00237     if (len > MaxHeapTupleSize)
00238         ereport(ERROR,
00239                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00240                  errmsg("row is too big: size %lu, maximum size %lu",
00241                         (unsigned long) len,
00242                         (unsigned long) MaxHeapTupleSize)));
00243 
00244     /* Compute desired extra freespace due to fillfactor option */
00245     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
00246                                                    HEAP_DEFAULT_FILLFACTOR);
00247 
00248     if (otherBuffer != InvalidBuffer)
00249         otherBlock = BufferGetBlockNumber(otherBuffer);
00250     else
00251         otherBlock = InvalidBlockNumber;        /* just to keep compiler quiet */
00252 
00253     /*
00254      * We first try to put the tuple on the same page we last inserted a tuple
00255      * on, as cached in the BulkInsertState or relcache entry.  If that
00256      * doesn't work, we ask the Free Space Map to locate a suitable page.
00257      * Since the FSM's info might be out of date, we have to be prepared to
00258      * loop around and retry multiple times. (To insure this isn't an infinite
00259      * loop, we must update the FSM with the correct amount of free space on
00260      * each page that proves not to be suitable.)  If the FSM has no record of
00261      * a page with enough free space, we give up and extend the relation.
00262      *
00263      * When use_fsm is false, we either put the tuple onto the existing target
00264      * page or extend the relation.
00265      */
00266     if (len + saveFreeSpace > MaxHeapTupleSize)
00267     {
00268         /* can't fit, don't bother asking FSM */
00269         targetBlock = InvalidBlockNumber;
00270         use_fsm = false;
00271     }
00272     else if (bistate && bistate->current_buf != InvalidBuffer)
00273         targetBlock = BufferGetBlockNumber(bistate->current_buf);
00274     else
00275         targetBlock = RelationGetTargetBlock(relation);
00276 
00277     if (targetBlock == InvalidBlockNumber && use_fsm)
00278     {
00279         /*
00280          * We have no cached target page, so ask the FSM for an initial
00281          * target.
00282          */
00283         targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
00284 
00285         /*
00286          * If the FSM knows nothing of the rel, try the last page before we
00287          * give up and extend.  This avoids one-tuple-per-page syndrome during
00288          * bootstrapping or in a recently-started system.
00289          */
00290         if (targetBlock == InvalidBlockNumber)
00291         {
00292             BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
00293 
00294             if (nblocks > 0)
00295                 targetBlock = nblocks - 1;
00296         }
00297     }
00298 
00299     while (targetBlock != InvalidBlockNumber)
00300     {
00301         /*
00302          * Read and exclusive-lock the target block, as well as the other
00303          * block if one was given, taking suitable care with lock ordering and
00304          * the possibility they are the same block.
00305          *
00306          * If the page-level all-visible flag is set, caller will need to
00307          * clear both that and the corresponding visibility map bit.  However,
00308          * by the time we return, we'll have x-locked the buffer, and we don't
00309          * want to do any I/O while in that state.  So we check the bit here
00310          * before taking the lock, and pin the page if it appears necessary.
00311          * Checking without the lock creates a risk of getting the wrong
00312          * answer, so we'll have to recheck after acquiring the lock.
00313          */
00314         if (otherBuffer == InvalidBuffer)
00315         {
00316             /* easy case */
00317             buffer = ReadBufferBI(relation, targetBlock, bistate);
00318             if (PageIsAllVisible(BufferGetPage(buffer)))
00319                 visibilitymap_pin(relation, targetBlock, vmbuffer);
00320             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00321         }
00322         else if (otherBlock == targetBlock)
00323         {
00324             /* also easy case */
00325             buffer = otherBuffer;
00326             if (PageIsAllVisible(BufferGetPage(buffer)))
00327                 visibilitymap_pin(relation, targetBlock, vmbuffer);
00328             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00329         }
00330         else if (otherBlock < targetBlock)
00331         {
00332             /* lock other buffer first */
00333             buffer = ReadBuffer(relation, targetBlock);
00334             if (PageIsAllVisible(BufferGetPage(buffer)))
00335                 visibilitymap_pin(relation, targetBlock, vmbuffer);
00336             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
00337             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00338         }
00339         else
00340         {
00341             /* lock target buffer first */
00342             buffer = ReadBuffer(relation, targetBlock);
00343             if (PageIsAllVisible(BufferGetPage(buffer)))
00344                 visibilitymap_pin(relation, targetBlock, vmbuffer);
00345             LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00346             LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
00347         }
00348 
00349         /*
00350          * We now have the target page (and the other buffer, if any) pinned
00351          * and locked.  However, since our initial PageIsAllVisible checks
00352          * were performed before acquiring the lock, the results might now be
00353          * out of date, either for the selected victim buffer, or for the
00354          * other buffer passed by the caller.  In that case, we'll need to
00355          * give up our locks, go get the pin(s) we failed to get earlier, and
00356          * re-lock.  That's pretty painful, but hopefully shouldn't happen
00357          * often.
00358          *
00359          * Note that there's a small possibility that we didn't pin the page
00360          * above but still have the correct page pinned anyway, either because
00361          * we've already made a previous pass through this loop, or because
00362          * caller passed us the right page anyway.
00363          *
00364          * Note also that it's possible that by the time we get the pin and
00365          * retake the buffer locks, the visibility map bit will have been
00366          * cleared by some other backend anyway.  In that case, we'll have
00367          * done a bit of extra work for no gain, but there's no real harm
00368          * done.
00369          */
00370         if (otherBuffer == InvalidBuffer || buffer <= otherBuffer)
00371             GetVisibilityMapPins(relation, buffer, otherBuffer,
00372                                  targetBlock, otherBlock, vmbuffer,
00373                                  vmbuffer_other);
00374         else
00375             GetVisibilityMapPins(relation, otherBuffer, buffer,
00376                                  otherBlock, targetBlock, vmbuffer_other,
00377                                  vmbuffer);
00378 
00379         /*
00380          * Now we can check to see if there's enough free space here. If so,
00381          * we're done.
00382          */
00383         page = BufferGetPage(buffer);
00384         pageFreeSpace = PageGetHeapFreeSpace(page);
00385         if (len + saveFreeSpace <= pageFreeSpace)
00386         {
00387             /* use this page as future insert target, too */
00388             RelationSetTargetBlock(relation, targetBlock);
00389             return buffer;
00390         }
00391 
00392         /*
00393          * Not enough space, so we must give up our page locks and pin (if
00394          * any) and prepare to look elsewhere.  We don't care which order we
00395          * unlock the two buffers in, so this can be slightly simpler than the
00396          * code above.
00397          */
00398         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
00399         if (otherBuffer == InvalidBuffer)
00400             ReleaseBuffer(buffer);
00401         else if (otherBlock != targetBlock)
00402         {
00403             LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
00404             ReleaseBuffer(buffer);
00405         }
00406 
00407         /* Without FSM, always fall out of the loop and extend */
00408         if (!use_fsm)
00409             break;
00410 
00411         /*
00412          * Update FSM as to condition of this page, and ask for another page
00413          * to try.
00414          */
00415         targetBlock = RecordAndGetPageWithFreeSpace(relation,
00416                                                     targetBlock,
00417                                                     pageFreeSpace,
00418                                                     len + saveFreeSpace);
00419     }
00420 
00421     /*
00422      * Have to extend the relation.
00423      *
00424      * We have to use a lock to ensure no one else is extending the rel at the
00425      * same time, else we will both try to initialize the same new page.  We
00426      * can skip locking for new or temp relations, however, since no one else
00427      * could be accessing them.
00428      */
00429     needLock = !RELATION_IS_LOCAL(relation);
00430 
00431     if (needLock)
00432         LockRelationForExtension(relation, ExclusiveLock);
00433 
00434     /*
00435      * XXX This does an lseek - rather expensive - but at the moment it is the
00436      * only way to accurately determine how many blocks are in a relation.  Is
00437      * it worth keeping an accurate file length in shared memory someplace,
00438      * rather than relying on the kernel to do it for us?
00439      */
00440     buffer = ReadBufferBI(relation, P_NEW, bistate);
00441 
00442     /*
00443      * We can be certain that locking the otherBuffer first is OK, since it
00444      * must have a lower page number.
00445      */
00446     if (otherBuffer != InvalidBuffer)
00447         LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
00448 
00449     /*
00450      * Now acquire lock on the new page.
00451      */
00452     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00453 
00454     /*
00455      * Release the file-extension lock; it's now OK for someone else to extend
00456      * the relation some more.  Note that we cannot release this lock before
00457      * we have buffer lock on the new page, or we risk a race condition
00458      * against vacuumlazy.c --- see comments therein.
00459      */
00460     if (needLock)
00461         UnlockRelationForExtension(relation, ExclusiveLock);
00462 
00463     /*
00464      * We need to initialize the empty new page.  Double-check that it really
00465      * is empty (this should never happen, but if it does we don't want to
00466      * risk wiping out valid data).
00467      */
00468     page = BufferGetPage(buffer);
00469 
00470     if (!PageIsNew(page))
00471         elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
00472              BufferGetBlockNumber(buffer),
00473              RelationGetRelationName(relation));
00474 
00475     PageInit(page, BufferGetPageSize(buffer), 0);
00476 
00477     if (len > PageGetHeapFreeSpace(page))
00478     {
00479         /* We should not get here given the test at the top */
00480         elog(PANIC, "tuple is too big: size %lu", (unsigned long) len);
00481     }
00482 
00483     /*
00484      * Remember the new page as our target for future insertions.
00485      *
00486      * XXX should we enter the new page into the free space map immediately,
00487      * or just keep it for this backend's exclusive use in the short run
00488      * (until VACUUM sees it)?  Seems to depend on whether you expect the
00489      * current backend to make more insertions or not, which is probably a
00490      * good bet most of the time.  So for now, don't add it to FSM yet.
00491      */
00492     RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
00493 
00494     return buffer;
00495 }