00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "postgres.h"
00015
00016 #include "access/gist_private.h"
00017 #include "access/xlogutils.h"
00018 #include "utils/memutils.h"
00019
00020 typedef struct
00021 {
00022 gistxlogPage *header;
00023 IndexTuple *itup;
00024 } NewPage;
00025
00026 typedef struct
00027 {
00028 gistxlogPageSplit *data;
00029 NewPage *page;
00030 } PageSplitRecord;
00031
00032 static MemoryContext opCtx;
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 static void
00046 gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
00047 RelFileNode node, BlockNumber childblkno)
00048 {
00049 Buffer buffer;
00050 Page page;
00051
00052 if (record->xl_info & XLR_BKP_BLOCK(block_index))
00053 buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
00054 else
00055 {
00056 buffer = XLogReadBuffer(node, childblkno, false);
00057 if (!BufferIsValid(buffer))
00058 return;
00059 }
00060 page = (Page) BufferGetPage(buffer);
00061
00062
00063
00064
00065
00066
00067 if (lsn >= PageGetLSN(page))
00068 {
00069 GistPageSetNSN(page, lsn);
00070 GistClearFollowRight(page);
00071
00072 PageSetLSN(page, lsn);
00073 MarkBufferDirty(buffer);
00074 }
00075 UnlockReleaseBuffer(buffer);
00076 }
00077
00078
00079
00080
00081 static void
00082 gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
00083 {
00084 char *begin = XLogRecGetData(record);
00085 gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
00086 Buffer buffer;
00087 Page page;
00088 char *data;
00089
00090
00091
00092
00093
00094
00095
00096
00097 if (record->xl_info & XLR_BKP_BLOCK(0))
00098 buffer = RestoreBackupBlock(lsn, record, 0, false, true);
00099 else
00100 buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
00101
00102
00103 if (BlockNumberIsValid(xldata->leftchild))
00104 gistRedoClearFollowRight(lsn, record, 1,
00105 xldata->node, xldata->leftchild);
00106
00107
00108 if (!BufferIsValid(buffer))
00109 return;
00110
00111
00112 if (record->xl_info & XLR_BKP_BLOCK(0))
00113 {
00114 UnlockReleaseBuffer(buffer);
00115 return;
00116 }
00117
00118 page = (Page) BufferGetPage(buffer);
00119
00120
00121 if (lsn <= PageGetLSN(page))
00122 {
00123 UnlockReleaseBuffer(buffer);
00124 return;
00125 }
00126
00127 data = begin + sizeof(gistxlogPageUpdate);
00128
00129
00130 if (xldata->ntodelete > 0)
00131 {
00132 int i;
00133 OffsetNumber *todelete = (OffsetNumber *) data;
00134
00135 data += sizeof(OffsetNumber) * xldata->ntodelete;
00136
00137 for (i = 0; i < xldata->ntodelete; i++)
00138 PageIndexTupleDelete(page, todelete[i]);
00139 if (GistPageIsLeaf(page))
00140 GistMarkTuplesDeleted(page);
00141 }
00142
00143
00144 if (data - begin < record->xl_len)
00145 {
00146 OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
00147 OffsetNumberNext(PageGetMaxOffsetNumber(page));
00148
00149 while (data - begin < record->xl_len)
00150 {
00151 IndexTuple itup = (IndexTuple) data;
00152 Size sz = IndexTupleSize(itup);
00153 OffsetNumber l;
00154
00155 data += sz;
00156
00157 l = PageAddItem(page, (Item) itup, sz, off, false, false);
00158 if (l == InvalidOffsetNumber)
00159 elog(ERROR, "failed to add item to GiST index page, size %d bytes",
00160 (int) sz);
00161 off++;
00162 }
00163 }
00164 else
00165 {
00166
00167
00168
00169
00170 if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
00171 GistClearTuplesDeleted(page);
00172 }
00173
00174 if (!GistPageIsLeaf(page) &&
00175 PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
00176 xldata->blkno == GIST_ROOT_BLKNO)
00177 {
00178
00179
00180
00181
00182 GistPageSetLeaf(page);
00183 }
00184
00185 GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
00186 PageSetLSN(page, lsn);
00187 MarkBufferDirty(buffer);
00188 UnlockReleaseBuffer(buffer);
00189 }
00190
00191 static void
00192 decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
00193 {
00194 char *begin = XLogRecGetData(record),
00195 *ptr;
00196 int j,
00197 i = 0;
00198
00199 decoded->data = (gistxlogPageSplit *) begin;
00200 decoded->page = (NewPage *) palloc(sizeof(NewPage) * decoded->data->npage);
00201
00202 ptr = begin + sizeof(gistxlogPageSplit);
00203 for (i = 0; i < decoded->data->npage; i++)
00204 {
00205 Assert(ptr - begin < record->xl_len);
00206 decoded->page[i].header = (gistxlogPage *) ptr;
00207 ptr += sizeof(gistxlogPage);
00208
00209 decoded->page[i].itup = (IndexTuple *)
00210 palloc(sizeof(IndexTuple) * decoded->page[i].header->num);
00211 j = 0;
00212 while (j < decoded->page[i].header->num)
00213 {
00214 Assert(ptr - begin < record->xl_len);
00215 decoded->page[i].itup[j] = (IndexTuple) ptr;
00216 ptr += IndexTupleSize((IndexTuple) ptr);
00217 j++;
00218 }
00219 }
00220 }
00221
00222 static void
00223 gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
00224 {
00225 gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
00226 PageSplitRecord xlrec;
00227 Buffer firstbuffer = InvalidBuffer;
00228 Buffer buffer;
00229 Page page;
00230 int i;
00231 bool isrootsplit = false;
00232
00233 decodePageSplitRecord(&xlrec, record);
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244 for (i = 0; i < xlrec.data->npage; i++)
00245 {
00246 NewPage *newpage = xlrec.page + i;
00247 int flags;
00248
00249 if (newpage->header->blkno == GIST_ROOT_BLKNO)
00250 {
00251 Assert(i == 0);
00252 isrootsplit = true;
00253 }
00254
00255 buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
00256 Assert(BufferIsValid(buffer));
00257 page = (Page) BufferGetPage(buffer);
00258
00259
00260 if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
00261 flags = F_LEAF;
00262 else
00263 flags = 0;
00264 GISTInitBuffer(buffer, flags);
00265
00266
00267 gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
00268
00269 if (newpage->header->blkno == GIST_ROOT_BLKNO)
00270 {
00271 GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
00272 GistPageSetNSN(page, xldata->orignsn);
00273 GistClearFollowRight(page);
00274 }
00275 else
00276 {
00277 if (i < xlrec.data->npage - 1)
00278 GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
00279 else
00280 GistPageGetOpaque(page)->rightlink = xldata->origrlink;
00281 GistPageSetNSN(page, xldata->orignsn);
00282 if (i < xlrec.data->npage - 1 && !isrootsplit &&
00283 xldata->markfollowright)
00284 GistMarkFollowRight(page);
00285 else
00286 GistClearFollowRight(page);
00287 }
00288
00289 PageSetLSN(page, lsn);
00290 MarkBufferDirty(buffer);
00291
00292 if (i == 0)
00293 firstbuffer = buffer;
00294 else
00295 UnlockReleaseBuffer(buffer);
00296 }
00297
00298
00299 if (BlockNumberIsValid(xldata->leftchild))
00300 gistRedoClearFollowRight(lsn, record, 0,
00301 xldata->node, xldata->leftchild);
00302
00303
00304 UnlockReleaseBuffer(firstbuffer);
00305 }
00306
00307 static void
00308 gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
00309 {
00310 RelFileNode *node = (RelFileNode *) XLogRecGetData(record);
00311 Buffer buffer;
00312 Page page;
00313
00314
00315 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
00316
00317 buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
00318 Assert(BufferIsValid(buffer));
00319 page = (Page) BufferGetPage(buffer);
00320
00321 GISTInitBuffer(buffer, F_LEAF);
00322
00323 PageSetLSN(page, lsn);
00324
00325 MarkBufferDirty(buffer);
00326 UnlockReleaseBuffer(buffer);
00327 }
00328
00329 void
00330 gist_redo(XLogRecPtr lsn, XLogRecord *record)
00331 {
00332 uint8 info = record->xl_info & ~XLR_INFO_MASK;
00333 MemoryContext oldCxt;
00334
00335
00336
00337
00338
00339
00340
00341 oldCxt = MemoryContextSwitchTo(opCtx);
00342 switch (info)
00343 {
00344 case XLOG_GIST_PAGE_UPDATE:
00345 gistRedoPageUpdateRecord(lsn, record);
00346 break;
00347 case XLOG_GIST_PAGE_SPLIT:
00348 gistRedoPageSplitRecord(lsn, record);
00349 break;
00350 case XLOG_GIST_CREATE_INDEX:
00351 gistRedoCreateIndex(lsn, record);
00352 break;
00353 default:
00354 elog(PANIC, "gist_redo: unknown op code %u", info);
00355 }
00356
00357 MemoryContextSwitchTo(oldCxt);
00358 MemoryContextReset(opCtx);
00359 }
00360
00361 void
00362 gist_xlog_startup(void)
00363 {
00364 opCtx = createTempGistContext();
00365 }
00366
00367 void
00368 gist_xlog_cleanup(void)
00369 {
00370 MemoryContextDelete(opCtx);
00371 }
00372
00373
00374
00375
00376 XLogRecPtr
00377 gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
00378 SplitedPageLayout *dist,
00379 BlockNumber origrlink, GistNSN orignsn,
00380 Buffer leftchildbuf, bool markfollowright)
00381 {
00382 XLogRecData *rdata;
00383 gistxlogPageSplit xlrec;
00384 SplitedPageLayout *ptr;
00385 int npage = 0,
00386 cur;
00387 XLogRecPtr recptr;
00388
00389 for (ptr = dist; ptr; ptr = ptr->next)
00390 npage++;
00391
00392 rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
00393
00394 xlrec.node = node;
00395 xlrec.origblkno = blkno;
00396 xlrec.origrlink = origrlink;
00397 xlrec.orignsn = orignsn;
00398 xlrec.origleaf = page_is_leaf;
00399 xlrec.npage = (uint16) npage;
00400 xlrec.leftchild =
00401 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
00402 xlrec.markfollowright = markfollowright;
00403
00404 rdata[0].data = (char *) &xlrec;
00405 rdata[0].len = sizeof(gistxlogPageSplit);
00406 rdata[0].buffer = InvalidBuffer;
00407
00408 cur = 1;
00409
00410
00411
00412
00413
00414 if (BufferIsValid(leftchildbuf))
00415 {
00416 rdata[cur - 1].next = &(rdata[cur]);
00417 rdata[cur].data = NULL;
00418 rdata[cur].len = 0;
00419 rdata[cur].buffer = leftchildbuf;
00420 rdata[cur].buffer_std = true;
00421 cur++;
00422 }
00423
00424 for (ptr = dist; ptr; ptr = ptr->next)
00425 {
00426 rdata[cur - 1].next = &(rdata[cur]);
00427 rdata[cur].buffer = InvalidBuffer;
00428 rdata[cur].data = (char *) &(ptr->block);
00429 rdata[cur].len = sizeof(gistxlogPage);
00430 cur++;
00431
00432 rdata[cur - 1].next = &(rdata[cur]);
00433 rdata[cur].buffer = InvalidBuffer;
00434 rdata[cur].data = (char *) (ptr->list);
00435 rdata[cur].len = ptr->lenlist;
00436 cur++;
00437 }
00438 rdata[cur - 1].next = NULL;
00439
00440 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
00441
00442 pfree(rdata);
00443 return recptr;
00444 }
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459 XLogRecPtr
00460 gistXLogUpdate(RelFileNode node, Buffer buffer,
00461 OffsetNumber *todelete, int ntodelete,
00462 IndexTuple *itup, int ituplen,
00463 Buffer leftchildbuf)
00464 {
00465 XLogRecData *rdata;
00466 gistxlogPageUpdate xlrec;
00467 int cur,
00468 i;
00469 XLogRecPtr recptr;
00470
00471 rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
00472
00473 xlrec.node = node;
00474 xlrec.blkno = BufferGetBlockNumber(buffer);
00475 xlrec.ntodelete = ntodelete;
00476 xlrec.leftchild =
00477 BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
00478
00479 rdata[0].data = (char *) &xlrec;
00480 rdata[0].len = sizeof(gistxlogPageUpdate);
00481 rdata[0].buffer = InvalidBuffer;
00482 rdata[0].next = &(rdata[1]);
00483
00484 rdata[1].data = (char *) todelete;
00485 rdata[1].len = sizeof(OffsetNumber) * ntodelete;
00486 rdata[1].buffer = buffer;
00487 rdata[1].buffer_std = true;
00488
00489 cur = 2;
00490
00491
00492 for (i = 0; i < ituplen; i++)
00493 {
00494 rdata[cur - 1].next = &(rdata[cur]);
00495 rdata[cur].data = (char *) (itup[i]);
00496 rdata[cur].len = IndexTupleSize(itup[i]);
00497 rdata[cur].buffer = buffer;
00498 rdata[cur].buffer_std = true;
00499 cur++;
00500 }
00501
00502
00503
00504
00505
00506 if (BufferIsValid(leftchildbuf))
00507 {
00508 rdata[cur - 1].next = &(rdata[cur]);
00509 rdata[cur].data = NULL;
00510 rdata[cur].len = 0;
00511 rdata[cur].buffer = leftchildbuf;
00512 rdata[cur].buffer_std = true;
00513 cur++;
00514 }
00515 rdata[cur - 1].next = NULL;
00516
00517 recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
00518
00519 pfree(rdata);
00520 return recptr;
00521 }