nbtinsert.c

/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *    Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/nbtree/nbtinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/inval.h"
#include "utils/tqual.h"


typedef struct
{
    /* context data for _bt_checksplitloc */
    Size        newitemsz;      /* size of new item to be inserted */
    int         fillfactor;     /* needed when splitting rightmost page */
    bool        is_leaf;        /* T if splitting a leaf page */
    bool        is_rightmost;   /* T if splitting a rightmost page */
    OffsetNumber newitemoff;    /* where the new item is to be inserted */
    int         leftspace;      /* space available for items on left page */
    int         rightspace;     /* space available for items on right page */
    int         olddataitemstotal;      /* space taken by old items */

    bool        have_split;     /* found a valid split? */

    /* these fields valid only if have_split is true */
    bool        newitemonleft;  /* new item on left or right of best split */
    OffsetNumber firstright;    /* best split point */
    int         best_delta;     /* best size delta so far */
} FindSplitData;
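
/*
 * A rough sketch of how _bt_checksplitloc (defined further down in this
 * file) consumes this context, for orientation only --- see the function
 * itself for the exact accounting: for each candidate split point it
 * computes the free space each half would have afterwards, charging
 * newitemsz to whichever side newitemonleft indicates, and records the
 * candidate with the smallest left/right free-space delta in
 * newitemonleft/firstright/best_delta.  For a rightmost page it instead
 * aims to leave about fillfactor% of the space used on the left,
 * anticipating further ascending-key insertions.
 */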


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
                 Relation heapRel, Buffer buf, OffsetNumber offset,
                 ScanKey itup_scankey,
                 IndexUniqueCheck checkUnique, bool *is_unique);
static void _bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup,
                  Relation heapRel);
static void _bt_insertonpg(Relation rel, Buffer buf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz,
          IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
                 OffsetNumber newitemoff,
                 Size newitemsz,
                 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
                  OffsetNumber firstoldonright, bool newitemonleft,
                  int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
             OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
            int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);

/*
 *  _bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *      This routine is called by the public interface routines, btbuild
 *      and btinsert.  By here, itup is filled in, including the TID.
 *
 *      If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *      will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *      UNIQUE_CHECK_EXISTING) it will throw an error for a duplicate.
 *      For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *      don't actually insert.
 *
 *      The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *      it must be TRUE if the entry is known unique, else FALSE.
 *      (In the current implementation we'll also return TRUE after a
 *      successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *      that's just a coding artifact.)
 */
bool
_bt_doinsert(Relation rel, IndexTuple itup,
             IndexUniqueCheck checkUnique, Relation heapRel)
{
    bool        is_unique = false;
    int         natts = rel->rd_rel->relnatts;
    ScanKey     itup_scankey;
    BTStack     stack;
    Buffer      buf;
    OffsetNumber offset;

    /* we need an insertion scan key to do our search, so build one */
    itup_scankey = _bt_mkscankey(rel, itup);

top:
    /* find the first page containing this key */
    stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

    offset = InvalidOffsetNumber;

    /* trade in our read lock for a write lock */
    LockBuffer(buf, BUFFER_LOCK_UNLOCK);
    LockBuffer(buf, BT_WRITE);

    /*
     * If the page was split between the time that we surrendered our read
     * lock and acquired our write lock, then this page may no longer be the
     * right place for the key we want to insert.  In this case, we need to
     * move right in the tree.  See Lehman and Yao for an excruciatingly
     * precise description.
     */
    buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

    /*
     * If we're not allowing duplicates, make sure the key isn't already in
     * the index.
     *
     * NOTE: obviously, _bt_check_unique can only detect keys that are already
     * in the index; so it cannot defend against concurrent insertions of the
     * same key.  We protect against that by means of holding a write lock on
     * the target page.  Any other would-be inserter of the same key must
     * acquire a write lock on the same target page, so only one would-be
     * inserter can be making the check at one time.  Furthermore, once we are
     * past the check we hold write locks continuously until we have performed
     * our insertion, so no later inserter can fail to see our insertion.
     * (This requires some care in _bt_insertonpg.)
     *
     * If we must wait for another xact, we release the lock while waiting,
     * and then must start over completely.
     *
     * For a partial uniqueness check, we don't wait for the other xact. Just
     * let the tuple in and return false for possibly non-unique, or true for
     * definitely unique.
     */
    if (checkUnique != UNIQUE_CHECK_NO)
    {
        TransactionId xwait;

        offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
        xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
                                 checkUnique, &is_unique);

        if (TransactionIdIsValid(xwait))
        {
            /* Have to wait for the other guy ... */
            _bt_relbuf(rel, buf);
            XactLockTableWait(xwait);
            /* start over... */
            _bt_freestack(stack);
            goto top;
        }
    }

    if (checkUnique != UNIQUE_CHECK_EXISTING)
    {
        /*
         * The only conflict predicate locking cares about for indexes is when
         * an index tuple insert conflicts with an existing lock.  Since the
         * actual location of the insert is hard to predict because of the
         * random search used to prevent O(N^2) performance when there are
         * many duplicate entries, we can just use the "first valid" page.
         */
        CheckForSerializableConflictIn(rel, NULL, buf);
        /* do the insertion */
        _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
        _bt_insertonpg(rel, buf, stack, itup, offset, false);
    }
    else
    {
        /* just release the buffer */
        _bt_relbuf(rel, buf);
    }

    /* be tidy */
    _bt_freestack(stack);
    _bt_freeskey(itup_scankey);

    return is_unique;
}
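
/*
 * For orientation, a condensed sketch of how a caller drives _bt_doinsert
 * (loosely modeled on btinsert in nbtree.c; details there may differ):
 *
 *      IndexTuple  itup;
 *
 *      itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
 *      itup->t_tid = *ht_ctid;     -- fill in the heap TID before calling
 *      result = _bt_doinsert(rel, itup, checkUnique, heapRel);
 *      pfree(itup);
 *
 * Only under UNIQUE_CHECK_PARTIAL does the caller act on the result; see
 * the header comment of _bt_doinsert above.
 */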

/*
 *  _bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.   If an actual
 * conflict is detected, no return --- just ereport().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
                 IndexUniqueCheck checkUnique, bool *is_unique)
{
    TupleDesc   itupdesc = RelationGetDescr(rel);
    int         natts = rel->rd_rel->relnatts;
    SnapshotData SnapshotDirty;
    OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
    Buffer      nbuf = InvalidBuffer;
    bool        found = false;

    /* Assume unique until we find a duplicate */
    *is_unique = true;

    InitDirtySnapshot(SnapshotDirty);

    page = BufferGetPage(buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);

    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
    for (;;)
    {
        ItemId      curitemid;
        IndexTuple  curitup;
        BlockNumber nblkno;

        /*
         * make sure the offset points to an actual item before trying to
         * examine it...
         */
        if (offset <= maxoff)
        {
            curitemid = PageGetItemId(page, offset);

            /*
             * We can skip items that are marked killed.
             *
             * Formerly, we applied _bt_isequal() before checking the kill
             * flag, so as to fall out of the item loop as soon as possible.
             * However, in the presence of heavy update activity an index may
             * contain many killed items with the same key; running
             * _bt_isequal() on each killed item gets expensive. Furthermore
             * it is likely that the non-killed version of each key appears
             * first, so that we didn't actually get to exit any sooner
             * anyway. So now we just advance over killed items as quickly as
             * we can. We only apply _bt_isequal() when we get to a non-killed
             * item or the end of the page.
             */
            if (!ItemIdIsDead(curitemid))
            {
                ItemPointerData htid;
                bool        all_dead;

                /*
                 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
                 * how we handle NULLs - so we must not use _bt_compare for
                 * real equality comparison, but only for ordering/finding
                 * items on pages. - vadim 03/24/97
                 */
                if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
                    break;      /* we're past all the equal tuples */

                /* okay, we gotta fetch the heap tuple ... */
                curitup = (IndexTuple) PageGetItem(page, curitemid);
                htid = curitup->t_tid;

                /*
                 * If we are doing a recheck, we expect to find the tuple we
                 * are rechecking.  It's not a duplicate, but we have to keep
                 * scanning.
                 */
                if (checkUnique == UNIQUE_CHECK_EXISTING &&
                    ItemPointerCompare(&htid, &itup->t_tid) == 0)
                {
                    found = true;
                }

                /*
                 * We check the whole HOT-chain to see if there is any tuple
                 * that satisfies SnapshotDirty.  This is necessary because we
                 * have just a single index entry for the entire chain.
                 */
                else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
                                         &all_dead))
                {
                    TransactionId xwait;

                    /*
                     * It is a duplicate. If we are only doing a partial
                     * check, then don't bother checking if the tuple is being
                     * updated in another transaction. Just return the fact
                     * that it is a potential conflict and leave the full
                     * check till later.
                     */
                    if (checkUnique == UNIQUE_CHECK_PARTIAL)
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        *is_unique = false;
                        return InvalidTransactionId;
                    }

                    /*
                     * If this tuple is being updated by another transaction,
                     * we have to wait for its commit/abort.
                     */
                    xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
                        SnapshotDirty.xmin : SnapshotDirty.xmax;

                    if (TransactionIdIsValid(xwait))
                    {
                        if (nbuf != InvalidBuffer)
                            _bt_relbuf(rel, nbuf);
                        /* Tell _bt_doinsert to wait... */
                        return xwait;
                    }

                    /*
                     * Otherwise we have a definite conflict.  But before
                     * complaining, look to see if the tuple we want to insert
                     * is itself now committed dead --- if so, don't complain.
                     * This is a waste of time in normal scenarios but we must
                     * do it to support CREATE INDEX CONCURRENTLY.
                     *
                     * We must follow HOT-chains here because during
                     * concurrent index build, we insert the root TID though
                     * the actual tuple may be somewhere in the HOT-chain.
                     * While following the chain we might not stop at the
                     * exact tuple which triggered the insert, but that's OK
                     * because if we find a live tuple anywhere in this chain,
                     * we have a unique key conflict.  The other live tuple is
                     * not part of this chain because it had a different index
                     * entry.
                     */
                    htid = itup->t_tid;
                    if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
                    {
                        /* Normal case --- it's still live */
                    }
                    else
                    {
                        /*
                         * It's been deleted, so no error, and no need to
                         * continue searching
                         */
                        break;
                    }

                    /*
                     * This is a definite conflict.  Break the tuple down into
                     * datums and report the error.  But first, make sure we
                     * release the buffer locks we're holding ---
                     * BuildIndexValueDescription could make catalog accesses,
                     * which in the worst case might touch this same index and
                     * cause deadlocks.
                     */
                    if (nbuf != InvalidBuffer)
                        _bt_relbuf(rel, nbuf);
                    _bt_relbuf(rel, buf);

                    {
                        Datum       values[INDEX_MAX_KEYS];
                        bool        isnull[INDEX_MAX_KEYS];

                        index_deform_tuple(itup, RelationGetDescr(rel),
                                           values, isnull);
                        ereport(ERROR,
                                (errcode(ERRCODE_UNIQUE_VIOLATION),
                                 errmsg("duplicate key value violates unique constraint \"%s\"",
                                        RelationGetRelationName(rel)),
                                 errdetail("Key %s already exists.",
                                           BuildIndexValueDescription(rel,
                                                            values, isnull)),
                                 errtableconstraint(heapRel,
                                             RelationGetRelationName(rel))));
                    }
                }
                else if (all_dead)
                {
                    /*
                     * The conflicting tuple (or whole HOT chain) is dead to
                     * everyone, so we may as well mark the index entry
                     * killed.
                     */
                    ItemIdMarkDead(curitemid);
                    opaque->btpo_flags |= BTP_HAS_GARBAGE;

                    /*
                     * Mark buffer with a dirty hint, since state is not
                     * crucial. Be sure to mark the proper buffer dirty.
                     */
                    if (nbuf != InvalidBuffer)
                        MarkBufferDirtyHint(nbuf);
                    else
                        MarkBufferDirtyHint(buf);
                }
            }
        }

        /*
         * Advance to next tuple to continue checking.
         */
        if (offset < maxoff)
            offset = OffsetNumberNext(offset);
        else
        {
            /* If scankey == hikey we gotta check the next page too */
            if (P_RIGHTMOST(opaque))
                break;
            if (!_bt_isequal(itupdesc, page, P_HIKEY,
                             natts, itup_scankey))
                break;
            /* Advance to next non-dead page --- there must be one */
            for (;;)
            {
                nblkno = opaque->btpo_next;
                nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
                page = BufferGetPage(nbuf);
                opaque = (BTPageOpaque) PageGetSpecialPointer(page);
                if (!P_IGNORE(opaque))
                    break;
                if (P_RIGHTMOST(opaque))
                    elog(ERROR, "fell off the end of index \"%s\"",
                         RelationGetRelationName(rel));
            }
            maxoff = PageGetMaxOffsetNumber(page);
            offset = P_FIRSTDATAKEY(opaque);
        }
    }

    /*
     * If we are doing a recheck then we should have found the tuple we are
     * checking.  Otherwise there's something very wrong --- probably, the
     * index is on a non-immutable expression.
     */
    if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
        ereport(ERROR,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("failed to re-find tuple within index \"%s\"",
                        RelationGetRelationName(rel)),
         errhint("This may be because of a non-immutable index expression."),
                 errtableconstraint(heapRel,
                                    RelationGetRelationName(rel))));

    if (nbuf != InvalidBuffer)
        _bt_relbuf(rel, nbuf);

    return InvalidTransactionId;
}

/*
 *  _bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *      If the new key is equal to one or more existing keys, we can
 *      legitimately place it anywhere in the series of equal keys --- in fact,
 *      if the new key is equal to the page's "high key" we can place it on
 *      the next page.  If it is equal to the high key, and there's not room
 *      to insert the new tuple on the current page without splitting, then
 *      we can move right hoping to find more free space and avoid a split.
 *      (We should not move right indefinitely, however, since that leads to
 *      O(N^2) insertion behavior in the presence of many equal keys.)
 *      Once we have chosen the page to put the key on, we'll insert it before
 *      any existing equal keys because of the way _bt_binsrch() works.
 *
 *      If there's not enough room on the page, we try to make room by
 *      removing any LP_DEAD tuples.
 *
 *      On entry, *buf and *offsetptr point to the first legal position
 *      where the new tuple could be inserted.  The caller should hold an
 *      exclusive lock on *buf.  *offsetptr can also be set to
 *      InvalidOffsetNumber, in which case the function will search for the
 *      right location within the page if needed.  On exit, they point to the
 *      chosen insert location.  If _bt_findinsertloc decides to move right,
 *      the lock and pin on the original page will be released and the new
 *      page returned to the caller is exclusively locked instead.
 *
 *      newtup is the new tuple we're inserting, and scankey is an insertion
 *      type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
                  Buffer *bufptr,
                  OffsetNumber *offsetptr,
                  int keysz,
                  ScanKey scankey,
                  IndexTuple newtup,
                  Relation heapRel)
{
    Buffer      buf = *bufptr;
    Page        page = BufferGetPage(buf);
    Size        itemsz;
    BTPageOpaque lpageop;
    bool        movedright,
                vacuumed;
    OffsetNumber newitemoff;
    OffsetNumber firstlegaloff = *offsetptr;

    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*newtup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Check whether the item can fit on a btree page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.) We actually need to be
     * able to fit three items on every page, so restrict any one item to 1/3
     * the per-page available space. Note that at this point, itemsz doesn't
     * include the ItemId.
     *
     * NOTE: if you change this, see also the similar code in _bt_buildadd().
     */
    if (itemsz > BTMaxItemSize(page))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
            errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
                   (unsigned long) itemsz,
                   (unsigned long) BTMaxItemSize(page),
                   RelationGetRelationName(rel)),
        errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                "Consider a function index of an MD5 hash of the value, "
                "or use full text indexing."),
                 errtableconstraint(heapRel,
                                    RelationGetRelationName(rel))));

    /*----------
     * If we will need to split the page to put the item on this page,
     * check whether we can put the tuple somewhere to the right,
     * instead.  Keep scanning right until we
     *      (a) find a page with enough free space,
     *      (b) reach the last page where the tuple can legally go, or
     *      (c) get tired of searching.
     * (c) is not flippant; it is important because if there are many
     * pages' worth of equal keys, it's better to split one of the early
     * pages than to scan all the way to the end of the run of equal keys
     * on every insert.  We implement "get tired" as a random choice,
     * since stopping after scanning a fixed number of pages wouldn't work
     * well (we'd never reach the right-hand side of previously split
     * pages).  Currently the probability of moving right is set at 0.99,
     * which may seem too high to change the behavior much, but it does an
     * excellent job of preventing O(N^2) behavior with many equal keys.
     *----------
     */
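
    /*
     * A note on the "get tired" test below: random() returns a value in
     * [0, MAX_RANDOM_VALUE], so random() <= (MAX_RANDOM_VALUE / 100) is
     * true with probability of about 1/100.  We therefore stop moving
     * right roughly once per hundred eligible pages, which is the 0.99
     * move-right probability cited above.
     */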
    movedright = false;
    vacuumed = false;
    while (PageGetFreeSpace(page) < itemsz)
    {
        Buffer      rbuf;

        /*
         * before considering moving right, see if we can obtain enough space
         * by erasing LP_DEAD items
         */
        if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
        {
            _bt_vacuum_one_page(rel, buf, heapRel);

            /*
             * remember that we vacuumed this page, because that makes the
             * hint supplied by the caller invalid
             */
            vacuumed = true;

            if (PageGetFreeSpace(page) >= itemsz)
                break;          /* OK, now we have enough space */
        }

        /*
         * nope, so check conditions (b) and (c) enumerated above
         */
        if (P_RIGHTMOST(lpageop) ||
            _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
            random() <= (MAX_RANDOM_VALUE / 100))
            break;

        /*
         * step right to next non-dead page
         *
         * must write-lock that page before releasing write lock on current
         * page; else someone else's _bt_check_unique scan could fail to see
         * our insertion.  write locks on intermediate dead pages won't do
         * because we don't know when they will get de-linked from the tree.
         */
        rbuf = InvalidBuffer;

        for (;;)
        {
            BlockNumber rblkno = lpageop->btpo_next;

            rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
            page = BufferGetPage(rbuf);
            lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
            if (!P_IGNORE(lpageop))
                break;
            if (P_RIGHTMOST(lpageop))
                elog(ERROR, "fell off the end of index \"%s\"",
                     RelationGetRelationName(rel));
        }
        _bt_relbuf(rel, buf);
        buf = rbuf;
        movedright = true;
        vacuumed = false;
    }

    /*
     * Now we are on the right page, so find the insert position. If we moved
     * right at all, we know we should insert at the start of the page. If we
     * didn't move right, we can use the firstlegaloff hint if the caller
     * supplied one, unless we vacuumed the page which might have moved tuples
     * around making the hint invalid. If we didn't move right or can't use
     * the hint, find the position by searching.
     */
    if (movedright)
        newitemoff = P_FIRSTDATAKEY(lpageop);
    else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
        newitemoff = firstlegaloff;
    else
        newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

    *bufptr = buf;
    *offsetptr = newitemoff;
}

/*----------
 *  _bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *      This recursive procedure does the following things:
 *
 *          +  if necessary, splits the target page (making sure that the
 *             split is equitable as far as post-insert free space goes).
 *          +  inserts the tuple.
 *          +  if the page was split, pops the parent stack, and finds the
 *             right place to insert the new child pointer (by walking
 *             right using information stored in the parent stack).
 *          +  invokes itself with the appropriate tuple for the right
 *             child page on the parent.
 *          +  updates the metapage if a true root or fast root is split.
 *
 *      On entry, we must have the right buffer in which to do the
 *      insertion, and the buffer must be pinned and write-locked.  On return,
 *      we will have dropped both the pin and the lock on the buffer.
 *
 *      The locking interactions in this code are critical.  You should
 *      grok Lehman and Yao's paper before making any changes.  In addition,
 *      you need to understand how we disambiguate duplicate keys in this
 *      implementation, in order to be able to find our location using
 *      L&Y "move right" operations.  Since we may insert duplicate user
 *      keys, and since these dups may propagate up the tree, we rely on
 *      the parent stack, moving right as needed, to position ourselves
 *      correctly for the insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
               Buffer buf,
               BTStack stack,
               IndexTuple itup,
               OffsetNumber newitemoff,
               bool split_only_page)
{
    Page        page;
    BTPageOpaque lpageop;
    OffsetNumber firstright = InvalidOffsetNumber;
    Size        itemsz;

    page = BufferGetPage(buf);
    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

    itemsz = IndexTupleDSize(*itup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

    /*
     * Do we need to split the page to fit the item on it?
     *
     * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
     * so this comparison is correct even though we appear to be accounting
     * only for the item and not for its line pointer.
     */
    if (PageGetFreeSpace(page) < itemsz)
    {
        bool        is_root = P_ISROOT(lpageop);
        bool        is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
        bool        newitemonleft;
        Buffer      rbuf;

        /* Choose the split point */
        firstright = _bt_findsplitloc(rel, page,
                                      newitemoff, itemsz,
                                      &newitemonleft);

        /* split the buffer into left and right halves */
        rbuf = _bt_split(rel, buf, firstright,
                         newitemoff, itemsz, itup, newitemonleft);
        PredicateLockPageSplit(rel,
                               BufferGetBlockNumber(buf),
                               BufferGetBlockNumber(rbuf));

        /*----------
         * By here,
         *
         *      +  our target page has been split;
         *      +  the original tuple has been inserted;
         *      +  we have write locks on both the old (left half)
         *         and new (right half) buffers, after the split; and
         *      +  we know the key we want to insert into the parent
         *         (it's the "high key" on the left child page).
         *
         * We're ready to do the parent insertion.  We need to hold onto the
         * locks for the child pages until we locate the parent, but we can
         * release them before doing the actual insertion (see Lehman and Yao
         * for the reasoning).
         *----------
         */
        _bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
    }
    else
    {
        Buffer      metabuf = InvalidBuffer;
        Page        metapg = NULL;
        BTMetaPageData *metad = NULL;
        OffsetNumber itup_off;
        BlockNumber itup_blkno;

        itup_off = newitemoff;
        itup_blkno = BufferGetBlockNumber(buf);

        /*
         * If we are doing this insert because we split a page that was the
         * only one on its tree level, but was not the root, it may have been
         * the "fast root".  We need to ensure that the fast root link points
         * at or above the current page.  We can safely acquire a lock on the
         * metapage here --- see comments for _bt_newroot().
         */
        if (split_only_page)
        {
            Assert(!P_ISLEAF(lpageop));

            metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
            metapg = BufferGetPage(metabuf);
            metad = BTPageGetMeta(metapg);

            if (metad->btm_fastlevel >= lpageop->btpo.level)
            {
                /* no update wanted */
                _bt_relbuf(rel, metabuf);
                metabuf = InvalidBuffer;
            }
        }

        /* Do the update.  No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
            elog(PANIC, "failed to add new item to block %u in index \"%s\"",
                 itup_blkno, RelationGetRelationName(rel));

        MarkBufferDirty(buf);

        if (BufferIsValid(metabuf))
        {
            metad->btm_fastroot = itup_blkno;
            metad->btm_fastlevel = lpageop->btpo.level;
            MarkBufferDirty(metabuf);
        }

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            xl_btree_insert xlrec;
            BlockNumber xldownlink;
            xl_btree_metadata xlmeta;
            uint8       xlinfo;
            XLogRecPtr  recptr;
            XLogRecData rdata[4];
            XLogRecData *nextrdata;
            IndexTupleData trunctuple;

            xlrec.target.node = rel->rd_node;
            ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

            rdata[0].data = (char *) &xlrec;
            rdata[0].len = SizeOfBtreeInsert;
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = nextrdata = &(rdata[1]);

            if (P_ISLEAF(lpageop))
                xlinfo = XLOG_BTREE_INSERT_LEAF;
            else
            {
                xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
                Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

                nextrdata->data = (char *) &xldownlink;
                nextrdata->len = sizeof(BlockNumber);
                nextrdata->buffer = InvalidBuffer;
                nextrdata->next = nextrdata + 1;
                nextrdata++;

                xlinfo = XLOG_BTREE_INSERT_UPPER;
            }

            if (BufferIsValid(metabuf))
            {
                xlmeta.root = metad->btm_root;
                xlmeta.level = metad->btm_level;
                xlmeta.fastroot = metad->btm_fastroot;
                xlmeta.fastlevel = metad->btm_fastlevel;

                nextrdata->data = (char *) &xlmeta;
                nextrdata->len = sizeof(xl_btree_metadata);
                nextrdata->buffer = InvalidBuffer;
                nextrdata->next = nextrdata + 1;
                nextrdata++;

                xlinfo = XLOG_BTREE_INSERT_META;
            }

            /* Read comments in _bt_pgaddtup */
            if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
            {
                trunctuple = *itup;
                trunctuple.t_info = sizeof(IndexTupleData);
                nextrdata->data = (char *) &trunctuple;
                nextrdata->len = sizeof(IndexTupleData);
            }
            else
            {
                nextrdata->data = (char *) itup;
                nextrdata->len = IndexTupleDSize(*itup);
            }
            nextrdata->buffer = buf;
            nextrdata->buffer_std = true;
            nextrdata->next = NULL;

            recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

            if (BufferIsValid(metabuf))
            {
                PageSetLSN(metapg, recptr);
            }

            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();

        /* release buffers; send out relcache inval if metapage changed */
        if (BufferIsValid(metabuf))
        {
            if (!InRecovery)
                CacheInvalidateRelcache(rel);
            _bt_relbuf(rel, metabuf);
        }

        _bt_relbuf(rel, buf);
    }
}
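
/*
 * A summary of the insert XLOG record assembled above (descriptive only;
 * the authoritative layout is the rdata chain built in _bt_insertonpg):
 *
 *      rdata[0]  xl_btree_insert header  (always present)
 *      next      downlink BlockNumber    (only for non-leaf insertions)
 *      next      xl_btree_metadata       (only when the metapage was updated)
 *      last      the new tuple, attached to the target buffer so that it is
 *                omitted whenever XLogInsert takes a full-page image; for the
 *                first data key on a non-leaf page a truncated "trunctuple"
 *                is logged instead (see _bt_pgaddtup).
 */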

/*
 *  _bt_split() -- split a page in the btree.
 *
 *      On entry, buf is the page to split, and is pinned and write-locked.
 *      firstright is the item index of the first item to be moved to the
 *      new right page.  newitemoff etc. tell us about the new item that
 *      must be inserted along with the data from the old page.
 *
 *      Returns the new right sibling of buf, pinned and write-locked.
 *      The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
          OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
          bool newitemonleft)
{
    Buffer      rbuf;
    Page        origpage;
    Page        leftpage,
                rightpage;
    BlockNumber origpagenumber,
                rightpagenumber;
    BTPageOpaque ropaque,
                lopaque,
                oopaque;
    Buffer      sbuf = InvalidBuffer;
    Page        spage = NULL;
    BTPageOpaque sopaque = NULL;
    Size        itemsz;
    ItemId      itemid;
    IndexTuple  item;
    OffsetNumber leftoff,
                rightoff;
    OffsetNumber maxoff;
    OffsetNumber i;
    bool        isroot;

    /* Acquire a new page to split into */
    rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);

    /*
     * origpage is the original page to be split.  leftpage is a temporary
     * buffer that receives the left-sibling data, which will be copied back
     * into origpage on success.  rightpage is the new page that receives the
     * right-sibling data.  If we fail before reaching the critical section,
     * origpage hasn't been modified and leftpage is only workspace. In
     * principle we shouldn't need to worry about rightpage either, because it
     * hasn't been linked into the btree page structure; but to avoid leaving
     * possibly-confusing junk behind, we are careful to rewrite rightpage as
     * zeroes before throwing any error.
     */
    origpage = BufferGetPage(buf);
    leftpage = PageGetTempPage(origpage);
    rightpage = BufferGetPage(rbuf);

    origpagenumber = BufferGetBlockNumber(buf);
    rightpagenumber = BufferGetBlockNumber(rbuf);

    _bt_pageinit(leftpage, BufferGetPageSize(buf));
    /* rightpage was already initialized by _bt_getbuf */

    /*
     * Copy the original page's LSN and TLI into leftpage, which will become
     * the updated version of the page.  We need this because XLogInsert will
     * examine these fields and possibly dump them in a page image.
     */
    PageSetLSN(leftpage, PageGetLSN(origpage));

    /* init btree private data */
    oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
    lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

    isroot = P_ISROOT(oopaque);

    /* if we're splitting this page, it won't be the root when we're done */
    /* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
    lopaque->btpo_flags = oopaque->btpo_flags;
    lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
    ropaque->btpo_flags = lopaque->btpo_flags;
    lopaque->btpo_prev = oopaque->btpo_prev;
    lopaque->btpo_next = rightpagenumber;
    ropaque->btpo_prev = origpagenumber;
    ropaque->btpo_next = oopaque->btpo_next;
    lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
    /* Since we already have write-lock on both pages, ok to read cycleid */
    lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
    ropaque->btpo_cycleid = lopaque->btpo_cycleid;

    /*
     * If the page we're splitting is not the rightmost page at its level in
     * the tree, then the first entry on the page is the high key for the
     * page.  We need to copy that to the right half.  Otherwise (meaning the
     * rightmost page case), all the items on the right half will be user
     * data.
     */
    rightoff = P_HIKEY;

    if (!P_RIGHTMOST(oopaque))
    {
        itemid = PageGetItemId(origpage, P_HIKEY);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);
        if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
                        false, false) == InvalidOffsetNumber)
        {
            memset(rightpage, 0, BufferGetPageSize(rbuf));
            elog(ERROR, "failed to add hikey to the right sibling"
                 " while splitting block %u of index \"%s\"",
                 origpagenumber, RelationGetRelationName(rel));
        }
        rightoff = OffsetNumberNext(rightoff);
    }

    /*
     * The "high key" for the new left page will be the first key that's going
     * to go into the new right page.  This might be either the existing data
     * item at position firstright, or the incoming tuple.
     */
    leftoff = P_HIKEY;
    if (!newitemonleft && newitemoff == firstright)
    {
        /* incoming tuple will become first on right page */
        itemsz = newitemsz;
        item = newitem;
    }
    else
    {
        /* existing item at firstright will become first on right page */
        itemid = PageGetItemId(origpage, firstright);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);
    }
    if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
                    false, false) == InvalidOffsetNumber)
    {
        memset(rightpage, 0, BufferGetPageSize(rbuf));
        elog(ERROR, "failed to add hikey to the left sibling"
             " while splitting block %u of index \"%s\"",
             origpagenumber, RelationGetRelationName(rel));
    }
    leftoff = OffsetNumberNext(leftoff);

    /*
     * Now transfer all the data items to the appropriate page.
     *
     * Note: we *must* insert at least the right page's items in item-number
     * order, for the benefit of _bt_restore_page().
     */
    maxoff = PageGetMaxOffsetNumber(origpage);

    for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
    {
        itemid = PageGetItemId(origpage, i);
        itemsz = ItemIdGetLength(itemid);
        item = (IndexTuple) PageGetItem(origpage, itemid);

        /* does new item belong before this one? */
        if (i == newitemoff)
        {
            if (newitemonleft)
            {
                if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
                {
                    memset(rightpage, 0, BufferGetPageSize(rbuf));
                    elog(ERROR, "failed to add new item to the left sibling"
                         " while splitting block %u of index \"%s\"",
                         origpagenumber, RelationGetRelationName(rel));
                }
                leftoff = OffsetNumberNext(leftoff);
            }
            else
            {
                if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
                {
                    memset(rightpage, 0, BufferGetPageSize(rbuf));
                    elog(ERROR, "failed to add new item to the right sibling"
                         " while splitting block %u of index \"%s\"",
                         origpagenumber, RelationGetRelationName(rel));
                }
                rightoff = OffsetNumberNext(rightoff);
            }
        }

        /* decide which page to put it on */
        if (i < firstright)
        {
            if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
            {
                memset(rightpage, 0, BufferGetPageSize(rbuf));
                elog(ERROR, "failed to add old item to the left sibling"
                     " while splitting block %u of index \"%s\"",
                     origpagenumber, RelationGetRelationName(rel));
            }
            leftoff = OffsetNumberNext(leftoff);
        }
        else
        {
            if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
            {
                memset(rightpage, 0, BufferGetPageSize(rbuf));
                elog(ERROR, "failed to add old item to the right sibling"
                     " while splitting block %u of index \"%s\"",
                     origpagenumber, RelationGetRelationName(rel));
            }
            rightoff = OffsetNumberNext(rightoff);
        }
    }

    /* cope with possibility that newitem goes at the end */
    if (i <= newitemoff)
    {
        /*
         * Can't have newitemonleft here; that would imply we were told to put
         * *everything* on the left page, which cannot fit (if it could, we'd
         * not be splitting the page).
         */
        Assert(!newitemonleft);
        if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
        {
            memset(rightpage, 0, BufferGetPageSize(rbuf));
            elog(ERROR, "failed to add new item to the right sibling"
                 " while splitting block %u of index \"%s\"",
                 origpagenumber, RelationGetRelationName(rel));
        }
        rightoff = OffsetNumberNext(rightoff);
    }
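
    /*
     * A worked example of the transfer above (illustrative numbers): suppose
     * origpage holds data items 1..10, firstright = 6, newitemoff = 8, and
     * newitemonleft is false.  Then leftpage gets a copy of item 6 as its new
     * high key followed by items 1..5, while rightpage gets items 6..7, then
     * the incoming tuple, then items 8..10.  The old high key, if any, was
     * already copied to rightpage before the loop.
     */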

    /*
     * We have to grab the right sibling (if any) and fix the prev pointer
     * there. We are guaranteed that this is deadlock-free since no other
     * writer will be holding a lock on that page and trying to move left, and
     * all readers release locks on a page before trying to fetch its
     * neighbors.
     */

    if (!P_RIGHTMOST(oopaque))
    {
        sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
        spage = BufferGetPage(sbuf);
        sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
        if (sopaque->btpo_prev != origpagenumber)
        {
            memset(rightpage, 0, BufferGetPageSize(rbuf));
            elog(ERROR, "right sibling's left-link doesn't match: "
               "block %u links to %u instead of expected %u in index \"%s\"",
                 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
                 RelationGetRelationName(rel));
        }

        /*
         * Check to see if we can set the SPLIT_END flag in the right-hand
         * split page; this can save some I/O for vacuum since it need not
         * proceed to the right sibling.  We can set the flag if the right
         * sibling has a different cycleid: that means it could not be part of
         * a group of pages that were all split off from the same ancestor
         * page.  If you're confused, imagine that page A splits to A B and
         * then again, yielding A C B, while vacuum is in progress.  Tuples
         * originally in A could now be in either B or C, hence vacuum must
         * examine both pages.  But if D, our right sibling, has a different
         * cycleid then it could not contain any tuples that were in A when
         * the vacuum started.
         */
        if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
            ropaque->btpo_flags |= BTP_SPLIT_END;
    }

    /*
     * Right sibling is locked, new siblings are prepared, but original page
     * is not updated yet.
     *
     * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
     * not starting the critical section till here because we haven't been
     * scribbling on the original page yet; see comments above.
     */
    START_CRIT_SECTION();

    /*
     * By here, the original data page has been split into two new halves, and
     * these are correct.  The algorithm requires that the left page never
     * move during a split, so we copy the new left page back on top of the
     * original.  Note that this is not a waste of time, since we also require
     * (in the page management code) that the center of a page always be
     * clean, and the most efficient way to guarantee this is just to compact
     * the data by reinserting it into a new left page.  (XXX the latter
     * comment is probably obsolete; but in any case it's good to not scribble
     * on the original page until we enter the critical section.)
     *
     * We need to do this before writing the WAL record, so that XLogInsert
     * can WAL log an image of the page if necessary.
     */
    PageRestoreTempPage(leftpage, origpage);
    /* leftpage, lopaque must not be used below here */

    MarkBufferDirty(buf);
    MarkBufferDirty(rbuf);

    if (!P_RIGHTMOST(ropaque))
    {
        sopaque->btpo_prev = rightpagenumber;
        MarkBufferDirty(sbuf);
    }

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_btree_split xlrec;
        uint8       xlinfo;
        XLogRecPtr  recptr;
        XLogRecData rdata[7];
        XLogRecData *lastrdata;

        xlrec.node = rel->rd_node;
        xlrec.leftsib = origpagenumber;
        xlrec.rightsib = rightpagenumber;
        xlrec.rnext = ropaque->btpo_next;
        xlrec.level = ropaque->btpo.level;
        xlrec.firstright = firstright;

        rdata[0].data = (char *) &xlrec;
        rdata[0].len = SizeOfBtreeSplit;
        rdata[0].buffer = InvalidBuffer;

        lastrdata = &rdata[0];

        if (ropaque->btpo.level > 0)
        {
            /* Log downlink on non-leaf pages */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
            lastrdata->len = sizeof(BlockIdData);
            lastrdata->buffer = InvalidBuffer;

            /*
             * We must also log the left page's high key, because the right
             * page's leftmost key is suppressed on non-leaf levels.  Show it
             * as belonging to the left page buffer, so that it is not stored
             * if XLogInsert decides it needs a full-page image of the left
             * page.
             */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            itemid = PageGetItemId(origpage, P_HIKEY);
            item = (IndexTuple) PageGetItem(origpage, itemid);
            lastrdata->data = (char *) item;
            lastrdata->len = MAXALIGN(IndexTupleSize(item));
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }

        /*
         * Log the new item and its offset, if it was inserted on the left
         * page. (If it was put on the right page, we don't need to explicitly
         * WAL log it because it's included with all the other items on the
         * right page.) Show the new item as belonging to the left page
         * buffer, so that it is not stored if XLogInsert decides it needs a
         * full-page image of the left page.  We store the offset anyway,
         * though, to support archive compression of these records.
         */
        if (newitemonleft)
        {
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) &newitemoff;
            lastrdata->len = sizeof(OffsetNumber);
            lastrdata->buffer = InvalidBuffer;

            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = (char *) newitem;
            lastrdata->len = MAXALIGN(newitemsz);
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }
        else if (ropaque->btpo.level == 0)
        {
            /*
             * Although we don't need to WAL-log the new item, we still need
             * XLogInsert to consider storing a full-page image of the left
             * page, so make an empty entry referencing that buffer. This also
             * ensures that the left page is always backup block 1.
             */
            lastrdata->next = lastrdata + 1;
            lastrdata++;

            lastrdata->data = NULL;
            lastrdata->len = 0;
            lastrdata->buffer = buf;    /* backup block 1 */
            lastrdata->buffer_std = true;
        }

01280         /*
01281          * Log the contents of the right page in the format understood by
01282          * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
01283          * because we're going to recreate the whole page anyway, so it should
01284          * never be stored by XLogInsert.
01285          *
01286      * Direct access to the page is not ideal, but it is faster; we should
01287      * implement a new function in the page API.  Note we only store the tuples
01288          * themselves, knowing that they were inserted in item-number order
01289          * and so the item pointers can be reconstructed.  See comments for
01290          * _bt_restore_page().
01291          */
01292         lastrdata->next = lastrdata + 1;
01293         lastrdata++;
01294 
01295         lastrdata->data = (char *) rightpage +
01296             ((PageHeader) rightpage)->pd_upper;
01297         lastrdata->len = ((PageHeader) rightpage)->pd_special -
01298             ((PageHeader) rightpage)->pd_upper;
01299         lastrdata->buffer = InvalidBuffer;
01300 
01301     /* Log the right sibling, because we've changed its prev-pointer. */
01302         if (!P_RIGHTMOST(ropaque))
01303         {
01304             lastrdata->next = lastrdata + 1;
01305             lastrdata++;
01306 
01307             lastrdata->data = NULL;
01308             lastrdata->len = 0;
01309             lastrdata->buffer = sbuf;   /* backup block 2 */
01310             lastrdata->buffer_std = true;
01311         }
01312 
01313         lastrdata->next = NULL;
01314 
01315         if (isroot)
01316             xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
01317         else
01318             xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;
01319 
01320         recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
01321 
01322         PageSetLSN(origpage, recptr);
01323         PageSetLSN(rightpage, recptr);
01324         if (!P_RIGHTMOST(ropaque))
01325         {
01326             PageSetLSN(spage, recptr);
01327         }
01328     }
01329 
01330     END_CRIT_SECTION();
01331 
01332     /* release the old right sibling */
01333     if (!P_RIGHTMOST(ropaque))
01334         _bt_relbuf(rel, sbuf);
01335 
01336     /* split's done */
01337     return rbuf;
01338 }
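
/*
 * Editor's note (illustrative sketch, not part of nbtinsert.c): the WAL
 * logging above builds a NULL-terminated chain of XLogRecData entries.  An
 * entry with buffer = InvalidBuffer is always written out; an entry tied to
 * a buffer may be dropped if XLogInsert takes a full-page image of that
 * buffer instead.  The helper name below is hypothetical.
 */
static XLogRecPtr
toy_log_two_parts(RmgrId rmid, uint8 info,
                  char *hdr, Size hdrlen,
                  Buffer databuf, char *data, Size datalen)
{
    XLogRecData rdata[2];

    rdata[0].data = hdr;                /* fixed header: always logged */
    rdata[0].len = hdrlen;
    rdata[0].buffer = InvalidBuffer;    /* never replaced by a page image */
    rdata[0].next = &rdata[1];

    rdata[1].data = data;               /* omitted if a full-page image of */
    rdata[1].len = datalen;             /* databuf is logged instead */
    rdata[1].buffer = databuf;
    rdata[1].buffer_std = true;         /* standard pd_lower/pd_upper layout */
    rdata[1].next = NULL;               /* chain must end with NULL */

    return XLogInsert(rmid, info, rdata);
}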
01339 
01340 /*
01341  *  _bt_findsplitloc() -- find an appropriate place to split a page.
01342  *
01343  * The idea here is to equalize the free space that will be on each split
01344  * page, *after accounting for the inserted tuple*.  (If we fail to account
01345  * for it, we might find ourselves with too little room on the page that
01346  * it needs to go into!)
01347  *
01348  * If the page is the rightmost page on its level, we instead try to arrange
01349  * to leave the left split page fillfactor% full.  In this way, when we are
01350  * inserting successively increasing keys (consider sequences, timestamps,
01351  * etc) we will end up with a tree whose pages are about fillfactor% full,
01352  * instead of the 50% full result that we'd get without this special case.
01353  * This is the same as nbtsort.c produces for a newly-created tree.  Note
01354  * that leaf and nonleaf pages use different fillfactors.
01355  *
01356  * We are passed the intended insert position of the new tuple, expressed as
01357  * the offsetnumber of the tuple it must go in front of.  (This could be
01358  * maxoff+1 if the tuple is to go at the end.)
01359  *
01360  * We return the index of the first existing tuple that should go on the
01361  * righthand page, plus a boolean indicating whether the new tuple goes on
01362  * the left or right page.  The bool is necessary to disambiguate the case
01363  * where firstright == newitemoff.
01364  */
01365 static OffsetNumber
01366 _bt_findsplitloc(Relation rel,
01367                  Page page,
01368                  OffsetNumber newitemoff,
01369                  Size newitemsz,
01370                  bool *newitemonleft)
01371 {
01372     BTPageOpaque opaque;
01373     OffsetNumber offnum;
01374     OffsetNumber maxoff;
01375     ItemId      itemid;
01376     FindSplitData state;
01377     int         leftspace,
01378                 rightspace,
01379                 goodenough,
01380                 olddataitemstotal,
01381                 olddataitemstoleft;
01382     bool        goodenoughfound;
01383 
01384     opaque = (BTPageOpaque) PageGetSpecialPointer(page);
01385 
01386     /* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
01387     newitemsz += sizeof(ItemIdData);
01388 
01389     /* Total free space available on a btree page, after fixed overhead */
01390     leftspace = rightspace =
01391         PageGetPageSize(page) - SizeOfPageHeaderData -
01392         MAXALIGN(sizeof(BTPageOpaqueData));
01393 
01394     /* The right page will have the same high key as the old page */
01395     if (!P_RIGHTMOST(opaque))
01396     {
01397         itemid = PageGetItemId(page, P_HIKEY);
01398         rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
01399                              sizeof(ItemIdData));
01400     }
01401 
01402     /* Count up total space in data items without actually scanning 'em */
01403     olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);
01404 
01405     state.newitemsz = newitemsz;
01406     state.is_leaf = P_ISLEAF(opaque);
01407     state.is_rightmost = P_RIGHTMOST(opaque);
01408     state.have_split = false;
01409     if (state.is_leaf)
01410         state.fillfactor = RelationGetFillFactor(rel,
01411                                                  BTREE_DEFAULT_FILLFACTOR);
01412     else
01413         state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
01414     state.newitemonleft = false;    /* these just to keep compiler quiet */
01415     state.firstright = 0;
01416     state.best_delta = 0;
01417     state.leftspace = leftspace;
01418     state.rightspace = rightspace;
01419     state.olddataitemstotal = olddataitemstotal;
01420     state.newitemoff = newitemoff;
01421 
01422     /*
01423      * Finding the best possible split would require checking all the possible
01424      * split points, because of the high-key and left-key special cases.
01425      * That's probably more work than it's worth; instead, stop as soon as we
01426      * find a "good-enough" split, where good-enough is defined as an
01427      * imbalance in free space of no more than pagesize/16 (arbitrary...) This
01428      * should let us stop near the middle on most pages, instead of plowing to
01429      * the end.
01430      */
01431     goodenough = leftspace / 16;
01432 
01433     /*
01434      * Scan through the data items and calculate space usage for a split at
01435      * each possible position.
01436      */
01437     olddataitemstoleft = 0;
01438     goodenoughfound = false;
01439     maxoff = PageGetMaxOffsetNumber(page);
01440 
01441     for (offnum = P_FIRSTDATAKEY(opaque);
01442          offnum <= maxoff;
01443          offnum = OffsetNumberNext(offnum))
01444     {
01445         Size        itemsz;
01446 
01447         itemid = PageGetItemId(page, offnum);
01448         itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);
01449 
01450         /*
01451          * Will the new item go to left or right of split?
01452          */
01453         if (offnum > newitemoff)
01454             _bt_checksplitloc(&state, offnum, true,
01455                               olddataitemstoleft, itemsz);
01456 
01457         else if (offnum < newitemoff)
01458             _bt_checksplitloc(&state, offnum, false,
01459                               olddataitemstoleft, itemsz);
01460         else
01461         {
01462             /* need to try it both ways! */
01463             _bt_checksplitloc(&state, offnum, true,
01464                               olddataitemstoleft, itemsz);
01465 
01466             _bt_checksplitloc(&state, offnum, false,
01467                               olddataitemstoleft, itemsz);
01468         }
01469 
01470         /* Abort scan once we find a good-enough choice */
01471         if (state.have_split && state.best_delta <= goodenough)
01472         {
01473             goodenoughfound = true;
01474             break;
01475         }
01476 
01477         olddataitemstoleft += itemsz;
01478     }
01479 
01480     /*
01481      * If the new item would go at the end, also consider the split in which
01482      * all the old items go to the left page and the new item alone goes to
01483      * the right page.
01484      */
01485     if (newitemoff > maxoff && !goodenoughfound)
01486         _bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);
01487 
01488     /*
01489      * I believe it is not possible to fail to find a feasible split, but just
01490      * in case ...
01491      */
01492     if (!state.have_split)
01493         elog(ERROR, "could not find a feasible split point for index \"%s\"",
01494              RelationGetRelationName(rel));
01495 
01496     *newitemonleft = state.newitemonleft;
01497     return state.firstright;
01498 }
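
/*
 * Editor's note (simplified, self-contained analogue; all names here are
 * hypothetical): the scan above stops at the first "good enough" split
 * rather than hunting for the global optimum.  Stripped of page internals,
 * the idea looks like this, with an array of item sizes standing in for
 * the page:
 */
static int
toy_findsplit(const int *itemsz, int nitems, int tolerance)
{
    int         total = 0;
    int         leftbytes = 0;
    int         i;

    for (i = 0; i < nitems; i++)
        total += itemsz[i];

    /* boundary i sends items 0..i-1 left and items i..nitems-1 right */
    for (i = 1; i < nitems; i++)
    {
        int         delta;

        leftbytes += itemsz[i - 1];
        delta = leftbytes - (total - leftbytes);
        if (delta < 0)
            delta = -delta;
        if (delta <= tolerance)
            return i;           /* good enough: stop scanning */
    }
    return nitems / 2;          /* fallback; the real code errors out instead */
}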
01499 
01500 /*
01501  * Subroutine to analyze a particular possible split choice (ie, firstright
01502  * and newitemonleft settings), and record the best split so far in *state.
01503  *
01504  * firstoldonright is the offset of the first item on the original page
01505  * that goes to the right page, and firstoldonrightsz is the size of that
01506  * tuple. firstoldonright can be > max offset, which means that all the old
01507  * items go to the left page and only the new item goes to the right page.
01508  * In that case, firstoldonrightsz is not used.
01509  *
01510  * olddataitemstoleft is the total size of all old items to the left of
01511  * firstoldonright.
01512  */
01513 static void
01514 _bt_checksplitloc(FindSplitData *state,
01515                   OffsetNumber firstoldonright,
01516                   bool newitemonleft,
01517                   int olddataitemstoleft,
01518                   Size firstoldonrightsz)
01519 {
01520     int         leftfree,
01521                 rightfree;
01522     Size        firstrightitemsz;
01523     bool        newitemisfirstonright;
01524 
01525     /* Is the new item going to be the first item on the right page? */
01526     newitemisfirstonright = (firstoldonright == state->newitemoff
01527                              && !newitemonleft);
01528 
01529     if (newitemisfirstonright)
01530         firstrightitemsz = state->newitemsz;
01531     else
01532         firstrightitemsz = firstoldonrightsz;
01533 
01534     /* Account for all the old tuples */
01535     leftfree = state->leftspace - olddataitemstoleft;
01536     rightfree = state->rightspace -
01537         (state->olddataitemstotal - olddataitemstoleft);
01538 
01539     /*
01540      * The first item on the right page becomes the high key of the left page;
01541      * therefore it counts against left space as well as right space.
01542      */
01543     leftfree -= firstrightitemsz;
01544 
01545     /* account for the new item */
01546     if (newitemonleft)
01547         leftfree -= (int) state->newitemsz;
01548     else
01549         rightfree -= (int) state->newitemsz;
01550 
01551     /*
01552      * If we are not on the leaf level, we will be able to discard the key
01553      * data from the first item that winds up on the right page.
01554      */
01555     if (!state->is_leaf)
01556         rightfree += (int) firstrightitemsz -
01557             (int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));
01558 
01559     /*
01560      * If feasible split point, remember best delta.
01561      */
01562     if (leftfree >= 0 && rightfree >= 0)
01563     {
01564         int         delta;
01565 
01566         if (state->is_rightmost)
01567         {
01568             /*
01569              * If splitting a rightmost page, try to put (100-fillfactor)% of
01570              * free space on left page. See comments for _bt_findsplitloc.
01571              */
01572             delta = (state->fillfactor * leftfree)
01573                 - ((100 - state->fillfactor) * rightfree);
01574         }
01575         else
01576         {
01577             /* Otherwise, aim for equal free space on both sides */
01578             delta = leftfree - rightfree;
01579         }
01580 
01581         if (delta < 0)
01582             delta = -delta;
01583         if (!state->have_split || delta < state->best_delta)
01584         {
01585             state->have_split = true;
01586             state->newitemonleft = newitemonleft;
01587             state->firstright = firstoldonright;
01588             state->best_delta = delta;
01589         }
01590     }
01591 }
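
/*
 * Editor's note (hypothetical illustration): the delta rule above makes a
 * rightmost split "balanced" when the free-space ratio matches the
 * fillfactor.  With fillfactor = 90, delta is zero when leftfree is
 * one-ninth of rightfree, i.e. the left page is left about 90% full.
 */
static int
toy_split_delta(int leftfree, int rightfree, int fillfactor, bool rightmost)
{
    int         delta;

    if (rightmost)
        delta = fillfactor * leftfree - (100 - fillfactor) * rightfree;
    else
        delta = leftfree - rightfree;       /* plain balance elsewhere */

    return (delta < 0) ? -delta : delta;    /* smaller is better */
}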
01592 
01593 /*
01594  * _bt_insert_parent() -- Insert downlink into parent after a page split.
01595  *
01596  * On entry, buf and rbuf are the left and right split pages, which we
01597  * still hold write locks on per the L&Y algorithm.  We release the
01598  * write locks once we have write lock on the parent page.  (Any sooner,
01599  * and it'd be possible for some other process to try to split or delete
01600  * one of these pages, and get confused because it cannot find the downlink.)
01601  *
01602  * stack - stack showing how we got here.  May be NULL in cases that don't
01603  *          have to be efficient (concurrent ROOT split, WAL recovery)
01604  * is_root - we split the true root
01605  * is_only - we split a page alone on its level (might have been fast root)
01606  *
01607  * This is exported so it can be called by nbtxlog.c.
01608  */
01609 void
01610 _bt_insert_parent(Relation rel,
01611                   Buffer buf,
01612                   Buffer rbuf,
01613                   BTStack stack,
01614                   bool is_root,
01615                   bool is_only)
01616 {
01617     /*
01618      * Here we have to do something Lehman and Yao don't talk about: deal with
01619      * a root split and construction of a new root.  If our stack is empty
01620      * then we have just split a node on what had been the root level when we
01621      * descended the tree.  If it was still the root then we perform a
01622      * new-root construction.  If it *wasn't* the root anymore, search to find
01623      * the next higher level that someone constructed meanwhile, and find the
01624      * right place to insert as for the normal case.
01625      *
01626      * If we have to search for the parent level, we do so by re-descending
01627      * from the root.  This is not super-efficient, but it's rare enough not
01628      * to matter.  (This path is also taken when called from WAL recovery ---
01629      * we have no stack in that case.)
01630      */
01631     if (is_root)
01632     {
01633         Buffer      rootbuf;
01634 
01635         Assert(stack == NULL);
01636         Assert(is_only);
01637         /* create a new root node and update the metapage */
01638         rootbuf = _bt_newroot(rel, buf, rbuf);
01639         /* release the split buffers */
01640         _bt_relbuf(rel, rootbuf);
01641         _bt_relbuf(rel, rbuf);
01642         _bt_relbuf(rel, buf);
01643     }
01644     else
01645     {
01646         BlockNumber bknum = BufferGetBlockNumber(buf);
01647         BlockNumber rbknum = BufferGetBlockNumber(rbuf);
01648         Page        page = BufferGetPage(buf);
01649         IndexTuple  new_item;
01650         BTStackData fakestack;
01651         IndexTuple  ritem;
01652         Buffer      pbuf;
01653 
01654         if (stack == NULL)
01655         {
01656             BTPageOpaque lpageop;
01657 
01658             if (!InRecovery)
01659                 elog(DEBUG2, "concurrent ROOT page split");
01660             lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
01661             /* Find the leftmost page at the next level up */
01662             pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
01663             /* Set up a phony stack entry pointing there */
01664             stack = &fakestack;
01665             stack->bts_blkno = BufferGetBlockNumber(pbuf);
01666             stack->bts_offset = InvalidOffsetNumber;
01667             /* bts_btentry will be initialized below */
01668             stack->bts_parent = NULL;
01669             _bt_relbuf(rel, pbuf);
01670         }
01671 
01672         /* get high key from left page == lowest key on new right page */
01673         ritem = (IndexTuple) PageGetItem(page,
01674                                          PageGetItemId(page, P_HIKEY));
01675 
01676         /* form an index tuple that points at the new right page */
01677         new_item = CopyIndexTuple(ritem);
01678         ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);
01679 
01680         /*
01681          * Find the parent buffer and get the parent page.
01682          *
01683          * If we were moved right, we need to update the stack item: we want
01684          * to find the parent entry that points to where we are now.  - vadim
01685          * 05/27/97
01686          */
01687         ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
01688 
01689         pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
01690 
01691         /* Now we can unlock the children */
01692         _bt_relbuf(rel, rbuf);
01693         _bt_relbuf(rel, buf);
01694 
01695         /* Check for error only after writing children */
01696         if (pbuf == InvalidBuffer)
01697             elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
01698                  RelationGetRelationName(rel), bknum, rbknum);
01699 
01700         /* Recursively update the parent */
01701         _bt_insertonpg(rel, pbuf, stack->bts_parent,
01702                        new_item, stack->bts_offset + 1,
01703                        is_only);
01704 
01705         /* be tidy */
01706         pfree(new_item);
01707     }
01708 }
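
/*
 * Editor's note (hedged distillation of the locking rule above; the helper
 * name is hypothetical and error handling is elided): the parent must be
 * write-locked before the split children are unlocked, so no concurrent
 * backend can see the split pages while their downlink is still missing.
 */
static void
toy_coupled_parent_insert(Relation rel, Buffer lchild, Buffer rchild,
                          BTStack stack, IndexTuple downlink, bool is_only)
{
    Buffer      parent = _bt_getstackbuf(rel, stack, BT_WRITE);

    _bt_relbuf(rel, rchild);    /* safe only after the parent is locked */
    _bt_relbuf(rel, lchild);

    /* (InvalidBuffer check elided; see _bt_insert_parent above) */
    _bt_insertonpg(rel, parent, stack->bts_parent,
                   downlink, stack->bts_offset + 1, is_only);
}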
01709 
01710 /*
01711  *  _bt_getstackbuf() -- Walk back up the tree one step, and find the item
01712  *                       we last looked at in the parent.
01713  *
01714  *      This is possible because we save the downlink from the parent item,
01715  *      which is enough to uniquely identify it.  Insertions into the parent
01716  *      level could cause the item to move right; deletions could cause it
01717  *      to move left, but not left of the page we previously found it in.
01718  *
01719  *      Adjusts bts_blkno & bts_offset if changed.
01720  *
01721  *      Returns InvalidBuffer if item not found (should not happen).
01722  */
01723 Buffer
01724 _bt_getstackbuf(Relation rel, BTStack stack, int access)
01725 {
01726     BlockNumber blkno;
01727     OffsetNumber start;
01728 
01729     blkno = stack->bts_blkno;
01730     start = stack->bts_offset;
01731 
01732     for (;;)
01733     {
01734         Buffer      buf;
01735         Page        page;
01736         BTPageOpaque opaque;
01737 
01738         buf = _bt_getbuf(rel, blkno, access);
01739         page = BufferGetPage(buf);
01740         opaque = (BTPageOpaque) PageGetSpecialPointer(page);
01741 
01742         if (!P_IGNORE(opaque))
01743         {
01744             OffsetNumber offnum,
01745                         minoff,
01746                         maxoff;
01747             ItemId      itemid;
01748             IndexTuple  item;
01749 
01750             minoff = P_FIRSTDATAKEY(opaque);
01751             maxoff = PageGetMaxOffsetNumber(page);
01752 
01753             /*
01754              * start = InvalidOffsetNumber means "search the whole page".  We
01755              * need this test anyway, due to the possibility that the page has
01756              * a high key now when it didn't before.
01757              */
01758             if (start < minoff)
01759                 start = minoff;
01760 
01761             /*
01762              * We need this check too, to guard against the possibility that
01763              * the page has split since we visited it originally.
01764              */
01765             if (start > maxoff)
01766                 start = OffsetNumberNext(maxoff);
01767 
01768             /*
01769              * These loops will check every item on the page --- but in an
01770              * order that's attuned to the probability of where it actually
01771              * is.  Scan to the right first, then to the left.
01772              */
01773             for (offnum = start;
01774                  offnum <= maxoff;
01775                  offnum = OffsetNumberNext(offnum))
01776             {
01777                 itemid = PageGetItemId(page, offnum);
01778                 item = (IndexTuple) PageGetItem(page, itemid);
01779                 if (BTEntrySame(item, &stack->bts_btentry))
01780                 {
01781                     /* Return accurate pointer to where link is now */
01782                     stack->bts_blkno = blkno;
01783                     stack->bts_offset = offnum;
01784                     return buf;
01785                 }
01786             }
01787 
01788             for (offnum = OffsetNumberPrev(start);
01789                  offnum >= minoff;
01790                  offnum = OffsetNumberPrev(offnum))
01791             {
01792                 itemid = PageGetItemId(page, offnum);
01793                 item = (IndexTuple) PageGetItem(page, itemid);
01794                 if (BTEntrySame(item, &stack->bts_btentry))
01795                 {
01796                     /* Return accurate pointer to where link is now */
01797                     stack->bts_blkno = blkno;
01798                     stack->bts_offset = offnum;
01799                     return buf;
01800                 }
01801             }
01802         }
01803 
01804         /*
01805          * The item we're looking for moved right at least one page.
01806          */
01807         if (P_RIGHTMOST(opaque))
01808         {
01809             _bt_relbuf(rel, buf);
01810             return InvalidBuffer;
01811         }
01812         blkno = opaque->btpo_next;
01813         start = InvalidOffsetNumber;
01814         _bt_relbuf(rel, buf);
01815     }
01816 }
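
/*
 * Editor's note (self-contained analogue with hypothetical names): the
 * search order above probes from a remembered position, right first and
 * then left, matching where insertions and deletions are most likely to
 * have moved the item.
 */
static int
toy_find_from_hint(const int *keys, int nkeys, int start, int target)
{
    int         i;

    if (start < 0)
        start = 0;              /* like clamping start up to minoff */
    if (start > nkeys)
        start = nkeys;          /* like clamping start past maxoff */

    for (i = start; i < nkeys; i++)     /* scan right first */
        if (keys[i] == target)
            return i;
    for (i = start - 1; i >= 0; i--)    /* then scan left */
        if (keys[i] == target)
            return i;
    return -1;                  /* caller would move to the right sibling */
}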
01817 
01818 /*
01819  *  _bt_newroot() -- Create a new root page for the index.
01820  *
01821  *      We've just split the old root page and need to create a new one.
01822  *      In order to do this, we add a new root page to the file, then lock
01823  *      the metadata page and update it.  This is guaranteed to be deadlock-
01824  *      free, because all readers release their locks on the metadata page
01825  *      before trying to lock the root, and all writers lock the root before
01826  *      trying to lock the metadata page.  We have a write lock on the old
01827  *      root page, so we have not introduced any cycles into the waits-for
01828  *      graph.
01829  *
01830  *      On entry, lbuf (the old root) and rbuf (its new peer) are write-
01831  *      locked. On exit, a new root page exists with entries for the
01832  *      two new children; the metapage is updated and unlocked/unpinned.
01833  *      The new root buffer is returned to the caller, which must unlock/unpin
01834  *      lbuf, rbuf, and rootbuf.
01835  */
01836 static Buffer
01837 _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
01838 {
01839     Buffer      rootbuf;
01840     Page        lpage,
01841                 rootpage;
01842     BlockNumber lbkno,
01843                 rbkno;
01844     BlockNumber rootblknum;
01845     BTPageOpaque rootopaque;
01846     ItemId      itemid;
01847     IndexTuple  item;
01848     Size        itemsz;
01849     IndexTuple  new_item;
01850     Buffer      metabuf;
01851     Page        metapg;
01852     BTMetaPageData *metad;
01853 
01854     lbkno = BufferGetBlockNumber(lbuf);
01855     rbkno = BufferGetBlockNumber(rbuf);
01856     lpage = BufferGetPage(lbuf);
01857 
01858     /* get a new root page */
01859     rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
01860     rootpage = BufferGetPage(rootbuf);
01861     rootblknum = BufferGetBlockNumber(rootbuf);
01862 
01863     /* acquire lock on the metapage */
01864     metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
01865     metapg = BufferGetPage(metabuf);
01866     metad = BTPageGetMeta(metapg);
01867 
01868     /* NO EREPORT(ERROR) from here till newroot op is logged */
01869     START_CRIT_SECTION();
01870 
01871     /* set btree special data */
01872     rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
01873     rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
01874     rootopaque->btpo_flags = BTP_ROOT;
01875     rootopaque->btpo.level =
01876         ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
01877     rootopaque->btpo_cycleid = 0;
01878 
01879     /* update metapage data */
01880     metad->btm_root = rootblknum;
01881     metad->btm_level = rootopaque->btpo.level;
01882     metad->btm_fastroot = rootblknum;
01883     metad->btm_fastlevel = rootopaque->btpo.level;
01884 
01885     /*
01886      * Create downlink item for left page (old root).  Since this will be the
01887      * first item in a non-leaf page, it implicitly has minus-infinity key
01888      * value, so we need not store any actual key in it.
01889      */
01890     itemsz = sizeof(IndexTupleData);
01891     new_item = (IndexTuple) palloc(itemsz);
01892     new_item->t_info = itemsz;
01893     ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);
01894 
01895     /*
01896      * Insert the left page pointer into the new root page.  The root page is
01897      * the rightmost page on its level so there is no "high key" in it; the
01898      * two items will go into positions P_HIKEY and P_FIRSTKEY.
01899      *
01900      * Note: we *must* insert the two items in item-number order, for the
01901      * benefit of _bt_restore_page().
01902      */
01903     if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
01904                     false, false) == InvalidOffsetNumber)
01905         elog(PANIC, "failed to add leftkey to new root page"
01906              " while splitting block %u of index \"%s\"",
01907              BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
01908     pfree(new_item);
01909 
01910     /*
01911      * Create downlink item for right page.  The key for it is obtained from
01912      * the "high key" position in the left page.
01913      */
01914     itemid = PageGetItemId(lpage, P_HIKEY);
01915     itemsz = ItemIdGetLength(itemid);
01916     item = (IndexTuple) PageGetItem(lpage, itemid);
01917     new_item = CopyIndexTuple(item);
01918     ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);
01919 
01920     /*
01921      * insert the right page pointer into the new root page.
01922      */
01923     if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
01924                     false, false) == InvalidOffsetNumber)
01925         elog(PANIC, "failed to add rightkey to new root page"
01926              " while splitting block %u of index \"%s\"",
01927              BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
01928     pfree(new_item);
01929 
01930     MarkBufferDirty(rootbuf);
01931     MarkBufferDirty(metabuf);
01932 
01933     /* XLOG stuff */
01934     if (RelationNeedsWAL(rel))
01935     {
01936         xl_btree_newroot xlrec;
01937         XLogRecPtr  recptr;
01938         XLogRecData rdata[2];
01939 
01940         xlrec.node = rel->rd_node;
01941         xlrec.rootblk = rootblknum;
01942         xlrec.level = metad->btm_level;
01943 
01944         rdata[0].data = (char *) &xlrec;
01945         rdata[0].len = SizeOfBtreeNewroot;
01946         rdata[0].buffer = InvalidBuffer;
01947         rdata[0].next = &(rdata[1]);
01948 
01949         /*
01950      * Direct access to the page is not ideal, but it is faster; we should
01951      * implement a new function in the page API.
01952          */
01953         rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
01954         rdata[1].len = ((PageHeader) rootpage)->pd_special -
01955             ((PageHeader) rootpage)->pd_upper;
01956         rdata[1].buffer = InvalidBuffer;
01957         rdata[1].next = NULL;
01958 
01959         recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
01960 
01961         PageSetLSN(rootpage, recptr);
01962         PageSetLSN(metapg, recptr);
01963     }
01964 
01965     END_CRIT_SECTION();
01966 
01967     /* send out relcache inval for metapage change */
01968     if (!InRecovery)
01969         CacheInvalidateRelcache(rel);
01970 
01971     /* done with metapage */
01972     _bt_relbuf(rel, metabuf);
01973 
01974     return rootbuf;
01975 }
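
/*
 * Editor's note (hedged sketch of the lock-ordering argument above; the
 * helper name is hypothetical): writers take root-side locks before the
 * metapage lock, while readers drop the metapage lock before locking the
 * root, so the waits-for graph acquires no cycle.
 */
static Buffer
toy_writer_lock_order(Relation rel, BlockNumber rootblk)
{
    Buffer      rootbuf = _bt_getbuf(rel, rootblk, BT_WRITE);   /* root first */
    Buffer      metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);

    /* ... update root and metapage together here ... */

    _bt_relbuf(rel, metabuf);   /* done with metapage */
    return rootbuf;             /* caller releases the root */
}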
01976 
01977 /*
01978  *  _bt_pgaddtup() -- add a tuple to a particular page in the index.
01979  *
01980  *      This routine adds the tuple to the page as requested.  It does
01981  *      not affect pin/lock status, but you'd better have a write lock
01982  *      and pin on the target buffer!  Don't forget to write and release
01983  *      the buffer afterwards, either.
01984  *
01985  *      The main difference between this routine and a bare PageAddItem call
01986  *      is that this code knows that the leftmost index tuple on a non-leaf
01987  *      btree page doesn't need to have a key.  Therefore, it strips such
01988  *      tuples down to just the tuple header.  CAUTION: this works ONLY if
01989  *      we insert the tuples in order, so that the given itup_off does
01990  *      represent the final position of the tuple!
01991  */
01992 static bool
01993 _bt_pgaddtup(Page page,
01994              Size itemsize,
01995              IndexTuple itup,
01996              OffsetNumber itup_off)
01997 {
01998     BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
01999     IndexTupleData trunctuple;
02000 
02001     if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
02002     {
02003         trunctuple = *itup;
02004         trunctuple.t_info = sizeof(IndexTupleData);
02005         itup = &trunctuple;
02006         itemsize = sizeof(IndexTupleData);
02007     }
02008 
02009     if (PageAddItem(page, (Item) itup, itemsize, itup_off,
02010                     false, false) == InvalidOffsetNumber)
02011         return false;
02012 
02013     return true;
02014 }
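
/*
 * Editor's note (hypothetical illustration): "stripping a tuple down to
 * just the header" above works because an index tuple's t_info field
 * carries its length; resetting it to sizeof(IndexTupleData) leaves a
 * key-less tuple whose t_tid (the downlink) is all that survives.
 */
static IndexTupleData
toy_truncate_to_header(IndexTuple src)
{
    IndexTupleData hdr = *src;              /* copy header, drop key bytes */

    hdr.t_info = sizeof(IndexTupleData);    /* new length: bare header */
    return hdr;
}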
02015 
02016 /*
02017  * _bt_isequal - used in _bt_doinsert in check for duplicates.
02018  *
02019  * This is very similar to _bt_compare, except for NULL handling.
02020  * The rule is simple: NOT_NULL never equals NULL, and NULL never equals NULL.
02021  */
02022 static bool
02023 _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
02024             int keysz, ScanKey scankey)
02025 {
02026     IndexTuple  itup;
02027     int         i;
02028 
02029     /* Better be comparing to a leaf item */
02030     Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
02031 
02032     itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
02033 
02034     for (i = 1; i <= keysz; i++)
02035     {
02036         AttrNumber  attno;
02037         Datum       datum;
02038         bool        isNull;
02039         int32       result;
02040 
02041         attno = scankey->sk_attno;
02042         Assert(attno == i);
02043         datum = index_getattr(itup, attno, itupdesc, &isNull);
02044 
02045         /* NULLs are never equal to anything */
02046         if (isNull || (scankey->sk_flags & SK_ISNULL))
02047             return false;
02048 
02049         result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
02050                                                  scankey->sk_collation,
02051                                                  datum,
02052                                                  scankey->sk_argument));
02053 
02054         if (result != 0)
02055             return false;
02056 
02057         scankey++;
02058     }
02059 
02060     /* if we get here, the keys are equal */
02061     return true;
02062 }
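
/*
 * Editor's note (stand-alone restatement with a hypothetical name): the
 * NULL rule above means a NULL on either side can never produce a
 * duplicate, which is why unique indexes admit any number of NULL entries.
 */
static bool
toy_keys_equal(bool leftisnull, bool rightisnull, int32 cmpresult)
{
    if (leftisnull || rightisnull)
        return false;           /* NULL equals nothing, not even NULL */
    return cmpresult == 0;      /* otherwise trust the comparison proc */
}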
02063 
02064 /*
02065  * _bt_vacuum_one_page - vacuum just one index page.
02066  *
02067  * Try to remove LP_DEAD items from the given page.  The passed buffer
02068  * must be exclusive-locked, but unlike a real VACUUM, we don't need a
02069  * super-exclusive "cleanup" lock (see nbtree/README).
02070  */
02071 static void
02072 _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
02073 {
02074     OffsetNumber deletable[MaxOffsetNumber];
02075     int         ndeletable = 0;
02076     OffsetNumber offnum,
02077                 minoff,
02078                 maxoff;
02079     Page        page = BufferGetPage(buffer);
02080     BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
02081 
02082     /*
02083      * Scan over all items to see which ones need to be deleted according to
02084      * LP_DEAD flags.
02085      */
02086     minoff = P_FIRSTDATAKEY(opaque);
02087     maxoff = PageGetMaxOffsetNumber(page);
02088     for (offnum = minoff;
02089          offnum <= maxoff;
02090          offnum = OffsetNumberNext(offnum))
02091     {
02092         ItemId      itemId = PageGetItemId(page, offnum);
02093 
02094         if (ItemIdIsDead(itemId))
02095             deletable[ndeletable++] = offnum;
02096     }
02097 
02098     if (ndeletable > 0)
02099         _bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
02100 
02101     /*
02102      * Note: if we didn't find any LP_DEAD items, then the page's
02103      * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
02104      * separate write to clear it, however.  We will clear it when we split
02105      * the page.
02106      */
02107 }
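
/*
 * Editor's note (self-contained analogue of the collection loop above;
 * names hypothetical): dead-item offsets are gathered into an array so a
 * single batched delete can remove them all at once.
 */
static int
toy_collect_dead(const bool *isdead, int nitems, int *deletable)
{
    int         ndeletable = 0;
    int         i;

    for (i = 0; i < nitems; i++)
        if (isdead[i])
            deletable[ndeletable++] = i;

    return ndeletable;          /* caller deletes only when this is > 0 */
}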