Header And Logo

PostgreSQL
| The world's most advanced open source database.

rangetypes_gist.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * rangetypes_gist.c
00004  *    GiST support for range types.
00005  *
00006  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    src/backend/utils/adt/rangetypes_gist.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 #include "postgres.h"
00016 
00017 #include "access/gist.h"
00018 #include "access/skey.h"
00019 #include "utils/builtins.h"
00020 #include "utils/datum.h"
00021 #include "utils/rangetypes.h"
00022 
00023 
/*
 * Range class properties used to segregate different classes of ranges in
 * GiST.  The three property bits below may be OR'ed together, and each
 * unique combination of them is a class of its own.  CLS_EMPTY is a
 * stand-alone class and is never combined with any other bit.
 */
#define CLS_NORMAL          0   /* Ordinary finite range (no bits set) */
#define CLS_LOWER_INF       1   /* Lower bound is infinity */
#define CLS_UPPER_INF       2   /* Upper bound is infinity */
#define CLS_CONTAIN_EMPTY   4   /* Contains underlying empty ranges */
#define CLS_EMPTY           8   /* Special class for empty ranges */

/*
 * Number of classes, usable as the size of a per-class array indexed by the
 * values above: the 2^3 combinations of the three property bits, plus one
 * for CLS_EMPTY (which doesn't combine with anything else).
 */
#define CLS_COUNT           9
00038 
/*
 * Minimum accepted ratio of split for items of the same class.  If the items
 * are of different classes, we will separate along those lines regardless of
 * the ratio.
 */
#define LIMIT_RATIO  0.3

/*
 * Constants for fixed penalty values.  range_gist_penalty uses the first two
 * (and small integer multiples of them) when ranking candidate keys for
 * infinite-bound and empty ranges, and the third as the per-bound extension
 * cost when the range type has no subtype_diff function.
 */
#define INFINITE_BOUND_PENALTY  2.0
#define CONTAIN_EMPTY_PENALTY  1.0
#define DEFAULT_SUBTYPE_DIFF_PENALTY  1.0
00050 
00051 /*
00052  * Per-item data for range_gist_single_sorting_split.
00053  */
00054 typedef struct
00055 {
00056     int         index;
00057     RangeBound  bound;
00058 } SingleBoundSortItem;
00059 
/*
 * Which side of a split something is placed on: left or right?
 *
 * SPLIT_LEFT is deliberately zero, so that zero-filling an array of SplitLR
 * initializes every element to SPLIT_LEFT.
 */
typedef enum
{
    SPLIT_LEFT = 0,             /* makes initialization to SPLIT_LEFT easier */
    SPLIT_RIGHT
} SplitLR;
00066 
00067 /*
00068  * Context for range_gist_consider_split.
00069  */
00070 typedef struct
00071 {
00072     TypeCacheEntry *typcache;   /* typcache for range type */
00073     bool        has_subtype_diff;       /* does it have subtype_diff? */
00074     int         entries_count;  /* total number of entries being split */
00075 
00076     /* Information about currently selected split follows */
00077 
00078     bool        first;          /* true if no split was selected yet */
00079 
00080     RangeBound *left_upper;     /* upper bound of left interval */
00081     RangeBound *right_lower;    /* lower bound of right interval */
00082 
00083     float4      ratio;          /* split ratio */
00084     float4      overlap;        /* overlap between left and right predicate */
00085     int         common_left;    /* # common entries destined for each side */
00086     int         common_right;
00087 } ConsiderSplitContext;
00088 
00089 /*
00090  * Bounds extracted from a non-empty range, for use in
00091  * range_gist_double_sorting_split.
00092  */
00093 typedef struct
00094 {
00095     RangeBound  lower;
00096     RangeBound  upper;
00097 } NonEmptyRange;
00098 
/*
 * Represents information about an entry that can be placed in either group
 * without affecting overlap over selected axis ("common entry").
 */
typedef struct
{
    /* Index of entry in the initial array */
    int         index;
    /* Delta between closeness of range to each of the two groups */
    double      delta;
} CommonEntry;
00110 
/*
 * Helper macros to place an entry in the left or right group during split.
 *
 * Each macro appends offset "off" to the corresponding offset array of the
 * split vector and folds "range" into that side's running union: the first
 * entry placed on a side becomes the union as-is, later ones are merged in
 * with range_super_union.
 *
 * Note direct access to variables v, typcache, left_range, right_range:
 * these must be in scope wherever the macros are expanded.
 */
#define PLACE_LEFT(range, off)                  \
    do {                                        \
        if (v->spl_nleft > 0)                   \
            left_range = range_super_union(typcache, left_range, (range)); \
        else                                    \
            left_range = (range);               \
        v->spl_left[v->spl_nleft++] = (off);    \
    } while(0)

#define PLACE_RIGHT(range, off)                 \
    do {                                        \
        if (v->spl_nright > 0)                  \
            right_range = range_super_union(typcache, right_range, (range)); \
        else                                    \
            right_range = (range);              \
        v->spl_right[v->spl_nright++] = (off);  \
    } while(0)
00130 
/*
 * Copy a RangeType datum.  The "false, -1" arguments hardwire typbyval and
 * typlen for ranges, rather than looking them up.
 */
#define rangeCopy(r) \
    ((RangeType *) DatumGetPointer(datumCopy(PointerGetDatum(r), \
                                             false, -1)))
