Header And Logo

PostgreSQL
| The world's most advanced open source database.

gistbuildbuffers.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * gistbuildbuffers.c
00004  *    node buffer management functions for GiST buffering build algorithm.
00005  *
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/gist/gistbuildbuffers.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/genam.h"
00018 #include "access/gist_private.h"
00019 #include "catalog/index.h"
00020 #include "miscadmin.h"
00021 #include "storage/buffile.h"
00022 #include "storage/bufmgr.h"
00023 #include "utils/memutils.h"
00024 #include "utils/rel.h"
00025 
00026 static GISTNodeBufferPage *gistAllocateNewPageBuffer(GISTBuildBuffers *gfbb);
00027 static void gistAddLoadedBuffer(GISTBuildBuffers *gfbb,
00028                     GISTNodeBuffer *nodeBuffer);
00029 static void gistLoadNodeBuffer(GISTBuildBuffers *gfbb,
00030                    GISTNodeBuffer *nodeBuffer);
00031 static void gistUnloadNodeBuffer(GISTBuildBuffers *gfbb,
00032                      GISTNodeBuffer *nodeBuffer);
00033 static void gistPlaceItupToPage(GISTNodeBufferPage *pageBuffer,
00034                     IndexTuple item);
00035 static void gistGetItupFromPage(GISTNodeBufferPage *pageBuffer,
00036                     IndexTuple *item);
00037 static long gistBuffersGetFreeBlock(GISTBuildBuffers *gfbb);
00038 static void gistBuffersReleaseBlock(GISTBuildBuffers *gfbb, long blocknum);
00039 
00040 static void ReadTempFileBlock(BufFile *file, long blknum, void *ptr);
00041 static void WriteTempFileBlock(BufFile *file, long blknum, void *ptr);
00042 
00043 
00044 /*
00045  * Initialize GiST build buffers.
00046  */
00047 GISTBuildBuffers *
00048 gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
00049 {
00050     GISTBuildBuffers *gfbb;
00051     HASHCTL     hashCtl;
00052 
00053     gfbb = palloc(sizeof(GISTBuildBuffers));
00054     gfbb->pagesPerBuffer = pagesPerBuffer;
00055     gfbb->levelStep = levelStep;
00056 
00057     /*
00058      * Create a temporary file to hold buffer pages that are swapped out of
00059      * memory.
00060      */
00061     gfbb->pfile = BufFileCreateTemp(false);
00062     gfbb->nFileBlocks = 0;
00063 
00064     /* Initialize free page management. */
00065     gfbb->nFreeBlocks = 0;
00066     gfbb->freeBlocksLen = 32;
00067     gfbb->freeBlocks = (long *) palloc(gfbb->freeBlocksLen * sizeof(long));
00068 
00069     /*
00070      * Current memory context will be used for all in-memory data structures
00071      * of buffers which are persistent during buffering build.
00072      */
00073     gfbb->context = CurrentMemoryContext;
00074 
00075     /*
00076      * nodeBuffersTab hash is association between index blocks and it's
00077      * buffers.
00078      */
00079     hashCtl.keysize = sizeof(BlockNumber);
00080     hashCtl.entrysize = sizeof(GISTNodeBuffer);
00081     hashCtl.hcxt = CurrentMemoryContext;
00082     hashCtl.hash = tag_hash;
00083     hashCtl.match = memcmp;
00084     gfbb->nodeBuffersTab = hash_create("gistbuildbuffers",
00085                                        1024,
00086                                        &hashCtl,
00087                                        HASH_ELEM | HASH_CONTEXT
00088                                        | HASH_FUNCTION | HASH_COMPARE);
00089 
00090     gfbb->bufferEmptyingQueue = NIL;
00091 
00092     /*
00093      * Per-level node buffers lists for final buffers emptying process. Node
00094      * buffers are inserted here when they are created.
00095      */
00096     gfbb->buffersOnLevelsLen = 1;
00097     gfbb->buffersOnLevels = (List **) palloc(sizeof(List *) *
00098                                              gfbb->buffersOnLevelsLen);
00099     gfbb->buffersOnLevels[0] = NIL;
00100 
00101     /*
00102      * Block numbers of node buffers which last pages are currently loaded
00103      * into main memory.
00104      */
00105     gfbb->loadedBuffersLen = 32;
00106     gfbb->loadedBuffers = (GISTNodeBuffer **) palloc(gfbb->loadedBuffersLen *
00107                                                    sizeof(GISTNodeBuffer *));
00108     gfbb->loadedBuffersCount = 0;
00109 
00110     gfbb->rootlevel = maxLevel;
00111 
00112     return gfbb;
00113 }
00114 
00115 /*
00116  * Returns a node buffer for given block. The buffer is created if it
00117  * doesn't exist yet.
00118  */
00119 GISTNodeBuffer *
00120 gistGetNodeBuffer(GISTBuildBuffers *gfbb, GISTSTATE *giststate,
00121                   BlockNumber nodeBlocknum, int level)
00122 {
00123     GISTNodeBuffer *nodeBuffer;
00124     bool        found;
00125 
00126     /* Find node buffer in hash table */
00127     nodeBuffer = (GISTNodeBuffer *) hash_search(gfbb->nodeBuffersTab,
00128                                                 (const void *) &nodeBlocknum,
00129                                                 HASH_ENTER,
00130                                                 &found);
00131     if (!found)
00132     {
00133         /*
00134          * Node buffer wasn't found. Initialize the new buffer as empty.
00135          */
00136         MemoryContext oldcxt = MemoryContextSwitchTo(gfbb->context);
00137 
00138         /* nodeBuffer->nodeBlocknum is the hash key and was filled in already */
00139         nodeBuffer->blocksCount = 0;
00140         nodeBuffer->pageBlocknum = InvalidBlockNumber;
00141         nodeBuffer->pageBuffer = NULL;
00142         nodeBuffer->queuedForEmptying = false;
00143         nodeBuffer->level = level;
00144 
00145         /*
00146          * Add this buffer to the list of buffers on this level. Enlarge
00147          * buffersOnLevels array if needed.
00148          */
00149         if (level >= gfbb->buffersOnLevelsLen)
00150         {
00151             int         i;
00152 
00153             gfbb->buffersOnLevels =
00154                 (List **) repalloc(gfbb->buffersOnLevels,
00155                                    (level + 1) * sizeof(List *));
00156 
00157             /* initialize the enlarged portion */
00158             for (i = gfbb->buffersOnLevelsLen; i <= level; i++)
00159                 gfbb->buffersOnLevels[i] = NIL;
00160             gfbb->buffersOnLevelsLen = level + 1;
00161         }
00162 
00163         /*
00164          * Prepend the new buffer to the list of buffers on this level. It's
00165          * not arbitrary that the new buffer is put to the beginning of the
00166          * list: in the final emptying phase we loop through all buffers at
00167          * each level, and flush them. If a page is split during the emptying,
00168          * it's more efficient to flush the new splitted pages first, before
00169          * moving on to pre-existing pages on the level. The buffers just
00170          * created during the page split are likely still in cache, so
00171          * flushing them immediately is more efficient than putting them to
00172          * the end of the queue.
00173          */
00174         gfbb->buffersOnLevels[level] = lcons(nodeBuffer,
00175                                              gfbb->buffersOnLevels[level]);
00176 
00177         MemoryContextSwitchTo(oldcxt);
00178     }
00179 
00180     return nodeBuffer;
00181 }
00182 
00183 /*
00184  * Allocate memory for a buffer page.
00185  */
00186 static GISTNodeBufferPage *
00187 gistAllocateNewPageBuffer(GISTBuildBuffers *gfbb)
00188 {
00189     GISTNodeBufferPage *pageBuffer;
00190 
00191     pageBuffer = (GISTNodeBufferPage *) MemoryContextAlloc(gfbb->context,
00192                                                            BLCKSZ);
00193     pageBuffer->prev = InvalidBlockNumber;
00194 
00195     /* Set page free space */
00196     PAGE_FREE_SPACE(pageBuffer) = BLCKSZ - BUFFER_PAGE_DATA_OFFSET;
00197     return pageBuffer;
00198 }
00199 
00200 /*
00201  * Add specified buffer into loadedBuffers array.
00202  */
00203 static void
00204 gistAddLoadedBuffer(GISTBuildBuffers *gfbb, GISTNodeBuffer *nodeBuffer)
00205 {
00206     /* Never add a temporary buffer to the array */
00207     if (nodeBuffer->isTemp)
00208         return;
00209 
00210     /* Enlarge the array if needed */
00211     if (gfbb->loadedBuffersCount >= gfbb->loadedBuffersLen)
00212     {
00213         gfbb->loadedBuffersLen *= 2;
00214         gfbb->loadedBuffers = (GISTNodeBuffer **)
00215             repalloc(gfbb->loadedBuffers,
00216                      gfbb->loadedBuffersLen * sizeof(GISTNodeBuffer *));
00217     }
00218 
00219     gfbb->loadedBuffers[gfbb->loadedBuffersCount] = nodeBuffer;
00220     gfbb->loadedBuffersCount++;
00221 }
00222 
00223 /*
00224  * Load last page of node buffer into main memory.
00225  */
00226 static void
00227 gistLoadNodeBuffer(GISTBuildBuffers *gfbb, GISTNodeBuffer *nodeBuffer)
00228 {
00229     /* Check if we really should load something */
00230     if (!nodeBuffer->pageBuffer && nodeBuffer->blocksCount > 0)
00231     {
00232         /* Allocate memory for page */
00233         nodeBuffer->pageBuffer = gistAllocateNewPageBuffer(gfbb);
00234 
00235         /* Read block from temporary file */
00236         ReadTempFileBlock(gfbb->pfile, nodeBuffer->pageBlocknum,
00237                           nodeBuffer->pageBuffer);
00238 
00239         /* Mark file block as free */
00240         gistBuffersReleaseBlock(gfbb, nodeBuffer->pageBlocknum);
00241 
00242         /* Mark node buffer as loaded */
00243         gistAddLoadedBuffer(gfbb, nodeBuffer);
00244         nodeBuffer->pageBlocknum = InvalidBlockNumber;
00245     }
00246 }
00247 
00248 /*
00249  * Write last page of node buffer to the disk.
00250  */
00251 static void
00252 gistUnloadNodeBuffer(GISTBuildBuffers *gfbb, GISTNodeBuffer *nodeBuffer)
00253 {
00254     /* Check if we have something to write */
00255     if (nodeBuffer->pageBuffer)
00256     {
00257         BlockNumber blkno;
00258 
00259         /* Get free file block */
00260         blkno = gistBuffersGetFreeBlock(gfbb);
00261 
00262         /* Write block to the temporary file */
00263         WriteTempFileBlock(gfbb->pfile, blkno, nodeBuffer->pageBuffer);
00264 
00265         /* Free memory of that page */
00266         pfree(nodeBuffer->pageBuffer);
00267         nodeBuffer->pageBuffer = NULL;
00268 
00269         /* Save block number */
00270         nodeBuffer->pageBlocknum = blkno;
00271     }
00272 }
00273 
00274 /*
00275  * Write last pages of all node buffers to the disk.
00276  */
00277 void
00278 gistUnloadNodeBuffers(GISTBuildBuffers *gfbb)
00279 {
00280     int         i;
00281 
00282     /* Unload all the buffers that have a page loaded in memory. */
00283     for (i = 0; i < gfbb->loadedBuffersCount; i++)
00284         gistUnloadNodeBuffer(gfbb, gfbb->loadedBuffers[i]);
00285 
00286     /* Now there are no node buffers with loaded last page */
00287     gfbb->loadedBuffersCount = 0;
00288 }
00289 
00290 /*
00291  * Add index tuple to buffer page.
00292  */
00293 static void
00294 gistPlaceItupToPage(GISTNodeBufferPage *pageBuffer, IndexTuple itup)
00295 {
00296     Size        itupsz = IndexTupleSize(itup);
00297     char       *ptr;
00298 
00299     /* There should be enough of space. */
00300     Assert(PAGE_FREE_SPACE(pageBuffer) >= MAXALIGN(itupsz));
00301 
00302     /* Reduce free space value of page to reserve a spot for the tuple. */
00303     PAGE_FREE_SPACE(pageBuffer) -= MAXALIGN(itupsz);
00304 
00305     /* Get pointer to the spot we reserved (ie. end of free space). */
00306     ptr = (char *) pageBuffer + BUFFER_PAGE_DATA_OFFSET
00307         + PAGE_FREE_SPACE(pageBuffer);
00308 
00309     /* Copy the index tuple there. */
00310     memcpy(ptr, itup, itupsz);
00311 }
00312 
00313 /*
00314  * Get last item from buffer page and remove it from page.
00315  */
00316 static void
00317 gistGetItupFromPage(GISTNodeBufferPage *pageBuffer, IndexTuple *itup)
00318 {
00319     IndexTuple  ptr;
00320     Size        itupsz;
00321 
00322     Assert(!PAGE_IS_EMPTY(pageBuffer)); /* Page shouldn't be empty */
00323 
00324     /* Get pointer to last index tuple */
00325     ptr = (IndexTuple) ((char *) pageBuffer
00326                         + BUFFER_PAGE_DATA_OFFSET
00327                         + PAGE_FREE_SPACE(pageBuffer));
00328     itupsz = IndexTupleSize(ptr);
00329 
00330     /* Make a copy of the tuple */
00331     *itup = (IndexTuple) palloc(itupsz);
00332     memcpy(*itup, ptr, itupsz);
00333 
00334     /* Mark the space used by the tuple as free */
00335     PAGE_FREE_SPACE(pageBuffer) += MAXALIGN(itupsz);
00336 }
00337 
00338 /*
00339  * Push an index tuple to node buffer.
00340  */
00341 void
00342 gistPushItupToNodeBuffer(GISTBuildBuffers *gfbb, GISTNodeBuffer *nodeBuffer,
00343                          IndexTuple itup)
00344 {
00345     /*
00346      * Most part of memory operations will be in buffering build persistent
00347      * context. So, let's switch to it.
00348      */
00349     MemoryContext oldcxt = MemoryContextSwitchTo(gfbb->context);
00350 
00351     /*
00352      * If the buffer is currently empty, create the first page.
00353      */
00354     if (nodeBuffer->blocksCount == 0)
00355     {
00356         nodeBuffer->pageBuffer = gistAllocateNewPageBuffer(gfbb);
00357         nodeBuffer->blocksCount = 1;
00358         gistAddLoadedBuffer(gfbb, nodeBuffer);
00359     }
00360 
00361     /* Load last page of node buffer if it wasn't in memory already */
00362     if (!nodeBuffer->pageBuffer)
00363         gistLoadNodeBuffer(gfbb, nodeBuffer);
00364 
00365     /*
00366      * Check if there is enough space on the last page for the tuple.
00367      */
00368     if (PAGE_NO_SPACE(nodeBuffer->pageBuffer, itup))
00369     {
00370         /*
00371          * Nope. Swap previous block to disk and allocate a new one.
00372          */
00373         BlockNumber blkno;
00374 
00375         /* Write filled page to the disk */
00376         blkno = gistBuffersGetFreeBlock(gfbb);
00377         WriteTempFileBlock(gfbb->pfile, blkno, nodeBuffer->pageBuffer);
00378 
00379         /*
00380          * Reset the in-memory page as empty, and link the previous block to
00381          * the new page by storing its block number in the prev-link.
00382          */
00383         PAGE_FREE_SPACE(nodeBuffer->pageBuffer) =
00384             BLCKSZ - MAXALIGN(offsetof(GISTNodeBufferPage, tupledata));
00385         nodeBuffer->pageBuffer->prev = blkno;
00386 
00387         /* We've just added one more page */
00388         nodeBuffer->blocksCount++;
00389     }
00390 
00391     gistPlaceItupToPage(nodeBuffer->pageBuffer, itup);
00392 
00393     /*
00394      * If the buffer just overflowed, add it to the emptying queue.
00395      */
00396     if (BUFFER_HALF_FILLED(nodeBuffer, gfbb) && !nodeBuffer->queuedForEmptying)
00397     {
00398         gfbb->bufferEmptyingQueue = lcons(nodeBuffer,
00399                                           gfbb->bufferEmptyingQueue);
00400         nodeBuffer->queuedForEmptying = true;
00401     }
00402 
00403     /* Restore memory context */
00404     MemoryContextSwitchTo(oldcxt);
00405 }
00406 
00407 /*
00408  * Removes one index tuple from node buffer. Returns true if success and false
00409  * if node buffer is empty.
00410  */
00411 bool
00412 gistPopItupFromNodeBuffer(GISTBuildBuffers *gfbb, GISTNodeBuffer *nodeBuffer,
00413                           IndexTuple *itup)
00414 {
00415     /*
00416      * If node buffer is empty then return false.
00417      */
00418     if (nodeBuffer->blocksCount <= 0)
00419         return false;
00420 
00421     /* Load last page of node buffer if needed */
00422     if (!nodeBuffer->pageBuffer)
00423         gistLoadNodeBuffer(gfbb, nodeBuffer);
00424 
00425     /*
00426      * Get index tuple from last non-empty page.
00427      */
00428     gistGetItupFromPage(nodeBuffer->pageBuffer, itup);
00429 
00430     /*
00431      * If we just removed the last tuple from the page, fetch previous page on
00432      * this node buffer (if any).
00433      */
00434     if (PAGE_IS_EMPTY(nodeBuffer->pageBuffer))
00435     {
00436         BlockNumber prevblkno;
00437 
00438         /*
00439          * blocksCount includes the page in pageBuffer, so decrease it now.
00440          */
00441         nodeBuffer->blocksCount--;
00442 
00443         /*
00444          * If there's more pages, fetch previous one.
00445          */
00446         prevblkno = nodeBuffer->pageBuffer->prev;
00447         if (prevblkno != InvalidBlockNumber)
00448         {
00449             /* There is a previous page. Fetch it. */
00450             Assert(nodeBuffer->blocksCount > 0);
00451             ReadTempFileBlock(gfbb->pfile, prevblkno, nodeBuffer->pageBuffer);
00452 
00453             /*
00454              * Now that we've read the block in memory, we can release its
00455              * on-disk block for reuse.
00456              */
00457             gistBuffersReleaseBlock(gfbb, prevblkno);
00458         }
00459         else
00460         {
00461             /* No more pages. Free memory. */
00462             Assert(nodeBuffer->blocksCount == 0);
00463             pfree(nodeBuffer->pageBuffer);
00464             nodeBuffer->pageBuffer = NULL;
00465         }
00466     }
00467     return true;
00468 }
00469 
00470 /*
00471  * Select a currently unused block for writing to.
00472  */
00473 static long
00474 gistBuffersGetFreeBlock(GISTBuildBuffers *gfbb)
00475 {
00476     /*
00477      * If there are multiple free blocks, we select the one appearing last in
00478      * freeBlocks[].  If there are none, assign the next block at the end of
00479      * the file (causing the file to be extended).
00480      */
00481     if (gfbb->nFreeBlocks > 0)
00482         return gfbb->freeBlocks[--gfbb->nFreeBlocks];
00483     else
00484         return gfbb->nFileBlocks++;
00485 }
00486 
00487 /*
00488  * Return a block# to the freelist.
00489  */
00490 static void
00491 gistBuffersReleaseBlock(GISTBuildBuffers *gfbb, long blocknum)
00492 {
00493     int         ndx;
00494 
00495     /* Enlarge freeBlocks array if full. */
00496     if (gfbb->nFreeBlocks >= gfbb->freeBlocksLen)
00497     {
00498         gfbb->freeBlocksLen *= 2;
00499         gfbb->freeBlocks = (long *) repalloc(gfbb->freeBlocks,
00500                                              gfbb->freeBlocksLen *
00501                                              sizeof(long));
00502     }
00503 
00504     /* Add blocknum to array */
00505     ndx = gfbb->nFreeBlocks++;
00506     gfbb->freeBlocks[ndx] = blocknum;
00507 }
00508 
00509 /*
00510  * Free buffering build data structure.
00511  */
00512 void
00513 gistFreeBuildBuffers(GISTBuildBuffers *gfbb)
00514 {
00515     /* Close buffers file. */
00516     BufFileClose(gfbb->pfile);
00517 
00518     /* All other things will be freed on memory context release */
00519 }
00520 
00521 /*
00522  * Data structure representing information about node buffer for index tuples
00523  * relocation from splitted node buffer.
00524  */
00525 typedef struct
00526 {
00527     GISTENTRY   entry[INDEX_MAX_KEYS];
00528     bool        isnull[INDEX_MAX_KEYS];
00529     GISTPageSplitInfo *splitinfo;
00530     GISTNodeBuffer *nodeBuffer;
00531 } RelocationBufferInfo;
00532 
00533 /*
00534  * At page split, distribute tuples from the buffer of the split page to
00535  * new buffers for the created page halves. This also adjusts the downlinks
00536  * in 'splitinfo' to include the tuples in the buffers.
00537  */
00538 void
00539 gistRelocateBuildBuffersOnSplit(GISTBuildBuffers *gfbb, GISTSTATE *giststate,
00540                                 Relation r, int level,
00541                                 Buffer buffer, List *splitinfo)
00542 {
00543     RelocationBufferInfo *relocationBuffersInfos;
00544     bool        found;
00545     GISTNodeBuffer *nodeBuffer;
00546     BlockNumber blocknum;
00547     IndexTuple  itup;
00548     int         splitPagesCount = 0,
00549                 i;
00550     GISTENTRY   entry[INDEX_MAX_KEYS];
00551     bool        isnull[INDEX_MAX_KEYS];
00552     GISTNodeBuffer oldBuf;
00553     ListCell   *lc;
00554 
00555     /* If the splitted page doesn't have buffers, we have nothing to do. */
00556     if (!LEVEL_HAS_BUFFERS(level, gfbb))
00557         return;
00558 
00559     /*
00560      * Get the node buffer of the splitted page.
00561      */
00562     blocknum = BufferGetBlockNumber(buffer);
00563     nodeBuffer = hash_search(gfbb->nodeBuffersTab, &blocknum,
00564                              HASH_FIND, &found);
00565     if (!found)
00566     {
00567         /* The page has no buffer, so we have nothing to do. */
00568         return;
00569     }
00570 
00571     /*
00572      * Make a copy of the old buffer, as we're going reuse it as the buffer
00573      * for the new left page, which is on the same block as the old page.
00574      * That's not true for the root page, but that's fine because we never
00575      * have a buffer on the root page anyway. The original algorithm as
00576      * described by Arge et al did, but it's of no use, as you might as well
00577      * read the tuples straight from the heap instead of the root buffer.
00578      */
00579     Assert(blocknum != GIST_ROOT_BLKNO);
00580     memcpy(&oldBuf, nodeBuffer, sizeof(GISTNodeBuffer));
00581     oldBuf.isTemp = true;
00582 
00583     /* Reset the old buffer, used for the new left page from now on */
00584     nodeBuffer->blocksCount = 0;
00585     nodeBuffer->pageBuffer = NULL;
00586     nodeBuffer->pageBlocknum = InvalidBlockNumber;
00587 
00588     /*
00589      * Allocate memory for information about relocation buffers.
00590      */
00591     splitPagesCount = list_length(splitinfo);
00592     relocationBuffersInfos =
00593         (RelocationBufferInfo *) palloc(sizeof(RelocationBufferInfo) *
00594                                         splitPagesCount);
00595 
00596     /*
00597      * Fill relocation buffers information for node buffers of pages produced
00598      * by split.
00599      */
00600     i = 0;
00601     foreach(lc, splitinfo)
00602     {
00603         GISTPageSplitInfo *si = (GISTPageSplitInfo *) lfirst(lc);
00604         GISTNodeBuffer *newNodeBuffer;
00605 
00606         /* Decompress parent index tuple of node buffer page. */
00607         gistDeCompressAtt(giststate, r,
00608                           si->downlink, NULL, (OffsetNumber) 0,
00609                           relocationBuffersInfos[i].entry,
00610                           relocationBuffersInfos[i].isnull);
00611 
00612         /*
00613          * Create a node buffer for the page. The leftmost half is on the same
00614          * block as the old page before split, so for the leftmost half this
00615          * will return the original buffer. The tuples on the original buffer
00616          * were relinked to the temporary buffer, so the original one is now
00617          * empty.
00618          */
00619         newNodeBuffer = gistGetNodeBuffer(gfbb, giststate, BufferGetBlockNumber(si->buf), level);
00620 
00621         relocationBuffersInfos[i].nodeBuffer = newNodeBuffer;
00622         relocationBuffersInfos[i].splitinfo = si;
00623 
00624         i++;
00625     }
00626 
00627     /*
00628      * Loop through all index tuples in the buffer of the page being split,
00629      * moving them to buffers for the new pages.  We try to move each tuple to
00630      * the page that will result in the lowest penalty for the leading column
00631      * or, in the case of a tie, the lowest penalty for the earliest column
00632      * that is not tied.
00633      *
00634      * The page searching logic is very similar to gistchoose().
00635      */
00636     while (gistPopItupFromNodeBuffer(gfbb, &oldBuf, &itup))
00637     {
00638         float       best_penalty[INDEX_MAX_KEYS];
00639         int         i,
00640                     which;
00641         IndexTuple  newtup;
00642         RelocationBufferInfo *targetBufferInfo;
00643 
00644         gistDeCompressAtt(giststate, r,
00645                           itup, NULL, (OffsetNumber) 0, entry, isnull);
00646 
00647         /* default to using first page (shouldn't matter) */
00648         which = 0;
00649 
00650         /*
00651          * best_penalty[j] is the best penalty we have seen so far for column
00652          * j, or -1 when we haven't yet examined column j.  Array entries to
00653          * the right of the first -1 are undefined.
00654          */
00655         best_penalty[0] = -1;
00656 
00657         /*
00658          * Loop over possible target pages, looking for one to move this tuple
00659          * to.
00660          */
00661         for (i = 0; i < splitPagesCount; i++)
00662         {
00663             RelocationBufferInfo *splitPageInfo = &relocationBuffersInfos[i];
00664             bool        zero_penalty;
00665             int         j;
00666 
00667             zero_penalty = true;
00668 
00669             /* Loop over index attributes. */
00670             for (j = 0; j < r->rd_att->natts; j++)
00671             {
00672                 float       usize;
00673 
00674                 /* Compute penalty for this column. */
00675                 usize = gistpenalty(giststate, j,
00676                                     &splitPageInfo->entry[j],
00677                                     splitPageInfo->isnull[j],
00678                                     &entry[j], isnull[j]);
00679                 if (usize > 0)
00680                     zero_penalty = false;
00681 
00682                 if (best_penalty[j] < 0 || usize < best_penalty[j])
00683                 {
00684                     /*
00685                      * New best penalty for column.  Tentatively select this
00686                      * page as the target, and record the best penalty.  Then
00687                      * reset the next column's penalty to "unknown" (and
00688                      * indirectly, the same for all the ones to its right).
00689                      * This will force us to adopt this page's penalty values
00690                      * as the best for all the remaining columns during
00691                      * subsequent loop iterations.
00692                      */
00693                     which = i;
00694                     best_penalty[j] = usize;
00695 
00696                     if (j < r->rd_att->natts - 1)
00697                         best_penalty[j + 1] = -1;
00698                 }
00699                 else if (best_penalty[j] == usize)
00700                 {
00701                     /*
00702                      * The current page is exactly as good for this column as
00703                      * the best page seen so far.  The next iteration of this
00704                      * loop will compare the next column.
00705                      */
00706                 }
00707                 else
00708                 {
00709                     /*
00710                      * The current page is worse for this column than the best
00711                      * page seen so far.  Skip the remaining columns and move
00712                      * on to the next page, if any.
00713                      */
00714                     zero_penalty = false;       /* so outer loop won't exit */
00715                     break;
00716                 }
00717             }
00718 
00719             /*
00720              * If we find a page with zero penalty for all columns, there's no
00721              * need to examine remaining pages; just break out of the loop and
00722              * return it.
00723              */
00724             if (zero_penalty)
00725                 break;
00726         }
00727 
00728         /* OK, "which" is the page index to push the tuple to */
00729         targetBufferInfo = &relocationBuffersInfos[which];
00730 
00731         /* Push item to selected node buffer */
00732         gistPushItupToNodeBuffer(gfbb, targetBufferInfo->nodeBuffer, itup);
00733 
00734         /* Adjust the downlink for this page, if needed. */
00735         newtup = gistgetadjusted(r, targetBufferInfo->splitinfo->downlink,
00736                                  itup, giststate);
00737         if (newtup)
00738         {
00739             gistDeCompressAtt(giststate, r,
00740                               newtup, NULL, (OffsetNumber) 0,
00741                               targetBufferInfo->entry,
00742                               targetBufferInfo->isnull);
00743 
00744             targetBufferInfo->splitinfo->downlink = newtup;
00745         }
00746     }
00747 
00748     pfree(relocationBuffersInfos);
00749 }
00750 
00751 
00752 /*
00753  * Wrappers around BufFile operations. The main difference is that these
00754  * wrappers report errors with ereport(), so that the callers don't need
00755  * to check the return code.
00756  */
00757 
00758 static void
00759 ReadTempFileBlock(BufFile *file, long blknum, void *ptr)
00760 {
00761     if (BufFileSeekBlock(file, blknum) != 0)
00762         elog(ERROR, "could not seek temporary file: %m");
00763     if (BufFileRead(file, ptr, BLCKSZ) != BLCKSZ)
00764         elog(ERROR, "could not read temporary file: %m");
00765 }
00766 
00767 static void
00768 WriteTempFileBlock(BufFile *file, long blknum, void *ptr)
00769 {
00770     if (BufFileSeekBlock(file, blknum) != 0)
00771         elog(ERROR, "could not seek temporary file: %m");
00772     if (BufFileWrite(file, ptr, BLCKSZ) != BLCKSZ)
00773     {
00774         /*
00775          * the other errors in Read/WriteTempFileBlock shouldn't happen, but
00776          * an error at write can easily happen if you run out of disk space.
00777          */
00778         ereport(ERROR,
00779                 (errcode_for_file_access(),
00780                  errmsg("could not write block %ld of temporary file: %m",
00781                         blknum)));
00782     }
00783 }