Header And Logo

PostgreSQL
| The world's most advanced open source database.

nbtxlog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * nbtxlog.c
00004  *    WAL replay logic for btrees.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/nbtree/nbtxlog.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/heapam_xlog.h"
00018 #include "access/nbtree.h"
00019 #include "access/transam.h"
00020 #include "storage/procarray.h"
00021 #include "miscadmin.h"
00022 
00023 /*
00024  * We must keep track of expected insertions due to page splits, and apply
00025  * them manually if they are not seen in the WAL log during replay.  This
00026  * makes it safe for page insertion to be a multiple-WAL-action process.
00027  *
00028  * Similarly, deletion of an only child page and deletion of its parent page
00029  * form multiple WAL log entries, and we have to be prepared to follow through
00030  * with the deletion if the log ends between.
00031  *
00032  * The data structure is a simple linked list --- this should be good enough,
00033  * since we don't expect a page split or multi deletion to remain incomplete
00034  * for long.  In any case we need to respect the order of operations.
00035  */
/*
 * One pending multi-WAL-record action (a page split awaiting its parent
 * downlink insertion, or a page deletion awaiting its parent's deletion).
 * Entries live in the incomplete_actions list below, in WAL order.
 */
typedef struct bt_incomplete_action
{
    RelFileNode node;           /* the index */
    bool        is_split;       /* T = pending split, F = pending delete */
    /* these fields are for a split: */
    bool        is_root;        /* we split the root */
    BlockNumber leftblk;        /* left half of split */
    BlockNumber rightblk;       /* right half of split */
    /* these fields are for a delete: */
    BlockNumber delblk;         /* parent block to be deleted */
} bt_incomplete_action;
00047 
/* Linked list of bt_incomplete_action entries, kept in order of occurrence */
static List *incomplete_actions;
00049 
00050 
00051 static void
00052 log_incomplete_split(RelFileNode node, BlockNumber leftblk,
00053                      BlockNumber rightblk, bool is_root)
00054 {
00055     bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
00056 
00057     action->node = node;
00058     action->is_split = true;
00059     action->is_root = is_root;
00060     action->leftblk = leftblk;
00061     action->rightblk = rightblk;
00062     incomplete_actions = lappend(incomplete_actions, action);
00063 }
00064 
00065 static void
00066 forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
00067 {
00068     ListCell   *l;
00069 
00070     foreach(l, incomplete_actions)
00071     {
00072         bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
00073 
00074         if (RelFileNodeEquals(node, action->node) &&
00075             action->is_split &&
00076             downlink == action->rightblk)
00077         {
00078             if (is_root != action->is_root)
00079                 elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
00080                      action->is_root, is_root);
00081             incomplete_actions = list_delete_ptr(incomplete_actions, action);
00082             pfree(action);
00083             break;              /* need not look further */
00084         }
00085     }
00086 }
00087 
00088 static void
00089 log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
00090 {
00091     bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
00092 
00093     action->node = node;
00094     action->is_split = false;
00095     action->delblk = delblk;
00096     incomplete_actions = lappend(incomplete_actions, action);
00097 }
00098 
00099 static void
00100 forget_matching_deletion(RelFileNode node, BlockNumber delblk)
00101 {
00102     ListCell   *l;
00103 
00104     foreach(l, incomplete_actions)
00105     {
00106         bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
00107 
00108         if (RelFileNodeEquals(node, action->node) &&
00109             !action->is_split &&
00110             delblk == action->delblk)
00111         {
00112             incomplete_actions = list_delete_ptr(incomplete_actions, action);
00113             pfree(action);
00114             break;              /* need not look further */
00115         }
00116     }
00117 }
00118 
00119 /*
00120  * _bt_restore_page -- re-enter all the index tuples on a page
00121  *
00122  * The page is freshly init'd, and *from (length len) is a copy of what
00123  * had been its upper part (pd_upper to pd_special).  We assume that the
00124  * tuples had been added to the page in item-number order, and therefore
00125  * the one with highest item number appears first (lowest on the page).
00126  *
00127  * NOTE: the way this routine is coded, the rebuilt page will have the items
00128  * in correct itemno sequence, but physically the opposite order from the
00129  * original, because we insert them in the opposite of itemno order.  This
00130  * does not matter in any current btree code, but it's something to keep an
00131  * eye on.  Is it worth changing just on general principles?  See also the
00132  * notes in btree_xlog_split().
00133  */
00134 static void
00135 _bt_restore_page(Page page, char *from, int len)
00136 {
00137     IndexTupleData itupdata;
00138     Size        itemsz;
00139     char       *end = from + len;
00140 
00141     for (; from < end;)
00142     {
00143         /* Need to copy tuple header due to alignment considerations */
00144         memcpy(&itupdata, from, sizeof(IndexTupleData));
00145         itemsz = IndexTupleDSize(itupdata);
00146         itemsz = MAXALIGN(itemsz);
00147         if (PageAddItem(page, (Item) from, itemsz, FirstOffsetNumber,
00148                         false, false) == InvalidOffsetNumber)
00149             elog(PANIC, "_bt_restore_page: cannot add item to page");
00150         from += itemsz;
00151     }
00152 }
00153 
00154 static void
00155 _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
00156                  BlockNumber root, uint32 level,
00157                  BlockNumber fastroot, uint32 fastlevel)
00158 {
00159     Buffer      metabuf;
00160     Page        metapg;
00161     BTMetaPageData *md;
00162     BTPageOpaque pageop;
00163 
00164     metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
00165     Assert(BufferIsValid(metabuf));
00166     metapg = BufferGetPage(metabuf);
00167 
00168     _bt_pageinit(metapg, BufferGetPageSize(metabuf));
00169 
00170     md = BTPageGetMeta(metapg);
00171     md->btm_magic = BTREE_MAGIC;
00172     md->btm_version = BTREE_VERSION;
00173     md->btm_root = root;
00174     md->btm_level = level;
00175     md->btm_fastroot = fastroot;
00176     md->btm_fastlevel = fastlevel;
00177 
00178     pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
00179     pageop->btpo_flags = BTP_META;
00180 
00181     /*
00182      * Set pd_lower just past the end of the metadata.  This is not essential
00183      * but it makes the page look compressible to xlog.c.
00184      */
00185     ((PageHeader) metapg)->pd_lower =
00186         ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
00187 
00188     PageSetLSN(metapg, lsn);
00189     MarkBufferDirty(metabuf);
00190     UnlockReleaseBuffer(metabuf);
00191 }
00192 
/*
 * Replay a single index-tuple insertion.
 *
 * isleaf/ismeta describe the record variant: a non-leaf insert carries the
 * downlink block number of the child split it completes, and a meta-update
 * variant additionally carries new metapage contents.  The payload after
 * the fixed header is therefore: [downlink] [metadata] tuple-data.
 */
