Header And Logo

PostgreSQL
| The world's most advanced open source database.

ginbtree.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * ginbtree.c
00004  *    page utility routines for the postgres inverted index access method.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *          src/backend/access/gin/ginbtree.c
00012  *-------------------------------------------------------------------------
00013  */
00014 
00015 #include "postgres.h"
00016 
00017 #include "access/gin_private.h"
00018 #include "miscadmin.h"
00019 #include "utils/rel.h"
00020 
00021 /*
00022  * Locks buffer by needed method for search.
00023  */
00024 static int
00025 ginTraverseLock(Buffer buffer, bool searchMode)
00026 {
00027     Page        page;
00028     int         access = GIN_SHARE;
00029 
00030     LockBuffer(buffer, GIN_SHARE);
00031     page = BufferGetPage(buffer);
00032     if (GinPageIsLeaf(page))
00033     {
00034         if (searchMode == FALSE)
00035         {
00036             /* we should relock our page */
00037             LockBuffer(buffer, GIN_UNLOCK);
00038             LockBuffer(buffer, GIN_EXCLUSIVE);
00039 
00040             /* But root can become non-leaf during relock */
00041             if (!GinPageIsLeaf(page))
00042             {
00043                 /* restore old lock type (very rare) */
00044                 LockBuffer(buffer, GIN_UNLOCK);
00045                 LockBuffer(buffer, GIN_SHARE);
00046             }
00047             else
00048                 access = GIN_EXCLUSIVE;
00049         }
00050     }
00051 
00052     return access;
00053 }
00054 
00055 GinBtreeStack *
00056 ginPrepareFindLeafPage(GinBtree btree, BlockNumber blkno)
00057 {
00058     GinBtreeStack *stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00059 
00060     stack->blkno = blkno;
00061     stack->buffer = ReadBuffer(btree->index, stack->blkno);
00062     stack->parent = NULL;
00063     stack->predictNumber = 1;
00064 
00065     ginTraverseLock(stack->buffer, btree->searchMode);
00066 
00067     return stack;
00068 }
00069 
00070 /*
00071  * Locates leaf page contained tuple
00072  */
00073 GinBtreeStack *
00074 ginFindLeafPage(GinBtree btree, GinBtreeStack *stack)
00075 {
00076     bool        isfirst = TRUE;
00077     BlockNumber rootBlkno;
00078 
00079     if (!stack)
00080         stack = ginPrepareFindLeafPage(btree, GIN_ROOT_BLKNO);
00081     rootBlkno = stack->blkno;
00082 
00083     for (;;)
00084     {
00085         Page        page;
00086         BlockNumber child;
00087         int         access = GIN_SHARE;
00088 
00089         stack->off = InvalidOffsetNumber;
00090 
00091         page = BufferGetPage(stack->buffer);
00092 
00093         if (isfirst)
00094         {
00095             if (GinPageIsLeaf(page) && !btree->searchMode)
00096                 access = GIN_EXCLUSIVE;
00097             isfirst = FALSE;
00098         }
00099         else
00100             access = ginTraverseLock(stack->buffer, btree->searchMode);
00101 
00102         /*
00103          * ok, page is correctly locked, we should check to move right ..,
00104          * root never has a right link, so small optimization
00105          */
00106         while (btree->fullScan == FALSE && stack->blkno != rootBlkno &&
00107                btree->isMoveRight(btree, page))
00108         {
00109             BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
00110 
00111             if (rightlink == InvalidBlockNumber)
00112                 /* rightmost page */
00113                 break;
00114 
00115             stack->blkno = rightlink;
00116             LockBuffer(stack->buffer, GIN_UNLOCK);
00117             stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
00118             LockBuffer(stack->buffer, access);
00119             page = BufferGetPage(stack->buffer);
00120         }
00121 
00122         if (GinPageIsLeaf(page))    /* we found, return locked page */
00123             return stack;
00124 
00125         /* now we have correct buffer, try to find child */
00126         child = btree->findChildPage(btree, stack);
00127 
00128         LockBuffer(stack->buffer, GIN_UNLOCK);
00129         Assert(child != InvalidBlockNumber);
00130         Assert(stack->blkno != child);
00131 
00132         if (btree->searchMode)
00133         {
00134             /* in search mode we may forget path to leaf */
00135             stack->blkno = child;
00136             stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
00137         }
00138         else
00139         {
00140             GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00141 
00142             ptr->parent = stack;
00143             stack = ptr;
00144             stack->blkno = child;
00145             stack->buffer = ReadBuffer(btree->index, stack->blkno);
00146             stack->predictNumber = 1;
00147         }
00148     }
00149 }
00150 
00151 void
00152 freeGinBtreeStack(GinBtreeStack *stack)
00153 {
00154     while (stack)
00155     {
00156         GinBtreeStack *tmp = stack->parent;
00157 
00158         if (stack->buffer != InvalidBuffer)
00159             ReleaseBuffer(stack->buffer);
00160 
00161         pfree(stack);
00162         stack = tmp;
00163     }
00164 }
00165 
00166 /*
00167  * Try to find parent for current stack position, returns correct
00168  * parent and child's offset in  stack->parent.
00169  * Function should never release root page to prevent conflicts
00170  * with vacuum process
00171  */
00172 void
00173 ginFindParents(GinBtree btree, GinBtreeStack *stack,
00174                BlockNumber rootBlkno)
00175 {
00176 
00177     Page        page;
00178     Buffer      buffer;
00179     BlockNumber blkno,
00180                 leftmostBlkno;
00181     OffsetNumber offset;
00182     GinBtreeStack *root = stack->parent;
00183     GinBtreeStack *ptr;
00184 
00185     if (!root)
00186     {
00187         /* XLog mode... */
00188         root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00189         root->blkno = rootBlkno;
00190         root->buffer = ReadBuffer(btree->index, rootBlkno);
00191         LockBuffer(root->buffer, GIN_EXCLUSIVE);
00192         root->parent = NULL;
00193     }
00194     else
00195     {
00196         /*
00197          * find root, we should not release root page until update is
00198          * finished!!
00199          */
00200         while (root->parent)
00201         {
00202             ReleaseBuffer(root->buffer);
00203             root = root->parent;
00204         }
00205 
00206         Assert(root->blkno == rootBlkno);
00207         Assert(BufferGetBlockNumber(root->buffer) == rootBlkno);
00208         LockBuffer(root->buffer, GIN_EXCLUSIVE);
00209     }
00210     root->off = InvalidOffsetNumber;
00211 
00212     page = BufferGetPage(root->buffer);
00213     Assert(!GinPageIsLeaf(page));
00214 
00215     /* check trivial case */
00216     if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
00217     {
00218         stack->parent = root;
00219         return;
00220     }
00221 
00222     leftmostBlkno = blkno = btree->getLeftMostPage(btree, page);
00223     LockBuffer(root->buffer, GIN_UNLOCK);
00224     Assert(blkno != InvalidBlockNumber);
00225 
00226     for (;;)
00227     {
00228         buffer = ReadBuffer(btree->index, blkno);
00229         LockBuffer(buffer, GIN_EXCLUSIVE);
00230         page = BufferGetPage(buffer);
00231         if (GinPageIsLeaf(page))
00232             elog(ERROR, "Lost path");
00233 
00234         leftmostBlkno = btree->getLeftMostPage(btree, page);
00235 
00236         while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
00237         {
00238             blkno = GinPageGetOpaque(page)->rightlink;
00239             LockBuffer(buffer, GIN_UNLOCK);
00240             ReleaseBuffer(buffer);
00241             if (blkno == InvalidBlockNumber)
00242                 break;
00243             buffer = ReadBuffer(btree->index, blkno);
00244             LockBuffer(buffer, GIN_EXCLUSIVE);
00245             page = BufferGetPage(buffer);
00246         }
00247 
00248         if (blkno != InvalidBlockNumber)
00249         {
00250             ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
00251             ptr->blkno = blkno;
00252             ptr->buffer = buffer;
00253             ptr->parent = root; /* it's may be wrong, but in next call we will
00254                                  * correct */
00255             ptr->off = offset;
00256             stack->parent = ptr;
00257             return;
00258         }
00259 
00260         blkno = leftmostBlkno;
00261     }
00262 }
00263 
00264 /*
00265  * Insert value (stored in GinBtree) to tree described by stack
00266  *
00267  * During an index build, buildStats is non-null and the counters
00268  * it contains should be incremented as needed.
00269  *
00270  * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
00271  */
00272 void
00273 ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
00274 {
00275     GinBtreeStack *parent;
00276     BlockNumber rootBlkno;
00277     Page        page,
00278                 rpage,
00279                 lpage;
00280 
00281     /* extract root BlockNumber from stack */
00282     Assert(stack != NULL);
00283     parent = stack;
00284     while (parent->parent)
00285         parent = parent->parent;
00286     rootBlkno = parent->blkno;
00287     Assert(BlockNumberIsValid(rootBlkno));
00288 
00289     /* this loop crawls up the stack until the insertion is complete */
00290     for (;;)
00291     {
00292         XLogRecData *rdata;
00293         BlockNumber savedRightLink;
00294 
00295         page = BufferGetPage(stack->buffer);
00296         savedRightLink = GinPageGetOpaque(page)->rightlink;
00297 
00298         if (btree->isEnoughSpace(btree, stack->buffer, stack->off))
00299         {
00300             START_CRIT_SECTION();
00301             btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
00302 
00303             MarkBufferDirty(stack->buffer);
00304 
00305             if (RelationNeedsWAL(btree->index))
00306             {
00307                 XLogRecPtr  recptr;
00308 
00309                 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
00310                 PageSetLSN(page, recptr);
00311             }
00312 
00313             LockBuffer(stack->buffer, GIN_UNLOCK);
00314             END_CRIT_SECTION();
00315 
00316             freeGinBtreeStack(stack);
00317 
00318             return;
00319         }
00320         else
00321         {
00322             Buffer      rbuffer = GinNewBuffer(btree->index);
00323             Page        newlpage;
00324 
00325             /*
00326              * newlpage is a pointer to memory page, it doesn't associate with
00327              * buffer, stack->buffer should be untouched
00328              */
00329             newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
00330 
00331             ((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
00332 
00333             /* During index build, count the newly-split page */
00334             if (buildStats)
00335             {
00336                 if (btree->isData)
00337                     buildStats->nDataPages++;
00338                 else
00339                     buildStats->nEntryPages++;
00340             }
00341 
00342             parent = stack->parent;
00343 
00344             if (parent == NULL)
00345             {
00346                 /*
00347                  * split root, so we need to allocate new left page and place
00348                  * pointer on root to left and right page
00349                  */
00350                 Buffer      lbuffer = GinNewBuffer(btree->index);
00351 
00352                 ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
00353                 ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
00354 
00355                 page = BufferGetPage(stack->buffer);
00356                 lpage = BufferGetPage(lbuffer);
00357                 rpage = BufferGetPage(rbuffer);
00358 
00359                 GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
00360                 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
00361                 ((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
00362 
00363                 START_CRIT_SECTION();
00364 
00365                 GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
00366                 PageRestoreTempPage(newlpage, lpage);
00367                 btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
00368 
00369                 MarkBufferDirty(rbuffer);
00370                 MarkBufferDirty(lbuffer);
00371                 MarkBufferDirty(stack->buffer);
00372 
00373                 if (RelationNeedsWAL(btree->index))
00374                 {
00375                     XLogRecPtr  recptr;
00376 
00377                     recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
00378                     PageSetLSN(page, recptr);
00379                     PageSetLSN(lpage, recptr);
00380                     PageSetLSN(rpage, recptr);
00381                 }
00382 
00383                 UnlockReleaseBuffer(rbuffer);
00384                 UnlockReleaseBuffer(lbuffer);
00385                 LockBuffer(stack->buffer, GIN_UNLOCK);
00386                 END_CRIT_SECTION();
00387 
00388                 freeGinBtreeStack(stack);
00389 
00390                 /* During index build, count the newly-added root page */
00391                 if (buildStats)
00392                 {
00393                     if (btree->isData)
00394                         buildStats->nDataPages++;
00395                     else
00396                         buildStats->nEntryPages++;
00397                 }
00398 
00399                 return;
00400             }
00401             else
00402             {
00403                 /* split non-root page */
00404                 ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
00405                 ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
00406 
00407                 lpage = BufferGetPage(stack->buffer);
00408                 rpage = BufferGetPage(rbuffer);
00409 
00410                 GinPageGetOpaque(rpage)->rightlink = savedRightLink;
00411                 GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
00412 
00413                 START_CRIT_SECTION();
00414                 PageRestoreTempPage(newlpage, lpage);
00415 
00416                 MarkBufferDirty(rbuffer);
00417                 MarkBufferDirty(stack->buffer);
00418 
00419                 if (RelationNeedsWAL(btree->index))
00420                 {
00421                     XLogRecPtr  recptr;
00422 
00423                     recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
00424                     PageSetLSN(lpage, recptr);
00425                     PageSetLSN(rpage, recptr);
00426                 }
00427                 UnlockReleaseBuffer(rbuffer);
00428                 END_CRIT_SECTION();
00429             }
00430         }
00431 
00432         btree->isDelete = FALSE;
00433 
00434         /* search parent to lock */
00435         LockBuffer(parent->buffer, GIN_EXCLUSIVE);
00436 
00437         /* move right if it's needed */
00438         page = BufferGetPage(parent->buffer);
00439         while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
00440         {
00441             BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
00442 
00443             LockBuffer(parent->buffer, GIN_UNLOCK);
00444 
00445             if (rightlink == InvalidBlockNumber)
00446             {
00447                 /*
00448                  * rightmost page, but we don't find parent, we should use
00449                  * plain search...
00450                  */
00451                 ginFindParents(btree, stack, rootBlkno);
00452                 parent = stack->parent;
00453                 Assert(parent != NULL);
00454                 break;
00455             }
00456 
00457             parent->blkno = rightlink;
00458             parent->buffer = ReleaseAndReadBuffer(parent->buffer, btree->index, parent->blkno);
00459             LockBuffer(parent->buffer, GIN_EXCLUSIVE);
00460             page = BufferGetPage(parent->buffer);
00461         }
00462 
00463         UnlockReleaseBuffer(stack->buffer);
00464         pfree(stack);
00465         stack = parent;
00466     }
00467 }