Header And Logo

PostgreSQL
| The world's most advanced open source database.

numeric.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * numeric.c
00004  *    An exact numeric data type for the Postgres database system
00005  *
00006  * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
00007  *
00008  * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
00009  * multiple-precision math library, most recently published as Algorithm
00010  * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
00011  * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
00012  * pages 359-367.
00013  *
00014  * Copyright (c) 1998-2013, PostgreSQL Global Development Group
00015  *
00016  * IDENTIFICATION
00017  *    src/backend/utils/adt/numeric.c
00018  *
00019  *-------------------------------------------------------------------------
00020  */
00021 
00022 #include "postgres.h"
00023 
00024 #include <ctype.h>
00025 #include <float.h>
00026 #include <limits.h>
00027 #include <math.h>
00028 
00029 #include "access/hash.h"
00030 #include "catalog/pg_type.h"
00031 #include "libpq/pqformat.h"
00032 #include "miscadmin.h"
00033 #include "nodes/nodeFuncs.h"
00034 #include "utils/array.h"
00035 #include "utils/builtins.h"
00036 #include "utils/int8.h"
00037 #include "utils/numeric.h"
00038 
00039 /* ----------
00040  * Uncomment the following to enable compilation of dump_numeric()
00041  * and dump_var() and to get a dump of any result produced by make_result().
00042  * ----------
00043 #define NUMERIC_DEBUG
00044  */
00045 
00046 
00047 /* ----------
00048  * Local data types
00049  *
00050  * Numeric values are represented in a base-NBASE floating point format.
00051  * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
00052  * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
00053  * an int.  Although the purely calculational routines could handle any even
00054  * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
00055  * in NBASE a power of ten, so that I/O conversions and decimal rounding
00056  * are easy.  Also, it's actually more efficient if NBASE is rather less than
00057  * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var_fast to
00058  * postpone processing carries.
00059  * ----------
00060  */
00061 
00062 #if 0
00063 #define NBASE       10
00064 #define HALF_NBASE  5
00065 #define DEC_DIGITS  1           /* decimal digits per NBASE digit */
00066 #define MUL_GUARD_DIGITS    4   /* these are measured in NBASE digits */
00067 #define DIV_GUARD_DIGITS    8
00068 
00069 typedef signed char NumericDigit;
00070 #endif
00071 
00072 #if 0
00073 #define NBASE       100
00074 #define HALF_NBASE  50
00075 #define DEC_DIGITS  2           /* decimal digits per NBASE digit */
00076 #define MUL_GUARD_DIGITS    3   /* these are measured in NBASE digits */
00077 #define DIV_GUARD_DIGITS    6
00078 
00079 typedef signed char NumericDigit;
00080 #endif
00081 
00082 #if 1
00083 #define NBASE       10000
00084 #define HALF_NBASE  5000
00085 #define DEC_DIGITS  4           /* decimal digits per NBASE digit */
00086 #define MUL_GUARD_DIGITS    2   /* these are measured in NBASE digits */
00087 #define DIV_GUARD_DIGITS    4
00088 
00089 typedef int16 NumericDigit;
00090 #endif
00091 
00092 /*
00093  * The Numeric type as stored on disk.
00094  *
00095  * If the high bits of the first word of a NumericChoice (n_header, or
00096  * n_short.n_header, or n_long.n_sign_dscale) are NUMERIC_SHORT, then the
00097  * numeric follows the NumericShort format; if they are NUMERIC_POS or
00098  * NUMERIC_NEG, it follows the NumericLong format.  If they are NUMERIC_NAN,
00099  * it is a NaN.  We currently always store a NaN using just two bytes (i.e.
00100  * only n_header), but previous releases used only the NumericLong format,
00101  * so we might find 4-byte NaNs on disk if a database has been migrated using
00102  * pg_upgrade.  In either case, when the high bits indicate a NaN, the
00103  * remaining bits are never examined.  Currently, we always initialize these
00104  * to zero, but it might be possible to use them for some other purpose in
00105  * the future.
00106  *
00107  * In the NumericShort format, the remaining 14 bits of the header word
00108  * (n_short.n_header) are allocated as follows: 1 for sign (positive or
00109  * negative), 6 for dynamic scale, and 7 for weight.  In practice, most
00110  * commonly-encountered values can be represented this way.
00111  *
00112  * In the NumericLong format, the remaining 14 bits of the header word
00113  * (n_long.n_sign_dscale) represent the display scale; and the weight is
00114  * stored separately in n_weight.
00115  *
00116  * NOTE: by convention, values in the packed form have been stripped of
00117  * all leading and trailing zero digits (where a "digit" is of base NBASE).
00118  * In particular, if the value is zero, there will be no digits at all!
00119  * The weight is arbitrary in that case, but we normally set it to zero.
00120  */
00121 
00122 struct NumericShort
00123 {
00124     uint16      n_header;       /* Sign + display scale + weight */
00125     NumericDigit n_data[1];     /* Digits */
00126 };
00127 
00128 struct NumericLong
00129 {
00130     uint16      n_sign_dscale;  /* Sign + display scale */
00131     int16       n_weight;       /* Weight of 1st digit  */
00132     NumericDigit n_data[1];     /* Digits */
00133 };
00134 
00135 union NumericChoice
00136 {
00137     uint16      n_header;       /* Header word */
00138     struct NumericLong n_long;  /* Long form (4-byte header) */
00139     struct NumericShort n_short;    /* Short form (2-byte header) */
00140 };
00141 
00142 struct NumericData
00143 {
00144     int32       vl_len_;        /* varlena header (do not touch directly!) */
00145     union NumericChoice choice; /* choice of format */
00146 };
00147 
00148 
00149 /*
00150  * Interpretation of high bits.
00151  */
00152 
00153 #define NUMERIC_SIGN_MASK   0xC000
00154 #define NUMERIC_POS         0x0000
00155 #define NUMERIC_NEG         0x4000
00156 #define NUMERIC_SHORT       0x8000
00157 #define NUMERIC_NAN         0xC000
00158 
00159 #define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
00160 #define NUMERIC_IS_NAN(n)       (NUMERIC_FLAGBITS(n) == NUMERIC_NAN)
00161 #define NUMERIC_IS_SHORT(n)     (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
00162 
00163 #define NUMERIC_HDRSZ   (VARHDRSZ + sizeof(uint16) + sizeof(int16))
00164 #define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16))
00165 
00166 /*
00167  * If the flag bits are NUMERIC_SHORT or NUMERIC_NAN, we want the short header;
00168  * otherwise, we want the long one.  Instead of testing against each value, we
00169  * can just look at the high bit, for a slight efficiency gain.
00170  */
00171 #define NUMERIC_HEADER_SIZE(n) \
00172     (VARHDRSZ + sizeof(uint16) + \
00173         (((NUMERIC_FLAGBITS(n) & 0x8000) == 0) ? sizeof(int16) : 0))
00174 
00175 /*
00176  * Short format definitions.
00177  */
00178 
00179 #define NUMERIC_SHORT_SIGN_MASK         0x2000
00180 #define NUMERIC_SHORT_DSCALE_MASK       0x1F80
00181 #define NUMERIC_SHORT_DSCALE_SHIFT      7
00182 #define NUMERIC_SHORT_DSCALE_MAX        \
00183     (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
00184 #define NUMERIC_SHORT_WEIGHT_SIGN_MASK  0x0040
00185 #define NUMERIC_SHORT_WEIGHT_MASK       0x003F
00186 #define NUMERIC_SHORT_WEIGHT_MAX        NUMERIC_SHORT_WEIGHT_MASK
00187 #define NUMERIC_SHORT_WEIGHT_MIN        (-(NUMERIC_SHORT_WEIGHT_MASK+1))
00188 
00189 /*
00190  * Extract sign, display scale, weight.
00191  */
00192 
00193 #define NUMERIC_DSCALE_MASK         0x3FFF
00194 
00195 #define NUMERIC_SIGN(n) \
00196     (NUMERIC_IS_SHORT(n) ? \
00197         (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
00198         NUMERIC_NEG : NUMERIC_POS) : NUMERIC_FLAGBITS(n))
00199 #define NUMERIC_DSCALE(n)   (NUMERIC_IS_SHORT((n)) ? \
00200     ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
00201         >> NUMERIC_SHORT_DSCALE_SHIFT \
00202     : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
00203 #define NUMERIC_WEIGHT(n)   (NUMERIC_IS_SHORT((n)) ? \
00204     (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
00205         ~NUMERIC_SHORT_WEIGHT_MASK : 0) \
00206      | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
00207     : ((n)->choice.n_long.n_weight))
00208 
00209 /* ----------
00210  * NumericVar is the format we use for arithmetic.  The digit-array part
00211  * is the same as the NumericData storage format, but the header is more
00212  * complex.
00213  *
00214  * The value represented by a NumericVar is determined by the sign, weight,
00215  * ndigits, and digits[] array.
00216  * Note: the first digit of a NumericVar's value is assumed to be multiplied
00217  * by NBASE ** weight.  Another way to say it is that there are weight+1
00218  * digits before the decimal point.  It is possible to have weight < 0.
00219  *
00220  * buf points at the physical start of the palloc'd digit buffer for the
00221  * NumericVar.  digits points at the first digit in actual use (the one
00222  * with the specified weight).  We normally leave an unused digit or two
00223  * (preset to zeroes) between buf and digits, so that there is room to store
00224  * a carry out of the top digit without reallocating space.  We just need to
00225  * decrement digits (and increment weight) to make room for the carry digit.
00226  * (There is no such extra space in a numeric value stored in the database,
00227  * only in a NumericVar in memory.)
00228  *
00229  * If buf is NULL then the digit buffer isn't actually palloc'd and should
00230  * not be freed --- see the constants below for an example.
00231  *
00232  * dscale, or display scale, is the nominal precision expressed as number
00233  * of digits after the decimal point (it must always be >= 0 at present).
00234  * dscale may be more than the number of physically stored fractional digits,
00235  * implying that we have suppressed storage of significant trailing zeroes.
00236  * It should never be less than the number of stored digits, since that would
00237  * imply hiding digits that are present.  NOTE that dscale is always expressed
00238  * in *decimal* digits, and so it may correspond to a fractional number of
00239  * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
00240  *
00241  * rscale, or result scale, is the target precision for a computation.
00242  * Like dscale it is expressed as number of *decimal* digits after the decimal
00243  * point, and is always >= 0 at present.
00244  * Note that rscale is not stored in variables --- it's figured on-the-fly
00245  * from the dscales of the inputs.
00246  *
00247  * NB: All the variable-level functions are written in a style that makes it
00248  * possible to give one and the same variable as argument and destination.
00249  * This is feasible because the digit buffer is separate from the variable.
00250  * ----------
00251  */
00252 typedef struct NumericVar
00253 {
00254     int         ndigits;        /* # of digits in digits[] - can be 0! */
00255     int         weight;         /* weight of first digit */
00256     int         sign;           /* NUMERIC_POS, NUMERIC_NEG, or NUMERIC_NAN */
00257     int         dscale;         /* display scale */
00258     NumericDigit *buf;          /* start of palloc'd space for digits[] */
00259     NumericDigit *digits;       /* base-NBASE digits */
00260 } NumericVar;
00261 
00262 
00263 /* ----------
00264  * Some preinitialized constants
00265  * ----------
00266  */
00267 static NumericDigit const_zero_data[1] = {0};
00268 static NumericVar const_zero =
00269 {0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
00270 
00271 static NumericDigit const_one_data[1] = {1};
00272 static NumericVar const_one =
00273 {1, 0, NUMERIC_POS, 0, NULL, const_one_data};
00274 
00275 static NumericDigit const_two_data[1] = {2};
00276 static NumericVar const_two =
00277 {1, 0, NUMERIC_POS, 0, NULL, const_two_data};
00278 
00279 #if DEC_DIGITS == 4 || DEC_DIGITS == 2
00280 static NumericDigit const_ten_data[1] = {10};
00281 static NumericVar const_ten =
00282 {1, 0, NUMERIC_POS, 0, NULL, const_ten_data};
00283 #elif DEC_DIGITS == 1
00284 static NumericDigit const_ten_data[1] = {1};
00285 static NumericVar const_ten =
00286 {1, 1, NUMERIC_POS, 0, NULL, const_ten_data};
00287 #endif
00288 
00289 #if DEC_DIGITS == 4
00290 static NumericDigit const_zero_point_five_data[1] = {5000};
00291 #elif DEC_DIGITS == 2
00292 static NumericDigit const_zero_point_five_data[1] = {50};
00293 #elif DEC_DIGITS == 1
00294 static NumericDigit const_zero_point_five_data[1] = {5};
00295 #endif
00296 static NumericVar const_zero_point_five =
00297 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
00298 
00299 #if DEC_DIGITS == 4
00300 static NumericDigit const_zero_point_nine_data[1] = {9000};
00301 #elif DEC_DIGITS == 2
00302 static NumericDigit const_zero_point_nine_data[1] = {90};
00303 #elif DEC_DIGITS == 1
00304 static NumericDigit const_zero_point_nine_data[1] = {9};
00305 #endif
00306 static NumericVar const_zero_point_nine =
00307 {1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
00308 
00309 #if DEC_DIGITS == 4
00310 static NumericDigit const_zero_point_01_data[1] = {100};
00311 static NumericVar const_zero_point_01 =
00312 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
00313 #elif DEC_DIGITS == 2
00314 static NumericDigit const_zero_point_01_data[1] = {1};
00315 static NumericVar const_zero_point_01 =
00316 {1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
00317 #elif DEC_DIGITS == 1
00318 static NumericDigit const_zero_point_01_data[1] = {1};
00319 static NumericVar const_zero_point_01 =
00320 {1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
00321 #endif
00322 
00323 #if DEC_DIGITS == 4
00324 static NumericDigit const_one_point_one_data[2] = {1, 1000};
00325 #elif DEC_DIGITS == 2
00326 static NumericDigit const_one_point_one_data[2] = {1, 10};
00327 #elif DEC_DIGITS == 1
00328 static NumericDigit const_one_point_one_data[2] = {1, 1};
00329 #endif
00330 static NumericVar const_one_point_one =
00331 {2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
00332 
00333 static NumericVar const_nan =
00334 {0, 0, NUMERIC_NAN, 0, NULL, NULL};
00335 
00336 #if DEC_DIGITS == 4
00337 static const int round_powers[4] = {0, 1000, 100, 10};
00338 #endif
00339 
00340 
00341 /* ----------
00342  * Local functions
00343  * ----------
00344  */
00345 
00346 #ifdef NUMERIC_DEBUG
00347 static void dump_numeric(const char *str, Numeric num);
00348 static void dump_var(const char *str, NumericVar *var);
00349 #else
00350 #define dump_numeric(s,n)
00351 #define dump_var(s,v)
00352 #endif
00353 
00354 #define digitbuf_alloc(ndigits)  \
00355     ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
00356 #define digitbuf_free(buf)  \
00357     do { \
00358          if ((buf) != NULL) \
00359              pfree(buf); \
00360     } while (0)
00361 
00362 #define init_var(v)     MemSetAligned(v, 0, sizeof(NumericVar))
00363 
00364 #define NUMERIC_DIGITS(num) (NUMERIC_IS_SHORT(num) ? \
00365     (num)->choice.n_short.n_data : (num)->choice.n_long.n_data)
00366 #define NUMERIC_NDIGITS(num) \
00367     ((VARSIZE(num) - NUMERIC_HEADER_SIZE(num)) / sizeof(NumericDigit))
00368 #define NUMERIC_CAN_BE_SHORT(scale,weight) \
00369     ((scale) <= NUMERIC_SHORT_DSCALE_MAX && \
00370     (weight) <= NUMERIC_SHORT_WEIGHT_MAX && \
00371     (weight) >= NUMERIC_SHORT_WEIGHT_MIN)
00372 
00373 static void alloc_var(NumericVar *var, int ndigits);
00374 static void free_var(NumericVar *var);
00375 static void zero_var(NumericVar *var);
00376 
00377 static const char *set_var_from_str(const char *str, const char *cp,
00378                  NumericVar *dest);
00379 static void set_var_from_num(Numeric value, NumericVar *dest);
00380 static void init_var_from_num(Numeric num, NumericVar *dest);
00381 static void set_var_from_var(NumericVar *value, NumericVar *dest);
00382 static char *get_str_from_var(NumericVar *var);
00383 static char *get_str_from_var_sci(NumericVar *var, int rscale);
00384 
00385 static Numeric make_result(NumericVar *var);
00386 
00387 static void apply_typmod(NumericVar *var, int32 typmod);
00388 
00389 static int32 numericvar_to_int4(NumericVar *var);
00390 static bool numericvar_to_int8(NumericVar *var, int64 *result);
00391 static void int8_to_numericvar(int64 val, NumericVar *var);
00392 static double numeric_to_double_no_overflow(Numeric num);
00393 static double numericvar_to_double_no_overflow(NumericVar *var);
00394 
00395 static int  cmp_numerics(Numeric num1, Numeric num2);
00396 static int  cmp_var(NumericVar *var1, NumericVar *var2);
00397 static int cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
00398                int var1weight, int var1sign,
00399                const NumericDigit *var2digits, int var2ndigits,
00400                int var2weight, int var2sign);
00401 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
00402 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
00403 static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
00404         int rscale);
00405 static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
00406         int rscale, bool round);
00407 static void div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
00408              int rscale, bool round);
00409 static int  select_div_scale(NumericVar *var1, NumericVar *var2);
00410 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
00411 static void ceil_var(NumericVar *var, NumericVar *result);
00412 static void floor_var(NumericVar *var, NumericVar *result);
00413 
00414 static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
00415 static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
00416 static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
00417 static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
00418 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
00419 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
00420 static void power_var_int(NumericVar *base, int exp, NumericVar *result,
00421               int rscale);
00422 
00423 static int  cmp_abs(NumericVar *var1, NumericVar *var2);
00424 static int cmp_abs_common(const NumericDigit *var1digits, int var1ndigits,
00425                int var1weight,
00426                const NumericDigit *var2digits, int var2ndigits,
00427                int var2weight);
00428 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
00429 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
00430 static void round_var(NumericVar *var, int rscale);
00431 static void trunc_var(NumericVar *var, int rscale);
00432 static void strip_var(NumericVar *var);
00433 static void compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
00434                NumericVar *count_var, NumericVar *result_var);
00435 
00436 
00437 /* ----------------------------------------------------------------------
00438  *
00439  * Input-, output- and rounding-functions
00440  *
00441  * ----------------------------------------------------------------------
00442  */
00443 
00444 
00445 /*
00446  * numeric_in() -
00447  *
00448  *  Input function for numeric data type
00449  */
00450 Datum
00451 numeric_in(PG_FUNCTION_ARGS)
00452 {
00453     char       *str = PG_GETARG_CSTRING(0);
00454 
00455 #ifdef NOT_USED
00456     Oid         typelem = PG_GETARG_OID(1);
00457 #endif
00458     int32       typmod = PG_GETARG_INT32(2);
00459     Numeric     res;
00460     const char *cp;
00461 
00462     /* Skip leading spaces */
00463     cp = str;
00464     while (*cp)
00465     {
00466         if (!isspace((unsigned char) *cp))
00467             break;
00468         cp++;
00469     }
00470 
00471     /*
00472      * Check for NaN
00473      */
00474     if (pg_strncasecmp(cp, "NaN", 3) == 0)
00475     {
00476         res = make_result(&const_nan);
00477 
00478         /* Should be nothing left but spaces */
00479         cp += 3;
00480         while (*cp)
00481         {
00482             if (!isspace((unsigned char) *cp))
00483                 ereport(ERROR,
00484                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
00485                       errmsg("invalid input syntax for type numeric: \"%s\"",
00486                              str)));
00487             cp++;
00488         }
00489     }
00490     else
00491     {
00492         /*
00493          * Use set_var_from_str() to parse a normal numeric value
00494          */
00495         NumericVar  value;
00496 
00497         init_var(&value);
00498 
00499         cp = set_var_from_str(str, cp, &value);
00500 
00501         /*
00502          * We duplicate a few lines of code here because we would like to
00503          * throw any trailing-junk syntax error before any semantic error
00504          * resulting from apply_typmod.  We can't easily fold the two cases
00505          * together because we mustn't apply apply_typmod to a NaN.
00506          */
00507         while (*cp)
00508         {
00509             if (!isspace((unsigned char) *cp))
00510                 ereport(ERROR,
00511                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
00512                       errmsg("invalid input syntax for type numeric: \"%s\"",
00513                              str)));
00514             cp++;
00515         }
00516 
00517         apply_typmod(&value, typmod);
00518 
00519         res = make_result(&value);
00520         free_var(&value);
00521     }
00522 
00523     PG_RETURN_NUMERIC(res);
00524 }
00525 
00526 
00527 /*
00528  * numeric_out() -
00529  *
00530  *  Output function for numeric data type
00531  */
00532 Datum
00533 numeric_out(PG_FUNCTION_ARGS)
00534 {
00535     Numeric     num = PG_GETARG_NUMERIC(0);
00536     NumericVar  x;
00537     char       *str;
00538 
00539     /*
00540      * Handle NaN
00541      */
00542     if (NUMERIC_IS_NAN(num))
00543         PG_RETURN_CSTRING(pstrdup("NaN"));
00544 
00545     /*
00546      * Get the number in the variable format.
00547      */
00548     init_var_from_num(num, &x);
00549 
00550     str = get_str_from_var(&x);
00551 
00552     PG_RETURN_CSTRING(str);
00553 }
00554 
00555 /*
00556  * numeric_is_nan() -
00557  *
00558  *  Is Numeric value a NaN?
00559  */
00560 bool
00561 numeric_is_nan(Numeric num)
00562 {
00563     return NUMERIC_IS_NAN(num);
00564 }
00565 
00566 /*
00567  * numeric_maximum_size() -
00568  *
00569  *  Maximum size of a numeric with given typmod, or -1 if unlimited/unknown.
00570  */
00571 int32
00572 numeric_maximum_size(int32 typmod)
00573 {
00574     int         precision;
00575     int         numeric_digits;
00576 
00577     if (typmod < (int32) (VARHDRSZ))
00578         return -1;
00579 
00580     /* precision (ie, max # of digits) is in upper bits of typmod */
00581     precision = ((typmod - VARHDRSZ) >> 16) & 0xffff;
00582 
00583     /*
00584      * This formula computes the maximum number of NumericDigits we could need
00585      * in order to store the specified number of decimal digits. Because the
00586      * weight is stored as a number of NumericDigits rather than a number of
00587      * decimal digits, it's possible that the first NumericDigit will contain
00588      * only a single decimal digit.  Thus, the first two decimal digits can
00589      * require two NumericDigits to store, but it isn't until we reach
00590      * DEC_DIGITS + 2 decimal digits that we potentially need a third
00591      * NumericDigit.
00592      */
00593     numeric_digits = (precision + 2 * (DEC_DIGITS - 1)) / DEC_DIGITS;
00594 
00595     /*
00596      * In most cases, the size of a numeric will be smaller than the value
00597      * computed below, because the varlena header will typically get toasted
00598      * down to a single byte before being stored on disk, and it may also be
00599      * possible to use a short numeric header.  But our job here is to compute
00600      * the worst case.
00601      */
00602     return NUMERIC_HDRSZ + (numeric_digits * sizeof(NumericDigit));
00603 }
00604 
00605 /*
00606  * numeric_out_sci() -
00607  *
00608  *  Output function for numeric data type in scientific notation.
00609  */
00610 char *
00611 numeric_out_sci(Numeric num, int scale)
00612 {
00613     NumericVar  x;
00614     char       *str;
00615 
00616     /*
00617      * Handle NaN
00618      */
00619     if (NUMERIC_IS_NAN(num))
00620         return pstrdup("NaN");
00621 
00622     init_var_from_num(num, &x);
00623 
00624     str = get_str_from_var_sci(&x, scale);
00625 
00626     return str;
00627 }
00628 
00629 /*
00630  *      numeric_recv            - converts external binary format to numeric
00631  *
00632  * External format is a sequence of int16's:
00633  * ndigits, weight, sign, dscale, NumericDigits.
00634  */
00635 Datum
00636 numeric_recv(PG_FUNCTION_ARGS)
00637 {
00638     StringInfo  buf = (StringInfo) PG_GETARG_POINTER(0);
00639 
00640 #ifdef NOT_USED
00641     Oid         typelem = PG_GETARG_OID(1);
00642 #endif
00643     int32       typmod = PG_GETARG_INT32(2);
00644     NumericVar  value;
00645     Numeric     res;
00646     int         len,
00647                 i;
00648 
00649     init_var(&value);
00650 
00651     len = (uint16) pq_getmsgint(buf, sizeof(uint16));
00652     if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
00653         ereport(ERROR,
00654                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
00655                  errmsg("invalid length in external \"numeric\" value")));
00656 
00657     alloc_var(&value, len);
00658 
00659     value.weight = (int16) pq_getmsgint(buf, sizeof(int16));
00660     value.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
00661     if (!(value.sign == NUMERIC_POS ||
00662           value.sign == NUMERIC_NEG ||
00663           value.sign == NUMERIC_NAN))
00664         ereport(ERROR,
00665                 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
00666                  errmsg("invalid sign in external \"numeric\" value")));
00667 
00668     value.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
00669     for (i = 0; i < len; i++)
00670     {
00671         NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
00672 
00673         if (d < 0 || d >= NBASE)
00674             ereport(ERROR,
00675                     (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
00676                      errmsg("invalid digit in external \"numeric\" value")));
00677         value.digits[i] = d;
00678     }
00679 
00680     apply_typmod(&value, typmod);
00681 
00682     res = make_result(&value);
00683     free_var(&value);
00684 
00685     PG_RETURN_NUMERIC(res);
00686 }
00687 
00688 /*
00689  *      numeric_send            - converts numeric to binary format
00690  */
00691 Datum
00692 numeric_send(PG_FUNCTION_ARGS)
00693 {
00694     Numeric     num = PG_GETARG_NUMERIC(0);
00695     NumericVar  x;
00696     StringInfoData buf;
00697     int         i;
00698 
00699     init_var_from_num(num, &x);
00700 
00701     pq_begintypsend(&buf);
00702 
00703     pq_sendint(&buf, x.ndigits, sizeof(int16));
00704     pq_sendint(&buf, x.weight, sizeof(int16));
00705     pq_sendint(&buf, x.sign, sizeof(int16));
00706     pq_sendint(&buf, x.dscale, sizeof(int16));
00707     for (i = 0; i < x.ndigits; i++)
00708         pq_sendint(&buf, x.digits[i], sizeof(NumericDigit));
00709 
00710     PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
00711 }
00712 
00713 
00714 /*
00715  * numeric_transform() -
00716  *
00717  * Flatten calls to numeric's length coercion function that solely represent
00718  * increases in allowable precision.  Scale changes mutate every datum, so
00719  * they are unoptimizable.  Some values, e.g. 1E-1001, can only fit into an
00720  * unconstrained numeric, so a change from an unconstrained numeric to any
00721  * constrained numeric is also unoptimizable.
00722  */
00723 Datum
00724 numeric_transform(PG_FUNCTION_ARGS)
00725 {
00726     FuncExpr   *expr = (FuncExpr *) PG_GETARG_POINTER(0);
00727     Node       *ret = NULL;
00728     Node       *typmod;
00729 
00730     Assert(IsA(expr, FuncExpr));
00731     Assert(list_length(expr->args) >= 2);
00732 
00733     typmod = (Node *) lsecond(expr->args);
00734 
00735     if (IsA(typmod, Const) &&!((Const *) typmod)->constisnull)
00736     {
00737         Node       *source = (Node *) linitial(expr->args);
00738         int32       old_typmod = exprTypmod(source);
00739         int32       new_typmod = DatumGetInt32(((Const *) typmod)->constvalue);
00740         int32       old_scale = (old_typmod - VARHDRSZ) & 0xffff;
00741         int32       new_scale = (new_typmod - VARHDRSZ) & 0xffff;
00742         int32       old_precision = (old_typmod - VARHDRSZ) >> 16 & 0xffff;
00743         int32       new_precision = (new_typmod - VARHDRSZ) >> 16 & 0xffff;
00744 
00745         /*
00746          * If new_typmod < VARHDRSZ, the destination is unconstrained; that's
00747          * always OK.  If old_typmod >= VARHDRSZ, the source is constrained,
00748          * and we're OK if the scale is unchanged and the precision is not
00749          * decreasing.  See further notes in function header comment.
00750          */
00751         if (new_typmod < (int32) VARHDRSZ ||
00752             (old_typmod >= (int32) VARHDRSZ &&
00753              new_scale == old_scale && new_precision >= old_precision))
00754             ret = relabel_to_typmod(source, new_typmod);
00755     }
00756 
00757     PG_RETURN_POINTER(ret);
00758 }
00759 
00760 /*
00761  * numeric() -
00762  *
00763  *  This is a special function called by the Postgres database system
00764  *  before a value is stored in a tuple's attribute. The precision and
00765  *  scale of the attribute have to be applied on the value.
00766  */
00767 Datum
00768 numeric     (PG_FUNCTION_ARGS)
00769 {
00770     Numeric     num = PG_GETARG_NUMERIC(0);
00771     int32       typmod = PG_GETARG_INT32(1);
00772     Numeric     new;
00773     int32       tmp_typmod;
00774     int         precision;
00775     int         scale;
00776     int         ddigits;
00777     int         maxdigits;
00778     NumericVar  var;
00779 
00780     /*
00781      * Handle NaN
00782      */
00783     if (NUMERIC_IS_NAN(num))
00784         PG_RETURN_NUMERIC(make_result(&const_nan));
00785 
00786     /*
00787      * If the value isn't a valid type modifier, simply return a copy of the
00788      * input value
00789      */
00790     if (typmod < (int32) (VARHDRSZ))
00791     {
00792         new = (Numeric) palloc(VARSIZE(num));
00793         memcpy(new, num, VARSIZE(num));
00794         PG_RETURN_NUMERIC(new);
00795     }
00796 
00797     /*
00798      * Get the precision and scale out of the typmod value
00799      */
00800     tmp_typmod = typmod - VARHDRSZ;
00801     precision = (tmp_typmod >> 16) & 0xffff;
00802     scale = tmp_typmod & 0xffff;
00803     maxdigits = precision - scale;
00804 
00805     /*
00806      * If the number is certainly in bounds and due to the target scale no
00807      * rounding could be necessary, just make a copy of the input and modify
00808      * its scale fields, unless the larger scale forces us to abandon the
00809      * short representation.  (Note we assume the existing dscale is
00810      * honest...)
00811      */
00812     ddigits = (NUMERIC_WEIGHT(num) + 1) * DEC_DIGITS;
00813     if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)
00814         && (NUMERIC_CAN_BE_SHORT(scale, NUMERIC_WEIGHT(num))
00815             || !NUMERIC_IS_SHORT(num)))
00816     {
00817         new = (Numeric) palloc(VARSIZE(num));
00818         memcpy(new, num, VARSIZE(num));
00819         if (NUMERIC_IS_SHORT(num))
00820             new->choice.n_short.n_header =
00821                 (num->choice.n_short.n_header & ~NUMERIC_SHORT_DSCALE_MASK)
00822                 | (scale << NUMERIC_SHORT_DSCALE_SHIFT);
00823         else
00824             new->choice.n_long.n_sign_dscale = NUMERIC_SIGN(new) |
00825                 ((uint16) scale & NUMERIC_DSCALE_MASK);
00826         PG_RETURN_NUMERIC(new);
00827     }
00828 
00829     /*
00830      * We really need to fiddle with things - unpack the number into a
00831      * variable and let apply_typmod() do it.
00832      */
00833     init_var(&var);
00834 
00835     set_var_from_num(num, &var);
00836     apply_typmod(&var, typmod);
00837     new = make_result(&var);
00838 
00839     free_var(&var);
00840 
00841     PG_RETURN_NUMERIC(new);
00842 }
00843 
00844 Datum
00845 numerictypmodin(PG_FUNCTION_ARGS)
00846 {
00847     ArrayType  *ta = PG_GETARG_ARRAYTYPE_P(0);
00848     int32      *tl;
00849     int         n;
00850     int32       typmod;
00851 
00852     tl = ArrayGetIntegerTypmods(ta, &n);
00853 
00854     if (n == 2)
00855     {
00856         if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
00857             ereport(ERROR,
00858                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00859                      errmsg("NUMERIC precision %d must be between 1 and %d",
00860                             tl[0], NUMERIC_MAX_PRECISION)));
00861         if (tl[1] < 0 || tl[1] > tl[0])
00862             ereport(ERROR,
00863                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00864                 errmsg("NUMERIC scale %d must be between 0 and precision %d",
00865                        tl[1], tl[0])));
00866         typmod = ((tl[0] << 16) | tl[1]) + VARHDRSZ;
00867     }
00868     else if (n == 1)
00869     {
00870         if (tl[0] < 1 || tl[0] > NUMERIC_MAX_PRECISION)
00871             ereport(ERROR,
00872                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00873                      errmsg("NUMERIC precision %d must be between 1 and %d",
00874                             tl[0], NUMERIC_MAX_PRECISION)));
00875         /* scale defaults to zero */
00876         typmod = (tl[0] << 16) + VARHDRSZ;
00877     }
00878     else
00879     {
00880         ereport(ERROR,
00881                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00882                  errmsg("invalid NUMERIC type modifier")));
00883         typmod = 0;             /* keep compiler quiet */
00884     }
00885 
00886     PG_RETURN_INT32(typmod);
00887 }
00888 
00889 Datum
00890 numerictypmodout(PG_FUNCTION_ARGS)
00891 {
00892     int32       typmod = PG_GETARG_INT32(0);
00893     char       *res = (char *) palloc(64);
00894 
00895     if (typmod >= 0)
00896         snprintf(res, 64, "(%d,%d)",
00897                  ((typmod - VARHDRSZ) >> 16) & 0xffff,
00898                  (typmod - VARHDRSZ) & 0xffff);
00899     else
00900         *res = '\0';
00901 
00902     PG_RETURN_CSTRING(res);
00903 }
00904 
00905 
00906 /* ----------------------------------------------------------------------
00907  *
00908  * Sign manipulation, rounding and the like
00909  *
00910  * ----------------------------------------------------------------------
00911  */
00912 
00913 Datum
00914 numeric_abs(PG_FUNCTION_ARGS)
00915 {
00916     Numeric     num = PG_GETARG_NUMERIC(0);
00917     Numeric     res;
00918 
00919     /*
00920      * Handle NaN
00921      */
00922     if (NUMERIC_IS_NAN(num))
00923         PG_RETURN_NUMERIC(make_result(&const_nan));
00924 
00925     /*
00926      * Do it the easy way directly on the packed format
00927      */
00928     res = (Numeric) palloc(VARSIZE(num));
00929     memcpy(res, num, VARSIZE(num));
00930 
00931     if (NUMERIC_IS_SHORT(num))
00932         res->choice.n_short.n_header =
00933             num->choice.n_short.n_header & ~NUMERIC_SHORT_SIGN_MASK;
00934     else
00935         res->choice.n_long.n_sign_dscale = NUMERIC_POS | NUMERIC_DSCALE(num);
00936 
00937     PG_RETURN_NUMERIC(res);
00938 }
00939 
00940 
00941 Datum
00942 numeric_uminus(PG_FUNCTION_ARGS)
00943 {
00944     Numeric     num = PG_GETARG_NUMERIC(0);
00945     Numeric     res;
00946 
00947     /*
00948      * Handle NaN
00949      */
00950     if (NUMERIC_IS_NAN(num))
00951         PG_RETURN_NUMERIC(make_result(&const_nan));
00952 
00953     /*
00954      * Do it the easy way directly on the packed format
00955      */
00956     res = (Numeric) palloc(VARSIZE(num));
00957     memcpy(res, num, VARSIZE(num));
00958 
00959     /*
00960      * The packed format is known to be totally zero digit trimmed always. So
00961      * we can identify a ZERO by the fact that there are no digits at all.  Do
00962      * nothing to a zero.
00963      */
00964     if (NUMERIC_NDIGITS(num) != 0)
00965     {
00966         /* Else, flip the sign */
00967         if (NUMERIC_IS_SHORT(num))
00968             res->choice.n_short.n_header =
00969                 num->choice.n_short.n_header ^ NUMERIC_SHORT_SIGN_MASK;
00970         else if (NUMERIC_SIGN(num) == NUMERIC_POS)
00971             res->choice.n_long.n_sign_dscale =
00972                 NUMERIC_NEG | NUMERIC_DSCALE(num);
00973         else
00974             res->choice.n_long.n_sign_dscale =
00975                 NUMERIC_POS | NUMERIC_DSCALE(num);
00976     }
00977 
00978     PG_RETURN_NUMERIC(res);
00979 }
00980 
00981 
00982 Datum
00983 numeric_uplus(PG_FUNCTION_ARGS)
00984 {
00985     Numeric     num = PG_GETARG_NUMERIC(0);
00986     Numeric     res;
00987 
00988     res = (Numeric) palloc(VARSIZE(num));
00989     memcpy(res, num, VARSIZE(num));
00990 
00991     PG_RETURN_NUMERIC(res);
00992 }
00993 
00994 /*
00995  * numeric_sign() -
00996  *
00997  * returns -1 if the argument is less than 0, 0 if the argument is equal
00998  * to 0, and 1 if the argument is greater than zero.
00999  */
01000 Datum
01001 numeric_sign(PG_FUNCTION_ARGS)
01002 {
01003     Numeric     num = PG_GETARG_NUMERIC(0);
01004     Numeric     res;
01005     NumericVar  result;
01006 
01007     /*
01008      * Handle NaN
01009      */
01010     if (NUMERIC_IS_NAN(num))
01011         PG_RETURN_NUMERIC(make_result(&const_nan));
01012 
01013     init_var(&result);
01014 
01015     /*
01016      * The packed format is known to be totally zero digit trimmed always. So
01017      * we can identify a ZERO by the fact that there are no digits at all.
01018      */
01019     if (NUMERIC_NDIGITS(num) == 0)
01020         set_var_from_var(&const_zero, &result);
01021     else
01022     {
01023         /*
01024          * And if there are some, we return a copy of ONE with the sign of our
01025          * argument
01026          */
01027         set_var_from_var(&const_one, &result);
01028         result.sign = NUMERIC_SIGN(num);
01029     }
01030 
01031     res = make_result(&result);
01032     free_var(&result);
01033 
01034     PG_RETURN_NUMERIC(res);
01035 }
01036 
01037 
01038 /*
01039  * numeric_round() -
01040  *
01041  *  Round a value to have 'scale' digits after the decimal point.
01042  *  We allow negative 'scale', implying rounding before the decimal
01043  *  point --- Oracle interprets rounding that way.
01044  */
01045 Datum
01046 numeric_round(PG_FUNCTION_ARGS)
01047 {
01048     Numeric     num = PG_GETARG_NUMERIC(0);
01049     int32       scale = PG_GETARG_INT32(1);
01050     Numeric     res;
01051     NumericVar  arg;
01052 
01053     /*
01054      * Handle NaN
01055      */
01056     if (NUMERIC_IS_NAN(num))
01057         PG_RETURN_NUMERIC(make_result(&const_nan));
01058 
01059     /*
01060      * Limit the scale value to avoid possible overflow in calculations
01061      */
01062     scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
01063     scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
01064 
01065     /*
01066      * Unpack the argument and round it at the proper digit position
01067      */
01068     init_var(&arg);
01069     set_var_from_num(num, &arg);
01070 
01071     round_var(&arg, scale);
01072 
01073     /* We don't allow negative output dscale */
01074     if (scale < 0)
01075         arg.dscale = 0;
01076 
01077     /*
01078      * Return the rounded result
01079      */
01080     res = make_result(&arg);
01081 
01082     free_var(&arg);
01083     PG_RETURN_NUMERIC(res);
01084 }
01085 
01086 
01087 /*
01088  * numeric_trunc() -
01089  *
01090  *  Truncate a value to have 'scale' digits after the decimal point.
01091  *  We allow negative 'scale', implying a truncation before the decimal
01092  *  point --- Oracle interprets truncation that way.
01093  */
01094 Datum
01095 numeric_trunc(PG_FUNCTION_ARGS)
01096 {
01097     Numeric     num = PG_GETARG_NUMERIC(0);
01098     int32       scale = PG_GETARG_INT32(1);
01099     Numeric     res;
01100     NumericVar  arg;
01101 
01102     /*
01103      * Handle NaN
01104      */
01105     if (NUMERIC_IS_NAN(num))
01106         PG_RETURN_NUMERIC(make_result(&const_nan));
01107 
01108     /*
01109      * Limit the scale value to avoid possible overflow in calculations
01110      */
01111     scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
01112     scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
01113 
01114     /*
01115      * Unpack the argument and truncate it at the proper digit position
01116      */
01117     init_var(&arg);
01118     set_var_from_num(num, &arg);
01119 
01120     trunc_var(&arg, scale);
01121 
01122     /* We don't allow negative output dscale */
01123     if (scale < 0)
01124         arg.dscale = 0;
01125 
01126     /*
01127      * Return the truncated result
01128      */
01129     res = make_result(&arg);
01130 
01131     free_var(&arg);
01132     PG_RETURN_NUMERIC(res);
01133 }
01134 
01135 
01136 /*
01137  * numeric_ceil() -
01138  *
01139  *  Return the smallest integer greater than or equal to the argument
01140  */
01141 Datum
01142 numeric_ceil(PG_FUNCTION_ARGS)
01143 {
01144     Numeric     num = PG_GETARG_NUMERIC(0);
01145     Numeric     res;
01146     NumericVar  result;
01147 
01148     if (NUMERIC_IS_NAN(num))
01149         PG_RETURN_NUMERIC(make_result(&const_nan));
01150 
01151     init_var_from_num(num, &result);
01152     ceil_var(&result, &result);
01153 
01154     res = make_result(&result);
01155     free_var(&result);
01156 
01157     PG_RETURN_NUMERIC(res);
01158 }
01159 
01160 
01161 /*
01162  * numeric_floor() -
01163  *
01164  *  Return the largest integer equal to or less than the argument
01165  */
01166 Datum
01167 numeric_floor(PG_FUNCTION_ARGS)
01168 {
01169     Numeric     num = PG_GETARG_NUMERIC(0);
01170     Numeric     res;
01171     NumericVar  result;
01172 
01173     if (NUMERIC_IS_NAN(num))
01174         PG_RETURN_NUMERIC(make_result(&const_nan));
01175 
01176     init_var_from_num(num, &result);
01177     floor_var(&result, &result);
01178 
01179     res = make_result(&result);
01180     free_var(&result);
01181 
01182     PG_RETURN_NUMERIC(res);
01183 }
01184 
01185 /*
01186  * Implements the numeric version of the width_bucket() function
01187  * defined by SQL2003. See also width_bucket_float8().
01188  *
01189  * 'bound1' and 'bound2' are the lower and upper bounds of the
01190  * histogram's range, respectively. 'count' is the number of buckets
01191  * in the histogram. width_bucket() returns an integer indicating the
01192  * bucket number that 'operand' belongs to in an equiwidth histogram
01193  * with the specified characteristics. An operand smaller than the
01194  * lower bound is assigned to bucket 0. An operand greater than the
01195  * upper bound is assigned to an additional bucket (with number
01196  * count+1). We don't allow "NaN" for any of the numeric arguments.
01197  */
01198 Datum
01199 width_bucket_numeric(PG_FUNCTION_ARGS)
01200 {
01201     Numeric     operand = PG_GETARG_NUMERIC(0);
01202     Numeric     bound1 = PG_GETARG_NUMERIC(1);
01203     Numeric     bound2 = PG_GETARG_NUMERIC(2);
01204     int32       count = PG_GETARG_INT32(3);
01205     NumericVar  count_var;
01206     NumericVar  result_var;
01207     int32       result;
01208 
01209     if (count <= 0)
01210         ereport(ERROR,
01211                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
01212                  errmsg("count must be greater than zero")));
01213 
01214     if (NUMERIC_IS_NAN(operand) ||
01215         NUMERIC_IS_NAN(bound1) ||
01216         NUMERIC_IS_NAN(bound2))
01217         ereport(ERROR,
01218                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
01219              errmsg("operand, lower bound, and upper bound cannot be NaN")));
01220 
01221     init_var(&result_var);
01222     init_var(&count_var);
01223 
01224     /* Convert 'count' to a numeric, for ease of use later */
01225     int8_to_numericvar((int64) count, &count_var);
01226 
01227     switch (cmp_numerics(bound1, bound2))
01228     {
01229         case 0:
01230             ereport(ERROR,
01231                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_WIDTH_BUCKET_FUNCTION),
01232                  errmsg("lower bound cannot equal upper bound")));
01233 
01234             /* bound1 < bound2 */
01235         case -1:
01236             if (cmp_numerics(operand, bound1) < 0)
01237                 set_var_from_var(&const_zero, &result_var);
01238             else if (cmp_numerics(operand, bound2) >= 0)
01239                 add_var(&count_var, &const_one, &result_var);
01240             else
01241                 compute_bucket(operand, bound1, bound2,
01242                                &count_var, &result_var);
01243             break;
01244 
01245             /* bound1 > bound2 */
01246         case 1:
01247             if (cmp_numerics(operand, bound1) > 0)
01248                 set_var_from_var(&const_zero, &result_var);
01249             else if (cmp_numerics(operand, bound2) <= 0)
01250                 add_var(&count_var, &const_one, &result_var);
01251             else
01252                 compute_bucket(operand, bound1, bound2,
01253                                &count_var, &result_var);
01254             break;
01255     }
01256 
01257     /* if result exceeds the range of a legal int4, we ereport here */
01258     result = numericvar_to_int4(&result_var);
01259 
01260     free_var(&count_var);
01261     free_var(&result_var);
01262 
01263     PG_RETURN_INT32(result);
01264 }
01265 
01266 /*
01267  * If 'operand' is not outside the bucket range, determine the correct
01268  * bucket for it to go. The calculations performed by this function
01269  * are derived directly from the SQL2003 spec.
01270  */
01271 static void
01272 compute_bucket(Numeric operand, Numeric bound1, Numeric bound2,
01273                NumericVar *count_var, NumericVar *result_var)
01274 {
01275     NumericVar  bound1_var;
01276     NumericVar  bound2_var;
01277     NumericVar  operand_var;
01278 
01279     init_var_from_num(bound1, &bound1_var);
01280     init_var_from_num(bound2, &bound2_var);
01281     init_var_from_num(operand, &operand_var);
01282 
01283     if (cmp_var(&bound1_var, &bound2_var) < 0)
01284     {
01285         sub_var(&operand_var, &bound1_var, &operand_var);
01286         sub_var(&bound2_var, &bound1_var, &bound2_var);
01287         div_var(&operand_var, &bound2_var, result_var,
01288                 select_div_scale(&operand_var, &bound2_var), true);
01289     }
01290     else
01291     {
01292         sub_var(&bound1_var, &operand_var, &operand_var);
01293         sub_var(&bound1_var, &bound2_var, &bound1_var);
01294         div_var(&operand_var, &bound1_var, result_var,
01295                 select_div_scale(&operand_var, &bound1_var), true);
01296     }
01297 
01298     mul_var(result_var, count_var, result_var,
01299             result_var->dscale + count_var->dscale);
01300     add_var(result_var, &const_one, result_var);
01301     floor_var(result_var, result_var);
01302 
01303     free_var(&bound1_var);
01304     free_var(&bound2_var);
01305     free_var(&operand_var);
01306 }
01307 
01308 /* ----------------------------------------------------------------------
01309  *
01310  * Comparison functions
01311  *
01312  * Note: btree indexes need these routines not to leak memory; therefore,
01313  * be careful to free working copies of toasted datums.  Most places don't
01314  * need to be so careful.
01315  * ----------------------------------------------------------------------
01316  */
01317 
01318 
01319 Datum
01320 numeric_cmp(PG_FUNCTION_ARGS)
01321 {
01322     Numeric     num1 = PG_GETARG_NUMERIC(0);
01323     Numeric     num2 = PG_GETARG_NUMERIC(1);
01324     int         result;
01325 
01326     result = cmp_numerics(num1, num2);
01327 
01328     PG_FREE_IF_COPY(num1, 0);
01329     PG_FREE_IF_COPY(num2, 1);
01330 
01331     PG_RETURN_INT32(result);
01332 }
01333 
01334 
01335 Datum
01336 numeric_eq(PG_FUNCTION_ARGS)
01337 {
01338     Numeric     num1 = PG_GETARG_NUMERIC(0);
01339     Numeric     num2 = PG_GETARG_NUMERIC(1);
01340     bool        result;
01341 
01342     result = cmp_numerics(num1, num2) == 0;
01343 
01344     PG_FREE_IF_COPY(num1, 0);
01345     PG_FREE_IF_COPY(num2, 1);
01346 
01347     PG_RETURN_BOOL(result);
01348 }
01349 
01350 Datum
01351 numeric_ne(PG_FUNCTION_ARGS)
01352 {
01353     Numeric     num1 = PG_GETARG_NUMERIC(0);
01354     Numeric     num2 = PG_GETARG_NUMERIC(1);
01355     bool        result;
01356 
01357     result = cmp_numerics(num1, num2) != 0;
01358 
01359     PG_FREE_IF_COPY(num1, 0);
01360     PG_FREE_IF_COPY(num2, 1);
01361 
01362     PG_RETURN_BOOL(result);
01363 }
01364 
01365 Datum
01366 numeric_gt(PG_FUNCTION_ARGS)
01367 {
01368     Numeric     num1 = PG_GETARG_NUMERIC(0);
01369     Numeric     num2 = PG_GETARG_NUMERIC(1);
01370     bool        result;
01371 
01372     result = cmp_numerics(num1, num2) > 0;
01373 
01374     PG_FREE_IF_COPY(num1, 0);
01375     PG_FREE_IF_COPY(num2, 1);
01376 
01377     PG_RETURN_BOOL(result);
01378 }
01379 
01380 Datum
01381 numeric_ge(PG_FUNCTION_ARGS)
01382 {
01383     Numeric     num1 = PG_GETARG_NUMERIC(0);
01384     Numeric     num2 = PG_GETARG_NUMERIC(1);
01385     bool        result;
01386 
01387     result = cmp_numerics(num1, num2) >= 0;
01388 
01389     PG_FREE_IF_COPY(num1, 0);
01390     PG_FREE_IF_COPY(num2, 1);
01391 
01392     PG_RETURN_BOOL(result);
01393 }
01394 
01395 Datum
01396 numeric_lt(PG_FUNCTION_ARGS)
01397 {
01398     Numeric     num1 = PG_GETARG_NUMERIC(0);
01399     Numeric     num2 = PG_GETARG_NUMERIC(1);
01400     bool        result;
01401 
01402     result = cmp_numerics(num1, num2) < 0;
01403 
01404     PG_FREE_IF_COPY(num1, 0);
01405     PG_FREE_IF_COPY(num2, 1);
01406 
01407     PG_RETURN_BOOL(result);
01408 }
01409 
01410 Datum
01411 numeric_le(PG_FUNCTION_ARGS)
01412 {
01413     Numeric     num1 = PG_GETARG_NUMERIC(0);
01414     Numeric     num2 = PG_GETARG_NUMERIC(1);
01415     bool        result;
01416 
01417     result = cmp_numerics(num1, num2) <= 0;
01418 
01419     PG_FREE_IF_COPY(num1, 0);
01420     PG_FREE_IF_COPY(num2, 1);
01421 
01422     PG_RETURN_BOOL(result);
01423 }
01424 
01425 static int
01426 cmp_numerics(Numeric num1, Numeric num2)
01427 {
01428     int         result;
01429 
01430     /*
01431      * We consider all NANs to be equal and larger than any non-NAN. This is
01432      * somewhat arbitrary; the important thing is to have a consistent sort
01433      * order.
01434      */
01435     if (NUMERIC_IS_NAN(num1))
01436     {
01437         if (NUMERIC_IS_NAN(num2))
01438             result = 0;         /* NAN = NAN */
01439         else
01440             result = 1;         /* NAN > non-NAN */
01441     }
01442     else if (NUMERIC_IS_NAN(num2))
01443     {
01444         result = -1;            /* non-NAN < NAN */
01445     }
01446     else
01447     {
01448         result = cmp_var_common(NUMERIC_DIGITS(num1), NUMERIC_NDIGITS(num1),
01449                                 NUMERIC_WEIGHT(num1), NUMERIC_SIGN(num1),
01450                                 NUMERIC_DIGITS(num2), NUMERIC_NDIGITS(num2),
01451                                 NUMERIC_WEIGHT(num2), NUMERIC_SIGN(num2));
01452     }
01453 
01454     return result;
01455 }
01456 
01457 Datum
01458 hash_numeric(PG_FUNCTION_ARGS)
01459 {
01460     Numeric     key = PG_GETARG_NUMERIC(0);
01461     Datum       digit_hash;
01462     Datum       result;
01463     int         weight;
01464     int         start_offset;
01465     int         end_offset;
01466     int         i;
01467     int         hash_len;
01468     NumericDigit *digits;
01469 
01470     /* If it's NaN, don't try to hash the rest of the fields */
01471     if (NUMERIC_IS_NAN(key))
01472         PG_RETURN_UINT32(0);
01473 
01474     weight = NUMERIC_WEIGHT(key);
01475     start_offset = 0;
01476     end_offset = 0;
01477 
01478     /*
01479      * Omit any leading or trailing zeros from the input to the hash. The
01480      * numeric implementation *should* guarantee that leading and trailing
01481      * zeros are suppressed, but we're paranoid. Note that we measure the
01482      * starting and ending offsets in units of NumericDigits, not bytes.
01483      */
01484     digits = NUMERIC_DIGITS(key);
01485     for (i = 0; i < NUMERIC_NDIGITS(key); i++)
01486     {
01487         if (digits[i] != (NumericDigit) 0)
01488             break;
01489 
01490         start_offset++;
01491 
01492         /*
01493          * The weight is effectively the # of digits before the decimal point,
01494          * so decrement it for each leading zero we skip.
01495          */
01496         weight--;
01497     }
01498 
01499     /*
01500      * If there are no non-zero digits, then the value of the number is zero,
01501      * regardless of any other fields.
01502      */
01503     if (NUMERIC_NDIGITS(key) == start_offset)
01504         PG_RETURN_UINT32(-1);
01505 
01506     for (i = NUMERIC_NDIGITS(key) - 1; i >= 0; i--)
01507     {
01508         if (digits[i] != (NumericDigit) 0)
01509             break;
01510 
01511         end_offset++;
01512     }
01513 
01514     /* If we get here, there should be at least one non-zero digit */
01515     Assert(start_offset + end_offset < NUMERIC_NDIGITS(key));
01516 
01517     /*
01518      * Note that we don't hash on the Numeric's scale, since two numerics can
01519      * compare equal but have different scales. We also don't hash on the
01520      * sign, although we could: since a sign difference implies inequality,
01521      * this shouldn't affect correctness.
01522      */
01523     hash_len = NUMERIC_NDIGITS(key) - start_offset - end_offset;
01524     digit_hash = hash_any((unsigned char *) (NUMERIC_DIGITS(key) + start_offset),
01525                           hash_len * sizeof(NumericDigit));
01526 
01527     /* Mix in the weight, via XOR */
01528     result = digit_hash ^ weight;
01529 
01530     PG_RETURN_DATUM(result);
01531 }
01532 
01533 
01534 /* ----------------------------------------------------------------------
01535  *
01536  * Basic arithmetic functions
01537  *
01538  * ----------------------------------------------------------------------
01539  */
01540 
01541 
01542 /*
01543  * numeric_add() -
01544  *
01545  *  Add two numerics
01546  */
01547 Datum
01548 numeric_add(PG_FUNCTION_ARGS)
01549 {
01550     Numeric     num1 = PG_GETARG_NUMERIC(0);
01551     Numeric     num2 = PG_GETARG_NUMERIC(1);
01552     NumericVar  arg1;
01553     NumericVar  arg2;
01554     NumericVar  result;
01555     Numeric     res;
01556 
01557     /*
01558      * Handle NaN
01559      */
01560     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01561         PG_RETURN_NUMERIC(make_result(&const_nan));
01562 
01563     /*
01564      * Unpack the values, let add_var() compute the result and return it.
01565      */
01566     init_var_from_num(num1, &arg1);
01567     init_var_from_num(num2, &arg2);
01568 
01569     init_var(&result);
01570     add_var(&arg1, &arg2, &result);
01571 
01572     res = make_result(&result);
01573 
01574     free_var(&result);
01575 
01576     PG_RETURN_NUMERIC(res);
01577 }
01578 
01579 
01580 /*
01581  * numeric_sub() -
01582  *
01583  *  Subtract one numeric from another
01584  */
01585 Datum
01586 numeric_sub(PG_FUNCTION_ARGS)
01587 {
01588     Numeric     num1 = PG_GETARG_NUMERIC(0);
01589     Numeric     num2 = PG_GETARG_NUMERIC(1);
01590     NumericVar  arg1;
01591     NumericVar  arg2;
01592     NumericVar  result;
01593     Numeric     res;
01594 
01595     /*
01596      * Handle NaN
01597      */
01598     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01599         PG_RETURN_NUMERIC(make_result(&const_nan));
01600 
01601     /*
01602      * Unpack the values, let sub_var() compute the result and return it.
01603      */
01604     init_var_from_num(num1, &arg1);
01605     init_var_from_num(num2, &arg2);
01606 
01607     init_var(&result);
01608     sub_var(&arg1, &arg2, &result);
01609 
01610     res = make_result(&result);
01611 
01612     free_var(&result);
01613 
01614     PG_RETURN_NUMERIC(res);
01615 }
01616 
01617 
01618 /*
01619  * numeric_mul() -
01620  *
01621  *  Calculate the product of two numerics
01622  */
01623 Datum
01624 numeric_mul(PG_FUNCTION_ARGS)
01625 {
01626     Numeric     num1 = PG_GETARG_NUMERIC(0);
01627     Numeric     num2 = PG_GETARG_NUMERIC(1);
01628     NumericVar  arg1;
01629     NumericVar  arg2;
01630     NumericVar  result;
01631     Numeric     res;
01632 
01633     /*
01634      * Handle NaN
01635      */
01636     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01637         PG_RETURN_NUMERIC(make_result(&const_nan));
01638 
01639     /*
01640      * Unpack the values, let mul_var() compute the result and return it.
01641      * Unlike add_var() and sub_var(), mul_var() will round its result. In the
01642      * case of numeric_mul(), which is invoked for the * operator on numerics,
01643      * we request exact representation for the product (rscale = sum(dscale of
01644      * arg1, dscale of arg2)).
01645      */
01646     init_var_from_num(num1, &arg1);
01647     init_var_from_num(num2, &arg2);
01648 
01649     init_var(&result);
01650     mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
01651 
01652     res = make_result(&result);
01653 
01654     free_var(&result);
01655 
01656     PG_RETURN_NUMERIC(res);
01657 }
01658 
01659 
01660 /*
01661  * numeric_div() -
01662  *
01663  *  Divide one numeric into another
01664  */
01665 Datum
01666 numeric_div(PG_FUNCTION_ARGS)
01667 {
01668     Numeric     num1 = PG_GETARG_NUMERIC(0);
01669     Numeric     num2 = PG_GETARG_NUMERIC(1);
01670     NumericVar  arg1;
01671     NumericVar  arg2;
01672     NumericVar  result;
01673     Numeric     res;
01674     int         rscale;
01675 
01676     /*
01677      * Handle NaN
01678      */
01679     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01680         PG_RETURN_NUMERIC(make_result(&const_nan));
01681 
01682     /*
01683      * Unpack the arguments
01684      */
01685     init_var_from_num(num1, &arg1);
01686     init_var_from_num(num2, &arg2);
01687 
01688     init_var(&result);
01689 
01690     /*
01691      * Select scale for division result
01692      */
01693     rscale = select_div_scale(&arg1, &arg2);
01694 
01695     /*
01696      * Do the divide and return the result
01697      */
01698     div_var(&arg1, &arg2, &result, rscale, true);
01699 
01700     res = make_result(&result);
01701 
01702     free_var(&result);
01703 
01704     PG_RETURN_NUMERIC(res);
01705 }
01706 
01707 
01708 /*
01709  * numeric_div_trunc() -
01710  *
01711  *  Divide one numeric into another, truncating the result to an integer
01712  */
01713 Datum
01714 numeric_div_trunc(PG_FUNCTION_ARGS)
01715 {
01716     Numeric     num1 = PG_GETARG_NUMERIC(0);
01717     Numeric     num2 = PG_GETARG_NUMERIC(1);
01718     NumericVar  arg1;
01719     NumericVar  arg2;
01720     NumericVar  result;
01721     Numeric     res;
01722 
01723     /*
01724      * Handle NaN
01725      */
01726     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01727         PG_RETURN_NUMERIC(make_result(&const_nan));
01728 
01729     /*
01730      * Unpack the arguments
01731      */
01732     init_var_from_num(num1, &arg1);
01733     init_var_from_num(num2, &arg2);
01734 
01735     init_var(&result);
01736 
01737     /*
01738      * Do the divide and return the result
01739      */
01740     div_var(&arg1, &arg2, &result, 0, false);
01741 
01742     res = make_result(&result);
01743 
01744     free_var(&result);
01745 
01746     PG_RETURN_NUMERIC(res);
01747 }
01748 
01749 
01750 /*
01751  * numeric_mod() -
01752  *
01753  *  Calculate the modulo of two numerics
01754  */
01755 Datum
01756 numeric_mod(PG_FUNCTION_ARGS)
01757 {
01758     Numeric     num1 = PG_GETARG_NUMERIC(0);
01759     Numeric     num2 = PG_GETARG_NUMERIC(1);
01760     Numeric     res;
01761     NumericVar  arg1;
01762     NumericVar  arg2;
01763     NumericVar  result;
01764 
01765     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
01766         PG_RETURN_NUMERIC(make_result(&const_nan));
01767 
01768     init_var_from_num(num1, &arg1);
01769     init_var_from_num(num2, &arg2);
01770 
01771     init_var(&result);
01772 
01773     mod_var(&arg1, &arg2, &result);
01774 
01775     res = make_result(&result);
01776 
01777     free_var(&result);
01778 
01779     PG_RETURN_NUMERIC(res);
01780 }
01781 
01782 
01783 /*
01784  * numeric_inc() -
01785  *
01786  *  Increment a number by one
01787  */
01788 Datum
01789 numeric_inc(PG_FUNCTION_ARGS)
01790 {
01791     Numeric     num = PG_GETARG_NUMERIC(0);
01792     NumericVar  arg;
01793     Numeric     res;
01794 
01795     /*
01796      * Handle NaN
01797      */
01798     if (NUMERIC_IS_NAN(num))
01799         PG_RETURN_NUMERIC(make_result(&const_nan));
01800 
01801     /*
01802      * Compute the result and return it
01803      */
01804     init_var_from_num(num, &arg);
01805 
01806     add_var(&arg, &const_one, &arg);
01807 
01808     res = make_result(&arg);
01809 
01810     free_var(&arg);
01811 
01812     PG_RETURN_NUMERIC(res);
01813 }
01814 
01815 
01816 /*
01817  * numeric_smaller() -
01818  *
01819  *  Return the smaller of two numbers
01820  */
01821 Datum
01822 numeric_smaller(PG_FUNCTION_ARGS)
01823 {
01824     Numeric     num1 = PG_GETARG_NUMERIC(0);
01825     Numeric     num2 = PG_GETARG_NUMERIC(1);
01826 
01827     /*
01828      * Use cmp_numerics so that this will agree with the comparison operators,
01829      * particularly as regards comparisons involving NaN.
01830      */
01831     if (cmp_numerics(num1, num2) < 0)
01832         PG_RETURN_NUMERIC(num1);
01833     else
01834         PG_RETURN_NUMERIC(num2);
01835 }
01836 
01837 
01838 /*
01839  * numeric_larger() -
01840  *
01841  *  Return the larger of two numbers
01842  */
01843 Datum
01844 numeric_larger(PG_FUNCTION_ARGS)
01845 {
01846     Numeric     num1 = PG_GETARG_NUMERIC(0);
01847     Numeric     num2 = PG_GETARG_NUMERIC(1);
01848 
01849     /*
01850      * Use cmp_numerics so that this will agree with the comparison operators,
01851      * particularly as regards comparisons involving NaN.
01852      */
01853     if (cmp_numerics(num1, num2) > 0)
01854         PG_RETURN_NUMERIC(num1);
01855     else
01856         PG_RETURN_NUMERIC(num2);
01857 }
01858 
01859 
01860 /* ----------------------------------------------------------------------
01861  *
01862  * Advanced math functions
01863  *
01864  * ----------------------------------------------------------------------
01865  */
01866 
01867 /*
01868  * numeric_fac()
01869  *
01870  * Compute factorial
01871  */
01872 Datum
01873 numeric_fac(PG_FUNCTION_ARGS)
01874 {
01875     int64       num = PG_GETARG_INT64(0);
01876     Numeric     res;
01877     NumericVar  fact;
01878     NumericVar  result;
01879 
01880     if (num <= 1)
01881     {
01882         res = make_result(&const_one);
01883         PG_RETURN_NUMERIC(res);
01884     }
01885     /* Fail immediately if the result would overflow */
01886     if (num > 32177)
01887         ereport(ERROR,
01888                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
01889                  errmsg("value overflows numeric format")));
01890 
01891     init_var(&fact);
01892     init_var(&result);
01893 
01894     int8_to_numericvar(num, &result);
01895 
01896     for (num = num - 1; num > 1; num--)
01897     {
01898         /* this loop can take awhile, so allow it to be interrupted */
01899         CHECK_FOR_INTERRUPTS();
01900 
01901         int8_to_numericvar(num, &fact);
01902 
01903         mul_var(&result, &fact, &result, 0);
01904     }
01905 
01906     res = make_result(&result);
01907 
01908     free_var(&fact);
01909     free_var(&result);
01910 
01911     PG_RETURN_NUMERIC(res);
01912 }
01913 
01914 
01915 /*
01916  * numeric_sqrt() -
01917  *
01918  *  Compute the square root of a numeric.
01919  */
01920 Datum
01921 numeric_sqrt(PG_FUNCTION_ARGS)
01922 {
01923     Numeric     num = PG_GETARG_NUMERIC(0);
01924     Numeric     res;
01925     NumericVar  arg;
01926     NumericVar  result;
01927     int         sweight;
01928     int         rscale;
01929 
01930     /*
01931      * Handle NaN
01932      */
01933     if (NUMERIC_IS_NAN(num))
01934         PG_RETURN_NUMERIC(make_result(&const_nan));
01935 
01936     /*
01937      * Unpack the argument and determine the result scale.  We choose a scale
01938      * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
01939      * case not less than the input's dscale.
01940      */
01941     init_var_from_num(num, &arg);
01942 
01943     init_var(&result);
01944 
01945     /* Assume the input was normalized, so arg.weight is accurate */
01946     sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
01947 
01948     rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
01949     rscale = Max(rscale, arg.dscale);
01950     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
01951     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
01952 
01953     /*
01954      * Let sqrt_var() do the calculation and return the result.
01955      */
01956     sqrt_var(&arg, &result, rscale);
01957 
01958     res = make_result(&result);
01959 
01960     free_var(&result);
01961 
01962     PG_RETURN_NUMERIC(res);
01963 }
01964 
01965 
01966 /*
01967  * numeric_exp() -
01968  *
01969  *  Raise e to the power of x
01970  */
01971 Datum
01972 numeric_exp(PG_FUNCTION_ARGS)
01973 {
01974     Numeric     num = PG_GETARG_NUMERIC(0);
01975     Numeric     res;
01976     NumericVar  arg;
01977     NumericVar  result;
01978     int         rscale;
01979     double      val;
01980 
01981     /*
01982      * Handle NaN
01983      */
01984     if (NUMERIC_IS_NAN(num))
01985         PG_RETURN_NUMERIC(make_result(&const_nan));
01986 
01987     /*
01988      * Unpack the argument and determine the result scale.  We choose a scale
01989      * to give at least NUMERIC_MIN_SIG_DIGITS significant digits; but in any
01990      * case not less than the input's dscale.
01991      */
01992     init_var_from_num(num, &arg);
01993 
01994     init_var(&result);
01995 
01996     /* convert input to float8, ignoring overflow */
01997     val = numericvar_to_double_no_overflow(&arg);
01998 
01999     /*
02000      * log10(result) = num * log10(e), so this is approximately the decimal
02001      * weight of the result:
02002      */
02003     val *= 0.434294481903252;
02004 
02005     /* limit to something that won't cause integer overflow */
02006     val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
02007     val = Min(val, NUMERIC_MAX_RESULT_SCALE);
02008 
02009     rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
02010     rscale = Max(rscale, arg.dscale);
02011     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
02012     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
02013 
02014     /*
02015      * Let exp_var() do the calculation and return the result.
02016      */
02017     exp_var(&arg, &result, rscale);
02018 
02019     res = make_result(&result);
02020 
02021     free_var(&result);
02022 
02023     PG_RETURN_NUMERIC(res);
02024 }
02025 
02026 
02027 /*
02028  * numeric_ln() -
02029  *
02030  *  Compute the natural logarithm of x
02031  */
02032 Datum
02033 numeric_ln(PG_FUNCTION_ARGS)
02034 {
02035     Numeric     num = PG_GETARG_NUMERIC(0);
02036     Numeric     res;
02037     NumericVar  arg;
02038     NumericVar  result;
02039     int         dec_digits;
02040     int         rscale;
02041 
02042     /*
02043      * Handle NaN
02044      */
02045     if (NUMERIC_IS_NAN(num))
02046         PG_RETURN_NUMERIC(make_result(&const_nan));
02047 
02048     init_var_from_num(num, &arg);
02049     init_var(&result);
02050 
02051     /* Approx decimal digits before decimal point */
02052     dec_digits = (arg.weight + 1) * DEC_DIGITS;
02053 
02054     if (dec_digits > 1)
02055         rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
02056     else if (dec_digits < 1)
02057         rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
02058     else
02059         rscale = NUMERIC_MIN_SIG_DIGITS;
02060 
02061     rscale = Max(rscale, arg.dscale);
02062     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
02063     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
02064 
02065     ln_var(&arg, &result, rscale);
02066 
02067     res = make_result(&result);
02068 
02069     free_var(&result);
02070 
02071     PG_RETURN_NUMERIC(res);
02072 }
02073 
02074 
02075 /*
02076  * numeric_log() -
02077  *
02078  *  Compute the logarithm of x in a given base
02079  */
02080 Datum
02081 numeric_log(PG_FUNCTION_ARGS)
02082 {
02083     Numeric     num1 = PG_GETARG_NUMERIC(0);
02084     Numeric     num2 = PG_GETARG_NUMERIC(1);
02085     Numeric     res;
02086     NumericVar  arg1;
02087     NumericVar  arg2;
02088     NumericVar  result;
02089 
02090     /*
02091      * Handle NaN
02092      */
02093     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
02094         PG_RETURN_NUMERIC(make_result(&const_nan));
02095 
02096     /*
02097      * Initialize things
02098      */
02099     init_var_from_num(num1, &arg1);
02100     init_var_from_num(num2, &arg2);
02101     init_var(&result);
02102 
02103     /*
02104      * Call log_var() to compute and return the result; note it handles scale
02105      * selection itself.
02106      */
02107     log_var(&arg1, &arg2, &result);
02108 
02109     res = make_result(&result);
02110 
02111     free_var(&result);
02112 
02113     PG_RETURN_NUMERIC(res);
02114 }
02115 
02116 
02117 /*
02118  * numeric_power() -
02119  *
02120  *  Raise b to the power of x
02121  */
02122 Datum
02123 numeric_power(PG_FUNCTION_ARGS)
02124 {
02125     Numeric     num1 = PG_GETARG_NUMERIC(0);
02126     Numeric     num2 = PG_GETARG_NUMERIC(1);
02127     Numeric     res;
02128     NumericVar  arg1;
02129     NumericVar  arg2;
02130     NumericVar  arg2_trunc;
02131     NumericVar  result;
02132 
02133     /*
02134      * Handle NaN
02135      */
02136     if (NUMERIC_IS_NAN(num1) || NUMERIC_IS_NAN(num2))
02137         PG_RETURN_NUMERIC(make_result(&const_nan));
02138 
02139     /*
02140      * Initialize things
02141      */
02142     init_var(&arg2_trunc);
02143     init_var(&result);
02144     init_var_from_num(num1, &arg1);
02145     init_var_from_num(num2, &arg2);
02146 
02147     set_var_from_var(&arg2, &arg2_trunc);
02148     trunc_var(&arg2_trunc, 0);
02149 
02150     /*
02151      * The SQL spec requires that we emit a particular SQLSTATE error code for
02152      * certain error conditions.  Specifically, we don't return a
02153      * divide-by-zero error code for 0 ^ -1.
02154      */
02155     if (cmp_var(&arg1, &const_zero) == 0 &&
02156         cmp_var(&arg2, &const_zero) < 0)
02157         ereport(ERROR,
02158                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
02159                  errmsg("zero raised to a negative power is undefined")));
02160 
02161     if (cmp_var(&arg1, &const_zero) < 0 &&
02162         cmp_var(&arg2, &arg2_trunc) != 0)
02163         ereport(ERROR,
02164                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
02165                  errmsg("a negative number raised to a non-integer power yields a complex result")));
02166 
02167     /*
02168      * Call power_var() to compute and return the result; note it handles
02169      * scale selection itself.
02170      */
02171     power_var(&arg1, &arg2, &result);
02172 
02173     res = make_result(&result);
02174 
02175     free_var(&result);
02176     free_var(&arg2_trunc);
02177 
02178     PG_RETURN_NUMERIC(res);
02179 }
02180 
02181 
02182 /* ----------------------------------------------------------------------
02183  *
02184  * Type conversion functions
02185  *
02186  * ----------------------------------------------------------------------
02187  */
02188 
02189 
02190 Datum
02191 int4_numeric(PG_FUNCTION_ARGS)
02192 {
02193     int32       val = PG_GETARG_INT32(0);
02194     Numeric     res;
02195     NumericVar  result;
02196 
02197     init_var(&result);
02198 
02199     int8_to_numericvar((int64) val, &result);
02200 
02201     res = make_result(&result);
02202 
02203     free_var(&result);
02204 
02205     PG_RETURN_NUMERIC(res);
02206 }
02207 
02208 
02209 Datum
02210 numeric_int4(PG_FUNCTION_ARGS)
02211 {
02212     Numeric     num = PG_GETARG_NUMERIC(0);
02213     NumericVar  x;
02214     int32       result;
02215 
02216     /* XXX would it be better to return NULL? */
02217     if (NUMERIC_IS_NAN(num))
02218         ereport(ERROR,
02219                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02220                  errmsg("cannot convert NaN to integer")));
02221 
02222     /* Convert to variable format, then convert to int4 */
02223     init_var_from_num(num, &x);
02224     result = numericvar_to_int4(&x);
02225     PG_RETURN_INT32(result);
02226 }
02227 
02228 /*
02229  * Given a NumericVar, convert it to an int32. If the NumericVar
02230  * exceeds the range of an int32, raise the appropriate error via
02231  * ereport(). The input NumericVar is *not* free'd.
02232  */
02233 static int32
02234 numericvar_to_int4(NumericVar *var)
02235 {
02236     int32       result;
02237     int64       val;
02238 
02239     if (!numericvar_to_int8(var, &val))
02240         ereport(ERROR,
02241                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
02242                  errmsg("integer out of range")));
02243 
02244     /* Down-convert to int4 */
02245     result = (int32) val;
02246 
02247     /* Test for overflow by reverse-conversion. */
02248     if ((int64) result != val)
02249         ereport(ERROR,
02250                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
02251                  errmsg("integer out of range")));
02252 
02253     return result;
02254 }
02255 
02256 Datum
02257 int8_numeric(PG_FUNCTION_ARGS)
02258 {
02259     int64       val = PG_GETARG_INT64(0);
02260     Numeric     res;
02261     NumericVar  result;
02262 
02263     init_var(&result);
02264 
02265     int8_to_numericvar(val, &result);
02266 
02267     res = make_result(&result);
02268 
02269     free_var(&result);
02270 
02271     PG_RETURN_NUMERIC(res);
02272 }
02273 
02274 
02275 Datum
02276 numeric_int8(PG_FUNCTION_ARGS)
02277 {
02278     Numeric     num = PG_GETARG_NUMERIC(0);
02279     NumericVar  x;
02280     int64       result;
02281 
02282     /* XXX would it be better to return NULL? */
02283     if (NUMERIC_IS_NAN(num))
02284         ereport(ERROR,
02285                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02286                  errmsg("cannot convert NaN to bigint")));
02287 
02288     /* Convert to variable format and thence to int8 */
02289     init_var_from_num(num, &x);
02290 
02291     if (!numericvar_to_int8(&x, &result))
02292         ereport(ERROR,
02293                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
02294                  errmsg("bigint out of range")));
02295 
02296     PG_RETURN_INT64(result);
02297 }
02298 
02299 
02300 Datum
02301 int2_numeric(PG_FUNCTION_ARGS)
02302 {
02303     int16       val = PG_GETARG_INT16(0);
02304     Numeric     res;
02305     NumericVar  result;
02306 
02307     init_var(&result);
02308 
02309     int8_to_numericvar((int64) val, &result);
02310 
02311     res = make_result(&result);
02312 
02313     free_var(&result);
02314 
02315     PG_RETURN_NUMERIC(res);
02316 }
02317 
02318 
02319 Datum
02320 numeric_int2(PG_FUNCTION_ARGS)
02321 {
02322     Numeric     num = PG_GETARG_NUMERIC(0);
02323     NumericVar  x;
02324     int64       val;
02325     int16       result;
02326 
02327     /* XXX would it be better to return NULL? */
02328     if (NUMERIC_IS_NAN(num))
02329         ereport(ERROR,
02330                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
02331                  errmsg("cannot convert NaN to smallint")));
02332 
02333     /* Convert to variable format and thence to int8 */
02334     init_var_from_num(num, &x);
02335 
02336     if (!numericvar_to_int8(&x, &val))
02337         ereport(ERROR,
02338                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
02339                  errmsg("smallint out of range")));
02340 
02341     /* Down-convert to int2 */
02342     result = (int16) val;
02343 
02344     /* Test for overflow by reverse-conversion. */
02345     if ((int64) result != val)
02346         ereport(ERROR,
02347                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
02348                  errmsg("smallint out of range")));
02349 
02350     PG_RETURN_INT16(result);
02351 }
02352 
02353 
02354 Datum
02355 float8_numeric(PG_FUNCTION_ARGS)
02356 {
02357     float8      val = PG_GETARG_FLOAT8(0);
02358     Numeric     res;
02359     NumericVar  result;
02360     char        buf[DBL_DIG + 100];
02361 
02362     if (isnan(val))
02363         PG_RETURN_NUMERIC(make_result(&const_nan));
02364 
02365     sprintf(buf, "%.*g", DBL_DIG, val);
02366 
02367     init_var(&result);
02368 
02369     /* Assume we need not worry about leading/trailing spaces */
02370     (void) set_var_from_str(buf, buf, &result);
02371 
02372     res = make_result(&result);
02373 
02374     free_var(&result);
02375 
02376     PG_RETURN_NUMERIC(res);
02377 }
02378 
02379 
02380 Datum
02381 numeric_float8(PG_FUNCTION_ARGS)
02382 {
02383     Numeric     num = PG_GETARG_NUMERIC(0);
02384     char       *tmp;
02385     Datum       result;
02386 
02387     if (NUMERIC_IS_NAN(num))
02388         PG_RETURN_FLOAT8(get_float8_nan());
02389 
02390     tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
02391                                               NumericGetDatum(num)));
02392 
02393     result = DirectFunctionCall1(float8in, CStringGetDatum(tmp));
02394 
02395     pfree(tmp);
02396 
02397     PG_RETURN_DATUM(result);
02398 }
02399 
02400 
02401 /* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
02402 Datum
02403 numeric_float8_no_overflow(PG_FUNCTION_ARGS)
02404 {
02405     Numeric     num = PG_GETARG_NUMERIC(0);
02406     double      val;
02407 
02408     if (NUMERIC_IS_NAN(num))
02409         PG_RETURN_FLOAT8(get_float8_nan());
02410 
02411     val = numeric_to_double_no_overflow(num);
02412 
02413     PG_RETURN_FLOAT8(val);
02414 }
02415 
02416 Datum
02417 float4_numeric(PG_FUNCTION_ARGS)
02418 {
02419     float4      val = PG_GETARG_FLOAT4(0);
02420     Numeric     res;
02421     NumericVar  result;
02422     char        buf[FLT_DIG + 100];
02423 
02424     if (isnan(val))
02425         PG_RETURN_NUMERIC(make_result(&const_nan));
02426 
02427     sprintf(buf, "%.*g", FLT_DIG, val);
02428 
02429     init_var(&result);
02430 
02431     /* Assume we need not worry about leading/trailing spaces */
02432     (void) set_var_from_str(buf, buf, &result);
02433 
02434     res = make_result(&result);
02435 
02436     free_var(&result);
02437 
02438     PG_RETURN_NUMERIC(res);
02439 }
02440 
02441 
02442 Datum
02443 numeric_float4(PG_FUNCTION_ARGS)
02444 {
02445     Numeric     num = PG_GETARG_NUMERIC(0);
02446     char       *tmp;
02447     Datum       result;
02448 
02449     if (NUMERIC_IS_NAN(num))
02450         PG_RETURN_FLOAT4(get_float4_nan());
02451 
02452     tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
02453                                               NumericGetDatum(num)));
02454 
02455     result = DirectFunctionCall1(float4in, CStringGetDatum(tmp));
02456 
02457     pfree(tmp);
02458 
02459     PG_RETURN_DATUM(result);
02460 }
02461 
02462 
02463 /* ----------------------------------------------------------------------
02464  *
02465  * Aggregate functions
02466  *
02467  * The transition datatype for all these aggregates is a 3-element array
02468  * of Numeric, holding the values N, sum(X), sum(X*X) in that order.
02469  *
02470  * We represent N as a numeric mainly to avoid having to build a special
02471  * datatype; it's unlikely it'd overflow an int4, but ...
02472  *
02473  * ----------------------------------------------------------------------
02474  */
02475 
02476 static ArrayType *
02477 do_numeric_accum(ArrayType *transarray, Numeric newval)
02478 {
02479     Datum      *transdatums;
02480     int         ndatums;
02481     Datum       N,
02482                 sumX,
02483                 sumX2;
02484     ArrayType  *result;
02485 
02486     /* We assume the input is array of numeric */
02487     deconstruct_array(transarray,
02488                       NUMERICOID, -1, false, 'i',
02489                       &transdatums, NULL, &ndatums);
02490     if (ndatums != 3)
02491         elog(ERROR, "expected 3-element numeric array");
02492     N = transdatums[0];
02493     sumX = transdatums[1];
02494     sumX2 = transdatums[2];
02495 
02496     N = DirectFunctionCall1(numeric_inc, N);
02497     sumX = DirectFunctionCall2(numeric_add, sumX,
02498                                NumericGetDatum(newval));
02499     sumX2 = DirectFunctionCall2(numeric_add, sumX2,
02500                                 DirectFunctionCall2(numeric_mul,
02501                                                     NumericGetDatum(newval),
02502                                                     NumericGetDatum(newval)));
02503 
02504     transdatums[0] = N;
02505     transdatums[1] = sumX;
02506     transdatums[2] = sumX2;
02507 
02508     result = construct_array(transdatums, 3,
02509                              NUMERICOID, -1, false, 'i');
02510 
02511     return result;
02512 }
02513 
02514 /*
02515  * Improve avg performance by not caclulating sum(X*X).
02516  */
02517 static ArrayType *
02518 do_numeric_avg_accum(ArrayType *transarray, Numeric newval)
02519 {
02520     Datum      *transdatums;
02521     int         ndatums;
02522     Datum       N,
02523                 sumX;
02524     ArrayType  *result;
02525 
02526     /* We assume the input is array of numeric */
02527     deconstruct_array(transarray,
02528                       NUMERICOID, -1, false, 'i',
02529                       &transdatums, NULL, &ndatums);
02530     if (ndatums != 2)
02531         elog(ERROR, "expected 2-element numeric array");
02532     N = transdatums[0];
02533     sumX = transdatums[1];
02534 
02535     N = DirectFunctionCall1(numeric_inc, N);
02536     sumX = DirectFunctionCall2(numeric_add, sumX,
02537                                NumericGetDatum(newval));
02538 
02539     transdatums[0] = N;
02540     transdatums[1] = sumX;
02541 
02542     result = construct_array(transdatums, 2,
02543                              NUMERICOID, -1, false, 'i');
02544 
02545     return result;
02546 }
02547 
02548 Datum
02549 numeric_accum(PG_FUNCTION_ARGS)
02550 {
02551     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02552     Numeric     newval = PG_GETARG_NUMERIC(1);
02553 
02554     PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
02555 }
02556 
02557 /*
02558  * Optimized case for average of numeric.
02559  */
02560 Datum
02561 numeric_avg_accum(PG_FUNCTION_ARGS)
02562 {
02563     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02564     Numeric     newval = PG_GETARG_NUMERIC(1);
02565 
02566     PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
02567 }
02568 
02569 /*
02570  * Integer data types all use Numeric accumulators to share code and
02571  * avoid risk of overflow.  For int2 and int4 inputs, Numeric accumulation
02572  * is overkill for the N and sum(X) values, but definitely not overkill
02573  * for the sum(X*X) value.  Hence, we use int2_accum and int4_accum only
02574  * for stddev/variance --- there are faster special-purpose accumulator
02575  * routines for SUM and AVG of these datatypes.
02576  */
02577 
02578 Datum
02579 int2_accum(PG_FUNCTION_ARGS)
02580 {
02581     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02582     Datum       newval2 = PG_GETARG_DATUM(1);
02583     Numeric     newval;
02584 
02585     newval = DatumGetNumeric(DirectFunctionCall1(int2_numeric, newval2));
02586 
02587     PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
02588 }
02589 
02590 Datum
02591 int4_accum(PG_FUNCTION_ARGS)
02592 {
02593     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02594     Datum       newval4 = PG_GETARG_DATUM(1);
02595     Numeric     newval;
02596 
02597     newval = DatumGetNumeric(DirectFunctionCall1(int4_numeric, newval4));
02598 
02599     PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
02600 }
02601 
02602 Datum
02603 int8_accum(PG_FUNCTION_ARGS)
02604 {
02605     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02606     Datum       newval8 = PG_GETARG_DATUM(1);
02607     Numeric     newval;
02608 
02609     newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
02610 
02611     PG_RETURN_ARRAYTYPE_P(do_numeric_accum(transarray, newval));
02612 }
02613 
02614 /*
02615  * Optimized case for average of int8.
02616  */
02617 Datum
02618 int8_avg_accum(PG_FUNCTION_ARGS)
02619 {
02620     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02621     Datum       newval8 = PG_GETARG_DATUM(1);
02622     Numeric     newval;
02623 
02624     newval = DatumGetNumeric(DirectFunctionCall1(int8_numeric, newval8));
02625 
02626     PG_RETURN_ARRAYTYPE_P(do_numeric_avg_accum(transarray, newval));
02627 }
02628 
02629 
02630 Datum
02631 numeric_avg(PG_FUNCTION_ARGS)
02632 {
02633     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
02634     Datum      *transdatums;
02635     int         ndatums;
02636     Numeric     N,
02637                 sumX;
02638 
02639     /* We assume the input is array of numeric */
02640     deconstruct_array(transarray,
02641                       NUMERICOID, -1, false, 'i',
02642                       &transdatums, NULL, &ndatums);
02643     if (ndatums != 2)
02644         elog(ERROR, "expected 2-element numeric array");
02645     N = DatumGetNumeric(transdatums[0]);
02646     sumX = DatumGetNumeric(transdatums[1]);
02647 
02648     /* SQL defines AVG of no values to be NULL */
02649     /* N is zero iff no digits (cf. numeric_uminus) */
02650     if (NUMERIC_NDIGITS(N) == 0)
02651         PG_RETURN_NULL();
02652 
02653     PG_RETURN_DATUM(DirectFunctionCall2(numeric_div,
02654                                         NumericGetDatum(sumX),
02655                                         NumericGetDatum(N)));
02656 }
02657 
02658 /*
02659  * Workhorse routine for the standard deviance and variance
02660  * aggregates. 'transarray' is the aggregate's transition
02661  * array. 'variance' specifies whether we should calculate the
02662  * variance or the standard deviation. 'sample' indicates whether the
02663  * caller is interested in the sample or the population
02664  * variance/stddev.
02665  *
02666  * If appropriate variance statistic is undefined for the input,
02667  * *is_null is set to true and NULL is returned.
02668  */
02669 static Numeric
02670 numeric_stddev_internal(ArrayType *transarray,
02671                         bool variance, bool sample,
02672                         bool *is_null)
02673 {
02674     Datum      *transdatums;
02675     int         ndatums;
02676     Numeric     N,
02677                 sumX,
02678                 sumX2,
02679                 res;
02680     NumericVar  vN,
02681                 vsumX,
02682                 vsumX2,
02683                 vNminus1;
02684     NumericVar *comp;
02685     int         rscale;
02686 
02687     *is_null = false;
02688 
02689     /* We assume the input is array of numeric */
02690     deconstruct_array(transarray,
02691                       NUMERICOID, -1, false, 'i',
02692                       &transdatums, NULL, &ndatums);
02693     if (ndatums != 3)
02694         elog(ERROR, "expected 3-element numeric array");
02695     N = DatumGetNumeric(transdatums[0]);
02696     sumX = DatumGetNumeric(transdatums[1]);
02697     sumX2 = DatumGetNumeric(transdatums[2]);
02698 
02699     if (NUMERIC_IS_NAN(N) || NUMERIC_IS_NAN(sumX) || NUMERIC_IS_NAN(sumX2))
02700         return make_result(&const_nan);
02701 
02702     init_var_from_num(N, &vN);
02703 
02704     /*
02705      * Sample stddev and variance are undefined when N <= 1; population stddev
02706      * is undefined when N == 0. Return NULL in either case.
02707      */
02708     if (sample)
02709         comp = &const_one;
02710     else
02711         comp = &const_zero;
02712 
02713     if (cmp_var(&vN, comp) <= 0)
02714     {
02715         *is_null = true;
02716         return NULL;
02717     }
02718 
02719     init_var(&vNminus1);
02720     sub_var(&vN, &const_one, &vNminus1);
02721 
02722     init_var_from_num(sumX, &vsumX);
02723     init_var_from_num(sumX2, &vsumX2);
02724 
02725     /* compute rscale for mul_var calls */
02726     rscale = vsumX.dscale * 2;
02727 
02728     mul_var(&vsumX, &vsumX, &vsumX, rscale);    /* vsumX = sumX * sumX */
02729     mul_var(&vN, &vsumX2, &vsumX2, rscale);     /* vsumX2 = N * sumX2 */
02730     sub_var(&vsumX2, &vsumX, &vsumX2);  /* N * sumX2 - sumX * sumX */
02731 
02732     if (cmp_var(&vsumX2, &const_zero) <= 0)
02733     {
02734         /* Watch out for roundoff error producing a negative numerator */
02735         res = make_result(&const_zero);
02736     }
02737     else
02738     {
02739         if (sample)
02740             mul_var(&vN, &vNminus1, &vNminus1, 0);      /* N * (N - 1) */
02741         else
02742             mul_var(&vN, &vN, &vNminus1, 0);    /* N * N */
02743         rscale = select_div_scale(&vsumX2, &vNminus1);
02744         div_var(&vsumX2, &vNminus1, &vsumX, rscale, true);      /* variance */
02745         if (!variance)
02746             sqrt_var(&vsumX, &vsumX, rscale);   /* stddev */
02747 
02748         res = make_result(&vsumX);
02749     }
02750 
02751     free_var(&vNminus1);
02752     free_var(&vsumX);
02753     free_var(&vsumX2);
02754 
02755     return res;
02756 }
02757 
02758 Datum
02759 numeric_var_samp(PG_FUNCTION_ARGS)
02760 {
02761     Numeric     res;
02762     bool        is_null;
02763 
02764     res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
02765                                   true, true, &is_null);
02766 
02767     if (is_null)
02768         PG_RETURN_NULL();
02769     else
02770         PG_RETURN_NUMERIC(res);
02771 }
02772 
02773 Datum
02774 numeric_stddev_samp(PG_FUNCTION_ARGS)
02775 {
02776     Numeric     res;
02777     bool        is_null;
02778 
02779     res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
02780                                   false, true, &is_null);
02781 
02782     if (is_null)
02783         PG_RETURN_NULL();
02784     else
02785         PG_RETURN_NUMERIC(res);
02786 }
02787 
02788 Datum
02789 numeric_var_pop(PG_FUNCTION_ARGS)
02790 {
02791     Numeric     res;
02792     bool        is_null;
02793 
02794     res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
02795                                   true, false, &is_null);
02796 
02797     if (is_null)
02798         PG_RETURN_NULL();
02799     else
02800         PG_RETURN_NUMERIC(res);
02801 }
02802 
02803 Datum
02804 numeric_stddev_pop(PG_FUNCTION_ARGS)
02805 {
02806     Numeric     res;
02807     bool        is_null;
02808 
02809     res = numeric_stddev_internal(PG_GETARG_ARRAYTYPE_P(0),
02810                                   false, false, &is_null);
02811 
02812     if (is_null)
02813         PG_RETURN_NULL();
02814     else
02815         PG_RETURN_NUMERIC(res);
02816 }
02817 
02818 /*
02819  * SUM transition functions for integer datatypes.
02820  *
02821  * To avoid overflow, we use accumulators wider than the input datatype.
02822  * A Numeric accumulator is needed for int8 input; for int4 and int2
02823  * inputs, we use int8 accumulators which should be sufficient for practical
02824  * purposes.  (The latter two therefore don't really belong in this file,
02825  * but we keep them here anyway.)
02826  *
02827  * Because SQL defines the SUM() of no values to be NULL, not zero,
02828  * the initial condition of the transition data value needs to be NULL. This
02829  * means we can't rely on ExecAgg to automatically insert the first non-null
02830  * data value into the transition data: it doesn't know how to do the type
02831  * conversion.  The upshot is that these routines have to be marked non-strict
02832  * and handle substitution of the first non-null input themselves.
02833  */
02834 
02835 Datum
02836 int2_sum(PG_FUNCTION_ARGS)
02837 {
02838     int64       newval;
02839 
02840     if (PG_ARGISNULL(0))
02841     {
02842         /* No non-null input seen so far... */
02843         if (PG_ARGISNULL(1))
02844             PG_RETURN_NULL();   /* still no non-null */
02845         /* This is the first non-null input. */
02846         newval = (int64) PG_GETARG_INT16(1);
02847         PG_RETURN_INT64(newval);
02848     }
02849 
02850     /*
02851      * If we're invoked as an aggregate, we can cheat and modify our first
02852      * parameter in-place to avoid palloc overhead. If not, we need to return
02853      * the new value of the transition variable. (If int8 is pass-by-value,
02854      * then of course this is useless as well as incorrect, so just ifdef it
02855      * out.)
02856      */
02857 #ifndef USE_FLOAT8_BYVAL        /* controls int8 too */
02858     if (AggCheckCallContext(fcinfo, NULL))
02859     {
02860         int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
02861 
02862         /* Leave the running sum unchanged in the new input is null */
02863         if (!PG_ARGISNULL(1))
02864             *oldsum = *oldsum + (int64) PG_GETARG_INT16(1);
02865 
02866         PG_RETURN_POINTER(oldsum);
02867     }
02868     else
02869 #endif
02870     {
02871         int64       oldsum = PG_GETARG_INT64(0);
02872 
02873         /* Leave sum unchanged if new input is null. */
02874         if (PG_ARGISNULL(1))
02875             PG_RETURN_INT64(oldsum);
02876 
02877         /* OK to do the addition. */
02878         newval = oldsum + (int64) PG_GETARG_INT16(1);
02879 
02880         PG_RETURN_INT64(newval);
02881     }
02882 }
02883 
02884 Datum
02885 int4_sum(PG_FUNCTION_ARGS)
02886 {
02887     int64       newval;
02888 
02889     if (PG_ARGISNULL(0))
02890     {
02891         /* No non-null input seen so far... */
02892         if (PG_ARGISNULL(1))
02893             PG_RETURN_NULL();   /* still no non-null */
02894         /* This is the first non-null input. */
02895         newval = (int64) PG_GETARG_INT32(1);
02896         PG_RETURN_INT64(newval);
02897     }
02898 
02899     /*
02900      * If we're invoked as an aggregate, we can cheat and modify our first
02901      * parameter in-place to avoid palloc overhead. If not, we need to return
02902      * the new value of the transition variable. (If int8 is pass-by-value,
02903      * then of course this is useless as well as incorrect, so just ifdef it
02904      * out.)
02905      */
02906 #ifndef USE_FLOAT8_BYVAL        /* controls int8 too */
02907     if (AggCheckCallContext(fcinfo, NULL))
02908     {
02909         int64      *oldsum = (int64 *) PG_GETARG_POINTER(0);
02910 
02911         /* Leave the running sum unchanged in the new input is null */
02912         if (!PG_ARGISNULL(1))
02913             *oldsum = *oldsum + (int64) PG_GETARG_INT32(1);
02914 
02915         PG_RETURN_POINTER(oldsum);
02916     }
02917     else
02918 #endif
02919     {
02920         int64       oldsum = PG_GETARG_INT64(0);
02921 
02922         /* Leave sum unchanged if new input is null. */
02923         if (PG_ARGISNULL(1))
02924             PG_RETURN_INT64(oldsum);
02925 
02926         /* OK to do the addition. */
02927         newval = oldsum + (int64) PG_GETARG_INT32(1);
02928 
02929         PG_RETURN_INT64(newval);
02930     }
02931 }
02932 
02933 Datum
02934 int8_sum(PG_FUNCTION_ARGS)
02935 {
02936     Numeric     oldsum;
02937     Datum       newval;
02938 
02939     if (PG_ARGISNULL(0))
02940     {
02941         /* No non-null input seen so far... */
02942         if (PG_ARGISNULL(1))
02943             PG_RETURN_NULL();   /* still no non-null */
02944         /* This is the first non-null input. */
02945         newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
02946         PG_RETURN_DATUM(newval);
02947     }
02948 
02949     /*
02950      * Note that we cannot special-case the aggregate case here, as we do for
02951      * int2_sum and int4_sum: numeric is of variable size, so we cannot modify
02952      * our first parameter in-place.
02953      */
02954 
02955     oldsum = PG_GETARG_NUMERIC(0);
02956 
02957     /* Leave sum unchanged if new input is null. */
02958     if (PG_ARGISNULL(1))
02959         PG_RETURN_NUMERIC(oldsum);
02960 
02961     /* OK to do the addition. */
02962     newval = DirectFunctionCall1(int8_numeric, PG_GETARG_DATUM(1));
02963 
02964     PG_RETURN_DATUM(DirectFunctionCall2(numeric_add,
02965                                         NumericGetDatum(oldsum), newval));
02966 }
02967 
02968 
02969 /*
02970  * Routines for avg(int2) and avg(int4).  The transition datatype
02971  * is a two-element int8 array, holding count and sum.
02972  */
02973 
02974 typedef struct Int8TransTypeData
02975 {
02976     int64       count;
02977     int64       sum;
02978 } Int8TransTypeData;
02979 
02980 Datum
02981 int2_avg_accum(PG_FUNCTION_ARGS)
02982 {
02983     ArrayType  *transarray;
02984     int16       newval = PG_GETARG_INT16(1);
02985     Int8TransTypeData *transdata;
02986 
02987     /*
02988      * If we're invoked as an aggregate, we can cheat and modify our first
02989      * parameter in-place to reduce palloc overhead. Otherwise we need to make
02990      * a copy of it before scribbling on it.
02991      */
02992     if (AggCheckCallContext(fcinfo, NULL))
02993         transarray = PG_GETARG_ARRAYTYPE_P(0);
02994     else
02995         transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
02996 
02997     if (ARR_HASNULL(transarray) ||
02998         ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
02999         elog(ERROR, "expected 2-element int8 array");
03000 
03001     transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
03002     transdata->count++;
03003     transdata->sum += newval;
03004 
03005     PG_RETURN_ARRAYTYPE_P(transarray);
03006 }
03007 
03008 Datum
03009 int4_avg_accum(PG_FUNCTION_ARGS)
03010 {
03011     ArrayType  *transarray;
03012     int32       newval = PG_GETARG_INT32(1);
03013     Int8TransTypeData *transdata;
03014 
03015     /*
03016      * If we're invoked as an aggregate, we can cheat and modify our first
03017      * parameter in-place to reduce palloc overhead. Otherwise we need to make
03018      * a copy of it before scribbling on it.
03019      */
03020     if (AggCheckCallContext(fcinfo, NULL))
03021         transarray = PG_GETARG_ARRAYTYPE_P(0);
03022     else
03023         transarray = PG_GETARG_ARRAYTYPE_P_COPY(0);
03024 
03025     if (ARR_HASNULL(transarray) ||
03026         ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
03027         elog(ERROR, "expected 2-element int8 array");
03028 
03029     transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
03030     transdata->count++;
03031     transdata->sum += newval;
03032 
03033     PG_RETURN_ARRAYTYPE_P(transarray);
03034 }
03035 
03036 Datum
03037 int8_avg(PG_FUNCTION_ARGS)
03038 {
03039     ArrayType  *transarray = PG_GETARG_ARRAYTYPE_P(0);
03040     Int8TransTypeData *transdata;
03041     Datum       countd,
03042                 sumd;
03043 
03044     if (ARR_HASNULL(transarray) ||
03045         ARR_SIZE(transarray) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
03046         elog(ERROR, "expected 2-element int8 array");
03047     transdata = (Int8TransTypeData *) ARR_DATA_PTR(transarray);
03048 
03049     /* SQL defines AVG of no values to be NULL */
03050     if (transdata->count == 0)
03051         PG_RETURN_NULL();
03052 
03053     countd = DirectFunctionCall1(int8_numeric,
03054                                  Int64GetDatumFast(transdata->count));
03055     sumd = DirectFunctionCall1(int8_numeric,
03056                                Int64GetDatumFast(transdata->sum));
03057 
03058     PG_RETURN_DATUM(DirectFunctionCall2(numeric_div, sumd, countd));
03059 }
03060 
03061 
03062 /* ----------------------------------------------------------------------
03063  *
03064  * Debug support
03065  *
03066  * ----------------------------------------------------------------------
03067  */
03068 
03069 #ifdef NUMERIC_DEBUG
03070 
03071 /*
03072  * dump_numeric() - Dump a value in the db storage format for debugging
03073  */
03074 static void
03075 dump_numeric(const char *str, Numeric num)
03076 {
03077     NumericDigit *digits = NUMERIC_DIGITS(num);
03078     int         ndigits;
03079     int         i;
03080 
03081     ndigits = NUMERIC_NDIGITS(num);
03082 
03083     printf("%s: NUMERIC w=%d d=%d ", str,
03084            NUMERIC_WEIGHT(num), NUMERIC_DSCALE(num));
03085     switch (NUMERIC_SIGN(num))
03086     {
03087         case NUMERIC_POS:
03088             printf("POS");
03089             break;
03090         case NUMERIC_NEG:
03091             printf("NEG");
03092             break;
03093         case NUMERIC_NAN:
03094             printf("NaN");
03095             break;
03096         default:
03097             printf("SIGN=0x%x", NUMERIC_SIGN(num));
03098             break;
03099     }
03100 
03101     for (i = 0; i < ndigits; i++)
03102         printf(" %0*d", DEC_DIGITS, digits[i]);
03103     printf("\n");
03104 }
03105 
03106 
03107 /*
03108  * dump_var() - Dump a value in the variable format for debugging
03109  */
03110 static void
03111 dump_var(const char *str, NumericVar *var)
03112 {
03113     int         i;
03114 
03115     printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
03116     switch (var->sign)
03117     {
03118         case NUMERIC_POS:
03119             printf("POS");
03120             break;
03121         case NUMERIC_NEG:
03122             printf("NEG");
03123             break;
03124         case NUMERIC_NAN:
03125             printf("NaN");
03126             break;
03127         default:
03128             printf("SIGN=0x%x", var->sign);
03129             break;
03130     }
03131 
03132     for (i = 0; i < var->ndigits; i++)
03133         printf(" %0*d", DEC_DIGITS, var->digits[i]);
03134 
03135     printf("\n");
03136 }
03137 #endif   /* NUMERIC_DEBUG */
03138 
03139 
03140 /* ----------------------------------------------------------------------
03141  *
03142  * Local functions follow
03143  *
03144  * In general, these do not support NaNs --- callers must eliminate
03145  * the possibility of NaN first.  (make_result() is an exception.)
03146  *
03147  * ----------------------------------------------------------------------
03148  */
03149 
03150 
03151 /*
03152  * alloc_var() -
03153  *
03154  *  Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
03155  */
03156 static void
03157 alloc_var(NumericVar *var, int ndigits)
03158 {
03159     digitbuf_free(var->buf);
03160     var->buf = digitbuf_alloc(ndigits + 1);
03161     var->buf[0] = 0;            /* spare digit for rounding */
03162     var->digits = var->buf + 1;
03163     var->ndigits = ndigits;
03164 }
03165 
03166 
03167 /*
03168  * free_var() -
03169  *
03170  *  Return the digit buffer of a variable to the free pool
03171  */
03172 static void
03173 free_var(NumericVar *var)
03174 {
03175     digitbuf_free(var->buf);
03176     var->buf = NULL;
03177     var->digits = NULL;
03178     var->sign = NUMERIC_NAN;
03179 }
03180 
03181 
03182 /*
03183  * zero_var() -
03184  *
03185  *  Set a variable to ZERO.
03186  *  Note: its dscale is not touched.
03187  */
03188 static void
03189 zero_var(NumericVar *var)
03190 {
03191     digitbuf_free(var->buf);
03192     var->buf = NULL;
03193     var->digits = NULL;
03194     var->ndigits = 0;
03195     var->weight = 0;            /* by convention; doesn't really matter */
03196     var->sign = NUMERIC_POS;    /* anything but NAN... */
03197 }
03198 
03199 
03200 /*
03201  * set_var_from_str()
03202  *
03203  *  Parse a string and put the number into a variable
03204  *
03205  * This function does not handle leading or trailing spaces, and it doesn't
03206  * accept "NaN" either.  It returns the end+1 position so that caller can
03207  * check for trailing spaces/garbage if deemed necessary.
03208  *
03209  * cp is the place to actually start parsing; str is what to use in error
03210  * reports.  (Typically cp would be the same except advanced over spaces.)
03211  */
03212 static const char *
03213 set_var_from_str(const char *str, const char *cp, NumericVar *dest)
03214 {
03215     bool        have_dp = FALSE;
03216     int         i;
03217     unsigned char *decdigits;
03218     int         sign = NUMERIC_POS;
03219     int         dweight = -1;
03220     int         ddigits;
03221     int         dscale = 0;
03222     int         weight;
03223     int         ndigits;
03224     int         offset;
03225     NumericDigit *digits;
03226 
03227     /*
03228      * We first parse the string to extract decimal digits and determine the
03229      * correct decimal weight.  Then convert to NBASE representation.
03230      */
03231     switch (*cp)
03232     {
03233         case '+':
03234             sign = NUMERIC_POS;
03235             cp++;
03236             break;
03237 
03238         case '-':
03239             sign = NUMERIC_NEG;
03240             cp++;
03241             break;
03242     }
03243 
03244     if (*cp == '.')
03245     {
03246         have_dp = TRUE;
03247         cp++;
03248     }
03249 
03250     if (!isdigit((unsigned char) *cp))
03251         ereport(ERROR,
03252                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03253               errmsg("invalid input syntax for type numeric: \"%s\"", str)));
03254 
03255     decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS * 2);
03256 
03257     /* leading padding for digit alignment later */
03258     memset(decdigits, 0, DEC_DIGITS);
03259     i = DEC_DIGITS;
03260 
03261     while (*cp)
03262     {
03263         if (isdigit((unsigned char) *cp))
03264         {
03265             decdigits[i++] = *cp++ - '0';
03266             if (!have_dp)
03267                 dweight++;
03268             else
03269                 dscale++;
03270         }
03271         else if (*cp == '.')
03272         {
03273             if (have_dp)
03274                 ereport(ERROR,
03275                         (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03276                       errmsg("invalid input syntax for type numeric: \"%s\"",
03277                              str)));
03278             have_dp = TRUE;
03279             cp++;
03280         }
03281         else
03282             break;
03283     }
03284 
03285     ddigits = i - DEC_DIGITS;
03286     /* trailing padding for digit alignment later */
03287     memset(decdigits + i, 0, DEC_DIGITS - 1);
03288 
03289     /* Handle exponent, if any */
03290     if (*cp == 'e' || *cp == 'E')
03291     {
03292         long        exponent;
03293         char       *endptr;
03294 
03295         cp++;
03296         exponent = strtol(cp, &endptr, 10);
03297         if (endptr == cp)
03298             ereport(ERROR,
03299                     (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03300                      errmsg("invalid input syntax for type numeric: \"%s\"",
03301                             str)));
03302         cp = endptr;
03303         if (exponent > NUMERIC_MAX_PRECISION ||
03304             exponent < -NUMERIC_MAX_PRECISION)
03305             ereport(ERROR,
03306                     (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03307                      errmsg("invalid input syntax for type numeric: \"%s\"",
03308                             str)));
03309         dweight += (int) exponent;
03310         dscale -= (int) exponent;
03311         if (dscale < 0)
03312             dscale = 0;
03313     }
03314 
03315     /*
03316      * Okay, convert pure-decimal representation to base NBASE.  First we need
03317      * to determine the converted weight and ndigits.  offset is the number of
03318      * decimal zeroes to insert before the first given digit to have a
03319      * correctly aligned first NBASE digit.
03320      */
03321     if (dweight >= 0)
03322         weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1;
03323     else
03324         weight = -((-dweight - 1) / DEC_DIGITS + 1);
03325     offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
03326     ndigits = (ddigits + offset + DEC_DIGITS - 1) / DEC_DIGITS;
03327 
03328     alloc_var(dest, ndigits);
03329     dest->sign = sign;
03330     dest->weight = weight;
03331     dest->dscale = dscale;
03332 
03333     i = DEC_DIGITS - offset;
03334     digits = dest->digits;
03335 
03336     while (ndigits-- > 0)
03337     {
03338 #if DEC_DIGITS == 4
03339         *digits++ = ((decdigits[i] * 10 + decdigits[i + 1]) * 10 +
03340                      decdigits[i + 2]) * 10 + decdigits[i + 3];
03341 #elif DEC_DIGITS == 2
03342         *digits++ = decdigits[i] * 10 + decdigits[i + 1];
03343 #elif DEC_DIGITS == 1
03344         *digits++ = decdigits[i];
03345 #else
03346 #error unsupported NBASE
03347 #endif
03348         i += DEC_DIGITS;
03349     }
03350 
03351     pfree(decdigits);
03352 
03353     /* Strip any leading/trailing zeroes, and normalize weight if zero */
03354     strip_var(dest);
03355 
03356     /* Return end+1 position for caller */
03357     return cp;
03358 }
03359 
03360 
03361 /*
03362  * set_var_from_num() -
03363  *
03364  *  Convert the packed db format into a variable
03365  */
03366 static void
03367 set_var_from_num(Numeric num, NumericVar *dest)
03368 {
03369     int         ndigits;
03370 
03371     ndigits = NUMERIC_NDIGITS(num);
03372 
03373     alloc_var(dest, ndigits);
03374 
03375     dest->weight = NUMERIC_WEIGHT(num);
03376     dest->sign = NUMERIC_SIGN(num);
03377     dest->dscale = NUMERIC_DSCALE(num);
03378 
03379     memcpy(dest->digits, NUMERIC_DIGITS(num), ndigits * sizeof(NumericDigit));
03380 }
03381 
03382 
03383 /*
03384  * init_var_from_num() -
03385  *
03386  *  Initialize a variable from packed db format. The digits array is not
03387  *  copied, which saves some cycles when the resulting var is not modified.
03388  *  Also, there's no need to call free_var(), as long as you don't assign any
03389  *  other value to it (with set_var_* functions, or by using the var as the
03390  *  destination of a function like add_var())
03391  *
03392  *  CAUTION: Do not modify the digits buffer of a var initialized with this
03393  *  function, e.g by calling round_var() or trunc_var(), as the changes will
03394  *  propagate to the original Numeric! It's OK to use it as the destination
03395  *  argument of one of the calculational functions, though.
03396  */
03397 static void
03398 init_var_from_num(Numeric num, NumericVar *dest)
03399 {
03400     dest->ndigits = NUMERIC_NDIGITS(num);
03401     dest->weight = NUMERIC_WEIGHT(num);
03402     dest->sign = NUMERIC_SIGN(num);
03403     dest->dscale = NUMERIC_DSCALE(num);
03404     dest->digits = NUMERIC_DIGITS(num);
03405     dest->buf = NULL;   /* digits array is not palloc'd */
03406 }
03407 
03408 
03409 /*
03410  * set_var_from_var() -
03411  *
03412  *  Copy one variable into another
03413  */
03414 static void
03415 set_var_from_var(NumericVar *value, NumericVar *dest)
03416 {
03417     NumericDigit *newbuf;
03418 
03419     newbuf = digitbuf_alloc(value->ndigits + 1);
03420     newbuf[0] = 0;              /* spare digit for rounding */
03421     memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
03422 
03423     digitbuf_free(dest->buf);
03424 
03425     memmove(dest, value, sizeof(NumericVar));
03426     dest->buf = newbuf;
03427     dest->digits = newbuf + 1;
03428 }
03429 
03430 
03431 /*
03432  * get_str_from_var() -
03433  *
03434  *  Convert a var to text representation (guts of numeric_out).
03435  *  The var is displayed to the number of digits indicated by its dscale.
03436  *  Returns a palloc'd string.
03437  */
03438 static char *
03439 get_str_from_var(NumericVar *var)
03440 {
03441     int         dscale;
03442     char       *str;
03443     char       *cp;
03444     char       *endcp;
03445     int         i;
03446     int         d;
03447     NumericDigit dig;
03448 
03449 #if DEC_DIGITS > 1
03450     NumericDigit d1;
03451 #endif
03452 
03453     dscale = var->dscale;
03454 
03455     /*
03456      * Allocate space for the result.
03457      *
03458      * i is set to the # of decimal digits before decimal point. dscale is the
03459      * # of decimal digits we will print after decimal point. We may generate
03460      * as many as DEC_DIGITS-1 excess digits at the end, and in addition we
03461      * need room for sign, decimal point, null terminator.
03462      */
03463     i = (var->weight + 1) * DEC_DIGITS;
03464     if (i <= 0)
03465         i = 1;
03466 
03467     str = palloc(i + dscale + DEC_DIGITS + 2);
03468     cp = str;
03469 
03470     /*
03471      * Output a dash for negative values
03472      */
03473     if (var->sign == NUMERIC_NEG)
03474         *cp++ = '-';
03475 
03476     /*
03477      * Output all digits before the decimal point
03478      */
03479     if (var->weight < 0)
03480     {
03481         d = var->weight + 1;
03482         *cp++ = '0';
03483     }
03484     else
03485     {
03486         for (d = 0; d <= var->weight; d++)
03487         {
03488             dig = (d < var->ndigits) ? var->digits[d] : 0;
03489             /* In the first digit, suppress extra leading decimal zeroes */
03490 #if DEC_DIGITS == 4
03491             {
03492                 bool        putit = (d > 0);
03493 
03494                 d1 = dig / 1000;
03495                 dig -= d1 * 1000;
03496                 putit |= (d1 > 0);
03497                 if (putit)
03498                     *cp++ = d1 + '0';
03499                 d1 = dig / 100;
03500                 dig -= d1 * 100;
03501                 putit |= (d1 > 0);
03502                 if (putit)
03503                     *cp++ = d1 + '0';
03504                 d1 = dig / 10;
03505                 dig -= d1 * 10;
03506                 putit |= (d1 > 0);
03507                 if (putit)
03508                     *cp++ = d1 + '0';
03509                 *cp++ = dig + '0';
03510             }
03511 #elif DEC_DIGITS == 2
03512             d1 = dig / 10;
03513             dig -= d1 * 10;
03514             if (d1 > 0 || d > 0)
03515                 *cp++ = d1 + '0';
03516             *cp++ = dig + '0';
03517 #elif DEC_DIGITS == 1
03518             *cp++ = dig + '0';
03519 #else
03520 #error unsupported NBASE
03521 #endif
03522         }
03523     }
03524 
03525     /*
03526      * If requested, output a decimal point and all the digits that follow it.
03527      * We initially put out a multiple of DEC_DIGITS digits, then truncate if
03528      * needed.
03529      */
03530     if (dscale > 0)
03531     {
03532         *cp++ = '.';
03533         endcp = cp + dscale;
03534         for (i = 0; i < dscale; d++, i += DEC_DIGITS)
03535         {
03536             dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
03537 #if DEC_DIGITS == 4
03538             d1 = dig / 1000;
03539             dig -= d1 * 1000;
03540             *cp++ = d1 + '0';
03541             d1 = dig / 100;
03542             dig -= d1 * 100;
03543             *cp++ = d1 + '0';
03544             d1 = dig / 10;
03545             dig -= d1 * 10;
03546             *cp++ = d1 + '0';
03547             *cp++ = dig + '0';
03548 #elif DEC_DIGITS == 2
03549             d1 = dig / 10;
03550             dig -= d1 * 10;
03551             *cp++ = d1 + '0';
03552             *cp++ = dig + '0';
03553 #elif DEC_DIGITS == 1
03554             *cp++ = dig + '0';
03555 #else
03556 #error unsupported NBASE
03557 #endif
03558         }
03559         cp = endcp;
03560     }
03561 
03562     /*
03563      * terminate the string and return it
03564      */
03565     *cp = '\0';
03566     return str;
03567 }
03568 
03569 /*
03570  * get_str_from_var_sci() -
03571  *
03572  *  Convert a var to a normalised scientific notation text representation.
03573  *  This function does the heavy lifting for numeric_out_sci().
03574  *
03575  *  This notation has the general form a * 10^b, where a is known as the
03576  *  "significand" and b is known as the "exponent".
03577  *
03578  *  Because we can't do superscript in ASCII (and because we want to copy
03579  *  printf's behaviour) we display the exponent using E notation, with a
03580  *  minimum of two exponent digits.
03581  *
03582  *  For example, the value 1234 could be output as 1.2e+03.
03583  *
03584  *  We assume that the exponent can fit into an int32.
03585  *
03586  *  rscale is the number of decimal digits desired after the decimal point in
03587  *  the output, negative values will be treated as meaning zero.
03588  *
03589  *  Returns a palloc'd string.
03590  */
03591 static char *
03592 get_str_from_var_sci(NumericVar *var, int rscale)
03593 {
03594     int32       exponent;
03595     NumericVar  denominator;
03596     NumericVar  significand;
03597     int         denom_scale;
03598     size_t      len;
03599     char       *str;
03600     char       *sig_out;
03601 
03602     if (rscale < 0)
03603         rscale = 0;
03604 
03605     /*
03606      * Determine the exponent of this number in normalised form.
03607      *
03608      * This is the exponent required to represent the number with only one
03609      * significant digit before the decimal place.
03610      */
03611     if (var->ndigits > 0)
03612     {
03613         exponent = (var->weight + 1) * DEC_DIGITS;
03614 
03615         /*
03616          * Compensate for leading decimal zeroes in the first numeric digit by
03617          * decrementing the exponent.
03618          */
03619         exponent -= DEC_DIGITS - (int) log10(var->digits[0]);
03620     }
03621     else
03622     {
03623         /*
03624          * If var has no digits, then it must be zero.
03625          *
03626          * Zero doesn't technically have a meaningful exponent in normalised
03627          * notation, but we just display the exponent as zero for consistency
03628          * of output.
03629          */
03630         exponent = 0;
03631     }
03632 
03633     /*
03634      * The denominator is set to 10 raised to the power of the exponent.
03635      *
03636      * We then divide var by the denominator to get the significand, rounding
03637      * to rscale decimal digits in the process.
03638      */
03639     if (exponent < 0)
03640         denom_scale = -exponent;
03641     else
03642         denom_scale = 0;
03643 
03644     init_var(&denominator);
03645     init_var(&significand);
03646 
03647     power_var_int(&const_ten, exponent, &denominator, denom_scale);
03648     div_var(var, &denominator, &significand, rscale, true);
03649     sig_out = get_str_from_var(&significand);
03650 
03651     free_var(&denominator);
03652     free_var(&significand);
03653 
03654     /*
03655      * Allocate space for the result.
03656      *
03657      * In addition to the significand, we need room for the exponent
03658      * decoration ("e"), the sign of the exponent, up to 10 digits for the
03659      * exponent itself, and of course the null terminator.
03660      */
03661     len = strlen(sig_out) + 13;
03662     str = palloc(len);
03663     snprintf(str, len, "%se%+03d", sig_out, exponent);
03664 
03665     pfree(sig_out);
03666 
03667     return str;
03668 }
03669 
03670 
03671 /*
03672  * make_result() -
03673  *
03674  *  Create the packed db numeric format in palloc()'d memory from
03675  *  a variable.
03676  */
03677 static Numeric
03678 make_result(NumericVar *var)
03679 {
03680     Numeric     result;
03681     NumericDigit *digits = var->digits;
03682     int         weight = var->weight;
03683     int         sign = var->sign;
03684     int         n;
03685     Size        len;
03686 
03687     if (sign == NUMERIC_NAN)
03688     {
03689         result = (Numeric) palloc(NUMERIC_HDRSZ_SHORT);
03690 
03691         SET_VARSIZE(result, NUMERIC_HDRSZ_SHORT);
03692         result->choice.n_header = NUMERIC_NAN;
03693         /* the header word is all we need */
03694 
03695         dump_numeric("make_result()", result);
03696         return result;
03697     }
03698 
03699     n = var->ndigits;
03700 
03701     /* truncate leading zeroes */
03702     while (n > 0 && *digits == 0)
03703     {
03704         digits++;
03705         weight--;
03706         n--;
03707     }
03708     /* truncate trailing zeroes */
03709     while (n > 0 && digits[n - 1] == 0)
03710         n--;
03711 
03712     /* If zero result, force to weight=0 and positive sign */
03713     if (n == 0)
03714     {
03715         weight = 0;
03716         sign = NUMERIC_POS;
03717     }
03718 
03719     /* Build the result */
03720     if (NUMERIC_CAN_BE_SHORT(var->dscale, weight))
03721     {
03722         len = NUMERIC_HDRSZ_SHORT + n * sizeof(NumericDigit);
03723         result = (Numeric) palloc(len);
03724         SET_VARSIZE(result, len);
03725         result->choice.n_short.n_header =
03726             (sign == NUMERIC_NEG ? (NUMERIC_SHORT | NUMERIC_SHORT_SIGN_MASK)
03727              : NUMERIC_SHORT)
03728             | (var->dscale << NUMERIC_SHORT_DSCALE_SHIFT)
03729             | (weight < 0 ? NUMERIC_SHORT_WEIGHT_SIGN_MASK : 0)
03730             | (weight & NUMERIC_SHORT_WEIGHT_MASK);
03731     }
03732     else
03733     {
03734         len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
03735         result = (Numeric) palloc(len);
03736         SET_VARSIZE(result, len);
03737         result->choice.n_long.n_sign_dscale =
03738             sign | (var->dscale & NUMERIC_DSCALE_MASK);
03739         result->choice.n_long.n_weight = weight;
03740     }
03741 
03742     memcpy(NUMERIC_DIGITS(result), digits, n * sizeof(NumericDigit));
03743     Assert(NUMERIC_NDIGITS(result) == n);
03744 
03745     /* Check for overflow of int16 fields */
03746     if (NUMERIC_WEIGHT(result) != weight ||
03747         NUMERIC_DSCALE(result) != var->dscale)
03748         ereport(ERROR,
03749                 (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
03750                  errmsg("value overflows numeric format")));
03751 
03752     dump_numeric("make_result()", result);
03753     return result;
03754 }
03755 
03756 
03757 /*
03758  * apply_typmod() -
03759  *
03760  *  Do bounds checking and rounding according to the attributes
03761  *  typmod field.
03762  */
03763 static void
03764 apply_typmod(NumericVar *var, int32 typmod)
03765 {
03766     int         precision;
03767     int         scale;
03768     int         maxdigits;
03769     int         ddigits;
03770     int         i;
03771 
03772     /* Do nothing if we have a default typmod (-1) */
03773     if (typmod < (int32) (VARHDRSZ))
03774         return;
03775 
03776     typmod -= VARHDRSZ;
03777     precision = (typmod >> 16) & 0xffff;
03778     scale = typmod & 0xffff;
03779     maxdigits = precision - scale;
03780 
03781     /* Round to target scale (and set var->dscale) */
03782     round_var(var, scale);
03783 
03784     /*
03785      * Check for overflow - note we can't do this before rounding, because
03786      * rounding could raise the weight.  Also note that the var's weight could
03787      * be inflated by leading zeroes, which will be stripped before storage
03788      * but perhaps might not have been yet. In any case, we must recognize a
03789      * true zero, whose weight doesn't mean anything.
03790      */
03791     ddigits = (var->weight + 1) * DEC_DIGITS;
03792     if (ddigits > maxdigits)
03793     {
03794         /* Determine true weight; and check for all-zero result */
03795         for (i = 0; i < var->ndigits; i++)
03796         {
03797             NumericDigit dig = var->digits[i];
03798 
03799             if (dig)
03800             {
03801                 /* Adjust for any high-order decimal zero digits */
03802 #if DEC_DIGITS == 4
03803                 if (dig < 10)
03804                     ddigits -= 3;
03805                 else if (dig < 100)
03806                     ddigits -= 2;
03807                 else if (dig < 1000)
03808                     ddigits -= 1;
03809 #elif DEC_DIGITS == 2
03810                 if (dig < 10)
03811                     ddigits -= 1;
03812 #elif DEC_DIGITS == 1
03813                 /* no adjustment */
03814 #else
03815 #error unsupported NBASE
03816 #endif
03817                 if (ddigits > maxdigits)
03818                     ereport(ERROR,
03819                             (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
03820                              errmsg("numeric field overflow"),
03821                              errdetail("A field with precision %d, scale %d must round to an absolute value less than %s%d.",
03822                                        precision, scale,
03823                     /* Display 10^0 as 1 */
03824                                        maxdigits ? "10^" : "",
03825                                        maxdigits ? maxdigits : 1
03826                                        )));
03827                 break;
03828             }
03829             ddigits -= DEC_DIGITS;
03830         }
03831     }
03832 }
03833 
03834 /*
03835  * Convert numeric to int8, rounding if needed.
03836  *
03837  * If overflow, return FALSE (no error is raised).  Return TRUE if okay.
03838  */
03839 static bool
03840 numericvar_to_int8(NumericVar *var, int64 *result)
03841 {
03842     NumericDigit *digits;
03843     int         ndigits;
03844     int         weight;
03845     int         i;
03846     int64       val,
03847                 oldval;
03848     bool        neg;
03849     NumericVar  rounded;
03850 
03851     /* Round to nearest integer */
03852     init_var(&rounded);
03853     set_var_from_var(var, &rounded);
03854     round_var(&rounded, 0);
03855 
03856     /* Check for zero input */
03857     strip_var(&rounded);
03858     ndigits = rounded.ndigits;
03859     if (ndigits == 0)
03860     {
03861         *result = 0;
03862         free_var(&rounded);
03863         return true;
03864     }
03865 
03866     /*
03867      * For input like 10000000000, we must treat stripped digits as real. So
03868      * the loop assumes there are weight+1 digits before the decimal point.
03869      */
03870     weight = rounded.weight;
03871     Assert(weight >= 0 && ndigits <= weight + 1);
03872 
03873     /* Construct the result */
03874     digits = rounded.digits;
03875     neg = (rounded.sign == NUMERIC_NEG);
03876     val = digits[0];
03877     for (i = 1; i <= weight; i++)
03878     {
03879         oldval = val;
03880         val *= NBASE;
03881         if (i < ndigits)
03882             val += digits[i];
03883 
03884         /*
03885          * The overflow check is a bit tricky because we want to accept
03886          * INT64_MIN, which will overflow the positive accumulator.  We can
03887          * detect this case easily though because INT64_MIN is the only
03888          * nonzero value for which -val == val (on a two's complement machine,
03889          * anyway).
03890          */
03891         if ((val / NBASE) != oldval)    /* possible overflow? */
03892         {
03893             if (!neg || (-val) != val || val == 0 || oldval < 0)
03894             {
03895                 free_var(&rounded);
03896                 return false;
03897             }
03898         }
03899     }
03900 
03901     free_var(&rounded);
03902 
03903     *result = neg ? -val : val;
03904     return true;
03905 }
03906 
03907 /*
03908  * Convert int8 value to numeric.
03909  */
03910 static void
03911 int8_to_numericvar(int64 val, NumericVar *var)
03912 {
03913     uint64      uval,
03914                 newuval;
03915     NumericDigit *ptr;
03916     int         ndigits;
03917 
03918     /* int8 can require at most 19 decimal digits; add one for safety */
03919     alloc_var(var, 20 / DEC_DIGITS);
03920     if (val < 0)
03921     {
03922         var->sign = NUMERIC_NEG;
03923         uval = -val;
03924     }
03925     else
03926     {
03927         var->sign = NUMERIC_POS;
03928         uval = val;
03929     }
03930     var->dscale = 0;
03931     if (val == 0)
03932     {
03933         var->ndigits = 0;
03934         var->weight = 0;
03935         return;
03936     }
03937     ptr = var->digits + var->ndigits;
03938     ndigits = 0;
03939     do
03940     {
03941         ptr--;
03942         ndigits++;
03943         newuval = uval / NBASE;
03944         *ptr = uval - newuval * NBASE;
03945         uval = newuval;
03946     } while (uval);
03947     var->digits = ptr;
03948     var->ndigits = ndigits;
03949     var->weight = ndigits - 1;
03950 }
03951 
03952 /*
03953  * Convert numeric to float8; if out of range, return +/- HUGE_VAL
03954  */
03955 static double
03956 numeric_to_double_no_overflow(Numeric num)
03957 {
03958     char       *tmp;
03959     double      val;
03960     char       *endptr;
03961 
03962     tmp = DatumGetCString(DirectFunctionCall1(numeric_out,
03963                                               NumericGetDatum(num)));
03964 
03965     /* unlike float8in, we ignore ERANGE from strtod */
03966     val = strtod(tmp, &endptr);
03967     if (*endptr != '\0')
03968     {
03969         /* shouldn't happen ... */
03970         ereport(ERROR,
03971                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03972              errmsg("invalid input syntax for type double precision: \"%s\"",
03973                     tmp)));
03974     }
03975 
03976     pfree(tmp);
03977 
03978     return val;
03979 }
03980 
03981 /* As above, but work from a NumericVar */
03982 static double
03983 numericvar_to_double_no_overflow(NumericVar *var)
03984 {
03985     char       *tmp;
03986     double      val;
03987     char       *endptr;
03988 
03989     tmp = get_str_from_var(var);
03990 
03991     /* unlike float8in, we ignore ERANGE from strtod */
03992     val = strtod(tmp, &endptr);
03993     if (*endptr != '\0')
03994     {
03995         /* shouldn't happen ... */
03996         ereport(ERROR,
03997                 (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
03998              errmsg("invalid input syntax for type double precision: \"%s\"",
03999                     tmp)));
04000     }
04001 
04002     pfree(tmp);
04003 
04004     return val;
04005 }
04006 
04007 
04008 /*
04009  * cmp_var() -
04010  *
04011  *  Compare two values on variable level.  We assume zeroes have been
04012  *  truncated to no digits.
04013  */
04014 static int
04015 cmp_var(NumericVar *var1, NumericVar *var2)
04016 {
04017     return cmp_var_common(var1->digits, var1->ndigits,
04018                           var1->weight, var1->sign,
04019                           var2->digits, var2->ndigits,
04020                           var2->weight, var2->sign);
04021 }
04022 
04023 /*
04024  * cmp_var_common() -
04025  *
04026  *  Main routine of cmp_var(). This function can be used by both
04027  *  NumericVar and Numeric.
04028  */
04029 static int
04030 cmp_var_common(const NumericDigit *var1digits, int var1ndigits,
04031                int var1weight, int var1sign,
04032                const NumericDigit *var2digits, int var2ndigits,
04033                int var2weight, int var2sign)
04034 {
04035     if (var1ndigits == 0)
04036     {
04037         if (var2ndigits == 0)
04038             return 0;
04039         if (var2sign == NUMERIC_NEG)
04040             return 1;
04041         return -1;
04042     }
04043     if (var2ndigits == 0)
04044     {
04045         if (var1sign == NUMERIC_POS)
04046             return 1;
04047         return -1;
04048     }
04049 
04050     if (var1sign == NUMERIC_POS)
04051     {
04052         if (var2sign == NUMERIC_NEG)
04053             return 1;
04054         return cmp_abs_common(var1digits, var1ndigits, var1weight,
04055                               var2digits, var2ndigits, var2weight);
04056     }
04057 
04058     if (var2sign == NUMERIC_POS)
04059         return -1;
04060 
04061     return cmp_abs_common(var2digits, var2ndigits, var2weight,
04062                           var1digits, var1ndigits, var1weight);
04063 }
04064 
04065 
04066 /*
04067  * add_var() -
04068  *
04069  *  Full version of add functionality on variable level (handling signs).
04070  *  result might point to one of the operands too without danger.
04071  */
04072 static void
04073 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
04074 {
04075     /*
04076      * Decide on the signs of the two variables what to do
04077      */
04078     if (var1->sign == NUMERIC_POS)
04079     {
04080         if (var2->sign == NUMERIC_POS)
04081         {
04082             /*
04083              * Both are positive result = +(ABS(var1) + ABS(var2))
04084              */
04085             add_abs(var1, var2, result);
04086             result->sign = NUMERIC_POS;
04087         }
04088         else
04089         {
04090             /*
04091              * var1 is positive, var2 is negative Must compare absolute values
04092              */
04093             switch (cmp_abs(var1, var2))
04094             {
04095                 case 0:
04096                     /* ----------
04097                      * ABS(var1) == ABS(var2)
04098                      * result = ZERO
04099                      * ----------
04100                      */
04101                     zero_var(result);
04102                     result->dscale = Max(var1->dscale, var2->dscale);
04103                     break;
04104 
04105                 case 1:
04106                     /* ----------
04107                      * ABS(var1) > ABS(var2)
04108                      * result = +(ABS(var1) - ABS(var2))
04109                      * ----------
04110                      */
04111                     sub_abs(var1, var2, result);
04112                     result->sign = NUMERIC_POS;
04113                     break;
04114 
04115                 case -1:
04116                     /* ----------
04117                      * ABS(var1) < ABS(var2)
04118                      * result = -(ABS(var2) - ABS(var1))
04119                      * ----------
04120                      */
04121                     sub_abs(var2, var1, result);
04122                     result->sign = NUMERIC_NEG;
04123                     break;
04124             }
04125         }
04126     }
04127     else
04128     {
04129         if (var2->sign == NUMERIC_POS)
04130         {
04131             /* ----------
04132              * var1 is negative, var2 is positive
04133              * Must compare absolute values
04134              * ----------
04135              */
04136             switch (cmp_abs(var1, var2))
04137             {
04138                 case 0:
04139                     /* ----------
04140                      * ABS(var1) == ABS(var2)
04141                      * result = ZERO
04142                      * ----------
04143                      */
04144                     zero_var(result);
04145                     result->dscale = Max(var1->dscale, var2->dscale);
04146                     break;
04147 
04148                 case 1:
04149                     /* ----------
04150                      * ABS(var1) > ABS(var2)
04151                      * result = -(ABS(var1) - ABS(var2))
04152                      * ----------
04153                      */
04154                     sub_abs(var1, var2, result);
04155                     result->sign = NUMERIC_NEG;
04156                     break;
04157 
04158                 case -1:
04159                     /* ----------
04160                      * ABS(var1) < ABS(var2)
04161                      * result = +(ABS(var2) - ABS(var1))
04162                      * ----------
04163                      */
04164                     sub_abs(var2, var1, result);
04165                     result->sign = NUMERIC_POS;
04166                     break;
04167             }
04168         }
04169         else
04170         {
04171             /* ----------
04172              * Both are negative
04173              * result = -(ABS(var1) + ABS(var2))
04174              * ----------
04175              */
04176             add_abs(var1, var2, result);
04177             result->sign = NUMERIC_NEG;
04178         }
04179     }
04180 }
04181 
04182 
04183 /*
04184  * sub_var() -
04185  *
04186  *  Full version of sub functionality on variable level (handling signs).
04187  *  result might point to one of the operands too without danger.
04188  */
04189 static void
04190 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
04191 {
04192     /*
04193      * Decide on the signs of the two variables what to do
04194      */
04195     if (var1->sign == NUMERIC_POS)
04196     {
04197         if (var2->sign == NUMERIC_NEG)
04198         {
04199             /* ----------
04200              * var1 is positive, var2 is negative
04201              * result = +(ABS(var1) + ABS(var2))
04202              * ----------
04203              */
04204             add_abs(var1, var2, result);
04205             result->sign = NUMERIC_POS;
04206         }
04207         else
04208         {
04209             /* ----------
04210              * Both are positive
04211              * Must compare absolute values
04212              * ----------
04213              */
04214             switch (cmp_abs(var1, var2))
04215             {
04216                 case 0:
04217                     /* ----------
04218                      * ABS(var1) == ABS(var2)
04219                      * result = ZERO
04220                      * ----------
04221                      */
04222                     zero_var(result);
04223                     result->dscale = Max(var1->dscale, var2->dscale);
04224                     break;
04225 
04226                 case 1:
04227                     /* ----------
04228                      * ABS(var1) > ABS(var2)
04229                      * result = +(ABS(var1) - ABS(var2))
04230                      * ----------
04231                      */
04232                     sub_abs(var1, var2, result);
04233                     result->sign = NUMERIC_POS;
04234                     break;
04235 
04236                 case -1:
04237                     /* ----------
04238                      * ABS(var1) < ABS(var2)
04239                      * result = -(ABS(var2) - ABS(var1))
04240                      * ----------
04241                      */
04242                     sub_abs(var2, var1, result);
04243                     result->sign = NUMERIC_NEG;
04244                     break;
04245             }
04246         }
04247     }
04248     else
04249     {
04250         if (var2->sign == NUMERIC_NEG)
04251         {
04252             /* ----------
04253              * Both are negative
04254              * Must compare absolute values
04255              * ----------
04256              */
04257             switch (cmp_abs(var1, var2))
04258             {
04259                 case 0:
04260                     /* ----------
04261                      * ABS(var1) == ABS(var2)
04262                      * result = ZERO
04263                      * ----------
04264                      */
04265                     zero_var(result);
04266                     result->dscale = Max(var1->dscale, var2->dscale);
04267                     break;
04268 
04269                 case 1:
04270                     /* ----------
04271                      * ABS(var1) > ABS(var2)
04272                      * result = -(ABS(var1) - ABS(var2))
04273                      * ----------
04274                      */
04275                     sub_abs(var1, var2, result);
04276                     result->sign = NUMERIC_NEG;
04277                     break;
04278 
04279                 case -1:
04280                     /* ----------
04281                      * ABS(var1) < ABS(var2)
04282                      * result = +(ABS(var2) - ABS(var1))
04283                      * ----------
04284                      */
04285                     sub_abs(var2, var1, result);
04286                     result->sign = NUMERIC_POS;
04287                     break;
04288             }
04289         }
04290         else
04291         {
04292             /* ----------
04293              * var1 is negative, var2 is positive
04294              * result = -(ABS(var1) + ABS(var2))
04295              * ----------
04296              */
04297             add_abs(var1, var2, result);
04298             result->sign = NUMERIC_NEG;
04299         }
04300     }
04301 }
04302 
04303 
04304 /*
04305  * mul_var() -
04306  *
04307  *  Multiplication on variable level. Product of var1 * var2 is stored
04308  *  in result.  Result is rounded to no more than rscale fractional digits.
04309  */
04310 static void
04311 mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
04312         int rscale)
04313 {
04314     int         res_ndigits;
04315     int         res_sign;
04316     int         res_weight;
04317     int         maxdigits;
04318     int        *dig;
04319     int         carry;
04320     int         maxdig;
04321     int         newdig;
04322     NumericDigit *res_digits;
04323     int         i,
04324                 ri,
04325                 i1,
04326                 i2;
04327 
04328     /* copy these values into local vars for speed in inner loop */
04329     int         var1ndigits = var1->ndigits;
04330     int         var2ndigits = var2->ndigits;
04331     NumericDigit *var1digits = var1->digits;
04332     NumericDigit *var2digits = var2->digits;
04333 
04334     if (var1ndigits == 0 || var2ndigits == 0)
04335     {
04336         /* one or both inputs is zero; so is result */
04337         zero_var(result);
04338         result->dscale = rscale;
04339         return;
04340     }
04341 
04342     /* Determine result sign and (maximum possible) weight */
04343     if (var1->sign == var2->sign)
04344         res_sign = NUMERIC_POS;
04345     else
04346         res_sign = NUMERIC_NEG;
04347     res_weight = var1->weight + var2->weight + 2;
04348 
04349     /*
04350      * Determine number of result digits to compute.  If the exact result
04351      * would have more than rscale fractional digits, truncate the computation
04352      * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that one
04353      * or both inputs have fewer digits than they really do.
04354      */
04355     res_ndigits = var1ndigits + var2ndigits + 1;
04356     maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
04357     if (res_ndigits > maxdigits)
04358     {
04359         if (maxdigits < 3)
04360         {
04361             /* no useful precision at all in the result... */
04362             zero_var(result);
04363             result->dscale = rscale;
04364             return;
04365         }
04366         /* force maxdigits odd so that input ndigits can be equal */
04367         if ((maxdigits & 1) == 0)
04368             maxdigits++;
04369         if (var1ndigits > var2ndigits)
04370         {
04371             var1ndigits -= res_ndigits - maxdigits;
04372             if (var1ndigits < var2ndigits)
04373                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
04374         }
04375         else
04376         {
04377             var2ndigits -= res_ndigits - maxdigits;
04378             if (var2ndigits < var1ndigits)
04379                 var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
04380         }
04381         res_ndigits = maxdigits;
04382         Assert(res_ndigits == var1ndigits + var2ndigits + 1);
04383     }
04384 
04385     /*
04386      * We do the arithmetic in an array "dig[]" of signed int's.  Since
04387      * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
04388      * to avoid normalizing carries immediately.
04389      *
04390      * maxdig tracks the maximum possible value of any dig[] entry; when this
04391      * threatens to exceed INT_MAX, we take the time to propagate carries. To
04392      * avoid overflow in maxdig itself, it actually represents the max
04393      * possible value divided by NBASE-1.
04394      */
04395     dig = (int *) palloc0(res_ndigits * sizeof(int));
04396     maxdig = 0;
04397 
04398     ri = res_ndigits - 1;
04399     for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
04400     {
04401         int         var1digit = var1digits[i1];
04402 
04403         if (var1digit == 0)
04404             continue;
04405 
04406         /* Time to normalize? */
04407         maxdig += var1digit;
04408         if (maxdig > INT_MAX / (NBASE - 1))
04409         {
04410             /* Yes, do it */
04411             carry = 0;
04412             for (i = res_ndigits - 1; i >= 0; i--)
04413             {
04414                 newdig = dig[i] + carry;
04415                 if (newdig >= NBASE)
04416                 {
04417                     carry = newdig / NBASE;
04418                     newdig -= carry * NBASE;
04419                 }
04420                 else
04421                     carry = 0;
04422                 dig[i] = newdig;
04423             }
04424             Assert(carry == 0);
04425             /* Reset maxdig to indicate new worst-case */
04426             maxdig = 1 + var1digit;
04427         }
04428 
04429         /* Add appropriate multiple of var2 into the accumulator */
04430         i = ri;
04431         for (i2 = var2ndigits - 1; i2 >= 0; i2--)
04432             dig[i--] += var1digit * var2digits[i2];
04433     }
04434 
04435     /*
04436      * Now we do a final carry propagation pass to normalize the result, which
04437      * we combine with storing the result digits into the output. Note that
04438      * this is still done at full precision w/guard digits.
04439      */
04440     alloc_var(result, res_ndigits);
04441     res_digits = result->digits;
04442     carry = 0;
04443     for (i = res_ndigits - 1; i >= 0; i--)
04444     {
04445         newdig = dig[i] + carry;
04446         if (newdig >= NBASE)
04447         {
04448             carry = newdig / NBASE;
04449             newdig -= carry * NBASE;
04450         }
04451         else
04452             carry = 0;
04453         res_digits[i] = newdig;
04454     }
04455     Assert(carry == 0);
04456 
04457     pfree(dig);
04458 
04459     /*
04460      * Finally, round the result to the requested precision.
04461      */
04462     result->weight = res_weight;
04463     result->sign = res_sign;
04464 
04465     /* Round to target rscale (and set result->dscale) */
04466     round_var(result, rscale);
04467 
04468     /* Strip leading and trailing zeroes */
04469     strip_var(result);
04470 }
04471 
04472 
04473 /*
04474  * div_var() -
04475  *
04476  *  Division on variable level. Quotient of var1 / var2 is stored in result.
04477  *  The quotient is figured to exactly rscale fractional digits.
04478  *  If round is true, it is rounded at the rscale'th digit; if false, it
04479  *  is truncated (towards zero) at that digit.
04480  */
04481 static void
04482 div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
04483         int rscale, bool round)
04484 {
04485     int         div_ndigits;
04486     int         res_ndigits;
04487     int         res_sign;
04488     int         res_weight;
04489     int         carry;
04490     int         borrow;
04491     int         divisor1;
04492     int         divisor2;
04493     NumericDigit *dividend;
04494     NumericDigit *divisor;
04495     NumericDigit *res_digits;
04496     int         i;
04497     int         j;
04498 
04499     /* copy these values into local vars for speed in inner loop */
04500     int         var1ndigits = var1->ndigits;
04501     int         var2ndigits = var2->ndigits;
04502 
04503     /*
04504      * First of all division by zero check; we must not be handed an
04505      * unnormalized divisor.
04506      */
04507     if (var2ndigits == 0 || var2->digits[0] == 0)
04508         ereport(ERROR,
04509                 (errcode(ERRCODE_DIVISION_BY_ZERO),
04510                  errmsg("division by zero")));
04511 
04512     /*
04513      * Now result zero check
04514      */
04515     if (var1ndigits == 0)
04516     {
04517         zero_var(result);
04518         result->dscale = rscale;
04519         return;
04520     }
04521 
04522     /*
04523      * Determine the result sign, weight and number of digits to calculate.
04524      * The weight figured here is correct if the emitted quotient has no
04525      * leading zero digits; otherwise strip_var() will fix things up.
04526      */
04527     if (var1->sign == var2->sign)
04528         res_sign = NUMERIC_POS;
04529     else
04530         res_sign = NUMERIC_NEG;
04531     res_weight = var1->weight - var2->weight;
04532     /* The number of accurate result digits we need to produce: */
04533     res_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
04534     /* ... but always at least 1 */
04535     res_ndigits = Max(res_ndigits, 1);
04536     /* If rounding needed, figure one more digit to ensure correct result */
04537     if (round)
04538         res_ndigits++;
04539 
04540     /*
04541      * The working dividend normally requires res_ndigits + var2ndigits
04542      * digits, but make it at least var1ndigits so we can load all of var1
04543      * into it.  (There will be an additional digit dividend[0] in the
04544      * dividend space, but for consistency with Knuth's notation we don't
04545      * count that in div_ndigits.)
04546      */
04547     div_ndigits = res_ndigits + var2ndigits;
04548     div_ndigits = Max(div_ndigits, var1ndigits);
04549 
04550     /*
04551      * We need a workspace with room for the working dividend (div_ndigits+1
04552      * digits) plus room for the possibly-normalized divisor (var2ndigits
04553      * digits).  It is convenient also to have a zero at divisor[0] with the
04554      * actual divisor data in divisor[1 .. var2ndigits].  Transferring the
04555      * digits into the workspace also allows us to realloc the result (which
04556      * might be the same as either input var) before we begin the main loop.
04557      * Note that we use palloc0 to ensure that divisor[0], dividend[0], and
04558      * any additional dividend positions beyond var1ndigits, start out 0.
04559      */
04560     dividend = (NumericDigit *)
04561         palloc0((div_ndigits + var2ndigits + 2) * sizeof(NumericDigit));
04562     divisor = dividend + (div_ndigits + 1);
04563     memcpy(dividend + 1, var1->digits, var1ndigits * sizeof(NumericDigit));
04564     memcpy(divisor + 1, var2->digits, var2ndigits * sizeof(NumericDigit));
04565 
04566     /*
04567      * Now we can realloc the result to hold the generated quotient digits.
04568      */
04569     alloc_var(result, res_ndigits);
04570     res_digits = result->digits;
04571 
04572     if (var2ndigits == 1)
04573     {
04574         /*
04575          * If there's only a single divisor digit, we can use a fast path (cf.
04576          * Knuth section 4.3.1 exercise 16).
04577          */
04578         divisor1 = divisor[1];
04579         carry = 0;
04580         for (i = 0; i < res_ndigits; i++)
04581         {
04582             carry = carry * NBASE + dividend[i + 1];
04583             res_digits[i] = carry / divisor1;
04584             carry = carry % divisor1;
04585         }
04586     }
04587     else
04588     {
04589         /*
04590          * The full multiple-place algorithm is taken from Knuth volume 2,
04591          * Algorithm 4.3.1D.
04592          *
04593          * We need the first divisor digit to be >= NBASE/2.  If it isn't,
04594          * make it so by scaling up both the divisor and dividend by the
04595          * factor "d".  (The reason for allocating dividend[0] above is to
04596          * leave room for possible carry here.)
04597          */
04598         if (divisor[1] < HALF_NBASE)
04599         {
04600             int         d = NBASE / (divisor[1] + 1);
04601 
04602             carry = 0;
04603             for (i = var2ndigits; i > 0; i--)
04604             {
04605                 carry += divisor[i] * d;
04606                 divisor[i] = carry % NBASE;
04607                 carry = carry / NBASE;
04608             }
04609             Assert(carry == 0);
04610             carry = 0;
04611             /* at this point only var1ndigits of dividend can be nonzero */
04612             for (i = var1ndigits; i >= 0; i--)
04613             {
04614                 carry += dividend[i] * d;
04615                 dividend[i] = carry % NBASE;
04616                 carry = carry / NBASE;
04617             }
04618             Assert(carry == 0);
04619             Assert(divisor[1] >= HALF_NBASE);
04620         }
04621         /* First 2 divisor digits are used repeatedly in main loop */
04622         divisor1 = divisor[1];
04623         divisor2 = divisor[2];
04624 
04625         /*
04626          * Begin the main loop.  Each iteration of this loop produces the j'th
04627          * quotient digit by dividing dividend[j .. j + var2ndigits] by the
04628          * divisor; this is essentially the same as the common manual
04629          * procedure for long division.
04630          */
04631         for (j = 0; j < res_ndigits; j++)
04632         {
04633             /* Estimate quotient digit from the first two dividend digits */
04634             int         next2digits = dividend[j] * NBASE + dividend[j + 1];
04635             int         qhat;
04636 
04637             /*
04638              * If next2digits are 0, then quotient digit must be 0 and there's
04639              * no need to adjust the working dividend.  It's worth testing
04640              * here to fall out ASAP when processing trailing zeroes in a
04641              * dividend.
04642              */
04643             if (next2digits == 0)
04644             {
04645                 res_digits[j] = 0;
04646                 continue;
04647             }
04648 
04649             if (dividend[j] == divisor1)
04650                 qhat = NBASE - 1;
04651             else
04652                 qhat = next2digits / divisor1;
04653 
04654             /*
04655              * Adjust quotient digit if it's too large.  Knuth proves that
04656              * after this step, the quotient digit will be either correct or
04657              * just one too large.  (Note: it's OK to use dividend[j+2] here
04658              * because we know the divisor length is at least 2.)
04659              */
04660             while (divisor2 * qhat >
04661                    (next2digits - qhat * divisor1) * NBASE + dividend[j + 2])
04662                 qhat--;
04663 
04664             /* As above, need do nothing more when quotient digit is 0 */
04665             if (qhat > 0)
04666             {
04667                 /*
04668                  * Multiply the divisor by qhat, and subtract that from the
04669                  * working dividend.  "carry" tracks the multiplication,
04670                  * "borrow" the subtraction (could we fold these together?)
04671                  */
04672                 carry = 0;
04673                 borrow = 0;
04674                 for (i = var2ndigits; i >= 0; i--)
04675                 {
04676                     carry += divisor[i] * qhat;
04677                     borrow -= carry % NBASE;
04678                     carry = carry / NBASE;
04679                     borrow += dividend[j + i];
04680                     if (borrow < 0)
04681                     {
04682                         dividend[j + i] = borrow + NBASE;
04683                         borrow = -1;
04684                     }
04685                     else
04686                     {
04687                         dividend[j + i] = borrow;
04688                         borrow = 0;
04689                     }
04690                 }
04691                 Assert(carry == 0);
04692 
04693                 /*
04694                  * If we got a borrow out of the top dividend digit, then
04695                  * indeed qhat was one too large.  Fix it, and add back the
04696                  * divisor to correct the working dividend.  (Knuth proves
04697                  * that this will occur only about 3/NBASE of the time; hence,
04698                  * it's a good idea to test this code with small NBASE to be
04699                  * sure this section gets exercised.)
04700                  */
04701                 if (borrow)
04702                 {
04703                     qhat--;
04704                     carry = 0;
04705                     for (i = var2ndigits; i >= 0; i--)
04706                     {
04707                         carry += dividend[j + i] + divisor[i];
04708                         if (carry >= NBASE)
04709                         {
04710                             dividend[j + i] = carry - NBASE;
04711                             carry = 1;
04712                         }
04713                         else
04714                         {
04715                             dividend[j + i] = carry;
04716                             carry = 0;
04717                         }
04718                     }
04719                     /* A carry should occur here to cancel the borrow above */
04720                     Assert(carry == 1);
04721                 }
04722             }
04723 
04724             /* And we're done with this quotient digit */
04725             res_digits[j] = qhat;
04726         }
04727     }
04728 
04729     pfree(dividend);
04730 
04731     /*
04732      * Finally, round or truncate the result to the requested precision.
04733      */
04734     result->weight = res_weight;
04735     result->sign = res_sign;
04736 
04737     /* Round or truncate to target rscale (and set result->dscale) */
04738     if (round)
04739         round_var(result, rscale);
04740     else
04741         trunc_var(result, rscale);
04742 
04743     /* Strip leading and trailing zeroes */
04744     strip_var(result);
04745 }
04746 
04747 
04748 /*
04749  * div_var_fast() -
04750  *
04751  *  This has the same API as div_var, but is implemented using the division
04752  *  algorithm from the "FM" library, rather than Knuth's schoolbook-division
04753  *  approach.  This is significantly faster but can produce inaccurate
04754  *  results, because it sometimes has to propagate rounding to the left,
04755  *  and so we can never be entirely sure that we know the requested digits
04756  *  exactly.  We compute DIV_GUARD_DIGITS extra digits, but there is
04757  *  no certainty that that's enough.  We use this only in the transcendental
04758  *  function calculation routines, where everything is approximate anyway.
04759  */
04760 static void
04761 div_var_fast(NumericVar *var1, NumericVar *var2, NumericVar *result,
04762              int rscale, bool round)
04763 {
04764     int         div_ndigits;
04765     int         res_sign;
04766     int         res_weight;
04767     int        *div;
04768     int         qdigit;
04769     int         carry;
04770     int         maxdiv;
04771     int         newdig;
04772     NumericDigit *res_digits;
04773     double      fdividend,
04774                 fdivisor,
04775                 fdivisorinverse,
04776                 fquotient;
04777     int         qi;
04778     int         i;
04779 
04780     /* copy these values into local vars for speed in inner loop */
04781     int         var1ndigits = var1->ndigits;
04782     int         var2ndigits = var2->ndigits;
04783     NumericDigit *var1digits = var1->digits;
04784     NumericDigit *var2digits = var2->digits;
04785 
04786     /*
04787      * First of all division by zero check; we must not be handed an
04788      * unnormalized divisor.
04789      */
04790     if (var2ndigits == 0 || var2digits[0] == 0)
04791         ereport(ERROR,
04792                 (errcode(ERRCODE_DIVISION_BY_ZERO),
04793                  errmsg("division by zero")));
04794 
04795     /*
04796      * Now result zero check
04797      */
04798     if (var1ndigits == 0)
04799     {
04800         zero_var(result);
04801         result->dscale = rscale;
04802         return;
04803     }
04804 
04805     /*
04806      * Determine the result sign, weight and number of digits to calculate
04807      */
04808     if (var1->sign == var2->sign)
04809         res_sign = NUMERIC_POS;
04810     else
04811         res_sign = NUMERIC_NEG;
04812     res_weight = var1->weight - var2->weight + 1;
04813     /* The number of accurate result digits we need to produce: */
04814     div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS;
04815     /* Add guard digits for roundoff error */
04816     div_ndigits += DIV_GUARD_DIGITS;
04817     if (div_ndigits < DIV_GUARD_DIGITS)
04818         div_ndigits = DIV_GUARD_DIGITS;
04819     /* Must be at least var1ndigits, too, to simplify data-loading loop */
04820     if (div_ndigits < var1ndigits)
04821         div_ndigits = var1ndigits;
04822 
04823     /*
04824      * We do the arithmetic in an array "div[]" of signed int's.  Since
04825      * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
04826      * to avoid normalizing carries immediately.
04827      *
04828      * We start with div[] containing one zero digit followed by the
04829      * dividend's digits (plus appended zeroes to reach the desired precision
04830      * including guard digits).  Each step of the main loop computes an
04831      * (approximate) quotient digit and stores it into div[], removing one
04832      * position of dividend space.  A final pass of carry propagation takes
04833      * care of any mistaken quotient digits.
04834      */
04835     div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
04836     for (i = 0; i < var1ndigits; i++)
04837         div[i + 1] = var1digits[i];
04838 
04839     /*
04840      * We estimate each quotient digit using floating-point arithmetic, taking
04841      * the first four digits of the (current) dividend and divisor. This must
04842      * be float to avoid overflow.
04843      */
04844     fdivisor = (double) var2digits[0];
04845     for (i = 1; i < 4; i++)
04846     {
04847         fdivisor *= NBASE;
04848         if (i < var2ndigits)
04849             fdivisor += (double) var2digits[i];
04850     }
04851     fdivisorinverse = 1.0 / fdivisor;
04852 
04853     /*
04854      * maxdiv tracks the maximum possible absolute value of any div[] entry;
04855      * when this threatens to exceed INT_MAX, we take the time to propagate
04856      * carries.  To avoid overflow in maxdiv itself, it actually represents
04857      * the max possible abs. value divided by NBASE-1.
04858      */
04859     maxdiv = 1;
04860 
04861     /*
04862      * Outer loop computes next quotient digit, which will go into div[qi]
04863      */
04864     for (qi = 0; qi < div_ndigits; qi++)
04865     {
04866         /* Approximate the current dividend value */
04867         fdividend = (double) div[qi];
04868         for (i = 1; i < 4; i++)
04869         {
04870             fdividend *= NBASE;
04871             if (qi + i <= div_ndigits)
04872                 fdividend += (double) div[qi + i];
04873         }
04874         /* Compute the (approximate) quotient digit */
04875         fquotient = fdividend * fdivisorinverse;
04876         qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
04877             (((int) fquotient) - 1);    /* truncate towards -infinity */
04878 
04879         if (qdigit != 0)
04880         {
04881             /* Do we need to normalize now? */
04882             maxdiv += Abs(qdigit);
04883             if (maxdiv > INT_MAX / (NBASE - 1))
04884             {
04885                 /* Yes, do it */
04886                 carry = 0;
04887                 for (i = div_ndigits; i > qi; i--)
04888                 {
04889                     newdig = div[i] + carry;
04890                     if (newdig < 0)
04891                     {
04892                         carry = -((-newdig - 1) / NBASE) - 1;
04893                         newdig -= carry * NBASE;
04894                     }
04895                     else if (newdig >= NBASE)
04896                     {
04897                         carry = newdig / NBASE;
04898                         newdig -= carry * NBASE;
04899                     }
04900                     else
04901                         carry = 0;
04902                     div[i] = newdig;
04903                 }
04904                 newdig = div[qi] + carry;
04905                 div[qi] = newdig;
04906 
04907                 /*
04908                  * All the div[] digits except possibly div[qi] are now in the
04909                  * range 0..NBASE-1.
04910                  */
04911                 maxdiv = Abs(newdig) / (NBASE - 1);
04912                 maxdiv = Max(maxdiv, 1);
04913 
04914                 /*
04915                  * Recompute the quotient digit since new info may have
04916                  * propagated into the top four dividend digits
04917                  */
04918                 fdividend = (double) div[qi];
04919                 for (i = 1; i < 4; i++)
04920                 {
04921                     fdividend *= NBASE;
04922                     if (qi + i <= div_ndigits)
04923                         fdividend += (double) div[qi + i];
04924                 }
04925                 /* Compute the (approximate) quotient digit */
04926                 fquotient = fdividend * fdivisorinverse;
04927                 qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
04928                     (((int) fquotient) - 1);    /* truncate towards -infinity */
04929                 maxdiv += Abs(qdigit);
04930             }
04931 
04932             /* Subtract off the appropriate multiple of the divisor */
04933             if (qdigit != 0)
04934             {
04935                 int         istop = Min(var2ndigits, div_ndigits - qi + 1);
04936 
04937                 for (i = 0; i < istop; i++)
04938                     div[qi + i] -= qdigit * var2digits[i];
04939             }
04940         }
04941 
04942         /*
04943          * The dividend digit we are about to replace might still be nonzero.
04944          * Fold it into the next digit position.  We don't need to worry about
04945          * overflow here since this should nearly cancel with the subtraction
04946          * of the divisor.
04947          */
04948         div[qi + 1] += div[qi] * NBASE;
04949 
04950         div[qi] = qdigit;
04951     }
04952 
04953     /*
04954      * Approximate and store the last quotient digit (div[div_ndigits])
04955      */
04956     fdividend = (double) div[qi];
04957     for (i = 1; i < 4; i++)
04958         fdividend *= NBASE;
04959     fquotient = fdividend * fdivisorinverse;
04960     qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
04961         (((int) fquotient) - 1);    /* truncate towards -infinity */
04962     div[qi] = qdigit;
04963 
04964     /*
04965      * Now we do a final carry propagation pass to normalize the result, which
04966      * we combine with storing the result digits into the output. Note that
04967      * this is still done at full precision w/guard digits.
04968      */
04969     alloc_var(result, div_ndigits + 1);
04970     res_digits = result->digits;
04971     carry = 0;
04972     for (i = div_ndigits; i >= 0; i--)
04973     {
04974         newdig = div[i] + carry;
04975         if (newdig < 0)
04976         {
04977             carry = -((-newdig - 1) / NBASE) - 1;
04978             newdig -= carry * NBASE;
04979         }
04980         else if (newdig >= NBASE)
04981         {
04982             carry = newdig / NBASE;
04983             newdig -= carry * NBASE;
04984         }
04985         else
04986             carry = 0;
04987         res_digits[i] = newdig;
04988     }
04989     Assert(carry == 0);
04990 
04991     pfree(div);
04992 
04993     /*
04994      * Finally, round the result to the requested precision.
04995      */
04996     result->weight = res_weight;
04997     result->sign = res_sign;
04998 
04999     /* Round to target rscale (and set result->dscale) */
05000     if (round)
05001         round_var(result, rscale);
05002     else
05003         trunc_var(result, rscale);
05004 
05005     /* Strip leading and trailing zeroes */
05006     strip_var(result);
05007 }
05008 
05009 
05010 /*
05011  * Default scale selection for division
05012  *
05013  * Returns the appropriate result scale for the division result.
05014  */
05015 static int
05016 select_div_scale(NumericVar *var1, NumericVar *var2)
05017 {
05018     int         weight1,
05019                 weight2,
05020                 qweight,
05021                 i;
05022     NumericDigit firstdigit1,
05023                 firstdigit2;
05024     int         rscale;
05025 
05026     /*
05027      * The result scale of a division isn't specified in any SQL standard. For
05028      * PostgreSQL we select a result scale that will give at least
05029      * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
05030      * result no less accurate than float8; but use a scale not less than
05031      * either input's display scale.
05032      */
05033 
05034     /* Get the actual (normalized) weight and first digit of each input */
05035 
05036     weight1 = 0;                /* values to use if var1 is zero */
05037     firstdigit1 = 0;
05038     for (i = 0; i < var1->ndigits; i++)
05039     {
05040         firstdigit1 = var1->digits[i];
05041         if (firstdigit1 != 0)
05042         {
05043             weight1 = var1->weight - i;
05044             break;
05045         }
05046     }
05047 
05048     weight2 = 0;                /* values to use if var2 is zero */
05049     firstdigit2 = 0;
05050     for (i = 0; i < var2->ndigits; i++)
05051     {
05052         firstdigit2 = var2->digits[i];
05053         if (firstdigit2 != 0)
05054         {
05055             weight2 = var2->weight - i;
05056             break;
05057         }
05058     }
05059 
05060     /*
05061      * Estimate weight of quotient.  If the two first digits are equal, we
05062      * can't be sure, but assume that var1 is less than var2.
05063      */
05064     qweight = weight1 - weight2;
05065     if (firstdigit1 <= firstdigit2)
05066         qweight--;
05067 
05068     /* Select result scale */
05069     rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
05070     rscale = Max(rscale, var1->dscale);
05071     rscale = Max(rscale, var2->dscale);
05072     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
05073     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
05074 
05075     return rscale;
05076 }
05077 
05078 
05079 /*
05080  * mod_var() -
05081  *
05082  *  Calculate the modulo of two numerics at variable level
05083  */
05084 static void
05085 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
05086 {
05087     NumericVar  tmp;
05088 
05089     init_var(&tmp);
05090 
05091     /* ---------
05092      * We do this using the equation
05093      *      mod(x,y) = x - trunc(x/y)*y
05094      * div_var can be persuaded to give us trunc(x/y) directly.
05095      * ----------
05096      */
05097     div_var(var1, var2, &tmp, 0, false);
05098 
05099     mul_var(var2, &tmp, &tmp, var2->dscale);
05100 
05101     sub_var(var1, &tmp, result);
05102 
05103     free_var(&tmp);
05104 }
05105 
05106 
05107 /*
05108  * ceil_var() -
05109  *
05110  *  Return the smallest integer greater than or equal to the argument
05111  *  on variable level
05112  */
05113 static void
05114 ceil_var(NumericVar *var, NumericVar *result)
05115 {
05116     NumericVar  tmp;
05117 
05118     init_var(&tmp);
05119     set_var_from_var(var, &tmp);
05120 
05121     trunc_var(&tmp, 0);
05122 
05123     if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
05124         add_var(&tmp, &const_one, &tmp);
05125 
05126     set_var_from_var(&tmp, result);
05127     free_var(&tmp);
05128 }
05129 
05130 
05131 /*
05132  * floor_var() -
05133  *
05134  *  Return the largest integer equal to or less than the argument
05135  *  on variable level
05136  */
05137 static void
05138 floor_var(NumericVar *var, NumericVar *result)
05139 {
05140     NumericVar  tmp;
05141 
05142     init_var(&tmp);
05143     set_var_from_var(var, &tmp);
05144 
05145     trunc_var(&tmp, 0);
05146 
05147     if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
05148         sub_var(&tmp, &const_one, &tmp);
05149 
05150     set_var_from_var(&tmp, result);
05151     free_var(&tmp);
05152 }
05153 
05154 
05155 /*
05156  * sqrt_var() -
05157  *
05158  *  Compute the square root of x using Newton's algorithm
05159  */
05160 static void
05161 sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
05162 {
05163     NumericVar  tmp_arg;
05164     NumericVar  tmp_val;
05165     NumericVar  last_val;
05166     int         local_rscale;
05167     int         stat;
05168 
05169     local_rscale = rscale + 8;
05170 
05171     stat = cmp_var(arg, &const_zero);
05172     if (stat == 0)
05173     {
05174         zero_var(result);
05175         result->dscale = rscale;
05176         return;
05177     }
05178 
05179     /*
05180      * SQL2003 defines sqrt() in terms of power, so we need to emit the right
05181      * SQLSTATE error code if the operand is negative.
05182      */
05183     if (stat < 0)
05184         ereport(ERROR,
05185                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_POWER_FUNCTION),
05186                  errmsg("cannot take square root of a negative number")));
05187 
05188     init_var(&tmp_arg);
05189     init_var(&tmp_val);
05190     init_var(&last_val);
05191 
05192     /* Copy arg in case it is the same var as result */
05193     set_var_from_var(arg, &tmp_arg);
05194 
05195     /*
05196      * Initialize the result to the first guess
05197      */
05198     alloc_var(result, 1);
05199     result->digits[0] = tmp_arg.digits[0] / 2;
05200     if (result->digits[0] == 0)
05201         result->digits[0] = 1;
05202     result->weight = tmp_arg.weight / 2;
05203     result->sign = NUMERIC_POS;
05204 
05205     set_var_from_var(result, &last_val);
05206 
05207     for (;;)
05208     {
05209         div_var_fast(&tmp_arg, result, &tmp_val, local_rscale, true);
05210 
05211         add_var(result, &tmp_val, result);
05212         mul_var(result, &const_zero_point_five, result, local_rscale);
05213 
05214         if (cmp_var(&last_val, result) == 0)
05215             break;
05216         set_var_from_var(result, &last_val);
05217     }
05218 
05219     free_var(&last_val);
05220     free_var(&tmp_val);
05221     free_var(&tmp_arg);
05222 
05223     /* Round to requested precision */
05224     round_var(result, rscale);
05225 }
05226 
05227 
05228 /*
05229  * exp_var() -
05230  *
05231  *  Raise e to the power of x
05232  */
05233 static void
05234 exp_var(NumericVar *arg, NumericVar *result, int rscale)
05235 {
05236     NumericVar  x;
05237     int         xintval;
05238     bool        xneg = FALSE;
05239     int         local_rscale;
05240 
05241     /*----------
05242      * We separate the integral and fraction parts of x, then compute
05243      *      e^x = e^xint * e^xfrac
05244      * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
05245      * exp_var_internal; the limited range of inputs allows that routine
05246      * to do a good job with a simple Taylor series.  Raising e^xint is
05247      * done by repeated multiplications in power_var_int.
05248      *----------
05249      */
05250     init_var(&x);
05251 
05252     set_var_from_var(arg, &x);
05253 
05254     if (x.sign == NUMERIC_NEG)
05255     {
05256         xneg = TRUE;
05257         x.sign = NUMERIC_POS;
05258     }
05259 
05260     /* Extract the integer part, remove it from x */
05261     xintval = 0;
05262     while (x.weight >= 0)
05263     {
05264         xintval *= NBASE;
05265         if (x.ndigits > 0)
05266         {
05267             xintval += x.digits[0];
05268             x.digits++;
05269             x.ndigits--;
05270         }
05271         x.weight--;
05272         /* Guard against overflow */
05273         if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
05274             ereport(ERROR,
05275                     (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
05276                      errmsg("argument for function \"exp\" too big")));
05277     }
05278 
05279     /* Select an appropriate scale for internal calculation */
05280     local_rscale = rscale + MUL_GUARD_DIGITS * 2;
05281 
05282     /* Compute e^xfrac */
05283     exp_var_internal(&x, result, local_rscale);
05284 
05285     /* If there's an integer part, multiply by e^xint */
05286     if (xintval > 0)
05287     {
05288         NumericVar  e;
05289 
05290         init_var(&e);
05291         exp_var_internal(&const_one, &e, local_rscale);
05292         power_var_int(&e, xintval, &e, local_rscale);
05293         mul_var(&e, result, result, local_rscale);
05294         free_var(&e);
05295     }
05296 
05297     /* Compensate for input sign, and round to requested rscale */
05298     if (xneg)
05299         div_var_fast(&const_one, result, result, rscale, true);
05300     else
05301         round_var(result, rscale);
05302 
05303     free_var(&x);
05304 }
05305 
05306 
05307 /*
05308  * exp_var_internal() -
05309  *
05310  *  Raise e to the power of x, where 0 <= x <= 1
05311  *
05312  * NB: the result should be good to at least rscale digits, but it has
05313  * *not* been rounded off; the caller must do that if wanted.
05314  */
05315 static void
05316 exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
05317 {
05318     NumericVar  x;
05319     NumericVar  xpow;
05320     NumericVar  ifac;
05321     NumericVar  elem;
05322     NumericVar  ni;
05323     int         ndiv2 = 0;
05324     int         local_rscale;
05325 
05326     init_var(&x);
05327     init_var(&xpow);
05328     init_var(&ifac);
05329     init_var(&elem);
05330     init_var(&ni);
05331 
05332     set_var_from_var(arg, &x);
05333 
05334     Assert(x.sign == NUMERIC_POS);
05335 
05336     local_rscale = rscale + 8;
05337 
05338     /* Reduce input into range 0 <= x <= 0.01 */
05339     while (cmp_var(&x, &const_zero_point_01) > 0)
05340     {
05341         ndiv2++;
05342         local_rscale++;
05343         mul_var(&x, &const_zero_point_five, &x, x.dscale + 1);
05344     }
05345 
05346     /*
05347      * Use the Taylor series
05348      *
05349      * exp(x) = 1 + x + x^2/2! + x^3/3! + ...
05350      *
05351      * Given the limited range of x, this should converge reasonably quickly.
05352      * We run the series until the terms fall below the local_rscale limit.
05353      */
05354     add_var(&const_one, &x, result);
05355     set_var_from_var(&x, &xpow);
05356     set_var_from_var(&const_one, &ifac);
05357     set_var_from_var(&const_one, &ni);
05358 
05359     for (;;)
05360     {
05361         add_var(&ni, &const_one, &ni);
05362         mul_var(&xpow, &x, &xpow, local_rscale);
05363         mul_var(&ifac, &ni, &ifac, 0);
05364         div_var_fast(&xpow, &ifac, &elem, local_rscale, true);
05365 
05366         if (elem.ndigits == 0)
05367             break;
05368 
05369         add_var(result, &elem, result);
05370     }
05371 
05372     /* Compensate for argument range reduction */
05373     while (ndiv2-- > 0)
05374         mul_var(result, result, result, local_rscale);
05375 
05376     free_var(&x);
05377     free_var(&xpow);
05378     free_var(&ifac);
05379     free_var(&elem);
05380     free_var(&ni);
05381 }
05382 
05383 
05384 /*
05385  * ln_var() -
05386  *
05387  *  Compute the natural log of x
05388  */
05389 static void
05390 ln_var(NumericVar *arg, NumericVar *result, int rscale)
05391 {
05392     NumericVar  x;
05393     NumericVar  xx;
05394     NumericVar  ni;
05395     NumericVar  elem;
05396     NumericVar  fact;
05397     int         local_rscale;
05398     int         cmp;
05399 
05400     cmp = cmp_var(arg, &const_zero);
05401     if (cmp == 0)
05402         ereport(ERROR,
05403                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
05404                  errmsg("cannot take logarithm of zero")));
05405     else if (cmp < 0)
05406         ereport(ERROR,
05407                 (errcode(ERRCODE_INVALID_ARGUMENT_FOR_LOG),
05408                  errmsg("cannot take logarithm of a negative number")));
05409 
05410     local_rscale = rscale + 8;
05411 
05412     init_var(&x);
05413     init_var(&xx);
05414     init_var(&ni);
05415     init_var(&elem);
05416     init_var(&fact);
05417 
05418     set_var_from_var(arg, &x);
05419     set_var_from_var(&const_two, &fact);
05420 
05421     /* Reduce input into range 0.9 < x < 1.1 */
05422     while (cmp_var(&x, &const_zero_point_nine) <= 0)
05423     {
05424         local_rscale++;
05425         sqrt_var(&x, &x, local_rscale);
05426         mul_var(&fact, &const_two, &fact, 0);
05427     }
05428     while (cmp_var(&x, &const_one_point_one) >= 0)
05429     {
05430         local_rscale++;
05431         sqrt_var(&x, &x, local_rscale);
05432         mul_var(&fact, &const_two, &fact, 0);
05433     }
05434 
05435     /*
05436      * We use the Taylor series for 0.5 * ln((1+z)/(1-z)),
05437      *
05438      * z + z^3/3 + z^5/5 + ...
05439      *
05440      * where z = (x-1)/(x+1) is in the range (approximately) -0.053 .. 0.048
05441      * due to the above range-reduction of x.
05442      *
05443      * The convergence of this is not as fast as one would like, but is
05444      * tolerable given that z is small.
05445      */
05446     sub_var(&x, &const_one, result);
05447     add_var(&x, &const_one, &elem);
05448     div_var_fast(result, &elem, result, local_rscale, true);
05449     set_var_from_var(result, &xx);
05450     mul_var(result, result, &x, local_rscale);
05451 
05452     set_var_from_var(&const_one, &ni);
05453 
05454     for (;;)
05455     {
05456         add_var(&ni, &const_two, &ni);
05457         mul_var(&xx, &x, &xx, local_rscale);
05458         div_var_fast(&xx, &ni, &elem, local_rscale, true);
05459 
05460         if (elem.ndigits == 0)
05461             break;
05462 
05463         add_var(result, &elem, result);
05464 
05465         if (elem.weight < (result->weight - local_rscale * 2 / DEC_DIGITS))
05466             break;
05467     }
05468 
05469     /* Compensate for argument range reduction, round to requested rscale */
05470     mul_var(result, &fact, result, rscale);
05471 
05472     free_var(&x);
05473     free_var(&xx);
05474     free_var(&ni);
05475     free_var(&elem);
05476     free_var(&fact);
05477 }
05478 
05479 
05480 /*
05481  * log_var() -
05482  *
05483  *  Compute the logarithm of num in a given base.
05484  *
05485  *  Note: this routine chooses dscale of the result.
05486  */
05487 static void
05488 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
05489 {
05490     NumericVar  ln_base;
05491     NumericVar  ln_num;
05492     int         dec_digits;
05493     int         rscale;
05494     int         local_rscale;
05495 
05496     init_var(&ln_base);
05497     init_var(&ln_num);
05498 
05499     /* Set scale for ln() calculations --- compare numeric_ln() */
05500 
05501     /* Approx decimal digits before decimal point */
05502     dec_digits = (num->weight + 1) * DEC_DIGITS;
05503 
05504     if (dec_digits > 1)
05505         rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
05506     else if (dec_digits < 1)
05507         rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
05508     else
05509         rscale = NUMERIC_MIN_SIG_DIGITS;
05510 
05511     rscale = Max(rscale, base->dscale);
05512     rscale = Max(rscale, num->dscale);
05513     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
05514     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
05515 
05516     local_rscale = rscale + 8;
05517 
05518     /* Form natural logarithms */
05519     ln_var(base, &ln_base, local_rscale);
05520     ln_var(num, &ln_num, local_rscale);
05521 
05522     ln_base.dscale = rscale;
05523     ln_num.dscale = rscale;
05524 
05525     /* Select scale for division result */
05526     rscale = select_div_scale(&ln_num, &ln_base);
05527 
05528     div_var_fast(&ln_num, &ln_base, result, rscale, true);
05529 
05530     free_var(&ln_num);
05531     free_var(&ln_base);
05532 }
05533 
05534 
05535 /*
05536  * power_var() -
05537  *
05538  *  Raise base to the power of exp
05539  *
05540  *  Note: this routine chooses dscale of the result.
05541  */
05542 static void
05543 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
05544 {
05545     NumericVar  ln_base;
05546     NumericVar  ln_num;
05547     int         dec_digits;
05548     int         rscale;
05549     int         local_rscale;
05550     double      val;
05551 
05552     /* If exp can be represented as an integer, use power_var_int */
05553     if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
05554     {
05555         /* exact integer, but does it fit in int? */
05556         int64       expval64;
05557 
05558         if (numericvar_to_int8(exp, &expval64))
05559         {
05560             int         expval = (int) expval64;
05561 
05562             /* Test for overflow by reverse-conversion. */
05563             if ((int64) expval == expval64)
05564             {
05565                 /* Okay, select rscale */
05566                 rscale = NUMERIC_MIN_SIG_DIGITS;
05567                 rscale = Max(rscale, base->dscale);
05568                 rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
05569                 rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
05570 
05571                 power_var_int(base, expval, result, rscale);
05572                 return;
05573             }
05574         }
05575     }
05576 
05577     /*
05578      * This avoids log(0) for cases of 0 raised to a non-integer. 0 ^ 0
05579      * handled by power_var_int().
05580      */
05581     if (cmp_var(base, &const_zero) == 0)
05582     {
05583         set_var_from_var(&const_zero, result);
05584         result->dscale = NUMERIC_MIN_SIG_DIGITS;        /* no need to round */
05585         return;
05586     }
05587 
05588     init_var(&ln_base);
05589     init_var(&ln_num);
05590 
05591     /* Set scale for ln() calculation --- need extra accuracy here */
05592 
05593     /* Approx decimal digits before decimal point */
05594     dec_digits = (base->weight + 1) * DEC_DIGITS;
05595 
05596     if (dec_digits > 1)
05597         rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(dec_digits - 1);
05598     else if (dec_digits < 1)
05599         rscale = NUMERIC_MIN_SIG_DIGITS * 2 - (int) log10(1 - dec_digits);
05600     else
05601         rscale = NUMERIC_MIN_SIG_DIGITS * 2;
05602 
05603     rscale = Max(rscale, base->dscale * 2);
05604     rscale = Max(rscale, exp->dscale * 2);
05605     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
05606     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
05607 
05608     local_rscale = rscale + 8;
05609 
05610     ln_var(base, &ln_base, local_rscale);
05611 
05612     mul_var(&ln_base, exp, &ln_num, local_rscale);
05613 
05614     /* Set scale for exp() -- compare numeric_exp() */
05615 
05616     /* convert input to float8, ignoring overflow */
05617     val = numericvar_to_double_no_overflow(&ln_num);
05618 
05619     /*
05620      * log10(result) = num * log10(e), so this is approximately the weight:
05621      */
05622     val *= 0.434294481903252;
05623 
05624     /* limit to something that won't cause integer overflow */
05625     val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
05626     val = Min(val, NUMERIC_MAX_RESULT_SCALE);
05627 
05628     rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
05629     rscale = Max(rscale, base->dscale);
05630     rscale = Max(rscale, exp->dscale);
05631     rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
05632     rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
05633 
05634     exp_var(&ln_num, result, rscale);
05635 
05636     free_var(&ln_num);
05637     free_var(&ln_base);
05638 }
05639 
05640 /*
05641  * power_var_int() -
05642  *
05643  *  Raise base to the power of exp, where exp is an integer.
05644  */
05645 static void
05646 power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
05647 {
05648     bool        neg;
05649     NumericVar  base_prod;
05650     int         local_rscale;
05651 
05652     switch (exp)
05653     {
05654         case 0:
05655 
05656             /*
05657              * While 0 ^ 0 can be either 1 or indeterminate (error), we treat
05658              * it as 1 because most programming languages do this. SQL:2003
05659              * also requires a return value of 1.
05660              * http://en.wikipedia.org/wiki/Exponentiation#Zero_to_the_zero_pow
05661              * er
05662              */
05663             set_var_from_var(&const_one, result);
05664             result->dscale = rscale;    /* no need to round */
05665             return;
05666         case 1:
05667             set_var_from_var(base, result);
05668             round_var(result, rscale);
05669             return;
05670         case -1:
05671             div_var(&const_one, base, result, rscale, true);
05672             return;
05673         case 2:
05674             mul_var(base, base, result, rscale);
05675             return;
05676         default:
05677             break;
05678     }
05679 
05680     /*
05681      * The general case repeatedly multiplies base according to the bit
05682      * pattern of exp.  We do the multiplications with some extra precision.
05683      */
05684     neg = (exp < 0);
05685     exp = Abs(exp);
05686 
05687     local_rscale = rscale + MUL_GUARD_DIGITS * 2;
05688 
05689     init_var(&base_prod);
05690     set_var_from_var(base, &base_prod);
05691 
05692     if (exp & 1)
05693         set_var_from_var(base, result);
05694     else
05695         set_var_from_var(&const_one, result);
05696 
05697     while ((exp >>= 1) > 0)
05698     {
05699         mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
05700         if (exp & 1)
05701             mul_var(&base_prod, result, result, local_rscale);
05702     }
05703 
05704     free_var(&base_prod);
05705 
05706     /* Compensate for input sign, and round to requested rscale */
05707     if (neg)
05708         div_var_fast(&const_one, result, result, rscale, true);
05709     else
05710         round_var(result, rscale);
05711 }
05712 
05713 
05714 /* ----------------------------------------------------------------------
05715  *
05716  * Following are the lowest level functions that operate unsigned
05717  * on the variable level
05718  *
05719  * ----------------------------------------------------------------------
05720  */
05721 
05722 
05723 /* ----------
05724  * cmp_abs() -
05725  *
05726  *  Compare the absolute values of var1 and var2
05727  *  Returns:    -1 for ABS(var1) < ABS(var2)
05728  *              0  for ABS(var1) == ABS(var2)
05729  *              1  for ABS(var1) > ABS(var2)
05730  * ----------
05731  */
05732 static int
05733 cmp_abs(NumericVar *var1, NumericVar *var2)
05734 {
05735     return cmp_abs_common(var1->digits, var1->ndigits, var1->weight,
05736                           var2->digits, var2->ndigits, var2->weight);
05737 }
05738 
05739 /* ----------
05740  * cmp_abs_common() -
05741  *
05742  *  Main routine of cmp_abs(). This function can be used by both
05743  *  NumericVar and Numeric.
05744  * ----------
05745  */
05746 static int
05747 cmp_abs_common(const NumericDigit *var1digits, int var1ndigits, int var1weight,
05748              const NumericDigit *var2digits, int var2ndigits, int var2weight)
05749 {
05750     int         i1 = 0;
05751     int         i2 = 0;
05752 
05753     /* Check any digits before the first common digit */
05754 
05755     while (var1weight > var2weight && i1 < var1ndigits)
05756     {
05757         if (var1digits[i1++] != 0)
05758             return 1;
05759         var1weight--;
05760     }
05761     while (var2weight > var1weight && i2 < var2ndigits)
05762     {
05763         if (var2digits[i2++] != 0)
05764             return -1;
05765         var2weight--;
05766     }
05767 
05768     /* At this point, either w1 == w2 or we've run out of digits */
05769 
05770     if (var1weight == var2weight)
05771     {
05772         while (i1 < var1ndigits && i2 < var2ndigits)
05773         {
05774             int         stat = var1digits[i1++] - var2digits[i2++];
05775 
05776             if (stat)
05777             {
05778                 if (stat > 0)
05779                     return 1;
05780                 return -1;
05781             }
05782         }
05783     }
05784 
05785     /*
05786      * At this point, we've run out of digits on one side or the other; so any
05787      * remaining nonzero digits imply that side is larger
05788      */
05789     while (i1 < var1ndigits)
05790     {
05791         if (var1digits[i1++] != 0)
05792             return 1;
05793     }
05794     while (i2 < var2ndigits)
05795     {
05796         if (var2digits[i2++] != 0)
05797             return -1;
05798     }
05799 
05800     return 0;
05801 }
05802 
05803 
05804 /*
05805  * add_abs() -
05806  *
05807  *  Add the absolute values of two variables into result.
05808  *  result might point to one of the operands without danger.
05809  */
05810 static void
05811 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
05812 {
05813     NumericDigit *res_buf;
05814     NumericDigit *res_digits;
05815     int         res_ndigits;
05816     int         res_weight;
05817     int         res_rscale,
05818                 rscale1,
05819                 rscale2;
05820     int         res_dscale;
05821     int         i,
05822                 i1,
05823                 i2;
05824     int         carry = 0;
05825 
05826     /* copy these values into local vars for speed in inner loop */
05827     int         var1ndigits = var1->ndigits;
05828     int         var2ndigits = var2->ndigits;
05829     NumericDigit *var1digits = var1->digits;
05830     NumericDigit *var2digits = var2->digits;
05831 
05832     res_weight = Max(var1->weight, var2->weight) + 1;
05833 
05834     res_dscale = Max(var1->dscale, var2->dscale);
05835 
05836     /* Note: here we are figuring rscale in base-NBASE digits */
05837     rscale1 = var1->ndigits - var1->weight - 1;
05838     rscale2 = var2->ndigits - var2->weight - 1;
05839     res_rscale = Max(rscale1, rscale2);
05840 
05841     res_ndigits = res_rscale + res_weight + 1;
05842     if (res_ndigits <= 0)
05843         res_ndigits = 1;
05844 
05845     res_buf = digitbuf_alloc(res_ndigits + 1);
05846     res_buf[0] = 0;             /* spare digit for later rounding */
05847     res_digits = res_buf + 1;
05848 
05849     i1 = res_rscale + var1->weight + 1;
05850     i2 = res_rscale + var2->weight + 1;
05851     for (i = res_ndigits - 1; i >= 0; i--)
05852     {
05853         i1--;
05854         i2--;
05855         if (i1 >= 0 && i1 < var1ndigits)
05856             carry += var1digits[i1];
05857         if (i2 >= 0 && i2 < var2ndigits)
05858             carry += var2digits[i2];
05859 
05860         if (carry >= NBASE)
05861         {
05862             res_digits[i] = carry - NBASE;
05863             carry = 1;
05864         }
05865         else
05866         {
05867             res_digits[i] = carry;
05868             carry = 0;
05869         }
05870     }
05871 
05872     Assert(carry == 0);         /* else we failed to allow for carry out */
05873 
05874     digitbuf_free(result->buf);
05875     result->ndigits = res_ndigits;
05876     result->buf = res_buf;
05877     result->digits = res_digits;
05878     result->weight = res_weight;
05879     result->dscale = res_dscale;
05880 
05881     /* Remove leading/trailing zeroes */
05882     strip_var(result);
05883 }
05884 
05885 
05886 /*
05887  * sub_abs()
05888  *
05889  *  Subtract the absolute value of var2 from the absolute value of var1
05890  *  and store in result. result might point to one of the operands
05891  *  without danger.
05892  *
05893  *  ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
05894  */
05895 static void
05896 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
05897 {
05898     NumericDigit *res_buf;
05899     NumericDigit *res_digits;
05900     int         res_ndigits;
05901     int         res_weight;
05902     int         res_rscale,
05903                 rscale1,
05904                 rscale2;
05905     int         res_dscale;
05906     int         i,
05907                 i1,
05908                 i2;
05909     int         borrow = 0;
05910 
05911     /* copy these values into local vars for speed in inner loop */
05912     int         var1ndigits = var1->ndigits;
05913     int         var2ndigits = var2->ndigits;
05914     NumericDigit *var1digits = var1->digits;
05915     NumericDigit *var2digits = var2->digits;
05916 
05917     res_weight = var1->weight;
05918 
05919     res_dscale = Max(var1->dscale, var2->dscale);
05920 
05921     /* Note: here we are figuring rscale in base-NBASE digits */
05922     rscale1 = var1->ndigits - var1->weight - 1;
05923     rscale2 = var2->ndigits - var2->weight - 1;
05924     res_rscale = Max(rscale1, rscale2);
05925 
05926     res_ndigits = res_rscale + res_weight + 1;
05927     if (res_ndigits <= 0)
05928         res_ndigits = 1;
05929 
05930     res_buf = digitbuf_alloc(res_ndigits + 1);
05931     res_buf[0] = 0;             /* spare digit for later rounding */
05932     res_digits = res_buf + 1;
05933 
05934     i1 = res_rscale + var1->weight + 1;
05935     i2 = res_rscale + var2->weight + 1;
05936     for (i = res_ndigits - 1; i >= 0; i--)
05937     {
05938         i1--;
05939         i2--;
05940         if (i1 >= 0 && i1 < var1ndigits)
05941             borrow += var1digits[i1];
05942         if (i2 >= 0 && i2 < var2ndigits)
05943             borrow -= var2digits[i2];
05944 
05945         if (borrow < 0)
05946         {
05947             res_digits[i] = borrow + NBASE;
05948             borrow = -1;
05949         }
05950         else
05951         {
05952             res_digits[i] = borrow;
05953             borrow = 0;
05954         }
05955     }
05956 
05957     Assert(borrow == 0);        /* else caller gave us var1 < var2 */
05958 
05959     digitbuf_free(result->buf);
05960     result->ndigits = res_ndigits;
05961     result->buf = res_buf;
05962     result->digits = res_digits;
05963     result->weight = res_weight;
05964     result->dscale = res_dscale;
05965 
05966     /* Remove leading/trailing zeroes */
05967     strip_var(result);
05968 }
05969 
05970 /*
05971  * round_var
05972  *
05973  * Round the value of a variable to no more than rscale decimal digits
05974  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
05975  * rounding before the decimal point.
05976  */
05977 static void
05978 round_var(NumericVar *var, int rscale)
05979 {
05980     NumericDigit *digits = var->digits;
05981     int         di;
05982     int         ndigits;
05983     int         carry;
05984 
05985     var->dscale = rscale;
05986 
05987     /* decimal digits wanted */
05988     di = (var->weight + 1) * DEC_DIGITS + rscale;
05989 
05990     /*
05991      * If di = 0, the value loses all digits, but could round up to 1 if its
05992      * first extra digit is >= 5.  If di < 0 the result must be 0.
05993      */
05994     if (di < 0)
05995     {
05996         var->ndigits = 0;
05997         var->weight = 0;
05998         var->sign = NUMERIC_POS;
05999     }
06000     else
06001     {
06002         /* NBASE digits wanted */
06003         ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
06004 
06005         /* 0, or number of decimal digits to keep in last NBASE digit */
06006         di %= DEC_DIGITS;
06007 
06008         if (ndigits < var->ndigits ||
06009             (ndigits == var->ndigits && di > 0))
06010         {
06011             var->ndigits = ndigits;
06012 
06013 #if DEC_DIGITS == 1
06014             /* di must be zero */
06015             carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
06016 #else
06017             if (di == 0)
06018                 carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
06019             else
06020             {
06021                 /* Must round within last NBASE digit */
06022                 int         extra,
06023                             pow10;
06024 
06025 #if DEC_DIGITS == 4
06026                 pow10 = round_powers[di];
06027 #elif DEC_DIGITS == 2
06028                 pow10 = 10;
06029 #else
06030 #error unsupported NBASE
06031 #endif
06032                 extra = digits[--ndigits] % pow10;
06033                 digits[ndigits] -= extra;
06034                 carry = 0;
06035                 if (extra >= pow10 / 2)
06036                 {
06037                     pow10 += digits[ndigits];
06038                     if (pow10 >= NBASE)
06039                     {
06040                         pow10 -= NBASE;
06041                         carry = 1;
06042                     }
06043                     digits[ndigits] = pow10;
06044                 }
06045             }
06046 #endif
06047 
06048             /* Propagate carry if needed */
06049             while (carry)
06050             {
06051                 carry += digits[--ndigits];
06052                 if (carry >= NBASE)
06053                 {
06054                     digits[ndigits] = carry - NBASE;
06055                     carry = 1;
06056                 }
06057                 else
06058                 {
06059                     digits[ndigits] = carry;
06060                     carry = 0;
06061                 }
06062             }
06063 
06064             if (ndigits < 0)
06065             {
06066                 Assert(ndigits == -1);  /* better not have added > 1 digit */
06067                 Assert(var->digits > var->buf);
06068                 var->digits--;
06069                 var->ndigits++;
06070                 var->weight++;
06071             }
06072         }
06073     }
06074 }
06075 
06076 /*
06077  * trunc_var
06078  *
06079  * Truncate (towards zero) the value of a variable at rscale decimal digits
06080  * after the decimal point.  NOTE: we allow rscale < 0 here, implying
06081  * truncation before the decimal point.
06082  */
06083 static void
06084 trunc_var(NumericVar *var, int rscale)
06085 {
06086     int         di;
06087     int         ndigits;
06088 
06089     var->dscale = rscale;
06090 
06091     /* decimal digits wanted */
06092     di = (var->weight + 1) * DEC_DIGITS + rscale;
06093 
06094     /*
06095      * If di <= 0, the value loses all digits.
06096      */
06097     if (di <= 0)
06098     {
06099         var->ndigits = 0;
06100         var->weight = 0;
06101         var->sign = NUMERIC_POS;
06102     }
06103     else
06104     {
06105         /* NBASE digits wanted */
06106         ndigits = (di + DEC_DIGITS - 1) / DEC_DIGITS;
06107 
06108         if (ndigits <= var->ndigits)
06109         {
06110             var->ndigits = ndigits;
06111 
06112 #if DEC_DIGITS == 1
06113             /* no within-digit stuff to worry about */
06114 #else
06115             /* 0, or number of decimal digits to keep in last NBASE digit */
06116             di %= DEC_DIGITS;
06117 
06118             if (di > 0)
06119             {
06120                 /* Must truncate within last NBASE digit */
06121                 NumericDigit *digits = var->digits;
06122                 int         extra,
06123                             pow10;
06124 
06125 #if DEC_DIGITS == 4
06126                 pow10 = round_powers[di];
06127 #elif DEC_DIGITS == 2
06128                 pow10 = 10;
06129 #else
06130 #error unsupported NBASE
06131 #endif
06132                 extra = digits[--ndigits] % pow10;
06133                 digits[ndigits] -= extra;
06134             }
06135 #endif
06136         }
06137     }
06138 }
06139 
06140 /*
06141  * strip_var
06142  *
06143  * Strip any leading and trailing zeroes from a numeric variable
06144  */
06145 static void
06146 strip_var(NumericVar *var)
06147 {
06148     NumericDigit *digits = var->digits;
06149     int         ndigits = var->ndigits;
06150 
06151     /* Strip leading zeroes */
06152     while (ndigits > 0 && *digits == 0)
06153     {
06154         digits++;
06155         var->weight--;
06156         ndigits--;
06157     }
06158 
06159     /* Strip trailing zeroes */
06160     while (ndigits > 0 && digits[ndigits - 1] == 0)
06161         ndigits--;
06162 
06163     /* If it's zero, normalize the sign and weight */
06164     if (ndigits == 0)
06165     {
06166         var->sign = NUMERIC_POS;
06167         var->weight = 0;
06168     }
06169 
06170     var->digits = digits;
06171     var->ndigits = ndigits;
06172 }