static void
btree_xlog_insert(bool isleaf, bool ismeta,
                  XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    char       *datapos;
    int         datalen;
    xl_btree_metadata md;
    BlockNumber downlink = 0;

    /* Walk past the optional payload pieces in the order they were logged */
    datapos = (char *) xlrec + SizeOfBtreeInsert;
    datalen = record->xl_len - SizeOfBtreeInsert;
    if (!isleaf)
    {
        memcpy(&downlink, datapos, sizeof(BlockNumber));
        datapos += sizeof(BlockNumber);
        datalen -= sizeof(BlockNumber);
    }
    if (ismeta)
    {
        memcpy(&md, datapos, sizeof(xl_btree_metadata));
        datapos += sizeof(xl_btree_metadata);
        datalen -= sizeof(xl_btree_metadata);
    }

    /* If a full-page image is included, it supersedes the incremental redo */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->target.node,
                             ItemPointerGetBlockNumber(&(xlrec->target.tid)),
                                false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);

            /* Skip the update if the page already reflects this record */
            if (lsn <= PageGetLSN(page))
            {
                UnlockReleaseBuffer(buffer);
            }
            else
            {
                if (PageAddItem(page, (Item) datapos, datalen,
                            ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
                                false, false) == InvalidOffsetNumber)
                    elog(PANIC, "btree_insert_redo: failed to add item");

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /*
     * Note: in normal operation, we'd update the metapage while still holding
     * lock on the page we inserted into.  But during replay it's not
     * necessary to hold that lock, since no other index updates can be
     * happening concurrently, and readers will cope fine with following an
     * obsolete link from the metapage.
     */
    if (ismeta)
        _bt_restore_meta(xlrec->target.node, lsn,
                         md.root, md.level,
                         md.fastroot, md.fastlevel);

    /* Forget any split this insertion completes */
    if (!isleaf)
        forget_matching_split(xlrec->target.node, downlink, false);
}
00265 
/*
 * Replay a page split.
 *
 * Rebuilds the new right sibling from the logged tuple images, trims the
 * left sibling down to its post-split contents (unless a full-page image
 * covers it), fixes the left-link of the old right neighbor, and finally
 * registers the split as incomplete until the parent downlink insert is
 * replayed.  onleft says whether the new tuple went into the left half;
 * isroot says whether this was a root split.
 */
