00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include "access/htup_details.h"
00018 #include "access/xact.h"
00019 #include "catalog/pg_proc.h"
00020 #include "catalog/pg_type.h"
00021 #include "executor/functions.h"
00022 #include "funcapi.h"
00023 #include "miscadmin.h"
00024 #include "nodes/makefuncs.h"
00025 #include "nodes/nodeFuncs.h"
00026 #include "parser/parse_coerce.h"
00027 #include "parser/parse_func.h"
00028 #include "storage/proc.h"
00029 #include "tcop/utility.h"
00030 #include "utils/builtins.h"
00031 #include "utils/datum.h"
00032 #include "utils/lsyscache.h"
00033 #include "utils/memutils.h"
00034 #include "utils/snapmgr.h"
00035 #include "utils/syscache.h"
00036
00037
00038
00039
00040
00041 typedef struct
00042 {
00043 DestReceiver pub;
00044 Tuplestorestate *tstore;
00045 MemoryContext cxt;
00046 JunkFilter *filter;
00047 } DR_sqlfunction;
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 typedef enum
00059 {
00060 F_EXEC_START, F_EXEC_RUN, F_EXEC_DONE
00061 } ExecStatus;
00062
00063 typedef struct execution_state
00064 {
00065 struct execution_state *next;
00066 ExecStatus status;
00067 bool setsResult;
00068 bool lazyEval;
00069 Node *stmt;
00070 QueryDesc *qd;
00071 } execution_state;
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091 typedef struct
00092 {
00093 char *fname;
00094 char *src;
00095
00096 SQLFunctionParseInfoPtr pinfo;
00097
00098 Oid rettype;
00099 int16 typlen;
00100 bool typbyval;
00101 bool returnsSet;
00102 bool returnsTuple;
00103 bool shutdown_reg;
00104 bool readonly_func;
00105 bool lazyEval;
00106
00107 ParamListInfo paramLI;
00108
00109 Tuplestorestate *tstore;
00110
00111 JunkFilter *junkFilter;
00112
00113
00114
00115
00116
00117
00118
00119 List *func_state;
00120
00121 MemoryContext fcontext;
00122
00123
00124 LocalTransactionId lxid;
00125 SubTransactionId subxid;
00126 } SQLFunctionCache;
00127
00128 typedef SQLFunctionCache *SQLFunctionCachePtr;
00129
00130
00131
00132
00133
00134
00135 typedef struct SQLFunctionParseInfo
00136 {
00137 char *fname;
00138 int nargs;
00139 Oid *argtypes;
00140 char **argnames;
00141
00142 Oid collation;
00143 } SQLFunctionParseInfo;
00144
00145
00146
00147 static Node *sql_fn_param_ref(ParseState *pstate, ParamRef *pref);
00148 static Node *sql_fn_post_column_ref(ParseState *pstate,
00149 ColumnRef *cref, Node *var);
00150 static Node *sql_fn_make_param(SQLFunctionParseInfoPtr pinfo,
00151 int paramno, int location);
00152 static Node *sql_fn_resolve_param_name(SQLFunctionParseInfoPtr pinfo,
00153 const char *paramname, int location);
00154 static List *init_execution_state(List *queryTree_list,
00155 SQLFunctionCachePtr fcache,
00156 bool lazyEvalOK);
00157 static void init_sql_fcache(FmgrInfo *finfo, Oid collation, bool lazyEvalOK);
00158 static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache);
00159 static bool postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache);
00160 static void postquel_end(execution_state *es);
00161 static void postquel_sub_params(SQLFunctionCachePtr fcache,
00162 FunctionCallInfo fcinfo);
00163 static Datum postquel_get_single_result(TupleTableSlot *slot,
00164 FunctionCallInfo fcinfo,
00165 SQLFunctionCachePtr fcache,
00166 MemoryContext resultcontext);
00167 static void sql_exec_error_callback(void *arg);
00168 static void ShutdownSQLFunction(Datum arg);
00169 static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
00170 static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self);
00171 static void sqlfunction_shutdown(DestReceiver *self);
00172 static void sqlfunction_destroy(DestReceiver *self);
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 SQLFunctionParseInfoPtr
00184 prepare_sql_fn_parse_info(HeapTuple procedureTuple,
00185 Node *call_expr,
00186 Oid inputCollation)
00187 {
00188 SQLFunctionParseInfoPtr pinfo;
00189 Form_pg_proc procedureStruct = (Form_pg_proc) GETSTRUCT(procedureTuple);
00190 int nargs;
00191
00192 pinfo = (SQLFunctionParseInfoPtr) palloc0(sizeof(SQLFunctionParseInfo));
00193
00194
00195 pinfo->fname = pstrdup(NameStr(procedureStruct->proname));
00196
00197
00198 pinfo->collation = inputCollation;
00199
00200
00201
00202
00203
00204 pinfo->nargs = nargs = procedureStruct->pronargs;
00205 if (nargs > 0)
00206 {
00207 Oid *argOidVect;
00208 int argnum;
00209
00210 argOidVect = (Oid *) palloc(nargs * sizeof(Oid));
00211 memcpy(argOidVect,
00212 procedureStruct->proargtypes.values,
00213 nargs * sizeof(Oid));
00214
00215 for (argnum = 0; argnum < nargs; argnum++)
00216 {
00217 Oid argtype = argOidVect[argnum];
00218
00219 if (IsPolymorphicType(argtype))
00220 {
00221 argtype = get_call_expr_argtype(call_expr, argnum);
00222 if (argtype == InvalidOid)
00223 ereport(ERROR,
00224 (errcode(ERRCODE_DATATYPE_MISMATCH),
00225 errmsg("could not determine actual type of argument declared %s",
00226 format_type_be(argOidVect[argnum]))));
00227 argOidVect[argnum] = argtype;
00228 }
00229 }
00230
00231 pinfo->argtypes = argOidVect;
00232 }
00233
00234
00235
00236
00237 if (nargs > 0)
00238 {
00239 Datum proargnames;
00240 Datum proargmodes;
00241 int n_arg_names;
00242 bool isNull;
00243
00244 proargnames = SysCacheGetAttr(PROCNAMEARGSNSP, procedureTuple,
00245 Anum_pg_proc_proargnames,
00246 &isNull);
00247 if (isNull)
00248 proargnames = PointerGetDatum(NULL);
00249
00250 proargmodes = SysCacheGetAttr(PROCNAMEARGSNSP, procedureTuple,
00251 Anum_pg_proc_proargmodes,
00252 &isNull);
00253 if (isNull)
00254 proargmodes = PointerGetDatum(NULL);
00255
00256 n_arg_names = get_func_input_arg_names(proargnames, proargmodes,
00257 &pinfo->argnames);
00258
00259
00260 if (n_arg_names < nargs)
00261 pinfo->argnames = NULL;
00262 }
00263 else
00264 pinfo->argnames = NULL;
00265
00266 return pinfo;
00267 }
00268
00269
00270
00271
00272 void
00273 sql_fn_parser_setup(struct ParseState *pstate, SQLFunctionParseInfoPtr pinfo)
00274 {
00275 pstate->p_pre_columnref_hook = NULL;
00276 pstate->p_post_columnref_hook = sql_fn_post_column_ref;
00277 pstate->p_paramref_hook = sql_fn_param_ref;
00278
00279 pstate->p_ref_hook_state = (void *) pinfo;
00280 }
00281
00282
00283
00284
00285 static Node *
00286 sql_fn_post_column_ref(ParseState *pstate, ColumnRef *cref, Node *var)
00287 {
00288 SQLFunctionParseInfoPtr pinfo = (SQLFunctionParseInfoPtr) pstate->p_ref_hook_state;
00289 int nnames;
00290 Node *field1;
00291 Node *subfield = NULL;
00292 const char *name1;
00293 const char *name2 = NULL;
00294 Node *param;
00295
00296
00297
00298
00299
00300
00301 if (var != NULL)
00302 return NULL;
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315 nnames = list_length(cref->fields);
00316
00317 if (nnames > 3)
00318 return NULL;
00319
00320 field1 = (Node *) linitial(cref->fields);
00321 Assert(IsA(field1, String));
00322 name1 = strVal(field1);
00323 if (nnames > 1)
00324 {
00325 subfield = (Node *) lsecond(cref->fields);
00326 Assert(IsA(subfield, String));
00327 name2 = strVal(subfield);
00328 }
00329
00330 if (nnames == 3)
00331 {
00332
00333
00334
00335
00336
00337 if (strcmp(name1, pinfo->fname) != 0)
00338 return NULL;
00339
00340 param = sql_fn_resolve_param_name(pinfo, name2, cref->location);
00341
00342 subfield = (Node *) lthird(cref->fields);
00343 Assert(IsA(subfield, String));
00344 }
00345 else if (nnames == 2 && strcmp(name1, pinfo->fname) == 0)
00346 {
00347
00348
00349
00350
00351 param = sql_fn_resolve_param_name(pinfo, name2, cref->location);
00352
00353 if (param)
00354 {
00355
00356 subfield = NULL;
00357 }
00358 else
00359 {
00360
00361 param = sql_fn_resolve_param_name(pinfo, name1, cref->location);
00362 }
00363 }
00364 else
00365 {
00366
00367 param = sql_fn_resolve_param_name(pinfo, name1, cref->location);
00368 }
00369
00370 if (!param)
00371 return NULL;
00372
00373 if (subfield)
00374 {
00375
00376
00377
00378
00379
00380 param = ParseFuncOrColumn(pstate,
00381 list_make1(subfield),
00382 list_make1(param),
00383 NIL, false, false, false,
00384 NULL, true, cref->location);
00385 }
00386
00387 return param;
00388 }
00389
00390
00391
00392
00393 static Node *
00394 sql_fn_param_ref(ParseState *pstate, ParamRef *pref)
00395 {
00396 SQLFunctionParseInfoPtr pinfo = (SQLFunctionParseInfoPtr) pstate->p_ref_hook_state;
00397 int paramno = pref->number;
00398
00399
00400 if (paramno <= 0 || paramno > pinfo->nargs)
00401 return NULL;
00402
00403 return sql_fn_make_param(pinfo, paramno, pref->location);
00404 }
00405
00406
00407
00408
00409 static Node *
00410 sql_fn_make_param(SQLFunctionParseInfoPtr pinfo,
00411 int paramno, int location)
00412 {
00413 Param *param;
00414
00415 param = makeNode(Param);
00416 param->paramkind = PARAM_EXTERN;
00417 param->paramid = paramno;
00418 param->paramtype = pinfo->argtypes[paramno - 1];
00419 param->paramtypmod = -1;
00420 param->paramcollid = get_typcollation(param->paramtype);
00421 param->location = location;
00422
00423
00424
00425
00426
00427
00428 if (OidIsValid(pinfo->collation) && OidIsValid(param->paramcollid))
00429 param->paramcollid = pinfo->collation;
00430
00431 return (Node *) param;
00432 }
00433
00434
00435
00436
00437
00438
00439 static Node *
00440 sql_fn_resolve_param_name(SQLFunctionParseInfoPtr pinfo,
00441 const char *paramname, int location)
00442 {
00443 int i;
00444
00445 if (pinfo->argnames == NULL)
00446 return NULL;
00447
00448 for (i = 0; i < pinfo->nargs; i++)
00449 {
00450 if (pinfo->argnames[i] && strcmp(pinfo->argnames[i], paramname) == 0)
00451 return sql_fn_make_param(pinfo, i + 1, location);
00452 }
00453
00454 return NULL;
00455 }
00456
00457
00458
00459
00460
00461
00462
00463 static List *
00464 init_execution_state(List *queryTree_list,
00465 SQLFunctionCachePtr fcache,
00466 bool lazyEvalOK)
00467 {
00468 List *eslist = NIL;
00469 execution_state *lasttages = NULL;
00470 ListCell *lc1;
00471
00472 foreach(lc1, queryTree_list)
00473 {
00474 List *qtlist = (List *) lfirst(lc1);
00475 execution_state *firstes = NULL;
00476 execution_state *preves = NULL;
00477 ListCell *lc2;
00478
00479 foreach(lc2, qtlist)
00480 {
00481 Query *queryTree = (Query *) lfirst(lc2);
00482 Node *stmt;
00483 execution_state *newes;
00484
00485 Assert(IsA(queryTree, Query));
00486
00487
00488 if (queryTree->commandType == CMD_UTILITY)
00489 stmt = queryTree->utilityStmt;
00490 else
00491 stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
00492
00493
00494 if (IsA(stmt, TransactionStmt))
00495 ereport(ERROR,
00496 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00497
00498 errmsg("%s is not allowed in a SQL function",
00499 CreateCommandTag(stmt))));
00500
00501 if (fcache->readonly_func && !CommandIsReadOnly(stmt))
00502 ereport(ERROR,
00503 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00504
00505 errmsg("%s is not allowed in a non-volatile function",
00506 CreateCommandTag(stmt))));
00507
00508
00509 newes = (execution_state *) palloc(sizeof(execution_state));
00510 if (preves)
00511 preves->next = newes;
00512 else
00513 firstes = newes;
00514
00515 newes->next = NULL;
00516 newes->status = F_EXEC_START;
00517 newes->setsResult = false;
00518 newes->lazyEval = false;
00519 newes->stmt = stmt;
00520 newes->qd = NULL;
00521
00522 if (queryTree->canSetTag)
00523 lasttages = newes;
00524
00525 preves = newes;
00526 }
00527
00528 eslist = lappend(eslist, firstes);
00529 }
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546 if (lasttages && fcache->junkFilter)
00547 {
00548 lasttages->setsResult = true;
00549 if (lazyEvalOK &&
00550 IsA(lasttages->stmt, PlannedStmt))
00551 {
00552 PlannedStmt *ps = (PlannedStmt *) lasttages->stmt;
00553
00554 if (ps->commandType == CMD_SELECT &&
00555 ps->utilityStmt == NULL &&
00556 !ps->hasModifyingCTE)
00557 fcache->lazyEval = lasttages->lazyEval = true;
00558 }
00559 }
00560
00561 return eslist;
00562 }
00563
00564
00565
00566
00567 static void
00568 init_sql_fcache(FmgrInfo *finfo, Oid collation, bool lazyEvalOK)
00569 {
00570 Oid foid = finfo->fn_oid;
00571 MemoryContext fcontext;
00572 MemoryContext oldcontext;
00573 Oid rettype;
00574 HeapTuple procedureTuple;
00575 Form_pg_proc procedureStruct;
00576 SQLFunctionCachePtr fcache;
00577 List *raw_parsetree_list;
00578 List *queryTree_list;
00579 List *flat_query_list;
00580 ListCell *lc;
00581 Datum tmp;
00582 bool isNull;
00583
00584
00585
00586
00587
00588 fcontext = AllocSetContextCreate(finfo->fn_mcxt,
00589 "SQL function data",
00590 ALLOCSET_DEFAULT_MINSIZE,
00591 ALLOCSET_DEFAULT_INITSIZE,
00592 ALLOCSET_DEFAULT_MAXSIZE);
00593
00594 oldcontext = MemoryContextSwitchTo(fcontext);
00595
00596
00597
00598
00599
00600
00601 fcache = (SQLFunctionCachePtr) palloc0(sizeof(SQLFunctionCache));
00602 fcache->fcontext = fcontext;
00603 finfo->fn_extra = (void *) fcache;
00604
00605
00606
00607
00608 procedureTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(foid));
00609 if (!HeapTupleIsValid(procedureTuple))
00610 elog(ERROR, "cache lookup failed for function %u", foid);
00611 procedureStruct = (Form_pg_proc) GETSTRUCT(procedureTuple);
00612
00613
00614
00615
00616 fcache->fname = pstrdup(NameStr(procedureStruct->proname));
00617
00618
00619
00620
00621
00622 rettype = procedureStruct->prorettype;
00623
00624 if (IsPolymorphicType(rettype))
00625 {
00626 rettype = get_fn_expr_rettype(finfo);
00627 if (rettype == InvalidOid)
00628 ereport(ERROR,
00629 (errcode(ERRCODE_DATATYPE_MISMATCH),
00630 errmsg("could not determine actual result type for function declared to return type %s",
00631 format_type_be(procedureStruct->prorettype))));
00632 }
00633
00634 fcache->rettype = rettype;
00635
00636
00637 get_typlenbyval(rettype, &fcache->typlen, &fcache->typbyval);
00638
00639
00640 fcache->returnsSet = procedureStruct->proretset;
00641
00642
00643 fcache->readonly_func =
00644 (procedureStruct->provolatile != PROVOLATILE_VOLATILE);
00645
00646
00647
00648
00649
00650
00651 fcache->pinfo = prepare_sql_fn_parse_info(procedureTuple,
00652 finfo->fn_expr,
00653 collation);
00654
00655
00656
00657
00658 tmp = SysCacheGetAttr(PROCOID,
00659 procedureTuple,
00660 Anum_pg_proc_prosrc,
00661 &isNull);
00662 if (isNull)
00663 elog(ERROR, "null prosrc for function %u", foid);
00664 fcache->src = TextDatumGetCString(tmp);
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681 raw_parsetree_list = pg_parse_query(fcache->src);
00682
00683 queryTree_list = NIL;
00684 flat_query_list = NIL;
00685 foreach(lc, raw_parsetree_list)
00686 {
00687 Node *parsetree = (Node *) lfirst(lc);
00688 List *queryTree_sublist;
00689
00690 queryTree_sublist = pg_analyze_and_rewrite_params(parsetree,
00691 fcache->src,
00692 (ParserSetupHook) sql_fn_parser_setup,
00693 fcache->pinfo);
00694 queryTree_list = lappend(queryTree_list, queryTree_sublist);
00695 flat_query_list = list_concat(flat_query_list,
00696 list_copy(queryTree_sublist));
00697 }
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718 fcache->returnsTuple = check_sql_fn_retval(foid,
00719 rettype,
00720 flat_query_list,
00721 NULL,
00722 &fcache->junkFilter);
00723
00724 if (fcache->returnsTuple)
00725 {
00726
00727 BlessTupleDesc(fcache->junkFilter->jf_resultSlot->tts_tupleDescriptor);
00728 }
00729 else if (fcache->returnsSet && type_is_rowtype(fcache->rettype))
00730 {
00731
00732
00733
00734
00735
00736
00737 lazyEvalOK = true;
00738 }
00739
00740
00741 fcache->func_state = init_execution_state(queryTree_list,
00742 fcache,
00743 lazyEvalOK);
00744
00745
00746 fcache->lxid = MyProc->lxid;
00747 fcache->subxid = GetCurrentSubTransactionId();
00748
00749 ReleaseSysCache(procedureTuple);
00750
00751 MemoryContextSwitchTo(oldcontext);
00752 }
00753
00754
00755 static void
00756 postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
00757 {
00758 DestReceiver *dest;
00759
00760 Assert(es->qd == NULL);
00761
00762
00763 Assert(ActiveSnapshotSet());
00764
00765
00766
00767
00768
00769 if (es->setsResult)
00770 {
00771 DR_sqlfunction *myState;
00772
00773 dest = CreateDestReceiver(DestSQLFunction);
00774
00775 myState = (DR_sqlfunction *) dest;
00776 Assert(myState->pub.mydest == DestSQLFunction);
00777 myState->tstore = fcache->tstore;
00778 myState->cxt = CurrentMemoryContext;
00779 myState->filter = fcache->junkFilter;
00780 }
00781 else
00782 dest = None_Receiver;
00783
00784 if (IsA(es->stmt, PlannedStmt))
00785 es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
00786 fcache->src,
00787 GetActiveSnapshot(),
00788 InvalidSnapshot,
00789 dest,
00790 fcache->paramLI, 0);
00791 else
00792 es->qd = CreateUtilityQueryDesc(es->stmt,
00793 fcache->src,
00794 GetActiveSnapshot(),
00795 dest,
00796 fcache->paramLI);
00797
00798
00799 if (es->qd->utilitystmt == NULL)
00800 {
00801
00802
00803
00804
00805
00806
00807
00808 int eflags;
00809
00810 if (es->lazyEval)
00811 eflags = EXEC_FLAG_SKIP_TRIGGERS;
00812 else
00813 eflags = 0;
00814 ExecutorStart(es->qd, eflags);
00815 }
00816
00817 es->status = F_EXEC_RUN;
00818 }
00819
00820
00821
00822 static bool
00823 postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache)
00824 {
00825 bool result;
00826
00827 if (es->qd->utilitystmt)
00828 {
00829
00830 ProcessUtility((es->qd->plannedstmt ?
00831 (Node *) es->qd->plannedstmt :
00832 es->qd->utilitystmt),
00833 fcache->src,
00834 PROCESS_UTILITY_QUERY,
00835 es->qd->params,
00836 es->qd->dest,
00837 NULL);
00838 result = true;
00839 }
00840 else
00841 {
00842
00843 long count = (es->lazyEval) ? 1L : 0L;
00844
00845 ExecutorRun(es->qd, ForwardScanDirection, count);
00846
00847
00848
00849
00850
00851 result = (count == 0L || es->qd->estate->es_processed == 0);
00852 }
00853
00854 return result;
00855 }
00856
00857
00858 static void
00859 postquel_end(execution_state *es)
00860 {
00861
00862 es->status = F_EXEC_DONE;
00863
00864
00865 if (es->qd->utilitystmt == NULL)
00866 {
00867 ExecutorFinish(es->qd);
00868 ExecutorEnd(es->qd);
00869 }
00870
00871 (*es->qd->dest->rDestroy) (es->qd->dest);
00872
00873 FreeQueryDesc(es->qd);
00874 es->qd = NULL;
00875 }
00876
00877
00878 static void
00879 postquel_sub_params(SQLFunctionCachePtr fcache,
00880 FunctionCallInfo fcinfo)
00881 {
00882 int nargs = fcinfo->nargs;
00883
00884 if (nargs > 0)
00885 {
00886 ParamListInfo paramLI;
00887 int i;
00888
00889 if (fcache->paramLI == NULL)
00890 {
00891
00892 paramLI = (ParamListInfo) palloc(sizeof(ParamListInfoData) +
00893 (nargs - 1) * sizeof(ParamExternData));
00894
00895 paramLI->paramFetch = NULL;
00896 paramLI->paramFetchArg = NULL;
00897 paramLI->parserSetup = NULL;
00898 paramLI->parserSetupArg = NULL;
00899 paramLI->numParams = nargs;
00900 fcache->paramLI = paramLI;
00901 }
00902 else
00903 {
00904 paramLI = fcache->paramLI;
00905 Assert(paramLI->numParams == nargs);
00906 }
00907
00908 for (i = 0; i < nargs; i++)
00909 {
00910 ParamExternData *prm = ¶mLI->params[i];
00911
00912 prm->value = fcinfo->arg[i];
00913 prm->isnull = fcinfo->argnull[i];
00914 prm->pflags = 0;
00915 prm->ptype = fcache->pinfo->argtypes[i];
00916 }
00917 }
00918 else
00919 fcache->paramLI = NULL;
00920 }
00921
00922
00923
00924
00925
00926
00927 static Datum
00928 postquel_get_single_result(TupleTableSlot *slot,
00929 FunctionCallInfo fcinfo,
00930 SQLFunctionCachePtr fcache,
00931 MemoryContext resultcontext)
00932 {
00933 Datum value;
00934 MemoryContext oldcontext;
00935
00936
00937
00938
00939
00940
00941
00942 oldcontext = MemoryContextSwitchTo(resultcontext);
00943
00944 if (fcache->returnsTuple)
00945 {
00946
00947 fcinfo->isnull = false;
00948 value = ExecFetchSlotTupleDatum(slot);
00949 value = datumCopy(value, fcache->typbyval, fcache->typlen);
00950 }
00951 else
00952 {
00953
00954
00955
00956
00957 value = slot_getattr(slot, 1, &(fcinfo->isnull));
00958
00959 if (!fcinfo->isnull)
00960 value = datumCopy(value, fcache->typbyval, fcache->typlen);
00961 }
00962
00963 MemoryContextSwitchTo(oldcontext);
00964
00965 return value;
00966 }
00967
00968
00969
00970
00971 Datum
00972 fmgr_sql(PG_FUNCTION_ARGS)
00973 {
00974 SQLFunctionCachePtr fcache;
00975 ErrorContextCallback sqlerrcontext;
00976 MemoryContext oldcontext;
00977 bool randomAccess;
00978 bool lazyEvalOK;
00979 bool is_first;
00980 bool pushed_snapshot;
00981 execution_state *es;
00982 TupleTableSlot *slot;
00983 Datum result;
00984 List *eslist;
00985 ListCell *eslc;
00986
00987
00988
00989
00990 sqlerrcontext.callback = sql_exec_error_callback;
00991 sqlerrcontext.arg = fcinfo->flinfo;
00992 sqlerrcontext.previous = error_context_stack;
00993 error_context_stack = &sqlerrcontext;
00994
00995
00996 if (fcinfo->flinfo->fn_retset)
00997 {
00998 ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
00999
01000
01001
01002
01003
01004
01005
01006 if (!rsi || !IsA(rsi, ReturnSetInfo) ||
01007 (rsi->allowedModes & SFRM_ValuePerCall) == 0 ||
01008 (rsi->allowedModes & SFRM_Materialize) == 0)
01009 ereport(ERROR,
01010 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01011 errmsg("set-valued function called in context that cannot accept a set")));
01012 randomAccess = rsi->allowedModes & SFRM_Materialize_Random;
01013 lazyEvalOK = !(rsi->allowedModes & SFRM_Materialize_Preferred);
01014 }
01015 else
01016 {
01017 randomAccess = false;
01018 lazyEvalOK = true;
01019 }
01020
01021
01022
01023
01024
01025 fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
01026
01027 if (fcache != NULL)
01028 {
01029 if (fcache->lxid != MyProc->lxid ||
01030 !SubTransactionIsActive(fcache->subxid))
01031 {
01032
01033 fcinfo->flinfo->fn_extra = NULL;
01034 MemoryContextDelete(fcache->fcontext);
01035 fcache = NULL;
01036 }
01037 }
01038
01039 if (fcache == NULL)
01040 {
01041 init_sql_fcache(fcinfo->flinfo, PG_GET_COLLATION(), lazyEvalOK);
01042 fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
01043 }
01044
01045
01046
01047
01048
01049
01050
01051
01052 oldcontext = MemoryContextSwitchTo(fcache->fcontext);
01053
01054
01055
01056
01057
01058 eslist = fcache->func_state;
01059 es = NULL;
01060 is_first = true;
01061 foreach(eslc, eslist)
01062 {
01063 es = (execution_state *) lfirst(eslc);
01064
01065 while (es && es->status == F_EXEC_DONE)
01066 {
01067 is_first = false;
01068 es = es->next;
01069 }
01070
01071 if (es)
01072 break;
01073 }
01074
01075
01076
01077
01078
01079 if (is_first && es && es->status == F_EXEC_START)
01080 postquel_sub_params(fcache, fcinfo);
01081
01082
01083
01084
01085
01086 if (!fcache->tstore)
01087 fcache->tstore = tuplestore_begin_heap(randomAccess, false, work_mem);
01088
01089
01090
01091
01092
01093
01094
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107 pushed_snapshot = false;
01108 while (es)
01109 {
01110 bool completed;
01111
01112 if (es->status == F_EXEC_START)
01113 {
01114
01115
01116
01117
01118
01119
01120 if (!fcache->readonly_func)
01121 {
01122 CommandCounterIncrement();
01123 if (!pushed_snapshot)
01124 {
01125 PushActiveSnapshot(GetTransactionSnapshot());
01126 pushed_snapshot = true;
01127 }
01128 else
01129 UpdateActiveSnapshotCommandId();
01130 }
01131
01132 postquel_start(es, fcache);
01133 }
01134 else if (!fcache->readonly_func && !pushed_snapshot)
01135 {
01136
01137 PushActiveSnapshot(es->qd->snapshot);
01138 pushed_snapshot = true;
01139 }
01140
01141 completed = postquel_getnext(es, fcache);
01142
01143
01144
01145
01146
01147
01148
01149
01150
01151 if (completed || !fcache->returnsSet)
01152 postquel_end(es);
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163 if (es->status != F_EXEC_DONE)
01164 break;
01165
01166
01167
01168
01169 es = es->next;
01170 while (!es)
01171 {
01172 eslc = lnext(eslc);
01173 if (!eslc)
01174 break;
01175
01176 es = (execution_state *) lfirst(eslc);
01177
01178
01179
01180
01181
01182
01183
01184 if (pushed_snapshot)
01185 {
01186 PopActiveSnapshot();
01187 pushed_snapshot = false;
01188 }
01189 }
01190 }
01191
01192
01193
01194
01195 if (fcache->returnsSet)
01196 {
01197 ReturnSetInfo *rsi = (ReturnSetInfo *) fcinfo->resultinfo;
01198
01199 if (es)
01200 {
01201
01202
01203
01204
01205 Assert(es->lazyEval);
01206
01207 Assert(fcache->junkFilter);
01208 slot = fcache->junkFilter->jf_resultSlot;
01209 if (!tuplestore_gettupleslot(fcache->tstore, true, false, slot))
01210 elog(ERROR, "failed to fetch lazy-eval tuple");
01211
01212 result = postquel_get_single_result(slot, fcinfo,
01213 fcache, oldcontext);
01214
01215
01216 tuplestore_clear(fcache->tstore);
01217
01218
01219
01220
01221 rsi->isDone = ExprMultipleResult;
01222
01223
01224
01225
01226
01227 if (!fcache->shutdown_reg)
01228 {
01229 RegisterExprContextCallback(rsi->econtext,
01230 ShutdownSQLFunction,
01231 PointerGetDatum(fcache));
01232 fcache->shutdown_reg = true;
01233 }
01234 }
01235 else if (fcache->lazyEval)
01236 {
01237
01238
01239
01240 tuplestore_clear(fcache->tstore);
01241
01242
01243
01244
01245 rsi->isDone = ExprEndResult;
01246
01247 fcinfo->isnull = true;
01248 result = (Datum) 0;
01249
01250
01251 if (fcache->shutdown_reg)
01252 {
01253 UnregisterExprContextCallback(rsi->econtext,
01254 ShutdownSQLFunction,
01255 PointerGetDatum(fcache));
01256 fcache->shutdown_reg = false;
01257 }
01258 }
01259 else
01260 {
01261
01262
01263
01264
01265
01266 rsi->returnMode = SFRM_Materialize;
01267 rsi->setResult = fcache->tstore;
01268 fcache->tstore = NULL;
01269
01270 if (fcache->junkFilter)
01271 rsi->setDesc = CreateTupleDescCopy(fcache->junkFilter->jf_cleanTupType);
01272
01273 fcinfo->isnull = true;
01274 result = (Datum) 0;
01275
01276
01277 if (fcache->shutdown_reg)
01278 {
01279 UnregisterExprContextCallback(rsi->econtext,
01280 ShutdownSQLFunction,
01281 PointerGetDatum(fcache));
01282 fcache->shutdown_reg = false;
01283 }
01284 }
01285 }
01286 else
01287 {
01288
01289
01290
01291 if (fcache->junkFilter)
01292 {
01293
01294 slot = fcache->junkFilter->jf_resultSlot;
01295 if (tuplestore_gettupleslot(fcache->tstore, true, false, slot))
01296 result = postquel_get_single_result(slot, fcinfo,
01297 fcache, oldcontext);
01298 else
01299 {
01300 fcinfo->isnull = true;
01301 result = (Datum) 0;
01302 }
01303 }
01304 else
01305 {
01306
01307 Assert(fcache->rettype == VOIDOID);
01308 fcinfo->isnull = true;
01309 result = (Datum) 0;
01310 }
01311
01312
01313 tuplestore_clear(fcache->tstore);
01314 }
01315
01316
01317 if (pushed_snapshot)
01318 PopActiveSnapshot();
01319
01320
01321
01322
01323
01324 if (es == NULL)
01325 {
01326 foreach(eslc, fcache->func_state)
01327 {
01328 es = (execution_state *) lfirst(eslc);
01329 while (es)
01330 {
01331 es->status = F_EXEC_START;
01332 es = es->next;
01333 }
01334 }
01335 }
01336
01337 error_context_stack = sqlerrcontext.previous;
01338
01339 MemoryContextSwitchTo(oldcontext);
01340
01341 return result;
01342 }
01343
01344
01345
01346
01347
01348 static void
01349 sql_exec_error_callback(void *arg)
01350 {
01351 FmgrInfo *flinfo = (FmgrInfo *) arg;
01352 SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) flinfo->fn_extra;
01353 int syntaxerrposition;
01354
01355
01356
01357
01358
01359 if (fcache == NULL || fcache->fname == NULL)
01360 return;
01361
01362
01363
01364
01365 syntaxerrposition = geterrposition();
01366 if (syntaxerrposition > 0 && fcache->src != NULL)
01367 {
01368 errposition(0);
01369 internalerrposition(syntaxerrposition);
01370 internalerrquery(fcache->src);
01371 }
01372
01373
01374
01375
01376
01377
01378
01379
01380 if (fcache->func_state)
01381 {
01382 execution_state *es;
01383 int query_num;
01384 ListCell *lc;
01385
01386 es = NULL;
01387 query_num = 1;
01388 foreach(lc, fcache->func_state)
01389 {
01390 es = (execution_state *) lfirst(lc);
01391 while (es)
01392 {
01393 if (es->qd)
01394 {
01395 errcontext("SQL function \"%s\" statement %d",
01396 fcache->fname, query_num);
01397 break;
01398 }
01399 es = es->next;
01400 }
01401 if (es)
01402 break;
01403 query_num++;
01404 }
01405 if (es == NULL)
01406 {
01407
01408
01409
01410
01411 errcontext("SQL function \"%s\"", fcache->fname);
01412 }
01413 }
01414 else
01415 {
01416
01417
01418
01419
01420
01421 errcontext("SQL function \"%s\" during startup", fcache->fname);
01422 }
01423 }
01424
01425
01426
01427
01428
01429
01430 static void
01431 ShutdownSQLFunction(Datum arg)
01432 {
01433 SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
01434 execution_state *es;
01435 ListCell *lc;
01436
01437 foreach(lc, fcache->func_state)
01438 {
01439 es = (execution_state *) lfirst(lc);
01440 while (es)
01441 {
01442
01443 if (es->status == F_EXEC_RUN)
01444 {
01445
01446 if (!fcache->readonly_func)
01447 PushActiveSnapshot(es->qd->snapshot);
01448
01449 postquel_end(es);
01450
01451 if (!fcache->readonly_func)
01452 PopActiveSnapshot();
01453 }
01454
01455
01456 es->status = F_EXEC_START;
01457 es = es->next;
01458 }
01459 }
01460
01461
01462 if (fcache->tstore)
01463 tuplestore_end(fcache->tstore);
01464 fcache->tstore = NULL;
01465
01466
01467 fcache->shutdown_reg = false;
01468 }
01469
01470
01471
01472
01473
01474
01475
01476
01477
01478
01479
01480
01481
01482
01483
01484
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510
01511
01512
01513 bool
01514 check_sql_fn_retval(Oid func_id, Oid rettype, List *queryTreeList,
01515 bool *modifyTargetList,
01516 JunkFilter **junkFilter)
01517 {
01518 Query *parse;
01519 List **tlist_ptr;
01520 List *tlist;
01521 int tlistlen;
01522 char fn_typtype;
01523 Oid restype;
01524 ListCell *lc;
01525
01526 AssertArg(!IsPolymorphicType(rettype));
01527
01528 if (modifyTargetList)
01529 *modifyTargetList = false;
01530 if (junkFilter)
01531 *junkFilter = NULL;
01532
01533
01534
01535
01536
01537
01538 parse = NULL;
01539 foreach(lc, queryTreeList)
01540 {
01541 Query *q = (Query *) lfirst(lc);
01542
01543 if (q->canSetTag)
01544 parse = q;
01545 }
01546
01547
01548
01549
01550
01551
01552
01553
01554
01555
01556
01557
01558 if (parse &&
01559 parse->commandType == CMD_SELECT &&
01560 parse->utilityStmt == NULL)
01561 {
01562 tlist_ptr = &parse->targetList;
01563 tlist = parse->targetList;
01564 }
01565 else if (parse &&
01566 (parse->commandType == CMD_INSERT ||
01567 parse->commandType == CMD_UPDATE ||
01568 parse->commandType == CMD_DELETE) &&
01569 parse->returningList)
01570 {
01571 tlist_ptr = &parse->returningList;
01572 tlist = parse->returningList;
01573 }
01574 else
01575 {
01576
01577 if (rettype != VOIDOID)
01578 ereport(ERROR,
01579 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01580 errmsg("return type mismatch in function declared to return %s",
01581 format_type_be(rettype)),
01582 errdetail("Function's final statement must be SELECT or INSERT/UPDATE/DELETE RETURNING.")));
01583 return false;
01584 }
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596
01597 tlistlen = ExecCleanTargetListLength(tlist);
01598
01599 fn_typtype = get_typtype(rettype);
01600
01601 if (fn_typtype == TYPTYPE_BASE ||
01602 fn_typtype == TYPTYPE_DOMAIN ||
01603 fn_typtype == TYPTYPE_ENUM ||
01604 fn_typtype == TYPTYPE_RANGE ||
01605 rettype == VOIDOID)
01606 {
01607
01608
01609
01610
01611
01612 TargetEntry *tle;
01613
01614 if (tlistlen != 1)
01615 ereport(ERROR,
01616 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01617 errmsg("return type mismatch in function declared to return %s",
01618 format_type_be(rettype)),
01619 errdetail("Final statement must return exactly one column.")));
01620
01621
01622 tle = (TargetEntry *) linitial(tlist);
01623 Assert(!tle->resjunk);
01624
01625 restype = exprType((Node *) tle->expr);
01626 if (!IsBinaryCoercible(restype, rettype))
01627 ereport(ERROR,
01628 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01629 errmsg("return type mismatch in function declared to return %s",
01630 format_type_be(rettype)),
01631 errdetail("Actual return type is %s.",
01632 format_type_be(restype))));
01633 if (modifyTargetList && restype != rettype)
01634 {
01635 tle->expr = (Expr *) makeRelabelType(tle->expr,
01636 rettype,
01637 -1,
01638 get_typcollation(rettype),
01639 COERCE_IMPLICIT_CAST);
01640
01641 if (tle->ressortgroupref != 0 || parse->setOperations)
01642 *modifyTargetList = true;
01643 }
01644
01645
01646 if (junkFilter)
01647 *junkFilter = ExecInitJunkFilter(tlist, false, NULL);
01648 }
01649 else if (fn_typtype == TYPTYPE_COMPOSITE || rettype == RECORDOID)
01650 {
01651
01652 TupleDesc tupdesc;
01653 int tupnatts;
01654 int tuplogcols;
01655 int colindex;
01656 List *newtlist;
01657 List *junkattrs;
01658
01659
01660
01661
01662
01663
01664
01665
01666
01667
01668
01669
01670
01671 if (tlistlen == 1)
01672 {
01673 TargetEntry *tle = (TargetEntry *) linitial(tlist);
01674
01675 Assert(!tle->resjunk);
01676 restype = exprType((Node *) tle->expr);
01677 if (IsBinaryCoercible(restype, rettype))
01678 {
01679 if (modifyTargetList && restype != rettype)
01680 {
01681 tle->expr = (Expr *) makeRelabelType(tle->expr,
01682 rettype,
01683 -1,
01684 get_typcollation(rettype),
01685 COERCE_IMPLICIT_CAST);
01686
01687 if (tle->ressortgroupref != 0 || parse->setOperations)
01688 *modifyTargetList = true;
01689 }
01690
01691 if (junkFilter)
01692 *junkFilter = ExecInitJunkFilter(tlist, false, NULL);
01693 return false;
01694 }
01695 }
01696
01697
01698 if (get_func_result_type(func_id, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
01699 {
01700
01701
01702
01703
01704 if (junkFilter)
01705 *junkFilter = ExecInitJunkFilter(tlist, false, NULL);
01706 return true;
01707 }
01708 Assert(tupdesc);
01709
01710
01711
01712
01713
01714
01715
01716 tupnatts = tupdesc->natts;
01717 tuplogcols = 0;
01718 colindex = 0;
01719 newtlist = NIL;
01720 junkattrs = NIL;
01721
01722 foreach(lc, tlist)
01723 {
01724 TargetEntry *tle = (TargetEntry *) lfirst(lc);
01725 Form_pg_attribute attr;
01726 Oid tletype;
01727 Oid atttype;
01728
01729 if (tle->resjunk)
01730 {
01731 if (modifyTargetList)
01732 junkattrs = lappend(junkattrs, tle);
01733 continue;
01734 }
01735
01736 do
01737 {
01738 colindex++;
01739 if (colindex > tupnatts)
01740 ereport(ERROR,
01741 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01742 errmsg("return type mismatch in function declared to return %s",
01743 format_type_be(rettype)),
01744 errdetail("Final statement returns too many columns.")));
01745 attr = tupdesc->attrs[colindex - 1];
01746 if (attr->attisdropped && modifyTargetList)
01747 {
01748 Expr *null_expr;
01749
01750
01751 null_expr = (Expr *) makeConst(INT4OID,
01752 -1,
01753 InvalidOid,
01754 sizeof(int32),
01755 (Datum) 0,
01756 true,
01757 true );
01758 newtlist = lappend(newtlist,
01759 makeTargetEntry(null_expr,
01760 colindex,
01761 NULL,
01762 false));
01763
01764 if (parse->setOperations)
01765 *modifyTargetList = true;
01766 }
01767 } while (attr->attisdropped);
01768 tuplogcols++;
01769
01770 tletype = exprType((Node *) tle->expr);
01771 atttype = attr->atttypid;
01772 if (!IsBinaryCoercible(tletype, atttype))
01773 ereport(ERROR,
01774 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01775 errmsg("return type mismatch in function declared to return %s",
01776 format_type_be(rettype)),
01777 errdetail("Final statement returns %s instead of %s at column %d.",
01778 format_type_be(tletype),
01779 format_type_be(atttype),
01780 tuplogcols)));
01781 if (modifyTargetList)
01782 {
01783 if (tletype != atttype)
01784 {
01785 tle->expr = (Expr *) makeRelabelType(tle->expr,
01786 atttype,
01787 -1,
01788 get_typcollation(atttype),
01789 COERCE_IMPLICIT_CAST);
01790
01791 if (tle->ressortgroupref != 0 || parse->setOperations)
01792 *modifyTargetList = true;
01793 }
01794 tle->resno = colindex;
01795 newtlist = lappend(newtlist, tle);
01796 }
01797 }
01798
01799
01800 for (colindex++; colindex <= tupnatts; colindex++)
01801 {
01802 if (!tupdesc->attrs[colindex - 1]->attisdropped)
01803 ereport(ERROR,
01804 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01805 errmsg("return type mismatch in function declared to return %s",
01806 format_type_be(rettype)),
01807 errdetail("Final statement returns too few columns.")));
01808 if (modifyTargetList)
01809 {
01810 Expr *null_expr;
01811
01812
01813 null_expr = (Expr *) makeConst(INT4OID,
01814 -1,
01815 InvalidOid,
01816 sizeof(int32),
01817 (Datum) 0,
01818 true,
01819 true );
01820 newtlist = lappend(newtlist,
01821 makeTargetEntry(null_expr,
01822 colindex,
01823 NULL,
01824 false));
01825
01826 if (parse->setOperations)
01827 *modifyTargetList = true;
01828 }
01829 }
01830
01831 if (modifyTargetList)
01832 {
01833
01834 foreach(lc, junkattrs)
01835 {
01836 TargetEntry *tle = (TargetEntry *) lfirst(lc);
01837
01838 tle->resno = colindex++;
01839 }
01840
01841 *tlist_ptr = list_concat(newtlist, junkattrs);
01842 }
01843
01844
01845 if (junkFilter)
01846 *junkFilter = ExecInitJunkFilterConversion(tlist,
01847 CreateTupleDescCopy(tupdesc),
01848 NULL);
01849
01850
01851 return true;
01852 }
01853 else
01854 ereport(ERROR,
01855 (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
01856 errmsg("return type %s is not supported for SQL functions",
01857 format_type_be(rettype))));
01858
01859 return false;
01860 }
01861
01862
01863
01864
01865
01866 DestReceiver *
01867 CreateSQLFunctionDestReceiver(void)
01868 {
01869 DR_sqlfunction *self = (DR_sqlfunction *) palloc0(sizeof(DR_sqlfunction));
01870
01871 self->pub.receiveSlot = sqlfunction_receive;
01872 self->pub.rStartup = sqlfunction_startup;
01873 self->pub.rShutdown = sqlfunction_shutdown;
01874 self->pub.rDestroy = sqlfunction_destroy;
01875 self->pub.mydest = DestSQLFunction;
01876
01877
01878
01879 return (DestReceiver *) self;
01880 }
01881
01882
01883
01884
01885 static void
01886 sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
01887 {
01888
01889 }
01890
01891
01892
01893
01894 static void
01895 sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self)
01896 {
01897 DR_sqlfunction *myState = (DR_sqlfunction *) self;
01898
01899
01900 slot = ExecFilterJunk(myState->filter, slot);
01901
01902
01903 tuplestore_puttupleslot(myState->tstore, slot);
01904 }
01905
01906
01907
01908
01909 static void
01910 sqlfunction_shutdown(DestReceiver *self)
01911 {
01912
01913 }
01914
01915
01916
01917
01918 static void
01919 sqlfunction_destroy(DestReceiver *self)
01920 {
01921 pfree(self);
01922 }