Header And Logo

PostgreSQL
| The world's most advanced open source database.

spgvacuum.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * spgvacuum.c
00004  *    vacuum for SP-GiST
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *          src/backend/access/spgist/spgvacuum.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/genam.h"
00019 #include "access/spgist_private.h"
00020 #include "access/transam.h"
00021 #include "catalog/storage_xlog.h"
00022 #include "commands/vacuum.h"
00023 #include "miscadmin.h"
00024 #include "storage/bufmgr.h"
00025 #include "storage/indexfsm.h"
00026 #include "storage/lmgr.h"
00027 #include "utils/snapmgr.h"
00028 
00029 
00030 /* Entry in pending-list of TIDs we need to revisit */
00031 typedef struct spgVacPendingItem
00032 {
00033     ItemPointerData tid;        /* redirection target to visit */
00034     bool        done;           /* have we dealt with this? */
00035     struct spgVacPendingItem *next;     /* list link */
00036 } spgVacPendingItem;
00037 
00038 /* Local state for vacuum operations */
00039 typedef struct spgBulkDeleteState
00040 {
00041     /* Parameters passed in to spgvacuumscan */
00042     IndexVacuumInfo *info;
00043     IndexBulkDeleteResult *stats;
00044     IndexBulkDeleteCallback callback;
00045     void       *callback_state;
00046 
00047     /* Additional working state */
00048     SpGistState spgstate;       /* for SPGiST operations that need one */
00049     spgVacPendingItem *pendingList;     /* TIDs we need to (re)visit */
00050     TransactionId myXmin;       /* for detecting newly-added redirects */
00051     BlockNumber lastFilledBlock;    /* last non-deletable block */
00052 } spgBulkDeleteState;
00053 
00054 
00055 /*
00056  * Add TID to pendingList, but only if not already present.
00057  *
00058  * Note that new items are always appended at the end of the list; this
00059  * ensures that scans of the list don't miss items added during the scan.
00060  */
00061 static void
00062 spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
00063 {
00064     spgVacPendingItem *pitem;
00065     spgVacPendingItem **listLink;
00066 
00067     /* search the list for pre-existing entry */
00068     listLink = &bds->pendingList;
00069     while (*listLink != NULL)
00070     {
00071         pitem = *listLink;
00072         if (ItemPointerEquals(tid, &pitem->tid))
00073             return;             /* already in list, do nothing */
00074         listLink = &pitem->next;
00075     }
00076     /* not there, so append new entry */
00077     pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
00078     pitem->tid = *tid;
00079     pitem->done = false;
00080     pitem->next = NULL;
00081     *listLink = pitem;
00082 }
00083 
00084 /*
00085  * Clear pendingList
00086  */
00087 static void
00088 spgClearPendingList(spgBulkDeleteState *bds)
00089 {
00090     spgVacPendingItem *pitem;
00091     spgVacPendingItem *nitem;
00092 
00093     for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
00094     {
00095         nitem = pitem->next;
00096         /* All items in list should have been dealt with */
00097         Assert(pitem->done);
00098         pfree(pitem);
00099     }
00100     bds->pendingList = NULL;
00101 }
00102 
00103 /*
00104  * Vacuum a regular (non-root) leaf page
00105  *
00106  * We must delete tuples that are targeted for deletion by the VACUUM,
00107  * but not move any tuples that are referenced by outside links; we assume
00108  * those are the ones that are heads of chains.
00109  *
00110  * If we find a REDIRECT that was made by a concurrently-running transaction,
00111  * we must add its target TID to pendingList.  (We don't try to visit the
00112  * target immediately, first because we don't want VACUUM locking more than
00113  * one buffer at a time, and second because the duplicate-filtering logic
00114  * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
00115  * loop in the face of continuous concurrent insertions.)
00116  *
00117  * If forPending is true, we are examining the page as a consequence of
00118  * chasing a redirect link, not as part of the normal sequential scan.
00119  * We still vacuum the page normally, but we don't increment the stats
00120  * about live tuples; else we'd double-count those tuples, since the page
00121  * has been or will be visited in the sequential scan as well.
00122  */
00123 static void
00124 vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
00125                bool forPending)
00126 {
00127     Page        page = BufferGetPage(buffer);
00128     spgxlogVacuumLeaf xlrec;
00129     XLogRecData rdata[8];
00130     OffsetNumber toDead[MaxIndexTuplesPerPage];
00131     OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
00132     OffsetNumber moveSrc[MaxIndexTuplesPerPage];
00133     OffsetNumber moveDest[MaxIndexTuplesPerPage];
00134     OffsetNumber chainSrc[MaxIndexTuplesPerPage];
00135     OffsetNumber chainDest[MaxIndexTuplesPerPage];
00136     OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
00137     bool        deletable[MaxIndexTuplesPerPage + 1];
00138     int         nDeletable;
00139     OffsetNumber i,
00140                 max = PageGetMaxOffsetNumber(page);
00141 
00142     memset(predecessor, 0, sizeof(predecessor));
00143     memset(deletable, 0, sizeof(deletable));
00144     nDeletable = 0;
00145 
00146     /* Scan page, identify tuples to delete, accumulate stats */
00147     for (i = FirstOffsetNumber; i <= max; i++)
00148     {
00149         SpGistLeafTuple lt;
00150 
00151         lt = (SpGistLeafTuple) PageGetItem(page,
00152                                            PageGetItemId(page, i));
00153         if (lt->tupstate == SPGIST_LIVE)
00154         {
00155             Assert(ItemPointerIsValid(&lt->heapPtr));
00156 
00157             if (bds->callback(&lt->heapPtr, bds->callback_state))
00158             {
00159                 bds->stats->tuples_removed += 1;
00160                 deletable[i] = true;
00161                 nDeletable++;
00162             }
00163             else
00164             {
00165                 if (!forPending)
00166                     bds->stats->num_index_tuples += 1;
00167             }
00168 
00169             /* Form predecessor map, too */
00170             if (lt->nextOffset != InvalidOffsetNumber)
00171             {
00172                 /* paranoia about corrupted chain links */
00173                 if (lt->nextOffset < FirstOffsetNumber ||
00174                     lt->nextOffset > max ||
00175                     predecessor[lt->nextOffset] != InvalidOffsetNumber)
00176                     elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
00177                          BufferGetBlockNumber(buffer),
00178                          RelationGetRelationName(index));
00179                 predecessor[lt->nextOffset] = i;
00180             }
00181         }
00182         else if (lt->tupstate == SPGIST_REDIRECT)
00183         {
00184             SpGistDeadTuple dt = (SpGistDeadTuple) lt;
00185 
00186             Assert(dt->nextOffset == InvalidOffsetNumber);
00187             Assert(ItemPointerIsValid(&dt->pointer));
00188 
00189             /*
00190              * Add target TID to pending list if the redirection could have
00191              * happened since VACUUM started.
00192              *
00193              * Note: we could make a tighter test by seeing if the xid is
00194              * "running" according to the active snapshot; but tqual.c doesn't
00195              * currently export a suitable API, and it's not entirely clear
00196              * that a tighter test is worth the cycles anyway.
00197              */
00198             if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
00199                 spgAddPendingTID(bds, &dt->pointer);
00200         }
00201         else
00202         {
00203             Assert(lt->nextOffset == InvalidOffsetNumber);
00204         }
00205     }
00206 
00207     if (nDeletable == 0)
00208         return;                 /* nothing more to do */
00209 
00210     /*----------
00211      * Figure out exactly what we have to do.  We do this separately from
00212      * actually modifying the page, mainly so that we have a representation
00213      * that can be dumped into WAL and then the replay code can do exactly
00214      * the same thing.  The output of this step consists of six arrays
00215      * describing four kinds of operations, to be performed in this order:
00216      *
00217      * toDead[]: tuple numbers to be replaced with DEAD tuples
00218      * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
00219      * moveSrc[]: tuple numbers that need to be relocated to another offset
00220      * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
00221      * moveDest[]: new locations for moveSrc tuples
00222      * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
00223      * chainDest[]: new values of nextOffset for chainSrc members
00224      *
00225      * It's easiest to figure out what we have to do by processing tuple
00226      * chains, so we iterate over all the tuples (not just the deletable
00227      * ones!) to identify chain heads, then chase down each chain and make
00228      * work item entries for deletable tuples within the chain.
00229      *----------
00230      */
00231     xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;
00232 
00233     for (i = FirstOffsetNumber; i <= max; i++)
00234     {
00235         SpGistLeafTuple head;
00236         bool        interveningDeletable;
00237         OffsetNumber prevLive;
00238         OffsetNumber j;
00239 
00240         head = (SpGistLeafTuple) PageGetItem(page,
00241                                              PageGetItemId(page, i));
00242         if (head->tupstate != SPGIST_LIVE)
00243             continue;           /* can't be a chain member */
00244         if (predecessor[i] != 0)
00245             continue;           /* not a chain head */
00246 
00247         /* initialize ... */
00248         interveningDeletable = false;
00249         prevLive = deletable[i] ? InvalidOffsetNumber : i;
00250 
00251         /* scan down the chain ... */
00252         j = head->nextOffset;
00253         while (j != InvalidOffsetNumber)
00254         {
00255             SpGistLeafTuple lt;
00256 
00257             lt = (SpGistLeafTuple) PageGetItem(page,
00258                                                PageGetItemId(page, j));
00259             if (lt->tupstate != SPGIST_LIVE)
00260             {
00261                 /* all tuples in chain should be live */
00262                 elog(ERROR, "unexpected SPGiST tuple state: %d",
00263                      lt->tupstate);
00264             }
00265 
00266             if (deletable[j])
00267             {
00268                 /* This tuple should be replaced by a placeholder */
00269                 toPlaceholder[xlrec.nPlaceholder] = j;
00270                 xlrec.nPlaceholder++;
00271                 /* previous live tuple's chain link will need an update */
00272                 interveningDeletable = true;
00273             }
00274             else if (prevLive == InvalidOffsetNumber)
00275             {
00276                 /*
00277                  * This is the first live tuple in the chain.  It has to move
00278                  * to the head position.
00279                  */
00280                 moveSrc[xlrec.nMove] = j;
00281                 moveDest[xlrec.nMove] = i;
00282                 xlrec.nMove++;
00283                 /* Chain updates will be applied after the move */
00284                 prevLive = i;
00285                 interveningDeletable = false;
00286             }
00287             else
00288             {
00289                 /*
00290                  * Second or later live tuple.  Arrange to re-chain it to the
00291                  * previous live one, if there was a gap.
00292                  */
00293                 if (interveningDeletable)
00294                 {
00295                     chainSrc[xlrec.nChain] = prevLive;
00296                     chainDest[xlrec.nChain] = j;
00297                     xlrec.nChain++;
00298                 }
00299                 prevLive = j;
00300                 interveningDeletable = false;
00301             }
00302 
00303             j = lt->nextOffset;
00304         }
00305 
00306         if (prevLive == InvalidOffsetNumber)
00307         {
00308             /* The chain is entirely removable, so we need a DEAD tuple */
00309             toDead[xlrec.nDead] = i;
00310             xlrec.nDead++;
00311         }
00312         else if (interveningDeletable)
00313         {
00314             /* One or more deletions at end of chain, so close it off */
00315             chainSrc[xlrec.nChain] = prevLive;
00316             chainDest[xlrec.nChain] = InvalidOffsetNumber;
00317             xlrec.nChain++;
00318         }
00319     }
00320 
00321     /* sanity check ... */
00322     if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
00323         elog(ERROR, "inconsistent counts of deletable tuples");
00324 
00325     /* Prepare WAL record */
00326     xlrec.node = index->rd_node;
00327     xlrec.blkno = BufferGetBlockNumber(buffer);
00328     STORE_STATE(&bds->spgstate, xlrec.stateSrc);
00329 
00330     ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
00331     /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
00332     ACCEPT_RDATA_DATA(toDead, sizeof(OffsetNumber) * xlrec.nDead, 1);
00333     ACCEPT_RDATA_DATA(toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder, 2);
00334     ACCEPT_RDATA_DATA(moveSrc, sizeof(OffsetNumber) * xlrec.nMove, 3);
00335     ACCEPT_RDATA_DATA(moveDest, sizeof(OffsetNumber) * xlrec.nMove, 4);
00336     ACCEPT_RDATA_DATA(chainSrc, sizeof(OffsetNumber) * xlrec.nChain, 5);
00337     ACCEPT_RDATA_DATA(chainDest, sizeof(OffsetNumber) * xlrec.nChain, 6);
00338     ACCEPT_RDATA_BUFFER(buffer, 7);
00339 
00340     /* Do the updates */
00341     START_CRIT_SECTION();
00342 
00343     spgPageIndexMultiDelete(&bds->spgstate, page,
00344                             toDead, xlrec.nDead,
00345                             SPGIST_DEAD, SPGIST_DEAD,
00346                             InvalidBlockNumber, InvalidOffsetNumber);
00347 
00348     spgPageIndexMultiDelete(&bds->spgstate, page,
00349                             toPlaceholder, xlrec.nPlaceholder,
00350                             SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00351                             InvalidBlockNumber, InvalidOffsetNumber);
00352 
00353     /*
00354      * We implement the move step by swapping the item pointers of the source
00355      * and target tuples, then replacing the newly-source tuples with
00356      * placeholders.  This is perhaps unduly friendly with the page data
00357      * representation, but it's fast and doesn't risk page overflow when a
00358      * tuple to be relocated is large.
00359      */
00360     for (i = 0; i < xlrec.nMove; i++)
00361     {
00362         ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
00363         ItemId      idDest = PageGetItemId(page, moveDest[i]);
00364         ItemIdData  tmp;
00365 
00366         tmp = *idSrc;
00367         *idSrc = *idDest;
00368         *idDest = tmp;
00369     }
00370 
00371     spgPageIndexMultiDelete(&bds->spgstate, page,
00372                             moveSrc, xlrec.nMove,
00373                             SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00374                             InvalidBlockNumber, InvalidOffsetNumber);
00375 
00376     for (i = 0; i < xlrec.nChain; i++)
00377     {
00378         SpGistLeafTuple lt;
00379 
00380         lt = (SpGistLeafTuple) PageGetItem(page,
00381                                            PageGetItemId(page, chainSrc[i]));
00382         Assert(lt->tupstate == SPGIST_LIVE);
00383         lt->nextOffset = chainDest[i];
00384     }
00385 
00386     MarkBufferDirty(buffer);
00387 
00388     if (RelationNeedsWAL(index))
00389     {
00390         XLogRecPtr  recptr;
00391 
00392         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF, rdata);
00393 
00394         PageSetLSN(page, recptr);
00395     }
00396 
00397     END_CRIT_SECTION();
00398 }
00399 
00400 /*
00401  * Vacuum a root page when it is also a leaf
00402  *
00403  * On the root, we just delete any dead leaf tuples; no fancy business
00404  */
00405 static void
00406 vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
00407 {
00408     Page        page = BufferGetPage(buffer);
00409     spgxlogVacuumRoot xlrec;
00410     XLogRecData rdata[3];
00411     OffsetNumber toDelete[MaxIndexTuplesPerPage];
00412     OffsetNumber i,
00413                 max = PageGetMaxOffsetNumber(page);
00414 
00415     xlrec.blkno = BufferGetBlockNumber(buffer);
00416     xlrec.nDelete = 0;
00417 
00418     /* Scan page, identify tuples to delete, accumulate stats */
00419     for (i = FirstOffsetNumber; i <= max; i++)
00420     {
00421         SpGistLeafTuple lt;
00422 
00423         lt = (SpGistLeafTuple) PageGetItem(page,
00424                                            PageGetItemId(page, i));
00425         if (lt->tupstate == SPGIST_LIVE)
00426         {
00427             Assert(ItemPointerIsValid(&lt->heapPtr));
00428 
00429             if (bds->callback(&lt->heapPtr, bds->callback_state))
00430             {
00431                 bds->stats->tuples_removed += 1;
00432                 toDelete[xlrec.nDelete] = i;
00433                 xlrec.nDelete++;
00434             }
00435             else
00436             {
00437                 bds->stats->num_index_tuples += 1;
00438             }
00439         }
00440         else
00441         {
00442             /* all tuples on root should be live */
00443             elog(ERROR, "unexpected SPGiST tuple state: %d",
00444                  lt->tupstate);
00445         }
00446     }
00447 
00448     if (xlrec.nDelete == 0)
00449         return;                 /* nothing more to do */
00450 
00451     /* Prepare WAL record */
00452     xlrec.node = index->rd_node;
00453     STORE_STATE(&bds->spgstate, xlrec.stateSrc);
00454 
00455     ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
00456     /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
00457     ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * xlrec.nDelete, 1);
00458     ACCEPT_RDATA_BUFFER(buffer, 2);
00459 
00460     /* Do the update */
00461     START_CRIT_SECTION();
00462 
00463     /* The tuple numbers are in order, so we can use PageIndexMultiDelete */
00464     PageIndexMultiDelete(page, toDelete, xlrec.nDelete);
00465 
00466     MarkBufferDirty(buffer);
00467 
00468     if (RelationNeedsWAL(index))
00469     {
00470         XLogRecPtr  recptr;
00471 
00472         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT, rdata);
00473 
00474         PageSetLSN(page, recptr);
00475     }
00476 
00477     END_CRIT_SECTION();
00478 }
00479 
00480 /*
00481  * Clean up redirect and placeholder tuples on the given page
00482  *
00483  * Redirect tuples can be marked placeholder once they're old enough.
00484  * Placeholder tuples can be removed if it won't change the offsets of
00485  * non-placeholder ones.
00486  *
00487  * Unlike the routines above, this works on both leaf and inner pages.
00488  */
00489 static void
00490 vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
00491 {
00492     Page        page = BufferGetPage(buffer);
00493     SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
00494     OffsetNumber i,
00495                 max = PageGetMaxOffsetNumber(page),
00496                 firstPlaceholder = InvalidOffsetNumber;
00497     bool        hasNonPlaceholder = false;
00498     bool        hasUpdate = false;
00499     OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
00500     OffsetNumber itemnos[MaxIndexTuplesPerPage];
00501     spgxlogVacuumRedirect xlrec;
00502     XLogRecData rdata[3];
00503 
00504     xlrec.node = index->rd_node;
00505     xlrec.blkno = BufferGetBlockNumber(buffer);
00506     xlrec.nToPlaceholder = 0;
00507     xlrec.newestRedirectXid = InvalidTransactionId;
00508 
00509     START_CRIT_SECTION();
00510 
00511     /*
00512      * Scan backwards to convert old redirection tuples to placeholder tuples,
00513      * and identify location of last non-placeholder tuple while at it.
00514      */
00515     for (i = max;
00516          i >= FirstOffsetNumber &&
00517          (opaque->nRedirection > 0 || !hasNonPlaceholder);
00518          i--)
00519     {
00520         SpGistDeadTuple dt;
00521 
00522         dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));
00523 
00524         if (dt->tupstate == SPGIST_REDIRECT &&
00525             TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
00526         {
00527             dt->tupstate = SPGIST_PLACEHOLDER;
00528             Assert(opaque->nRedirection > 0);
00529             opaque->nRedirection--;
00530             opaque->nPlaceholder++;
00531 
00532             /* remember newest XID among the removed redirects */
00533             if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
00534                 TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
00535                 xlrec.newestRedirectXid = dt->xid;
00536 
00537             ItemPointerSetInvalid(&dt->pointer);
00538 
00539             itemToPlaceholder[xlrec.nToPlaceholder] = i;
00540             xlrec.nToPlaceholder++;
00541 
00542             hasUpdate = true;
00543         }
00544 
00545         if (dt->tupstate == SPGIST_PLACEHOLDER)
00546         {
00547             if (!hasNonPlaceholder)
00548                 firstPlaceholder = i;
00549         }
00550         else
00551         {
00552             hasNonPlaceholder = true;
00553         }
00554     }
00555 
00556     /*
00557      * Any placeholder tuples at the end of page can safely be removed.  We
00558      * can't remove ones before the last non-placeholder, though, because we
00559      * can't alter the offset numbers of non-placeholder tuples.
00560      */
00561     if (firstPlaceholder != InvalidOffsetNumber)
00562     {
00563         /*
00564          * We do not store this array to rdata because it's easy to recreate.
00565          */
00566         for (i = firstPlaceholder; i <= max; i++)
00567             itemnos[i - firstPlaceholder] = i;
00568 
00569         i = max - firstPlaceholder + 1;
00570         Assert(opaque->nPlaceholder >= i);
00571         opaque->nPlaceholder -= i;
00572 
00573         /* The array is surely sorted, so can use PageIndexMultiDelete */
00574         PageIndexMultiDelete(page, itemnos, i);
00575 
00576         hasUpdate = true;
00577     }
00578 
00579     xlrec.firstPlaceholder = firstPlaceholder;
00580 
00581     if (hasUpdate)
00582         MarkBufferDirty(buffer);
00583 
00584     if (hasUpdate && RelationNeedsWAL(index))
00585     {
00586         XLogRecPtr  recptr;
00587 
00588         ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
00589         ACCEPT_RDATA_DATA(itemToPlaceholder, sizeof(OffsetNumber) * xlrec.nToPlaceholder, 1);
00590         ACCEPT_RDATA_BUFFER(buffer, 2);
00591 
00592         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT, rdata);
00593 
00594         PageSetLSN(page, recptr);
00595     }
00596 
00597     END_CRIT_SECTION();
00598 }
00599 
00600 /*
00601  * Process one page during a bulkdelete scan
00602  */
00603 static void
00604 spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
00605 {
00606     Relation    index = bds->info->index;
00607     Buffer      buffer;
00608     Page        page;
00609 
00610     /* call vacuum_delay_point while not holding any buffer lock */
00611     vacuum_delay_point();
00612 
00613     buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
00614                                 RBM_NORMAL, bds->info->strategy);
00615     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00616     page = (Page) BufferGetPage(buffer);
00617 
00618     if (PageIsNew(page))
00619     {
00620         /*
00621          * We found an all-zero page, which could happen if the database
00622          * crashed just after extending the file.  Initialize and recycle it.
00623          */
00624         SpGistInitBuffer(buffer, 0);
00625         SpGistPageSetDeleted(page);
00626         /* We don't bother to WAL-log this action; easy to redo */
00627         MarkBufferDirty(buffer);
00628     }
00629     else if (SpGistPageIsDeleted(page))
00630     {
00631         /* nothing to do */
00632     }
00633     else if (SpGistPageIsLeaf(page))
00634     {
00635         if (SpGistBlockIsRoot(blkno))
00636         {
00637             vacuumLeafRoot(bds, index, buffer);
00638             /* no need for vacuumRedirectAndPlaceholder */
00639         }
00640         else
00641         {
00642             vacuumLeafPage(bds, index, buffer, false);
00643             vacuumRedirectAndPlaceholder(index, buffer);
00644         }
00645     }
00646     else
00647     {
00648         /* inner page */
00649         vacuumRedirectAndPlaceholder(index, buffer);
00650     }
00651 
00652     /*
00653      * The root pages must never be deleted, nor marked as available in FSM,
00654      * because we don't want them ever returned by a search for a place to put
00655      * a new tuple.  Otherwise, check for empty/deletable page, and make sure
00656      * FSM knows about it.
00657      */
00658     if (!SpGistBlockIsRoot(blkno))
00659     {
00660         /* If page is now empty, mark it deleted */
00661         if (PageIsEmpty(page) && !SpGistPageIsDeleted(page))
00662         {
00663             SpGistPageSetDeleted(page);
00664             /* We don't bother to WAL-log this action; easy to redo */
00665             MarkBufferDirty(buffer);
00666         }
00667 
00668         if (SpGistPageIsDeleted(page))
00669         {
00670             RecordFreeIndexPage(index, blkno);
00671             bds->stats->pages_deleted++;
00672         }
00673         else
00674             bds->lastFilledBlock = blkno;
00675     }
00676 
00677     SpGistSetLastUsedPage(index, buffer);
00678 
00679     UnlockReleaseBuffer(buffer);
00680 }
00681 
00682 /*
00683  * Process the pending-TID list between pages of the main scan
00684  */
00685 static void
00686 spgprocesspending(spgBulkDeleteState *bds)
00687 {
00688     Relation    index = bds->info->index;
00689     spgVacPendingItem *pitem;
00690     spgVacPendingItem *nitem;
00691     BlockNumber blkno;
00692     Buffer      buffer;
00693     Page        page;
00694 
00695     for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
00696     {
00697         if (pitem->done)
00698             continue;           /* ignore already-done items */
00699 
00700         /* call vacuum_delay_point while not holding any buffer lock */
00701         vacuum_delay_point();
00702 
00703         /* examine the referenced page */
00704         blkno = ItemPointerGetBlockNumber(&pitem->tid);
00705         buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
00706                                     RBM_NORMAL, bds->info->strategy);
00707         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00708         page = (Page) BufferGetPage(buffer);
00709 
00710         if (PageIsNew(page) || SpGistPageIsDeleted(page))
00711         {
00712             /* Probably shouldn't happen, but ignore it */
00713         }
00714         else if (SpGistPageIsLeaf(page))
00715         {
00716             if (SpGistBlockIsRoot(blkno))
00717             {
00718                 /* this should definitely not happen */
00719                 elog(ERROR, "redirection leads to root page of index \"%s\"",
00720                      RelationGetRelationName(index));
00721             }
00722 
00723             /* deal with any deletable tuples */
00724             vacuumLeafPage(bds, index, buffer, true);
00725             /* might as well do this while we are here */
00726             vacuumRedirectAndPlaceholder(index, buffer);
00727 
00728             SpGistSetLastUsedPage(index, buffer);
00729 
00730             /*
00731              * We can mark as done not only this item, but any later ones
00732              * pointing at the same page, since we vacuumed the whole page.
00733              */
00734             pitem->done = true;
00735             for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
00736             {
00737                 if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
00738                     nitem->done = true;
00739             }
00740         }
00741         else
00742         {
00743             /*
00744              * On an inner page, visit the referenced inner tuple and add all
00745              * its downlinks to the pending list.  We might have pending items
00746              * for more than one inner tuple on the same page (in fact this is
00747              * pretty likely given the way space allocation works), so get
00748              * them all while we are here.
00749              */
00750             for (nitem = pitem; nitem != NULL; nitem = nitem->next)
00751             {
00752                 if (nitem->done)
00753                     continue;
00754                 if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
00755                 {
00756                     OffsetNumber offset;
00757                     SpGistInnerTuple innerTuple;
00758 
00759                     offset = ItemPointerGetOffsetNumber(&nitem->tid);
00760                     innerTuple = (SpGistInnerTuple) PageGetItem(page,
00761                                                 PageGetItemId(page, offset));
00762                     if (innerTuple->tupstate == SPGIST_LIVE)
00763                     {
00764                         SpGistNodeTuple node;
00765                         int         i;
00766 
00767                         SGITITERATE(innerTuple, i, node)
00768                         {
00769                             if (ItemPointerIsValid(&node->t_tid))
00770                                 spgAddPendingTID(bds, &node->t_tid);
00771                         }
00772                     }
00773                     else if (innerTuple->tupstate == SPGIST_REDIRECT)
00774                     {
00775                         /* transfer attention to redirect point */
00776                         spgAddPendingTID(bds,
00777                                    &((SpGistDeadTuple) innerTuple)->pointer);
00778                     }
00779                     else
00780                         elog(ERROR, "unexpected SPGiST tuple state: %d",
00781                              innerTuple->tupstate);
00782 
00783                     nitem->done = true;
00784                 }
00785             }
00786         }
00787 
00788         UnlockReleaseBuffer(buffer);
00789     }
00790 
00791     spgClearPendingList(bds);
00792 }
00793 
00794 /*
00795  * Perform a bulkdelete scan
00796  */
00797 static void
00798 spgvacuumscan(spgBulkDeleteState *bds)
00799 {
00800     Relation    index = bds->info->index;
00801     bool        needLock;
00802     BlockNumber num_pages,
00803                 blkno;
00804 
00805     /* Finish setting up spgBulkDeleteState */
00806     initSpGistState(&bds->spgstate, index);
00807     bds->pendingList = NULL;
00808     bds->myXmin = GetActiveSnapshot()->xmin;
00809     bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;
00810 
00811     /*
00812      * Reset counts that will be incremented during the scan; needed in case
00813      * of multiple scans during a single VACUUM command
00814      */
00815     bds->stats->estimated_count = false;
00816     bds->stats->num_index_tuples = 0;
00817     bds->stats->pages_deleted = 0;
00818 
00819     /* We can skip locking for new or temp relations */
00820     needLock = !RELATION_IS_LOCAL(index);
00821 
00822     /*
00823      * The outer loop iterates over all index pages except the metapage, in
00824      * physical order (we hope the kernel will cooperate in providing
00825      * read-ahead for speed).  It is critical that we visit all leaf pages,
00826      * including ones added after we start the scan, else we might fail to
00827      * delete some deletable tuples.  See more extensive comments about this
00828      * in btvacuumscan().
00829      */
00830     blkno = SPGIST_METAPAGE_BLKNO + 1;
00831     for (;;)
00832     {
00833         /* Get the current relation length */
00834         if (needLock)
00835             LockRelationForExtension(index, ExclusiveLock);
00836         num_pages = RelationGetNumberOfBlocks(index);
00837         if (needLock)
00838             UnlockRelationForExtension(index, ExclusiveLock);
00839 
00840         /* Quit if we've scanned the whole relation */
00841         if (blkno >= num_pages)
00842             break;
00843         /* Iterate over pages, then loop back to recheck length */
00844         for (; blkno < num_pages; blkno++)
00845         {
00846             spgvacuumpage(bds, blkno);
00847             /* empty the pending-list after each page */
00848             if (bds->pendingList != NULL)
00849                 spgprocesspending(bds);
00850         }
00851     }
00852 
00853     /* Propagate local lastUsedPage cache to metablock */
00854     SpGistUpdateMetaPage(index);
00855 
00856     /*
00857      * Truncate index if possible
00858      *
00859      * XXX disabled because it's unsafe due to possible concurrent inserts.
00860      * We'd have to rescan the pages to make sure they're still empty, and it
00861      * doesn't seem worth it.  Note that btree doesn't do this either.
00862      *
00863      * Another reason not to truncate is that it could invalidate the cached
00864      * pages-with-freespace pointers in the metapage and other backends'
00865      * relation caches, that is leave them pointing to nonexistent pages.
00866      * Adding RelationGetNumberOfBlocks calls to protect the places that use
00867      * those pointers would be unduly expensive.
00868      */
00869 #ifdef NOT_USED
00870     if (num_pages > bds->lastFilledBlock + 1)
00871     {
00872         BlockNumber lastBlock = num_pages - 1;
00873 
00874         num_pages = bds->lastFilledBlock + 1;
00875         RelationTruncate(index, num_pages);
00876         bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
00877         bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
00878     }
00879 #endif
00880 
00881     /* Report final stats */
00882     bds->stats->num_pages = num_pages;
00883     bds->stats->pages_free = bds->stats->pages_deleted;
00884 }
00885 
00886 /*
00887  * Bulk deletion of all index entries pointing to a set of heap tuples.
00888  * The set of target tuples is specified via a callback routine that tells
00889  * whether any given heap tuple (identified by ItemPointer) is being deleted.
00890  *
00891  * Result: a palloc'd struct containing statistical info for VACUUM displays.
00892  */
00893 Datum
00894 spgbulkdelete(PG_FUNCTION_ARGS)
00895 {
00896     IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
00897     IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
00898     IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
00899     void       *callback_state = (void *) PG_GETARG_POINTER(3);
00900     spgBulkDeleteState bds;
00901 
00902     /* allocate stats if first time through, else re-use existing struct */
00903     if (stats == NULL)
00904         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
00905     bds.info = info;
00906     bds.stats = stats;
00907     bds.callback = callback;
00908     bds.callback_state = callback_state;
00909 
00910     spgvacuumscan(&bds);
00911 
00912     PG_RETURN_POINTER(stats);
00913 }
00914 
00915 /* Dummy callback to delete no tuples during spgvacuumcleanup */
00916 static bool
00917 dummy_callback(ItemPointer itemptr, void *state)
00918 {
00919     return false;
00920 }
00921 
00922 /*
00923  * Post-VACUUM cleanup.
00924  *
00925  * Result: a palloc'd struct containing statistical info for VACUUM displays.
00926  */
00927 Datum
00928 spgvacuumcleanup(PG_FUNCTION_ARGS)
00929 {
00930     IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
00931     IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
00932     Relation    index = info->index;
00933     spgBulkDeleteState bds;
00934 
00935     /* No-op in ANALYZE ONLY mode */
00936     if (info->analyze_only)
00937         PG_RETURN_POINTER(stats);
00938 
00939     /*
00940      * We don't need to scan the index if there was a preceding bulkdelete
00941      * pass.  Otherwise, make a pass that won't delete any live tuples, but
00942      * might still accomplish useful stuff with redirect/placeholder cleanup,
00943      * and in any case will provide stats.
00944      */
00945     if (stats == NULL)
00946     {
00947         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
00948         bds.info = info;
00949         bds.stats = stats;
00950         bds.callback = dummy_callback;
00951         bds.callback_state = NULL;
00952 
00953         spgvacuumscan(&bds);
00954     }
00955 
00956     /* Finally, vacuum the FSM */
00957     IndexFreeSpaceMapVacuum(index);
00958 
00959     /*
00960      * It's quite possible for us to be fooled by concurrent tuple moves into
00961      * double-counting some index tuples, so disbelieve any total that exceeds
00962      * the underlying heap's count ... if we know that accurately.  Otherwise
00963      * this might just make matters worse.
00964      */
00965     if (!info->estimated_count)
00966     {
00967         if (stats->num_index_tuples > info->num_heap_tuples)
00968             stats->num_index_tuples = info->num_heap_tuples;
00969     }
00970 
00971     PG_RETURN_POINTER(stats);
00972 }