static void
btree_xlog_split(bool onleft, bool isroot,
                 XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
    Buffer      rbuf;
    Page        rpage;
    BTPageOpaque ropaque;
    char       *datapos;
    int         datalen;
    OffsetNumber newitemoff = 0;
    Item        newitem = NULL;
    Size        newitemsz = 0;
    Item        left_hikey = NULL;
    Size        left_hikeysz = 0;

    datapos = (char *) xlrec + SizeOfBtreeSplit;
    datalen = record->xl_len - SizeOfBtreeSplit;

    /* Forget any split this insertion completes */
    if (xlrec->level > 0)
    {
        /* we assume SizeOfBtreeSplit is at least 16-bit aligned */
        BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);

        datapos += sizeof(BlockIdData);
        datalen -= sizeof(BlockIdData);

        forget_matching_split(xlrec->node, downlink, false);

        /* Extract left hikey and its size (still assuming 16-bit alignment) */
        if (!(record->xl_info & XLR_BKP_BLOCK(0)))
        {
            /* We assume 16-bit alignment is enough for IndexTupleSize */
            left_hikey = (Item) datapos;
            left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));

            datapos += left_hikeysz;
            datalen -= left_hikeysz;
        }
    }

    /* Extract newitem and newitemoff, if present */
    if (onleft)
    {
        /* Extract the offset (still assuming 16-bit alignment) */
        memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
        datapos += sizeof(OffsetNumber);
        datalen -= sizeof(OffsetNumber);
    }

    if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
    {
        /*
         * We assume that 16-bit alignment is enough to apply IndexTupleSize
         * (since it's fetching from a uint16 field) and also enough for
         * PageAddItem to insert the tuple.
         */
        newitem = (Item) datapos;
        newitemsz = MAXALIGN(IndexTupleSize(newitem));
        datapos += newitemsz;
        datalen -= newitemsz;
    }

    /* Reconstruct right (new) sibling page from scratch */
    rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
    Assert(BufferIsValid(rbuf));
    rpage = (Page) BufferGetPage(rbuf);

    _bt_pageinit(rpage, BufferGetPageSize(rbuf));
    ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);

    ropaque->btpo_prev = xlrec->leftsib;
    ropaque->btpo_next = xlrec->rnext;
    ropaque->btpo.level = xlrec->level;
    ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
    ropaque->btpo_cycleid = 0;

    /* remaining payload is the right page's tuple images */
    _bt_restore_page(rpage, datapos, datalen);

    /*
     * On leaf level, the high key of the left page is equal to the first key
     * on the right page.
     */
    if (xlrec->level == 0)
    {
        ItemId      hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));

        left_hikey = PageGetItem(rpage, hiItemId);
        left_hikeysz = ItemIdGetLength(hiItemId);
    }

    PageSetLSN(rpage, lsn);
    MarkBufferDirty(rbuf);

    /* don't release the buffer yet; we touch right page's first item below */

    /* Now reconstruct left (original) sibling page */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
    else
    {
        Buffer      lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);

        if (BufferIsValid(lbuf))
        {
            /*
             * Note that this code ensures that the items remaining on the
             * left page are in the correct item number order, but it does not
             * reproduce the physical order they would have had.  Is this
             * worth changing?  See also _bt_restore_page().
             */
            Page        lpage = (Page) BufferGetPage(lbuf);
            BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);

            if (lsn > PageGetLSN(lpage))
            {
                OffsetNumber off;
                OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
                OffsetNumber deletable[MaxOffsetNumber];
                int         ndeletable = 0;

                /*
                 * Remove the items from the left page that were copied to the
                 * right page.  Also remove the old high key, if any. (We must
                 * remove everything before trying to insert any items, else
                 * we risk not having enough space.)
                 */
                if (!P_RIGHTMOST(lopaque))
                {
                    deletable[ndeletable++] = P_HIKEY;

                    /*
                     * newitemoff is given to us relative to the original
                     * page's item numbering, so adjust it for this deletion.
                     */
                    newitemoff--;
                }
                for (off = xlrec->firstright; off <= maxoff; off++)
                    deletable[ndeletable++] = off;
                if (ndeletable > 0)
                    PageIndexMultiDelete(lpage, deletable, ndeletable);

                /*
                 * Add the new item if it was inserted on left page.
                 */
                if (onleft)
                {
                    if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
                                    false, false) == InvalidOffsetNumber)
                        elog(PANIC, "failed to add new item to left page after split");
                }

                /* Set high key */
                if (PageAddItem(lpage, left_hikey, left_hikeysz,
                                P_HIKEY, false, false) == InvalidOffsetNumber)
                    elog(PANIC, "failed to add high key to left page after split");

                /* Fix opaque fields */
                lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
                lopaque->btpo_next = xlrec->rightsib;
                lopaque->btpo_cycleid = 0;

                PageSetLSN(lpage, lsn);
                MarkBufferDirty(lbuf);
            }

            UnlockReleaseBuffer(lbuf);
        }
    }

    /* We no longer need the right buffer */
    UnlockReleaseBuffer(rbuf);

    /*
     * Fix left-link of the page to the right of the new right sibling.
     *
     * Note: in normal operation, we do this while still holding lock on the
     * two split pages.  However, that's not necessary for correctness in WAL
     * replay, because no other index update can be in progress, and readers
     * will cope properly when following an obsolete left-link.
     */
    if (record->xl_info & XLR_BKP_BLOCK(1))
        (void) RestoreBackupBlock(lsn, record, 1, false, false);
    else if (xlrec->rnext != P_NONE)
    {
        Buffer      buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);

        if (BufferIsValid(buffer))
        {
            Page        page = (Page) BufferGetPage(buffer);

            if (lsn > PageGetLSN(page))
            {
                BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

                pageop->btpo_prev = xlrec->rightsib;

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
            }
            UnlockReleaseBuffer(buffer);
        }
    }

    /* The job ain't done till the parent link is inserted... */
    log_incomplete_split(xlrec->node,
                         xlrec->leftsib, xlrec->rightsib, isroot);
}
00475 
/*
 * Replay a btree VACUUM record: remove the logged dead item offsets from
 * the target leaf page under a cleanup lock.  On a hot-standby that is
 * open for queries, first touch (pin+cleanup-lock+release) every block
 * between lastBlockVacuumed and this one, mirroring the primary's scan
 * guarantees for concurrent index scans.
 */
