#include "postgres.h"#include "access/spgist.h"#include "access/skey.h"#include "catalog/pg_type.h"#include "utils/builtins.h"#include "utils/datum.h"#include "utils/rangetypes.h"
Go to the source code of this file.
Functions | |
| Datum | spg_range_quad_config (PG_FUNCTION_ARGS) |
| Datum | spg_range_quad_choose (PG_FUNCTION_ARGS) |
| Datum | spg_range_quad_picksplit (PG_FUNCTION_ARGS) |
| Datum | spg_range_quad_inner_consistent (PG_FUNCTION_ARGS) |
| Datum | spg_range_quad_leaf_consistent (PG_FUNCTION_ARGS) |
| static int16 | getQuadrant (TypeCacheEntry *typcache, RangeType *centroid, RangeType *tst) |
| static int | bound_cmp (const void *a, const void *b, void *arg) |
| static int bound_cmp | ( | const void * | a, | |
| const void * | b, | |||
| void * | arg | |||
| ) | [static] |
Definition at line 187 of file rangetypes_spgist.c.
References range_cmp_bounds().
Referenced by spg_range_quad_picksplit().
{
RangeBound *ba = (RangeBound *) a;
RangeBound *bb = (RangeBound *) b;
TypeCacheEntry *typcache = (TypeCacheEntry *) arg;
return range_cmp_bounds(typcache, ba, bb);
}
| static int16 getQuadrant | ( | TypeCacheEntry * | typcache, | |
| RangeType * | centroid, | |||
| RangeType * | tst | |||
| ) | [static] |
Definition at line 96 of file rangetypes_spgist.c.
References range_cmp_bounds(), and range_deserialize().
Referenced by spg_range_quad_choose(), spg_range_quad_inner_consistent(), and spg_range_quad_picksplit().
{
RangeBound centroidLower,
centroidUpper;
bool centroidEmpty;
RangeBound lower,
upper;
bool empty;
range_deserialize(typcache, centroid, ¢roidLower, ¢roidUpper,
¢roidEmpty);
range_deserialize(typcache, tst, &lower, &upper, &empty);
if (empty)
return 5;
if (range_cmp_bounds(typcache, &lower, ¢roidLower) >= 0)
{
if (range_cmp_bounds(typcache, &upper, ¢roidUpper) >= 0)
return 1;
else
return 2;
}
else
{
if (range_cmp_bounds(typcache, &upper, ¢roidUpper) >= 0)
return 4;
else
return 3;
}
}
| Datum spg_range_quad_choose | ( | PG_FUNCTION_ARGS | ) |
Definition at line 132 of file rangetypes_spgist.c.
References spgChooseIn::allTheSame, Assert, spgChooseIn::datum, DatumGetRangeType, getQuadrant(), spgChooseIn::hasPrefix, spgChooseOut::matchNode, PG_GETARG_POINTER, PG_RETURN_VOID, spgChooseIn::prefixDatum, range_get_typcache(), RangeIsEmpty, RangeTypeGetDatum, RangeTypeGetOid, spgChooseOut::result, and spgChooseOut::resultType.
{
spgChooseIn *in = (spgChooseIn *) PG_GETARG_POINTER(0);
spgChooseOut *out = (spgChooseOut *) PG_GETARG_POINTER(1);
RangeType *inRange = DatumGetRangeType(in->datum),
*centroid;
int16 quadrant;
TypeCacheEntry *typcache;
if (in->allTheSame)
{
out->resultType = spgMatchNode;
/* nodeN will be set by core */
out->result.matchNode.levelAdd = 0;
out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
PG_RETURN_VOID();
}
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(inRange));
/*
* A node with no centroid divides ranges purely on whether they're empty
* or not. All empty ranges go to child node 0, all non-empty ranges go
* to node 1.
*/
if (!in->hasPrefix)
{
out->resultType = spgMatchNode;
if (RangeIsEmpty(inRange))
out->result.matchNode.nodeN = 0;
else
out->result.matchNode.nodeN = 1;
out->result.matchNode.levelAdd = 1;
out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
PG_RETURN_VOID();
}
centroid = DatumGetRangeType(in->prefixDatum);
quadrant = getQuadrant(typcache, centroid, inRange);
Assert(quadrant <= in->nNodes);
/* Select node matching to quadrant number */
out->resultType = spgMatchNode;
out->result.matchNode.nodeN = quadrant - 1;
out->result.matchNode.levelAdd = 1;
out->result.matchNode.restDatum = RangeTypeGetDatum(inRange);
PG_RETURN_VOID();
}
| Datum spg_range_quad_config | ( | PG_FUNCTION_ARGS | ) |
Definition at line 61 of file rangetypes_spgist.c.
References spgConfigOut::canReturnData, spgConfigOut::labelType, spgConfigOut::longValuesOK, PG_GETARG_POINTER, PG_RETURN_VOID, and spgConfigOut::prefixType.
{
/* spgConfigIn *cfgin = (spgConfigIn *) PG_GETARG_POINTER(0); */
spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
cfg->prefixType = ANYRANGEOID;
cfg->labelType = VOIDOID; /* we don't need node labels */
cfg->canReturnData = true;
cfg->longValuesOK = false;
PG_RETURN_VOID();
}
| Datum spg_range_quad_inner_consistent | ( | PG_FUNCTION_ARGS | ) |
Definition at line 301 of file rangetypes_spgist.c.
References spgInnerConsistentIn::allTheSame, Assert, bounds_adjacent(), DatumGetRangeType, elog, ERROR, getQuadrant(), spgInnerConsistentIn::hasPrefix, i, RangeBound::inclusive, RangeBound::infinite, RangeBound::lower, spgInnerConsistentIn::nkeys, spgInnerConsistentIn::nNodes, spgInnerConsistentOut::nNodes, spgInnerConsistentOut::nodeNumbers, palloc(), PG_GETARG_POINTER, PG_RETURN_VOID, spgInnerConsistentIn::prefixDatum, range(), range_cmp_bounds(), range_deserialize(), range_get_typcache(), RangeIsEmpty, RANGESTRAT_ADJACENT, RANGESTRAT_AFTER, RANGESTRAT_BEFORE, RANGESTRAT_CONTAINED_BY, RANGESTRAT_CONTAINS, RANGESTRAT_CONTAINS_ELEM, RANGESTRAT_EQ, RANGESTRAT_OVERLAPS, RANGESTRAT_OVERLEFT, RANGESTRAT_OVERRIGHT, RangeTypeGetOid, spgInnerConsistentIn::reconstructedValue, spgInnerConsistentOut::reconstructedValues, spgInnerConsistentIn::scankeys, ScanKeyData::sk_argument, ScanKeyData::sk_strategy, and RangeBound::val.
{
spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
int which;
int i;
/*
* For adjacent search we need also previous centroid (if any) to improve
* the precision of the consistent check. In this case needPrevious flag is
* set and centroid is passed into reconstructedValues. This is not the
* intended purpose of reconstructedValues (because we already have the
* full value available at the leaf), but it's a convenient place to store
* state while traversing the tree.
*/
bool needPrevious = false;
if (in->allTheSame)
{
/* Report that all nodes should be visited */
out->nNodes = in->nNodes;
out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
for (i = 0; i < in->nNodes; i++)
out->nodeNumbers[i] = i;
PG_RETURN_VOID();
}
if (!in->hasPrefix)
{
/*
* No centroid on this inner node. Such a node has two child nodes,
* the first for empty ranges, and the second for non-empty ones.
*/
Assert(in->nNodes == 2);
/*
* Nth bit of which variable means that (N - 1)th node should be
* visited. Initially all bits are set. Bits of nodes which should be
* skipped will be unset.
*/
which = (1 << 1) | (1 << 2);
for (i = 0; i < in->nkeys; i++)
{
StrategyNumber strategy = in->scankeys[i].sk_strategy;
bool empty;
/*
* The only strategy when second argument of operator is not range
* is RANGESTRAT_CONTAINS_ELEM.
*/
if (strategy != RANGESTRAT_CONTAINS_ELEM)
empty = RangeIsEmpty(
DatumGetRangeType(in->scankeys[i].sk_argument));
else
empty = false;
switch (strategy)
{
case RANGESTRAT_BEFORE:
case RANGESTRAT_OVERLEFT:
case RANGESTRAT_OVERLAPS:
case RANGESTRAT_OVERRIGHT:
case RANGESTRAT_AFTER:
case RANGESTRAT_ADJACENT:
/* These strategies return false if any argument is empty */
if (empty)
which = 0;
else
which &= (1 << 2);
break;
case RANGESTRAT_CONTAINS:
/*
* All ranges contain an empty range. Only non-empty ranges
* can contain a non-empty range.
*/
if (!empty)
which &= (1 << 2);
break;
case RANGESTRAT_CONTAINED_BY:
/*
* Only an empty range is contained by an empty range. Both
* empty and non-empty ranges can be contained by a
* non-empty range.
*/
if (empty)
which &= (1 << 1);
break;
case RANGESTRAT_CONTAINS_ELEM:
which &= (1 << 2);
break;
case RANGESTRAT_EQ:
if (empty)
which &= (1 << 1);
else
which &= (1 << 2);
break;
default:
elog(ERROR, "unrecognized range strategy: %d", strategy);
break;
}
if (which == 0)
break; /* no need to consider remaining conditions */
}
}
else
{
RangeBound centroidLower,
centroidUpper;
bool centroidEmpty;
TypeCacheEntry *typcache;
RangeType *centroid;
/* This node has a centroid. Fetch it. */
centroid = DatumGetRangeType(in->prefixDatum);
typcache = range_get_typcache(fcinfo,
RangeTypeGetOid(DatumGetRangeType(centroid)));
range_deserialize(typcache, centroid, ¢roidLower, ¢roidUpper,
¢roidEmpty);
Assert(in->nNodes == 4 || in->nNodes == 5);
/*
* Nth bit of which variable means that (N - 1)th node (Nth quadrant)
* should be visited. Initially all bits are set. Bits of nodes which
* can be skipped will be unset.
*/
which = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4) | (1 << 5);
for (i = 0; i < in->nkeys; i++)
{
StrategyNumber strategy;
RangeBound lower,
upper;
bool empty;
RangeType *range = NULL;
/* Restrictions on range bounds according to scan strategy */
RangeBound *minLower = NULL,
*maxLower = NULL,
*minUpper = NULL,
*maxUpper = NULL;
/* Are the restrictions on range bounds inclusive? */
bool inclusive = true;
bool strictEmpty = true;
int cmp,
which1,
which2;
strategy = in->scankeys[i].sk_strategy;
/*
* RANGESTRAT_CONTAINS_ELEM is just like RANGESTRAT_CONTAINS, but
* the argument is a single element. Expand the single element to
* a range containing only the element, and treat it like
* RANGESTRAT_CONTAINS.
*/
if (strategy == RANGESTRAT_CONTAINS_ELEM)
{
lower.inclusive = true;
lower.infinite = false;
lower.lower = true;
lower.val = in->scankeys[i].sk_argument;
upper.inclusive = true;
upper.infinite = false;
upper.lower = false;
upper.val = in->scankeys[i].sk_argument;
empty = false;
strategy = RANGESTRAT_CONTAINS;
}
else
{
range = DatumGetRangeType(in->scankeys[i].sk_argument);
range_deserialize(typcache, range, &lower, &upper, &empty);
}
/*
* Most strategies are handled by forming a bounding box from the
* search key, defined by a minLower, maxLower, minUpper, maxUpper.
* Some modify 'which' directly, to specify exactly which quadrants
* need to be visited.
*
* For most strategies, nothing matches an empty search key, and
* an empty range never matches a non-empty key. If a strategy
* does not behave like that wrt. empty ranges, set strictEmpty to
* false.
*/
switch (strategy)
{
case RANGESTRAT_BEFORE:
/*
* Range A is before range B if upper bound of A is lower
* than lower bound of B.
*/
maxUpper = &lower;
inclusive = false;
break;
case RANGESTRAT_OVERLEFT:
/*
* Range A is overleft to range B if upper bound of A is
* less or equal to upper bound of B.
*/
maxUpper = &upper;
break;
case RANGESTRAT_OVERLAPS:
/*
* Non-empty ranges overlap, if lower bound of each range
* is lower or equal to upper bound of the other range.
*/
maxLower = &upper;
minUpper = &lower;
break;
case RANGESTRAT_OVERRIGHT:
/*
* Range A is overright to range B if lower bound of A is
* greater or equal to lower bound of B.
*/
minLower = &lower;
break;
case RANGESTRAT_AFTER:
/*
* Range A is after range B if lower bound of A is greater
* than upper bound of B.
*/
minLower = &upper;
inclusive = false;
break;
case RANGESTRAT_ADJACENT:
if (empty)
break; /* Skip to strictEmpty check. */
/*
* which1 is bitmask for possibility to be adjacent with
* lower bound of argument. which2 is bitmask for
* possibility to be adjacent with upper bound of argument.
*/
which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
/*
* Previously selected quadrant could exclude possibility
* for lower or upper bounds to be adjacent. Deserialize
* previous centroid range if present for checking this.
*/
if (in->reconstructedValue != (Datum) 0)
{
RangeType *prevCentroid;
RangeBound prevLower,
prevUpper;
bool prevEmpty;
int cmp1,
cmp2;
prevCentroid = DatumGetRangeType(in->reconstructedValue);
range_deserialize(typcache, prevCentroid,
&prevLower, &prevUpper, &prevEmpty);
/*
* Check if lower bound of argument is not in a
* quadrant we visited in the previous step.
*/
cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
cmp2 = range_cmp_bounds(typcache, ¢roidUpper,
&prevUpper);
if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
which1 = 0;
/*
* Check if upper bound of argument is not in a
* quadrant we visited in the previous step.
*/
cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
cmp2 = range_cmp_bounds(typcache, ¢roidLower,
&prevLower);
if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
which2 = 0;
}
if (which1)
{
/*
* For a range's upper bound to be adjacent to the
* argument's lower bound, it will be found along the
* line adjacent to (and just below) Y=lower.
* Therefore, if the argument's lower bound is less
* than the centroid's upper bound, the line falls in
* quadrants 2 and 3; if greater, the line falls in
* quadrants 1 and 4.
*
* The above is true even when the argument's lower
* bound is greater and adjacent to the centroid's
* upper bound. If the argument's lower bound is
* greater than the centroid's upper bound, then the
* lowest value that an adjacent range could have is
* that of the centroid's upper bound, which still
* falls in quadrants 1 and 4.
*
* In the edge case, where the argument's lower bound
* is equal to the cetroid's upper bound, there may be
* adjacent ranges in any quadrant.
*/
cmp = range_cmp_bounds(typcache, &lower,
¢roidUpper);
if (cmp < 0)
which1 &= (1 << 2) | (1 << 3);
else if (cmp > 0)
which1 &= (1 << 1) | (1 << 4);
}
if (which2)
{
/*
* For a range's lower bound to be adjacent to the
* argument's upper bound, it will be found along the
* line adjacent to (and just right of)
* X=upper. Therefore, if the argument's upper bound is
* less than (and not adjacent to) the centroid's upper
* bound, the line falls in quadrants 3 and 4; if
* greater or equal to, the line falls in quadrants 1
* and 2.
*
* The edge case is when the argument's upper bound is
* less than and adjacent to the centroid's lower
* bound. In that case, adjacent ranges may be in any
* quadrant.
*/
cmp = range_cmp_bounds(typcache, &lower,
¢roidUpper);
if (cmp < 0 &&
!bounds_adjacent(typcache, upper, centroidLower))
which1 &= (1 << 3) | (1 << 4);
else if (cmp > 0)
which1 &= (1 << 1) | (1 << 2);
}
which &= which1 | which2;
needPrevious = true;
break;
case RANGESTRAT_CONTAINS:
/*
* Non-empty range A contains non-empty range B if lower
* bound of A is lower or equal to lower bound of range B
* and upper bound of range A is greater or equal to upper
* bound of range A.
*
* All non-empty ranges contain an empty range.
*/
strictEmpty = false;
if (!empty)
{
which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
maxLower = &lower;
minUpper = &upper;
}
break;
case RANGESTRAT_CONTAINED_BY:
/* The opposite of contains. */
strictEmpty = false;
if (empty)
{
/* An empty range is only contained by an empty range */
which &= (1 << 5);
}
else
{
minLower = &lower;
maxUpper = &upper;
}
break;
case RANGESTRAT_EQ:
/*
* Equal range can be only in the same quadrant where
* argument would be placed to.
*/
strictEmpty = false;
which &= (1 << getQuadrant(typcache, centroid, range));
break;
default:
elog(ERROR, "unrecognized range strategy: %d", strategy);
break;
}
if (strictEmpty)
{
if (empty)
{
/* Scan key is empty, no branches are satisfying */
which = 0;
break;
}
else
{
/* Shouldn't visit tree branch with empty ranges */
which &= (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
}
}
/*
* Using the bounding box, see which quadrants we have to descend
* into.
*/
if (minLower)
{
/*
* If the centroid's lower bound is less than or equal to
* the minimum lower bound, anything in the 3rd and 4th
* quadrants will have an even smaller lower bound, and thus
* can't match.
*/
if (range_cmp_bounds(typcache, ¢roidLower, minLower) <= 0)
which &= (1 << 1) | (1 << 2) | (1 << 5);
}
if (maxLower)
{
/*
* If the centroid's lower bound is greater than the maximum
* lower bound, anything in the 1st and 2nd quadrants will
* also have a greater than or equal lower bound, and thus
* can't match. If the centroid's lower bound is equal to
* the maximum lower bound, we can still exclude the 1st and
* 2nd quadrants if we're looking for a value strictly greater
* than the maximum.
*/
int cmp;
cmp = range_cmp_bounds(typcache, ¢roidLower, maxLower);
if (cmp > 0 || (!inclusive && cmp == 0))
which &= (1 << 3) | (1 << 4) | (1 << 5);
}
if (minUpper)
{
/*
* If the centroid's upper bound is less than or equal to
* the minimum upper bound, anything in the 2nd and 3rd
* quadrants will have an even smaller upper bound, and thus
* can't match.
*/
if (range_cmp_bounds(typcache, ¢roidUpper, minUpper) <= 0)
which &= (1 << 1) | (1 << 4) | (1 << 5);
}
if (maxUpper)
{
/*
* If the centroid's upper bound is greater than the maximum
* upper bound, anything in the 1st and 4th quadrants will
* also have a greater than or equal upper bound, and thus
* can't match. If the centroid's upper bound is equal to
* the maximum upper bound, we can still exclude the 1st and
* 4th quadrants if we're looking for a value strictly greater
* than the maximum.
*/
int cmp;
cmp = range_cmp_bounds(typcache, ¢roidUpper, maxUpper);
if (cmp > 0 || (!inclusive && cmp == 0))
which &= (1 << 2) | (1 << 3) | (1 << 5);
}
if (which == 0)
break; /* no need to consider remaining conditions */
}
}
/* We must descend into the quadrant(s) identified by 'which' */
out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
if (needPrevious)
out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
out->nNodes = 0;
for (i = 1; i <= in->nNodes; i++)
{
if (which & (1 << i))
{
/* Save previous prefix if needed */
if (needPrevious)
out->reconstructedValues[out->nNodes] = in->prefixDatum;
out->nodeNumbers[out->nNodes++] = i - 1;
}
}
PG_RETURN_VOID();
}
| Datum spg_range_quad_leaf_consistent | ( | PG_FUNCTION_ARGS | ) |
Definition at line 803 of file rangetypes_spgist.c.
References DatumGetRangeType, elog, ERROR, i, spgLeafConsistentIn::leafDatum, spgLeafConsistentOut::leafValue, spgLeafConsistentIn::nkeys, PG_GETARG_POINTER, PG_RETURN_BOOL, range_adjacent_internal(), range_after_internal(), range_before_internal(), range_contained_by_internal(), range_contains_elem_internal(), range_contains_internal(), range_eq_internal(), range_get_typcache(), range_overlaps_internal(), range_overleft_internal(), range_overright_internal(), RANGESTRAT_ADJACENT, RANGESTRAT_AFTER, RANGESTRAT_BEFORE, RANGESTRAT_CONTAINED_BY, RANGESTRAT_CONTAINS, RANGESTRAT_CONTAINS_ELEM, RANGESTRAT_EQ, RANGESTRAT_OVERLAPS, RANGESTRAT_OVERLEFT, RANGESTRAT_OVERRIGHT, RangeTypeGetOid, spgLeafConsistentOut::recheck, spgLeafConsistentIn::scankeys, ScanKeyData::sk_argument, and ScanKeyData::sk_strategy.
{
spgLeafConsistentIn *in = (spgLeafConsistentIn *) PG_GETARG_POINTER(0);
spgLeafConsistentOut *out = (spgLeafConsistentOut *) PG_GETARG_POINTER(1);
RangeType *leafRange = DatumGetRangeType(in->leafDatum);
TypeCacheEntry *typcache;
bool res;
int i;
/* all tests are exact */
out->recheck = false;
/* leafDatum is what it is... */
out->leafValue = in->leafDatum;
typcache = range_get_typcache(fcinfo, RangeTypeGetOid(leafRange));
/* Perform the required comparison(s) */
res = true;
for (i = 0; i < in->nkeys; i++)
{
Datum keyDatum = in->scankeys[i].sk_argument;
/* Call the function corresponding to the scan strategy */
switch (in->scankeys[i].sk_strategy)
{
case RANGESTRAT_BEFORE:
res = range_before_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_OVERLEFT:
res = range_overleft_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_OVERLAPS:
res = range_overlaps_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_OVERRIGHT:
res = range_overright_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_AFTER:
res = range_after_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_ADJACENT:
res = range_adjacent_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_CONTAINS:
res = range_contains_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_CONTAINED_BY:
res = range_contained_by_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
case RANGESTRAT_CONTAINS_ELEM:
res = range_contains_elem_internal(typcache, leafRange,
keyDatum);
break;
case RANGESTRAT_EQ:
res = range_eq_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
default:
elog(ERROR, "unrecognized range strategy: %d",
in->scankeys[i].sk_strategy);
break;
}
/*
* If leaf datum doesn't match to a query key, no need to check
* subsequent keys.
*/
if (!res)
break;
}
PG_RETURN_BOOL(res);
}
| Datum spg_range_quad_picksplit | ( | PG_FUNCTION_ARGS | ) |
Definition at line 201 of file rangetypes_spgist.c.
References bound_cmp(), DatumGetRangeType, spgPickSplitIn::datums, getQuadrant(), spgPickSplitOut::hasPrefix, i, spgPickSplitOut::leafTupleDatums, spgPickSplitIn::level, spgPickSplitOut::mapTuplesToNodes, spgPickSplitOut::nNodes, spgPickSplitOut::nodeLabels, spgPickSplitIn::nTuples, NULL, palloc(), PG_GETARG_POINTER, PG_RETURN_VOID, PointerGetDatum, spgPickSplitOut::prefixDatum, qsort_arg(), range(), range_deserialize(), range_get_typcache(), range_serialize(), RangeTypeGetDatum, and RangeTypeGetOid.
{
spgPickSplitIn *in = (spgPickSplitIn *) PG_GETARG_POINTER(0);
spgPickSplitOut *out = (spgPickSplitOut *) PG_GETARG_POINTER(1);
int i;
int j;
int nonEmptyCount;
RangeType *centroid;
bool empty;
TypeCacheEntry *typcache;
/* Use the median values of lower and upper bounds as the centroid range */
RangeBound *lowerBounds,
*upperBounds;
typcache = range_get_typcache(fcinfo,
RangeTypeGetOid(DatumGetRangeType(in->datums[0])));
/* Allocate memory for bounds */
lowerBounds = palloc(sizeof(RangeBound) * in->nTuples);
upperBounds = palloc(sizeof(RangeBound) * in->nTuples);
j = 0;
/* Deserialize bounds of ranges, count non-empty ranges */
for (i = 0; i < in->nTuples; i++)
{
range_deserialize(typcache, DatumGetRangeType(in->datums[i]),
&lowerBounds[j], &upperBounds[j], &empty);
if (!empty)
j++;
}
nonEmptyCount = j;
/*
* All the ranges are empty. The best we can do is to construct an inner
* node with no centroid, and put all ranges into node 0. If non-empty
* ranges are added later, they will be routed to node 1.
*/
if (nonEmptyCount == 0)
{
out->nNodes = 2;
out->hasPrefix = false;
/* Prefix is empty */
out->prefixDatum = PointerGetDatum(NULL);
out->nodeLabels = NULL;
out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
/* Place all ranges into node 0 */
for (i = 0; i < in->nTuples; i++)
{
RangeType *range = DatumGetRangeType(in->datums[i]);
out->leafTupleDatums[i] = RangeTypeGetDatum(range);
out->mapTuplesToNodes[i] = 0;
}
PG_RETURN_VOID();
}
/* Sort range bounds in order to find medians */
qsort_arg(lowerBounds, nonEmptyCount, sizeof(RangeBound),
bound_cmp, typcache);
qsort_arg(upperBounds, nonEmptyCount, sizeof(RangeBound),
bound_cmp, typcache);
/* Construct "centroid" range from medians of lower and upper bounds */
centroid = range_serialize(typcache, &lowerBounds[nonEmptyCount / 2],
&upperBounds[nonEmptyCount / 2], false);
out->hasPrefix = true;
out->prefixDatum = RangeTypeGetDatum(centroid);
/* Create node for empty ranges only if it is a root node */
out->nNodes = (in->level == 0) ? 5 : 4;
out->nodeLabels = NULL; /* we don't need node labels */
out->mapTuplesToNodes = palloc(sizeof(int) * in->nTuples);
out->leafTupleDatums = palloc(sizeof(Datum) * in->nTuples);
/*
* Assign ranges to corresponding nodes according to quadrants relative to
* "centroid" range.
*/
for (i = 0; i < in->nTuples; i++)
{
RangeType *range = DatumGetRangeType(in->datums[i]);
int16 quadrant = getQuadrant(typcache, centroid, range);
out->leafTupleDatums[i] = RangeTypeGetDatum(range);
out->mapTuplesToNodes[i] = quadrant - 1;
}
PG_RETURN_VOID();
}
1.7.1