Header And Logo

PostgreSQL
| The world's most advanced open source database.

spgdoinsert.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * spgdoinsert.c
00004  *    implementation of insert algorithm
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *          src/backend/access/spgist/spgdoinsert.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/genam.h"
00019 #include "access/spgist_private.h"
00020 #include "miscadmin.h"
00021 #include "storage/bufmgr.h"
00022 #include "utils/rel.h"
00023 
00024 
00025 /*
00026  * SPPageDesc tracks all info about a page we are inserting into.  In some
00027  * situations it actually identifies a tuple, or even a specific node within
00028  * an inner tuple.  But any of the fields can be invalid.  If the buffer
00029  * field is valid, it implies we hold pin and exclusive lock on that buffer.
00030  * page pointer should be valid exactly when buffer is.
00031  */
00032 typedef struct SPPageDesc
00033 {
00034     BlockNumber blkno;          /* block number, or InvalidBlockNumber */
00035     Buffer      buffer;         /* page's buffer number, or InvalidBuffer */
00036     Page        page;           /* pointer to page buffer, or NULL */
00037     OffsetNumber offnum;        /* offset of tuple, or InvalidOffsetNumber */
00038     int         node;           /* node number within inner tuple, or -1 */
00039 } SPPageDesc;
00040 
00041 
00042 /*
00043  * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
00044  * is used to update the parent inner tuple's downlink after a move or
00045  * split operation.
00046  */
00047 void
00048 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
00049                   BlockNumber blkno, OffsetNumber offset)
00050 {
00051     int         i;
00052     SpGistNodeTuple node;
00053 
00054     SGITITERATE(tup, i, node)
00055     {
00056         if (i == nodeN)
00057         {
00058             ItemPointerSet(&node->t_tid, blkno, offset);
00059             return;
00060         }
00061     }
00062 
00063     elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
00064          nodeN);
00065 }
00066 
00067 /*
00068  * Form a new inner tuple containing one more node than the given one, with
00069  * the specified label datum, inserted at offset "offset" in the node array.
00070  * The new tuple's prefix is the same as the old one's.
00071  *
00072  * Note that the new node initially has an invalid downlink.  We'll find a
00073  * page to point it to later.
00074  */
00075 static SpGistInnerTuple
00076 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
00077 {
00078     SpGistNodeTuple node,
00079                *nodes;
00080     int         i;
00081 
00082     /* if offset is negative, insert at end */
00083     if (offset < 0)
00084         offset = tuple->nNodes;
00085     else if (offset > tuple->nNodes)
00086         elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
00087 
00088     nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
00089     SGITITERATE(tuple, i, node)
00090     {
00091         if (i < offset)
00092             nodes[i] = node;
00093         else
00094             nodes[i + 1] = node;
00095     }
00096 
00097     nodes[offset] = spgFormNodeTuple(state, label, false);
00098 
00099     return spgFormInnerTuple(state,
00100                              (tuple->prefixSize > 0),
00101                              SGITDATUM(tuple, state),
00102                              tuple->nNodes + 1,
00103                              nodes);
00104 }
00105 
00106 /* qsort comparator for sorting OffsetNumbers */
00107 static int
00108 cmpOffsetNumbers(const void *a, const void *b)
00109 {
00110     if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
00111         return 0;
00112     return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
00113 }
00114 
00115 /*
00116  * Delete multiple tuples from an index page, preserving tuple offset numbers.
00117  *
00118  * The first tuple in the given list is replaced with a dead tuple of type
00119  * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
00120  * with dead tuples of type "reststate".  If either firststate or reststate
00121  * is REDIRECT, blkno/offnum specify where to link to.
00122  *
00123  * NB: this is used during WAL replay, so beware of trying to make it too
00124  * smart.  In particular, it shouldn't use "state" except for calling
00125  * spgFormDeadTuple().
00126  */
00127 void
00128 spgPageIndexMultiDelete(SpGistState *state, Page page,
00129                         OffsetNumber *itemnos, int nitems,
00130                         int firststate, int reststate,
00131                         BlockNumber blkno, OffsetNumber offnum)
00132 {
00133     OffsetNumber firstItem;
00134     OffsetNumber *sortednos;
00135     SpGistDeadTuple tuple = NULL;
00136     int         i;
00137 
00138     if (nitems == 0)
00139         return;                 /* nothing to do */
00140 
00141     /*
00142      * For efficiency we want to use PageIndexMultiDelete, which requires the
00143      * targets to be listed in sorted order, so we have to sort the itemnos
00144      * array.  (This also greatly simplifies the math for reinserting the
00145      * replacement tuples.)  However, we must not scribble on the caller's
00146      * array, so we have to make a copy.
00147      */
00148     sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
00149     memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
00150     if (nitems > 1)
00151         qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
00152 
00153     PageIndexMultiDelete(page, sortednos, nitems);
00154 
00155     firstItem = itemnos[0];
00156 
00157     for (i = 0; i < nitems; i++)
00158     {
00159         OffsetNumber itemno = sortednos[i];
00160         int         tupstate;
00161 
00162         tupstate = (itemno == firstItem) ? firststate : reststate;
00163         if (tuple == NULL || tuple->tupstate != tupstate)
00164             tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
00165 
00166         if (PageAddItem(page, (Item) tuple, tuple->size,
00167                         itemno, false, false) != itemno)
00168             elog(ERROR, "failed to add item of size %u to SPGiST index page",
00169                  tuple->size);
00170 
00171         if (tupstate == SPGIST_REDIRECT)
00172             SpGistPageGetOpaque(page)->nRedirection++;
00173         else if (tupstate == SPGIST_PLACEHOLDER)
00174             SpGistPageGetOpaque(page)->nPlaceholder++;
00175     }
00176 
00177     pfree(sortednos);
00178 }
00179 
00180 /*
00181  * Update the parent inner tuple's downlink, and mark the parent buffer
00182  * dirty (this must be the last change to the parent page in the current
00183  * WAL action).
00184  */
00185 static void
00186 saveNodeLink(Relation index, SPPageDesc *parent,
00187              BlockNumber blkno, OffsetNumber offnum)
00188 {
00189     SpGistInnerTuple innerTuple;
00190 
00191     innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
00192                                 PageGetItemId(parent->page, parent->offnum));
00193 
00194     spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
00195 
00196     MarkBufferDirty(parent->buffer);
00197 }
00198 
00199 /*
00200  * Add a leaf tuple to a leaf page where there is known to be room for it
00201  */
00202 static void
00203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
00204            SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
00205 {
00206     XLogRecData rdata[4];
00207     spgxlogAddLeaf xlrec;
00208 
00209     xlrec.node = index->rd_node;
00210     xlrec.blknoLeaf = current->blkno;
00211     xlrec.newPage = isNew;
00212     xlrec.storesNulls = isNulls;
00213 
00214     /* these will be filled below as needed */
00215     xlrec.offnumLeaf = InvalidOffsetNumber;
00216     xlrec.offnumHeadLeaf = InvalidOffsetNumber;
00217     xlrec.blknoParent = InvalidBlockNumber;
00218     xlrec.offnumParent = InvalidOffsetNumber;
00219     xlrec.nodeI = 0;
00220 
00221     ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
00222     /* we assume sizeof(xlrec) is at least int-aligned */
00223     ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
00224     ACCEPT_RDATA_BUFFER(current->buffer, 2);
00225 
00226     START_CRIT_SECTION();
00227 
00228     if (current->offnum == InvalidOffsetNumber ||
00229         SpGistBlockIsRoot(current->blkno))
00230     {
00231         /* Tuple is not part of a chain */
00232         leafTuple->nextOffset = InvalidOffsetNumber;
00233         current->offnum = SpGistPageAddNewItem(state, current->page,
00234                                            (Item) leafTuple, leafTuple->size,
00235                                                NULL, false);
00236 
00237         xlrec.offnumLeaf = current->offnum;
00238 
00239         /* Must update parent's downlink if any */
00240         if (parent->buffer != InvalidBuffer)
00241         {
00242             xlrec.blknoParent = parent->blkno;
00243             xlrec.offnumParent = parent->offnum;
00244             xlrec.nodeI = parent->node;
00245 
00246             saveNodeLink(index, parent, current->blkno, current->offnum);
00247 
00248             ACCEPT_RDATA_BUFFER(parent->buffer, 3);
00249         }
00250     }
00251     else
00252     {
00253         /*
00254          * Tuple must be inserted into existing chain.  We mustn't change the
00255          * chain's head address, but we don't need to chase the entire chain
00256          * to put the tuple at the end; we can insert it second.
00257          *
00258          * Also, it's possible that the "chain" consists only of a DEAD tuple,
00259          * in which case we should replace the DEAD tuple in-place.
00260          */
00261         SpGistLeafTuple head;
00262         OffsetNumber offnum;
00263 
00264         head = (SpGistLeafTuple) PageGetItem(current->page,
00265                               PageGetItemId(current->page, current->offnum));
00266         if (head->tupstate == SPGIST_LIVE)
00267         {
00268             leafTuple->nextOffset = head->nextOffset;
00269             offnum = SpGistPageAddNewItem(state, current->page,
00270                                           (Item) leafTuple, leafTuple->size,
00271                                           NULL, false);
00272 
00273             /*
00274              * re-get head of list because it could have been moved on page,
00275              * and set new second element
00276              */
00277             head = (SpGistLeafTuple) PageGetItem(current->page,
00278                               PageGetItemId(current->page, current->offnum));
00279             head->nextOffset = offnum;
00280 
00281             xlrec.offnumLeaf = offnum;
00282             xlrec.offnumHeadLeaf = current->offnum;
00283         }
00284         else if (head->tupstate == SPGIST_DEAD)
00285         {
00286             leafTuple->nextOffset = InvalidOffsetNumber;
00287             PageIndexTupleDelete(current->page, current->offnum);
00288             if (PageAddItem(current->page,
00289                             (Item) leafTuple, leafTuple->size,
00290                             current->offnum, false, false) != current->offnum)
00291                 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00292                      leafTuple->size);
00293 
00294             /* WAL replay distinguishes this case by equal offnums */
00295             xlrec.offnumLeaf = current->offnum;
00296             xlrec.offnumHeadLeaf = current->offnum;
00297         }
00298         else
00299             elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
00300     }
00301 
00302     MarkBufferDirty(current->buffer);
00303 
00304     if (RelationNeedsWAL(index))
00305     {
00306         XLogRecPtr  recptr;
00307 
00308         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
00309 
00310         PageSetLSN(current->page, recptr);
00311 
00312         /* update parent only if we actually changed it */
00313         if (xlrec.blknoParent != InvalidBlockNumber)
00314         {
00315             PageSetLSN(parent->page, recptr);
00316         }
00317     }
00318 
00319     END_CRIT_SECTION();
00320 }
00321 
00322 /*
00323  * Count the number and total size of leaf tuples in the chain starting at
00324  * current->offnum.  Return number into *nToSplit and total size as function
00325  * result.
00326  *
00327  * Klugy special case when considering the root page (i.e., root is a leaf
00328  * page, but we're about to split for the first time): return fake large
00329  * values to force spgdoinsert() to take the doPickSplit rather than
00330  * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
00331  */
00332 static int
00333 checkSplitConditions(Relation index, SpGistState *state,
00334                      SPPageDesc *current, int *nToSplit)
00335 {
00336     int         i,
00337                 n = 0,
00338                 totalSize = 0;
00339 
00340     if (SpGistBlockIsRoot(current->blkno))
00341     {
00342         /* return impossible values to force split */
00343         *nToSplit = BLCKSZ;
00344         return BLCKSZ;
00345     }
00346 
00347     i = current->offnum;
00348     while (i != InvalidOffsetNumber)
00349     {
00350         SpGistLeafTuple it;
00351 
00352         Assert(i >= FirstOffsetNumber &&
00353                i <= PageGetMaxOffsetNumber(current->page));
00354         it = (SpGistLeafTuple) PageGetItem(current->page,
00355                                            PageGetItemId(current->page, i));
00356         if (it->tupstate == SPGIST_LIVE)
00357         {
00358             n++;
00359             totalSize += it->size + sizeof(ItemIdData);
00360         }
00361         else if (it->tupstate == SPGIST_DEAD)
00362         {
00363             /* We could see a DEAD tuple as first/only chain item */
00364             Assert(i == current->offnum);
00365             Assert(it->nextOffset == InvalidOffsetNumber);
00366             /* Don't count it in result, because it won't go to other page */
00367         }
00368         else
00369             elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00370 
00371         i = it->nextOffset;
00372     }
00373 
00374     *nToSplit = n;
00375 
00376     return totalSize;
00377 }
00378 
00379 /*
00380  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
00381  * but the chain has to be moved because there's not enough room to add
00382  * newLeafTuple to its page.  We use this method when the chain contains
00383  * very little data so a split would be inefficient.  We are sure we can
00384  * fit the chain plus newLeafTuple on one other page.
00385  */
00386 static void
00387 moveLeafs(Relation index, SpGistState *state,
00388           SPPageDesc *current, SPPageDesc *parent,
00389           SpGistLeafTuple newLeafTuple, bool isNulls)
00390 {
00391     int         i,
00392                 nDelete,
00393                 nInsert,
00394                 size;
00395     Buffer      nbuf;
00396     Page        npage;
00397     SpGistLeafTuple it;
00398     OffsetNumber r = InvalidOffsetNumber,
00399                 startOffset = InvalidOffsetNumber;
00400     bool        replaceDead = false;
00401     OffsetNumber *toDelete;
00402     OffsetNumber *toInsert;
00403     BlockNumber nblkno;
00404     XLogRecData rdata[7];
00405     spgxlogMoveLeafs xlrec;
00406     char       *leafdata,
00407                *leafptr;
00408 
00409     /* This doesn't work on root page */
00410     Assert(parent->buffer != InvalidBuffer);
00411     Assert(parent->buffer != current->buffer);
00412 
00413     /* Locate the tuples to be moved, and count up the space needed */
00414     i = PageGetMaxOffsetNumber(current->page);
00415     toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
00416     toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
00417 
00418     size = newLeafTuple->size + sizeof(ItemIdData);
00419 
00420     nDelete = 0;
00421     i = current->offnum;
00422     while (i != InvalidOffsetNumber)
00423     {
00424         SpGistLeafTuple it;
00425 
00426         Assert(i >= FirstOffsetNumber &&
00427                i <= PageGetMaxOffsetNumber(current->page));
00428         it = (SpGistLeafTuple) PageGetItem(current->page,
00429                                            PageGetItemId(current->page, i));
00430 
00431         if (it->tupstate == SPGIST_LIVE)
00432         {
00433             toDelete[nDelete] = i;
00434             size += it->size + sizeof(ItemIdData);
00435             nDelete++;
00436         }
00437         else if (it->tupstate == SPGIST_DEAD)
00438         {
00439             /* We could see a DEAD tuple as first/only chain item */
00440             Assert(i == current->offnum);
00441             Assert(it->nextOffset == InvalidOffsetNumber);
00442             /* We don't want to move it, so don't count it in size */
00443             toDelete[nDelete] = i;
00444             nDelete++;
00445             replaceDead = true;
00446         }
00447         else
00448             elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00449 
00450         i = it->nextOffset;
00451     }
00452 
00453     /* Find a leaf page that will hold them */
00454     nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
00455                            size, &xlrec.newPage);
00456     npage = BufferGetPage(nbuf);
00457     nblkno = BufferGetBlockNumber(nbuf);
00458     Assert(nblkno != current->blkno);
00459 
00460     /* prepare WAL info */
00461     xlrec.node = index->rd_node;
00462     STORE_STATE(state, xlrec.stateSrc);
00463 
00464     xlrec.blknoSrc = current->blkno;
00465     xlrec.blknoDst = nblkno;
00466     xlrec.nMoves = nDelete;
00467     xlrec.replaceDead = replaceDead;
00468     xlrec.storesNulls = isNulls;
00469 
00470     xlrec.blknoParent = parent->blkno;
00471     xlrec.offnumParent = parent->offnum;
00472     xlrec.nodeI = parent->node;
00473 
00474     leafdata = leafptr = palloc(size);
00475 
00476     START_CRIT_SECTION();
00477 
00478     /* copy all the old tuples to new page, unless they're dead */
00479     nInsert = 0;
00480     if (!replaceDead)
00481     {
00482         for (i = 0; i < nDelete; i++)
00483         {
00484             it = (SpGistLeafTuple) PageGetItem(current->page,
00485                                   PageGetItemId(current->page, toDelete[i]));
00486             Assert(it->tupstate == SPGIST_LIVE);
00487 
00488             /*
00489              * Update chain link (notice the chain order gets reversed, but we
00490              * don't care).  We're modifying the tuple on the source page
00491              * here, but it's okay since we're about to delete it.
00492              */
00493             it->nextOffset = r;
00494 
00495             r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
00496                                      &startOffset, false);
00497 
00498             toInsert[nInsert] = r;
00499             nInsert++;
00500 
00501             /* save modified tuple into leafdata as well */
00502             memcpy(leafptr, it, it->size);
00503             leafptr += it->size;
00504         }
00505     }
00506 
00507     /* add the new tuple as well */
00508     newLeafTuple->nextOffset = r;
00509     r = SpGistPageAddNewItem(state, npage,
00510                              (Item) newLeafTuple, newLeafTuple->size,
00511                              &startOffset, false);
00512     toInsert[nInsert] = r;
00513     nInsert++;
00514     memcpy(leafptr, newLeafTuple, newLeafTuple->size);
00515     leafptr += newLeafTuple->size;
00516 
00517     /*
00518      * Now delete the old tuples, leaving a redirection pointer behind for the
00519      * first one, unless we're doing an index build; in which case there can't
00520      * be any concurrent scan so we need not provide a redirect.
00521      */
00522     spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
00523                        state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
00524                             SPGIST_PLACEHOLDER,
00525                             nblkno, r);
00526 
00527     /* Update parent's downlink and mark parent page dirty */
00528     saveNodeLink(index, parent, nblkno, r);
00529 
00530     /* Mark the leaf pages too */
00531     MarkBufferDirty(current->buffer);
00532     MarkBufferDirty(nbuf);
00533 
00534     if (RelationNeedsWAL(index))
00535     {
00536         XLogRecPtr  recptr;
00537 
00538         ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
00539         ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
00540         ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
00541         ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
00542         ACCEPT_RDATA_BUFFER(current->buffer, 4);
00543         ACCEPT_RDATA_BUFFER(nbuf, 5);
00544         ACCEPT_RDATA_BUFFER(parent->buffer, 6);
00545 
00546         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
00547 
00548         PageSetLSN(current->page, recptr);
00549         PageSetLSN(npage, recptr);
00550         PageSetLSN(parent->page, recptr);
00551     }
00552 
00553     END_CRIT_SECTION();
00554 
00555     /* Update local free-space cache and release new buffer */
00556     SpGistSetLastUsedPage(index, nbuf);
00557     UnlockReleaseBuffer(nbuf);
00558 }
00559 
00560 /*
00561  * Update previously-created redirection tuple with appropriate destination
00562  *
00563  * We use this when it's not convenient to know the destination first.
00564  * The tuple should have been made with the "impossible" destination of
00565  * the metapage.
00566  */
00567 static void
00568 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
00569                     BlockNumber blkno, OffsetNumber offnum)
00570 {
00571     SpGistDeadTuple dt;
00572 
00573     dt = (SpGistDeadTuple) PageGetItem(current->page,
00574                                      PageGetItemId(current->page, position));
00575     Assert(dt->tupstate == SPGIST_REDIRECT);
00576     Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
00577     ItemPointerSet(&dt->pointer, blkno, offnum);
00578 }
00579 
00580 /*
00581  * Test to see if the user-defined picksplit function failed to do its job,
00582  * ie, it put all the leaf tuples into the same node.
00583  * If so, randomly divide the tuples into several nodes (all with the same
00584  * label) and return TRUE to select allTheSame mode for this inner tuple.
00585  *
00586  * (This code is also used to forcibly select allTheSame mode for nulls.)
00587  *
00588  * If we know that the leaf tuples wouldn't all fit on one page, then we
00589  * exclude the last tuple (which is the incoming new tuple that forced a split)
00590  * from the check to see if more than one node is used.  The reason for this
00591  * is that if the existing tuples are put into only one chain, then even if
00592  * we move them all to an empty page, there would still not be room for the
00593  * new tuple, so we'd get into an infinite loop of picksplit attempts.
00594  * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
00595  * be split across pages.  (Exercise for the reader: figure out why this
00596  * fixes the problem even when there is only one old tuple.)
00597  */
00598 static bool
00599 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
00600                 bool *includeNew)
00601 {
00602     int         theNode;
00603     int         limit;
00604     int         i;
00605 
00606     /* For the moment, assume we can include the new leaf tuple */
00607     *includeNew = true;
00608 
00609     /* If there's only the new leaf tuple, don't select allTheSame mode */
00610     if (in->nTuples <= 1)
00611         return false;
00612 
00613     /* If tuple set doesn't fit on one page, ignore the new tuple in test */
00614     limit = tooBig ? in->nTuples - 1 : in->nTuples;
00615 
00616     /* Check to see if more than one node is populated */
00617     theNode = out->mapTuplesToNodes[0];
00618     for (i = 1; i < limit; i++)
00619     {
00620         if (out->mapTuplesToNodes[i] != theNode)
00621             return false;
00622     }
00623 
00624     /* Nope, so override the picksplit function's decisions */
00625 
00626     /* If the new tuple is in its own node, it can't be included in split */
00627     if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
00628         *includeNew = false;
00629 
00630     out->nNodes = 8;            /* arbitrary number of child nodes */
00631 
00632     /* Random assignment of tuples to nodes (note we include new tuple) */
00633     for (i = 0; i < in->nTuples; i++)
00634         out->mapTuplesToNodes[i] = i % out->nNodes;
00635 
00636     /* The opclass may not use node labels, but if it does, duplicate 'em */
00637     if (out->nodeLabels)
00638     {
00639         Datum       theLabel = out->nodeLabels[theNode];
00640 
00641         out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
00642         for (i = 0; i < out->nNodes; i++)
00643             out->nodeLabels[i] = theLabel;
00644     }
00645 
00646     /* We don't touch the prefix or the leaf tuple datum assignments */
00647 
00648     return true;
00649 }
00650 
00651 /*
00652  * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
00653  * but the chain has to be split because there's not enough room to add
00654  * newLeafTuple to its page.
00655  *
00656  * This function splits the leaf tuple set according to picksplit's rules,
00657  * creating one or more new chains that are spread across the current page
00658  * and an additional leaf page (we assume that two leaf pages will be
00659  * sufficient).  A new inner tuple is created, and the parent downlink
00660  * pointer is updated to point to that inner tuple instead of the leaf chain.
00661  *
00662  * On exit, current contains the address of the new inner tuple.
00663  *
00664  * Returns true if we successfully inserted newLeafTuple during this function,
00665  * false if caller still has to do it (meaning another picksplit operation is
00666  * probably needed).  Failure could occur if the picksplit result is fairly
00667  * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
00668  * Because we force the picksplit result to be at least two chains, each
00669  * cycle will get rid of at least one leaf tuple from the chain, so the loop
00670  * will eventually terminate if lack of balance is the issue.  If the tuple
00671  * is too big, we assume that repeated picksplit operations will eventually
00672  * make it small enough by repeated prefix-stripping.  A broken opclass could
00673  * make this an infinite loop, though.
00674  */
00675 static bool
00676 doPickSplit(Relation index, SpGistState *state,
00677             SPPageDesc *current, SPPageDesc *parent,
00678             SpGistLeafTuple newLeafTuple,
00679             int level, bool isNulls, bool isNew)
00680 {
00681     bool        insertedNew = false;
00682     spgPickSplitIn in;
00683     spgPickSplitOut out;
00684     FmgrInfo   *procinfo;
00685     bool        includeNew;
00686     int         i,
00687                 max,
00688                 n;
00689     SpGistInnerTuple innerTuple;
00690     SpGistNodeTuple node,
00691                *nodes;
00692     Buffer      newInnerBuffer,
00693                 newLeafBuffer;
00694     ItemPointerData *heapPtrs;
00695     uint8      *leafPageSelect;
00696     int        *leafSizes;
00697     OffsetNumber *toDelete;
00698     OffsetNumber *toInsert;
00699     OffsetNumber redirectTuplePos = InvalidOffsetNumber;
00700     OffsetNumber startOffsets[2];
00701     SpGistLeafTuple *newLeafs;
00702     int         spaceToDelete;
00703     int         currentFreeSpace;
00704     int         totalLeafSizes;
00705     bool        allTheSame;
00706     XLogRecData rdata[10];
00707     int         nRdata;
00708     spgxlogPickSplit xlrec;
00709     char       *leafdata,
00710                *leafptr;
00711     SPPageDesc  saveCurrent;
00712     int         nToDelete,
00713                 nToInsert,
00714                 maxToInclude;
00715 
00716     in.level = level;
00717 
00718     /*
00719      * Allocate per-leaf-tuple work arrays with max possible size
00720      */
00721     max = PageGetMaxOffsetNumber(current->page);
00722     n = max + 1;
00723     in.datums = (Datum *) palloc(sizeof(Datum) * n);
00724     heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
00725     toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
00726     toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
00727     newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
00728     leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
00729 
00730     xlrec.node = index->rd_node;
00731     STORE_STATE(state, xlrec.stateSrc);
00732 
00733     /*
00734      * Form list of leaf tuples which will be distributed as split result;
00735      * also, count up the amount of space that will be freed from current.
00736      * (Note that in the non-root case, we won't actually delete the old
00737      * tuples, only replace them with redirects or placeholders.)
00738      *
00739      * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
00740      * page.  For a pass-by-value data type we will fetch a word that must
00741      * exist even though it may contain garbage (because of the fact that leaf
00742      * tuples must have size at least SGDTSIZE).  For a pass-by-reference type
00743      * we are just computing a pointer that isn't going to get dereferenced.
00744      * So it's not worth guarding the calls with isNulls checks.
00745      */
00746     nToInsert = 0;
00747     nToDelete = 0;
00748     spaceToDelete = 0;
00749     if (SpGistBlockIsRoot(current->blkno))
00750     {
00751         /*
00752          * We are splitting the root (which up to now is also a leaf page).
00753          * Its tuples are not linked, so scan sequentially to get them all. We
00754          * ignore the original value of current->offnum.
00755          */
00756         for (i = FirstOffsetNumber; i <= max; i++)
00757         {
00758             SpGistLeafTuple it;
00759 
00760             it = (SpGistLeafTuple) PageGetItem(current->page,
00761                                             PageGetItemId(current->page, i));
00762             if (it->tupstate == SPGIST_LIVE)
00763             {
00764                 in.datums[nToInsert] = SGLTDATUM(it, state);
00765                 heapPtrs[nToInsert] = it->heapPtr;
00766                 nToInsert++;
00767                 toDelete[nToDelete] = i;
00768                 nToDelete++;
00769                 /* we will delete the tuple altogether, so count full space */
00770                 spaceToDelete += it->size + sizeof(ItemIdData);
00771             }
00772             else    /* tuples on root should be live */
00773                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00774         }
00775     }
00776     else
00777     {
00778         /* Normal case, just collect the leaf tuples in the chain */
00779         i = current->offnum;
00780         while (i != InvalidOffsetNumber)
00781         {
00782             SpGistLeafTuple it;
00783 
00784             Assert(i >= FirstOffsetNumber && i <= max);
00785             it = (SpGistLeafTuple) PageGetItem(current->page,
00786                                             PageGetItemId(current->page, i));
00787             if (it->tupstate == SPGIST_LIVE)
00788             {
00789                 in.datums[nToInsert] = SGLTDATUM(it, state);
00790                 heapPtrs[nToInsert] = it->heapPtr;
00791                 nToInsert++;
00792                 toDelete[nToDelete] = i;
00793                 nToDelete++;
00794                 /* we will not delete the tuple, only replace with dead */
00795                 Assert(it->size >= SGDTSIZE);
00796                 spaceToDelete += it->size - SGDTSIZE;
00797             }
00798             else if (it->tupstate == SPGIST_DEAD)
00799             {
00800                 /* We could see a DEAD tuple as first/only chain item */
00801                 Assert(i == current->offnum);
00802                 Assert(it->nextOffset == InvalidOffsetNumber);
00803                 toDelete[nToDelete] = i;
00804                 nToDelete++;
00805                 /* replacing it with redirect will save no space */
00806             }
00807             else
00808                 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00809 
00810             i = it->nextOffset;
00811         }
00812     }
00813     in.nTuples = nToInsert;
00814 
00815     /*
00816      * We may not actually insert new tuple because another picksplit may be
00817      * necessary due to too large value, but we will try to allocate enough
00818      * space to include it; and in any case it has to be included in the input
00819      * for the picksplit function.  So don't increment nToInsert yet.
00820      */
00821     in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
00822     heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
00823     in.nTuples++;
00824 
00825     memset(&out, 0, sizeof(out));
00826 
00827     if (!isNulls)
00828     {
00829         /*
00830          * Perform split using user-defined method.
00831          */
00832         procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
00833         FunctionCall2Coll(procinfo,
00834                           index->rd_indcollation[0],
00835                           PointerGetDatum(&in),
00836                           PointerGetDatum(&out));
00837 
00838         /*
00839          * Form new leaf tuples and count up the total space needed.
00840          */
00841         totalLeafSizes = 0;
00842         for (i = 0; i < in.nTuples; i++)
00843         {
00844             newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
00845                                            out.leafTupleDatums[i],
00846                                            false);
00847             totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
00848         }
00849     }
00850     else
00851     {
00852         /*
00853          * Perform dummy split that puts all tuples into one node.
00854          * checkAllTheSame will override this and force allTheSame mode.
00855          */
00856         out.hasPrefix = false;
00857         out.nNodes = 1;
00858         out.nodeLabels = NULL;
00859         out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
00860 
00861         /*
00862          * Form new leaf tuples and count up the total space needed.
00863          */
00864         totalLeafSizes = 0;
00865         for (i = 0; i < in.nTuples; i++)
00866         {
00867             newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
00868                                            (Datum) 0,
00869                                            true);
00870             totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
00871         }
00872     }
00873 
00874     /*
00875      * Check to see if the picksplit function failed to separate the values,
00876      * ie, it put them all into the same child node.  If so, select allTheSame
00877      * mode and create a random split instead.  See comments for
00878      * checkAllTheSame as to why we need to know if the new leaf tuples could
00879      * fit on one page.
00880      */
00881     allTheSame = checkAllTheSame(&in, &out,
00882                                  totalLeafSizes > SPGIST_PAGE_CAPACITY,
00883                                  &includeNew);
00884 
00885     /*
00886      * If checkAllTheSame decided we must exclude the new tuple, don't
00887      * consider it any further.
00888      */
00889     if (includeNew)
00890         maxToInclude = in.nTuples;
00891     else
00892     {
00893         maxToInclude = in.nTuples - 1;
00894         totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
00895     }
00896 
00897     /*
00898      * Allocate per-node work arrays.  Since checkAllTheSame could replace
00899      * out.nNodes with a value larger than the number of tuples on the input
00900      * page, we can't allocate these arrays before here.
00901      */
00902     nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
00903     leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
00904 
00905     /*
00906      * Form nodes of inner tuple and inner tuple itself
00907      */
00908     for (i = 0; i < out.nNodes; i++)
00909     {
00910         Datum       label = (Datum) 0;
00911         bool        labelisnull = (out.nodeLabels == NULL);
00912 
00913         if (!labelisnull)
00914             label = out.nodeLabels[i];
00915         nodes[i] = spgFormNodeTuple(state, label, labelisnull);
00916     }
00917     innerTuple = spgFormInnerTuple(state,
00918                                    out.hasPrefix, out.prefixDatum,
00919                                    out.nNodes, nodes);
00920     innerTuple->allTheSame = allTheSame;
00921 
00922     /*
00923      * Update nodes[] array to point into the newly formed innerTuple, so that
00924      * we can adjust their downlinks below.
00925      */
00926     SGITITERATE(innerTuple, i, node)
00927     {
00928         nodes[i] = node;
00929     }
00930 
00931     /*
00932      * Re-scan new leaf tuples and count up the space needed under each node.
00933      */
00934     for (i = 0; i < maxToInclude; i++)
00935     {
00936         n = out.mapTuplesToNodes[i];
00937         if (n < 0 || n >= out.nNodes)
00938             elog(ERROR, "inconsistent result of SPGiST picksplit function");
00939         leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
00940     }
00941 
00942     /*
00943      * To perform the split, we must insert a new inner tuple, which can't go
00944      * on a leaf page; and unless we are splitting the root page, we must then
00945      * update the parent tuple's downlink to point to the inner tuple.  If
00946      * there is room, we'll put the new inner tuple on the same page as the
00947      * parent tuple, otherwise we need another non-leaf buffer. But if the
00948      * parent page is the root, we can't add the new inner tuple there,
00949      * because the root page must have only one inner tuple.
00950      */
00951     xlrec.initInner = false;
00952     if (parent->buffer != InvalidBuffer &&
00953         !SpGistBlockIsRoot(parent->blkno) &&
00954         (SpGistPageGetFreeSpace(parent->page, 1) >=
00955          innerTuple->size + sizeof(ItemIdData)))
00956     {
00957         /* New inner tuple will fit on parent page */
00958         newInnerBuffer = parent->buffer;
00959     }
00960     else if (parent->buffer != InvalidBuffer)
00961     {
00962         /* Send tuple to page with next triple parity (see README) */
00963         newInnerBuffer = SpGistGetBuffer(index,
00964                                        GBUF_INNER_PARITY(parent->blkno + 1) |
00965                                          (isNulls ? GBUF_NULLS : 0),
00966                                        innerTuple->size + sizeof(ItemIdData),
00967                                          &xlrec.initInner);
00968     }
00969     else
00970     {
00971         /* Root page split ... inner tuple will go to root page */
00972         newInnerBuffer = InvalidBuffer;
00973     }
00974 
00975     /*
00976      * Because a WAL record can't involve more than four buffers, we can only
00977      * afford to deal with two leaf pages in each picksplit action, ie the
00978      * current page and at most one other.
00979      *
00980      * The new leaf tuples converted from the existing ones should require the
00981      * same or less space, and therefore should all fit onto one page
00982      * (although that's not necessarily the current page, since we can't
00983      * delete the old tuples but only replace them with placeholders).
00984      * However, the incoming new tuple might not also fit, in which case we
00985      * might need another picksplit cycle to reduce it some more.
00986      *
00987      * If there's not room to put everything back onto the current page, then
00988      * we decide on a per-node basis which tuples go to the new page. (We do
00989      * it like that because leaf tuple chains can't cross pages, so we must
00990      * place all leaf tuples belonging to the same parent node on the same
00991      * page.)
00992      *
00993      * If we are splitting the root page (turning it from a leaf page into an
00994      * inner page), then no leaf tuples can go back to the current page; they
00995      * must all go somewhere else.
00996      */
00997     if (!SpGistBlockIsRoot(current->blkno))
00998         currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
00999     else
01000         currentFreeSpace = 0;   /* prevent assigning any tuples to current */
01001 
01002     xlrec.initDest = false;
01003 
01004     if (totalLeafSizes <= currentFreeSpace)
01005     {
01006         /* All the leaf tuples will fit on current page */
01007         newLeafBuffer = InvalidBuffer;
01008         /* mark new leaf tuple as included in insertions, if allowed */
01009         if (includeNew)
01010         {
01011             nToInsert++;
01012             insertedNew = true;
01013         }
01014         for (i = 0; i < nToInsert; i++)
01015             leafPageSelect[i] = 0;      /* signifies current page */
01016     }
01017     else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
01018     {
01019         /*
01020          * We're trying to split up a long value by repeated suffixing, but
01021          * it's not going to fit yet.  Don't bother allocating a second leaf
01022          * buffer that we won't be able to use.
01023          */
01024         newLeafBuffer = InvalidBuffer;
01025         Assert(includeNew);
01026         Assert(nToInsert == 0);
01027     }
01028     else
01029     {
01030         /* We will need another leaf page */
01031         uint8      *nodePageSelect;
01032         int         curspace;
01033         int         newspace;
01034 
01035         newLeafBuffer = SpGistGetBuffer(index,
01036                                       GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
01037                                         Min(totalLeafSizes,
01038                                             SPGIST_PAGE_CAPACITY),
01039                                         &xlrec.initDest);
01040 
01041         /*
01042          * Attempt to assign node groups to the two pages.  We might fail to
01043          * do so, even if totalLeafSizes is less than the available space,
01044          * because we can't split a group across pages.
01045          */
01046         nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
01047 
01048         curspace = currentFreeSpace;
01049         newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
01050         for (i = 0; i < out.nNodes; i++)
01051         {
01052             if (leafSizes[i] <= curspace)
01053             {
01054                 nodePageSelect[i] = 0;  /* signifies current page */
01055                 curspace -= leafSizes[i];
01056             }
01057             else
01058             {
01059                 nodePageSelect[i] = 1;  /* signifies new leaf page */
01060                 newspace -= leafSizes[i];
01061             }
01062         }
01063         if (curspace >= 0 && newspace >= 0)
01064         {
01065             /* Successful assignment, so we can include the new leaf tuple */
01066             if (includeNew)
01067             {
01068                 nToInsert++;
01069                 insertedNew = true;
01070             }
01071         }
01072         else if (includeNew)
01073         {
01074             /* We must exclude the new leaf tuple from the split */
01075             int         nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
01076 
01077             leafSizes[nodeOfNewTuple] -=
01078                 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
01079 
01080             /* Repeat the node assignment process --- should succeed now */
01081             curspace = currentFreeSpace;
01082             newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
01083             for (i = 0; i < out.nNodes; i++)
01084             {
01085                 if (leafSizes[i] <= curspace)
01086                 {
01087                     nodePageSelect[i] = 0;      /* signifies current page */
01088                     curspace -= leafSizes[i];
01089                 }
01090                 else
01091                 {
01092                     nodePageSelect[i] = 1;      /* signifies new leaf page */
01093                     newspace -= leafSizes[i];
01094                 }
01095             }
01096             if (curspace < 0 || newspace < 0)
01097                 elog(ERROR, "failed to divide leaf tuple groups across pages");
01098         }
01099         else
01100         {
01101             /* oops, we already excluded new tuple ... should not get here */
01102             elog(ERROR, "failed to divide leaf tuple groups across pages");
01103         }
01104         /* Expand the per-node assignments to be shown per leaf tuple */
01105         for (i = 0; i < nToInsert; i++)
01106         {
01107             n = out.mapTuplesToNodes[i];
01108             leafPageSelect[i] = nodePageSelect[n];
01109         }
01110     }
01111 
01112     /* Start preparing WAL record */
01113     xlrec.blknoSrc = current->blkno;
01114     xlrec.blknoDest = InvalidBlockNumber;
01115     xlrec.nDelete = 0;
01116     xlrec.initSrc = isNew;
01117     xlrec.storesNulls = isNulls;
01118 
01119     leafdata = leafptr = (char *) palloc(totalLeafSizes);
01120 
01121     ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
01122     ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
01123     nRdata = 2;
01124 
01125     /* Here we begin making the changes to the target pages */
01126     START_CRIT_SECTION();
01127 
01128     /*
01129      * Delete old leaf tuples from current buffer, except when we're splitting
01130      * the root; in that case there's no need because we'll re-init the page
01131      * below.  We do this first to make room for reinserting new leaf tuples.
01132      */
01133     if (!SpGistBlockIsRoot(current->blkno))
01134     {
01135         /*
01136          * Init buffer instead of deleting individual tuples, but only if
01137          * there aren't any other live tuples and only during build; otherwise
01138          * we need to set a redirection tuple for concurrent scans.
01139          */
01140         if (state->isBuild &&
01141             nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
01142             PageGetMaxOffsetNumber(current->page))
01143         {
01144             SpGistInitBuffer(current->buffer,
01145                              SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
01146             xlrec.initSrc = true;
01147         }
01148         else if (isNew)
01149         {
01150             /* don't expose the freshly init'd buffer as a backup block */
01151             Assert(nToDelete == 0);
01152         }
01153         else
01154         {
01155             xlrec.nDelete = nToDelete;
01156             ACCEPT_RDATA_DATA(toDelete,
01157                               MAXALIGN(sizeof(OffsetNumber) * nToDelete),
01158                               nRdata);
01159             nRdata++;
01160             ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
01161             nRdata++;
01162 
01163             if (!state->isBuild)
01164             {
01165                 /*
01166                  * Need to create redirect tuple (it will point to new inner
01167                  * tuple) but right now the new tuple's location is not known
01168                  * yet.  So, set the redirection pointer to "impossible" value
01169                  * and remember its position to update tuple later.
01170                  */
01171                 if (nToDelete > 0)
01172                     redirectTuplePos = toDelete[0];
01173                 spgPageIndexMultiDelete(state, current->page,
01174                                         toDelete, nToDelete,
01175                                         SPGIST_REDIRECT,
01176                                         SPGIST_PLACEHOLDER,
01177                                         SPGIST_METAPAGE_BLKNO,
01178                                         FirstOffsetNumber);
01179             }
01180             else
01181             {
01182                 /*
01183                  * During index build there is not concurrent searches, so we
01184                  * don't need to create redirection tuple.
01185                  */
01186                 spgPageIndexMultiDelete(state, current->page,
01187                                         toDelete, nToDelete,
01188                                         SPGIST_PLACEHOLDER,
01189                                         SPGIST_PLACEHOLDER,
01190                                         InvalidBlockNumber,
01191                                         InvalidOffsetNumber);
01192             }
01193         }
01194     }
01195 
01196     /*
01197      * Put leaf tuples on proper pages, and update downlinks in innerTuple's
01198      * nodes.
01199      */
01200     startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
01201     for (i = 0; i < nToInsert; i++)
01202     {
01203         SpGistLeafTuple it = newLeafs[i];
01204         Buffer      leafBuffer;
01205         BlockNumber leafBlock;
01206         OffsetNumber newoffset;
01207 
01208         /* Which page is it going to? */
01209         leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
01210         leafBlock = BufferGetBlockNumber(leafBuffer);
01211 
01212         /* Link tuple into correct chain for its node */
01213         n = out.mapTuplesToNodes[i];
01214 
01215         if (ItemPointerIsValid(&nodes[n]->t_tid))
01216         {
01217             Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
01218             it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
01219         }
01220         else
01221             it->nextOffset = InvalidOffsetNumber;
01222 
01223         /* Insert it on page */
01224         newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
01225                                          (Item) it, it->size,
01226                                          &startOffsets[leafPageSelect[i]],
01227                                          false);
01228         toInsert[i] = newoffset;
01229 
01230         /* ... and complete the chain linking */
01231         ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
01232 
01233         /* Also copy leaf tuple into WAL data */
01234         memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
01235         leafptr += newLeafs[i]->size;
01236     }
01237 
01238     /*
01239      * We're done modifying the other leaf buffer (if any), so mark it dirty.
01240      * current->buffer will be marked below, after we're entirely done
01241      * modifying it.
01242      */
01243     if (newLeafBuffer != InvalidBuffer)
01244     {
01245         MarkBufferDirty(newLeafBuffer);
01246         /* also save block number for WAL */
01247         xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
01248         if (!xlrec.initDest)
01249         {
01250             ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
01251             nRdata++;
01252         }
01253     }
01254 
01255     xlrec.nInsert = nToInsert;
01256     ACCEPT_RDATA_DATA(toInsert,
01257                       MAXALIGN(sizeof(OffsetNumber) * nToInsert),
01258                       nRdata);
01259     nRdata++;
01260     ACCEPT_RDATA_DATA(leafPageSelect,
01261                       MAXALIGN(sizeof(uint8) * nToInsert),
01262                       nRdata);
01263     nRdata++;
01264     ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
01265     nRdata++;
01266 
01267     /* Remember current buffer, since we're about to change "current" */
01268     saveCurrent = *current;
01269 
01270     /*
01271      * Store the new innerTuple
01272      */
01273     if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
01274     {
01275         /*
01276          * new inner tuple goes to parent page
01277          */
01278         Assert(current->buffer != parent->buffer);
01279 
01280         /* Repoint "current" at the new inner tuple */
01281         current->blkno = parent->blkno;
01282         current->buffer = parent->buffer;
01283         current->page = parent->page;
01284         xlrec.blknoInner = current->blkno;
01285         xlrec.offnumInner = current->offnum =
01286             SpGistPageAddNewItem(state, current->page,
01287                                  (Item) innerTuple, innerTuple->size,
01288                                  NULL, false);
01289 
01290         /*
01291          * Update parent node link and mark parent page dirty
01292          */
01293         xlrec.blknoParent = parent->blkno;
01294         xlrec.offnumParent = parent->offnum;
01295         xlrec.nodeI = parent->node;
01296         saveNodeLink(index, parent, current->blkno, current->offnum);
01297 
01298         ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
01299         nRdata++;
01300 
01301         /*
01302          * Update redirection link (in old current buffer)
01303          */
01304         if (redirectTuplePos != InvalidOffsetNumber)
01305             setRedirectionTuple(&saveCurrent, redirectTuplePos,
01306                                 current->blkno, current->offnum);
01307 
01308         /* Done modifying old current buffer, mark it dirty */
01309         MarkBufferDirty(saveCurrent.buffer);
01310     }
01311     else if (parent->buffer != InvalidBuffer)
01312     {
01313         /*
01314          * new inner tuple will be stored on a new page
01315          */
01316         Assert(newInnerBuffer != InvalidBuffer);
01317 
01318         /* Repoint "current" at the new inner tuple */
01319         current->buffer = newInnerBuffer;
01320         current->blkno = BufferGetBlockNumber(current->buffer);
01321         current->page = BufferGetPage(current->buffer);
01322         xlrec.blknoInner = current->blkno;
01323         xlrec.offnumInner = current->offnum =
01324             SpGistPageAddNewItem(state, current->page,
01325                                  (Item) innerTuple, innerTuple->size,
01326                                  NULL, false);
01327 
01328         /* Done modifying new current buffer, mark it dirty */
01329         MarkBufferDirty(current->buffer);
01330 
01331         /*
01332          * Update parent node link and mark parent page dirty
01333          */
01334         xlrec.blknoParent = parent->blkno;
01335         xlrec.offnumParent = parent->offnum;
01336         xlrec.nodeI = parent->node;
01337         saveNodeLink(index, parent, current->blkno, current->offnum);
01338 
01339         ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
01340         nRdata++;
01341         ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
01342         nRdata++;
01343 
01344         /*
01345          * Update redirection link (in old current buffer)
01346          */
01347         if (redirectTuplePos != InvalidOffsetNumber)
01348             setRedirectionTuple(&saveCurrent, redirectTuplePos,
01349                                 current->blkno, current->offnum);
01350 
01351         /* Done modifying old current buffer, mark it dirty */
01352         MarkBufferDirty(saveCurrent.buffer);
01353     }
01354     else
01355     {
01356         /*
01357          * Splitting root page, which was a leaf but now becomes inner page
01358          * (and so "current" continues to point at it)
01359          */
01360         Assert(SpGistBlockIsRoot(current->blkno));
01361         Assert(redirectTuplePos == InvalidOffsetNumber);
01362 
01363         SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
01364         xlrec.initInner = true;
01365 
01366         xlrec.blknoInner = current->blkno;
01367         xlrec.offnumInner = current->offnum =
01368             PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
01369                         InvalidOffsetNumber, false, false);
01370         if (current->offnum != FirstOffsetNumber)
01371             elog(ERROR, "failed to add item of size %u to SPGiST index page",
01372                  innerTuple->size);
01373 
01374         /* No parent link to update, nor redirection to do */
01375         xlrec.blknoParent = InvalidBlockNumber;
01376         xlrec.offnumParent = InvalidOffsetNumber;
01377         xlrec.nodeI = 0;
01378 
01379         /* Done modifying new current buffer, mark it dirty */
01380         MarkBufferDirty(current->buffer);
01381 
01382         /* saveCurrent doesn't represent a different buffer */
01383         saveCurrent.buffer = InvalidBuffer;
01384     }
01385 
01386     if (RelationNeedsWAL(index))
01387     {
01388         XLogRecPtr  recptr;
01389 
01390         /* Issue the WAL record */
01391         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
01392 
01393         /* Update page LSNs on all affected pages */
01394         if (newLeafBuffer != InvalidBuffer)
01395         {
01396             Page        page = BufferGetPage(newLeafBuffer);
01397 
01398             PageSetLSN(page, recptr);
01399         }
01400 
01401         if (saveCurrent.buffer != InvalidBuffer)
01402         {
01403             Page        page = BufferGetPage(saveCurrent.buffer);
01404 
01405             PageSetLSN(page, recptr);
01406         }
01407 
01408         PageSetLSN(current->page, recptr);
01409 
01410         if (parent->buffer != InvalidBuffer)
01411         {
01412             PageSetLSN(parent->page, recptr);
01413         }
01414     }
01415 
01416     END_CRIT_SECTION();
01417 
01418     /* Update local free-space cache and unlock buffers */
01419     if (newLeafBuffer != InvalidBuffer)
01420     {
01421         SpGistSetLastUsedPage(index, newLeafBuffer);
01422         UnlockReleaseBuffer(newLeafBuffer);
01423     }
01424     if (saveCurrent.buffer != InvalidBuffer)
01425     {
01426         SpGistSetLastUsedPage(index, saveCurrent.buffer);
01427         UnlockReleaseBuffer(saveCurrent.buffer);
01428     }
01429 
01430     return insertedNew;
01431 }
01432 
01433 /*
01434  * spgMatchNode action: descend to N'th child node of current inner tuple
01435  */
01436 static void
01437 spgMatchNodeAction(Relation index, SpGistState *state,
01438                    SpGistInnerTuple innerTuple,
01439                    SPPageDesc *current, SPPageDesc *parent, int nodeN)
01440 {
01441     int         i;
01442     SpGistNodeTuple node;
01443 
01444     /* Release previous parent buffer if any */
01445     if (parent->buffer != InvalidBuffer &&
01446         parent->buffer != current->buffer)
01447     {
01448         SpGistSetLastUsedPage(index, parent->buffer);
01449         UnlockReleaseBuffer(parent->buffer);
01450     }
01451 
01452     /* Repoint parent to specified node of current inner tuple */
01453     parent->blkno = current->blkno;
01454     parent->buffer = current->buffer;
01455     parent->page = current->page;
01456     parent->offnum = current->offnum;
01457     parent->node = nodeN;
01458 
01459     /* Locate that node */
01460     SGITITERATE(innerTuple, i, node)
01461     {
01462         if (i == nodeN)
01463             break;
01464     }
01465 
01466     if (i != nodeN)
01467         elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
01468              nodeN);
01469 
01470     /* Point current to the downlink location, if any */
01471     if (ItemPointerIsValid(&node->t_tid))
01472     {
01473         current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
01474         current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
01475     }
01476     else
01477     {
01478         /* Downlink is empty, so we'll need to find a new page */
01479         current->blkno = InvalidBlockNumber;
01480         current->offnum = InvalidOffsetNumber;
01481     }
01482 
01483     current->buffer = InvalidBuffer;
01484     current->page = NULL;
01485 }
01486 
01487 /*
01488  * spgAddNode action: add a node to the inner tuple at current
01489  */
01490 static void
01491 spgAddNodeAction(Relation index, SpGistState *state,
01492                  SpGistInnerTuple innerTuple,
01493                  SPPageDesc *current, SPPageDesc *parent,
01494                  int nodeN, Datum nodeLabel)
01495 {
01496     SpGistInnerTuple newInnerTuple;
01497     XLogRecData rdata[5];
01498     spgxlogAddNode xlrec;
01499 
01500     /* Should not be applied to nulls */
01501     Assert(!SpGistPageStoresNulls(current->page));
01502 
01503     /* Construct new inner tuple with additional node */
01504     newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
01505 
01506     /* Prepare WAL record */
01507     xlrec.node = index->rd_node;
01508     STORE_STATE(state, xlrec.stateSrc);
01509     xlrec.blkno = current->blkno;
01510     xlrec.offnum = current->offnum;
01511 
01512     /* we don't fill these unless we need to change the parent downlink */
01513     xlrec.blknoParent = InvalidBlockNumber;
01514     xlrec.offnumParent = InvalidOffsetNumber;
01515     xlrec.nodeI = 0;
01516 
01517     /* we don't fill these unless tuple has to be moved */
01518     xlrec.blknoNew = InvalidBlockNumber;
01519     xlrec.offnumNew = InvalidOffsetNumber;
01520     xlrec.newPage = false;
01521 
01522     ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
01523     /* we assume sizeof(xlrec) is at least int-aligned */
01524     ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
01525     ACCEPT_RDATA_BUFFER(current->buffer, 2);
01526 
01527     if (PageGetExactFreeSpace(current->page) >=
01528         newInnerTuple->size - innerTuple->size)
01529     {
01530         /*
01531          * We can replace the inner tuple by new version in-place
01532          */
01533         START_CRIT_SECTION();
01534 
01535         PageIndexTupleDelete(current->page, current->offnum);
01536         if (PageAddItem(current->page,
01537                         (Item) newInnerTuple, newInnerTuple->size,
01538                         current->offnum, false, false) != current->offnum)
01539             elog(ERROR, "failed to add item of size %u to SPGiST index page",
01540                  newInnerTuple->size);
01541 
01542         MarkBufferDirty(current->buffer);
01543 
01544         if (RelationNeedsWAL(index))
01545         {
01546             XLogRecPtr  recptr;
01547 
01548             recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
01549 
01550             PageSetLSN(current->page, recptr);
01551         }
01552 
01553         END_CRIT_SECTION();
01554     }
01555     else
01556     {
01557         /*
01558          * move inner tuple to another page, and update parent
01559          */
01560         SpGistDeadTuple dt;
01561         SPPageDesc  saveCurrent;
01562 
01563         /*
01564          * It should not be possible to get here for the root page, since we
01565          * allow only one inner tuple on the root page, and spgFormInnerTuple
01566          * always checks that inner tuples don't exceed the size of a page.
01567          */
01568         if (SpGistBlockIsRoot(current->blkno))
01569             elog(ERROR, "cannot enlarge root tuple any more");
01570         Assert(parent->buffer != InvalidBuffer);
01571 
01572         saveCurrent = *current;
01573 
01574         xlrec.blknoParent = parent->blkno;
01575         xlrec.offnumParent = parent->offnum;
01576         xlrec.nodeI = parent->node;
01577 
01578         /*
01579          * obtain new buffer with the same parity as current, since it will be
01580          * a child of same parent tuple
01581          */
01582         current->buffer = SpGistGetBuffer(index,
01583                                           GBUF_INNER_PARITY(current->blkno),
01584                                     newInnerTuple->size + sizeof(ItemIdData),
01585                                           &xlrec.newPage);
01586         current->blkno = BufferGetBlockNumber(current->buffer);
01587         current->page = BufferGetPage(current->buffer);
01588 
01589         xlrec.blknoNew = current->blkno;
01590 
01591         /*
01592          * Let's just make real sure new current isn't same as old.  Right now
01593          * that's impossible, but if SpGistGetBuffer ever got smart enough to
01594          * delete placeholder tuples before checking space, maybe it wouldn't
01595          * be impossible.  The case would appear to work except that WAL
01596          * replay would be subtly wrong, so I think a mere assert isn't enough
01597          * here.
01598          */
01599         if (xlrec.blknoNew == xlrec.blkno)
01600             elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
01601 
01602         /*
01603          * New current and parent buffer will both be modified; but note that
01604          * parent buffer could be same as either new or old current.
01605          */
01606         ACCEPT_RDATA_BUFFER(current->buffer, 3);
01607         if (parent->buffer != current->buffer &&
01608             parent->buffer != saveCurrent.buffer)
01609             ACCEPT_RDATA_BUFFER(parent->buffer, 4);
01610 
01611         START_CRIT_SECTION();
01612 
01613         /* insert new ... */
01614         xlrec.offnumNew = current->offnum =
01615             SpGistPageAddNewItem(state, current->page,
01616                                  (Item) newInnerTuple, newInnerTuple->size,
01617                                  NULL, false);
01618 
01619         MarkBufferDirty(current->buffer);
01620 
01621         /* update parent's downlink and mark parent page dirty */
01622         saveNodeLink(index, parent, current->blkno, current->offnum);
01623 
01624         /*
01625          * Replace old tuple with a placeholder or redirection tuple.  Unless
01626          * doing an index build, we have to insert a redirection tuple for
01627          * possible concurrent scans.  We can't just delete it in any case,
01628          * because that could change the offsets of other tuples on the page,
01629          * breaking downlinks from their parents.
01630          */
01631         if (state->isBuild)
01632             dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
01633                                   InvalidBlockNumber, InvalidOffsetNumber);
01634         else
01635             dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
01636                                   current->blkno, current->offnum);
01637 
01638         PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
01639         if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
01640                         saveCurrent.offnum,
01641                         false, false) != saveCurrent.offnum)
01642             elog(ERROR, "failed to add item of size %u to SPGiST index page",
01643                  dt->size);
01644 
01645         if (state->isBuild)
01646             SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
01647         else
01648             SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
01649 
01650         MarkBufferDirty(saveCurrent.buffer);
01651 
01652         if (RelationNeedsWAL(index))
01653         {
01654             XLogRecPtr  recptr;
01655 
01656             recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
01657 
01658             /* we don't bother to check if any of these are redundant */
01659             PageSetLSN(current->page, recptr);
01660             PageSetLSN(parent->page, recptr);
01661             PageSetLSN(saveCurrent.page, recptr);
01662         }
01663 
01664         END_CRIT_SECTION();
01665 
01666         /* Release saveCurrent if it's not same as current or parent */
01667         if (saveCurrent.buffer != current->buffer &&
01668             saveCurrent.buffer != parent->buffer)
01669         {
01670             SpGistSetLastUsedPage(index, saveCurrent.buffer);
01671             UnlockReleaseBuffer(saveCurrent.buffer);
01672         }
01673     }
01674 }
01675 
01676 /*
01677  * spgSplitNode action: split inner tuple at current into prefix and postfix
01678  */
01679 static void
01680 spgSplitNodeAction(Relation index, SpGistState *state,
01681                    SpGistInnerTuple innerTuple,
01682                    SPPageDesc *current, spgChooseOut *out)
01683 {
01684     SpGistInnerTuple prefixTuple,
01685                 postfixTuple;
01686     SpGistNodeTuple node,
01687                *nodes;
01688     BlockNumber postfixBlkno;
01689     OffsetNumber postfixOffset;
01690     int         i;
01691     XLogRecData rdata[5];
01692     spgxlogSplitTuple xlrec;
01693     Buffer      newBuffer = InvalidBuffer;
01694 
01695     /* Should not be applied to nulls */
01696     Assert(!SpGistPageStoresNulls(current->page));
01697 
01698     /*
01699      * Construct new prefix tuple, containing a single node with the specified
01700      * label.  (We'll update the node's downlink to point to the new postfix
01701      * tuple, below.)
01702      */
01703     node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
01704 
01705     prefixTuple = spgFormInnerTuple(state,
01706                                     out->result.splitTuple.prefixHasPrefix,
01707                                     out->result.splitTuple.prefixPrefixDatum,
01708                                     1, &node);
01709 
01710     /* it must fit in the space that innerTuple now occupies */
01711     if (prefixTuple->size > innerTuple->size)
01712         elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
01713 
01714     /*
01715      * Construct new postfix tuple, containing all nodes of innerTuple with
01716      * same node datums, but with the prefix specified by the picksplit
01717      * function.
01718      */
01719     nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
01720     SGITITERATE(innerTuple, i, node)
01721     {
01722         nodes[i] = node;
01723     }
01724 
01725     postfixTuple = spgFormInnerTuple(state,
01726                                      out->result.splitTuple.postfixHasPrefix,
01727                                    out->result.splitTuple.postfixPrefixDatum,
01728                                      innerTuple->nNodes, nodes);
01729 
01730     /* Postfix tuple is allTheSame if original tuple was */
01731     postfixTuple->allTheSame = innerTuple->allTheSame;
01732 
01733     /* prep data for WAL record */
01734     xlrec.node = index->rd_node;
01735     xlrec.newPage = false;
01736 
01737     ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
01738     /* we assume sizeof(xlrec) is at least int-aligned */
01739     ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
01740     ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
01741     ACCEPT_RDATA_BUFFER(current->buffer, 3);
01742 
01743     /*
01744      * If we can't fit both tuples on the current page, get a new page for the
01745      * postfix tuple.  In particular, can't split to the root page.
01746      *
01747      * For the space calculation, note that prefixTuple replaces innerTuple
01748      * but postfixTuple will be a new entry.
01749      */
01750     if (SpGistBlockIsRoot(current->blkno) ||
01751         SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
01752         prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
01753     {
01754         /*
01755          * Choose page with next triple parity, because postfix tuple is a
01756          * child of prefix one
01757          */
01758         newBuffer = SpGistGetBuffer(index,
01759                                     GBUF_INNER_PARITY(current->blkno + 1),
01760                                     postfixTuple->size + sizeof(ItemIdData),
01761                                     &xlrec.newPage);
01762         ACCEPT_RDATA_BUFFER(newBuffer, 4);
01763     }
01764 
01765     START_CRIT_SECTION();
01766 
01767     /*
01768      * Replace old tuple by prefix tuple
01769      */
01770     PageIndexTupleDelete(current->page, current->offnum);
01771     xlrec.offnumPrefix = PageAddItem(current->page,
01772                                      (Item) prefixTuple, prefixTuple->size,
01773                                      current->offnum, false, false);
01774     if (xlrec.offnumPrefix != current->offnum)
01775         elog(ERROR, "failed to add item of size %u to SPGiST index page",
01776              prefixTuple->size);
01777     xlrec.blknoPrefix = current->blkno;
01778 
01779     /*
01780      * put postfix tuple into appropriate page
01781      */
01782     if (newBuffer == InvalidBuffer)
01783     {
01784         xlrec.blknoPostfix = postfixBlkno = current->blkno;
01785         xlrec.offnumPostfix = postfixOffset =
01786             SpGistPageAddNewItem(state, current->page,
01787                                  (Item) postfixTuple, postfixTuple->size,
01788                                  NULL, false);
01789     }
01790     else
01791     {
01792         xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
01793         xlrec.offnumPostfix = postfixOffset =
01794             SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
01795                                  (Item) postfixTuple, postfixTuple->size,
01796                                  NULL, false);
01797         MarkBufferDirty(newBuffer);
01798     }
01799 
01800     /*
01801      * And set downlink pointer in the prefix tuple to point to postfix tuple.
01802      * (We can't avoid this step by doing the above two steps in opposite
01803      * order, because there might not be enough space on the page to insert
01804      * the postfix tuple first.)  We have to update the local copy of the
01805      * prefixTuple too, because that's what will be written to WAL.
01806      */
01807     spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
01808     prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
01809                               PageGetItemId(current->page, current->offnum));
01810     spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
01811 
01812     MarkBufferDirty(current->buffer);
01813 
01814     if (RelationNeedsWAL(index))
01815     {
01816         XLogRecPtr  recptr;
01817 
01818         recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
01819 
01820         PageSetLSN(current->page, recptr);
01821 
01822         if (newBuffer != InvalidBuffer)
01823         {
01824             PageSetLSN(BufferGetPage(newBuffer), recptr);
01825         }
01826     }
01827 
01828     END_CRIT_SECTION();
01829 
01830     /* Update local free-space cache and release buffer */
01831     if (newBuffer != InvalidBuffer)
01832     {
01833         SpGistSetLastUsedPage(index, newBuffer);
01834         UnlockReleaseBuffer(newBuffer);
01835     }
01836 }
01837 
01838 /*
01839  * Insert one item into the index
01840  */
01841 void
01842 spgdoinsert(Relation index, SpGistState *state,
01843             ItemPointer heapPtr, Datum datum, bool isnull)
01844 {
01845     int         level = 0;
01846     Datum       leafDatum;
01847     int         leafSize;
01848     SPPageDesc  current,
01849                 parent;
01850     FmgrInfo   *procinfo = NULL;
01851 
01852     /*
01853      * Look up FmgrInfo of the user-defined choose function once, to save
01854      * cycles in the loop below.
01855      */
01856     if (!isnull)
01857         procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
01858 
01859     /*
01860      * Since we don't use index_form_tuple in this AM, we have to make sure
01861      * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
01862      * that.
01863      */
01864     if (!isnull && state->attType.attlen == -1)
01865         datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
01866 
01867     leafDatum = datum;
01868 
01869     /*
01870      * Compute space needed for a leaf tuple containing the given datum.
01871      *
01872      * If it isn't gonna fit, and the opclass can't reduce the datum size by
01873      * suffixing, bail out now rather than getting into an endless loop.
01874      */
01875     if (!isnull)
01876         leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
01877             SpGistGetTypeSize(&state->attType, leafDatum);
01878     else
01879         leafSize = SGDTSIZE + sizeof(ItemIdData);
01880 
01881     if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
01882         ereport(ERROR,
01883                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
01884             errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
01885                    (unsigned long) (leafSize - sizeof(ItemIdData)),
01886                  (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
01887                    RelationGetRelationName(index)),
01888             errhint("Values larger than a buffer page cannot be indexed.")));
01889 
01890     /* Initialize "current" to the appropriate root page */
01891     current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
01892     current.buffer = InvalidBuffer;
01893     current.page = NULL;
01894     current.offnum = FirstOffsetNumber;
01895     current.node = -1;
01896 
01897     /* "parent" is invalid for the moment */
01898     parent.blkno = InvalidBlockNumber;
01899     parent.buffer = InvalidBuffer;
01900     parent.page = NULL;
01901     parent.offnum = InvalidOffsetNumber;
01902     parent.node = -1;
01903 
01904     for (;;)
01905     {
01906         bool        isNew = false;
01907 
01908         /*
01909          * Bail out if query cancel is pending.  We must have this somewhere
01910          * in the loop since a broken opclass could produce an infinite
01911          * picksplit loop.
01912          */
01913         CHECK_FOR_INTERRUPTS();
01914 
01915         if (current.blkno == InvalidBlockNumber)
01916         {
01917             /*
01918              * Create a leaf page.  If leafSize is too large to fit on a page,
01919              * we won't actually use the page yet, but it simplifies the API
01920              * for doPickSplit to always have a leaf page at hand; so just
01921              * quietly limit our request to a page size.
01922              */
01923             current.buffer =
01924                 SpGistGetBuffer(index,
01925                                 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
01926                                 Min(leafSize, SPGIST_PAGE_CAPACITY),
01927                                 &isNew);
01928             current.blkno = BufferGetBlockNumber(current.buffer);
01929         }
01930         else if (parent.buffer == InvalidBuffer ||
01931                  current.blkno != parent.blkno)
01932         {
01933             current.buffer = ReadBuffer(index, current.blkno);
01934             LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
01935         }
01936         else
01937         {
01938             /* inner tuple can be stored on the same page as parent one */
01939             current.buffer = parent.buffer;
01940         }
01941         current.page = BufferGetPage(current.buffer);
01942 
01943         /* should not arrive at a page of the wrong type */
01944         if (isnull ? !SpGistPageStoresNulls(current.page) :
01945             SpGistPageStoresNulls(current.page))
01946             elog(ERROR, "SPGiST index page %u has wrong nulls flag",
01947                  current.blkno);
01948 
01949         if (SpGistPageIsLeaf(current.page))
01950         {
01951             SpGistLeafTuple leafTuple;
01952             int         nToSplit,
01953                         sizeToSplit;
01954 
01955             leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
01956             if (leafTuple->size + sizeof(ItemIdData) <=
01957                 SpGistPageGetFreeSpace(current.page, 1))
01958             {
01959                 /* it fits on page, so insert it and we're done */
01960                 addLeafTuple(index, state, leafTuple,
01961                              &current, &parent, isnull, isNew);
01962                 break;
01963             }
01964             else if ((sizeToSplit =
01965                       checkSplitConditions(index, state, &current,
01966                                     &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
01967                      nToSplit < 64 &&
01968                      leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
01969             {
01970                 /*
01971                  * the amount of data is pretty small, so just move the whole
01972                  * chain to another leaf page rather than splitting it.
01973                  */
01974                 Assert(!isNew);
01975                 moveLeafs(index, state, &current, &parent, leafTuple, isnull);
01976                 break;          /* we're done */
01977             }
01978             else
01979             {
01980                 /* picksplit */
01981                 if (doPickSplit(index, state, &current, &parent,
01982                                 leafTuple, level, isnull, isNew))
01983                     break;      /* doPickSplit installed new tuples */
01984 
01985                 /* leaf tuple will not be inserted yet */
01986                 pfree(leafTuple);
01987 
01988                 /*
01989                  * current now describes new inner tuple, go insert into it
01990                  */
01991                 Assert(!SpGistPageIsLeaf(current.page));
01992                 goto process_inner_tuple;
01993             }
01994         }
01995         else    /* non-leaf page */
01996         {
01997             /*
01998              * Apply the opclass choose function to figure out how to insert
01999              * the given datum into the current inner tuple.
02000              */
02001             SpGistInnerTuple innerTuple;
02002             spgChooseIn in;
02003             spgChooseOut out;
02004 
02005             /*
02006              * spgAddNode and spgSplitTuple cases will loop back to here to
02007              * complete the insertion operation.  Just in case the choose
02008              * function is broken and produces add or split requests
02009              * repeatedly, check for query cancel.
02010              */
02011     process_inner_tuple:
02012             CHECK_FOR_INTERRUPTS();
02013 
02014             innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
02015                                 PageGetItemId(current.page, current.offnum));
02016 
02017             in.datum = datum;
02018             in.leafDatum = leafDatum;
02019             in.level = level;
02020             in.allTheSame = innerTuple->allTheSame;
02021             in.hasPrefix = (innerTuple->prefixSize > 0);
02022             in.prefixDatum = SGITDATUM(innerTuple, state);
02023             in.nNodes = innerTuple->nNodes;
02024             in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
02025 
02026             memset(&out, 0, sizeof(out));
02027 
02028             if (!isnull)
02029             {
02030                 /* use user-defined choose method */
02031                 FunctionCall2Coll(procinfo,
02032                                   index->rd_indcollation[0],
02033                                   PointerGetDatum(&in),
02034                                   PointerGetDatum(&out));
02035             }
02036             else
02037             {
02038                 /* force "match" action (to insert to random subnode) */
02039                 out.resultType = spgMatchNode;
02040             }
02041 
02042             if (innerTuple->allTheSame)
02043             {
02044                 /*
02045                  * It's not allowed to do an AddNode at an allTheSame tuple.
02046                  * Opclass must say "match", in which case we choose a random
02047                  * one of the nodes to descend into, or "split".
02048                  */
02049                 if (out.resultType == spgAddNode)
02050                     elog(ERROR, "cannot add a node to an allTheSame inner tuple");
02051                 else if (out.resultType == spgMatchNode)
02052                     out.result.matchNode.nodeN = random() % innerTuple->nNodes;
02053             }
02054 
02055             switch (out.resultType)
02056             {
02057                 case spgMatchNode:
02058                     /* Descend to N'th child node */
02059                     spgMatchNodeAction(index, state, innerTuple,
02060                                        &current, &parent,
02061                                        out.result.matchNode.nodeN);
02062                     /* Adjust level as per opclass request */
02063                     level += out.result.matchNode.levelAdd;
02064                     /* Replace leafDatum and recompute leafSize */
02065                     if (!isnull)
02066                     {
02067                         leafDatum = out.result.matchNode.restDatum;
02068                         leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
02069                             SpGistGetTypeSize(&state->attType, leafDatum);
02070                     }
02071 
02072                     /*
02073                      * Loop around and attempt to insert the new leafDatum at
02074                      * "current" (which might reference an existing child
02075                      * tuple, or might be invalid to force us to find a new
02076                      * page for the tuple).
02077                      *
02078                      * Note: if the opclass sets longValuesOK, we rely on the
02079                      * choose function to eventually shorten the leafDatum
02080                      * enough to fit on a page.  We could add a test here to
02081                      * complain if the datum doesn't get visibly shorter each
02082                      * time, but that could get in the way of opclasses that
02083                      * "simplify" datums in a way that doesn't necessarily
02084                      * lead to physical shortening on every cycle.
02085                      */
02086                     break;
02087                 case spgAddNode:
02088                     /* AddNode is not sensible if nodes don't have labels */
02089                     if (in.nodeLabels == NULL)
02090                         elog(ERROR, "cannot add a node to an inner tuple without node labels");
02091                     /* Add node to inner tuple, per request */
02092                     spgAddNodeAction(index, state, innerTuple,
02093                                      &current, &parent,
02094                                      out.result.addNode.nodeN,
02095                                      out.result.addNode.nodeLabel);
02096 
02097                     /*
02098                      * Retry insertion into the enlarged node.  We assume that
02099                      * we'll get a MatchNode result this time.
02100                      */
02101                     goto process_inner_tuple;
02102                     break;
02103                 case spgSplitTuple:
02104                     /* Split inner tuple, per request */
02105                     spgSplitNodeAction(index, state, innerTuple,
02106                                        &current, &out);
02107 
02108                     /* Retry insertion into the split node */
02109                     goto process_inner_tuple;
02110                     break;
02111                 default:
02112                     elog(ERROR, "unrecognized SPGiST choose result: %d",
02113                          (int) out.resultType);
02114                     break;
02115             }
02116         }
02117     }                           /* end loop */
02118 
02119     /*
02120      * Release any buffers we're still holding.  Beware of possibility that
02121      * current and parent reference same buffer.
02122      */
02123     if (current.buffer != InvalidBuffer)
02124     {
02125         SpGistSetLastUsedPage(index, current.buffer);
02126         UnlockReleaseBuffer(current.buffer);
02127     }
02128     if (parent.buffer != InvalidBuffer &&
02129         parent.buffer != current.buffer)
02130     {
02131         SpGistSetLastUsedPage(index, parent.buffer);
02132         UnlockReleaseBuffer(parent.buffer);
02133     }
02134 }