Header And Logo

PostgreSQL
| The world's most advanced open source database.

spgutils.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * spgutils.c
00004  *    various support functions for SP-GiST
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *          src/backend/access/spgist/spgutils.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/genam.h"
00019 #include "access/reloptions.h"
00020 #include "access/spgist_private.h"
00021 #include "access/transam.h"
00022 #include "access/xact.h"
00023 #include "storage/bufmgr.h"
00024 #include "storage/indexfsm.h"
00025 #include "storage/lmgr.h"
00026 #include "utils/lsyscache.h"
00027 
00028 
00029 /* Fill in a SpGistTypeDesc struct with info about the specified data type */
00030 static void
00031 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
00032 {
00033     desc->type = type;
00034     get_typlenbyval(type, &desc->attlen, &desc->attbyval);
00035 }
00036 
00037 /*
00038  * Fetch local cache of AM-specific info about the index, initializing it
00039  * if necessary
00040  */
00041 SpGistCache *
00042 spgGetCache(Relation index)
00043 {
00044     SpGistCache *cache;
00045 
00046     if (index->rd_amcache == NULL)
00047     {
00048         Oid         atttype;
00049         spgConfigIn in;
00050         FmgrInfo   *procinfo;
00051         Buffer      metabuffer;
00052         SpGistMetaPageData *metadata;
00053 
00054         cache = MemoryContextAllocZero(index->rd_indexcxt,
00055                                        sizeof(SpGistCache));
00056 
00057         /* SPGiST doesn't support multi-column indexes */
00058         Assert(index->rd_att->natts == 1);
00059 
00060         /*
00061          * Get the actual data type of the indexed column from the index
00062          * tupdesc.  We pass this to the opclass config function so that
00063          * polymorphic opclasses are possible.
00064          */
00065         atttype = index->rd_att->attrs[0]->atttypid;
00066 
00067         /* Call the config function to get config info for the opclass */
00068         in.attType = atttype;
00069 
00070         procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
00071         FunctionCall2Coll(procinfo,
00072                           index->rd_indcollation[0],
00073                           PointerGetDatum(&in),
00074                           PointerGetDatum(&cache->config));
00075 
00076         /* Get the information we need about each relevant datatype */
00077         fillTypeDesc(&cache->attType, atttype);
00078         fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
00079         fillTypeDesc(&cache->attLabelType, cache->config.labelType);
00080 
00081         /* Last, get the lastUsedPages data from the metapage */
00082         metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
00083         LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
00084 
00085         metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
00086 
00087         if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
00088             elog(ERROR, "index \"%s\" is not an SP-GiST index",
00089                  RelationGetRelationName(index));
00090 
00091         cache->lastUsedPages = metadata->lastUsedPages;
00092 
00093         UnlockReleaseBuffer(metabuffer);
00094 
00095         index->rd_amcache = (void *) cache;
00096     }
00097     else
00098     {
00099         /* assume it's up to date */
00100         cache = (SpGistCache *) index->rd_amcache;
00101     }
00102 
00103     return cache;
00104 }
00105 
00106 /* Initialize SpGistState for working with the given index */
00107 void
00108 initSpGistState(SpGistState *state, Relation index)
00109 {
00110     SpGistCache *cache;
00111 
00112     /* Get cached static information about index */
00113     cache = spgGetCache(index);
00114 
00115     state->config = cache->config;
00116     state->attType = cache->attType;
00117     state->attPrefixType = cache->attPrefixType;
00118     state->attLabelType = cache->attLabelType;
00119 
00120     /* Make workspace for constructing dead tuples */
00121     state->deadTupleStorage = palloc0(SGDTSIZE);
00122 
00123     /* Set XID to use in redirection tuples */
00124     state->myXid = GetTopTransactionIdIfAny();
00125 
00126     /* Assume we're not in an index build (spgbuild will override) */
00127     state->isBuild = false;
00128 }
00129 
00130 /*
00131  * Allocate a new page (either by recycling, or by extending the index file).
00132  *
00133  * The returned buffer is already pinned and exclusive-locked.
00134  * Caller is responsible for initializing the page by calling SpGistInitBuffer.
00135  */
00136 Buffer
00137 SpGistNewBuffer(Relation index)
00138 {
00139     Buffer      buffer;
00140     bool        needLock;
00141 
00142     /* First, try to get a page from FSM */
00143     for (;;)
00144     {
00145         BlockNumber blkno = GetFreeIndexPage(index);
00146 
00147         if (blkno == InvalidBlockNumber)
00148             break;              /* nothing known to FSM */
00149 
00150         /*
00151          * The fixed pages shouldn't ever be listed in FSM, but just in case
00152          * one is, ignore it.
00153          */
00154         if (SpGistBlockIsFixed(blkno))
00155             continue;
00156 
00157         buffer = ReadBuffer(index, blkno);
00158 
00159         /*
00160          * We have to guard against the possibility that someone else already
00161          * recycled this page; the buffer may be locked if so.
00162          */
00163         if (ConditionalLockBuffer(buffer))
00164         {
00165             Page        page = BufferGetPage(buffer);
00166 
00167             if (PageIsNew(page))
00168                 return buffer;  /* OK to use, if never initialized */
00169 
00170             if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
00171                 return buffer;  /* OK to use */
00172 
00173             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
00174         }
00175 
00176         /* Can't use it, so release buffer and try again */
00177         ReleaseBuffer(buffer);
00178     }
00179 
00180     /* Must extend the file */
00181     needLock = !RELATION_IS_LOCAL(index);
00182     if (needLock)
00183         LockRelationForExtension(index, ExclusiveLock);
00184 
00185     buffer = ReadBuffer(index, P_NEW);
00186     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00187 
00188     if (needLock)
00189         UnlockRelationForExtension(index, ExclusiveLock);
00190 
00191     return buffer;
00192 }
00193 
00194 /*
00195  * Update index metapage's lastUsedPages info from local cache, if possible
00196  *
00197  * Updating meta page isn't critical for index working, so
00198  * 1 use ConditionalLockBuffer to improve concurrency
00199  * 2 don't WAL-log metabuffer changes to decrease WAL traffic
00200  */
00201 void
00202 SpGistUpdateMetaPage(Relation index)
00203 {
00204     SpGistCache *cache = (SpGistCache *) index->rd_amcache;
00205 
00206     if (cache != NULL)
00207     {
00208         Buffer      metabuffer;
00209         SpGistMetaPageData *metadata;
00210 
00211         metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
00212 
00213         if (ConditionalLockBuffer(metabuffer))
00214         {
00215             metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
00216             metadata->lastUsedPages = cache->lastUsedPages;
00217 
00218             MarkBufferDirty(metabuffer);
00219             UnlockReleaseBuffer(metabuffer);
00220         }
00221         else
00222         {
00223             ReleaseBuffer(metabuffer);
00224         }
00225     }
00226 }
00227 
/*
 * Select the lastUsedPages cache slot that corresponds to the given flags.
 * The flags value is reduced modulo SPGIST_CACHED_PAGES purely out of
 * paranoia, so that the array index can never be out of range.
 */
