nbtree.c

00001 /*-------------------------------------------------------------------------
00002  *
00003  * nbtree.c
00004  *    Implementation of Lehman and Yao's btree management algorithm for
00005  *    Postgres.
00006  *
00007  * NOTES
00008  *    This file contains only the public interface routines.
00009  *
00010  *
00011  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00012  * Portions Copyright (c) 1994, Regents of the University of California
00013  *
00014  * IDENTIFICATION
00015  *    src/backend/access/nbtree/nbtree.c
00016  *
00017  *-------------------------------------------------------------------------
00018  */
00019 #include "postgres.h"
00020 
00021 #include "access/heapam_xlog.h"
00022 #include "access/nbtree.h"
00023 #include "access/relscan.h"
00024 #include "catalog/index.h"
00025 #include "commands/vacuum.h"
00026 #include "storage/indexfsm.h"
00027 #include "storage/ipc.h"
00028 #include "storage/lmgr.h"
00029 #include "storage/smgr.h"
00030 #include "tcop/tcopprot.h"
00031 #include "utils/memutils.h"
00032 
00033 
00034 /* Working state for btbuild and its callback */
00035 typedef struct
00036 {
00037     bool        isUnique;
00038     bool        haveDead;
00039     Relation    heapRel;
00040     BTSpool    *spool;
00041 
00042     /*
00043      * spool2 is needed only when the index is a unique index.  Dead tuples
00044      * are put into spool2 instead of spool in order to avoid the
00045      * uniqueness check.
00046      */
00047     BTSpool    *spool2;
00048     double      indtuples;
00049 } BTBuildState;
00050 
00051 /* Working state needed by btvacuumpage */
00052 typedef struct
00053 {
00054     IndexVacuumInfo *info;
00055     IndexBulkDeleteResult *stats;
00056     IndexBulkDeleteCallback callback;
00057     void       *callback_state;
00058     BTCycleId   cycleid;
00059     BlockNumber lastBlockVacuumed;      /* last blkno reached by Vacuum scan */
00060     BlockNumber lastUsedPage;   /* blkno of last non-recyclable page */
00061     BlockNumber totFreePages;   /* true total # of free pages */
00062     MemoryContext pagedelcontext;
00063 } BTVacState;
00064 
00065 
00066 static void btbuildCallback(Relation index,
00067                 HeapTuple htup,
00068                 Datum *values,
00069                 bool *isnull,
00070                 bool tupleIsAlive,
00071                 void *state);
00072 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
00073              IndexBulkDeleteCallback callback, void *callback_state,
00074              BTCycleId cycleid);
00075 static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
00076              BlockNumber orig_blkno);
00077 
00078 
00079 /*
00080  *  btbuild() -- build a new btree index.
00081  */
00082 Datum
00083 btbuild(PG_FUNCTION_ARGS)
00084 {
00085     Relation    heap = (Relation) PG_GETARG_POINTER(0);
00086     Relation    index = (Relation) PG_GETARG_POINTER(1);
00087     IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
00088     IndexBuildResult *result;
00089     double      reltuples;
00090     BTBuildState buildstate;
00091 
00092     buildstate.isUnique = indexInfo->ii_Unique;
00093     buildstate.haveDead = false;
00094     buildstate.heapRel = heap;
00095     buildstate.spool = NULL;
00096     buildstate.spool2 = NULL;
00097     buildstate.indtuples = 0;
00098 
00099 #ifdef BTREE_BUILD_STATS
00100     if (log_btree_build_stats)
00101         ResetUsage();
00102 #endif   /* BTREE_BUILD_STATS */
00103 
00104     /*
00105      * We expect to be called exactly once for any index relation. If that's
00106      * not the case, big trouble's what we have.
00107      */
00108     if (RelationGetNumberOfBlocks(index) != 0)
00109         elog(ERROR, "index \"%s\" already contains data",
00110              RelationGetRelationName(index));
00111 
00112     buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
00113 
00114     /*
00115      * If building a unique index, put dead tuples in a second spool to keep
00116      * them out of the uniqueness check.
00117      */
00118     if (indexInfo->ii_Unique)
00119         buildstate.spool2 = _bt_spoolinit(heap, index, false, true);
00120 
00121     /* do the heap scan */
00122     reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
00123                                    btbuildCallback, (void *) &buildstate);
00124 
00125     /* okay, all heap tuples are indexed */
00126     if (buildstate.spool2 && !buildstate.haveDead)
00127     {
00128         /* spool2 turns out to be unnecessary */
00129         _bt_spooldestroy(buildstate.spool2);
00130         buildstate.spool2 = NULL;
00131     }
00132 
00133     /*
00134      * Finish the build by (1) completing the sort of the spool file, (2)
00135      * inserting the sorted tuples into btree pages and (3) building the upper
00136      * levels.
00137      */
00138     _bt_leafbuild(buildstate.spool, buildstate.spool2);
00139     _bt_spooldestroy(buildstate.spool);
00140     if (buildstate.spool2)
00141         _bt_spooldestroy(buildstate.spool2);
00142 
00143 #ifdef BTREE_BUILD_STATS
00144     if (log_btree_build_stats)
00145     {
00146         ShowUsage("BTREE BUILD STATS");
00147         ResetUsage();
00148     }
00149 #endif   /* BTREE_BUILD_STATS */
00150 
00151     /*
00152      * If we are reindexing a pre-existing index, it is critical to send out a
00153      * relcache invalidation SI message to ensure all backends re-read the
00154      * index metapage.  We expect that the caller will ensure that happens
00155      * (typically as a side effect of updating index stats, but it must happen
00156      * even if the stats don't change!)
00157      */
00158 
00159     /*
00160      * Return statistics
00161      */
00162     result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
00163 
00164     result->heap_tuples = reltuples;
00165     result->index_tuples = buildstate.indtuples;
00166 
00167     PG_RETURN_POINTER(result);
00168 }
00169 
00170 /*
00171  * Per-tuple callback from IndexBuildHeapScan
00172  */
00173 static void
00174 btbuildCallback(Relation index,
00175                 HeapTuple htup,
00176                 Datum *values,
00177                 bool *isnull,
00178                 bool tupleIsAlive,
00179                 void *state)
00180 {
00181     BTBuildState *buildstate = (BTBuildState *) state;
00182     IndexTuple  itup;
00183 
00184     /* form an index tuple and point it at the heap tuple */
00185     itup = index_form_tuple(RelationGetDescr(index), values, isnull);
00186     itup->t_tid = htup->t_self;
00187 
00188     /*
00189      * insert the index tuple into the appropriate spool file for subsequent
00190      * processing
00191      */
00192     if (tupleIsAlive || buildstate->spool2 == NULL)
00193         _bt_spool(itup, buildstate->spool);
00194     else
00195     {
00196         /* dead tuples are put into spool2 */
00197         buildstate->haveDead = true;
00198         _bt_spool(itup, buildstate->spool2);
00199     }
00200 
00201     buildstate->indtuples += 1;
00202 
00203     pfree(itup);
00204 }
00205 
00206 /*
00207  *  btbuildempty() -- build an empty btree index in the initialization fork
00208  */
00209 Datum
00210 btbuildempty(PG_FUNCTION_ARGS)
00211 {
00212     Relation    index = (Relation) PG_GETARG_POINTER(0);
00213     Page        metapage;
00214 
00215     /* Construct metapage. */
00216     metapage = (Page) palloc(BLCKSZ);
00217     _bt_initmetapage(metapage, P_NONE, 0);
00218 
00219     /* Write the page.  If archiving/streaming, XLOG it. */
00220     PageSetChecksumInplace(metapage, BTREE_METAPAGE);
00221     smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
00222               (char *) metapage, true);
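              /*
               * Note: the trailing "true" is smgrwrite's skipFsync flag; the
               * explicit smgrimmedsync() further down covers durability instead.
               */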
00223     if (XLogIsNeeded())
00224         log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
00225                     BTREE_METAPAGE, metapage);
00226 
00227     /*
00228      * An immediate sync is required even if we xlog'd the page, because the
00229      * write did not go through shared_buffers and therefore a concurrent
00230      * checkpoint may have moved the redo pointer past our xlog record.
00231      */
00232     smgrimmedsync(index->rd_smgr, INIT_FORKNUM);
00233 
00234     PG_RETURN_VOID();
00235 }
00236 
00237 /*
00238  *  btinsert() -- insert an index tuple into a btree.
00239  *
00240  *      Descend the tree recursively, find the appropriate location for our
00241  *      new tuple, and put it there.
00242  */
00243 Datum
00244 btinsert(PG_FUNCTION_ARGS)
00245 {
00246     Relation    rel = (Relation) PG_GETARG_POINTER(0);
00247     Datum      *values = (Datum *) PG_GETARG_POINTER(1);
00248     bool       *isnull = (bool *) PG_GETARG_POINTER(2);
00249     ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
00250     Relation    heapRel = (Relation) PG_GETARG_POINTER(4);
00251     IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
00252     bool        result;
00253     IndexTuple  itup;
00254 
00255     /* generate an index tuple */
00256     itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
00257     itup->t_tid = *ht_ctid;
00258 
00259     result = _bt_doinsert(rel, itup, checkUnique, heapRel);
00260 
00261     pfree(itup);
00262 
00263     PG_RETURN_BOOL(result);
00264 }
00265 
00266 /*
00267  *  btgettuple() -- Get the next tuple in the scan.
00268  */
00269 Datum
00270 btgettuple(PG_FUNCTION_ARGS)
00271 {
00272     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00273     ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
00274     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00275     bool        res;
00276 
00277     /* btree indexes are never lossy */
00278     scan->xs_recheck = false;
00279 
00280     /*
00281      * If we have any array keys, initialize them during first call for a
00282      * scan.  We can't do this in btrescan because we don't know the scan
00283      * direction at that time.
00284      */
00285     if (so->numArrayKeys && !BTScanPosIsValid(so->currPos))
00286     {
00287         /* punt if we have any unsatisfiable array keys */
00288         if (so->numArrayKeys < 0)
00289             PG_RETURN_BOOL(false);
00290 
00291         _bt_start_array_keys(scan, dir);
00292     }
00293 
00294     /* This loop handles advancing to the next array elements, if any */
00295     do
00296     {
00297         /*
00298          * If we've already initialized this scan, we can just advance it in
00299          * the appropriate direction.  If we haven't done so yet, we call
00300          * _bt_first() to get the first item in the scan.
00301          */
00302         if (!BTScanPosIsValid(so->currPos))
00303             res = _bt_first(scan, dir);
00304         else
00305         {
00306             /*
00307              * Check to see if we should kill the previously-fetched tuple.
00308              */
00309             if (scan->kill_prior_tuple)
00310             {
00311                 /*
00312                  * Yes, remember it for later. (We'll deal with all such
00313                  * tuples at once right before leaving the index page.)  The
00314                  * test for numKilled overrun is not just paranoia: if the
00315                  * caller reverses direction in the indexscan then the same
00316                  * item might get entered multiple times. It's not worth
00317                  * trying to optimize that, so we don't detect it, but instead
00318                  * just forget any excess entries.
00319                  */
00320                 if (so->killedItems == NULL)
00321                     so->killedItems = (int *)
00322                         palloc(MaxIndexTuplesPerPage * sizeof(int));
00323                 if (so->numKilled < MaxIndexTuplesPerPage)
00324                     so->killedItems[so->numKilled++] = so->currPos.itemIndex;
00325             }
00326 
00327             /*
00328              * Now continue the scan.
00329              */
00330             res = _bt_next(scan, dir);
00331         }
00332 
00333         /* If we have a tuple, return it ... */
00334         if (res)
00335             break;
00336         /* ... otherwise see if we have more array keys to deal with */
00337     } while (so->numArrayKeys && _bt_advance_array_keys(scan, dir));
00338 
00339     PG_RETURN_BOOL(res);
00340 }
00341 
00342 /*
00343  * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
00344  */
00345 Datum
00346 btgetbitmap(PG_FUNCTION_ARGS)
00347 {
00348     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00349     TIDBitmap  *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
00350     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00351     int64       ntids = 0;
00352     ItemPointer heapTid;
00353 
00354     /*
00355      * If we have any array keys, initialize them.
00356      */
00357     if (so->numArrayKeys)
00358     {
00359         /* punt if we have any unsatisfiable array keys */
00360         if (so->numArrayKeys < 0)
00361             PG_RETURN_INT64(ntids);
00362 
00363         _bt_start_array_keys(scan, ForwardScanDirection);
00364     }
00365 
00366     /* This loop handles advancing to the next array elements, if any */
00367     do
00368     {
00369         /* Fetch the first page & tuple */
00370         if (_bt_first(scan, ForwardScanDirection))
00371         {
00372             /* Save tuple ID, and continue scanning */
00373             heapTid = &scan->xs_ctup.t_self;
00374             tbm_add_tuples(tbm, heapTid, 1, false);
00375             ntids++;
00376 
00377             for (;;)
00378             {
00379                 /*
00380                  * Advance to next tuple within page.  This is the same as the
00381                  * easy case in _bt_next().
00382                  */
00383                 if (++so->currPos.itemIndex > so->currPos.lastItem)
00384                 {
00385                     /* let _bt_next do the heavy lifting */
00386                     if (!_bt_next(scan, ForwardScanDirection))
00387                         break;
00388                 }
00389 
00390                 /* Save tuple ID, and continue scanning */
00391                 heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid;
00392                 tbm_add_tuples(tbm, heapTid, 1, false);
00393                 ntids++;
00394             }
00395         }
00396         /* Now see if we have more array keys to deal with */
00397     } while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection));
00398 
00399     PG_RETURN_INT64(ntids);
00400 }
00401 
00402 /*
00403  *  btbeginscan() -- start a scan on a btree index
00404  */
00405 Datum
00406 btbeginscan(PG_FUNCTION_ARGS)
00407 {
00408     Relation    rel = (Relation) PG_GETARG_POINTER(0);
00409     int         nkeys = PG_GETARG_INT32(1);
00410     int         norderbys = PG_GETARG_INT32(2);
00411     IndexScanDesc scan;
00412     BTScanOpaque so;
00413 
00414     /* no order by operators allowed */
00415     Assert(norderbys == 0);
00416 
00417     /* get the scan */
00418     scan = RelationGetIndexScan(rel, nkeys, norderbys);
00419 
00420     /* allocate private workspace */
00421     so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
00422     so->currPos.buf = so->markPos.buf = InvalidBuffer;
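    /* neither scan position is valid yet; BTScanPosIsValid() tests the buffer */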
00423     if (scan->numberOfKeys > 0)
00424         so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
00425     else
00426         so->keyData = NULL;
00427 
00428     so->arrayKeyData = NULL;    /* assume no array keys for now */
00429     so->numArrayKeys = 0;
00430     so->arrayKeys = NULL;
00431     so->arrayContext = NULL;
00432 
00433     so->killedItems = NULL;     /* until needed */
00434     so->numKilled = 0;
00435 
00436     /*
00437      * We don't know yet whether the scan will be index-only, so we do not
00438      * allocate the tuple workspace arrays until btrescan.  However, we set up
00439      * scan->xs_itupdesc whether we'll need it or not, since that's so cheap.
00440      */
00441     so->currTuples = so->markTuples = NULL;
00442     so->currPos.nextTupleOffset = 0;
00443     so->markPos.nextTupleOffset = 0;
00444 
00445     scan->xs_itupdesc = RelationGetDescr(rel);
00446 
00447     scan->opaque = so;
00448 
00449     PG_RETURN_POINTER(scan);
00450 }
00451 
00452 /*
00453  *  btrescan() -- rescan an index relation
00454  */
00455 Datum
00456 btrescan(PG_FUNCTION_ARGS)
00457 {
00458     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00459     ScanKey     scankey = (ScanKey) PG_GETARG_POINTER(1);
00460 
00461     /* remaining arguments are ignored */
00462     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00463 
00464     /* we aren't holding any read locks, but gotta drop the pins */
00465     if (BTScanPosIsValid(so->currPos))
00466     {
00467         /* Before leaving current page, deal with any killed items */
00468         if (so->numKilled > 0)
00469             _bt_killitems(scan, false);
00470         ReleaseBuffer(so->currPos.buf);
00471         so->currPos.buf = InvalidBuffer;
00472     }
00473 
00474     if (BTScanPosIsValid(so->markPos))
00475     {
00476         ReleaseBuffer(so->markPos.buf);
00477         so->markPos.buf = InvalidBuffer;
00478     }
00479     so->markItemIndex = -1;
00480 
00481     /*
00482      * Allocate tuple workspace arrays, if needed for an index-only scan and
00483      * not already done in a previous rescan call.  To save on palloc
00484      * overhead, both workspaces are allocated as one palloc block; only this
00485      * function and btendscan know that.
00486      *
00487      * NOTE: this data structure also makes it safe to return data from a
00488      * "name" column, even though btree name_ops uses an underlying storage
00489      * datatype of cstring.  The risk there is that "name" is supposed to be
00490      * padded to NAMEDATALEN, but the actual index tuple is probably shorter.
00491      * However, since we only return data out of tuples sitting in the
00492      * currTuples array, a fetch of NAMEDATALEN bytes can at worst pull some
00493      * data out of the markTuples array --- running off the end of memory for
00494      * a SIGSEGV is not possible.  Yeah, this is ugly as sin, but it beats
00495      * adding special-case treatment for name_ops elsewhere.
00496      */
00497     if (scan->xs_want_itup && so->currTuples == NULL)
00498     {
00499         so->currTuples = (char *) palloc(BLCKSZ * 2);
00500         so->markTuples = so->currTuples + BLCKSZ;
00501     }
00502 
00503     /*
00504      * Reset the scan keys. Note that keys ordering stuff moved to _bt_first.
00505      * - vadim 05/05/97
00506      */
00507     if (scankey && scan->numberOfKeys > 0)
00508         memmove(scan->keyData,
00509                 scankey,
00510                 scan->numberOfKeys * sizeof(ScanKeyData));
00511     so->numberOfKeys = 0;       /* until _bt_preprocess_keys sets it */
00512 
00513     /* If any keys are SK_SEARCHARRAY type, set up array-key info */
00514     _bt_preprocess_array_keys(scan);
00515 
00516     PG_RETURN_VOID();
00517 }
00518 
00519 /*
00520  *  btendscan() -- close down a scan
00521  */
00522 Datum
00523 btendscan(PG_FUNCTION_ARGS)
00524 {
00525     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00526     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00527 
00528     /* we aren't holding any read locks, but gotta drop the pins */
00529     if (BTScanPosIsValid(so->currPos))
00530     {
00531         /* Before leaving current page, deal with any killed items */
00532         if (so->numKilled > 0)
00533             _bt_killitems(scan, false);
00534         ReleaseBuffer(so->currPos.buf);
00535         so->currPos.buf = InvalidBuffer;
00536     }
00537 
00538     if (BTScanPosIsValid(so->markPos))
00539     {
00540         ReleaseBuffer(so->markPos.buf);
00541         so->markPos.buf = InvalidBuffer;
00542     }
00543     so->markItemIndex = -1;
00544 
00545     /* Release storage */
00546     if (so->keyData != NULL)
00547         pfree(so->keyData);
00548     /* so->arrayKeyData and so->arrayKeys are in arrayContext */
00549     if (so->arrayContext != NULL)
00550         MemoryContextDelete(so->arrayContext);
00551     if (so->killedItems != NULL)
00552         pfree(so->killedItems);
00553     if (so->currTuples != NULL)
00554         pfree(so->currTuples);
00555     /* so->markTuples should not be pfree'd, see btrescan */
00556     pfree(so);
00557 
00558     PG_RETURN_VOID();
00559 }
00560 
00561 /*
00562  *  btmarkpos() -- save current scan position
00563  */
00564 Datum
00565 btmarkpos(PG_FUNCTION_ARGS)
00566 {
00567     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00568     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00569 
00570     /* we aren't holding any read locks, but gotta drop the pin */
00571     if (BTScanPosIsValid(so->markPos))
00572     {
00573         ReleaseBuffer(so->markPos.buf);
00574         so->markPos.buf = InvalidBuffer;
00575     }
00576 
00577     /*
00578      * Just record the current itemIndex.  If we later step to next page
00579      * before releasing the marked position, _bt_steppage makes a full copy of
00580      * the currPos struct in markPos.  If (as often happens) the mark is moved
00581      * before we leave the page, we don't have to do that work.
00582      */
00583     if (BTScanPosIsValid(so->currPos))
00584         so->markItemIndex = so->currPos.itemIndex;
00585     else
00586         so->markItemIndex = -1;
00587 
00588     /* Also record the current positions of any array keys */
00589     if (so->numArrayKeys)
00590         _bt_mark_array_keys(scan);
00591 
00592     PG_RETURN_VOID();
00593 }
00594 
00595 /*
00596  *  btrestrpos() -- restore scan to last saved position
00597  */
00598 Datum
00599 btrestrpos(PG_FUNCTION_ARGS)
00600 {
00601     IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
00602     BTScanOpaque so = (BTScanOpaque) scan->opaque;
00603 
00604     /* Restore the marked positions of any array keys */
00605     if (so->numArrayKeys)
00606         _bt_restore_array_keys(scan);
00607 
00608     if (so->markItemIndex >= 0)
00609     {
00610         /*
00611          * The mark position is on the same page we are currently on. Just
00612          * restore the itemIndex.
00613          */
00614         so->currPos.itemIndex = so->markItemIndex;
00615     }
00616     else
00617     {
00618         /* we aren't holding any read locks, but gotta drop the pin */
00619         if (BTScanPosIsValid(so->currPos))
00620         {
00621             /* Before leaving current page, deal with any killed items */
00622             if (so->numKilled > 0 &&
00623                 so->currPos.buf != so->markPos.buf)
00624                 _bt_killitems(scan, false);
00625             ReleaseBuffer(so->currPos.buf);
00626             so->currPos.buf = InvalidBuffer;
00627         }
00628 
00629         if (BTScanPosIsValid(so->markPos))
00630         {
00631             /* bump pin on mark buffer for assignment to current buffer */
00632             IncrBufferRefCount(so->markPos.buf);
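            /*
             * Copy markPos's fixed fields plus only its items[] entries up
             * through lastItem; offsetof(..., items[1]) covers everything
             * through items[0].
             */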
00633             memcpy(&so->currPos, &so->markPos,
00634                    offsetof(BTScanPosData, items[1]) +
00635                    so->markPos.lastItem * sizeof(BTScanPosItem));
00636             if (so->currTuples)
00637                 memcpy(so->currTuples, so->markTuples,
00638                        so->markPos.nextTupleOffset);
00639         }
00640     }
00641 
00642     PG_RETURN_VOID();
00643 }
00644 
00645 /*
00646  * Bulk deletion of all index entries pointing to a set of heap tuples.
00647  * The set of target tuples is specified via a callback routine that tells
00648  * whether any given heap tuple (identified by ItemPointer) is being deleted.
00649  *
00650  * Result: a palloc'd struct containing statistical info for VACUUM displays.
00651  */
00652 Datum
00653 btbulkdelete(PG_FUNCTION_ARGS)
00654 {
00655     IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
00656     IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
00657     IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
00658     void       *callback_state = (void *) PG_GETARG_POINTER(3);
00659     Relation    rel = info->index;
00660     BTCycleId   cycleid;
00661 
00662     /* allocate stats if first time through, else re-use existing struct */
00663     if (stats == NULL)
00664         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
00665 
00666     /* Establish the vacuum cycle ID to use for this scan */
00667     /* The ENSURE stuff ensures we clean up shared memory on failure */
00668     PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
00669     {
00670         cycleid = _bt_start_vacuum(rel);
00671 
00672         btvacuumscan(info, stats, callback, callback_state, cycleid);
00673     }
00674     PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
00675     _bt_end_vacuum(rel);
00676 
00677     PG_RETURN_POINTER(stats);
00678 }
00679 
00680 /*
00681  * Post-VACUUM cleanup.
00682  *
00683  * Result: a palloc'd struct containing statistical info for VACUUM displays.
00684  */
00685 Datum
00686 btvacuumcleanup(PG_FUNCTION_ARGS)
00687 {
00688     IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
00689     IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
00690 
00691     /* No-op in ANALYZE ONLY mode */
00692     if (info->analyze_only)
00693         PG_RETURN_POINTER(stats);
00694 
00695     /*
00696      * If btbulkdelete was called, we need not do anything, just return the
00697      * stats from the latest btbulkdelete call.  If it wasn't called, we must
00698      * still do a pass over the index, to recycle any newly-recyclable pages
00699      * and to obtain index statistics.
00700      *
00701      * Since we aren't going to actually delete any leaf items, there's no
00702      * need to go through all the vacuum-cycle-ID pushups.
00703      */
00704     if (stats == NULL)
00705     {
00706         stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
00707         btvacuumscan(info, stats, NULL, NULL, 0);
00708     }
00709 
00710     /* Finally, vacuum the FSM */
00711     IndexFreeSpaceMapVacuum(info->index);
00712 
00713     /*
00714      * It's quite possible for us to be fooled by concurrent page splits into
00715      * double-counting some index tuples, so disbelieve any total that exceeds
00716      * the underlying heap's count ... if we know that accurately.  Otherwise
00717      * this might just make matters worse.
00718      */
00719     if (!info->estimated_count)
00720     {
00721         if (stats->num_index_tuples > info->num_heap_tuples)
00722             stats->num_index_tuples = info->num_heap_tuples;
00723     }
00724 
00725     PG_RETURN_POINTER(stats);
00726 }
00727 
00728 /*
00729  * btvacuumscan --- scan the index for VACUUMing purposes
00730  *
00731  * This combines the functions of looking for leaf tuples that are deletable
00732  * according to the vacuum callback, looking for empty pages that can be
00733  * deleted, and looking for old deleted pages that can be recycled.  Both
00734  * btbulkdelete and btvacuumcleanup invoke this (the latter only if no
00735  * btbulkdelete call occurred).
00736  *
00737  * The caller is responsible for initially allocating/zeroing a stats struct
00738  * and for obtaining a vacuum cycle ID if necessary.
00739  */
00740 static void
00741 btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
00742              IndexBulkDeleteCallback callback, void *callback_state,
00743              BTCycleId cycleid)
00744 {
00745     Relation    rel = info->index;
00746     BTVacState  vstate;
00747     BlockNumber num_pages;
00748     BlockNumber blkno;
00749     bool        needLock;
00750 
00751     /*
00752      * Reset counts that will be incremented during the scan; needed in case
00753      * of multiple scans during a single VACUUM command
00754      */
00755     stats->estimated_count = false;
00756     stats->num_index_tuples = 0;
00757     stats->pages_deleted = 0;
00758 
00759     /* Set up info to pass down to btvacuumpage */
00760     vstate.info = info;
00761     vstate.stats = stats;
00762     vstate.callback = callback;
00763     vstate.callback_state = callback_state;
00764     vstate.cycleid = cycleid;
00765     vstate.lastBlockVacuumed = BTREE_METAPAGE;  /* Initialise at first block */
00766     vstate.lastUsedPage = BTREE_METAPAGE;
00767     vstate.totFreePages = 0;
00768 
00769     /* Create a temporary memory context to run _bt_pagedel in */
00770     vstate.pagedelcontext = AllocSetContextCreate(CurrentMemoryContext,
00771                                                   "_bt_pagedel",
00772                                                   ALLOCSET_DEFAULT_MINSIZE,
00773                                                   ALLOCSET_DEFAULT_INITSIZE,
00774                                                   ALLOCSET_DEFAULT_MAXSIZE);
00775 
00776     /*
00777      * The outer loop iterates over all index pages except the metapage, in
00778      * physical order (we hope the kernel will cooperate in providing
00779      * read-ahead for speed).  It is critical that we visit all leaf pages,
00780      * including ones added after we start the scan, else we might fail to
00781      * delete some deletable tuples.  Hence, we must repeatedly check the
00782      * relation length.  We must acquire the relation-extension lock while
00783      * doing so to avoid a race condition: if someone else is extending the
00784      * relation, there is a window where bufmgr/smgr have created a new
00785      * all-zero page but it hasn't yet been write-locked by _bt_getbuf(). If
00786      * we manage to scan such a page here, we'll improperly assume it can be
00787      * recycled.  Taking the lock synchronizes things enough to prevent a
00788      * problem: either num_pages won't include the new page, or _bt_getbuf
00789      * already has write lock on the buffer and it will be fully initialized
00790      * before we can examine it.  (See also vacuumlazy.c, which has the same
00791      * issue.)  Also, we need not worry if a page is added immediately after
00792      * we look; the page splitting code already has write-lock on the left
00793      * page before it adds a right page, so we must already have processed any
00794      * tuples due to be moved into such a page.
00795      *
00796      * We can skip locking for new or temp relations, however, since no one
00797      * else could be accessing them.
00798      */
00799     needLock = !RELATION_IS_LOCAL(rel);
00800 
00801     blkno = BTREE_METAPAGE + 1;
00802     for (;;)
00803     {
00804         /* Get the current relation length */
00805         if (needLock)
00806             LockRelationForExtension(rel, ExclusiveLock);
00807         num_pages = RelationGetNumberOfBlocks(rel);
00808         if (needLock)
00809             UnlockRelationForExtension(rel, ExclusiveLock);
00810 
00811         /* Quit if we've scanned the whole relation */
00812         if (blkno >= num_pages)
00813             break;
00814         /* Iterate over pages, then loop back to recheck length */
00815         for (; blkno < num_pages; blkno++)
00816         {
00817             btvacuumpage(&vstate, blkno, blkno);
00818         }
00819     }
00820 
00821     /*
00822      * InHotStandby we need to scan right up to the end of the index for
00823      * correct locking, so we may need to write a WAL record for the final
00824      * block in the index if it was not vacuumed. It's possible that VACUUMing
00825      * has actually removed zeroed pages at the end of the index so we need to
00826      * take care to issue the record for last actual block and not for the
00827      * last block that was scanned. Ignore empty indexes.
00828      */
00829     if (XLogStandbyInfoActive() &&
00830         num_pages > 1 && vstate.lastBlockVacuumed < (num_pages - 1))
00831     {
00832         Buffer      buf;
00833 
00834         /*
00835          * We can't use _bt_getbuf() here because it always applies
00836          * _bt_checkpage(), which will barf on an all-zero page. We want to
00837          * recycle all-zero pages, not fail.  Also, we want to use a
00838          * nondefault buffer access strategy.
00839          */
00840         buf = ReadBufferExtended(rel, MAIN_FORKNUM, num_pages - 1, RBM_NORMAL,
00841                                  info->strategy);
00842         LockBufferForCleanup(buf);
00843         _bt_delitems_vacuum(rel, buf, NULL, 0, vstate.lastBlockVacuumed);
00844         _bt_relbuf(rel, buf);
00845     }
00846 
00847     MemoryContextDelete(vstate.pagedelcontext);
00848 
00849     /* update statistics */
00850     stats->num_pages = num_pages;
00851     stats->pages_free = vstate.totFreePages;
00852 }
00853 
00854 /*
00855  * btvacuumpage --- VACUUM one page
00856  *
00857  * This processes a single page for btvacuumscan().  In some cases we
00858  * must go back and re-examine previously-scanned pages; this routine
00859  * recurses when necessary to handle that case.
00860  *
00861  * blkno is the page to process.  orig_blkno is the highest block number
00862  * reached by the outer btvacuumscan loop (the same as blkno, unless we
00863  * are recursing to re-examine a previous page).
00864  */
00865 static void
00866 btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
00867 {
00868     IndexVacuumInfo *info = vstate->info;
00869     IndexBulkDeleteResult *stats = vstate->stats;
00870     IndexBulkDeleteCallback callback = vstate->callback;
00871     void       *callback_state = vstate->callback_state;
00872     Relation    rel = info->index;
00873     bool        delete_now;
00874     BlockNumber recurse_to;
00875     Buffer      buf;
00876     Page        page;
00877     BTPageOpaque opaque;
00878 
00879 restart:
00880     delete_now = false;
00881     recurse_to = P_NONE;
00882 
00883     /* call vacuum_delay_point while not holding any buffer lock */
00884     vacuum_delay_point();
00885 
00886     /*
00887      * We can't use _bt_getbuf() here because it always applies
00888      * _bt_checkpage(), which will barf on an all-zero page. We want to
00889      * recycle all-zero pages, not fail.  Also, we want to use a nondefault
00890      * buffer access strategy.
00891      */
00892     buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
00893                              info->strategy);
00894     LockBuffer(buf, BT_READ);
00895     page = BufferGetPage(buf);
00896     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
00897     if (!PageIsNew(page))
00898         _bt_checkpage(rel, buf);
00899 
00900     /*
00901      * If we are recursing, the only case we want to do anything with is a
00902      * live leaf page having the current vacuum cycle ID.  Any other state
00903      * implies we already saw the page (eg, deleted it as being empty).
00904      */
00905     if (blkno != orig_blkno)
00906     {
00907         if (_bt_page_recyclable(page) ||
00908             P_IGNORE(opaque) ||
00909             !P_ISLEAF(opaque) ||
00910             opaque->btpo_cycleid != vstate->cycleid)
00911         {
00912             _bt_relbuf(rel, buf);
00913             return;
00914         }
00915     }
00916 
00917     /* If the page is in use, update lastUsedPage */
00918     if (!_bt_page_recyclable(page) && vstate->lastUsedPage < blkno)
00919         vstate->lastUsedPage = blkno;
00920 
00921     /* Page is valid, see what to do with it */
00922     if (_bt_page_recyclable(page))
00923     {
00924         /* Okay to recycle this page */
00925         RecordFreeIndexPage(rel, blkno);
00926         vstate->totFreePages++;
00927         stats->pages_deleted++;
00928     }
00929     else if (P_ISDELETED(opaque))
00930     {
00931         /* Already deleted, but can't recycle yet */
00932         stats->pages_deleted++;
00933     }
00934     else if (P_ISHALFDEAD(opaque))
00935     {
00936         /* Half-dead, try to delete */
00937         delete_now = true;
00938     }
00939     else if (P_ISLEAF(opaque))
00940     {
00941         OffsetNumber deletable[MaxOffsetNumber];
00942         int         ndeletable;
00943         OffsetNumber offnum,
00944                     minoff,
00945                     maxoff;
00946 
00947         /*
00948          * Trade in the initial read lock for a super-exclusive write lock on
00949          * this page.  We must get such a lock on every leaf page over the
00950          * course of the vacuum scan, whether or not it actually contains any
00951          * deletable tuples --- see nbtree/README.
00952          */
00953         LockBuffer(buf, BUFFER_LOCK_UNLOCK);
00954         LockBufferForCleanup(buf);
00955 
00956         /*
00957          * Check whether we need to recurse back to earlier pages.  What we
00958          * are concerned about is a page split that happened since we started
00959          * the vacuum scan.  If the split moved some tuples to a lower page
00960          * then we might have missed 'em.  If so, set up for tail recursion.
00961          * (Must do this before possibly clearing btpo_cycleid below!)
00962          */
00963         if (vstate->cycleid != 0 &&
00964             opaque->btpo_cycleid == vstate->cycleid &&
00965             !(opaque->btpo_flags & BTP_SPLIT_END) &&
00966             !P_RIGHTMOST(opaque) &&
00967             opaque->btpo_next < orig_blkno)
00968             recurse_to = opaque->btpo_next;
00969 
00970         /*
00971          * Scan over all items to see which ones need to be deleted
00972          * according to the callback function.
00973          */
00974         ndeletable = 0;
00975         minoff = P_FIRSTDATAKEY(opaque);
00976         maxoff = PageGetMaxOffsetNumber(page);
00977         if (callback)
00978         {
00979             for (offnum = minoff;
00980                  offnum <= maxoff;
00981                  offnum = OffsetNumberNext(offnum))
00982             {
00983                 IndexTuple  itup;
00984                 ItemPointer htup;
00985 
00986                 itup = (IndexTuple) PageGetItem(page,
00987                                                 PageGetItemId(page, offnum));
00988                 htup = &(itup->t_tid);
00989 
00990                 /*
00991                  * During Hot Standby we currently assume that
00992                  * XLOG_BTREE_VACUUM records do not produce conflicts. That is
00993                  * only true as long as the callback function depends only
00994                  * upon whether the index tuple refers to heap tuples removed
00995                  * in the initial heap scan. When vacuum starts it derives a
00996                  * value of OldestXmin. Backends taking later snapshots could
00997                  * have a RecentGlobalXmin with a later xid than the vacuum's
00998                  * OldestXmin, so it is possible that row versions deleted
00999                  * after OldestXmin could be marked as killed by other
01000                  * backends. The callback function *could* look at the index
01001                  * tuple state in isolation and decide to delete the index
01002                  * tuple, though currently it does not. If it ever did, we
01003                  * would need to reconsider whether XLOG_BTREE_VACUUM records
01004                  * should cause conflicts. If they did cause conflicts they
01005                  * would be fairly harsh conflicts, since we haven't yet
01006                  * worked out a way to pass a useful value for
01007                  * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
01008                  * applies to *any* type of index that marks index tuples as
01009                  * killed.
01010                  */
01011                 if (callback(htup, callback_state))
01012                     deletable[ndeletable++] = offnum;
01013             }
01014         }
01015 
01016         /*
01017          * Apply any needed deletes.  We issue just one _bt_delitems_vacuum()
01018          * call per page, so as to minimize WAL traffic.
01019          */
01020         if (ndeletable > 0)
01021         {
01022             BlockNumber lastBlockVacuumed = BufferGetBlockNumber(buf);
01023 
01024             _bt_delitems_vacuum(rel, buf, deletable, ndeletable,
01025                                 vstate->lastBlockVacuumed);
01026 
01027             /*
01028              * Keep track of the block number of the lastBlockVacuumed, so we
01029              * can scan those blocks as well during WAL replay. This then
01030              * provides concurrency protection and allows btrees to be used
01031              * while in recovery.
01032              */
01033             if (lastBlockVacuumed > vstate->lastBlockVacuumed)
01034                 vstate->lastBlockVacuumed = lastBlockVacuumed;
01035 
01036             stats->tuples_removed += ndeletable;
01037             /* must recompute maxoff */
01038             maxoff = PageGetMaxOffsetNumber(page);
01039         }
01040         else
01041         {
01042             /*
01043              * If the page has been split during this vacuum cycle, it seems
01044              * worth expending a write to clear btpo_cycleid even if we don't
01045              * have any deletions to do.  (If we do, _bt_delitems_vacuum takes
01046              * care of this.)  This ensures we won't process the page again.
01047              *
01048              * We treat this like a hint-bit update because there's no need to
01049              * WAL-log it.
01050              */
01051             if (vstate->cycleid != 0 &&
01052                 opaque->btpo_cycleid == vstate->cycleid)
01053             {
01054                 opaque->btpo_cycleid = 0;
01055                 MarkBufferDirtyHint(buf);
01056             }
01057         }
01058 
01059         /*
01060          * If it's now empty, try to delete; else count the live tuples. We
01061          * don't delete when recursing, though, to avoid putting entries into
01062          * freePages out-of-order (doesn't seem worth any extra code to handle
01063          * the case).
01064          */
01065         if (minoff > maxoff)
01066             delete_now = (blkno == orig_blkno);
01067         else
01068             stats->num_index_tuples += maxoff - minoff + 1;
01069     }
01070 
01071     if (delete_now)
01072     {
01073         MemoryContext oldcontext;
01074         int         ndel;
01075 
01076         /* Run pagedel in a temp context to avoid memory leakage */
01077         MemoryContextReset(vstate->pagedelcontext);
01078         oldcontext = MemoryContextSwitchTo(vstate->pagedelcontext);
01079 
01080         ndel = _bt_pagedel(rel, buf, NULL);
01081 
01082         /* count only this page, else may double-count parent */
01083         if (ndel)
01084             stats->pages_deleted++;
01085 
01086         MemoryContextSwitchTo(oldcontext);
01087         /* pagedel released buffer, so we shouldn't */
01088     }
01089     else
01090         _bt_relbuf(rel, buf);
01091 
01092     /*
01093      * This is really tail recursion, but if the compiler is too stupid to
01094      * optimize it as such, we'd eat an uncomfortably large amount of stack
01095      * space per recursion level (due to the deletable[] array). A failure is
01096      * improbable since the number of levels isn't likely to be large ... but
01097      * just in case, let's hand-optimize into a loop.
01098      */
01099     if (recurse_to != P_NONE)
01100     {
01101         blkno = recurse_to;
01102         goto restart;
01103     }
01104 }
01105 
01106 /*
01107  *  btcanreturn() -- Check whether btree indexes support index-only scans.
01108  *
01109  * btrees always do, so this is trivial.
01110  */
01111 Datum
01112 btcanreturn(PG_FUNCTION_ARGS)
01113 {
01114     PG_RETURN_BOOL(true);
01115 }