static void
btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If queries might be active then we need to ensure every block is
     * unpinned between the lastBlockVacuumed and the current block, if there
     * are any. This ensures that every block in the index is touched during
     * VACUUM as required to ensure scans work correctly.
     */
    if (standbyState == STANDBY_SNAPSHOT_READY &&
        (xlrec->lastBlockVacuumed + 1) != xlrec->block)
    {
        BlockNumber blkno = xlrec->lastBlockVacuumed + 1;

        for (; blkno < xlrec->block; blkno++)
        {
            /*
             * XXX we don't actually need to read the block, we just need to
             * confirm it is unpinned. If we had a special call into the
             * buffer manager we could optimise this so that if the block is
             * not in shared_buffers we confirm it as unpinned.
             *
             * Another simple optimization would be to check if there's any
             * backends running; if not, we could just skip this.
             */
            buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, RBM_NORMAL);
            if (BufferIsValid(buffer))
            {
                LockBufferForCleanup(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /*
     * If we have a full-page image, restore it (using a cleanup lock) and
     * we're done.
     */
    if (record->xl_info & XLR_BKP_BLOCK(0))
    {
        (void) RestoreBackupBlock(lsn, record, 0, true, false);
        return;
    }

    /*
     * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
     * page. See nbtree/README for details.
     */
    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
    if (!BufferIsValid(buffer))
        return;
    LockBufferForCleanup(buffer);
    page = (Page) BufferGetPage(buffer);

    /* Page already reflects this record?  Then nothing to do. */
    if (lsn <= PageGetLSN(page))
    {
        UnlockReleaseBuffer(buffer);
        return;
    }

    /* Any payload beyond the fixed header is the array of dead offsets */
    if (record->xl_len > SizeOfBtreeVacuum)
    {
        OffsetNumber *unused;
        OffsetNumber *unend;

        unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
        unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);

        if ((unend - unused) > 0)
            PageIndexMultiDelete(page, unused, unend - unused);
    }

    /*
     * Mark the page as not containing any LP_DEAD items --- see comments in
     * _bt_delitems_vacuum().
     */
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}
00564 
00565 /*
00566  * Get the latestRemovedXid from the heap pages pointed at by the index
00567  * tuples being deleted. This puts the work for calculating latestRemovedXid
00568  * into the recovery path rather than the primary path.
00569  *
00570  * It's possible that this generates a fair amount of I/O, since an index
00571  * block may have hundreds of tuples being deleted. Repeat accesses to the
00572  * same heap blocks are common, though are not yet optimised.
00573  *
00574  * XXX optimise later with something like XLogPrefetchBuffer()
00575  */
static TransactionId
btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
{
    OffsetNumber *unused;
    Buffer      ibuffer,
                hbuffer;
    Page        ipage,
                hpage;
    ItemId      iitemid,
                hitemid;
    IndexTuple  itup;
    HeapTupleHeader htuphdr;
    BlockNumber hblkno;
    OffsetNumber hoffnum;
    TransactionId latestRemovedXid = InvalidTransactionId;
    int         i;

    /*
     * If there's nothing running on the standby we don't need to derive a
     * full latestRemovedXid value, so use a fast path out of here.  This
     * returns InvalidTransactionId, and so will conflict with all HS
     * transactions; but since we just worked out that that's zero people,
     * it's OK.
     *
     * XXX There is a race condition here, which is that a new backend might
     * start just after we look.  If so, it cannot need to conflict, but this
     * coding will result in throwing a conflict anyway.
     */
    if (CountDBBackends(InvalidOid) == 0)
        return latestRemovedXid;

    /*
     * In what follows, we have to examine the previous state of the index
     * page, as well as the heap page(s) it points to.  This is only valid if
     * WAL replay has reached a consistent database state; which means that
     * the preceding check is not just an optimization, but is *necessary*.
     * We won't have let in any user sessions before we reach consistency.
     */
    if (!reachedConsistency)
        elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");

    /*
     * Get index page.  If the DB is consistent, this should not fail, nor
     * should any of the heap page fetches below.  If one does, we return
     * InvalidTransactionId to cancel all HS transactions.  That's probably
     * overkill, but it's safe, and certainly better than panicking here.
     */
    ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
    if (!BufferIsValid(ibuffer))
        return InvalidTransactionId;
    ipage = (Page) BufferGetPage(ibuffer);

    /*
     * Loop through the deleted index items to obtain the TransactionId from
     * the heap items they point to.
     */
    unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

    for (i = 0; i < xlrec->nitems; i++)
    {
        /*
         * Identify the index tuple about to be deleted
         */
        iitemid = PageGetItemId(ipage, unused[i]);
        itup = (IndexTuple) PageGetItem(ipage, iitemid);

        /*
         * Locate the heap page that the index tuple points at
         */
        hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
        hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false);
        if (!BufferIsValid(hbuffer))
        {
            /* bail out, but release the index buffer we still hold */
            UnlockReleaseBuffer(ibuffer);
            return InvalidTransactionId;
        }
        hpage = (Page) BufferGetPage(hbuffer);

        /*
         * Look up the heap tuple header that the index tuple points at by
         * using the heap node supplied with the xlrec. We can't use
         * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
         * Note that we are not looking at tuple data here, just headers.
         */
        hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
        hitemid = PageGetItemId(hpage, hoffnum);

        /*
         * Follow any redirections until we find something useful.
         */
        while (ItemIdIsRedirected(hitemid))
        {
            hoffnum = ItemIdGetRedirect(hitemid);
            hitemid = PageGetItemId(hpage, hoffnum);
            CHECK_FOR_INTERRUPTS();
        }

        /*
         * If the heap item has storage, then read the header and use that to
         * set latestRemovedXid.
         *
         * Some LP_DEAD items may not be accessible, so we ignore them.
         */
        if (ItemIdHasStorage(hitemid))
        {
            htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);

            HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
        }
        else if (ItemIdIsDead(hitemid))
        {
            /*
             * Conjecture: if hitemid is dead then it had xids before the xids
             * marked on LP_NORMAL items. So we just ignore this item and move
             * onto the next, for the purposes of calculating
             * latestRemovedxids.
             */
        }
        else
            Assert(!ItemIdIsUsed(hitemid));

        /* release this heap page before moving on to the next index item */
        UnlockReleaseBuffer(hbuffer);
    }

    UnlockReleaseBuffer(ibuffer);

    /*
     * XXX If all heap tuples were LP_DEAD then we will be returning
     * InvalidTransactionId here, causing conflict for all HS
     * transactions. That should happen very rarely (reasoning please?). Also
     * note that caller can't tell the difference between this case and the
     * fast path exit above. May need to change that in future.
     */
    return latestRemovedXid;
}
00711 
/*
 * Replay deletion of LP_DEAD-marked index tuples from a leaf page.
 * On a hot standby, first cancel any queries whose snapshots could still
 * see the removed heap rows, then apply the page change.
 */
