Header And Logo

PostgreSQL
| The world's most advanced open source database.

ginxlog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * ginxlog.c
00004  *    WAL replay logic for inverted index.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *           src/backend/access/gin/ginxlog.c
00012  *-------------------------------------------------------------------------
00013  */
00014 #include "postgres.h"
00015 
00016 #include "access/gin_private.h"
00017 #include "access/xlogutils.h"
00018 #include "utils/memutils.h"
00019 
00020 static MemoryContext opCtx;     /* working memory for operations */
00021 static MemoryContext topCtx;
00022 
00023 typedef struct ginIncompleteSplit
00024 {
00025     RelFileNode node;
00026     BlockNumber leftBlkno;
00027     BlockNumber rightBlkno;
00028     BlockNumber rootBlkno;
00029 } ginIncompleteSplit;
00030 
00031 static List *incomplete_splits;
00032 
00033 static void
00034 pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
00035 {
00036     ginIncompleteSplit *split;
00037 
00038     MemoryContextSwitchTo(topCtx);
00039 
00040     split = palloc(sizeof(ginIncompleteSplit));
00041 
00042     split->node = node;
00043     split->leftBlkno = leftBlkno;
00044     split->rightBlkno = rightBlkno;
00045     split->rootBlkno = rootBlkno;
00046 
00047     incomplete_splits = lappend(incomplete_splits, split);
00048 
00049     MemoryContextSwitchTo(opCtx);
00050 }
00051 
00052 static void
00053 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
00054 {
00055     ListCell   *l;
00056 
00057     foreach(l, incomplete_splits)
00058     {
00059         ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
00060 
00061         if (RelFileNodeEquals(node, split->node) &&
00062             leftBlkno == split->leftBlkno &&
00063             updateBlkno == split->rightBlkno)
00064         {
00065             incomplete_splits = list_delete_ptr(incomplete_splits, split);
00066             pfree(split);
00067             break;
00068         }
00069     }
00070 }
00071 
00072 static void
00073 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00074 {
00075     RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00076     Buffer      RootBuffer,
00077                 MetaBuffer;
00078     Page        page;
00079 
00080     /* Backup blocks are not used in create_index records */
00081     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00082 
00083     MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
00084     Assert(BufferIsValid(MetaBuffer));
00085     page = (Page) BufferGetPage(MetaBuffer);
00086 
00087     GinInitMetabuffer(MetaBuffer);
00088 
00089     PageSetLSN(page, lsn);
00090     MarkBufferDirty(MetaBuffer);
00091 
00092     RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
00093     Assert(BufferIsValid(RootBuffer));
00094     page = (Page) BufferGetPage(RootBuffer);
00095 
00096     GinInitBuffer(RootBuffer, GIN_LEAF);
00097 
00098     PageSetLSN(page, lsn);
00099     MarkBufferDirty(RootBuffer);
00100 
00101     UnlockReleaseBuffer(RootBuffer);
00102     UnlockReleaseBuffer(MetaBuffer);
00103 }
00104 
00105 static void
00106 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
00107 {
00108     ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
00109     ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
00110     Buffer      buffer;
00111     Page        page;
00112 
00113     /* Backup blocks are not used in create_ptree records */
00114     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00115 
00116     buffer = XLogReadBuffer(data->node, data->blkno, true);
00117     Assert(BufferIsValid(buffer));
00118     page = (Page) BufferGetPage(buffer);
00119 
00120     GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
00121     memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
00122     GinPageGetOpaque(page)->maxoff = data->nitem;
00123 
00124     PageSetLSN(page, lsn);
00125 
00126     MarkBufferDirty(buffer);
00127     UnlockReleaseBuffer(buffer);
00128 }
00129 
00130 static void
00131 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
00132 {
00133     ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
00134     Buffer      buffer;
00135     Page        page;
00136 
00137     /* first, forget any incomplete split this insertion completes */
00138     if (data->isData)
00139     {
00140         Assert(data->isDelete == FALSE);
00141         if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00142         {
00143             PostingItem *pitem;
00144 
00145             pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00146             forgetIncompleteSplit(data->node,
00147                                   PostingItemGetBlockNumber(pitem),
00148                                   data->updateBlkno);
00149         }
00150 
00151     }
00152     else
00153     {
00154         if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00155         {
00156             IndexTuple  itup;
00157 
00158             itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00159             forgetIncompleteSplit(data->node,
00160                                   GinGetDownlink(itup),
00161                                   data->updateBlkno);
00162         }
00163     }
00164 
00165     /* If we have a full-page image, restore it and we're done */
00166     if (record->xl_info & XLR_BKP_BLOCK(0))
00167     {
00168         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00169         return;
00170     }
00171 
00172     buffer = XLogReadBuffer(data->node, data->blkno, false);
00173     if (!BufferIsValid(buffer))
00174         return;                 /* page was deleted, nothing to do */
00175     page = (Page) BufferGetPage(buffer);
00176 
00177     if (lsn > PageGetLSN(page))
00178     {
00179         if (data->isData)
00180         {
00181             Assert(GinPageIsData(page));
00182 
00183             if (data->isLeaf)
00184             {
00185                 OffsetNumber i;
00186                 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00187 
00188                 Assert(GinPageIsLeaf(page));
00189                 Assert(data->updateBlkno == InvalidBlockNumber);
00190 
00191                 for (i = 0; i < data->nitem; i++)
00192                     GinDataPageAddItem(page, items + i, data->offset + i);
00193             }
00194             else
00195             {
00196                 PostingItem *pitem;
00197 
00198                 Assert(!GinPageIsLeaf(page));
00199 
00200                 if (data->updateBlkno != InvalidBlockNumber)
00201                 {
00202                     /* update link to right page after split */
00203                     pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
00204                     PostingItemSetBlockNumber(pitem, data->updateBlkno);
00205                 }
00206 
00207                 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00208 
00209                 GinDataPageAddItem(page, pitem, data->offset);
00210             }
00211         }
00212         else
00213         {
00214             IndexTuple  itup;
00215 
00216             Assert(!GinPageIsData(page));
00217 
00218             if (data->updateBlkno != InvalidBlockNumber)
00219             {
00220                 /* update link to right page after split */
00221                 Assert(!GinPageIsLeaf(page));
00222                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
00223                 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
00224                 GinSetDownlink(itup, data->updateBlkno);
00225             }
00226 
00227             if (data->isDelete)
00228             {
00229                 Assert(GinPageIsLeaf(page));
00230                 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
00231                 PageIndexTupleDelete(page, data->offset);
00232             }
00233 
00234             itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00235 
00236             if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
00237                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00238                   data->node.spcNode, data->node.dbNode, data->node.relNode);
00239         }
00240 
00241         PageSetLSN(page, lsn);
00242 
00243         MarkBufferDirty(buffer);
00244     }
00245 
00246     UnlockReleaseBuffer(buffer);
00247 }
00248 
00249 static void
00250 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
00251 {
00252     ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
00253     Buffer      lbuffer,
00254                 rbuffer;
00255     Page        lpage,
00256                 rpage;
00257     uint32      flags = 0;
00258 
00259     if (data->isLeaf)
00260         flags |= GIN_LEAF;
00261     if (data->isData)
00262         flags |= GIN_DATA;
00263 
00264     /* Backup blocks are not used in split records */
00265     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00266 
00267     lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
00268     Assert(BufferIsValid(lbuffer));
00269     lpage = (Page) BufferGetPage(lbuffer);
00270     GinInitBuffer(lbuffer, flags);
00271 
00272     rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
00273     Assert(BufferIsValid(rbuffer));
00274     rpage = (Page) BufferGetPage(rbuffer);
00275     GinInitBuffer(rbuffer, flags);
00276 
00277     GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
00278     GinPageGetOpaque(rpage)->rightlink = data->rrlink;
00279 
00280     if (data->isData)
00281     {
00282         char       *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
00283         Size        sizeofitem = GinSizeOfDataPageItem(lpage);
00284         OffsetNumber i;
00285         ItemPointer bound;
00286 
00287         for (i = 0; i < data->separator; i++)
00288         {
00289             GinDataPageAddItem(lpage, ptr, InvalidOffsetNumber);
00290             ptr += sizeofitem;
00291         }
00292 
00293         for (i = data->separator; i < data->nitem; i++)
00294         {
00295             GinDataPageAddItem(rpage, ptr, InvalidOffsetNumber);
00296             ptr += sizeofitem;
00297         }
00298 
00299         /* set up right key */
00300         bound = GinDataPageGetRightBound(lpage);
00301         if (data->isLeaf)
00302             *bound = *(ItemPointerData *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
00303         else
00304             *bound = ((PostingItem *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
00305 
00306         bound = GinDataPageGetRightBound(rpage);
00307         *bound = data->rightbound;
00308     }
00309     else
00310     {
00311         IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
00312         OffsetNumber i;
00313 
00314         for (i = 0; i < data->separator; i++)
00315         {
00316             if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00317                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00318                   data->node.spcNode, data->node.dbNode, data->node.relNode);
00319             itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00320         }
00321 
00322         for (i = data->separator; i < data->nitem; i++)
00323         {
00324             if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00325                 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00326                   data->node.spcNode, data->node.dbNode, data->node.relNode);
00327             itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00328         }
00329     }
00330 
00331     PageSetLSN(rpage, lsn);
00332     MarkBufferDirty(rbuffer);
00333 
00334     PageSetLSN(lpage, lsn);
00335     MarkBufferDirty(lbuffer);
00336 
00337     if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00338         forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
00339 
00340     if (data->isRootSplit)
00341     {
00342         Buffer      rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
00343         Page        rootPage = BufferGetPage(rootBuf);
00344 
00345         GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
00346 
00347         if (data->isData)
00348         {
00349             Assert(data->rootBlkno != GIN_ROOT_BLKNO);
00350             ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
00351         }
00352         else
00353         {
00354             Assert(data->rootBlkno == GIN_ROOT_BLKNO);
00355             ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
00356         }
00357 
00358         PageSetLSN(rootPage, lsn);
00359 
00360         MarkBufferDirty(rootBuf);
00361         UnlockReleaseBuffer(rootBuf);
00362     }
00363     else
00364         pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
00365 
00366     UnlockReleaseBuffer(rbuffer);
00367     UnlockReleaseBuffer(lbuffer);
00368 }
00369 
00370 static void
00371 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
00372 {
00373     ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
00374     Buffer      buffer;
00375     Page        page;
00376 
00377     /* If we have a full-page image, restore it and we're done */
00378     if (record->xl_info & XLR_BKP_BLOCK(0))
00379     {
00380         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00381         return;
00382     }
00383 
00384     buffer = XLogReadBuffer(data->node, data->blkno, false);
00385     if (!BufferIsValid(buffer))
00386         return;
00387     page = (Page) BufferGetPage(buffer);
00388 
00389     if (lsn > PageGetLSN(page))
00390     {
00391         if (GinPageIsData(page))
00392         {
00393             memcpy(GinDataPageGetData(page),
00394                    XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
00395                    data->nitem * GinSizeOfDataPageItem(page));
00396             GinPageGetOpaque(page)->maxoff = data->nitem;
00397         }
00398         else
00399         {
00400             OffsetNumber i,
00401                        *tod;
00402             IndexTuple  itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
00403 
00404             tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
00405             for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
00406                 tod[i - 1] = i;
00407 
00408             PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
00409 
00410             for (i = 0; i < data->nitem; i++)
00411             {
00412                 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00413                     elog(ERROR, "failed to add item to index page in %u/%u/%u",
00414                          data->node.spcNode, data->node.dbNode, data->node.relNode);
00415                 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00416             }
00417         }
00418 
00419         PageSetLSN(page, lsn);
00420         MarkBufferDirty(buffer);
00421     }
00422 
00423     UnlockReleaseBuffer(buffer);
00424 }
00425 
00426 static void
00427 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
00428 {
00429     ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
00430     Buffer      dbuffer;
00431     Buffer      pbuffer;
00432     Buffer      lbuffer;
00433     Page        page;
00434 
00435     if (record->xl_info & XLR_BKP_BLOCK(0))
00436         dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
00437     else
00438     {
00439         dbuffer = XLogReadBuffer(data->node, data->blkno, false);
00440         if (BufferIsValid(dbuffer))
00441         {
00442             page = BufferGetPage(dbuffer);
00443             if (lsn > PageGetLSN(page))
00444             {
00445                 Assert(GinPageIsData(page));
00446                 GinPageGetOpaque(page)->flags = GIN_DELETED;
00447                 PageSetLSN(page, lsn);
00448                 MarkBufferDirty(dbuffer);
00449             }
00450         }
00451     }
00452 
00453     if (record->xl_info & XLR_BKP_BLOCK(1))
00454         pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
00455     else
00456     {
00457         pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
00458         if (BufferIsValid(pbuffer))
00459         {
00460             page = BufferGetPage(pbuffer);
00461             if (lsn > PageGetLSN(page))
00462             {
00463                 Assert(GinPageIsData(page));
00464                 Assert(!GinPageIsLeaf(page));
00465                 GinPageDeletePostingItem(page, data->parentOffset);
00466                 PageSetLSN(page, lsn);
00467                 MarkBufferDirty(pbuffer);
00468             }
00469         }
00470     }
00471 
00472     if (record->xl_info & XLR_BKP_BLOCK(2))
00473         (void) RestoreBackupBlock(lsn, record, 2, false, false);
00474     else if (data->leftBlkno != InvalidBlockNumber)
00475     {
00476         lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
00477         if (BufferIsValid(lbuffer))
00478         {
00479             page = BufferGetPage(lbuffer);
00480             if (lsn > PageGetLSN(page))
00481             {
00482                 Assert(GinPageIsData(page));
00483                 GinPageGetOpaque(page)->rightlink = data->rightLink;
00484                 PageSetLSN(page, lsn);
00485                 MarkBufferDirty(lbuffer);
00486             }
00487             UnlockReleaseBuffer(lbuffer);
00488         }
00489     }
00490 
00491     if (BufferIsValid(pbuffer))
00492         UnlockReleaseBuffer(pbuffer);
00493     if (BufferIsValid(dbuffer))
00494         UnlockReleaseBuffer(dbuffer);
00495 }
00496 
00497 static void
00498 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
00499 {
00500     ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
00501     Buffer      metabuffer;
00502     Page        metapage;
00503     Buffer      buffer;
00504 
00505     metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
00506     if (!BufferIsValid(metabuffer))
00507         return;                 /* assume index was deleted, nothing to do */
00508     metapage = BufferGetPage(metabuffer);
00509 
00510     if (lsn > PageGetLSN(metapage))
00511     {
00512         memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
00513         PageSetLSN(metapage, lsn);
00514         MarkBufferDirty(metabuffer);
00515     }
00516 
00517     if (data->ntuples > 0)
00518     {
00519         /*
00520          * insert into tail page
00521          */
00522         if (record->xl_info & XLR_BKP_BLOCK(0))
00523             (void) RestoreBackupBlock(lsn, record, 0, false, false);
00524         else
00525         {
00526             buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
00527             if (BufferIsValid(buffer))
00528             {
00529                 Page        page = BufferGetPage(buffer);
00530 
00531                 if (lsn > PageGetLSN(page))
00532                 {
00533                     OffsetNumber l,
00534                                 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
00535                     OffsetNumberNext(PageGetMaxOffsetNumber(page));
00536                     int         i,
00537                                 tupsize;
00538                     IndexTuple  tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
00539 
00540                     for (i = 0; i < data->ntuples; i++)
00541                     {
00542                         tupsize = IndexTupleSize(tuples);
00543 
00544                         l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
00545 
00546                         if (l == InvalidOffsetNumber)
00547                             elog(ERROR, "failed to add item to index page");
00548 
00549                         tuples = (IndexTuple) (((char *) tuples) + tupsize);
00550 
00551                         off++;
00552                     }
00553 
00554                     /*
00555                      * Increase counter of heap tuples
00556                      */
00557                     GinPageGetOpaque(page)->maxoff++;
00558 
00559                     PageSetLSN(page, lsn);
00560                     MarkBufferDirty(buffer);
00561                 }
00562                 UnlockReleaseBuffer(buffer);
00563             }
00564         }
00565     }
00566     else if (data->prevTail != InvalidBlockNumber)
00567     {
00568         /*
00569          * New tail
00570          */
00571         if (record->xl_info & XLR_BKP_BLOCK(0))
00572             (void) RestoreBackupBlock(lsn, record, 0, false, false);
00573         else
00574         {
00575             buffer = XLogReadBuffer(data->node, data->prevTail, false);
00576             if (BufferIsValid(buffer))
00577             {
00578                 Page        page = BufferGetPage(buffer);
00579 
00580                 if (lsn > PageGetLSN(page))
00581                 {
00582                     GinPageGetOpaque(page)->rightlink = data->newRightlink;
00583 
00584                     PageSetLSN(page, lsn);
00585                     MarkBufferDirty(buffer);
00586                 }
00587                 UnlockReleaseBuffer(buffer);
00588             }
00589         }
00590     }
00591 
00592     UnlockReleaseBuffer(metabuffer);
00593 }
00594 
00595 static void
00596 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
00597 {
00598     ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
00599     Buffer      buffer;
00600     Page        page;
00601     OffsetNumber l,
00602                 off = FirstOffsetNumber;
00603     int         i,
00604                 tupsize;
00605     IndexTuple  tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
00606 
00607     /* If we have a full-page image, restore it and we're done */
00608     if (record->xl_info & XLR_BKP_BLOCK(0))
00609     {
00610         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00611         return;
00612     }
00613 
00614     buffer = XLogReadBuffer(data->node, data->blkno, true);
00615     Assert(BufferIsValid(buffer));
00616     page = BufferGetPage(buffer);
00617 
00618     GinInitBuffer(buffer, GIN_LIST);
00619     GinPageGetOpaque(page)->rightlink = data->rightlink;
00620     if (data->rightlink == InvalidBlockNumber)
00621     {
00622         /* tail of sublist */
00623         GinPageSetFullRow(page);
00624         GinPageGetOpaque(page)->maxoff = 1;
00625     }
00626     else
00627     {
00628         GinPageGetOpaque(page)->maxoff = 0;
00629     }
00630 
00631     for (i = 0; i < data->ntuples; i++)
00632     {
00633         tupsize = IndexTupleSize(tuples);
00634 
00635         l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
00636 
00637         if (l == InvalidOffsetNumber)
00638             elog(ERROR, "failed to add item to index page");
00639 
00640         tuples = (IndexTuple) (((char *) tuples) + tupsize);
00641     }
00642 
00643     PageSetLSN(page, lsn);
00644     MarkBufferDirty(buffer);
00645 
00646     UnlockReleaseBuffer(buffer);
00647 }
00648 
00649 static void
00650 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
00651 {
00652     ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
00653     Buffer      metabuffer;
00654     Page        metapage;
00655     int         i;
00656 
00657     /* Backup blocks are not used in delete_listpage records */
00658     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00659 
00660     metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
00661     if (!BufferIsValid(metabuffer))
00662         return;                 /* assume index was deleted, nothing to do */
00663     metapage = BufferGetPage(metabuffer);
00664 
00665     if (lsn > PageGetLSN(metapage))
00666     {
00667         memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
00668         PageSetLSN(metapage, lsn);
00669         MarkBufferDirty(metabuffer);
00670     }
00671 
00672     /*
00673      * In normal operation, shiftList() takes exclusive lock on all the
00674      * pages-to-be-deleted simultaneously.  During replay, however, it should
00675      * be all right to lock them one at a time.  This is dependent on the fact
00676      * that we are deleting pages from the head of the list, and that readers
00677      * share-lock the next page before releasing the one they are on. So we
00678      * cannot get past a reader that is on, or due to visit, any page we are
00679      * going to delete.  New incoming readers will block behind our metapage
00680      * lock and then see a fully updated page list.
00681      */
00682     for (i = 0; i < data->ndeleted; i++)
00683     {
00684         Buffer      buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
00685 
00686         if (BufferIsValid(buffer))
00687         {
00688             Page        page = BufferGetPage(buffer);
00689 
00690             if (lsn > PageGetLSN(page))
00691             {
00692                 GinPageGetOpaque(page)->flags = GIN_DELETED;
00693 
00694                 PageSetLSN(page, lsn);
00695                 MarkBufferDirty(buffer);
00696             }
00697 
00698             UnlockReleaseBuffer(buffer);
00699         }
00700     }
00701     UnlockReleaseBuffer(metabuffer);
00702 }
00703 
00704 void
00705 gin_redo(XLogRecPtr lsn, XLogRecord *record)
00706 {
00707     uint8       info = record->xl_info & ~XLR_INFO_MASK;
00708 
00709     /*
00710      * GIN indexes do not require any conflict processing. NB: If we ever
00711      * implement a similar optimization as we have in b-tree, and remove
00712      * killed tuples outside VACUUM, we'll need to handle that here.
00713      */
00714 
00715     topCtx = MemoryContextSwitchTo(opCtx);
00716     switch (info)
00717     {
00718         case XLOG_GIN_CREATE_INDEX:
00719             ginRedoCreateIndex(lsn, record);
00720             break;
00721         case XLOG_GIN_CREATE_PTREE:
00722             ginRedoCreatePTree(lsn, record);
00723             break;
00724         case XLOG_GIN_INSERT:
00725             ginRedoInsert(lsn, record);
00726             break;
00727         case XLOG_GIN_SPLIT:
00728             ginRedoSplit(lsn, record);
00729             break;
00730         case XLOG_GIN_VACUUM_PAGE:
00731             ginRedoVacuumPage(lsn, record);
00732             break;
00733         case XLOG_GIN_DELETE_PAGE:
00734             ginRedoDeletePage(lsn, record);
00735             break;
00736         case XLOG_GIN_UPDATE_META_PAGE:
00737             ginRedoUpdateMetapage(lsn, record);
00738             break;
00739         case XLOG_GIN_INSERT_LISTPAGE:
00740             ginRedoInsertListPage(lsn, record);
00741             break;
00742         case XLOG_GIN_DELETE_LISTPAGE:
00743             ginRedoDeleteListPages(lsn, record);
00744             break;
00745         default:
00746             elog(PANIC, "gin_redo: unknown op code %u", info);
00747     }
00748     MemoryContextSwitchTo(topCtx);
00749     MemoryContextReset(opCtx);
00750 }
00751 
00752 void
00753 gin_xlog_startup(void)
00754 {
00755     incomplete_splits = NIL;
00756 
00757     opCtx = AllocSetContextCreate(CurrentMemoryContext,
00758                                   "GIN recovery temporary context",
00759                                   ALLOCSET_DEFAULT_MINSIZE,
00760                                   ALLOCSET_DEFAULT_INITSIZE,
00761                                   ALLOCSET_DEFAULT_MAXSIZE);
00762 }
00763 
00764 static void
00765 ginContinueSplit(ginIncompleteSplit *split)
00766 {
00767     GinBtreeData btree;
00768     GinState    ginstate;
00769     Relation    reln;
00770     Buffer      buffer;
00771     GinBtreeStack stack;
00772 
00773     /*
00774      * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u",  split->rootBlkno,
00775      * split->leftBlkno, split->rightBlkno);
00776      */
00777     buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
00778 
00779     /*
00780      * Failure should be impossible here, because we wrote the page earlier.
00781      */
00782     if (!BufferIsValid(buffer))
00783         elog(PANIC, "ginContinueSplit: left block %u not found",
00784              split->leftBlkno);
00785 
00786     reln = CreateFakeRelcacheEntry(split->node);
00787 
00788     if (split->rootBlkno == GIN_ROOT_BLKNO)
00789     {
00790         MemSet(&ginstate, 0, sizeof(ginstate));
00791         ginstate.index = reln;
00792 
00793         ginPrepareEntryScan(&btree,
00794                             InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
00795                             &ginstate);
00796         btree.entry = ginPageGetLinkItup(buffer);
00797     }
00798     else
00799     {
00800         Page        page = BufferGetPage(buffer);
00801 
00802         ginPrepareDataScan(&btree, reln);
00803 
00804         PostingItemSetBlockNumber(&(btree.pitem), split->leftBlkno);
00805         if (GinPageIsLeaf(page))
00806             btree.pitem.key = *(ItemPointerData *) GinDataPageGetItem(page,
00807                                              GinPageGetOpaque(page)->maxoff);
00808         else
00809             btree.pitem.key = ((PostingItem *) GinDataPageGetItem(page,
00810                                        GinPageGetOpaque(page)->maxoff))->key;
00811     }
00812 
00813     btree.rightblkno = split->rightBlkno;
00814 
00815     stack.blkno = split->leftBlkno;
00816     stack.buffer = buffer;
00817     stack.off = InvalidOffsetNumber;
00818     stack.parent = NULL;
00819 
00820     ginFindParents(&btree, &stack, split->rootBlkno);
00821     ginInsertValue(&btree, stack.parent, NULL);
00822 
00823     FreeFakeRelcacheEntry(reln);
00824 
00825     UnlockReleaseBuffer(buffer);
00826 }
00827 
00828 void
00829 gin_xlog_cleanup(void)
00830 {
00831     ListCell   *l;
00832     MemoryContext topCtx;
00833 
00834     topCtx = MemoryContextSwitchTo(opCtx);
00835 
00836     foreach(l, incomplete_splits)
00837     {
00838         ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
00839 
00840         ginContinueSplit(split);
00841         MemoryContextReset(opCtx);
00842     }
00843 
00844     MemoryContextSwitchTo(topCtx);
00845     MemoryContextDelete(opCtx);
00846     incomplete_splits = NIL;
00847 }
00848 
00849 bool
00850 gin_safe_restartpoint(void)
00851 {
00852     if (incomplete_splits)
00853         return false;
00854     return true;
00855 }