Header And Logo

PostgreSQL
| The world's most advanced open source database.

ginfast.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * ginfast.c
00004  *    Fast insert routines for the Postgres inverted index access method.
00005  *    Pending entries are stored in a linear list of pages.  Later on
00006  *    (typically during VACUUM), ginInsertCleanup() will be invoked to
00007  *    transfer pending entries into the regular index structure.  This
00008  *    wins because bulk insertion is much more efficient than retail.
00009  *
00010  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00011  * Portions Copyright (c) 1994, Regents of the University of California
00012  *
00013  * IDENTIFICATION
00014  *          src/backend/access/gin/ginfast.c
00015  *
00016  *-------------------------------------------------------------------------
00017  */
00018 
00019 #include "postgres.h"
00020 
00021 #include "access/gin_private.h"
00022 #include "commands/vacuum.h"
00023 #include "miscadmin.h"
00024 #include "utils/memutils.h"
00025 #include "utils/rel.h"
00026 
00027 
/*
 * Number of bytes in a block once the MAXALIGN'd page header and the
 * MAXALIGN'd GinPageOpaqueData have been subtracted.
 * ginHeapTupleFastInsert() multiplies this by the number of pending pages
 * and compares the product against work_mem when deciding whether to start
 * a cleanup of the pending list.
 */
00028 #define GIN_PAGE_FREESIZE \
00029     ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
00030 
/*
 * Growable pair of parallel arrays: keys[i] and categories[i] together
 * describe entry i.  Set up by initKeyArray(), appended to by addDatum()
 * (which doubles both arrays when they are full), and used as scratch space
 * by processPendingPage().
 */