static void
btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque opaque;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Btree delete records can conflict with standby queries.  You might
     * think that vacuum records would conflict as well, but we've handled
     * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
     * cleaned by the vacuum of the heap and so we can resolve any conflicts
     * just once when that arrives.  After that we know that no conflicts
     * exist from individual btree vacuum records on that index.
     */
    if (InHotStandby)
    {
        TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);

        ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
    }

    /* If we have a full-page image, restore it and we're done */
    if (record->xl_info & XLR_BKP_BLOCK(0))
    {
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
        return;
    }

    /*
     * We don't need to take a cleanup lock to apply these changes. See
     * nbtree/README for details.
     */
    buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
    if (!BufferIsValid(buffer))
        return;
    page = (Page) BufferGetPage(buffer);

    /* Skip if the page already reflects this record */
    if (lsn <= PageGetLSN(page))
    {
        UnlockReleaseBuffer(buffer);
        return;
    }

    /* Payload beyond the fixed header is the array of nitems dead offsets */
    if (record->xl_len > SizeOfBtreeDelete)
    {
        OffsetNumber *unused;

        unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);

        PageIndexMultiDelete(page, unused, xlrec->nitems);
    }

    /*
     * Mark the page as not containing any LP_DEAD items --- see comments in
     * _bt_delitems_delete().
     */
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    opaque->btpo_flags &= ~BTP_HAS_GARBAGE;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);
}
00780 
/*
 * Replay a btree page deletion (XLOG_BTREE_DELETE_PAGE and its _META and
 * _HALF variants).
 *
 * Up to four index pages are touched: the parent page (to remove or adjust
 * the downlink), the right sibling (left-link fix), the left sibling
 * (right-link fix, if one exists), and the target page itself, which is
 * reinitialized as an empty deleted page.  The _META variant additionally
 * rewrites the metapage; the _HALF variant marks the parent half-dead and
 * remembers it so the deletion can be finished at end of recovery.
 */
