hash.c

/*-------------------------------------------------------------------------
 *
 * hash.c
 *    Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hash.c
 *
 * NOTES
 *    This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "optimizer/cost.h"
#include "optimizer/plancat.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state);


/*
 *  hashbuild() -- build a new hash index.
 */
Datum
hashbuild(PG_FUNCTION_ARGS)
{
    Relation    heap = (Relation) PG_GETARG_POINTER(0);
    Relation    index = (Relation) PG_GETARG_POINTER(1);
    IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, we have big trouble.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_metapinit(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly.  To prevent that scenario, we can sort the
     * tuples by (expected) bucket number.  However, such a sort is useless
     * overhead when the index does fit in RAM.  We choose to sort if the
     * initial index size exceeds NBuffers.
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page.
     */
    if (num_buckets >= (uint32) NBuffers)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;

    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                   hashbuildCallback, (void *) &buildstate);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    PG_RETURN_POINTER(result);
}
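
/*
 * Illustrative usage (hypothetical names, not part of the original file):
 * hashbuild() is reached through the index access method interface when a
 * hash index is created from SQL, e.g.:
 *
 *      CREATE TABLE points (k integer);
 *      CREATE INDEX points_k_hash ON points USING hash (k);
 */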

/*
 *  hashbuildempty() -- build an empty hash index in the initialization fork
 */
Datum
hashbuildempty(PG_FUNCTION_ARGS)
{
    Relation    index = (Relation) PG_GETARG_POINTER(0);

    _hash_metapinit(index, 0, INIT_FORKNUM);

    PG_RETURN_VOID();
}
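
/*
 * Note: ambuildempty is invoked only for unlogged indexes; the INIT_FORKNUM
 * fork created here replaces the main fork during crash recovery, leaving
 * an empty but valid index.  Illustrative SQL (hypothetical names):
 *
 *      CREATE UNLOGGED TABLE t (k integer);
 *      CREATE INDEX t_k_hash ON t USING hash (k);
 */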

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    IndexTuple  itup;

    /* form an index tuple and point it at the heap tuple */
    itup = _hash_form_tuple(index, values, isnull);
    itup->t_tid = htup->t_self;

    /* Hash indexes don't index nulls, see notes in hashinsert */
    if (IndexTupleHasNulls(itup))
    {
        pfree(itup);
        return;
    }

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(itup, buildstate->spool);
    else
        _hash_doinsert(index, itup);

    buildstate->indtuples += 1;

    pfree(itup);
}

/*
 *  hashinsert() -- insert an index tuple into a hash table.
 *
 *  Hash on the heap tuple's key, form an index tuple with the hash code.
 *  Find the appropriate location for the new tuple, and put it there.
 */
Datum
hashinsert(PG_FUNCTION_ARGS)
{
    Relation    rel = (Relation) PG_GETARG_POINTER(0);
    Datum      *values = (Datum *) PG_GETARG_POINTER(1);
    bool       *isnull = (bool *) PG_GETARG_POINTER(2);
    ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);

#ifdef NOT_USED
    Relation    heapRel = (Relation) PG_GETARG_POINTER(4);
    IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
#endif
    IndexTuple  itup;

    /* generate an index tuple */
    itup = _hash_form_tuple(rel, values, isnull);
    itup->t_tid = *ht_ctid;

    /*
     * If the single index key is null, we don't insert it into the index.
     * Hash tables support scans on '='. Relational algebra says that A = B
     * returns null if either A or B is null.  This means that no
     * qualification used in an index scan could ever return true on a null
     * attribute.  It also means that indices can't be used by ISNULL or
     * NOTNULL scans, but that's an artifact of the strategy map architecture
     * chosen in 1986, not of the way nulls are handled here.
     */
    if (IndexTupleHasNulls(itup))
    {
        pfree(itup);
        PG_RETURN_BOOL(false);
    }

    _hash_doinsert(rel, itup);

    pfree(itup);

    PG_RETURN_BOOL(false);
}
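
/*
 * Illustrative consequence of the null handling above (hypothetical table
 * name): since null keys are never entered into the index, a hash index
 * cannot satisfy an IS NULL predicate, and a query such as
 *
 *      SELECT * FROM points WHERE k IS NULL;
 *
 * must use some other access path, e.g. a sequential scan.
 */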


/*
 *  hashgettuple() -- Get the next tuple in the scan.
 */
Datum
hashgettuple(PG_FUNCTION_ARGS)
{
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;
    Buffer      buf;
    Page        page;
    OffsetNumber offnum;
    ItemPointer current;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;

    /*
     * We hold pin but not lock on current buffer while outside the hash AM.
     * Reacquire the read lock here.
     */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction.  If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    current = &(so->hashso_curpos);
    if (ItemPointerIsValid(current))
    {
        /*
         * An insertion into the current index page could have happened while
         * we didn't have read lock on it.  Re-find our position by looking
         * for the TID we previously returned.  (Because we hold share lock on
         * the bucket, no deletions or splits could have occurred; therefore
         * we can expect that the TID still exists in the current index page,
         * at an offset >= where we were.)
         */
        OffsetNumber maxoffnum;

        buf = so->hashso_curbuf;
        Assert(BufferIsValid(buf));
        page = BufferGetPage(buf);
        maxoffnum = PageGetMaxOffsetNumber(page);
        for (offnum = ItemPointerGetOffsetNumber(current);
             offnum <= maxoffnum;
             offnum = OffsetNumberNext(offnum))
        {
            IndexTuple  itup;

            itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
            if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
                break;
        }
        if (offnum > maxoffnum)
            elog(ERROR, "failed to re-find scan position within index \"%s\"",
                 RelationGetRelationName(rel));
        ItemPointerSetOffsetNumber(current, offnum);

        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so mark it by setting the LP_DEAD state in the item flags.
             */
            ItemIdMarkDead(PageGetItemId(page, offnum));

            /*
             * Since this can be redone later if needed, mark as a hint.
             */
            MarkBufferDirtyHint(buf);
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }
    else
        res = _hash_first(scan, dir);

    /*
     * Skip killed tuples if asked to.
     */
    if (scan->ignore_killed_tuples)
    {
        while (res)
        {
            offnum = ItemPointerGetOffsetNumber(current);
            page = BufferGetPage(so->hashso_curbuf);
            if (!ItemIdIsDead(PageGetItemId(page, offnum)))
                break;
            res = _hash_next(scan, dir);
        }
    }

    /* Release read lock on current buffer, but keep it pinned */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

    /* Return current heap TID on success */
    scan->xs_ctup.t_self = so->hashso_heappos;

    PG_RETURN_BOOL(res);
}
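
/*
 * Note on the recheck flag set above: only the hash code is stored, so two
 * distinct keys can collide on the same index entry.  Because xs_recheck is
 * true, the executor re-evaluates the original qual (e.g. "k = 42") against
 * each heap tuple this scan returns and discards false matches.
 */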


/*
 *  hashgetbitmap() -- get all tuples at once
 */
Datum
hashgetbitmap(PG_FUNCTION_ARGS)
{
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    TIDBitmap  *tbm = (TIDBitmap *) PG_GETARG_POINTER(1);
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        bool        add_tuple;

        /*
         * Skip killed tuples if asked to.
         */
        if (scan->ignore_killed_tuples)
        {
            Page        page;
            OffsetNumber offnum;

            offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
            page = BufferGetPage(so->hashso_curbuf);
            add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
        }
        else
            add_tuple = true;

        /* Save tuple ID, and continue scanning */
        if (add_tuple)
        {
            /* Note we mark the tuple ID as requiring recheck */
            tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
            ntids++;
        }

        res = _hash_next(scan, ForwardScanDirection);
    }

    PG_RETURN_INT64(ntids);
}
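
/*
 * Illustrative plan shape (abbreviated, hypothetical names): a bitmap scan
 * driven by this routine surfaces the same recheck requirement:
 *
 *      EXPLAIN SELECT * FROM points WHERE k = 42;
 *        Bitmap Heap Scan on points
 *          Recheck Cond: (k = 42)
 *          ->  Bitmap Index Scan on points_k_hash
 *                Index Cond: (k = 42)
 */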


/*
 *  hashbeginscan() -- start a scan on a hash index
 */
Datum
hashbeginscan(PG_FUNCTION_ARGS)
{
    Relation    rel = (Relation) PG_GETARG_POINTER(0);
    int         nkeys = PG_GETARG_INT32(1);
    int         norderbys = PG_GETARG_INT32(2);
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    so->hashso_bucket_valid = false;
    so->hashso_bucket_blkno = 0;
    so->hashso_curbuf = InvalidBuffer;
    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    scan->opaque = so;

    /* register scan in case we change pages it's using */
    _hash_regscan(scan);

    PG_RETURN_POINTER(scan);
}

/*
 *  hashrescan() -- rescan an index relation
 */
Datum
hashrescan(PG_FUNCTION_ARGS)
{
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    ScanKey     scankey = (ScanKey) PG_GETARG_POINTER(1);

    /* remaining arguments are ignored */
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    /* release any pin we still hold */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_dropbuf(rel, so->hashso_curbuf);
    so->hashso_curbuf = InvalidBuffer;

    /* release lock on bucket, too */
    if (so->hashso_bucket_blkno)
        _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
    so->hashso_bucket_blkno = 0;

    /* set position invalid (this will cause _hash_first call) */
    ItemPointerSetInvalid(&(so->hashso_curpos));
    ItemPointerSetInvalid(&(so->hashso_heappos));

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
        so->hashso_bucket_valid = false;
    }

    PG_RETURN_VOID();
}

/*
 *  hashendscan() -- close down a scan
 */
Datum
hashendscan(PG_FUNCTION_ARGS)
{
    IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    /* don't need scan registered anymore */
    _hash_dropscan(scan);

    /* release any pin we still hold */
    if (BufferIsValid(so->hashso_curbuf))
        _hash_dropbuf(rel, so->hashso_curbuf);
    so->hashso_curbuf = InvalidBuffer;

    /* release lock on bucket, too */
    if (so->hashso_bucket_blkno)
        _hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
    so->hashso_bucket_blkno = 0;

    pfree(so);
    scan->opaque = NULL;

    PG_RETURN_VOID();
}
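
#ifdef NOT_USED
/*
 * Hypothetical sketch (not part of the original file): the scan lifecycle
 * implemented by the routines above, driven through the fmgr direct-call
 * wrappers.  Real callers go through the index AM interface
 * (index_beginscan() and friends); the helper name and single-key setup
 * are illustrative assumptions.
 */
static void
hash_scan_lifecycle_sketch(Relation rel, ScanKey onekey)
{
    IndexScanDesc scan;
    bool        found;

    /* start a scan with one scan key and no ordering operators */
    scan = (IndexScanDesc) DatumGetPointer(
        DirectFunctionCall3(hashbeginscan,
                            PointerGetDatum(rel),
                            Int32GetDatum(1),
                            Int32GetDatum(0)));

    /* install the scan key; hashrescan ignores its trailing arguments */
    (void) DirectFunctionCall2(hashrescan,
                               PointerGetDatum(scan),
                               PointerGetDatum(onekey));

    /* fetch matching tuples one at a time until the scan is exhausted */
    for (;;)
    {
        found = DatumGetBool(DirectFunctionCall2(hashgettuple,
                                                 PointerGetDatum(scan),
                                        Int32GetDatum(ForwardScanDirection)));
        if (!found)
            break;
        /* scan->xs_ctup.t_self now holds the matching heap TID */
    }

    /* shut the scan down, releasing pins, locks, and the opaque state */
    (void) DirectFunctionCall1(hashendscan, PointerGetDatum(scan));
}
#endif   /* NOT_USED */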

/*
 *  hashmarkpos() -- save current scan position
 */
Datum
hashmarkpos(PG_FUNCTION_ARGS)
{
    elog(ERROR, "hash does not support mark/restore");
    PG_RETURN_VOID();
}

/*
 *  hashrestrpos() -- restore scan to last saved position
 */
Datum
hashrestrpos(PG_FUNCTION_ARGS)
{
    elog(ERROR, "hash does not support mark/restore");
    PG_RETURN_VOID();
}
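
/*
 * The stubs above are safe because mark/restore exists mainly so that merge
 * join can back up and replay the inner scan, which presupposes an ordered
 * index.  Hash indexes provide no ordering, so the planner never generates
 * a plan that would call hashmarkpos() or hashrestrpos().
 */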

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashbulkdelete(PG_FUNCTION_ARGS)
{
    IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
    IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
    IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
    void       *callback_state = (void *) PG_GETARG_POINTER(3);
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf;
    HashMetaPage metap;
    HashMetaPageData local_metapage;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * Read the metapage to fetch original bucket and tuple counts.  Also, we
     * keep a copy of the last-seen metapage so that we can use its
     * hashm_spares[] values to compute bucket page addresses.  This is a bit
     * hokey but perfectly safe, since the interesting entries in the spares
     * array cannot change under us; and it beats rereading the metapage for
     * each bucket.
     */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));
    orig_maxbucket = metap->hashm_maxbucket;
    orig_ntuples = metap->hashm_ntuples;
    memcpy(&local_metapage, metap, sizeof(local_metapage));
    _hash_relbuf(rel, metabuf);

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        bool        bucket_dirty = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

        /* Exclusive-lock the bucket so we can shrink it */
        _hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

        /* Shouldn't have any active scans locally, either */
        if (_hash_has_active_scan(rel, cur_bucket))
            elog(ERROR, "hash index has active scan during VACUUM");

        /* Scan each page in bucket */
        blkno = bucket_blkno;
        while (BlockNumberIsValid(blkno))
        {
            Buffer      buf;
            Page        page;
            HashPageOpaque opaque;
            OffsetNumber offno;
            OffsetNumber maxoffno;
            OffsetNumber deletable[MaxOffsetNumber];
            int         ndeletable = 0;

            vacuum_delay_point();

            buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                           LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
                                             info->strategy);
            page = BufferGetPage(buf);
            opaque = (HashPageOpaque) PageGetSpecialPointer(page);
            Assert(opaque->hasho_bucket == cur_bucket);

            /* Scan each tuple in page */
            maxoffno = PageGetMaxOffsetNumber(page);
            for (offno = FirstOffsetNumber;
                 offno <= maxoffno;
                 offno = OffsetNumberNext(offno))
            {
                IndexTuple  itup;
                ItemPointer htup;

                itup = (IndexTuple) PageGetItem(page,
                                                PageGetItemId(page, offno));
                htup = &(itup->t_tid);
                if (callback(htup, callback_state))
                {
                    /* mark the item for deletion */
                    deletable[ndeletable++] = offno;
                    tuples_removed += 1;
                }
                else
                    num_index_tuples += 1;
            }

            /*
             * Apply deletions and write page if needed, advance to next page.
             */
            blkno = opaque->hasho_nextblkno;

            if (ndeletable > 0)
            {
                PageIndexMultiDelete(page, deletable, ndeletable);
                _hash_wrtbuf(rel, buf);
                bucket_dirty = true;
            }
            else
                _hash_relbuf(rel, buf);
        }

        /* If we deleted anything, try to compact free space */
        if (bucket_dirty)
            _hash_squeezebucket(rel, cur_bucket, bucket_blkno,
                                info->strategy);

        /* Release bucket lock */
        _hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

        /* Advance to next bucket */
        cur_bucket++;
    }

    /* Write-lock metapage and check for split since we started */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE, LH_META_PAGE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        cur_maxbucket = metap->hashm_maxbucket;
        memcpy(&local_metapage, metap, sizeof(local_metapage));
        _hash_relbuf(rel, metabuf);
        goto loop_top;
    }

    /* Okay, we're really done.  Update tuple count in metapage. */

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    _hash_wrtbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    PG_RETURN_POINTER(stats);
}
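
/*
 * Worked example of the dead-reckoning above (illustrative numbers): if the
 * metapage recorded 1000 tuples at scan start and this pass removed 200,
 * but a split intervened, the code settles for 1000 - 200 = 800 rather than
 * trusting a possibly double-counted num_index_tuples; without a split, the
 * freshly counted value is stored as-is.
 */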

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
hashvacuumcleanup(PG_FUNCTION_ARGS)
{
    IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
    IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        PG_RETURN_POINTER(NULL);

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    PG_RETURN_POINTER(stats);
}


void
hash_redo(XLogRecPtr lsn, XLogRecord *record)
{
    elog(PANIC, "hash_redo: unimplemented");
}
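
/*
 * Hash index operations are not WAL-logged in this release, so no hash WAL
 * records should ever be seen here; hence the unconditional PANIC.  This is
 * also why hash indexes may need a REINDEX after a database crash.
 */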