Header And Logo

PostgreSQL
The world's most advanced open source database.

spgxlog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * spgxlog.c
00004  *    WAL replay logic for SP-GiST
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *           src/backend/access/spgist/spgxlog.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/spgist_private.h"
00018 #include "access/transam.h"
00019 #include "access/xlogutils.h"
00020 #include "storage/standby.h"
00021 #include "utils/memutils.h"
00022 
00023 
00024 static MemoryContext opCtx;     /* working memory for WAL replay operations */
00025 
00026 
00027 /*
00028  * Prepare a dummy SpGistState, with just the minimum info needed for replay.
00029  *
00030  * At present, all we need is enough info to support spgFormDeadTuple(),
00031  * plus the isBuild flag.
00032  */
00033 static void
00034 fillFakeState(SpGistState *state, spgxlogState stateSrc)
00035 {
00036     memset(state, 0, sizeof(*state));
00037 
00038     state->myXid = stateSrc.myXid;
00039     state->isBuild = stateSrc.isBuild;
00040     state->deadTupleStorage = palloc0(SGDTSIZE);
00041 }
00042 
00043 /*
00044  * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
00045  * to replay SpGistPageAddNewItem() operations.  If the offset points at an
00046  * existing tuple, it had better be a placeholder tuple.
00047  */
00048 static void
00049 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
00050 {
00051     if (offset <= PageGetMaxOffsetNumber(page))
00052     {
00053         SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
00054                                                 PageGetItemId(page, offset));
00055 
00056         if (dt->tupstate != SPGIST_PLACEHOLDER)
00057             elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
00058 
00059         Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
00060         SpGistPageGetOpaque(page)->nPlaceholder--;
00061 
00062         PageIndexTupleDelete(page, offset);
00063     }
00064 
00065     Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
00066 
00067     if (PageAddItem(page, tuple, size, offset, false, false) != offset)
00068         elog(ERROR, "failed to add item of size %u to SPGiST index page",
00069              size);
00070 }
00071 
00072 static void
00073 spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00074 {
00075     RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00076     Buffer      buffer;
00077     Page        page;
00078 
00079     /* Backup blocks are not used in create_index records */
00080     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00081 
00082     buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
00083     Assert(BufferIsValid(buffer));
00084     page = (Page) BufferGetPage(buffer);
00085     SpGistInitMetapage(page);
00086     PageSetLSN(page, lsn);
00087     MarkBufferDirty(buffer);
00088     UnlockReleaseBuffer(buffer);
00089 
00090     buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
00091     Assert(BufferIsValid(buffer));
00092     SpGistInitBuffer(buffer, SPGIST_LEAF);
00093     page = (Page) BufferGetPage(buffer);
00094     PageSetLSN(page, lsn);
00095     MarkBufferDirty(buffer);
00096     UnlockReleaseBuffer(buffer);
00097 
00098     buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true);
00099     Assert(BufferIsValid(buffer));
00100     SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
00101     page = (Page) BufferGetPage(buffer);
00102     PageSetLSN(page, lsn);
00103     MarkBufferDirty(buffer);
00104     UnlockReleaseBuffer(buffer);
00105 }
00106 
00107 static void
00108 spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
00109 {
00110     char       *ptr = XLogRecGetData(record);
00111     spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
00112     SpGistLeafTuple leafTuple;
00113     Buffer      buffer;
00114     Page        page;
00115 
00116     /* we assume this is adequately aligned */
00117     ptr += sizeof(spgxlogAddLeaf);
00118     leafTuple = (SpGistLeafTuple) ptr;
00119 
00120     /*
00121      * In normal operation we would have both current and parent pages locked
00122      * simultaneously; but in WAL replay it should be safe to update the leaf
00123      * page before updating the parent.
00124      */
00125     if (record->xl_info & XLR_BKP_BLOCK(0))
00126         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00127     else
00128     {
00129         buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
00130                                 xldata->newPage);
00131         if (BufferIsValid(buffer))
00132         {
00133             page = BufferGetPage(buffer);
00134 
00135             if (xldata->newPage)
00136                 SpGistInitBuffer(buffer,
00137                      SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00138 
00139             if (lsn > PageGetLSN(page))
00140             {
00141                 /* insert new tuple */
00142                 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
00143                 {
00144                     /* normal cases, tuple was added by SpGistPageAddNewItem */
00145                     addOrReplaceTuple(page, (Item) leafTuple, leafTuple->size,
00146                                       xldata->offnumLeaf);
00147 
00148                     /* update head tuple's chain link if needed */
00149                     if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
00150                     {
00151                         SpGistLeafTuple head;
00152 
00153                         head = (SpGistLeafTuple) PageGetItem(page,
00154                                 PageGetItemId(page, xldata->offnumHeadLeaf));
00155                         Assert(head->nextOffset == leafTuple->nextOffset);
00156                         head->nextOffset = xldata->offnumLeaf;
00157                     }
00158                 }
00159                 else
00160                 {
00161                     /* replacing a DEAD tuple */
00162                     PageIndexTupleDelete(page, xldata->offnumLeaf);
00163                     if (PageAddItem(page,
00164                                     (Item) leafTuple, leafTuple->size,
00165                      xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
00166                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
00167                              leafTuple->size);
00168                 }
00169 
00170                 PageSetLSN(page, lsn);
00171                 MarkBufferDirty(buffer);
00172             }
00173             UnlockReleaseBuffer(buffer);
00174         }
00175     }
00176 
00177     /* update parent downlink if necessary */
00178     if (record->xl_info & XLR_BKP_BLOCK(1))
00179         (void) RestoreBackupBlock(lsn, record, 1, false, false);
00180     else if (xldata->blknoParent != InvalidBlockNumber)
00181     {
00182         buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00183         if (BufferIsValid(buffer))
00184         {
00185             page = BufferGetPage(buffer);
00186             if (lsn > PageGetLSN(page))
00187             {
00188                 SpGistInnerTuple tuple;
00189 
00190                 tuple = (SpGistInnerTuple) PageGetItem(page,
00191                                   PageGetItemId(page, xldata->offnumParent));
00192 
00193                 spgUpdateNodeLink(tuple, xldata->nodeI,
00194                                   xldata->blknoLeaf, xldata->offnumLeaf);
00195 
00196                 PageSetLSN(page, lsn);
00197                 MarkBufferDirty(buffer);
00198             }
00199             UnlockReleaseBuffer(buffer);
00200         }
00201     }
00202 }
00203 
00204 static void
00205 spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
00206 {
00207     char       *ptr = XLogRecGetData(record);
00208     spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
00209     SpGistState state;
00210     OffsetNumber *toDelete;
00211     OffsetNumber *toInsert;
00212     int         nInsert;
00213     Buffer      buffer;
00214     Page        page;
00215 
00216     fillFakeState(&state, xldata->stateSrc);
00217 
00218     nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
00219 
00220     ptr += MAXALIGN(sizeof(spgxlogMoveLeafs));
00221     toDelete = (OffsetNumber *) ptr;
00222     ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nMoves);
00223     toInsert = (OffsetNumber *) ptr;
00224     ptr += MAXALIGN(sizeof(OffsetNumber) * nInsert);
00225 
00226     /* now ptr points to the list of leaf tuples */
00227 
00228     /*
00229      * In normal operation we would have all three pages (source, dest, and
00230      * parent) locked simultaneously; but in WAL replay it should be safe to
00231      * update them one at a time, as long as we do it in the right order.
00232      */
00233 
00234     /* Insert tuples on the dest page (do first, so redirect is valid) */
00235     if (record->xl_info & XLR_BKP_BLOCK(1))
00236         (void) RestoreBackupBlock(lsn, record, 1, false, false);
00237     else
00238     {
00239         buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
00240                                 xldata->newPage);
00241         if (BufferIsValid(buffer))
00242         {
00243             page = BufferGetPage(buffer);
00244 
00245             if (xldata->newPage)
00246                 SpGistInitBuffer(buffer,
00247                      SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00248 
00249             if (lsn > PageGetLSN(page))
00250             {
00251                 int         i;
00252 
00253                 for (i = 0; i < nInsert; i++)
00254                 {
00255                     SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
00256 
00257                     addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
00258                     ptr += lt->size;
00259                 }
00260 
00261                 PageSetLSN(page, lsn);
00262                 MarkBufferDirty(buffer);
00263             }
00264             UnlockReleaseBuffer(buffer);
00265         }
00266     }
00267 
00268     /* Delete tuples from the source page, inserting a redirection pointer */
00269     if (record->xl_info & XLR_BKP_BLOCK(0))
00270         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00271     else
00272     {
00273         buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
00274         if (BufferIsValid(buffer))
00275         {
00276             page = BufferGetPage(buffer);
00277             if (lsn > PageGetLSN(page))
00278             {
00279                 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
00280                         state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
00281                                         SPGIST_PLACEHOLDER,
00282                                         xldata->blknoDst,
00283                                         toInsert[nInsert - 1]);
00284 
00285                 PageSetLSN(page, lsn);
00286                 MarkBufferDirty(buffer);
00287             }
00288             UnlockReleaseBuffer(buffer);
00289         }
00290     }
00291 
00292     /* And update the parent downlink */
00293     if (record->xl_info & XLR_BKP_BLOCK(2))
00294         (void) RestoreBackupBlock(lsn, record, 2, false, false);
00295     else
00296     {
00297         buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00298         if (BufferIsValid(buffer))
00299         {
00300             page = BufferGetPage(buffer);
00301             if (lsn > PageGetLSN(page))
00302             {
00303                 SpGistInnerTuple tuple;
00304 
00305                 tuple = (SpGistInnerTuple) PageGetItem(page,
00306                                   PageGetItemId(page, xldata->offnumParent));
00307 
00308                 spgUpdateNodeLink(tuple, xldata->nodeI,
00309                                   xldata->blknoDst, toInsert[nInsert - 1]);
00310 
00311                 PageSetLSN(page, lsn);
00312                 MarkBufferDirty(buffer);
00313             }
00314             UnlockReleaseBuffer(buffer);
00315         }
00316     }
00317 }
00318 
00319 static void
00320 spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
00321 {
00322     char       *ptr = XLogRecGetData(record);
00323     spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
00324     SpGistInnerTuple innerTuple;
00325     SpGistState state;
00326     Buffer      buffer;
00327     Page        page;
00328     int         bbi;
00329 
00330     /* we assume this is adequately aligned */
00331     ptr += sizeof(spgxlogAddNode);
00332     innerTuple = (SpGistInnerTuple) ptr;
00333 
00334     fillFakeState(&state, xldata->stateSrc);
00335 
00336     if (xldata->blknoNew == InvalidBlockNumber)
00337     {
00338         /* update in place */
00339         Assert(xldata->blknoParent == InvalidBlockNumber);
00340         if (record->xl_info & XLR_BKP_BLOCK(0))
00341             (void) RestoreBackupBlock(lsn, record, 0, false, false);
00342         else
00343         {
00344             buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00345             if (BufferIsValid(buffer))
00346             {
00347                 page = BufferGetPage(buffer);
00348                 if (lsn > PageGetLSN(page))
00349                 {
00350                     PageIndexTupleDelete(page, xldata->offnum);
00351                     if (PageAddItem(page, (Item) innerTuple, innerTuple->size,
00352                                     xldata->offnum,
00353                                     false, false) != xldata->offnum)
00354                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
00355                              innerTuple->size);
00356 
00357                     PageSetLSN(page, lsn);
00358                     MarkBufferDirty(buffer);
00359                 }
00360                 UnlockReleaseBuffer(buffer);
00361             }
00362         }
00363     }
00364     else
00365     {
00366         /*
00367          * In normal operation we would have all three pages (source, dest,
00368          * and parent) locked simultaneously; but in WAL replay it should be
00369          * safe to update them one at a time, as long as we do it in the right
00370          * order.
00371          *
00372          * The logic here depends on the assumption that blkno != blknoNew,
00373          * else we can't tell which BKP bit goes with which page, and the LSN
00374          * checks could go wrong too.
00375          */
00376         Assert(xldata->blkno != xldata->blknoNew);
00377 
00378         /* Install new tuple first so redirect is valid */
00379         if (record->xl_info & XLR_BKP_BLOCK(1))
00380             (void) RestoreBackupBlock(lsn, record, 1, false, false);
00381         else
00382         {
00383             buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
00384                                     xldata->newPage);
00385             if (BufferIsValid(buffer))
00386             {
00387                 page = BufferGetPage(buffer);
00388 
00389                 /* AddNode is not used for nulls pages */
00390                 if (xldata->newPage)
00391                     SpGistInitBuffer(buffer, 0);
00392 
00393                 if (lsn > PageGetLSN(page))
00394                 {
00395                     addOrReplaceTuple(page, (Item) innerTuple,
00396                                       innerTuple->size, xldata->offnumNew);
00397 
00398                     /*
00399                      * If parent is in this same page, don't advance LSN;
00400                      * doing so would fool us into not applying the parent
00401                      * downlink update below.  We'll update the LSN when we
00402                      * fix the parent downlink.
00403                      */
00404                     if (xldata->blknoParent != xldata->blknoNew)
00405                     {
00406                         PageSetLSN(page, lsn);
00407                     }
00408                     MarkBufferDirty(buffer);
00409                 }
00410                 UnlockReleaseBuffer(buffer);
00411             }
00412         }
00413 
00414         /* Delete old tuple, replacing it with redirect or placeholder tuple */
00415         if (record->xl_info & XLR_BKP_BLOCK(0))
00416             (void) RestoreBackupBlock(lsn, record, 0, false, false);
00417         else
00418         {
00419             buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00420             if (BufferIsValid(buffer))
00421             {
00422                 page = BufferGetPage(buffer);
00423                 if (lsn > PageGetLSN(page))
00424                 {
00425                     SpGistDeadTuple dt;
00426 
00427                     if (state.isBuild)
00428                         dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
00429                                               InvalidBlockNumber,
00430                                               InvalidOffsetNumber);
00431                     else
00432                         dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
00433                                               xldata->blknoNew,
00434                                               xldata->offnumNew);
00435 
00436                     PageIndexTupleDelete(page, xldata->offnum);
00437                     if (PageAddItem(page, (Item) dt, dt->size,
00438                                     xldata->offnum,
00439                                     false, false) != xldata->offnum)
00440                         elog(ERROR, "failed to add item of size %u to SPGiST index page",
00441                              dt->size);
00442 
00443                     if (state.isBuild)
00444                         SpGistPageGetOpaque(page)->nPlaceholder++;
00445                     else
00446                         SpGistPageGetOpaque(page)->nRedirection++;
00447 
00448                     /*
00449                      * If parent is in this same page, don't advance LSN;
00450                      * doing so would fool us into not applying the parent
00451                      * downlink update below.  We'll update the LSN when we
00452                      * fix the parent downlink.
00453                      */
00454                     if (xldata->blknoParent != xldata->blkno)
00455                     {
00456                         PageSetLSN(page, lsn);
00457                     }
00458                     MarkBufferDirty(buffer);
00459                 }
00460                 UnlockReleaseBuffer(buffer);
00461             }
00462         }
00463 
00464         /*
00465          * Update parent downlink.  Since parent could be in either of the
00466          * previous two buffers, it's a bit tricky to determine which BKP bit
00467          * applies.
00468          */
00469         if (xldata->blknoParent == xldata->blkno)
00470             bbi = 0;
00471         else if (xldata->blknoParent == xldata->blknoNew)
00472             bbi = 1;
00473         else
00474             bbi = 2;
00475 
00476         if (record->xl_info & XLR_BKP_BLOCK(bbi))
00477         {
00478             if (bbi == 2)       /* else we already did it */
00479                 (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00480         }
00481         else
00482         {
00483             buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00484             if (BufferIsValid(buffer))
00485             {
00486                 page = BufferGetPage(buffer);
00487                 if (lsn > PageGetLSN(page))
00488                 {
00489                     SpGistInnerTuple innerTuple;
00490 
00491                     innerTuple = (SpGistInnerTuple) PageGetItem(page,
00492                                   PageGetItemId(page, xldata->offnumParent));
00493 
00494                     spgUpdateNodeLink(innerTuple, xldata->nodeI,
00495                                       xldata->blknoNew, xldata->offnumNew);
00496 
00497                     PageSetLSN(page, lsn);
00498                     MarkBufferDirty(buffer);
00499                 }
00500                 UnlockReleaseBuffer(buffer);
00501             }
00502         }
00503     }
00504 }
00505 
00506 static void
00507 spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
00508 {
00509     char       *ptr = XLogRecGetData(record);
00510     spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
00511     SpGistInnerTuple prefixTuple;
00512     SpGistInnerTuple postfixTuple;
00513     Buffer      buffer;
00514     Page        page;
00515 
00516     /* we assume this is adequately aligned */
00517     ptr += sizeof(spgxlogSplitTuple);
00518     prefixTuple = (SpGistInnerTuple) ptr;
00519     ptr += prefixTuple->size;
00520     postfixTuple = (SpGistInnerTuple) ptr;
00521 
00522     /*
00523      * In normal operation we would have both pages locked simultaneously; but
00524      * in WAL replay it should be safe to update them one at a time, as long
00525      * as we do it in the right order.
00526      */
00527 
00528     /* insert postfix tuple first to avoid dangling link */
00529     if (record->xl_info & XLR_BKP_BLOCK(1))
00530         (void) RestoreBackupBlock(lsn, record, 1, false, false);
00531     else if (xldata->blknoPostfix != xldata->blknoPrefix)
00532     {
00533         buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
00534                                 xldata->newPage);
00535         if (BufferIsValid(buffer))
00536         {
00537             page = BufferGetPage(buffer);
00538 
00539             /* SplitTuple is not used for nulls pages */
00540             if (xldata->newPage)
00541                 SpGistInitBuffer(buffer, 0);
00542 
00543             if (lsn > PageGetLSN(page))
00544             {
00545                 addOrReplaceTuple(page, (Item) postfixTuple,
00546                                   postfixTuple->size, xldata->offnumPostfix);
00547 
00548                 PageSetLSN(page, lsn);
00549                 MarkBufferDirty(buffer);
00550             }
00551             UnlockReleaseBuffer(buffer);
00552         }
00553     }
00554 
00555     /* now handle the original page */
00556     if (record->xl_info & XLR_BKP_BLOCK(0))
00557         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00558     else
00559     {
00560         buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
00561         if (BufferIsValid(buffer))
00562         {
00563             page = BufferGetPage(buffer);
00564             if (lsn > PageGetLSN(page))
00565             {
00566                 PageIndexTupleDelete(page, xldata->offnumPrefix);
00567                 if (PageAddItem(page, (Item) prefixTuple, prefixTuple->size,
00568                  xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
00569                     elog(ERROR, "failed to add item of size %u to SPGiST index page",
00570                          prefixTuple->size);
00571 
00572                 if (xldata->blknoPostfix == xldata->blknoPrefix)
00573                     addOrReplaceTuple(page, (Item) postfixTuple,
00574                                       postfixTuple->size,
00575                                       xldata->offnumPostfix);
00576 
00577                 PageSetLSN(page, lsn);
00578                 MarkBufferDirty(buffer);
00579             }
00580             UnlockReleaseBuffer(buffer);
00581         }
00582     }
00583 }
00584 
00585 static void
00586 spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
00587 {
00588     char       *ptr = XLogRecGetData(record);
00589     spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
00590     SpGistInnerTuple innerTuple;
00591     SpGistState state;
00592     OffsetNumber *toDelete;
00593     OffsetNumber *toInsert;
00594     uint8      *leafPageSelect;
00595     Buffer      srcBuffer;
00596     Buffer      destBuffer;
00597     Page        srcPage;
00598     Page        destPage;
00599     Page        page;
00600     int         bbi;
00601     int         i;
00602 
00603     fillFakeState(&state, xldata->stateSrc);
00604 
00605     ptr += MAXALIGN(sizeof(spgxlogPickSplit));
00606     innerTuple = (SpGistInnerTuple) ptr;
00607     ptr += innerTuple->size;
00608     toDelete = (OffsetNumber *) ptr;
00609     ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nDelete);
00610     toInsert = (OffsetNumber *) ptr;
00611     ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nInsert);
00612     leafPageSelect = (uint8 *) ptr;
00613     ptr += MAXALIGN(sizeof(uint8) * xldata->nInsert);
00614 
00615     /* now ptr points to the list of leaf tuples */
00616 
00617     /*
00618      * It's a bit tricky to identify which pages have been handled as
00619      * full-page images, so we explicitly count each referenced buffer.
00620      */
00621     bbi = 0;
00622 
00623     if (SpGistBlockIsRoot(xldata->blknoSrc))
00624     {
00625         /* when splitting root, we touch it only in the guise of new inner */
00626         srcBuffer = InvalidBuffer;
00627         srcPage = NULL;
00628     }
00629     else if (xldata->initSrc)
00630     {
00631         /* just re-init the source page */
00632         srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
00633         Assert(BufferIsValid(srcBuffer));
00634         srcPage = (Page) BufferGetPage(srcBuffer);
00635 
00636         SpGistInitBuffer(srcBuffer,
00637                      SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00638         /* don't update LSN etc till we're done with it */
00639     }
00640     else
00641     {
00642         /*
00643          * Delete the specified tuples from source page.  (In case we're in
00644          * Hot Standby, we need to hold lock on the page till we're done
00645          * inserting leaf tuples and the new inner tuple, else the added
00646          * redirect tuple will be a dangling link.)
00647          */
00648         if (record->xl_info & XLR_BKP_BLOCK(bbi))
00649         {
00650             srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
00651             srcPage = NULL;     /* don't need to do any page updates */
00652         }
00653         else
00654         {
00655             srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
00656             if (BufferIsValid(srcBuffer))
00657             {
00658                 srcPage = BufferGetPage(srcBuffer);
00659                 if (lsn > PageGetLSN(srcPage))
00660                 {
00661                     /*
00662                      * We have it a bit easier here than in doPickSplit(),
00663                      * because we know the inner tuple's location already, so
00664                      * we can inject the correct redirection tuple now.
00665                      */
00666                     if (!state.isBuild)
00667                         spgPageIndexMultiDelete(&state, srcPage,
00668                                                 toDelete, xldata->nDelete,
00669                                                 SPGIST_REDIRECT,
00670                                                 SPGIST_PLACEHOLDER,
00671                                                 xldata->blknoInner,
00672                                                 xldata->offnumInner);
00673                     else
00674                         spgPageIndexMultiDelete(&state, srcPage,
00675                                                 toDelete, xldata->nDelete,
00676                                                 SPGIST_PLACEHOLDER,
00677                                                 SPGIST_PLACEHOLDER,
00678                                                 InvalidBlockNumber,
00679                                                 InvalidOffsetNumber);
00680 
00681                     /* don't update LSN etc till we're done with it */
00682                 }
00683                 else
00684                     srcPage = NULL;     /* don't do any page updates */
00685             }
00686             else
00687                 srcPage = NULL;
00688         }
00689         bbi++;
00690     }
00691 
00692     /* try to access dest page if any */
00693     if (xldata->blknoDest == InvalidBlockNumber)
00694     {
00695         destBuffer = InvalidBuffer;
00696         destPage = NULL;
00697     }
00698     else if (xldata->initDest)
00699     {
00700         /* just re-init the dest page */
00701         destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
00702         Assert(BufferIsValid(destBuffer));
00703         destPage = (Page) BufferGetPage(destBuffer);
00704 
00705         SpGistInitBuffer(destBuffer,
00706                      SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00707         /* don't update LSN etc till we're done with it */
00708     }
00709     else
00710     {
00711         /*
00712          * We could probably release the page lock immediately in the
00713          * full-page-image case, but for safety let's hold it till later.
00714          */
00715         if (record->xl_info & XLR_BKP_BLOCK(bbi))
00716         {
00717             destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
00718             destPage = NULL;    /* don't need to do any page updates */
00719         }
00720         else
00721         {
00722             destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
00723             if (BufferIsValid(destBuffer))
00724             {
00725                 destPage = (Page) BufferGetPage(destBuffer);
00726                 if (lsn <= PageGetLSN(destPage))
00727                     destPage = NULL;    /* don't do any page updates */
00728             }
00729             else
00730                 destPage = NULL;
00731         }
00732         bbi++;
00733     }
00734 
00735     /* restore leaf tuples to src and/or dest page */
00736     for (i = 0; i < xldata->nInsert; i++)
00737     {
00738         SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
00739 
00740         ptr += lt->size;
00741 
00742         page = leafPageSelect[i] ? destPage : srcPage;
00743         if (page == NULL)
00744             continue;           /* no need to touch this page */
00745 
00746         addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
00747     }
00748 
00749     /* Now update src and dest page LSNs if needed */
00750     if (srcPage != NULL)
00751     {
00752         PageSetLSN(srcPage, lsn);
00753         MarkBufferDirty(srcBuffer);
00754     }
00755     if (destPage != NULL)
00756     {
00757         PageSetLSN(destPage, lsn);
00758         MarkBufferDirty(destBuffer);
00759     }
00760 
00761     /* restore new inner tuple */
00762     if (record->xl_info & XLR_BKP_BLOCK(bbi))
00763         (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00764     else
00765     {
00766         Buffer      buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
00767                                             xldata->initInner);
00768 
00769         if (BufferIsValid(buffer))
00770         {
00771             page = BufferGetPage(buffer);
00772 
00773             if (xldata->initInner)
00774                 SpGistInitBuffer(buffer,
00775                                  (xldata->storesNulls ? SPGIST_NULLS : 0));
00776 
00777             if (lsn > PageGetLSN(page))
00778             {
00779                 addOrReplaceTuple(page, (Item) innerTuple, innerTuple->size,
00780                                   xldata->offnumInner);
00781 
00782                 /* if inner is also parent, update link while we're here */
00783                 if (xldata->blknoInner == xldata->blknoParent)
00784                 {
00785                     SpGistInnerTuple parent;
00786 
00787                     parent = (SpGistInnerTuple) PageGetItem(page,
00788                                   PageGetItemId(page, xldata->offnumParent));
00789                     spgUpdateNodeLink(parent, xldata->nodeI,
00790                                     xldata->blknoInner, xldata->offnumInner);
00791                 }
00792 
00793                 PageSetLSN(page, lsn);
00794                 MarkBufferDirty(buffer);
00795             }
00796             UnlockReleaseBuffer(buffer);
00797         }
00798     }
00799     bbi++;
00800 
00801     /*
00802      * Now we can release the leaf-page locks.  It's okay to do this before
00803      * updating the parent downlink.
00804      */
00805     if (BufferIsValid(srcBuffer))
00806         UnlockReleaseBuffer(srcBuffer);
00807     if (BufferIsValid(destBuffer))
00808         UnlockReleaseBuffer(destBuffer);
00809 
00810     /* update parent downlink, unless we did it above */
00811     if (xldata->blknoParent == InvalidBlockNumber)
00812     {
00813         /* no parent cause we split the root */
00814         Assert(SpGistBlockIsRoot(xldata->blknoInner));
00815     }
00816     else if (xldata->blknoInner != xldata->blknoParent)
00817     {
00818         if (record->xl_info & XLR_BKP_BLOCK(bbi))
00819             (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00820         else
00821         {
00822             Buffer      buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00823 
00824             if (BufferIsValid(buffer))
00825             {
00826                 page = BufferGetPage(buffer);
00827 
00828                 if (lsn > PageGetLSN(page))
00829                 {
00830                     SpGistInnerTuple parent;
00831 
00832                     parent = (SpGistInnerTuple) PageGetItem(page,
00833                                   PageGetItemId(page, xldata->offnumParent));
00834                     spgUpdateNodeLink(parent, xldata->nodeI,
00835                                     xldata->blknoInner, xldata->offnumInner);
00836 
00837                     PageSetLSN(page, lsn);
00838                     MarkBufferDirty(buffer);
00839                 }
00840                 UnlockReleaseBuffer(buffer);
00841             }
00842         }
00843     }
00844 }
00845 
00846 static void
00847 spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
00848 {
00849     char       *ptr = XLogRecGetData(record);
00850     spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
00851     OffsetNumber *toDead;
00852     OffsetNumber *toPlaceholder;
00853     OffsetNumber *moveSrc;
00854     OffsetNumber *moveDest;
00855     OffsetNumber *chainSrc;
00856     OffsetNumber *chainDest;
00857     SpGistState state;
00858     Buffer      buffer;
00859     Page        page;
00860     int         i;
00861 
00862     fillFakeState(&state, xldata->stateSrc);
00863 
00864     ptr += sizeof(spgxlogVacuumLeaf);
00865     toDead = (OffsetNumber *) ptr;
00866     ptr += sizeof(OffsetNumber) * xldata->nDead;
00867     toPlaceholder = (OffsetNumber *) ptr;
00868     ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
00869     moveSrc = (OffsetNumber *) ptr;
00870     ptr += sizeof(OffsetNumber) * xldata->nMove;
00871     moveDest = (OffsetNumber *) ptr;
00872     ptr += sizeof(OffsetNumber) * xldata->nMove;
00873     chainSrc = (OffsetNumber *) ptr;
00874     ptr += sizeof(OffsetNumber) * xldata->nChain;
00875     chainDest = (OffsetNumber *) ptr;
00876 
00877     if (record->xl_info & XLR_BKP_BLOCK(0))
00878         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00879     else
00880     {
00881         buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00882         if (BufferIsValid(buffer))
00883         {
00884             page = BufferGetPage(buffer);
00885             if (lsn > PageGetLSN(page))
00886             {
00887                 spgPageIndexMultiDelete(&state, page,
00888                                         toDead, xldata->nDead,
00889                                         SPGIST_DEAD, SPGIST_DEAD,
00890                                         InvalidBlockNumber,
00891                                         InvalidOffsetNumber);
00892 
00893                 spgPageIndexMultiDelete(&state, page,
00894                                         toPlaceholder, xldata->nPlaceholder,
00895                                       SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00896                                         InvalidBlockNumber,
00897                                         InvalidOffsetNumber);
00898 
00899                 /* see comments in vacuumLeafPage() */
00900                 for (i = 0; i < xldata->nMove; i++)
00901                 {
00902                     ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
00903                     ItemId      idDest = PageGetItemId(page, moveDest[i]);
00904                     ItemIdData  tmp;
00905 
00906                     tmp = *idSrc;
00907                     *idSrc = *idDest;
00908                     *idDest = tmp;
00909                 }
00910 
00911                 spgPageIndexMultiDelete(&state, page,
00912                                         moveSrc, xldata->nMove,
00913                                       SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00914                                         InvalidBlockNumber,
00915                                         InvalidOffsetNumber);
00916 
00917                 for (i = 0; i < xldata->nChain; i++)
00918                 {
00919                     SpGistLeafTuple lt;
00920 
00921                     lt = (SpGistLeafTuple) PageGetItem(page,
00922                                            PageGetItemId(page, chainSrc[i]));
00923                     Assert(lt->tupstate == SPGIST_LIVE);
00924                     lt->nextOffset = chainDest[i];
00925                 }
00926 
00927                 PageSetLSN(page, lsn);
00928                 MarkBufferDirty(buffer);
00929             }
00930             UnlockReleaseBuffer(buffer);
00931         }
00932     }
00933 }
00934 
00935 static void
00936 spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
00937 {
00938     char       *ptr = XLogRecGetData(record);
00939     spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
00940     OffsetNumber *toDelete;
00941     Buffer      buffer;
00942     Page        page;
00943 
00944     ptr += sizeof(spgxlogVacuumRoot);
00945     toDelete = (OffsetNumber *) ptr;
00946 
00947     if (record->xl_info & XLR_BKP_BLOCK(0))
00948         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00949     else
00950     {
00951         buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00952         if (BufferIsValid(buffer))
00953         {
00954             page = BufferGetPage(buffer);
00955             if (lsn > PageGetLSN(page))
00956             {
00957                 /* The tuple numbers are in order */
00958                 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
00959 
00960                 PageSetLSN(page, lsn);
00961                 MarkBufferDirty(buffer);
00962             }
00963             UnlockReleaseBuffer(buffer);
00964         }
00965     }
00966 }
00967 
00968 static void
00969 spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
00970 {
00971     char       *ptr = XLogRecGetData(record);
00972     spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
00973     OffsetNumber *itemToPlaceholder;
00974     Buffer      buffer;
00975     Page        page;
00976 
00977     ptr += sizeof(spgxlogVacuumRedirect);
00978     itemToPlaceholder = (OffsetNumber *) ptr;
00979 
00980     /*
00981      * If any redirection tuples are being removed, make sure there are no
00982      * live Hot Standby transactions that might need to see them.
00983      */
00984     if (InHotStandby)
00985     {
00986         if (TransactionIdIsValid(xldata->newestRedirectXid))
00987             ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
00988                                                 xldata->node);
00989     }
00990 
00991     if (record->xl_info & XLR_BKP_BLOCK(0))
00992         (void) RestoreBackupBlock(lsn, record, 0, false, false);
00993     else
00994     {
00995         buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00996 
00997         if (BufferIsValid(buffer))
00998         {
00999             page = BufferGetPage(buffer);
01000             if (lsn > PageGetLSN(page))
01001             {
01002                 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
01003                 int         i;
01004 
01005                 /* Convert redirect pointers to plain placeholders */
01006                 for (i = 0; i < xldata->nToPlaceholder; i++)
01007                 {
01008                     SpGistDeadTuple dt;
01009 
01010                     dt = (SpGistDeadTuple) PageGetItem(page,
01011                                   PageGetItemId(page, itemToPlaceholder[i]));
01012                     Assert(dt->tupstate == SPGIST_REDIRECT);
01013                     dt->tupstate = SPGIST_PLACEHOLDER;
01014                     ItemPointerSetInvalid(&dt->pointer);
01015                 }
01016 
01017                 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
01018                 opaque->nRedirection -= xldata->nToPlaceholder;
01019                 opaque->nPlaceholder += xldata->nToPlaceholder;
01020 
01021                 /* Remove placeholder tuples at end of page */
01022                 if (xldata->firstPlaceholder != InvalidOffsetNumber)
01023                 {
01024                     int         max = PageGetMaxOffsetNumber(page);
01025                     OffsetNumber *toDelete;
01026 
01027                     toDelete = palloc(sizeof(OffsetNumber) * max);
01028 
01029                     for (i = xldata->firstPlaceholder; i <= max; i++)
01030                         toDelete[i - xldata->firstPlaceholder] = i;
01031 
01032                     i = max - xldata->firstPlaceholder + 1;
01033                     Assert(opaque->nPlaceholder >= i);
01034                     opaque->nPlaceholder -= i;
01035 
01036                     /* The array is sorted, so can use PageIndexMultiDelete */
01037                     PageIndexMultiDelete(page, toDelete, i);
01038 
01039                     pfree(toDelete);
01040                 }
01041 
01042                 PageSetLSN(page, lsn);
01043                 MarkBufferDirty(buffer);
01044             }
01045 
01046             UnlockReleaseBuffer(buffer);
01047         }
01048     }
01049 }
01050 
01051 void
01052 spg_redo(XLogRecPtr lsn, XLogRecord *record)
01053 {
01054     uint8       info = record->xl_info & ~XLR_INFO_MASK;
01055     MemoryContext oldCxt;
01056 
01057     oldCxt = MemoryContextSwitchTo(opCtx);
01058     switch (info)
01059     {
01060         case XLOG_SPGIST_CREATE_INDEX:
01061             spgRedoCreateIndex(lsn, record);
01062             break;
01063         case XLOG_SPGIST_ADD_LEAF:
01064             spgRedoAddLeaf(lsn, record);
01065             break;
01066         case XLOG_SPGIST_MOVE_LEAFS:
01067             spgRedoMoveLeafs(lsn, record);
01068             break;
01069         case XLOG_SPGIST_ADD_NODE:
01070             spgRedoAddNode(lsn, record);
01071             break;
01072         case XLOG_SPGIST_SPLIT_TUPLE:
01073             spgRedoSplitTuple(lsn, record);
01074             break;
01075         case XLOG_SPGIST_PICKSPLIT:
01076             spgRedoPickSplit(lsn, record);
01077             break;
01078         case XLOG_SPGIST_VACUUM_LEAF:
01079             spgRedoVacuumLeaf(lsn, record);
01080             break;
01081         case XLOG_SPGIST_VACUUM_ROOT:
01082             spgRedoVacuumRoot(lsn, record);
01083             break;
01084         case XLOG_SPGIST_VACUUM_REDIRECT:
01085             spgRedoVacuumRedirect(lsn, record);
01086             break;
01087         default:
01088             elog(PANIC, "spg_redo: unknown op code %u", info);
01089     }
01090 
01091     MemoryContextSwitchTo(oldCxt);
01092     MemoryContextReset(opCtx);
01093 }
01094 
01095 void
01096 spg_xlog_startup(void)
01097 {
01098     opCtx = AllocSetContextCreate(CurrentMemoryContext,
01099                                   "SP-GiST temporary context",
01100                                   ALLOCSET_DEFAULT_MINSIZE,
01101                                   ALLOCSET_DEFAULT_INITSIZE,
01102                                   ALLOCSET_DEFAULT_MAXSIZE);
01103 }
01104 
01105 void
01106 spg_xlog_cleanup(void)
01107 {
01108     MemoryContextDelete(opCtx);
01109     opCtx = NULL;
01110 }