#define GET_LUP(c, f) \
    (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
00231 
00232 /*
00233  * Allocate and initialize a new buffer of the type and parity specified by
00234  * flags.  The returned buffer is already pinned and exclusive-locked.
00235  *
00236  * When requesting an inner page, if we get one with the wrong parity,
00237  * we just release the buffer and try again.  We will get a different page
00238  * because GetFreeIndexPage will have marked the page used in FSM.  The page
00239  * is entered in our local lastUsedPages cache, so there's some hope of
00240  * making use of it later in this session, but otherwise we rely on VACUUM
00241  * to eventually re-enter the page in FSM, making it available for recycling.
00242  * Note that such a page does not get marked dirty here, so unless it's used
00243  * fairly soon, the buffer will just get discarded and the page will remain
00244  * as it was on disk.
00245  *
00246  * When we return a buffer to the caller, the page is *not* entered into
00247  * the lastUsedPages cache; we expect the caller will do so after it's taken
00248  * whatever space it will use.  This is because after the caller has used up
00249  * some space, the page might have less space than whatever was cached already
00250  * so we'd rather not trash the old cache entry.
00251  */
00252 static Buffer
00253 allocNewBuffer(Relation index, int flags)
00254 {
00255     SpGistCache *cache = spgGetCache(index);
00256     uint16      pageflags = 0;
00257 
00258     if (GBUF_REQ_LEAF(flags))
00259         pageflags |= SPGIST_LEAF;
00260     if (GBUF_REQ_NULLS(flags))
00261         pageflags |= SPGIST_NULLS;
00262 
00263     for (;;)
00264     {
00265         Buffer      buffer;
00266 
00267         buffer = SpGistNewBuffer(index);
00268         SpGistInitBuffer(buffer, pageflags);
00269 
00270         if (pageflags & SPGIST_LEAF)
00271         {
00272             /* Leaf pages have no parity concerns, so just use it */
00273             return buffer;
00274         }
00275         else
00276         {
00277             BlockNumber blkno = BufferGetBlockNumber(buffer);
00278             int         blkFlags = GBUF_INNER_PARITY(blkno);
00279 
00280             if ((flags & GBUF_PARITY_MASK) == blkFlags)
00281             {
00282                 /* Page has right parity, use it */
00283                 return buffer;
00284             }
00285             else
00286             {
00287                 /* Page has wrong parity, record it in cache and try again */
00288                 if (pageflags & SPGIST_NULLS)
00289                     blkFlags |= GBUF_NULLS;
00290                 cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
00291                 cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
00292                     PageGetExactFreeSpace(BufferGetPage(buffer));
00293                 UnlockReleaseBuffer(buffer);
00294             }
00295         }
00296     }
00297 }
00298 
00299 /*
00300  * Get a buffer of the type and parity specified by flags, having at least
00301  * as much free space as indicated by needSpace.  We use the lastUsedPages
00302  * cache to assign the same buffer previously requested when possible.
00303  * The returned buffer is already pinned and exclusive-locked.
00304  *
00305  * *isNew is set true if the page was initialized here, false if it was
00306  * already valid.
00307  */
00308 Buffer
00309 SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
00310 {
00311     SpGistCache *cache = spgGetCache(index);
00312     SpGistLastUsedPage *lup;
00313 
00314     /* Bail out if even an empty page wouldn't meet the demand */
00315     if (needSpace > SPGIST_PAGE_CAPACITY)
00316         elog(ERROR, "desired SPGiST tuple size is too big");
00317 
00318     /*
00319      * If possible, increase the space request to include relation's
00320      * fillfactor.  This ensures that when we add unrelated tuples to a page,
00321      * we try to keep 100-fillfactor% available for adding tuples that are
00322      * related to the ones already on it.  But fillfactor mustn't cause an
00323      * error for requests that would otherwise be legal.
00324      */
00325     needSpace += RelationGetTargetPageFreeSpace(index,
00326                                                 SPGIST_DEFAULT_FILLFACTOR);
00327     needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
00328 
00329     /* Get the cache entry for this flags setting */
00330     lup = GET_LUP(cache, flags);
00331 
00332     /* If we have nothing cached, just turn it over to allocNewBuffer */
00333     if (lup->blkno == InvalidBlockNumber)
00334     {
00335         *isNew = true;
00336         return allocNewBuffer(index, flags);
00337     }
00338 
00339     /* fixed pages should never be in cache */
00340     Assert(!SpGistBlockIsFixed(lup->blkno));
00341 
00342     /* If cached freeSpace isn't enough, don't bother looking at the page */
00343     if (lup->freeSpace >= needSpace)
00344     {
00345         Buffer      buffer;
00346         Page        page;
00347 
00348         buffer = ReadBuffer(index, lup->blkno);
00349 
00350         if (!ConditionalLockBuffer(buffer))
00351         {
00352             /*
00353              * buffer is locked by another process, so return a new buffer
00354              */
00355             ReleaseBuffer(buffer);
00356             *isNew = true;
00357             return allocNewBuffer(index, flags);
00358         }
00359 
00360         page = BufferGetPage(buffer);
00361 
00362         if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
00363         {
00364             /* OK to initialize the page */
00365             uint16      pageflags = 0;
00366 
00367             if (GBUF_REQ_LEAF(flags))
00368                 pageflags |= SPGIST_LEAF;
00369             if (GBUF_REQ_NULLS(flags))
00370                 pageflags |= SPGIST_NULLS;
00371             SpGistInitBuffer(buffer, pageflags);
00372             lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
00373             *isNew = true;
00374             return buffer;
00375         }
00376 
00377         /*
00378          * Check that page is of right type and has enough space.  We must
00379          * recheck this since our cache isn't necessarily up to date.
00380          */
00381         if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
00382             (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
00383         {
00384             int         freeSpace = PageGetExactFreeSpace(page);
00385 
00386             if (freeSpace >= needSpace)
00387             {
00388                 /* Success, update freespace info and return the buffer */
00389                 lup->freeSpace = freeSpace - needSpace;
00390                 *isNew = false;
00391                 return buffer;
00392             }
00393         }
00394 
00395         /*
00396          * fallback to allocation of new buffer
00397          */
00398         UnlockReleaseBuffer(buffer);
00399     }
00400 
00401     /* No success with cache, so return a new buffer */
00402     *isNew = true;
00403     return allocNewBuffer(index, flags);
00404 }
00405 
00406 /*
00407  * Update lastUsedPages cache when done modifying a page.
00408  *
00409  * We update the appropriate cache entry if it already contained this page
00410  * (its freeSpace is likely obsolete), or if this page has more space than
00411  * whatever we had cached.
00412  */
00413 void
00414 SpGistSetLastUsedPage(Relation index, Buffer buffer)
00415 {
00416     SpGistCache *cache = spgGetCache(index);
00417     SpGistLastUsedPage *lup;
00418     int         freeSpace;
00419     Page        page = BufferGetPage(buffer);
00420     BlockNumber blkno = BufferGetBlockNumber(buffer);
00421     int         flags;
00422 
00423     /* Never enter fixed pages (root pages) in cache, though */
00424     if (SpGistBlockIsFixed(blkno))
00425         return;
00426 
00427     if (SpGistPageIsLeaf(page))
00428         flags = GBUF_LEAF;
00429     else
00430         flags = GBUF_INNER_PARITY(blkno);
00431     if (SpGistPageStoresNulls(page))
00432         flags |= GBUF_NULLS;
00433 
00434     lup = GET_LUP(cache, flags);
00435 
00436     freeSpace = PageGetExactFreeSpace(page);
00437     if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
00438         lup->freeSpace < freeSpace)
00439     {
00440         lup->blkno = blkno;
00441         lup->freeSpace = freeSpace;
00442     }
00443 }
00444 
00445 /*
00446  * Initialize an SPGiST page to empty, with specified flags
00447  */
00448 void
00449 SpGistInitPage(Page page, uint16 f)
00450 {
00451     SpGistPageOpaque opaque;
00452 
00453     PageInit(page, BLCKSZ, MAXALIGN(sizeof(SpGistPageOpaqueData)));
00454     opaque = SpGistPageGetOpaque(page);
00455     memset(opaque, 0, sizeof(SpGistPageOpaqueData));
00456     opaque->flags = f;
00457     opaque->spgist_page_id = SPGIST_PAGE_ID;
00458 }
00459 
00460 /*
00461  * Initialize a buffer's page to empty, with specified flags
00462  */
00463 void
00464 SpGistInitBuffer(Buffer b, uint16 f)
00465 {
00466     Assert(BufferGetPageSize(b) == BLCKSZ);
00467     SpGistInitPage(BufferGetPage(b), f);
00468 }
00469 
00470 /*
00471  * Initialize metadata page
00472  */
00473 void
00474 SpGistInitMetapage(Page page)
00475 {
00476     SpGistMetaPageData *metadata;
00477     int         i;
00478 
00479     SpGistInitPage(page, SPGIST_META);
00480     metadata = SpGistPageGetMeta(page);
00481     memset(metadata, 0, sizeof(SpGistMetaPageData));
00482     metadata->magicNumber = SPGIST_MAGIC_NUMBER;
00483 
00484     /* initialize last-used-page cache to empty */
00485     for (i = 0; i < SPGIST_CACHED_PAGES; i++)
00486         metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
00487 }
00488 
00489 /*
00490  * reloptions processing for SPGiST
00491  */
00492 Datum
00493 spgoptions(PG_FUNCTION_ARGS)
00494 {
00495     Datum       reloptions = PG_GETARG_DATUM(0);
00496     bool        validate = PG_GETARG_BOOL(1);
00497     bytea      *result;
00498 
00499     result = default_reloptions(reloptions, validate, RELOPT_KIND_SPGIST);
00500 
00501     if (result)
00502         PG_RETURN_BYTEA_P(result);
00503     PG_RETURN_NULL();
00504 }
00505 
00506 /*
00507  * Get the space needed to store a non-null datum of the indicated type.
00508  * Note the result is already rounded up to a MAXALIGN boundary.
00509  * Also, we follow the SPGiST convention that pass-by-val types are
00510  * just stored in their Datum representation (compare memcpyDatum).
00511  */
00512 unsigned int
00513 SpGistGetTypeSize(SpGistTypeDesc *att, Datum datum)
00514 {
00515     unsigned int size;
00516 
00517     if (att->attbyval)
00518         size = sizeof(Datum);
00519     else if (att->attlen > 0)
00520         size = att->attlen;
00521     else
00522         size = VARSIZE_ANY(datum);
00523 
00524     return MAXALIGN(size);
00525 }
00526 
00527 /*
00528  * Copy the given non-null datum to *target
00529  */
00530 static void
00531 memcpyDatum(void *target, SpGistTypeDesc *att, Datum datum)
00532 {
00533     unsigned int size;
00534 
00535     if (att->attbyval)
00536     {
00537         memcpy(target, &datum, sizeof(Datum));
00538     }
00539     else
00540     {
00541         size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
00542         memcpy(target, DatumGetPointer(datum), size);
00543     }
00544 }
00545 
00546 /*
00547  * Construct a leaf tuple containing the given heap TID and datum value
00548  */
00549 SpGistLeafTuple
00550 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
00551                  Datum datum, bool isnull)
00552 {
00553     SpGistLeafTuple tup;
00554     unsigned int size;
00555 
00556     /* compute space needed (note result is already maxaligned) */
00557     size = SGLTHDRSZ;
00558     if (!isnull)
00559         size += SpGistGetTypeSize(&state->attType, datum);
00560 
00561     /*
00562      * Ensure that we can replace the tuple with a dead tuple later.  This
00563      * test is unnecessary when !isnull, but let's be safe.
00564      */
00565     if (size < SGDTSIZE)
00566         size = SGDTSIZE;
00567 
00568     /* OK, form the tuple */
00569     tup = (SpGistLeafTuple) palloc0(size);
00570 
00571     tup->size = size;
00572     tup->nextOffset = InvalidOffsetNumber;
00573     tup->heapPtr = *heapPtr;
00574     if (!isnull)
00575         memcpyDatum(SGLTDATAPTR(tup), &state->attType, datum);
00576 
00577     return tup;
00578 }
00579 
00580 /*
00581  * Construct a node (to go into an inner tuple) containing the given label
00582  *
00583  * Note that the node's downlink is just set invalid here.  Caller will fill
00584  * it in later.
00585  */
00586 SpGistNodeTuple
00587 spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
00588 {
00589     SpGistNodeTuple tup;
00590     unsigned int size;
00591     unsigned short infomask = 0;
00592 
00593     /* compute space needed (note result is already maxaligned) */
00594     size = SGNTHDRSZ;
00595     if (!isnull)
00596         size += SpGistGetTypeSize(&state->attLabelType, label);
00597 
00598     /*
00599      * Here we make sure that the size will fit in the field reserved for it
00600      * in t_info.
00601      */
00602     if ((size & INDEX_SIZE_MASK) != size)
00603         ereport(ERROR,
00604                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00605                  errmsg("index row requires %lu bytes, maximum size is %lu",
00606                         (unsigned long) size,
00607                         (unsigned long) INDEX_SIZE_MASK)));
00608 
00609     tup = (SpGistNodeTuple) palloc0(size);
00610 
00611     if (isnull)
00612         infomask |= INDEX_NULL_MASK;
00613     /* we don't bother setting the INDEX_VAR_MASK bit */
00614     infomask |= size;
00615     tup->t_info = infomask;
00616 
00617     /* The TID field will be filled in later */
00618     ItemPointerSetInvalid(&tup->t_tid);
00619 
00620     if (!isnull)
00621         memcpyDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
00622 
00623     return tup;
00624 }
00625 
00626 /*
00627  * Construct an inner tuple containing the given prefix and node array
00628  */
00629 SpGistInnerTuple
00630 spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
00631                   int nNodes, SpGistNodeTuple *nodes)
00632 {
00633     SpGistInnerTuple tup;
00634     unsigned int size;
00635     unsigned int prefixSize;
00636     int         i;
00637     char       *ptr;
00638 
00639     /* Compute size needed */
00640     if (hasPrefix)
00641         prefixSize = SpGistGetTypeSize(&state->attPrefixType, prefix);
00642     else
00643         prefixSize = 0;
00644 
00645     size = SGITHDRSZ + prefixSize;
00646 
00647     /* Note: we rely on node tuple sizes to be maxaligned already */
00648     for (i = 0; i < nNodes; i++)
00649         size += IndexTupleSize(nodes[i]);
00650 
00651     /*
00652      * Ensure that we can replace the tuple with a dead tuple later.  This
00653      * test is unnecessary given current tuple layouts, but let's be safe.
00654      */
00655     if (size < SGDTSIZE)
00656         size = SGDTSIZE;
00657 
00658     /*
00659      * Inner tuple should be small enough to fit on a page
00660      */
00661     if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
00662         ereport(ERROR,
00663                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
00664                  errmsg("SP-GiST inner tuple size %lu exceeds maximum %lu",
00665                         (unsigned long) size,
00666                 (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))),
00667             errhint("Values larger than a buffer page cannot be indexed.")));
00668 
00669     /*
00670      * Check for overflow of header fields --- probably can't fail if the
00671      * above succeeded, but let's be paranoid
00672      */
00673     if (size > SGITMAXSIZE ||
00674         prefixSize > SGITMAXPREFIXSIZE ||
00675         nNodes > SGITMAXNNODES)
00676         elog(ERROR, "SPGiST inner tuple header field is too small");
00677 
00678     /* OK, form the tuple */
00679     tup = (SpGistInnerTuple) palloc0(size);
00680 
00681     tup->nNodes = nNodes;
00682     tup->prefixSize = prefixSize;
00683     tup->size = size;
00684 
00685     if (hasPrefix)
00686         memcpyDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
00687 
00688     ptr = (char *) SGITNODEPTR(tup);
00689 
00690     for (i = 0; i < nNodes; i++)
00691     {
00692         SpGistNodeTuple node = nodes[i];
00693 
00694         memcpy(ptr, node, IndexTupleSize(node));
00695         ptr += IndexTupleSize(node);
00696     }
00697 
00698     return tup;
00699 }
00700 
00701 /*
00702  * Construct a "dead" tuple to replace a tuple being deleted.
00703  *
00704  * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
00705  * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
00706  * the xid field is filled in automatically.
00707  *
00708  * This is called in critical sections, so we don't use palloc; the tuple
00709  * is built in preallocated storage.  It should be copied before another
00710  * call with different parameters can occur.
00711  */
00712 SpGistDeadTuple
00713 spgFormDeadTuple(SpGistState *state, int tupstate,
00714                  BlockNumber blkno, OffsetNumber offnum)
00715 {
00716     SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
00717 
00718     tuple->tupstate = tupstate;
00719     tuple->size = SGDTSIZE;
00720     tuple->nextOffset = InvalidOffsetNumber;
00721 
00722     if (tupstate == SPGIST_REDIRECT)
00723     {
00724         ItemPointerSet(&tuple->pointer, blkno, offnum);
00725         Assert(TransactionIdIsValid(state->myXid));
00726         tuple->xid = state->myXid;
00727     }
00728     else
00729     {
00730         ItemPointerSetInvalid(&tuple->pointer);
00731         tuple->xid = InvalidTransactionId;
00732     }
00733 
00734     return tuple;
00735 }
00736 
00737 /*
00738  * Extract the label datums of the nodes within innerTuple
00739  *
00740  * Returns NULL if label datums are NULLs
00741  */
00742 Datum *
00743 spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
00744 {
00745     Datum      *nodeLabels;
00746     int         i;
00747     SpGistNodeTuple node;
00748 
00749     /* Either all the labels must be NULL, or none. */
00750     node = SGITNODEPTR(innerTuple);
00751     if (IndexTupleHasNulls(node))
00752     {
00753         SGITITERATE(innerTuple, i, node)
00754         {
00755             if (!IndexTupleHasNulls(node))
00756                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
00757         }
00758         /* They're all null, so just return NULL */
00759         return NULL;
00760     }
00761     else
00762     {
00763         nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
00764         SGITITERATE(innerTuple, i, node)
00765         {
00766             if (IndexTupleHasNulls(node))
00767                 elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
00768             nodeLabels[i] = SGNTDATUM(node, state);
00769         }
00770         return nodeLabels;
00771     }
00772 }
00773 
00774 /*
00775  * Add a new item to the page, replacing a PLACEHOLDER item if possible.
00776  * Return the location it's inserted at, or InvalidOffsetNumber on failure.
00777  *
00778  * If startOffset isn't NULL, we start searching for placeholders at
00779  * *startOffset, and update that to the next place to search.  This is just
00780  * an optimization for repeated insertions.
00781  *
00782  * If errorOK is false, we throw error when there's not enough room,
00783  * rather than returning InvalidOffsetNumber.
00784  */
00785 OffsetNumber
00786 SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
00787                      OffsetNumber *startOffset, bool errorOK)
00788 {
00789     SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
00790     OffsetNumber i,
00791                 maxoff,
00792                 offnum;
00793 
00794     if (opaque->nPlaceholder > 0 &&
00795         PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
00796     {
00797         /* Try to replace a placeholder */
00798         maxoff = PageGetMaxOffsetNumber(page);
00799         offnum = InvalidOffsetNumber;
00800 
00801         for (;;)
00802         {
00803             if (startOffset && *startOffset != InvalidOffsetNumber)
00804                 i = *startOffset;
00805             else
00806                 i = FirstOffsetNumber;
00807             for (; i <= maxoff; i++)
00808             {
00809                 SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
00810                                                      PageGetItemId(page, i));
00811 
00812                 if (it->tupstate == SPGIST_PLACEHOLDER)
00813                 {
00814                     offnum = i;
00815                     break;
00816                 }
00817             }
00818 
00819             /* Done if we found a placeholder */
00820             if (offnum != InvalidOffsetNumber)
00821                 break;
00822 
00823             if (startOffset && *startOffset != InvalidOffsetNumber)
00824             {
00825                 /* Hint was no good, re-search from beginning */
00826                 *startOffset = InvalidOffsetNumber;
00827                 continue;
00828             }
00829 
00830             /* Hmm, no placeholder found? */
00831             opaque->nPlaceholder = 0;
00832             break;
00833         }
00834 
00835         if (offnum != InvalidOffsetNumber)
00836         {
00837             /* Replace the placeholder tuple */
00838             PageIndexTupleDelete(page, offnum);
00839 
00840             offnum = PageAddItem(page, item, size, offnum, false, false);
00841 
00842             /*
00843              * We should not have failed given the size check at the top of
00844              * the function, but test anyway.  If we did fail, we must PANIC
00845              * because we've already deleted the placeholder tuple, and
00846              * there's no other way to keep the damage from getting to disk.
00847              */
00848             if (offnum != InvalidOffsetNumber)
00849             {
00850                 Assert(opaque->nPlaceholder > 0);
00851                 opaque->nPlaceholder--;
00852                 if (startOffset)
00853                     *startOffset = offnum + 1;
00854             }
00855             else
00856                 elog(PANIC, "failed to add item of size %u to SPGiST index page",
00857                      (int) size);
00858 
00859             return offnum;
00860         }
00861     }
00862 
00863     /* No luck in replacing a placeholder, so just add it to the page */
00864     offnum = PageAddItem(page, item, size,
00865                          InvalidOffsetNumber, false, false);
00866 
00867     if (offnum == InvalidOffsetNumber && !errorOK)
00868         elog(ERROR, "failed to add item of size %u to SPGiST index page",
00869              (int) size);
00870 
00871     return offnum;
00872 }