00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "postgres.h"
00034
00035 #include <math.h>
00036
00037 #include "access/htup_details.h"
00038 #include "catalog/pg_type.h"
00039 #include "executor/spi.h"
00040 #include "funcapi.h"
00041 #include "lib/stringinfo.h"
00042 #include "miscadmin.h"
00043 #include "utils/builtins.h"
00044
00045 #include "tablefunc.h"
00046
00047 PG_MODULE_MAGIC;
00048
00049 static HTAB *load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
00050 static Tuplestorestate *get_crosstab_tuplestore(char *sql,
00051 HTAB *crosstab_hash,
00052 TupleDesc tupdesc,
00053 MemoryContext per_query_ctx,
00054 bool randomAccess);
00055 static void validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial);
00056 static bool compatCrosstabTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
00057 static bool compatConnectbyTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2);
00058 static void get_normal_pair(float8 *x1, float8 *x2);
00059 static Tuplestorestate *connectby(char *relname,
00060 char *key_fld,
00061 char *parent_key_fld,
00062 char *orderby_fld,
00063 char *branch_delim,
00064 char *start_with,
00065 int max_depth,
00066 bool show_branch,
00067 bool show_serial,
00068 MemoryContext per_query_ctx,
00069 bool randomAccess,
00070 AttInMetadata *attinmeta);
00071 static Tuplestorestate *build_tuplestore_recursively(char *key_fld,
00072 char *parent_key_fld,
00073 char *relname,
00074 char *orderby_fld,
00075 char *branch_delim,
00076 char *start_with,
00077 char *branch,
00078 int level,
00079 int *serial,
00080 int max_depth,
00081 bool show_branch,
00082 bool show_serial,
00083 MemoryContext per_query_ctx,
00084 AttInMetadata *attinmeta,
00085 Tuplestorestate *tupstore);
00086
00087 typedef struct
00088 {
00089 float8 mean;
00090 float8 stddev;
00091 float8 carry_val;
00092 bool use_carry;
00093 } normal_rand_fctx;
00094
00095 #define xpfree(var_) \
00096 do { \
00097 if (var_ != NULL) \
00098 { \
00099 pfree(var_); \
00100 var_ = NULL; \
00101 } \
00102 } while (0)
00103
00104 #define xpstrdup(tgtvar_, srcvar_) \
00105 do { \
00106 if (srcvar_) \
00107 tgtvar_ = pstrdup(srcvar_); \
00108 else \
00109 tgtvar_ = NULL; \
00110 } while (0)
00111
00112 #define xstreq(tgtvar_, srcvar_) \
00113 (((tgtvar_ == NULL) && (srcvar_ == NULL)) || \
00114 ((tgtvar_ != NULL) && (srcvar_ != NULL) && (strcmp(tgtvar_, srcvar_) == 0)))
00115
00116
00117 #define INT32_STRLEN 12
00118
00119
00120 typedef struct crosstab_cat_desc
00121 {
00122 char *catname;
00123 int attidx;
00124 } crosstab_cat_desc;
00125
00126 #define MAX_CATNAME_LEN NAMEDATALEN
00127 #define INIT_CATS 64
00128
00129 #define crosstab_HashTableLookup(HASHTAB, CATNAME, CATDESC) \
00130 do { \
00131 crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
00132 \
00133 MemSet(key, 0, MAX_CATNAME_LEN); \
00134 snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
00135 hentry = (crosstab_HashEnt*) hash_search(HASHTAB, \
00136 key, HASH_FIND, NULL); \
00137 if (hentry) \
00138 CATDESC = hentry->catdesc; \
00139 else \
00140 CATDESC = NULL; \
00141 } while(0)
00142
00143 #define crosstab_HashTableInsert(HASHTAB, CATDESC) \
00144 do { \
00145 crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
00146 \
00147 MemSet(key, 0, MAX_CATNAME_LEN); \
00148 snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
00149 hentry = (crosstab_HashEnt*) hash_search(HASHTAB, \
00150 key, HASH_ENTER, &found); \
00151 if (found) \
00152 ereport(ERROR, \
00153 (errcode(ERRCODE_DUPLICATE_OBJECT), \
00154 errmsg("duplicate category name"))); \
00155 hentry->catdesc = CATDESC; \
00156 } while(0)
00157
00158
00159 typedef struct crosstab_hashent
00160 {
00161 char internal_catname[MAX_CATNAME_LEN];
00162 crosstab_cat_desc *catdesc;
00163 } crosstab_HashEnt;
00164
00165
00166
00167
00168
00169
00170
00171
00172 PG_FUNCTION_INFO_V1(normal_rand);
00173 Datum
00174 normal_rand(PG_FUNCTION_ARGS)
00175 {
00176 FuncCallContext *funcctx;
00177 int call_cntr;
00178 int max_calls;
00179 normal_rand_fctx *fctx;
00180 float8 mean;
00181 float8 stddev;
00182 float8 carry_val;
00183 bool use_carry;
00184 MemoryContext oldcontext;
00185
00186
00187 if (SRF_IS_FIRSTCALL())
00188 {
00189
00190 funcctx = SRF_FIRSTCALL_INIT();
00191
00192
00193
00194
00195 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
00196
00197
00198 funcctx->max_calls = PG_GETARG_UINT32(0);
00199
00200
00201 fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
00202
00203
00204
00205
00206
00207
00208
00209 fctx->mean = PG_GETARG_FLOAT8(1);
00210 fctx->stddev = PG_GETARG_FLOAT8(2);
00211 fctx->carry_val = 0;
00212 fctx->use_carry = false;
00213
00214 funcctx->user_fctx = fctx;
00215
00216 MemoryContextSwitchTo(oldcontext);
00217 }
00218
00219
00220 funcctx = SRF_PERCALL_SETUP();
00221
00222 call_cntr = funcctx->call_cntr;
00223 max_calls = funcctx->max_calls;
00224 fctx = funcctx->user_fctx;
00225 mean = fctx->mean;
00226 stddev = fctx->stddev;
00227 carry_val = fctx->carry_val;
00228 use_carry = fctx->use_carry;
00229
00230 if (call_cntr < max_calls)
00231 {
00232 float8 result;
00233
00234 if (use_carry)
00235 {
00236
00237
00238
00239 fctx->use_carry = false;
00240 result = carry_val;
00241 }
00242 else
00243 {
00244 float8 normval_1;
00245 float8 normval_2;
00246
00247
00248 get_normal_pair(&normval_1, &normval_2);
00249
00250
00251 result = mean + (stddev * normval_1);
00252
00253
00254 fctx->carry_val = mean + (stddev * normval_2);
00255 fctx->use_carry = true;
00256 }
00257
00258
00259 SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
00260 }
00261 else
00262
00263 SRF_RETURN_DONE(funcctx);
00264 }
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277 static void
00278 get_normal_pair(float8 *x1, float8 *x2)
00279 {
00280 float8 u1,
00281 u2,
00282 v1,
00283 v2,
00284 s;
00285
00286 do
00287 {
00288 u1 = (float8) random() / (float8) MAX_RANDOM_VALUE;
00289 u2 = (float8) random() / (float8) MAX_RANDOM_VALUE;
00290
00291 v1 = (2.0 * u1) - 1.0;
00292 v2 = (2.0 * u2) - 1.0;
00293
00294 s = v1 * v1 + v2 * v2;
00295 } while (s >= 1.0);
00296
00297 if (s == 0)
00298 {
00299 *x1 = 0;
00300 *x2 = 0;
00301 }
00302 else
00303 {
00304 s = sqrt((-2.0 * log(s)) / s);
00305 *x1 = v1 * s;
00306 *x2 = v2 * s;
00307 }
00308 }
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347 PG_FUNCTION_INFO_V1(crosstab);
00348 Datum
00349 crosstab(PG_FUNCTION_ARGS)
00350 {
00351 char *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00352 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00353 Tuplestorestate *tupstore;
00354 TupleDesc tupdesc;
00355 int call_cntr;
00356 int max_calls;
00357 AttInMetadata *attinmeta;
00358 SPITupleTable *spi_tuptable;
00359 TupleDesc spi_tupdesc;
00360 bool firstpass;
00361 char *lastrowid;
00362 int i;
00363 int num_categories;
00364 MemoryContext per_query_ctx;
00365 MemoryContext oldcontext;
00366 int ret;
00367 int proc;
00368
00369
00370 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00371 ereport(ERROR,
00372 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00373 errmsg("set-valued function called in context that cannot accept a set")));
00374 if (!(rsinfo->allowedModes & SFRM_Materialize))
00375 ereport(ERROR,
00376 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00377 errmsg("materialize mode required, but it is not " \
00378 "allowed in this context")));
00379
00380 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
00381
00382
00383 if ((ret = SPI_connect()) < 0)
00384
00385 elog(ERROR, "crosstab: SPI_connect returned %d", ret);
00386
00387
00388 ret = SPI_execute(sql, true, 0);
00389 proc = SPI_processed;
00390
00391
00392 if (ret != SPI_OK_SELECT || proc <= 0)
00393 {
00394 SPI_finish();
00395 rsinfo->isDone = ExprEndResult;
00396 PG_RETURN_NULL();
00397 }
00398
00399 spi_tuptable = SPI_tuptable;
00400 spi_tupdesc = spi_tuptable->tupdesc;
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413 if (spi_tupdesc->natts != 3)
00414 ereport(ERROR,
00415 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00416 errmsg("invalid source data SQL statement"),
00417 errdetail("The provided SQL must return 3 "
00418 "columns: rowid, category, and values.")));
00419
00420
00421 switch (get_call_result_type(fcinfo, NULL, &tupdesc))
00422 {
00423 case TYPEFUNC_COMPOSITE:
00424
00425 break;
00426 case TYPEFUNC_RECORD:
00427
00428 ereport(ERROR,
00429 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00430 errmsg("function returning record called in context "
00431 "that cannot accept type record")));
00432 break;
00433 default:
00434
00435 elog(ERROR, "return type must be a row type");
00436 break;
00437 }
00438
00439
00440
00441
00442
00443 if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
00444 ereport(ERROR,
00445 (errcode(ERRCODE_SYNTAX_ERROR),
00446 errmsg("return and sql tuple descriptions are " \
00447 "incompatible")));
00448
00449
00450
00451
00452 oldcontext = MemoryContextSwitchTo(per_query_ctx);
00453
00454
00455 tupdesc = CreateTupleDescCopy(tupdesc);
00456
00457
00458 tupstore =
00459 tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
00460 false, work_mem);
00461
00462 MemoryContextSwitchTo(oldcontext);
00463
00464
00465
00466
00467
00468 attinmeta = TupleDescGetAttInMetadata(tupdesc);
00469
00470
00471 max_calls = proc;
00472
00473
00474 num_categories = tupdesc->natts - 1;
00475
00476 firstpass = true;
00477 lastrowid = NULL;
00478
00479 for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
00480 {
00481 bool skip_tuple = false;
00482 char **values;
00483
00484
00485 values = (char **) palloc0((1 + num_categories) * sizeof(char *));
00486
00487
00488
00489
00490
00491 for (i = 0; i < num_categories; i++)
00492 {
00493 HeapTuple spi_tuple;
00494 char *rowid;
00495
00496
00497 if (call_cntr >= max_calls)
00498 break;
00499
00500
00501 spi_tuple = spi_tuptable->vals[call_cntr];
00502
00503
00504 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00505
00506
00507
00508
00509
00510 if (i == 0)
00511 {
00512 xpstrdup(values[0], rowid);
00513
00514
00515
00516
00517
00518 if (!firstpass && xstreq(lastrowid, rowid))
00519 {
00520 xpfree(rowid);
00521 skip_tuple = true;
00522 break;
00523 }
00524 }
00525
00526
00527
00528
00529
00530 if (xstreq(rowid, values[0]))
00531 {
00532
00533
00534
00535
00536
00537
00538
00539 values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
00540
00541
00542
00543
00544
00545
00546 if (i < (num_categories - 1))
00547 call_cntr++;
00548 xpfree(rowid);
00549 }
00550 else
00551 {
00552
00553
00554
00555
00556
00557 call_cntr--;
00558 xpfree(rowid);
00559 break;
00560 }
00561 }
00562
00563 if (!skip_tuple)
00564 {
00565 HeapTuple tuple;
00566
00567
00568 tuple = BuildTupleFromCStrings(attinmeta, values);
00569 tuplestore_puttuple(tupstore, tuple);
00570 heap_freetuple(tuple);
00571 }
00572
00573
00574 xpfree(lastrowid);
00575 xpstrdup(lastrowid, values[0]);
00576 firstpass = false;
00577
00578
00579 for (i = 0; i < num_categories + 1; i++)
00580 if (values[i] != NULL)
00581 pfree(values[i]);
00582 pfree(values);
00583 }
00584
00585
00586 rsinfo->returnMode = SFRM_Materialize;
00587 rsinfo->setResult = tupstore;
00588 rsinfo->setDesc = tupdesc;
00589
00590
00591 SPI_finish();
00592
00593 return (Datum) 0;
00594 }
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631 PG_FUNCTION_INFO_V1(crosstab_hash);
00632 Datum
00633 crosstab_hash(PG_FUNCTION_ARGS)
00634 {
00635 char *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
00636 char *cats_sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
00637 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00638 TupleDesc tupdesc;
00639 MemoryContext per_query_ctx;
00640 MemoryContext oldcontext;
00641 HTAB *crosstab_hash;
00642
00643
00644 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
00645 ereport(ERROR,
00646 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00647 errmsg("set-valued function called in context that cannot accept a set")));
00648 if (!(rsinfo->allowedModes & SFRM_Materialize) ||
00649 rsinfo->expectedDesc == NULL)
00650 ereport(ERROR,
00651 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00652 errmsg("materialize mode required, but it is not " \
00653 "allowed in this context")));
00654
00655 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
00656 oldcontext = MemoryContextSwitchTo(per_query_ctx);
00657
00658
00659 tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
00660
00661
00662
00663
00664
00665
00666
00667
00668 if (tupdesc->natts < 2)
00669 ereport(ERROR,
00670 (errcode(ERRCODE_SYNTAX_ERROR),
00671 errmsg("query-specified return tuple and " \
00672 "crosstab function are not compatible")));
00673
00674
00675 crosstab_hash = load_categories_hash(cats_sql, per_query_ctx);
00676
00677
00678 rsinfo->returnMode = SFRM_Materialize;
00679
00680
00681 rsinfo->setResult = get_crosstab_tuplestore(sql,
00682 crosstab_hash,
00683 tupdesc,
00684 per_query_ctx,
00685 rsinfo->allowedModes & SFRM_Materialize_Random);
00686
00687
00688
00689
00690
00691
00692
00693
00694 rsinfo->setDesc = tupdesc;
00695 MemoryContextSwitchTo(oldcontext);
00696
00697 return (Datum) 0;
00698 }
00699
00700
00701
00702
00703 static HTAB *
00704 load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
00705 {
00706 HTAB *crosstab_hash;
00707 HASHCTL ctl;
00708 int ret;
00709 int proc;
00710 MemoryContext SPIcontext;
00711
00712
00713 MemSet(&ctl, 0, sizeof(ctl));
00714 ctl.keysize = MAX_CATNAME_LEN;
00715 ctl.entrysize = sizeof(crosstab_HashEnt);
00716 ctl.hcxt = per_query_ctx;
00717
00718
00719
00720
00721
00722 crosstab_hash = hash_create("crosstab hash",
00723 INIT_CATS,
00724 &ctl,
00725 HASH_ELEM | HASH_CONTEXT);
00726
00727
00728 if ((ret = SPI_connect()) < 0)
00729
00730 elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
00731
00732
00733 ret = SPI_execute(cats_sql, true, 0);
00734 proc = SPI_processed;
00735
00736
00737 if ((ret == SPI_OK_SELECT) && (proc > 0))
00738 {
00739 SPITupleTable *spi_tuptable = SPI_tuptable;
00740 TupleDesc spi_tupdesc = spi_tuptable->tupdesc;
00741 int i;
00742
00743
00744
00745
00746
00747 if (spi_tupdesc->natts != 1)
00748 ereport(ERROR,
00749 (errcode(ERRCODE_SYNTAX_ERROR),
00750 errmsg("provided \"categories\" SQL must " \
00751 "return 1 column of at least one row")));
00752
00753 for (i = 0; i < proc; i++)
00754 {
00755 crosstab_cat_desc *catdesc;
00756 char *catname;
00757 HeapTuple spi_tuple;
00758
00759
00760 spi_tuple = spi_tuptable->vals[i];
00761
00762
00763 catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00764
00765 SPIcontext = MemoryContextSwitchTo(per_query_ctx);
00766
00767 catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
00768 catdesc->catname = catname;
00769 catdesc->attidx = i;
00770
00771
00772 crosstab_HashTableInsert(crosstab_hash, catdesc);
00773
00774 MemoryContextSwitchTo(SPIcontext);
00775 }
00776 }
00777
00778 if (SPI_finish() != SPI_OK_FINISH)
00779
00780 elog(ERROR, "load_categories_hash: SPI_finish() failed");
00781
00782 return crosstab_hash;
00783 }
00784
00785
00786
00787
00788 static Tuplestorestate *
00789 get_crosstab_tuplestore(char *sql,
00790 HTAB *crosstab_hash,
00791 TupleDesc tupdesc,
00792 MemoryContext per_query_ctx,
00793 bool randomAccess)
00794 {
00795 Tuplestorestate *tupstore;
00796 int num_categories = hash_get_num_entries(crosstab_hash);
00797 AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
00798 char **values;
00799 HeapTuple tuple;
00800 int ret;
00801 int proc;
00802
00803
00804 tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
00805
00806
00807 if ((ret = SPI_connect()) < 0)
00808
00809 elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
00810
00811
00812 ret = SPI_execute(sql, true, 0);
00813 proc = SPI_processed;
00814
00815
00816 if ((ret == SPI_OK_SELECT) && (proc > 0))
00817 {
00818 SPITupleTable *spi_tuptable = SPI_tuptable;
00819 TupleDesc spi_tupdesc = spi_tuptable->tupdesc;
00820 int ncols = spi_tupdesc->natts;
00821 char *rowid;
00822 char *lastrowid = NULL;
00823 bool firstpass = true;
00824 int i,
00825 j;
00826 int result_ncols;
00827
00828 if (num_categories == 0)
00829 {
00830
00831 ereport(ERROR,
00832 (errcode(ERRCODE_SYNTAX_ERROR),
00833 errmsg("provided \"categories\" SQL must " \
00834 "return 1 column of at least one row")));
00835 }
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850 if (ncols < 3)
00851 ereport(ERROR,
00852 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00853 errmsg("invalid source data SQL statement"),
00854 errdetail("The provided SQL must return 3 " \
00855 " columns; rowid, category, and values.")));
00856
00857 result_ncols = (ncols - 2) + num_categories;
00858
00859
00860 if (tupdesc->natts != result_ncols)
00861 ereport(ERROR,
00862 (errcode(ERRCODE_SYNTAX_ERROR),
00863 errmsg("invalid return type"),
00864 errdetail("Query-specified return " \
00865 "tuple has %d columns but crosstab " \
00866 "returns %d.", tupdesc->natts, result_ncols)));
00867
00868
00869 values = (char **) palloc(result_ncols * sizeof(char *));
00870
00871
00872 memset(values, '\0', result_ncols * sizeof(char *));
00873
00874 for (i = 0; i < proc; i++)
00875 {
00876 HeapTuple spi_tuple;
00877 crosstab_cat_desc *catdesc;
00878 char *catname;
00879
00880
00881 spi_tuple = spi_tuptable->vals[i];
00882
00883
00884 rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
00885
00886
00887
00888
00889
00890 if (firstpass || !xstreq(lastrowid, rowid))
00891 {
00892
00893
00894
00895
00896 if (!firstpass)
00897 {
00898
00899 tuple = BuildTupleFromCStrings(attinmeta, values);
00900
00901 tuplestore_puttuple(tupstore, tuple);
00902
00903 for (j = 0; j < result_ncols; j++)
00904 xpfree(values[j]);
00905 }
00906
00907 values[0] = rowid;
00908 for (j = 1; j < ncols - 2; j++)
00909 values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
00910
00911
00912 firstpass = false;
00913 }
00914
00915
00916 catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
00917
00918 if (catname != NULL)
00919 {
00920 crosstab_HashTableLookup(crosstab_hash, catname, catdesc);
00921
00922 if (catdesc)
00923 values[catdesc->attidx + ncols - 2] =
00924 SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
00925 }
00926
00927 xpfree(lastrowid);
00928 xpstrdup(lastrowid, rowid);
00929 }
00930
00931
00932 tuple = BuildTupleFromCStrings(attinmeta, values);
00933
00934 tuplestore_puttuple(tupstore, tuple);
00935 }
00936
00937 if (SPI_finish() != SPI_OK_FINISH)
00938
00939 elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
00940
00941 tuplestore_donestoring(tupstore);
00942
00943 return tupstore;
00944 }
00945
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980 PG_FUNCTION_INFO_V1(connectby_text);
00981
00982 #define CONNECTBY_NCOLS 4
00983 #define CONNECTBY_NCOLS_NOBRANCH 3
00984
00985 Datum
00986 connectby_text(PG_FUNCTION_ARGS)
00987 {
00988 char *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
00989 char *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
00990 char *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
00991 char *start_with = text_to_cstring(PG_GETARG_TEXT_PP(3));
00992 int max_depth = PG_GETARG_INT32(4);
00993 char *branch_delim = NULL;
00994 bool show_branch = false;
00995 bool show_serial = false;
00996 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
00997 TupleDesc tupdesc;
00998 AttInMetadata *attinmeta;
00999 MemoryContext per_query_ctx;
01000 MemoryContext oldcontext;
01001
01002
01003 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01004 ereport(ERROR,
01005 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01006 errmsg("set-valued function called in context that cannot accept a set")));
01007 if (!(rsinfo->allowedModes & SFRM_Materialize) ||
01008 rsinfo->expectedDesc == NULL)
01009 ereport(ERROR,
01010 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01011 errmsg("materialize mode required, but it is not " \
01012 "allowed in this context")));
01013
01014 if (fcinfo->nargs == 6)
01015 {
01016 branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(5));
01017 show_branch = true;
01018 }
01019 else
01020
01021 branch_delim = pstrdup("~");
01022
01023 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01024 oldcontext = MemoryContextSwitchTo(per_query_ctx);
01025
01026
01027 tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
01028
01029
01030 validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
01031
01032
01033 attinmeta = TupleDescGetAttInMetadata(tupdesc);
01034
01035
01036 rsinfo->returnMode = SFRM_Materialize;
01037 rsinfo->setResult = connectby(relname,
01038 key_fld,
01039 parent_key_fld,
01040 NULL,
01041 branch_delim,
01042 start_with,
01043 max_depth,
01044 show_branch,
01045 show_serial,
01046 per_query_ctx,
01047 rsinfo->allowedModes & SFRM_Materialize_Random,
01048 attinmeta);
01049 rsinfo->setDesc = tupdesc;
01050
01051 MemoryContextSwitchTo(oldcontext);
01052
01053
01054
01055
01056
01057
01058
01059
01060 return (Datum) 0;
01061 }
01062
01063 PG_FUNCTION_INFO_V1(connectby_text_serial);
01064 Datum
01065 connectby_text_serial(PG_FUNCTION_ARGS)
01066 {
01067 char *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
01068 char *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
01069 char *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
01070 char *orderby_fld = text_to_cstring(PG_GETARG_TEXT_PP(3));
01071 char *start_with = text_to_cstring(PG_GETARG_TEXT_PP(4));
01072 int max_depth = PG_GETARG_INT32(5);
01073 char *branch_delim = NULL;
01074 bool show_branch = false;
01075 bool show_serial = true;
01076 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01077 TupleDesc tupdesc;
01078 AttInMetadata *attinmeta;
01079 MemoryContext per_query_ctx;
01080 MemoryContext oldcontext;
01081
01082
01083 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01084 ereport(ERROR,
01085 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01086 errmsg("set-valued function called in context that cannot accept a set")));
01087 if (!(rsinfo->allowedModes & SFRM_Materialize) ||
01088 rsinfo->expectedDesc == NULL)
01089 ereport(ERROR,
01090 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01091 errmsg("materialize mode required, but it is not " \
01092 "allowed in this context")));
01093
01094 if (fcinfo->nargs == 7)
01095 {
01096 branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(6));
01097 show_branch = true;
01098 }
01099 else
01100
01101 branch_delim = pstrdup("~");
01102
01103 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01104 oldcontext = MemoryContextSwitchTo(per_query_ctx);
01105
01106
01107 tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
01108
01109
01110 validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
01111
01112
01113 attinmeta = TupleDescGetAttInMetadata(tupdesc);
01114
01115
01116 rsinfo->returnMode = SFRM_Materialize;
01117 rsinfo->setResult = connectby(relname,
01118 key_fld,
01119 parent_key_fld,
01120 orderby_fld,
01121 branch_delim,
01122 start_with,
01123 max_depth,
01124 show_branch,
01125 show_serial,
01126 per_query_ctx,
01127 rsinfo->allowedModes & SFRM_Materialize_Random,
01128 attinmeta);
01129 rsinfo->setDesc = tupdesc;
01130
01131 MemoryContextSwitchTo(oldcontext);
01132
01133
01134
01135
01136
01137
01138
01139
01140 return (Datum) 0;
01141 }
01142
01143
01144
01145
01146
01147 static Tuplestorestate *
01148 connectby(char *relname,
01149 char *key_fld,
01150 char *parent_key_fld,
01151 char *orderby_fld,
01152 char *branch_delim,
01153 char *start_with,
01154 int max_depth,
01155 bool show_branch,
01156 bool show_serial,
01157 MemoryContext per_query_ctx,
01158 bool randomAccess,
01159 AttInMetadata *attinmeta)
01160 {
01161 Tuplestorestate *tupstore = NULL;
01162 int ret;
01163 MemoryContext oldcontext;
01164
01165 int serial = 1;
01166
01167
01168 if ((ret = SPI_connect()) < 0)
01169
01170 elog(ERROR, "connectby: SPI_connect returned %d", ret);
01171
01172
01173 oldcontext = MemoryContextSwitchTo(per_query_ctx);
01174
01175
01176 tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
01177
01178 MemoryContextSwitchTo(oldcontext);
01179
01180
01181 tupstore = build_tuplestore_recursively(key_fld,
01182 parent_key_fld,
01183 relname,
01184 orderby_fld,
01185 branch_delim,
01186 start_with,
01187 start_with,
01188 0,
01189 &serial,
01190 max_depth,
01191 show_branch,
01192 show_serial,
01193 per_query_ctx,
01194 attinmeta,
01195 tupstore);
01196
01197 SPI_finish();
01198
01199 return tupstore;
01200 }
01201
01202 static Tuplestorestate *
01203 build_tuplestore_recursively(char *key_fld,
01204 char *parent_key_fld,
01205 char *relname,
01206 char *orderby_fld,
01207 char *branch_delim,
01208 char *start_with,
01209 char *branch,
01210 int level,
01211 int *serial,
01212 int max_depth,
01213 bool show_branch,
01214 bool show_serial,
01215 MemoryContext per_query_ctx,
01216 AttInMetadata *attinmeta,
01217 Tuplestorestate *tupstore)
01218 {
01219 TupleDesc tupdesc = attinmeta->tupdesc;
01220 int ret;
01221 int proc;
01222 int serial_column;
01223 StringInfoData sql;
01224 char **values;
01225 char *current_key;
01226 char *current_key_parent;
01227 char current_level[INT32_STRLEN];
01228 char serial_str[INT32_STRLEN];
01229 char *current_branch;
01230 HeapTuple tuple;
01231
01232 if (max_depth > 0 && level > max_depth)
01233 return tupstore;
01234
01235 initStringInfo(&sql);
01236
01237
01238 if (!show_serial)
01239 {
01240 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
01241 key_fld,
01242 parent_key_fld,
01243 relname,
01244 parent_key_fld,
01245 quote_literal_cstr(start_with),
01246 key_fld, key_fld, parent_key_fld);
01247 serial_column = 0;
01248 }
01249 else
01250 {
01251 appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
01252 key_fld,
01253 parent_key_fld,
01254 relname,
01255 parent_key_fld,
01256 quote_literal_cstr(start_with),
01257 key_fld, key_fld, parent_key_fld,
01258 orderby_fld);
01259 serial_column = 1;
01260 }
01261
01262 if (show_branch)
01263 values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
01264 else
01265 values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
01266
01267
01268 if (level == 0)
01269 {
01270
01271 values[0] = start_with;
01272
01273
01274 values[1] = NULL;
01275
01276
01277 sprintf(current_level, "%d", level);
01278 values[2] = current_level;
01279
01280
01281 if (show_branch)
01282 values[3] = start_with;
01283
01284
01285 if (show_serial)
01286 {
01287 sprintf(serial_str, "%d", (*serial)++);
01288 if (show_branch)
01289 values[4] = serial_str;
01290 else
01291 values[3] = serial_str;
01292 }
01293
01294
01295 tuple = BuildTupleFromCStrings(attinmeta, values);
01296
01297
01298 tuplestore_puttuple(tupstore, tuple);
01299
01300
01301 level++;
01302 }
01303
01304
01305 ret = SPI_execute(sql.data, true, 0);
01306 proc = SPI_processed;
01307
01308
01309 if ((ret == SPI_OK_SELECT) && (proc > 0))
01310 {
01311 HeapTuple spi_tuple;
01312 SPITupleTable *tuptable = SPI_tuptable;
01313 TupleDesc spi_tupdesc = tuptable->tupdesc;
01314 int i;
01315 StringInfoData branchstr;
01316 StringInfoData chk_branchstr;
01317 StringInfoData chk_current_key;
01318
01319
01320 if (level == 0)
01321 {
01322
01323
01324
01325
01326
01327
01328 if (!compatConnectbyTupleDescs(tupdesc, spi_tupdesc))
01329 ereport(ERROR,
01330 (errcode(ERRCODE_SYNTAX_ERROR),
01331 errmsg("invalid return type"),
01332 errdetail("Return and SQL tuple descriptions are " \
01333 "incompatible.")));
01334 }
01335
01336 initStringInfo(&branchstr);
01337 initStringInfo(&chk_branchstr);
01338 initStringInfo(&chk_current_key);
01339
01340 for (i = 0; i < proc; i++)
01341 {
01342
01343 appendStringInfo(&branchstr, "%s", branch);
01344 appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
01345
01346
01347 spi_tuple = tuptable->vals[i];
01348
01349
01350 current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
01351 appendStringInfo(&chk_current_key, "%s%s%s", branch_delim, current_key, branch_delim);
01352 current_key_parent = pstrdup(SPI_getvalue(spi_tuple, spi_tupdesc, 2));
01353
01354
01355 sprintf(current_level, "%d", level);
01356
01357
01358 if (strstr(chk_branchstr.data, chk_current_key.data))
01359 elog(ERROR, "infinite recursion detected");
01360
01361
01362 appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
01363 current_branch = branchstr.data;
01364
01365
01366 values[0] = pstrdup(current_key);
01367 values[1] = current_key_parent;
01368 values[2] = current_level;
01369 if (show_branch)
01370 values[3] = current_branch;
01371 if (show_serial)
01372 {
01373 sprintf(serial_str, "%d", (*serial)++);
01374 if (show_branch)
01375 values[4] = serial_str;
01376 else
01377 values[3] = serial_str;
01378 }
01379
01380 tuple = BuildTupleFromCStrings(attinmeta, values);
01381
01382 xpfree(current_key);
01383 xpfree(current_key_parent);
01384
01385
01386 tuplestore_puttuple(tupstore, tuple);
01387
01388 heap_freetuple(tuple);
01389
01390
01391 tupstore = build_tuplestore_recursively(key_fld,
01392 parent_key_fld,
01393 relname,
01394 orderby_fld,
01395 branch_delim,
01396 values[0],
01397 current_branch,
01398 level + 1,
01399 serial,
01400 max_depth,
01401 show_branch,
01402 show_serial,
01403 per_query_ctx,
01404 attinmeta,
01405 tupstore);
01406
01407
01408 resetStringInfo(&branchstr);
01409 resetStringInfo(&chk_branchstr);
01410 resetStringInfo(&chk_current_key);
01411 }
01412
01413 xpfree(branchstr.data);
01414 xpfree(chk_branchstr.data);
01415 xpfree(chk_current_key.data);
01416 }
01417
01418 return tupstore;
01419 }
01420
01421
01422
01423
01424 static void
01425 validateConnectbyTupleDesc(TupleDesc tupdesc, bool show_branch, bool show_serial)
01426 {
01427 int serial_column = 0;
01428
01429 if (show_serial)
01430 serial_column = 1;
01431
01432
01433 if (show_branch)
01434 {
01435 if (tupdesc->natts != (CONNECTBY_NCOLS + serial_column))
01436 ereport(ERROR,
01437 (errcode(ERRCODE_SYNTAX_ERROR),
01438 errmsg("invalid return type"),
01439 errdetail("Query-specified return tuple has " \
01440 "wrong number of columns.")));
01441 }
01442 else
01443 {
01444 if (tupdesc->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
01445 ereport(ERROR,
01446 (errcode(ERRCODE_SYNTAX_ERROR),
01447 errmsg("invalid return type"),
01448 errdetail("Query-specified return tuple has " \
01449 "wrong number of columns.")));
01450 }
01451
01452
01453 if (tupdesc->attrs[0]->atttypid != tupdesc->attrs[1]->atttypid)
01454 ereport(ERROR,
01455 (errcode(ERRCODE_SYNTAX_ERROR),
01456 errmsg("invalid return type"),
01457 errdetail("First two columns must be the same type.")));
01458
01459
01460 if (tupdesc->attrs[2]->atttypid != INT4OID)
01461 ereport(ERROR,
01462 (errcode(ERRCODE_SYNTAX_ERROR),
01463 errmsg("invalid return type"),
01464 errdetail("Third column must be type %s.",
01465 format_type_be(INT4OID))));
01466
01467
01468 if (show_branch && tupdesc->attrs[3]->atttypid != TEXTOID)
01469 ereport(ERROR,
01470 (errcode(ERRCODE_SYNTAX_ERROR),
01471 errmsg("invalid return type"),
01472 errdetail("Fourth column must be type %s.",
01473 format_type_be(TEXTOID))));
01474
01475
01476 if (show_branch && show_serial && tupdesc->attrs[4]->atttypid != INT4OID)
01477 elog(ERROR, "query-specified return tuple not valid for Connectby: "
01478 "fifth column must be type %s", format_type_be(INT4OID));
01479
01480
01481 if (!show_branch && show_serial && tupdesc->attrs[3]->atttypid != INT4OID)
01482 elog(ERROR, "query-specified return tuple not valid for Connectby: "
01483 "fourth column must be type %s", format_type_be(INT4OID));
01484
01485
01486 }
01487
01488
01489
01490
01491 static bool
01492 compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
01493 {
01494 Oid ret_atttypid;
01495 Oid sql_atttypid;
01496
01497
01498 ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
01499 sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
01500 if (ret_atttypid != sql_atttypid)
01501 ereport(ERROR,
01502 (errcode(ERRCODE_SYNTAX_ERROR),
01503 errmsg("invalid return type"),
01504 errdetail("SQL key field datatype does " \
01505 "not match return key field datatype.")));
01506
01507
01508 ret_atttypid = ret_tupdesc->attrs[1]->atttypid;
01509 sql_atttypid = sql_tupdesc->attrs[1]->atttypid;
01510 if (ret_atttypid != sql_atttypid)
01511 ereport(ERROR,
01512 (errcode(ERRCODE_SYNTAX_ERROR),
01513 errmsg("invalid return type"),
01514 errdetail("SQL parent key field datatype does " \
01515 "not match return parent key field datatype.")));
01516
01517
01518 return true;
01519 }
01520
01521
01522
01523
01524 static bool
01525 compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
01526 {
01527 int i;
01528 Form_pg_attribute ret_attr;
01529 Oid ret_atttypid;
01530 Form_pg_attribute sql_attr;
01531 Oid sql_atttypid;
01532
01533 if (ret_tupdesc->natts < 2 ||
01534 sql_tupdesc->natts < 3)
01535 return false;
01536
01537
01538 ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
01539 sql_atttypid = sql_tupdesc->attrs[0]->atttypid;
01540 if (ret_atttypid != sql_atttypid)
01541 ereport(ERROR,
01542 (errcode(ERRCODE_SYNTAX_ERROR),
01543 errmsg("invalid return type"),
01544 errdetail("SQL rowid datatype does not match " \
01545 "return rowid datatype.")));
01546
01547
01548
01549
01550
01551
01552 sql_attr = sql_tupdesc->attrs[2];
01553 for (i = 1; i < ret_tupdesc->natts; i++)
01554 {
01555 ret_attr = ret_tupdesc->attrs[i];
01556
01557 if (ret_attr->atttypid != sql_attr->atttypid)
01558 return false;
01559 }
01560
01561
01562 return true;
01563 }