Header And Logo

PostgreSQL
| The world's most advanced open source database.

tuptoaster.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * tuptoaster.c
00004  *    Support routines for external and compressed storage of
00005  *    variable size attributes.
00006  *
00007  * Copyright (c) 2000-2013, PostgreSQL Global Development Group
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/access/heap/tuptoaster.c
00012  *
00013  *
00014  * INTERFACE ROUTINES
00015  *      toast_insert_or_update -
00016  *          Try to make a given tuple fit into one page by compressing
00017  *          or moving off attributes
00018  *
00019  *      toast_delete -
00020  *          Reclaim toast storage when a tuple is deleted
00021  *
00022  *      heap_tuple_untoast_attr -
00023  *          Fetch back a given value from the "secondary" relation
00024  *
00025  *-------------------------------------------------------------------------
00026  */
00027 
00028 #include "postgres.h"
00029 
00030 #include <unistd.h>
00031 #include <fcntl.h>
00032 
00033 #include "access/genam.h"
00034 #include "access/heapam.h"
00035 #include "access/tuptoaster.h"
00036 #include "access/xact.h"
00037 #include "catalog/catalog.h"
00038 #include "utils/fmgroids.h"
00039 #include "utils/pg_lzcompress.h"
00040 #include "utils/rel.h"
00041 #include "utils/typcache.h"
00042 #include "utils/tqual.h"
00043 
00044 
00045 #undef TOAST_DEBUG
00046 
00047 /* Size of an EXTERNAL datum that contains a standard TOAST pointer */
00048 #define TOAST_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(struct varatt_external))
00049 
00050 /*
00051  * Testing whether an externally-stored value is compressed now requires
00052  * comparing extsize (the actual length of the external data) to rawsize
00053  * (the original uncompressed datum's size).  The latter includes VARHDRSZ
00054  * overhead, the former doesn't.  We never use compression unless it actually
00055  * saves space, so we expect either equality or less-than.
       *
       * The argument is a struct value (its members are accessed with "."), not
       * a pointer, and it is expanded twice --- pass a plain variable without
       * side effects.
00056  */
00057 #define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) \
00058     ((toast_pointer).va_extsize < (toast_pointer).va_rawsize - VARHDRSZ)
00059 
00060 /*
00061  * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
00062  * into a local "struct varatt_external" toast pointer.  This should be
00063  * just a memcpy, but some versions of gcc seem to produce broken code
00064  * that assumes the datum contents are aligned.  Introducing an explicit
00065  * intermediate "varattrib_1b_e *" variable seems to fix it.
       *
       * "attr" is evaluated exactly once.  "toast_pointer" must be an lvalue,
       * since both its address and its sizeof are taken.  The macro declares a
       * block-local variable named "attre", so neither argument may refer to a
       * caller variable of that name.  The Asserts check that the datum is
       * EXTERNAL and that its size is exactly one toast pointer plus the
       * external header.
00066  */
00067 #define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr) \
00068 do { \
00069     varattrib_1b_e *attre = (varattrib_1b_e *) (attr); \
00070     Assert(VARATT_IS_EXTERNAL(attre)); \
00071     Assert(VARSIZE_EXTERNAL(attre) == sizeof(toast_pointer) + VARHDRSZ_EXTERNAL); \
00072     memcpy(&(toast_pointer), VARDATA_EXTERNAL(attre), sizeof(toast_pointer)); \
00073 } while (0)
00074 
00075 
      /* Prototypes for this file's static helper routines */
00076 static void toast_delete_datum(Relation rel, Datum value);
00077 static Datum toast_save_datum(Relation rel, Datum value,
00078                  struct varlena * oldexternal, int options);
00079 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
00080 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
00081 static struct varlena *toast_fetch_datum(struct varlena * attr);
00082 static struct varlena *toast_fetch_datum_slice(struct varlena * attr,
00083                         int32 sliceoffset, int32 length);
00084 
00085 
/* ----------
 * heap_tuple_fetch_attr -
 *
 *  Public entry point for pulling a toasted value back out of
 *  external storage.
 *
 * An externally stored datum is read back with toast_fetch_datum().  Any
 * other datum is handed back as-is (the very same pointer).  Nothing is
 * decompressed and no header is expanded here, so the result may still be
 * compressed or carry a short header.
 * ----------
 */
