#include "postgres.h"
#include "access/genam.h"
#include "access/spgist_private.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "utils/rel.h"
Go to the source code of this file.
typedef struct SPPageDesc SPPageDesc
static void addLeafTuple(Relation index,
                         SpGistState *state,
                         SpGistLeafTuple leafTuple,
                         SPPageDesc *current,
                         SPPageDesc *parent,
                         bool isNulls,
                         bool isNew) [static]
Definition at line 203 of file spgdoinsert.c.
References ACCEPT_RDATA_BUFFER, ACCEPT_RDATA_DATA, SPPageDesc::blkno, spgxlogAddLeaf::blknoLeaf, spgxlogAddLeaf::blknoParent, SPPageDesc::buffer, elog, END_CRIT_SECTION, ERROR, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, MarkBufferDirty(), spgxlogAddLeaf::newPage, SpGistLeafTupleData::nextOffset, SPPageDesc::node, spgxlogAddLeaf::node, spgxlogAddLeaf::nodeI, NULL, SPPageDesc::offnum, spgxlogAddLeaf::offnumHeadLeaf, spgxlogAddLeaf::offnumLeaf, spgxlogAddLeaf::offnumParent, SPPageDesc::page, PageAddItem(), PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, RelationData::rd_node, RelationNeedsWAL, saveNodeLink(), SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, SpGistPageAddNewItem(), START_CRIT_SECTION, spgxlogAddLeaf::storesNulls, SpGistLeafTupleData::tupstate, XLOG_SPGIST_ADD_LEAF, and XLogInsert().
Referenced by spgdoinsert().
/*
 * addLeafTuple body.
 *
 * Puts leafTuple on the page described by "current".  Three situations are
 * handled:
 *   - no existing chain (current->offnum is invalid, or the page is a root
 *     page): add the tuple standalone and, when a parent buffer exists,
 *     repoint the parent's downlink at it;
 *   - chain headed by a LIVE tuple: add the tuple and link it in as the
 *     second chain member, leaving the head's address untouched;
 *   - "chain" that is a single DEAD tuple: overwrite that tuple in place.
 *
 * All page changes happen inside one critical section; a WAL record of type
 * XLOG_SPGIST_ADD_LEAF is emitted when RelationNeedsWAL(index).
 *
 * The local array must be called "rdata": the ACCEPT_RDATA_* macros refer to
 * it by name.
 */
{
    XLogRecData rdata[4];
    spgxlogAddLeaf xlrec;

    /* Fixed part of the WAL record */
    xlrec.node = index->rd_node;
    xlrec.blknoLeaf = current->blkno;
    xlrec.newPage = isNew;
    xlrec.storesNulls = isNulls;

    /* Placeholders; whichever branch runs below overwrites what applies */
    xlrec.nodeI = 0;
    xlrec.offnumParent = InvalidOffsetNumber;
    xlrec.blknoParent = InvalidBlockNumber;
    xlrec.offnumHeadLeaf = InvalidOffsetNumber;
    xlrec.offnumLeaf = InvalidOffsetNumber;

    ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
    /* sizeof(xlrec) is assumed to be at least int-aligned */
    ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1);
    ACCEPT_RDATA_BUFFER(current->buffer, 2);

    START_CRIT_SECTION();

    if (current->offnum != InvalidOffsetNumber &&
        !SpGistBlockIsRoot(current->blkno))
    {
        /*
         * There is an existing chain.  Its head address must stay fixed, but
         * there is no need to walk to the tail: the new tuple can simply
         * become the second element.
         *
         * The "chain" may also be nothing but a DEAD tuple; in that case the
         * DEAD tuple is replaced in-place.
         */
        SpGistLeafTuple chainHead;
        OffsetNumber placedAt;

        chainHead = (SpGistLeafTuple) PageGetItem(current->page,
                              PageGetItemId(current->page, current->offnum));

        switch (chainHead->tupstate)
        {
            case SPGIST_LIVE:
                leafTuple->nextOffset = chainHead->nextOffset;
                placedAt = SpGistPageAddNewItem(state, current->page,
                                                (Item) leafTuple,
                                                leafTuple->size,
                                                NULL, false);

                /*
                 * The head may have been moved on the page by the insertion,
                 * so fetch it again before linking in the new second element.
                 */
                chainHead = (SpGistLeafTuple) PageGetItem(current->page,
                              PageGetItemId(current->page, current->offnum));
                chainHead->nextOffset = placedAt;

                xlrec.offnumLeaf = placedAt;
                xlrec.offnumHeadLeaf = current->offnum;
                break;

            case SPGIST_DEAD:
                leafTuple->nextOffset = InvalidOffsetNumber;
                PageIndexTupleDelete(current->page, current->offnum);
                if (PageAddItem(current->page,
                                (Item) leafTuple, leafTuple->size,
                                current->offnum, false, false) != current->offnum)
                    elog(ERROR, "failed to add item of size %u to SPGiST index page",
                         leafTuple->size);

                /* WAL replay recognizes this case by the two offnums being equal */
                xlrec.offnumLeaf = current->offnum;
                xlrec.offnumHeadLeaf = current->offnum;
                break;

            default:
                elog(ERROR, "unexpected SPGiST tuple state: %d",
                     chainHead->tupstate);
                break;
        }
    }
    else
    {
        /* The tuple does not join any chain */
        leafTuple->nextOffset = InvalidOffsetNumber;
        current->offnum = SpGistPageAddNewItem(state, current->page,
                                               (Item) leafTuple,
                                               leafTuple->size,
                                               NULL, false);

        xlrec.offnumLeaf = current->offnum;

        /* If there is a parent, its downlink has to follow */
        if (parent->buffer != InvalidBuffer)
        {
            xlrec.blknoParent = parent->blkno;
            xlrec.offnumParent = parent->offnum;
            xlrec.nodeI = parent->node;

            saveNodeLink(index, parent, current->blkno, current->offnum);

            ACCEPT_RDATA_BUFFER(parent->buffer, 3);
        }
    }

    MarkBufferDirty(current->buffer);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr lsn = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata);

        PageSetLSN(current->page, lsn);

        /* stamp the parent page only when it was really modified */
        if (xlrec.blknoParent != InvalidBlockNumber)
            PageSetLSN(parent->page, lsn);
    }

    END_CRIT_SECTION();
}
static SpGistInnerTuple addNode(SpGistState *state,
                                SpGistInnerTuple tuple,
                                Datum label,
                                int offset) [static]
Definition at line 76 of file spgdoinsert.c.
References elog, ERROR, i, SpGistInnerTupleData::nNodes, palloc(), SpGistInnerTupleData::prefixSize, SGITDATUM, SGITITERATE, spgFormInnerTuple(), and spgFormNodeTuple().
Referenced by spgAddNodeAction().
/*
 * addNode body.
 *
 * Builds and returns a new inner tuple that carries every node of "tuple"
 * plus one additional node labelled "label", inserted at position "offset".
 * A negative offset means "append after the last existing node"; an offset
 * beyond the current node count raises an error.
 */
{
    int oldCount = tuple->nNodes;
    SpGistNodeTuple cur;
    SpGistNodeTuple *expanded;
    int idx;

    /* negative offset: the new node goes at the end */
    if (offset < 0)
        offset = oldCount;

    if (offset > oldCount)
        elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");

    expanded = palloc(sizeof(SpGistNodeTuple) * (oldCount + 1));

    /* Copy existing nodes, leaving a one-slot gap at "offset" */
    SGITITERATE(tuple, idx, cur)
    {
        expanded[(idx < offset) ? idx : idx + 1] = cur;
    }

    /* Fill the gap with the freshly formed node */
    expanded[offset] = spgFormNodeTuple(state, label, false);

    return spgFormInnerTuple(state,
                             (tuple->prefixSize > 0),
                             SGITDATUM(tuple, state),
                             oldCount + 1,
                             expanded);
}
static bool checkAllTheSame(spgPickSplitIn *in,
                            spgPickSplitOut *out,
                            bool tooBig,
                            bool *includeNew) [static]
Definition at line 599 of file spgdoinsert.c.
References i, spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, and palloc().
Referenced by doPickSplit().
/*
 * checkAllTheSame body.
 *
 * Inspects the picksplit result in "out".  If every examined tuple was mapped
 * to one and the same node, the result is replaced by an arbitrary spread of
 * the tuples over 8 nodes (all carrying the same label, if labels are in use)
 * and true is returned.  Otherwise "out" is left alone and false is returned.
 *
 * When tooBig is set, the last input tuple (the new one) is left out of the
 * "all the same?" test; *includeNew is then cleared if that tuple was mapped
 * to a different node than the rest.  *includeNew is true in all other cases.
 */
{
    const int nTuples = in->nTuples;
    int *nodeOf = out->mapTuplesToNodes;
    int firstNode;
    int scanEnd;
    int k;

    /* Start from the assumption that the new leaf tuple can be included */
    *includeNew = true;

    /* With only the new leaf tuple present, allTheSame mode is not chosen */
    if (nTuples <= 1)
        return false;

    /* If the tuple set doesn't fit on one page, skip the new tuple in the test */
    scanEnd = nTuples;
    if (tooBig)
        scanEnd--;

    /* Is more than one node populated? */
    firstNode = nodeOf[0];
    k = 1;
    while (k < scanEnd)
    {
        if (nodeOf[k] != firstNode)
            return false;
        k++;
    }

    /* Only one node is populated: override the picksplit function's choice */

    /* A new tuple sitting alone in its own node can't take part in the split */
    if (tooBig && nodeOf[nTuples - 1] != firstNode)
        *includeNew = false;

    out->nNodes = 8;            /* arbitrary number of child nodes */

    /* Spread the tuples over the nodes (the new tuple is included here) */
    for (k = 0; k < nTuples; k++)
        nodeOf[k] = k % out->nNodes;

    /* Node labels are optional for an opclass; if present, replicate the one */
    if (out->nodeLabels != NULL)
    {
        Datum sharedLabel = out->nodeLabels[firstNode];
        Datum *labels = (Datum *) palloc(sizeof(Datum) * out->nNodes);

        for (k = 0; k < out->nNodes; k++)
            labels[k] = sharedLabel;

        out->nodeLabels = labels;
    }

    /* Prefix and leaf tuple datum assignments are deliberately left as they are */
    return true;
}
static int checkSplitConditions(Relation index,
                                SpGistState *state,
                                SPPageDesc *current,
                                int *nToSplit) [static]