static void
btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
    BlockNumber parent;
    BlockNumber target;
    BlockNumber leftsib;
    BlockNumber rightsib;
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;

    /* the parent block number is carried in the target item pointer */
    parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
    target = xlrec->deadblk;
    leftsib = xlrec->leftblk;
    rightsib = xlrec->rightblk;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them.  In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */

    /* parent page */
    if (record->xl_info & XLR_BKP_BLOCK(0))
        (void) RestoreBackupBlock(lsn, record, 0, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->target.node, parent, false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);
            pageop = (BTPageOpaque) PageGetSpecialPointer(page);
            if (lsn <= PageGetLSN(page))
            {
                /* change already applied on this page; just release it */
                UnlockReleaseBuffer(buffer);
            }
            else
            {
                OffsetNumber poffset;

                poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
                if (poffset >= PageGetMaxOffsetNumber(page))
                {
                    /*
                     * The downlink is the last remaining data item: delete
                     * it and mark the parent half-dead.  This can only
                     * happen for the _HALF variant of the record.
                     */
                    Assert(info == XLOG_BTREE_DELETE_PAGE_HALF);
                    Assert(poffset == P_FIRSTDATAKEY(pageop));
                    PageIndexTupleDelete(page, poffset);
                    pageop->btpo_flags |= BTP_HALF_DEAD;
                }
                else
                {
                    ItemId      itemid;
                    IndexTuple  itup;
                    OffsetNumber nextoffset;

                    /*
                     * Re-point the downlink at poffset to the right sibling,
                     * then remove the immediately following item (the old
                     * downlink to rightsib, now redundant).
                     */
                    Assert(info != XLOG_BTREE_DELETE_PAGE_HALF);
                    itemid = PageGetItemId(page, poffset);
                    itup = (IndexTuple) PageGetItem(page, itemid);
                    ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
                    nextoffset = OffsetNumberNext(poffset);
                    PageIndexTupleDelete(page, nextoffset);
                }

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /* Fix left-link of right sibling */
    if (record->xl_info & XLR_BKP_BLOCK(1))
        (void) RestoreBackupBlock(lsn, record, 1, false, false);
    else
    {
        buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
        if (BufferIsValid(buffer))
        {
            page = (Page) BufferGetPage(buffer);
            if (lsn <= PageGetLSN(page))
            {
                /* already applied */
                UnlockReleaseBuffer(buffer);
            }
            else
            {
                pageop = (BTPageOpaque) PageGetSpecialPointer(page);
                pageop->btpo_prev = leftsib;

                PageSetLSN(page, lsn);
                MarkBufferDirty(buffer);
                UnlockReleaseBuffer(buffer);
            }
        }
    }

    /* Fix right-link of left sibling, if any */
    if (record->xl_info & XLR_BKP_BLOCK(2))
        (void) RestoreBackupBlock(lsn, record, 2, false, false);
    else
    {
        if (leftsib != P_NONE)
        {
            buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
            if (BufferIsValid(buffer))
            {
                page = (Page) BufferGetPage(buffer);
                if (lsn <= PageGetLSN(page))
                {
                    /* already applied */
                    UnlockReleaseBuffer(buffer);
                }
                else
                {
                    pageop = (BTPageOpaque) PageGetSpecialPointer(page);
                    pageop->btpo_next = rightsib;

                    PageSetLSN(page, lsn);
                    MarkBufferDirty(buffer);
                    UnlockReleaseBuffer(buffer);
                }
            }
        }
    }

    /*
     * Rewrite target page as empty deleted page.  Note we pass init = true
     * to XLogReadBuffer, so no LSN interlock check is needed here.
     */
    buffer = XLogReadBuffer(xlrec->target.node, target, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_prev = leftsib;
    pageop->btpo_next = rightsib;
    pageop->btpo.xact = xlrec->btpo_xact;   /* XID stamping the deletion */
    pageop->btpo_flags = BTP_DELETED;
    pageop->btpo_cycleid = 0;

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    /* Update metapage if needed */
    if (info == XLOG_BTREE_DELETE_PAGE_META)
    {
        xl_btree_metadata md;

        /* metadata payload follows the fixed-size part of the record */
        memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
               sizeof(xl_btree_metadata));
        _bt_restore_meta(xlrec->target.node, lsn,
                         md.root, md.level,
                         md.fastroot, md.fastlevel);
    }

    /* Forget any completed deletion */
    forget_matching_deletion(xlrec->target.node, target);

    /* If parent became half-dead, remember it for deletion */
    if (info == XLOG_BTREE_DELETE_PAGE_HALF)
        log_incomplete_deletion(xlrec->target.node, parent);
}
00943 
/*
 * Replay creation of a new btree root page (XLOG_BTREE_NEWROOT).
 *
 * The root page is always initialized from scratch (these records never
 * carry backup blocks).  If the record payload contains tuples, they are
 * restored onto the new root; this happens when the new root is created by
 * a root split, in which case the first data item's downlink identifies
 * the right-hand split page.  Finally the metapage is rewritten to point
 * at the new root.
 */
