00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/spgist_private.h"
00018 #include "access/transam.h"
00019 #include "access/xlogutils.h"
00020 #include "storage/standby.h"
00021 #include "utils/memutils.h"
00022
00023
00024 static MemoryContext opCtx;		/* scratch context for redo work: created in spg_xlog_startup(), made current and then reset for every record in spg_redo(), deleted in spg_xlog_cleanup() */
00025
00026
00027
00028
00029
00030
00031
00032
00033 static void
00034 fillFakeState(SpGistState *state, spgxlogState stateSrc)
00035 {
00036 memset(state, 0, sizeof(*state));
00037
00038 state->myXid = stateSrc.myXid;
00039 state->isBuild = stateSrc.isBuild;
00040 state->deadTupleStorage = palloc0(SGDTSIZE);
00041 }
00042
00043
00044
00045
00046
00047
00048 static void
00049 addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
00050 {
00051 if (offset <= PageGetMaxOffsetNumber(page))
00052 {
00053 SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
00054 PageGetItemId(page, offset));
00055
00056 if (dt->tupstate != SPGIST_PLACEHOLDER)
00057 elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
00058
00059 Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
00060 SpGistPageGetOpaque(page)->nPlaceholder--;
00061
00062 PageIndexTupleDelete(page, offset);
00063 }
00064
00065 Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
00066
00067 if (PageAddItem(page, tuple, size, offset, false, false) != offset)
00068 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00069 size);
00070 }
00071
00072 static void
00073 spgRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00074 {
00075 RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00076 Buffer buffer;
00077 Page page;
00078
00079
00080 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00081
00082 buffer = XLogReadBuffer(*node, SPGIST_METAPAGE_BLKNO, true);
00083 Assert(BufferIsValid(buffer));
00084 page = (Page) BufferGetPage(buffer);
00085 SpGistInitMetapage(page);
00086 PageSetLSN(page, lsn);
00087 MarkBufferDirty(buffer);
00088 UnlockReleaseBuffer(buffer);
00089
00090 buffer = XLogReadBuffer(*node, SPGIST_ROOT_BLKNO, true);
00091 Assert(BufferIsValid(buffer));
00092 SpGistInitBuffer(buffer, SPGIST_LEAF);
00093 page = (Page) BufferGetPage(buffer);
00094 PageSetLSN(page, lsn);
00095 MarkBufferDirty(buffer);
00096 UnlockReleaseBuffer(buffer);
00097
00098 buffer = XLogReadBuffer(*node, SPGIST_NULL_BLKNO, true);
00099 Assert(BufferIsValid(buffer));
00100 SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
00101 page = (Page) BufferGetPage(buffer);
00102 PageSetLSN(page, lsn);
00103 MarkBufferDirty(buffer);
00104 UnlockReleaseBuffer(buffer);
00105 }
00106
00107 static void
00108 spgRedoAddLeaf(XLogRecPtr lsn, XLogRecord *record)
00109 {
00110 char *ptr = XLogRecGetData(record);
00111 spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
00112 SpGistLeafTuple leafTuple;
00113 Buffer buffer;
00114 Page page;
00115
00116
00117 ptr += sizeof(spgxlogAddLeaf);
00118 leafTuple = (SpGistLeafTuple) ptr;
00119
00120
00121
00122
00123
00124
00125 if (record->xl_info & XLR_BKP_BLOCK(0))
00126 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00127 else
00128 {
00129 buffer = XLogReadBuffer(xldata->node, xldata->blknoLeaf,
00130 xldata->newPage);
00131 if (BufferIsValid(buffer))
00132 {
00133 page = BufferGetPage(buffer);
00134
00135 if (xldata->newPage)
00136 SpGistInitBuffer(buffer,
00137 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00138
00139 if (lsn > PageGetLSN(page))
00140 {
00141
00142 if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
00143 {
00144
00145 addOrReplaceTuple(page, (Item) leafTuple, leafTuple->size,
00146 xldata->offnumLeaf);
00147
00148
00149 if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
00150 {
00151 SpGistLeafTuple head;
00152
00153 head = (SpGistLeafTuple) PageGetItem(page,
00154 PageGetItemId(page, xldata->offnumHeadLeaf));
00155 Assert(head->nextOffset == leafTuple->nextOffset);
00156 head->nextOffset = xldata->offnumLeaf;
00157 }
00158 }
00159 else
00160 {
00161
00162 PageIndexTupleDelete(page, xldata->offnumLeaf);
00163 if (PageAddItem(page,
00164 (Item) leafTuple, leafTuple->size,
00165 xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
00166 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00167 leafTuple->size);
00168 }
00169
00170 PageSetLSN(page, lsn);
00171 MarkBufferDirty(buffer);
00172 }
00173 UnlockReleaseBuffer(buffer);
00174 }
00175 }
00176
00177
00178 if (record->xl_info & XLR_BKP_BLOCK(1))
00179 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00180 else if (xldata->blknoParent != InvalidBlockNumber)
00181 {
00182 buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00183 if (BufferIsValid(buffer))
00184 {
00185 page = BufferGetPage(buffer);
00186 if (lsn > PageGetLSN(page))
00187 {
00188 SpGistInnerTuple tuple;
00189
00190 tuple = (SpGistInnerTuple) PageGetItem(page,
00191 PageGetItemId(page, xldata->offnumParent));
00192
00193 spgUpdateNodeLink(tuple, xldata->nodeI,
00194 xldata->blknoLeaf, xldata->offnumLeaf);
00195
00196 PageSetLSN(page, lsn);
00197 MarkBufferDirty(buffer);
00198 }
00199 UnlockReleaseBuffer(buffer);
00200 }
00201 }
00202 }
00203
00204 static void
00205 spgRedoMoveLeafs(XLogRecPtr lsn, XLogRecord *record)
00206 {
00207 char *ptr = XLogRecGetData(record);
00208 spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
00209 SpGistState state;
00210 OffsetNumber *toDelete;
00211 OffsetNumber *toInsert;
00212 int nInsert;
00213 Buffer buffer;
00214 Page page;
00215
00216 fillFakeState(&state, xldata->stateSrc);
00217
00218 nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
00219
00220 ptr += MAXALIGN(sizeof(spgxlogMoveLeafs));
00221 toDelete = (OffsetNumber *) ptr;
00222 ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nMoves);
00223 toInsert = (OffsetNumber *) ptr;
00224 ptr += MAXALIGN(sizeof(OffsetNumber) * nInsert);
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235 if (record->xl_info & XLR_BKP_BLOCK(1))
00236 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00237 else
00238 {
00239 buffer = XLogReadBuffer(xldata->node, xldata->blknoDst,
00240 xldata->newPage);
00241 if (BufferIsValid(buffer))
00242 {
00243 page = BufferGetPage(buffer);
00244
00245 if (xldata->newPage)
00246 SpGistInitBuffer(buffer,
00247 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00248
00249 if (lsn > PageGetLSN(page))
00250 {
00251 int i;
00252
00253 for (i = 0; i < nInsert; i++)
00254 {
00255 SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
00256
00257 addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
00258 ptr += lt->size;
00259 }
00260
00261 PageSetLSN(page, lsn);
00262 MarkBufferDirty(buffer);
00263 }
00264 UnlockReleaseBuffer(buffer);
00265 }
00266 }
00267
00268
00269 if (record->xl_info & XLR_BKP_BLOCK(0))
00270 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00271 else
00272 {
00273 buffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
00274 if (BufferIsValid(buffer))
00275 {
00276 page = BufferGetPage(buffer);
00277 if (lsn > PageGetLSN(page))
00278 {
00279 spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
00280 state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
00281 SPGIST_PLACEHOLDER,
00282 xldata->blknoDst,
00283 toInsert[nInsert - 1]);
00284
00285 PageSetLSN(page, lsn);
00286 MarkBufferDirty(buffer);
00287 }
00288 UnlockReleaseBuffer(buffer);
00289 }
00290 }
00291
00292
00293 if (record->xl_info & XLR_BKP_BLOCK(2))
00294 (void) RestoreBackupBlock(lsn, record, 2, false, false);
00295 else
00296 {
00297 buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00298 if (BufferIsValid(buffer))
00299 {
00300 page = BufferGetPage(buffer);
00301 if (lsn > PageGetLSN(page))
00302 {
00303 SpGistInnerTuple tuple;
00304
00305 tuple = (SpGistInnerTuple) PageGetItem(page,
00306 PageGetItemId(page, xldata->offnumParent));
00307
00308 spgUpdateNodeLink(tuple, xldata->nodeI,
00309 xldata->blknoDst, toInsert[nInsert - 1]);
00310
00311 PageSetLSN(page, lsn);
00312 MarkBufferDirty(buffer);
00313 }
00314 UnlockReleaseBuffer(buffer);
00315 }
00316 }
00317 }
00318
00319 static void
00320 spgRedoAddNode(XLogRecPtr lsn, XLogRecord *record)
00321 {
00322 char *ptr = XLogRecGetData(record);
00323 spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
00324 SpGistInnerTuple innerTuple;
00325 SpGistState state;
00326 Buffer buffer;
00327 Page page;
00328 int bbi;
00329
00330
00331 ptr += sizeof(spgxlogAddNode);
00332 innerTuple = (SpGistInnerTuple) ptr;
00333
00334 fillFakeState(&state, xldata->stateSrc);
00335
00336 if (xldata->blknoNew == InvalidBlockNumber)
00337 {
00338
00339 Assert(xldata->blknoParent == InvalidBlockNumber);
00340 if (record->xl_info & XLR_BKP_BLOCK(0))
00341 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00342 else
00343 {
00344 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00345 if (BufferIsValid(buffer))
00346 {
00347 page = BufferGetPage(buffer);
00348 if (lsn > PageGetLSN(page))
00349 {
00350 PageIndexTupleDelete(page, xldata->offnum);
00351 if (PageAddItem(page, (Item) innerTuple, innerTuple->size,
00352 xldata->offnum,
00353 false, false) != xldata->offnum)
00354 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00355 innerTuple->size);
00356
00357 PageSetLSN(page, lsn);
00358 MarkBufferDirty(buffer);
00359 }
00360 UnlockReleaseBuffer(buffer);
00361 }
00362 }
00363 }
00364 else
00365 {
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376 Assert(xldata->blkno != xldata->blknoNew);
00377
00378
00379 if (record->xl_info & XLR_BKP_BLOCK(1))
00380 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00381 else
00382 {
00383 buffer = XLogReadBuffer(xldata->node, xldata->blknoNew,
00384 xldata->newPage);
00385 if (BufferIsValid(buffer))
00386 {
00387 page = BufferGetPage(buffer);
00388
00389
00390 if (xldata->newPage)
00391 SpGistInitBuffer(buffer, 0);
00392
00393 if (lsn > PageGetLSN(page))
00394 {
00395 addOrReplaceTuple(page, (Item) innerTuple,
00396 innerTuple->size, xldata->offnumNew);
00397
00398
00399
00400
00401
00402
00403
00404 if (xldata->blknoParent != xldata->blknoNew)
00405 {
00406 PageSetLSN(page, lsn);
00407 }
00408 MarkBufferDirty(buffer);
00409 }
00410 UnlockReleaseBuffer(buffer);
00411 }
00412 }
00413
00414
00415 if (record->xl_info & XLR_BKP_BLOCK(0))
00416 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00417 else
00418 {
00419 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00420 if (BufferIsValid(buffer))
00421 {
00422 page = BufferGetPage(buffer);
00423 if (lsn > PageGetLSN(page))
00424 {
00425 SpGistDeadTuple dt;
00426
00427 if (state.isBuild)
00428 dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
00429 InvalidBlockNumber,
00430 InvalidOffsetNumber);
00431 else
00432 dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
00433 xldata->blknoNew,
00434 xldata->offnumNew);
00435
00436 PageIndexTupleDelete(page, xldata->offnum);
00437 if (PageAddItem(page, (Item) dt, dt->size,
00438 xldata->offnum,
00439 false, false) != xldata->offnum)
00440 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00441 dt->size);
00442
00443 if (state.isBuild)
00444 SpGistPageGetOpaque(page)->nPlaceholder++;
00445 else
00446 SpGistPageGetOpaque(page)->nRedirection++;
00447
00448
00449
00450
00451
00452
00453
00454 if (xldata->blknoParent != xldata->blkno)
00455 {
00456 PageSetLSN(page, lsn);
00457 }
00458 MarkBufferDirty(buffer);
00459 }
00460 UnlockReleaseBuffer(buffer);
00461 }
00462 }
00463
00464
00465
00466
00467
00468
00469 if (xldata->blknoParent == xldata->blkno)
00470 bbi = 0;
00471 else if (xldata->blknoParent == xldata->blknoNew)
00472 bbi = 1;
00473 else
00474 bbi = 2;
00475
00476 if (record->xl_info & XLR_BKP_BLOCK(bbi))
00477 {
00478 if (bbi == 2)
00479 (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00480 }
00481 else
00482 {
00483 buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00484 if (BufferIsValid(buffer))
00485 {
00486 page = BufferGetPage(buffer);
00487 if (lsn > PageGetLSN(page))
00488 {
00489 SpGistInnerTuple innerTuple;
00490
00491 innerTuple = (SpGistInnerTuple) PageGetItem(page,
00492 PageGetItemId(page, xldata->offnumParent));
00493
00494 spgUpdateNodeLink(innerTuple, xldata->nodeI,
00495 xldata->blknoNew, xldata->offnumNew);
00496
00497 PageSetLSN(page, lsn);
00498 MarkBufferDirty(buffer);
00499 }
00500 UnlockReleaseBuffer(buffer);
00501 }
00502 }
00503 }
00504 }
00505
00506 static void
00507 spgRedoSplitTuple(XLogRecPtr lsn, XLogRecord *record)
00508 {
00509 char *ptr = XLogRecGetData(record);
00510 spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
00511 SpGistInnerTuple prefixTuple;
00512 SpGistInnerTuple postfixTuple;
00513 Buffer buffer;
00514 Page page;
00515
00516
00517 ptr += sizeof(spgxlogSplitTuple);
00518 prefixTuple = (SpGistInnerTuple) ptr;
00519 ptr += prefixTuple->size;
00520 postfixTuple = (SpGistInnerTuple) ptr;
00521
00522
00523
00524
00525
00526
00527
00528
00529 if (record->xl_info & XLR_BKP_BLOCK(1))
00530 (void) RestoreBackupBlock(lsn, record, 1, false, false);
00531 else if (xldata->blknoPostfix != xldata->blknoPrefix)
00532 {
00533 buffer = XLogReadBuffer(xldata->node, xldata->blknoPostfix,
00534 xldata->newPage);
00535 if (BufferIsValid(buffer))
00536 {
00537 page = BufferGetPage(buffer);
00538
00539
00540 if (xldata->newPage)
00541 SpGistInitBuffer(buffer, 0);
00542
00543 if (lsn > PageGetLSN(page))
00544 {
00545 addOrReplaceTuple(page, (Item) postfixTuple,
00546 postfixTuple->size, xldata->offnumPostfix);
00547
00548 PageSetLSN(page, lsn);
00549 MarkBufferDirty(buffer);
00550 }
00551 UnlockReleaseBuffer(buffer);
00552 }
00553 }
00554
00555
00556 if (record->xl_info & XLR_BKP_BLOCK(0))
00557 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00558 else
00559 {
00560 buffer = XLogReadBuffer(xldata->node, xldata->blknoPrefix, false);
00561 if (BufferIsValid(buffer))
00562 {
00563 page = BufferGetPage(buffer);
00564 if (lsn > PageGetLSN(page))
00565 {
00566 PageIndexTupleDelete(page, xldata->offnumPrefix);
00567 if (PageAddItem(page, (Item) prefixTuple, prefixTuple->size,
00568 xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
00569 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00570 prefixTuple->size);
00571
00572 if (xldata->blknoPostfix == xldata->blknoPrefix)
00573 addOrReplaceTuple(page, (Item) postfixTuple,
00574 postfixTuple->size,
00575 xldata->offnumPostfix);
00576
00577 PageSetLSN(page, lsn);
00578 MarkBufferDirty(buffer);
00579 }
00580 UnlockReleaseBuffer(buffer);
00581 }
00582 }
00583 }
00584
00585 static void
00586 spgRedoPickSplit(XLogRecPtr lsn, XLogRecord *record)
00587 {
00588 char *ptr = XLogRecGetData(record);
00589 spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
00590 SpGistInnerTuple innerTuple;
00591 SpGistState state;
00592 OffsetNumber *toDelete;
00593 OffsetNumber *toInsert;
00594 uint8 *leafPageSelect;
00595 Buffer srcBuffer;
00596 Buffer destBuffer;
00597 Page srcPage;
00598 Page destPage;
00599 Page page;
00600 int bbi;
00601 int i;
00602
00603 fillFakeState(&state, xldata->stateSrc);
00604
00605 ptr += MAXALIGN(sizeof(spgxlogPickSplit));
00606 innerTuple = (SpGistInnerTuple) ptr;
00607 ptr += innerTuple->size;
00608 toDelete = (OffsetNumber *) ptr;
00609 ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nDelete);
00610 toInsert = (OffsetNumber *) ptr;
00611 ptr += MAXALIGN(sizeof(OffsetNumber) * xldata->nInsert);
00612 leafPageSelect = (uint8 *) ptr;
00613 ptr += MAXALIGN(sizeof(uint8) * xldata->nInsert);
00614
00615
00616
00617
00618
00619
00620
00621 bbi = 0;
00622
00623 if (SpGistBlockIsRoot(xldata->blknoSrc))
00624 {
00625
00626 srcBuffer = InvalidBuffer;
00627 srcPage = NULL;
00628 }
00629 else if (xldata->initSrc)
00630 {
00631
00632 srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, true);
00633 Assert(BufferIsValid(srcBuffer));
00634 srcPage = (Page) BufferGetPage(srcBuffer);
00635
00636 SpGistInitBuffer(srcBuffer,
00637 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00638
00639 }
00640 else
00641 {
00642
00643
00644
00645
00646
00647
00648 if (record->xl_info & XLR_BKP_BLOCK(bbi))
00649 {
00650 srcBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
00651 srcPage = NULL;
00652 }
00653 else
00654 {
00655 srcBuffer = XLogReadBuffer(xldata->node, xldata->blknoSrc, false);
00656 if (BufferIsValid(srcBuffer))
00657 {
00658 srcPage = BufferGetPage(srcBuffer);
00659 if (lsn > PageGetLSN(srcPage))
00660 {
00661
00662
00663
00664
00665
00666 if (!state.isBuild)
00667 spgPageIndexMultiDelete(&state, srcPage,
00668 toDelete, xldata->nDelete,
00669 SPGIST_REDIRECT,
00670 SPGIST_PLACEHOLDER,
00671 xldata->blknoInner,
00672 xldata->offnumInner);
00673 else
00674 spgPageIndexMultiDelete(&state, srcPage,
00675 toDelete, xldata->nDelete,
00676 SPGIST_PLACEHOLDER,
00677 SPGIST_PLACEHOLDER,
00678 InvalidBlockNumber,
00679 InvalidOffsetNumber);
00680
00681
00682 }
00683 else
00684 srcPage = NULL;
00685 }
00686 else
00687 srcPage = NULL;
00688 }
00689 bbi++;
00690 }
00691
00692
00693 if (xldata->blknoDest == InvalidBlockNumber)
00694 {
00695 destBuffer = InvalidBuffer;
00696 destPage = NULL;
00697 }
00698 else if (xldata->initDest)
00699 {
00700
00701 destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, true);
00702 Assert(BufferIsValid(destBuffer));
00703 destPage = (Page) BufferGetPage(destBuffer);
00704
00705 SpGistInitBuffer(destBuffer,
00706 SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
00707
00708 }
00709 else
00710 {
00711
00712
00713
00714
00715 if (record->xl_info & XLR_BKP_BLOCK(bbi))
00716 {
00717 destBuffer = RestoreBackupBlock(lsn, record, bbi, false, true);
00718 destPage = NULL;
00719 }
00720 else
00721 {
00722 destBuffer = XLogReadBuffer(xldata->node, xldata->blknoDest, false);
00723 if (BufferIsValid(destBuffer))
00724 {
00725 destPage = (Page) BufferGetPage(destBuffer);
00726 if (lsn <= PageGetLSN(destPage))
00727 destPage = NULL;
00728 }
00729 else
00730 destPage = NULL;
00731 }
00732 bbi++;
00733 }
00734
00735
00736 for (i = 0; i < xldata->nInsert; i++)
00737 {
00738 SpGistLeafTuple lt = (SpGistLeafTuple) ptr;
00739
00740 ptr += lt->size;
00741
00742 page = leafPageSelect[i] ? destPage : srcPage;
00743 if (page == NULL)
00744 continue;
00745
00746 addOrReplaceTuple(page, (Item) lt, lt->size, toInsert[i]);
00747 }
00748
00749
00750 if (srcPage != NULL)
00751 {
00752 PageSetLSN(srcPage, lsn);
00753 MarkBufferDirty(srcBuffer);
00754 }
00755 if (destPage != NULL)
00756 {
00757 PageSetLSN(destPage, lsn);
00758 MarkBufferDirty(destBuffer);
00759 }
00760
00761
00762 if (record->xl_info & XLR_BKP_BLOCK(bbi))
00763 (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00764 else
00765 {
00766 Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoInner,
00767 xldata->initInner);
00768
00769 if (BufferIsValid(buffer))
00770 {
00771 page = BufferGetPage(buffer);
00772
00773 if (xldata->initInner)
00774 SpGistInitBuffer(buffer,
00775 (xldata->storesNulls ? SPGIST_NULLS : 0));
00776
00777 if (lsn > PageGetLSN(page))
00778 {
00779 addOrReplaceTuple(page, (Item) innerTuple, innerTuple->size,
00780 xldata->offnumInner);
00781
00782
00783 if (xldata->blknoInner == xldata->blknoParent)
00784 {
00785 SpGistInnerTuple parent;
00786
00787 parent = (SpGistInnerTuple) PageGetItem(page,
00788 PageGetItemId(page, xldata->offnumParent));
00789 spgUpdateNodeLink(parent, xldata->nodeI,
00790 xldata->blknoInner, xldata->offnumInner);
00791 }
00792
00793 PageSetLSN(page, lsn);
00794 MarkBufferDirty(buffer);
00795 }
00796 UnlockReleaseBuffer(buffer);
00797 }
00798 }
00799 bbi++;
00800
00801
00802
00803
00804
00805 if (BufferIsValid(srcBuffer))
00806 UnlockReleaseBuffer(srcBuffer);
00807 if (BufferIsValid(destBuffer))
00808 UnlockReleaseBuffer(destBuffer);
00809
00810
00811 if (xldata->blknoParent == InvalidBlockNumber)
00812 {
00813
00814 Assert(SpGistBlockIsRoot(xldata->blknoInner));
00815 }
00816 else if (xldata->blknoInner != xldata->blknoParent)
00817 {
00818 if (record->xl_info & XLR_BKP_BLOCK(bbi))
00819 (void) RestoreBackupBlock(lsn, record, bbi, false, false);
00820 else
00821 {
00822 Buffer buffer = XLogReadBuffer(xldata->node, xldata->blknoParent, false);
00823
00824 if (BufferIsValid(buffer))
00825 {
00826 page = BufferGetPage(buffer);
00827
00828 if (lsn > PageGetLSN(page))
00829 {
00830 SpGistInnerTuple parent;
00831
00832 parent = (SpGistInnerTuple) PageGetItem(page,
00833 PageGetItemId(page, xldata->offnumParent));
00834 spgUpdateNodeLink(parent, xldata->nodeI,
00835 xldata->blknoInner, xldata->offnumInner);
00836
00837 PageSetLSN(page, lsn);
00838 MarkBufferDirty(buffer);
00839 }
00840 UnlockReleaseBuffer(buffer);
00841 }
00842 }
00843 }
00844 }
00845
00846 static void
00847 spgRedoVacuumLeaf(XLogRecPtr lsn, XLogRecord *record)
00848 {
00849 char *ptr = XLogRecGetData(record);
00850 spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
00851 OffsetNumber *toDead;
00852 OffsetNumber *toPlaceholder;
00853 OffsetNumber *moveSrc;
00854 OffsetNumber *moveDest;
00855 OffsetNumber *chainSrc;
00856 OffsetNumber *chainDest;
00857 SpGistState state;
00858 Buffer buffer;
00859 Page page;
00860 int i;
00861
00862 fillFakeState(&state, xldata->stateSrc);
00863
00864 ptr += sizeof(spgxlogVacuumLeaf);
00865 toDead = (OffsetNumber *) ptr;
00866 ptr += sizeof(OffsetNumber) * xldata->nDead;
00867 toPlaceholder = (OffsetNumber *) ptr;
00868 ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
00869 moveSrc = (OffsetNumber *) ptr;
00870 ptr += sizeof(OffsetNumber) * xldata->nMove;
00871 moveDest = (OffsetNumber *) ptr;
00872 ptr += sizeof(OffsetNumber) * xldata->nMove;
00873 chainSrc = (OffsetNumber *) ptr;
00874 ptr += sizeof(OffsetNumber) * xldata->nChain;
00875 chainDest = (OffsetNumber *) ptr;
00876
00877 if (record->xl_info & XLR_BKP_BLOCK(0))
00878 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00879 else
00880 {
00881 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00882 if (BufferIsValid(buffer))
00883 {
00884 page = BufferGetPage(buffer);
00885 if (lsn > PageGetLSN(page))
00886 {
00887 spgPageIndexMultiDelete(&state, page,
00888 toDead, xldata->nDead,
00889 SPGIST_DEAD, SPGIST_DEAD,
00890 InvalidBlockNumber,
00891 InvalidOffsetNumber);
00892
00893 spgPageIndexMultiDelete(&state, page,
00894 toPlaceholder, xldata->nPlaceholder,
00895 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00896 InvalidBlockNumber,
00897 InvalidOffsetNumber);
00898
00899
00900 for (i = 0; i < xldata->nMove; i++)
00901 {
00902 ItemId idSrc = PageGetItemId(page, moveSrc[i]);
00903 ItemId idDest = PageGetItemId(page, moveDest[i]);
00904 ItemIdData tmp;
00905
00906 tmp = *idSrc;
00907 *idSrc = *idDest;
00908 *idDest = tmp;
00909 }
00910
00911 spgPageIndexMultiDelete(&state, page,
00912 moveSrc, xldata->nMove,
00913 SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
00914 InvalidBlockNumber,
00915 InvalidOffsetNumber);
00916
00917 for (i = 0; i < xldata->nChain; i++)
00918 {
00919 SpGistLeafTuple lt;
00920
00921 lt = (SpGistLeafTuple) PageGetItem(page,
00922 PageGetItemId(page, chainSrc[i]));
00923 Assert(lt->tupstate == SPGIST_LIVE);
00924 lt->nextOffset = chainDest[i];
00925 }
00926
00927 PageSetLSN(page, lsn);
00928 MarkBufferDirty(buffer);
00929 }
00930 UnlockReleaseBuffer(buffer);
00931 }
00932 }
00933 }
00934
00935 static void
00936 spgRedoVacuumRoot(XLogRecPtr lsn, XLogRecord *record)
00937 {
00938 char *ptr = XLogRecGetData(record);
00939 spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
00940 OffsetNumber *toDelete;
00941 Buffer buffer;
00942 Page page;
00943
00944 ptr += sizeof(spgxlogVacuumRoot);
00945 toDelete = (OffsetNumber *) ptr;
00946
00947 if (record->xl_info & XLR_BKP_BLOCK(0))
00948 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00949 else
00950 {
00951 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00952 if (BufferIsValid(buffer))
00953 {
00954 page = BufferGetPage(buffer);
00955 if (lsn > PageGetLSN(page))
00956 {
00957
00958 PageIndexMultiDelete(page, toDelete, xldata->nDelete);
00959
00960 PageSetLSN(page, lsn);
00961 MarkBufferDirty(buffer);
00962 }
00963 UnlockReleaseBuffer(buffer);
00964 }
00965 }
00966 }
00967
00968 static void
00969 spgRedoVacuumRedirect(XLogRecPtr lsn, XLogRecord *record)
00970 {
00971 char *ptr = XLogRecGetData(record);
00972 spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
00973 OffsetNumber *itemToPlaceholder;
00974 Buffer buffer;
00975 Page page;
00976
00977 ptr += sizeof(spgxlogVacuumRedirect);
00978 itemToPlaceholder = (OffsetNumber *) ptr;
00979
00980
00981
00982
00983
00984 if (InHotStandby)
00985 {
00986 if (TransactionIdIsValid(xldata->newestRedirectXid))
00987 ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
00988 xldata->node);
00989 }
00990
00991 if (record->xl_info & XLR_BKP_BLOCK(0))
00992 (void) RestoreBackupBlock(lsn, record, 0, false, false);
00993 else
00994 {
00995 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00996
00997 if (BufferIsValid(buffer))
00998 {
00999 page = BufferGetPage(buffer);
01000 if (lsn > PageGetLSN(page))
01001 {
01002 SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
01003 int i;
01004
01005
01006 for (i = 0; i < xldata->nToPlaceholder; i++)
01007 {
01008 SpGistDeadTuple dt;
01009
01010 dt = (SpGistDeadTuple) PageGetItem(page,
01011 PageGetItemId(page, itemToPlaceholder[i]));
01012 Assert(dt->tupstate == SPGIST_REDIRECT);
01013 dt->tupstate = SPGIST_PLACEHOLDER;
01014 ItemPointerSetInvalid(&dt->pointer);
01015 }
01016
01017 Assert(opaque->nRedirection >= xldata->nToPlaceholder);
01018 opaque->nRedirection -= xldata->nToPlaceholder;
01019 opaque->nPlaceholder += xldata->nToPlaceholder;
01020
01021
01022 if (xldata->firstPlaceholder != InvalidOffsetNumber)
01023 {
01024 int max = PageGetMaxOffsetNumber(page);
01025 OffsetNumber *toDelete;
01026
01027 toDelete = palloc(sizeof(OffsetNumber) * max);
01028
01029 for (i = xldata->firstPlaceholder; i <= max; i++)
01030 toDelete[i - xldata->firstPlaceholder] = i;
01031
01032 i = max - xldata->firstPlaceholder + 1;
01033 Assert(opaque->nPlaceholder >= i);
01034 opaque->nPlaceholder -= i;
01035
01036
01037 PageIndexMultiDelete(page, toDelete, i);
01038
01039 pfree(toDelete);
01040 }
01041
01042 PageSetLSN(page, lsn);
01043 MarkBufferDirty(buffer);
01044 }
01045
01046 UnlockReleaseBuffer(buffer);
01047 }
01048 }
01049 }
01050
01051 void
01052 spg_redo(XLogRecPtr lsn, XLogRecord *record)
01053 {
01054 uint8 info = record->xl_info & ~XLR_INFO_MASK;
01055 MemoryContext oldCxt;
01056
01057 oldCxt = MemoryContextSwitchTo(opCtx);
01058 switch (info)
01059 {
01060 case XLOG_SPGIST_CREATE_INDEX:
01061 spgRedoCreateIndex(lsn, record);
01062 break;
01063 case XLOG_SPGIST_ADD_LEAF:
01064 spgRedoAddLeaf(lsn, record);
01065 break;
01066 case XLOG_SPGIST_MOVE_LEAFS:
01067 spgRedoMoveLeafs(lsn, record);
01068 break;
01069 case XLOG_SPGIST_ADD_NODE:
01070 spgRedoAddNode(lsn, record);
01071 break;
01072 case XLOG_SPGIST_SPLIT_TUPLE:
01073 spgRedoSplitTuple(lsn, record);
01074 break;
01075 case XLOG_SPGIST_PICKSPLIT:
01076 spgRedoPickSplit(lsn, record);
01077 break;
01078 case XLOG_SPGIST_VACUUM_LEAF:
01079 spgRedoVacuumLeaf(lsn, record);
01080 break;
01081 case XLOG_SPGIST_VACUUM_ROOT:
01082 spgRedoVacuumRoot(lsn, record);
01083 break;
01084 case XLOG_SPGIST_VACUUM_REDIRECT:
01085 spgRedoVacuumRedirect(lsn, record);
01086 break;
01087 default:
01088 elog(PANIC, "spg_redo: unknown op code %u", info);
01089 }
01090
01091 MemoryContextSwitchTo(oldCxt);
01092 MemoryContextReset(opCtx);
01093 }
01094
01095 void
01096 spg_xlog_startup(void)
01097 {
01098 opCtx = AllocSetContextCreate(CurrentMemoryContext,
01099 "SP-GiST temporary context",
01100 ALLOCSET_DEFAULT_MINSIZE,
01101 ALLOCSET_DEFAULT_INITSIZE,
01102 ALLOCSET_DEFAULT_MAXSIZE);
01103 }
01104
01105 void
01106 spg_xlog_cleanup(void)
01107 {
01108 MemoryContextDelete(opCtx);
01109 opCtx = NULL;
01110 }