Definition at line 333 of file spgdoinsert.c.
References Assert, SPPageDesc::blkno, elog, ERROR, FirstOffsetNumber, i, InvalidOffsetNumber, SpGistLeafTupleData::nextOffset, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SpGistBlockIsRoot, and SpGistLeafTupleData::tupstate.
Referenced by spgdoinsert().
/*
 * checkSplitConditions body.
 *
 * Walks the leaf tuple chain that starts at current->offnum and reports
 * how many LIVE tuples it holds (*nToSplit) and how many bytes they occupy
 * including their line pointers (return value).  A DEAD tuple is tolerated
 * only as the sole chain member and is not counted.
 *
 * For a root page no walk is done: both results are set to BLCKSZ.
 */
{
    int liveCount = 0;
    int bytesNeeded = 0;
    int i;
    int nextInChain;

    if (SpGistBlockIsRoot(current->blkno))
    {
        /* impossible values, so that the caller is forced into a split */
        *nToSplit = BLCKSZ;
        return BLCKSZ;
    }

    for (i = current->offnum; i != InvalidOffsetNumber; i = nextInChain)
    {
        SpGistLeafTuple it;

        Assert(i >= FirstOffsetNumber &&
               i <= PageGetMaxOffsetNumber(current->page));
        it = (SpGistLeafTuple) PageGetItem(current->page,
                                           PageGetItemId(current->page, i));

        switch (it->tupstate)
        {
            case SPGIST_LIVE:
                liveCount++;
                bytesNeeded += it->size + sizeof(ItemIdData);
                break;

            case SPGIST_DEAD:
                /* A DEAD tuple can only show up as the first and only item */
                Assert(i == current->offnum);
                Assert(it->nextOffset == InvalidOffsetNumber);
                /* Left out of the totals: it will not move to another page */
                break;

            default:
                elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
                break;
        }

        nextInChain = it->nextOffset;
    }

    *nToSplit = liveCount;

    return bytesNeeded;
}
static int cmpOffsetNumbers(const void *a,
                            const void *b) [static]
Definition at line 108 of file spgdoinsert.c.
Referenced by spgPageIndexMultiDelete().
/*
 * cmpOffsetNumbers body.
 *
 * Three-way comparison of the two OffsetNumber values that a and b point to:
 * returns -1, 0 or 1 when *a is less than, equal to or greater than *b.
 */
{
    const OffsetNumber lhs = *(const OffsetNumber *) a;
    const OffsetNumber rhs = *(const OffsetNumber *) b;

    if (lhs < rhs)
        return -1;

    return (lhs > rhs) ? 1 : 0;
}
static bool doPickSplit(Relation index,
                        SpGistState *state,
                        SPPageDesc *current,
                        SPPageDesc *parent,
                        SpGistLeafTuple newLeafTuple,
                        int level,
                        bool isNulls,
                        bool isNew) [static]
