Header And Logo

PostgreSQL
| The world's most advanced open source database.

tablefunc.c

Go to the documentation of this file.
00001 /*
00002  * contrib/tablefunc/tablefunc.c
00003  *
00004  *
00005  * tablefunc
00006  *
00007  * Sample to demonstrate C functions which return setof scalar
00008  * and setof composite.
00009  * Joe Conway <[email protected]>
00010  * And contributors:
00011  * Nabil Sayegh <[email protected]>
00012  *
00013  * Copyright (c) 2002-2013, PostgreSQL Global Development Group
00014  *
00015  * Permission to use, copy, modify, and distribute this software and its
00016  * documentation for any purpose, without fee, and without a written agreement
00017  * is hereby granted, provided that the above copyright notice and this
00018  * paragraph and the following two paragraphs appear in all copies.
00019  *
00020  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00021  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
00022  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
00023  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
00024  * POSSIBILITY OF SUCH DAMAGE.
00025  *
00026  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
00027  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
00028  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
00029  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
00030  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
00031  *
00032  */
00033 #include "postgres.h"
00034 
00035 #include <math.h>
00036 
00037 #include "access/htup_details.h"
00038 #include "catalog/pg_type.h"
00039 #include "executor/spi.h"
00040 #include "funcapi.h"
00041 #include "lib/stringinfo.h"
00042 #include "miscadmin.h"
00043 #include "utils/builtins.h"
00044 
00045 #include "tablefunc.h"
00046 
00047 PG_MODULE_MAGIC;
00048 
00049 static HTAB *load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
00050 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
00051                         HTAB *crosstab_hash,
00052                         TupleDesc tupdesc,
00053                         MemoryContext per_query_ctx,
00054                         bool randomAccess);
00055 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
00056 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
00057 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
00058 static void get_normal_pair(float8 *x1, float8 *x2);
00059 static Tuplestorestate *connectby(char *relname,
00060           char *key_fld,
00061           char *parent_key_fld,
00062           char *orderby_fld,
00063           char *branch_delim,
00064           char *start_with,
00065           int max_depth,
00066           bool show_branch,
00067           bool show_serial,
00068           MemoryContext per_query_ctx,
00069           bool randomAccess,
00070           AttInMetadata *attinmeta);
00071 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
00072                              char *parent_key_fld,
00073                              char *relname,
00074                              char *orderby_fld,
00075                              char *branch_delim,
00076                              char *start_with,
00077                              char *branch,
00078                              int level,
00079                              int *serial,
00080                              int max_depth,
00081                              bool show_branch,
00082                              bool show_serial,
00083                              MemoryContext per_query_ctx,
00084                              AttInMetadata *attinmeta,
00085                              Tuplestorestate *tupstore);
00086 
00087 typedef struct
00088 {
00089     float8      mean;           /* mean of the distribution */
00090     float8      stddev;         /* stddev of the distribution */
00091     float8      carry_val;      /* hold second generated value */
00092     bool        use_carry;      /* use second generated value */
00093 } normal_rand_fctx;
00094 
/*
 * xpfree - pfree() a pointer variable unless it is NULL, then reset the
 * variable to NULL so it cannot be freed twice.
 *
 * var_ must be a modifiable pointer lvalue; it is evaluated more than once,
 * so do not pass an expression with side effects.  The parameter is
 * parenthesized so that compound lvalue expressions expand safely.
 */
#define xpfree(var_) \
    do { \
        if ((var_) != NULL) \
        { \
            pfree(var_); \
            (var_) = NULL; \
        } \
    } while (0)
00103 
/*
 * xpstrdup - NULL-tolerant pstrdup().
 *
 * Assigns a pstrdup()'d copy of srcvar_ to tgtvar_, or NULL when srcvar_ is
 * NULL.  tgtvar_ must be a modifiable lvalue; srcvar_ is evaluated more than
 * once.  Both parameters are parenthesized so that compound expressions
 * (e.g. a conditional expression as the source) expand as intended.
 */
#define xpstrdup(tgtvar_, srcvar_) \
    do { \
        if ((srcvar_) != NULL) \
            (tgtvar_) = pstrdup(srcvar_); \
        else \
            (tgtvar_) = NULL; \
    } while (0)
00111 
/*
 * xstreq - NULL-tolerant string equality.
 *
 * True when both arguments are NULL, or when both are non-NULL and strcmp()
 * reports them equal.  A NULL and a non-NULL string are never equal.
 *
 * Each argument is evaluated more than once.  The parameters are
 * parenthesized so that arguments containing low-precedence operators
 * (e.g. "cond ? a : b") are compared as a whole.
 */
#define xstreq(tgtvar_, srcvar_) \
    ((((tgtvar_) == NULL) && ((srcvar_) == NULL)) || \
     (((tgtvar_) != NULL) && ((srcvar_) != NULL) && \
      (strcmp((tgtvar_), (srcvar_)) == 0)))
00115 
/* buffer size for a printed int32: optional sign + 10 digits + terminator */
#define INT32_STRLEN    12

/*
 * Descriptor of one crosstab category, as stored (by pointer) in the
 * category hash table entries.
 */
typedef struct crosstab_cat_desc
{
    /* category name as returned by the categories query */
    char       *catname;
    /* zero-based position of the category among the categories */
    int         attidx;
} crosstab_cat_desc;

/* size in bytes of the fixed-width hash key built from a category name */
#define MAX_CATNAME_LEN         NAMEDATALEN
/* initial size estimate handed to hash_create() for the category table */
#define INIT_CATS               64
00128 
/*
 * crosstab_HashTableLookup - look up a category by name.
 *
 * Builds a zero-padded, fixed-width key from CATNAME (truncated exactly the
 * same way crosstab_HashTableInsert truncates it) and probes HASHTAB with
 * HASH_FIND.  CATDESC is set to the stored descriptor pointer, or to NULL
 * when no entry matches.
 */
#define crosstab_HashTableLookup(HASHTAB, CATNAME, CATDESC) \
do { \
    char key[MAX_CATNAME_LEN]; \
    crosstab_HashEnt *hentry; \
    \
    MemSet(key, 0, MAX_CATNAME_LEN); \
    snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
    hentry = (crosstab_HashEnt*) hash_search(HASHTAB, key, \
                                         HASH_FIND, NULL); \
    CATDESC = hentry ? hentry->catdesc : NULL; \
} while(0)
00142 
/*
 * crosstab_HashTableInsert - register a category descriptor in HASHTAB.
 *
 * The hash key is a zero-padded, fixed-width copy of CATDESC->catname,
 * truncated by the snprintf() size limit below; crosstab_HashTableLookup
 * builds its key the same way, so the two must stay in sync.  Raises
 * ERRCODE_DUPLICATE_OBJECT if an entry with the same key already exists.
 *
 * CATDESC is evaluated more than once.  It is parenthesized so that a
 * non-trivial pointer expression (e.g. "*pp") is dereferenced as a whole
 * before ->catname is applied.
 */