static void
btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
{
    xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    BTPageOpaque pageop;
    BlockNumber downlink = 0;   /* right-half page of a root split, if any */

    /* Backup blocks are not used in newroot records */
    Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));

    /* init = true: page is rewritten whole, no LSN interlock needed */
    buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
    Assert(BufferIsValid(buffer));
    page = (Page) BufferGetPage(buffer);

    _bt_pageinit(page, BufferGetPageSize(buffer));
    pageop = (BTPageOpaque) PageGetSpecialPointer(page);

    pageop->btpo_flags = BTP_ROOT;
    pageop->btpo_prev = pageop->btpo_next = P_NONE;
    pageop->btpo.level = xlrec->level;
    if (xlrec->level == 0)
        pageop->btpo_flags |= BTP_LEAF;     /* a level-0 root is also a leaf */
    pageop->btpo_cycleid = 0;

    if (record->xl_len > SizeOfBtreeNewroot)
    {
        IndexTuple  itup;

        /* record carries the root page's tuples; restore them */
        _bt_restore_page(page,
                         (char *) xlrec + SizeOfBtreeNewroot,
                         record->xl_len - SizeOfBtreeNewroot);
        /* extract downlink to the right-hand split page */
        itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
        downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
        Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
    }

    PageSetLSN(page, lsn);
    MarkBufferDirty(buffer);
    UnlockReleaseBuffer(buffer);

    /* point both root and fastroot at the new root page */
    _bt_restore_meta(xlrec->node, lsn,
                     xlrec->rootblk, xlrec->level,
                     xlrec->rootblk, xlrec->level);

    /* Check to see if this satisfies any incomplete insertions */
    if (record->xl_len > SizeOfBtreeNewroot)
        forget_matching_split(xlrec->node, downlink, true);
}
00995 
00996 static void
00997 btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
00998 {
00999     xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
01000 
01001     /*
01002      * Btree reuse_page records exist to provide a conflict point when we
01003      * reuse pages in the index via the FSM.  That's all they do though.
01004      *
01005      * latestRemovedXid was the page's btpo.xact.  The btpo.xact <
01006      * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
01007      * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
01008      * Consequently, one XID value achieves the same exclusion effect on
01009      * master and standby.
01010      */
01011     if (InHotStandby)
01012     {
01013         ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
01014                                             xlrec->node);
01015     }
01016 
01017     /* Backup blocks are not used in reuse_page records */
01018     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
01019 }
01020 
01021 
01022 void
01023 btree_redo(XLogRecPtr lsn, XLogRecord *record)
01024 {
01025     uint8       info = record->xl_info & ~XLR_INFO_MASK;
01026 
01027     switch (info)
01028     {
01029         case XLOG_BTREE_INSERT_LEAF:
01030             btree_xlog_insert(true, false, lsn, record);
01031             break;
01032         case XLOG_BTREE_INSERT_UPPER:
01033             btree_xlog_insert(false, false, lsn, record);
01034             break;
01035         case XLOG_BTREE_INSERT_META:
01036             btree_xlog_insert(false, true, lsn, record);
01037             break;
01038         case XLOG_BTREE_SPLIT_L:
01039             btree_xlog_split(true, false, lsn, record);
01040             break;
01041         case XLOG_BTREE_SPLIT_R:
01042             btree_xlog_split(false, false, lsn, record);
01043             break;
01044         case XLOG_BTREE_SPLIT_L_ROOT:
01045             btree_xlog_split(true, true, lsn, record);
01046             break;
01047         case XLOG_BTREE_SPLIT_R_ROOT:
01048             btree_xlog_split(false, true, lsn, record);
01049             break;
01050         case XLOG_BTREE_VACUUM:
01051             btree_xlog_vacuum(lsn, record);
01052             break;
01053         case XLOG_BTREE_DELETE:
01054             btree_xlog_delete(lsn, record);
01055             break;
01056         case XLOG_BTREE_DELETE_PAGE:
01057         case XLOG_BTREE_DELETE_PAGE_META:
01058         case XLOG_BTREE_DELETE_PAGE_HALF:
01059             btree_xlog_delete_page(info, lsn, record);
01060             break;
01061         case XLOG_BTREE_NEWROOT:
01062             btree_xlog_newroot(lsn, record);
01063             break;
01064         case XLOG_BTREE_REUSE_PAGE:
01065             btree_xlog_reuse_page(lsn, record);
01066             break;
01067         default:
01068             elog(PANIC, "btree_redo: unknown op code %u", info);
01069     }
01070 }
01071 
01072 void
01073 btree_xlog_startup(void)
01074 {
01075     incomplete_actions = NIL;
01076 }
01077 
/*
 * Finish WAL replay: complete any multi-record btree operations (page
 * splits and page deletions) whose follow-up WAL records were never seen,
 * by redoing the missing steps directly.  Clears incomplete_actions when
 * done.
 */