Definition at line 676 of file spgdoinsert.c.
References ACCEPT_RDATA_BUFFER, ACCEPT_RDATA_DATA, SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, spgxlogPickSplit::blknoDest, spgxlogPickSplit::blknoInner, spgxlogPickSplit::blknoParent, spgxlogPickSplit::blknoSrc, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, checkAllTheSame(), spgPickSplitIn::datums, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, FunctionCall2Coll(), GBUF_INNER_PARITY, GBUF_LEAF, GBUF_NULLS, spgPickSplitOut::hasPrefix, SpGistLeafTupleData::heapPtr, i, index_getprocinfo(), spgxlogPickSplit::initDest, spgxlogPickSplit::initInner, spgxlogPickSplit::initSrc, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, ItemPointerSet, label, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, MarkBufferDirty(), MAXALIGN, Min, spgxlogPickSplit::nDelete, SpGistLeafTupleData::nextOffset, spgxlogPickSplit::nInsert, spgPickSplitOut::nNodes, SPPageDesc::node, spgxlogPickSplit::node, spgxlogPickSplit::nodeI, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, NULL, SPPageDesc::offnum, spgxlogPickSplit::offnumInner, spgxlogPickSplit::offnumParent, SPPageDesc::page, PageAddItem(), PageGetExactFreeSpace(), PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), palloc0(), PointerGetDatum, spgPickSplitOut::prefixDatum, RelationData::rd_indcollation, RelationData::rd_node, RelationNeedsWAL, saveNodeLink(), setRedirectionTuple(), SGDTSIZE, SGITITERATE, SGLTDATUM, SpGistInnerTupleData::size, SpGistLeafTupleData::size, spgFormInnerTuple(), spgFormLeafTuple(), spgFormNodeTuple(), SPGIST_DEAD, SPGIST_LEAF, SPGIST_LIVE, SPGIST_METAPAGE_BLKNO, SPGIST_NULLS, SPGIST_PAGE_CAPACITY, SPGIST_PICKSPLIT_PROC, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistInitBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageGetOpaque, SpGistSetLastUsedPage(), 
spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogPickSplit::stateSrc, STORE_STATE, spgxlogPickSplit::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_PICKSPLIT, and XLogInsert().
Referenced by spgdoinsert().
/*
 * doPickSplit body.
 *
 * Splits the leaf tuples reachable from "current", together with the
 * incoming newLeafTuple, underneath a newly formed inner tuple:
 *
 *   1. Collect the existing leaf tuples: every tuple on the page when
 *      "current" is a root page, otherwise the chain starting at
 *      current->offnum.
 *   2. Obtain a node assignment, either from the opclass picksplit support
 *      function or (when isNulls) from a one-node dummy split, and let
 *      checkAllTheSame() override it if everything landed in one node.
 *   3. Decide where the new inner tuple goes (parent's page, another inner
 *      page, or the root page itself) and which leaf tuples go to the
 *      current page versus at most one additional leaf page.
 *   4. Inside a critical section, modify the pages while assembling rdata[],
 *      and emit an XLOG_SPGIST_PICKSPLIT record if RelationNeedsWAL(index).
 *
 * On exit "current" describes the location of the new inner tuple.  The old
 * current buffer (unless this was a root split) and the extra leaf buffer,
 * if one was used, are unlocked and released before returning.
 *
 * Returns true if newLeafTuple was inserted as part of the split, false if
 * it had to be left out.
 *
 * The local array must be called "rdata": the ACCEPT_RDATA_* macros refer to
 * it by name.
 */
{
    bool insertedNew = false;
    spgPickSplitIn in;
    spgPickSplitOut out;
    FmgrInfo *procinfo;
    bool includeNew;
    int i, max, n;
    SpGistInnerTuple innerTuple;
    SpGistNodeTuple node, *nodes;
    Buffer newInnerBuffer, newLeafBuffer;
    ItemPointerData *heapPtrs;
    uint8 *leafPageSelect;
    int *leafSizes;
    OffsetNumber *toDelete;
    OffsetNumber *toInsert;
    OffsetNumber redirectTuplePos = InvalidOffsetNumber;
    OffsetNumber startOffsets[2];
    SpGistLeafTuple *newLeafs;
    int spaceToDelete;
    int currentFreeSpace;
    int totalLeafSizes;
    bool allTheSame;
    XLogRecData rdata[10];
    int nRdata;
    spgxlogPickSplit xlrec;
    char *leafdata, *leafptr;
    SPPageDesc saveCurrent;
    int nToDelete, nToInsert, maxToInclude;

    in.level = level;

    /*
     * Allocate per-leaf-tuple work arrays with max possible size
     */
    max = PageGetMaxOffsetNumber(current->page);
    /* one slot per existing line pointer, plus one for newLeafTuple */
    n = max + 1;
    in.datums = (Datum *) palloc(sizeof(Datum) * n);
    heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
    toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
    toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
    newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
    leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);

    xlrec.node = index->rd_node;
    STORE_STATE(state, xlrec.stateSrc);

    /*
     * Form list of leaf tuples which will be distributed as split result;
     * also, count up the amount of space that will be freed from current.
     * (Note that in the non-root case, we won't actually delete the old
     * tuples, only replace them with redirects or placeholders.)
     *
     * Note: the SGLTDATUM calls here are safe even when dealing with a nulls
     * page. For a pass-by-value data type we will fetch a word that must
     * exist even though it may contain garbage (because of the fact that leaf
     * tuples must have size at least SGDTSIZE). For a pass-by-reference type
     * we are just computing a pointer that isn't going to get dereferenced.
     * So it's not worth guarding the calls with isNulls checks.
     */
    nToInsert = 0;
    nToDelete = 0;
    spaceToDelete = 0;
    if (SpGistBlockIsRoot(current->blkno))
    {
        /*
         * We are splitting the root (which up to now is also a leaf page).
         * Its tuples are not linked, so scan sequentially to get them all. We
         * ignore the original value of current->offnum.
         */
        for (i = FirstOffsetNumber; i <= max; i++)
        {
            SpGistLeafTuple it;

            it = (SpGistLeafTuple) PageGetItem(current->page,
                                               PageGetItemId(current->page, i));
            if (it->tupstate == SPGIST_LIVE)
            {
                in.datums[nToInsert] = SGLTDATUM(it, state);
                heapPtrs[nToInsert] = it->heapPtr;
                nToInsert++;
                toDelete[nToDelete] = i;
                nToDelete++;
                /* we will delete the tuple altogether, so count full space */
                spaceToDelete += it->size + sizeof(ItemIdData);
            }
            else /* tuples on root should be live */
                elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
        }
    }
    else
    {
        /* Normal case, just collect the leaf tuples in the chain */
        i = current->offnum;
        while (i != InvalidOffsetNumber)
        {
            SpGistLeafTuple it;

            Assert(i >= FirstOffsetNumber && i <= max);
            it = (SpGistLeafTuple) PageGetItem(current->page,
                                               PageGetItemId(current->page, i));
            if (it->tupstate == SPGIST_LIVE)
            {
                in.datums[nToInsert] = SGLTDATUM(it, state);
                heapPtrs[nToInsert] = it->heapPtr;
                nToInsert++;
                toDelete[nToDelete] = i;
                nToDelete++;
                /* we will not delete the tuple, only replace with dead */
                Assert(it->size >= SGDTSIZE);
                spaceToDelete += it->size - SGDTSIZE;
            }
            else if (it->tupstate == SPGIST_DEAD)
            {
                /* We could see a DEAD tuple as first/only chain item */
                Assert(i == current->offnum);
                Assert(it->nextOffset == InvalidOffsetNumber);
                toDelete[nToDelete] = i;
                nToDelete++;
                /* replacing it with redirect will save no space */
            }
            else
                elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

            i = it->nextOffset;
        }
    }
    in.nTuples = nToInsert;

    /*
     * We may not actually insert new tuple because another picksplit may be
     * necessary due to too large value, but we will try to allocate enough
     * space to include it; and in any case it has to be included in the input
     * for the picksplit function. So don't increment nToInsert yet.
     */
    in.datums[in.nTuples] = SGLTDATUM(newLeafTuple, state);
    heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
    in.nTuples++;

    /* start from an all-zero output struct before filling it in */
    memset(&out, 0, sizeof(out));

    if (!isNulls)
    {
        /*
         * Perform split using user-defined method.
         */
        procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
        FunctionCall2Coll(procinfo,
                          index->rd_indcollation[0],
                          PointerGetDatum(&in),
                          PointerGetDatum(&out));

        /*
         * Form new leaf tuples and count up the total space needed.
         */
        totalLeafSizes = 0;
        for (i = 0; i < in.nTuples; i++)
        {
            newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
                                           out.leafTupleDatums[i],
                                           false);
            totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
        }
    }
    else
    {
        /*
         * Perform dummy split that puts all tuples into one node.
         * checkAllTheSame will override this and force allTheSame mode.
         */
        out.hasPrefix = false;
        out.nNodes = 1;
        out.nodeLabels = NULL;
        out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);

        /*
         * Form new leaf tuples and count up the total space needed.
         */
        totalLeafSizes = 0;
        for (i = 0; i < in.nTuples; i++)
        {
            newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
                                           (Datum) 0,
                                           true);
            totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
        }
    }

    /*
     * Check to see if the picksplit function failed to separate the values,
     * ie, it put them all into the same child node. If so, select allTheSame
     * mode and create a random split instead. See comments for
     * checkAllTheSame as to why we need to know if the new leaf tuples could
     * fit on one page.
     */
    allTheSame = checkAllTheSame(&in, &out,
                                 totalLeafSizes > SPGIST_PAGE_CAPACITY,
                                 &includeNew);

    /*
     * If checkAllTheSame decided we must exclude the new tuple, don't
     * consider it any further.
     */
    if (includeNew)
        maxToInclude = in.nTuples;
    else
    {
        maxToInclude = in.nTuples - 1;
        totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
    }

    /*
     * Allocate per-node work arrays. Since checkAllTheSame could replace
     * out.nNodes with a value larger than the number of tuples on the input
     * page, we can't allocate these arrays before here.
     */
    nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
    leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);

    /*
     * Form nodes of inner tuple and inner tuple itself
     */
    for (i = 0; i < out.nNodes; i++)
    {
        Datum label = (Datum) 0;
        bool labelisnull = (out.nodeLabels == NULL);

        if (!labelisnull)
            label = out.nodeLabels[i];
        nodes[i] = spgFormNodeTuple(state, label, labelisnull);
    }
    innerTuple = spgFormInnerTuple(state,
                                   out.hasPrefix, out.prefixDatum,
                                   out.nNodes, nodes);
    innerTuple->allTheSame = allTheSame;

    /*
     * Update nodes[] array to point into the newly formed innerTuple, so that
     * we can adjust their downlinks below.
     */
    SGITITERATE(innerTuple, i, node)
    {
        nodes[i] = node;
    }

    /*
     * Re-scan new leaf tuples and count up the space needed under each node.
     */
    for (i = 0; i < maxToInclude; i++)
    {
        n = out.mapTuplesToNodes[i];
        /* reject node numbers outside [0, out.nNodes) before using them as an index */
        if (n < 0 || n >= out.nNodes)
            elog(ERROR, "inconsistent result of SPGiST picksplit function");
        leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
    }

    /*
     * To perform the split, we must insert a new inner tuple, which can't go
     * on a leaf page; and unless we are splitting the root page, we must then
     * update the parent tuple's downlink to point to the inner tuple. If
     * there is room, we'll put the new inner tuple on the same page as the
     * parent tuple, otherwise we need another non-leaf buffer. But if the
     * parent page is the root, we can't add the new inner tuple there,
     * because the root page must have only one inner tuple.
     */
    xlrec.initInner = false;
    if (parent->buffer != InvalidBuffer &&
        !SpGistBlockIsRoot(parent->blkno) &&
        (SpGistPageGetFreeSpace(parent->page, 1) >=
         innerTuple->size + sizeof(ItemIdData)))
    {
        /* New inner tuple will fit on parent page */
        newInnerBuffer = parent->buffer;
    }
    else if (parent->buffer != InvalidBuffer)
    {
        /* Send tuple to page with next triple parity (see README) */
        newInnerBuffer = SpGistGetBuffer(index,
                                         GBUF_INNER_PARITY(parent->blkno + 1) |
                                         (isNulls ? GBUF_NULLS : 0),
                                         innerTuple->size + sizeof(ItemIdData),
                                         &xlrec.initInner);
    }
    else
    {
        /* Root page split ... inner tuple will go to root page */
        newInnerBuffer = InvalidBuffer;
    }

    /*
     * Because a WAL record can't involve more than four buffers, we can only
     * afford to deal with two leaf pages in each picksplit action, ie the
     * current page and at most one other.
     *
     * The new leaf tuples converted from the existing ones should require the
     * same or less space, and therefore should all fit onto one page
     * (although that's not necessarily the current page, since we can't
     * delete the old tuples but only replace them with placeholders).
     * However, the incoming new tuple might not also fit, in which case we
     * might need another picksplit cycle to reduce it some more.
     *
     * If there's not room to put everything back onto the current page, then
     * we decide on a per-node basis which tuples go to the new page. (We do
     * it like that because leaf tuple chains can't cross pages, so we must
     * place all leaf tuples belonging to the same parent node on the same
     * page.)
     *
     * If we are splitting the root page (turning it from a leaf page into an
     * inner page), then no leaf tuples can go back to the current page; they
     * must all go somewhere else.
     */
    if (!SpGistBlockIsRoot(current->blkno))
        currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
    else
        currentFreeSpace = 0; /* prevent assigning any tuples to current */

    xlrec.initDest = false;

    if (totalLeafSizes <= currentFreeSpace)
    {
        /* All the leaf tuples will fit on current page */
        newLeafBuffer = InvalidBuffer;
        /* mark new leaf tuple as included in insertions, if allowed */
        if (includeNew)
        {
            nToInsert++;
            insertedNew = true;
        }
        for (i = 0; i < nToInsert; i++)
            leafPageSelect[i] = 0; /* signifies current page */
    }
    else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
    {
        /*
         * We're trying to split up a long value by repeated suffixing, but
         * it's not going to fit yet. Don't bother allocating a second leaf
         * buffer that we won't be able to use.
         */
        newLeafBuffer = InvalidBuffer;
        Assert(includeNew);
        Assert(nToInsert == 0);
    }
    else
    {
        /* We will need another leaf page */
        uint8 *nodePageSelect;
        int curspace;
        int newspace;

        newLeafBuffer = SpGistGetBuffer(index,
                                        GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
                                        Min(totalLeafSizes,
                                            SPGIST_PAGE_CAPACITY),
                                        &xlrec.initDest);

        /*
         * Attempt to assign node groups to the two pages. We might fail to
         * do so, even if totalLeafSizes is less than the available space,
         * because we can't split a group across pages.
         */
        nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);

        curspace = currentFreeSpace;
        newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
        for (i = 0; i < out.nNodes; i++)
        {
            if (leafSizes[i] <= curspace)
            {
                nodePageSelect[i] = 0; /* signifies current page */
                curspace -= leafSizes[i];
            }
            else
            {
                nodePageSelect[i] = 1; /* signifies new leaf page */
                newspace -= leafSizes[i];
            }
        }
        if (curspace >= 0 && newspace >= 0)
        {
            /* Successful assignment, so we can include the new leaf tuple */
            if (includeNew)
            {
                nToInsert++;
                insertedNew = true;
            }
        }
        else if (includeNew)
        {
            /* We must exclude the new leaf tuple from the split */
            int nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];

            leafSizes[nodeOfNewTuple] -=
                newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);

            /* Repeat the node assignment process --- should succeed now */
            curspace = currentFreeSpace;
            newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
            for (i = 0; i < out.nNodes; i++)
            {
                if (leafSizes[i] <= curspace)
                {
                    nodePageSelect[i] = 0; /* signifies current page */
                    curspace -= leafSizes[i];
                }
                else
                {
                    nodePageSelect[i] = 1; /* signifies new leaf page */
                    newspace -= leafSizes[i];
                }
            }
            if (curspace < 0 || newspace < 0)
                elog(ERROR, "failed to divide leaf tuple groups across pages");
        }
        else
        {
            /* oops, we already excluded new tuple ... should not get here */
            elog(ERROR, "failed to divide leaf tuple groups across pages");
        }
        /* Expand the per-node assignments to be shown per leaf tuple */
        for (i = 0; i < nToInsert; i++)
        {
            n = out.mapTuplesToNodes[i];
            leafPageSelect[i] = nodePageSelect[n];
        }
    }

    /* Start preparing WAL record */
    xlrec.blknoSrc = current->blkno;
    xlrec.blknoDest = InvalidBlockNumber;
    xlrec.nDelete = 0;
    xlrec.initSrc = isNew;
    xlrec.storesNulls = isNulls;

    /*
     * Scratch buffer for the leaf tuple images that go into the WAL record.
     * totalLeafSizes counts sizeof(ItemIdData) per tuple on top of the tuple
     * size, while only the tuple bytes are copied below, so this is an upper
     * bound on what gets written.
     */
    leafdata = leafptr = (char *) palloc(totalLeafSizes);

    ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
    ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, 1);
    /* rdata[0] and rdata[1] are taken; nRdata tracks the next free entry */
    nRdata = 2;

    /* Here we begin making the changes to the target pages */
    START_CRIT_SECTION();

    /*
     * Delete old leaf tuples from current buffer, except when we're splitting
     * the root; in that case there's no need because we'll re-init the page
     * below. We do this first to make room for reinserting new leaf tuples.
     */
    if (!SpGistBlockIsRoot(current->blkno))
    {
        /*
         * Init buffer instead of deleting individual tuples, but only if
         * there aren't any other live tuples and only during build; otherwise
         * we need to set a redirection tuple for concurrent scans.
         */
        if (state->isBuild &&
            nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
            PageGetMaxOffsetNumber(current->page))
        {
            SpGistInitBuffer(current->buffer,
                             SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
            xlrec.initSrc = true;
        }
        else if (isNew)
        {
            /* don't expose the freshly init'd buffer as a backup block */
            Assert(nToDelete == 0);
        }
        else
        {
            xlrec.nDelete = nToDelete;
            ACCEPT_RDATA_DATA(toDelete,
                              MAXALIGN(sizeof(OffsetNumber) * nToDelete),
                              nRdata);
            nRdata++;
            ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
            nRdata++;

            if (!state->isBuild)
            {
                /*
                 * Need to create redirect tuple (it will point to new inner
                 * tuple) but right now the new tuple's location is not known
                 * yet. So, set the redirection pointer to "impossible" value
                 * and remember its position to update tuple later.
                 */
                if (nToDelete > 0)
                    redirectTuplePos = toDelete[0];
                spgPageIndexMultiDelete(state, current->page,
                                        toDelete, nToDelete,
                                        SPGIST_REDIRECT,
                                        SPGIST_PLACEHOLDER,
                                        SPGIST_METAPAGE_BLKNO,
                                        FirstOffsetNumber);
            }
            else
            {
                /*
                 * During index build there is not concurrent searches, so we
                 * don't need to create redirection tuple.
                 */
                spgPageIndexMultiDelete(state, current->page,
                                        toDelete, nToDelete,
                                        SPGIST_PLACEHOLDER,
                                        SPGIST_PLACEHOLDER,
                                        InvalidBlockNumber,
                                        InvalidOffsetNumber);
            }
        }
    }

    /*
     * Put leaf tuples on proper pages, and update downlinks in innerTuple's
     * nodes.
     */
    startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
    for (i = 0; i < nToInsert; i++)
    {
        SpGistLeafTuple it = newLeafs[i];
        Buffer leafBuffer;
        BlockNumber leafBlock;
        OffsetNumber newoffset;

        /* Which page is it going to? */
        leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
        leafBlock = BufferGetBlockNumber(leafBuffer);

        /* Link tuple into correct chain for its node */
        n = out.mapTuplesToNodes[i];

        if (ItemPointerIsValid(&nodes[n]->t_tid))
        {
            Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
            it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
        }
        else
            it->nextOffset = InvalidOffsetNumber;

        /* Insert it on page */
        newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
                                         (Item) it, it->size,
                                         &startOffsets[leafPageSelect[i]],
                                         false);
        toInsert[i] = newoffset;

        /* ... and complete the chain linking */
        ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);

        /* Also copy leaf tuple into WAL data */
        memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
        leafptr += newLeafs[i]->size;
    }

    /*
     * We're done modifying the other leaf buffer (if any), so mark it dirty.
     * current->buffer will be marked below, after we're entirely done
     * modifying it.
     */
    if (newLeafBuffer != InvalidBuffer)
    {
        MarkBufferDirty(newLeafBuffer);
        /* also save block number for WAL */
        xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer);
        if (!xlrec.initDest)
        {
            ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata);
            nRdata++;
        }
    }

    xlrec.nInsert = nToInsert;
    ACCEPT_RDATA_DATA(toInsert,
                      MAXALIGN(sizeof(OffsetNumber) * nToInsert),
                      nRdata);
    nRdata++;
    ACCEPT_RDATA_DATA(leafPageSelect,
                      MAXALIGN(sizeof(uint8) * nToInsert),
                      nRdata);
    nRdata++;
    /* only the bytes actually copied into leafdata are logged */
    ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata);
    nRdata++;

    /* Remember current buffer, since we're about to change "current" */
    saveCurrent = *current;

    /*
     * Store the new innerTuple
     */
    if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
    {
        /*
         * new inner tuple goes to parent page
         */
        Assert(current->buffer != parent->buffer);

        /* Repoint "current" at the new inner tuple */
        current->blkno = parent->blkno;
        current->buffer = parent->buffer;
        current->page = parent->page;
        xlrec.blknoInner = current->blkno;
        xlrec.offnumInner = current->offnum =
            SpGistPageAddNewItem(state, current->page,
                                 (Item) innerTuple, innerTuple->size,
                                 NULL, false);

        /*
         * Update parent node link and mark parent page dirty
         */
        xlrec.blknoParent = parent->blkno;
        xlrec.offnumParent = parent->offnum;
        xlrec.nodeI = parent->node;
        saveNodeLink(index, parent, current->blkno, current->offnum);

        ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
        nRdata++;

        /*
         * Update redirection link (in old current buffer)
         */
        if (redirectTuplePos != InvalidOffsetNumber)
            setRedirectionTuple(&saveCurrent, redirectTuplePos,
                                current->blkno, current->offnum);

        /* Done modifying old current buffer, mark it dirty */
        MarkBufferDirty(saveCurrent.buffer);
    }
    else if (parent->buffer != InvalidBuffer)
    {
        /*
         * new inner tuple will be stored on a new page
         */
        Assert(newInnerBuffer != InvalidBuffer);

        /* Repoint "current" at the new inner tuple */
        current->buffer = newInnerBuffer;
        current->blkno = BufferGetBlockNumber(current->buffer);
        current->page = BufferGetPage(current->buffer);
        xlrec.blknoInner = current->blkno;
        xlrec.offnumInner = current->offnum =
            SpGistPageAddNewItem(state, current->page,
                                 (Item) innerTuple, innerTuple->size,
                                 NULL, false);

        /* Done modifying new current buffer, mark it dirty */
        MarkBufferDirty(current->buffer);

        /*
         * Update parent node link and mark parent page dirty
         */
        xlrec.blknoParent = parent->blkno;
        xlrec.offnumParent = parent->offnum;
        xlrec.nodeI = parent->node;
        saveNodeLink(index, parent, current->blkno, current->offnum);

        ACCEPT_RDATA_BUFFER(current->buffer, nRdata);
        nRdata++;
        ACCEPT_RDATA_BUFFER(parent->buffer, nRdata);
        nRdata++;

        /*
         * Update redirection link (in old current buffer)
         */
        if (redirectTuplePos != InvalidOffsetNumber)
            setRedirectionTuple(&saveCurrent, redirectTuplePos,
                                current->blkno, current->offnum);

        /* Done modifying old current buffer, mark it dirty */
        MarkBufferDirty(saveCurrent.buffer);
    }
    else
    {
        /*
         * Splitting root page, which was a leaf but now becomes inner page
         * (and so "current" continues to point at it)
         */
        Assert(SpGistBlockIsRoot(current->blkno));
        Assert(redirectTuplePos == InvalidOffsetNumber);

        SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
        xlrec.initInner = true;

        xlrec.blknoInner = current->blkno;
        xlrec.offnumInner = current->offnum =
            PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
                        InvalidOffsetNumber, false, false);
        /*
         * The page was just re-initialized, so the inner tuple is expected
         * at the first offset.
         * NOTE(review): this elog sits between START_CRIT_SECTION and
         * END_CRIT_SECTION -- confirm the resulting error severity is the
         * intended one.
         */
        if (current->offnum != FirstOffsetNumber)
            elog(ERROR, "failed to add item of size %u to SPGiST index page",
                 innerTuple->size);

        /* No parent link to update, nor redirection to do */
        xlrec.blknoParent = InvalidBlockNumber;
        xlrec.offnumParent = InvalidOffsetNumber;
        xlrec.nodeI = 0;

        /* Done modifying new current buffer, mark it dirty */
        MarkBufferDirty(current->buffer);

        /* saveCurrent doesn't represent a different buffer */
        saveCurrent.buffer = InvalidBuffer;
    }

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr recptr;

        /* Issue the WAL record */
        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata);

        /* Update page LSNs on all affected pages */
        if (newLeafBuffer != InvalidBuffer)
        {
            Page page = BufferGetPage(newLeafBuffer);

            PageSetLSN(page, recptr);
        }

        if (saveCurrent.buffer != InvalidBuffer)
        {
            Page page = BufferGetPage(saveCurrent.buffer);

            PageSetLSN(page, recptr);
        }

        PageSetLSN(current->page, recptr);

        if (parent->buffer != InvalidBuffer)
        {
            PageSetLSN(parent->page, recptr);
        }
    }

    END_CRIT_SECTION();

    /* Update local free-space cache and unlock buffers */
    if (newLeafBuffer != InvalidBuffer)
    {
        SpGistSetLastUsedPage(index, newLeafBuffer);
        UnlockReleaseBuffer(newLeafBuffer);
    }
    if (saveCurrent.buffer != InvalidBuffer)
    {
        SpGistSetLastUsedPage(index, saveCurrent.buffer);
        UnlockReleaseBuffer(saveCurrent.buffer);
    }

    /* true only if one of the branches above counted the new tuple into nToInsert */
    return insertedNew;
}
static void moveLeafs(Relation index,
                      SpGistState *state,
                      SPPageDesc *current,
                      SPPageDesc *parent,
                      SpGistLeafTuple newLeafTuple,
                      bool isNulls) [static]
