00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "postgres.h"
00015
00016 #include "access/gin_private.h"
00017 #include "access/xlogutils.h"
00018 #include "utils/memutils.h"
00019
00020 static MemoryContext opCtx;
00021 static MemoryContext topCtx;
00022
00023 typedef struct ginIncompleteSplit
00024 {
00025 RelFileNode node;
00026 BlockNumber leftBlkno;
00027 BlockNumber rightBlkno;
00028 BlockNumber rootBlkno;
00029 } ginIncompleteSplit;
00030
00031 static List *incomplete_splits;
00032
00033 static void
00034 pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
00035 {
00036 ginIncompleteSplit *split;
00037
00038 MemoryContextSwitchTo(topCtx);
00039
00040 split = palloc(sizeof(ginIncompleteSplit));
00041
00042 split->node = node;
00043 split->leftBlkno = leftBlkno;
00044 split->rightBlkno = rightBlkno;
00045 split->rootBlkno = rootBlkno;
00046
00047 incomplete_splits = lappend(incomplete_splits, split);
00048
00049 MemoryContextSwitchTo(opCtx);
00050 }
00051
00052 static void
00053 forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
00054 {
00055 ListCell *l;
00056
00057 foreach(l, incomplete_splits)
00058 {
00059 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
00060
00061 if (RelFileNodeEquals(node, split->node) &&
00062 leftBlkno == split->leftBlkno &&
00063 updateBlkno == split->rightBlkno)
00064 {
00065 incomplete_splits = list_delete_ptr(incomplete_splits, split);
00066 pfree(split);
00067 break;
00068 }
00069 }
00070 }
00071
00072 static void
00073 ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00074 {
00075 RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00076 Buffer RootBuffer,
00077 MetaBuffer;
00078 Page page;
00079
00080
00081 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00082
00083 MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
00084 Assert(BufferIsValid(MetaBuffer));
00085 page = (Page) BufferGetPage(MetaBuffer);
00086
00087 GinInitMetabuffer(MetaBuffer);
00088
00089 PageSetLSN(page, lsn);
00090 MarkBufferDirty(MetaBuffer);
00091
00092 RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
00093 Assert(BufferIsValid(RootBuffer));
00094 page = (Page) BufferGetPage(RootBuffer);
00095
00096 GinInitBuffer(RootBuffer, GIN_LEAF);
00097
00098 PageSetLSN(page, lsn);
00099 MarkBufferDirty(RootBuffer);
00100
00101 UnlockReleaseBuffer(RootBuffer);
00102 UnlockReleaseBuffer(MetaBuffer);
00103 }
00104
00105 static void
00106 ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
00107 {
00108 ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
00109 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree));
00110 Buffer buffer;
00111 Page page;
00112
00113
00114 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00115
00116 buffer = XLogReadBuffer(data->node, data->blkno, true);
00117 Assert(BufferIsValid(buffer));
00118 page = (Page) BufferGetPage(buffer);
00119
00120 GinInitBuffer(buffer, GIN_DATA | GIN_LEAF);
00121 memcpy(GinDataPageGetData(page), items, sizeof(ItemPointerData) * data->nitem);
00122 GinPageGetOpaque(page)->maxoff = data->nitem;
00123
00124 PageSetLSN(page, lsn);
00125
00126 MarkBufferDirty(buffer);
00127 UnlockReleaseBuffer(buffer);
00128 }
00129
00130 static void
00131 ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
00132 {
00133 ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
00134 Buffer buffer;
00135 Page page;
00136
00137
00138 if (data->isData)
00139 {
00140 Assert(data->isDelete == FALSE);
00141 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00142 {
00143 PostingItem *pitem;
00144
00145 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00146 forgetIncompleteSplit(data->node,
00147 PostingItemGetBlockNumber(pitem),
00148 data->updateBlkno);
00149 }
00150
00151 }
00152 else
00153 {
00154 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00155 {
00156 IndexTuple itup;
00157
00158 itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00159 forgetIncompleteSplit(data->node,
00160 GinGetDownlink(itup),
00161 data->updateBlkno);
00162 }
00163 }
00164
00165
00166 if (record->xl_info & XLR_BKP_BLOCK(0))
00167 {
00168 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00169 return;
00170 }
00171
00172 buffer = XLogReadBuffer(data->node, data->blkno, false);
00173 if (!BufferIsValid(buffer))
00174 return;
00175 page = (Page) BufferGetPage(buffer);
00176
00177 if (lsn > PageGetLSN(page))
00178 {
00179 if (data->isData)
00180 {
00181 Assert(GinPageIsData(page));
00182
00183 if (data->isLeaf)
00184 {
00185 OffsetNumber i;
00186 ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00187
00188 Assert(GinPageIsLeaf(page));
00189 Assert(data->updateBlkno == InvalidBlockNumber);
00190
00191 for (i = 0; i < data->nitem; i++)
00192 GinDataPageAddItem(page, items + i, data->offset + i);
00193 }
00194 else
00195 {
00196 PostingItem *pitem;
00197
00198 Assert(!GinPageIsLeaf(page));
00199
00200 if (data->updateBlkno != InvalidBlockNumber)
00201 {
00202
00203 pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
00204 PostingItemSetBlockNumber(pitem, data->updateBlkno);
00205 }
00206
00207 pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00208
00209 GinDataPageAddItem(page, pitem, data->offset);
00210 }
00211 }
00212 else
00213 {
00214 IndexTuple itup;
00215
00216 Assert(!GinPageIsData(page));
00217
00218 if (data->updateBlkno != InvalidBlockNumber)
00219 {
00220
00221 Assert(!GinPageIsLeaf(page));
00222 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
00223 itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
00224 GinSetDownlink(itup, data->updateBlkno);
00225 }
00226
00227 if (data->isDelete)
00228 {
00229 Assert(GinPageIsLeaf(page));
00230 Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
00231 PageIndexTupleDelete(page, data->offset);
00232 }
00233
00234 itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
00235
00236 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
00237 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00238 data->node.spcNode, data->node.dbNode, data->node.relNode);
00239 }
00240
00241 PageSetLSN(page, lsn);
00242
00243 MarkBufferDirty(buffer);
00244 }
00245
00246 UnlockReleaseBuffer(buffer);
00247 }
00248
00249 static void
00250 ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
00251 {
00252 ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
00253 Buffer lbuffer,
00254 rbuffer;
00255 Page lpage,
00256 rpage;
00257 uint32 flags = 0;
00258
00259 if (data->isLeaf)
00260 flags |= GIN_LEAF;
00261 if (data->isData)
00262 flags |= GIN_DATA;
00263
00264
00265 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00266
00267 lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
00268 Assert(BufferIsValid(lbuffer));
00269 lpage = (Page) BufferGetPage(lbuffer);
00270 GinInitBuffer(lbuffer, flags);
00271
00272 rbuffer = XLogReadBuffer(data->node, data->rblkno, true);
00273 Assert(BufferIsValid(rbuffer));
00274 rpage = (Page) BufferGetPage(rbuffer);
00275 GinInitBuffer(rbuffer, flags);
00276
00277 GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
00278 GinPageGetOpaque(rpage)->rightlink = data->rrlink;
00279
00280 if (data->isData)
00281 {
00282 char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
00283 Size sizeofitem = GinSizeOfDataPageItem(lpage);
00284 OffsetNumber i;
00285 ItemPointer bound;
00286
00287 for (i = 0; i < data->separator; i++)
00288 {
00289 GinDataPageAddItem(lpage, ptr, InvalidOffsetNumber);
00290 ptr += sizeofitem;
00291 }
00292
00293 for (i = data->separator; i < data->nitem; i++)
00294 {
00295 GinDataPageAddItem(rpage, ptr, InvalidOffsetNumber);
00296 ptr += sizeofitem;
00297 }
00298
00299
00300 bound = GinDataPageGetRightBound(lpage);
00301 if (data->isLeaf)
00302 *bound = *(ItemPointerData *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff);
00303 else
00304 *bound = ((PostingItem *) GinDataPageGetItem(lpage, GinPageGetOpaque(lpage)->maxoff))->key;
00305
00306 bound = GinDataPageGetRightBound(rpage);
00307 *bound = data->rightbound;
00308 }
00309 else
00310 {
00311 IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
00312 OffsetNumber i;
00313
00314 for (i = 0; i < data->separator; i++)
00315 {
00316 if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00317 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00318 data->node.spcNode, data->node.dbNode, data->node.relNode);
00319 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00320 }
00321
00322 for (i = data->separator; i < data->nitem; i++)
00323 {
00324 if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00325 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00326 data->node.spcNode, data->node.dbNode, data->node.relNode);
00327 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00328 }
00329 }
00330
00331 PageSetLSN(rpage, lsn);
00332 MarkBufferDirty(rbuffer);
00333
00334 PageSetLSN(lpage, lsn);
00335 MarkBufferDirty(lbuffer);
00336
00337 if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
00338 forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
00339
00340 if (data->isRootSplit)
00341 {
00342 Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
00343 Page rootPage = BufferGetPage(rootBuf);
00344
00345 GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
00346
00347 if (data->isData)
00348 {
00349 Assert(data->rootBlkno != GIN_ROOT_BLKNO);
00350 ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
00351 }
00352 else
00353 {
00354 Assert(data->rootBlkno == GIN_ROOT_BLKNO);
00355 ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
00356 }
00357
00358 PageSetLSN(rootPage, lsn);
00359
00360 MarkBufferDirty(rootBuf);
00361 UnlockReleaseBuffer(rootBuf);
00362 }
00363 else
00364 pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
00365
00366 UnlockReleaseBuffer(rbuffer);
00367 UnlockReleaseBuffer(lbuffer);
00368 }
00369
00370 static void
00371 ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
00372 {
00373 ginxlogVacuumPage *data = (ginxlogVacuumPage *) XLogRecGetData(record);
00374 Buffer buffer;
00375 Page page;
00376
00377
00378 if (record->xl_info & XLR_BKP_BLOCK(0))
00379 {
00380 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00381 return;
00382 }
00383
00384 buffer = XLogReadBuffer(data->node, data->blkno, false);
00385 if (!BufferIsValid(buffer))
00386 return;
00387 page = (Page) BufferGetPage(buffer);
00388
00389 if (lsn > PageGetLSN(page))
00390 {
00391 if (GinPageIsData(page))
00392 {
00393 memcpy(GinDataPageGetData(page),
00394 XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
00395 data->nitem * GinSizeOfDataPageItem(page));
00396 GinPageGetOpaque(page)->maxoff = data->nitem;
00397 }
00398 else
00399 {
00400 OffsetNumber i,
00401 *tod;
00402 IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogVacuumPage));
00403
00404 tod = (OffsetNumber *) palloc(sizeof(OffsetNumber) * PageGetMaxOffsetNumber(page));
00405 for (i = FirstOffsetNumber; i <= PageGetMaxOffsetNumber(page); i++)
00406 tod[i - 1] = i;
00407
00408 PageIndexMultiDelete(page, tod, PageGetMaxOffsetNumber(page));
00409
00410 for (i = 0; i < data->nitem; i++)
00411 {
00412 if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
00413 elog(ERROR, "failed to add item to index page in %u/%u/%u",
00414 data->node.spcNode, data->node.dbNode, data->node.relNode);
00415 itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
00416 }
00417 }
00418
00419 PageSetLSN(page, lsn);
00420 MarkBufferDirty(buffer);
00421 }
00422
00423 UnlockReleaseBuffer(buffer);
00424 }
00425
00426 static void
00427 ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
00428 {
00429 ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
00430 Buffer dbuffer;
00431 Buffer pbuffer;
00432 Buffer lbuffer;
00433 Page page;
00434
00435 if (record->xl_info & XLR_BKP_BLOCK(0))
00436 dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
00437 else
00438 {
00439 dbuffer = XLogReadBuffer(data->node, data->blkno, false);
00440 if (BufferIsValid(dbuffer))
00441 {
00442 page = BufferGetPage(dbuffer);
00443 if (lsn > PageGetLSN(page))
00444 {
00445 Assert(GinPageIsData(page));
00446 GinPageGetOpaque(page)->flags = GIN_DELETED;
00447 PageSetLSN(page, lsn);
00448 MarkBufferDirty(dbuffer);
00449 }
00450 }
00451 }
00452
00453 if (record->xl_info & XLR_BKP_BLOCK(1))
00454 pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
00455 else
00456 {
00457 pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
00458 if (BufferIsValid(pbuffer))
00459 {
00460 page = BufferGetPage(pbuffer);
00461 if (lsn > PageGetLSN(page))
00462 {
00463 Assert(GinPageIsData(page));
00464 Assert(!GinPageIsLeaf(page));
00465 GinPageDeletePostingItem(page, data->parentOffset);
00466 PageSetLSN(page, lsn);
00467 MarkBufferDirty(pbuffer);
00468 }
00469 }
00470 }
00471
00472 if (record->xl_info & XLR_BKP_BLOCK(2))
00473 (void) RestoreBackupBlock(lsn, record, 2, false, false);
00474 else if (data->leftBlkno != InvalidBlockNumber)
00475 {
00476 lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
00477 if (BufferIsValid(lbuffer))
00478 {
00479 page = BufferGetPage(lbuffer);
00480 if (lsn > PageGetLSN(page))
00481 {
00482 Assert(GinPageIsData(page));
00483 GinPageGetOpaque(page)->rightlink = data->rightLink;
00484 PageSetLSN(page, lsn);
00485 MarkBufferDirty(lbuffer);
00486 }
00487 UnlockReleaseBuffer(lbuffer);
00488 }
00489 }
00490
00491 if (BufferIsValid(pbuffer))
00492 UnlockReleaseBuffer(pbuffer);
00493 if (BufferIsValid(dbuffer))
00494 UnlockReleaseBuffer(dbuffer);
00495 }
00496
00497 static void
00498 ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
00499 {
00500 ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
00501 Buffer metabuffer;
00502 Page metapage;
00503 Buffer buffer;
00504
00505 metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
00506 if (!BufferIsValid(metabuffer))
00507 return;
00508 metapage = BufferGetPage(metabuffer);
00509
00510 if (lsn > PageGetLSN(metapage))
00511 {
00512 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
00513 PageSetLSN(metapage, lsn);
00514 MarkBufferDirty(metabuffer);
00515 }
00516
00517 if (data->ntuples > 0)
00518 {
00519
00520
00521
00522 if (record->xl_info & XLR_BKP_BLOCK(0))
00523 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00524 else
00525 {
00526 buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
00527 if (BufferIsValid(buffer))
00528 {
00529 Page page = BufferGetPage(buffer);
00530
00531 if (lsn > PageGetLSN(page))
00532 {
00533 OffsetNumber l,
00534 off = (PageIsEmpty(page)) ? FirstOffsetNumber :
00535 OffsetNumberNext(PageGetMaxOffsetNumber(page));
00536 int i,
00537 tupsize;
00538 IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogUpdateMeta));
00539
00540 for (i = 0; i < data->ntuples; i++)
00541 {
00542 tupsize = IndexTupleSize(tuples);
00543
00544 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
00545
00546 if (l == InvalidOffsetNumber)
00547 elog(ERROR, "failed to add item to index page");
00548
00549 tuples = (IndexTuple) (((char *) tuples) + tupsize);
00550
00551 off++;
00552 }
00553
00554
00555
00556
00557 GinPageGetOpaque(page)->maxoff++;
00558
00559 PageSetLSN(page, lsn);
00560 MarkBufferDirty(buffer);
00561 }
00562 UnlockReleaseBuffer(buffer);
00563 }
00564 }
00565 }
00566 else if (data->prevTail != InvalidBlockNumber)
00567 {
00568
00569
00570
00571 if (record->xl_info & XLR_BKP_BLOCK(0))
00572 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00573 else
00574 {
00575 buffer = XLogReadBuffer(data->node, data->prevTail, false);
00576 if (BufferIsValid(buffer))
00577 {
00578 Page page = BufferGetPage(buffer);
00579
00580 if (lsn > PageGetLSN(page))
00581 {
00582 GinPageGetOpaque(page)->rightlink = data->newRightlink;
00583
00584 PageSetLSN(page, lsn);
00585 MarkBufferDirty(buffer);
00586 }
00587 UnlockReleaseBuffer(buffer);
00588 }
00589 }
00590 }
00591
00592 UnlockReleaseBuffer(metabuffer);
00593 }
00594
00595 static void
00596 ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
00597 {
00598 ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
00599 Buffer buffer;
00600 Page page;
00601 OffsetNumber l,
00602 off = FirstOffsetNumber;
00603 int i,
00604 tupsize;
00605 IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
00606
00607
00608 if (record->xl_info & XLR_BKP_BLOCK(0))
00609 {
00610 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00611 return;
00612 }
00613
00614 buffer = XLogReadBuffer(data->node, data->blkno, true);
00615 Assert(BufferIsValid(buffer));
00616 page = BufferGetPage(buffer);
00617
00618 GinInitBuffer(buffer, GIN_LIST);
00619 GinPageGetOpaque(page)->rightlink = data->rightlink;
00620 if (data->rightlink == InvalidBlockNumber)
00621 {
00622
00623 GinPageSetFullRow(page);
00624 GinPageGetOpaque(page)->maxoff = 1;
00625 }
00626 else
00627 {
00628 GinPageGetOpaque(page)->maxoff = 0;
00629 }
00630
00631 for (i = 0; i < data->ntuples; i++)
00632 {
00633 tupsize = IndexTupleSize(tuples);
00634
00635 l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
00636
00637 if (l == InvalidOffsetNumber)
00638 elog(ERROR, "failed to add item to index page");
00639
00640 tuples = (IndexTuple) (((char *) tuples) + tupsize);
00641 }
00642
00643 PageSetLSN(page, lsn);
00644 MarkBufferDirty(buffer);
00645
00646 UnlockReleaseBuffer(buffer);
00647 }
00648
00649 static void
00650 ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
00651 {
00652 ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
00653 Buffer metabuffer;
00654 Page metapage;
00655 int i;
00656
00657
00658 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00659
00660 metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
00661 if (!BufferIsValid(metabuffer))
00662 return;
00663 metapage = BufferGetPage(metabuffer);
00664
00665 if (lsn > PageGetLSN(metapage))
00666 {
00667 memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
00668 PageSetLSN(metapage, lsn);
00669 MarkBufferDirty(metabuffer);
00670 }
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 for (i = 0; i < data->ndeleted; i++)
00683 {
00684 Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
00685
00686 if (BufferIsValid(buffer))
00687 {
00688 Page page = BufferGetPage(buffer);
00689
00690 if (lsn > PageGetLSN(page))
00691 {
00692 GinPageGetOpaque(page)->flags = GIN_DELETED;
00693
00694 PageSetLSN(page, lsn);
00695 MarkBufferDirty(buffer);
00696 }
00697
00698 UnlockReleaseBuffer(buffer);
00699 }
00700 }
00701 UnlockReleaseBuffer(metabuffer);
00702 }
00703
00704 void
00705 gin_redo(XLogRecPtr lsn, XLogRecord *record)
00706 {
00707 uint8 info = record->xl_info & ~XLR_INFO_MASK;
00708
00709
00710
00711
00712
00713
00714
00715 topCtx = MemoryContextSwitchTo(opCtx);
00716 switch (info)
00717 {
00718 case XLOG_GIN_CREATE_INDEX:
00719 ginRedoCreateIndex(lsn, record);
00720 break;
00721 case XLOG_GIN_CREATE_PTREE:
00722 ginRedoCreatePTree(lsn, record);
00723 break;
00724 case XLOG_GIN_INSERT:
00725 ginRedoInsert(lsn, record);
00726 break;
00727 case XLOG_GIN_SPLIT:
00728 ginRedoSplit(lsn, record);
00729 break;
00730 case XLOG_GIN_VACUUM_PAGE:
00731 ginRedoVacuumPage(lsn, record);
00732 break;
00733 case XLOG_GIN_DELETE_PAGE:
00734 ginRedoDeletePage(lsn, record);
00735 break;
00736 case XLOG_GIN_UPDATE_META_PAGE:
00737 ginRedoUpdateMetapage(lsn, record);
00738 break;
00739 case XLOG_GIN_INSERT_LISTPAGE:
00740 ginRedoInsertListPage(lsn, record);
00741 break;
00742 case XLOG_GIN_DELETE_LISTPAGE:
00743 ginRedoDeleteListPages(lsn, record);
00744 break;
00745 default:
00746 elog(PANIC, "gin_redo: unknown op code %u", info);
00747 }
00748 MemoryContextSwitchTo(topCtx);
00749 MemoryContextReset(opCtx);
00750 }
00751
00752 void
00753 gin_xlog_startup(void)
00754 {
00755 incomplete_splits = NIL;
00756
00757 opCtx = AllocSetContextCreate(CurrentMemoryContext,
00758 "GIN recovery temporary context",
00759 ALLOCSET_DEFAULT_MINSIZE,
00760 ALLOCSET_DEFAULT_INITSIZE,
00761 ALLOCSET_DEFAULT_MAXSIZE);
00762 }
00763
00764 static void
00765 ginContinueSplit(ginIncompleteSplit *split)
00766 {
00767 GinBtreeData btree;
00768 GinState ginstate;
00769 Relation reln;
00770 Buffer buffer;
00771 GinBtreeStack stack;
00772
00773
00774
00775
00776
00777 buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
00778
00779
00780
00781
00782 if (!BufferIsValid(buffer))
00783 elog(PANIC, "ginContinueSplit: left block %u not found",
00784 split->leftBlkno);
00785
00786 reln = CreateFakeRelcacheEntry(split->node);
00787
00788 if (split->rootBlkno == GIN_ROOT_BLKNO)
00789 {
00790 MemSet(&ginstate, 0, sizeof(ginstate));
00791 ginstate.index = reln;
00792
00793 ginPrepareEntryScan(&btree,
00794 InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
00795 &ginstate);
00796 btree.entry = ginPageGetLinkItup(buffer);
00797 }
00798 else
00799 {
00800 Page page = BufferGetPage(buffer);
00801
00802 ginPrepareDataScan(&btree, reln);
00803
00804 PostingItemSetBlockNumber(&(btree.pitem), split->leftBlkno);
00805 if (GinPageIsLeaf(page))
00806 btree.pitem.key = *(ItemPointerData *) GinDataPageGetItem(page,
00807 GinPageGetOpaque(page)->maxoff);
00808 else
00809 btree.pitem.key = ((PostingItem *) GinDataPageGetItem(page,
00810 GinPageGetOpaque(page)->maxoff))->key;
00811 }
00812
00813 btree.rightblkno = split->rightBlkno;
00814
00815 stack.blkno = split->leftBlkno;
00816 stack.buffer = buffer;
00817 stack.off = InvalidOffsetNumber;
00818 stack.parent = NULL;
00819
00820 ginFindParents(&btree, &stack, split->rootBlkno);
00821 ginInsertValue(&btree, stack.parent, NULL);
00822
00823 FreeFakeRelcacheEntry(reln);
00824
00825 UnlockReleaseBuffer(buffer);
00826 }
00827
00828 void
00829 gin_xlog_cleanup(void)
00830 {
00831 ListCell *l;
00832 MemoryContext topCtx;
00833
00834 topCtx = MemoryContextSwitchTo(opCtx);
00835
00836 foreach(l, incomplete_splits)
00837 {
00838 ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
00839
00840 ginContinueSplit(split);
00841 MemoryContextReset(opCtx);
00842 }
00843
00844 MemoryContextSwitchTo(topCtx);
00845 MemoryContextDelete(opCtx);
00846 incomplete_splits = NIL;
00847 }
00848
00849 bool
00850 gin_safe_restartpoint(void)
00851 {
00852 if (incomplete_splits)
00853 return false;
00854 return true;
00855 }