void
btree_xlog_cleanup(void)
{
    ListCell   *l;

    foreach(l, incomplete_actions)
    {
        bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);

        if (action->is_split)
        {
            /* finish an incomplete split */
            Buffer      lbuf,
                        rbuf;
            Page        lpage,
                        rpage;
            BTPageOpaque lpageop,
                        rpageop;
            bool        is_only;
            Relation    reln;

            lbuf = XLogReadBuffer(action->node, action->leftblk, false);
            /* failure is impossible because we wrote this page earlier */
            if (!BufferIsValid(lbuf))
                elog(PANIC, "btree_xlog_cleanup: left block unfound");
            lpage = (Page) BufferGetPage(lbuf);
            lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
            rbuf = XLogReadBuffer(action->node, action->rightblk, false);
            /* failure is impossible because we wrote this page earlier */
            if (!BufferIsValid(rbuf))
                elog(PANIC, "btree_xlog_cleanup: right block unfound");
            rpage = (Page) BufferGetPage(rbuf);
            rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);

            /*
             * If the halves are the leftmost and rightmost pages of their
             * level, the split was of the only page on that level.
             */
            is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);

            /* insert the missing downlink via a fake relcache entry */
            reln = CreateFakeRelcacheEntry(action->node);
            _bt_insert_parent(reln, lbuf, rbuf, NULL,
                              action->is_root, is_only);
            FreeFakeRelcacheEntry(reln);
        }
        else
        {
            /* finish an incomplete deletion (of a half-dead page) */
            Buffer      buf;

            /* if the block no longer exists, there is nothing to finish */
            buf = XLogReadBuffer(action->node, action->delblk, false);
            if (BufferIsValid(buf))
            {
                Relation    reln;

                reln = CreateFakeRelcacheEntry(action->node);
                if (_bt_pagedel(reln, buf, NULL) == 0)
                    elog(PANIC, "btree_xlog_cleanup: _bt_pagedel failed");
                FreeFakeRelcacheEntry(reln);
            }
        }
    }
    incomplete_actions = NIL;
}
01139 
01140 bool
01141 btree_safe_restartpoint(void)
01142 {
01143     if (incomplete_actions)
01144         return false;
01145     return true;
01146 }