struct varlena *
heap_tuple_fetch_attr(struct varlena * attr)
{
    /* Value already lives inside the main tuple: nothing to fetch */
    if (!VARATT_IS_EXTERNAL(attr))
        return attr;

    /* Externally stored value: read it back from the secondary relation */
    return toast_fetch_datum(attr);
}
00119 
00120 
00121 /* ----------
00122  * heap_tuple_untoast_attr -
00123  *
00124  *  Public entry point to get back a toasted value from compression
00125  *  or external storage.
00126  * ----------
00127  */
00128 struct varlena *
00129 heap_tuple_untoast_attr(struct varlena * attr)
00130 {
00131     if (VARATT_IS_EXTERNAL(attr))
00132     {
00133         /*
00134          * This is an externally stored datum --- fetch it back from there
00135          */
00136         attr = toast_fetch_datum(attr);
00137         /* If it's compressed, decompress it */
00138         if (VARATT_IS_COMPRESSED(attr))
00139         {
00140             PGLZ_Header *tmp = (PGLZ_Header *) attr;
00141 
00142             attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
00143             SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
00144             pglz_decompress(tmp, VARDATA(attr));
00145             pfree(tmp);
00146         }
00147     }
00148     else if (VARATT_IS_COMPRESSED(attr))
00149     {
00150         /*
00151          * This is a compressed value inside of the main tuple
00152          */
00153         PGLZ_Header *tmp = (PGLZ_Header *) attr;
00154 
00155         attr = (struct varlena *) palloc(PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
00156         SET_VARSIZE(attr, PGLZ_RAW_SIZE(tmp) + VARHDRSZ);
00157         pglz_decompress(tmp, VARDATA(attr));
00158     }
00159     else if (VARATT_IS_SHORT(attr))
00160     {
00161         /*
00162          * This is a short-header varlena --- convert to 4-byte header format
00163          */
00164         Size        data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
00165         Size        new_size = data_size + VARHDRSZ;
00166         struct varlena *new_attr;
00167 
00168         new_attr = (struct varlena *) palloc(new_size);
00169         SET_VARSIZE(new_attr, new_size);
00170         memcpy(VARDATA(new_attr), VARDATA_SHORT(attr), data_size);
00171         attr = new_attr;
00172     }
00173 
00174     return attr;
00175 }
00176 
00177 
00178 /* ----------
00179  * heap_tuple_untoast_attr_slice -
00180  *
00181  *      Public entry point to get back part of a toasted value
00182  *      from compression or external storage.
00183  * ----------
00184  */
00185 struct varlena *
00186 heap_tuple_untoast_attr_slice(struct varlena * attr,
00187                               int32 sliceoffset, int32 slicelength)
00188 {
00189     struct varlena *preslice;
00190     struct varlena *result;
00191     char       *attrdata;
00192     int32       attrsize;
00193 
00194     if (VARATT_IS_EXTERNAL(attr))
00195     {
00196         struct varatt_external toast_pointer;
00197 
00198         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
00199 
00200         /* fast path for non-compressed external datums */
00201         if (!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
00202             return toast_fetch_datum_slice(attr, sliceoffset, slicelength);
00203 
00204         /* fetch it back (compressed marker will get set automatically) */
00205         preslice = toast_fetch_datum(attr);
00206     }
00207     else
00208         preslice = attr;
00209 
00210     if (VARATT_IS_COMPRESSED(preslice))
00211     {
00212         PGLZ_Header *tmp = (PGLZ_Header *) preslice;
00213         Size        size = PGLZ_RAW_SIZE(tmp) + VARHDRSZ;
00214 
00215         preslice = (struct varlena *) palloc(size);
00216         SET_VARSIZE(preslice, size);
00217         pglz_decompress(tmp, VARDATA(preslice));
00218 
00219         if (tmp != (PGLZ_Header *) attr)
00220             pfree(tmp);
00221     }
00222 
00223     if (VARATT_IS_SHORT(preslice))
00224     {
00225         attrdata = VARDATA_SHORT(preslice);
00226         attrsize = VARSIZE_SHORT(preslice) - VARHDRSZ_SHORT;
00227     }
00228     else
00229     {
00230         attrdata = VARDATA(preslice);
00231         attrsize = VARSIZE(preslice) - VARHDRSZ;
00232     }
00233 
00234     /* slicing of datum for compressed cases and plain value */
00235 
00236     if (sliceoffset >= attrsize)
00237     {
00238         sliceoffset = 0;
00239         slicelength = 0;
00240     }
00241 
00242     if (((sliceoffset + slicelength) > attrsize) || slicelength < 0)
00243         slicelength = attrsize - sliceoffset;
00244 
00245     result = (struct varlena *) palloc(slicelength + VARHDRSZ);
00246     SET_VARSIZE(result, slicelength + VARHDRSZ);
00247 
00248     memcpy(VARDATA(result), attrdata + sliceoffset, slicelength);
00249 
00250     if (preslice != attr)
00251         pfree(preslice);
00252 
00253     return result;
00254 }
00255 
00256 
00257 /* ----------
00258  * toast_raw_datum_size -
00259  *
00260  *  Return the raw (detoasted) size of a varlena datum
00261  *  (including the VARHDRSZ header)
00262  * ----------
00263  */
00264 Size
00265 toast_raw_datum_size(Datum value)
00266 {
00267     struct varlena *attr = (struct varlena *) DatumGetPointer(value);
00268     Size        result;
00269 
00270     if (VARATT_IS_EXTERNAL(attr))
00271     {
00272         /* va_rawsize is the size of the original datum -- including header */
00273         struct varatt_external toast_pointer;
00274 
00275         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
00276         result = toast_pointer.va_rawsize;
00277     }
00278     else if (VARATT_IS_COMPRESSED(attr))
00279     {
00280         /* here, va_rawsize is just the payload size */
00281         result = VARRAWSIZE_4B_C(attr) + VARHDRSZ;
00282     }
00283     else if (VARATT_IS_SHORT(attr))
00284     {
00285         /*
00286          * we have to normalize the header length to VARHDRSZ or else the
00287          * callers of this function will be confused.
00288          */
00289         result = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT + VARHDRSZ;
00290     }
00291     else
00292     {
00293         /* plain untoasted datum */
00294         result = VARSIZE(attr);
00295     }
00296     return result;
00297 }
00298 
00299 /* ----------
00300  * toast_datum_size
00301  *
00302  *  Return the physical storage size (possibly compressed) of a varlena datum
00303  * ----------
00304  */
00305 Size
00306 toast_datum_size(Datum value)
00307 {
00308     struct varlena *attr = (struct varlena *) DatumGetPointer(value);
00309     Size        result;
00310 
00311     if (VARATT_IS_EXTERNAL(attr))
00312     {
00313         /*
00314          * Attribute is stored externally - return the extsize whether
00315          * compressed or not.  We do not count the size of the toast pointer
00316          * ... should we?
00317          */
00318         struct varatt_external toast_pointer;
00319 
00320         VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
00321         result = toast_pointer.va_extsize;
00322     }
00323     else if (VARATT_IS_SHORT(attr))
00324     {
00325         result = VARSIZE_SHORT(attr);
00326     }
00327     else
00328     {
00329         /*
00330          * Attribute is stored inline either compressed or not, just calculate
00331          * the size of the datum in either case.
00332          */
00333         result = VARSIZE(attr);
00334     }
00335     return result;
00336 }
00337 
00338 
00339 /* ----------
00340  * toast_delete -
00341  *
00342  *  Cascaded delete toast-entries on DELETE
00343  * ----------
00344  */
00345 void
00346 toast_delete(Relation rel, HeapTuple oldtup)
00347 {
00348     TupleDesc   tupleDesc;
00349     Form_pg_attribute *att;
00350     int         numAttrs;
00351     int         i;
00352     Datum       toast_values[MaxHeapAttributeNumber];
00353     bool        toast_isnull[MaxHeapAttributeNumber];
00354 
00355     /*
00356      * We should only ever be called for tuples of plain relations or
00357      * materialized views --- recursing on a toast rel is bad news.
00358      */
00359     Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
00360            rel->rd_rel->relkind == RELKIND_MATVIEW);
00361 
00362     /*
00363      * Get the tuple descriptor and break down the tuple into fields.
00364      *
00365      * NOTE: it's debatable whether to use heap_deform_tuple() here or just
00366      * heap_getattr() only the varlena columns.  The latter could win if there
00367      * are few varlena columns and many non-varlena ones. However,
00368      * heap_deform_tuple costs only O(N) while the heap_getattr way would cost
00369      * O(N^2) if there are many varlena columns, so it seems better to err on
00370      * the side of linear cost.  (We won't even be here unless there's at
00371      * least one varlena column, by the way.)
00372      */
00373     tupleDesc = rel->rd_att;
00374     att = tupleDesc->attrs;
00375     numAttrs = tupleDesc->natts;
00376 
00377     Assert(numAttrs <= MaxHeapAttributeNumber);
00378     heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
00379 
00380     /*
00381      * Check for external stored attributes and delete them from the secondary
00382      * relation.
00383      */
00384     for (i = 0; i < numAttrs; i++)
00385     {
00386         if (att[i]->attlen == -1)
00387         {
00388             Datum       value = toast_values[i];
00389 
00390             if (!toast_isnull[i] && VARATT_IS_EXTERNAL(PointerGetDatum(value)))
00391                 toast_delete_datum(rel, value);
00392         }
00393     }
00394 }
00395 
00396 
00397 /* ----------
00398  * toast_insert_or_update -
00399  *
00400  *  Delete no-longer-used toast-entries and create new ones to
00401  *  make the new tuple fit on INSERT or UPDATE
00402  *
00403  * Inputs:
00404  *  newtup: the candidate new tuple to be inserted
00405  *  oldtup: the old row version for UPDATE, or NULL for INSERT
00406  *  options: options to be passed to heap_insert() for toast rows
00407  * Result:
00408  *  either newtup if no toasting is needed, or a palloc'd modified tuple
00409  *  that is what should actually get stored
00410  *
00411  * NOTE: neither newtup nor oldtup will be modified.  This is a change
00412  * from the pre-8.1 API of this routine.
00413  * ----------
00414  */
00415 HeapTuple
00416 toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
00417                        int options)
00418 {
00419     HeapTuple   result_tuple;
00420     TupleDesc   tupleDesc;
00421     Form_pg_attribute *att;
00422     int         numAttrs;
00423     int         i;
00424 
00425     bool        need_change = false;
00426     bool        need_free = false;
00427     bool        need_delold = false;
00428     bool        has_nulls = false;
00429 
00430     Size        maxDataLen;
00431     Size        hoff;
00432 
00433     char        toast_action[MaxHeapAttributeNumber];
00434     bool        toast_isnull[MaxHeapAttributeNumber];
00435     bool        toast_oldisnull[MaxHeapAttributeNumber];
00436     Datum       toast_values[MaxHeapAttributeNumber];
00437     Datum       toast_oldvalues[MaxHeapAttributeNumber];
00438     struct varlena *toast_oldexternal[MaxHeapAttributeNumber];
00439     int32       toast_sizes[MaxHeapAttributeNumber];
00440     bool        toast_free[MaxHeapAttributeNumber];
00441     bool        toast_delold[MaxHeapAttributeNumber];
00442 
00443     /*
00444      * We should only ever be called for tuples of plain relations or
00445      * materialized views --- recursing on a toast rel is bad news.
00446      */
00447     Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
00448            rel->rd_rel->relkind == RELKIND_MATVIEW);
00449 
00450     /*
00451      * Get the tuple descriptor and break down the tuple(s) into fields.
00452      */
00453     tupleDesc = rel->rd_att;
00454     att = tupleDesc->attrs;
00455     numAttrs = tupleDesc->natts;
00456 
00457     Assert(numAttrs <= MaxHeapAttributeNumber);
00458     heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);
00459     if (oldtup != NULL)
00460         heap_deform_tuple(oldtup, tupleDesc, toast_oldvalues, toast_oldisnull);
00461 
00462     /* ----------
00463      * Then collect information about the values given
00464      *
00465      * NOTE: toast_action[i] can have these values:
00466      *      ' '     default handling
00467      *      'p'     already processed --- don't touch it
00468      *      'x'     incompressible, but OK to move off
00469      *
00470      * NOTE: toast_sizes[i] is only made valid for varlena attributes with
00471      *      toast_action[i] different from 'p'.
00472      * ----------
00473      */
00474     memset(toast_action, ' ', numAttrs * sizeof(char));
00475     memset(toast_oldexternal, 0, numAttrs * sizeof(struct varlena *));
00476     memset(toast_free, 0, numAttrs * sizeof(bool));
00477     memset(toast_delold, 0, numAttrs * sizeof(bool));
00478 
00479     for (i = 0; i < numAttrs; i++)
00480     {
00481         struct varlena *old_value;
00482         struct varlena *new_value;
00483 
00484         if (oldtup != NULL)
00485         {
00486             /*
00487              * For UPDATE get the old and new values of this attribute
00488              */
00489             old_value = (struct varlena *) DatumGetPointer(toast_oldvalues[i]);
00490             new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
00491 
00492             /*
00493              * If the old value is an external stored one, check if it has
00494              * changed so we have to delete it later.
00495              */
00496             if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
00497                 VARATT_IS_EXTERNAL(old_value))
00498             {
00499                 if (toast_isnull[i] || !VARATT_IS_EXTERNAL(new_value) ||
00500                     memcmp((char *) old_value, (char *) new_value,
00501                            VARSIZE_EXTERNAL(old_value)) != 0)
00502                 {
00503                     /*
00504                      * The old external stored value isn't needed any more
00505                      * after the update
00506                      */
00507                     toast_delold[i] = true;
00508                     need_delold = true;
00509                 }
00510                 else
00511                 {
00512                     /*
00513                      * This attribute isn't changed by this update so we reuse
00514                      * the original reference to the old value in the new
00515                      * tuple.
00516                      */
00517                     toast_action[i] = 'p';
00518                     continue;
00519                 }
00520             }
00521         }
00522         else
00523         {
00524             /*
00525              * For INSERT simply get the new value
00526              */
00527             new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
00528         }
00529 
00530         /*
00531          * Handle NULL attributes
00532          */
00533         if (toast_isnull[i])
00534         {
00535             toast_action[i] = 'p';
00536             has_nulls = true;
00537             continue;
00538         }
00539 
00540         /*
00541          * Now look at varlena attributes
00542          */
00543         if (att[i]->attlen == -1)
00544         {
00545             /*
00546              * If the table's attribute says PLAIN always, force it so.
00547              */
00548             if (att[i]->attstorage == 'p')
00549                 toast_action[i] = 'p';
00550 
00551             /*
00552              * We took care of UPDATE above, so any external value we find
00553              * still in the tuple must be someone else's we cannot reuse.
00554              * Fetch it back (without decompression, unless we are forcing
00555              * PLAIN storage).  If necessary, we'll push it out as a new
00556              * external value below.
00557              */
00558             if (VARATT_IS_EXTERNAL(new_value))
00559             {
00560                 toast_oldexternal[i] = new_value;
00561                 if (att[i]->attstorage == 'p')
00562                     new_value = heap_tuple_untoast_attr(new_value);
00563                 else
00564                     new_value = heap_tuple_fetch_attr(new_value);
00565                 toast_values[i] = PointerGetDatum(new_value);
00566                 toast_free[i] = true;
00567                 need_change = true;
00568                 need_free = true;
00569             }
00570 
00571             /*
00572              * Remember the size of this attribute
00573              */
00574             toast_sizes[i] = VARSIZE_ANY(new_value);
00575         }
00576         else
00577         {
00578             /*
00579              * Not a varlena attribute, plain storage always
00580              */
00581             toast_action[i] = 'p';
00582         }
00583     }
00584 
00585     /* ----------
00586      * Compress and/or save external until data fits into target length
00587      *
00588      *  1: Inline compress attributes with attstorage 'x', and store very
00589      *     large attributes with attstorage 'x' or 'e' external immediately
00590      *  2: Store attributes with attstorage 'x' or 'e' external
00591      *  3: Inline compress attributes with attstorage 'm'
00592      *  4: Store attributes with attstorage 'm' external
00593      * ----------
00594      */
00595 
00596     /* compute header overhead --- this should match heap_form_tuple() */
00597     hoff = offsetof(HeapTupleHeaderData, t_bits);
00598     if (has_nulls)
00599         hoff += BITMAPLEN(numAttrs);
00600     if (newtup->t_data->t_infomask & HEAP_HASOID)
00601         hoff += sizeof(Oid);
00602     hoff = MAXALIGN(hoff);
00603     /* now convert to a limit on the tuple data size */
00604     maxDataLen = TOAST_TUPLE_TARGET - hoff;
00605 
00606     /*
00607      * Look for attributes with attstorage 'x' to compress.  Also find large
00608      * attributes with attstorage 'x' or 'e', and store them external.
00609      */
00610     while (heap_compute_data_size(tupleDesc,
00611                                   toast_values, toast_isnull) > maxDataLen)
00612     {
00613         int         biggest_attno = -1;
00614         int32       biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
00615         Datum       old_value;
00616         Datum       new_value;
00617 
00618         /*
00619          * Search for the biggest yet unprocessed internal attribute
00620          */
00621         for (i = 0; i < numAttrs; i++)
00622         {
00623             if (toast_action[i] != ' ')
00624                 continue;
00625             if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
00626                 continue;       /* can't happen, toast_action would be 'p' */
00627             if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
00628                 continue;
00629             if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
00630                 continue;
00631             if (toast_sizes[i] > biggest_size)
00632             {
00633                 biggest_attno = i;
00634                 biggest_size = toast_sizes[i];
00635             }
00636         }
00637 
00638         if (biggest_attno < 0)
00639             break;
00640 
00641         /*
00642          * Attempt to compress it inline, if it has attstorage 'x'
00643          */
00644         i = biggest_attno;
00645         if (att[i]->attstorage == 'x')
00646         {
00647             old_value = toast_values[i];
00648             new_value = toast_compress_datum(old_value);
00649 
00650             if (DatumGetPointer(new_value) != NULL)
00651             {
00652                 /* successful compression */
00653                 if (toast_free[i])
00654                     pfree(DatumGetPointer(old_value));
00655                 toast_values[i] = new_value;
00656                 toast_free[i] = true;
00657                 toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
00658                 need_change = true;
00659                 need_free = true;
00660             }
00661             else
00662             {
00663                 /* incompressible, ignore on subsequent compression passes */
00664                 toast_action[i] = 'x';
00665             }
00666         }
00667         else
00668         {
00669             /* has attstorage 'e', ignore on subsequent compression passes */
00670             toast_action[i] = 'x';
00671         }
00672 
00673         /*
00674          * If this value is by itself more than maxDataLen (after compression
00675          * if any), push it out to the toast table immediately, if possible.
00676          * This avoids uselessly compressing other fields in the common case
00677          * where we have one long field and several short ones.
00678          *
00679          * XXX maybe the threshold should be less than maxDataLen?
00680          */
00681         if (toast_sizes[i] > maxDataLen &&
00682             rel->rd_rel->reltoastrelid != InvalidOid)
00683         {
00684             old_value = toast_values[i];
00685             toast_action[i] = 'p';
00686             toast_values[i] = toast_save_datum(rel, toast_values[i],
00687                                                toast_oldexternal[i], options);
00688             if (toast_free[i])
00689                 pfree(DatumGetPointer(old_value));
00690             toast_free[i] = true;
00691             need_change = true;
00692             need_free = true;
00693         }
00694     }
00695 
00696     /*
00697      * Second we look for attributes of attstorage 'x' or 'e' that are still
00698      * inline.  But skip this if there's no toast table to push them to.
00699      */
00700     while (heap_compute_data_size(tupleDesc,
00701                                   toast_values, toast_isnull) > maxDataLen &&
00702            rel->rd_rel->reltoastrelid != InvalidOid)
00703     {
00704         int         biggest_attno = -1;
00705         int32       biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
00706         Datum       old_value;
00707 
00708         /*------
00709          * Search for the biggest yet inlined attribute with
00710          * attstorage equals 'x' or 'e'
00711          *------
00712          */
00713         for (i = 0; i < numAttrs; i++)
00714         {
00715             if (toast_action[i] == 'p')
00716                 continue;
00717             if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
00718                 continue;       /* can't happen, toast_action would be 'p' */
00719             if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
00720                 continue;
00721             if (toast_sizes[i] > biggest_size)
00722             {
00723                 biggest_attno = i;
00724                 biggest_size = toast_sizes[i];
00725             }
00726         }
00727 
00728         if (biggest_attno < 0)
00729             break;
00730 
00731         /*
00732          * Store this external
00733          */
00734         i = biggest_attno;
00735         old_value = toast_values[i];
00736         toast_action[i] = 'p';
00737         toast_values[i] = toast_save_datum(rel, toast_values[i],
00738                                            toast_oldexternal[i], options);
00739         if (toast_free[i])
00740             pfree(DatumGetPointer(old_value));
00741         toast_free[i] = true;
00742 
00743         need_change = true;
00744         need_free = true;
00745     }
00746 
00747     /*
00748      * Round 3 - this time we take attributes with storage 'm' into
00749      * compression
00750      */
00751     while (heap_compute_data_size(tupleDesc,
00752                                   toast_values, toast_isnull) > maxDataLen)
00753     {
00754         int         biggest_attno = -1;
00755         int32       biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
00756         Datum       old_value;
00757         Datum       new_value;
00758 
00759         /*
00760          * Search for the biggest yet uncompressed internal attribute
00761          */
00762         for (i = 0; i < numAttrs; i++)
00763         {
00764             if (toast_action[i] != ' ')
00765                 continue;
00766             if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
00767                 continue;       /* can't happen, toast_action would be 'p' */
00768             if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
00769                 continue;
00770             if (att[i]->attstorage != 'm')
00771                 continue;
00772             if (toast_sizes[i] > biggest_size)
00773             {
00774                 biggest_attno = i;
00775                 biggest_size = toast_sizes[i];
00776             }
00777         }
00778 
00779         if (biggest_attno < 0)
00780             break;
00781 
00782         /*
00783          * Attempt to compress it inline
00784          */
00785         i = biggest_attno;
00786         old_value = toast_values[i];
00787         new_value = toast_compress_datum(old_value);
00788 
00789         if (DatumGetPointer(new_value) != NULL)
00790         {
00791             /* successful compression */
00792             if (toast_free[i])
00793                 pfree(DatumGetPointer(old_value));
00794             toast_values[i] = new_value;
00795             toast_free[i] = true;
00796             toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
00797             need_change = true;
00798             need_free = true;
00799         }
00800         else
00801         {
00802             /* incompressible, ignore on subsequent compression passes */
00803             toast_action[i] = 'x';
00804         }
00805     }
00806 
00807     /*
00808      * Finally we store attributes of type 'm' externally.  At this point we
00809      * increase the target tuple size, so that 'm' attributes aren't stored
00810      * externally unless really necessary.
00811      */
00812     maxDataLen = TOAST_TUPLE_TARGET_MAIN - hoff;
00813 
00814     while (heap_compute_data_size(tupleDesc,
00815                                   toast_values, toast_isnull) > maxDataLen &&
00816            rel->rd_rel->reltoastrelid != InvalidOid)
00817     {
00818         int         biggest_attno = -1;
00819         int32       biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
00820         Datum       old_value;
00821 
00822         /*--------
00823          * Search for the biggest yet inlined attribute with
00824          * attstorage = 'm'
00825          *--------
00826          */
00827         for (i = 0; i < numAttrs; i++)
00828         {
00829             if (toast_action[i] == 'p')
00830                 continue;
00831             if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
00832                 continue;       /* can't happen, toast_action would be 'p' */
00833             if (att[i]->attstorage != 'm')
00834                 continue;
00835             if (toast_sizes[i] > biggest_size)
00836             {
00837                 biggest_attno = i;
00838                 biggest_size = toast_sizes[i];
00839             }
00840         }
00841 
00842         if (biggest_attno < 0)
00843             break;
00844 
00845         /*
00846          * Store this external
00847          */
00848         i = biggest_attno;
00849         old_value = toast_values[i];
00850         toast_action[i] = 'p';
00851         toast_values[i] = toast_save_datum(rel, toast_values[i],
00852                                            toast_oldexternal[i], options);
00853         if (toast_free[i])
00854             pfree(DatumGetPointer(old_value));
00855         toast_free[i] = true;
00856 
00857         need_change = true;
00858         need_free = true;
00859     }
00860 
00861     /*
00862      * In the case we toasted any values, we need to build a new heap tuple
00863      * with the changed values.
00864      */
00865     if (need_change)
00866     {
00867         HeapTupleHeader olddata = newtup->t_data;
00868         HeapTupleHeader new_data;
00869         int32       new_header_len;
00870         int32       new_data_len;
00871         int32       new_tuple_len;
00872 
00873         /*
00874          * Calculate the new size of the tuple.
00875          *
00876          * Note: we used to assume here that the old tuple's t_hoff must equal
00877          * the new_header_len value, but that was incorrect.  The old tuple
00878          * might have a smaller-than-current natts, if there's been an ALTER
00879          * TABLE ADD COLUMN since it was stored; and that would lead to a
00880          * different conclusion about the size of the null bitmap, or even
00881          * whether there needs to be one at all.
00882          */
00883         new_header_len = offsetof(HeapTupleHeaderData, t_bits);
00884         if (has_nulls)
00885             new_header_len += BITMAPLEN(numAttrs);
00886         if (olddata->t_infomask & HEAP_HASOID)
00887             new_header_len += sizeof(Oid);
00888         new_header_len = MAXALIGN(new_header_len);
00889         new_data_len = heap_compute_data_size(tupleDesc,
00890                                               toast_values, toast_isnull);
00891         new_tuple_len = new_header_len + new_data_len;
00892 
00893         /*
00894          * Allocate and zero the space needed, and fill HeapTupleData fields.
00895          */
00896         result_tuple = (HeapTuple) palloc0(HEAPTUPLESIZE + new_tuple_len);
00897         result_tuple->t_len = new_tuple_len;
00898         result_tuple->t_self = newtup->t_self;
00899         result_tuple->t_tableOid = newtup->t_tableOid;
00900         new_data = (HeapTupleHeader) ((char *) result_tuple + HEAPTUPLESIZE);
00901         result_tuple->t_data = new_data;
00902 
00903         /*
00904          * Copy the existing tuple header, but adjust natts and t_hoff.
00905          */
00906         memcpy(new_data, olddata, offsetof(HeapTupleHeaderData, t_bits));
00907         HeapTupleHeaderSetNatts(new_data, numAttrs);
00908         new_data->t_hoff = new_header_len;
00909         if (olddata->t_infomask & HEAP_HASOID)
00910             HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
00911 
00912         /* Copy over the data, and fill the null bitmap if needed */
00913         heap_fill_tuple(tupleDesc,
00914                         toast_values,
00915                         toast_isnull,
00916                         (char *) new_data + new_header_len,
00917                         new_data_len,
00918                         &(new_data->t_infomask),
00919                         has_nulls ? new_data->t_bits : NULL);
00920     }
00921     else
00922         result_tuple = newtup;
00923 
00924     /*
00925      * Free allocated temp values
00926      */
00927     if (need_free)
00928         for (i = 0; i < numAttrs; i++)
00929             if (toast_free[i])
00930                 pfree(DatumGetPointer(toast_values[i]));
00931 
00932     /*
00933      * Delete external values from the old tuple
00934      */
00935     if (need_delold)
00936         for (i = 0; i < numAttrs; i++)
00937             if (toast_delold[i])
00938                 toast_delete_datum(rel, toast_oldvalues[i]);
00939 
00940     return result_tuple;
00941 }
00942 
00943 
00944 /* ----------
00945  * toast_flatten_tuple -
00946  *
00947  *  "Flatten" a tuple to contain no out-of-line toasted fields.
00948  *  (This does not eliminate compressed or short-header datums.)
00949  * ----------
00950  */
00951 HeapTuple
00952 toast_flatten_tuple(HeapTuple tup, TupleDesc tupleDesc)
00953 {
00954     HeapTuple   new_tuple;
00955     Form_pg_attribute *att = tupleDesc->attrs;
00956     int         numAttrs = tupleDesc->natts;
00957     int         i;
00958     Datum       toast_values[MaxTupleAttributeNumber];
00959     bool        toast_isnull[MaxTupleAttributeNumber];
00960     bool        toast_free[MaxTupleAttributeNumber];
00961 
00962     /*
00963      * Break down the tuple into fields.
00964      */
00965     Assert(numAttrs <= MaxTupleAttributeNumber);
00966     heap_deform_tuple(tup, tupleDesc, toast_values, toast_isnull);
00967 
00968     memset(toast_free, 0, numAttrs * sizeof(bool));
00969 
00970     for (i = 0; i < numAttrs; i++)
00971     {
00972         /*
00973          * Look at non-null varlena attributes
00974          */
00975         if (!toast_isnull[i] && att[i]->attlen == -1)
00976         {
00977             struct varlena *new_value;
00978 
00979             new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
00980             if (VARATT_IS_EXTERNAL(new_value))
00981             {
00982                 new_value = toast_fetch_datum(new_value);
00983                 toast_values[i] = PointerGetDatum(new_value);
00984                 toast_free[i] = true;
00985             }
00986         }
00987     }
00988 
00989     /*
00990      * Form the reconfigured tuple.
00991      */
00992     new_tuple = heap_form_tuple(tupleDesc, toast_values, toast_isnull);
00993 
00994     /*
00995      * Be sure to copy the tuple's OID and identity fields.  We also make a
00996      * point of copying visibility info, just in case anybody looks at those
00997      * fields in a syscache entry.
00998      */
00999     if (tupleDesc->tdhasoid)
01000         HeapTupleSetOid(new_tuple, HeapTupleGetOid(tup));
01001 
01002     new_tuple->t_self = tup->t_self;
01003     new_tuple->t_tableOid = tup->t_tableOid;
01004 
01005     new_tuple->t_data->t_choice = tup->t_data->t_choice;
01006     new_tuple->t_data->t_ctid = tup->t_data->t_ctid;
01007     new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK;
01008     new_tuple->t_data->t_infomask |=
01009         tup->t_data->t_infomask & HEAP_XACT_MASK;
01010     new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK;
01011     new_tuple->t_data->t_infomask2 |=
01012         tup->t_data->t_infomask2 & HEAP2_XACT_MASK;
01013 
01014     /*
01015      * Free allocated temp values
01016      */
01017     for (i = 0; i < numAttrs; i++)
01018         if (toast_free[i])
01019             pfree(DatumGetPointer(toast_values[i]));
01020 
01021     return new_tuple;
01022 }
01023 
01024 
01025 /* ----------
01026  * toast_flatten_tuple_attribute -
01027  *
01028  *  If a Datum is of composite type, "flatten" it to contain no toasted fields.
01029  *  This must be invoked on any potentially-composite field that is to be
01030  *  inserted into a tuple.  Doing this preserves the invariant that toasting
01031  *  goes only one level deep in a tuple.
01032  *
01033  *  Note that flattening does not mean expansion of short-header varlenas,
01034  *  so in one sense toasting is allowed within composite datums.
01035  * ----------
01036  */
01037 Datum
01038 toast_flatten_tuple_attribute(Datum value,
01039                               Oid typeId, int32 typeMod)
01040 {
01041     TupleDesc   tupleDesc;
01042     HeapTupleHeader olddata;
01043     HeapTupleHeader new_data;
01044     int32       new_header_len;
01045     int32       new_data_len;
01046     int32       new_tuple_len;
01047     HeapTupleData tmptup;
01048     Form_pg_attribute *att;
01049     int         numAttrs;
01050     int         i;
01051     bool        need_change = false;
01052     bool        has_nulls = false;
01053     Datum       toast_values[MaxTupleAttributeNumber];
01054     bool        toast_isnull[MaxTupleAttributeNumber];
01055     bool        toast_free[MaxTupleAttributeNumber];
01056 
01057     /*
01058      * See if it's a composite type, and get the tupdesc if so.
01059      */
01060     tupleDesc = lookup_rowtype_tupdesc_noerror(typeId, typeMod, true);
01061     if (tupleDesc == NULL)
01062         return value;           /* not a composite type */
01063 
01064     att = tupleDesc->attrs;
01065     numAttrs = tupleDesc->natts;
01066 
01067     /*
01068      * Break down the tuple into fields.
01069      */
01070     olddata = DatumGetHeapTupleHeader(value);
01071     Assert(typeId == HeapTupleHeaderGetTypeId(olddata));
01072     Assert(typeMod == HeapTupleHeaderGetTypMod(olddata));
01073     /* Build a temporary HeapTuple control structure */
01074     tmptup.t_len = HeapTupleHeaderGetDatumLength(olddata);
01075     ItemPointerSetInvalid(&(tmptup.t_self));
01076     tmptup.t_tableOid = InvalidOid;
01077     tmptup.t_data = olddata;
01078 
01079     Assert(numAttrs <= MaxTupleAttributeNumber);
01080     heap_deform_tuple(&tmptup, tupleDesc, toast_values, toast_isnull);
01081 
01082     memset(toast_free, 0, numAttrs * sizeof(bool));
01083 
01084     for (i = 0; i < numAttrs; i++)
01085     {
01086         /*
01087          * Look at non-null varlena attributes
01088          */
01089         if (toast_isnull[i])
01090             has_nulls = true;
01091         else if (att[i]->attlen == -1)
01092         {
01093             struct varlena *new_value;
01094 
01095             new_value = (struct varlena *) DatumGetPointer(toast_values[i]);
01096             if (VARATT_IS_EXTERNAL(new_value) ||
01097                 VARATT_IS_COMPRESSED(new_value))
01098             {
01099                 new_value = heap_tuple_untoast_attr(new_value);
01100                 toast_values[i] = PointerGetDatum(new_value);
01101                 toast_free[i] = true;
01102                 need_change = true;
01103             }
01104         }
01105     }
01106 
01107     /*
01108      * If nothing to untoast, just return the original tuple.
01109      */
01110     if (!need_change)
01111     {
01112         ReleaseTupleDesc(tupleDesc);
01113         return value;
01114     }
01115 
01116     /*
01117      * Calculate the new size of the tuple.
01118      *
01119      * This should match the reconstruction code in toast_insert_or_update.
01120      */
01121     new_header_len = offsetof(HeapTupleHeaderData, t_bits);
01122     if (has_nulls)
01123         new_header_len += BITMAPLEN(numAttrs);
01124     if (olddata->t_infomask & HEAP_HASOID)
01125         new_header_len += sizeof(Oid);
01126     new_header_len = MAXALIGN(new_header_len);
01127     new_data_len = heap_compute_data_size(tupleDesc,
01128                                           toast_values, toast_isnull);
01129     new_tuple_len = new_header_len + new_data_len;
01130 
01131     new_data = (HeapTupleHeader) palloc0(new_tuple_len);
01132 
01133     /*
01134      * Copy the existing tuple header, but adjust natts and t_hoff.
01135      */
01136     memcpy(new_data, olddata, offsetof(HeapTupleHeaderData, t_bits));
01137     HeapTupleHeaderSetNatts(new_data, numAttrs);
01138     new_data->t_hoff = new_header_len;
01139     if (olddata->t_infomask & HEAP_HASOID)
01140         HeapTupleHeaderSetOid(new_data, HeapTupleHeaderGetOid(olddata));
01141 
01142     /* Reset the datum length field, too */
01143     HeapTupleHeaderSetDatumLength(new_data, new_tuple_len);
01144 
01145     /* Copy over the data, and fill the null bitmap if needed */
01146     heap_fill_tuple(tupleDesc,
01147                     toast_values,
01148                     toast_isnull,
01149                     (char *) new_data + new_header_len,
01150                     new_data_len,
01151                     &(new_data->t_infomask),
01152                     has_nulls ? new_data->t_bits : NULL);
01153 
01154     /*
01155      * Free allocated temp values
01156      */
01157     for (i = 0; i < numAttrs; i++)
01158         if (toast_free[i])
01159             pfree(DatumGetPointer(toast_values[i]));
01160     ReleaseTupleDesc(tupleDesc);
01161 
01162     return PointerGetDatum(new_data);
01163 }
01164 
01165 
01166 /* ----------
01167  * toast_compress_datum -
01168  *
01169  *  Create a compressed version of a varlena datum
01170  *
01171  *  If we fail (ie, compressed result is actually bigger than original)
01172  *  then return NULL.  We must not use compressed data if it'd expand
01173  *  the tuple!
01174  *
01175  *  We use VAR{SIZE,DATA}_ANY so we can handle short varlenas here without
01176  *  copying them.  But we can't handle external or compressed datums.
01177  * ----------
01178  */
01179 Datum
01180 toast_compress_datum(Datum value)
01181 {
01182     struct varlena *tmp;
01183     int32       valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
01184 
01185     Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
01186     Assert(!VARATT_IS_COMPRESSED(DatumGetPointer(value)));
01187 
01188     /*
01189      * No point in wasting a palloc cycle if value size is out of the allowed
01190      * range for compression
01191      */
01192     if (valsize < PGLZ_strategy_default->min_input_size ||
01193         valsize > PGLZ_strategy_default->max_input_size)
01194         return PointerGetDatum(NULL);
01195 
01196     tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize));
01197 
01198     /*
01199      * We recheck the actual size even if pglz_compress() reports success,
01200      * because it might be satisfied with having saved as little as one byte
01201      * in the compressed data --- which could turn into a net loss once you
01202      * consider header and alignment padding.  Worst case, the compressed
01203      * format might require three padding bytes (plus header, which is
01204      * included in VARSIZE(tmp)), whereas the uncompressed format would take
01205      * only one header byte and no padding if the value is short enough.  So
01206      * we insist on a savings of more than 2 bytes to ensure we have a gain.
01207      */
01208     if (pglz_compress(VARDATA_ANY(DatumGetPointer(value)), valsize,
01209                       (PGLZ_Header *) tmp, PGLZ_strategy_default) &&
01210         VARSIZE(tmp) < valsize - 2)
01211     {
01212         /* successful compression */
01213         return PointerGetDatum(tmp);
01214     }
01215     else
01216     {
01217         /* incompressible data */
01218         pfree(tmp);
01219         return PointerGetDatum(NULL);
01220     }
01221 }
01222 
01223 
01224 /* ----------
01225  * toast_save_datum -
01226  *
01227  *  Save one single datum into the secondary relation and return
01228  *  a Datum reference for it.
       *
       *  The payload is written as a series of rows of the form
       *  (value OID, chunk sequence number, chunk data), where each chunk
       *  holds at most TOAST_MAX_CHUNK_SIZE bytes and sequence numbers start
       *  at 0.  Every row also gets a matching entry in the toast index.
       *
       *  In the table-rewrite case (rel->rd_toastoid is valid), if the value ID
       *  taken over from oldexternal is already present in the toast relation,
       *  no rows are written at all, but a TOAST pointer is still returned.
01229  *
01230  * rel: the main relation we're working with (not the toast rel!)
01231  * value: datum to be pushed to toast storage
01232  * oldexternal: if not NULL, toast pointer previously representing the datum
01233  * options: options to be passed to heap_insert() for toast rows
       *
       * Returns a freshly palloc'd external varlena of TOAST_POINTER_SIZE bytes
       * containing the filled-in varatt_external struct, as a Datum.
01234  * ----------
01235  */
01236 static Datum
01237 toast_save_datum(Relation rel, Datum value,
01238                  struct varlena * oldexternal, int options)
01239 {
01240     Relation    toastrel;
01241     Relation    toastidx;
01242     HeapTuple   toasttup;
01243     TupleDesc   toasttupDesc;
01244     Datum       t_values[3];
01245     bool        t_isnull[3];
01246     CommandId   mycid = GetCurrentCommandId(true);
01247     struct varlena *result;
01248     struct varatt_external toast_pointer;
          /* Reusable buffer holding one chunk, laid out as a 4-byte-header varlena */
01249     struct
01250     {
01251         struct varlena hdr;
01252         char        data[TOAST_MAX_CHUNK_SIZE]; /* make struct big enough */
01253         int32       align_it;   /* ensure struct is aligned well enough */
01254     }           chunk_data;
01255     int32       chunk_size;
01256     int32       chunk_seq = 0;
01257     char       *data_p;
01258     int32       data_todo;
01259     Pointer     dval = DatumGetPointer(value);
01260 
01261     /*
01262      * Open the toast relation and its index.  We can use the index to check
01263      * uniqueness of the OID we assign to the toasted item, even though it has
01264      * additional columns besides OID.
01265      */
01266     toastrel = heap_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
01267     toasttupDesc = toastrel->rd_att;
01268     toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
01269 
01270     /*
01271      * Get the data pointer and length, and compute va_rawsize and va_extsize.
01272      *
01273      * va_rawsize is the size of the equivalent fully uncompressed datum, so
01274      * we have to adjust for short headers.
01275      *
01276      * va_extsize is the actual size of the data payload in the toast records.
           *
           * Three input layouts are distinguished below: short-header varlena,
           * compressed varlena, and plain 4-byte-header varlena.  In all three,
           * data_p is left pointing at the first payload byte and data_todo holds
           * the number of payload bytes still to be written.
01277      */
01278     if (VARATT_IS_SHORT(dval))
01279     {
01280         data_p = VARDATA_SHORT(dval);
01281         data_todo = VARSIZE_SHORT(dval) - VARHDRSZ_SHORT;
01282         toast_pointer.va_rawsize = data_todo + VARHDRSZ;        /* as if not short */
01283         toast_pointer.va_extsize = data_todo;
01284     }
01285     else if (VARATT_IS_COMPRESSED(dval))
01286     {
01287         data_p = VARDATA(dval);
01288         data_todo = VARSIZE(dval) - VARHDRSZ;
01289         /* rawsize in a compressed datum is just the size of the payload */
01290         toast_pointer.va_rawsize = VARRAWSIZE_4B_C(dval) + VARHDRSZ;
01291         toast_pointer.va_extsize = data_todo;
01292         /* Assert that the numbers look like it's compressed */
01293         Assert(VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
01294     }
01295     else
01296     {
01297         data_p = VARDATA(dval);
01298         data_todo = VARSIZE(dval) - VARHDRSZ;
01299         toast_pointer.va_rawsize = VARSIZE(dval);
01300         toast_pointer.va_extsize = data_todo;
01301     }
01302 
01303     /*
01304      * Insert the correct table OID into the result TOAST pointer.
01305      *
01306      * Normally this is the actual OID of the target toast table, but during
01307      * table-rewriting operations such as CLUSTER, we have to insert the OID
01308      * of the table's real permanent toast table instead.  rd_toastoid is set
01309      * if we have to substitute such an OID.
01310      */
01311     if (OidIsValid(rel->rd_toastoid))
01312         toast_pointer.va_toastrelid = rel->rd_toastoid;
01313     else
01314         toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
01315 
01316     /*
01317      * Choose an OID to use as the value ID for this toast value.
01318      *
01319      * Normally we just choose an unused OID within the toast table.  But
01320      * during table-rewriting operations where we are preserving an existing
01321      * toast table OID, we want to preserve toast value OIDs too.  So, if
01322      * rd_toastoid is set and we had a prior external value from that same
01323      * toast table, re-use its value ID.  If we didn't have a prior external
01324      * value (which is a corner case, but possible if the table's attstorage
01325      * options have been changed), we have to pick a value ID that doesn't
01326      * conflict with either new or existing toast value OIDs.
01327      */
01328     if (!OidIsValid(rel->rd_toastoid))
01329     {
01330         /* normal case: just choose an unused OID */
01331         toast_pointer.va_valueid =
01332             GetNewOidWithIndex(toastrel,
01333                                RelationGetRelid(toastidx),
01334                                (AttrNumber) 1);
01335     }
01336     else
01337     {
01338         /* rewrite case: check to see if value was in old toast table */
              /* InvalidOid here means "no reusable ID found yet"; tested below */
01339         toast_pointer.va_valueid = InvalidOid;
01340         if (oldexternal != NULL)
01341         {
01342             struct varatt_external old_toast_pointer;
01343 
01344             Assert(VARATT_IS_EXTERNAL(oldexternal));
01345             /* Must copy to access aligned fields */
01346             VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
01347             if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
01348             {
01349                 /* This value came from the old toast table; reuse its OID */
01350                 toast_pointer.va_valueid = old_toast_pointer.va_valueid;
01351 
01352                 /*
01353                  * There is a corner case here: the table rewrite might have
01354                  * to copy both live and recently-dead versions of a row, and
01355                  * those versions could easily reference the same toast value.
01356                  * When we copy the second or later version of such a row,
01357                  * reusing the OID will mean we select an OID that's already
01358                  * in the new toast table.  Check for that, and if so, just
01359                  * fall through without writing the data again.
01360                  *
01361                  * While annoying and ugly-looking, this is a good thing
01362                  * because it ensures that we wind up with only one copy of
01363                  * the toast value when there is only one copy in the old
01364                  * toast table.  Before we detected this case, we'd have made
01365                  * multiple copies, wasting space; and what's worse, the
01366                  * copies belonging to already-deleted heap tuples would not
01367                  * be reclaimed by VACUUM.
01368                  */
01369                 if (toastrel_valueid_exists(toastrel,
01370                                             toast_pointer.va_valueid))
01371                 {
01372                     /* Match, so short-circuit the data storage loop below */
01373                     data_todo = 0;
01374                 }
01375             }
01376         }
01377         if (toast_pointer.va_valueid == InvalidOid)
01378         {
01379             /*
01380              * new value; must choose an OID that doesn't conflict in either
01381              * old or new toast table
                   *
                   * The candidate is generated against toastrel (the relation being
                   * written), then checked against the relation identified by
                   * rd_toastoid; the loop repeats until that check finds no match.
01382              */
01383             do
01384             {
01385                 toast_pointer.va_valueid =
01386                     GetNewOidWithIndex(toastrel,
01387                                        RelationGetRelid(toastidx),
01388                                        (AttrNumber) 1);
01389             } while (toastid_valueid_exists(rel->rd_toastoid,
01390                                             toast_pointer.va_valueid));
01391         }
01392     }
01393 
01394     /*
01395      * Initialize constant parts of the tuple data
           *
           * Column 0 is the value ID and column 2 points at the chunk_data buffer,
           * whose contents are refilled for every chunk.  Column 1 (the chunk
           * sequence number) is set inside the loop below.
01396      */
01397     t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
01398     t_values[2] = PointerGetDatum(&chunk_data);
01399     t_isnull[0] = false;
01400     t_isnull[1] = false;
01401     t_isnull[2] = false;
01402 
01403     /*
01404      * Split up the item into chunks
           *
           * This loop is skipped entirely when data_todo is 0, which includes the
           * rewrite case above where the value ID was found to exist already.
01405      */
01406     while (data_todo > 0)
01407     {
01408         /*
01409          * Calculate the size of this chunk
01410          */
01411         chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
01412 
01413         /*
01414          * Build a tuple and store it
               *
               * chunk_seq is post-incremented, so the first chunk gets number 0.
01415          */
01416         t_values[1] = Int32GetDatum(chunk_seq++);
01417         SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
01418         memcpy(VARDATA(&chunk_data), data_p, chunk_size);
01419         toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
01420 
01421         heap_insert(toastrel, toasttup, mycid, options, NULL);
01422 
01423         /*
01424          * Create the index entry.  We cheat a little here by not using
01425          * FormIndexDatum: this relies on the knowledge that the index columns
01426          * are the same as the initial columns of the table.
01427          *
01428          * Note also that there had better not be any user-created index on
01429          * the TOAST table, since we don't bother to update anything else.
01430          */
01431         index_insert(toastidx, t_values, t_isnull,
01432                      &(toasttup->t_self),
01433                      toastrel,
01434                      toastidx->rd_index->indisunique ?
01435                      UNIQUE_CHECK_YES : UNIQUE_CHECK_NO);
01436 
01437         /*
01438          * Free memory
01439          */
01440         heap_freetuple(toasttup);
01441 
01442         /*
01443          * Move on to next chunk
01444          */
01445         data_todo -= chunk_size;
01446         data_p += chunk_size;
01447     }
01448 
01449     /*
01450      * Done - close toast relation
01451      */
01452     index_close(toastidx, RowExclusiveLock);
01453     heap_close(toastrel, RowExclusiveLock);
01454 
01455     /*
01456      * Create the TOAST pointer value that we'll return
           *
           * The varatt_external struct is copied bytewise into the data area of
           * a new external-format varlena.
01457      */
01458     result = (struct varlena *) palloc(TOAST_POINTER_SIZE);
01459     SET_VARSIZE_EXTERNAL(result, TOAST_POINTER_SIZE);
01460     memcpy(VARDATA_EXTERNAL(result), &toast_pointer, sizeof(toast_pointer));
01461 
01462     return PointerGetDatum(result);
01463 }
01464 
01465 
01466 /* ----------
01467  * toast_delete_datum -
01468  *
01469  *  Delete a single external stored value.
01470  * ----------
01471  */
01472 static void
01473 toast_delete_datum(Relation rel, Datum value)
01474 {
01475     struct varlena *attr = (struct varlena *) DatumGetPointer(value);
01476     struct varatt_external toast_pointer;
01477     Relation    toastrel;
01478     Relation    toastidx;
01479     ScanKeyData toastkey;
01480     SysScanDesc toastscan;
01481     HeapTuple   toasttup;
01482 
01483     if (!VARATT_IS_EXTERNAL(attr))
01484         return;
01485 
01486     /* Must copy to access aligned fields */
01487     VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
01488 
01489     /*
01490      * Open the toast relation and its index
01491      */
01492     toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);
01493     toastidx = index_open(toastrel->rd_rel->reltoastidxid, RowExclusiveLock);
01494 
01495     /*
01496      * Setup a scan key to find chunks with matching va_valueid
01497      */
01498     ScanKeyInit(&toastkey,
01499                 (AttrNumber) 1,
01500                 BTEqualStrategyNumber, F_OIDEQ,
01501                 ObjectIdGetDatum(toast_pointer.va_valueid));
01502 
01503     /*
01504      * Find all the chunks.  (We don't actually care whether we see them in
01505      * sequence or not, but since we've already locked the index we might as
01506      * well use systable_beginscan_ordered.)
01507      */
01508     toastscan = systable_beginscan_ordered(toastrel, toastidx,
01509                                            SnapshotToast, 1, &toastkey);
01510     while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
01511     {
01512         /*
01513          * Have a chunk, delete it
01514          */
01515         simple_heap_delete(toastrel, &toasttup->t_self);
01516     }
01517 
01518     /*
01519      * End scan and close relations
01520      */
01521     systable_endscan_ordered(toastscan);
01522     index_close(toastidx, RowExclusiveLock);
01523     heap_close(toastrel, RowExclusiveLock);
01524 }
01525 
01526 
01527 /* ----------
01528  * toastrel_valueid_exists -
01529  *
01530  *  Test whether a toast value with the given ID exists in the toast relation
01531  * ----------
01532  */
01533 static bool
01534 toastrel_valueid_exists(Relation toastrel, Oid valueid)
01535 {
01536     bool        result = false;
01537     ScanKeyData toastkey;
01538     SysScanDesc toastscan;
01539 
01540     /*
01541      * Setup a scan key to find chunks with matching va_valueid
01542      */
01543     ScanKeyInit(&toastkey,
01544                 (AttrNumber) 1,
01545                 BTEqualStrategyNumber, F_OIDEQ,
01546                 ObjectIdGetDatum(valueid));
01547 
01548     /*
01549      * Is there any such chunk?
01550      */
01551     toastscan = systable_beginscan(toastrel, toastrel->rd_rel->reltoastidxid,
01552                                    true, SnapshotToast, 1, &toastkey);
01553 
01554     if (systable_getnext(toastscan) != NULL)
01555         result = true;
01556 
01557     systable_endscan(toastscan);
01558 
01559     return result;
01560 }
01561 
01562 /* ----------
01563  * toastid_valueid_exists -
01564  *
01565  *  As above, but work from toast rel's OID not an open relation
01566  * ----------
01567  */
01568 static bool
01569 toastid_valueid_exists(Oid toastrelid, Oid valueid)
01570 {
01571     bool        result;
01572     Relation    toastrel;
01573 
01574     toastrel = heap_open(toastrelid, AccessShareLock);
01575 
01576     result = toastrel_valueid_exists(toastrel, valueid);
01577 
01578     heap_close(toastrel, AccessShareLock);
01579 
01580     return result;
01581 }
01582 
01583 
01584 /* ----------
01585  * toast_fetch_datum -
01586  *
01587  *  Reconstruct an in memory Datum from the chunks saved
01588  *  in the toast relation
01589  * ----------
01590  */
01591 static struct varlena *
01592 toast_fetch_datum(struct varlena * attr)
01593 {
01594     Relation    toastrel;
01595     Relation    toastidx;
01596     ScanKeyData toastkey;
01597     SysScanDesc toastscan;
01598     HeapTuple   ttup;
01599     TupleDesc   toasttupDesc;
01600     struct varlena *result;
01601     struct varatt_external toast_pointer;
01602     int32       ressize;
01603     int32       residx,
01604                 nextidx;
01605     int32       numchunks;
01606     Pointer     chunk;
01607     bool        isnull;
01608     char       *chunkdata;
01609     int32       chunksize;
01610 
01611     /* Must copy to access aligned fields */
01612     VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
01613 
01614     ressize = toast_pointer.va_extsize;
01615     numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
01616 
01617     result = (struct varlena *) palloc(ressize + VARHDRSZ);
01618 
01619     if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
01620         SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
01621     else
01622         SET_VARSIZE(result, ressize + VARHDRSZ);
01623 
01624     /*
01625      * Open the toast relation and its index
01626      */
01627     toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
01628     toasttupDesc = toastrel->rd_att;
01629     toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
01630 
01631     /*
01632      * Setup a scan key to fetch from the index by va_valueid
01633      */
01634     ScanKeyInit(&toastkey,
01635                 (AttrNumber) 1,
01636                 BTEqualStrategyNumber, F_OIDEQ,
01637                 ObjectIdGetDatum(toast_pointer.va_valueid));
01638 
01639     /*
01640      * Read the chunks by index
01641      *
01642      * Note that because the index is actually on (valueid, chunkidx) we will
01643      * see the chunks in chunkidx order, even though we didn't explicitly ask
01644      * for it.
01645      */
01646     nextidx = 0;
01647 
01648     toastscan = systable_beginscan_ordered(toastrel, toastidx,
01649                                            SnapshotToast, 1, &toastkey);
01650     while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
01651     {
01652         /*
01653          * Have a chunk, extract the sequence number and the data
01654          */
01655         residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
01656         Assert(!isnull);
01657         chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
01658         Assert(!isnull);
01659         if (!VARATT_IS_EXTENDED(chunk))
01660         {
01661             chunksize = VARSIZE(chunk) - VARHDRSZ;
01662             chunkdata = VARDATA(chunk);
01663         }
01664         else if (VARATT_IS_SHORT(chunk))
01665         {
01666             /* could happen due to heap_form_tuple doing its thing */
01667             chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
01668             chunkdata = VARDATA_SHORT(chunk);
01669         }
01670         else
01671         {
01672             /* should never happen */
01673             elog(ERROR, "found toasted toast chunk for toast value %u in %s",
01674                  toast_pointer.va_valueid,
01675                  RelationGetRelationName(toastrel));
01676             chunksize = 0;      /* keep compiler quiet */
01677             chunkdata = NULL;
01678         }
01679 
01680         /*
01681          * Some checks on the data we've found
01682          */
01683         if (residx != nextidx)
01684             elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
01685                  residx, nextidx,
01686                  toast_pointer.va_valueid,
01687                  RelationGetRelationName(toastrel));
01688         if (residx < numchunks - 1)
01689         {
01690             if (chunksize != TOAST_MAX_CHUNK_SIZE)
01691                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
01692                      chunksize, (int) TOAST_MAX_CHUNK_SIZE,
01693                      residx, numchunks,
01694                      toast_pointer.va_valueid,
01695                      RelationGetRelationName(toastrel));
01696         }
01697         else if (residx == numchunks - 1)
01698         {
01699             if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
01700                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
01701                      chunksize,
01702                      (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
01703                      residx,
01704                      toast_pointer.va_valueid,
01705                      RelationGetRelationName(toastrel));
01706         }
01707         else
01708             elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
01709                  residx,
01710                  0, numchunks - 1,
01711                  toast_pointer.va_valueid,
01712                  RelationGetRelationName(toastrel));
01713 
01714         /*
01715          * Copy the data into proper place in our result
01716          */
01717         memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
01718                chunkdata,
01719                chunksize);
01720 
01721         nextidx++;
01722     }
01723 
01724     /*
01725      * Final checks that we successfully fetched the datum
01726      */
01727     if (nextidx != numchunks)
01728         elog(ERROR, "missing chunk number %d for toast value %u in %s",
01729              nextidx,
01730              toast_pointer.va_valueid,
01731              RelationGetRelationName(toastrel));
01732 
01733     /*
01734      * End scan and close relations
01735      */
01736     systable_endscan_ordered(toastscan);
01737     index_close(toastidx, AccessShareLock);
01738     heap_close(toastrel, AccessShareLock);
01739 
01740     return result;
01741 }
01742 
01743 /* ----------
01744  * toast_fetch_datum_slice -
01745  *
01746  *  Reconstruct a segment of a Datum from the chunks saved
01747  *  in the toast relation
01748  * ----------
01749  */
01750 static struct varlena *
01751 toast_fetch_datum_slice(struct varlena * attr, int32 sliceoffset, int32 length)
01752 {
01753     Relation    toastrel;
01754     Relation    toastidx;
01755     ScanKeyData toastkey[3];
01756     int         nscankeys;
01757     SysScanDesc toastscan;
01758     HeapTuple   ttup;
01759     TupleDesc   toasttupDesc;
01760     struct varlena *result;
01761     struct varatt_external toast_pointer;
01762     int32       attrsize;
01763     int32       residx;
01764     int32       nextidx;
01765     int         numchunks;
01766     int         startchunk;
01767     int         endchunk;
01768     int32       startoffset;
01769     int32       endoffset;
01770     int         totalchunks;
01771     Pointer     chunk;
01772     bool        isnull;
01773     char       *chunkdata;
01774     int32       chunksize;
01775     int32       chcpystrt;
01776     int32       chcpyend;
01777 
01778     Assert(VARATT_IS_EXTERNAL(attr));
01779 
01780     /* Must copy to access aligned fields */
01781     VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
01782 
01783     /*
01784      * It's nonsense to fetch slices of a compressed datum -- this isn't lo_*
01785      * we can't return a compressed datum which is meaningful to toast later
01786      */
01787     Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer));
01788 
01789     attrsize = toast_pointer.va_extsize;
01790     totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
01791 
01792     if (sliceoffset >= attrsize)
01793     {
01794         sliceoffset = 0;
01795         length = 0;
01796     }
01797 
01798     if (((sliceoffset + length) > attrsize) || length < 0)
01799         length = attrsize - sliceoffset;
01800 
01801     result = (struct varlena *) palloc(length + VARHDRSZ);
01802 
01803     if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
01804         SET_VARSIZE_COMPRESSED(result, length + VARHDRSZ);
01805     else
01806         SET_VARSIZE(result, length + VARHDRSZ);
01807 
01808     if (length == 0)
01809         return result;          /* Can save a lot of work at this point! */
01810 
01811     startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
01812     endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
01813     numchunks = (endchunk - startchunk) + 1;
01814 
01815     startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
01816     endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
01817 
01818     /*
01819      * Open the toast relation and its index
01820      */
01821     toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock);
01822     toasttupDesc = toastrel->rd_att;
01823     toastidx = index_open(toastrel->rd_rel->reltoastidxid, AccessShareLock);
01824 
01825     /*
01826      * Setup a scan key to fetch from the index. This is either two keys or
01827      * three depending on the number of chunks.
01828      */
01829     ScanKeyInit(&toastkey[0],
01830                 (AttrNumber) 1,
01831                 BTEqualStrategyNumber, F_OIDEQ,
01832                 ObjectIdGetDatum(toast_pointer.va_valueid));
01833 
01834     /*
01835      * Use equality condition for one chunk, a range condition otherwise:
01836      */
01837     if (numchunks == 1)
01838     {
01839         ScanKeyInit(&toastkey[1],
01840                     (AttrNumber) 2,
01841                     BTEqualStrategyNumber, F_INT4EQ,
01842                     Int32GetDatum(startchunk));
01843         nscankeys = 2;
01844     }
01845     else
01846     {
01847         ScanKeyInit(&toastkey[1],
01848                     (AttrNumber) 2,
01849                     BTGreaterEqualStrategyNumber, F_INT4GE,
01850                     Int32GetDatum(startchunk));
01851         ScanKeyInit(&toastkey[2],
01852                     (AttrNumber) 2,
01853                     BTLessEqualStrategyNumber, F_INT4LE,
01854                     Int32GetDatum(endchunk));
01855         nscankeys = 3;
01856     }
01857 
01858     /*
01859      * Read the chunks by index
01860      *
01861      * The index is on (valueid, chunkidx) so they will come in order
01862      */
01863     nextidx = startchunk;
01864     toastscan = systable_beginscan_ordered(toastrel, toastidx,
01865                                          SnapshotToast, nscankeys, toastkey);
01866     while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
01867     {
01868         /*
01869          * Have a chunk, extract the sequence number and the data
01870          */
01871         residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
01872         Assert(!isnull);
01873         chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
01874         Assert(!isnull);
01875         if (!VARATT_IS_EXTENDED(chunk))
01876         {
01877             chunksize = VARSIZE(chunk) - VARHDRSZ;
01878             chunkdata = VARDATA(chunk);
01879         }
01880         else if (VARATT_IS_SHORT(chunk))
01881         {
01882             /* could happen due to heap_form_tuple doing its thing */
01883             chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
01884             chunkdata = VARDATA_SHORT(chunk);
01885         }
01886         else
01887         {
01888             /* should never happen */
01889             elog(ERROR, "found toasted toast chunk for toast value %u in %s",
01890                  toast_pointer.va_valueid,
01891                  RelationGetRelationName(toastrel));
01892             chunksize = 0;      /* keep compiler quiet */
01893             chunkdata = NULL;
01894         }
01895 
01896         /*
01897          * Some checks on the data we've found
01898          */
01899         if ((residx != nextidx) || (residx > endchunk) || (residx < startchunk))
01900             elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
01901                  residx, nextidx,
01902                  toast_pointer.va_valueid,
01903                  RelationGetRelationName(toastrel));
01904         if (residx < totalchunks - 1)
01905         {
01906             if (chunksize != TOAST_MAX_CHUNK_SIZE)
01907                 elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
01908                      chunksize, (int) TOAST_MAX_CHUNK_SIZE,
01909                      residx, totalchunks,
01910                      toast_pointer.va_valueid,
01911                      RelationGetRelationName(toastrel));
01912         }
01913         else if (residx == totalchunks - 1)
01914         {
01915             if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
01916                 elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
01917                      chunksize,
01918                      (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
01919                      residx,
01920                      toast_pointer.va_valueid,
01921                      RelationGetRelationName(toastrel));
01922         }
01923         else
01924             elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
01925                  residx,
01926                  0, totalchunks - 1,
01927                  toast_pointer.va_valueid,
01928                  RelationGetRelationName(toastrel));
01929 
01930         /*
01931          * Copy the data into proper place in our result
01932          */
01933         chcpystrt = 0;
01934         chcpyend = chunksize - 1;
01935         if (residx == startchunk)
01936             chcpystrt = startoffset;
01937         if (residx == endchunk)
01938             chcpyend = endoffset;
01939 
01940         memcpy(VARDATA(result) +
01941                (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
01942                chunkdata + chcpystrt,
01943                (chcpyend - chcpystrt) + 1);
01944 
01945         nextidx++;
01946     }
01947 
01948     /*
01949      * Final checks that we successfully fetched the datum
01950      */
01951     if (nextidx != (endchunk + 1))
01952         elog(ERROR, "missing chunk number %d for toast value %u in %s",
01953              nextidx,
01954              toast_pointer.va_valueid,
01955              RelationGetRelationName(toastrel));
01956 
01957     /*
01958      * End scan and close relations
01959      */
01960     systable_endscan_ordered(toastscan);
01961     index_close(toastidx, AccessShareLock);
01962     heap_close(toastrel, AccessShareLock);
01963 
01964     return result;
01965 }