#define crosstab_HashTableInsert(HASHTAB, CATDESC) \
do { \
    crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
    \
    MemSet(key, 0, MAX_CATNAME_LEN); \
    snprintf(key, MAX_CATNAME_LEN - 1, "%s", (CATDESC)->catname); \
    hentry = (crosstab_HashEnt*) hash_search((HASHTAB), \
                                         key, HASH_ENTER, &found); \
    if (found) \
        ereport(ERROR, \
                (errcode(ERRCODE_DUPLICATE_OBJECT), \
                 errmsg("duplicate category name"))); \
    hentry->catdesc = (CATDESC); \
} while(0)
00157 
00158 /* hash table */
00159 typedef struct crosstab_hashent
00160 {
00161     char        internal_catname[MAX_CATNAME_LEN];
00162     crosstab_cat_desc *catdesc;
00163 } crosstab_HashEnt;
00164 
00165 /*
00166  * normal_rand - return requested number of random values
00167  * with a Gaussian (Normal) distribution.
00168  *
00169  * inputs are int numvals, float8 mean, and float8 stddev
00170  * returns setof float8
00171  */
00172 PG_FUNCTION_INFO_V1(normal_rand);
00173 Datum
00174 normal_rand(PG_FUNCTION_ARGS)
00175 {
00176     FuncCallContext *funcctx;
00177     int         call_cntr;
00178     int         max_calls;
00179     normal_rand_fctx *fctx;
00180     float8      mean;
00181     float8      stddev;
00182     float8      carry_val;
00183     bool        use_carry;
00184     MemoryContext oldcontext;
00185 
00186     /* stuff done only on the first call of the function */
00187     if (SRF_IS_FIRSTCALL())
00188     {
00189         /* create a function context for cross-call persistence */
00190         funcctx = SRF_FIRSTCALL_INIT();
00191 
00192         /*
00193          * switch to memory context appropriate for multiple function calls
00194          */
00195         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
00196 
00197         /* total number of tuples to be returned */
00198         funcctx->max_calls = PG_GETARG_UINT32(0);
00199 
00200         /* allocate memory for user context */
00201         fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
00202 
00203         /*
00204          * Use fctx to keep track of upper and lower bounds from call to call.
00205          * It will also be used to carry over the spare value we get from the
00206          * Box-Muller algorithm so that we only actually calculate a new value
00207          * every other call.
00208          */
00209         fctx->mean = PG_GETARG_FLOAT8(1);
00210         fctx->stddev = PG_GETARG_FLOAT8(2);
00211         fctx->carry_val = 0;
00212         fctx->use_carry = false;
00213 
00214         funcctx->user_fctx = fctx;
00215 
00216         MemoryContextSwitchTo(oldcontext);
00217     }
00218 
00219     /* stuff done on every call of the function */
00220     funcctx = SRF_PERCALL_SETUP();
00221 
00222     call_cntr = funcctx->call_cntr;
00223     max_calls = funcctx->max_calls;
00224     fctx = funcctx->user_fctx;
00225     mean = fctx->mean;
00226     stddev = fctx->stddev;
00227     carry_val = fctx->carry_val;
00228     use_carry = fctx->use_carry;
00229 
00230     if (call_cntr < max_calls)  /* do when there is more left to send */
00231     {
00232         float8      result;
00233 
00234         if (use_carry)
00235         {
00236             /*
00237              * reset use_carry and use second value obtained on last pass
00238              */
00239             fctx->use_carry = false;
00240             result = carry_val;
00241         }
00242         else
00243         {
00244             float8      normval_1;
00245             float8      normval_2;
00246 
00247             /* Get the next two normal values */
00248             get_normal_pair(&normval_1, &normval_2);
00249 
00250             /* use the first */
00251             result = mean + (stddev * normval_1);
00252 
00253             /* and save the second */
00254             fctx->carry_val = mean + (stddev * normval_2);
00255             fctx->use_carry = true;
00256         }
00257 
00258         /* send the result */
00259         SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
00260     }
00261     else
00262         /* do when there is no more left */
00263         SRF_RETURN_DONE(funcctx);
00264 }
00265 
00266 /*
00267  * get_normal_pair()
00268  * Assigns normally distributed (Gaussian) values to a pair of provided
00269  * parameters, with mean 0, standard deviation 1.
00270  *
00271  * This routine implements Algorithm P (Polar method for normal deviates)
00272  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
00273  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
00274  * Muller, and G. Marsaglia, _Annals_Math,_Stat._ 29 (1958), 610-611.
00275  *
00276  */
00277 static void
00278 get_normal_pair(float8 *x1, float8 *x2)
00279 {
00280     float8      u1,
00281                 u2,
00282                 v1,
00283                 v2,
00284                 s;
00285 
00286     do
00287     {
00288         u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
00289         u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
00290 
00291         v1 = (2.0 * u1) - 1.0;
00292         v2 = (2.0 * u2) - 1.0;
00293 
00294         s = v1 * v1 + v2 * v2;
00295     } while (s >= 1.0);
00296 
00297     if (s == 0)
00298     {
00299         *x1 = 0;
00300         *x2 = 0;
00301     }
00302     else
00303     {
00304         s = sqrt((-2.0 * log(s)) / s);
00305         *x1 = v1 * s;
00306         *x2 = v2 * s;
00307     }
00308 }
00309 
00310 /*
00311  * crosstab - create a crosstab of rowids and values columns from a
00312  * SQL statement returning one rowid column, one category column,
00313  * and one value column.
00314  *
00315  * e.g. given sql which produces:
00316  *
00317  *          rowid   cat     value
00318  *          ------+-------+-------
00319  *          row1    cat1    val1
00320  *          row1    cat2    val2
00321  *          row1    cat3    val3
00322  *          row1    cat4    val4
00323  *          row2    cat1    val5
00324  *          row2    cat2    val6
00325  *          row2    cat3    val7
00326  *          row2    cat4    val8
00327  *
00328  * crosstab returns:
00329  *                  <===== values columns =====>
00330  *          rowid   cat1    cat2    cat3    cat4
00331  *          ------+-------+-------+-------+-------
00332  *          row1    val1    val2    val3    val4
00333  *          row2    val5    val6    val7    val8
00334  *
00335  * NOTES:
00336  * 1. SQL result must be ordered by 1,2.
00337  * 2. The number of values columns depends on the tuple description
00338  *    of the function's declared return type.  The return type's columns
00339  *    must match the datatypes of the SQL query's result.  The datatype
00340  *    of the category column can be anything, however.
00341  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
00342  *    fill the number of result values columns) are filled in with nulls.
00343  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
00344  *    the number of result values columns) are skipped.
00345  * 5. Rows with all nulls in the values columns are skipped.
00346  */
00347 PG_FUNCTION_INFO_V1(crosstab);
00348 Datum
00349 crosstab(PG_FUNCTION_ARGS)
00350 {
00351     char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00352     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00353     Tuplestorestate *tupstore;
00354     TupleDesc   tupdesc;
00355     int         call_cntr;
00356     int         max_calls;
00357     AttInMetadata *attinmeta;
00358     SPITupleTable *spi_tuptable;
00359     TupleDesc   spi_tupdesc;
00360     bool        firstpass;
00361     char       *lastrowid;
00362     int         i;
00363     int         num_categories;
00364     MemoryContext per_query_ctx;
00365     MemoryContext oldcontext;
00366     int         ret;
00367     int         proc;
00368 
00369     /* check to see if caller supports us returning a tuplestore */
00370     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00371         ereport(ERROR,
00372                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00373                  errmsg("set-valued function called in context that cannot accept a set")));
00374     if (!(rsinfo->allowedModes & SFRM_Materialize))
00375         ereport(ERROR,
00376                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00377                  errmsg("materialize mode required, but it is not " \
00378                         "allowed in this context")));
00379 
00380     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
00381 
00382     /* Connect to SPI manager */
00383     if ((ret = SPI_connect()) < 0)
00384         /* internal error */
00385         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
00386 
00387     /* Retrieve the desired rows */
00388     ret = SPI_execute(sql, true, 0);
00389     proc = SPI_processed;
00390 
00391     /* If no qualifying tuples, fall out early */
00392     if (ret != SPI_OK_SELECT || proc <= 0)
00393     {
00394         SPI_finish();
00395         rsinfo->isDone = ExprEndResult;
00396         PG_RETURN_NULL();
00397     }
00398 
00399     spi_tuptable = SPI_tuptable;
00400     spi_tupdesc = spi_tuptable->tupdesc;
00401 
00402     /*----------
00403      * The provided SQL query must always return three columns.
00404      *
00405      * 1. rowname
00406      *  the label or identifier for each row in the final result
00407      * 2. category
00408      *  the label or identifier for each column in the final result
00409      * 3. values
00410      *  the value for each column in the final result
00411      *----------
00412      */
00413     if (spi_tupdesc->natts != 3)
00414         ereport(ERROR,
00415                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00416                  errmsg("invalid source data SQL statement"),
00417                  errdetail("The provided SQL must return 3 "
00418                            "columns: rowid, category, and values.")));
00419 
00420     /* get a tuple descriptor for our result type */
00421     switch (get_call_result_type(fcinfo, NULL, &tupdesc))
00422     {
00423         case TYPEFUNC_COMPOSITE:
00424             /* success */
00425             break;
00426         case TYPEFUNC_RECORD:
00427             /* failed to determine actual type of RECORD */
00428             ereport(ERROR,
00429                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00430                      errmsg("function returning record called in context "
00431                             "that cannot accept type record")));
00432             break;
00433         default:
00434             /* result type isn't composite */
00435             elog(ERROR, "return type must be a row type");
00436             break;
00437     }
00438 
00439     /*
00440      * Check that return tupdesc is compatible with the data we got from SPI,
00441      * at least based on number and type of attributes
00442      */
00443     if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
00444         ereport(ERROR,
00445                 (errcode(ERRCODE_SYNTAX_ERROR),
00446                  errmsg("return and sql tuple descriptions are " \
00447                         "incompatible")));
00448 
00449     /*
00450      * switch to long-lived memory context
00451      */
00452     oldcontext = MemoryContextSwitchTo(per_query_ctx);
00453 
00454     /* make sure we have a persistent copy of the result tupdesc */
00455     tupdesc = CreateTupleDescCopy(tupdesc);
00456 
00457     /* initialize our tuplestore in long-lived context */
00458     tupstore =
00459         tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
00460                               false, work_mem);
00461 
00462     MemoryContextSwitchTo(oldcontext);
00463 
00464     /*
00465      * Generate attribute metadata needed later to produce tuples from raw C
00466      * strings
00467      */
00468     attinmeta = TupleDescGetAttInMetadata(tupdesc);
00469 
00470     /* total number of tuples to be examined */
00471     max_calls = proc;
00472 
00473     /* the return tuple always must have 1 rowid + num_categories columns */
00474     num_categories = tupdesc->natts - 1;
00475 
00476     firstpass = true;
00477     lastrowid = NULL;
00478 
00479     for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
00480     {
00481         bool        skip_tuple = false;
00482         char      **values;
00483 
00484         /* allocate and zero space */
00485         values = (char **) palloc0((1 + num_categories) * sizeof(char *));
00486 
00487         /*
00488          * now loop through the sql results and assign each value in sequence
00489          * to the next category
00490          */
00491         for (i = 0; i < num_categories; i++)
00492         {
00493             HeapTuple   spi_tuple;
00494             char       *rowid;
00495 
00496             /* see if we've gone too far already */
00497             if (call_cntr >= max_calls)
00498                 break;
00499 
00500             /* get the next sql result tuple */
00501             spi_tuple = spi_tuptable->vals[call_cntr];
00502 
00503             /* get the rowid from the current sql result tuple */
00504             rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00505 
00506             /*
00507              * If this is the first pass through the values for this rowid,
00508              * set the first column to rowid
00509              */
00510             if (i == 0)
00511             {
00512                 xpstrdup(values[0], rowid);
00513 
00514                 /*
00515                  * Check to see if the rowid is the same as that of the last
00516                  * tuple sent -- if so, skip this tuple entirely
00517                  */
00518                 if (!firstpass && xstreq(lastrowid, rowid))
00519                 {
00520                     xpfree(rowid);
00521                     skip_tuple = true;
00522                     break;
00523                 }
00524             }
00525 
00526             /*
00527              * If rowid hasn't changed on us, continue building the output
00528              * tuple.
00529              */
00530             if (xstreq(rowid, values[0]))
00531             {
00532                 /*
00533                  * Get the next category item value, which is always attribute
00534                  * number three.
00535                  *
00536                  * Be careful to assign the value to the array index based on
00537                  * which category we are presently processing.
00538                  */
00539                 values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
00540 
00541                 /*
00542                  * increment the counter since we consume a row for each
00543                  * category, but not for last pass because the outer loop will
00544                  * do that for us
00545                  */
00546                 if (i < (num_categories - 1))
00547                     call_cntr++;
00548                 xpfree(rowid);
00549             }
00550             else
00551             {
00552                 /*
00553                  * We'll fill in NULLs for the missing values, but we need to
00554                  * decrement the counter since this sql result row doesn't
00555                  * belong to the current output tuple.
00556                  */
00557                 call_cntr--;
00558                 xpfree(rowid);
00559                 break;
00560             }
00561         }
00562 
00563         if (!skip_tuple)
00564         {
00565             HeapTuple   tuple;
00566 
00567             /* build the tuple and store it */
00568             tuple = BuildTupleFromCStrings(attinmeta, values);
00569             tuplestore_puttuple(tupstore, tuple);
00570             heap_freetuple(tuple);
00571         }
00572 
00573         /* Remember current rowid */
00574         xpfree(lastrowid);
00575         xpstrdup(lastrowid, values[0]);
00576         firstpass = false;
00577 
00578         /* Clean up */
00579         for (i = 0; i < num_categories + 1; i++)
00580             if (values[i] != NULL)
00581                 pfree(values[i]);
00582         pfree(values);
00583     }
00584 
00585     /* let the caller know we're sending back a tuplestore */
00586     rsinfo->returnMode = SFRM_Materialize;
00587     rsinfo->setResult = tupstore;
00588     rsinfo->setDesc = tupdesc;
00589 
00590     /* release SPI related resources (and return to caller's context) */
00591     SPI_finish();
00592 
00593     return (Datum) 0;
00594 }
00595 
00596 /*
00597  * crosstab_hash - reimplement crosstab as materialized function and
00598  * properly deal with missing values (i.e. don't pack remaining
00599  * values to the left)
00600  *
00601  * crosstab - create a crosstab of rowids and values columns from a
00602  * SQL statement returning one rowid column, one category column,
00603  * and one value column.
00604  *
00605  * e.g. given sql which produces:
00606  *
00607  *          rowid   cat     value
00608  *          ------+-------+-------
00609  *          row1    cat1    val1
00610  *          row1    cat2    val2
00611  *          row1    cat4    val4
00612  *          row2    cat1    val5
00613  *          row2    cat2    val6
00614  *          row2    cat3    val7
00615  *          row2    cat4    val8
00616  *
00617  * crosstab returns:
00618  *                  <===== values columns =====>
00619  *          rowid   cat1    cat2    cat3    cat4
00620  *          ------+-------+-------+-------+-------
00621  *          row1    val1    val2    null    val4
00622  *          row2    val5    val6    val7    val8
00623  *
00624  * NOTES:
00625  * 1. SQL result must be ordered by 1.
00626  * 2. The number of values columns depends on the tuple description
00627  *    of the function's declared return type.
00628  * 3. Missing values (i.e. missing category) are filled in with nulls.
00629  * 4. Extra values (i.e. not in category results) are skipped.
00630  */
00631 PG_FUNCTION_INFO_V1(crosstab_hash);
00632 Datum
00633 crosstab_hash(PG_FUNCTION_ARGS)
00634 {
00635     char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00636     char       *cats_sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00637     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00638     TupleDesc   tupdesc;
00639     MemoryContext per_query_ctx;
00640     MemoryContext oldcontext;
00641     HTAB       *crosstab_hash;
00642 
00643     /* check to see if caller supports us returning a tuplestore */
00644     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00645         ereport(ERROR,
00646                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00647                  errmsg("set-valued function called in context that cannot accept a set")));
00648     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
00649         rsinfo->expectedDesc == NULL)
00650         ereport(ERROR,
00651                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00652                  errmsg("materialize mode required, but it is not " \
00653                         "allowed in this context")));
00654 
00655     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
00656     oldcontext = MemoryContextSwitchTo(per_query_ctx);
00657 
00658     /* get the requested return tuple description */
00659     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
00660 
00661     /*
00662      * Check to make sure we have a reasonable tuple descriptor
00663      *
00664      * Note we will attempt to coerce the values into whatever the return
00665      * attribute type is and depend on the "in" function to complain if
00666      * needed.
00667      */
00668     if (tupdesc->natts < 2)
00669         ereport(ERROR,
00670                 (errcode(ERRCODE_SYNTAX_ERROR),
00671                  errmsg("query-specified return tuple and " \
00672                         "crosstab function are not compatible")));
00673 
00674     /* load up the categories hash table */
00675     crosstab_hash = load_categories_hash(cats_sql, per_query_ctx);
00676 
00677     /* let the caller know we're sending back a tuplestore */
00678     rsinfo->returnMode = SFRM_Materialize;
00679 
00680     /* now go build it */
00681     rsinfo->setResult = get_crosstab_tuplestore(sql,
00682                                                 crosstab_hash,
00683                                                 tupdesc,
00684                                                 per_query_ctx,
00685                              rsinfo->allowedModes & SFRM_Materialize_Random);
00686 
00687     /*
00688      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
00689      * tuples are in our tuplestore and passed back through rsinfo->setResult.
00690      * rsinfo->setDesc is set to the tuple description that we actually used
00691      * to build our tuples with, so the caller can verify we did what it was
00692      * expecting.
00693      */
00694     rsinfo->setDesc = tupdesc;
00695     MemoryContextSwitchTo(oldcontext);
00696 
00697     return (Datum) 0;
00698 }
00699 
00700 /*
00701  * load up the categories hash table
00702  */
00703 static HTAB *
00704 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
00705 {
00706     HTAB       *crosstab_hash;
00707     HASHCTL     ctl;
00708     int         ret;
00709     int         proc;
00710     MemoryContext SPIcontext;
00711 
00712     /* initialize the category hash table */
00713     MemSet(&ctl, 0, sizeof(ctl));
00714     ctl.keysize = MAX_CATNAME_LEN;
00715     ctl.entrysize = sizeof(crosstab_HashEnt);
00716     ctl.hcxt = per_query_ctx;
00717 
00718     /*
00719      * use INIT_CATS, defined above as a guess of how many hash table entries
00720      * to create, initially
00721      */
00722     crosstab_hash = hash_create("crosstab hash",
00723                                 INIT_CATS,
00724                                 &ctl,
00725                                 HASH_ELEM | HASH_CONTEXT);
00726 
00727     /* Connect to SPI manager */
00728     if ((ret = SPI_connect()) < 0)
00729         /* internal error */
00730         elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
00731 
00732     /* Retrieve the category name rows */
00733     ret = SPI_execute(cats_sql, true, 0);
00734     proc = SPI_processed;
00735 
00736     /* Check for qualifying tuples */
00737     if ((ret == SPI_OK_SELECT) && (proc > 0))
00738     {
00739         SPITupleTable *spi_tuptable = SPI_tuptable;
00740         TupleDesc   spi_tupdesc = spi_tuptable->tupdesc;
00741         int         i;
00742 
00743         /*
00744          * The provided categories SQL query must always return one column:
00745          * category - the label or identifier for each column
00746          */
00747         if (spi_tupdesc->natts != 1)
00748             ereport(ERROR,
00749                     (errcode(ERRCODE_SYNTAX_ERROR),
00750                      errmsg("provided \"categories\" SQL must " \
00751                             "return 1 column of at least one row")));
00752 
00753         for (i = 0; i < proc; i++)
00754         {
00755             crosstab_cat_desc *catdesc;
00756             char       *catname;
00757             HeapTuple   spi_tuple;
00758 
00759             /* get the next sql result tuple */
00760             spi_tuple = spi_tuptable->vals[i];
00761 
00762             /* get the category from the current sql result tuple */
00763             catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00764 
00765             SPIcontext = MemoryContextSwitchTo(per_query_ctx);
00766 
00767             catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
00768             catdesc->catname = catname;
00769             catdesc->attidx = i;
00770 
00771             /* Add the proc description block to the hashtable */
00772             crosstab_HashTableInsert(crosstab_hash, catdesc);
00773 
00774             MemoryContextSwitchTo(SPIcontext);
00775         }
00776     }
00777 
00778     if (SPI_finish() != SPI_OK_FINISH)
00779         /* internal error */
00780         elog(ERROR, "load_categories_hash: SPI_finish() failed");
00781 
00782     return crosstab_hash;
00783 }
00784 
00785 /*
00786  * create and populate the crosstab tuplestore using the provided source query
00787  */
00788 static Tuplestorestate *
00789 get_crosstab_tuplestore(char *sql,
00790                         HTAB *crosstab_hash,
00791                         TupleDesc tupdesc,
00792                         MemoryContext per_query_ctx,
00793                         bool randomAccess)
00794 {
00795     Tuplestorestate *tupstore;
00796     int         num_categories = hash_get_num_entries(crosstab_hash);
00797     AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
00798     char      **values;
00799     HeapTuple   tuple;
00800     int         ret;
00801     int         proc;
00802 
00803     /* initialize our tuplestore (while still in query context!) */
00804     tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
00805 
00806     /* Connect to SPI manager */
00807     if ((ret = SPI_connect()) < 0)
00808         /* internal error */
00809         elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
00810 
00811     /* Now retrieve the crosstab source rows */
00812     ret = SPI_execute(sql, true, 0);
00813     proc = SPI_processed;
00814 
00815     /* Check for qualifying tuples */
00816     if ((ret == SPI_OK_SELECT) && (proc > 0))
00817     {
00818         SPITupleTable *spi_tuptable = SPI_tuptable;
00819         TupleDesc   spi_tupdesc = spi_tuptable->tupdesc;
00820         int         ncols = spi_tupdesc->natts;
00821         char       *rowid;
00822         char       *lastrowid = NULL;
00823         bool        firstpass = true;
00824         int         i,
00825                     j;
00826         int         result_ncols;
00827 
00828         if (num_categories == 0)
00829         {
00830             /* no qualifying category tuples */
00831             ereport(ERROR,
00832                     (errcode(ERRCODE_SYNTAX_ERROR),
00833                      errmsg("provided \"categories\" SQL must " \
00834                             "return 1 column of at least one row")));
00835         }
00836 
00837         /*
00838          * The provided SQL query must always return at least three columns:
00839          *
00840          * 1. rowname   the label for each row - column 1 in the final result
00841          * 2. category  the label for each value-column in the final result 3.
00842          * value     the values used to populate the value-columns
00843          *
00844          * If there are more than three columns, the last two are taken as
00845          * "category" and "values". The first column is taken as "rowname".
00846          * Additional columns (2 thru N-2) are assumed the same for the same
00847          * "rowname", and are copied into the result tuple from the first time
00848          * we encounter a particular rowname.
00849          */
00850         if (ncols < 3)
00851             ereport(ERROR,
00852                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00853                      errmsg("invalid source data SQL statement"),
00854                      errdetail("The provided SQL must return 3 " \
00855                                " columns; rowid, category, and values.")));
00856 
00857         result_ncols = (ncols - 2) + num_categories;
00858 
00859         /* Recheck to make sure we tuple descriptor still looks reasonable */
00860         if (tupdesc->natts != result_ncols)
00861             ereport(ERROR,
00862                     (errcode(ERRCODE_SYNTAX_ERROR),
00863                      errmsg("invalid return type"),
00864                      errdetail("Query-specified return " \
00865                                "tuple has %d columns but crosstab " \
00866                                "returns %d.", tupdesc->natts, result_ncols)));
00867 
00868         /* allocate space */
00869         values = (char **) palloc(result_ncols * sizeof(char *));
00870 
00871         /* and make sure it's clear */
00872         memset(values, '\0', result_ncols * sizeof(char *));
00873 
00874         for (i = 0; i < proc; i++)
00875         {
00876             HeapTuple   spi_tuple;
00877             crosstab_cat_desc *catdesc;
00878             char       *catname;
00879 
00880             /* get the next sql result tuple */
00881             spi_tuple = spi_tuptable->vals[i];
00882 
00883             /* get the rowid from the current sql result tuple */
00884             rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00885 
00886             /*
00887              * if we're on a new output row, grab the column values up to
00888              * column N-2 now
00889              */
00890             if (firstpass || !xstreq(lastrowid, rowid))
00891             {
00892                 /*
00893                  * a new row means we need to flush the old one first, unless
00894                  * we're on the very first row
00895                  */
00896                 if (!firstpass)
00897                 {
00898                     /* rowid changed, flush the previous output row */
00899                     tuple = BuildTupleFromCStrings(attinmeta, values);
00900 
00901                     tuplestore_puttuple(tupstore, tuple);
00902 
00903                     for (j = 0; j < result_ncols; j++)
00904                         xpfree(values[j]);
00905                 }
00906 
00907                 values[0] = rowid;
00908                 for (j = 1; j < ncols - 2; j++)
00909                     values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
00910 
00911                 /* we're no longer on the first pass */
00912                 firstpass = false;
00913             }
00914 
00915             /* look up the category and fill in the appropriate column */
00916             catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
00917 
00918             if (catname != NULL)
00919             {
00920                 crosstab_HashTableLookup(crosstab_hash, catname, catdesc);
00921 
00922                 if (catdesc)
00923                     values[catdesc->attidx + ncols - 2] =
00924                         SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
00925             }
00926 
00927             xpfree(lastrowid);
00928             xpstrdup(lastrowid, rowid);
00929         }
00930 
00931         /* flush the last output row */
00932         tuple = BuildTupleFromCStrings(attinmeta, values);
00933 
00934         tuplestore_puttuple(tupstore, tuple);
00935     }
00936 
00937     if (SPI_finish() != SPI_OK_FINISH)
00938         /* internal error */
00939         elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
00940 
00941     tuplestore_donestoring(tupstore);
00942 
00943     return tupstore;
00944 }
00945 
00946 /*
00947  * connectby_text - produce a result set from a hierarchical (parent/child)
00948  * table.
00949  *
00950  * e.g. given table foo:
00951  *
00952  *          keyid   parent_keyid pos
00953  *          ------+------------+--
00954  *          row1    NULL         0
00955  *          row2    row1         0
00956  *          row3    row1         0
00957  *          row4    row2         1
00958  *          row5    row2         0
00959  *          row6    row4         0
00960  *          row7    row3         0
00961  *          row8    row6         0
00962  *          row9    row5         0
00963  *
00964  *
00965  * connectby(text relname, text keyid_fld, text parent_keyid_fld
00966  *            [, text orderby_fld], text start_with, int max_depth
00967  *            [, text branch_delim])
00968  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
00969  *
00970  *      keyid   parent_id   level    branch             serial
00971  *      ------+-----------+--------+-----------------------
00972  *      row2    NULL          0       row2                1
00973  *      row5    row2          1       row2~row5           2
00974  *      row9    row5          2       row2~row5~row9      3
00975  *      row4    row2          1       row2~row4           4
00976  *      row6    row4          2       row2~row4~row6      5
00977  *      row8    row6          3       row2~row4~row6~row8 6
00978  *
00979  */
00980 PG_FUNCTION_INFO_V1(connectby_text);
00981 
00982 #define CONNECTBY_NCOLS                 4
00983 #define CONNECTBY_NCOLS_NOBRANCH        3
00984 
00985 Datum
00986 connectby_text(PG_FUNCTION_ARGS)
00987 {
00988     char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00989     char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
00990     char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
00991     char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(3));
00992     int         max_depth = PG_GETARG_INT32(4);
00993     char       *branch_delim = NULL;
00994     bool        show_branch = false;
00995     bool        show_serial = false;
00996     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00997     TupleDesc   tupdesc;
00998     AttInMetadata *attinmeta;
00999     MemoryContext per_query_ctx;
01000     MemoryContext oldcontext;
01001 
01002     /* check to see if caller supports us returning a tuplestore */
01003     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01004         ereport(ERROR,
01005                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01006                  errmsg("set-valued function called in context that cannot accept a set")));
01007     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
01008         rsinfo->expectedDesc == NULL)
01009         ereport(ERROR,
01010                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01011                  errmsg("materialize mode required, but it is not " \
01012                         "allowed in this context")));
01013 
01014     if (fcinfo->nargs == 6)
01015     {
01016         branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(5));
01017         show_branch = true;
01018     }
01019     else
01020         /* default is no show, tilde for the delimiter */
01021         branch_delim = pstrdup("~");
01022 
01023     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01024     oldcontext = MemoryContextSwitchTo(per_query_ctx);
01025 
01026     /* get the requested return tuple description */
01027     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
01028 
01029     /* does it meet our needs */
01030     validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
01031 
01032     /* OK, use it then */
01033     attinmeta = TupleDescGetAttInMetadata(tupdesc);
01034 
01035     /* OK, go to work */
01036     rsinfo->returnMode = SFRM_Materialize;
01037     rsinfo->setResult = connectby(relname,
01038                                   key_fld,
01039                                   parent_key_fld,
01040                                   NULL,
01041                                   branch_delim,
01042                                   start_with,
01043                                   max_depth,
01044                                   show_branch,
01045                                   show_serial,
01046                                   per_query_ctx,
01047                               rsinfo->allowedModes & SFRM_Materialize_Random,
01048                                   attinmeta);
01049     rsinfo->setDesc = tupdesc;
01050 
01051     MemoryContextSwitchTo(oldcontext);
01052 
01053     /*
01054      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
01055      * tuples are in our tuplestore and passed back through rsinfo->setResult.
01056      * rsinfo->setDesc is set to the tuple description that we actually used
01057      * to build our tuples with, so the caller can verify we did what it was
01058      * expecting.
01059      */
01060     return (Datum) 0;
01061 }
01062 
01063 PG_FUNCTION_INFO_V1(connectby_text_serial);
01064 Datum
01065 connectby_text_serial(PG_FUNCTION_ARGS)
01066 {
01067     char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
01068     char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
01069     char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
01070     char       *orderby_fld = text_to_cstring(PG_GETARG_TEXT_PP(3));
01071     char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(4));
01072     int         max_depth = PG_GETARG_INT32(5);
01073     char       *branch_delim = NULL;
01074     bool        show_branch = false;
01075     bool        show_serial = true;
01076     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01077     TupleDesc   tupdesc;
01078     AttInMetadata *attinmeta;
01079     MemoryContext per_query_ctx;
01080     MemoryContext oldcontext;
01081 
01082     /* check to see if caller supports us returning a tuplestore */
01083     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01084         ereport(ERROR,
01085                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01086                  errmsg("set-valued function called in context that cannot accept a set")));
01087     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
01088         rsinfo->expectedDesc == NULL)
01089         ereport(ERROR,
01090                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01091                  errmsg("materialize mode required, but it is not " \
01092                         "allowed in this context")));
01093 
01094     if (fcinfo->nargs == 7)
01095     {
01096         branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(6));
01097         show_branch = true;
01098     }
01099     else
01100         /* default is no show, tilde for the delimiter */
01101         branch_delim = pstrdup("~");
01102 
01103     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01104     oldcontext = MemoryContextSwitchTo(per_query_ctx);
01105 
01106     /* get the requested return tuple description */
01107     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
01108 
01109     /* does it meet our needs */
01110     validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
01111 
01112     /* OK, use it then */
01113     attinmeta = TupleDescGetAttInMetadata(tupdesc);
01114 
01115     /* OK, go to work */
01116     rsinfo->returnMode = SFRM_Materialize;
01117     rsinfo->setResult = connectby(relname,
01118                                   key_fld,
01119                                   parent_key_fld,
01120                                   orderby_fld,
01121                                   branch_delim,
01122                                   start_with,
01123                                   max_depth,
01124                                   show_branch,
01125                                   show_serial,
01126                                   per_query_ctx,
01127                               rsinfo->allowedModes & SFRM_Materialize_Random,
01128                                   attinmeta);
01129     rsinfo->setDesc = tupdesc;
01130 
01131     MemoryContextSwitchTo(oldcontext);
01132 
01133     /*
01134      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
01135      * tuples are in our tuplestore and passed back through rsinfo->setResult.
01136      * rsinfo->setDesc is set to the tuple description that we actually used
01137      * to build our tuples with, so the caller can verify we did what it was
01138      * expecting.
01139      */
01140     return (Datum) 0;
01141 }
01142 
01143 
01144 /*
01145  * connectby - does the real work for connectby_text()
01146  */
01147 static Tuplestorestate *
01148 connectby(char *relname,
01149           char *key_fld,
01150           char *parent_key_fld,
01151           char *orderby_fld,
01152           char *branch_delim,
01153           char *start_with,
01154           int max_depth,
01155           bool show_branch,
01156           bool show_serial,
01157           MemoryContext per_query_ctx,
01158           bool randomAccess,
01159           AttInMetadata *attinmeta)
01160 {
01161     Tuplestorestate *tupstore = NULL;
01162     int         ret;
01163     MemoryContext oldcontext;
01164 
01165     int         serial = 1;
01166 
01167     /* Connect to SPI manager */
01168     if ((ret = SPI_connect()) < 0)
01169         /* internal error */
01170         elog(ERROR, "connectby: SPI_connect returned %d", ret);
01171 
01172     /* switch to longer term context to create the tuple store */
01173     oldcontext = MemoryContextSwitchTo(per_query_ctx);
01174 
01175     /* initialize our tuplestore */
01176     tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
01177 
01178     MemoryContextSwitchTo(oldcontext);
01179 
01180     /* now go get the whole tree */
01181     tupstore = build_tuplestore_recursively(key_fld,
01182                                             parent_key_fld,
01183                                             relname,
01184                                             orderby_fld,
01185                                             branch_delim,
01186                                             start_with,
01187                                             start_with, /* current_branch */
01188                                             0,  /* initial level is 0 */
01189                                             &serial,    /* initial serial is 1 */
01190                                             max_depth,
01191                                             show_branch,
01192                                             show_serial,
01193                                             per_query_ctx,
01194                                             attinmeta,
01195                                             tupstore);
01196 
01197     SPI_finish();
01198 
01199     return tupstore;
01200 }
01201 
01202 static Tuplestorestate *
01203 build_tuplestore_recursively(char *key_fld,
01204                              char *parent_key_fld,
01205                              char *relname,
01206                              char *orderby_fld,
01207                              char *branch_delim,
01208                              char *start_with,
01209                              char *branch,
01210                              int level,
01211                              int *serial,
01212                              int max_depth,
01213                              bool show_branch,
01214                              bool show_serial,
01215                              MemoryContext per_query_ctx,
01216                              AttInMetadata *attinmeta,
01217                              Tuplestorestate *tupstore)
01218 {
01219     TupleDesc   tupdesc = attinmeta->tupdesc;
01220     int         ret;
01221     int         proc;
01222     int         serial_column;
01223     StringInfoData sql;
01224     char      **values;
01225     char       *current_key;
01226     char       *current_key_parent;
01227     char        current_level[INT32_STRLEN];
01228     char        serial_str[INT32_STRLEN];
01229     char       *current_branch;
01230     HeapTuple   tuple;
01231 
01232     if (max_depth > 0 && level > max_depth)
01233         return tupstore;
01234 
01235     initStringInfo(&sql);
01236 
01237     /* Build initial sql statement */
01238     if (!show_serial)
01239     {
01240         appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
01241                          key_fld,
01242                          parent_key_fld,
01243                          relname,
01244                          parent_key_fld,
01245                          quote_literal_cstr(start_with),
01246                          key_fld, key_fld, parent_key_fld);
01247         serial_column = 0;
01248     }
01249     else
01250     {
01251         appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
01252                          key_fld,
01253                          parent_key_fld,
01254                          relname,
01255                          parent_key_fld,
01256                          quote_literal_cstr(start_with),
01257                          key_fld, key_fld, parent_key_fld,
01258                          orderby_fld);
01259         serial_column = 1;
01260     }
01261 
01262     if (show_branch)
01263         values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
01264     else
01265         values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
01266 
01267     /* First time through, do a little setup */
01268     if (level == 0)
01269     {
01270         /* root value is the one we initially start with */
01271         values[0] = start_with;
01272 
01273         /* root value has no parent */
01274         values[1] = NULL;
01275 
01276         /* root level is 0 */
01277         sprintf(current_level, "%d", level);
01278         values[2] = current_level;
01279 
01280         /* root branch is just starting root value */
01281         if (show_branch)
01282             values[3] = start_with;
01283 
01284         /* root starts the serial with 1 */
01285         if (show_serial)
01286         {
01287             sprintf(serial_str, "%d", (*serial)++);
01288             if (show_branch)
01289                 values[4] = serial_str;
01290             else
01291                 values[3] = serial_str;
01292         }
01293 
01294         /* construct the tuple */
01295         tuple = BuildTupleFromCStrings(attinmeta, values);
01296 
01297         /* now store it */
01298         tuplestore_puttuple(tupstore, tuple);
01299 
01300         /* increment level */
01301         level++;
01302     }
01303 
01304     /* Retrieve the desired rows */
01305     ret = SPI_execute(sql.data, true, 0);
01306     proc = SPI_processed;
01307 
01308     /* Check for qualifying tuples */
01309     if ((ret == SPI_OK_SELECT) && (proc > 0))
01310     {
01311         HeapTuple   spi_tuple;
01312         SPITupleTable *tuptable = SPI_tuptable;
01313         TupleDesc   spi_tupdesc = tuptable->tupdesc;
01314         int         i;
01315         StringInfoData branchstr;
01316         StringInfoData chk_branchstr;
01317         StringInfoData chk_current_key;
01318 
01319         /* First time through, do a little more setup */
01320         if (level == 0)
01321         {
01322             /*
01323              * Check that return tupdesc is compatible with the one we got
01324              * from the query, but only at level 0 -- no need to check more
01325              * than once
01326              */
01327 
01328             if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
01329                 ereport(ERROR,
01330                         (errcode(ERRCODE_SYNTAX_ERROR),
01331                          errmsg("invalid return type"),
01332                          errdetail("Return and SQL tuple descriptions are " \
01333                                    "incompatible.")));
01334         }
01335 
01336         initStringInfo(&branchstr);
01337         initStringInfo(&chk_branchstr);
01338         initStringInfo(&chk_current_key);
01339 
01340         for (i = 0; i < proc; i++)
01341         {
01342             /* initialize branch for this pass */
01343             appendStringInfo(&branchstr, "%s", branch);
01344             appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
01345 
01346             /* get the next sql result tuple */
01347             spi_tuple = tuptable->vals[i];
01348 
01349             /* get the current key and parent */
01350             current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
01351             appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
01352             current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
01353 
01354             /* get the current level */
01355             sprintf(current_level, "%d", level);
01356 
01357             /* check to see if this key is also an ancestor */
01358             if (strstr(chk_branchstr.data, chk_current_key.data))
01359                 elog(ERROR, "infinite recursion detected");
01360 
01361             /* OK, extend the branch */
01362             appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
01363             current_branch = branchstr.data;
01364 
01365             /* build a tuple */
01366             values[0] = pstrdup(current_key);
01367             values[1] = current_key_parent;
01368             values[2] = current_level;
01369             if (show_branch)
01370                 values[3] = current_branch;
01371             if (show_serial)
01372             {
01373                 sprintf(serial_str, "%d", (*serial)++);
01374                 if (show_branch)
01375                     values[4] = serial_str;
01376                 else
01377                     values[3] = serial_str;
01378             }
01379 
01380             tuple = BuildTupleFromCStrings(attinmeta, values);
01381 
01382             xpfree(current_key);
01383             xpfree(current_key_parent);
01384 
01385             /* store the tuple for later use */
01386             tuplestore_puttuple(tupstore, tuple);
01387 
01388             heap_freetuple(tuple);
01389 
01390             /* recurse using current_key_parent as the new start_with */
01391             tupstore = build_tuplestore_recursively(key_fld,
01392                                                     parent_key_fld,
01393                                                     relname,
01394                                                     orderby_fld,
01395                                                     branch_delim,
01396                                                     values[0],
01397                                                     current_branch,
01398                                                     level + 1,
01399                                                     serial,
01400                                                     max_depth,
01401                                                     show_branch,
01402                                                     show_serial,
01403                                                     per_query_ctx,
01404                                                     attinmeta,
01405                                                     tupstore);
01406 
01407             /* reset branch for next pass */
01408             resetStringInfo(&branchstr);
01409             resetStringInfo(&chk_branchstr);
01410             resetStringInfo(&chk_current_key);
01411         }
01412 
01413         xpfree(branchstr.data);
01414         xpfree(chk_branchstr.data);
01415         xpfree(chk_current_key.data);
01416     }
01417 
01418     return tupstore;
01419 }
01420 
01421 /*
01422  * Check expected (query runtime) tupdesc suitable for Connectby
01423  */
01424 static void
01425 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
01426 {
01427     int         serial_column = 0;
01428 
01429     if (show_serial)
01430         serial_column = 1;
01431 
01432     /* are there the correct number of columns */
01433     if (show_branch)
01434     {
01435         if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
01436             ereport(ERROR,
01437                     (errcode(ERRCODE_SYNTAX_ERROR),
01438                      errmsg("invalid return type"),
01439                      errdetail("Query-specified return tuple has " \
01440                                "wrong number of columns.")));
01441     }
01442     else
01443     {
01444         if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
01445             ereport(ERROR,
01446                     (errcode(ERRCODE_SYNTAX_ERROR),
01447                      errmsg("invalid return type"),
01448                      errdetail("Query-specified return tuple has " \
01449                                "wrong number of columns.")));
01450     }
01451 
01452     /* check that the types of the first two columns match */
01453     if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
01454         ereport(ERROR,
01455                 (errcode(ERRCODE_SYNTAX_ERROR),
01456                  errmsg("invalid return type"),
01457                  errdetail("First two columns must be the same type.")));
01458 
01459     /* check that the type of the third column is INT4 */
01460     if (tupdesc->attrs[2]->atttypid != INT4OID)
01461         ereport(ERROR,
01462                 (errcode(ERRCODE_SYNTAX_ERROR),
01463                  errmsg("invalid return type"),
01464                  errdetail("Third column must be type %s.",
01465                            format_type_be(INT4OID))));
01466 
01467     /* check that the type of the fourth column is TEXT if applicable */
01468     if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
01469         ereport(ERROR,
01470                 (errcode(ERRCODE_SYNTAX_ERROR),
01471                  errmsg("invalid return type"),
01472                  errdetail("Fourth column must be type %s.",
01473                            format_type_be(TEXTOID))));
01474 
01475     /* check that the type of the fifth column is INT4 */
01476     if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
01477         elog(ERROR, "query-specified return tuple not valid for Connectby: "
01478              "fifth column must be type %s", format_type_be(INT4OID));
01479 
01480     /* check that the type of the fifth column is INT4 */
01481     if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
01482         elog(ERROR, "query-specified return tuple not valid for Connectby: "
01483              "fourth column must be type %s", format_type_be(INT4OID));
01484 
01485     /* OK, the tupdesc is valid for our purposes */
01486 }
01487 
01488 /*
01489  * Check if spi sql tupdesc and return tupdesc are compatible
01490  */
01491 static bool
01492 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
01493 {
01494     Oid         ret_atttypid;
01495     Oid         sql_atttypid;
01496 
01497     /* check the key_fld types match */
01498     ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
01499     sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
01500     if (ret_atttypid != sql_atttypid)
01501         ereport(ERROR,
01502                 (errcode(ERRCODE_SYNTAX_ERROR),
01503                  errmsg("invalid return type"),
01504                  errdetail("SQL key field datatype does " \
01505                            "not match return key field datatype.")));
01506 
01507     /* check the parent_key_fld types match */
01508     ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
01509     sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
01510     if (ret_atttypid != sql_atttypid)
01511         ereport(ERROR,
01512                 (errcode(ERRCODE_SYNTAX_ERROR),
01513                  errmsg("invalid return type"),
01514                  errdetail("SQL parent key field datatype does " \
01515                            "not match return parent key field datatype.")));
01516 
01517     /* OK, the two tupdescs are compatible for our purposes */
01518     return true;
01519 }
01520 
01521 /*
01522  * Check if two tupdescs match in type of attributes
01523  */
01524 static bool
01525 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
01526 {
01527     int         i;
01528     Form_pg_attribute ret_attr;
01529     Oid         ret_atttypid;
01530     Form_pg_attribute sql_attr;
01531     Oid         sql_atttypid;
01532 
01533     if (ret_tupdesc->natts < 2 ||
01534         sql_tupdesc->natts < 3)
01535         return false;
01536 
01537     /* check the rowid types match */
01538     ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
01539     sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
01540     if (ret_atttypid != sql_atttypid)
01541         ereport(ERROR,
01542                 (errcode(ERRCODE_SYNTAX_ERROR),
01543                  errmsg("invalid return type"),
01544                  errdetail("SQL rowid datatype does not match " \
01545                            "return rowid datatype.")));
01546 
01547     /*
01548      * - attribute [1] of the sql tuple is the category; no need to check it -
01549      * attribute [2] of the sql tuple should match attributes [1] to [natts]
01550      * of the return tuple
01551      */
01552     sql_attr = sql_tupdesc->attrs[2];
01553     for (i = 1; i < ret_tupdesc->natts; i++)
01554     {
01555         ret_attr = ret_tupdesc->attrs[i];
01556 
01557         if (ret_attr->atttypid != sql_attr->atttypid)
01558             return false;
01559     }
01560 
01561     /* OK, the two tupdescs are compatible for our purposes */
01562     return true;
01563 }