00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "postgres.h"
00014
00015 #include "postgres_fdw.h"
00016
00017 #include "access/htup_details.h"
00018 #include "access/sysattr.h"
00019 #include "commands/defrem.h"
00020 #include "commands/explain.h"
00021 #include "commands/vacuum.h"
00022 #include "foreign/fdwapi.h"
00023 #include "funcapi.h"
00024 #include "miscadmin.h"
00025 #include "nodes/makefuncs.h"
00026 #include "nodes/nodeFuncs.h"
00027 #include "optimizer/cost.h"
00028 #include "optimizer/pathnode.h"
00029 #include "optimizer/paths.h"
00030 #include "optimizer/planmain.h"
00031 #include "optimizer/prep.h"
00032 #include "optimizer/restrictinfo.h"
00033 #include "optimizer/var.h"
00034 #include "parser/parsetree.h"
00035 #include "utils/builtins.h"
00036 #include "utils/guc.h"
00037 #include "utils/lsyscache.h"
00038 #include "utils/memutils.h"
00039
00040 /* Loadable-module marker macro supplied by the PostgreSQL headers. */
00041 PG_MODULE_MAGIC;
00042
00043 /* Initial value of fdw_startup_cost; postgresGetForeignRelSize() overrides it from the server option of the same name. */
00044 #define DEFAULT_FDW_STARTUP_COST 100.0
00045
00046 /* Initial value of fdw_tuple_cost; postgresGetForeignRelSize() overrides it from the server option of the same name. */
00047 #define DEFAULT_FDW_TUPLE_COST 0.01
00048
00049
00050
00051
00052 /* Per-relation planner state; allocated in postgresGetForeignRelSize() and stored in baserel->fdw_private. */
00053 typedef struct PgFdwRelationInfo
00054 {
00055 /* Output lists of classifyConditions(); local_conds are the ones evaluated locally (their clauses feed attrs_used and the local cost/selectivity below). */
00056 List *remote_conds;
00057 List *local_conds;
00058
00059 /* Attribute numbers collected by pull_varattnos() from the rel's target list and from local_conds; passed to deparseSelectSql(). */
00060 Bitmapset *attrs_used;
00061
00062 /* Evaluation cost (cost_qual_eval) and selectivity (clauselist_selectivity) computed for local_conds. */
00063 QualCost local_conds_cost;
00064 Selectivity local_conds_sel;
00065
00066 /* Estimates filled in by estimate_path_cost_size() with no join clauses; used for the unparameterized path. */
00067 double rows;
00068 int width;
00069 Cost startup_cost;
00070 Cost total_cost;
00071
00072 /* Settings taken from server options (use_remote_estimate may additionally be overridden by a table option). */
00073 bool use_remote_estimate;
00074 Cost fdw_startup_cost;
00075 Cost fdw_tuple_cost;
00076
00077 /* Catalog objects looked up at plan time; user is only fetched when use_remote_estimate is true, otherwise NULL. */
00078 ForeignTable *table;
00079 ForeignServer *server;
00080 UserMapping *user;
00081 } PgFdwRelationInfo;
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095 /* Positions within the fdw_private list of a ForeignScan node: built by postgresGetForeignPlan(), read by postgresBeginForeignScan(). */
00096 enum FdwScanPrivateIndex
00097 {
00098 /* SQL statement text, stored as a String node (read with strVal) */
00099 FdwScanPrivateSelectSql,
00100 /* the retrieved_attrs list produced by deparseSelectSql() */
00101 FdwScanPrivateRetrievedAttrs
00102 };
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113 /* Positions within the fdw_private list returned by postgresPlanForeignModify() and read by postgresBeginForeignModify(). */
00114 enum FdwModifyPrivateIndex
00115 {
00116 /* SQL statement text, stored as a String node; used for INSERT, UPDATE and DELETE alike */
00117 FdwModifyPrivateUpdateSql,
00118 /* integer list of target attribute numbers (empty for DELETE) */
00119 FdwModifyPrivateTargetAttnums,
00120 /* Integer node: nonzero when a RETURNING list exists */
00121 FdwModifyPrivateHasReturning,
00122 /* the retrieved_attrs list filled in by the deparse*Sql() routine */
00123 FdwModifyPrivateRetrievedAttrs
00124 };
00125
00126
00127
00128 /* Executor state of one foreign scan; allocated in postgresBeginForeignScan() and kept in ForeignScanState.fdw_state. */
00129 typedef struct PgFdwScanState
00130 {
00131 Relation rel; /* the scan's ss_currentRelation */
00132 AttInMetadata *attinmeta; /* built from rel's tuple descriptor */
00133
00134 /* Items copied out of the plan node's fdw_private list. */
00135 char *query;
00136 List *retrieved_attrs;
00137
00138 /* Remote connection, cursor bookkeeping and query-parameter handling. */
00139 PGconn *conn;
00140 unsigned int cursor_number; /* from GetCursorNumber(); cursor is named "c<number>" */
00141 bool cursor_exists; /* false until create_cursor() has run, and again after CLOSE on rescan */
00142 int numParams; /* length of the plan's fdw_exprs list */
00143 FmgrInfo *param_flinfo; /* type output function for each parameter */
00144 List *param_exprs; /* result of ExecInitExpr() on fdw_exprs */
00145 const char **param_values; /* numParams slots, or NULL when there are no parameters */
00146
00147 /* Currently buffered tuples; next_tuple indexes into tuples[]. */
00148 HeapTuple *tuples;
00149 int num_tuples;
00150 int next_tuple;
00151
00152 /* Fetch progress. Rescan rewinds the remote cursor only when fetch_ct_2 > 1 -- presumably a capped count of fetches done; maintained outside this chunk, confirm in fetch_more_data(). */
00153 int fetch_ct_2;
00154 bool eof_reached; /* when set, postgresIterateForeignScan() stops calling fetch_more_data() */
00155
00156 /* Memory contexts, both children of the executor's per-query context. */
00157 MemoryContext batch_cxt; /* created as "postgres_fdw tuple data" */
00158 MemoryContext temp_cxt; /* created as "postgres_fdw temporary data" */
00159 } PgFdwScanState;
00160
00161
00162
00163 /* Executor state of one foreign-table modify operation; allocated in postgresBeginForeignModify() and kept in ResultRelInfo.ri_FdwState. */
00164 typedef struct PgFdwModifyState
00165 {
00166 Relation rel; /* the result relation */
00167 AttInMetadata *attinmeta; /* only built when has_returning is true */
00168
00169 /* Remote connection and prepared statement. */
00170 PGconn *conn;
00171 char *p_name; /* statement name given to PQexecPrepared(); NULL until prepare_foreign_modify() has run */
00172
00173 /* Items copied out of the fdw_private list built by postgresPlanForeignModify(). */
00174 char *query;
00175 List *target_attrs;
00176 bool has_returning;
00177 List *retrieved_attrs;
00178
00179 /* Parameter bookkeeping. */
00180 AttrNumber ctidAttno; /* junk "ctid" column in the subplan target list (UPDATE/DELETE only) */
00181 int p_nums; /* number of p_flinfo entries filled; also the parameter count passed to PQexecPrepared() */
00182 FmgrInfo *p_flinfo; /* type output function for each transmitted parameter */
00183
00184 /* Scratch context, reset after every executed row. */
00185 MemoryContext temp_cxt;
00186 } PgFdwModifyState;
00187
00188
00189
00190 /* State handed to analyze_row_processor() (see its prototype). The fields are not exercised in this part of the file, so the notes below are inferred from names only -- TODO confirm against postgresAcquireSampleRowsFunc(). */
00191 typedef struct PgFdwAnalyzeState
00192 {
00193 Relation rel;
00194 AttInMetadata *attinmeta;
00195 List *retrieved_attrs;
00196
00197 /* NOTE(review): presumably the collected sample rows, the target sample size and the number collected so far. */
00198 HeapTuple *rows;
00199 int targrows;
00200 int numrows;
00201
00202 /* NOTE(review): presumably row-sampling bookkeeping (rows seen, rows still to skip, sampler state). */
00203 double samplerows;
00204 double rowstoskip;
00205 double rstate;
00206
00207 /* NOTE(review): presumably a longer-lived context for kept rows plus a short-lived scratch context. */
00208 MemoryContext anl_cxt;
00209 MemoryContext temp_cxt;
00210 } PgFdwAnalyzeState;
00211
00212
00213
00214 /* A relation plus an attribute number. NOTE(review): presumably the argument of conversion_error_callback(), identifying the column being converted -- not used in this part of the file, confirm. */
00215 typedef struct ConversionLocation
00216 {
00217 Relation rel;
00218 AttrNumber cur_attno;
00219 } ConversionLocation;
00220
00221 /* State shared between postgresGetForeignPaths() and its ec_member_matches_foreign() callback. */
00222 typedef struct
00223 {
00224 Expr *current; /* reset to NULL before each generate_implied_equalities_for_column() call; still NULL afterwards means nothing was selected and the loop ends */
00225 List *already_used; /* every expression that was in 'current' after an earlier pass */
00226 } ec_member_foreign_arg;
00227
00228
00229
00230 /* SQL-callable entry point: declaration plus function-manager registration macro. */
00231 extern Datum postgres_fdw_handler(PG_FUNCTION_ARGS);
00232
00233 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
00234
00235
00236
00237 /* Callback routines; postgres_fdw_handler() installs each of these in the FdwRoutine it returns. */
00238 static void postgresGetForeignRelSize(PlannerInfo *root,
00239 RelOptInfo *baserel,
00240 Oid foreigntableid);
00241 static void postgresGetForeignPaths(PlannerInfo *root,
00242 RelOptInfo *baserel,
00243 Oid foreigntableid);
00244 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
00245 RelOptInfo *baserel,
00246 Oid foreigntableid,
00247 ForeignPath *best_path,
00248 List *tlist,
00249 List *scan_clauses);
00250 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
00251 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
00252 static void postgresReScanForeignScan(ForeignScanState *node);
00253 static void postgresEndForeignScan(ForeignScanState *node);
00254 static void postgresAddForeignUpdateTargets(Query *parsetree,
00255 RangeTblEntry *target_rte,
00256 Relation target_relation);
00257 static List *postgresPlanForeignModify(PlannerInfo *root,
00258 ModifyTable *plan,
00259 Index resultRelation,
00260 int subplan_index);
00261 static void postgresBeginForeignModify(ModifyTableState *mtstate,
00262 ResultRelInfo *resultRelInfo,
00263 List *fdw_private,
00264 int subplan_index,
00265 int eflags);
00266 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
00267 ResultRelInfo *resultRelInfo,
00268 TupleTableSlot *slot,
00269 TupleTableSlot *planSlot);
00270 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
00271 ResultRelInfo *resultRelInfo,
00272 TupleTableSlot *slot,
00273 TupleTableSlot *planSlot);
00274 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
00275 ResultRelInfo *resultRelInfo,
00276 TupleTableSlot *slot,
00277 TupleTableSlot *planSlot);
00278 static void postgresEndForeignModify(EState *estate,
00279 ResultRelInfo *resultRelInfo);
00280 static void postgresExplainForeignScan(ForeignScanState *node,
00281 ExplainState *es);
00282 static void postgresExplainForeignModify(ModifyTableState *mtstate,
00283 ResultRelInfo *rinfo,
00284 List *fdw_private,
00285 int subplan_index,
00286 ExplainState *es);
00287 static bool postgresAnalyzeForeignTable(Relation relation,
00288 AcquireSampleRowsFunc *func,
00289 BlockNumber *totalpages);
00290
00291
00292
00293 /* File-local helper functions used by the callbacks above. */
00294 static void estimate_path_cost_size(PlannerInfo *root,
00295 RelOptInfo *baserel,
00296 List *join_conds,
00297 double *p_rows, int *p_width,
00298 Cost *p_startup_cost, Cost *p_total_cost);
00299 static void get_remote_estimate(const char *sql,
00300 PGconn *conn,
00301 double *rows,
00302 int *width,
00303 Cost *startup_cost,
00304 Cost *total_cost);
00305 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
00306 EquivalenceClass *ec, EquivalenceMember *em,
00307 void *arg);
00308 static void create_cursor(ForeignScanState *node);
00309 static void fetch_more_data(ForeignScanState *node);
00310 static void close_cursor(PGconn *conn, unsigned int cursor_number);
00311 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
00312 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
00313 ItemPointer tupleid,
00314 TupleTableSlot *slot);
00315 static void store_returning_result(PgFdwModifyState *fmstate,
00316 TupleTableSlot *slot, PGresult *res);
00317 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
00318 HeapTuple *rows, int targrows,
00319 double *totalrows,
00320 double *totaldeadrows);
00321 static void analyze_row_processor(PGresult *res, int row,
00322 PgFdwAnalyzeState *astate);
00323 static HeapTuple make_tuple_from_result_row(PGresult *res,
00324 int row,
00325 Relation rel,
00326 AttInMetadata *attinmeta,
00327 List *retrieved_attrs,
00328 MemoryContext temp_context);
00329 static void conversion_error_callback(void *arg);
00330
00331
00332
00333
00334
00335
00336 Datum
00337 postgres_fdw_handler(PG_FUNCTION_ARGS)
00338 {
00339 FdwRoutine *routine = makeNode(FdwRoutine);
00340
00341
00342 routine->GetForeignRelSize = postgresGetForeignRelSize;
00343 routine->GetForeignPaths = postgresGetForeignPaths;
00344 routine->GetForeignPlan = postgresGetForeignPlan;
00345 routine->BeginForeignScan = postgresBeginForeignScan;
00346 routine->IterateForeignScan = postgresIterateForeignScan;
00347 routine->ReScanForeignScan = postgresReScanForeignScan;
00348 routine->EndForeignScan = postgresEndForeignScan;
00349
00350
00351 routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
00352 routine->PlanForeignModify = postgresPlanForeignModify;
00353 routine->BeginForeignModify = postgresBeginForeignModify;
00354 routine->ExecForeignInsert = postgresExecForeignInsert;
00355 routine->ExecForeignUpdate = postgresExecForeignUpdate;
00356 routine->ExecForeignDelete = postgresExecForeignDelete;
00357 routine->EndForeignModify = postgresEndForeignModify;
00358
00359
00360 routine->ExplainForeignScan = postgresExplainForeignScan;
00361 routine->ExplainForeignModify = postgresExplainForeignModify;
00362
00363
00364 routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
00365
00366 PG_RETURN_POINTER(routine);
00367 }
00368
00369
00370
00371
00372
00373
00374
00375
00376 static void
00377 postgresGetForeignRelSize(PlannerInfo *root,
00378 RelOptInfo *baserel,
00379 Oid foreigntableid)
00380 {
00381 PgFdwRelationInfo *fpinfo;
00382 ListCell *lc;
00383
00384
00385
00386
00387
00388 fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
00389 baserel->fdw_private = (void *) fpinfo;
00390
00391
00392 fpinfo->table = GetForeignTable(foreigntableid);
00393 fpinfo->server = GetForeignServer(fpinfo->table->serverid);
00394
00395
00396
00397
00398
00399 fpinfo->use_remote_estimate = false;
00400 fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
00401 fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
00402
00403 foreach(lc, fpinfo->server->options)
00404 {
00405 DefElem *def = (DefElem *) lfirst(lc);
00406
00407 if (strcmp(def->defname, "use_remote_estimate") == 0)
00408 fpinfo->use_remote_estimate = defGetBoolean(def);
00409 else if (strcmp(def->defname, "fdw_startup_cost") == 0)
00410 fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
00411 else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
00412 fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
00413 }
00414 foreach(lc, fpinfo->table->options)
00415 {
00416 DefElem *def = (DefElem *) lfirst(lc);
00417
00418 if (strcmp(def->defname, "use_remote_estimate") == 0)
00419 {
00420 fpinfo->use_remote_estimate = defGetBoolean(def);
00421 break;
00422 }
00423 }
00424
00425
00426
00427
00428
00429
00430
00431 if (fpinfo->use_remote_estimate)
00432 {
00433 RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
00434 Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
00435
00436 fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
00437 }
00438 else
00439 fpinfo->user = NULL;
00440
00441
00442
00443
00444
00445 classifyConditions(root, baserel,
00446 &fpinfo->remote_conds, &fpinfo->local_conds);
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456 fpinfo->attrs_used = NULL;
00457 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
00458 &fpinfo->attrs_used);
00459 foreach(lc, fpinfo->local_conds)
00460 {
00461 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00462
00463 pull_varattnos((Node *) rinfo->clause, baserel->relid,
00464 &fpinfo->attrs_used);
00465 }
00466
00467
00468
00469
00470
00471
00472 fpinfo->local_conds_sel = clauselist_selectivity(root,
00473 fpinfo->local_conds,
00474 baserel->relid,
00475 JOIN_INNER,
00476 NULL);
00477
00478 cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
00479
00480
00481
00482
00483
00484
00485
00486
00487 if (fpinfo->use_remote_estimate)
00488 {
00489
00490
00491
00492
00493
00494 estimate_path_cost_size(root, baserel, NIL,
00495 &fpinfo->rows, &fpinfo->width,
00496 &fpinfo->startup_cost, &fpinfo->total_cost);
00497
00498
00499 baserel->rows = fpinfo->rows;
00500 baserel->width = fpinfo->width;
00501 }
00502 else
00503 {
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513 if (baserel->pages == 0 && baserel->tuples == 0)
00514 {
00515 baserel->pages = 10;
00516 baserel->tuples =
00517 (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
00518 }
00519
00520
00521 set_baserel_size_estimates(root, baserel);
00522
00523
00524 estimate_path_cost_size(root, baserel, NIL,
00525 &fpinfo->rows, &fpinfo->width,
00526 &fpinfo->startup_cost, &fpinfo->total_cost);
00527 }
00528 }
00529
00530
00531
00532
00533
00534 static void
00535 postgresGetForeignPaths(PlannerInfo *root,
00536 RelOptInfo *baserel,
00537 Oid foreigntableid)
00538 {
00539 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
00540 ForeignPath *path;
00541 Relids lateral_referencers;
00542 List *join_quals;
00543 Relids required_outer;
00544 double rows;
00545 int width;
00546 Cost startup_cost;
00547 Cost total_cost;
00548 ListCell *lc;
00549
00550
00551
00552
00553
00554
00555
00556
00557 path = create_foreignscan_path(root, baserel,
00558 fpinfo->rows,
00559 fpinfo->startup_cost,
00560 fpinfo->total_cost,
00561 NIL,
00562 NULL,
00563 NIL);
00564 add_path(baserel, (Path *) path);
00565
00566
00567
00568
00569
00570
00571 if (!fpinfo->use_remote_estimate)
00572 return;
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587 lateral_referencers = NULL;
00588 foreach(lc, root->lateral_info_list)
00589 {
00590 LateralJoinInfo *ljinfo = (LateralJoinInfo *) lfirst(lc);
00591
00592 if (bms_is_member(baserel->relid, ljinfo->lateral_lhs))
00593 lateral_referencers = bms_add_member(lateral_referencers,
00594 ljinfo->lateral_rhs);
00595 }
00596
00597
00598 foreach(lc, baserel->joininfo)
00599 {
00600 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00601
00602
00603 if (!join_clause_is_movable_to(rinfo, baserel->relid))
00604 continue;
00605
00606
00607 if (bms_overlap(rinfo->clause_relids, lateral_referencers))
00608 continue;
00609
00610
00611 if (!is_foreign_expr(root, baserel, rinfo->clause))
00612 continue;
00613
00614
00615
00616
00617 join_quals = list_make1(rinfo);
00618 estimate_path_cost_size(root, baserel, join_quals,
00619 &rows, &width,
00620 &startup_cost, &total_cost);
00621
00622
00623 required_outer = bms_union(rinfo->clause_relids,
00624 baserel->lateral_relids);
00625
00626 required_outer = bms_del_member(required_outer, baserel->relid);
00627
00628 if (bms_is_empty(required_outer))
00629 required_outer = NULL;
00630
00631 path = create_foreignscan_path(root, baserel,
00632 rows,
00633 startup_cost,
00634 total_cost,
00635 NIL,
00636 required_outer,
00637 NIL);
00638 add_path(baserel, (Path *) path);
00639 }
00640
00641
00642
00643
00644
00645
00646 if (baserel->has_eclass_joins)
00647 {
00648
00649
00650
00651
00652
00653
00654
00655 ec_member_foreign_arg arg;
00656
00657 arg.already_used = NIL;
00658 for (;;)
00659 {
00660 List *clauses;
00661
00662
00663 arg.current = NULL;
00664 clauses = generate_implied_equalities_for_column(root,
00665 baserel,
00666 ec_member_matches_foreign,
00667 (void *) &arg,
00668 lateral_referencers);
00669
00670
00671 if (arg.current == NULL)
00672 {
00673 Assert(clauses == NIL);
00674 break;
00675 }
00676
00677
00678 foreach(lc, clauses)
00679 {
00680 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00681
00682
00683 if (!join_clause_is_movable_to(rinfo, baserel->relid))
00684 continue;
00685
00686
00687 Assert(!bms_overlap(rinfo->clause_relids, lateral_referencers));
00688
00689
00690 if (!is_foreign_expr(root, baserel, rinfo->clause))
00691 continue;
00692
00693
00694
00695
00696 join_quals = list_make1(rinfo);
00697 estimate_path_cost_size(root, baserel, join_quals,
00698 &rows, &width,
00699 &startup_cost, &total_cost);
00700
00701
00702 required_outer = bms_union(rinfo->clause_relids,
00703 baserel->lateral_relids);
00704 required_outer = bms_del_member(required_outer, baserel->relid);
00705 if (bms_is_empty(required_outer))
00706 required_outer = NULL;
00707
00708 path = create_foreignscan_path(root, baserel,
00709 rows,
00710 startup_cost,
00711 total_cost,
00712 NIL,
00713 required_outer,
00714 NIL);
00715 add_path(baserel, (Path *) path);
00716 }
00717
00718
00719 arg.already_used = lappend(arg.already_used, arg.current);
00720 }
00721 }
00722 }
00723
00724
00725
00726
00727
00728 static ForeignScan *
00729 postgresGetForeignPlan(PlannerInfo *root,
00730 RelOptInfo *baserel,
00731 Oid foreigntableid,
00732 ForeignPath *best_path,
00733 List *tlist,
00734 List *scan_clauses)
00735 {
00736 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
00737 Index scan_relid = baserel->relid;
00738 List *fdw_private;
00739 List *remote_conds = NIL;
00740 List *local_exprs = NIL;
00741 List *params_list = NIL;
00742 List *retrieved_attrs;
00743 StringInfoData sql;
00744 ListCell *lc;
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766 foreach(lc, scan_clauses)
00767 {
00768 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00769
00770 Assert(IsA(rinfo, RestrictInfo));
00771
00772
00773 if (rinfo->pseudoconstant)
00774 continue;
00775
00776 if (list_member_ptr(fpinfo->remote_conds, rinfo))
00777 remote_conds = lappend(remote_conds, rinfo);
00778 else if (list_member_ptr(fpinfo->local_conds, rinfo))
00779 local_exprs = lappend(local_exprs, rinfo->clause);
00780 else
00781 {
00782 Assert(is_foreign_expr(root, baserel, rinfo->clause));
00783 remote_conds = lappend(remote_conds, rinfo);
00784 }
00785 }
00786
00787
00788
00789
00790
00791 initStringInfo(&sql);
00792 deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
00793 &retrieved_attrs);
00794 if (remote_conds)
00795 appendWhereClause(&sql, root, baserel, remote_conds,
00796 true, ¶ms_list);
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808 if (baserel->relid == root->parse->resultRelation &&
00809 (root->parse->commandType == CMD_UPDATE ||
00810 root->parse->commandType == CMD_DELETE))
00811 {
00812
00813 appendStringInfo(&sql, " FOR UPDATE");
00814 }
00815 else
00816 {
00817 RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
00818
00819 if (rc)
00820 {
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830 switch (rc->strength)
00831 {
00832 case LCS_FORKEYSHARE:
00833 case LCS_FORSHARE:
00834 appendStringInfo(&sql, " FOR SHARE");
00835 break;
00836 case LCS_FORNOKEYUPDATE:
00837 case LCS_FORUPDATE:
00838 appendStringInfo(&sql, " FOR UPDATE");
00839 break;
00840 }
00841 }
00842 }
00843
00844
00845
00846
00847
00848 fdw_private = list_make2(makeString(sql.data),
00849 retrieved_attrs);
00850
00851
00852
00853
00854
00855
00856
00857
00858
00859 return make_foreignscan(tlist,
00860 local_exprs,
00861 scan_relid,
00862 params_list,
00863 fdw_private);
00864 }
00865
00866
00867
00868
00869
00870 static void
00871 postgresBeginForeignScan(ForeignScanState *node, int eflags)
00872 {
00873 ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
00874 EState *estate = node->ss.ps.state;
00875 PgFdwScanState *fsstate;
00876 RangeTblEntry *rte;
00877 Oid userid;
00878 ForeignTable *table;
00879 ForeignServer *server;
00880 UserMapping *user;
00881 int numParams;
00882 int i;
00883 ListCell *lc;
00884
00885
00886
00887
00888 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
00889 return;
00890
00891
00892
00893
00894 fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
00895 node->fdw_state = (void *) fsstate;
00896
00897
00898
00899
00900
00901 rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
00902 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
00903
00904
00905 fsstate->rel = node->ss.ss_currentRelation;
00906 table = GetForeignTable(RelationGetRelid(fsstate->rel));
00907 server = GetForeignServer(table->serverid);
00908 user = GetUserMapping(userid, server->serverid);
00909
00910
00911
00912
00913
00914 fsstate->conn = GetConnection(server, user, false);
00915
00916
00917 fsstate->cursor_number = GetCursorNumber(fsstate->conn);
00918 fsstate->cursor_exists = false;
00919
00920
00921 fsstate->query = strVal(list_nth(fsplan->fdw_private,
00922 FdwScanPrivateSelectSql));
00923 fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
00924 FdwScanPrivateRetrievedAttrs);
00925
00926
00927 fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
00928 "postgres_fdw tuple data",
00929 ALLOCSET_DEFAULT_MINSIZE,
00930 ALLOCSET_DEFAULT_INITSIZE,
00931 ALLOCSET_DEFAULT_MAXSIZE);
00932 fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
00933 "postgres_fdw temporary data",
00934 ALLOCSET_SMALL_MINSIZE,
00935 ALLOCSET_SMALL_INITSIZE,
00936 ALLOCSET_SMALL_MAXSIZE);
00937
00938
00939 fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
00940
00941
00942 numParams = list_length(fsplan->fdw_exprs);
00943 fsstate->numParams = numParams;
00944 fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
00945
00946 i = 0;
00947 foreach(lc, fsplan->fdw_exprs)
00948 {
00949 Node *param_expr = (Node *) lfirst(lc);
00950 Oid typefnoid;
00951 bool isvarlena;
00952
00953 getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
00954 fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
00955 i++;
00956 }
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966 fsstate->param_exprs = (List *)
00967 ExecInitExpr((Expr *) fsplan->fdw_exprs,
00968 (PlanState *) node);
00969
00970
00971
00972
00973 if (numParams > 0)
00974 fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
00975 else
00976 fsstate->param_values = NULL;
00977 }
00978
00979
00980
00981
00982
00983
00984 static TupleTableSlot *
00985 postgresIterateForeignScan(ForeignScanState *node)
00986 {
00987 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
00988 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
00989
00990
00991
00992
00993
00994 if (!fsstate->cursor_exists)
00995 create_cursor(node);
00996
00997
00998
00999
01000 if (fsstate->next_tuple >= fsstate->num_tuples)
01001 {
01002
01003 if (!fsstate->eof_reached)
01004 fetch_more_data(node);
01005
01006 if (fsstate->next_tuple >= fsstate->num_tuples)
01007 return ExecClearTuple(slot);
01008 }
01009
01010
01011
01012
01013 ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
01014 slot,
01015 InvalidBuffer,
01016 false);
01017
01018 return slot;
01019 }
01020
01021
01022
01023
01024
01025 static void
01026 postgresReScanForeignScan(ForeignScanState *node)
01027 {
01028 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01029 char sql[64];
01030 PGresult *res;
01031
01032
01033 if (!fsstate->cursor_exists)
01034 return;
01035
01036
01037
01038
01039
01040
01041
01042 if (node->ss.ps.chgParam != NULL)
01043 {
01044 fsstate->cursor_exists = false;
01045 snprintf(sql, sizeof(sql), "CLOSE c%u",
01046 fsstate->cursor_number);
01047 }
01048 else if (fsstate->fetch_ct_2 > 1)
01049 {
01050 snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
01051 fsstate->cursor_number);
01052 }
01053 else
01054 {
01055
01056 fsstate->next_tuple = 0;
01057 return;
01058 }
01059
01060
01061
01062
01063
01064 res = PQexec(fsstate->conn, sql);
01065 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01066 pgfdw_report_error(ERROR, res, true, sql);
01067 PQclear(res);
01068
01069
01070 fsstate->tuples = NULL;
01071 fsstate->num_tuples = 0;
01072 fsstate->next_tuple = 0;
01073 fsstate->fetch_ct_2 = 0;
01074 fsstate->eof_reached = false;
01075 }
01076
01077
01078
01079
01080
01081 static void
01082 postgresEndForeignScan(ForeignScanState *node)
01083 {
01084 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01085
01086
01087 if (fsstate == NULL)
01088 return;
01089
01090
01091 if (fsstate->cursor_exists)
01092 close_cursor(fsstate->conn, fsstate->cursor_number);
01093
01094
01095 ReleaseConnection(fsstate->conn);
01096 fsstate->conn = NULL;
01097
01098
01099 }
01100
01101
01102
01103
01104
01105 static void
01106 postgresAddForeignUpdateTargets(Query *parsetree,
01107 RangeTblEntry *target_rte,
01108 Relation target_relation)
01109 {
01110 Var *var;
01111 const char *attrname;
01112 TargetEntry *tle;
01113
01114
01115
01116
01117
01118
01119 var = makeVar(parsetree->resultRelation,
01120 SelfItemPointerAttributeNumber,
01121 TIDOID,
01122 -1,
01123 InvalidOid,
01124 0);
01125
01126
01127 attrname = "ctid";
01128
01129 tle = makeTargetEntry((Expr *) var,
01130 list_length(parsetree->targetList) + 1,
01131 pstrdup(attrname),
01132 true);
01133
01134
01135 parsetree->targetList = lappend(parsetree->targetList, tle);
01136 }
01137
01138
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149 static List *
01150 postgresPlanForeignModify(PlannerInfo *root,
01151 ModifyTable *plan,
01152 Index resultRelation,
01153 int subplan_index)
01154 {
01155 CmdType operation = plan->operation;
01156 RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
01157 Relation rel;
01158 StringInfoData sql;
01159 List *targetAttrs = NIL;
01160 List *returningList = NIL;
01161 List *retrieved_attrs = NIL;
01162
01163 initStringInfo(&sql);
01164
01165
01166
01167
01168
01169 rel = heap_open(rte->relid, NoLock);
01170
01171
01172
01173
01174
01175
01176
01177
01178 if (operation == CMD_INSERT)
01179 {
01180 TupleDesc tupdesc = RelationGetDescr(rel);
01181 int attnum;
01182
01183 for (attnum = 1; attnum <= tupdesc->natts; attnum++)
01184 {
01185 Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
01186
01187 if (!attr->attisdropped)
01188 targetAttrs = lappend_int(targetAttrs, attnum);
01189 }
01190 }
01191 else if (operation == CMD_UPDATE)
01192 {
01193 Bitmapset *tmpset = bms_copy(rte->modifiedCols);
01194 AttrNumber col;
01195
01196 while ((col = bms_first_member(tmpset)) >= 0)
01197 {
01198 col += FirstLowInvalidHeapAttributeNumber;
01199 if (col <= InvalidAttrNumber)
01200 elog(ERROR, "system-column update is not supported");
01201 targetAttrs = lappend_int(targetAttrs, col);
01202 }
01203 }
01204
01205
01206
01207
01208 if (plan->returningLists)
01209 returningList = (List *) list_nth(plan->returningLists, subplan_index);
01210
01211
01212
01213
01214 switch (operation)
01215 {
01216 case CMD_INSERT:
01217 deparseInsertSql(&sql, root, resultRelation, rel,
01218 targetAttrs, returningList,
01219 &retrieved_attrs);
01220 break;
01221 case CMD_UPDATE:
01222 deparseUpdateSql(&sql, root, resultRelation, rel,
01223 targetAttrs, returningList,
01224 &retrieved_attrs);
01225 break;
01226 case CMD_DELETE:
01227 deparseDeleteSql(&sql, root, resultRelation, rel,
01228 returningList,
01229 &retrieved_attrs);
01230 break;
01231 default:
01232 elog(ERROR, "unexpected operation: %d", (int) operation);
01233 break;
01234 }
01235
01236 heap_close(rel, NoLock);
01237
01238
01239
01240
01241
01242 return list_make4(makeString(sql.data),
01243 targetAttrs,
01244 makeInteger((returningList != NIL)),
01245 retrieved_attrs);
01246 }
01247
01248
01249
01250
01251
01252 static void
01253 postgresBeginForeignModify(ModifyTableState *mtstate,
01254 ResultRelInfo *resultRelInfo,
01255 List *fdw_private,
01256 int subplan_index,
01257 int eflags)
01258 {
01259 PgFdwModifyState *fmstate;
01260 EState *estate = mtstate->ps.state;
01261 CmdType operation = mtstate->operation;
01262 Relation rel = resultRelInfo->ri_RelationDesc;
01263 RangeTblEntry *rte;
01264 Oid userid;
01265 ForeignTable *table;
01266 ForeignServer *server;
01267 UserMapping *user;
01268 AttrNumber n_params;
01269 Oid typefnoid;
01270 bool isvarlena;
01271 ListCell *lc;
01272
01273
01274
01275
01276
01277 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
01278 return;
01279
01280
01281 fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
01282 fmstate->rel = rel;
01283
01284
01285
01286
01287
01288 rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
01289 userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
01290
01291
01292 table = GetForeignTable(RelationGetRelid(rel));
01293 server = GetForeignServer(table->serverid);
01294 user = GetUserMapping(userid, server->serverid);
01295
01296
01297 fmstate->conn = GetConnection(server, user, true);
01298 fmstate->p_name = NULL;
01299
01300
01301 fmstate->query = strVal(list_nth(fdw_private,
01302 FdwModifyPrivateUpdateSql));
01303 fmstate->target_attrs = (List *) list_nth(fdw_private,
01304 FdwModifyPrivateTargetAttnums);
01305 fmstate->has_returning = intVal(list_nth(fdw_private,
01306 FdwModifyPrivateHasReturning));
01307 fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
01308 FdwModifyPrivateRetrievedAttrs);
01309
01310
01311 fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
01312 "postgres_fdw temporary data",
01313 ALLOCSET_SMALL_MINSIZE,
01314 ALLOCSET_SMALL_INITSIZE,
01315 ALLOCSET_SMALL_MAXSIZE);
01316
01317
01318 if (fmstate->has_returning)
01319 fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
01320
01321
01322 n_params = list_length(fmstate->target_attrs) + 1;
01323 fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
01324 fmstate->p_nums = 0;
01325
01326 if (operation == CMD_UPDATE || operation == CMD_DELETE)
01327 {
01328
01329 Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
01330
01331 fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
01332 "ctid");
01333 if (!AttributeNumberIsValid(fmstate->ctidAttno))
01334 elog(ERROR, "could not find junk ctid column");
01335
01336
01337 getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
01338 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
01339 fmstate->p_nums++;
01340 }
01341
01342 if (operation == CMD_INSERT || operation == CMD_UPDATE)
01343 {
01344
01345 foreach(lc, fmstate->target_attrs)
01346 {
01347 int attnum = lfirst_int(lc);
01348 Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
01349
01350 Assert(!attr->attisdropped);
01351
01352 getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
01353 fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
01354 fmstate->p_nums++;
01355 }
01356 }
01357
01358 Assert(fmstate->p_nums <= n_params);
01359
01360 resultRelInfo->ri_FdwState = fmstate;
01361 }
01362
01363
01364
01365
01366
01367 static TupleTableSlot *
01368 postgresExecForeignInsert(EState *estate,
01369 ResultRelInfo *resultRelInfo,
01370 TupleTableSlot *slot,
01371 TupleTableSlot *planSlot)
01372 {
01373 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01374 const char **p_values;
01375 PGresult *res;
01376 int n_rows;
01377
01378
01379 if (!fmstate->p_name)
01380 prepare_foreign_modify(fmstate);
01381
01382
01383 p_values = convert_prep_stmt_params(fmstate, NULL, slot);
01384
01385
01386
01387
01388
01389
01390
01391 res = PQexecPrepared(fmstate->conn,
01392 fmstate->p_name,
01393 fmstate->p_nums,
01394 p_values,
01395 NULL,
01396 NULL,
01397 0);
01398 if (PQresultStatus(res) !=
01399 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01400 pgfdw_report_error(ERROR, res, true, fmstate->query);
01401
01402
01403 if (fmstate->has_returning)
01404 {
01405 n_rows = PQntuples(res);
01406 if (n_rows > 0)
01407 store_returning_result(fmstate, slot, res);
01408 }
01409 else
01410 n_rows = atoi(PQcmdTuples(res));
01411
01412
01413 PQclear(res);
01414
01415 MemoryContextReset(fmstate->temp_cxt);
01416
01417
01418 return (n_rows > 0) ? slot : NULL;
01419 }
01420
01421
01422
01423
01424
01425 static TupleTableSlot *
01426 postgresExecForeignUpdate(EState *estate,
01427 ResultRelInfo *resultRelInfo,
01428 TupleTableSlot *slot,
01429 TupleTableSlot *planSlot)
01430 {
01431 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01432 Datum datum;
01433 bool isNull;
01434 const char **p_values;
01435 PGresult *res;
01436 int n_rows;
01437
01438
01439 if (!fmstate->p_name)
01440 prepare_foreign_modify(fmstate);
01441
01442
01443 datum = ExecGetJunkAttribute(planSlot,
01444 fmstate->ctidAttno,
01445 &isNull);
01446
01447 if (isNull)
01448 elog(ERROR, "ctid is NULL");
01449
01450
01451 p_values = convert_prep_stmt_params(fmstate,
01452 (ItemPointer) DatumGetPointer(datum),
01453 slot);
01454
01455
01456
01457
01458
01459
01460
01461 res = PQexecPrepared(fmstate->conn,
01462 fmstate->p_name,
01463 fmstate->p_nums,
01464 p_values,
01465 NULL,
01466 NULL,
01467 0);
01468 if (PQresultStatus(res) !=
01469 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01470 pgfdw_report_error(ERROR, res, true, fmstate->query);
01471
01472
01473 if (fmstate->has_returning)
01474 {
01475 n_rows = PQntuples(res);
01476 if (n_rows > 0)
01477 store_returning_result(fmstate, slot, res);
01478 }
01479 else
01480 n_rows = atoi(PQcmdTuples(res));
01481
01482
01483 PQclear(res);
01484
01485 MemoryContextReset(fmstate->temp_cxt);
01486
01487
01488 return (n_rows > 0) ? slot : NULL;
01489 }
01490
01491
01492
01493
01494
01495 static TupleTableSlot *
01496 postgresExecForeignDelete(EState *estate,
01497 ResultRelInfo *resultRelInfo,
01498 TupleTableSlot *slot,
01499 TupleTableSlot *planSlot)
01500 {
01501 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01502 Datum datum;
01503 bool isNull;
01504 const char **p_values;
01505 PGresult *res;
01506 int n_rows;
01507
01508
01509 if (!fmstate->p_name)
01510 prepare_foreign_modify(fmstate);
01511
01512
01513 datum = ExecGetJunkAttribute(planSlot,
01514 fmstate->ctidAttno,
01515 &isNull);
01516
01517 if (isNull)
01518 elog(ERROR, "ctid is NULL");
01519
01520
01521 p_values = convert_prep_stmt_params(fmstate,
01522 (ItemPointer) DatumGetPointer(datum),
01523 NULL);
01524
01525
01526
01527
01528
01529
01530
01531 res = PQexecPrepared(fmstate->conn,
01532 fmstate->p_name,
01533 fmstate->p_nums,
01534 p_values,
01535 NULL,
01536 NULL,
01537 0);
01538 if (PQresultStatus(res) !=
01539 (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01540 pgfdw_report_error(ERROR, res, true, fmstate->query);
01541
01542
01543 if (fmstate->has_returning)
01544 {
01545 n_rows = PQntuples(res);
01546 if (n_rows > 0)
01547 store_returning_result(fmstate, slot, res);
01548 }
01549 else
01550 n_rows = atoi(PQcmdTuples(res));
01551
01552
01553 PQclear(res);
01554
01555 MemoryContextReset(fmstate->temp_cxt);
01556
01557
01558 return (n_rows > 0) ? slot : NULL;
01559 }
01560
01561
01562
01563
01564
01565 static void
01566 postgresEndForeignModify(EState *estate,
01567 ResultRelInfo *resultRelInfo)
01568 {
01569 PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01570
01571
01572 if (fmstate == NULL)
01573 return;
01574
01575
01576 if (fmstate->p_name)
01577 {
01578 char sql[64];
01579 PGresult *res;
01580
01581 snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
01582
01583
01584
01585
01586
01587 res = PQexec(fmstate->conn, sql);
01588 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01589 pgfdw_report_error(ERROR, res, true, sql);
01590 PQclear(res);
01591 fmstate->p_name = NULL;
01592 }
01593
01594
01595 ReleaseConnection(fmstate->conn);
01596 fmstate->conn = NULL;
01597 }
01598
01599
01600
01601
01602
01603 static void
01604 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
01605 {
01606 List *fdw_private;
01607 char *sql;
01608
01609 if (es->verbose)
01610 {
01611 fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
01612 sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
01613 ExplainPropertyText("Remote SQL", sql, es);
01614 }
01615 }
01616
01617
01618
01619
01620
01621 static void
01622 postgresExplainForeignModify(ModifyTableState *mtstate,
01623 ResultRelInfo *rinfo,
01624 List *fdw_private,
01625 int subplan_index,
01626 ExplainState *es)
01627 {
01628 if (es->verbose)
01629 {
01630 char *sql = strVal(list_nth(fdw_private,
01631 FdwModifyPrivateUpdateSql));
01632
01633 ExplainPropertyText("Remote SQL", sql, es);
01634 }
01635 }
01636
01637
01638
01639
01640
01641
01642
01643
01644
01645 static void
01646 estimate_path_cost_size(PlannerInfo *root,
01647 RelOptInfo *baserel,
01648 List *join_conds,
01649 double *p_rows, int *p_width,
01650 Cost *p_startup_cost, Cost *p_total_cost)
01651 {
01652 PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
01653 double rows;
01654 double retrieved_rows;
01655 int width;
01656 Cost startup_cost;
01657 Cost total_cost;
01658 Cost run_cost;
01659 Cost cpu_per_tuple;
01660
01661
01662
01663
01664
01665
01666
01667
01668 if (fpinfo->use_remote_estimate)
01669 {
01670 StringInfoData sql;
01671 List *retrieved_attrs;
01672 PGconn *conn;
01673
01674
01675
01676
01677
01678
01679 initStringInfo(&sql);
01680 appendStringInfoString(&sql, "EXPLAIN ");
01681 deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
01682 &retrieved_attrs);
01683 if (fpinfo->remote_conds)
01684 appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
01685 true, NULL);
01686 if (join_conds)
01687 appendWhereClause(&sql, root, baserel, join_conds,
01688 (fpinfo->remote_conds == NIL), NULL);
01689
01690
01691 conn = GetConnection(fpinfo->server, fpinfo->user, false);
01692 get_remote_estimate(sql.data, conn, &rows, &width,
01693 &startup_cost, &total_cost);
01694 ReleaseConnection(conn);
01695
01696 retrieved_rows = rows;
01697
01698
01699 rows = clamp_row_est(rows * fpinfo->local_conds_sel);
01700
01701
01702 startup_cost += fpinfo->local_conds_cost.startup;
01703 total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
01704 }
01705 else
01706 {
01707
01708
01709
01710
01711 Assert(join_conds == NIL);
01712
01713
01714 rows = baserel->rows;
01715 width = baserel->width;
01716
01717
01718
01719
01720
01721 retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
01722 retrieved_rows = Min(retrieved_rows, baserel->tuples);
01723
01724
01725
01726
01727
01728
01729 startup_cost = 0;
01730 run_cost = 0;
01731 run_cost += seq_page_cost * baserel->pages;
01732
01733 startup_cost += baserel->baserestrictcost.startup;
01734 cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
01735 run_cost += cpu_per_tuple * baserel->tuples;
01736
01737 total_cost = startup_cost + run_cost;
01738 }
01739
01740
01741
01742
01743
01744
01745
01746 startup_cost += fpinfo->fdw_startup_cost;
01747 total_cost += fpinfo->fdw_startup_cost;
01748 total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
01749 total_cost += cpu_tuple_cost * retrieved_rows;
01750
01751
01752 *p_rows = rows;
01753 *p_width = width;
01754 *p_startup_cost = startup_cost;
01755 *p_total_cost = total_cost;
01756 }
01757
01758
01759
01760
01761
01762 static void
01763 get_remote_estimate(const char *sql, PGconn *conn,
01764 double *rows, int *width,
01765 Cost *startup_cost, Cost *total_cost)
01766 {
01767 PGresult *volatile res = NULL;
01768
01769
01770 PG_TRY();
01771 {
01772 char *line;
01773 char *p;
01774 int n;
01775
01776
01777
01778
01779 res = PQexec(conn, sql);
01780 if (PQresultStatus(res) != PGRES_TUPLES_OK)
01781 pgfdw_report_error(ERROR, res, false, sql);
01782
01783
01784
01785
01786
01787
01788 line = PQgetvalue(res, 0, 0);
01789 p = strrchr(line, '(');
01790 if (p == NULL)
01791 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
01792 n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
01793 startup_cost, total_cost, rows, width);
01794 if (n != 4)
01795 elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
01796
01797 PQclear(res);
01798 res = NULL;
01799 }
01800 PG_CATCH();
01801 {
01802 if (res)
01803 PQclear(res);
01804 PG_RE_THROW();
01805 }
01806 PG_END_TRY();
01807 }
01808
01809
01810
01811
01812
01813
01814 static bool
01815 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
01816 EquivalenceClass *ec, EquivalenceMember *em,
01817 void *arg)
01818 {
01819 ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
01820 Expr *expr = em->em_expr;
01821
01822
01823
01824
01825
01826 if (state->current != NULL)
01827 return equal(expr, state->current);
01828
01829
01830
01831
01832 if (list_member(state->already_used, expr))
01833 return false;
01834
01835
01836 state->current = expr;
01837 return true;
01838 }
01839
01840
01841
01842
01843 static void
01844 create_cursor(ForeignScanState *node)
01845 {
01846 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01847 ExprContext *econtext = node->ss.ps.ps_ExprContext;
01848 int numParams = fsstate->numParams;
01849 const char **values = fsstate->param_values;
01850 PGconn *conn = fsstate->conn;
01851 StringInfoData buf;
01852 PGresult *res;
01853
01854
01855
01856
01857
01858
01859 if (numParams > 0)
01860 {
01861 int nestlevel;
01862 MemoryContext oldcontext;
01863 int i;
01864 ListCell *lc;
01865
01866 oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
01867
01868 nestlevel = set_transmission_modes();
01869
01870 i = 0;
01871 foreach(lc, fsstate->param_exprs)
01872 {
01873 ExprState *expr_state = (ExprState *) lfirst(lc);
01874 Datum expr_value;
01875 bool isNull;
01876
01877
01878 expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
01879
01880
01881
01882
01883
01884 if (isNull)
01885 values[i] = NULL;
01886 else
01887 values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
01888 expr_value);
01889 i++;
01890 }
01891
01892 reset_transmission_modes(nestlevel);
01893
01894 MemoryContextSwitchTo(oldcontext);
01895 }
01896
01897
01898 initStringInfo(&buf);
01899 appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
01900 fsstate->cursor_number, fsstate->query);
01901
01902
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912 res = PQexecParams(conn, buf.data, numParams, NULL, values,
01913 NULL, NULL, 0);
01914 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01915 pgfdw_report_error(ERROR, res, true, fsstate->query);
01916 PQclear(res);
01917
01918
01919 fsstate->cursor_exists = true;
01920 fsstate->tuples = NULL;
01921 fsstate->num_tuples = 0;
01922 fsstate->next_tuple = 0;
01923 fsstate->fetch_ct_2 = 0;
01924 fsstate->eof_reached = false;
01925
01926
01927 pfree(buf.data);
01928 }
01929
01930
01931
01932
01933 static void
01934 fetch_more_data(ForeignScanState *node)
01935 {
01936 PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01937 PGresult *volatile res = NULL;
01938 MemoryContext oldcontext;
01939
01940
01941
01942
01943
01944 fsstate->tuples = NULL;
01945 MemoryContextReset(fsstate->batch_cxt);
01946 oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
01947
01948
01949 PG_TRY();
01950 {
01951 PGconn *conn = fsstate->conn;
01952 char sql[64];
01953 int fetch_size;
01954 int numrows;
01955 int i;
01956
01957
01958 fetch_size = 100;
01959
01960 snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
01961 fetch_size, fsstate->cursor_number);
01962
01963 res = PQexec(conn, sql);
01964
01965 if (PQresultStatus(res) != PGRES_TUPLES_OK)
01966 pgfdw_report_error(ERROR, res, false, fsstate->query);
01967
01968
01969 numrows = PQntuples(res);
01970 fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
01971 fsstate->num_tuples = numrows;
01972 fsstate->next_tuple = 0;
01973
01974 for (i = 0; i < numrows; i++)
01975 {
01976 fsstate->tuples[i] =
01977 make_tuple_from_result_row(res, i,
01978 fsstate->rel,
01979 fsstate->attinmeta,
01980 fsstate->retrieved_attrs,
01981 fsstate->temp_cxt);
01982 }
01983
01984
01985 if (fsstate->fetch_ct_2 < 2)
01986 fsstate->fetch_ct_2++;
01987
01988
01989 fsstate->eof_reached = (numrows < fetch_size);
01990
01991 PQclear(res);
01992 res = NULL;
01993 }
01994 PG_CATCH();
01995 {
01996 if (res)
01997 PQclear(res);
01998 PG_RE_THROW();
01999 }
02000 PG_END_TRY();
02001
02002 MemoryContextSwitchTo(oldcontext);
02003 }
02004
02005
02006
02007
02008
02009
02010
02011
02012
02013
02014
02015
02016
02017
02018
02019
02020
02021 int
02022 set_transmission_modes(void)
02023 {
02024 int nestlevel = NewGUCNestLevel();
02025
02026
02027
02028
02029
02030 if (DateStyle != USE_ISO_DATES)
02031 (void) set_config_option("datestyle", "ISO",
02032 PGC_USERSET, PGC_S_SESSION,
02033 GUC_ACTION_SAVE, true, 0);
02034 if (IntervalStyle != INTSTYLE_POSTGRES)
02035 (void) set_config_option("intervalstyle", "postgres",
02036 PGC_USERSET, PGC_S_SESSION,
02037 GUC_ACTION_SAVE, true, 0);
02038 if (extra_float_digits < 3)
02039 (void) set_config_option("extra_float_digits", "3",
02040 PGC_USERSET, PGC_S_SESSION,
02041 GUC_ACTION_SAVE, true, 0);
02042
02043 return nestlevel;
02044 }
02045
02046
02047
02048
02049 void
02050 reset_transmission_modes(int nestlevel)
02051 {
02052 AtEOXact_GUC(true, nestlevel);
02053 }
02054
02055
02056
02057
02058 static void
02059 close_cursor(PGconn *conn, unsigned int cursor_number)
02060 {
02061 char sql[64];
02062 PGresult *res;
02063
02064 snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
02065
02066
02067
02068
02069
02070 res = PQexec(conn, sql);
02071 if (PQresultStatus(res) != PGRES_COMMAND_OK)
02072 pgfdw_report_error(ERROR, res, true, sql);
02073 PQclear(res);
02074 }
02075
02076
02077
02078
02079
02080 static void
02081 prepare_foreign_modify(PgFdwModifyState *fmstate)
02082 {
02083 char prep_name[NAMEDATALEN];
02084 char *p_name;
02085 PGresult *res;
02086
02087
02088 snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
02089 GetPrepStmtNumber(fmstate->conn));
02090 p_name = pstrdup(prep_name);
02091
02092
02093
02094
02095
02096
02097
02098
02099
02100
02101
02102 res = PQprepare(fmstate->conn,
02103 p_name,
02104 fmstate->query,
02105 0,
02106 NULL);
02107
02108 if (PQresultStatus(res) != PGRES_COMMAND_OK)
02109 pgfdw_report_error(ERROR, res, true, fmstate->query);
02110 PQclear(res);
02111
02112
02113 fmstate->p_name = p_name;
02114 }
02115
02116
02117
02118
02119
02120
02121
02122
02123
02124
02125 static const char **
02126 convert_prep_stmt_params(PgFdwModifyState *fmstate,
02127 ItemPointer tupleid,
02128 TupleTableSlot *slot)
02129 {
02130 const char **p_values;
02131 int pindex = 0;
02132 MemoryContext oldcontext;
02133
02134 oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
02135
02136 p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
02137
02138
02139 if (tupleid != NULL)
02140 {
02141
02142 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
02143 PointerGetDatum(tupleid));
02144 pindex++;
02145 }
02146
02147
02148 if (slot != NULL && fmstate->target_attrs != NIL)
02149 {
02150 int nestlevel;
02151 ListCell *lc;
02152
02153 nestlevel = set_transmission_modes();
02154
02155 foreach(lc, fmstate->target_attrs)
02156 {
02157 int attnum = lfirst_int(lc);
02158 Datum value;
02159 bool isnull;
02160
02161 value = slot_getattr(slot, attnum, &isnull);
02162 if (isnull)
02163 p_values[pindex] = NULL;
02164 else
02165 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
02166 value);
02167 pindex++;
02168 }
02169
02170 reset_transmission_modes(nestlevel);
02171 }
02172
02173 Assert(pindex == fmstate->p_nums);
02174
02175 MemoryContextSwitchTo(oldcontext);
02176
02177 return p_values;
02178 }
02179
02180
02181
02182
02183
02184
02185
02186
02187 static void
02188 store_returning_result(PgFdwModifyState *fmstate,
02189 TupleTableSlot *slot, PGresult *res)
02190 {
02191
02192 PG_TRY();
02193 {
02194 HeapTuple newtup;
02195
02196 newtup = make_tuple_from_result_row(res, 0,
02197 fmstate->rel,
02198 fmstate->attinmeta,
02199 fmstate->retrieved_attrs,
02200 fmstate->temp_cxt);
02201
02202 ExecStoreTuple(newtup, slot, InvalidBuffer, true);
02203 }
02204 PG_CATCH();
02205 {
02206 if (res)
02207 PQclear(res);
02208 PG_RE_THROW();
02209 }
02210 PG_END_TRY();
02211 }
02212
02213
02214
02215
02216
02217 static bool
02218 postgresAnalyzeForeignTable(Relation relation,
02219 AcquireSampleRowsFunc *func,
02220 BlockNumber *totalpages)
02221 {
02222 ForeignTable *table;
02223 ForeignServer *server;
02224 UserMapping *user;
02225 PGconn *conn;
02226 StringInfoData sql;
02227 PGresult *volatile res = NULL;
02228
02229
02230 *func = postgresAcquireSampleRowsFunc;
02231
02232
02233
02234
02235
02236
02237
02238
02239
02240
02241
02242
02243 table = GetForeignTable(RelationGetRelid(relation));
02244 server = GetForeignServer(table->serverid);
02245 user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
02246 conn = GetConnection(server, user, false);
02247
02248
02249
02250
02251 initStringInfo(&sql);
02252 deparseAnalyzeSizeSql(&sql, relation);
02253
02254
02255 PG_TRY();
02256 {
02257 res = PQexec(conn, sql.data);
02258 if (PQresultStatus(res) != PGRES_TUPLES_OK)
02259 pgfdw_report_error(ERROR, res, false, sql.data);
02260
02261 if (PQntuples(res) != 1 || PQnfields(res) != 1)
02262 elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
02263 *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
02264
02265 PQclear(res);
02266 res = NULL;
02267 }
02268 PG_CATCH();
02269 {
02270 if (res)
02271 PQclear(res);
02272 PG_RE_THROW();
02273 }
02274 PG_END_TRY();
02275
02276 ReleaseConnection(conn);
02277
02278 return true;
02279 }
02280
02281
02282
02283
02284
02285
02286
02287
02288
02289
02290
02291
02292
02293
02294
02295
02296
02297 static int
02298 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
02299 HeapTuple *rows, int targrows,
02300 double *totalrows,
02301 double *totaldeadrows)
02302 {
02303 PgFdwAnalyzeState astate;
02304 ForeignTable *table;
02305 ForeignServer *server;
02306 UserMapping *user;
02307 PGconn *conn;
02308 unsigned int cursor_number;
02309 StringInfoData sql;
02310 PGresult *volatile res = NULL;
02311
02312
02313 astate.rel = relation;
02314 astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
02315
02316 astate.rows = rows;
02317 astate.targrows = targrows;
02318 astate.numrows = 0;
02319 astate.samplerows = 0;
02320 astate.rowstoskip = -1;
02321 astate.rstate = anl_init_selection_state(targrows);
02322
02323
02324 astate.anl_cxt = CurrentMemoryContext;
02325 astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
02326 "postgres_fdw temporary data",
02327 ALLOCSET_SMALL_MINSIZE,
02328 ALLOCSET_SMALL_INITSIZE,
02329 ALLOCSET_SMALL_MAXSIZE);
02330
02331
02332
02333
02334
02335 table = GetForeignTable(RelationGetRelid(relation));
02336 server = GetForeignServer(table->serverid);
02337 user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
02338 conn = GetConnection(server, user, false);
02339
02340
02341
02342
02343 cursor_number = GetCursorNumber(conn);
02344 initStringInfo(&sql);
02345 appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
02346 deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
02347
02348
02349 PG_TRY();
02350 {
02351 res = PQexec(conn, sql.data);
02352 if (PQresultStatus(res) != PGRES_COMMAND_OK)
02353 pgfdw_report_error(ERROR, res, false, sql.data);
02354 PQclear(res);
02355 res = NULL;
02356
02357
02358 for (;;)
02359 {
02360 char fetch_sql[64];
02361 int fetch_size;
02362 int numrows;
02363 int i;
02364
02365
02366 CHECK_FOR_INTERRUPTS();
02367
02368
02369
02370
02371
02372
02373
02374
02375 fetch_size = 100;
02376
02377
02378 snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
02379 fetch_size, cursor_number);
02380
02381 res = PQexec(conn, fetch_sql);
02382
02383 if (PQresultStatus(res) != PGRES_TUPLES_OK)
02384 pgfdw_report_error(ERROR, res, false, sql.data);
02385
02386
02387 numrows = PQntuples(res);
02388 for (i = 0; i < numrows; i++)
02389 analyze_row_processor(res, i, &astate);
02390
02391 PQclear(res);
02392 res = NULL;
02393
02394
02395 if (numrows < fetch_size)
02396 break;
02397 }
02398
02399
02400 close_cursor(conn, cursor_number);
02401 }
02402 PG_CATCH();
02403 {
02404 if (res)
02405 PQclear(res);
02406 PG_RE_THROW();
02407 }
02408 PG_END_TRY();
02409
02410 ReleaseConnection(conn);
02411
02412
02413 *totaldeadrows = 0.0;
02414
02415
02416 *totalrows = astate.samplerows;
02417
02418
02419
02420
02421 ereport(elevel,
02422 (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
02423 RelationGetRelationName(relation),
02424 astate.samplerows, astate.numrows)));
02425
02426 return astate.numrows;
02427 }
02428
02429
02430
02431
02432
02433
02434 static void
02435 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
02436 {
02437 int targrows = astate->targrows;
02438 int pos;
02439 MemoryContext oldcontext;
02440
02441
02442 astate->samplerows += 1;
02443
02444
02445
02446
02447
02448 if (astate->numrows < targrows)
02449 {
02450
02451 pos = astate->numrows++;
02452 }
02453 else
02454 {
02455
02456
02457
02458
02459
02460 if (astate->rowstoskip < 0)
02461 astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
02462 &astate->rstate);
02463
02464 if (astate->rowstoskip <= 0)
02465 {
02466
02467 pos = (int) (targrows * anl_random_fract());
02468 Assert(pos >= 0 && pos < targrows);
02469 heap_freetuple(astate->rows[pos]);
02470 }
02471 else
02472 {
02473
02474 pos = -1;
02475 }
02476
02477 astate->rowstoskip -= 1;
02478 }
02479
02480 if (pos >= 0)
02481 {
02482
02483
02484
02485
02486 oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
02487
02488 astate->rows[pos] = make_tuple_from_result_row(res, row,
02489 astate->rel,
02490 astate->attinmeta,
02491 astate->retrieved_attrs,
02492 astate->temp_cxt);
02493
02494 MemoryContextSwitchTo(oldcontext);
02495 }
02496 }
02497
02498
02499
02500
02501
02502
02503
02504
02505
02506 static HeapTuple
02507 make_tuple_from_result_row(PGresult *res,
02508 int row,
02509 Relation rel,
02510 AttInMetadata *attinmeta,
02511 List *retrieved_attrs,
02512 MemoryContext temp_context)
02513 {
02514 HeapTuple tuple;
02515 TupleDesc tupdesc = RelationGetDescr(rel);
02516 Datum *values;
02517 bool *nulls;
02518 ItemPointer ctid = NULL;
02519 ConversionLocation errpos;
02520 ErrorContextCallback errcallback;
02521 MemoryContext oldcontext;
02522 ListCell *lc;
02523 int j;
02524
02525 Assert(row < PQntuples(res));
02526
02527
02528
02529
02530
02531
02532 oldcontext = MemoryContextSwitchTo(temp_context);
02533
02534 values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
02535 nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
02536
02537 memset(nulls, true, tupdesc->natts * sizeof(bool));
02538
02539
02540
02541
02542 errpos.rel = rel;
02543 errpos.cur_attno = 0;
02544 errcallback.callback = conversion_error_callback;
02545 errcallback.arg = (void *) &errpos;
02546 errcallback.previous = error_context_stack;
02547 error_context_stack = &errcallback;
02548
02549
02550
02551
02552 j = 0;
02553 foreach(lc, retrieved_attrs)
02554 {
02555 int i = lfirst_int(lc);
02556 char *valstr;
02557
02558
02559 if (PQgetisnull(res, row, j))
02560 valstr = NULL;
02561 else
02562 valstr = PQgetvalue(res, row, j);
02563
02564
02565 if (i > 0)
02566 {
02567
02568 Assert(i <= tupdesc->natts);
02569 nulls[i - 1] = (valstr == NULL);
02570
02571 errpos.cur_attno = i;
02572 values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
02573 valstr,
02574 attinmeta->attioparams[i - 1],
02575 attinmeta->atttypmods[i - 1]);
02576 errpos.cur_attno = 0;
02577 }
02578 else if (i == SelfItemPointerAttributeNumber)
02579 {
02580
02581 if (valstr != NULL)
02582 {
02583 Datum datum;
02584
02585 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
02586 ctid = (ItemPointer) DatumGetPointer(datum);
02587 }
02588 }
02589
02590 j++;
02591 }
02592
02593
02594 error_context_stack = errcallback.previous;
02595
02596
02597
02598
02599
02600 if (j > 0 && j != PQnfields(res))
02601 elog(ERROR, "remote query result does not match the foreign table");
02602
02603
02604
02605
02606 MemoryContextSwitchTo(oldcontext);
02607
02608 tuple = heap_form_tuple(tupdesc, values, nulls);
02609
02610 if (ctid)
02611 tuple->t_self = *ctid;
02612
02613
02614 MemoryContextReset(temp_context);
02615
02616 return tuple;
02617 }
02618
02619
02620
02621
02622
02623 static void
02624 conversion_error_callback(void *arg)
02625 {
02626 ConversionLocation *errpos = (ConversionLocation *) arg;
02627 TupleDesc tupdesc = RelationGetDescr(errpos->rel);
02628
02629 if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
02630 errcontext("column \"%s\" of foreign table \"%s\"",
02631 NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
02632 RelationGetRelationName(errpos->rel));
02633 }