00031 typedef struct KeyArray
00032 {
00033     Datum      *keys;           /* expansible array */
00034     GinNullCategory *categories;    /* another expansible array */
00035     int32       nvalues;        /* current number of valid entries */
00036     int32       maxvalues;      /* allocated size of arrays */
00037 } KeyArray;
00038 
00039 
00040 /*
00041  * Build a pending-list page from the given array of tuples, and write it out.
00042  *
00043  * Returns amount of free space left on the page.
00044  */
00045 static int32
00046 writeListPage(Relation index, Buffer buffer,
00047               IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
00048 {
00049     Page        page = BufferGetPage(buffer);
00050     int32       i,
00051                 freesize,
00052                 size = 0;
00053     OffsetNumber l,
00054                 off;
00055     char       *workspace;
00056     char       *ptr;
00057 
00058     /* workspace could be a local array; we use palloc for alignment */
00059     workspace = palloc(BLCKSZ);
00060 
00061     START_CRIT_SECTION();
00062 
00063     GinInitBuffer(buffer, GIN_LIST);
00064 
00065     off = FirstOffsetNumber;
00066     ptr = workspace;
00067 
00068     for (i = 0; i < ntuples; i++)
00069     {
00070         int         this_size = IndexTupleSize(tuples[i]);
00071 
00072         memcpy(ptr, tuples[i], this_size);
00073         ptr += this_size;
00074         size += this_size;
00075 
00076         l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
00077 
00078         if (l == InvalidOffsetNumber)
00079             elog(ERROR, "failed to add item to index page in \"%s\"",
00080                  RelationGetRelationName(index));
00081 
00082         off++;
00083     }
00084 
00085     Assert(size <= BLCKSZ);     /* else we overran workspace */
00086 
00087     GinPageGetOpaque(page)->rightlink = rightlink;
00088 
00089     /*
00090      * tail page may contain only whole row(s) or final part of row placed on
00091      * previous pages (a "row" here meaning all the index tuples generated for
00092      * one heap tuple)
00093      */
00094     if (rightlink == InvalidBlockNumber)
00095     {
00096         GinPageSetFullRow(page);
00097         GinPageGetOpaque(page)->maxoff = 1;
00098     }
00099     else
00100     {
00101         GinPageGetOpaque(page)->maxoff = 0;
00102     }
00103 
00104     MarkBufferDirty(buffer);
00105 
00106     if (RelationNeedsWAL(index))
00107     {
00108         XLogRecData rdata[2];
00109         ginxlogInsertListPage data;
00110         XLogRecPtr  recptr;
00111 
00112         data.node = index->rd_node;
00113         data.blkno = BufferGetBlockNumber(buffer);
00114         data.rightlink = rightlink;
00115         data.ntuples = ntuples;
00116 
00117         rdata[0].buffer = InvalidBuffer;
00118         rdata[0].data = (char *) &data;
00119         rdata[0].len = sizeof(ginxlogInsertListPage);
00120         rdata[0].next = rdata + 1;
00121 
00122         rdata[1].buffer = buffer;
00123         rdata[1].buffer_std = true;
00124         rdata[1].data = workspace;
00125         rdata[1].len = size;
00126         rdata[1].next = NULL;
00127 
00128         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
00129         PageSetLSN(page, recptr);
00130     }
00131 
00132     /* get free space before releasing buffer */
00133     freesize = PageGetExactFreeSpace(page);
00134 
00135     UnlockReleaseBuffer(buffer);
00136 
00137     END_CRIT_SECTION();
00138 
00139     pfree(workspace);
00140 
00141     return freesize;
00142 }
00143 
00144 static void
00145 makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
00146             GinMetaPageData *res)
00147 {
00148     Buffer      curBuffer = InvalidBuffer;
00149     Buffer      prevBuffer = InvalidBuffer;
00150     int         i,
00151                 size = 0,
00152                 tupsize;
00153     int         startTuple = 0;
00154 
00155     Assert(ntuples > 0);
00156 
00157     /*
00158      * Split tuples into pages
00159      */
00160     for (i = 0; i < ntuples; i++)
00161     {
00162         if (curBuffer == InvalidBuffer)
00163         {
00164             curBuffer = GinNewBuffer(index);
00165 
00166             if (prevBuffer != InvalidBuffer)
00167             {
00168                 res->nPendingPages++;
00169                 writeListPage(index, prevBuffer,
00170                               tuples + startTuple,
00171                               i - startTuple,
00172                               BufferGetBlockNumber(curBuffer));
00173             }
00174             else
00175             {
00176                 res->head = BufferGetBlockNumber(curBuffer);
00177             }
00178 
00179             prevBuffer = curBuffer;
00180             startTuple = i;
00181             size = 0;
00182         }
00183 
00184         tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
00185 
00186         if (size + tupsize > GinListPageSize)
00187         {
00188             /* won't fit, force a new page and reprocess */
00189             i--;
00190             curBuffer = InvalidBuffer;
00191         }
00192         else
00193         {
00194             size += tupsize;
00195         }
00196     }
00197 
00198     /*
00199      * Write last page
00200      */
00201     res->tail = BufferGetBlockNumber(curBuffer);
00202     res->tailFreeSize = writeListPage(index, curBuffer,
00203                                       tuples + startTuple,
00204                                       ntuples - startTuple,
00205                                       InvalidBlockNumber);
00206     res->nPendingPages++;
00207     /* that was only one heap tuple */
00208     res->nPendingHeapTuples = 1;
00209 }
00210 
00211 /*
00212  * Write the index tuples contained in *collector into the index's
00213  * pending list.
00214  *
00215  * Function guarantees that all these tuples will be inserted consecutively,
00216  * preserving order
 *
 * Does nothing when the collector holds no tuples.  Otherwise one of two
 * paths is taken:
 *
 *  - If the tuples (plus one ItemIdData each) do not fit in GinListPageSize,
 *    or the pending list is empty, or they exceed the free space recorded
 *    for the tail page, a separate sublist of new pages is built with
 *    makeSublist() while the metapage is not locked, and is then linked in
 *    under an exclusive metapage lock.
 *
 *  - Otherwise the tuples are added to the existing tail page while the
 *    metapage stays exclusively locked.
 *
 * Either way the metapage is updated and, if the index needs WAL, one
 * XLOG_GIN_UPDATE_META_PAGE record is written.  Afterwards, if
 * nPendingPages * GIN_PAGE_FREESIZE exceeds work_mem, ginInsertCleanup()
 * is called.
00217  */
00218 void
00219 ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
00220 {
00221     Relation    index = ginstate->index;
00222     Buffer      metabuffer;
00223     Page        metapage;
00224     GinMetaPageData *metadata = NULL;
00225     XLogRecData rdata[2];
00226     Buffer      buffer = InvalidBuffer;
00227     Page        page = NULL;
00228     ginxlogUpdateMeta data;
00229     bool        separateList = false;
00230     bool        needCleanup = false;
00231 
00232     if (collector->ntuples == 0)
00233         return;
00234 
00235     data.node = index->rd_node;
00236     data.ntuples = 0;
00237     data.newRightlink = data.prevTail = InvalidBlockNumber;
00238 
    /*
     * rdata[0] always carries the ginxlogUpdateMeta struct.  rdata[1] is
     * chained on below only in the branches that modify an existing list
     * page.
     */
00239     rdata[0].buffer = InvalidBuffer;
00240     rdata[0].data = (char *) &data;
00241     rdata[0].len = sizeof(ginxlogUpdateMeta);
00242     rdata[0].next = NULL;
00243 
00244     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
00245     metapage = BufferGetPage(metabuffer);
00246 
00247     if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GinListPageSize)
00248     {
00249         /*
00250          * Total size is greater than one page => make sublist
         *
         * The metapage has not been locked at this point; it is locked
         * only after the sublist has been built.
00251          */
00252         separateList = true;
00253     }
00254     else
00255     {
00256         LockBuffer(metabuffer, GIN_EXCLUSIVE);
00257         metadata = GinPageGetMeta(metapage);
00258 
00259         if (metadata->head == InvalidBlockNumber ||
00260             collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
00261         {
00262             /*
00263              * Pending list is empty or total size is greater than freespace
00264              * on tail page => make sublist
00265              *
00266              * We unlock metabuffer to keep high concurrency
00267              */
00268             separateList = true;
00269             LockBuffer(metabuffer, GIN_UNLOCK);
00270         }
00271     }
00272 
00273     if (separateList)
00274     {
00275         /*
00276          * We should make sublist separately and append it to the tail
00277          */
00278         GinMetaPageData sublist;
00279 
00280         memset(&sublist, 0, sizeof(GinMetaPageData));
00281         makeSublist(index, collector->tuples, collector->ntuples, &sublist);
00282 
00283         /*
00284          * metapage was unlocked, see above
         *
         * Since the lock was not held while the sublist was built, the
         * metadata is re-read and the empty-list test is repeated here.
00285          */
00286         LockBuffer(metabuffer, GIN_EXCLUSIVE);
00287         metadata = GinPageGetMeta(metapage);
00288 
00289         if (metadata->head == InvalidBlockNumber)
00290         {
00291             /*
00292              * Main list is empty, so just insert sublist as main list
00293              */
00294             START_CRIT_SECTION();
00295 
00296             metadata->head = sublist.head;
00297             metadata->tail = sublist.tail;
00298             metadata->tailFreeSize = sublist.tailFreeSize;
00299 
00300             metadata->nPendingPages = sublist.nPendingPages;
00301             metadata->nPendingHeapTuples = sublist.nPendingHeapTuples;
00302         }
00303         else
00304         {
00305             /*
00306              * Merge lists
             *
             * The current tail page gets the sublist's head as its
             * rightlink, and the metapage counters are advanced by the
             * sublist's counts.
00307              */
00308             data.prevTail = metadata->tail;
00309             data.newRightlink = sublist.head;
00310 
00311             buffer = ReadBuffer(index, metadata->tail);
00312             LockBuffer(buffer, GIN_EXCLUSIVE);
00313             page = BufferGetPage(buffer);
00314 
00315             rdata[0].next = rdata + 1;
00316 
            /* reference the old tail's buffer; no extra payload is attached */
00317             rdata[1].buffer = buffer;
00318             rdata[1].buffer_std = true;
00319             rdata[1].data = NULL;
00320             rdata[1].len = 0;
00321             rdata[1].next = NULL;
00322 
00323             Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
00324 
00325             START_CRIT_SECTION();
00326 
00327             GinPageGetOpaque(page)->rightlink = sublist.head;
00328 
00329             MarkBufferDirty(buffer);
00330 
00331             metadata->tail = sublist.tail;
00332             metadata->tailFreeSize = sublist.tailFreeSize;
00333 
00334             metadata->nPendingPages += sublist.nPendingPages;
00335             metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
00336         }
00337     }
00338     else
00339     {
00340         /*
00341          * Insert into tail page.  Metapage is already locked
00342          */
00343         OffsetNumber l,
00344                     off;
00345         int         i,
00346                     tupsize;
00347         char       *ptr;
00348 
00349         buffer = ReadBuffer(index, metadata->tail);
00350         LockBuffer(buffer, GIN_EXCLUSIVE);
00351         page = BufferGetPage(buffer);
00352 
        /* new tuples go after whatever is already on the tail page */
00353         off = (PageIsEmpty(page)) ? FirstOffsetNumber :
00354             OffsetNumberNext(PageGetMaxOffsetNumber(page));
00355 
00356         rdata[0].next = rdata + 1;
00357 
00358         rdata[1].buffer = buffer;
00359         rdata[1].buffer_std = true;
        /*
         * Staging area for the copy of the tuples that goes into the WAL
         * record.  It is allocated before the critical section begins and is
         * not freed in this function.
         */
00360         ptr = rdata[1].data = (char *) palloc(collector->sumsize);
00361         rdata[1].len = collector->sumsize;
00362         rdata[1].next = NULL;
00363 
00364         data.ntuples = collector->ntuples;
00365 
00366         START_CRIT_SECTION();
00367 
00368         /*
00369          * Increase counter of heap tuples
         *
         * (the page's maxoff field and the metapage's nPendingHeapTuples
         * are both advanced by one)
00370          */
00371         Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
00372         GinPageGetOpaque(page)->maxoff++;
00373         metadata->nPendingHeapTuples++;
00374 
00375         for (i = 0; i < collector->ntuples; i++)
00376         {
00377             tupsize = IndexTupleSize(collector->tuples[i]);
00378             l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
00379 
            /*
             * NOTE(review): this elog(ERROR) is reached inside the critical
             * section opened above, so it presumably escalates to PANIC --
             * confirm against the elog/critical-section rules.
             */
00380             if (l == InvalidOffsetNumber)
00381                 elog(ERROR, "failed to add item to index page in \"%s\"",
00382                      RelationGetRelationName(index));
00383 
00384             memcpy(ptr, collector->tuples[i], tupsize);
00385             ptr += tupsize;
00386 
00387             off++;
00388         }
00389 
00390         Assert((ptr - rdata[1].data) <= collector->sumsize);
00391 
00392         metadata->tailFreeSize = PageGetExactFreeSpace(page);
00393 
00394         MarkBufferDirty(buffer);
00395     }
00396 
00397     /*
00398      * Write metabuffer, make xlog entry
00399      */
00400     MarkBufferDirty(metabuffer);
00401 
00402     if (RelationNeedsWAL(index))
00403     {
00404         XLogRecPtr  recptr;
00405 
        /* the record carries a full copy of the updated metadata */
00406         memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
00407 
00408         recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
00409         PageSetLSN(metapage, recptr);
00410 
00411         if (buffer != InvalidBuffer)
00412         {
00413             PageSetLSN(page, recptr);
00414         }
00415     }
00416 
00417     if (buffer != InvalidBuffer)
00418         UnlockReleaseBuffer(buffer);
00419 
00420     /*
00421      * Force pending list cleanup when it becomes too long. And,
00422      * ginInsertCleanup could take significant amount of time, so we prefer to
00423      * call it when it can do all the work in a single collection cycle. In
00424      * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it
00425      * while pending list is still small enough to fit into work_mem.
00426      *
00427      * ginInsertCleanup() should not be called inside our CRIT_SECTION.
     *
     * The decision is taken here, while metadata is still protected by the
     * metapage lock; the call itself happens after the critical section
     * has ended.
00428      */
00429     if (metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L)
00430         needCleanup = true;
00431 
00432     UnlockReleaseBuffer(metabuffer);
00433 
    /*
     * Each of the three paths above (empty main list, merge, tail-page
     * insert) opened a critical section; it is closed here.
     */
00434     END_CRIT_SECTION();
00435 
00436     if (needCleanup)
00437         ginInsertCleanup(ginstate, false, NULL);
00438 }
00439 
00440 /*
00441  * Create temporary index tuples for a single indexable item (one index column
00442  * for the heap tuple specified by ht_ctid), and append them to the array
00443  * in *collector.  They will subsequently be written out using
00444  * ginHeapTupleFastInsert.  Note that to guarantee consistent state, all
00445  * temp tuples for a given heap tuple must be written in one call to
00446  * ginHeapTupleFastInsert.
00447  */
00448 void
00449 ginHeapTupleFastCollect(GinState *ginstate,
00450                         GinTupleCollector *collector,
00451                         OffsetNumber attnum, Datum value, bool isNull,
00452                         ItemPointer ht_ctid)
00453 {
00454     Datum      *entries;
00455     GinNullCategory *categories;
00456     int32       i,
00457                 nentries;
00458 
00459     /*
00460      * Extract the key values that need to be inserted in the index
00461      */
00462     entries = ginExtractEntries(ginstate, attnum, value, isNull,
00463                                 &nentries, &categories);
00464 
00465     /*
00466      * Allocate/reallocate memory for storing collected tuples
00467      */
00468     if (collector->tuples == NULL)
00469     {
00470         collector->lentuples = nentries * ginstate->origTupdesc->natts;
00471         collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
00472     }
00473 
00474     while (collector->ntuples + nentries > collector->lentuples)
00475     {
00476         collector->lentuples *= 2;
00477         collector->tuples = (IndexTuple *) repalloc(collector->tuples,
00478                                   sizeof(IndexTuple) * collector->lentuples);
00479     }
00480 
00481     /*
00482      * Build an index tuple for each key value, and add to array.  In pending
00483      * tuples we just stick the heap TID into t_tid.
00484      */
00485     for (i = 0; i < nentries; i++)
00486     {
00487         IndexTuple  itup;
00488 
00489         itup = GinFormTuple(ginstate, attnum, entries[i], categories[i],
00490                             NULL, 0, true);
00491         itup->t_tid = *ht_ctid;
00492         collector->tuples[collector->ntuples++] = itup;
00493         collector->sumsize += IndexTupleSize(itup);
00494     }
00495 }
00496 
00497 /*
00498  * Deletes pending list pages up to (not including) newHead page.
00499  * If newHead == InvalidBlockNumber then function drops the whole list.
00500  *
00501  * metapage is pinned and exclusive-locked throughout this function.
 * This function itself never unlocks or releases metabuffer.
00502  *
00503  * Returns true if another cleanup process is running concurrently
00504  * (if so, we can just abandon our own efforts)
 *
 * Pages are removed in batches of at most GIN_NDELETE_AT_ONCE.  For every
 * batch the pages are exclusive-locked, the metapage's head and counters
 * are adjusted, the pages are flagged GIN_DELETED, and (if the index needs
 * WAL) one XLOG_GIN_DELETE_LISTPAGE record is written.
 *
 * If stats is not NULL, stats->pages_deleted is advanced by the size of
 * each batch.
00505  */
00506 static bool
00507 shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
00508           IndexBulkDeleteResult *stats)
00509 {
00510     Page        metapage;
00511     GinMetaPageData *metadata;
00512     BlockNumber blknoToDelete;
00513 
00514     metapage = BufferGetPage(metabuffer);
00515     metadata = GinPageGetMeta(metapage);
00516     blknoToDelete = metadata->head;
00517 
    /* one iteration per batch of pages */
00518     do
00519     {
00520         Page        page;
00521         int         i;
00522         int64       nDeletedHeapTuples = 0;
00523         ginxlogDeleteListPages data;
00524         XLogRecData rdata[1];
00525         Buffer      buffers[GIN_NDELETE_AT_ONCE];
00526 
00527         data.node = index->rd_node;
00528 
        /*
         * The WAL record consists of the data struct only; the deleted
         * pages are identified by block number in data.toDelete.
         */
00529         rdata[0].buffer = InvalidBuffer;
00530         rdata[0].data = (char *) &data;
00531         rdata[0].len = sizeof(ginxlogDeleteListPages);
00532         rdata[0].next = NULL;
00533 
        /*
         * Walk the list from the current position, exclusive-locking each
         * page and remembering it, until the batch is full or newHead is
         * reached.
         */
00534         data.ndeleted = 0;
00535         while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
00536         {
00537             data.toDelete[data.ndeleted] = blknoToDelete;
00538             buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
00539             LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
00540             page = BufferGetPage(buffers[data.ndeleted]);
00541 
            /*
             * Count the page before looking at it, so that the release loop
             * below also covers the buffer that was just locked.
             */
00542             data.ndeleted++;
00543 
00544             if (GinPageIsDeleted(page))
00545             {
00546                 /* concurrent cleanup process is detected */
00547                 for (i = 0; i < data.ndeleted; i++)
00548                     UnlockReleaseBuffer(buffers[i]);
00549 
00550                 return true;
00551             }
00552 
            /* maxoff on a list page counts the heap tuples recorded there */
00553             nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
00554             blknoToDelete = GinPageGetOpaque(page)->rightlink;
00555         }
00556 
00557         if (stats)
00558             stats->pages_deleted += data.ndeleted;
00559 
00560         START_CRIT_SECTION();
00561 
        /* blknoToDelete is now the first page NOT in this batch */
00562         metadata->head = blknoToDelete;
00563 
00564         Assert(metadata->nPendingPages >= data.ndeleted);
00565         metadata->nPendingPages -= data.ndeleted;
00566         Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
00567         metadata->nPendingHeapTuples -= nDeletedHeapTuples;
00568 
        /* walked off the end of the list: the pending list is now empty */
00569         if (blknoToDelete == InvalidBlockNumber)
00570         {
00571             metadata->tail = InvalidBlockNumber;
00572             metadata->tailFreeSize = 0;
00573             metadata->nPendingPages = 0;
00574             metadata->nPendingHeapTuples = 0;
00575         }
00576 
00577         MarkBufferDirty(metabuffer);
00578 
        /* the flags word is overwritten, not OR'ed: only GIN_DELETED remains */
00579         for (i = 0; i < data.ndeleted; i++)
00580         {
00581             page = BufferGetPage(buffers[i]);
00582             GinPageGetOpaque(page)->flags = GIN_DELETED;
00583             MarkBufferDirty(buffers[i]);
00584         }
00585 
00586         if (RelationNeedsWAL(index))
00587         {
00588             XLogRecPtr  recptr;
00589 
            /* the record carries a full copy of the updated metadata */
00590             memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
00591 
00592             recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
00593             PageSetLSN(metapage, recptr);
00594 
00595             for (i = 0; i < data.ndeleted; i++)
00596             {
00597                 page = BufferGetPage(buffers[i]);
00598                 PageSetLSN(page, recptr);
00599             }
00600         }
00601 
00602         for (i = 0; i < data.ndeleted; i++)
00603             UnlockReleaseBuffer(buffers[i]);
00604 
00605         END_CRIT_SECTION();
00606     } while (blknoToDelete != newHead);
00607 
00608     return false;
00609 }
00610 
00611 /* Initialize empty KeyArray */
00612 static void
00613 initKeyArray(KeyArray *keys, int32 maxvalues)
00614 {
00615     keys->keys = (Datum *) palloc(sizeof(Datum) * maxvalues);
00616     keys->categories = (GinNullCategory *)
00617         palloc(sizeof(GinNullCategory) * maxvalues);
00618     keys->nvalues = 0;
00619     keys->maxvalues = maxvalues;
00620 }
00621 
00622 /* Add datum to KeyArray, resizing if needed */
00623 static void
00624 addDatum(KeyArray *keys, Datum datum, GinNullCategory category)
00625 {
00626     if (keys->nvalues >= keys->maxvalues)
00627     {
00628         keys->maxvalues *= 2;
00629         keys->keys = (Datum *)
00630             repalloc(keys->keys, sizeof(Datum) * keys->maxvalues);
00631         keys->categories = (GinNullCategory *)
00632             repalloc(keys->categories, sizeof(GinNullCategory) * keys->maxvalues);
00633     }
00634 
00635     keys->keys[keys->nvalues] = datum;
00636     keys->categories[keys->nvalues] = category;
00637     keys->nvalues++;
00638 }
00639 
00640 /*
00641  * Collect data from a pending-list page in preparation for insertion into
00642  * the main index.
00643  *
00644  * Go through all tuples >= startoff on page and collect values in accum
00645  *
00646  * Note that ka is just workspace --- it does not carry any state across
00647  * calls.
00648  */
00649 static void
00650 processPendingPage(BuildAccumulator *accum, KeyArray *ka,
00651                    Page page, OffsetNumber startoff)
00652 {
00653     ItemPointerData heapptr;
00654     OffsetNumber i,
00655                 maxoff;
00656     OffsetNumber attrnum;
00657 
00658     /* reset *ka to empty */
00659     ka->nvalues = 0;
00660 
00661     maxoff = PageGetMaxOffsetNumber(page);
00662     Assert(maxoff >= FirstOffsetNumber);
00663     ItemPointerSetInvalid(&heapptr);
00664     attrnum = 0;
00665 
00666     for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
00667     {
00668         IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
00669         OffsetNumber curattnum;
00670         Datum       curkey;
00671         GinNullCategory curcategory;
00672 
00673         /* Check for change of heap TID or attnum */
00674         curattnum = gintuple_get_attrnum(accum->ginstate, itup);
00675 
00676         if (!ItemPointerIsValid(&heapptr))
00677         {
00678             heapptr = itup->t_tid;
00679             attrnum = curattnum;
00680         }
00681         else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
00682                    curattnum == attrnum))
00683         {
00684             /*
00685              * ginInsertBAEntries can insert several datums per call, but only
00686              * for one heap tuple and one column.  So call it at a boundary,
00687              * and reset ka.
00688              */
00689             ginInsertBAEntries(accum, &heapptr, attrnum,
00690                                ka->keys, ka->categories, ka->nvalues);
00691             ka->nvalues = 0;
00692             heapptr = itup->t_tid;
00693             attrnum = curattnum;
00694         }
00695 
00696         /* Add key to KeyArray */
00697         curkey = gintuple_get_key(accum->ginstate, itup, &curcategory);
00698         addDatum(ka, curkey, curcategory);
00699     }
00700 
00701     /* Dump out all remaining keys */
00702     ginInsertBAEntries(accum, &heapptr, attrnum,
00703                        ka->keys, ka->categories, ka->nvalues);
00704 }
00705 
00706 /*
00707  * Move tuples from pending pages into regular GIN structure.
00708  *
00709  * This can be called concurrently by multiple backends, so it must cope.
00710  * On first glance it looks completely not concurrent-safe and not crash-safe
00711  * either.  The reason it's okay is that multiple insertion of the same entry
00712  * is detected and treated as a no-op by gininsert.c.  If we crash after
00713  * posting entries to the main index and before removing them from the
00714  * pending list, it's okay because when we redo the posting later on, nothing
00715  * bad will happen.  Likewise, if two backends simultaneously try to post
00716  * a pending entry into the main index, one will succeed and one will do
00717  * nothing.  We try to notice when someone else is a little bit ahead of
00718  * us in the process, but that's just to avoid wasting cycles.  Only the
00719  * action of removing a page from the pending list really needs exclusive
00720  * lock.
00721  *
00722  * vac_delay indicates that ginInsertCleanup is called from vacuum process,
00723  * so call vacuum_delay_point() periodically.
00724  * If stats isn't null, we count deleted pending pages into the counts.
 *
 * Returns immediately if the pending list is empty.  All working memory is
 * allocated in a private memory context that is deleted before returning.
00725  */
00726 void
00727 ginInsertCleanup(GinState *ginstate,
00728                  bool vac_delay, IndexBulkDeleteResult *stats)
00729 {
00730     Relation    index = ginstate->index;
00731     Buffer      metabuffer,
00732                 buffer;
00733     Page        metapage,
00734                 page;
00735     GinMetaPageData *metadata;
00736     MemoryContext opCtx,
00737                 oldCtx;
00738     BuildAccumulator accum;
00739     KeyArray    datums;
00740     BlockNumber blkno;
00741 
00742     metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
00743     LockBuffer(metabuffer, GIN_SHARE);
00744     metapage = BufferGetPage(metabuffer);
00745     metadata = GinPageGetMeta(metapage);
00746 
00747     if (metadata->head == InvalidBlockNumber)
00748     {
00749         /* Nothing to do */
00750         UnlockReleaseBuffer(metabuffer);
00751         return;
00752     }
00753 
00754     /*
00755      * Read and lock head of pending list
00756      */
00757     blkno = metadata->head;
00758     buffer = ReadBuffer(index, blkno);
00759     LockBuffer(buffer, GIN_SHARE);
00760     page = BufferGetPage(buffer);
00761 
    /* drop the metapage lock but keep the pin; the head page stays locked */
00762     LockBuffer(metabuffer, GIN_UNLOCK);
00763 
00764     /*
00765      * Initialize.  All temporary space will be in opCtx
00766      */
00767     opCtx = AllocSetContextCreate(CurrentMemoryContext,
00768                                   "GIN insert cleanup temporary context",
00769                                   ALLOCSET_DEFAULT_MINSIZE,
00770                                   ALLOCSET_DEFAULT_INITSIZE,
00771                                   ALLOCSET_DEFAULT_MAXSIZE);
00772 
00773     oldCtx = MemoryContextSwitchTo(opCtx);
00774 
00775     initKeyArray(&datums, 128);
00776     ginInitBA(&accum);
00777     accum.ginstate = ginstate;
00778 
00779     /*
00780      * At the top of this loop, we have pin and lock on the current page of
00781      * the pending list.  However, we'll release that before exiting the loop.
00782      * Note we also have pin but not lock on the metapage.
00783      */
00784     for (;;)
00785     {
00786         if (GinPageIsDeleted(page))
00787         {
00788             /* another cleanup process is running concurrently */
00789             UnlockReleaseBuffer(buffer);
00790             break;
00791         }
00792 
00793         /*
00794          * read page's datums into accum
00795          */
00796         processPendingPage(&accum, &datums, page, FirstOffsetNumber);
00797 
00798         if (vac_delay)
00799             vacuum_delay_point();
00800 
00801         /*
00802          * Is it time to flush memory to disk?  Flush if we are at the end of
00803          * the pending list, or if we have a full row and memory is getting
00804          * full.
00805          *
00806          * XXX using up maintenance_work_mem here is probably unreasonably
00807          * much, since vacuum might already be using that much.
00808          */
00809         if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
00810             (GinPageHasFullRow(page) &&
00811              (accum.allocatedMemory >= maintenance_work_mem * 1024L)))
00812         {
00813             ItemPointerData *list;
00814             uint32      nlist;
00815             Datum       key;
00816             GinNullCategory category;
00817             OffsetNumber maxoff,
00818                         attnum;
00819 
00820             /*
00821              * Unlock current page to increase performance. Changes of page
00822              * will be checked later by comparing maxoff after completion of
00823              * memory flush.
00824              */
00825             maxoff = PageGetMaxOffsetNumber(page);
00826             LockBuffer(buffer, GIN_UNLOCK);
00827 
00828             /*
00829              * Moving collected data into regular structure can take
00830              * significant amount of time - so, run it without locking pending
00831              * list.
00832              */
00833             ginBeginBAScan(&accum);
00834             while ((list = ginGetBAEntry(&accum,
00835                                   &attnum, &key, &category, &nlist)) != NULL)
00836             {
00837                 ginEntryInsert(ginstate, attnum, key, category,
00838                                list, nlist, NULL);
00839                 if (vac_delay)
00840                     vacuum_delay_point();
00841             }
00842 
00843             /*
00844              * Lock the whole list to remove pages
             *
             * The metapage is locked first, then the current page.
00845              */
00846             LockBuffer(metabuffer, GIN_EXCLUSIVE);
00847             LockBuffer(buffer, GIN_SHARE);
00848 
00849             if (GinPageIsDeleted(page))
00850             {
00851                 /* another cleanup process is running concurrently */
00852                 UnlockReleaseBuffer(buffer);
00853                 LockBuffer(metabuffer, GIN_UNLOCK);
00854                 break;
00855             }
00856 
00857             /*
00858              * While we left the page unlocked, more stuff might have gotten
00859              * added to it.  If so, process those entries immediately.  There
00860              * shouldn't be very many, so we don't worry about the fact that
00861              * we're doing this with exclusive lock. Insertion algorithm
00862              * guarantees that inserted row(s) will not continue on next page.
00863              * NOTE: intentionally no vacuum_delay_point in this loop.
00864              */
00865             if (PageGetMaxOffsetNumber(page) != maxoff)
00866             {
                /*
                 * Start from a fresh accumulator and scan only the tuples
                 * that appeared after the maxoff remembered above.
                 */
00867                 ginInitBA(&accum);
00868                 processPendingPage(&accum, &datums, page, maxoff + 1);
00869 
00870                 ginBeginBAScan(&accum);
00871                 while ((list = ginGetBAEntry(&accum,
00872                                   &attnum, &key, &category, &nlist)) != NULL)
00873                     ginEntryInsert(ginstate, attnum, key, category,
00874                                    list, nlist, NULL);
00875             }
00876 
00877             /*
00878              * Remember next page - it will become the new list head
00879              */
00880             blkno = GinPageGetOpaque(page)->rightlink;
00881             UnlockReleaseBuffer(buffer);        /* shiftList will do exclusive
00882                                                  * locking */
00883 
00884             /*
00885              * remove the pages we have read from the pending list; at this
00886              * point all of their content is in the regular structure
00887              */
00888             if (shiftList(index, metabuffer, blkno, stats))
00889             {
00890                 /* another cleanup process is running concurrently */
00891                 LockBuffer(metabuffer, GIN_UNLOCK);
00892                 break;
00893             }
00894 
00895             Assert(blkno == metadata->head);
00896             LockBuffer(metabuffer, GIN_UNLOCK);
00897 
00898             /*
00899              * if we removed the whole pending list just exit
00900              */
00901             if (blkno == InvalidBlockNumber)
00902                 break;
00903 
00904             /*
00905              * release memory used so far and reinit state
             *
             * The reset frees the arrays that datums points to, so they are
             * allocated again here; datums.maxvalues itself lives in the
             * local struct and is still valid as the new initial size.
00906              */
00907             MemoryContextReset(opCtx);
00908             initKeyArray(&datums, datums.maxvalues);
00909             ginInitBA(&accum);
00910         }
00911         else
00912         {
            /* keep accumulating: just step to the next page of the list */
00913             blkno = GinPageGetOpaque(page)->rightlink;
00914             UnlockReleaseBuffer(buffer);
00915         }
00916 
00917         /*
00918          * Read next page in pending list
00919          */
00920         CHECK_FOR_INTERRUPTS();
00921         buffer = ReadBuffer(index, blkno);
00922         LockBuffer(buffer, GIN_SHARE);
00923         page = BufferGetPage(buffer);
00924     }
00925 
    /*
     * Every exit from the loop has already unlocked the metapage, so only
     * the pin remains to be dropped.
     */
00926     ReleaseBuffer(metabuffer);
00927 
00928     /* Clean up temporary space */
00929     MemoryContextSwitchTo(oldCtx);
00930     MemoryContextDelete(opCtx);
00931 }