Definition at line 387 of file spgdoinsert.c.
References ACCEPT_RDATA_BUFFER, ACCEPT_RDATA_DATA, Assert, SPPageDesc::blkno, spgxlogMoveLeafs::blknoDst, spgxlogMoveLeafs::blknoParent, spgxlogMoveLeafs::blknoSrc, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, FirstOffsetNumber, GBUF_LEAF, GBUF_NULLS, i, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), MAXALIGN, spgxlogMoveLeafs::newPage, SpGistLeafTupleData::nextOffset, spgxlogMoveLeafs::nMoves, SPPageDesc::node, spgxlogMoveLeafs::node, spgxlogMoveLeafs::nodeI, SPPageDesc::offnum, spgxlogMoveLeafs::offnumParent, SPPageDesc::page, PageGetItem, PageGetItemId, PageGetMaxOffsetNumber, PageSetLSN, palloc(), RelationData::rd_node, RelationNeedsWAL, spgxlogMoveLeafs::replaceDead, saveNodeLink(), SpGistLeafTupleData::size, SPGIST_DEAD, SPGIST_LIVE, SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistSetLastUsedPage(), spgPageIndexMultiDelete(), START_CRIT_SECTION, spgxlogMoveLeafs::stateSrc, STORE_STATE, spgxlogMoveLeafs::storesNulls, SpGistLeafTupleData::tupstate, UnlockReleaseBuffer(), XLOG_SPGIST_MOVE_LEAFS, and XLogInsert().
Referenced by spgdoinsert().
{
    /*
     * Move the leaf-tuple chain that starts at current->offnum, plus
     * newLeafTuple, to another leaf page obtained from SpGistGetBuffer(),
     * leave dead tuples behind on the source page, and repoint the parent's
     * downlink at the relocated chain (whose head becomes newLeafTuple).
     *
     * index        - index relation being modified
     * state        - insertion state; state->isBuild decides whether the first
     *                vacated slot becomes a PLACEHOLDER or a REDIRECT
     * current      - source leaf page and the chain's head offset
     * parent       - inner tuple/node whose downlink is rewritten; asserted to
     *                be valid and to live in a different buffer than current
     * newLeafTuple - tuple being inserted along with the moved chain
     * isNulls      - passed on as GBUF_NULLS and recorded in the WAL record
     *
     * The new buffer is unlocked and released here; this function does not
     * release the current or parent buffers.
     */
    int i, nDelete, nInsert, size;
    Buffer nbuf;
    Page npage;
    SpGistLeafTuple it;
    OffsetNumber r = InvalidOffsetNumber, startOffset = InvalidOffsetNumber;
    bool replaceDead = false;
    OffsetNumber *toDelete;
    OffsetNumber *toInsert;
    BlockNumber nblkno;
    XLogRecData rdata[7];
    spgxlogMoveLeafs xlrec;
    char *leafdata, *leafptr;

    /* This doesn't work on root page */
    Assert(parent->buffer != InvalidBuffer);
    Assert(parent->buffer != current->buffer);

    /* Locate the tuples to be moved, and count up the space needed */

    /*
     * The page's max offset bounds the chain length, so it is used to size
     * both arrays; toInsert gets one extra slot for newLeafTuple.
     */
    i = PageGetMaxOffsetNumber(current->page);
    toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
    toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));

    size = newLeafTuple->size + sizeof(ItemIdData);

    nDelete = 0;
    i = current->offnum;
    while (i != InvalidOffsetNumber)
    {
        /* NOTE: this declaration shadows the function-scope "it" */
        SpGistLeafTuple it;

        Assert(i >= FirstOffsetNumber &&
               i <= PageGetMaxOffsetNumber(current->page));
        it = (SpGistLeafTuple) PageGetItem(current->page,
                                           PageGetItemId(current->page, i));

        if (it->tupstate == SPGIST_LIVE)
        {
            toDelete[nDelete] = i;
            size += it->size + sizeof(ItemIdData);
            nDelete++;
        }
        else if (it->tupstate == SPGIST_DEAD)
        {
            /* We could see a DEAD tuple as first/only chain item */
            Assert(i == current->offnum);
            Assert(it->nextOffset == InvalidOffsetNumber);
            /* We don't want to move it, so don't count it in size */
            toDelete[nDelete] = i;
            nDelete++;
            replaceDead = true;
        }
        else
            elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);

        /* follow the chain link to the next tuple */
        i = it->nextOffset;
    }

    /* Find a leaf page that will hold them */
    nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
                           size, &xlrec.newPage);
    npage = BufferGetPage(nbuf);
    nblkno = BufferGetBlockNumber(nbuf);
    Assert(nblkno != current->blkno);

    /* prepare WAL info */
    xlrec.node = index->rd_node;
    STORE_STATE(state, xlrec.stateSrc);

    xlrec.blknoSrc = current->blkno;
    xlrec.blknoDst = nblkno;
    xlrec.nMoves = nDelete;
    xlrec.replaceDead = replaceDead;
    xlrec.storesNulls = isNulls;

    xlrec.blknoParent = parent->blkno;
    xlrec.offnumParent = parent->offnum;
    xlrec.nodeI = parent->node;

    /*
     * Scratch buffer collecting copies of every tuple placed on the new page;
     * it is later attached to the WAL record.  "size" also includes the
     * line-pointer overhead, so it is an upper bound on the bytes copied.
     */
    leafdata = leafptr = palloc(size);

    START_CRIT_SECTION();

    /* copy all the old tuples to new page, unless they're dead */
    nInsert = 0;
    if (!replaceDead)
    {
        for (i = 0; i < nDelete; i++)
        {
            it = (SpGistLeafTuple) PageGetItem(current->page,
                                               PageGetItemId(current->page,
                                                             toDelete[i]));
            Assert(it->tupstate == SPGIST_LIVE);

            /*
             * Update chain link (notice the chain order gets reversed, but we
             * don't care).  We're modifying the tuple on the source page
             * here, but it's okay since we're about to delete it.
             */
            it->nextOffset = r;

            r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
                                     &startOffset, false);

            toInsert[nInsert] = r;
            nInsert++;

            /* save modified tuple into leafdata as well */
            memcpy(leafptr, it, it->size);
            leafptr += it->size;
        }
    }

    /* add the new tuple as well */
    newLeafTuple->nextOffset = r;
    r = SpGistPageAddNewItem(state, npage,
                             (Item) newLeafTuple, newLeafTuple->size,
                             &startOffset, false);
    toInsert[nInsert] = r;
    nInsert++;
    memcpy(leafptr, newLeafTuple, newLeafTuple->size);
    leafptr += newLeafTuple->size;

    /*
     * Now delete the old tuples, leaving a redirection pointer behind for the
     * first one, unless we're doing an index build; in which case there can't
     * be any concurrent scan so we need not provide a redirect.
     */
    spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
                            state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
                            SPGIST_PLACEHOLDER,
                            nblkno, r);

    /* Update parent's downlink and mark parent page dirty */
    saveNodeLink(index, parent, nblkno, r);

    /* Mark the leaf pages too */
    MarkBufferDirty(current->buffer);
    MarkBufferDirty(nbuf);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr recptr;

        /*
         * Record layout: fixed header, deleted offsets, inserted offsets,
         * tuple images, then the three affected buffers (source, destination,
         * parent).
         */
        ACCEPT_RDATA_DATA(&xlrec, MAXALIGN(sizeof(xlrec)), 0);
        ACCEPT_RDATA_DATA(toDelete, MAXALIGN(sizeof(OffsetNumber) * nDelete), 1);
        ACCEPT_RDATA_DATA(toInsert, MAXALIGN(sizeof(OffsetNumber) * nInsert), 2);
        ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3);
        ACCEPT_RDATA_BUFFER(current->buffer, 4);
        ACCEPT_RDATA_BUFFER(nbuf, 5);
        ACCEPT_RDATA_BUFFER(parent->buffer, 6);

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata);

        /* stamp every modified page with the record's LSN */
        PageSetLSN(current->page, recptr);
        PageSetLSN(npage, recptr);
        PageSetLSN(parent->page, recptr);
    }

    END_CRIT_SECTION();

    /* Update local free-space cache and release new buffer */
    SpGistSetLastUsedPage(index, nbuf);
    UnlockReleaseBuffer(nbuf);
}
static void saveNodeLink(Relation index, SPPageDesc *parent, BlockNumber blkno, OffsetNumber offnum) [static]
Definition at line 186 of file spgdoinsert.c.
References SPPageDesc::buffer, MarkBufferDirty(), SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, and spgUpdateNodeLink().
Referenced by addLeafTuple(), doPickSplit(), moveLeafs(), and spgAddNodeAction().
{
    /*
     * Store the downlink (blkno, offnum) into node number parent->node of
     * the inner tuple located at parent->offnum on parent->page, then mark
     * the parent buffer dirty.  The "index" argument is not used here.
     */
    SpGistInnerTuple parentTuple =
        (SpGistInnerTuple) PageGetItem(parent->page,
                                       PageGetItemId(parent->page,
                                                     parent->offnum));

    /* Rewrite the chosen node's downlink directly in the page image */
    spgUpdateNodeLink(parentTuple, parent->node, blkno, offnum);

    /* The page was modified in place, so flag its buffer as dirty */
    MarkBufferDirty(parent->buffer);
}
static void setRedirectionTuple(SPPageDesc *current, OffsetNumber position, BlockNumber blkno, OffsetNumber offnum) [static]
Definition at line 568 of file spgdoinsert.c.
References Assert, ItemPointerGetBlockNumber, ItemPointerSet, SPPageDesc::page, PageGetItem, PageGetItemId, SpGistDeadTupleData::pointer, SPGIST_METAPAGE_BLKNO, SPGIST_REDIRECT, and SpGistDeadTupleData::tupstate.
Referenced by doPickSplit().
{
    /*
     * Fill in the target of an existing redirection tuple: the dead tuple at
     * "position" on current->page is made to point at (blkno, offnum).
     */
    SpGistDeadTuple redirect =
        (SpGistDeadTuple) PageGetItem(current->page,
                                      PageGetItemId(current->page, position));

    /* The slot must hold a REDIRECT tuple ... */
    Assert(redirect->tupstate == SPGIST_REDIRECT);
    /* ... whose pointer still carries the metapage block number */
    Assert(ItemPointerGetBlockNumber(&redirect->pointer) == SPGIST_METAPAGE_BLKNO);

    ItemPointerSet(&redirect->pointer, blkno, offnum);
}
static void spgAddNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN, Datum nodeLabel) [static]
Definition at line 1491 of file spgdoinsert.c.
References ACCEPT_RDATA_BUFFER, ACCEPT_RDATA_DATA, addNode(), Assert, SPPageDesc::blkno, spgxlogAddNode::blkno, spgxlogAddNode::blknoNew, spgxlogAddNode::blknoParent, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, InvalidBlockNumber, InvalidBuffer, InvalidOffsetNumber, SpGistState::isBuild, MarkBufferDirty(), spgxlogAddNode::newPage, SPPageDesc::node, spgxlogAddNode::node, spgxlogAddNode::nodeI, NULL, SPPageDesc::offnum, spgxlogAddNode::offnum, spgxlogAddNode::offnumNew, spgxlogAddNode::offnumParent, SPPageDesc::page, PageAddItem(), PageGetExactFreeSpace(), PageIndexTupleDelete(), PageSetLSN, RelationData::rd_node, RelationNeedsWAL, saveNodeLink(), SpGistDeadTupleData::size, SpGistInnerTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetOpaque, SpGistPageStoresNulls, SpGistSetLastUsedPage(), START_CRIT_SECTION, spgxlogAddNode::stateSrc, STORE_STATE, UnlockReleaseBuffer(), XLOG_SPGIST_ADD_NODE, and XLogInsert().
Referenced by spgdoinsert().
{
    /*
     * Enlarge innerTuple by one node (label nodeLabel, position nodeN).
     *
     * If the enlarged tuple fits in the free space of current->page it
     * replaces the old tuple at the same offset.  Otherwise it is written to
     * a different page from SpGistGetBuffer(), the parent's downlink is
     * repointed, and the old slot is overwritten with a PLACEHOLDER (index
     * build) or REDIRECT dead tuple.  In that second case *current is updated
     * to describe the new location, and the old buffer is released unless it
     * is also the new current or the parent buffer.
     *
     * index      - index relation being modified
     * state      - insertion state (state->isBuild picks the dead-tuple kind)
     * innerTuple - the existing inner tuple at current->offnum
     * current    - location of innerTuple; may be rewritten as described above
     * parent     - location of the downlink to update when the tuple moves
     * nodeN      - position argument handed to addNode()
     * nodeLabel  - label argument handed to addNode()
     */
    SpGistInnerTuple newInnerTuple;
    XLogRecData rdata[5];
    spgxlogAddNode xlrec;

    /* Should not be applied to nulls */
    Assert(!SpGistPageStoresNulls(current->page));

    /* Construct new inner tuple with additional node */
    newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);

    /* Prepare WAL record */
    xlrec.node = index->rd_node;
    STORE_STATE(state, xlrec.stateSrc);

    xlrec.blkno = current->blkno;
    xlrec.offnum = current->offnum;

    /* we don't fill these unless we need to change the parent downlink */
    xlrec.blknoParent = InvalidBlockNumber;
    xlrec.offnumParent = InvalidOffsetNumber;
    xlrec.nodeI = 0;

    /* we don't fill these unless tuple has to be moved */
    xlrec.blknoNew = InvalidBlockNumber;
    xlrec.offnumNew = InvalidOffsetNumber;
    xlrec.newPage = false;

    ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
    /* we assume sizeof(xlrec) is at least int-aligned */
    ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1);
    ACCEPT_RDATA_BUFFER(current->buffer, 2);

    /*
     * Only the size increase has to fit, because the old tuple's space is
     * given back by the delete below.
     */
    if (PageGetExactFreeSpace(current->page) >=
        newInnerTuple->size - innerTuple->size)
    {
        /*
         * We can replace the inner tuple by new version in-place
         */
        START_CRIT_SECTION();

        PageIndexTupleDelete(current->page, current->offnum);
        if (PageAddItem(current->page,
                        (Item) newInnerTuple, newInnerTuple->size,
                        current->offnum, false, false) != current->offnum)
            elog(ERROR, "failed to add item of size %u to SPGiST index page",
                 newInnerTuple->size);

        MarkBufferDirty(current->buffer);

        if (RelationNeedsWAL(index))
        {
            XLogRecPtr recptr;

            recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);

            PageSetLSN(current->page, recptr);
        }

        END_CRIT_SECTION();
    }
    else
    {
        /*
         * move inner tuple to another page, and update parent
         */
        SpGistDeadTuple dt;
        SPPageDesc saveCurrent;

        /*
         * It should not be possible to get here for the root page, since we
         * allow only one inner tuple on the root page, and spgFormInnerTuple
         * always checks that inner tuples don't exceed the size of a page.
         */
        if (SpGistBlockIsRoot(current->blkno))
            elog(ERROR, "cannot enlarge root tuple any more");
        Assert(parent->buffer != InvalidBuffer);

        /* remember the old location; *current is about to be overwritten */
        saveCurrent = *current;

        xlrec.blknoParent = parent->blkno;
        xlrec.offnumParent = parent->offnum;
        xlrec.nodeI = parent->node;

        /*
         * obtain new buffer with the same parity as current, since it will be
         * a child of same parent tuple
         */
        current->buffer = SpGistGetBuffer(index,
                                          GBUF_INNER_PARITY(current->blkno),
                                          newInnerTuple->size + sizeof(ItemIdData),
                                          &xlrec.newPage);
        current->blkno = BufferGetBlockNumber(current->buffer);
        current->page = BufferGetPage(current->buffer);

        xlrec.blknoNew = current->blkno;

        /*
         * Let's just make real sure new current isn't same as old.  Right now
         * that's impossible, but if SpGistGetBuffer ever got smart enough to
         * delete placeholder tuples before checking space, maybe it wouldn't
         * be impossible.  The case would appear to work except that WAL
         * replay would be subtly wrong, so I think a mere assert isn't enough
         * here.
         */
        if (xlrec.blknoNew == xlrec.blkno)
            elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");

        /*
         * New current and parent buffer will both be modified; but note that
         * parent buffer could be same as either new or old current.
         */
        ACCEPT_RDATA_BUFFER(current->buffer, 3);
        if (parent->buffer != current->buffer &&
            parent->buffer != saveCurrent.buffer)
            ACCEPT_RDATA_BUFFER(parent->buffer, 4);

        START_CRIT_SECTION();

        /* insert new ... */
        xlrec.offnumNew = current->offnum =
            SpGistPageAddNewItem(state, current->page,
                                 (Item) newInnerTuple, newInnerTuple->size,
                                 NULL, false);

        MarkBufferDirty(current->buffer);

        /* update parent's downlink and mark parent page dirty */
        saveNodeLink(index, parent, current->blkno, current->offnum);

        /*
         * Replace old tuple with a placeholder or redirection tuple.  Unless
         * doing an index build, we have to insert a redirection tuple for
         * possible concurrent scans.  We can't just delete it in any case,
         * because that could change the offsets of other tuples on the page,
         * breaking downlinks from their parents.
         */
        if (state->isBuild)
            dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
                                  InvalidBlockNumber, InvalidOffsetNumber);
        else
            dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
                                  current->blkno, current->offnum);

        PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
        if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
                        saveCurrent.offnum,
                        false, false) != saveCurrent.offnum)
            elog(ERROR, "failed to add item of size %u to SPGiST index page",
                 dt->size);

        /* keep the page's dead-tuple counters in step with what was stored */
        if (state->isBuild)
            SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
        else
            SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;

        MarkBufferDirty(saveCurrent.buffer);

        if (RelationNeedsWAL(index))
        {
            XLogRecPtr recptr;

            recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata);

            /* we don't bother to check if any of these are redundant */
            PageSetLSN(current->page, recptr);
            PageSetLSN(parent->page, recptr);
            PageSetLSN(saveCurrent.page, recptr);
        }

        END_CRIT_SECTION();

        /* Release saveCurrent if it's not same as current or parent */
        if (saveCurrent.buffer != current->buffer &&
            saveCurrent.buffer != parent->buffer)
        {
            SpGistSetLastUsedPage(index, saveCurrent.buffer);
            UnlockReleaseBuffer(saveCurrent.buffer);
        }
    }
}
void spgdoinsert(Relation index, SpGistState *state, ItemPointer heapPtr, Datum datum, bool isnull)
Definition at line 1842 of file spgdoinsert.c.
References addLeafTuple(), spgChooseOut::addNode, SpGistInnerTupleData::allTheSame, spgChooseIn::allTheSame, Assert, SpGistTypeDesc::attlen, SpGistState::attType, SPPageDesc::blkno, SPPageDesc::buffer, BUFFER_LOCK_EXCLUSIVE, BufferGetBlockNumber(), BufferGetPage, CHECK_FOR_INTERRUPTS, checkSplitConditions(), SpGistState::config, spgChooseIn::datum, doPickSplit(), elog, ereport, errcode(), errhint(), errmsg(), ERROR, FunctionCall2Coll(), GBUF_LEAF, GBUF_NULLS, spgChooseIn::hasPrefix, index_getprocinfo(), InvalidBlockNumber, InvalidBuffer, spgChooseIn::leafDatum, spgChooseIn::level, LockBuffer(), spgConfigOut::longValuesOK, spgChooseOut::matchNode, Min, moveLeafs(), SpGistInnerTupleData::nNodes, spgChooseIn::nNodes, SPPageDesc::node, spgChooseIn::nodeLabels, NULL, SPPageDesc::offnum, SPPageDesc::page, PageGetItem, PageGetItemId, pfree(), PG_DETOAST_DATUM, PointerGetDatum, spgChooseIn::prefixDatum, SpGistInnerTupleData::prefixSize, random(), RelationData::rd_indcollation, ReadBuffer(), RelationGetRelationName, spgChooseOut::result, spgChooseOut::resultType, SGDTSIZE, SGITDATUM, SGLTHDRSZ, SpGistLeafTupleData::size, spgAddNode, spgAddNodeAction(), spgExtractNodeLabels(), spgFormLeafTuple(), SPGIST_CHOOSE_PROC, SPGIST_NULL_BLKNO, SPGIST_PAGE_CAPACITY, SpGistGetBuffer(), SpGistGetTypeSize(), SpGistPageGetFreeSpace, SpGistPageIsLeaf, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgMatchNode, spgMatchNodeAction(), spgSplitNodeAction(), spgSplitTuple, and UnlockReleaseBuffer().
Referenced by spginsert(), and spgistBuildCallback().
{
    /*
     * Insert one heap tuple reference (heapPtr, datum) into the SP-GiST index.
     *
     * Starting from the root page (the nulls root when isnull is true), the
     * loop below descends one level per iteration.  On a leaf page it adds
     * the leaf tuple, moves the chain to another page, or runs a picksplit.
     * On an inner page it asks the opclass "choose" function (or forces a
     * match for nulls) and then descends, adds a node, or splits the tuple.
     *
     * index   - index relation
     * state   - insertion state (attribute type info, opclass config)
     * heapPtr - heap tuple identifier stored in the leaf tuple
     * datum   - value to index; detoasted here if the type is varlena
     * isnull  - true when the value is NULL
     *
     * Raises ERROR if the value cannot fit on a page and the opclass does not
     * set longValuesOK, if a page has the wrong nulls flag, or if the choose
     * function returns an invalid request.  All buffers still held are
     * released before returning normally.
     */
    int level = 0;
    Datum leafDatum;
    int leafSize;
    SPPageDesc current, parent;
    FmgrInfo *procinfo = NULL;

    /*
     * Look up FmgrInfo of the user-defined choose function once, to save
     * cycles in the loop below.
     */
    if (!isnull)
        procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);

    /*
     * Since we don't use index_form_tuple in this AM, we have to make sure
     * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
     * that.
     */
    if (!isnull && state->attType.attlen == -1)
        datum = PointerGetDatum(PG_DETOAST_DATUM(datum));

    leafDatum = datum;

    /*
     * Compute space needed for a leaf tuple containing the given datum.
     *
     * If it isn't gonna fit, and the opclass can't reduce the datum size by
     * suffixing, bail out now rather than getting into an endless loop.
     */
    if (!isnull)
        leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
            SpGistGetTypeSize(&state->attType, leafDatum);
    else
        leafSize = SGDTSIZE + sizeof(ItemIdData);

    if (leafSize > SPGIST_PAGE_CAPACITY && !state->config.longValuesOK)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %lu exceeds maximum %lu for index \"%s\"",
                        (unsigned long) (leafSize - sizeof(ItemIdData)),
                        (unsigned long) (SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
                        RelationGetRelationName(index)),
                 errhint("Values larger than a buffer page cannot be indexed.")));

    /* Initialize "current" to the appropriate root page */
    current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
    current.buffer = InvalidBuffer;
    current.page = NULL;
    current.offnum = FirstOffsetNumber;
    current.node = -1;

    /* "parent" is invalid for the moment */
    parent.blkno = InvalidBlockNumber;
    parent.buffer = InvalidBuffer;
    parent.page = NULL;
    parent.offnum = InvalidOffsetNumber;
    parent.node = -1;

    for (;;)
    {
        bool isNew = false;

        /*
         * Bail out if query cancel is pending.  We must have this somewhere
         * in the loop since a broken opclass could produce an infinite
         * picksplit loop.
         */
        CHECK_FOR_INTERRUPTS();

        if (current.blkno == InvalidBlockNumber)
        {
            /*
             * Create a leaf page.  If leafSize is too large to fit on a page,
             * we won't actually use the page yet, but it simplifies the API
             * for doPickSplit to always have a leaf page at hand; so just
             * quietly limit our request to a page size.
             */
            current.buffer =
                SpGistGetBuffer(index,
                                GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
                                Min(leafSize, SPGIST_PAGE_CAPACITY),
                                &isNew);
            current.blkno = BufferGetBlockNumber(current.buffer);
        }
        else if (parent.buffer == InvalidBuffer ||
                 current.blkno != parent.blkno)
        {
            current.buffer = ReadBuffer(index, current.blkno);
            LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
        }
        else
        {
            /* inner tuple can be stored on the same page as parent one */
            current.buffer = parent.buffer;
        }
        current.page = BufferGetPage(current.buffer);

        /* should not arrive at a page of the wrong type */
        if (isnull ? !SpGistPageStoresNulls(current.page) :
            SpGistPageStoresNulls(current.page))
            elog(ERROR, "SPGiST index page %u has wrong nulls flag",
                 current.blkno);

        if (SpGistPageIsLeaf(current.page))
        {
            SpGistLeafTuple leafTuple;
            int nToSplit, sizeToSplit;

            leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
            if (leafTuple->size + sizeof(ItemIdData) <=
                SpGistPageGetFreeSpace(current.page, 1))
            {
                /* it fits on page, so insert it and we're done */
                addLeafTuple(index, state, leafTuple,
                             &current, &parent, isnull, isNew);
                break;
            }
            else if ((sizeToSplit =
                      checkSplitConditions(index, state, &current,
                                           &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
                     nToSplit < 64 &&
                     leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
            {
                /*
                 * the amount of data is pretty small, so just move the whole
                 * chain to another leaf page rather than splitting it.
                 */
                Assert(!isNew);
                moveLeafs(index, state, &current, &parent, leafTuple, isnull);
                break;          /* we're done */
            }
            else
            {
                /* picksplit */
                if (doPickSplit(index, state, &current, &parent,
                                leafTuple, level, isnull, isNew))
                    break;      /* doPickSplit installed new tuples */

                /* leaf tuple will not be inserted yet */
                pfree(leafTuple);

                /*
                 * current now describes new inner tuple, go insert into it
                 */
                Assert(!SpGistPageIsLeaf(current.page));
                goto process_inner_tuple;
            }
        }
        else    /* non-leaf page */
        {
            /*
             * Apply the opclass choose function to figure out how to insert
             * the given datum into the current inner tuple.
             */
            SpGistInnerTuple innerTuple;
            spgChooseIn in;
            spgChooseOut out;

            /*
             * spgAddNode and spgSplitTuple cases will loop back to here to
             * complete the insertion operation.  Just in case the choose
             * function is broken and produces add or split requests
             * repeatedly, check for query cancel.
             */
    process_inner_tuple:
            CHECK_FOR_INTERRUPTS();

            innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
                                                        PageGetItemId(current.page, current.offnum));

            in.datum = datum;
            in.leafDatum = leafDatum;
            in.level = level;
            in.allTheSame = innerTuple->allTheSame;
            in.hasPrefix = (innerTuple->prefixSize > 0);
            in.prefixDatum = SGITDATUM(innerTuple, state);
            in.nNodes = innerTuple->nNodes;
            in.nodeLabels = spgExtractNodeLabels(state, innerTuple);

            memset(&out, 0, sizeof(out));

            if (!isnull)
            {
                /* use user-defined choose method */
                FunctionCall2Coll(procinfo,
                                  index->rd_indcollation[0],
                                  PointerGetDatum(&in),
                                  PointerGetDatum(&out));
            }
            else
            {
                /* force "match" action (to insert to random subnode) */
                out.resultType = spgMatchNode;
            }

            if (innerTuple->allTheSame)
            {
                /*
                 * It's not allowed to do an AddNode at an allTheSame tuple.
                 * Opclass must say "match", in which case we choose a random
                 * one of the nodes to descend into, or "split".
                 */
                if (out.resultType == spgAddNode)
                    elog(ERROR, "cannot add a node to an allTheSame inner tuple");
                else if (out.resultType == spgMatchNode)
                    out.result.matchNode.nodeN = random() % innerTuple->nNodes;
            }

            switch (out.resultType)
            {
                case spgMatchNode:
                    /* Descend to N'th child node */
                    spgMatchNodeAction(index, state, innerTuple,
                                       &current, &parent,
                                       out.result.matchNode.nodeN);
                    /* Adjust level as per opclass request */
                    level += out.result.matchNode.levelAdd;
                    /* Replace leafDatum and recompute leafSize */
                    if (!isnull)
                    {
                        leafDatum = out.result.matchNode.restDatum;
                        leafSize = SGLTHDRSZ + sizeof(ItemIdData) +
                            SpGistGetTypeSize(&state->attType, leafDatum);
                    }

                    /*
                     * Loop around and attempt to insert the new leafDatum at
                     * "current" (which might reference an existing child
                     * tuple, or might be invalid to force us to find a new
                     * page for the tuple).
                     *
                     * Note: if the opclass sets longValuesOK, we rely on the
                     * choose function to eventually shorten the leafDatum
                     * enough to fit on a page.  We could add a test here to
                     * complain if the datum doesn't get visibly shorter each
                     * time, but that could get in the way of opclasses that
                     * "simplify" datums in a way that doesn't necessarily
                     * lead to physical shortening on every cycle.
                     */
                    break;
                case spgAddNode:
                    /* AddNode is not sensible if nodes don't have labels */
                    if (in.nodeLabels == NULL)
                        elog(ERROR, "cannot add a node to an inner tuple without node labels");
                    /* Add node to inner tuple, per request */
                    spgAddNodeAction(index, state, innerTuple,
                                     &current, &parent,
                                     out.result.addNode.nodeN,
                                     out.result.addNode.nodeLabel);

                    /*
                     * Retry insertion into the enlarged node.  We assume that
                     * we'll get a MatchNode result this time.
                     */
                    goto process_inner_tuple;
                    break;
                case spgSplitTuple:
                    /* Split inner tuple, per request */
                    spgSplitNodeAction(index, state, innerTuple,
                                       &current, &out);

                    /* Retry insertion into the split node */
                    goto process_inner_tuple;
                    break;
                default:
                    elog(ERROR, "unrecognized SPGiST choose result: %d",
                         (int) out.resultType);
                    break;
            }
        }
    }   /* end loop */

    /*
     * Release any buffers we're still holding.  Beware of possibility that
     * current and parent reference same buffer.
     */
    if (current.buffer != InvalidBuffer)
    {
        SpGistSetLastUsedPage(index, current.buffer);
        UnlockReleaseBuffer(current.buffer);
    }
    if (parent.buffer != InvalidBuffer &&
        parent.buffer != current.buffer)
    {
        SpGistSetLastUsedPage(index, parent.buffer);
        UnlockReleaseBuffer(parent.buffer);
    }
}
static void spgMatchNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, SPPageDesc *parent, int nodeN) [static]
Definition at line 1437 of file spgdoinsert.c.
References SPPageDesc::blkno, SPPageDesc::buffer, elog, ERROR, i, InvalidBuffer, ItemPointerGetBlockNumber, ItemPointerGetOffsetNumber, ItemPointerIsValid, SPPageDesc::node, SPPageDesc::offnum, SPPageDesc::page, SGITITERATE, SpGistSetLastUsedPage(), IndexTupleData::t_tid, and UnlockReleaseBuffer().
Referenced by spgdoinsert().
{
    /*
     * Descend from the current inner tuple into its node number nodeN.
     *
     * The previous parent buffer is released (unless it is the same buffer
     * as current), *parent is repointed at the current inner tuple and the
     * chosen node, and *current is set to the node's downlink target.  When
     * the downlink is empty, current->blkno and current->offnum are set to
     * the Invalid values so the caller allocates a new page.  In both cases
     * current->buffer and current->page are reset; the caller reads the page.
     *
     * index      - index relation (used for the free-space cache update)
     * state      - insertion state (not used here)
     * innerTuple - inner tuple at *current
     * current    - in: location of innerTuple; out: downlink target
     * parent     - out: location of innerTuple plus the chosen node number
     * nodeN      - index of the node to descend into
     *
     * Raises ERROR if innerTuple has no node with index nodeN.
     */
    int i;
    SpGistNodeTuple node;
    bool found = false;

    /* Release previous parent buffer if any */
    if (parent->buffer != InvalidBuffer &&
        parent->buffer != current->buffer)
    {
        SpGistSetLastUsedPage(index, parent->buffer);
        UnlockReleaseBuffer(parent->buffer);
    }

    /* Repoint parent to specified node of current inner tuple */
    parent->blkno = current->blkno;
    parent->buffer = current->buffer;
    parent->page = current->page;
    parent->offnum = current->offnum;
    parent->node = nodeN;

    /* Locate that node */
    SGITITERATE(innerTuple, i, node)
    {
        if (i == nodeN)
        {
            found = true;
            break;
        }
    }

    /*
     * Test an explicit flag rather than comparing i with nodeN after the
     * loop.  When the iteration runs off the end, i equals the tuple's node
     * count, so a request for nodeN == node count would pass an "i != nodeN"
     * check while "node" points past the last node.
     */
    if (!found)
        elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
             nodeN);

    /* Point current to the downlink location, if any */
    if (ItemPointerIsValid(&node->t_tid))
    {
        current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
        current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
    }
    else
    {
        /* Downlink is empty, so we'll need to find a new page */
        current->blkno = InvalidBlockNumber;
        current->offnum = InvalidOffsetNumber;
    }

    current->buffer = InvalidBuffer;
    current->page = NULL;
}
void spgPageIndexMultiDelete(SpGistState *state, Page page, OffsetNumber *itemnos, int nitems, int firststate, int reststate, BlockNumber blkno, OffsetNumber offnum)
Definition at line 128 of file spgdoinsert.c.
References cmpOffsetNumbers(), elog, ERROR, i, NULL, PageAddItem(), PageIndexMultiDelete(), palloc(), pfree(), qsort, SpGistDeadTupleData::size, spgFormDeadTuple(), SPGIST_PLACEHOLDER, SPGIST_REDIRECT, SpGistPageGetOpaque, and SpGistDeadTupleData::tupstate.
Referenced by doPickSplit(), moveLeafs(), spgRedoMoveLeafs(), spgRedoPickSplit(), spgRedoVacuumLeaf(), and vacuumLeafPage().
{
    /*
     * Delete the listed items from the page and put a dead tuple back into
     * each vacated offset.  The slot named by itemnos[0] receives a tuple of
     * state "firststate"; every other slot receives "reststate".  blkno and
     * offnum are passed through to spgFormDeadTuple().  The caller's itemnos
     * array is left unmodified.
     */
    SpGistDeadTuple deadTuple = NULL;
    OffsetNumber *sorted;
    OffsetNumber headItem;
    int k;

    /* nothing to do */
    if (nitems == 0)
        return;

    /*
     * PageIndexMultiDelete wants its targets in sorted order, and having them
     * sorted also keeps the reinsertion arithmetic simple.  Sort a private
     * copy so the caller's array stays untouched.
     */
    sorted = (OffsetNumber *) palloc(sizeof(OffsetNumber) * nitems);
    memcpy(sorted, itemnos, sizeof(OffsetNumber) * nitems);
    if (nitems > 1)
        qsort(sorted, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);

    PageIndexMultiDelete(page, sorted, nitems);

    /* "first" refers to the caller's ordering, not the sorted one */
    headItem = itemnos[0];

    for (k = 0; k < nitems; k++)
    {
        OffsetNumber target = sorted[k];
        int wantState;

        if (target == headItem)
            wantState = firststate;
        else
            wantState = reststate;

        /* build a new dead tuple only when the required state changes */
        if (deadTuple == NULL || deadTuple->tupstate != wantState)
            deadTuple = spgFormDeadTuple(state, wantState, blkno, offnum);

        if (PageAddItem(page, (Item) deadTuple, deadTuple->size,
                        target, false, false) != target)
            elog(ERROR, "failed to add item of size %u to SPGiST index page",
                 deadTuple->size);

        /* maintain the page's dead-tuple counters */
        switch (wantState)
        {
            case SPGIST_REDIRECT:
                SpGistPageGetOpaque(page)->nRedirection++;
                break;
            case SPGIST_PLACEHOLDER:
                SpGistPageGetOpaque(page)->nPlaceholder++;
                break;
            default:
                break;
        }
    }

    pfree(sorted);
}
static void spgSplitNodeAction(Relation index, SpGistState *state, SpGistInnerTuple innerTuple, SPPageDesc *current, spgChooseOut *out) [static]
Definition at line 1680 of file spgdoinsert.c.
References ACCEPT_RDATA_BUFFER, ACCEPT_RDATA_DATA, SpGistInnerTupleData::allTheSame, Assert, SPPageDesc::blkno, spgxlogSplitTuple::blknoPostfix, spgxlogSplitTuple::blknoPrefix, SPPageDesc::buffer, BufferGetBlockNumber(), BufferGetPage, elog, END_CRIT_SECTION, ERROR, GBUF_INNER_PARITY, i, InvalidBuffer, MarkBufferDirty(), spgxlogSplitTuple::newPage, SpGistInnerTupleData::nNodes, spgxlogSplitTuple::node, NULL, SPPageDesc::offnum, spgxlogSplitTuple::offnumPostfix, spgxlogSplitTuple::offnumPrefix, SPPageDesc::page, PageAddItem(), PageGetItem, PageGetItemId, PageIndexTupleDelete(), PageSetLSN, palloc(), RelationData::rd_node, RelationNeedsWAL, spgChooseOut::result, SGITITERATE, SpGistInnerTupleData::size, spgFormInnerTuple(), spgFormNodeTuple(), SpGistBlockIsRoot, SpGistGetBuffer(), SpGistPageAddNewItem(), SpGistPageGetFreeSpace, SpGistPageStoresNulls, SpGistSetLastUsedPage(), spgUpdateNodeLink(), spgChooseOut::splitTuple, START_CRIT_SECTION, UnlockReleaseBuffer(), XLOG_SPGIST_SPLIT_TUPLE, and XLogInsert().
Referenced by spgdoinsert().
{
    /*
     * Split innerTuple into a prefix tuple and a postfix tuple as requested
     * in out->result.splitTuple.
     *
     * The prefix tuple has a single node and replaces innerTuple at
     * current->offnum.  The postfix tuple carries all of innerTuple's nodes
     * and is stored either on the same page or on a newly obtained one.  The
     * prefix tuple's only node is then linked to the postfix tuple.
     * *current itself is not modified; a new buffer, if one was needed, is
     * released before returning.
     *
     * index      - index relation being modified
     * state      - insertion state
     * innerTuple - the inner tuple at *current that is being split
     * current    - location of innerTuple
     * out        - choose-function output describing the split
     */
    SpGistInnerTuple prefixTuple, postfixTuple;
    SpGistNodeTuple node, *nodes;
    BlockNumber postfixBlkno;
    OffsetNumber postfixOffset;
    int i;
    XLogRecData rdata[5];
    spgxlogSplitTuple xlrec;
    Buffer newBuffer = InvalidBuffer;

    /* Should not be applied to nulls */
    Assert(!SpGistPageStoresNulls(current->page));

    /*
     * Construct new prefix tuple, containing a single node with the specified
     * label.  (We'll update the node's downlink to point to the new postfix
     * tuple, below.)
     */
    node = spgFormNodeTuple(state, out->result.splitTuple.nodeLabel, false);

    prefixTuple = spgFormInnerTuple(state,
                                    out->result.splitTuple.prefixHasPrefix,
                                    out->result.splitTuple.prefixPrefixDatum,
                                    1, &node);

    /* it must fit in the space that innerTuple now occupies */
    if (prefixTuple->size > innerTuple->size)
        elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");

    /*
     * Construct new postfix tuple, containing all nodes of innerTuple with
     * same node datums, but with the prefix specified in the choose output
     * (out->result.splitTuple).
     */
    nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
    SGITITERATE(innerTuple, i, node)
    {
        nodes[i] = node;
    }

    postfixTuple = spgFormInnerTuple(state,
                                     out->result.splitTuple.postfixHasPrefix,
                                     out->result.splitTuple.postfixPrefixDatum,
                                     innerTuple->nNodes, nodes);

    /* Postfix tuple is allTheSame if original tuple was */
    postfixTuple->allTheSame = innerTuple->allTheSame;

    /* prep data for WAL record */
    xlrec.node = index->rd_node;
    xlrec.newPage = false;

    ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0);
    /* we assume sizeof(xlrec) is at least int-aligned */
    ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1);
    ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2);
    ACCEPT_RDATA_BUFFER(current->buffer, 3);

    /*
     * If we can't fit both tuples on the current page, get a new page for the
     * postfix tuple.  In particular, can't split to the root page.
     *
     * For the space calculation, note that prefixTuple replaces innerTuple
     * but postfixTuple will be a new entry.
     */
    if (SpGistBlockIsRoot(current->blkno) ||
        SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
        prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
    {
        /*
         * Choose page with next triple parity, because postfix tuple is a
         * child of prefix one
         */
        newBuffer = SpGistGetBuffer(index,
                                    GBUF_INNER_PARITY(current->blkno + 1),
                                    postfixTuple->size + sizeof(ItemIdData),
                                    &xlrec.newPage);
        ACCEPT_RDATA_BUFFER(newBuffer, 4);
    }

    START_CRIT_SECTION();

    /*
     * Replace old tuple by prefix tuple
     */
    PageIndexTupleDelete(current->page, current->offnum);
    xlrec.offnumPrefix = PageAddItem(current->page,
                                     (Item) prefixTuple, prefixTuple->size,
                                     current->offnum, false, false);
    if (xlrec.offnumPrefix != current->offnum)
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
             prefixTuple->size);
    xlrec.blknoPrefix = current->blkno;

    /*
     * put postfix tuple into appropriate page
     */
    if (newBuffer == InvalidBuffer)
    {
        xlrec.blknoPostfix = postfixBlkno = current->blkno;
        xlrec.offnumPostfix = postfixOffset =
            SpGistPageAddNewItem(state, current->page,
                                 (Item) postfixTuple, postfixTuple->size,
                                 NULL, false);
    }
    else
    {
        xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer);
        xlrec.offnumPostfix = postfixOffset =
            SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
                                 (Item) postfixTuple, postfixTuple->size,
                                 NULL, false);
        MarkBufferDirty(newBuffer);
    }

    /*
     * And set downlink pointer in the prefix tuple to point to postfix tuple.
     * (We can't avoid this step by doing the above two steps in opposite
     * order, because there might not be enough space on the page to insert
     * the postfix tuple first.)  We have to update the local copy of the
     * prefixTuple too, because that's what will be written to WAL.
     */
    spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);
    /* now fetch the on-page copy and apply the same link to it */
    prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
                                                 PageGetItemId(current->page, current->offnum));
    spgUpdateNodeLink(prefixTuple, 0, postfixBlkno, postfixOffset);

    MarkBufferDirty(current->buffer);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr recptr;

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata);

        PageSetLSN(current->page, recptr);

        if (newBuffer != InvalidBuffer)
        {
            PageSetLSN(BufferGetPage(newBuffer), recptr);
        }
    }

    END_CRIT_SECTION();

    /* Update local free-space cache and release buffer */
    if (newBuffer != InvalidBuffer)
    {
        SpGistSetLastUsedPage(index, newBuffer);
        UnlockReleaseBuffer(newBuffer);
    }
}
void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN, BlockNumber blkno, OffsetNumber offset)
Definition at line 48 of file spgdoinsert.c.
References elog, ERROR, i, ItemPointerSet, SGITITERATE, and IndexTupleData::t_tid.
Referenced by saveNodeLink(), spgRedoAddLeaf(), spgRedoAddNode(), spgRedoMoveLeafs(), spgRedoPickSplit(), and spgSplitNodeAction().
{
    /*
     * Set the downlink of node number nodeN inside inner tuple "tup" to
     * (blkno, offset).  Raises ERROR if the tuple has no node with that
     * index.
     */
    int nodeIdx;
    SpGistNodeTuple curNode;

    SGITITERATE(tup, nodeIdx, curNode)
    {
        /* skip nodes until the requested index is reached */
        if (nodeIdx != nodeN)
            continue;

        ItemPointerSet(&curNode->t_tid, blkno, offset);
        return;
    }

    /* ran through every node without a match */
    elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
         nodeN);
}