/*-------------------------------------------------------------------------
 *
 * rewriteheap.c
 *    Support functions to rewrite tables.
 *
 * These functions provide a facility to completely rewrite a heap, while
 * preserving visibility information and update chains.
 *
 * INTERFACE
 *
 * The caller is responsible for creating the new heap, all catalog
 * changes, supplying the tuples to be written to the new heap, and
 * rebuilding indexes.  The caller must hold AccessExclusiveLock on the
 * target table, because we assume no one else is writing into it.
 *
 * To use the facility:
 *
 * begin_heap_rewrite
 * while (fetch next tuple)
 * {
 *     if (tuple is dead)
 *         rewrite_heap_dead_tuple
 *     else
 *     {
 *         // do any transformations here if required
 *         rewrite_heap_tuple
 *     }
 * }
 * end_heap_rewrite
 *
 * The contents of the new relation shouldn't be relied on until after
 * end_heap_rewrite is called.
 *
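 * As a concrete illustration, a caller's scan loop might look roughly
 * like this (a minimal sketch, not taken from any real caller: the scan
 * setup, cutoff computation, and error handling are elided, and names
 * such as rws, scan, use_wal and tuple_is_dead are placeholders):
 *
 *     rws = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid,
 *                              FreezeMulti, use_wal);
 *     while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *     {
 *         if (tuple_is_dead)      // per HeapTupleSatisfiesVacuum
 *             rewrite_heap_dead_tuple(rws, tuple);
 *         else
 *         {
 *             // rewrite_heap_tuple scribbles on its last argument,
 *             // so pass a fresh copy, not the original tuple
 *             copied = heap_copytuple(tuple);
 *             rewrite_heap_tuple(rws, tuple, copied);
 *         }
 *     }
 *     end_heap_rewrite(rws);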
 *
 * IMPLEMENTATION
 *
 * This would be a fairly trivial affair, except that we need to maintain
 * the ctid chains that link versions of an updated tuple together.
 * Since the newly stored tuples will have tids different from the original
 * ones, if we just copied t_ctid fields to the new table the links would
 * be wrong.  When we are required to copy a (presumably recently-dead or
 * delete-in-progress) tuple whose ctid doesn't point to itself, we have
 * to substitute the correct ctid instead.
 *
 * For each ctid reference from A -> B, we might encounter either A first
 * or B first.  (Note that a tuple in the middle of a chain is both A and B
 * of different pairs.)
 *
 * If we encounter A first, we'll store the tuple in the unresolved_tups
 * hash table. When we later encounter B, we remove A from the hash table,
 * fix the ctid to point to the new location of B, and insert both A and B
 * to the new heap.
 *
 * If we encounter B first, we can insert B to the new heap right away.
 * We then add an entry to the old_new_tid_map hash table showing B's
 * original tid (in the old heap) and new tid (in the new heap).
 * When we later encounter A, we get the new location of B from the table,
 * and can write A immediately with the correct ctid.
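 *
 * For example (an illustrative scenario, not part of the original
 * comments): suppose A at old tid (0,7) was updated, producing B at old
 * tid (0,9).  If the scan delivers A first, A is stashed in
 * unresolved_tups under the key (A's xmax, (0,9)).  When B is later
 * inserted at, say, new tid (5,2), that key matches (B's xmin equals A's
 * xmax), so A's t_ctid is set to (5,2) and A is written as well.  If
 * instead B arrives first, old_new_tid_map records (5,2) under that same
 * key, and A can be written with the correct ctid as soon as it is seen.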
 *
 * Entries in the hash tables can be removed as soon as the later tuple
 * is encountered.  That helps to keep the memory usage down.  At the end,
 * both tables are usually empty; we should have encountered both A and B
 * of each pair.  However, it's possible for A to be RECENTLY_DEAD and B
 * entirely DEAD according to HeapTupleSatisfiesVacuum, because the test
 * for deadness using OldestXmin is not exact.  In such a case we might
 * encounter B first, and skip it, and find A later.  Then A would be added
 * to unresolved_tups, and stay there until end of the rewrite.  Since
 * this case is very unusual, we don't worry about the memory usage.
 *
 * Using in-memory hash tables means that we use some memory for each live
 * update chain in the table, from the time we find one end of the
 * reference until we find the other end.  That shouldn't be a problem in
 * practice, but if you do something like an UPDATE without a where-clause
 * on a large table, and then run CLUSTER in the same transaction, you
 * could run out of memory.  It doesn't seem worthwhile to add support for
 * spill-to-disk, as there shouldn't be that many RECENTLY_DEAD tuples in a
 * table under normal circumstances.  Furthermore, in the typical scenario
 * of CLUSTERing on an unchanging key column, we'll see all the versions
 * of a given tuple together anyway, and so the peak memory usage is only
 * proportional to the number of RECENTLY_DEAD versions of a single row,
 * not to the number in the whole table.  Note that if we do fail halfway
 * through a CLUSTER,
 * the old table is still valid, so failure is not catastrophic.
 *
 * We can't use the normal heap_insert function to insert into the new
 * heap, because heap_insert overwrites the visibility information.
 * We use a special-purpose raw_heap_insert function instead, which
 * is optimized for bulk inserting a lot of tuples, knowing that we have
 * exclusive access to the heap.  raw_heap_insert builds new pages in
 * local storage.  When a page is full, or at the end of the process,
 * we insert it to WAL as a single record and then write it to disk
 * directly through smgr.  Note, however, that any data sent to the new
 * heap's TOAST table will go through the normal bufmgr.
 *
 *
 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/heap/rewriteheap.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tqual.h"


/*
 * State associated with a rewrite operation. This is opaque to the user
 * of the rewrite facility.
 */
typedef struct RewriteStateData
{
    Relation    rs_new_rel;     /* destination heap */
    Page        rs_buffer;      /* page currently being built */
    BlockNumber rs_blockno;     /* block where page will go */
    bool        rs_buffer_valid;    /* T if any tuples in buffer */
    bool        rs_use_wal;     /* must we WAL-log inserts? */
    TransactionId rs_oldest_xmin;       /* oldest xmin used by caller to
                                         * determine tuple visibility */
    TransactionId rs_freeze_xid;/* Xid that will be used as freeze cutoff
                                 * point */
    MultiXactId rs_freeze_multi;/* MultiXactId that will be used as freeze
                                 * cutoff point for multixacts */
    MemoryContext rs_cxt;       /* for hash tables and entries and tuples in
                                 * them */
    HTAB       *rs_unresolved_tups;     /* unmatched A tuples */
    HTAB       *rs_old_new_tid_map;     /* unmatched B tuples */
}   RewriteStateData;

/*
 * The lookup keys for the hash tables are tuple TID and xmin (we must check
 * both to avoid false matches from dead tuples).  Beware that there is
 * probably some padding space in this struct; it must be zeroed out for
 * correct hashtable operation.
 */
typedef struct
{
    TransactionId xmin;         /* tuple xmin */
    ItemPointerData tid;        /* tuple location in old heap */
} TidHashKey;
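
/*
 * Because of that padding, lookup keys are always built by zeroing the
 * whole struct before filling in the fields, as every search/insert site
 * below does:
 *
 *     memset(&hashkey, 0, sizeof(hashkey));
 *     hashkey.xmin = ...;
 *     hashkey.tid = ...;
 */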

/*
 * Entry structures for the hash tables
 */
typedef struct
{
    TidHashKey  key;            /* expected xmin/old location of B tuple */
    ItemPointerData old_tid;    /* A's location in the old heap */
    HeapTuple   tuple;          /* A's tuple contents */
} UnresolvedTupData;

typedef UnresolvedTupData *UnresolvedTup;

typedef struct
{
    TidHashKey  key;            /* actual xmin/old location of B tuple */
    ItemPointerData new_tid;    /* where we put it in the new heap */
} OldToNewMappingData;

typedef OldToNewMappingData *OldToNewMapping;


/* prototypes for internal functions */
static void raw_heap_insert(RewriteState state, HeapTuple tup);


/*
 * Begin a rewrite of a table
 *
 * new_heap     new, locked heap relation to insert tuples to
 * oldest_xmin  xid used by the caller to determine which tuples are dead
 * freeze_xid   xid before which tuples will be frozen
 * freeze_multi multixact before which multis will be frozen
 * use_wal      should the inserts to the new heap be WAL-logged?
 *
 * Returns an opaque RewriteState, allocated in current memory context,
 * to be used in subsequent calls to the other functions.
 */
RewriteState
begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
                   TransactionId freeze_xid, MultiXactId freeze_multi,
                   bool use_wal)
{
    RewriteState state;
    MemoryContext rw_cxt;
    MemoryContext old_cxt;
    HASHCTL     hash_ctl;

    /*
     * To ease cleanup, make a separate context that will contain the
     * RewriteState struct itself plus all subsidiary data.
     */
    rw_cxt = AllocSetContextCreate(CurrentMemoryContext,
                                   "Table rewrite",
                                   ALLOCSET_DEFAULT_MINSIZE,
                                   ALLOCSET_DEFAULT_INITSIZE,
                                   ALLOCSET_DEFAULT_MAXSIZE);
    old_cxt = MemoryContextSwitchTo(rw_cxt);

    /* Create and fill in the state struct */
    state = palloc0(sizeof(RewriteStateData));

    state->rs_new_rel = new_heap;
    state->rs_buffer = (Page) palloc(BLCKSZ);
    /* new_heap needn't be empty, just locked */
    state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
    state->rs_buffer_valid = false;
    state->rs_use_wal = use_wal;
    state->rs_oldest_xmin = oldest_xmin;
    state->rs_freeze_xid = freeze_xid;
    state->rs_freeze_multi = freeze_multi;
    state->rs_cxt = rw_cxt;

    /* Initialize hash tables used to track update chains */
    memset(&hash_ctl, 0, sizeof(hash_ctl));
    hash_ctl.keysize = sizeof(TidHashKey);
    hash_ctl.entrysize = sizeof(UnresolvedTupData);
    hash_ctl.hcxt = state->rs_cxt;
    hash_ctl.hash = tag_hash;

    state->rs_unresolved_tups =
        hash_create("Rewrite / Unresolved ctids",
                    128,        /* arbitrary initial size */
                    &hash_ctl,
                    HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

    hash_ctl.entrysize = sizeof(OldToNewMappingData);

    state->rs_old_new_tid_map =
        hash_create("Rewrite / Old to new tid map",
                    128,        /* arbitrary initial size */
                    &hash_ctl,
                    HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

    MemoryContextSwitchTo(old_cxt);

    return state;
}

/*
 * End a rewrite.
 *
 * state and any other resources are freed.
 */
void
end_heap_rewrite(RewriteState state)
{
    HASH_SEQ_STATUS seq_status;
    UnresolvedTup unresolved;

    /*
     * Write any remaining tuples in the UnresolvedTups table. If we have any
     * left, they should in fact be dead, but let's err on the safe side.
     */
    hash_seq_init(&seq_status, state->rs_unresolved_tups);

    while ((unresolved = hash_seq_search(&seq_status)) != NULL)
    {
        ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
        raw_heap_insert(state, unresolved->tuple);
    }

    /* Write the last page, if any */
    if (state->rs_buffer_valid)
    {
        if (state->rs_use_wal)
            log_newpage(&state->rs_new_rel->rd_node,
                        MAIN_FORKNUM,
                        state->rs_blockno,
                        state->rs_buffer);
        RelationOpenSmgr(state->rs_new_rel);

        PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);

        smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
                   (char *) state->rs_buffer, true);
    }

    /*
     * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
     * to ensure that the toast table gets fsync'd too.
     *
     * It's obvious that we must do this when not WAL-logging. It's less
     * obvious that we have to do it even if we did WAL-log the pages. The
     * reason is the same as in tablecmds.c's copy_relation_data(): we're
     * writing data that's not in shared buffers, and so a CHECKPOINT
     * occurring during the rewriteheap operation won't have fsync'd data we
     * wrote before the checkpoint.
     */
    if (RelationNeedsWAL(state->rs_new_rel))
        heap_sync(state->rs_new_rel);

    /* Deleting the context frees everything */
    MemoryContextDelete(state->rs_cxt);
}

/*
 * Add a tuple to the new heap.
 *
 * Visibility information is copied from the original tuple, except that
 * we "freeze" very-old tuples.  Note that since we scribble on new_tuple,
 * it had better be temp storage not a pointer to the original tuple.
 *
 * state        opaque state as returned by begin_heap_rewrite
 * old_tuple    original tuple in the old heap
 * new_tuple    new, rewritten tuple to be inserted to new heap
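 *
 * For instance, a caller might do (a sketch, not from the original file;
 * any tuple transformations would go between the copy and the call):
 *
 *     copied = heap_copytuple(old_tuple);
 *     rewrite_heap_tuple(state, old_tuple, copied);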
 */
void
rewrite_heap_tuple(RewriteState state,
                   HeapTuple old_tuple, HeapTuple new_tuple)
{
    MemoryContext old_cxt;
    ItemPointerData old_tid;
    TidHashKey  hashkey;
    bool        found;
    bool        free_new;

    old_cxt = MemoryContextSwitchTo(state->rs_cxt);

    /*
     * Copy the original tuple's visibility information into new_tuple.
     *
     * XXX we might later need to copy some t_infomask2 bits, too? Right now,
     * we intentionally clear the HOT status bits.
     */
    memcpy(&new_tuple->t_data->t_choice.t_heap,
           &old_tuple->t_data->t_choice.t_heap,
           sizeof(HeapTupleFields));

    new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
    new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
    new_tuple->t_data->t_infomask |=
        old_tuple->t_data->t_infomask & HEAP_XACT_MASK;

    /*
     * While we have our hands on the tuple, we may as well freeze any
     * very-old xmin or xmax, so that future VACUUM effort can be saved.
     */
    heap_freeze_tuple(new_tuple->t_data, state->rs_freeze_xid,
                      state->rs_freeze_multi);

    /*
     * Invalid ctid means that ctid should point to the tuple itself. We'll
     * override it later if the tuple is part of an update chain.
     */
    ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);

    /*
     * If the tuple has been updated, check the old-to-new mapping hash table.
     */
    if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
          HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
        !(ItemPointerEquals(&(old_tuple->t_self),
                            &(old_tuple->t_data->t_ctid))))
    {
        OldToNewMapping mapping;

        memset(&hashkey, 0, sizeof(hashkey));
        hashkey.xmin = HeapTupleHeaderGetUpdateXid(old_tuple->t_data);
        hashkey.tid = old_tuple->t_data->t_ctid;

        mapping = (OldToNewMapping)
            hash_search(state->rs_old_new_tid_map, &hashkey,
                        HASH_FIND, NULL);

        if (mapping != NULL)
        {
            /*
             * We've already copied the tuple that t_ctid points to, so we can
             * set the ctid of this tuple to point to the new location, and
             * insert it right away.
             */
            new_tuple->t_data->t_ctid = mapping->new_tid;

            /* We don't need the mapping entry anymore */
            hash_search(state->rs_old_new_tid_map, &hashkey,
                        HASH_REMOVE, &found);
            Assert(found);
        }
        else
        {
            /*
             * We haven't seen the tuple t_ctid points to yet. Stash this
             * tuple into unresolved_tups to be written later.
             */
            UnresolvedTup unresolved;

            unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
                                     HASH_ENTER, &found);
            Assert(!found);

            unresolved->old_tid = old_tuple->t_self;
            unresolved->tuple = heap_copytuple(new_tuple);

            /*
             * We can't do anything more now, since we don't know where the
             * tuple will be written.
             */
            MemoryContextSwitchTo(old_cxt);
            return;
        }
    }

    /*
     * Now we will write the tuple, and then check to see if it is the B tuple
     * in any new or known pair.  When we resolve a known pair, we will be
     * able to write that pair's A tuple, and then we have to check if it
     * resolves some other pair.  Hence, we need a loop here.
     */
    old_tid = old_tuple->t_self;
    free_new = false;

    for (;;)
    {
        ItemPointerData new_tid;

        /* Insert the tuple and find out where it's put in new_heap */
        raw_heap_insert(state, new_tuple);
        new_tid = new_tuple->t_self;

        /*
         * If the tuple is the updated version of a row, and the prior version
         * wouldn't be DEAD yet, then we need to either resolve the prior
         * version (if it's waiting in rs_unresolved_tups), or make an entry
         * in rs_old_new_tid_map (so we can resolve it when we do see it). The
         * previous tuple's xmax would equal this one's xmin, so it's
         * RECENTLY_DEAD if and only if the xmin is not before OldestXmin.
         */
        if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
            !TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
                                   state->rs_oldest_xmin))
        {
            /*
             * Okay, this is B in an update pair.  See if we've seen A.
             */
            UnresolvedTup unresolved;

            memset(&hashkey, 0, sizeof(hashkey));
            hashkey.xmin = HeapTupleHeaderGetXmin(new_tuple->t_data);
            hashkey.tid = old_tid;

            unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
                                     HASH_FIND, NULL);

            if (unresolved != NULL)
            {
                /*
                 * We have seen and memorized the previous tuple already. Now
                 * that we know where we inserted the tuple its t_ctid points
                 * to, fix its t_ctid and insert it to the new heap.
                 */
                if (free_new)
                    heap_freetuple(new_tuple);
                new_tuple = unresolved->tuple;
                free_new = true;
                old_tid = unresolved->old_tid;
                new_tuple->t_data->t_ctid = new_tid;

                /*
                 * We don't need the hash entry anymore, but don't free its
                 * tuple just yet.
                 */
                hash_search(state->rs_unresolved_tups, &hashkey,
                            HASH_REMOVE, &found);
                Assert(found);

                /* loop back to insert the previous tuple in the chain */
                continue;
            }
            else
            {
                /*
                 * Remember the new tid of this tuple. We'll use it to set the
                 * ctid when we find the previous tuple in the chain.
                 */
                OldToNewMapping mapping;

                mapping = hash_search(state->rs_old_new_tid_map, &hashkey,
                                      HASH_ENTER, &found);
                Assert(!found);

                mapping->new_tid = new_tid;
            }
        }

        /* Done with this (chain of) tuples, for now */
        if (free_new)
            heap_freetuple(new_tuple);
        break;
    }

    MemoryContextSwitchTo(old_cxt);
}

/*
 * Register a dead tuple with an ongoing rewrite. Dead tuples are not
 * copied to the new table, but we still make note of them so that we
 * can release some resources earlier.
 *
 * Returns true if a tuple was removed from the unresolved_tups table.
 * This indicates that the tuple, previously thought to be "recently dead",
 * is now known to be really dead and won't be written to the output.
 */
bool
rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
{
    /*
     * If we have already seen an earlier tuple in the update chain that
     * points to this tuple, let's forget about that earlier tuple. It's in
     * fact dead as well; our simple xmax < OldestXmin test in
     * HeapTupleSatisfiesVacuum just wasn't enough to detect it. This happens
     * when the xmin of a tuple is greater than its xmax, which sounds
     * counter-intuitive but is perfectly valid.
     *
     * We don't bother to try to detect the situation the other way round,
     * when we encounter the dead tuple first and then the recently dead one
     * that points to it. If that happens, we'll have some unmatched entries
     * in the UnresolvedTups hash table at the end. That can happen anyway,
     * because a vacuum might have removed the dead tuple in the chain before
     * us.
     */
    UnresolvedTup unresolved;
    TidHashKey  hashkey;
    bool        found;

    memset(&hashkey, 0, sizeof(hashkey));
    hashkey.xmin = HeapTupleHeaderGetXmin(old_tuple->t_data);
    hashkey.tid = old_tuple->t_self;

    unresolved = hash_search(state->rs_unresolved_tups, &hashkey,
                             HASH_FIND, NULL);

    if (unresolved != NULL)
    {
        /* Need to free the contained tuple as well as the hashtable entry */
        heap_freetuple(unresolved->tuple);
        hash_search(state->rs_unresolved_tups, &hashkey,
                    HASH_REMOVE, &found);
        Assert(found);
        return true;
    }

    return false;
}

/*
 * Insert a tuple to the new relation.  This has to track heap_insert
 * and its subsidiary functions!
 *
 * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
 * tuple is invalid on entry, it's replaced with the new TID as well (in
 * the inserted data only, not in the caller's copy).
 */
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
    Page        page = state->rs_buffer;
    Size        pageFreeSpace,
                saveFreeSpace;
    Size        len;
    OffsetNumber newoff;
    HeapTuple   heaptup;

    /*
     * If the new tuple is too big for storage or contains already toasted
     * out-of-line attributes from some other relation, invoke the toaster.
     *
     * Note: below this point, heaptup is the data we actually intend to store
     * into the relation; tup is the caller's original untoasted data.
     */
    if (state->rs_new_rel->rd_rel->relkind == RELKIND_TOASTVALUE)
    {
        /* toast table entries should never be recursively toasted */
        Assert(!HeapTupleHasExternal(tup));
        heaptup = tup;
    }
    else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
        heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
                                         HEAP_INSERT_SKIP_FSM |
                                         (state->rs_use_wal ?
                                          0 : HEAP_INSERT_SKIP_WAL));
    else
        heaptup = tup;

    len = MAXALIGN(heaptup->t_len);     /* be conservative */

    /*
     * If we're going to fail for an oversize tuple, do it right away
     */
    if (len > MaxHeapTupleSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("row is too big: size %lu, maximum size %lu",
                        (unsigned long) len,
                        (unsigned long) MaxHeapTupleSize)));

    /* Compute desired extra freespace due to fillfactor option */
    saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
                                                   HEAP_DEFAULT_FILLFACTOR);

    /* Now we can check to see if there's enough free space already. */
    if (state->rs_buffer_valid)
    {
        pageFreeSpace = PageGetHeapFreeSpace(page);

        if (len + saveFreeSpace > pageFreeSpace)
        {
            /* Doesn't fit, so write out the existing page */

            /* XLOG stuff */
            if (state->rs_use_wal)
                log_newpage(&state->rs_new_rel->rd_node,
                            MAIN_FORKNUM,
                            state->rs_blockno,
                            page);

            /*
             * Now write the page. We pass skipFsync = true because there is
             * no need for smgr to schedule an fsync for this write; we'll do
             * it ourselves in end_heap_rewrite.
             */
            RelationOpenSmgr(state->rs_new_rel);

            PageSetChecksumInplace(page, state->rs_blockno);

            smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
                       state->rs_blockno, (char *) page, true);

            state->rs_blockno++;
            state->rs_buffer_valid = false;
        }
    }

    if (!state->rs_buffer_valid)
    {
        /* Initialize a new empty page */
        PageInit(page, BLCKSZ, 0);
        state->rs_buffer_valid = true;
    }

    /* And now we can insert the tuple into the page */
    newoff = PageAddItem(page, (Item) heaptup->t_data, heaptup->t_len,
                         InvalidOffsetNumber, false, true);
    if (newoff == InvalidOffsetNumber)
        elog(ERROR, "failed to add tuple");

    /* Update caller's t_self to the actual position where it was stored */
    ItemPointerSet(&(tup->t_self), state->rs_blockno, newoff);

    /*
     * Insert the correct position into CTID of the stored tuple, too, if the
     * caller didn't supply a valid CTID.
     */
    if (!ItemPointerIsValid(&tup->t_data->t_ctid))
    {
        ItemId      newitemid;
        HeapTupleHeader onpage_tup;

        newitemid = PageGetItemId(page, newoff);
        onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);

        onpage_tup->t_ctid = tup->t_self;
    }

    /* If heaptup is a private copy, release it. */
    if (heaptup != tup)
        heap_freetuple(heaptup);
}