00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/heapam_xlog.h"
00018 #include "access/nbtree.h"
00019 #include "access/transam.h"
00020 #include "storage/procarray.h"
00021 #include "miscadmin.h"
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 typedef struct bt_incomplete_action
00037 {
00038 RelFileNode node;
00039 bool is_split;
00040
00041 bool is_root;
00042 BlockNumber leftblk;
00043 BlockNumber rightblk;
00044
00045 BlockNumber delblk;
00046 } bt_incomplete_action;
00047
00048 static List *incomplete_actions;
00049
00050
00051 static void
00052 log_incomplete_split(RelFileNode node, BlockNumber leftblk,
00053 BlockNumber rightblk, bool is_root)
00054 {
00055 bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
00056
00057 action->node = node;
00058 action->is_split = true;
00059 action->is_root = is_root;
00060 action->leftblk = leftblk;
00061 action->rightblk = rightblk;
00062 incomplete_actions = lappend(incomplete_actions, action);
00063 }
00064
00065 static void
00066 forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
00067 {
00068 ListCell *l;
00069
00070 foreach(l, incomplete_actions)
00071 {
00072 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
00073
00074 if (RelFileNodeEquals(node, action->node) &&
00075 action->is_split &&
00076 downlink == action->rightblk)
00077 {
00078 if (is_root != action->is_root)
00079 elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
00080 action->is_root, is_root);
00081 incomplete_actions = list_delete_ptr(incomplete_actions, action);
00082 pfree(action);
00083 break;
00084 }
00085 }
00086 }
00087
00088 static void
00089 log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
00090 {
00091 bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
00092
00093 action->node = node;
00094 action->is_split = false;
00095 action->delblk = delblk;
00096 incomplete_actions = lappend(incomplete_actions, action);
00097 }
00098
00099 static void
00100 forget_matching_deletion(RelFileNode node, BlockNumber delblk)
00101 {
00102 ListCell *l;
00103
00104 foreach(l, incomplete_actions)
00105 {
00106 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
00107
00108 if (RelFileNodeEquals(node, action->node) &&
00109 !action->is_split &&
00110 delblk == action->delblk)
00111 {
00112 incomplete_actions = list_delete_ptr(incomplete_actions, action);
00113 pfree(action);
00114 break;
00115 }
00116 }
00117 }
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134 static void
00135 _bt_restore_page(Page page, char *from, int len)
00136 {
00137 IndexTupleData itupdata;
00138 Size itemsz;
00139 char *end = from + len;
00140
00141 for (; from < end;)
00142 {
00143
00144 memcpy(&itupdata, from, sizeof(IndexTupleData));
00145 itemsz = IndexTupleDSize(itupdata);
00146 itemsz = MAXALIGN(itemsz);
00147 if (PageAddItem(page, (Item) from, itemsz, FirstOffsetNumber,
00148 false, false) == InvalidOffsetNumber)
00149 elog(PANIC, "_bt_restore_page: cannot add item to page");
00150 from += itemsz;
00151 }
00152 }
00153
00154 static void
00155 _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
00156 BlockNumber root, uint32 level,
00157 BlockNumber fastroot, uint32 fastlevel)
00158 {
00159 Buffer metabuf;
00160 Page metapg;
00161 BTMetaPageData *md;
00162 BTPageOpaque pageop;
00163
00164 metabuf = XLogReadBuffer(rnode, BTREE_METAPAGE, true);
00165 Assert(BufferIsValid(metabuf));
00166 metapg = BufferGetPage(metabuf);
00167
00168 _bt_pageinit(metapg, BufferGetPageSize(metabuf));
00169
00170 md = BTPageGetMeta(metapg);
00171 md->btm_magic = BTREE_MAGIC;
00172 md->btm_version = BTREE_VERSION;
00173 md->btm_root = root;
00174 md->btm_level = level;
00175 md->btm_fastroot = fastroot;
00176 md->btm_fastlevel = fastlevel;
00177
00178 pageop = (BTPageOpaque) PageGetSpecialPointer(metapg);
00179 pageop->btpo_flags = BTP_META;
00180
00181
00182
00183
00184
00185 ((PageHeader) metapg)->pd_lower =
00186 ((char *) md + sizeof(BTMetaPageData)) - (char *) metapg;
00187
00188 PageSetLSN(metapg, lsn);
00189 MarkBufferDirty(metabuf);
00190 UnlockReleaseBuffer(metabuf);
00191 }
00192
00193 static void
00194 btree_xlog_insert(bool isleaf, bool ismeta,
00195 XLogRecPtr lsn, XLogRecord *record)
00196 {
00197 xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
00198 Buffer buffer;
00199 Page page;
00200 char *datapos;
00201 int datalen;
00202 xl_btree_metadata md;
00203 BlockNumber downlink = 0;
00204
00205 datapos = (char *) xlrec + SizeOfBtreeInsert;
00206 datalen = record->xl_len - SizeOfBtreeInsert;
00207 if (!isleaf)
00208 {
00209 memcpy(&downlink, datapos, sizeof(BlockNumber));
00210 datapos += sizeof(BlockNumber);
00211 datalen -= sizeof(BlockNumber);
00212 }
00213 if (ismeta)
00214 {
00215 memcpy(&md, datapos, sizeof(xl_btree_metadata));
00216 datapos += sizeof(xl_btree_metadata);
00217 datalen -= sizeof(xl_btree_metadata);
00218 }
00219
00220 if (record->xl_info & XLR_BKP_BLOCK(0))
00221 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00222 else
00223 {
00224 buffer = XLogReadBuffer(xlrec->target.node,
00225 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
00226 false);
00227 if (BufferIsValid(buffer))
00228 {
00229 page = (Page) BufferGetPage(buffer);
00230
00231 if (lsn <= PageGetLSN(page))
00232 {
00233 UnlockReleaseBuffer(buffer);
00234 }
00235 else
00236 {
00237 if (PageAddItem(page, (Item) datapos, datalen,
00238 ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
00239 false, false) == InvalidOffsetNumber)
00240 elog(PANIC, "btree_insert_redo: failed to add item");
00241
00242 PageSetLSN(page, lsn);
00243 MarkBufferDirty(buffer);
00244 UnlockReleaseBuffer(buffer);
00245 }
00246 }
00247 }
00248
00249
00250
00251
00252
00253
00254
00255
00256 if (ismeta)
00257 _bt_restore_meta(xlrec->target.node, lsn,
00258 md.root, md.level,
00259 md.fastroot, md.fastlevel);
00260
00261
00262 if (!isleaf)
00263 forget_matching_split(xlrec->target.node, downlink, false);
00264 }
00265
00266 static void
00267 btree_xlog_split(bool onleft, bool isroot,
00268 XLogRecPtr lsn, XLogRecord *record)
00269 {
00270 xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
00271 Buffer rbuf;
00272 Page rpage;
00273 BTPageOpaque ropaque;
00274 char *datapos;
00275 int datalen;
00276 OffsetNumber newitemoff = 0;
00277 Item newitem = NULL;
00278 Size newitemsz = 0;
00279 Item left_hikey = NULL;
00280 Size left_hikeysz = 0;
00281
00282 datapos = (char *) xlrec + SizeOfBtreeSplit;
00283 datalen = record->xl_len - SizeOfBtreeSplit;
00284
00285
00286 if (xlrec->level > 0)
00287 {
00288
00289 BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
00290
00291 datapos += sizeof(BlockIdData);
00292 datalen -= sizeof(BlockIdData);
00293
00294 forget_matching_split(xlrec->node, downlink, false);
00295
00296
00297 if (!(record->xl_info & XLR_BKP_BLOCK(0)))
00298 {
00299
00300 left_hikey = (Item) datapos;
00301 left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
00302
00303 datapos += left_hikeysz;
00304 datalen -= left_hikeysz;
00305 }
00306 }
00307
00308
00309 if (onleft)
00310 {
00311
00312 memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
00313 datapos += sizeof(OffsetNumber);
00314 datalen -= sizeof(OffsetNumber);
00315 }
00316
00317 if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
00318 {
00319
00320
00321
00322
00323
00324 newitem = (Item) datapos;
00325 newitemsz = MAXALIGN(IndexTupleSize(newitem));
00326 datapos += newitemsz;
00327 datalen -= newitemsz;
00328 }
00329
00330
00331 rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
00332 Assert(BufferIsValid(rbuf));
00333 rpage = (Page) BufferGetPage(rbuf);
00334
00335 _bt_pageinit(rpage, BufferGetPageSize(rbuf));
00336 ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
00337
00338 ropaque->btpo_prev = xlrec->leftsib;
00339 ropaque->btpo_next = xlrec->rnext;
00340 ropaque->btpo.level = xlrec->level;
00341 ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
00342 ropaque->btpo_cycleid = 0;
00343
00344 _bt_restore_page(rpage, datapos, datalen);
00345
00346
00347
00348
00349
00350 if (xlrec->level == 0)
00351 {
00352 ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
00353
00354 left_hikey = PageGetItem(rpage, hiItemId);
00355 left_hikeysz = ItemIdGetLength(hiItemId);
00356 }
00357
00358 PageSetLSN(rpage, lsn);
00359 MarkBufferDirty(rbuf);
00360
00361
00362
00363
00364 if (record->xl_info & XLR_BKP_BLOCK(0))
00365 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00366 else
00367 {
00368 Buffer lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
00369
00370 if (BufferIsValid(lbuf))
00371 {
00372
00373
00374
00375
00376
00377
00378 Page lpage = (Page) BufferGetPage(lbuf);
00379 BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
00380
00381 if (lsn > PageGetLSN(lpage))
00382 {
00383 OffsetNumber off;
00384 OffsetNumber maxoff = PageGetMaxOffsetNumber(lpage);
00385 OffsetNumber deletable[MaxOffsetNumber];
00386 int ndeletable = 0;
00387
00388
00389
00390
00391
00392
00393
00394 if (!P_RIGHTMOST(lopaque))
00395 {
00396 deletable[ndeletable++] = P_HIKEY;
00397
00398
00399
00400
00401
00402 newitemoff--;
00403 }
00404 for (off = xlrec->firstright; off <= maxoff; off++)
00405 deletable[ndeletable++] = off;
00406 if (ndeletable > 0)
00407 PageIndexMultiDelete(lpage, deletable, ndeletable);
00408
00409
00410
00411
00412 if (onleft)
00413 {
00414 if (PageAddItem(lpage, newitem, newitemsz, newitemoff,
00415 false, false) == InvalidOffsetNumber)
00416 elog(PANIC, "failed to add new item to left page after split");
00417 }
00418
00419
00420 if (PageAddItem(lpage, left_hikey, left_hikeysz,
00421 P_HIKEY, false, false) == InvalidOffsetNumber)
00422 elog(PANIC, "failed to add high key to left page after split");
00423
00424
00425 lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
00426 lopaque->btpo_next = xlrec->rightsib;
00427 lopaque->btpo_cycleid = 0;
00428
00429 PageSetLSN(lpage, lsn);
00430 MarkBufferDirty(lbuf);
00431 }
00432
00433 UnlockReleaseBuffer(lbuf);
00434 }
00435 }
00436
00437
00438 UnlockReleaseBuffer(rbuf);
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448 if (record->xl_info & XLR_BKP_BLOCK(1))
00449 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00450 else if (xlrec->rnext != P_NONE)
00451 {
00452 Buffer buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
00453
00454 if (BufferIsValid(buffer))
00455 {
00456 Page page = (Page) BufferGetPage(buffer);
00457
00458 if (lsn > PageGetLSN(page))
00459 {
00460 BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00461
00462 pageop->btpo_prev = xlrec->rightsib;
00463
00464 PageSetLSN(page, lsn);
00465 MarkBufferDirty(buffer);
00466 }
00467 UnlockReleaseBuffer(buffer);
00468 }
00469 }
00470
00471
00472 log_incomplete_split(xlrec->node,
00473 xlrec->leftsib, xlrec->rightsib, isroot);
00474 }
00475
00476 static void
00477 btree_xlog_vacuum(XLogRecPtr lsn, XLogRecord *record)
00478 {
00479 xl_btree_vacuum *xlrec = (xl_btree_vacuum *) XLogRecGetData(record);
00480 Buffer buffer;
00481 Page page;
00482 BTPageOpaque opaque;
00483
00484
00485
00486
00487
00488
00489
00490 if (standbyState == STANDBY_SNAPSHOT_READY &&
00491 (xlrec->lastBlockVacuumed + 1) != xlrec->block)
00492 {
00493 BlockNumber blkno = xlrec->lastBlockVacuumed + 1;
00494
00495 for (; blkno < xlrec->block; blkno++)
00496 {
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, blkno, RBM_NORMAL);
00507 if (BufferIsValid(buffer))
00508 {
00509 LockBufferForCleanup(buffer);
00510 UnlockReleaseBuffer(buffer);
00511 }
00512 }
00513 }
00514
00515
00516
00517
00518
00519 if (record->xl_info & XLR_BKP_BLOCK(0))
00520 {
00521 (void) RestoreBackupBlock(lsn, record, 0, true, false);
00522 return;
00523 }
00524
00525
00526
00527
00528
00529 buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
00530 if (!BufferIsValid(buffer))
00531 return;
00532 LockBufferForCleanup(buffer);
00533 page = (Page) BufferGetPage(buffer);
00534
00535 if (lsn <= PageGetLSN(page))
00536 {
00537 UnlockReleaseBuffer(buffer);
00538 return;
00539 }
00540
00541 if (record->xl_len > SizeOfBtreeVacuum)
00542 {
00543 OffsetNumber *unused;
00544 OffsetNumber *unend;
00545
00546 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeVacuum);
00547 unend = (OffsetNumber *) ((char *) xlrec + record->xl_len);
00548
00549 if ((unend - unused) > 0)
00550 PageIndexMultiDelete(page, unused, unend - unused);
00551 }
00552
00553
00554
00555
00556
00557 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
00558 opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
00559
00560 PageSetLSN(page, lsn);
00561 MarkBufferDirty(buffer);
00562 UnlockReleaseBuffer(buffer);
00563 }
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576 static TransactionId
00577 btree_xlog_delete_get_latestRemovedXid(xl_btree_delete *xlrec)
00578 {
00579 OffsetNumber *unused;
00580 Buffer ibuffer,
00581 hbuffer;
00582 Page ipage,
00583 hpage;
00584 ItemId iitemid,
00585 hitemid;
00586 IndexTuple itup;
00587 HeapTupleHeader htuphdr;
00588 BlockNumber hblkno;
00589 OffsetNumber hoffnum;
00590 TransactionId latestRemovedXid = InvalidTransactionId;
00591 int i;
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604 if (CountDBBackends(InvalidOid) == 0)
00605 return latestRemovedXid;
00606
00607
00608
00609
00610
00611
00612
00613
00614 if (!reachedConsistency)
00615 elog(PANIC, "btree_xlog_delete_get_latestRemovedXid: cannot operate with inconsistent data");
00616
00617
00618
00619
00620
00621
00622
00623 ibuffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
00624 if (!BufferIsValid(ibuffer))
00625 return InvalidTransactionId;
00626 ipage = (Page) BufferGetPage(ibuffer);
00627
00628
00629
00630
00631
00632 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
00633
00634 for (i = 0; i < xlrec->nitems; i++)
00635 {
00636
00637
00638
00639 iitemid = PageGetItemId(ipage, unused[i]);
00640 itup = (IndexTuple) PageGetItem(ipage, iitemid);
00641
00642
00643
00644
00645 hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
00646 hbuffer = XLogReadBuffer(xlrec->hnode, hblkno, false);
00647 if (!BufferIsValid(hbuffer))
00648 {
00649 UnlockReleaseBuffer(ibuffer);
00650 return InvalidTransactionId;
00651 }
00652 hpage = (Page) BufferGetPage(hbuffer);
00653
00654
00655
00656
00657
00658
00659
00660 hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
00661 hitemid = PageGetItemId(hpage, hoffnum);
00662
00663
00664
00665
00666 while (ItemIdIsRedirected(hitemid))
00667 {
00668 hoffnum = ItemIdGetRedirect(hitemid);
00669 hitemid = PageGetItemId(hpage, hoffnum);
00670 CHECK_FOR_INTERRUPTS();
00671 }
00672
00673
00674
00675
00676
00677
00678
00679 if (ItemIdHasStorage(hitemid))
00680 {
00681 htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
00682
00683 HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
00684 }
00685 else if (ItemIdIsDead(hitemid))
00686 {
00687
00688
00689
00690
00691
00692
00693 }
00694 else
00695 Assert(!ItemIdIsUsed(hitemid));
00696
00697 UnlockReleaseBuffer(hbuffer);
00698 }
00699
00700 UnlockReleaseBuffer(ibuffer);
00701
00702
00703
00704
00705
00706
00707
00708
00709 return latestRemovedXid;
00710 }
00711
00712 static void
00713 btree_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
00714 {
00715 xl_btree_delete *xlrec = (xl_btree_delete *) XLogRecGetData(record);
00716 Buffer buffer;
00717 Page page;
00718 BTPageOpaque opaque;
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731 if (InHotStandby)
00732 {
00733 TransactionId latestRemovedXid = btree_xlog_delete_get_latestRemovedXid(xlrec);
00734
00735 ResolveRecoveryConflictWithSnapshot(latestRemovedXid, xlrec->node);
00736 }
00737
00738
00739 if (record->xl_info & XLR_BKP_BLOCK(0))
00740 {
00741 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00742 return;
00743 }
00744
00745
00746
00747
00748
00749 buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
00750 if (!BufferIsValid(buffer))
00751 return;
00752 page = (Page) BufferGetPage(buffer);
00753
00754 if (lsn <= PageGetLSN(page))
00755 {
00756 UnlockReleaseBuffer(buffer);
00757 return;
00758 }
00759
00760 if (record->xl_len > SizeOfBtreeDelete)
00761 {
00762 OffsetNumber *unused;
00763
00764 unused = (OffsetNumber *) ((char *) xlrec + SizeOfBtreeDelete);
00765
00766 PageIndexMultiDelete(page, unused, xlrec->nitems);
00767 }
00768
00769
00770
00771
00772
00773 opaque = (BTPageOpaque) PageGetSpecialPointer(page);
00774 opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
00775
00776 PageSetLSN(page, lsn);
00777 MarkBufferDirty(buffer);
00778 UnlockReleaseBuffer(buffer);
00779 }
00780
00781 static void
00782 btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
00783 {
00784 xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
00785 BlockNumber parent;
00786 BlockNumber target;
00787 BlockNumber leftsib;
00788 BlockNumber rightsib;
00789 Buffer buffer;
00790 Page page;
00791 BTPageOpaque pageop;
00792
00793 parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
00794 target = xlrec->deadblk;
00795 leftsib = xlrec->leftblk;
00796 rightsib = xlrec->rightblk;
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807 if (record->xl_info & XLR_BKP_BLOCK(0))
00808 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00809 else
00810 {
00811 buffer = XLogReadBuffer(xlrec->target.node, parent, false);
00812 if (BufferIsValid(buffer))
00813 {
00814 page = (Page) BufferGetPage(buffer);
00815 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00816 if (lsn <= PageGetLSN(page))
00817 {
00818 UnlockReleaseBuffer(buffer);
00819 }
00820 else
00821 {
00822 OffsetNumber poffset;
00823
00824 poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
00825 if (poffset >= PageGetMaxOffsetNumber(page))
00826 {
00827 Assert(info == XLOG_BTREE_DELETE_PAGE_HALF);
00828 Assert(poffset == P_FIRSTDATAKEY(pageop));
00829 PageIndexTupleDelete(page, poffset);
00830 pageop->btpo_flags |= BTP_HALF_DEAD;
00831 }
00832 else
00833 {
00834 ItemId itemid;
00835 IndexTuple itup;
00836 OffsetNumber nextoffset;
00837
00838 Assert(info != XLOG_BTREE_DELETE_PAGE_HALF);
00839 itemid = PageGetItemId(page, poffset);
00840 itup = (IndexTuple) PageGetItem(page, itemid);
00841 ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
00842 nextoffset = OffsetNumberNext(poffset);
00843 PageIndexTupleDelete(page, nextoffset);
00844 }
00845
00846 PageSetLSN(page, lsn);
00847 MarkBufferDirty(buffer);
00848 UnlockReleaseBuffer(buffer);
00849 }
00850 }
00851 }
00852
00853
00854 if (record->xl_info & XLR_BKP_BLOCK(1))
00855 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00856 else
00857 {
00858 buffer = XLogReadBuffer(xlrec->target.node, rightsib, false);
00859 if (BufferIsValid(buffer))
00860 {
00861 page = (Page) BufferGetPage(buffer);
00862 if (lsn <= PageGetLSN(page))
00863 {
00864 UnlockReleaseBuffer(buffer);
00865 }
00866 else
00867 {
00868 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00869 pageop->btpo_prev = leftsib;
00870
00871 PageSetLSN(page, lsn);
00872 MarkBufferDirty(buffer);
00873 UnlockReleaseBuffer(buffer);
00874 }
00875 }
00876 }
00877
00878
00879 if (record->xl_info & XLR_BKP_BLOCK(2))
00880 (void) RestoreBackupBlock(lsn, record, 2, false, false);
00881 else
00882 {
00883 if (leftsib != P_NONE)
00884 {
00885 buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
00886 if (BufferIsValid(buffer))
00887 {
00888 page = (Page) BufferGetPage(buffer);
00889 if (lsn <= PageGetLSN(page))
00890 {
00891 UnlockReleaseBuffer(buffer);
00892 }
00893 else
00894 {
00895 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00896 pageop->btpo_next = rightsib;
00897
00898 PageSetLSN(page, lsn);
00899 MarkBufferDirty(buffer);
00900 UnlockReleaseBuffer(buffer);
00901 }
00902 }
00903 }
00904 }
00905
00906
00907 buffer = XLogReadBuffer(xlrec->target.node, target, true);
00908 Assert(BufferIsValid(buffer));
00909 page = (Page) BufferGetPage(buffer);
00910
00911 _bt_pageinit(page, BufferGetPageSize(buffer));
00912 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00913
00914 pageop->btpo_prev = leftsib;
00915 pageop->btpo_next = rightsib;
00916 pageop->btpo.xact = xlrec->btpo_xact;
00917 pageop->btpo_flags = BTP_DELETED;
00918 pageop->btpo_cycleid = 0;
00919
00920 PageSetLSN(page, lsn);
00921 MarkBufferDirty(buffer);
00922 UnlockReleaseBuffer(buffer);
00923
00924
00925 if (info == XLOG_BTREE_DELETE_PAGE_META)
00926 {
00927 xl_btree_metadata md;
00928
00929 memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
00930 sizeof(xl_btree_metadata));
00931 _bt_restore_meta(xlrec->target.node, lsn,
00932 md.root, md.level,
00933 md.fastroot, md.fastlevel);
00934 }
00935
00936
00937 forget_matching_deletion(xlrec->target.node, target);
00938
00939
00940 if (info == XLOG_BTREE_DELETE_PAGE_HALF)
00941 log_incomplete_deletion(xlrec->target.node, parent);
00942 }
00943
00944 static void
00945 btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
00946 {
00947 xl_btree_newroot *xlrec = (xl_btree_newroot *) XLogRecGetData(record);
00948 Buffer buffer;
00949 Page page;
00950 BTPageOpaque pageop;
00951 BlockNumber downlink = 0;
00952
00953
00954 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00955
00956 buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
00957 Assert(BufferIsValid(buffer));
00958 page = (Page) BufferGetPage(buffer);
00959
00960 _bt_pageinit(page, BufferGetPageSize(buffer));
00961 pageop = (BTPageOpaque) PageGetSpecialPointer(page);
00962
00963 pageop->btpo_flags = BTP_ROOT;
00964 pageop->btpo_prev = pageop->btpo_next = P_NONE;
00965 pageop->btpo.level = xlrec->level;
00966 if (xlrec->level == 0)
00967 pageop->btpo_flags |= BTP_LEAF;
00968 pageop->btpo_cycleid = 0;
00969
00970 if (record->xl_len > SizeOfBtreeNewroot)
00971 {
00972 IndexTuple itup;
00973
00974 _bt_restore_page(page,
00975 (char *) xlrec + SizeOfBtreeNewroot,
00976 record->xl_len - SizeOfBtreeNewroot);
00977
00978 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
00979 downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
00980 Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
00981 }
00982
00983 PageSetLSN(page, lsn);
00984 MarkBufferDirty(buffer);
00985 UnlockReleaseBuffer(buffer);
00986
00987 _bt_restore_meta(xlrec->node, lsn,
00988 xlrec->rootblk, xlrec->level,
00989 xlrec->rootblk, xlrec->level);
00990
00991
00992 if (record->xl_len > SizeOfBtreeNewroot)
00993 forget_matching_split(xlrec->node, downlink, true);
00994 }
00995
00996 static void
00997 btree_xlog_reuse_page(XLogRecPtr lsn, XLogRecord *record)
00998 {
00999 xl_btree_reuse_page *xlrec = (xl_btree_reuse_page *) XLogRecGetData(record);
01000
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010
01011 if (InHotStandby)
01012 {
01013 ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
01014 xlrec->node);
01015 }
01016
01017
01018 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
01019 }
01020
01021
01022 void
01023 btree_redo(XLogRecPtr lsn, XLogRecord *record)
01024 {
01025 uint8 info = record->xl_info & ~XLR_INFO_MASK;
01026
01027 switch (info)
01028 {
01029 case XLOG_BTREE_INSERT_LEAF:
01030 btree_xlog_insert(true, false, lsn, record);
01031 break;
01032 case XLOG_BTREE_INSERT_UPPER:
01033 btree_xlog_insert(false, false, lsn, record);
01034 break;
01035 case XLOG_BTREE_INSERT_META:
01036 btree_xlog_insert(false, true, lsn, record);
01037 break;
01038 case XLOG_BTREE_SPLIT_L:
01039 btree_xlog_split(true, false, lsn, record);
01040 break;
01041 case XLOG_BTREE_SPLIT_R:
01042 btree_xlog_split(false, false, lsn, record);
01043 break;
01044 case XLOG_BTREE_SPLIT_L_ROOT:
01045 btree_xlog_split(true, true, lsn, record);
01046 break;
01047 case XLOG_BTREE_SPLIT_R_ROOT:
01048 btree_xlog_split(false, true, lsn, record);
01049 break;
01050 case XLOG_BTREE_VACUUM:
01051 btree_xlog_vacuum(lsn, record);
01052 break;
01053 case XLOG_BTREE_DELETE:
01054 btree_xlog_delete(lsn, record);
01055 break;
01056 case XLOG_BTREE_DELETE_PAGE:
01057 case XLOG_BTREE_DELETE_PAGE_META:
01058 case XLOG_BTREE_DELETE_PAGE_HALF:
01059 btree_xlog_delete_page(info, lsn, record);
01060 break;
01061 case XLOG_BTREE_NEWROOT:
01062 btree_xlog_newroot(lsn, record);
01063 break;
01064 case XLOG_BTREE_REUSE_PAGE:
01065 btree_xlog_reuse_page(lsn, record);
01066 break;
01067 default:
01068 elog(PANIC, "btree_redo: unknown op code %u", info);
01069 }
01070 }
01071
01072 void
01073 btree_xlog_startup(void)
01074 {
01075 incomplete_actions = NIL;
01076 }
01077
01078 void
01079 btree_xlog_cleanup(void)
01080 {
01081 ListCell *l;
01082
01083 foreach(l, incomplete_actions)
01084 {
01085 bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
01086
01087 if (action->is_split)
01088 {
01089
01090 Buffer lbuf,
01091 rbuf;
01092 Page lpage,
01093 rpage;
01094 BTPageOpaque lpageop,
01095 rpageop;
01096 bool is_only;
01097 Relation reln;
01098
01099 lbuf = XLogReadBuffer(action->node, action->leftblk, false);
01100
01101 if (!BufferIsValid(lbuf))
01102 elog(PANIC, "btree_xlog_cleanup: left block unfound");
01103 lpage = (Page) BufferGetPage(lbuf);
01104 lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
01105 rbuf = XLogReadBuffer(action->node, action->rightblk, false);
01106
01107 if (!BufferIsValid(rbuf))
01108 elog(PANIC, "btree_xlog_cleanup: right block unfound");
01109 rpage = (Page) BufferGetPage(rbuf);
01110 rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
01111
01112
01113 is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
01114
01115 reln = CreateFakeRelcacheEntry(action->node);
01116 _bt_insert_parent(reln, lbuf, rbuf, NULL,
01117 action->is_root, is_only);
01118 FreeFakeRelcacheEntry(reln);
01119 }
01120 else
01121 {
01122
01123 Buffer buf;
01124
01125 buf = XLogReadBuffer(action->node, action->delblk, false);
01126 if (BufferIsValid(buf))
01127 {
01128 Relation reln;
01129
01130 reln = CreateFakeRelcacheEntry(action->node);
01131 if (_bt_pagedel(reln, buf, NULL) == 0)
01132 elog(PANIC, "btree_xlog_cleanup: _bt_pagedel failed");
01133 FreeFakeRelcacheEntry(reln);
01134 }
01135 }
01136 }
01137 incomplete_actions = NIL;
01138 }
01139
01140 bool
01141 btree_safe_restartpoint(void)
01142 {
01143 if (incomplete_actions)
01144 return false;
01145 return true;
01146 }