Header And Logo

PostgreSQL
| The world's most advanced open source database.

gist.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * gist.c
00004  *    interface routines for the postgres GiST index access method.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/gist/gist.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/genam.h"
00018 #include "access/gist_private.h"
00019 #include "access/heapam_xlog.h"
00020 #include "catalog/index.h"
00021 #include "catalog/pg_collation.h"
00022 #include "miscadmin.h"
00023 #include "storage/bufmgr.h"
00024 #include "storage/indexfsm.h"
00025 #include "utils/memutils.h"
00026 #include "utils/rel.h"
00027 
00028 /* non-export function prototypes */
00029 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
00030 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
00031              GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
00032 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
00033                  GISTSTATE *giststate,
00034                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
00035                  Buffer leftchild, Buffer rightchild,
00036                  bool unlockbuf, bool unlockleftchild);
00037 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
00038                 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
00039 
00040 
00041 #define ROTATEDIST(d) do { \
00042     SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
00043     memset(tmp,0,sizeof(SplitedPageLayout)); \
00044     tmp->block.blkno = InvalidBlockNumber;  \
00045     tmp->buffer = InvalidBuffer;    \
00046     tmp->next = (d); \
00047     (d)=tmp; \
00048 } while(0)
00049 
00050 
00051 /*
00052  * Create and return a temporary memory context for use by GiST. We
00053  * _always_ invoke user-provided methods in a temporary memory
00054  * context, so that memory leaks in those functions cannot cause
00055  * problems. Also, we use some additional temporary contexts in the
00056  * GiST code itself, to avoid the need to do some awkward manual
00057  * memory management.
00058  */
00059 MemoryContext
00060 createTempGistContext(void)
00061 {
00062     return AllocSetContextCreate(CurrentMemoryContext,
00063                                  "GiST temporary context",
00064                                  ALLOCSET_DEFAULT_MINSIZE,
00065                                  ALLOCSET_DEFAULT_INITSIZE,
00066                                  ALLOCSET_DEFAULT_MAXSIZE);
00067 }
00068 
00069 /*
00070  *  gistbuildempty() -- build an empty gist index in the initialization fork
00071  */
00072 Datum
00073 gistbuildempty(PG_FUNCTION_ARGS)
00074 {
00075     Relation    index = (Relation) PG_GETARG_POINTER(0);
00076     Buffer      buffer;
00077 
00078     /* Initialize the root page */
00079     buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
00080     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00081 
00082     /* Initialize and xlog buffer */
00083     START_CRIT_SECTION();
00084     GISTInitBuffer(buffer, F_LEAF);
00085     MarkBufferDirty(buffer);
00086     log_newpage_buffer(buffer);
00087     END_CRIT_SECTION();
00088 
00089     /* Unlock and release the buffer */
00090     UnlockReleaseBuffer(buffer);
00091 
00092     PG_RETURN_VOID();
00093 }
00094 
00095 /*
00096  *  gistinsert -- wrapper for GiST tuple insertion.
00097  *
00098  *    This is the public interface routine for tuple insertion in GiSTs.
00099  *    It doesn't do any work; just locks the relation and passes the buck.
00100  */
00101 Datum
00102 gistinsert(PG_FUNCTION_ARGS)
00103 {
00104     Relation    r = (Relation) PG_GETARG_POINTER(0);
00105     Datum      *values = (Datum *) PG_GETARG_POINTER(1);
00106     bool       *isnull = (bool *) PG_GETARG_POINTER(2);
00107     ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
00108 
00109 #ifdef NOT_USED
00110     Relation    heapRel = (Relation) PG_GETARG_POINTER(4);
00111     IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
00112 #endif
00113     IndexTuple  itup;
00114     GISTSTATE  *giststate;
00115     MemoryContext oldCxt;
00116 
00117     giststate = initGISTstate(r);
00118 
00119     /*
00120      * We use the giststate's scan context as temp context too.  This means
00121      * that any memory leaked by the support functions is not reclaimed until
00122      * end of insert.  In most cases, we aren't going to call the support
00123      * functions very many times before finishing the insert, so this seems
00124      * cheaper than resetting a temp context for each function call.
00125      */
00126     oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
00127 
00128     itup = gistFormTuple(giststate, r,
00129                          values, isnull, true /* size is currently bogus */ );
00130     itup->t_tid = *ht_ctid;
00131 
00132     gistdoinsert(r, itup, 0, giststate);
00133 
00134     /* cleanup */
00135     MemoryContextSwitchTo(oldCxt);
00136     freeGISTstate(giststate);
00137 
00138     PG_RETURN_BOOL(false);
00139 }
00140 
00141 
00142 /*
00143  * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
00144  * at that offset is atomically removed along with inserting the new tuples.
00145  * This is used to replace a tuple with a new one.
00146  *
00147  * If 'leftchildbuf' is valid, we're inserting the downlink for the page
00148  * to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
00149  * F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
00150  *
00151  * If 'markfollowright' is true and the page is split, the left child is
00152  * marked with F_FOLLOW_RIGHT flag. That is the normal case. During buffered
00153  * index build, however, there is no concurrent access and the page splitting
00154  * is done in a slightly simpler fashion, and false is passed.
00155  *
00156  * If there is not enough room on the page, it is split. All the split
00157  * pages are kept pinned and locked and returned in *splitinfo, the caller
00158  * is responsible for inserting the downlinks for them. However, if
00159  * 'buffer' is the root page and it needs to be split, gistplacetopage()
00160  * performs the split as one atomic operation, and *splitinfo is set to NIL.
00161  * In that case, we continue to hold the root page locked, and the child
00162  * pages are released; note that new tuple(s) are *not* on the root page
00163  * but in one of the new child pages.
00164  *
00165  * If 'newblkno' is not NULL, returns the block number of page the first
00166  * new/updated tuple was inserted to. Usually it's the given page, but could
00167  * be its right sibling if the page was split.
00168  *
00169  * Returns 'true' if the page was split, 'false' otherwise.
00170  */
00171 bool
00172 gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
00173                 Buffer buffer,
00174                 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
00175                 BlockNumber *newblkno,
00176                 Buffer leftchildbuf,
00177                 List **splitinfo,
00178                 bool markfollowright)
00179 {
00180     BlockNumber blkno = BufferGetBlockNumber(buffer);
00181     Page        page = BufferGetPage(buffer);
00182     bool        is_leaf = (GistPageIsLeaf(page)) ? true : false;
00183     XLogRecPtr  recptr;
00184     int         i;
00185     bool        is_split;
00186 
00187     /*
00188      * Refuse to modify a page that's incompletely split. This should not
00189      * happen because we finish any incomplete splits while we walk down the
00190      * tree. However, it's remotely possible that another concurrent inserter
00191      * splits a parent page, and errors out before completing the split. We
00192      * will just throw an error in that case, and leave any split we had in
00193      * progress unfinished too. The next insert that comes along will clean up
00194      * the mess.
00195      */
00196     if (GistFollowRight(page))
00197         elog(ERROR, "concurrent GiST page split was incomplete");
00198 
00199     *splitinfo = NIL;
00200 
00201     /*
00202      * if isupdate, remove old key: This node's key has been modified, either
00203      * because a child split occurred or because we needed to adjust our key
00204      * for an insert in a child node. Therefore, remove the old version of
00205      * this node's key.
00206      *
00207      * for WAL replay, in the non-split case we handle this by setting up a
00208      * one-element todelete array; in the split case, it's handled implicitly
00209      * because the tuple vector passed to gistSplit won't include this tuple.
00210      */
00211     is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
00212     if (is_split)
00213     {
00214         /* no space for insertion */
00215         IndexTuple *itvec;
00216         int         tlen;
00217         SplitedPageLayout *dist = NULL,
00218                    *ptr;
00219         BlockNumber oldrlink = InvalidBlockNumber;
00220         GistNSN     oldnsn = 0;
00221         SplitedPageLayout rootpg;
00222         bool        is_rootsplit;
00223 
00224         is_rootsplit = (blkno == GIST_ROOT_BLKNO);
00225 
00226         /*
00227          * Form index tuples vector to split. If we're replacing an old tuple,
00228          * remove the old version from the vector.
00229          */
00230         itvec = gistextractpage(page, &tlen);
00231         if (OffsetNumberIsValid(oldoffnum))
00232         {
00233             /* on inner page we should remove old tuple */
00234             int         pos = oldoffnum - FirstOffsetNumber;
00235 
00236             tlen--;
00237             if (pos != tlen)
00238                 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
00239         }
00240         itvec = gistjoinvector(itvec, &tlen, itup, ntup);
00241         dist = gistSplit(rel, page, itvec, tlen, giststate);
00242 
00243         /*
00244          * Set up pages to work with. Allocate new buffers for all but the
00245          * leftmost page. The original page becomes the new leftmost page, and
00246          * is just replaced with the new contents.
00247          *
00248          * For a root-split, allocate new buffers for all child pages, the
00249          * original page is overwritten with new root page containing
00250          * downlinks to the new child pages.
00251          */
00252         ptr = dist;
00253         if (!is_rootsplit)
00254         {
00255             /* save old rightlink and NSN */
00256             oldrlink = GistPageGetOpaque(page)->rightlink;
00257             oldnsn = GistPageGetNSN(page);
00258 
00259             dist->buffer = buffer;
00260             dist->block.blkno = BufferGetBlockNumber(buffer);
00261             dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
00262 
00263             /* clean all flags except F_LEAF */
00264             GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
00265 
00266             ptr = ptr->next;
00267         }
00268         for (; ptr; ptr = ptr->next)
00269         {
00270             /* Allocate new page */
00271             ptr->buffer = gistNewBuffer(rel);
00272             GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
00273             ptr->page = BufferGetPage(ptr->buffer);
00274             ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
00275         }
00276 
00277         /*
00278          * Now that we know which blocks the new pages go to, set up downlink
00279          * tuples to point to them.
00280          */
00281         for (ptr = dist; ptr; ptr = ptr->next)
00282         {
00283             ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
00284             GistTupleSetValid(ptr->itup);
00285         }
00286 
00287         /*
00288          * If this is a root split, we construct the new root page with the
00289          * downlinks here directly, instead of requiring the caller to insert
00290          * them. Add the new root page to the list along with the child pages.
00291          */
00292         if (is_rootsplit)
00293         {
00294             IndexTuple *downlinks;
00295             int         ndownlinks = 0;
00296             int         i;
00297 
00298             rootpg.buffer = buffer;
00299             rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
00300             GistPageGetOpaque(rootpg.page)->flags = 0;
00301 
00302             /* Prepare a vector of all the downlinks */
00303             for (ptr = dist; ptr; ptr = ptr->next)
00304                 ndownlinks++;
00305             downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
00306             for (i = 0, ptr = dist; ptr; ptr = ptr->next)
00307                 downlinks[i++] = ptr->itup;
00308 
00309             rootpg.block.blkno = GIST_ROOT_BLKNO;
00310             rootpg.block.num = ndownlinks;
00311             rootpg.list = gistfillitupvec(downlinks, ndownlinks,
00312                                           &(rootpg.lenlist));
00313             rootpg.itup = NULL;
00314 
00315             rootpg.next = dist;
00316             dist = &rootpg;
00317         }
00318         else
00319         {
00320             /* Prepare split-info to be returned to caller */
00321             for (ptr = dist; ptr; ptr = ptr->next)
00322             {
00323                 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
00324 
00325                 si->buf = ptr->buffer;
00326                 si->downlink = ptr->itup;
00327                 *splitinfo = lappend(*splitinfo, si);
00328             }
00329         }
00330 
00331         /*
00332          * Fill all pages. All the pages are new, ie. freshly allocated empty
00333          * pages, or a temporary copy of the old page.
00334          */
00335         for (ptr = dist; ptr; ptr = ptr->next)
00336         {
00337             char       *data = (char *) (ptr->list);
00338 
00339             for (i = 0; i < ptr->block.num; i++)
00340             {
00341                 IndexTuple  thistup = (IndexTuple) data;
00342 
00343                 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
00344                     elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
00345 
00346                 /*
00347                  * If this is the first inserted/updated tuple, let the caller
00348                  * know which page it landed on.
00349                  */
00350                 if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
00351                     *newblkno = ptr->block.blkno;
00352 
00353                 data += IndexTupleSize(thistup);
00354             }
00355 
00356             /* Set up rightlinks */
00357             if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
00358                 GistPageGetOpaque(ptr->page)->rightlink =
00359                     ptr->next->block.blkno;
00360             else
00361                 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
00362 
00363             /*
00364              * Mark the all but the right-most page with the follow-right
00365              * flag. It will be cleared as soon as the downlink is inserted
00366              * into the parent, but this ensures that if we error out before
00367              * that, the index is still consistent. (in buffering build mode,
00368              * any error will abort the index build anyway, so this is not
00369              * needed.)
00370              */
00371             if (ptr->next && !is_rootsplit && markfollowright)
00372                 GistMarkFollowRight(ptr->page);
00373             else
00374                 GistClearFollowRight(ptr->page);
00375 
00376             /*
00377              * Copy the NSN of the original page to all pages. The
00378              * F_FOLLOW_RIGHT flags ensure that scans will follow the
00379              * rightlinks until the downlinks are inserted.
00380              */
00381             GistPageSetNSN(ptr->page, oldnsn);
00382         }
00383 
00384         START_CRIT_SECTION();
00385 
00386         /*
00387          * Must mark buffers dirty before XLogInsert, even though we'll still
00388          * be changing their opaque fields below.
00389          */
00390         for (ptr = dist; ptr; ptr = ptr->next)
00391             MarkBufferDirty(ptr->buffer);
00392         if (BufferIsValid(leftchildbuf))
00393             MarkBufferDirty(leftchildbuf);
00394 
00395         /*
00396          * The first page in the chain was a temporary working copy meant to
00397          * replace the old page. Copy it over the old page.
00398          */
00399         PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
00400         dist->page = BufferGetPage(dist->buffer);
00401 
00402         /* Write the WAL record */
00403         if (RelationNeedsWAL(rel))
00404             recptr = gistXLogSplit(rel->rd_node, blkno, is_leaf,
00405                                    dist, oldrlink, oldnsn, leftchildbuf,
00406                                    markfollowright);
00407         else
00408             recptr = gistGetFakeLSN(rel);
00409 
00410         for (ptr = dist; ptr; ptr = ptr->next)
00411         {
00412             PageSetLSN(ptr->page, recptr);
00413         }
00414 
00415         /*
00416          * Return the new child buffers to the caller.
00417          *
00418          * If this was a root split, we've already inserted the downlink
00419          * pointers, in the form of a new root page. Therefore we can release
00420          * all the new buffers, and keep just the root page locked.
00421          */
00422         if (is_rootsplit)
00423         {
00424             for (ptr = dist->next; ptr; ptr = ptr->next)
00425                 UnlockReleaseBuffer(ptr->buffer);
00426         }
00427     }
00428     else
00429     {
00430         /*
00431          * Enough space. We also get here if ntuples==0.
00432          */
00433         START_CRIT_SECTION();
00434 
00435         if (OffsetNumberIsValid(oldoffnum))
00436             PageIndexTupleDelete(page, oldoffnum);
00437         gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
00438 
00439         MarkBufferDirty(buffer);
00440 
00441         if (BufferIsValid(leftchildbuf))
00442             MarkBufferDirty(leftchildbuf);
00443 
00444         if (RelationNeedsWAL(rel))
00445         {
00446             OffsetNumber ndeloffs = 0,
00447                         deloffs[1];
00448 
00449             if (OffsetNumberIsValid(oldoffnum))
00450             {
00451                 deloffs[0] = oldoffnum;
00452                 ndeloffs = 1;
00453             }
00454 
00455             recptr = gistXLogUpdate(rel->rd_node, buffer,
00456                                     deloffs, ndeloffs, itup, ntup,
00457                                     leftchildbuf);
00458 
00459             PageSetLSN(page, recptr);
00460         }
00461         else
00462         {
00463             recptr = gistGetFakeLSN(rel);
00464             PageSetLSN(page, recptr);
00465         }
00466 
00467         if (newblkno)
00468             *newblkno = blkno;
00469     }
00470 
00471     /*
00472      * If we inserted the downlink for a child page, set NSN and clear
00473      * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know to
00474      * follow the rightlink if and only if they looked at the parent page
00475      * before we inserted the downlink.
00476      *
00477      * Note that we do this *after* writing the WAL record. That means that
00478      * the possible full page image in the WAL record does not include these
00479      * changes, and they must be replayed even if the page is restored from
00480      * the full page image. There's a chicken-and-egg problem: if we updated
00481      * the child pages first, we wouldn't know the recptr of the WAL record
00482      * we're about to write.
00483      */
00484     if (BufferIsValid(leftchildbuf))
00485     {
00486         Page        leftpg = BufferGetPage(leftchildbuf);
00487 
00488         GistPageSetNSN(leftpg, recptr);
00489         GistClearFollowRight(leftpg);
00490 
00491         PageSetLSN(leftpg, recptr);
00492     }
00493 
00494     END_CRIT_SECTION();
00495 
00496     return is_split;
00497 }
00498 
00499 /*
00500  * Workhouse routine for doing insertion into a GiST index. Note that
00501  * this routine assumes it is invoked in a short-lived memory context,
00502  * so it does not bother releasing palloc'd allocations.
00503  */
00504 void
00505 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
00506 {
00507     ItemId      iid;
00508     IndexTuple  idxtuple;
00509     GISTInsertStack firststack;
00510     GISTInsertStack *stack;
00511     GISTInsertState state;
00512     bool        xlocked = false;
00513 
00514     memset(&state, 0, sizeof(GISTInsertState));
00515     state.freespace = freespace;
00516     state.r = r;
00517 
00518     /* Start from the root */
00519     firststack.blkno = GIST_ROOT_BLKNO;
00520     firststack.lsn = 0;
00521     firststack.parent = NULL;
00522     firststack.downlinkoffnum = InvalidOffsetNumber;
00523     state.stack = stack = &firststack;
00524 
00525     /*
00526      * Walk down along the path of smallest penalty, updating the parent
00527      * pointers with the key we're inserting as we go. If we crash in the
00528      * middle, the tree is consistent, although the possible parent updates
00529      * were a waste.
00530      */
00531     for (;;)
00532     {
00533         if (XLogRecPtrIsInvalid(stack->lsn))
00534             stack->buffer = ReadBuffer(state.r, stack->blkno);
00535 
00536         /*
00537          * Be optimistic and grab shared lock first. Swap it for an exclusive
00538          * lock later if we need to update the page.
00539          */
00540         if (!xlocked)
00541         {
00542             LockBuffer(stack->buffer, GIST_SHARE);
00543             gistcheckpage(state.r, stack->buffer);
00544         }
00545 
00546         stack->page = (Page) BufferGetPage(stack->buffer);
00547         stack->lsn = PageGetLSN(stack->page);
00548         Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
00549 
00550         /*
00551          * If this page was split but the downlink was never inserted to the
00552          * parent because the inserting backend crashed before doing that, fix
00553          * that now.
00554          */
00555         if (GistFollowRight(stack->page))
00556         {
00557             if (!xlocked)
00558             {
00559                 LockBuffer(stack->buffer, GIST_UNLOCK);
00560                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00561                 xlocked = true;
00562                 /* someone might've completed the split when we unlocked */
00563                 if (!GistFollowRight(stack->page))
00564                     continue;
00565             }
00566             gistfixsplit(&state, giststate);
00567 
00568             UnlockReleaseBuffer(stack->buffer);
00569             xlocked = false;
00570             state.stack = stack = stack->parent;
00571             continue;
00572         }
00573 
00574         if (stack->blkno != GIST_ROOT_BLKNO &&
00575             stack->parent->lsn < GistPageGetNSN(stack->page))
00576         {
00577             /*
00578              * Concurrent split detected. There's no guarantee that the
00579              * downlink for this page is consistent with the tuple we're
00580              * inserting anymore, so go back to parent and rechoose the best
00581              * child.
00582              */
00583             UnlockReleaseBuffer(stack->buffer);
00584             xlocked = false;
00585             state.stack = stack = stack->parent;
00586             continue;
00587         }
00588 
00589         if (!GistPageIsLeaf(stack->page))
00590         {
00591             /*
00592              * This is an internal page so continue to walk down the tree.
00593              * Find the child node that has the minimum insertion penalty.
00594              */
00595             BlockNumber childblkno;
00596             IndexTuple  newtup;
00597             GISTInsertStack *item;
00598             OffsetNumber downlinkoffnum;
00599 
00600             downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
00601             iid = PageGetItemId(stack->page, downlinkoffnum);
00602             idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
00603             childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
00604 
00605             /*
00606              * Check that it's not a leftover invalid tuple from pre-9.1
00607              */
00608             if (GistTupleIsInvalid(idxtuple))
00609                 ereport(ERROR,
00610                         (errmsg("index \"%s\" contains an inner tuple marked as invalid",
00611                                 RelationGetRelationName(r)),
00612                          errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
00613                          errhint("Please REINDEX it.")));
00614 
00615             /*
00616              * Check that the key representing the target child node is
00617              * consistent with the key we're inserting. Update it if it's not.
00618              */
00619             newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
00620             if (newtup)
00621             {
00622                 /*
00623                  * Swap shared lock for an exclusive one. Beware, the page may
00624                  * change while we unlock/lock the page...
00625                  */
00626                 if (!xlocked)
00627                 {
00628                     LockBuffer(stack->buffer, GIST_UNLOCK);
00629                     LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00630                     xlocked = true;
00631                     stack->page = (Page) BufferGetPage(stack->buffer);
00632 
00633                     if (PageGetLSN(stack->page) != stack->lsn)
00634                     {
00635                         /* the page was changed while we unlocked it, retry */
00636                         continue;
00637                     }
00638                 }
00639 
00640                 /*
00641                  * Update the tuple.
00642                  *
00643                  * We still hold the lock after gistinserttuple(), but it
00644                  * might have to split the page to make the updated tuple fit.
00645                  * In that case the updated tuple might migrate to the other
00646                  * half of the split, so we have to go back to the parent and
00647                  * descend back to the half that's a better fit for the new
00648                  * tuple.
00649                  */
00650                 if (gistinserttuple(&state, stack, giststate, newtup,
00651                                     downlinkoffnum))
00652                 {
00653                     /*
00654                      * If this was a root split, the root page continues to be
00655                      * the parent and the updated tuple went to one of the
00656                      * child pages, so we just need to retry from the root
00657                      * page.
00658                      */
00659                     if (stack->blkno != GIST_ROOT_BLKNO)
00660                     {
00661                         UnlockReleaseBuffer(stack->buffer);
00662                         xlocked = false;
00663                         state.stack = stack = stack->parent;
00664                     }
00665                     continue;
00666                 }
00667             }
00668             LockBuffer(stack->buffer, GIST_UNLOCK);
00669             xlocked = false;
00670 
00671             /* descend to the chosen child */
00672             item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00673             item->blkno = childblkno;
00674             item->parent = stack;
00675             item->downlinkoffnum = downlinkoffnum;
00676             state.stack = stack = item;
00677         }
00678         else
00679         {
00680             /*
00681              * Leaf page. Insert the new key. We've already updated all the
00682              * parents on the way down, but we might have to split the page if
00683              * it doesn't fit. gistinserthere() will take care of that.
00684              */
00685 
00686             /*
00687              * Swap shared lock for an exclusive one. Be careful, the page may
00688              * change while we unlock/lock the page...
00689              */
00690             if (!xlocked)
00691             {
00692                 LockBuffer(stack->buffer, GIST_UNLOCK);
00693                 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00694                 xlocked = true;
00695                 stack->page = (Page) BufferGetPage(stack->buffer);
00696                 stack->lsn = PageGetLSN(stack->page);
00697 
00698                 if (stack->blkno == GIST_ROOT_BLKNO)
00699                 {
00700                     /*
00701                      * the only page that can become inner instead of leaf is
00702                      * the root page, so for root we should recheck it
00703                      */
00704                     if (!GistPageIsLeaf(stack->page))
00705                     {
00706                         /*
00707                          * very rare situation: during unlock/lock index with
00708                          * number of pages = 1 was increased
00709                          */
00710                         LockBuffer(stack->buffer, GIST_UNLOCK);
00711                         xlocked = false;
00712                         continue;
00713                     }
00714 
00715                     /*
00716                      * we don't need to check root split, because checking
00717                      * leaf/inner is enough to recognize split for root
00718                      */
00719                 }
00720                 else if (GistFollowRight(stack->page) ||
00721                          stack->parent->lsn < GistPageGetNSN(stack->page))
00722                 {
00723                     /*
00724                      * The page was split while we momentarily unlocked the
00725                      * page. Go back to parent.
00726                      */
00727                     UnlockReleaseBuffer(stack->buffer);
00728                     xlocked = false;
00729                     state.stack = stack = stack->parent;
00730                     continue;
00731                 }
00732             }
00733 
00734             /* now state.stack->(page, buffer and blkno) points to leaf page */
00735 
00736             gistinserttuple(&state, stack, giststate, itup,
00737                             InvalidOffsetNumber);
00738             LockBuffer(stack->buffer, GIST_UNLOCK);
00739 
00740             /* Release any pins we might still hold before exiting */
00741             for (; stack; stack = stack->parent)
00742                 ReleaseBuffer(stack->buffer);
00743             break;
00744         }
00745     }
00746 }
00747 
00748 /*
00749  * Traverse the tree to find path from root page to specified "child" block.
00750  *
00751  * returns a new insertion stack, starting from the parent of "child", up
00752  * to the root. *downlinkoffnum is set to the offset of the downlink in the
00753  * direct parent of child.
00754  *
00755  * To prevent deadlocks, this should lock only one page at a time.
00756  */
00757 static GISTInsertStack *
00758 gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
00759 {
00760     Page        page;
00761     Buffer      buffer;
00762     OffsetNumber i,
00763                 maxoff;
00764     ItemId      iid;
00765     IndexTuple  idxtuple;
00766     List       *fifo;
00767     GISTInsertStack *top,
00768                *ptr;
00769     BlockNumber blkno;
00770 
00771     top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00772     top->blkno = GIST_ROOT_BLKNO;
00773     top->downlinkoffnum = InvalidOffsetNumber;
00774 
00775     fifo = list_make1(top);
00776     while (fifo != NIL)
00777     {
00778         /* Get next page to visit */
00779         top = linitial(fifo);
00780         fifo = list_delete_first(fifo);
00781 
00782         buffer = ReadBuffer(r, top->blkno);
00783         LockBuffer(buffer, GIST_SHARE);
00784         gistcheckpage(r, buffer);
00785         page = (Page) BufferGetPage(buffer);
00786 
00787         if (GistPageIsLeaf(page))
00788         {
00789             /*
00790              * Because we scan the index top-down, all the rest of the pages
00791              * in the queue must be leaf pages as well.
00792              */
00793             UnlockReleaseBuffer(buffer);
00794             break;
00795         }
00796 
00797         top->lsn = PageGetLSN(page);
00798 
00799         /*
00800          * If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
00801          * downlink. This should not normally happen..
00802          */
00803         if (GistFollowRight(page))
00804             elog(ERROR, "concurrent GiST page split was incomplete");
00805 
00806         if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
00807             GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
00808         {
00809             /*
00810              * Page was split while we looked elsewhere. We didn't see the
00811              * downlink to the right page when we scanned the parent, so add
00812              * it to the queue now.
00813              *
00814              * Put the right page ahead of the queue, so that we visit it
00815              * next. That's important, because if this is the lowest internal
00816              * level, just above leaves, we might already have queued up some
00817              * leaf pages, and we assume that there can't be any non-leaf
00818              * pages behind leaf pages.
00819              */
00820             ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00821             ptr->blkno = GistPageGetOpaque(page)->rightlink;
00822             ptr->downlinkoffnum = InvalidOffsetNumber;
00823             ptr->parent = top->parent;
00824 
00825             fifo = lcons(ptr, fifo);
00826         }
00827 
00828         maxoff = PageGetMaxOffsetNumber(page);
00829 
00830         for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00831         {
00832             iid = PageGetItemId(page, i);
00833             idxtuple = (IndexTuple) PageGetItem(page, iid);
00834             blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
00835             if (blkno == child)
00836             {
00837                 /* Found it! */
00838                 UnlockReleaseBuffer(buffer);
00839                 *downlinkoffnum = i;
00840                 return top;
00841             }
00842             else
00843             {
00844                 /* Append this child to the list of pages to visit later */
00845                 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00846                 ptr->blkno = blkno;
00847                 ptr->downlinkoffnum = i;
00848                 ptr->parent = top;
00849 
00850                 fifo = lappend(fifo, ptr);
00851             }
00852         }
00853 
00854         UnlockReleaseBuffer(buffer);
00855     }
00856 
00857     elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
00858          RelationGetRelationName(r), child);
00859     return NULL;                /* keep compiler quiet */
00860 }
00861 
00862 /*
00863  * Updates the stack so that child->parent is the correct parent of the
00864  * child. child->parent must be exclusively locked on entry, and will
00865  * remain so at exit, but it might not be the same page anymore.
00866  */
00867 static void
00868 gistFindCorrectParent(Relation r, GISTInsertStack *child)
00869 {
00870     GISTInsertStack *parent = child->parent;
00871 
00872     gistcheckpage(r, parent->buffer);
00873     parent->page = (Page) BufferGetPage(parent->buffer);
00874 
00875     /* here we don't need to distinguish between split and page update */
00876     if (child->downlinkoffnum == InvalidOffsetNumber ||
00877         parent->lsn != PageGetLSN(parent->page))
00878     {
00879         /* parent is changed, look child in right links until found */
00880         OffsetNumber i,
00881                     maxoff;
00882         ItemId      iid;
00883         IndexTuple  idxtuple;
00884         GISTInsertStack *ptr;
00885 
00886         while (true)
00887         {
00888             maxoff = PageGetMaxOffsetNumber(parent->page);
00889             for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00890             {
00891                 iid = PageGetItemId(parent->page, i);
00892                 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
00893                 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
00894                 {
00895                     /* yes!!, found */
00896                     child->downlinkoffnum = i;
00897                     return;
00898                 }
00899             }
00900 
00901             parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
00902             UnlockReleaseBuffer(parent->buffer);
00903             if (parent->blkno == InvalidBlockNumber)
00904             {
00905                 /*
00906                  * End of chain and still didn't find parent. It's a very-very
00907                  * rare situation when root splited.
00908                  */
00909                 break;
00910             }
00911             parent->buffer = ReadBuffer(r, parent->blkno);
00912             LockBuffer(parent->buffer, GIST_EXCLUSIVE);
00913             gistcheckpage(r, parent->buffer);
00914             parent->page = (Page) BufferGetPage(parent->buffer);
00915         }
00916 
00917         /*
00918          * awful!!, we need search tree to find parent ... , but before we
00919          * should release all old parent
00920          */
00921 
00922         ptr = child->parent->parent;    /* child->parent already released
00923                                          * above */
00924         while (ptr)
00925         {
00926             ReleaseBuffer(ptr->buffer);
00927             ptr = ptr->parent;
00928         }
00929 
00930         /* ok, find new path */
00931         ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
00932 
00933         /* read all buffers as expected by caller */
00934         /* note we don't lock them or gistcheckpage them here! */
00935         while (ptr)
00936         {
00937             ptr->buffer = ReadBuffer(r, ptr->blkno);
00938             ptr->page = (Page) BufferGetPage(ptr->buffer);
00939             ptr = ptr->parent;
00940         }
00941 
00942         /* install new chain of parents to stack */
00943         child->parent = parent;
00944 
00945         /* make recursive call to normal processing */
00946         LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
00947         gistFindCorrectParent(r, child);
00948     }
00949 
00950     return;
00951 }
00952 
00953 /*
00954  * Form a downlink pointer for the page in 'buf'.
00955  */
00956 static IndexTuple
00957 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
00958                  GISTInsertStack *stack)
00959 {
00960     Page        page = BufferGetPage(buf);
00961     OffsetNumber maxoff;
00962     OffsetNumber offset;
00963     IndexTuple  downlink = NULL;
00964 
00965     maxoff = PageGetMaxOffsetNumber(page);
00966     for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
00967     {
00968         IndexTuple  ituple = (IndexTuple)
00969         PageGetItem(page, PageGetItemId(page, offset));
00970 
00971         if (downlink == NULL)
00972             downlink = CopyIndexTuple(ituple);
00973         else
00974         {
00975             IndexTuple  newdownlink;
00976 
00977             newdownlink = gistgetadjusted(rel, downlink, ituple,
00978                                           giststate);
00979             if (newdownlink)
00980                 downlink = newdownlink;
00981         }
00982     }
00983 
00984     /*
00985      * If the page is completely empty, we can't form a meaningful downlink
00986      * for it. But we have to insert a downlink for the page. Any key will do,
00987      * as long as its consistent with the downlink of parent page, so that we
00988      * can legally insert it to the parent. A minimal one that matches as few
00989      * scans as possible would be best, to keep scans from doing useless work,
00990      * but we don't know how to construct that. So we just use the downlink of
00991      * the original page that was split - that's as far from optimal as it can
00992      * get but will do..
00993      */
00994     if (!downlink)
00995     {
00996         ItemId      iid;
00997 
00998         LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
00999         gistFindCorrectParent(rel, stack);
01000         iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
01001         downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
01002         downlink = CopyIndexTuple(downlink);
01003         LockBuffer(stack->parent->buffer, GIST_UNLOCK);
01004     }
01005 
01006     ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
01007     GistTupleSetValid(downlink);
01008 
01009     return downlink;
01010 }
01011 
01012 
01013 /*
01014  * Complete the incomplete split of state->stack->page.
01015  */
01016 static void
01017 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
01018 {
01019     GISTInsertStack *stack = state->stack;
01020     Buffer      buf;
01021     Page        page;
01022     List       *splitinfo = NIL;
01023 
01024     elog(LOG, "fixing incomplete split in index \"%s\", block %u",
01025          RelationGetRelationName(state->r), stack->blkno);
01026 
01027     Assert(GistFollowRight(stack->page));
01028     Assert(OffsetNumberIsValid(stack->downlinkoffnum));
01029 
01030     buf = stack->buffer;
01031 
01032     /*
01033      * Read the chain of split pages, following the rightlinks. Construct a
01034      * downlink tuple for each page.
01035      */
01036     for (;;)
01037     {
01038         GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
01039         IndexTuple  downlink;
01040 
01041         page = BufferGetPage(buf);
01042 
01043         /* Form the new downlink tuples to insert to parent */
01044         downlink = gistformdownlink(state->r, buf, giststate, stack);
01045 
01046         si->buf = buf;
01047         si->downlink = downlink;
01048 
01049         splitinfo = lappend(splitinfo, si);
01050 
01051         if (GistFollowRight(page))
01052         {
01053             /* lock next page */
01054             buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
01055             LockBuffer(buf, GIST_EXCLUSIVE);
01056         }
01057         else
01058             break;
01059     }
01060 
01061     /* Insert the downlinks */
01062     gistfinishsplit(state, stack, giststate, splitinfo, false);
01063 }
01064 
01065 /*
01066  * Insert or replace a tuple in stack->buffer. If 'oldoffnum' is valid, the
01067  * tuple at 'oldoffnum' is replaced, otherwise the tuple is inserted as new.
01068  * 'stack' represents the path from the root to the page being updated.
01069  *
01070  * The caller must hold an exclusive lock on stack->buffer.  The lock is still
01071  * held on return, but the page might not contain the inserted tuple if the
01072  * page was split. The function returns true if the page was split, false
01073  * otherwise.
01074  */
01075 static bool
01076 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
01077               GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
01078 {
01079     return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
01080                             InvalidBuffer, InvalidBuffer, false, false);
01081 }
01082 
01083 /* ----------------
01084  * An extended workhorse version of gistinserttuple(). This version allows
01085  * inserting multiple tuples, or replacing a single tuple with multiple tuples.
01086  * This is used to recursively update the downlinks in the parent when a page
01087  * is split.
01088  *
01089  * If leftchild and rightchild are valid, we're inserting/replacing the
01090  * downlink for rightchild, and leftchild is its left sibling. We clear the
01091  * F_FOLLOW_RIGHT flag and update NSN on leftchild, atomically with the
01092  * insertion of the downlink.
01093  *
01094  * To avoid holding locks for longer than necessary, when recursing up the
01095  * tree to update the parents, the locking is a bit peculiar here. On entry,
01096  * the caller must hold an exclusive lock on stack->buffer, as well as
01097  * leftchild and rightchild if given. On return:
01098  *
01099  *  - Lock on stack->buffer is released, if 'unlockbuf' is true. The page is
01100  *    always kept pinned, however.
01101  *  - Lock on 'leftchild' is released, if 'unlockleftchild' is true. The page
01102  *    is kept pinned.
01103  *  - Lock and pin on 'rightchild' are always released.
01104  *
01105  * Returns 'true' if the page had to be split. Note that if the page was
01106  * split, the inserted/updated tuples might've been inserted to a right
01107  * sibling of stack->buffer instead of stack->buffer itself.
01108  */
01109 static bool
01110 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
01111                  GISTSTATE *giststate,
01112                  IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
01113                  Buffer leftchild, Buffer rightchild,
01114                  bool unlockbuf, bool unlockleftchild)
01115 {
01116     List       *splitinfo;
01117     bool        is_split;
01118 
01119     /* Insert the tuple(s) to the page, splitting the page if necessary */
01120     is_split = gistplacetopage(state->r, state->freespace, giststate,
01121                                stack->buffer,
01122                                tuples, ntup,
01123                                oldoffnum, NULL,
01124                                leftchild,
01125                                &splitinfo,
01126                                true);
01127 
01128     /*
01129      * Before recursing up in case the page was split, release locks on the
01130      * child pages. We don't need to keep them locked when updating the
01131      * parent.
01132      */
01133     if (BufferIsValid(rightchild))
01134         UnlockReleaseBuffer(rightchild);
01135     if (BufferIsValid(leftchild) && unlockleftchild)
01136         LockBuffer(leftchild, GIST_UNLOCK);
01137 
01138     /*
01139      * If we had to split, insert/update the downlinks in the parent. If the
01140      * caller requested us to release the lock on stack->buffer, tell
01141      * gistfinishsplit() to do that as soon as it's safe to do so. If we
01142      * didn't have to split, release it ourselves.
01143      */
01144     if (splitinfo)
01145         gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
01146     else if (unlockbuf)
01147         LockBuffer(stack->buffer, GIST_UNLOCK);
01148 
01149     return is_split;
01150 }
01151 
01152 /*
01153  * Finish an incomplete split by inserting/updating the downlinks in parent
01154  * page. 'splitinfo' contains all the child pages involved in the split,
01155  * from left-to-right.
01156  *
01157  * On entry, the caller must hold a lock on stack->buffer and all the child
01158  * pages in 'splitinfo'. If 'unlockbuf' is true, the lock on stack->buffer is
01159  * released on return. The child pages are always unlocked and unpinned.
01160  */
01161 static void
01162 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
01163                 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
01164 {
01165     ListCell   *lc;
01166     List       *reversed;
01167     GISTPageSplitInfo *right;
01168     GISTPageSplitInfo *left;
01169     IndexTuple  tuples[2];
01170 
01171     /* A split always contains at least two halves */
01172     Assert(list_length(splitinfo) >= 2);
01173 
01174     /*
01175      * We need to insert downlinks for each new page, and update the downlink
01176      * for the original (leftmost) page in the split. Begin at the rightmost
01177      * page, inserting one downlink at a time until there's only two pages
01178      * left. Finally insert the downlink for the last new page and update the
01179      * downlink for the original page as one operation.
01180      */
01181 
01182     /* for convenience, create a copy of the list in reverse order */
01183     reversed = NIL;
01184     foreach(lc, splitinfo)
01185     {
01186         reversed = lcons(lfirst(lc), reversed);
01187     }
01188 
01189     LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
01190     gistFindCorrectParent(state->r, stack);
01191 
01192     /*
01193      * insert downlinks for the siblings from right to left, until there are
01194      * only two siblings left.
01195      */
01196     while (list_length(reversed) > 2)
01197     {
01198         right = (GISTPageSplitInfo *) linitial(reversed);
01199         left = (GISTPageSplitInfo *) lsecond(reversed);
01200 
01201         if (gistinserttuples(state, stack->parent, giststate,
01202                              &right->downlink, 1,
01203                              InvalidOffsetNumber,
01204                              left->buf, right->buf, false, false))
01205         {
01206             /*
01207              * If the parent page was split, need to relocate the original
01208              * parent pointer.
01209              */
01210             gistFindCorrectParent(state->r, stack);
01211         }
01212         /* gistinserttuples() released the lock on right->buf. */
01213         reversed = list_delete_first(reversed);
01214     }
01215 
01216     right = (GISTPageSplitInfo *) linitial(reversed);
01217     left = (GISTPageSplitInfo *) lsecond(reversed);
01218 
01219     /*
01220      * Finally insert downlink for the remaining right page and update the
01221      * downlink for the original page to not contain the tuples that were
01222      * moved to the new pages.
01223      */
01224     tuples[0] = left->downlink;
01225     tuples[1] = right->downlink;
01226     gistinserttuples(state, stack->parent, giststate,
01227                      tuples, 2,
01228                      stack->downlinkoffnum,
01229                      left->buf, right->buf,
01230                      true,      /* Unlock parent */
01231                      unlockbuf  /* Unlock stack->buffer if caller wants that */
01232         );
01233     Assert(left->buf == stack->buffer);
01234 }
01235 
01236 /*
01237  * gistSplit -- split a page in the tree and fill struct
01238  * used for XLOG and real writes buffers. Function is recursive, ie
01239  * it will split page until keys will fit in every page.
01240  */
01241 SplitedPageLayout *
01242 gistSplit(Relation r,
01243           Page page,
01244           IndexTuple *itup,     /* contains compressed entry */
01245           int len,
01246           GISTSTATE *giststate)
01247 {
01248     IndexTuple *lvectup,
01249                *rvectup;
01250     GistSplitVector v;
01251     int         i;
01252     SplitedPageLayout *res = NULL;
01253 
01254     memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
01255     memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
01256     gistSplitByKey(r, page, itup, len, giststate, &v, 0);
01257 
01258     /* form left and right vector */
01259     lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
01260     rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
01261 
01262     for (i = 0; i < v.splitVector.spl_nleft; i++)
01263         lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
01264 
01265     for (i = 0; i < v.splitVector.spl_nright; i++)
01266         rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
01267 
01268     /* finalize splitting (may need another split) */
01269     if (!gistfitpage(rvectup, v.splitVector.spl_nright))
01270     {
01271         res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
01272     }
01273     else
01274     {
01275         ROTATEDIST(res);
01276         res->block.num = v.splitVector.spl_nright;
01277         res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
01278         res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
01279     }
01280 
01281     if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
01282     {
01283         SplitedPageLayout *resptr,
01284                    *subres;
01285 
01286         resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
01287 
01288         /* install on list's tail */
01289         while (resptr->next)
01290             resptr = resptr->next;
01291 
01292         resptr->next = res;
01293         res = subres;
01294     }
01295     else
01296     {
01297         ROTATEDIST(res);
01298         res->block.num = v.splitVector.spl_nleft;
01299         res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
01300         res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
01301     }
01302 
01303     return res;
01304 }
01305 
01306 /*
01307  * Create a GISTSTATE and fill it with information about the index
01308  */
01309 GISTSTATE *
01310 initGISTstate(Relation index)
01311 {
01312     GISTSTATE  *giststate;
01313     MemoryContext scanCxt;
01314     MemoryContext oldCxt;
01315     int         i;
01316 
01317     /* safety check to protect fixed-size arrays in GISTSTATE */
01318     if (index->rd_att->natts > INDEX_MAX_KEYS)
01319         elog(ERROR, "numberOfAttributes %d > %d",
01320              index->rd_att->natts, INDEX_MAX_KEYS);
01321 
01322     /* Create the memory context that will hold the GISTSTATE */
01323     scanCxt = AllocSetContextCreate(CurrentMemoryContext,
01324                                     "GiST scan context",
01325                                     ALLOCSET_DEFAULT_MINSIZE,
01326                                     ALLOCSET_DEFAULT_INITSIZE,
01327                                     ALLOCSET_DEFAULT_MAXSIZE);
01328     oldCxt = MemoryContextSwitchTo(scanCxt);
01329 
01330     /* Create and fill in the GISTSTATE */
01331     giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
01332 
01333     giststate->scanCxt = scanCxt;
01334     giststate->tempCxt = scanCxt;       /* caller must change this if needed */
01335     giststate->tupdesc = index->rd_att;
01336 
01337     for (i = 0; i < index->rd_att->natts; i++)
01338     {
01339         fmgr_info_copy(&(giststate->consistentFn[i]),
01340                        index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
01341                        scanCxt);
01342         fmgr_info_copy(&(giststate->unionFn[i]),
01343                        index_getprocinfo(index, i + 1, GIST_UNION_PROC),
01344                        scanCxt);
01345         fmgr_info_copy(&(giststate->compressFn[i]),
01346                        index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
01347                        scanCxt);
01348         fmgr_info_copy(&(giststate->decompressFn[i]),
01349                        index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
01350                        scanCxt);
01351         fmgr_info_copy(&(giststate->penaltyFn[i]),
01352                        index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
01353                        scanCxt);
01354         fmgr_info_copy(&(giststate->picksplitFn[i]),
01355                        index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
01356                        scanCxt);
01357         fmgr_info_copy(&(giststate->equalFn[i]),
01358                        index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
01359                        scanCxt);
01360         /* opclasses are not required to provide a Distance method */
01361         if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
01362             fmgr_info_copy(&(giststate->distanceFn[i]),
01363                          index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
01364                            scanCxt);
01365         else
01366             giststate->distanceFn[i].fn_oid = InvalidOid;
01367 
01368         /*
01369          * If the index column has a specified collation, we should honor that
01370          * while doing comparisons.  However, we may have a collatable storage
01371          * type for a noncollatable indexed data type.  If there's no index
01372          * collation then specify default collation in case the support
01373          * functions need collation.  This is harmless if the support
01374          * functions don't care about collation, so we just do it
01375          * unconditionally.  (We could alternatively call get_typcollation,
01376          * but that seems like expensive overkill --- there aren't going to be
01377          * any cases where a GiST storage type has a nondefault collation.)
01378          */
01379         if (OidIsValid(index->rd_indcollation[i]))
01380             giststate->supportCollation[i] = index->rd_indcollation[i];
01381         else
01382             giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
01383     }
01384 
01385     MemoryContextSwitchTo(oldCxt);
01386 
01387     return giststate;
01388 }
01389 
01390 void
01391 freeGISTstate(GISTSTATE *giststate)
01392 {
01393     /* It's sufficient to delete the scanCxt */
01394     MemoryContextDelete(giststate->scanCxt);
01395 }