Header And Logo

PostgreSQL
| The world's most advanced open source database.

bufpage.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * bufpage.c
00004  *    POSTGRES standard buffer page code.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/storage/page/bufpage.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/htup_details.h"
00018 #include "access/xlog.h"
00019 #include "storage/checksum.h"
00020 
/*
 * When true, PageIsVerified still accepts a page whose checksum does not
 * match, as long as its header looks sane.  A WARNING is emitted either way.
 */
bool ignore_checksum_failure = false;

/*
 * Block-sized static scratch buffer.  PageSetChecksumCopy copies the page
 * into it before setting the checksum on the copy.
 */
static char pageCopyData[BLCKSZ];   /* for checksum calculation */
static Page pageCopy = pageCopyData;

/* Forward declaration; called by PageIsVerified before its definition. */
static uint16 PageCalcChecksum16(Page page, BlockNumber blkno);
00027 
00028 /* ----------------------------------------------------------------
00029  *                      Page support functions
00030  * ----------------------------------------------------------------
00031  */
00032 
00033 /*
00034  * PageInit
00035  *      Initializes the contents of a page.
00036  *      Note that we don't calculate an initial checksum here; that's not done
00037  *      until it's time to write.
00038  */
00039 void
00040 PageInit(Page page, Size pageSize, Size specialSize)
00041 {
00042     PageHeader  p = (PageHeader) page;
00043 
00044     specialSize = MAXALIGN(specialSize);
00045 
00046     Assert(pageSize == BLCKSZ);
00047     Assert(pageSize > specialSize + SizeOfPageHeaderData);
00048 
00049     /* Make sure all fields of page are zero, as well as unused space */
00050     MemSet(p, 0, pageSize);
00051 
00052     p->pd_flags = 0;
00053     p->pd_lower = SizeOfPageHeaderData;
00054     p->pd_upper = pageSize - specialSize;
00055     p->pd_special = pageSize - specialSize;
00056     PageSetPageSizeAndVersion(page, pageSize, PG_PAGE_LAYOUT_VERSION);
00057     /* p->pd_prune_xid = InvalidTransactionId;      done by above MemSet */
00058 }
00059 
00060 
00061 /*
00062  * PageIsVerified
00063  *      Check that the page header and checksum (if any) appear valid.
00064  *
00065  * This is called when a page has just been read in from disk.  The idea is
00066  * to cheaply detect trashed pages before we go nuts following bogus item
00067  * pointers, testing invalid transaction identifiers, etc.
00068  *
00069  * It turns out to be necessary to allow zeroed pages here too.  Even though
00070  * this routine is *not* called when deliberately adding a page to a relation,
00071  * there are scenarios in which a zeroed page might be found in a table.
00072  * (Example: a backend extends a relation, then crashes before it can write
00073  * any WAL entry about the new page.  The kernel will already have the
00074  * zeroed page in the file, and it will stay that way after restart.)  So we
00075  * allow zeroed pages here, and are careful that the page access macros
00076  * treat such a page as empty and without free space.  Eventually, VACUUM
00077  * will clean up such a page and make it usable.
00078  */
00079 bool
00080 PageIsVerified(Page page, BlockNumber blkno)
00081 {
00082     PageHeader  p = (PageHeader) page;
00083     char       *pagebytes;
00084     int         i;
00085     bool        checksum_failure = false;
00086     bool        header_sane = false;
00087     bool        all_zeroes = false;
00088     uint16      checksum = 0;
00089 
00090     /*
00091      * Don't verify page data unless the page passes basic non-zero test
00092      */
00093     if (!PageIsNew(page))
00094     {
00095         if (DataChecksumsEnabled())
00096         {
00097             checksum = PageCalcChecksum16(page, blkno);
00098 
00099             if (checksum != p->pd_checksum)
00100                 checksum_failure = true;
00101         }
00102 
00103         /*
00104          * The following checks don't prove the header is correct,
00105          * only that it looks sane enough to allow into the buffer pool.
00106          * Later usage of the block can still reveal problems,
00107          * which is why we offer the checksum option.
00108          */
00109         if ((p->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
00110              p->pd_lower <= p->pd_upper &&
00111              p->pd_upper <= p->pd_special &&
00112              p->pd_special <= BLCKSZ &&
00113              p->pd_special == MAXALIGN(p->pd_special))
00114             header_sane = true;
00115 
00116         if (header_sane && !checksum_failure)
00117             return true;
00118     }
00119 
00120     /* Check all-zeroes case */
00121     all_zeroes = true;
00122     pagebytes = (char *) page;
00123     for (i = 0; i < BLCKSZ; i++)
00124     {
00125         if (pagebytes[i] != 0)
00126         {
00127             all_zeroes = false;
00128             break;
00129         }
00130     }
00131 
00132     if (all_zeroes)
00133         return true;
00134 
00135     /*
00136      * Throw a WARNING if the checksum fails, but only after we've checked for
00137      * the all-zeroes case.
00138      */
00139     if (checksum_failure)
00140     {
00141         ereport(WARNING,
00142                 (ERRCODE_DATA_CORRUPTED,
00143                  errmsg("page verification failed, calculated checksum %u but expected %u",
00144                         checksum, p->pd_checksum)));
00145 
00146         if (header_sane && ignore_checksum_failure)
00147             return true;
00148     }
00149 
00150     return false;
00151 }
00152 
00153 
00154 /*
00155  *  PageAddItem
00156  *
00157  *  Add an item to a page.  Return value is offset at which it was
00158  *  inserted, or InvalidOffsetNumber if there's not room to insert.
00159  *
00160  *  If overwrite is true, we just store the item at the specified
00161  *  offsetNumber (which must be either a currently-unused item pointer,
00162  *  or one past the last existing item).  Otherwise,
00163  *  if offsetNumber is valid and <= current max offset in the page,
00164  *  insert item into the array at that position by shuffling ItemId's
00165  *  down to make room.
00166  *  If offsetNumber is not valid, then assign one by finding the first
00167  *  one that is both unused and deallocated.
00168  *
00169  *  If is_heap is true, we enforce that there can't be more than
00170  *  MaxHeapTuplesPerPage line pointers on the page.
00171  *
00172  *  !!! EREPORT(ERROR) IS DISALLOWED HERE !!!
00173  */
00174 OffsetNumber
00175 PageAddItem(Page page,
00176             Item item,
00177             Size size,
00178             OffsetNumber offsetNumber,
00179             bool overwrite,
00180             bool is_heap)
00181 {
00182     PageHeader  phdr = (PageHeader) page;
00183     Size        alignedSize;
00184     int         lower;
00185     int         upper;
00186     ItemId      itemId;
00187     OffsetNumber limit;
00188     bool        needshuffle = false;
00189 
00190     /*
00191      * Be wary about corrupted page pointers
00192      */
00193     if (phdr->pd_lower < SizeOfPageHeaderData ||
00194         phdr->pd_lower > phdr->pd_upper ||
00195         phdr->pd_upper > phdr->pd_special ||
00196         phdr->pd_special > BLCKSZ)
00197         ereport(PANIC,
00198                 (errcode(ERRCODE_DATA_CORRUPTED),
00199                  errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
00200                         phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
00201 
00202     /*
00203      * Select offsetNumber to place the new item at
00204      */
00205     limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
00206 
00207     /* was offsetNumber passed in? */
00208     if (OffsetNumberIsValid(offsetNumber))
00209     {
00210         /* yes, check it */
00211         if (overwrite)
00212         {
00213             if (offsetNumber < limit)
00214             {
00215                 itemId = PageGetItemId(phdr, offsetNumber);
00216                 if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId))
00217                 {
00218                     elog(WARNING, "will not overwrite a used ItemId");
00219                     return InvalidOffsetNumber;
00220                 }
00221             }
00222         }
00223         else
00224         {
00225             if (offsetNumber < limit)
00226                 needshuffle = true;     /* need to move existing linp's */
00227         }
00228     }
00229     else
00230     {
00231         /* offsetNumber was not passed in, so find a free slot */
00232         /* if no free slot, we'll put it at limit (1st open slot) */
00233         if (PageHasFreeLinePointers(phdr))
00234         {
00235             /*
00236              * Look for "recyclable" (unused) ItemId.  We check for no storage
00237              * as well, just to be paranoid --- unused items should never have
00238              * storage.
00239              */
00240             for (offsetNumber = 1; offsetNumber < limit; offsetNumber++)
00241             {
00242                 itemId = PageGetItemId(phdr, offsetNumber);
00243                 if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
00244                     break;
00245             }
00246             if (offsetNumber >= limit)
00247             {
00248                 /* the hint is wrong, so reset it */
00249                 PageClearHasFreeLinePointers(phdr);
00250             }
00251         }
00252         else
00253         {
00254             /* don't bother searching if hint says there's no free slot */
00255             offsetNumber = limit;
00256         }
00257     }
00258 
00259     if (offsetNumber > limit)
00260     {
00261         elog(WARNING, "specified item offset is too large");
00262         return InvalidOffsetNumber;
00263     }
00264 
00265     if (is_heap && offsetNumber > MaxHeapTuplesPerPage)
00266     {
00267         elog(WARNING, "can't put more than MaxHeapTuplesPerPage items in a heap page");
00268         return InvalidOffsetNumber;
00269     }
00270 
00271     /*
00272      * Compute new lower and upper pointers for page, see if it'll fit.
00273      *
00274      * Note: do arithmetic as signed ints, to avoid mistakes if, say,
00275      * alignedSize > pd_upper.
00276      */
00277     if (offsetNumber == limit || needshuffle)
00278         lower = phdr->pd_lower + sizeof(ItemIdData);
00279     else
00280         lower = phdr->pd_lower;
00281 
00282     alignedSize = MAXALIGN(size);
00283 
00284     upper = (int) phdr->pd_upper - (int) alignedSize;
00285 
00286     if (lower > upper)
00287         return InvalidOffsetNumber;
00288 
00289     /*
00290      * OK to insert the item.  First, shuffle the existing pointers if needed.
00291      */
00292     itemId = PageGetItemId(phdr, offsetNumber);
00293 
00294     if (needshuffle)
00295         memmove(itemId + 1, itemId,
00296                 (limit - offsetNumber) * sizeof(ItemIdData));
00297 
00298     /* set the item pointer */
00299     ItemIdSetNormal(itemId, upper, size);
00300 
00301     /* copy the item's data onto the page */
00302     memcpy((char *) page + upper, item, size);
00303 
00304     /* adjust page header */
00305     phdr->pd_lower = (LocationIndex) lower;
00306     phdr->pd_upper = (LocationIndex) upper;
00307 
00308     return offsetNumber;
00309 }
00310 
00311 /*
00312  * PageGetTempPage
00313  *      Get a temporary page in local memory for special processing.
00314  *      The returned page is not initialized at all; caller must do that.
00315  */
00316 Page
00317 PageGetTempPage(Page page)
00318 {
00319     Size        pageSize;
00320     Page        temp;
00321 
00322     pageSize = PageGetPageSize(page);
00323     temp = (Page) palloc(pageSize);
00324 
00325     return temp;
00326 }
00327 
00328 /*
00329  * PageGetTempPageCopy
00330  *      Get a temporary page in local memory for special processing.
00331  *      The page is initialized by copying the contents of the given page.
00332  */
00333 Page
00334 PageGetTempPageCopy(Page page)
00335 {
00336     Size        pageSize;
00337     Page        temp;
00338 
00339     pageSize = PageGetPageSize(page);
00340     temp = (Page) palloc(pageSize);
00341 
00342     memcpy(temp, page, pageSize);
00343 
00344     return temp;
00345 }
00346 
00347 /*
00348  * PageGetTempPageCopySpecial
00349  *      Get a temporary page in local memory for special processing.
00350  *      The page is PageInit'd with the same special-space size as the
00351  *      given page, and the special space is copied from the given page.
00352  */
00353 Page
00354 PageGetTempPageCopySpecial(Page page)
00355 {
00356     Size        pageSize;
00357     Page        temp;
00358 
00359     pageSize = PageGetPageSize(page);
00360     temp = (Page) palloc(pageSize);
00361 
00362     PageInit(temp, pageSize, PageGetSpecialSize(page));
00363     memcpy(PageGetSpecialPointer(temp),
00364            PageGetSpecialPointer(page),
00365            PageGetSpecialSize(page));
00366 
00367     return temp;
00368 }
00369 
00370 /*
00371  * PageRestoreTempPage
00372  *      Copy temporary page back to permanent page after special processing
00373  *      and release the temporary page.
00374  */
00375 void
00376 PageRestoreTempPage(Page tempPage, Page oldPage)
00377 {
00378     Size        pageSize;
00379 
00380     pageSize = PageGetPageSize(tempPage);
00381     memcpy((char *) oldPage, (char *) tempPage, pageSize);
00382 
00383     pfree(tempPage);
00384 }
00385 
00386 /*
00387  * sorting support for PageRepairFragmentation and PageIndexMultiDelete
00388  */
00389 typedef struct itemIdSortData
00390 {
00391     int         offsetindex;    /* linp array index */
00392     int         itemoff;        /* page offset of item data */
00393     Size        alignedlen;     /* MAXALIGN(item data len) */
00394     ItemIdData  olditemid;      /* used only in PageIndexMultiDelete */
00395 } itemIdSortData;
00396 typedef itemIdSortData *itemIdSort;
00397 
00398 static int
00399 itemoffcompare(const void *itemidp1, const void *itemidp2)
00400 {
00401     /* Sort in decreasing itemoff order */
00402     return ((itemIdSort) itemidp2)->itemoff -
00403         ((itemIdSort) itemidp1)->itemoff;
00404 }
00405 
00406 /*
00407  * PageRepairFragmentation
00408  *
00409  * Frees fragmented space on a page.
00410  * It doesn't remove unused line pointers! Please don't change this.
00411  *
00412  * This routine is usable for heap pages only, but see PageIndexMultiDelete.
00413  *
00414  * As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
00415  */
00416 void
00417 PageRepairFragmentation(Page page)
00418 {
00419     Offset      pd_lower = ((PageHeader) page)->pd_lower;
00420     Offset      pd_upper = ((PageHeader) page)->pd_upper;
00421     Offset      pd_special = ((PageHeader) page)->pd_special;
00422     itemIdSort  itemidbase,
00423                 itemidptr;
00424     ItemId      lp;
00425     int         nline,
00426                 nstorage,
00427                 nunused;
00428     int         i;
00429     Size        totallen;
00430     Offset      upper;
00431 
00432     /*
00433      * It's worth the trouble to be more paranoid here than in most places,
00434      * because we are about to reshuffle data in (what is usually) a shared
00435      * disk buffer.  If we aren't careful then corrupted pointers, lengths,
00436      * etc could cause us to clobber adjacent disk buffers, spreading the data
00437      * loss further.  So, check everything.
00438      */
00439     if (pd_lower < SizeOfPageHeaderData ||
00440         pd_lower > pd_upper ||
00441         pd_upper > pd_special ||
00442         pd_special > BLCKSZ ||
00443         pd_special != MAXALIGN(pd_special))
00444         ereport(ERROR,
00445                 (errcode(ERRCODE_DATA_CORRUPTED),
00446                  errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
00447                         pd_lower, pd_upper, pd_special)));
00448 
00449     nline = PageGetMaxOffsetNumber(page);
00450     nunused = nstorage = 0;
00451     for (i = FirstOffsetNumber; i <= nline; i++)
00452     {
00453         lp = PageGetItemId(page, i);
00454         if (ItemIdIsUsed(lp))
00455         {
00456             if (ItemIdHasStorage(lp))
00457                 nstorage++;
00458         }
00459         else
00460         {
00461             /* Unused entries should have lp_len = 0, but make sure */
00462             ItemIdSetUnused(lp);
00463             nunused++;
00464         }
00465     }
00466 
00467     if (nstorage == 0)
00468     {
00469         /* Page is completely empty, so just reset it quickly */
00470         ((PageHeader) page)->pd_upper = pd_special;
00471     }
00472     else
00473     {                           /* nstorage != 0 */
00474         /* Need to compact the page the hard way */
00475         itemidbase = (itemIdSort) palloc(sizeof(itemIdSortData) * nstorage);
00476         itemidptr = itemidbase;
00477         totallen = 0;
00478         for (i = 0; i < nline; i++)
00479         {
00480             lp = PageGetItemId(page, i + 1);
00481             if (ItemIdHasStorage(lp))
00482             {
00483                 itemidptr->offsetindex = i;
00484                 itemidptr->itemoff = ItemIdGetOffset(lp);
00485                 if (itemidptr->itemoff < (int) pd_upper ||
00486                     itemidptr->itemoff >= (int) pd_special)
00487                     ereport(ERROR,
00488                             (errcode(ERRCODE_DATA_CORRUPTED),
00489                              errmsg("corrupted item pointer: %u",
00490                                     itemidptr->itemoff)));
00491                 itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
00492                 totallen += itemidptr->alignedlen;
00493                 itemidptr++;
00494             }
00495         }
00496 
00497         if (totallen > (Size) (pd_special - pd_lower))
00498             ereport(ERROR,
00499                     (errcode(ERRCODE_DATA_CORRUPTED),
00500                errmsg("corrupted item lengths: total %u, available space %u",
00501                       (unsigned int) totallen, pd_special - pd_lower)));
00502 
00503         /* sort itemIdSortData array into decreasing itemoff order */
00504         qsort((char *) itemidbase, nstorage, sizeof(itemIdSortData),
00505               itemoffcompare);
00506 
00507         /* compactify page */
00508         upper = pd_special;
00509 
00510         for (i = 0, itemidptr = itemidbase; i < nstorage; i++, itemidptr++)
00511         {
00512             lp = PageGetItemId(page, itemidptr->offsetindex + 1);
00513             upper -= itemidptr->alignedlen;
00514             memmove((char *) page + upper,
00515                     (char *) page + itemidptr->itemoff,
00516                     itemidptr->alignedlen);
00517             lp->lp_off = upper;
00518         }
00519 
00520         ((PageHeader) page)->pd_upper = upper;
00521 
00522         pfree(itemidbase);
00523     }
00524 
00525     /* Set hint bit for PageAddItem */
00526     if (nunused > 0)
00527         PageSetHasFreeLinePointers(page);
00528     else
00529         PageClearHasFreeLinePointers(page);
00530 }
00531 
00532 /*
00533  * PageGetFreeSpace
00534  *      Returns the size of the free (allocatable) space on a page,
00535  *      reduced by the space needed for a new line pointer.
00536  *
00537  * Note: this should usually only be used on index pages.  Use
00538  * PageGetHeapFreeSpace on heap pages.
00539  */
00540 Size
00541 PageGetFreeSpace(Page page)
00542 {
00543     int         space;
00544 
00545     /*
00546      * Use signed arithmetic here so that we behave sensibly if pd_lower >
00547      * pd_upper.
00548      */
00549     space = (int) ((PageHeader) page)->pd_upper -
00550         (int) ((PageHeader) page)->pd_lower;
00551 
00552     if (space < (int) sizeof(ItemIdData))
00553         return 0;
00554     space -= sizeof(ItemIdData);
00555 
00556     return (Size) space;
00557 }
00558 
00559 /*
00560  * PageGetExactFreeSpace
00561  *      Returns the size of the free (allocatable) space on a page,
00562  *      without any consideration for adding/removing line pointers.
00563  */
00564 Size
00565 PageGetExactFreeSpace(Page page)
00566 {
00567     int         space;
00568 
00569     /*
00570      * Use signed arithmetic here so that we behave sensibly if pd_lower >
00571      * pd_upper.
00572      */
00573     space = (int) ((PageHeader) page)->pd_upper -
00574         (int) ((PageHeader) page)->pd_lower;
00575 
00576     if (space < 0)
00577         return 0;
00578 
00579     return (Size) space;
00580 }
00581 
00582 
00583 /*
00584  * PageGetHeapFreeSpace
00585  *      Returns the size of the free (allocatable) space on a page,
00586  *      reduced by the space needed for a new line pointer.
00587  *
00588  * The difference between this and PageGetFreeSpace is that this will return
00589  * zero if there are already MaxHeapTuplesPerPage line pointers in the page
00590  * and none are free.  We use this to enforce that no more than
00591  * MaxHeapTuplesPerPage line pointers are created on a heap page.  (Although
00592  * no more tuples than that could fit anyway, in the presence of redirected
00593  * or dead line pointers it'd be possible to have too many line pointers.
00594  * To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
00595  * on the number of line pointers, we make this extra check.)
00596  */
00597 Size
00598 PageGetHeapFreeSpace(Page page)
00599 {
00600     Size        space;
00601 
00602     space = PageGetFreeSpace(page);
00603     if (space > 0)
00604     {
00605         OffsetNumber offnum,
00606                     nline;
00607 
00608         /*
00609          * Are there already MaxHeapTuplesPerPage line pointers in the page?
00610          */
00611         nline = PageGetMaxOffsetNumber(page);
00612         if (nline >= MaxHeapTuplesPerPage)
00613         {
00614             if (PageHasFreeLinePointers((PageHeader) page))
00615             {
00616                 /*
00617                  * Since this is just a hint, we must confirm that there is
00618                  * indeed a free line pointer
00619                  */
00620                 for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
00621                 {
00622                     ItemId      lp = PageGetItemId(page, offnum);
00623 
00624                     if (!ItemIdIsUsed(lp))
00625                         break;
00626                 }
00627 
00628                 if (offnum > nline)
00629                 {
00630                     /*
00631                      * The hint is wrong, but we can't clear it here since we
00632                      * don't have the ability to mark the page dirty.
00633                      */
00634                     space = 0;
00635                 }
00636             }
00637             else
00638             {
00639                 /*
00640                  * Although the hint might be wrong, PageAddItem will believe
00641                  * it anyway, so we must believe it too.
00642                  */
00643                 space = 0;
00644             }
00645         }
00646     }
00647     return space;
00648 }
00649 
00650 
00651 /*
00652  * PageIndexTupleDelete
00653  *
00654  * This routine does the work of removing a tuple from an index page.
00655  *
00656  * Unlike heap pages, we compact out the line pointer for the removed tuple.
00657  */
00658 void
00659 PageIndexTupleDelete(Page page, OffsetNumber offnum)
00660 {
00661     PageHeader  phdr = (PageHeader) page;
00662     char       *addr;
00663     ItemId      tup;
00664     Size        size;
00665     unsigned    offset;
00666     int         nbytes;
00667     int         offidx;
00668     int         nline;
00669 
00670     /*
00671      * As with PageRepairFragmentation, paranoia seems justified.
00672      */
00673     if (phdr->pd_lower < SizeOfPageHeaderData ||
00674         phdr->pd_lower > phdr->pd_upper ||
00675         phdr->pd_upper > phdr->pd_special ||
00676         phdr->pd_special > BLCKSZ)
00677         ereport(ERROR,
00678                 (errcode(ERRCODE_DATA_CORRUPTED),
00679                  errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
00680                         phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
00681 
00682     nline = PageGetMaxOffsetNumber(page);
00683     if ((int) offnum <= 0 || (int) offnum > nline)
00684         elog(ERROR, "invalid index offnum: %u", offnum);
00685 
00686     /* change offset number to offset index */
00687     offidx = offnum - 1;
00688 
00689     tup = PageGetItemId(page, offnum);
00690     Assert(ItemIdHasStorage(tup));
00691     size = ItemIdGetLength(tup);
00692     offset = ItemIdGetOffset(tup);
00693 
00694     if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
00695         offset != MAXALIGN(offset) || size != MAXALIGN(size))
00696         ereport(ERROR,
00697                 (errcode(ERRCODE_DATA_CORRUPTED),
00698                  errmsg("corrupted item pointer: offset = %u, size = %u",
00699                         offset, (unsigned int) size)));
00700 
00701     /*
00702      * First, we want to get rid of the pd_linp entry for the index tuple. We
00703      * copy all subsequent linp's back one slot in the array. We don't use
00704      * PageGetItemId, because we are manipulating the _array_, not individual
00705      * linp's.
00706      */
00707     nbytes = phdr->pd_lower -
00708         ((char *) &phdr->pd_linp[offidx + 1] - (char *) phdr);
00709 
00710     if (nbytes > 0)
00711         memmove((char *) &(phdr->pd_linp[offidx]),
00712                 (char *) &(phdr->pd_linp[offidx + 1]),
00713                 nbytes);
00714 
00715     /*
00716      * Now move everything between the old upper bound (beginning of tuple
00717      * space) and the beginning of the deleted tuple forward, so that space in
00718      * the middle of the page is left free.  If we've just deleted the tuple
00719      * at the beginning of tuple space, then there's no need to do the copy
00720      * (and bcopy on some architectures SEGV's if asked to move zero bytes).
00721      */
00722 
00723     /* beginning of tuple space */
00724     addr = (char *) page + phdr->pd_upper;
00725 
00726     if (offset > phdr->pd_upper)
00727         memmove(addr + size, addr, (int) (offset - phdr->pd_upper));
00728 
00729     /* adjust free space boundary pointers */
00730     phdr->pd_upper += size;
00731     phdr->pd_lower -= sizeof(ItemIdData);
00732 
00733     /*
00734      * Finally, we need to adjust the linp entries that remain.
00735      *
00736      * Anything that used to be before the deleted tuple's data was moved
00737      * forward by the size of the deleted tuple.
00738      */
00739     if (!PageIsEmpty(page))
00740     {
00741         int         i;
00742 
00743         nline--;                /* there's one less than when we started */
00744         for (i = 1; i <= nline; i++)
00745         {
00746             ItemId      ii = PageGetItemId(phdr, i);
00747 
00748             Assert(ItemIdHasStorage(ii));
00749             if (ItemIdGetOffset(ii) <= offset)
00750                 ii->lp_off += size;
00751         }
00752     }
00753 }
00754 
00755 
00756 /*
00757  * PageIndexMultiDelete
00758  *
00759  * This routine handles the case of deleting multiple tuples from an
00760  * index page at once.  It is considerably faster than a loop around
00761  * PageIndexTupleDelete ... however, the caller *must* supply the array
00762  * of item numbers to be deleted in item number order!
00763  */
00764 void
00765 PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
00766 {
00767     PageHeader  phdr = (PageHeader) page;
00768     Offset      pd_lower = phdr->pd_lower;
00769     Offset      pd_upper = phdr->pd_upper;
00770     Offset      pd_special = phdr->pd_special;
00771     itemIdSort  itemidbase,
00772                 itemidptr;
00773     ItemId      lp;
00774     int         nline,
00775                 nused;
00776     int         i;
00777     Size        totallen;
00778     Offset      upper;
00779     Size        size;
00780     unsigned    offset;
00781     int         nextitm;
00782     OffsetNumber offnum;
00783 
00784     /*
00785      * If there aren't very many items to delete, then retail
00786      * PageIndexTupleDelete is the best way.  Delete the items in reverse
00787      * order so we don't have to think about adjusting item numbers for
00788      * previous deletions.
00789      *
00790      * TODO: tune the magic number here
00791      */
00792     if (nitems <= 2)
00793     {
00794         while (--nitems >= 0)
00795             PageIndexTupleDelete(page, itemnos[nitems]);
00796         return;
00797     }
00798 
00799     /*
00800      * As with PageRepairFragmentation, paranoia seems justified.
00801      */
00802     if (pd_lower < SizeOfPageHeaderData ||
00803         pd_lower > pd_upper ||
00804         pd_upper > pd_special ||
00805         pd_special > BLCKSZ ||
00806         pd_special != MAXALIGN(pd_special))
00807         ereport(ERROR,
00808                 (errcode(ERRCODE_DATA_CORRUPTED),
00809                  errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
00810                         pd_lower, pd_upper, pd_special)));
00811 
00812     /*
00813      * Scan the item pointer array and build a list of just the ones we are
00814      * going to keep.  Notice we do not modify the page yet, since we are
00815      * still validity-checking.
00816      */
00817     nline = PageGetMaxOffsetNumber(page);
00818     itemidbase = (itemIdSort) palloc(sizeof(itemIdSortData) * nline);
00819     itemidptr = itemidbase;
00820     totallen = 0;
00821     nused = 0;
00822     nextitm = 0;
00823     for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
00824     {
00825         lp = PageGetItemId(page, offnum);
00826         Assert(ItemIdHasStorage(lp));
00827         size = ItemIdGetLength(lp);
00828         offset = ItemIdGetOffset(lp);
00829         if (offset < pd_upper ||
00830             (offset + size) > pd_special ||
00831             offset != MAXALIGN(offset))
00832             ereport(ERROR,
00833                     (errcode(ERRCODE_DATA_CORRUPTED),
00834                      errmsg("corrupted item pointer: offset = %u, size = %u",
00835                             offset, (unsigned int) size)));
00836 
00837         if (nextitm < nitems && offnum == itemnos[nextitm])
00838         {
00839             /* skip item to be deleted */
00840             nextitm++;
00841         }
00842         else
00843         {
00844             itemidptr->offsetindex = nused;     /* where it will go */
00845             itemidptr->itemoff = offset;
00846             itemidptr->olditemid = *lp;
00847             itemidptr->alignedlen = MAXALIGN(size);
00848             totallen += itemidptr->alignedlen;
00849             itemidptr++;
00850             nused++;
00851         }
00852     }
00853 
00854     /* this will catch invalid or out-of-order itemnos[] */
00855     if (nextitm != nitems)
00856         elog(ERROR, "incorrect index offsets supplied");
00857 
00858     if (totallen > (Size) (pd_special - pd_lower))
00859         ereport(ERROR,
00860                 (errcode(ERRCODE_DATA_CORRUPTED),
00861                errmsg("corrupted item lengths: total %u, available space %u",
00862                       (unsigned int) totallen, pd_special - pd_lower)));
00863 
00864     /* sort itemIdSortData array into decreasing itemoff order */
00865     qsort((char *) itemidbase, nused, sizeof(itemIdSortData),
00866           itemoffcompare);
00867 
00868     /* compactify page and install new itemids */
00869     upper = pd_special;
00870 
00871     for (i = 0, itemidptr = itemidbase; i < nused; i++, itemidptr++)
00872     {
00873         lp = PageGetItemId(page, itemidptr->offsetindex + 1);
00874         upper -= itemidptr->alignedlen;
00875         memmove((char *) page + upper,
00876                 (char *) page + itemidptr->itemoff,
00877                 itemidptr->alignedlen);
00878         *lp = itemidptr->olditemid;
00879         lp->lp_off = upper;
00880     }
00881 
00882     phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
00883     phdr->pd_upper = upper;
00884 
00885     pfree(itemidbase);
00886 }
00887 
00888 /*
00889  * Set checksum for page in shared buffers.
00890  *
00891  * If checksums are disabled, or if the page is not initialized, just return
00892  * the input. Otherwise, we must make a copy of the page before calculating the
00893  * checksum, to prevent concurrent modifications (e.g. setting hint bits) from
00894  * making the final checksum invalid.
00895  *
00896  * Returns a pointer to the block-sized data that needs to be written. Uses
00897  * statically-allocated memory, so the caller must immediately write the
00898  * returned page and not refer to it again.
00899  */
00900 char *
00901 PageSetChecksumCopy(Page page, BlockNumber blkno)
00902 {
00903     if (PageIsNew(page) || !DataChecksumsEnabled())
00904         return (char *) page;
00905 
00906     /*
00907      * We make a copy iff we need to calculate a checksum because other
00908      * backends may set hint bits on this page while we write, which
00909      * would mean the checksum differs from the page contents. It doesn't
00910      * matter if we include or exclude hints during the copy, as long
00911      * as we write a valid page and associated checksum.
00912      */
00913     memcpy((char *) pageCopy, (char *) page, BLCKSZ);
00914     PageSetChecksumInplace(pageCopy, blkno);
00915     return (char *) pageCopy;
00916 }
00917 
00918 /*
00919  * Set checksum for page in private memory.
00920  *
00921  * This is a simpler version of PageSetChecksumCopy(). The more explicit API
00922  * allows us to more easily see if we're making the correct call and reduces
00923  * the amount of additional code specific to page verification.
00924  */
00925 void
00926 PageSetChecksumInplace(Page page, BlockNumber blkno)
00927 {
00928     if (PageIsNew(page))
00929         return;
00930 
00931     if (DataChecksumsEnabled())
00932     {
00933         PageHeader  p = (PageHeader) page;
00934         p->pd_checksum = PageCalcChecksum16(page, blkno);
00935     }
00936 
00937     return;
00938 }
00939 
00940 /*
00941  * Calculate checksum for a PostgreSQL Page. This includes the block number (to
00942  * detect the case when a page is somehow moved to a different location), the
00943  * page header (excluding the checksum itself), and the page data.
00944  *
00945  * Note that if the checksum validation fails we cannot tell the difference
00946  * between a transposed block and failure from direct on-block corruption,
00947  * though that is better than just ignoring transposed blocks altogether.
00948  */
00949 static uint16
00950 PageCalcChecksum16(Page page, BlockNumber blkno)
00951 {
00952     PageHeader  phdr   = (PageHeader) page;
00953     uint16      save_checksum;
00954     uint32      checksum;
00955 
00956     /* only calculate the checksum for properly-initialized pages */
00957     Assert(!PageIsNew(page));
00958 
00959     /*
00960      * Save pd_checksum and set it to zero, so that the checksum calculation
00961      * isn't affected by the checksum stored on the page. We do this to
00962      * allow optimization of the checksum calculation on the whole block
00963      * in one go.
00964      */
00965     save_checksum = phdr->pd_checksum;
00966     phdr->pd_checksum = 0;
00967     checksum = checksum_block(page, BLCKSZ);
00968     phdr->pd_checksum = save_checksum;
00969 
00970     /* mix in the block number to detect transposed pages */
00971     checksum ^= blkno;
00972 
00973     /*
00974      * Reduce to a uint16 (to fit in the pd_checksum field) with an offset of
00975      * one. That avoids checksums of zero, which seems like a good idea.
00976      */
00977     return (checksum % 65535) + 1;
00978 }