00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/genam.h"
00018 #include "access/gist_private.h"
00019 #include "access/heapam_xlog.h"
00020 #include "catalog/index.h"
00021 #include "catalog/pg_collation.h"
00022 #include "miscadmin.h"
00023 #include "storage/bufmgr.h"
00024 #include "storage/indexfsm.h"
00025 #include "utils/memutils.h"
00026 #include "utils/rel.h"
00027
00028
00029 static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
00030 static bool gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
00031 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum);
00032 static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
00033 GISTSTATE *giststate,
00034 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
00035 Buffer leftchild, Buffer rightchild,
00036 bool unlockbuf, bool unlockleftchild);
00037 static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
00038 GISTSTATE *giststate, List *splitinfo, bool releasebuf);
00039
00040
00041 #define ROTATEDIST(d) do { \
00042 SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
00043 memset(tmp,0,sizeof(SplitedPageLayout)); \
00044 tmp->block.blkno = InvalidBlockNumber; \
00045 tmp->buffer = InvalidBuffer; \
00046 tmp->next = (d); \
00047 (d)=tmp; \
00048 } while(0)
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 MemoryContext
00060 createTempGistContext(void)
00061 {
00062 return AllocSetContextCreate(CurrentMemoryContext,
00063 "GiST temporary context",
00064 ALLOCSET_DEFAULT_MINSIZE,
00065 ALLOCSET_DEFAULT_INITSIZE,
00066 ALLOCSET_DEFAULT_MAXSIZE);
00067 }
00068
00069
00070
00071
00072 Datum
00073 gistbuildempty(PG_FUNCTION_ARGS)
00074 {
00075 Relation index = (Relation) PG_GETARG_POINTER(0);
00076 Buffer buffer;
00077
00078
00079 buffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
00080 LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
00081
00082
00083 START_CRIT_SECTION();
00084 GISTInitBuffer(buffer, F_LEAF);
00085 MarkBufferDirty(buffer);
00086 log_newpage_buffer(buffer);
00087 END_CRIT_SECTION();
00088
00089
00090 UnlockReleaseBuffer(buffer);
00091
00092 PG_RETURN_VOID();
00093 }
00094
00095
00096
00097
00098
00099
00100
00101 Datum
00102 gistinsert(PG_FUNCTION_ARGS)
00103 {
00104 Relation r = (Relation) PG_GETARG_POINTER(0);
00105 Datum *values = (Datum *) PG_GETARG_POINTER(1);
00106 bool *isnull = (bool *) PG_GETARG_POINTER(2);
00107 ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
00108
00109 #ifdef NOT_USED
00110 Relation heapRel = (Relation) PG_GETARG_POINTER(4);
00111 IndexUniqueCheck checkUnique = (IndexUniqueCheck) PG_GETARG_INT32(5);
00112 #endif
00113 IndexTuple itup;
00114 GISTSTATE *giststate;
00115 MemoryContext oldCxt;
00116
00117 giststate = initGISTstate(r);
00118
00119
00120
00121
00122
00123
00124
00125
00126 oldCxt = MemoryContextSwitchTo(giststate->tempCxt);
00127
00128 itup = gistFormTuple(giststate, r,
00129 values, isnull, true );
00130 itup->t_tid = *ht_ctid;
00131
00132 gistdoinsert(r, itup, 0, giststate);
00133
00134
00135 MemoryContextSwitchTo(oldCxt);
00136 freeGISTstate(giststate);
00137
00138 PG_RETURN_BOOL(false);
00139 }
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 bool
00172 gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
00173 Buffer buffer,
00174 IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
00175 BlockNumber *newblkno,
00176 Buffer leftchildbuf,
00177 List **splitinfo,
00178 bool markfollowright)
00179 {
00180 BlockNumber blkno = BufferGetBlockNumber(buffer);
00181 Page page = BufferGetPage(buffer);
00182 bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
00183 XLogRecPtr recptr;
00184 int i;
00185 bool is_split;
00186
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196 if (GistFollowRight(page))
00197 elog(ERROR, "concurrent GiST page split was incomplete");
00198
00199 *splitinfo = NIL;
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211 is_split = gistnospace(page, itup, ntup, oldoffnum, freespace);
00212 if (is_split)
00213 {
00214
00215 IndexTuple *itvec;
00216 int tlen;
00217 SplitedPageLayout *dist = NULL,
00218 *ptr;
00219 BlockNumber oldrlink = InvalidBlockNumber;
00220 GistNSN oldnsn = 0;
00221 SplitedPageLayout rootpg;
00222 bool is_rootsplit;
00223
00224 is_rootsplit = (blkno == GIST_ROOT_BLKNO);
00225
00226
00227
00228
00229
00230 itvec = gistextractpage(page, &tlen);
00231 if (OffsetNumberIsValid(oldoffnum))
00232 {
00233
00234 int pos = oldoffnum - FirstOffsetNumber;
00235
00236 tlen--;
00237 if (pos != tlen)
00238 memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
00239 }
00240 itvec = gistjoinvector(itvec, &tlen, itup, ntup);
00241 dist = gistSplit(rel, page, itvec, tlen, giststate);
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252 ptr = dist;
00253 if (!is_rootsplit)
00254 {
00255
00256 oldrlink = GistPageGetOpaque(page)->rightlink;
00257 oldnsn = GistPageGetNSN(page);
00258
00259 dist->buffer = buffer;
00260 dist->block.blkno = BufferGetBlockNumber(buffer);
00261 dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
00262
00263
00264 GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
00265
00266 ptr = ptr->next;
00267 }
00268 for (; ptr; ptr = ptr->next)
00269 {
00270
00271 ptr->buffer = gistNewBuffer(rel);
00272 GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
00273 ptr->page = BufferGetPage(ptr->buffer);
00274 ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
00275 }
00276
00277
00278
00279
00280
00281 for (ptr = dist; ptr; ptr = ptr->next)
00282 {
00283 ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
00284 GistTupleSetValid(ptr->itup);
00285 }
00286
00287
00288
00289
00290
00291
00292 if (is_rootsplit)
00293 {
00294 IndexTuple *downlinks;
00295 int ndownlinks = 0;
00296 int i;
00297
00298 rootpg.buffer = buffer;
00299 rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
00300 GistPageGetOpaque(rootpg.page)->flags = 0;
00301
00302
00303 for (ptr = dist; ptr; ptr = ptr->next)
00304 ndownlinks++;
00305 downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
00306 for (i = 0, ptr = dist; ptr; ptr = ptr->next)
00307 downlinks[i++] = ptr->itup;
00308
00309 rootpg.block.blkno = GIST_ROOT_BLKNO;
00310 rootpg.block.num = ndownlinks;
00311 rootpg.list = gistfillitupvec(downlinks, ndownlinks,
00312 &(rootpg.lenlist));
00313 rootpg.itup = NULL;
00314
00315 rootpg.next = dist;
00316 dist = &rootpg;
00317 }
00318 else
00319 {
00320
00321 for (ptr = dist; ptr; ptr = ptr->next)
00322 {
00323 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
00324
00325 si->buf = ptr->buffer;
00326 si->downlink = ptr->itup;
00327 *splitinfo = lappend(*splitinfo, si);
00328 }
00329 }
00330
00331
00332
00333
00334
00335 for (ptr = dist; ptr; ptr = ptr->next)
00336 {
00337 char *data = (char *) (ptr->list);
00338
00339 for (i = 0; i < ptr->block.num; i++)
00340 {
00341 IndexTuple thistup = (IndexTuple) data;
00342
00343 if (PageAddItem(ptr->page, (Item) data, IndexTupleSize(thistup), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
00344 elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(rel));
00345
00346
00347
00348
00349
00350 if (newblkno && ItemPointerEquals(&thistup->t_tid, &(*itup)->t_tid))
00351 *newblkno = ptr->block.blkno;
00352
00353 data += IndexTupleSize(thistup);
00354 }
00355
00356
00357 if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
00358 GistPageGetOpaque(ptr->page)->rightlink =
00359 ptr->next->block.blkno;
00360 else
00361 GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 if (ptr->next && !is_rootsplit && markfollowright)
00372 GistMarkFollowRight(ptr->page);
00373 else
00374 GistClearFollowRight(ptr->page);
00375
00376
00377
00378
00379
00380
00381 GistPageSetNSN(ptr->page, oldnsn);
00382 }
00383
00384 START_CRIT_SECTION();
00385
00386
00387
00388
00389
00390 for (ptr = dist; ptr; ptr = ptr->next)
00391 MarkBufferDirty(ptr->buffer);
00392 if (BufferIsValid(leftchildbuf))
00393 MarkBufferDirty(leftchildbuf);
00394
00395
00396
00397
00398
00399 PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
00400 dist->page = BufferGetPage(dist->buffer);
00401
00402
00403 if (RelationNeedsWAL(rel))
00404 recptr = gistXLogSplit(rel->rd_node, blkno, is_leaf,
00405 dist, oldrlink, oldnsn, leftchildbuf,
00406 markfollowright);
00407 else
00408 recptr = gistGetFakeLSN(rel);
00409
00410 for (ptr = dist; ptr; ptr = ptr->next)
00411 {
00412 PageSetLSN(ptr->page, recptr);
00413 }
00414
00415
00416
00417
00418
00419
00420
00421
00422 if (is_rootsplit)
00423 {
00424 for (ptr = dist->next; ptr; ptr = ptr->next)
00425 UnlockReleaseBuffer(ptr->buffer);
00426 }
00427 }
00428 else
00429 {
00430
00431
00432
00433 START_CRIT_SECTION();
00434
00435 if (OffsetNumberIsValid(oldoffnum))
00436 PageIndexTupleDelete(page, oldoffnum);
00437 gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
00438
00439 MarkBufferDirty(buffer);
00440
00441 if (BufferIsValid(leftchildbuf))
00442 MarkBufferDirty(leftchildbuf);
00443
00444 if (RelationNeedsWAL(rel))
00445 {
00446 OffsetNumber ndeloffs = 0,
00447 deloffs[1];
00448
00449 if (OffsetNumberIsValid(oldoffnum))
00450 {
00451 deloffs[0] = oldoffnum;
00452 ndeloffs = 1;
00453 }
00454
00455 recptr = gistXLogUpdate(rel->rd_node, buffer,
00456 deloffs, ndeloffs, itup, ntup,
00457 leftchildbuf);
00458
00459 PageSetLSN(page, recptr);
00460 }
00461 else
00462 {
00463 recptr = gistGetFakeLSN(rel);
00464 PageSetLSN(page, recptr);
00465 }
00466
00467 if (newblkno)
00468 *newblkno = blkno;
00469 }
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 if (BufferIsValid(leftchildbuf))
00485 {
00486 Page leftpg = BufferGetPage(leftchildbuf);
00487
00488 GistPageSetNSN(leftpg, recptr);
00489 GistClearFollowRight(leftpg);
00490
00491 PageSetLSN(leftpg, recptr);
00492 }
00493
00494 END_CRIT_SECTION();
00495
00496 return is_split;
00497 }
00498
00499
00500
00501
00502
00503
00504 void
00505 gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
00506 {
00507 ItemId iid;
00508 IndexTuple idxtuple;
00509 GISTInsertStack firststack;
00510 GISTInsertStack *stack;
00511 GISTInsertState state;
00512 bool xlocked = false;
00513
00514 memset(&state, 0, sizeof(GISTInsertState));
00515 state.freespace = freespace;
00516 state.r = r;
00517
00518
00519 firststack.blkno = GIST_ROOT_BLKNO;
00520 firststack.lsn = 0;
00521 firststack.parent = NULL;
00522 firststack.downlinkoffnum = InvalidOffsetNumber;
00523 state.stack = stack = &firststack;
00524
00525
00526
00527
00528
00529
00530
00531 for (;;)
00532 {
00533 if (XLogRecPtrIsInvalid(stack->lsn))
00534 stack->buffer = ReadBuffer(state.r, stack->blkno);
00535
00536
00537
00538
00539
00540 if (!xlocked)
00541 {
00542 LockBuffer(stack->buffer, GIST_SHARE);
00543 gistcheckpage(state.r, stack->buffer);
00544 }
00545
00546 stack->page = (Page) BufferGetPage(stack->buffer);
00547 stack->lsn = PageGetLSN(stack->page);
00548 Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
00549
00550
00551
00552
00553
00554
00555 if (GistFollowRight(stack->page))
00556 {
00557 if (!xlocked)
00558 {
00559 LockBuffer(stack->buffer, GIST_UNLOCK);
00560 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00561 xlocked = true;
00562
00563 if (!GistFollowRight(stack->page))
00564 continue;
00565 }
00566 gistfixsplit(&state, giststate);
00567
00568 UnlockReleaseBuffer(stack->buffer);
00569 xlocked = false;
00570 state.stack = stack = stack->parent;
00571 continue;
00572 }
00573
00574 if (stack->blkno != GIST_ROOT_BLKNO &&
00575 stack->parent->lsn < GistPageGetNSN(stack->page))
00576 {
00577
00578
00579
00580
00581
00582
00583 UnlockReleaseBuffer(stack->buffer);
00584 xlocked = false;
00585 state.stack = stack = stack->parent;
00586 continue;
00587 }
00588
00589 if (!GistPageIsLeaf(stack->page))
00590 {
00591
00592
00593
00594
00595 BlockNumber childblkno;
00596 IndexTuple newtup;
00597 GISTInsertStack *item;
00598 OffsetNumber downlinkoffnum;
00599
00600 downlinkoffnum = gistchoose(state.r, stack->page, itup, giststate);
00601 iid = PageGetItemId(stack->page, downlinkoffnum);
00602 idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
00603 childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
00604
00605
00606
00607
00608 if (GistTupleIsInvalid(idxtuple))
00609 ereport(ERROR,
00610 (errmsg("index \"%s\" contains an inner tuple marked as invalid",
00611 RelationGetRelationName(r)),
00612 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to PostgreSQL 9.1."),
00613 errhint("Please REINDEX it.")));
00614
00615
00616
00617
00618
00619 newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
00620 if (newtup)
00621 {
00622
00623
00624
00625
00626 if (!xlocked)
00627 {
00628 LockBuffer(stack->buffer, GIST_UNLOCK);
00629 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00630 xlocked = true;
00631 stack->page = (Page) BufferGetPage(stack->buffer);
00632
00633 if (PageGetLSN(stack->page) != stack->lsn)
00634 {
00635
00636 continue;
00637 }
00638 }
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650 if (gistinserttuple(&state, stack, giststate, newtup,
00651 downlinkoffnum))
00652 {
00653
00654
00655
00656
00657
00658
00659 if (stack->blkno != GIST_ROOT_BLKNO)
00660 {
00661 UnlockReleaseBuffer(stack->buffer);
00662 xlocked = false;
00663 state.stack = stack = stack->parent;
00664 }
00665 continue;
00666 }
00667 }
00668 LockBuffer(stack->buffer, GIST_UNLOCK);
00669 xlocked = false;
00670
00671
00672 item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00673 item->blkno = childblkno;
00674 item->parent = stack;
00675 item->downlinkoffnum = downlinkoffnum;
00676 state.stack = stack = item;
00677 }
00678 else
00679 {
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690 if (!xlocked)
00691 {
00692 LockBuffer(stack->buffer, GIST_UNLOCK);
00693 LockBuffer(stack->buffer, GIST_EXCLUSIVE);
00694 xlocked = true;
00695 stack->page = (Page) BufferGetPage(stack->buffer);
00696 stack->lsn = PageGetLSN(stack->page);
00697
00698 if (stack->blkno == GIST_ROOT_BLKNO)
00699 {
00700
00701
00702
00703
00704 if (!GistPageIsLeaf(stack->page))
00705 {
00706
00707
00708
00709
00710 LockBuffer(stack->buffer, GIST_UNLOCK);
00711 xlocked = false;
00712 continue;
00713 }
00714
00715
00716
00717
00718
00719 }
00720 else if (GistFollowRight(stack->page) ||
00721 stack->parent->lsn < GistPageGetNSN(stack->page))
00722 {
00723
00724
00725
00726
00727 UnlockReleaseBuffer(stack->buffer);
00728 xlocked = false;
00729 state.stack = stack = stack->parent;
00730 continue;
00731 }
00732 }
00733
00734
00735
00736 gistinserttuple(&state, stack, giststate, itup,
00737 InvalidOffsetNumber);
00738 LockBuffer(stack->buffer, GIST_UNLOCK);
00739
00740
00741 for (; stack; stack = stack->parent)
00742 ReleaseBuffer(stack->buffer);
00743 break;
00744 }
00745 }
00746 }
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757 static GISTInsertStack *
00758 gistFindPath(Relation r, BlockNumber child, OffsetNumber *downlinkoffnum)
00759 {
00760 Page page;
00761 Buffer buffer;
00762 OffsetNumber i,
00763 maxoff;
00764 ItemId iid;
00765 IndexTuple idxtuple;
00766 List *fifo;
00767 GISTInsertStack *top,
00768 *ptr;
00769 BlockNumber blkno;
00770
00771 top = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00772 top->blkno = GIST_ROOT_BLKNO;
00773 top->downlinkoffnum = InvalidOffsetNumber;
00774
00775 fifo = list_make1(top);
00776 while (fifo != NIL)
00777 {
00778
00779 top = linitial(fifo);
00780 fifo = list_delete_first(fifo);
00781
00782 buffer = ReadBuffer(r, top->blkno);
00783 LockBuffer(buffer, GIST_SHARE);
00784 gistcheckpage(r, buffer);
00785 page = (Page) BufferGetPage(buffer);
00786
00787 if (GistPageIsLeaf(page))
00788 {
00789
00790
00791
00792
00793 UnlockReleaseBuffer(buffer);
00794 break;
00795 }
00796
00797 top->lsn = PageGetLSN(page);
00798
00799
00800
00801
00802
00803 if (GistFollowRight(page))
00804 elog(ERROR, "concurrent GiST page split was incomplete");
00805
00806 if (top->parent && top->parent->lsn < GistPageGetNSN(page) &&
00807 GistPageGetOpaque(page)->rightlink != InvalidBlockNumber )
00808 {
00809
00810
00811
00812
00813
00814
00815
00816
00817
00818
00819
00820 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00821 ptr->blkno = GistPageGetOpaque(page)->rightlink;
00822 ptr->downlinkoffnum = InvalidOffsetNumber;
00823 ptr->parent = top->parent;
00824
00825 fifo = lcons(ptr, fifo);
00826 }
00827
00828 maxoff = PageGetMaxOffsetNumber(page);
00829
00830 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00831 {
00832 iid = PageGetItemId(page, i);
00833 idxtuple = (IndexTuple) PageGetItem(page, iid);
00834 blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
00835 if (blkno == child)
00836 {
00837
00838 UnlockReleaseBuffer(buffer);
00839 *downlinkoffnum = i;
00840 return top;
00841 }
00842 else
00843 {
00844
00845 ptr = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
00846 ptr->blkno = blkno;
00847 ptr->downlinkoffnum = i;
00848 ptr->parent = top;
00849
00850 fifo = lappend(fifo, ptr);
00851 }
00852 }
00853
00854 UnlockReleaseBuffer(buffer);
00855 }
00856
00857 elog(ERROR, "failed to re-find parent of a page in index \"%s\", block %u",
00858 RelationGetRelationName(r), child);
00859 return NULL;
00860 }
00861
00862
00863
00864
00865
00866
00867 static void
00868 gistFindCorrectParent(Relation r, GISTInsertStack *child)
00869 {
00870 GISTInsertStack *parent = child->parent;
00871
00872 gistcheckpage(r, parent->buffer);
00873 parent->page = (Page) BufferGetPage(parent->buffer);
00874
00875
00876 if (child->downlinkoffnum == InvalidOffsetNumber ||
00877 parent->lsn != PageGetLSN(parent->page))
00878 {
00879
00880 OffsetNumber i,
00881 maxoff;
00882 ItemId iid;
00883 IndexTuple idxtuple;
00884 GISTInsertStack *ptr;
00885
00886 while (true)
00887 {
00888 maxoff = PageGetMaxOffsetNumber(parent->page);
00889 for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00890 {
00891 iid = PageGetItemId(parent->page, i);
00892 idxtuple = (IndexTuple) PageGetItem(parent->page, iid);
00893 if (ItemPointerGetBlockNumber(&(idxtuple->t_tid)) == child->blkno)
00894 {
00895
00896 child->downlinkoffnum = i;
00897 return;
00898 }
00899 }
00900
00901 parent->blkno = GistPageGetOpaque(parent->page)->rightlink;
00902 UnlockReleaseBuffer(parent->buffer);
00903 if (parent->blkno == InvalidBlockNumber)
00904 {
00905
00906
00907
00908
00909 break;
00910 }
00911 parent->buffer = ReadBuffer(r, parent->blkno);
00912 LockBuffer(parent->buffer, GIST_EXCLUSIVE);
00913 gistcheckpage(r, parent->buffer);
00914 parent->page = (Page) BufferGetPage(parent->buffer);
00915 }
00916
00917
00918
00919
00920
00921
00922 ptr = child->parent->parent;
00923
00924 while (ptr)
00925 {
00926 ReleaseBuffer(ptr->buffer);
00927 ptr = ptr->parent;
00928 }
00929
00930
00931 ptr = parent = gistFindPath(r, child->blkno, &child->downlinkoffnum);
00932
00933
00934
00935 while (ptr)
00936 {
00937 ptr->buffer = ReadBuffer(r, ptr->blkno);
00938 ptr->page = (Page) BufferGetPage(ptr->buffer);
00939 ptr = ptr->parent;
00940 }
00941
00942
00943 child->parent = parent;
00944
00945
00946 LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
00947 gistFindCorrectParent(r, child);
00948 }
00949
00950 return;
00951 }
00952
00953
00954
00955
00956 static IndexTuple
00957 gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
00958 GISTInsertStack *stack)
00959 {
00960 Page page = BufferGetPage(buf);
00961 OffsetNumber maxoff;
00962 OffsetNumber offset;
00963 IndexTuple downlink = NULL;
00964
00965 maxoff = PageGetMaxOffsetNumber(page);
00966 for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
00967 {
00968 IndexTuple ituple = (IndexTuple)
00969 PageGetItem(page, PageGetItemId(page, offset));
00970
00971 if (downlink == NULL)
00972 downlink = CopyIndexTuple(ituple);
00973 else
00974 {
00975 IndexTuple newdownlink;
00976
00977 newdownlink = gistgetadjusted(rel, downlink, ituple,
00978 giststate);
00979 if (newdownlink)
00980 downlink = newdownlink;
00981 }
00982 }
00983
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994 if (!downlink)
00995 {
00996 ItemId iid;
00997
00998 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
00999 gistFindCorrectParent(rel, stack);
01000 iid = PageGetItemId(stack->parent->page, stack->downlinkoffnum);
01001 downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
01002 downlink = CopyIndexTuple(downlink);
01003 LockBuffer(stack->parent->buffer, GIST_UNLOCK);
01004 }
01005
01006 ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
01007 GistTupleSetValid(downlink);
01008
01009 return downlink;
01010 }
01011
01012
01013
01014
01015
01016 static void
01017 gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
01018 {
01019 GISTInsertStack *stack = state->stack;
01020 Buffer buf;
01021 Page page;
01022 List *splitinfo = NIL;
01023
01024 elog(LOG, "fixing incomplete split in index \"%s\", block %u",
01025 RelationGetRelationName(state->r), stack->blkno);
01026
01027 Assert(GistFollowRight(stack->page));
01028 Assert(OffsetNumberIsValid(stack->downlinkoffnum));
01029
01030 buf = stack->buffer;
01031
01032
01033
01034
01035
01036 for (;;)
01037 {
01038 GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
01039 IndexTuple downlink;
01040
01041 page = BufferGetPage(buf);
01042
01043
01044 downlink = gistformdownlink(state->r, buf, giststate, stack);
01045
01046 si->buf = buf;
01047 si->downlink = downlink;
01048
01049 splitinfo = lappend(splitinfo, si);
01050
01051 if (GistFollowRight(page))
01052 {
01053
01054 buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
01055 LockBuffer(buf, GIST_EXCLUSIVE);
01056 }
01057 else
01058 break;
01059 }
01060
01061
01062 gistfinishsplit(state, stack, giststate, splitinfo, false);
01063 }
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075 static bool
01076 gistinserttuple(GISTInsertState *state, GISTInsertStack *stack,
01077 GISTSTATE *giststate, IndexTuple tuple, OffsetNumber oldoffnum)
01078 {
01079 return gistinserttuples(state, stack, giststate, &tuple, 1, oldoffnum,
01080 InvalidBuffer, InvalidBuffer, false, false);
01081 }
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109 static bool
01110 gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
01111 GISTSTATE *giststate,
01112 IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
01113 Buffer leftchild, Buffer rightchild,
01114 bool unlockbuf, bool unlockleftchild)
01115 {
01116 List *splitinfo;
01117 bool is_split;
01118
01119
01120 is_split = gistplacetopage(state->r, state->freespace, giststate,
01121 stack->buffer,
01122 tuples, ntup,
01123 oldoffnum, NULL,
01124 leftchild,
01125 &splitinfo,
01126 true);
01127
01128
01129
01130
01131
01132
01133 if (BufferIsValid(rightchild))
01134 UnlockReleaseBuffer(rightchild);
01135 if (BufferIsValid(leftchild) && unlockleftchild)
01136 LockBuffer(leftchild, GIST_UNLOCK);
01137
01138
01139
01140
01141
01142
01143
01144 if (splitinfo)
01145 gistfinishsplit(state, stack, giststate, splitinfo, unlockbuf);
01146 else if (unlockbuf)
01147 LockBuffer(stack->buffer, GIST_UNLOCK);
01148
01149 return is_split;
01150 }
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160
01161 static void
01162 gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
01163 GISTSTATE *giststate, List *splitinfo, bool unlockbuf)
01164 {
01165 ListCell *lc;
01166 List *reversed;
01167 GISTPageSplitInfo *right;
01168 GISTPageSplitInfo *left;
01169 IndexTuple tuples[2];
01170
01171
01172 Assert(list_length(splitinfo) >= 2);
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183 reversed = NIL;
01184 foreach(lc, splitinfo)
01185 {
01186 reversed = lcons(lfirst(lc), reversed);
01187 }
01188
01189 LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
01190 gistFindCorrectParent(state->r, stack);
01191
01192
01193
01194
01195
01196 while (list_length(reversed) > 2)
01197 {
01198 right = (GISTPageSplitInfo *) linitial(reversed);
01199 left = (GISTPageSplitInfo *) lsecond(reversed);
01200
01201 if (gistinserttuples(state, stack->parent, giststate,
01202 &right->downlink, 1,
01203 InvalidOffsetNumber,
01204 left->buf, right->buf, false, false))
01205 {
01206
01207
01208
01209
01210 gistFindCorrectParent(state->r, stack);
01211 }
01212
01213 reversed = list_delete_first(reversed);
01214 }
01215
01216 right = (GISTPageSplitInfo *) linitial(reversed);
01217 left = (GISTPageSplitInfo *) lsecond(reversed);
01218
01219
01220
01221
01222
01223
01224 tuples[0] = left->downlink;
01225 tuples[1] = right->downlink;
01226 gistinserttuples(state, stack->parent, giststate,
01227 tuples, 2,
01228 stack->downlinkoffnum,
01229 left->buf, right->buf,
01230 true,
01231 unlockbuf
01232 );
01233 Assert(left->buf == stack->buffer);
01234 }
01235
01236
01237
01238
01239
01240
01241 SplitedPageLayout *
01242 gistSplit(Relation r,
01243 Page page,
01244 IndexTuple *itup,
01245 int len,
01246 GISTSTATE *giststate)
01247 {
01248 IndexTuple *lvectup,
01249 *rvectup;
01250 GistSplitVector v;
01251 int i;
01252 SplitedPageLayout *res = NULL;
01253
01254 memset(v.spl_lisnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
01255 memset(v.spl_risnull, TRUE, sizeof(bool) * giststate->tupdesc->natts);
01256 gistSplitByKey(r, page, itup, len, giststate, &v, 0);
01257
01258
01259 lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
01260 rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
01261
01262 for (i = 0; i < v.splitVector.spl_nleft; i++)
01263 lvectup[i] = itup[v.splitVector.spl_left[i] - 1];
01264
01265 for (i = 0; i < v.splitVector.spl_nright; i++)
01266 rvectup[i] = itup[v.splitVector.spl_right[i] - 1];
01267
01268
01269 if (!gistfitpage(rvectup, v.splitVector.spl_nright))
01270 {
01271 res = gistSplit(r, page, rvectup, v.splitVector.spl_nright, giststate);
01272 }
01273 else
01274 {
01275 ROTATEDIST(res);
01276 res->block.num = v.splitVector.spl_nright;
01277 res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
01278 res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
01279 }
01280
01281 if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
01282 {
01283 SplitedPageLayout *resptr,
01284 *subres;
01285
01286 resptr = subres = gistSplit(r, page, lvectup, v.splitVector.spl_nleft, giststate);
01287
01288
01289 while (resptr->next)
01290 resptr = resptr->next;
01291
01292 resptr->next = res;
01293 res = subres;
01294 }
01295 else
01296 {
01297 ROTATEDIST(res);
01298 res->block.num = v.splitVector.spl_nleft;
01299 res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
01300 res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
01301 }
01302
01303 return res;
01304 }
01305
01306
01307
01308
01309 GISTSTATE *
01310 initGISTstate(Relation index)
01311 {
01312 GISTSTATE *giststate;
01313 MemoryContext scanCxt;
01314 MemoryContext oldCxt;
01315 int i;
01316
01317
01318 if (index->rd_att->natts > INDEX_MAX_KEYS)
01319 elog(ERROR, "numberOfAttributes %d > %d",
01320 index->rd_att->natts, INDEX_MAX_KEYS);
01321
01322
01323 scanCxt = AllocSetContextCreate(CurrentMemoryContext,
01324 "GiST scan context",
01325 ALLOCSET_DEFAULT_MINSIZE,
01326 ALLOCSET_DEFAULT_INITSIZE,
01327 ALLOCSET_DEFAULT_MAXSIZE);
01328 oldCxt = MemoryContextSwitchTo(scanCxt);
01329
01330
01331 giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
01332
01333 giststate->scanCxt = scanCxt;
01334 giststate->tempCxt = scanCxt;
01335 giststate->tupdesc = index->rd_att;
01336
01337 for (i = 0; i < index->rd_att->natts; i++)
01338 {
01339 fmgr_info_copy(&(giststate->consistentFn[i]),
01340 index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
01341 scanCxt);
01342 fmgr_info_copy(&(giststate->unionFn[i]),
01343 index_getprocinfo(index, i + 1, GIST_UNION_PROC),
01344 scanCxt);
01345 fmgr_info_copy(&(giststate->compressFn[i]),
01346 index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
01347 scanCxt);
01348 fmgr_info_copy(&(giststate->decompressFn[i]),
01349 index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
01350 scanCxt);
01351 fmgr_info_copy(&(giststate->penaltyFn[i]),
01352 index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
01353 scanCxt);
01354 fmgr_info_copy(&(giststate->picksplitFn[i]),
01355 index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
01356 scanCxt);
01357 fmgr_info_copy(&(giststate->equalFn[i]),
01358 index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
01359 scanCxt);
01360
01361 if (OidIsValid(index_getprocid(index, i + 1, GIST_DISTANCE_PROC)))
01362 fmgr_info_copy(&(giststate->distanceFn[i]),
01363 index_getprocinfo(index, i + 1, GIST_DISTANCE_PROC),
01364 scanCxt);
01365 else
01366 giststate->distanceFn[i].fn_oid = InvalidOid;
01367
01368
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378
01379 if (OidIsValid(index->rd_indcollation[i]))
01380 giststate->supportCollation[i] = index->rd_indcollation[i];
01381 else
01382 giststate->supportCollation[i] = DEFAULT_COLLATION_OID;
01383 }
01384
01385 MemoryContextSwitchTo(oldCxt);
01386
01387 return giststate;
01388 }
01389
01390 void
01391 freeGISTstate(GISTSTATE *giststate)
01392 {
01393
01394 MemoryContextDelete(giststate->scanCxt);
01395 }