00135 
00136 static RangeType *range_super_union(TypeCacheEntry *typcache, RangeType *r1,
00137                   RangeType *r2);
00138 static bool range_gist_consistent_int(TypeCacheEntry *typcache,
00139                           StrategyNumber strategy, RangeType *key,
00140                           Datum query);
00141 static bool range_gist_consistent_leaf(TypeCacheEntry *typcache,
00142                            StrategyNumber strategy, RangeType *key,
00143                            Datum query);
00144 static void range_gist_fallback_split(TypeCacheEntry *typcache,
00145                           GistEntryVector *entryvec,
00146                           GIST_SPLITVEC *v);
00147 static void range_gist_class_split(TypeCacheEntry *typcache,
00148                        GistEntryVector *entryvec,
00149                        GIST_SPLITVEC *v,
00150                        SplitLR *classes_groups);
00151 static void range_gist_single_sorting_split(TypeCacheEntry *typcache,
00152                                 GistEntryVector *entryvec,
00153                                 GIST_SPLITVEC *v,
00154                                 bool use_upper_bound);
00155 static void range_gist_double_sorting_split(TypeCacheEntry *typcache,
00156                                 GistEntryVector *entryvec,
00157                                 GIST_SPLITVEC *v);
00158 static void range_gist_consider_split(ConsiderSplitContext *context,
00159                           RangeBound *right_lower, int min_left_count,
00160                           RangeBound *left_upper, int max_left_count);
00161 static int  get_gist_range_class(RangeType *range);
00162 static int  single_bound_cmp(const void *a, const void *b, void *arg);
00163 static int  interval_cmp_lower(const void *a, const void *b, void *arg);
00164 static int  interval_cmp_upper(const void *a, const void *b, void *arg);
00165 static int  common_entry_cmp(const void *i1, const void *i2);
00166 static float8 call_subtype_diff(TypeCacheEntry *typcache,
00167                   Datum val1, Datum val2);
00168 
00169 
00170 /* GiST query consistency check */
00171 Datum
00172 range_gist_consistent(PG_FUNCTION_ARGS)
00173 {
00174     GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
00175     Datum       query = PG_GETARG_DATUM(1);
00176     StrategyNumber strategy = (StrategyNumber) PG_GETARG_UINT16(2);
00177 
00178     /* Oid subtype = PG_GETARG_OID(3); */
00179     bool       *recheck = (bool *) PG_GETARG_POINTER(4);
00180     RangeType  *key = DatumGetRangeType(entry->key);
00181     TypeCacheEntry *typcache;
00182 
00183     /* All operators served by this function are exact */
00184     *recheck = false;
00185 
00186     typcache = range_get_typcache(fcinfo, RangeTypeGetOid(key));
00187 
00188     if (GIST_LEAF(entry))
00189         PG_RETURN_BOOL(range_gist_consistent_leaf(typcache, strategy,
00190                                                   key, query));
00191     else
00192         PG_RETURN_BOOL(range_gist_consistent_int(typcache, strategy,
00193                                                  key, query));
00194 }
00195 
00196 /* form union range */
00197 Datum
00198 range_gist_union(PG_FUNCTION_ARGS)
00199 {
00200     GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
00201     GISTENTRY  *ent = entryvec->vector;
00202     RangeType  *result_range;
00203     TypeCacheEntry *typcache;
00204     int         i;
00205 
00206     result_range = DatumGetRangeType(ent[0].key);
00207 
00208     typcache = range_get_typcache(fcinfo, RangeTypeGetOid(result_range));
00209 
00210     for (i = 1; i < entryvec->n; i++)
00211     {
00212         result_range = range_super_union(typcache, result_range,
00213                                          DatumGetRangeType(ent[i].key));
00214     }
00215 
00216     PG_RETURN_RANGE(result_range);
00217 }
00218 
00219 /* compress, decompress are no-ops */
00220 Datum
00221 range_gist_compress(PG_FUNCTION_ARGS)
00222 {
00223     GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
00224 
00225     PG_RETURN_POINTER(entry);
00226 }
00227 
00228 Datum
00229 range_gist_decompress(PG_FUNCTION_ARGS)
00230 {
00231     GISTENTRY  *entry = (GISTENTRY *) PG_GETARG_POINTER(0);
00232 
00233     PG_RETURN_POINTER(entry);
00234 }
00235 
00236 /*
00237  * GiST page split penalty function.
00238  *
00239  * The penalty function has the following goals (in order from most to least
00240  * important):
00241  * - Keep normal ranges separate
00242  * - Avoid broadening the class of the original predicate
00243  * - Avoid broadening (as determined by subtype_diff) the original predicate
00244  * - Favor adding ranges to narrower original predicates
00245  */
00246 Datum
00247 range_gist_penalty(PG_FUNCTION_ARGS)
00248 {
00249     GISTENTRY  *origentry = (GISTENTRY *) PG_GETARG_POINTER(0);
00250     GISTENTRY  *newentry = (GISTENTRY *) PG_GETARG_POINTER(1);
00251     float      *penalty = (float *) PG_GETARG_POINTER(2);
00252     RangeType  *orig = DatumGetRangeType(origentry->key);
00253     RangeType  *new = DatumGetRangeType(newentry->key);
00254     TypeCacheEntry *typcache;
00255     bool        has_subtype_diff;
00256     RangeBound  orig_lower,
00257                 new_lower,
00258                 orig_upper,
00259                 new_upper;
00260     bool        orig_empty,
00261                 new_empty;
00262 
00263     if (RangeTypeGetOid(orig) != RangeTypeGetOid(new))
00264         elog(ERROR, "range types do not match");
00265 
00266     typcache = range_get_typcache(fcinfo, RangeTypeGetOid(orig));
00267 
00268     has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
00269 
00270     range_deserialize(typcache, orig, &orig_lower, &orig_upper, &orig_empty);
00271     range_deserialize(typcache, new, &new_lower, &new_upper, &new_empty);
00272 
00273     /*
00274      * Distinct branches for handling distinct classes of ranges.  Note that
00275      * penalty values only need to be commensurate within the same class of
00276      * new range.
00277      */
00278     if (new_empty)
00279     {
00280         /* Handle insertion of empty range */
00281         if (orig_empty)
00282         {
00283             /*
00284              * The best case is to insert it to empty original range.
00285              * Insertion here means no broadening of original range. Also
00286              * original range is the most narrow.
00287              */
00288             *penalty = 0.0;
00289         }
00290         else if (RangeIsOrContainsEmpty(orig))
00291         {
00292             /*
00293              * The second case is to insert empty range into range which
00294              * contains at least one underlying empty range.  There is still
00295              * no broadening of original range, but original range is not as
00296              * narrow as possible.
00297              */
00298             *penalty = CONTAIN_EMPTY_PENALTY;
00299         }
00300         else if (orig_lower.infinite && orig_upper.infinite)
00301         {
00302             /*
00303              * Original range requires broadening.  (-inf; +inf) is most far
00304              * from normal range in this case.
00305              */
00306             *penalty = 2 * CONTAIN_EMPTY_PENALTY;
00307         }
00308         else if (orig_lower.infinite || orig_upper.infinite)
00309         {
00310             /*
00311              * (-inf, x) or (x, +inf) original ranges are closer to normal
00312              * ranges, so it's worse to mix it with empty ranges.
00313              */
00314             *penalty = 3 * CONTAIN_EMPTY_PENALTY;
00315         }
00316         else
00317         {
00318             /*
00319              * The least preferred case is broadening of normal range.
00320              */
00321             *penalty = 4 * CONTAIN_EMPTY_PENALTY;
00322         }
00323     }
00324     else if (new_lower.infinite && new_upper.infinite)
00325     {
00326         /* Handle insertion of (-inf, +inf) range */
00327         if (orig_lower.infinite && orig_upper.infinite)
00328         {
00329             /*
00330              * Best case is inserting to (-inf, +inf) original range.
00331              */
00332             *penalty = 0.0;
00333         }
00334         else if (orig_lower.infinite || orig_upper.infinite)
00335         {
00336             /*
00337              * When original range is (-inf, x) or (x, +inf) it requires
00338              * broadening of original range (extension of one bound to
00339              * infinity).
00340              */
00341             *penalty = INFINITE_BOUND_PENALTY;
00342         }
00343         else
00344         {
00345             /*
00346              * Insertion to normal original range is least preferred.
00347              */
00348             *penalty = 2 * INFINITE_BOUND_PENALTY;
00349         }
00350 
00351         if (RangeIsOrContainsEmpty(orig))
00352         {
00353             /*
00354              * Original range is narrower when it doesn't contain empty
00355              * ranges. Add additional penalty otherwise.
00356              */
00357             *penalty += CONTAIN_EMPTY_PENALTY;
00358         }
00359     }
00360     else if (new_lower.infinite)
00361     {
00362         /* Handle insertion of (-inf, x) range */
00363         if (!orig_empty && orig_lower.infinite)
00364         {
00365             if (orig_upper.infinite)
00366             {
00367                 /*
00368                  * (-inf, +inf) range won't be extended by insertion of (-inf,
00369                  * x) range. It's a less desirable case than insertion to
00370                  * (-inf, y) original range without extension, because in that
00371                  * case original range is narrower. But we can't express that
00372                  * in single float value.
00373                  */
00374                 *penalty = 0.0;
00375             }
00376             else
00377             {
00378                 if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
00379                 {
00380                     /*
00381                      * Get extension of original range using subtype_diff. Use
00382                      * constant if subtype_diff unavailable.
00383                      */
00384                     if (has_subtype_diff)
00385                         *penalty = call_subtype_diff(typcache,
00386                                                      new_upper.val,
00387                                                      orig_upper.val);
00388                     else
00389                         *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
00390                 }
00391                 else
00392                 {
00393                     /* No extension of original range */
00394                     *penalty = 0.0;
00395                 }
00396             }
00397         }
00398         else
00399         {
00400             /*
00401              * If lower bound of original range is not -inf, then extension of
00402              * it is infinity.
00403              */
00404             *penalty = get_float4_infinity();
00405         }
00406     }
00407     else if (new_upper.infinite)
00408     {
00409         /* Handle insertion of (x, +inf) range */
00410         if (!orig_empty && orig_upper.infinite)
00411         {
00412             if (orig_lower.infinite)
00413             {
00414                 /*
00415                  * (-inf, +inf) range won't be extended by insertion of (x,
00416                  * +inf) range. It's a less desirable case than insertion to
00417                  * (y, +inf) original range without extension, because in that
00418                  * case original range is narrower. But we can't express that
00419                  * in single float value.
00420                  */
00421                 *penalty = 0.0;
00422             }
00423             else
00424             {
00425                 if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
00426                 {
00427                     /*
00428                      * Get extension of original range using subtype_diff. Use
00429                      * constant if subtype_diff unavailable.
00430                      */
00431                     if (has_subtype_diff)
00432                         *penalty = call_subtype_diff(typcache,
00433                                                      orig_lower.val,
00434                                                      new_lower.val);
00435                     else
00436                         *penalty = DEFAULT_SUBTYPE_DIFF_PENALTY;
00437                 }
00438                 else
00439                 {
00440                     /* No extension of original range */
00441                     *penalty = 0.0;
00442                 }
00443             }
00444         }
00445         else
00446         {
00447             /*
00448              * If upper bound of original range is not +inf, then extension of
00449              * it is infinity.
00450              */
00451             *penalty = get_float4_infinity();
00452         }
00453     }
00454     else
00455     {
00456         /* Handle insertion of normal (non-empty, non-infinite) range */
00457         if (orig_empty || orig_lower.infinite || orig_upper.infinite)
00458         {
00459             /*
00460              * Avoid mixing normal ranges with infinite and empty ranges.
00461              */
00462             *penalty = get_float4_infinity();
00463         }
00464         else
00465         {
00466             /*
00467              * Calculate extension of original range by calling subtype_diff.
00468              * Use constant if subtype_diff unavailable.
00469              */
00470             float8      diff = 0.0;
00471 
00472             if (range_cmp_bounds(typcache, &new_lower, &orig_lower) < 0)
00473             {
00474                 if (has_subtype_diff)
00475                     diff += call_subtype_diff(typcache,
00476                                               orig_lower.val,
00477                                               new_lower.val);
00478                 else
00479                     diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
00480             }
00481             if (range_cmp_bounds(typcache, &new_upper, &orig_upper) > 0)
00482             {
00483                 if (has_subtype_diff)
00484                     diff += call_subtype_diff(typcache,
00485                                               new_upper.val,
00486                                               orig_upper.val);
00487                 else
00488                     diff += DEFAULT_SUBTYPE_DIFF_PENALTY;
00489             }
00490             *penalty = diff;
00491         }
00492     }
00493 
00494     PG_RETURN_POINTER(penalty);
00495 }
00496 
00497 /*
00498  * The GiST PickSplit method for ranges
00499  *
00500  * Primarily, we try to segregate ranges of different classes.  If splitting
00501  * ranges of the same class, use the appropriate split method for that class.
00502  */
00503 Datum
00504 range_gist_picksplit(PG_FUNCTION_ARGS)
00505 {
00506     GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
00507     GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
00508     TypeCacheEntry *typcache;
00509     OffsetNumber i;
00510     RangeType  *pred_left;
00511     int         nbytes;
00512     OffsetNumber maxoff;
00513     int         count_in_classes[CLS_COUNT];
00514     int         j;
00515     int         non_empty_classes_count = 0;
00516     int         biggest_class = -1;
00517     int         biggest_class_count = 0;
00518     int         total_count;
00519 
00520     /* use first item to look up range type's info */
00521     pred_left = DatumGetRangeType(entryvec->vector[FirstOffsetNumber].key);
00522     typcache = range_get_typcache(fcinfo, RangeTypeGetOid(pred_left));
00523 
00524     maxoff = entryvec->n - 1;
00525     nbytes = (maxoff + 1) * sizeof(OffsetNumber);
00526     v->spl_left = (OffsetNumber *) palloc(nbytes);
00527     v->spl_right = (OffsetNumber *) palloc(nbytes);
00528 
00529     /*
00530      * Get count distribution of range classes.
00531      */
00532     memset(count_in_classes, 0, sizeof(count_in_classes));
00533     for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00534     {
00535         RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
00536 
00537         count_in_classes[get_gist_range_class(range)]++;
00538     }
00539 
00540     /*
00541      * Count non-empty classes and find biggest class.
00542      */
00543     total_count = maxoff;
00544     for (j = 0; j < CLS_COUNT; j++)
00545     {
00546         if (count_in_classes[j] > 0)
00547         {
00548             if (count_in_classes[j] > biggest_class_count)
00549             {
00550                 biggest_class_count = count_in_classes[j];
00551                 biggest_class = j;
00552             }
00553             non_empty_classes_count++;
00554         }
00555     }
00556 
00557     Assert(non_empty_classes_count > 0);
00558 
00559     if (non_empty_classes_count == 1)
00560     {
00561         /* One non-empty class, so split inside class */
00562         if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_NORMAL)
00563         {
00564             /* double sorting split for normal ranges */
00565             range_gist_double_sorting_split(typcache, entryvec, v);
00566         }
00567         else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_LOWER_INF)
00568         {
00569             /* upper bound sorting split for (-inf, x) ranges */
00570             range_gist_single_sorting_split(typcache, entryvec, v, true);
00571         }
00572         else if ((biggest_class & ~CLS_CONTAIN_EMPTY) == CLS_UPPER_INF)
00573         {
00574             /* lower bound sorting split for (x, +inf) ranges */
00575             range_gist_single_sorting_split(typcache, entryvec, v, false);
00576         }
00577         else
00578         {
00579             /* trivial split for all (-inf, +inf) or all empty ranges */
00580             range_gist_fallback_split(typcache, entryvec, v);
00581         }
00582     }
00583     else
00584     {
00585         /*
00586          * Class based split.
00587          *
00588          * To which side of the split should each class go?  Initialize them
00589          * all to go to the left side.
00590          */
00591         SplitLR     classes_groups[CLS_COUNT];
00592 
00593         memset(classes_groups, 0, sizeof(classes_groups));
00594 
00595         if (count_in_classes[CLS_NORMAL] > 0)
00596         {
00597             /* separate normal ranges if any */
00598             classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
00599         }
00600         else
00601         {
00602             /*----------
00603              * Try to split classes in one of two ways:
00604              *  1) containing infinities - not containing infinities
00605              *  2) containing empty - not containing empty
00606              *
00607              * Select the way which balances the ranges between left and right
00608              * the best. If split in these ways is not possible, there are at
00609              * most 3 classes, so just separate biggest class.
00610              *----------
00611              */
00612             int         infCount,
00613                         nonInfCount;
00614             int         emptyCount,
00615                         nonEmptyCount;
00616 
00617             nonInfCount =
00618                 count_in_classes[CLS_NORMAL] +
00619                 count_in_classes[CLS_CONTAIN_EMPTY] +
00620                 count_in_classes[CLS_EMPTY];
00621             infCount = total_count - nonInfCount;
00622 
00623             nonEmptyCount =
00624                 count_in_classes[CLS_NORMAL] +
00625                 count_in_classes[CLS_LOWER_INF] +
00626                 count_in_classes[CLS_UPPER_INF] +
00627                 count_in_classes[CLS_LOWER_INF | CLS_UPPER_INF];
00628             emptyCount = total_count - nonEmptyCount;
00629 
00630             if (infCount > 0 && nonInfCount > 0 &&
00631                 (Abs(infCount - nonInfCount) <=
00632                  Abs(emptyCount - nonEmptyCount)))
00633             {
00634                 classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
00635                 classes_groups[CLS_CONTAIN_EMPTY] = SPLIT_RIGHT;
00636                 classes_groups[CLS_EMPTY] = SPLIT_RIGHT;
00637             }
00638             else if (emptyCount > 0 && nonEmptyCount > 0)
00639             {
00640                 classes_groups[CLS_NORMAL] = SPLIT_RIGHT;
00641                 classes_groups[CLS_LOWER_INF] = SPLIT_RIGHT;
00642                 classes_groups[CLS_UPPER_INF] = SPLIT_RIGHT;
00643                 classes_groups[CLS_LOWER_INF | CLS_UPPER_INF] = SPLIT_RIGHT;
00644             }
00645             else
00646             {
00647                 /*
00648                  * Either total_count == emptyCount or total_count ==
00649                  * infCount.
00650                  */
00651                 classes_groups[biggest_class] = SPLIT_RIGHT;
00652             }
00653         }
00654 
00655         range_gist_class_split(typcache, entryvec, v, classes_groups);
00656     }
00657 
00658     PG_RETURN_POINTER(v);
00659 }
00660 
00661 /* equality comparator for GiST */
00662 Datum
00663 range_gist_same(PG_FUNCTION_ARGS)
00664 {
00665     RangeType  *r1 = PG_GETARG_RANGE(0);
00666     RangeType  *r2 = PG_GETARG_RANGE(1);
00667     bool       *result = (bool *) PG_GETARG_POINTER(2);
00668 
00669     /*
00670      * range_eq will ignore the RANGE_CONTAIN_EMPTY flag, so we have to check
00671      * that for ourselves.  More generally, if the entries have been properly
00672      * normalized, then unequal flags bytes must mean unequal ranges ... so
00673      * let's just test all the flag bits at once.
00674      */
00675     if (range_get_flags(r1) != range_get_flags(r2))
00676         *result = false;
00677     else
00678     {
00679         TypeCacheEntry *typcache;
00680         typcache = range_get_typcache(fcinfo, RangeTypeGetOid(r1));
00681 
00682         *result = range_eq_internal(typcache, r1, r2);
00683     }
00684 
00685     PG_RETURN_POINTER(result);
00686 }
00687 
00688 /*
00689  *----------------------------------------------------------
00690  * STATIC FUNCTIONS
00691  *----------------------------------------------------------
00692  */
00693 
00694 /*
00695  * Return the smallest range that contains r1 and r2
00696  *
00697  * This differs from regular range_union in two critical ways:
00698  * 1. It won't throw an error for non-adjacent r1 and r2, but just absorb
00699  * the intervening values into the result range.
00700  * 2. We track whether any empty range has been union'd into the result,
00701  * so that contained_by searches can be indexed.  Note that this means
00702  * that *all* unions formed within the GiST index must go through here.
00703  */
00704 static RangeType *
00705 range_super_union(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
00706 {
00707     RangeType  *result;
00708     RangeBound  lower1,
00709                 lower2;
00710     RangeBound  upper1,
00711                 upper2;
00712     bool        empty1,
00713                 empty2;
00714     char        flags1,
00715                 flags2;
00716     RangeBound *result_lower;
00717     RangeBound *result_upper;
00718 
00719     range_deserialize(typcache, r1, &lower1, &upper1, &empty1);
00720     range_deserialize(typcache, r2, &lower2, &upper2, &empty2);
00721     flags1 = range_get_flags(r1);
00722     flags2 = range_get_flags(r2);
00723 
00724     if (empty1)
00725     {
00726         /* We can return r2 as-is if it already is or contains empty */
00727         if (flags2 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
00728             return r2;
00729         /* Else we'd better copy it (modify-in-place isn't safe) */
00730         r2 = rangeCopy(r2);
00731         range_set_contain_empty(r2);
00732         return r2;
00733     }
00734     if (empty2)
00735     {
00736         /* We can return r1 as-is if it already is or contains empty */
00737         if (flags1 & (RANGE_EMPTY | RANGE_CONTAIN_EMPTY))
00738             return r1;
00739         /* Else we'd better copy it (modify-in-place isn't safe) */
00740         r1 = rangeCopy(r1);
00741         range_set_contain_empty(r1);
00742         return r1;
00743     }
00744 
00745     if (range_cmp_bounds(typcache, &lower1, &lower2) <= 0)
00746         result_lower = &lower1;
00747     else
00748         result_lower = &lower2;
00749 
00750     if (range_cmp_bounds(typcache, &upper1, &upper2) >= 0)
00751         result_upper = &upper1;
00752     else
00753         result_upper = &upper2;
00754 
00755     /* optimization to avoid constructing a new range */
00756     if (result_lower == &lower1 && result_upper == &upper1 &&
00757         ((flags1 & RANGE_CONTAIN_EMPTY) || !(flags2 & RANGE_CONTAIN_EMPTY)))
00758         return r1;
00759     if (result_lower == &lower2 && result_upper == &upper2 &&
00760         ((flags2 & RANGE_CONTAIN_EMPTY) || !(flags1 & RANGE_CONTAIN_EMPTY)))
00761         return r2;
00762 
00763     result = make_range(typcache, result_lower, result_upper, false);
00764 
00765     if ((flags1 & RANGE_CONTAIN_EMPTY) || (flags2 & RANGE_CONTAIN_EMPTY))
00766         range_set_contain_empty(result);
00767 
00768     return result;
00769 }
00770 
00771 /*
00772  * GiST consistent test on an index internal page
00773  */
00774 static bool
00775 range_gist_consistent_int(TypeCacheEntry *typcache, StrategyNumber strategy,
00776                           RangeType *key, Datum query)
00777 {
00778     switch (strategy)
00779     {
00780         case RANGESTRAT_BEFORE:
00781             if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
00782                 return false;
00783             return (!range_overright_internal(typcache, key,
00784                                                     DatumGetRangeType(query)));
00785         case RANGESTRAT_OVERLEFT:
00786             if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
00787                 return false;
00788             return (!range_after_internal(typcache, key,
00789                                                     DatumGetRangeType(query)));
00790         case RANGESTRAT_OVERLAPS:
00791             return range_overlaps_internal(typcache, key,
00792                                                     DatumGetRangeType(query));
00793         case RANGESTRAT_OVERRIGHT:
00794             if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
00795                 return false;
00796             return (!range_before_internal(typcache, key,
00797                                                     DatumGetRangeType(query)));
00798         case RANGESTRAT_AFTER:
00799             if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
00800                 return false;
00801             return (!range_overleft_internal(typcache, key,
00802                                                     DatumGetRangeType(query)));
00803         case RANGESTRAT_ADJACENT:
00804             if (RangeIsEmpty(key) || RangeIsEmpty(DatumGetRangeType(query)))
00805                 return false;
00806             if (range_adjacent_internal(typcache, key,
00807                                                     DatumGetRangeType(query)))
00808                 return true;
00809             return range_overlaps_internal(typcache, key,
00810                                                     DatumGetRangeType(query));
00811         case RANGESTRAT_CONTAINS:
00812             return range_contains_internal(typcache, key,
00813                                                     DatumGetRangeType(query));
00814         case RANGESTRAT_CONTAINED_BY:
00815 
00816             /*
00817              * Empty ranges are contained by anything, so if key is or
00818              * contains any empty ranges, we must descend into it.  Otherwise,
00819              * descend only if key overlaps the query.
00820              */
00821             if (RangeIsOrContainsEmpty(key))
00822                 return true;
00823             return range_overlaps_internal(typcache, key,
00824                                                     DatumGetRangeType(query));
00825         case RANGESTRAT_CONTAINS_ELEM:
00826             return range_contains_elem_internal(typcache, key, query);
00827         case RANGESTRAT_EQ:
00828 
00829             /*
00830              * If query is empty, descend only if the key is or contains any
00831              * empty ranges.  Otherwise, descend if key contains query.
00832              */
00833             if (RangeIsEmpty(DatumGetRangeType(query)))
00834                 return RangeIsOrContainsEmpty(key);
00835             return range_contains_internal(typcache, key,
00836                                                     DatumGetRangeType(query));
00837         default:
00838             elog(ERROR, "unrecognized range strategy: %d", strategy);
00839             return false;           /* keep compiler quiet */
00840     }
00841 }
00842 
00843 /*
00844  * GiST consistent test on an index leaf page
00845  */
00846 static bool
00847 range_gist_consistent_leaf(TypeCacheEntry *typcache, StrategyNumber strategy,
00848                            RangeType *key, Datum query)
00849 {
00850     switch (strategy)
00851     {
00852         case RANGESTRAT_BEFORE:
00853             return range_before_internal(typcache, key,
00854                                                     DatumGetRangeType(query));
00855         case RANGESTRAT_OVERLEFT:
00856             return range_overleft_internal(typcache, key,
00857                                                     DatumGetRangeType(query));
00858         case RANGESTRAT_OVERLAPS:
00859             return range_overlaps_internal(typcache, key,
00860                                                     DatumGetRangeType(query));
00861         case RANGESTRAT_OVERRIGHT:
00862             return range_overright_internal(typcache, key,
00863                                                     DatumGetRangeType(query));
00864         case RANGESTRAT_AFTER:
00865             return range_after_internal(typcache, key,
00866                                                     DatumGetRangeType(query));
00867         case RANGESTRAT_ADJACENT:
00868             return range_adjacent_internal(typcache, key,
00869                                                     DatumGetRangeType(query));
00870         case RANGESTRAT_CONTAINS:
00871             return range_contains_internal(typcache, key,
00872                                                     DatumGetRangeType(query));
00873         case RANGESTRAT_CONTAINED_BY:
00874             return range_contained_by_internal(typcache, key,
00875                                                     DatumGetRangeType(query));
00876         case RANGESTRAT_CONTAINS_ELEM:
00877             return range_contains_elem_internal(typcache, key, query);
00878         case RANGESTRAT_EQ:
00879             return range_eq_internal(typcache, key, DatumGetRangeType(query));
00880         default:
00881             elog(ERROR, "unrecognized range strategy: %d", strategy);
00882             return false;           /* keep compiler quiet */
00883     }
00884 }
00885 
00886 /*
00887  * Trivial split: half of entries will be placed on one page
00888  * and the other half on the other page.
00889  */
00890 static void
00891 range_gist_fallback_split(TypeCacheEntry *typcache,
00892                           GistEntryVector *entryvec,
00893                           GIST_SPLITVEC *v)
00894 {
00895     RangeType  *left_range = NULL;
00896     RangeType  *right_range = NULL;
00897     OffsetNumber i,
00898                 maxoff,
00899                 split_idx;
00900 
00901     maxoff = entryvec->n - 1;
00902     /* Split entries before this to left page, after to right: */
00903     split_idx = (maxoff - FirstOffsetNumber) / 2 + FirstOffsetNumber;
00904 
00905     v->spl_nleft = 0;
00906     v->spl_nright = 0;
00907     for (i = FirstOffsetNumber; i <= maxoff; i++)
00908     {
00909         RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
00910 
00911         if (i < split_idx)
00912             PLACE_LEFT(range, i);
00913         else
00914             PLACE_RIGHT(range, i);
00915     }
00916 
00917     v->spl_ldatum = RangeTypeGetDatum(left_range);
00918     v->spl_rdatum = RangeTypeGetDatum(right_range);
00919 }
00920 
00921 /*
00922  * Split based on classes of ranges.
00923  *
00924  * See get_gist_range_class for class definitions.
00925  * classes_groups is an array of length CLS_COUNT indicating the side of the
00926  * split to which each class should go.
00927  */
00928 static void
00929 range_gist_class_split(TypeCacheEntry *typcache,
00930                        GistEntryVector *entryvec,
00931                        GIST_SPLITVEC *v,
00932                        SplitLR *classes_groups)
00933 {
00934     RangeType  *left_range = NULL;
00935     RangeType  *right_range = NULL;
00936     OffsetNumber i,
00937                 maxoff;
00938 
00939     maxoff = entryvec->n - 1;
00940 
00941     v->spl_nleft = 0;
00942     v->spl_nright = 0;
00943     for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00944     {
00945         RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
00946         int         class;
00947 
00948         /* Get class of range */
00949         class = get_gist_range_class(range);
00950 
00951         /* Place range to appropriate page */
00952         if (classes_groups[class] == SPLIT_LEFT)
00953             PLACE_LEFT(range, i);
00954         else
00955         {
00956             Assert(classes_groups[class] == SPLIT_RIGHT);
00957             PLACE_RIGHT(range, i);
00958         }
00959     }
00960 
00961     v->spl_ldatum = RangeTypeGetDatum(left_range);
00962     v->spl_rdatum = RangeTypeGetDatum(right_range);
00963 }
00964 
00965 /*
00966  * Sorting based split. First half of entries according to the sort will be
00967  * placed to one page, and second half of entries will be placed to other
00968  * page. use_upper_bound parameter indicates whether to use upper or lower
00969  * bound for sorting.
00970  */
00971 static void
00972 range_gist_single_sorting_split(TypeCacheEntry *typcache,
00973                                 GistEntryVector *entryvec,
00974                                 GIST_SPLITVEC *v,
00975                                 bool use_upper_bound)
00976 {
00977     SingleBoundSortItem *sortItems;
00978     RangeType  *left_range = NULL;
00979     RangeType  *right_range = NULL;
00980     OffsetNumber i,
00981                 maxoff,
00982                 split_idx;
00983 
00984     maxoff = entryvec->n - 1;
00985 
00986     sortItems = (SingleBoundSortItem *)
00987         palloc(maxoff * sizeof(SingleBoundSortItem));
00988 
00989     /*
00990      * Prepare auxiliary array and sort the values.
00991      */
00992     for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
00993     {
00994         RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
00995         RangeBound  bound2;
00996         bool        empty;
00997 
00998         sortItems[i - 1].index = i;
00999         /* Put appropriate bound into array */
01000         if (use_upper_bound)
01001             range_deserialize(typcache, range, &bound2,
01002                               &sortItems[i - 1].bound, &empty);
01003         else
01004             range_deserialize(typcache, range, &sortItems[i - 1].bound,
01005                               &bound2, &empty);
01006         Assert(!empty);
01007     }
01008 
01009     qsort_arg(sortItems, maxoff, sizeof(SingleBoundSortItem),
01010               single_bound_cmp, typcache);
01011 
01012     split_idx = maxoff / 2;
01013 
01014     v->spl_nleft = 0;
01015     v->spl_nright = 0;
01016 
01017     for (i = 0; i < maxoff; i++)
01018     {
01019         int         idx = sortItems[i].index;
01020         RangeType  *range = DatumGetRangeType(entryvec->vector[idx].key);
01021 
01022         if (i < split_idx)
01023             PLACE_LEFT(range, idx);
01024         else
01025             PLACE_RIGHT(range, idx);
01026     }
01027 
01028     v->spl_ldatum = RangeTypeGetDatum(left_range);
01029     v->spl_rdatum = RangeTypeGetDatum(right_range);
01030 }
01031 
01032 /*
01033  * Double sorting split algorithm.
01034  *
01035  * The algorithm considers dividing ranges into two groups. The first (left)
01036  * group contains general left bound. The second (right) group contains
01037  * general right bound. The challenge is to find upper bound of left group
01038  * and lower bound of right group so that overlap of groups is minimal and
01039  * ratio of distribution is acceptable. Algorithm finds for each lower bound of
01040  * right group minimal upper bound of left group, and for each upper bound of
01041  * left group maximal lower bound of right group. For each found pair
01042  * range_gist_consider_split considers replacement of currently selected
01043  * split with the new one.
01044  *
01045  * After that, all the entries are divided into three groups:
01046  * 1) Entries which should be placed to the left group
01047  * 2) Entries which should be placed to the right group
01048  * 3) "Common entries" which can be placed to either group without affecting
01049  *    amount of overlap.
01050  *
01051  * The common ranges are distributed by difference of distance from lower
01052  * bound of common range to lower bound of right group and distance from upper
01053  * bound of common range to upper bound of left group.
01054  *
01055  * For details see:
01056  * "A new double sorting-based node splitting algorithm for R-tree",
01057  * A. Korotkov
01058  * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
01059  */
01060 static void
01061 range_gist_double_sorting_split(TypeCacheEntry *typcache,
01062                                 GistEntryVector *entryvec,
01063                                 GIST_SPLITVEC *v)
01064 {
01065     ConsiderSplitContext context;
01066     OffsetNumber i,
01067                 maxoff;
01068     RangeType  *range,
01069                *left_range = NULL,
01070                *right_range = NULL;
01071     int         common_entries_count;
01072     NonEmptyRange *by_lower,
01073                *by_upper;
01074     CommonEntry *common_entries;
01075     int         nentries,
01076                 i1,
01077                 i2;
01078     RangeBound *right_lower,
01079                *left_upper;
01080 
01081     memset(&context, 0, sizeof(ConsiderSplitContext));
01082     context.typcache = typcache;
01083     context.has_subtype_diff = OidIsValid(typcache->rng_subdiff_finfo.fn_oid);
01084 
01085     maxoff = entryvec->n - 1;
01086     nentries = context.entries_count = maxoff - FirstOffsetNumber + 1;
01087     context.first = true;
01088 
01089     /* Allocate arrays for sorted range bounds */
01090     by_lower = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
01091     by_upper = (NonEmptyRange *) palloc(nentries * sizeof(NonEmptyRange));
01092 
01093     /* Fill arrays of bounds */
01094     for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
01095     {
01096         RangeType  *range = DatumGetRangeType(entryvec->vector[i].key);
01097         bool        empty;
01098 
01099         range_deserialize(typcache, range,
01100                           &by_lower[i - FirstOffsetNumber].lower,
01101                           &by_lower[i - FirstOffsetNumber].upper,
01102                           &empty);
01103         Assert(!empty);
01104     }
01105 
01106     /*
01107      * Make two arrays of range bounds: one sorted by lower bound and another
01108      * sorted by upper bound.
01109      */
01110     memcpy(by_upper, by_lower, nentries * sizeof(NonEmptyRange));
01111     qsort_arg(by_lower, nentries, sizeof(NonEmptyRange),
01112               interval_cmp_lower, typcache);
01113     qsort_arg(by_upper, nentries, sizeof(NonEmptyRange),
01114               interval_cmp_upper, typcache);
01115 
01116     /*----------
01117      * The goal is to form a left and right range, so that every entry
01118      * range is contained by either left or right interval (or both).
01119      *
01120      * For example, with the ranges (0,1), (1,3), (2,3), (2,4):
01121      *
01122      * 0 1 2 3 4
01123      * +-+
01124      *   +---+
01125      *     +-+
01126      *     +---+
01127      *
01128      * The left and right ranges are of the form (0,a) and (b,4).
01129      * We first consider splits where b is the lower bound of an entry.
01130      * We iterate through all entries, and for each b, calculate the
01131      * smallest possible a. Then we consider splits where a is the
01132      * upper bound of an entry, and for each a, calculate the greatest
01133      * possible b.
01134      *
01135      * In the above example, the first loop would consider splits:
01136      * b=0: (0,1)-(0,4)
01137      * b=1: (0,1)-(1,4)
01138      * b=2: (0,3)-(2,4)
01139      *
01140      * And the second loop:
01141      * a=1: (0,1)-(1,4)
01142      * a=3: (0,3)-(2,4)
01143      * a=4: (0,4)-(2,4)
01144      *----------
01145      */
01146 
01147     /*
01148      * Iterate over lower bound of right group, finding smallest possible
01149      * upper bound of left group.
01150      */
01151     i1 = 0;
01152     i2 = 0;
01153     right_lower = &by_lower[i1].lower;
01154     left_upper = &by_upper[i2].lower;
01155     while (true)
01156     {
01157         /*
01158          * Find next lower bound of right group.
01159          */
01160         while (i1 < nentries &&
01161                range_cmp_bounds(typcache, right_lower,
01162                                 &by_lower[i1].lower) == 0)
01163         {
01164             if (range_cmp_bounds(typcache, &by_lower[i1].upper,
01165                                  left_upper) > 0)
01166                 left_upper = &by_lower[i1].upper;
01167             i1++;
01168         }
01169         if (i1 >= nentries)
01170             break;
01171         right_lower = &by_lower[i1].lower;
01172 
01173         /*
01174          * Find count of ranges which anyway should be placed to the left
01175          * group.
01176          */
01177         while (i2 < nentries &&
01178                range_cmp_bounds(typcache, &by_upper[i2].upper,
01179                                 left_upper) <= 0)
01180             i2++;
01181 
01182         /*
01183          * Consider found split to see if it's better than what we had.
01184          */
01185         range_gist_consider_split(&context, right_lower, i1, left_upper, i2);
01186     }
01187 
01188     /*
01189      * Iterate over upper bound of left group finding greatest possible lower
01190      * bound of right group.
01191      */
01192     i1 = nentries - 1;
01193     i2 = nentries - 1;
01194     right_lower = &by_lower[i1].upper;
01195     left_upper = &by_upper[i2].upper;
01196     while (true)
01197     {
01198         /*
01199          * Find next upper bound of left group.
01200          */
01201         while (i2 >= 0 &&
01202                range_cmp_bounds(typcache, left_upper,
01203                                 &by_upper[i2].upper) == 0)
01204         {
01205             if (range_cmp_bounds(typcache, &by_upper[i2].lower,
01206                                  right_lower) < 0)
01207                 right_lower = &by_upper[i2].lower;
01208             i2--;
01209         }
01210         if (i2 < 0)
01211             break;
01212         left_upper = &by_upper[i2].upper;
01213 
01214         /*
01215          * Find count of intervals which anyway should be placed to the right
01216          * group.
01217          */
01218         while (i1 >= 0 &&
01219                range_cmp_bounds(typcache, &by_lower[i1].lower,
01220                                 right_lower) >= 0)
01221             i1--;
01222 
01223         /*
01224          * Consider found split to see if it's better than what we had.
01225          */
01226         range_gist_consider_split(&context, right_lower, i1 + 1,
01227                                   left_upper, i2 + 1);
01228     }
01229 
01230     /*
01231      * If we failed to find any acceptable splits, use trivial split.
01232      */
01233     if (context.first)
01234     {
01235         range_gist_fallback_split(typcache, entryvec, v);
01236         return;
01237     }
01238 
01239     /*
01240      * Ok, we have now selected bounds of the groups. Now we have to
01241      * distribute entries themselves. At first we distribute entries which can
01242      * be placed unambiguously and collect "common entries" to array.
01243      */
01244 
01245     /* Allocate vectors for results */
01246     v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
01247     v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
01248     v->spl_nleft = 0;
01249     v->spl_nright = 0;
01250 
01251     /*
01252      * Allocate an array for "common entries" - entries which can be placed to
01253      * either group without affecting overlap along selected axis.
01254      */
01255     common_entries_count = 0;
01256     common_entries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
01257 
01258     /*
01259      * Distribute entries which can be distributed unambiguously, and collect
01260      * common entries.
01261      */
01262     for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
01263     {
01264         RangeBound  lower,
01265                     upper;
01266         bool        empty;
01267 
01268         /*
01269          * Get upper and lower bounds along selected axis.
01270          */
01271         range = DatumGetRangeType(entryvec->vector[i].key);
01272 
01273         range_deserialize(typcache, range, &lower, &upper, &empty);
01274 
01275         if (range_cmp_bounds(typcache, &upper, context.left_upper) <= 0)
01276         {
01277             /* Fits in the left group */
01278             if (range_cmp_bounds(typcache, &lower, context.right_lower) >= 0)
01279             {
01280                 /* Fits also in the right group, so "common entry" */
01281                 common_entries[common_entries_count].index = i;
01282                 if (context.has_subtype_diff)
01283                 {
01284                     /*
01285                      * delta = (lower - context.right_lower) -
01286                      * (context.left_upper - upper)
01287                      */
01288                     common_entries[common_entries_count].delta =
01289                         call_subtype_diff(typcache,
01290                                           lower.val,
01291                                           context.right_lower->val) -
01292                         call_subtype_diff(typcache,
01293                                           context.left_upper->val,
01294                                           upper.val);
01295                 }
01296                 else
01297                 {
01298                     /* Without subtype_diff, take all deltas as zero */
01299                     common_entries[common_entries_count].delta = 0;
01300                 }
01301                 common_entries_count++;
01302             }
01303             else
01304             {
01305                 /* Doesn't fit to the right group, so join to the left group */
01306                 PLACE_LEFT(range, i);
01307             }
01308         }
01309         else
01310         {
01311             /*
01312              * Each entry should fit on either left or right group. Since this
01313              * entry didn't fit in the left group, it better fit in the right
01314              * group.
01315              */
01316             Assert(range_cmp_bounds(typcache, &lower,
01317                                     context.right_lower) >= 0);
01318             PLACE_RIGHT(range, i);
01319         }
01320     }
01321 
01322     /*
01323      * Distribute "common entries", if any.
01324      */
01325     if (common_entries_count > 0)
01326     {
01327         /*
01328          * Sort "common entries" by calculated deltas in order to distribute
01329          * the most ambiguous entries first.
01330          */
01331         qsort(common_entries, common_entries_count, sizeof(CommonEntry),
01332               common_entry_cmp);
01333 
01334         /*
01335          * Distribute "common entries" between groups according to sorting.
01336          */
01337         for (i = 0; i < common_entries_count; i++)
01338         {
01339             int         idx = common_entries[i].index;
01340 
01341             range = DatumGetRangeType(entryvec->vector[idx].key);
01342 
01343             /*
01344              * Check if we have to place this entry in either group to achieve
01345              * LIMIT_RATIO.
01346              */
01347             if (i < context.common_left)
01348                 PLACE_LEFT(range, idx);
01349             else
01350                 PLACE_RIGHT(range, idx);
01351         }
01352     }
01353 
01354     v->spl_ldatum = PointerGetDatum(left_range);
01355     v->spl_rdatum = PointerGetDatum(right_range);
01356 }
01357 
01358 /*
01359  * Consider replacement of currently selected split with a better one
01360  * during range_gist_double_sorting_split.
01361  */
01362 static void
01363 range_gist_consider_split(ConsiderSplitContext *context,
01364                           RangeBound *right_lower, int min_left_count,
01365                           RangeBound *left_upper, int max_left_count)
01366 {
01367     int         left_count,
01368                 right_count;
01369     float4      ratio,
01370                 overlap;
01371 
01372     /*
01373      * Calculate entries distribution ratio assuming most uniform distribution
01374      * of common entries.
01375      */
01376     if (min_left_count >= (context->entries_count + 1) / 2)
01377         left_count = min_left_count;
01378     else if (max_left_count <= context->entries_count / 2)
01379         left_count = max_left_count;
01380     else
01381         left_count = context->entries_count / 2;
01382     right_count = context->entries_count - left_count;
01383 
01384     /*
01385      * Ratio of split: quotient between size of smaller group and total
01386      * entries count.  This is necessarily 0.5 or less; if it's less than
01387      * LIMIT_RATIO then we will never accept the new split.
01388      */
01389     ratio = ((float4) Min(left_count, right_count)) /
01390         ((float4) context->entries_count);
01391 
01392     if (ratio > LIMIT_RATIO)
01393     {
01394         bool        selectthis = false;
01395 
01396         /*
01397          * The ratio is acceptable, so compare current split with previously
01398          * selected one. We search for minimal overlap (allowing negative
01399          * values) and minimal ratio secondarily.  If subtype_diff is
01400          * available, it's used for overlap measure.  Without subtype_diff we
01401          * use number of "common entries" as an overlap measure.
01402          */
01403         if (context->has_subtype_diff)
01404             overlap = call_subtype_diff(context->typcache,
01405                                         left_upper->val,
01406                                         right_lower->val);
01407         else
01408             overlap = max_left_count - min_left_count;
01409 
01410         /* If there is no previous selection, select this split */
01411         if (context->first)
01412             selectthis = true;
01413         else
01414         {
01415             /*
01416              * Choose the new split if it has a smaller overlap, or same
01417              * overlap but better ratio.
01418              */
01419             if (overlap < context->overlap ||
01420                 (overlap == context->overlap && ratio > context->ratio))
01421                 selectthis = true;
01422         }
01423 
01424         if (selectthis)
01425         {
01426             /* save information about selected split */
01427             context->first = false;
01428             context->ratio = ratio;
01429             context->overlap = overlap;
01430             context->right_lower = right_lower;
01431             context->left_upper = left_upper;
01432             context->common_left = max_left_count - left_count;
01433             context->common_right = left_count - min_left_count;
01434         }
01435     }
01436 }
01437 
01438 /*
01439  * Find class number for range.
01440  *
01441  * The class number is a valid combination of the properties of the
01442  * range.  Note: the highest possible number is 8, because CLS_EMPTY
01443  * can't be combined with anything else.
01444  */
01445 static int
01446 get_gist_range_class(RangeType *range)
01447 {
01448     int         classNumber;
01449     char        flags;
01450 
01451     flags = range_get_flags(range);
01452     if (flags & RANGE_EMPTY)
01453     {
01454         classNumber = CLS_EMPTY;
01455     }
01456     else
01457     {
01458         classNumber = 0;
01459         if (flags & RANGE_LB_INF)
01460             classNumber |= CLS_LOWER_INF;
01461         if (flags & RANGE_UB_INF)
01462             classNumber |= CLS_UPPER_INF;
01463         if (flags & RANGE_CONTAIN_EMPTY)
01464             classNumber |= CLS_CONTAIN_EMPTY;
01465     }
01466     return classNumber;
01467 }
01468 
01469 /*
01470  * Comparison function for range_gist_single_sorting_split.
01471  */
01472 static int
01473 single_bound_cmp(const void *a, const void *b, void *arg)
01474 {
01475     SingleBoundSortItem *i1 = (SingleBoundSortItem *) a;
01476     SingleBoundSortItem *i2 = (SingleBoundSortItem *) b;
01477     TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
01478 
01479     return range_cmp_bounds(typcache, &i1->bound, &i2->bound);
01480 }
01481 
01482 /*
01483  * Compare NonEmptyRanges by lower bound.
01484  */
01485 static int
01486 interval_cmp_lower(const void *a, const void *b, void *arg)
01487 {
01488     NonEmptyRange *i1 = (NonEmptyRange *) a;
01489     NonEmptyRange *i2 = (NonEmptyRange *) b;
01490     TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
01491 
01492     return range_cmp_bounds(typcache, &i1->lower, &i2->lower);
01493 }
01494 
01495 /*
01496  * Compare NonEmptyRanges by upper bound.
01497  */
01498 static int
01499 interval_cmp_upper(const void *a, const void *b, void *arg)
01500 {
01501     NonEmptyRange *i1 = (NonEmptyRange *) a;
01502     NonEmptyRange *i2 = (NonEmptyRange *) b;
01503     TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
01504 
01505     return range_cmp_bounds(typcache, &i1->upper, &i2->upper);
01506 }
01507 
01508 /*
01509  * Compare CommonEntrys by their deltas.
01510  */
01511 static int
01512 common_entry_cmp(const void *i1, const void *i2)
01513 {
01514     double      delta1 = ((CommonEntry *) i1)->delta;
01515     double      delta2 = ((CommonEntry *) i2)->delta;
01516 
01517     if (delta1 < delta2)
01518         return -1;
01519     else if (delta1 > delta2)
01520         return 1;
01521     else
01522         return 0;
01523 }
01524 
01525 /*
01526  * Convenience function to invoke type-specific subtype_diff function.
01527  * Caller must have already checked that there is one for the range type.
01528  */
01529 static float8
01530 call_subtype_diff(TypeCacheEntry *typcache, Datum val1, Datum val2)
01531 {
01532     float8      value;
01533 
01534     value = DatumGetFloat8(FunctionCall2Coll(&typcache->rng_subdiff_finfo,
01535                                              typcache->rng_collation,
01536                                              val1, val2));
01537     /* Cope with buggy subtype_diff function by returning zero */
01538     if (value >= 0.0)
01539         return value;
01540     return 0.0;
01541 }