00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "postgres.h"
00017
00018 #include "access/genam.h"
00019 #include "access/spgist_private.h"
00020 #include "miscadmin.h"
00021 #include "storage/bufmgr.h"
00022 #include "utils/rel.h"
00023
00024
00025
00026 /* SPPageDesc describes one page taking part in an insertion and, */
00027 /* optionally, a tuple on that page and a node within that tuple. */
00028 /* Fields that do not apply hold an "invalid" value: code in this file */
00029 /* stores InvalidBlockNumber, InvalidBuffer, NULL and InvalidOffsetNumber */
00030 /* into blkno, buffer, page and offnum respectively. */
00031
00032 typedef struct SPPageDesc
00033 {
00034 BlockNumber blkno; /* block number of the page */
00035 Buffer buffer; /* buffer holding the page */
00036 Page page; /* pointer to the page contents */
00037 OffsetNumber offnum; /* offset of a tuple on the page */
00038 int node; /* node index within the inner tuple at offnum */
00039 } SPPageDesc;
00040
00041
00042
00043
00044
00045
00046
00047 void
00048 spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
00049 BlockNumber blkno, OffsetNumber offset)
00050 {
00051 int i;
00052 SpGistNodeTuple node;
00053
00054 SGITITERATE(tup, i, node)
00055 {
00056 if (i == nodeN)
00057 {
00058 ItemPointerSet(&node->t_tid, blkno, offset);
00059 return;
00060 }
00061 }
00062
00063 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
00064 nodeN);
00065 }
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075 static SpGistInnerTuple
00076 addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
00077 {
00078 SpGistNodeTuple node,
00079 *nodes;
00080 int i;
00081
00082
00083 if (offset < 0)
00084 offset = tuple->nNodes;
00085 else if (offset > tuple->nNodes)
00086 elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
00087
00088 nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
00089 SGITITERATE(tuple, i, node)
00090 {
00091 if (i < offset)
00092 nodes[i] = node;
00093 else
00094 nodes[i + 1] = node;
00095 }
00096
00097 nodes[offset] = spgFormNodeTuple(state, label, false);
00098
00099 return spgFormInnerTuple(state,
00100 (tuple->prefixSize > 0),
00101 SGITDATUM(tuple, state),
00102 tuple->nNodes + 1,
00103 nodes);
00104 }
00105
00106
00107 static int
00108 cmpOffsetNumbers(const void *a, const void *b)
00109 {
00110 if (*(const OffsetNumber *) a == *(const OffsetNumber *) b)
00111 return 0;
00112 return (*(const OffsetNumber *) a > *(const OffsetNumber *) b) ? 1 : -1;
00113 }
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 void
00128 spgPageIndexMultiDelete(SpGistState *state, Page page,
00129 OffsetNumber *itemnos, int nitems,
00130 int firststate, int reststate,
00131 BlockNumber blkno, OffsetNumber offnum)
00132 {
00133 OffsetNumber firstItem;
00134 OffsetNumber *sortednos;
00135 SpGistDeadTuple tuple = NULL;
00136 int i;
00137
00138 if (nitems == 0)
00139 return;
00140
00141
00142
00143
00144
00145
00146
00147
00148 sortednos = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
00149 memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
00150 if (nitems > 1)
00151 qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
00152
00153 PageIndexMultiDelete(page, sortednos, nitems);
00154
00155 firstItem = itemnos[0];
00156
00157 for (i = 0; i < nitems; i++)
00158 {
00159 OffsetNumber itemno = sortednos[i];
00160 int tupstate;
00161
00162 tupstate = (itemno == firstItem) ? firststate : reststate;
00163 if (tuple == NULL || tuple->tupstate != tupstate)
00164 tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
00165
00166 if (PageAddItem(page, (Item) tuple, tuple->size,
00167 itemno, false, false) != itemno)
00168 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00169 tuple->size);
00170
00171 if (tupstate == SPGIST_REDIRECT)
00172 SpGistPageGetOpaque(page)->nRedirection++;
00173 else if (tupstate == SPGIST_PLACEHOLDER)
00174 SpGistPageGetOpaque(page)->nPlaceholder++;
00175 }
00176
00177 pfree(sortednos);
00178 }
00179
00180
00181
00182
00183
00184
00185 static void
00186 saveNodeLink(Relation index, SPPageDesc *parent,
00187 BlockNumber blkno, OffsetNumber offnum)
00188 {
00189 SpGistInnerTuple innerTuple;
00190
00191 innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
00192 PageGetItemId(parent->page, parent->offnum));
00193
00194 spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
00195
00196 MarkBufferDirty(parent->buffer);
00197 }
00198
00199
00200
00201
00202 static void
00203 addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
00204 SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
00205 {
00206 XLogRecData rdata[4];
00207 spgxlogAddLeaf xlrec;
00208
00209 xlrec.node = index->rd_node;
00210 xlrec.blknoLeaf = current->blkno;
00211 xlrec.newPage = isNew;
00212 xlrec.storesNulls = isNulls;
00213
00214
00215 xlrec.offnumLeaf = InvalidOffsetNumber;
00216 xlrec.offnumHeadLeaf = InvalidOffsetNumber;
00217 xlrec.blknoParent = InvalidBlockNumber;
00218 xlrec.offnumParent = InvalidOffsetNumber;
00219 xlrec.nodeI = 0;
00220
00221 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
00222
00223 ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
00224 ACCEPT_RDATA_BUFFER(current->buffer, 2);
00225
00226 START_CRIT_SECTION();
00227
00228 if (current->offnum == InvalidOffsetNumber ||
00229 SpGistBlockIsRoot(current->blkno))
00230 {
00231
00232 leafTuple->nextOffset = InvalidOffsetNumber;
00233 current->offnum = SpGistPageAddNewItem(state, current->page,
00234 (Item) leafTuple, leafTuple->size,
00235 NULL, false);
00236
00237 xlrec.offnumLeaf = current->offnum;
00238
00239
00240 if (parent->buffer != InvalidBuffer)
00241 {
00242 xlrec.blknoParent = parent->blkno;
00243 xlrec.offnumParent = parent->offnum;
00244 xlrec.nodeI = parent->node;
00245
00246 saveNodeLink(index, parent, current->blkno, current->offnum);
00247
00248 ACCEPT_RDATA_BUFFER(parent->buffer, 3);
00249 }
00250 }
00251 else
00252 {
00253
00254
00255
00256
00257
00258
00259
00260
00261 SpGistLeafTuple head;
00262 OffsetNumber offnum;
00263
00264 head = (SpGistLeafTuple) PageGetItem(current->page,
00265 PageGetItemId(current->page, current->offnum));
00266 if (head->tupstate == SPGIST_LIVE)
00267 {
00268 leafTuple->nextOffset = head->nextOffset;
00269 offnum = SpGistPageAddNewItem(state, current->page,
00270 (Item) leafTuple, leafTuple->size,
00271 NULL, false);
00272
00273
00274
00275
00276
00277 head = (SpGistLeafTuple) PageGetItem(current->page,
00278 PageGetItemId(current->page, current->offnum));
00279 head->nextOffset = offnum;
00280
00281 xlrec.offnumLeaf = offnum;
00282 xlrec.offnumHeadLeaf = current->offnum;
00283 }
00284 else if (head->tupstate == SPGIST_DEAD)
00285 {
00286 leafTuple->nextOffset = InvalidOffsetNumber;
00287 PageIndexTupleDelete(current->page, current->offnum);
00288 if (PageAddItem(current->page,
00289 (Item) leafTuple, leafTuple->size,
00290 current->offnum, false, false) != current->offnum)
00291 elog(ERROR, "failed to add item of size %u to SPGiST index page",
00292 leafTuple->size);
00293
00294
00295 xlrec.offnumLeaf = current->offnum;
00296 xlrec.offnumHeadLeaf = current->offnum;
00297 }
00298 else
00299 elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
00300 }
00301
00302 MarkBufferDirty(current->buffer);
00303
00304 if (RelationNeedsWAL(index))
00305 {
00306 XLogRecPtr recptr;
00307
00308 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);
00309
00310 PageSetLSN(current->page, recptr);
00311
00312
00313 if (xlrec.blknoParent != InvalidBlockNumber)
00314 {
00315 PageSetLSN(parent->page, recptr);
00316 }
00317 }
00318
00319 END_CRIT_SECTION();
00320 }
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332 static int
00333 checkSplitConditions(Relation index, SpGistState *state,
00334 SPPageDesc *current, int *nToSplit)
00335 {
00336 int i,
00337 n = 0,
00338 totalSize = 0;
00339
00340 if (SpGistBlockIsRoot(current->blkno))
00341 {
00342
00343 *nToSplit = BLCKSZ;
00344 return BLCKSZ;
00345 }
00346
00347 i = current->offnum;
00348 while (i != InvalidOffsetNumber)
00349 {
00350 SpGistLeafTuple it;
00351
00352 Assert(i >= FirstOffsetNumber &&
00353 i <= PageGetMaxOffsetNumber(current->page));
00354 it = (SpGistLeafTuple) PageGetItem(current->page,
00355 PageGetItemId(current->page, i));
00356 if (it->tupstate == SPGIST_LIVE)
00357 {
00358 n++;
00359 totalSize += it->size + sizeof(ItemIdData);
00360 }
00361 else if (it->tupstate == SPGIST_DEAD)
00362 {
00363
00364 Assert(i == current->offnum);
00365 Assert(it->nextOffset == InvalidOffsetNumber);
00366
00367 }
00368 else
00369 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00370
00371 i = it->nextOffset;
00372 }
00373
00374 *nToSplit = n;
00375
00376 return totalSize;
00377 }
00378
00379
00380
00381
00382
00383
00384
00385
00386 static void
00387 moveLeafs(Relation index, SpGistState *state,
00388 SPPageDesc *current, SPPageDesc *parent,
00389 SpGistLeafTuple newLeafTuple, bool isNulls)
00390 {
00391 int i,
00392 nDelete,
00393 nInsert,
00394 size;
00395 Buffer nbuf;
00396 Page npage;
00397 SpGistLeafTuple it;
00398 OffsetNumber r = InvalidOffsetNumber,
00399 startOffset = InvalidOffsetNumber;
00400 bool replaceDead = false;
00401 OffsetNumber *toDelete;
00402 OffsetNumber *toInsert;
00403 BlockNumber nblkno;
00404 XLogRecData rdata[7];
00405 spgxlogMoveLeafs xlrec;
00406 char *leafdata,
00407 *leafptr;
00408
00409
00410 Assert(parent->buffer != InvalidBuffer);
00411 Assert(parent->buffer != current->buffer);
00412
00413
00414 i = PageGetMaxOffsetNumber(current->page);
00415 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
00416 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
00417
00418 size = newLeafTuple->size + sizeof(ItemIdData);
00419
00420 nDelete = 0;
00421 i = current->offnum;
00422 while (i != InvalidOffsetNumber)
00423 {
00424 SpGistLeafTuple it;
00425
00426 Assert(i >= FirstOffsetNumber &&
00427 i <= PageGetMaxOffsetNumber(current->page));
00428 it = (SpGistLeafTuple) PageGetItem(current->page,
00429 PageGetItemId(current->page, i));
00430
00431 if (it->tupstate == SPGIST_LIVE)
00432 {
00433 toDelete[nDelete] = i;
00434 size += it->size + sizeof(ItemIdData);
00435 nDelete++;
00436 }
00437 else if (it->tupstate == SPGIST_DEAD)
00438 {
00439
00440 Assert(i == current->offnum);
00441 Assert(it->nextOffset == InvalidOffsetNumber);
00442
00443 toDelete[nDelete] = i;
00444 nDelete++;
00445 replaceDead = true;
00446 }
00447 else
00448 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00449
00450 i = it->nextOffset;
00451 }
00452
00453
00454 nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
00455 size, &xlrec.newPage);
00456 npage = BufferGetPage(nbuf);
00457 nblkno = BufferGetBlockNumber(nbuf);
00458 Assert(nblkno != current->blkno);
00459
00460
00461 xlrec.node = index->rd_node;
00462 STORE_STATE(state, xlrec.stateSrc);
00463
00464 xlrec.blknoSrc = current->blkno;
00465 xlrec.blknoDst = nblkno;
00466 xlrec.nMoves = nDelete;
00467 xlrec.replaceDead = replaceDead;
00468 xlrec.storesNulls = isNulls;
00469
00470 xlrec.blknoParent = parent->blkno;
00471 xlrec.offnumParent = parent->offnum;
00472 xlrec.nodeI = parent->node;
00473
00474 leafdata = leafptr = palloc(size);
00475
00476 START_CRIT_SECTION();
00477
00478
00479 nInsert = 0;
00480 if (!replaceDead)
00481 {
00482 for (i = 0; i < nDelete; i++)
00483 {
00484 it = (SpGistLeafTuple) PageGetItem(current->page,
00485 PageGetItemId(current->page, toDelete[i]));
00486 Assert(it->tupstate == SPGIST_LIVE);
00487
00488
00489
00490
00491
00492
00493 it->nextOffset = r;
00494
00495 r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
00496 &startOffset, false);
00497
00498 toInsert[nInsert] = r;
00499 nInsert++;
00500
00501
00502 memcpy(leafptr, it, it->size);
00503 leafptr += it->size;
00504 }
00505 }
00506
00507
00508 newLeafTuple->nextOffset = r;
00509 r = SpGistPageAddNewItem(state, npage,
00510 (Item) newLeafTuple, newLeafTuple->size,
00511 &startOffset, false);
00512 toInsert[nInsert] = r;
00513 nInsert++;
00514 memcpy(leafptr, newLeafTuple, newLeafTuple->size);
00515 leafptr += newLeafTuple->size;
00516
00517
00518
00519
00520
00521
00522 spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
00523 state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
00524 SPGIST_PLACEHOLDER,
00525 nblkno, r);
00526
00527
00528 saveNodeLink(index, parent, nblkno, r);
00529
00530
00531 MarkBufferDirty(current->buffer);
00532 MarkBufferDirty(nbuf);
00533
00534 if (RelationNeedsWAL(index))
00535 {
00536 XLogRecPtr recptr;
00537
00538 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
00539 ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
00540 ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
00541 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
00542 ACCEPT_RDATA_BUFFER(current->buffer, 4);
00543 ACCEPT_RDATA_BUFFER(nbuf, 5);
00544 ACCEPT_RDATA_BUFFER(parent->buffer, 6);
00545
00546 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);
00547
00548 PageSetLSN(current->page, recptr);
00549 PageSetLSN(npage, recptr);
00550 PageSetLSN(parent->page, recptr);
00551 }
00552
00553 END_CRIT_SECTION();
00554
00555
00556 SpGistSetLastUsedPage(index, nbuf);
00557 UnlockReleaseBuffer(nbuf);
00558 }
00559
00560
00561
00562
00563
00564
00565
00566
00567 static void
00568 setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
00569 BlockNumber blkno, OffsetNumber offnum)
00570 {
00571 SpGistDeadTuple dt;
00572
00573 dt = (SpGistDeadTuple) PageGetItem(current->page,
00574 PageGetItemId(current->page, position));
00575 Assert(dt->tupstate == SPGIST_REDIRECT);
00576 Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
00577 ItemPointerSet(&dt->pointer, blkno, offnum);
00578 }
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598 static bool
00599 checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
00600 bool *includeNew)
00601 {
00602 int theNode;
00603 int limit;
00604 int i;
00605
00606
00607 *includeNew = true;
00608
00609
00610 if (in->nTuples <= 1)
00611 return false;
00612
00613
00614 limit = tooBig ? in->nTuples - 1 : in->nTuples;
00615
00616
00617 theNode = out->mapTuplesToNodes[0];
00618 for (i = 1; i < limit; i++)
00619 {
00620 if (out->mapTuplesToNodes[i] != theNode)
00621 return false;
00622 }
00623
00624
00625
00626
00627 if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
00628 *includeNew = false;
00629
00630 out->nNodes = 8;
00631
00632
00633 for (i = 0; i < in->nTuples; i++)
00634 out->mapTuplesToNodes[i] = i % out->nNodes;
00635
00636
00637 if (out->nodeLabels)
00638 {
00639 Datum theLabel = out->nodeLabels[theNode];
00640
00641 out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
00642 for (i = 0; i < out->nNodes; i++)
00643 out->nodeLabels[i] = theLabel;
00644 }
00645
00646
00647
00648 return true;
00649 }
00650
00651 /* doPickSplit: redistribute the leaf tuples reachable from "current", */
00652 /* together with newLeafTuple, underneath a newly formed inner tuple. */
00653
00654 /* For a non-nulls tree the opclass support function SPGIST_PICKSPLIT_PROC */
00655 /* decides how tuples map onto child nodes; for a nulls tree (isNulls) a */
00656 /* single unlabeled node is used.  checkAllTheSame() may then override the */
00657 /* mapping and may exclude the new tuple from this round (includeNew). */
00658
00659 /* The re-formed leaf tuples go onto the current page and/or one extra leaf */
00660 /* page.  The inner tuple goes onto the parent's page, onto a page obtained */
00661 /* from SpGistGetBuffer, or (when there is no parent) onto the reinitialized */
00662 /* current page; the parent's downlink, if any, is pointed at it. */
00663
00664 /* All page changes happen inside one critical section; when the relation */
00665 /* needs WAL, one XLOG_SPGIST_PICKSPLIT record is inserted for them. */
00666
00667 /* On return "current" describes the new inner tuple.  The old current */
00668 /* buffer (except in the no-parent case) and any extra leaf buffer have */
00669 /* been unlocked and released. */
00670
00671 /* Returns true if newLeafTuple was stored during the split, false if it */
00672 /* was left out (presumably the caller must then insert it another way). */
00673
00674
00675 static bool
00676 doPickSplit(Relation index, SpGistState *state,
00677 SPPageDesc *current, SPPageDesc *parent,
00678 SpGistLeafTuple newLeafTuple,
00679 int level, bool isNulls, bool isNew)
00680 {
00681 bool insertedNew = false;
00682 spgPickSplitIn in;
00683 spgPickSplitOut out;
00684 FmgrInfo *procinfo;
00685 bool includeNew;
00686 int i,
00687 max,
00688 n;
00689 SpGistInnerTuple innerTuple;
00690 SpGistNodeTuple node,
00691 *nodes;
00692 Buffer newInnerBuffer,
00693 newLeafBuffer;
00694 ItemPointerData *heapPtrs;
00695 uint8 *leafPageSelect;
00696 int *leafSizes;
00697 OffsetNumber *toDelete;
00698 OffsetNumber *toInsert;
00699 OffsetNumber redirectTuplePos = InvalidOffsetNumber;
00700 OffsetNumber startOffsets[2];
00701 SpGistLeafTuple *newLeafs;
00702 int spaceToDelete;
00703 int currentFreeSpace;
00704 int totalLeafSizes;
00705 bool allTheSame;
00706 XLogRecData rdata[10];
00707 int nRdata;
00708 spgxlogPickSplit xlrec;
00709 char *leafdata,
00710 *leafptr;
00711 SPPageDesc saveCurrent;
00712 int nToDelete,
00713 nToInsert,
00714 maxToInclude;
00715
00716 in.level = level;
00717
00718 /* Allocate the per-tuple work arrays at their largest possible size: */
00719 /* one entry for every item on the current page plus one for the new */
00720 /* leaf tuple. */
00721 max = PageGetMaxOffsetNumber(current->page);
00722 n = max + 1;
00723 in.datums = (Datum *) palloc(sizeof(Datum) * n);
00724 heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
00725 toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
00726 toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
00727 newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
00728 leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
00729
00730 xlrec.node = index->rd_node;
00731 STORE_STATE(state, xlrec.stateSrc);
00732
00733 /* Gather the tuples to be redistributed.  For each LIVE tuple the datum */
00734 /* and heap pointer are collected as picksplit input (nToInsert counts */
00735 /* them) and its offset is remembered in toDelete[] (nToDelete counts */
00736 /* those).  spaceToDelete adds up the room that removing the old tuples */
00737 /* is counted as freeing on the current page. */
00738
00739
00740
00741
00742
00743
00744
00745
00746 nToInsert = 0;
00747 nToDelete = 0;
00748 spaceToDelete = 0;
00749 if (SpGistBlockIsRoot(current->blkno))
00750 {
00751 /* Root page: every item on the page is examined, not just one chain. */
00752 /* Any tuple that is not LIVE is reported as an error. */
00753
00754
00755
00756 for (i = FirstOffsetNumber; i <= max; i++)
00757 {
00758 SpGistLeafTuple it;
00759
00760 it = (SpGistLeafTuple) PageGetItem(current->page,
00761 PageGetItemId(current->page, i));
00762 if (it->tupstate == SPGIST_LIVE)
00763 {
00764 in.datums[nToInsert] = SGLTDATUM(it, state);
00765 heapPtrs[nToInsert] = it->heapPtr;
00766 nToInsert++;
00767 toDelete[nToDelete] = i;
00768 nToDelete++;
00769 /* here the whole tuple plus its line pointer is counted as freed */
00770 spaceToDelete += it->size + sizeof(ItemIdData);
00771 }
00772 else
00773 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00774 }
00775 }
00776 else
00777 {
00778 /* Non-root page: follow the chain that starts at current->offnum */
00779 i = current->offnum;
00780 while (i != InvalidOffsetNumber)
00781 {
00782 SpGistLeafTuple it;
00783
00784 Assert(i >= FirstOffsetNumber && i <= max);
00785 it = (SpGistLeafTuple) PageGetItem(current->page,
00786 PageGetItemId(current->page, i));
00787 if (it->tupstate == SPGIST_LIVE)
00788 {
00789 in.datums[nToInsert] = SGLTDATUM(it, state);
00790 heapPtrs[nToInsert] = it->heapPtr;
00791 nToInsert++;
00792 toDelete[nToDelete] = i;
00793 nToDelete++;
00794 /* only the size beyond SGDTSIZE is counted (presumably a dead tuple of that size takes its place) */
00795 Assert(it->size >= SGDTSIZE);
00796 spaceToDelete += it->size - SGDTSIZE;
00797 }
00798 else if (it->tupstate == SPGIST_DEAD)
00799 {
00800 /* a DEAD tuple is only expected as the sole member of the chain */
00801 Assert(i == current->offnum);
00802 Assert(it->nextOffset == InvalidOffsetNumber);
00803 toDelete[nToDelete] = i;
00804 nToDelete++;
00805 /* it is queued for replacement but adds nothing to spaceToDelete */
00806 }
00807 else
00808 elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
00809
00810 i = it->nextOffset;
00811 }
00812 }
00813 in.nTuples = nToInsert;
00814
00815 /* The new leaf tuple is always appended to the picksplit input, but */
00816 /* nToInsert is deliberately not incremented here: whether the new */
00817 /* tuple is actually stored in this round is decided further down. */
00818
00819
00820
00821 in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
00822 heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
00823 in.nTuples++;
00824
00825 memset(&out, 0, sizeof(out));
00826
00827 if (!isNulls)
00828 {
00829 /* Non-nulls tree: ask the opclass picksplit function how to divide */
00830 /* the tuples; it reads "in" and fills "out". */
00831
00832 procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
00833 FunctionCall2Coll(procinfo,
00834 index->rd_indcollation[0],
00835 PointerGetDatum(&in),
00836 PointerGetDatum(&out));
00837
00838 /* Form a new leaf tuple for every input from the datum picksplit */
00839 /* returned for it, totalling the space needed (line pointers included). */
00840
00841 totalLeafSizes = 0;
00842 for (i = 0; i < in.nTuples; i++)
00843 {
00844 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
00845 out.leafTupleDatums[i],
00846 false);
00847 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
00848 }
00849 }
00850 else
00851 {
00852 /* Nulls tree: picksplit is not called.  One node with no prefix and */
00853 /* no labels is used; palloc0 maps every tuple to node 0. */
00854
00855
00856 out.hasPrefix = false;
00857 out.nNodes = 1;
00858 out.nodeLabels = NULL;
00859 out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
00860
00861 /* Form the replacement leaf tuples with a null datum and total the */
00862 /* space they need (line pointers included). */
00863
00864 totalLeafSizes = 0;
00865 for (i = 0; i < in.nTuples; i++)
00866 {
00867 newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
00868 (Datum) 0,
00869 true);
00870 totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
00871 }
00872 }
00873
00874 /* Let checkAllTheSame() inspect the tuple-to-node mapping.  It is told */
00875 /* whether the tuples exceed SPGIST_PAGE_CAPACITY in total, may rewrite */
00876 /* out.nNodes, out.mapTuplesToNodes and out.nodeLabels, and reports via */
00877 /* includeNew whether the new tuple may take part in this split. */
00878
00879
00880
00881 allTheSame = checkAllTheSame(&in, &out,
00882 totalLeafSizes > SPGIST_PAGE_CAPACITY,
00883 &includeNew);
00884
00885 /* If the new tuple is excluded, leave it out of the per-node accounting */
00886 /* below and take its space out of the total. */
00887
00888
00889 if (includeNew)
00890 maxToInclude = in.nTuples;
00891 else
00892 {
00893 maxToInclude = in.nTuples - 1;
00894 totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
00895 }
00896
00897 /* Allocate the array of node tuples and the zeroed per-node counters */
00898 /* of leaf space. */
00899
00900
00901
00902 nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
00903 leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
00904
00905 /* Form one node tuple per child node, labeled from out.nodeLabels when */
00906 /* labels are present, and assemble them into the new inner tuple. */
00907
00908 for (i = 0; i < out.nNodes; i++)
00909 {
00910 Datum label = (Datum) 0;
00911 bool labelisnull = (out.nodeLabels == NULL);
00912
00913 if (!labelisnull)
00914 label = out.nodeLabels[i];
00915 nodes[i] = spgFormNodeTuple(state, label, labelisnull);
00916 }
00917 innerTuple = spgFormInnerTuple(state,
00918 out.hasPrefix, out.prefixDatum,
00919 out.nNodes, nodes);
00920 innerTuple->allTheSame = allTheSame;
00921
00922 /* Repoint nodes[] at the node tuples found inside innerTuple, so that */
00923 /* downlinks stored later through nodes[n]->t_tid land in innerTuple. */
00924
00925
00926 SGITITERATE(innerTuple, i, node)
00927 {
00928 nodes[i] = node;
00929 }
00930
00931 /* Check the node assignment of every included tuple and add up, per */
00932 /* node, the space its leaf tuples need. */
00933
00934 for (i = 0; i < maxToInclude; i++)
00935 {
00936 n = out.mapTuplesToNodes[i];
00937 if (n < 0 || n >= out.nNodes)
00938 elog(ERROR, "inconsistent result of SPGiST picksplit function");
00939 leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
00940 }
00941
00942 /* Choose where the inner tuple will go: */
00943 /* - the parent's page, if there is a non-root parent with enough room; */
00944 /* - otherwise, if there is a parent, a page from SpGistGetBuffer; */
00945 /* - with no parent, no separate buffer (handled near the end). */
00946
00947
00948
00949
00950
00951 xlrec.initInner = false;
00952 if (parent->buffer != InvalidBuffer &&
00953 !SpGistBlockIsRoot(parent->blkno) &&
00954 (SpGistPageGetFreeSpace(parent->page, 1) >=
00955 innerTuple->size + sizeof(ItemIdData)))
00956 {
00957 /* the inner tuple fits on the parent's page */
00958 newInnerBuffer = parent->buffer;
00959 }
00960 else if (parent->buffer != InvalidBuffer)
00961 {
00962 /* get another inner page; the parity flag is derived from parent->blkno + 1 */
00963 newInnerBuffer = SpGistGetBuffer(index,
00964 GBUF_INNER_PARITY(parent->blkno + 1) |
00965 (isNulls ? GBUF_NULLS : 0),
00966 innerTuple->size + sizeof(ItemIdData),
00967 &xlrec.initInner);
00968 }
00969 else
00970 {
00971 /* no parent: current is expected to be the root (asserted later) */
00972 newInnerBuffer = InvalidBuffer;
00973 }
00974
00975 /* Work out how much room the current page offers for the new leaf */
00976 /* tuples: its exact free space plus what removing the old tuples is */
00977 /* counted as freeing.  For the root page this is forced to zero, so no */
00978 /* leaf tuple group is assigned to it. */
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995
00996
00997 if (!SpGistBlockIsRoot(current->blkno))
00998 currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
00999 else
01000 currentFreeSpace = 0;
01001
01002 xlrec.initDest = false;
01003
01004 if (totalLeafSizes <= currentFreeSpace)
01005 {
01006 /* everything fits on the current page; no second leaf page is needed */
01007 newLeafBuffer = InvalidBuffer;
01008 /* count the new tuple among the insertions if it may be included */
01009 if (includeNew)
01010 {
01011 nToInsert++;
01012 insertedNew = true;
01013 }
01014 for (i = 0; i < nToInsert; i++)
01015 leafPageSelect[i] = 0;
01016 }
01017 else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
01018 {
01019 /* Only the new tuple is involved and it exceeds SPGIST_PAGE_CAPACITY. */
01020 /* No leaf tuple is inserted in this round (nToInsert is asserted to */
01021 /* be 0), so no second leaf buffer is acquired. */
01022
01023
01024 newLeafBuffer = InvalidBuffer;
01025 Assert(includeNew);
01026 Assert(nToInsert == 0);
01027 }
01028 else
01029 {
01030 /* a second leaf page is needed */
01031 uint8 *nodePageSelect;
01032 int curspace;
01033 int newspace;
01034
01035 newLeafBuffer = SpGistGetBuffer(index,
01036 GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
01037 Min(totalLeafSizes,
01038 SPGIST_PAGE_CAPACITY),
01039 &xlrec.initDest);
01040
01041 /* Assign each node's group of leaf tuples to a page, in node order: */
01042 /* the current page (0) while the group still fits there, otherwise */
01043 /* the new page (1). */
01044
01045
01046 nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
01047
01048 curspace = currentFreeSpace;
01049 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
01050 for (i = 0; i < out.nNodes; i++)
01051 {
01052 if (leafSizes[i] <= curspace)
01053 {
01054 nodePageSelect[i] = 0;
01055 curspace -= leafSizes[i];
01056 }
01057 else
01058 {
01059 nodePageSelect[i] = 1;
01060 newspace -= leafSizes[i];
01061 }
01062 }
01063 if (curspace >= 0 && newspace >= 0)
01064 {
01065 /* both pages can hold their share, so the new tuple may be stored */
01066 if (includeNew)
01067 {
01068 nToInsert++;
01069 insertedNew = true;
01070 }
01071 }
01072 else if (includeNew)
01073 {
01074 /* retry without the new tuple: take its space out of its node's total */
01075 int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
01076
01077 leafSizes[nodeOfNewTuple] -=
01078 newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
01079
01080 /* redo the page assignment from scratch */
01081 curspace = currentFreeSpace;
01082 newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
01083 for (i = 0; i < out.nNodes; i++)
01084 {
01085 if (leafSizes[i] <= curspace)
01086 {
01087 nodePageSelect[i] = 0;
01088 curspace -= leafSizes[i];
01089 }
01090 else
01091 {
01092 nodePageSelect[i] = 1;
01093 newspace -= leafSizes[i];
01094 }
01095 }
01096 if (curspace < 0 || newspace < 0)
01097 elog(ERROR, "failed to divide leaf tuple groups across pages");
01098 }
01099 else
01100 {
01101 /* the new tuple was already excluded, so there is nothing left to try */
01102 elog(ERROR, "failed to divide leaf tuple groups across pages");
01103 }
01104 /* translate the per-node page choice into a per-tuple page choice */
01105 for (i = 0; i < nToInsert; i++)
01106 {
01107 n = out.mapTuplesToNodes[i];
01108 leafPageSelect[i] = nodePageSelect[n];
01109 }
01110 }
01111
01112 /* Initialize the WAL record fields that are known at this point */
01113 xlrec.blknoSrc = current->blkno;
01114 xlrec.blknoDest = InvalidBlockNumber;
01115 xlrec.nDelete = 0;
01116 xlrec.initSrc = isNew;
01117 xlrec.storesNulls = isNulls;
01118
01119 leafdata = leafptr = (char *) palloc(totalLeafSizes);
01120
01121 ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
01122 ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
01123 nRdata = 2;
01124
01125 /* All page modifications below happen inside this critical section */
01126 START_CRIT_SECTION();
01127
01128 /* Deal with the old leaf tuples on the current page.  This step is */
01129 /* skipped for the root page, which is reinitialized further down. */
01130
01131
01132
01133 if (!SpGistBlockIsRoot(current->blkno))
01134 {
01135 /* During index build, if the tuples to delete plus the placeholders */
01136 /* already on the page account for every item on it, the page is */
01137 /* simply reinitialized as a leaf page and initSrc is set. */
01138
01139
01140 if (state->isBuild &&
01141 nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
01142 PageGetMaxOffsetNumber(current->page))
01143 {
01144 SpGistInitBuffer(current->buffer,
01145 SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
01146 xlrec.initSrc = true;
01147 }
01148 else if (isNew)
01149 {
01150 /* caller flagged the page as new: nothing is expected to need deleting */
01151 Assert(nToDelete == 0);
01152 }
01153 else
01154 {
01155 xlrec.nDelete = nToDelete;
01156 ACCEPT_RDATA_DATA(toDelete,
01157 MAXALIGN(sizeof(OffsetNumber) * nToDelete),
01158 nRdata);
01159 nRdata++;
01160 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
01161 nRdata++;
01162
01163 if (!state->isBuild)
01164 {
01165 /* Not an index build: the first deleted tuple becomes a REDIRECT */
01166 /* and the rest PLACEHOLDERs.  The redirect initially names */
01167 /* SPGIST_METAPAGE_BLKNO; setRedirectionTuple() stores the real */
01168 /* target once the inner tuple's location is known. */
01169
01170
01171 if (nToDelete > 0)
01172 redirectTuplePos = toDelete[0];
01173 spgPageIndexMultiDelete(state, current->page,
01174 toDelete, nToDelete,
01175 SPGIST_REDIRECT,
01176 SPGIST_PLACEHOLDER,
01177 SPGIST_METAPAGE_BLKNO,
01178 FirstOffsetNumber);
01179 }
01180 else
01181 {
01182 /* Index build: every deleted tuple becomes a PLACEHOLDER; no */
01183 /* redirect is left behind. */
01184
01185
01186 spgPageIndexMultiDelete(state, current->page,
01187 toDelete, nToDelete,
01188 SPGIST_PLACEHOLDER,
01189 SPGIST_PLACEHOLDER,
01190 InvalidBlockNumber,
01191 InvalidOffsetNumber);
01192 }
01193 }
01194 }
01195
01196 /* Store the leaf tuples selected for this round.  Tuples of the same */
01197 /* node are chained through nextOffset, and the node's downlink is kept */
01198 /* pointing at the tuple stored most recently for that node. */
01199
01200 startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
01201 for (i = 0; i < nToInsert; i++)
01202 {
01203 SpGistLeafTuple it = newLeafs[i];
01204 Buffer leafBuffer;
01205 BlockNumber leafBlock;
01206 OffsetNumber newoffset;
01207
01208 /* pick the destination page for this tuple */
01209 leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
01210 leafBlock = BufferGetBlockNumber(leafBuffer);
01211
01212 /* link the tuple in front of its node's existing chain, if any */
01213 n = out.mapTuplesToNodes[i];
01214
01215 if (ItemPointerIsValid(&nodes[n]->t_tid))
01216 {
01217 Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
01218 it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
01219 }
01220 else
01221 it->nextOffset = InvalidOffsetNumber;
01222
01223 /* store the tuple on the chosen page */
01224 newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
01225 (Item) it, it->size,
01226 &startOffsets[leafPageSelect[i]],
01227 false);
01228 toInsert[i] = newoffset;
01229
01230 /* the node's downlink now points at this tuple */
01231 ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
01232
01233 /* copy the tuple, with its final nextOffset, into the leafdata buffer */
01234 memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
01235 leafptr += newLeafs[i]->size;
01236 }
01237
01238 /* If a second leaf page was used, mark it dirty and record its block */
01239 /* number.  It is added to rdata as a buffer only when initDest is not */
01240 /* set. */
01241
01242
01243 if (newLeafBuffer != InvalidBuffer)
01244 {
01245 MarkBufferDirty(newLeafBuffer);
01246 /* remember the destination block number in the WAL record */
01247 xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
01248 if (!xlrec.initDest)
01249 {
01250 ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
01251 nRdata++;
01252 }
01253 }
01254
01255 xlrec.nInsert = nToInsert;
01256 ACCEPT_RDATA_DATA(toInsert,
01257 MAXALIGN(sizeof(OffsetNumber) * nToInsert),
01258 nRdata);
01259 nRdata++;
01260 ACCEPT_RDATA_DATA(leafPageSelect,
01261 MAXALIGN(sizeof(uint8) * nToInsert),
01262 nRdata);
01263 nRdata++;
01264 ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
01265 nRdata++;
01266
01267 /* keep a copy of the old page descriptor; "current" is repointed below */
01268 saveCurrent = *current;
01269
01270 /* Store the inner tuple; three cases follow. */
01271
01272
01273 if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
01274 {
01275 /* Case 1: the inner tuple goes onto the parent's page */
01276
01277
01278 Assert(current->buffer != parent->buffer);
01279
01280 /* repoint "current" at the new inner tuple */
01281 current->blkno = parent->blkno;
01282 current->buffer = parent->buffer;
01283 current->page = parent->page;
01284 xlrec.blknoInner = current->blkno;
01285 xlrec.offnumInner = current->offnum =
01286 SpGistPageAddNewItem(state, current->page,
01287 (Item) innerTuple, innerTuple->size,
01288 NULL, false);
01289
01290 /* update the parent's downlink; saveNodeLink also marks the parent */
01291 /* buffer dirty */
01292
01293 xlrec.blknoParent = parent->blkno;
01294 xlrec.offnumParent = parent->offnum;
01295 xlrec.nodeI = parent->node;
01296 saveNodeLink(index, parent, current->blkno, current->offnum);
01297
01298 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
01299 nRdata++;
01300
01301 /* point the redirect left on the old leaf page at the inner tuple */
01302
01303
01304 if (redirectTuplePos != InvalidOffsetNumber)
01305 setRedirectionTuple(&saveCurrent, redirectTuplePos,
01306 current->blkno, current->offnum);
01307
01308 /* the old leaf page is marked dirty as well */
01309 MarkBufferDirty(saveCurrent.buffer);
01310 }
01311 else if (parent->buffer != InvalidBuffer)
01312 {
01313 /* Case 2: the inner tuple goes onto a separate inner page */
01314
01315
01316 Assert(newInnerBuffer != InvalidBuffer);
01317
01318 /* repoint "current" at the new inner tuple */
01319 current->buffer = newInnerBuffer;
01320 current->blkno = BufferGetBlockNumber(current->buffer);
01321 current->page = BufferGetPage(current->buffer);
01322 xlrec.blknoInner = current->blkno;
01323 xlrec.offnumInner = current->offnum =
01324 SpGistPageAddNewItem(state, current->page,
01325 (Item) innerTuple, innerTuple->size,
01326 NULL, false);
01327
01328 /* the page receiving the inner tuple is now dirty */
01329 MarkBufferDirty(current->buffer);
01330
01331 /* update the parent's downlink; saveNodeLink also marks the parent */
01332 /* buffer dirty */
01333
01334 xlrec.blknoParent = parent->blkno;
01335 xlrec.offnumParent = parent->offnum;
01336 xlrec.nodeI = parent->node;
01337 saveNodeLink(index, parent, current->blkno, current->offnum);
01338
01339 ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
01340 nRdata++;
01341 ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
01342 nRdata++;
01343
01344 /* point the redirect left on the old leaf page at the inner tuple */
01345
01346
01347 if (redirectTuplePos != InvalidOffsetNumber)
01348 setRedirectionTuple(&saveCurrent, redirectTuplePos,
01349 current->blkno, current->offnum);
01350
01351 /* the old leaf page is marked dirty as well */
01352 MarkBufferDirty(saveCurrent.buffer);
01353 }
01354 else
01355 {
01356 /* Case 3: no parent.  The current page, asserted to be the root, is */
01357 /* reinitialized without the SPGIST_LEAF flag and the inner tuple is */
01358 /* required to become its first item. */
01359
01360 Assert(SpGistBlockIsRoot(current->blkno));
01361 Assert(redirectTuplePos == InvalidOffsetNumber);
01362
01363 SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
01364 xlrec.initInner = true;
01365
01366 xlrec.blknoInner = current->blkno;
01367 xlrec.offnumInner = current->offnum =
01368 PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
01369 InvalidOffsetNumber, false, false);
01370 if (current->offnum != FirstOffsetNumber)
01371 elog(ERROR, "failed to add item of size %u to SPGiST index page",
01372 innerTuple->size);
01373
01374 /* there is no parent link to record */
01375 xlrec.blknoParent = InvalidBlockNumber;
01376 xlrec.offnumParent = InvalidOffsetNumber;
01377 xlrec.nodeI = 0;
01378
01379 /* the root page was changed */
01380 MarkBufferDirty(current->buffer);
01381
01382 /* the current buffer stays in use: make the code below skip saveCurrent */
01383 saveCurrent.buffer = InvalidBuffer;
01384 }
01385
01386 if (RelationNeedsWAL(index))
01387 {
01388 XLogRecPtr recptr;
01389
01390 /* insert the WAL record built up in rdata[] */
01391 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);
01392
01393 /* set the LSN of every page involved to that of the record */
01394 if (newLeafBuffer != InvalidBuffer)
01395 {
01396 Page page = BufferGetPage(newLeafBuffer);
01397
01398 PageSetLSN(page, recptr);
01399 }
01400
01401 if (saveCurrent.buffer != InvalidBuffer)
01402 {
01403 Page page = BufferGetPage(saveCurrent.buffer);
01404
01405 PageSetLSN(page, recptr);
01406 }
01407
01408 PageSetLSN(current->page, recptr);
01409
01410 if (parent->buffer != InvalidBuffer)
01411 {
01412 PageSetLSN(parent->page, recptr);
01413 }
01414 }
01415
01416 END_CRIT_SECTION();
01417
01418 /* Unlock and release the buffers this function is finished with */
01419 if (newLeafBuffer != InvalidBuffer)
01420 {
01421 SpGistSetLastUsedPage(index, newLeafBuffer);
01422 UnlockReleaseBuffer(newLeafBuffer);
01423 }
01424 if (saveCurrent.buffer != InvalidBuffer)
01425 {
01426 SpGistSetLastUsedPage(index, saveCurrent.buffer);
01427 UnlockReleaseBuffer(saveCurrent.buffer);
01428 }
01429
01430 return insertedNew;
01431 }
01432
01433
01434
01435
01436 static void
01437 spgMatchNodeAction(Relation index, SpGistState *state,
01438 SpGistInnerTuple innerTuple,
01439 SPPageDesc *current, SPPageDesc *parent, int nodeN)
01440 {
01441 int i;
01442 SpGistNodeTuple node;
01443
01444
01445 if (parent->buffer != InvalidBuffer &&
01446 parent->buffer != current->buffer)
01447 {
01448 SpGistSetLastUsedPage(index, parent->buffer);
01449 UnlockReleaseBuffer(parent->buffer);
01450 }
01451
01452
01453 parent->blkno = current->blkno;
01454 parent->buffer = current->buffer;
01455 parent->page = current->page;
01456 parent->offnum = current->offnum;
01457 parent->node = nodeN;
01458
01459
01460 SGITITERATE(innerTuple, i, node)
01461 {
01462 if (i == nodeN)
01463 break;
01464 }
01465
01466 if (i != nodeN)
01467 elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
01468 nodeN);
01469
01470
01471 if (ItemPointerIsValid(&node->t_tid))
01472 {
01473 current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
01474 current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
01475 }
01476 else
01477 {
01478
01479 current->blkno = InvalidBlockNumber;
01480 current->offnum = InvalidOffsetNumber;
01481 }
01482
01483 current->buffer = InvalidBuffer;
01484 current->page = NULL;
01485 }
01486
01487
01488
01489
01490 static void
01491 spgAddNodeAction(Relation index, SpGistState *state,
01492 SpGistInnerTuple innerTuple,
01493 SPPageDesc *current, SPPageDesc *parent,
01494 int nodeN, Datum nodeLabel)
01495 {
01496 SpGistInnerTuple newInnerTuple;
01497 XLogRecData rdata[5];
01498 spgxlogAddNode xlrec;
01499
01500
01501 Assert(!SpGistPageStoresNulls(current->page));
01502
01503
01504 newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
01505
01506
01507 xlrec.node = index->rd_node;
01508 STORE_STATE(state, xlrec.stateSrc);
01509 xlrec.blkno = current->blkno;
01510 xlrec.offnum = current->offnum;
01511
01512
01513 xlrec.blknoParent = InvalidBlockNumber;
01514 xlrec.offnumParent = InvalidOffsetNumber;
01515 xlrec.nodeI = 0;
01516
01517
01518 xlrec.blknoNew = InvalidBlockNumber;
01519 xlrec.offnumNew = InvalidOffsetNumber;
01520 xlrec.newPage = false;
01521
01522 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
01523
01524 ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
01525 ACCEPT_RDATA_BUFFER(current->buffer, 2);
01526
01527 if (PageGetExactFreeSpace(current->page) >=
01528 newInnerTuple->size - innerTuple->size)
01529 {
01530
01531
01532
01533 START_CRIT_SECTION();
01534
01535 PageIndexTupleDelete(current->page, current->offnum);
01536 if (PageAddItem(current->page,
01537 (Item) newInnerTuple, newInnerTuple->size,
01538 current->offnum, false, false) != current->offnum)
01539 elog(ERROR, "failed to add item of size %u to SPGiST index page",
01540 newInnerTuple->size);
01541
01542 MarkBufferDirty(current->buffer);
01543
01544 if (RelationNeedsWAL(index))
01545 {
01546 XLogRecPtr recptr;
01547
01548 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
01549
01550 PageSetLSN(current->page, recptr);
01551 }
01552
01553 END_CRIT_SECTION();
01554 }
01555 else
01556 {
01557
01558
01559
01560 SpGistDeadTuple dt;
01561 SPPageDesc saveCurrent;
01562
01563
01564
01565
01566
01567
01568 if (SpGistBlockIsRoot(current->blkno))
01569 elog(ERROR, "cannot enlarge root tuple any more");
01570 Assert(parent->buffer != InvalidBuffer);
01571
01572 saveCurrent = *current;
01573
01574 xlrec.blknoParent = parent->blkno;
01575 xlrec.offnumParent = parent->offnum;
01576 xlrec.nodeI = parent->node;
01577
01578
01579
01580
01581
01582 current->buffer = SpGistGetBuffer(index,
01583 GBUF_INNER_PARITY(current->blkno),
01584 newInnerTuple->size + sizeof(ItemIdData),
01585 &xlrec.newPage);
01586 current->blkno = BufferGetBlockNumber(current->buffer);
01587 current->page = BufferGetPage(current->buffer);
01588
01589 xlrec.blknoNew = current->blkno;
01590
01591
01592
01593
01594
01595
01596
01597
01598
01599 if (xlrec.blknoNew == xlrec.blkno)
01600 elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
01601
01602
01603
01604
01605
01606 ACCEPT_RDATA_BUFFER(current->buffer, 3);
01607 if (parent->buffer != current->buffer &&
01608 parent->buffer != saveCurrent.buffer)
01609 ACCEPT_RDATA_BUFFER(parent->buffer, 4);
01610
01611 START_CRIT_SECTION();
01612
01613
01614 xlrec.offnumNew = current->offnum =
01615 SpGistPageAddNewItem(state, current->page,
01616 (Item) newInnerTuple, newInnerTuple->size,
01617 NULL, false);
01618
01619 MarkBufferDirty(current->buffer);
01620
01621
01622 saveNodeLink(index, parent, current->blkno, current->offnum);
01623
01624
01625
01626
01627
01628
01629
01630
01631 if (state->isBuild)
01632 dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
01633 InvalidBlockNumber, InvalidOffsetNumber);
01634 else
01635 dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
01636 current->blkno, current->offnum);
01637
01638 PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
01639 if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
01640 saveCurrent.offnum,
01641 false, false) != saveCurrent.offnum)
01642 elog(ERROR, "failed to add item of size %u to SPGiST index page",
01643 dt->size);
01644
01645 if (state->isBuild)
01646 SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
01647 else
01648 SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
01649
01650 MarkBufferDirty(saveCurrent.buffer);
01651
01652 if (RelationNeedsWAL(index))
01653 {
01654 XLogRecPtr recptr;
01655
01656 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);
01657
01658
01659 PageSetLSN(current->page, recptr);
01660 PageSetLSN(parent->page, recptr);
01661 PageSetLSN(saveCurrent.page, recptr);
01662 }
01663
01664 END_CRIT_SECTION();
01665
01666
01667 if (saveCurrent.buffer != current->buffer &&
01668 saveCurrent.buffer != parent->buffer)
01669 {
01670 SpGistSetLastUsedPage(index, saveCurrent.buffer);
01671 UnlockReleaseBuffer(saveCurrent.buffer);
01672 }
01673 }
01674 }
01675
01676
01677
01678
01679 static void
01680 spgSplitNodeAction(Relation index, SpGistState *state,
01681 SpGistInnerTuple innerTuple,
01682 SPPageDesc *current, spgChooseOut *out)
01683 {
01684 SpGistInnerTuple prefixTuple,
01685 postfixTuple;
01686 SpGistNodeTuple node,
01687 *nodes;
01688 BlockNumber postfixBlkno;
01689 OffsetNumber postfixOffset;
01690 int i;
01691 XLogRecData rdata[5];
01692 spgxlogSplitTuple xlrec;
01693 Buffer newBuffer = InvalidBuffer;
01694
01695
01696 Assert(!SpGistPageStoresNulls(current->page));
01697
01698
01699
01700
01701
01702
01703 node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);
01704
01705 prefixTuple = spgFormInnerTuple(state,
01706 out->result.splitTuple.prefixHasPrefix,
01707 out->result.splitTuple.prefixPrefixDatum,
01708 1, &node);
01709
01710
01711 if (prefixTuple->size > innerTuple->size)
01712 elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
01713
01714
01715
01716
01717
01718
01719 nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
01720 SGITITERATE(innerTuple, i, node)
01721 {
01722 nodes[i] = node;
01723 }
01724
01725 postfixTuple = spgFormInnerTuple(state,
01726 out->result.splitTuple.postfixHasPrefix,
01727 out->result.splitTuple.postfixPrefixDatum,
01728 innerTuple->nNodes, nodes);
01729
01730
01731 postfixTuple->allTheSame = innerTuple->allTheSame;
01732
01733
01734 xlrec.node = index->rd_node;
01735 xlrec.newPage = false;
01736
01737 ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
01738
01739 ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
01740 ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
01741 ACCEPT_RDATA_BUFFER(current->buffer, 3);
01742
01743
01744
01745
01746
01747
01748
01749
01750 if (SpGistBlockIsRoot(current->blkno) ||
01751 SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
01752 prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
01753 {
01754
01755
01756
01757
01758 newBuffer = SpGistGetBuffer(index,
01759 GBUF_INNER_PARITY(current->blkno + 1),
01760 postfixTuple->size + sizeof(ItemIdData),
01761 &xlrec.newPage);
01762 ACCEPT_RDATA_BUFFER(newBuffer, 4);
01763 }
01764
01765 START_CRIT_SECTION();
01766
01767
01768
01769
01770 PageIndexTupleDelete(current->page, current->offnum);
01771 xlrec.offnumPrefix = PageAddItem(current->page,
01772 (Item) prefixTuple, prefixTuple->size,
01773 current->offnum, false, false);
01774 if (xlrec.offnumPrefix != current->offnum)
01775 elog(ERROR, "failed to add item of size %u to SPGiST index page",
01776 prefixTuple->size);
01777 xlrec.blknoPrefix = current->blkno;
01778
01779
01780
01781
01782 if (newBuffer == InvalidBuffer)
01783 {
01784 xlrec.blknoPostfix = postfixBlkno = current->blkno;
01785 xlrec.offnumPostfix = postfixOffset =
01786 SpGistPageAddNewItem(state, current->page,
01787 (Item) postfixTuple, postfixTuple->size,
01788 NULL, false);
01789 }
01790 else
01791 {
01792 xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
01793 xlrec.offnumPostfix = postfixOffset =
01794 SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
01795 (Item) postfixTuple, postfixTuple->size,
01796 NULL, false);
01797 MarkBufferDirty(newBuffer);
01798 }
01799
01800
01801
01802
01803
01804
01805
01806
01807 spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
01808 prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
01809 PageGetItemId(current->page, current->offnum));
01810 spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
01811
01812 MarkBufferDirty(current->buffer);
01813
01814 if (RelationNeedsWAL(index))
01815 {
01816 XLogRecPtr recptr;
01817
01818 recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);
01819
01820 PageSetLSN(current->page, recptr);
01821
01822 if (newBuffer != InvalidBuffer)
01823 {
01824 PageSetLSN(BufferGetPage(newBuffer), recptr);
01825 }
01826 }
01827
01828 END_CRIT_SECTION();
01829
01830
01831 if (newBuffer != InvalidBuffer)
01832 {
01833 SpGistSetLastUsedPage(index, newBuffer);
01834 UnlockReleaseBuffer(newBuffer);
01835 }
01836 }
01837
01838
01839
01840
01841 void
01842 spgdoinsert(Relation index, SpGistState *state,
01843 ItemPointer heapPtr, Datum datum, bool isnull)
01844 {
01845 int level = 0;
01846 Datum leafDatum;
01847 int leafSize;
01848 SPPageDesc current,
01849 parent;
01850 FmgrInfo *procinfo = NULL;
01851
01852
01853
01854
01855
01856 if (!isnull)
01857 procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
01858
01859
01860
01861
01862
01863
01864 if (!isnull && state->attType.attlen == -1)
01865 datum = PointerGetDatum(PG_DETOAST_DATUM(datum));
01866
01867 leafDatum = datum;
01868
01869
01870
01871
01872
01873
01874
01875 if (!isnull)
01876 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
01877 SpGistGetTypeSize(&state->attType, leafDatum);
01878 else
01879 leafSize = SGDTSIZE + sizeof(ItemIdData);
01880
01881 if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
01882 ereport(ERROR,
01883 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
01884 errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
01885 (unsigned long) (leafSize - sizeof(ItemIdData)),
01886 (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
01887 RelationGetRelationName(index)),
01888 errhint("Values larger than a buffer page cannot be indexed.")));
01889
01890
01891 current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
01892 current.buffer = InvalidBuffer;
01893 current.page = NULL;
01894 current.offnum = FirstOffsetNumber;
01895 current.node = -1;
01896
01897
01898 parent.blkno = InvalidBlockNumber;
01899 parent.buffer = InvalidBuffer;
01900 parent.page = NULL;
01901 parent.offnum = InvalidOffsetNumber;
01902 parent.node = -1;
01903
01904 for (;;)
01905 {
01906 bool isNew = false;
01907
01908
01909
01910
01911
01912
01913 CHECK_FOR_INTERRUPTS();
01914
01915 if (current.blkno == InvalidBlockNumber)
01916 {
01917
01918
01919
01920
01921
01922
01923 current.buffer =
01924 SpGistGetBuffer(index,
01925 GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
01926 Min(leafSize, SPGIST_PAGE_CAPACITY),
01927 &isNew);
01928 current.blkno = BufferGetBlockNumber(current.buffer);
01929 }
01930 else if (parent.buffer == InvalidBuffer ||
01931 current.blkno != parent.blkno)
01932 {
01933 current.buffer = ReadBuffer(index, current.blkno);
01934 LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
01935 }
01936 else
01937 {
01938
01939 current.buffer = parent.buffer;
01940 }
01941 current.page = BufferGetPage(current.buffer);
01942
01943
01944 if (isnull ? !SpGistPageStoresNulls(current.page) :
01945 SpGistPageStoresNulls(current.page))
01946 elog(ERROR, "SPGiST index page %u has wrong nulls flag",
01947 current.blkno);
01948
01949 if (SpGistPageIsLeaf(current.page))
01950 {
01951 SpGistLeafTuple leafTuple;
01952 int nToSplit,
01953 sizeToSplit;
01954
01955 leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
01956 if (leafTuple->size + sizeof(ItemIdData) <=
01957 SpGistPageGetFreeSpace(current.page, 1))
01958 {
01959
01960 addLeafTuple(index, state, leafTuple,
01961 ¤t, &parent, isnull, isNew);
01962 break;
01963 }
01964 else if ((sizeToSplit =
01965 checkSplitConditions(index, state, ¤t,
01966 &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
01967 nToSplit < 64 &&
01968 leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
01969 {
01970
01971
01972
01973
01974 Assert(!isNew);
01975 moveLeafs(index, state, ¤t, &parent, leafTuple, isnull);
01976 break;
01977 }
01978 else
01979 {
01980
01981 if (doPickSplit(index, state, ¤t, &parent,
01982 leafTuple, level, isnull, isNew))
01983 break;
01984
01985
01986 pfree(leafTuple);
01987
01988
01989
01990
01991 Assert(!SpGistPageIsLeaf(current.page));
01992 goto process_inner_tuple;
01993 }
01994 }
01995 else
01996 {
01997
01998
01999
02000
02001 SpGistInnerTuple innerTuple;
02002 spgChooseIn in;
02003 spgChooseOut out;
02004
02005
02006
02007
02008
02009
02010
02011 process_inner_tuple:
02012 CHECK_FOR_INTERRUPTS();
02013
02014 innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
02015 PageGetItemId(current.page, current.offnum));
02016
02017 in.datum = datum;
02018 in.leafDatum = leafDatum;
02019 in.level = level;
02020 in.allTheSame = innerTuple->allTheSame;
02021 in.hasPrefix = (innerTuple->prefixSize > 0);
02022 in.prefixDatum = SGITDATUM(innerTuple, state);
02023 in.nNodes = innerTuple->nNodes;
02024 in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
02025
02026 memset(&out, 0, sizeof(out));
02027
02028 if (!isnull)
02029 {
02030
02031 FunctionCall2Coll(procinfo,
02032 index->rd_indcollation[0],
02033 PointerGetDatum(&in),
02034 PointerGetDatum(&out));
02035 }
02036 else
02037 {
02038
02039 out.resultType = spgMatchNode;
02040 }
02041
02042 if (innerTuple->allTheSame)
02043 {
02044
02045
02046
02047
02048
02049 if (out.resultType == spgAddNode)
02050 elog(ERROR, "cannot add a node to an allTheSame inner tuple");
02051 else if (out.resultType == spgMatchNode)
02052 out.result.matchNode.nodeN = random() % innerTuple->nNodes;
02053 }
02054
02055 switch (out.resultType)
02056 {
02057 case spgMatchNode:
02058
02059 spgMatchNodeAction(index, state, innerTuple,
02060 ¤t, &parent,
02061 out.result.matchNode.nodeN);
02062
02063 level += out.result.matchNode.levelAdd;
02064
02065 if (!isnull)
02066 {
02067 leafDatum = out.result.matchNode.restDatum;
02068 leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
02069 SpGistGetTypeSize(&state->attType, leafDatum);
02070 }
02071
02072
02073
02074
02075
02076
02077
02078
02079
02080
02081
02082
02083
02084
02085
02086 break;
02087 case spgAddNode:
02088
02089 if (in.nodeLabels == NULL)
02090 elog(ERROR, "cannot add a node to an inner tuple without node labels");
02091
02092 spgAddNodeAction(index, state, innerTuple,
02093 ¤t, &parent,
02094 out.result.addNode.nodeN,
02095 out.result.addNode.nodeLabel);
02096
02097
02098
02099
02100
02101 goto process_inner_tuple;
02102 break;
02103 case spgSplitTuple:
02104
02105 spgSplitNodeAction(index, state, innerTuple,
02106 ¤t, &out);
02107
02108
02109 goto process_inner_tuple;
02110 break;
02111 default:
02112 elog(ERROR, "unrecognized SPGiST choose result: %d",
02113 (int) out.resultType);
02114 break;
02115 }
02116 }
02117 }
02118
02119
02120
02121
02122
02123 if (current.buffer != InvalidBuffer)
02124 {
02125 SpGistSetLastUsedPage(index, current.buffer);
02126 UnlockReleaseBuffer(current.buffer);
02127 }
02128 if (parent.buffer != InvalidBuffer &&
02129 parent.buffer != current.buffer)
02130 {
02131 SpGistSetLastUsedPage(index, parent.buffer);
02132 UnlockReleaseBuffer(parent.buffer);
02133 }
02134 }