Header And Logo

PostgreSQL
| The world's most advanced open source database.

gistxlog.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * gistxlog.c
00004  *    WAL replay logic for GiST.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *           src/backend/access/gist/gistxlog.c
00012  *-------------------------------------------------------------------------
00013  */
00014 #include "postgres.h"
00015 
00016 #include "access/gist_private.h"
00017 #include "access/xlogutils.h"
00018 #include "utils/memutils.h"
00019 
00020 typedef struct
00021 {
00022     gistxlogPage *header;
00023     IndexTuple *itup;
00024 } NewPage;
00025 
00026 typedef struct
00027 {
00028     gistxlogPageSplit *data;
00029     NewPage    *page;
00030 } PageSplitRecord;
00031 
00032 static MemoryContext opCtx;     /* working memory for operations */
00033 
00034 /*
00035  * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
00036  *
00037  * Even if the WAL record includes a full-page image, we have to update the
00038  * follow-right flag, because that change is not included in the full-page
00039  * image.  To be sure that the intermediate state with the wrong flag value is
00040  * not visible to concurrent Hot Standby queries, this function handles
00041  * restoring the full-page image as well as updating the flag.  (Note that
00042  * we never need to do anything else to the child page in the current WAL
00043  * action.)
00044  */
00045 static void
00046 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
00047                          RelFileNode node, BlockNumber childblkno)
00048 {
00049     Buffer      buffer;
00050     Page        page;
00051 
00052     if (record->xl_info & XLR_BKP_BLOCK(block_index))
00053         buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
00054     else
00055     {
00056         buffer = XLogReadBuffer(node, childblkno, false);
00057         if (!BufferIsValid(buffer))
00058             return;             /* page was deleted, nothing to do */
00059     }
00060     page = (Page) BufferGetPage(buffer);
00061 
00062     /*
00063      * Note that we still update the page even if page LSN is equal to the LSN
00064      * of this record, because the updated NSN is not included in the full
00065      * page image.
00066      */
00067     if (lsn >= PageGetLSN(page))
00068     {
00069         GistPageSetNSN(page, lsn);
00070         GistClearFollowRight(page);
00071 
00072         PageSetLSN(page, lsn);
00073         MarkBufferDirty(buffer);
00074     }
00075     UnlockReleaseBuffer(buffer);
00076 }
00077 
00078 /*
00079  * redo any page update (except page split)
00080  */
00081 static void
00082 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
00083 {
00084     char       *begin = XLogRecGetData(record);
00085     gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
00086     Buffer      buffer;
00087     Page        page;
00088     char       *data;
00089 
00090     /*
00091      * We need to acquire and hold lock on target page while updating the left
00092      * child page.  If we have a full-page image of target page, getting the
00093      * lock is a side-effect of restoring that image.  Note that even if the
00094      * target page no longer exists, we'll still attempt to replay the change
00095      * on the child page.
00096      */
00097     if (record->xl_info & XLR_BKP_BLOCK(0))
00098         buffer = RestoreBackupBlock(lsn, record, 0, false, true);
00099     else
00100         buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00101 
00102     /* Fix follow-right data on left child page */
00103     if (BlockNumberIsValid(xldata->leftchild))
00104         gistRedoClearFollowRight(lsn, record, 1,
00105                                  xldata->node, xldata->leftchild);
00106 
00107     /* Done if target page no longer exists */
00108     if (!BufferIsValid(buffer))
00109         return;
00110 
00111     /* nothing more to do if page was backed up (and no info to do it with) */
00112     if (record->xl_info & XLR_BKP_BLOCK(0))
00113     {
00114         UnlockReleaseBuffer(buffer);
00115         return;
00116     }
00117 
00118     page = (Page) BufferGetPage(buffer);
00119 
00120     /* nothing more to do if change already applied */
00121     if (lsn <= PageGetLSN(page))
00122     {
00123         UnlockReleaseBuffer(buffer);
00124         return;
00125     }
00126 
00127     data = begin + sizeof(gistxlogPageUpdate);
00128 
00129     /* Delete old tuples */
00130     if (xldata->ntodelete > 0)
00131     {
00132         int         i;
00133         OffsetNumber *todelete = (OffsetNumber *) data;
00134 
00135         data += sizeof(OffsetNumber) * xldata->ntodelete;
00136 
00137         for (i = 0; i < xldata->ntodelete; i++)
00138             PageIndexTupleDelete(page, todelete[i]);
00139         if (GistPageIsLeaf(page))
00140             GistMarkTuplesDeleted(page);
00141     }
00142 
00143     /* add tuples */
00144     if (data - begin < record->xl_len)
00145     {
00146         OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
00147         OffsetNumberNext(PageGetMaxOffsetNumber(page));
00148 
00149         while (data - begin < record->xl_len)
00150         {
00151             IndexTuple  itup = (IndexTuple) data;
00152             Size        sz = IndexTupleSize(itup);
00153             OffsetNumber l;
00154 
00155             data += sz;
00156 
00157             l = PageAddItem(page, (Item) itup, sz, off, false, false);
00158             if (l == InvalidOffsetNumber)
00159                 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
00160                      (int) sz);
00161             off++;
00162         }
00163     }
00164     else
00165     {
00166         /*
00167          * special case: leafpage, nothing to insert, nothing to delete, then
00168          * vacuum marks page
00169          */
00170         if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
00171             GistClearTuplesDeleted(page);
00172     }
00173 
00174     if (!GistPageIsLeaf(page) &&
00175         PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
00176         xldata->blkno == GIST_ROOT_BLKNO)
00177     {
00178         /*
00179          * all links on non-leaf root page was deleted by vacuum full, so root
00180          * page becomes a leaf
00181          */
00182         GistPageSetLeaf(page);
00183     }
00184 
00185     GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
00186     PageSetLSN(page, lsn);
00187     MarkBufferDirty(buffer);
00188     UnlockReleaseBuffer(buffer);
00189 }
00190 
00191 static void
00192 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
00193 {
00194     char       *begin = XLogRecGetData(record),
00195                *ptr;
00196     int         j,
00197                 i = 0;
00198 
00199     decoded->data = (gistxlogPageSplit *) begin;
00200     decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
00201 
00202     ptr = begin + sizeof(gistxlogPageSplit);
00203     for (i = 0; i < decoded->data->npage; i++)
00204     {
00205         Assert(ptr - begin < record->xl_len);
00206         decoded->page[i].header = (gistxlogPage *) ptr;
00207         ptr += sizeof(gistxlogPage);
00208 
00209         decoded->page[i].itup = (IndexTuple *)
00210             palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
00211         j = 0;
00212         while (j < decoded->page[i].header->num)
00213         {
00214             Assert(ptr - begin < record->xl_len);
00215             decoded->page[i].itup[j] = (IndexTuple) ptr;
00216             ptr += IndexTupleSize((IndexTuple) ptr);
00217             j++;
00218         }
00219     }
00220 }
00221 
00222 static void
00223 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
00224 {
00225     gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
00226     PageSplitRecord xlrec;
00227     Buffer      firstbuffer = InvalidBuffer;
00228     Buffer      buffer;
00229     Page        page;
00230     int         i;
00231     bool        isrootsplit = false;
00232 
00233     decodePageSplitRecord(&xlrec, record);
00234 
00235     /*
00236      * We must hold lock on the first-listed page throughout the action,
00237      * including while updating the left child page (if any).  We can unlock
00238      * remaining pages in the list as soon as they've been written, because
00239      * there is no path for concurrent queries to reach those pages without
00240      * first visiting the first-listed page.
00241      */
00242 
00243     /* loop around all pages */
00244     for (i = 0; i < xlrec.data->npage; i++)
00245     {
00246         NewPage    *newpage = xlrec.page + i;
00247         int         flags;
00248 
00249         if (newpage->header->blkno == GIST_ROOT_BLKNO)
00250         {
00251             Assert(i == 0);
00252             isrootsplit = true;
00253         }
00254 
00255         buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
00256         Assert(BufferIsValid(buffer));
00257         page = (Page) BufferGetPage(buffer);
00258 
00259         /* ok, clear buffer */
00260         if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
00261             flags = F_LEAF;
00262         else
00263             flags = 0;
00264         GISTInitBuffer(buffer, flags);
00265 
00266         /* and fill it */
00267         gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
00268 
00269         if (newpage->header->blkno == GIST_ROOT_BLKNO)
00270         {
00271             GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
00272             GistPageSetNSN(page, xldata->orignsn);
00273             GistClearFollowRight(page);
00274         }
00275         else
00276         {
00277             if (i < xlrec.data->npage - 1)
00278                 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
00279             else
00280                 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
00281             GistPageSetNSN(page, xldata->orignsn);
00282             if (i < xlrec.data->npage - 1 && !isrootsplit &&
00283                 xldata->markfollowright)
00284                 GistMarkFollowRight(page);
00285             else
00286                 GistClearFollowRight(page);
00287         }
00288 
00289         PageSetLSN(page, lsn);
00290         MarkBufferDirty(buffer);
00291 
00292         if (i == 0)
00293             firstbuffer = buffer;
00294         else
00295             UnlockReleaseBuffer(buffer);
00296     }
00297 
00298     /* Fix follow-right data on left child page, if any */
00299     if (BlockNumberIsValid(xldata->leftchild))
00300         gistRedoClearFollowRight(lsn, record, 0,
00301                                  xldata->node, xldata->leftchild);
00302 
00303     /* Finally, release lock on the first page */
00304     UnlockReleaseBuffer(firstbuffer);
00305 }
00306 
00307 static void
00308 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00309 {
00310     RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00311     Buffer      buffer;
00312     Page        page;
00313 
00314     /* Backup blocks are not used in create_index records */
00315     Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00316 
00317     buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
00318     Assert(BufferIsValid(buffer));
00319     page = (Page) BufferGetPage(buffer);
00320 
00321     GISTInitBuffer(buffer, F_LEAF);
00322 
00323     PageSetLSN(page, lsn);
00324 
00325     MarkBufferDirty(buffer);
00326     UnlockReleaseBuffer(buffer);
00327 }
00328 
00329 void
00330 gist_redo(XLogRecPtr lsn, XLogRecord *record)
00331 {
00332     uint8       info = record->xl_info & ~XLR_INFO_MASK;
00333     MemoryContext oldCxt;
00334 
00335     /*
00336      * GiST indexes do not require any conflict processing. NB: If we ever
00337      * implement a similar optimization we have in b-tree, and remove killed
00338      * tuples outside VACUUM, we'll need to handle that here.
00339      */
00340 
00341     oldCxt = MemoryContextSwitchTo(opCtx);
00342     switch (info)
00343     {
00344         case XLOG_GIST_PAGE_UPDATE:
00345             gistRedoPageUpdateRecord(lsn, record);
00346             break;
00347         case XLOG_GIST_PAGE_SPLIT:
00348             gistRedoPageSplitRecord(lsn, record);
00349             break;
00350         case XLOG_GIST_CREATE_INDEX:
00351             gistRedoCreateIndex(lsn, record);
00352             break;
00353         default:
00354             elog(PANIC, "gist_redo: unknown op code %u", info);
00355     }
00356 
00357     MemoryContextSwitchTo(oldCxt);
00358     MemoryContextReset(opCtx);
00359 }
00360 
00361 void
00362 gist_xlog_startup(void)
00363 {
00364     opCtx = createTempGistContext();
00365 }
00366 
00367 void
00368 gist_xlog_cleanup(void)
00369 {
00370     MemoryContextDelete(opCtx);
00371 }
00372 
00373 /*
00374  * Write WAL record of a page split.
00375  */
00376 XLogRecPtr
00377 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
00378               SplitedPageLayout *dist,
00379               BlockNumber origrlink, GistNSN orignsn,
00380               Buffer leftchildbuf, bool markfollowright)
00381 {
00382     XLogRecData *rdata;
00383     gistxlogPageSplit xlrec;
00384     SplitedPageLayout *ptr;
00385     int         npage = 0,
00386                 cur;
00387     XLogRecPtr  recptr;
00388 
00389     for (ptr = dist; ptr; ptr = ptr->next)
00390         npage++;
00391 
00392     rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
00393 
00394     xlrec.node = node;
00395     xlrec.origblkno = blkno;
00396     xlrec.origrlink = origrlink;
00397     xlrec.orignsn = orignsn;
00398     xlrec.origleaf = page_is_leaf;
00399     xlrec.npage = (uint16) npage;
00400     xlrec.leftchild =
00401         BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
00402     xlrec.markfollowright = markfollowright;
00403 
00404     rdata[0].data = (char *) &xlrec;
00405     rdata[0].len = sizeof(gistxlogPageSplit);
00406     rdata[0].buffer = InvalidBuffer;
00407 
00408     cur = 1;
00409 
00410     /*
00411      * Include a full page image of the child buf. (only necessary if a
00412      * checkpoint happened since the child page was split)
00413      */
00414     if (BufferIsValid(leftchildbuf))
00415     {
00416         rdata[cur - 1].next = &(rdata[cur]);
00417         rdata[cur].data = NULL;
00418         rdata[cur].len = 0;
00419         rdata[cur].buffer = leftchildbuf;
00420         rdata[cur].buffer_std = true;
00421         cur++;
00422     }
00423 
00424     for (ptr = dist; ptr; ptr = ptr->next)
00425     {
00426         rdata[cur - 1].next = &(rdata[cur]);
00427         rdata[cur].buffer = InvalidBuffer;
00428         rdata[cur].data = (char *) &(ptr->block);
00429         rdata[cur].len = sizeof(gistxlogPage);
00430         cur++;
00431 
00432         rdata[cur - 1].next = &(rdata[cur]);
00433         rdata[cur].buffer = InvalidBuffer;
00434         rdata[cur].data = (char *) (ptr->list);
00435         rdata[cur].len = ptr->lenlist;
00436         cur++;
00437     }
00438     rdata[cur - 1].next = NULL;
00439 
00440     recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
00441 
00442     pfree(rdata);
00443     return recptr;
00444 }
00445 
00446 /*
00447  * Write XLOG record describing a page update. The update can include any
00448  * number of deletions and/or insertions of tuples on a single index page.
00449  *
00450  * If this update inserts a downlink for a split page, also record that
00451  * the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
00452  *
00453  * Note that both the todelete array and the tuples are marked as belonging
00454  * to the target buffer; they need not be stored in XLOG if XLogInsert decides
00455  * to log the whole buffer contents instead.  Also, we take care that there's
00456  * at least one rdata item referencing the buffer, even when ntodelete and
00457  * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
00458  */
00459 XLogRecPtr
00460 gistXLogUpdate(RelFileNode node, Buffer buffer,
00461                OffsetNumber *todelete, int ntodelete,
00462                IndexTuple *itup, int ituplen,
00463                Buffer leftchildbuf)
00464 {
00465     XLogRecData *rdata;
00466     gistxlogPageUpdate xlrec;
00467     int         cur,
00468                 i;
00469     XLogRecPtr  recptr;
00470 
00471     rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
00472 
00473     xlrec.node = node;
00474     xlrec.blkno = BufferGetBlockNumber(buffer);
00475     xlrec.ntodelete = ntodelete;
00476     xlrec.leftchild =
00477         BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
00478 
00479     rdata[0].data = (char *) &xlrec;
00480     rdata[0].len = sizeof(gistxlogPageUpdate);
00481     rdata[0].buffer = InvalidBuffer;
00482     rdata[0].next = &(rdata[1]);
00483 
00484     rdata[1].data = (char *) todelete;
00485     rdata[1].len = sizeof(OffsetNumber) * ntodelete;
00486     rdata[1].buffer = buffer;
00487     rdata[1].buffer_std = true;
00488 
00489     cur = 2;
00490 
00491     /* new tuples */
00492     for (i = 0; i < ituplen; i++)
00493     {
00494         rdata[cur - 1].next = &(rdata[cur]);
00495         rdata[cur].data = (char *) (itup[i]);
00496         rdata[cur].len = IndexTupleSize(itup[i]);
00497         rdata[cur].buffer = buffer;
00498         rdata[cur].buffer_std = true;
00499         cur++;
00500     }
00501 
00502     /*
00503      * Include a full page image of the child buf. (only necessary if a
00504      * checkpoint happened since the child page was split)
00505      */
00506     if (BufferIsValid(leftchildbuf))
00507     {
00508         rdata[cur - 1].next = &(rdata[cur]);
00509         rdata[cur].data = NULL;
00510         rdata[cur].len = 0;
00511         rdata[cur].buffer = leftchildbuf;
00512         rdata[cur].buffer_std = true;
00513         cur++;
00514     }
00515     rdata[cur - 1].next = NULL;
00516 
00517     recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
00518 
00519     pfree(rdata);
00520     return recptr;
00521 }