Header And Logo

PostgreSQL
| The world's most advanced open source database.

postgres_fdw.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * postgres_fdw.c
00004  *        Foreign-data wrapper for remote PostgreSQL servers
00005  *
00006  * Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group
00007  *
00008  * IDENTIFICATION
00009  *        contrib/postgres_fdw/postgres_fdw.c
00010  *
00011  *-------------------------------------------------------------------------
00012  */
00013 #include "postgres.h"
00014 
00015 #include "postgres_fdw.h"
00016 
00017 #include "access/htup_details.h"
00018 #include "access/sysattr.h"
00019 #include "commands/defrem.h"
00020 #include "commands/explain.h"
00021 #include "commands/vacuum.h"
00022 #include "foreign/fdwapi.h"
00023 #include "funcapi.h"
00024 #include "miscadmin.h"
00025 #include "nodes/makefuncs.h"
00026 #include "nodes/nodeFuncs.h"
00027 #include "optimizer/cost.h"
00028 #include "optimizer/pathnode.h"
00029 #include "optimizer/paths.h"
00030 #include "optimizer/planmain.h"
00031 #include "optimizer/prep.h"
00032 #include "optimizer/restrictinfo.h"
00033 #include "optimizer/var.h"
00034 #include "parser/parsetree.h"
00035 #include "utils/builtins.h"
00036 #include "utils/guc.h"
00037 #include "utils/lsyscache.h"
00038 #include "utils/memutils.h"
00039 
00040 
/* Module magic block, checked by the server when this shared library is loaded. */
PG_MODULE_MAGIC;
00042 
00043 /* Default CPU cost to start up a foreign query. */
00044 #define DEFAULT_FDW_STARTUP_COST    100.0
00045 
00046 /* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
00047 #define DEFAULT_FDW_TUPLE_COST      0.01
00048 
00049 /*
00050  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
00051  * foreign table.  This information is collected by postgresGetForeignRelSize.
00052  */
00053 typedef struct PgFdwRelationInfo
00054 {
00055     /* baserestrictinfo clauses, broken down into safe and unsafe subsets. */
00056     List       *remote_conds;
00057     List       *local_conds;
00058 
00059     /* Bitmap of attr numbers we need to fetch from the remote server. */
00060     Bitmapset  *attrs_used;
00061 
00062     /* Cost and selectivity of local_conds. */
00063     QualCost    local_conds_cost;
00064     Selectivity local_conds_sel;
00065 
00066     /* Estimated size and cost for a scan with baserestrictinfo quals. */
00067     double      rows;
00068     int         width;
00069     Cost        startup_cost;
00070     Cost        total_cost;
00071 
00072     /* Options extracted from catalogs. */
00073     bool        use_remote_estimate;
00074     Cost        fdw_startup_cost;
00075     Cost        fdw_tuple_cost;
00076 
00077     /* Cached catalog information. */
00078     ForeignTable *table;
00079     ForeignServer *server;
00080     UserMapping *user;          /* only set in use_remote_estimate mode */
00081 } PgFdwRelationInfo;
00082 
/*
 * Positions of the items the planner stores in ForeignScan.fdw_private for
 * the executor.  Fetch an item with list_nth(), for example:
 *      sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
 */
enum FdwScanPrivateIndex
{
    FdwScanPrivateSelectSql = 0,        /* remote SELECT text, as a String node */
    FdwScanPrivateRetrievedAttrs = 1    /* integer List of attnums the SELECT returns */
};
00103 
/*
 * Positions of the items stored in the fdw_private list of a ModifyTable
 * node that targets a postgres_fdw foreign table.
 */
enum FdwModifyPrivateIndex
{
    FdwModifyPrivateUpdateSql = 0,      /* remote INSERT/UPDATE/DELETE text (String node) */
    FdwModifyPrivateTargetAttnums = 1,  /* integer List of target attnums; NIL for DELETE */
    FdwModifyPrivateHasReturning = 2,   /* RETURNING present? (integer Value node) */
    FdwModifyPrivateRetrievedAttrs = 3  /* integer List of attnums fetched by RETURNING */
};
00125 
00126 /*
00127  * Execution state of a foreign scan using postgres_fdw.
00128  */
00129 typedef struct PgFdwScanState
00130 {
00131     Relation    rel;            /* relcache entry for the foreign table */
00132     AttInMetadata *attinmeta;   /* attribute datatype conversion metadata */
00133 
00134     /* extracted fdw_private data */
00135     char       *query;          /* text of SELECT command */
00136     List       *retrieved_attrs; /* list of retrieved attribute numbers */
00137 
00138     /* for remote query execution */
00139     PGconn     *conn;           /* connection for the scan */
00140     unsigned int cursor_number; /* quasi-unique ID for my cursor */
00141     bool        cursor_exists;  /* have we created the cursor? */
00142     int         numParams;      /* number of parameters passed to query */
00143     FmgrInfo   *param_flinfo;   /* output conversion functions for them */
00144     List       *param_exprs;    /* executable expressions for param values */
00145     const char **param_values;  /* textual values of query parameters */
00146 
00147     /* for storing result tuples */
00148     HeapTuple  *tuples;         /* array of currently-retrieved tuples */
00149     int         num_tuples;     /* # of tuples in array */
00150     int         next_tuple;     /* index of next one to return */
00151 
00152     /* batch-level state, for optimizing rewinds and avoiding useless fetch */
00153     int         fetch_ct_2;     /* Min(# of fetches done, 2) */
00154     bool        eof_reached;    /* true if last fetch reached EOF */
00155 
00156     /* working memory contexts */
00157     MemoryContext batch_cxt;    /* context holding current batch of tuples */
00158     MemoryContext temp_cxt;     /* context for per-tuple temporary data */
00159 } PgFdwScanState;
00160 
00161 /*
00162  * Execution state of a foreign insert/update/delete operation.
00163  */
00164 typedef struct PgFdwModifyState
00165 {
00166     Relation    rel;            /* relcache entry for the foreign table */
00167     AttInMetadata *attinmeta;   /* attribute datatype conversion metadata */
00168 
00169     /* for remote query execution */
00170     PGconn     *conn;           /* connection for the scan */
00171     char       *p_name;         /* name of prepared statement, if created */
00172 
00173     /* extracted fdw_private data */
00174     char       *query;          /* text of INSERT/UPDATE/DELETE command */
00175     List       *target_attrs;   /* list of target attribute numbers */
00176     bool        has_returning;  /* is there a RETURNING clause? */
00177     List       *retrieved_attrs; /* attr numbers retrieved by RETURNING */
00178 
00179     /* info about parameters for prepared statement */
00180     AttrNumber  ctidAttno;      /* attnum of input resjunk ctid column */
00181     int         p_nums;         /* number of parameters to transmit */
00182     FmgrInfo   *p_flinfo;       /* output conversion functions for them */
00183 
00184     /* working memory context */
00185     MemoryContext temp_cxt;     /* context for per-tuple temporary data */
00186 } PgFdwModifyState;
00187 
00188 /*
00189  * Workspace for analyzing a foreign table.
00190  */
00191 typedef struct PgFdwAnalyzeState
00192 {
00193     Relation    rel;            /* relcache entry for the foreign table */
00194     AttInMetadata *attinmeta;   /* attribute datatype conversion metadata */
00195     List       *retrieved_attrs; /* attr numbers retrieved by query */
00196 
00197     /* collected sample rows */
00198     HeapTuple  *rows;           /* array of size targrows */
00199     int         targrows;       /* target # of sample rows */
00200     int         numrows;        /* # of sample rows collected */
00201 
00202     /* for random sampling */
00203     double      samplerows;     /* # of rows fetched */
00204     double      rowstoskip;     /* # of rows to skip before next sample */
00205     double      rstate;         /* random state */
00206 
00207     /* working memory contexts */
00208     MemoryContext anl_cxt;      /* context for per-analyze lifespan data */
00209     MemoryContext temp_cxt;     /* context for per-tuple temporary data */
00210 } PgFdwAnalyzeState;
00211 
00212 /*
00213  * Identify the attribute where data conversion fails.
00214  */
00215 typedef struct ConversionLocation
00216 {
00217     Relation    rel;            /* foreign table's relcache entry */
00218     AttrNumber  cur_attno;      /* attribute number being processed, or 0 */
00219 } ConversionLocation;
00220 
00221 /* Callback argument for ec_member_matches_foreign */
00222 typedef struct
00223 {
00224     Expr       *current;        /* current expr, or NULL if not yet found */
00225     List       *already_used;   /* expressions already dealt with */
00226 } ec_member_foreign_arg;
00227 
00228 /*
00229  * SQL functions
00230  */
00231 extern Datum postgres_fdw_handler(PG_FUNCTION_ARGS);
00232 
00233 PG_FUNCTION_INFO_V1(postgres_fdw_handler);
00234 
00235 /*
00236  * FDW callback routines
00237  */
00238 static void postgresGetForeignRelSize(PlannerInfo *root,
00239                           RelOptInfo *baserel,
00240                           Oid foreigntableid);
00241 static void postgresGetForeignPaths(PlannerInfo *root,
00242                         RelOptInfo *baserel,
00243                         Oid foreigntableid);
00244 static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
00245                        RelOptInfo *baserel,
00246                        Oid foreigntableid,
00247                        ForeignPath *best_path,
00248                        List *tlist,
00249                        List *scan_clauses);
00250 static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
00251 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
00252 static void postgresReScanForeignScan(ForeignScanState *node);
00253 static void postgresEndForeignScan(ForeignScanState *node);
00254 static void postgresAddForeignUpdateTargets(Query *parsetree,
00255                                 RangeTblEntry *target_rte,
00256                                 Relation target_relation);
00257 static List *postgresPlanForeignModify(PlannerInfo *root,
00258                           ModifyTable *plan,
00259                           Index resultRelation,
00260                           int subplan_index);
00261 static void postgresBeginForeignModify(ModifyTableState *mtstate,
00262                            ResultRelInfo *resultRelInfo,
00263                            List *fdw_private,
00264                            int subplan_index,
00265                            int eflags);
00266 static TupleTableSlot *postgresExecForeignInsert(EState *estate,
00267                           ResultRelInfo *resultRelInfo,
00268                           TupleTableSlot *slot,
00269                           TupleTableSlot *planSlot);
00270 static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
00271                           ResultRelInfo *resultRelInfo,
00272                           TupleTableSlot *slot,
00273                           TupleTableSlot *planSlot);
00274 static TupleTableSlot *postgresExecForeignDelete(EState *estate,
00275                           ResultRelInfo *resultRelInfo,
00276                           TupleTableSlot *slot,
00277                           TupleTableSlot *planSlot);
00278 static void postgresEndForeignModify(EState *estate,
00279                          ResultRelInfo *resultRelInfo);
00280 static void postgresExplainForeignScan(ForeignScanState *node,
00281                            ExplainState *es);
00282 static void postgresExplainForeignModify(ModifyTableState *mtstate,
00283                              ResultRelInfo *rinfo,
00284                              List *fdw_private,
00285                              int subplan_index,
00286                              ExplainState *es);
00287 static bool postgresAnalyzeForeignTable(Relation relation,
00288                             AcquireSampleRowsFunc *func,
00289                             BlockNumber *totalpages);
00290 
00291 /*
00292  * Helper functions
00293  */
00294 static void estimate_path_cost_size(PlannerInfo *root,
00295                         RelOptInfo *baserel,
00296                         List *join_conds,
00297                         double *p_rows, int *p_width,
00298                         Cost *p_startup_cost, Cost *p_total_cost);
00299 static void get_remote_estimate(const char *sql,
00300                     PGconn *conn,
00301                     double *rows,
00302                     int *width,
00303                     Cost *startup_cost,
00304                     Cost *total_cost);
00305 static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
00306                           EquivalenceClass *ec, EquivalenceMember *em,
00307                           void *arg);
00308 static void create_cursor(ForeignScanState *node);
00309 static void fetch_more_data(ForeignScanState *node);
00310 static void close_cursor(PGconn *conn, unsigned int cursor_number);
00311 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
00312 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
00313                          ItemPointer tupleid,
00314                          TupleTableSlot *slot);
00315 static void store_returning_result(PgFdwModifyState *fmstate,
00316                        TupleTableSlot *slot, PGresult *res);
00317 static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
00318                               HeapTuple *rows, int targrows,
00319                               double *totalrows,
00320                               double *totaldeadrows);
00321 static void analyze_row_processor(PGresult *res, int row,
00322                       PgFdwAnalyzeState *astate);
00323 static HeapTuple make_tuple_from_result_row(PGresult *res,
00324                            int row,
00325                            Relation rel,
00326                            AttInMetadata *attinmeta,
00327                            List *retrieved_attrs,
00328                            MemoryContext temp_context);
00329 static void conversion_error_callback(void *arg);
00330 
00331 
00332 /*
00333  * Foreign-data wrapper handler function: return a struct with pointers
00334  * to my callback routines.
00335  */
00336 Datum
00337 postgres_fdw_handler(PG_FUNCTION_ARGS)
00338 {
00339     FdwRoutine *routine = makeNode(FdwRoutine);
00340 
00341     /* Functions for scanning foreign tables */
00342     routine->GetForeignRelSize = postgresGetForeignRelSize;
00343     routine->GetForeignPaths = postgresGetForeignPaths;
00344     routine->GetForeignPlan = postgresGetForeignPlan;
00345     routine->BeginForeignScan = postgresBeginForeignScan;
00346     routine->IterateForeignScan = postgresIterateForeignScan;
00347     routine->ReScanForeignScan = postgresReScanForeignScan;
00348     routine->EndForeignScan = postgresEndForeignScan;
00349 
00350     /* Functions for updating foreign tables */
00351     routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
00352     routine->PlanForeignModify = postgresPlanForeignModify;
00353     routine->BeginForeignModify = postgresBeginForeignModify;
00354     routine->ExecForeignInsert = postgresExecForeignInsert;
00355     routine->ExecForeignUpdate = postgresExecForeignUpdate;
00356     routine->ExecForeignDelete = postgresExecForeignDelete;
00357     routine->EndForeignModify = postgresEndForeignModify;
00358 
00359     /* Support functions for EXPLAIN */
00360     routine->ExplainForeignScan = postgresExplainForeignScan;
00361     routine->ExplainForeignModify = postgresExplainForeignModify;
00362 
00363     /* Support functions for ANALYZE */
00364     routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
00365 
00366     PG_RETURN_POINTER(routine);
00367 }
00368 
00369 /*
00370  * postgresGetForeignRelSize
00371  *      Estimate # of rows and width of the result of the scan
00372  *
00373  * We should consider the effect of all baserestrictinfo clauses here, but
00374  * not any join clauses.
00375  */
00376 static void
00377 postgresGetForeignRelSize(PlannerInfo *root,
00378                           RelOptInfo *baserel,
00379                           Oid foreigntableid)
00380 {
00381     PgFdwRelationInfo *fpinfo;
00382     ListCell   *lc;
00383 
00384     /*
00385      * We use PgFdwRelationInfo to pass various information to subsequent
00386      * functions.
00387      */
00388     fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
00389     baserel->fdw_private = (void *) fpinfo;
00390 
00391     /* Look up foreign-table catalog info. */
00392     fpinfo->table = GetForeignTable(foreigntableid);
00393     fpinfo->server = GetForeignServer(fpinfo->table->serverid);
00394 
00395     /*
00396      * Extract user-settable option values.  Note that per-table setting of
00397      * use_remote_estimate overrides per-server setting.
00398      */
00399     fpinfo->use_remote_estimate = false;
00400     fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
00401     fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
00402 
00403     foreach(lc, fpinfo->server->options)
00404     {
00405         DefElem    *def = (DefElem *) lfirst(lc);
00406 
00407         if (strcmp(def->defname, "use_remote_estimate") == 0)
00408             fpinfo->use_remote_estimate = defGetBoolean(def);
00409         else if (strcmp(def->defname, "fdw_startup_cost") == 0)
00410             fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL);
00411         else if (strcmp(def->defname, "fdw_tuple_cost") == 0)
00412             fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL);
00413     }
00414     foreach(lc, fpinfo->table->options)
00415     {
00416         DefElem    *def = (DefElem *) lfirst(lc);
00417 
00418         if (strcmp(def->defname, "use_remote_estimate") == 0)
00419         {
00420             fpinfo->use_remote_estimate = defGetBoolean(def);
00421             break;              /* only need the one value */
00422         }
00423     }
00424 
00425     /*
00426      * If the table or the server is configured to use remote estimates,
00427      * identify which user to do remote access as during planning.  This
00428      * should match what ExecCheckRTEPerms() does.  If we fail due to lack of
00429      * permissions, the query would have failed at runtime anyway.
00430      */
00431     if (fpinfo->use_remote_estimate)
00432     {
00433         RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
00434         Oid         userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
00435 
00436         fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
00437     }
00438     else
00439         fpinfo->user = NULL;
00440 
00441     /*
00442      * Identify which baserestrictinfo clauses can be sent to the remote
00443      * server and which can't.
00444      */
00445     classifyConditions(root, baserel,
00446                        &fpinfo->remote_conds, &fpinfo->local_conds);
00447 
00448     /*
00449      * Identify which attributes will need to be retrieved from the remote
00450      * server.  These include all attrs needed for joins or final output, plus
00451      * all attrs used in the local_conds.  (Note: if we end up using a
00452      * parameterized scan, it's possible that some of the join clauses will be
00453      * sent to the remote and thus we wouldn't really need to retrieve the
00454      * columns used in them.  Doesn't seem worth detecting that case though.)
00455      */
00456     fpinfo->attrs_used = NULL;
00457     pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
00458                    &fpinfo->attrs_used);
00459     foreach(lc, fpinfo->local_conds)
00460     {
00461         RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00462 
00463         pull_varattnos((Node *) rinfo->clause, baserel->relid,
00464                        &fpinfo->attrs_used);
00465     }
00466 
00467     /*
00468      * Compute the selectivity and cost of the local_conds, so we don't have
00469      * to do it over again for each path.  The best we can do for these
00470      * conditions is to estimate selectivity on the basis of local statistics.
00471      */
00472     fpinfo->local_conds_sel = clauselist_selectivity(root,
00473                                                      fpinfo->local_conds,
00474                                                      baserel->relid,
00475                                                      JOIN_INNER,
00476                                                      NULL);
00477 
00478     cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
00479 
00480     /*
00481      * If the table or the server is configured to use remote estimates,
00482      * connect to the foreign server and execute EXPLAIN to estimate the
00483      * number of rows selected by the restriction clauses, as well as the
00484      * average row width.  Otherwise, estimate using whatever statistics we
00485      * have locally, in a way similar to ordinary tables.
00486      */
00487     if (fpinfo->use_remote_estimate)
00488     {
00489         /*
00490          * Get cost/size estimates with help of remote server.  Save the
00491          * values in fpinfo so we don't need to do it again to generate the
00492          * basic foreign path.
00493          */
00494         estimate_path_cost_size(root, baserel, NIL,
00495                                 &fpinfo->rows, &fpinfo->width,
00496                                 &fpinfo->startup_cost, &fpinfo->total_cost);
00497 
00498         /* Report estimated baserel size to planner. */
00499         baserel->rows = fpinfo->rows;
00500         baserel->width = fpinfo->width;
00501     }
00502     else
00503     {
00504         /*
00505          * If the foreign table has never been ANALYZEd, it will have relpages
00506          * and reltuples equal to zero, which most likely has nothing to do
00507          * with reality.  We can't do a whole lot about that if we're not
00508          * allowed to consult the remote server, but we can use a hack similar
00509          * to plancat.c's treatment of empty relations: use a minimum size
00510          * estimate of 10 pages, and divide by the column-datatype-based width
00511          * estimate to get the corresponding number of tuples.
00512          */
00513         if (baserel->pages == 0 && baserel->tuples == 0)
00514         {
00515             baserel->pages = 10;
00516             baserel->tuples =
00517                 (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
00518         }
00519 
00520         /* Estimate baserel size as best we can with local statistics. */
00521         set_baserel_size_estimates(root, baserel);
00522 
00523         /* Fill in basically-bogus cost estimates for use later. */
00524         estimate_path_cost_size(root, baserel, NIL,
00525                                 &fpinfo->rows, &fpinfo->width,
00526                                 &fpinfo->startup_cost, &fpinfo->total_cost);
00527     }
00528 }
00529 
00530 /*
00531  * postgresGetForeignPaths
00532  *      Create possible scan paths for a scan on the foreign table
00533  */
00534 static void
00535 postgresGetForeignPaths(PlannerInfo *root,
00536                         RelOptInfo *baserel,
00537                         Oid foreigntableid)
00538 {
00539     PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
00540     ForeignPath *path;
00541     Relids      lateral_referencers;
00542     List       *join_quals;
00543     Relids      required_outer;
00544     double      rows;
00545     int         width;
00546     Cost        startup_cost;
00547     Cost        total_cost;
00548     ListCell   *lc;
00549 
00550     /*
00551      * Create simplest ForeignScan path node and add it to baserel.  This path
00552      * corresponds to SeqScan path of regular tables (though depending on what
00553      * baserestrict conditions we were able to send to remote, there might
00554      * actually be an indexscan happening there).  We already did all the work
00555      * to estimate cost and size of this path.
00556      */
00557     path = create_foreignscan_path(root, baserel,
00558                                    fpinfo->rows,
00559                                    fpinfo->startup_cost,
00560                                    fpinfo->total_cost,
00561                                    NIL, /* no pathkeys */
00562                                    NULL,        /* no outer rel either */
00563                                    NIL);        /* no fdw_private list */
00564     add_path(baserel, (Path *) path);
00565 
00566     /*
00567      * If we're not using remote estimates, stop here.  We have no way to
00568      * estimate whether any join clauses would be worth sending across, so
00569      * don't bother building parameterized paths.
00570      */
00571     if (!fpinfo->use_remote_estimate)
00572         return;
00573 
00574     /*
00575      * As a crude first hack, we consider each available join clause and try
00576      * to make a parameterized path using just that clause.  Later we should
00577      * consider combinations of clauses, probably.
00578      */
00579 
00580     /*
00581      * If there are any rels that have LATERAL references to this one, we
00582      * cannot use join quals referencing them as remote quals for this one,
00583      * since such rels would have to be on the inside not the outside of a
00584      * nestloop join relative to this one.  Create a Relids set listing all
00585      * such rels, for use in checks of potential join clauses.
00586      */
00587     lateral_referencers = NULL;
00588     foreach(lc, root->lateral_info_list)
00589     {
00590         LateralJoinInfo *ljinfo = (LateralJoinInfo *) lfirst(lc);
00591 
00592         if (bms_is_member(baserel->relid, ljinfo->lateral_lhs))
00593             lateral_referencers = bms_add_member(lateral_referencers,
00594                                                  ljinfo->lateral_rhs);
00595     }
00596 
00597     /* Scan the rel's join clauses */
00598     foreach(lc, baserel->joininfo)
00599     {
00600         RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00601 
00602         /* Check if clause can be moved to this rel */
00603         if (!join_clause_is_movable_to(rinfo, baserel->relid))
00604             continue;
00605 
00606         /* Not useful if it conflicts with any LATERAL references */
00607         if (bms_overlap(rinfo->clause_relids, lateral_referencers))
00608             continue;
00609 
00610         /* See if it is safe to send to remote */
00611         if (!is_foreign_expr(root, baserel, rinfo->clause))
00612             continue;
00613 
00614         /*
00615          * OK, get a cost estimate from the remote, and make a path.
00616          */
00617         join_quals = list_make1(rinfo);
00618         estimate_path_cost_size(root, baserel, join_quals,
00619                                 &rows, &width,
00620                                 &startup_cost, &total_cost);
00621 
00622         /* Must calculate required outer rels for this path */
00623         required_outer = bms_union(rinfo->clause_relids,
00624                                    baserel->lateral_relids);
00625         /* We do not want the foreign rel itself listed in required_outer */
00626         required_outer = bms_del_member(required_outer, baserel->relid);
00627         /* Enforce convention that required_outer is exactly NULL if empty */
00628         if (bms_is_empty(required_outer))
00629             required_outer = NULL;
00630 
00631         path = create_foreignscan_path(root, baserel,
00632                                        rows,
00633                                        startup_cost,
00634                                        total_cost,
00635                                        NIL,     /* no pathkeys */
00636                                        required_outer,
00637                                        NIL);    /* no fdw_private list */
00638         add_path(baserel, (Path *) path);
00639     }
00640 
00641     /*
00642      * The above scan examined only "generic" join clauses, not those that
00643      * were absorbed into EquivalenceClauses.  See if we can make anything out
00644      * of EquivalenceClauses.
00645      */
00646     if (baserel->has_eclass_joins)
00647     {
00648         /*
00649          * We repeatedly scan the eclass list looking for column references
00650          * (or expressions) belonging to the foreign rel.  Each time we find
00651          * one, we generate a list of equivalence joinclauses for it, and then
00652          * try to make those into foreign paths.  Repeat till there are no
00653          * more candidate EC members.
00654          */
00655         ec_member_foreign_arg arg;
00656 
00657         arg.already_used = NIL;
00658         for (;;)
00659         {
00660             List       *clauses;
00661 
00662             /* Make clauses, skipping any that join to lateral_referencers */
00663             arg.current = NULL;
00664             clauses = generate_implied_equalities_for_column(root,
00665                                                              baserel,
00666                                                    ec_member_matches_foreign,
00667                                                              (void *) &arg,
00668                                                         lateral_referencers);
00669 
00670             /* Done if there are no more expressions in the foreign rel */
00671             if (arg.current == NULL)
00672             {
00673                 Assert(clauses == NIL);
00674                 break;
00675             }
00676 
00677             /* Scan the extracted join clauses */
00678             foreach(lc, clauses)
00679             {
00680                 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00681 
00682                 /* Check if clause can be moved to this rel */
00683                 if (!join_clause_is_movable_to(rinfo, baserel->relid))
00684                     continue;
00685 
00686                 /* Shouldn't conflict with any LATERAL references */
00687                 Assert(!bms_overlap(rinfo->clause_relids, lateral_referencers));
00688 
00689                 /* See if it is safe to send to remote */
00690                 if (!is_foreign_expr(root, baserel, rinfo->clause))
00691                     continue;
00692 
00693                 /*
00694                  * OK, get a cost estimate from the remote, and make a path.
00695                  */
00696                 join_quals = list_make1(rinfo);
00697                 estimate_path_cost_size(root, baserel, join_quals,
00698                                         &rows, &width,
00699                                         &startup_cost, &total_cost);
00700 
00701                 /* Must calculate required outer rels for this path */
00702                 required_outer = bms_union(rinfo->clause_relids,
00703                                            baserel->lateral_relids);
00704                 required_outer = bms_del_member(required_outer, baserel->relid);
00705                 if (bms_is_empty(required_outer))
00706                     required_outer = NULL;
00707 
00708                 path = create_foreignscan_path(root, baserel,
00709                                                rows,
00710                                                startup_cost,
00711                                                total_cost,
00712                                                NIL,     /* no pathkeys */
00713                                                required_outer,
00714                                                NIL);    /* no fdw_private */
00715                 add_path(baserel, (Path *) path);
00716             }
00717 
00718             /* Try again, now ignoring the expression we found this time */
00719             arg.already_used = lappend(arg.already_used, arg.current);
00720         }
00721     }
00722 }
00723 
00724 /*
00725  * postgresGetForeignPlan
00726  *      Create ForeignScan plan node which implements selected best path
00727  */
00728 static ForeignScan *
00729 postgresGetForeignPlan(PlannerInfo *root,
00730                        RelOptInfo *baserel,
00731                        Oid foreigntableid,
00732                        ForeignPath *best_path,
00733                        List *tlist,
00734                        List *scan_clauses)
00735 {
00736     PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
00737     Index       scan_relid = baserel->relid;
00738     List       *fdw_private;
00739     List       *remote_conds = NIL;
00740     List       *local_exprs = NIL;
00741     List       *params_list = NIL;
00742     List       *retrieved_attrs;
00743     StringInfoData sql;
00744     ListCell   *lc;
00745 
00746     /*
00747      * Separate the scan_clauses into those that can be executed remotely and
00748      * those that can't.  baserestrictinfo clauses that were previously
00749      * determined to be safe or unsafe by classifyClauses are shown in
00750      * fpinfo->remote_conds and fpinfo->local_conds.  Anything else in the
00751      * scan_clauses list should be a join clause that was found safe by
00752      * postgresGetForeignPaths.
00753      *
00754      * Note: for clauses extracted from EquivalenceClasses, it's possible that
00755      * what we get here is a different representation of the clause than what
00756      * postgresGetForeignPaths saw; for example we might get a commuted
00757      * version of the clause.  So we can't insist on simple equality as we do
00758      * for the baserestrictinfo clauses.
00759      *
00760      * This code must match "extract_actual_clauses(scan_clauses, false)"
00761      * except for the additional decision about remote versus local execution.
00762      * Note however that we only strip the RestrictInfo nodes from the
00763      * local_exprs list, since appendWhereClause expects a list of
00764      * RestrictInfos.
00765      */
00766     foreach(lc, scan_clauses)
00767     {
00768         RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00769 
00770         Assert(IsA(rinfo, RestrictInfo));
00771 
00772         /* Ignore any pseudoconstants, they're dealt with elsewhere */
00773         if (rinfo->pseudoconstant)
00774             continue;
00775 
00776         if (list_member_ptr(fpinfo->remote_conds, rinfo))
00777             remote_conds = lappend(remote_conds, rinfo);
00778         else if (list_member_ptr(fpinfo->local_conds, rinfo))
00779             local_exprs = lappend(local_exprs, rinfo->clause);
00780         else
00781         {
00782             Assert(is_foreign_expr(root, baserel, rinfo->clause));
00783             remote_conds = lappend(remote_conds, rinfo);
00784         }
00785     }
00786 
00787     /*
00788      * Build the query string to be sent for execution, and identify
00789      * expressions to be sent as parameters.
00790      */
00791     initStringInfo(&sql);
00792     deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
00793                      &retrieved_attrs);
00794     if (remote_conds)
00795         appendWhereClause(&sql, root, baserel, remote_conds,
00796                           true, &params_list);
00797 
00798     /*
00799      * Add FOR UPDATE/SHARE if appropriate.  We apply locking during the
00800      * initial row fetch, rather than later on as is done for local tables.
00801      * The extra roundtrips involved in trying to duplicate the local
00802      * semantics exactly don't seem worthwhile (see also comments for
00803      * RowMarkType).
00804      *
00805      * Note: because we actually run the query as a cursor, this assumes that
00806      * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3.
00807      */
00808     if (baserel->relid == root->parse->resultRelation &&
00809         (root->parse->commandType == CMD_UPDATE ||
00810          root->parse->commandType == CMD_DELETE))
00811     {
00812         /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
00813         appendStringInfo(&sql, " FOR UPDATE");
00814     }
00815     else
00816     {
00817         RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
00818 
00819         if (rc)
00820         {
00821             /*
00822              * Relation is specified as a FOR UPDATE/SHARE target, so handle
00823              * that.
00824              *
00825              * For now, just ignore any [NO] KEY specification, since (a) it's
00826              * not clear what that means for a remote table that we don't have
00827              * complete information about, and (b) it wouldn't work anyway on
00828              * older remote servers.  Likewise, we don't worry about NOWAIT.
00829              */
00830             switch (rc->strength)
00831             {
00832                 case LCS_FORKEYSHARE:
00833                 case LCS_FORSHARE:
00834                     appendStringInfo(&sql, " FOR SHARE");
00835                     break;
00836                 case LCS_FORNOKEYUPDATE:
00837                 case LCS_FORUPDATE:
00838                     appendStringInfo(&sql, " FOR UPDATE");
00839                     break;
00840             }
00841         }
00842     }
00843 
00844     /*
00845      * Build the fdw_private list that will be available to the executor.
00846      * Items in the list must match enum FdwScanPrivateIndex, above.
00847      */
00848     fdw_private = list_make2(makeString(sql.data),
00849                              retrieved_attrs);
00850 
00851     /*
00852      * Create the ForeignScan node from target list, local filtering
00853      * expressions, remote parameter expressions, and FDW private information.
00854      *
00855      * Note that the remote parameter expressions are stored in the fdw_exprs
00856      * field of the finished plan node; we can't keep them in private state
00857      * because then they wouldn't be subject to later planner processing.
00858      */
00859     return make_foreignscan(tlist,
00860                             local_exprs,
00861                             scan_relid,
00862                             params_list,
00863                             fdw_private);
00864 }
00865 
00866 /*
00867  * postgresBeginForeignScan
00868  *      Initiate an executor scan of a foreign PostgreSQL table.
00869  */
00870 static void
00871 postgresBeginForeignScan(ForeignScanState *node, int eflags)
00872 {
00873     ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
00874     EState     *estate = node->ss.ps.state;
00875     PgFdwScanState *fsstate;
00876     RangeTblEntry *rte;
00877     Oid         userid;
00878     ForeignTable *table;
00879     ForeignServer *server;
00880     UserMapping *user;
00881     int         numParams;
00882     int         i;
00883     ListCell   *lc;
00884 
00885     /*
00886      * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
00887      */
00888     if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
00889         return;
00890 
00891     /*
00892      * We'll save private state in node->fdw_state.
00893      */
00894     fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
00895     node->fdw_state = (void *) fsstate;
00896 
00897     /*
00898      * Identify which user to do the remote access as.  This should match what
00899      * ExecCheckRTEPerms() does.
00900      */
00901     rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
00902     userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
00903 
00904     /* Get info about foreign table. */
00905     fsstate->rel = node->ss.ss_currentRelation;
00906     table = GetForeignTable(RelationGetRelid(fsstate->rel));
00907     server = GetForeignServer(table->serverid);
00908     user = GetUserMapping(userid, server->serverid);
00909 
00910     /*
00911      * Get connection to the foreign server.  Connection manager will
00912      * establish new connection if necessary.
00913      */
00914     fsstate->conn = GetConnection(server, user, false);
00915 
00916     /* Assign a unique ID for my cursor */
00917     fsstate->cursor_number = GetCursorNumber(fsstate->conn);
00918     fsstate->cursor_exists = false;
00919 
00920     /* Get private info created by planner functions. */
00921     fsstate->query = strVal(list_nth(fsplan->fdw_private,
00922                                      FdwScanPrivateSelectSql));
00923     fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
00924                                                  FdwScanPrivateRetrievedAttrs);
00925 
00926     /* Create contexts for batches of tuples and per-tuple temp workspace. */
00927     fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
00928                                                "postgres_fdw tuple data",
00929                                                ALLOCSET_DEFAULT_MINSIZE,
00930                                                ALLOCSET_DEFAULT_INITSIZE,
00931                                                ALLOCSET_DEFAULT_MAXSIZE);
00932     fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
00933                                               "postgres_fdw temporary data",
00934                                               ALLOCSET_SMALL_MINSIZE,
00935                                               ALLOCSET_SMALL_INITSIZE,
00936                                               ALLOCSET_SMALL_MAXSIZE);
00937 
00938     /* Get info we'll need for input data conversion. */
00939     fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
00940 
00941     /* Prepare for output conversion of parameters used in remote query. */
00942     numParams = list_length(fsplan->fdw_exprs);
00943     fsstate->numParams = numParams;
00944     fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
00945 
00946     i = 0;
00947     foreach(lc, fsplan->fdw_exprs)
00948     {
00949         Node       *param_expr = (Node *) lfirst(lc);
00950         Oid         typefnoid;
00951         bool        isvarlena;
00952 
00953         getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
00954         fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
00955         i++;
00956     }
00957 
00958     /*
00959      * Prepare remote-parameter expressions for evaluation.  (Note: in
00960      * practice, we expect that all these expressions will be just Params, so
00961      * we could possibly do something more efficient than using the full
00962      * expression-eval machinery for this.  But probably there would be little
00963      * benefit, and it'd require postgres_fdw to know more than is desirable
00964      * about Param evaluation.)
00965      */
00966     fsstate->param_exprs = (List *)
00967         ExecInitExpr((Expr *) fsplan->fdw_exprs,
00968                      (PlanState *) node);
00969 
00970     /*
00971      * Allocate buffer for text form of query parameters, if any.
00972      */
00973     if (numParams > 0)
00974         fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
00975     else
00976         fsstate->param_values = NULL;
00977 }
00978 
00979 /*
00980  * postgresIterateForeignScan
00981  *      Retrieve next row from the result set, or clear tuple slot to indicate
00982  *      EOF.
00983  */
00984 static TupleTableSlot *
00985 postgresIterateForeignScan(ForeignScanState *node)
00986 {
00987     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
00988     TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
00989 
00990     /*
00991      * If this is the first call after Begin or ReScan, we need to create the
00992      * cursor on the remote side.
00993      */
00994     if (!fsstate->cursor_exists)
00995         create_cursor(node);
00996 
00997     /*
00998      * Get some more tuples, if we've run out.
00999      */
01000     if (fsstate->next_tuple >= fsstate->num_tuples)
01001     {
01002         /* No point in another fetch if we already detected EOF, though. */
01003         if (!fsstate->eof_reached)
01004             fetch_more_data(node);
01005         /* If we didn't get any tuples, must be end of data. */
01006         if (fsstate->next_tuple >= fsstate->num_tuples)
01007             return ExecClearTuple(slot);
01008     }
01009 
01010     /*
01011      * Return the next tuple.
01012      */
01013     ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
01014                    slot,
01015                    InvalidBuffer,
01016                    false);
01017 
01018     return slot;
01019 }
01020 
01021 /*
01022  * postgresReScanForeignScan
01023  *      Restart the scan.
01024  */
01025 static void
01026 postgresReScanForeignScan(ForeignScanState *node)
01027 {
01028     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01029     char        sql[64];
01030     PGresult   *res;
01031 
01032     /* If we haven't created the cursor yet, nothing to do. */
01033     if (!fsstate->cursor_exists)
01034         return;
01035 
01036     /*
01037      * If any internal parameters affecting this node have changed, we'd
01038      * better destroy and recreate the cursor.  Otherwise, rewinding it should
01039      * be good enough.  If we've only fetched zero or one batch, we needn't
01040      * even rewind the cursor, just rescan what we have.
01041      */
01042     if (node->ss.ps.chgParam != NULL)
01043     {
01044         fsstate->cursor_exists = false;
01045         snprintf(sql, sizeof(sql), "CLOSE c%u",
01046                  fsstate->cursor_number);
01047     }
01048     else if (fsstate->fetch_ct_2 > 1)
01049     {
01050         snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
01051                  fsstate->cursor_number);
01052     }
01053     else
01054     {
01055         /* Easy: just rescan what we already have in memory, if anything */
01056         fsstate->next_tuple = 0;
01057         return;
01058     }
01059 
01060     /*
01061      * We don't use a PG_TRY block here, so be careful not to throw error
01062      * without releasing the PGresult.
01063      */
01064     res = PQexec(fsstate->conn, sql);
01065     if (PQresultStatus(res) != PGRES_COMMAND_OK)
01066         pgfdw_report_error(ERROR, res, true, sql);
01067     PQclear(res);
01068 
01069     /* Now force a fresh FETCH. */
01070     fsstate->tuples = NULL;
01071     fsstate->num_tuples = 0;
01072     fsstate->next_tuple = 0;
01073     fsstate->fetch_ct_2 = 0;
01074     fsstate->eof_reached = false;
01075 }
01076 
01077 /*
01078  * postgresEndForeignScan
01079  *      Finish scanning foreign table and dispose objects used for this scan
01080  */
01081 static void
01082 postgresEndForeignScan(ForeignScanState *node)
01083 {
01084     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01085 
01086     /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
01087     if (fsstate == NULL)
01088         return;
01089 
01090     /* Close the cursor if open, to prevent accumulation of cursors */
01091     if (fsstate->cursor_exists)
01092         close_cursor(fsstate->conn, fsstate->cursor_number);
01093 
01094     /* Release remote connection */
01095     ReleaseConnection(fsstate->conn);
01096     fsstate->conn = NULL;
01097 
01098     /* MemoryContexts will be deleted automatically. */
01099 }
01100 
01101 /*
01102  * postgresAddForeignUpdateTargets
01103  *      Add resjunk column(s) needed for update/delete on a foreign table
01104  */
01105 static void
01106 postgresAddForeignUpdateTargets(Query *parsetree,
01107                                 RangeTblEntry *target_rte,
01108                                 Relation target_relation)
01109 {
01110     Var        *var;
01111     const char *attrname;
01112     TargetEntry *tle;
01113 
01114     /*
01115      * In postgres_fdw, what we need is the ctid, same as for a regular table.
01116      */
01117 
01118     /* Make a Var representing the desired value */
01119     var = makeVar(parsetree->resultRelation,
01120                   SelfItemPointerAttributeNumber,
01121                   TIDOID,
01122                   -1,
01123                   InvalidOid,
01124                   0);
01125 
01126     /* Wrap it in a resjunk TLE with the right name ... */
01127     attrname = "ctid";
01128 
01129     tle = makeTargetEntry((Expr *) var,
01130                           list_length(parsetree->targetList) + 1,
01131                           pstrdup(attrname),
01132                           true);
01133 
01134     /* ... and add it to the query's targetlist */
01135     parsetree->targetList = lappend(parsetree->targetList, tle);
01136 }
01137 
01138 /*
01139  * postgresPlanForeignModify
01140  *      Plan an insert/update/delete operation on a foreign table
01141  *
01142  * Note: currently, the plan tree generated for UPDATE/DELETE will always
01143  * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
01144  * and then the ModifyTable node will have to execute individual remote
01145  * UPDATE/DELETE commands.  If there are no local conditions or joins
01146  * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
01147  * and then do nothing at ModifyTable.  Room for future optimization ...
01148  */
01149 static List *
01150 postgresPlanForeignModify(PlannerInfo *root,
01151                           ModifyTable *plan,
01152                           Index resultRelation,
01153                           int subplan_index)
01154 {
01155     CmdType     operation = plan->operation;
01156     RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
01157     Relation    rel;
01158     StringInfoData sql;
01159     List       *targetAttrs = NIL;
01160     List       *returningList = NIL;
01161     List       *retrieved_attrs = NIL;
01162 
01163     initStringInfo(&sql);
01164 
01165     /*
01166      * Core code already has some lock on each rel being planned, so we can
01167      * use NoLock here.
01168      */
01169     rel = heap_open(rte->relid, NoLock);
01170 
01171     /*
01172      * In an INSERT, we transmit all columns that are defined in the foreign
01173      * table.  In an UPDATE, we transmit only columns that were explicitly
01174      * targets of the UPDATE, so as to avoid unnecessary data transmission.
01175      * (We can't do that for INSERT since we would miss sending default values
01176      * for columns not listed in the source statement.)
01177      */
01178     if (operation == CMD_INSERT)
01179     {
01180         TupleDesc   tupdesc = RelationGetDescr(rel);
01181         int         attnum;
01182 
01183         for (attnum = 1; attnum <= tupdesc->natts; attnum++)
01184         {
01185             Form_pg_attribute attr = tupdesc->attrs[attnum - 1];
01186 
01187             if (!attr->attisdropped)
01188                 targetAttrs = lappend_int(targetAttrs, attnum);
01189         }
01190     }
01191     else if (operation == CMD_UPDATE)
01192     {
01193         Bitmapset  *tmpset = bms_copy(rte->modifiedCols);
01194         AttrNumber  col;
01195 
01196         while ((col = bms_first_member(tmpset)) >= 0)
01197         {
01198             col += FirstLowInvalidHeapAttributeNumber;
01199             if (col <= InvalidAttrNumber)       /* shouldn't happen */
01200                 elog(ERROR, "system-column update is not supported");
01201             targetAttrs = lappend_int(targetAttrs, col);
01202         }
01203     }
01204 
01205     /*
01206      * Extract the relevant RETURNING list if any.
01207      */
01208     if (plan->returningLists)
01209         returningList = (List *) list_nth(plan->returningLists, subplan_index);
01210 
01211     /*
01212      * Construct the SQL command string.
01213      */
01214     switch (operation)
01215     {
01216         case CMD_INSERT:
01217             deparseInsertSql(&sql, root, resultRelation, rel,
01218                              targetAttrs, returningList,
01219                              &retrieved_attrs);
01220             break;
01221         case CMD_UPDATE:
01222             deparseUpdateSql(&sql, root, resultRelation, rel,
01223                              targetAttrs, returningList,
01224                              &retrieved_attrs);
01225             break;
01226         case CMD_DELETE:
01227             deparseDeleteSql(&sql, root, resultRelation, rel,
01228                              returningList,
01229                              &retrieved_attrs);
01230             break;
01231         default:
01232             elog(ERROR, "unexpected operation: %d", (int) operation);
01233             break;
01234     }
01235 
01236     heap_close(rel, NoLock);
01237 
01238     /*
01239      * Build the fdw_private list that will be available to the executor.
01240      * Items in the list must match enum FdwModifyPrivateIndex, above.
01241      */
01242     return list_make4(makeString(sql.data),
01243                       targetAttrs,
01244                       makeInteger((returningList != NIL)),
01245                       retrieved_attrs);
01246 }
01247 
01248 /*
01249  * postgresBeginForeignModify
01250  *      Begin an insert/update/delete operation on a foreign table
01251  */
01252 static void
01253 postgresBeginForeignModify(ModifyTableState *mtstate,
01254                            ResultRelInfo *resultRelInfo,
01255                            List *fdw_private,
01256                            int subplan_index,
01257                            int eflags)
01258 {
01259     PgFdwModifyState *fmstate;
01260     EState     *estate = mtstate->ps.state;
01261     CmdType     operation = mtstate->operation;
01262     Relation    rel = resultRelInfo->ri_RelationDesc;
01263     RangeTblEntry *rte;
01264     Oid         userid;
01265     ForeignTable *table;
01266     ForeignServer *server;
01267     UserMapping *user;
01268     AttrNumber  n_params;
01269     Oid         typefnoid;
01270     bool        isvarlena;
01271     ListCell   *lc;
01272 
01273     /*
01274      * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
01275      * stays NULL.
01276      */
01277     if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
01278         return;
01279 
01280     /* Begin constructing PgFdwModifyState. */
01281     fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
01282     fmstate->rel = rel;
01283 
01284     /*
01285      * Identify which user to do the remote access as.  This should match what
01286      * ExecCheckRTEPerms() does.
01287      */
01288     rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
01289     userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
01290 
01291     /* Get info about foreign table. */
01292     table = GetForeignTable(RelationGetRelid(rel));
01293     server = GetForeignServer(table->serverid);
01294     user = GetUserMapping(userid, server->serverid);
01295 
01296     /* Open connection; report that we'll create a prepared statement. */
01297     fmstate->conn = GetConnection(server, user, true);
01298     fmstate->p_name = NULL;     /* prepared statement not made yet */
01299 
01300     /* Deconstruct fdw_private data. */
01301     fmstate->query = strVal(list_nth(fdw_private,
01302                                      FdwModifyPrivateUpdateSql));
01303     fmstate->target_attrs = (List *) list_nth(fdw_private,
01304                                               FdwModifyPrivateTargetAttnums);
01305     fmstate->has_returning = intVal(list_nth(fdw_private,
01306                                              FdwModifyPrivateHasReturning));
01307     fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
01308                                                  FdwModifyPrivateRetrievedAttrs);
01309 
01310     /* Create context for per-tuple temp workspace. */
01311     fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
01312                                               "postgres_fdw temporary data",
01313                                               ALLOCSET_SMALL_MINSIZE,
01314                                               ALLOCSET_SMALL_INITSIZE,
01315                                               ALLOCSET_SMALL_MAXSIZE);
01316 
01317     /* Prepare for input conversion of RETURNING results. */
01318     if (fmstate->has_returning)
01319         fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
01320 
01321     /* Prepare for output conversion of parameters used in prepared stmt. */
01322     n_params = list_length(fmstate->target_attrs) + 1;
01323     fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
01324     fmstate->p_nums = 0;
01325 
01326     if (operation == CMD_UPDATE || operation == CMD_DELETE)
01327     {
01328         /* Find the ctid resjunk column in the subplan's result */
01329         Plan       *subplan = mtstate->mt_plans[subplan_index]->plan;
01330 
01331         fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
01332                                                           "ctid");
01333         if (!AttributeNumberIsValid(fmstate->ctidAttno))
01334             elog(ERROR, "could not find junk ctid column");
01335 
01336         /* First transmittable parameter will be ctid */
01337         getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
01338         fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
01339         fmstate->p_nums++;
01340     }
01341 
01342     if (operation == CMD_INSERT || operation == CMD_UPDATE)
01343     {
01344         /* Set up for remaining transmittable parameters */
01345         foreach(lc, fmstate->target_attrs)
01346         {
01347             int         attnum = lfirst_int(lc);
01348             Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
01349 
01350             Assert(!attr->attisdropped);
01351 
01352             getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
01353             fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
01354             fmstate->p_nums++;
01355         }
01356     }
01357 
01358     Assert(fmstate->p_nums <= n_params);
01359 
01360     resultRelInfo->ri_FdwState = fmstate;
01361 }
01362 
01363 /*
01364  * postgresExecForeignInsert
01365  *      Insert one row into a foreign table
01366  */
01367 static TupleTableSlot *
01368 postgresExecForeignInsert(EState *estate,
01369                           ResultRelInfo *resultRelInfo,
01370                           TupleTableSlot *slot,
01371                           TupleTableSlot *planSlot)
01372 {
01373     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01374     const char **p_values;
01375     PGresult   *res;
01376     int         n_rows;
01377 
01378     /* Set up the prepared statement on the remote server, if we didn't yet */
01379     if (!fmstate->p_name)
01380         prepare_foreign_modify(fmstate);
01381 
01382     /* Convert parameters needed by prepared statement to text form */
01383     p_values = convert_prep_stmt_params(fmstate, NULL, slot);
01384 
01385     /*
01386      * Execute the prepared statement, and check for success.
01387      *
01388      * We don't use a PG_TRY block here, so be careful not to throw error
01389      * without releasing the PGresult.
01390      */
01391     res = PQexecPrepared(fmstate->conn,
01392                          fmstate->p_name,
01393                          fmstate->p_nums,
01394                          p_values,
01395                          NULL,
01396                          NULL,
01397                          0);
01398     if (PQresultStatus(res) !=
01399         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01400         pgfdw_report_error(ERROR, res, true, fmstate->query);
01401 
01402     /* Check number of rows affected, and fetch RETURNING tuple if any */
01403     if (fmstate->has_returning)
01404     {
01405         n_rows = PQntuples(res);
01406         if (n_rows > 0)
01407             store_returning_result(fmstate, slot, res);
01408     }
01409     else
01410         n_rows = atoi(PQcmdTuples(res));
01411 
01412     /* And clean up */
01413     PQclear(res);
01414 
01415     MemoryContextReset(fmstate->temp_cxt);
01416 
01417     /* Return NULL if nothing was inserted on the remote end */
01418     return (n_rows > 0) ? slot : NULL;
01419 }
01420 
01421 /*
01422  * postgresExecForeignUpdate
01423  *      Update one row in a foreign table
01424  */
01425 static TupleTableSlot *
01426 postgresExecForeignUpdate(EState *estate,
01427                           ResultRelInfo *resultRelInfo,
01428                           TupleTableSlot *slot,
01429                           TupleTableSlot *planSlot)
01430 {
01431     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01432     Datum       datum;
01433     bool        isNull;
01434     const char **p_values;
01435     PGresult   *res;
01436     int         n_rows;
01437 
01438     /* Set up the prepared statement on the remote server, if we didn't yet */
01439     if (!fmstate->p_name)
01440         prepare_foreign_modify(fmstate);
01441 
01442     /* Get the ctid that was passed up as a resjunk column */
01443     datum = ExecGetJunkAttribute(planSlot,
01444                                  fmstate->ctidAttno,
01445                                  &isNull);
01446     /* shouldn't ever get a null result... */
01447     if (isNull)
01448         elog(ERROR, "ctid is NULL");
01449 
01450     /* Convert parameters needed by prepared statement to text form */
01451     p_values = convert_prep_stmt_params(fmstate,
01452                                         (ItemPointer) DatumGetPointer(datum),
01453                                         slot);
01454 
01455     /*
01456      * Execute the prepared statement, and check for success.
01457      *
01458      * We don't use a PG_TRY block here, so be careful not to throw error
01459      * without releasing the PGresult.
01460      */
01461     res = PQexecPrepared(fmstate->conn,
01462                          fmstate->p_name,
01463                          fmstate->p_nums,
01464                          p_values,
01465                          NULL,
01466                          NULL,
01467                          0);
01468     if (PQresultStatus(res) !=
01469         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01470         pgfdw_report_error(ERROR, res, true, fmstate->query);
01471 
01472     /* Check number of rows affected, and fetch RETURNING tuple if any */
01473     if (fmstate->has_returning)
01474     {
01475         n_rows = PQntuples(res);
01476         if (n_rows > 0)
01477             store_returning_result(fmstate, slot, res);
01478     }
01479     else
01480         n_rows = atoi(PQcmdTuples(res));
01481 
01482     /* And clean up */
01483     PQclear(res);
01484 
01485     MemoryContextReset(fmstate->temp_cxt);
01486 
01487     /* Return NULL if nothing was updated on the remote end */
01488     return (n_rows > 0) ? slot : NULL;
01489 }
01490 
01491 /*
01492  * postgresExecForeignDelete
01493  *      Delete one row from a foreign table
01494  */
01495 static TupleTableSlot *
01496 postgresExecForeignDelete(EState *estate,
01497                           ResultRelInfo *resultRelInfo,
01498                           TupleTableSlot *slot,
01499                           TupleTableSlot *planSlot)
01500 {
01501     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01502     Datum       datum;
01503     bool        isNull;
01504     const char **p_values;
01505     PGresult   *res;
01506     int         n_rows;
01507 
01508     /* Set up the prepared statement on the remote server, if we didn't yet */
01509     if (!fmstate->p_name)
01510         prepare_foreign_modify(fmstate);
01511 
01512     /* Get the ctid that was passed up as a resjunk column */
01513     datum = ExecGetJunkAttribute(planSlot,
01514                                  fmstate->ctidAttno,
01515                                  &isNull);
01516     /* shouldn't ever get a null result... */
01517     if (isNull)
01518         elog(ERROR, "ctid is NULL");
01519 
01520     /* Convert parameters needed by prepared statement to text form */
01521     p_values = convert_prep_stmt_params(fmstate,
01522                                         (ItemPointer) DatumGetPointer(datum),
01523                                         NULL);
01524 
01525     /*
01526      * Execute the prepared statement, and check for success.
01527      *
01528      * We don't use a PG_TRY block here, so be careful not to throw error
01529      * without releasing the PGresult.
01530      */
01531     res = PQexecPrepared(fmstate->conn,
01532                          fmstate->p_name,
01533                          fmstate->p_nums,
01534                          p_values,
01535                          NULL,
01536                          NULL,
01537                          0);
01538     if (PQresultStatus(res) !=
01539         (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
01540         pgfdw_report_error(ERROR, res, true, fmstate->query);
01541 
01542     /* Check number of rows affected, and fetch RETURNING tuple if any */
01543     if (fmstate->has_returning)
01544     {
01545         n_rows = PQntuples(res);
01546         if (n_rows > 0)
01547             store_returning_result(fmstate, slot, res);
01548     }
01549     else
01550         n_rows = atoi(PQcmdTuples(res));
01551 
01552     /* And clean up */
01553     PQclear(res);
01554 
01555     MemoryContextReset(fmstate->temp_cxt);
01556 
01557     /* Return NULL if nothing was deleted on the remote end */
01558     return (n_rows > 0) ? slot : NULL;
01559 }
01560 
01561 /*
01562  * postgresEndForeignModify
01563  *      Finish an insert/update/delete operation on a foreign table
01564  */
01565 static void
01566 postgresEndForeignModify(EState *estate,
01567                          ResultRelInfo *resultRelInfo)
01568 {
01569     PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
01570 
01571     /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
01572     if (fmstate == NULL)
01573         return;
01574 
01575     /* If we created a prepared statement, destroy it */
01576     if (fmstate->p_name)
01577     {
01578         char        sql[64];
01579         PGresult   *res;
01580 
01581         snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
01582 
01583         /*
01584          * We don't use a PG_TRY block here, so be careful not to throw error
01585          * without releasing the PGresult.
01586          */
01587         res = PQexec(fmstate->conn, sql);
01588         if (PQresultStatus(res) != PGRES_COMMAND_OK)
01589             pgfdw_report_error(ERROR, res, true, sql);
01590         PQclear(res);
01591         fmstate->p_name = NULL;
01592     }
01593 
01594     /* Release remote connection */
01595     ReleaseConnection(fmstate->conn);
01596     fmstate->conn = NULL;
01597 }
01598 
01599 /*
01600  * postgresExplainForeignScan
01601  *      Produce extra output for EXPLAIN of a ForeignScan on a foreign table
01602  */
01603 static void
01604 postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
01605 {
01606     List       *fdw_private;
01607     char       *sql;
01608 
01609     if (es->verbose)
01610     {
01611         fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
01612         sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
01613         ExplainPropertyText("Remote SQL", sql, es);
01614     }
01615 }
01616 
01617 /*
01618  * postgresExplainForeignModify
01619  *      Produce extra output for EXPLAIN of a ModifyTable on a foreign table
01620  */
01621 static void
01622 postgresExplainForeignModify(ModifyTableState *mtstate,
01623                              ResultRelInfo *rinfo,
01624                              List *fdw_private,
01625                              int subplan_index,
01626                              ExplainState *es)
01627 {
01628     if (es->verbose)
01629     {
01630         char       *sql = strVal(list_nth(fdw_private,
01631                                           FdwModifyPrivateUpdateSql));
01632 
01633         ExplainPropertyText("Remote SQL", sql, es);
01634     }
01635 }
01636 
01637 
01638 /*
01639  * estimate_path_cost_size
01640  *      Get cost and size estimates for a foreign scan
01641  *
01642  * We assume that all the baserestrictinfo clauses will be applied, plus
01643  * any join clauses listed in join_conds.
01644  */
01645 static void
01646 estimate_path_cost_size(PlannerInfo *root,
01647                         RelOptInfo *baserel,
01648                         List *join_conds,
01649                         double *p_rows, int *p_width,
01650                         Cost *p_startup_cost, Cost *p_total_cost)
01651 {
01652     PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
01653     double      rows;
01654     double      retrieved_rows;
01655     int         width;
01656     Cost        startup_cost;
01657     Cost        total_cost;
01658     Cost        run_cost;
01659     Cost        cpu_per_tuple;
01660 
01661     /*
01662      * If the table or the server is configured to use remote estimates,
01663      * connect to the foreign server and execute EXPLAIN to estimate the
01664      * number of rows selected by the restriction+join clauses.  Otherwise,
01665      * estimate rows using whatever statistics we have locally, in a way
01666      * similar to ordinary tables.
01667      */
01668     if (fpinfo->use_remote_estimate)
01669     {
01670         StringInfoData sql;
01671         List       *retrieved_attrs;
01672         PGconn     *conn;
01673 
01674         /*
01675          * Construct EXPLAIN query including the desired SELECT, FROM, and
01676          * WHERE clauses.  Params and other-relation Vars are replaced by
01677          * dummy values.
01678          */
01679         initStringInfo(&sql);
01680         appendStringInfoString(&sql, "EXPLAIN ");
01681         deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
01682                          &retrieved_attrs);
01683         if (fpinfo->remote_conds)
01684             appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
01685                               true, NULL);
01686         if (join_conds)
01687             appendWhereClause(&sql, root, baserel, join_conds,
01688                               (fpinfo->remote_conds == NIL), NULL);
01689 
01690         /* Get the remote estimate */
01691         conn = GetConnection(fpinfo->server, fpinfo->user, false);
01692         get_remote_estimate(sql.data, conn, &rows, &width,
01693                             &startup_cost, &total_cost);
01694         ReleaseConnection(conn);
01695 
01696         retrieved_rows = rows;
01697 
01698         /* Factor in the selectivity of the local_conds */
01699         rows = clamp_row_est(rows * fpinfo->local_conds_sel);
01700 
01701         /* Add in the eval cost of the local_conds */
01702         startup_cost += fpinfo->local_conds_cost.startup;
01703         total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
01704     }
01705     else
01706     {
01707         /*
01708          * We don't support join conditions in this mode (hence, no
01709          * parameterized paths can be made).
01710          */
01711         Assert(join_conds == NIL);
01712 
01713         /* Use rows/width estimates made by set_baserel_size_estimates. */
01714         rows = baserel->rows;
01715         width = baserel->width;
01716 
01717         /*
01718          * Back into an estimate of the number of retrieved rows.  Just in
01719          * case this is nuts, clamp to at most baserel->tuples.
01720          */
01721         retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
01722         retrieved_rows = Min(retrieved_rows, baserel->tuples);
01723 
01724         /*
01725          * Cost as though this were a seqscan, which is pessimistic.  We
01726          * effectively imagine the local_conds are being evaluated remotely,
01727          * too.
01728          */
01729         startup_cost = 0;
01730         run_cost = 0;
01731         run_cost += seq_page_cost * baserel->pages;
01732 
01733         startup_cost += baserel->baserestrictcost.startup;
01734         cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
01735         run_cost += cpu_per_tuple * baserel->tuples;
01736 
01737         total_cost = startup_cost + run_cost;
01738     }
01739 
01740     /*
01741      * Add some additional cost factors to account for connection overhead
01742      * (fdw_startup_cost), transferring data across the network
01743      * (fdw_tuple_cost per retrieved row), and local manipulation of the data
01744      * (cpu_tuple_cost per retrieved row).
01745      */
01746     startup_cost += fpinfo->fdw_startup_cost;
01747     total_cost += fpinfo->fdw_startup_cost;
01748     total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
01749     total_cost += cpu_tuple_cost * retrieved_rows;
01750 
01751     /* Return results. */
01752     *p_rows = rows;
01753     *p_width = width;
01754     *p_startup_cost = startup_cost;
01755     *p_total_cost = total_cost;
01756 }
01757 
01758 /*
01759  * Estimate costs of executing a SQL statement remotely.
01760  * The given "sql" must be an EXPLAIN command.
01761  */
01762 static void
01763 get_remote_estimate(const char *sql, PGconn *conn,
01764                     double *rows, int *width,
01765                     Cost *startup_cost, Cost *total_cost)
01766 {
01767     PGresult   *volatile res = NULL;
01768 
01769     /* PGresult must be released before leaving this function. */
01770     PG_TRY();
01771     {
01772         char       *line;
01773         char       *p;
01774         int         n;
01775 
01776         /*
01777          * Execute EXPLAIN remotely.
01778          */
01779         res = PQexec(conn, sql);
01780         if (PQresultStatus(res) != PGRES_TUPLES_OK)
01781             pgfdw_report_error(ERROR, res, false, sql);
01782 
01783         /*
01784          * Extract cost numbers for topmost plan node.  Note we search for a
01785          * left paren from the end of the line to avoid being confused by
01786          * other uses of parentheses.
01787          */
01788         line = PQgetvalue(res, 0, 0);
01789         p = strrchr(line, '(');
01790         if (p == NULL)
01791             elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
01792         n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
01793                    startup_cost, total_cost, rows, width);
01794         if (n != 4)
01795             elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
01796 
01797         PQclear(res);
01798         res = NULL;
01799     }
01800     PG_CATCH();
01801     {
01802         if (res)
01803             PQclear(res);
01804         PG_RE_THROW();
01805     }
01806     PG_END_TRY();
01807 }
01808 
01809 /*
01810  * Detect whether we want to process an EquivalenceClass member.
01811  *
01812  * This is a callback for use by generate_implied_equalities_for_column.
01813  */
01814 static bool
01815 ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
01816                           EquivalenceClass *ec, EquivalenceMember *em,
01817                           void *arg)
01818 {
01819     ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg;
01820     Expr       *expr = em->em_expr;
01821 
01822     /*
01823      * If we've identified what we're processing in the current scan, we only
01824      * want to match that expression.
01825      */
01826     if (state->current != NULL)
01827         return equal(expr, state->current);
01828 
01829     /*
01830      * Otherwise, ignore anything we've already processed.
01831      */
01832     if (list_member(state->already_used, expr))
01833         return false;
01834 
01835     /* This is the new target to process. */
01836     state->current = expr;
01837     return true;
01838 }
01839 
01840 /*
01841  * Create cursor for node's query with current parameter values.
01842  */
01843 static void
01844 create_cursor(ForeignScanState *node)
01845 {
01846     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01847     ExprContext *econtext = node->ss.ps.ps_ExprContext;
01848     int         numParams = fsstate->numParams;
01849     const char **values = fsstate->param_values;
01850     PGconn     *conn = fsstate->conn;
01851     StringInfoData buf;
01852     PGresult   *res;
01853 
01854     /*
01855      * Construct array of query parameter values in text format.  We do the
01856      * conversions in the short-lived per-tuple context, so as not to cause a
01857      * memory leak over repeated scans.
01858      */
01859     if (numParams > 0)
01860     {
01861         int         nestlevel;
01862         MemoryContext oldcontext;
01863         int         i;
01864         ListCell   *lc;
01865 
01866         oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
01867 
01868         nestlevel = set_transmission_modes();
01869 
01870         i = 0;
01871         foreach(lc, fsstate->param_exprs)
01872         {
01873             ExprState  *expr_state = (ExprState *) lfirst(lc);
01874             Datum       expr_value;
01875             bool        isNull;
01876 
01877             /* Evaluate the parameter expression */
01878             expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
01879 
01880             /*
01881              * Get string representation of each parameter value by invoking
01882              * type-specific output function, unless the value is null.
01883              */
01884             if (isNull)
01885                 values[i] = NULL;
01886             else
01887                 values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
01888                                                expr_value);
01889             i++;
01890         }
01891 
01892         reset_transmission_modes(nestlevel);
01893 
01894         MemoryContextSwitchTo(oldcontext);
01895     }
01896 
01897     /* Construct the DECLARE CURSOR command */
01898     initStringInfo(&buf);
01899     appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
01900                      fsstate->cursor_number, fsstate->query);
01901 
01902     /*
01903      * Notice that we pass NULL for paramTypes, thus forcing the remote server
01904      * to infer types for all parameters.  Since we explicitly cast every
01905      * parameter (see deparse.c), the "inference" is trivial and will produce
01906      * the desired result.  This allows us to avoid assuming that the remote
01907      * server has the same OIDs we do for the parameters' types.
01908      *
01909      * We don't use a PG_TRY block here, so be careful not to throw error
01910      * without releasing the PGresult.
01911      */
01912     res = PQexecParams(conn, buf.data, numParams, NULL, values,
01913                        NULL, NULL, 0);
01914     if (PQresultStatus(res) != PGRES_COMMAND_OK)
01915         pgfdw_report_error(ERROR, res, true, fsstate->query);
01916     PQclear(res);
01917 
01918     /* Mark the cursor as created, and show no tuples have been retrieved */
01919     fsstate->cursor_exists = true;
01920     fsstate->tuples = NULL;
01921     fsstate->num_tuples = 0;
01922     fsstate->next_tuple = 0;
01923     fsstate->fetch_ct_2 = 0;
01924     fsstate->eof_reached = false;
01925 
01926     /* Clean up */
01927     pfree(buf.data);
01928 }
01929 
01930 /*
01931  * Fetch some more rows from the node's cursor.
01932  */
01933 static void
01934 fetch_more_data(ForeignScanState *node)
01935 {
01936     PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
01937     PGresult   *volatile res = NULL;
01938     MemoryContext oldcontext;
01939 
01940     /*
01941      * We'll store the tuples in the batch_cxt.  First, flush the previous
01942      * batch.
01943      */
01944     fsstate->tuples = NULL;
01945     MemoryContextReset(fsstate->batch_cxt);
01946     oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
01947 
01948     /* PGresult must be released before leaving this function. */
01949     PG_TRY();
01950     {
01951         PGconn     *conn = fsstate->conn;
01952         char        sql[64];
01953         int         fetch_size;
01954         int         numrows;
01955         int         i;
01956 
01957         /* The fetch size is arbitrary, but shouldn't be enormous. */
01958         fetch_size = 100;
01959 
01960         snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
01961                  fetch_size, fsstate->cursor_number);
01962 
01963         res = PQexec(conn, sql);
01964         /* On error, report the original query, not the FETCH. */
01965         if (PQresultStatus(res) != PGRES_TUPLES_OK)
01966             pgfdw_report_error(ERROR, res, false, fsstate->query);
01967 
01968         /* Convert the data into HeapTuples */
01969         numrows = PQntuples(res);
01970         fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
01971         fsstate->num_tuples = numrows;
01972         fsstate->next_tuple = 0;
01973 
01974         for (i = 0; i < numrows; i++)
01975         {
01976             fsstate->tuples[i] =
01977                 make_tuple_from_result_row(res, i,
01978                                            fsstate->rel,
01979                                            fsstate->attinmeta,
01980                                            fsstate->retrieved_attrs,
01981                                            fsstate->temp_cxt);
01982         }
01983 
01984         /* Update fetch_ct_2 */
01985         if (fsstate->fetch_ct_2 < 2)
01986             fsstate->fetch_ct_2++;
01987 
01988         /* Must be EOF if we didn't get as many tuples as we asked for. */
01989         fsstate->eof_reached = (numrows < fetch_size);
01990 
01991         PQclear(res);
01992         res = NULL;
01993     }
01994     PG_CATCH();
01995     {
01996         if (res)
01997             PQclear(res);
01998         PG_RE_THROW();
01999     }
02000     PG_END_TRY();
02001 
02002     MemoryContextSwitchTo(oldcontext);
02003 }
02004 
02005 /*
02006  * Force assorted GUC parameters to settings that ensure that we'll output
02007  * data values in a form that is unambiguous to the remote server.
02008  *
02009  * This is rather expensive and annoying to do once per row, but there's
02010  * little choice if we want to be sure values are transmitted accurately;
02011  * we can't leave the settings in place between rows for fear of affecting
02012  * user-visible computations.
02013  *
02014  * We use the equivalent of a function SET option to allow the settings to
02015  * persist only until the caller calls reset_transmission_modes().  If an
02016  * error is thrown in between, guc.c will take care of undoing the settings.
02017  *
02018  * The return value is the nestlevel that must be passed to
02019  * reset_transmission_modes() to undo things.
02020  */
02021 int
02022 set_transmission_modes(void)
02023 {
02024     int         nestlevel = NewGUCNestLevel();
02025 
02026     /*
02027      * The values set here should match what pg_dump does.  See also
02028      * configure_remote_session in connection.c.
02029      */
02030     if (DateStyle != USE_ISO_DATES)
02031         (void) set_config_option("datestyle", "ISO",
02032                                  PGC_USERSET, PGC_S_SESSION,
02033                                  GUC_ACTION_SAVE, true, 0);
02034     if (IntervalStyle != INTSTYLE_POSTGRES)
02035         (void) set_config_option("intervalstyle", "postgres",
02036                                  PGC_USERSET, PGC_S_SESSION,
02037                                  GUC_ACTION_SAVE, true, 0);
02038     if (extra_float_digits < 3)
02039         (void) set_config_option("extra_float_digits", "3",
02040                                  PGC_USERSET, PGC_S_SESSION,
02041                                  GUC_ACTION_SAVE, true, 0);
02042 
02043     return nestlevel;
02044 }
02045 
/*
 * reset_transmission_modes
 *      Undo the effects of set_transmission_modes().
 *
 * nestlevel must be the value returned by the matching
 * set_transmission_modes() call.
 */
void
reset_transmission_modes(int nestlevel)
{
    /* Pop the GUC nest level pushed by set_transmission_modes() */
    const bool  is_commit = true;

    AtEOXact_GUC(is_commit, nestlevel);
}
02054 
02055 /*
02056  * Utility routine to close a cursor.
02057  */
02058 static void
02059 close_cursor(PGconn *conn, unsigned int cursor_number)
02060 {
02061     char        sql[64];
02062     PGresult   *res;
02063 
02064     snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
02065 
02066     /*
02067      * We don't use a PG_TRY block here, so be careful not to throw error
02068      * without releasing the PGresult.
02069      */
02070     res = PQexec(conn, sql);
02071     if (PQresultStatus(res) != PGRES_COMMAND_OK)
02072         pgfdw_report_error(ERROR, res, true, sql);
02073     PQclear(res);
02074 }
02075 
02076 /*
02077  * prepare_foreign_modify
02078  *      Establish a prepared statement for execution of INSERT/UPDATE/DELETE
02079  */
02080 static void
02081 prepare_foreign_modify(PgFdwModifyState *fmstate)
02082 {
02083     char        prep_name[NAMEDATALEN];
02084     char       *p_name;
02085     PGresult   *res;
02086 
02087     /* Construct name we'll use for the prepared statement. */
02088     snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
02089              GetPrepStmtNumber(fmstate->conn));
02090     p_name = pstrdup(prep_name);
02091 
02092     /*
02093      * We intentionally do not specify parameter types here, but leave the
02094      * remote server to derive them by default.  This avoids possible problems
02095      * with the remote server using different type OIDs than we do.  All of
02096      * the prepared statements we use in this module are simple enough that
02097      * the remote server will make the right choices.
02098      *
02099      * We don't use a PG_TRY block here, so be careful not to throw error
02100      * without releasing the PGresult.
02101      */
02102     res = PQprepare(fmstate->conn,
02103                     p_name,
02104                     fmstate->query,
02105                     0,
02106                     NULL);
02107 
02108     if (PQresultStatus(res) != PGRES_COMMAND_OK)
02109         pgfdw_report_error(ERROR, res, true, fmstate->query);
02110     PQclear(res);
02111 
02112     /* This action shows that the prepare has been done. */
02113     fmstate->p_name = p_name;
02114 }
02115 
02116 /*
02117  * convert_prep_stmt_params
02118  *      Create array of text strings representing parameter values
02119  *
02120  * tupleid is ctid to send, or NULL if none
02121  * slot is slot to get remaining parameters from, or NULL if none
02122  *
02123  * Data is constructed in temp_cxt; caller should reset that after use.
02124  */
02125 static const char **
02126 convert_prep_stmt_params(PgFdwModifyState *fmstate,
02127                          ItemPointer tupleid,
02128                          TupleTableSlot *slot)
02129 {
02130     const char **p_values;
02131     int         pindex = 0;
02132     MemoryContext oldcontext;
02133 
02134     oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
02135 
02136     p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
02137 
02138     /* 1st parameter should be ctid, if it's in use */
02139     if (tupleid != NULL)
02140     {
02141         /* don't need set_transmission_modes for TID output */
02142         p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
02143                                               PointerGetDatum(tupleid));
02144         pindex++;
02145     }
02146 
02147     /* get following parameters from slot */
02148     if (slot != NULL && fmstate->target_attrs != NIL)
02149     {
02150         int         nestlevel;
02151         ListCell   *lc;
02152 
02153         nestlevel = set_transmission_modes();
02154 
02155         foreach(lc, fmstate->target_attrs)
02156         {
02157             int         attnum = lfirst_int(lc);
02158             Datum       value;
02159             bool        isnull;
02160 
02161             value = slot_getattr(slot, attnum, &isnull);
02162             if (isnull)
02163                 p_values[pindex] = NULL;
02164             else
02165                 p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
02166                                                       value);
02167             pindex++;
02168         }
02169 
02170         reset_transmission_modes(nestlevel);
02171     }
02172 
02173     Assert(pindex == fmstate->p_nums);
02174 
02175     MemoryContextSwitchTo(oldcontext);
02176 
02177     return p_values;
02178 }
02179 
02180 /*
02181  * store_returning_result
02182  *      Store the result of a RETURNING clause
02183  *
02184  * On error, be sure to release the PGresult on the way out.  Callers do not
02185  * have PG_TRY blocks to ensure this happens.
02186  */
02187 static void
02188 store_returning_result(PgFdwModifyState *fmstate,
02189                        TupleTableSlot *slot, PGresult *res)
02190 {
02191     /* PGresult must be released before leaving this function. */
02192     PG_TRY();
02193     {
02194         HeapTuple   newtup;
02195 
02196         newtup = make_tuple_from_result_row(res, 0,
02197                                             fmstate->rel,
02198                                             fmstate->attinmeta,
02199                                             fmstate->retrieved_attrs,
02200                                             fmstate->temp_cxt);
02201         /* tuple will be deleted when it is cleared from the slot */
02202         ExecStoreTuple(newtup, slot, InvalidBuffer, true);
02203     }
02204     PG_CATCH();
02205     {
02206         if (res)
02207             PQclear(res);
02208         PG_RE_THROW();
02209     }
02210     PG_END_TRY();
02211 }
02212 
02213 /*
02214  * postgresAnalyzeForeignTable
02215  *      Test whether analyzing this foreign table is supported
02216  */
02217 static bool
02218 postgresAnalyzeForeignTable(Relation relation,
02219                             AcquireSampleRowsFunc *func,
02220                             BlockNumber *totalpages)
02221 {
02222     ForeignTable *table;
02223     ForeignServer *server;
02224     UserMapping *user;
02225     PGconn     *conn;
02226     StringInfoData sql;
02227     PGresult   *volatile res = NULL;
02228 
02229     /* Return the row-analysis function pointer */
02230     *func = postgresAcquireSampleRowsFunc;
02231 
02232     /*
02233      * Now we have to get the number of pages.  It's annoying that the ANALYZE
02234      * API requires us to return that now, because it forces some duplication
02235      * of effort between this routine and postgresAcquireSampleRowsFunc.  But
02236      * it's probably not worth redefining that API at this point.
02237      */
02238 
02239     /*
02240      * Get the connection to use.  We do the remote access as the table's
02241      * owner, even if the ANALYZE was started by some other user.
02242      */
02243     table = GetForeignTable(RelationGetRelid(relation));
02244     server = GetForeignServer(table->serverid);
02245     user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
02246     conn = GetConnection(server, user, false);
02247 
02248     /*
02249      * Construct command to get page count for relation.
02250      */
02251     initStringInfo(&sql);
02252     deparseAnalyzeSizeSql(&sql, relation);
02253 
02254     /* In what follows, do not risk leaking any PGresults. */
02255     PG_TRY();
02256     {
02257         res = PQexec(conn, sql.data);
02258         if (PQresultStatus(res) != PGRES_TUPLES_OK)
02259             pgfdw_report_error(ERROR, res, false, sql.data);
02260 
02261         if (PQntuples(res) != 1 || PQnfields(res) != 1)
02262             elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
02263         *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
02264 
02265         PQclear(res);
02266         res = NULL;
02267     }
02268     PG_CATCH();
02269     {
02270         if (res)
02271             PQclear(res);
02272         PG_RE_THROW();
02273     }
02274     PG_END_TRY();
02275 
02276     ReleaseConnection(conn);
02277 
02278     return true;
02279 }
02280 
02281 /*
02282  * Acquire a random sample of rows from foreign table managed by postgres_fdw.
02283  *
02284  * We fetch the whole table from the remote side and pick out some sample rows.
02285  *
02286  * Selected rows are returned in the caller-allocated array rows[],
02287  * which must have at least targrows entries.
02288  * The actual number of rows selected is returned as the function result.
02289  * We also count the total number of rows in the table and return it into
02290  * *totalrows.  Note that *totaldeadrows is always set to 0.
02291  *
02292  * Note that the returned list of rows is not always in order by physical
02293  * position in the table.  Therefore, correlation estimates derived later
02294  * may be meaningless, but it's OK because we don't use the estimates
02295  * currently (the planner only pays attention to correlation for indexscans).
02296  */
02297 static int
02298 postgresAcquireSampleRowsFunc(Relation relation, int elevel,
02299                               HeapTuple *rows, int targrows,
02300                               double *totalrows,
02301                               double *totaldeadrows)
02302 {
02303     PgFdwAnalyzeState astate;
02304     ForeignTable *table;
02305     ForeignServer *server;
02306     UserMapping *user;
02307     PGconn     *conn;
02308     unsigned int cursor_number;
02309     StringInfoData sql;
02310     PGresult   *volatile res = NULL;
02311 
02312     /* Initialize workspace state */
02313     astate.rel = relation;
02314     astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
02315 
02316     astate.rows = rows;
02317     astate.targrows = targrows;
02318     astate.numrows = 0;
02319     astate.samplerows = 0;
02320     astate.rowstoskip = -1;     /* -1 means not set yet */
02321     astate.rstate = anl_init_selection_state(targrows);
02322 
02323     /* Remember ANALYZE context, and create a per-tuple temp context */
02324     astate.anl_cxt = CurrentMemoryContext;
02325     astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
02326                                             "postgres_fdw temporary data",
02327                                             ALLOCSET_SMALL_MINSIZE,
02328                                             ALLOCSET_SMALL_INITSIZE,
02329                                             ALLOCSET_SMALL_MAXSIZE);
02330 
02331     /*
02332      * Get the connection to use.  We do the remote access as the table's
02333      * owner, even if the ANALYZE was started by some other user.
02334      */
02335     table = GetForeignTable(RelationGetRelid(relation));
02336     server = GetForeignServer(table->serverid);
02337     user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
02338     conn = GetConnection(server, user, false);
02339 
02340     /*
02341      * Construct cursor that retrieves whole rows from remote.
02342      */
02343     cursor_number = GetCursorNumber(conn);
02344     initStringInfo(&sql);
02345     appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
02346     deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
02347 
02348     /* In what follows, do not risk leaking any PGresults. */
02349     PG_TRY();
02350     {
02351         res = PQexec(conn, sql.data);
02352         if (PQresultStatus(res) != PGRES_COMMAND_OK)
02353             pgfdw_report_error(ERROR, res, false, sql.data);
02354         PQclear(res);
02355         res = NULL;
02356 
02357         /* Retrieve and process rows a batch at a time. */
02358         for (;;)
02359         {
02360             char        fetch_sql[64];
02361             int         fetch_size;
02362             int         numrows;
02363             int         i;
02364 
02365             /* Allow users to cancel long query */
02366             CHECK_FOR_INTERRUPTS();
02367 
02368             /*
02369              * XXX possible future improvement: if rowstoskip is large, we
02370              * could issue a MOVE rather than physically fetching the rows,
02371              * then just adjust rowstoskip and samplerows appropriately.
02372              */
02373 
02374             /* The fetch size is arbitrary, but shouldn't be enormous. */
02375             fetch_size = 100;
02376 
02377             /* Fetch some rows */
02378             snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
02379                      fetch_size, cursor_number);
02380 
02381             res = PQexec(conn, fetch_sql);
02382             /* On error, report the original query, not the FETCH. */
02383             if (PQresultStatus(res) != PGRES_TUPLES_OK)
02384                 pgfdw_report_error(ERROR, res, false, sql.data);
02385 
02386             /* Process whatever we got. */
02387             numrows = PQntuples(res);
02388             for (i = 0; i < numrows; i++)
02389                 analyze_row_processor(res, i, &astate);
02390 
02391             PQclear(res);
02392             res = NULL;
02393 
02394             /* Must be EOF if we didn't get all the rows requested. */
02395             if (numrows < fetch_size)
02396                 break;
02397         }
02398 
02399         /* Close the cursor, just to be tidy. */
02400         close_cursor(conn, cursor_number);
02401     }
02402     PG_CATCH();
02403     {
02404         if (res)
02405             PQclear(res);
02406         PG_RE_THROW();
02407     }
02408     PG_END_TRY();
02409 
02410     ReleaseConnection(conn);
02411 
02412     /* We assume that we have no dead tuple. */
02413     *totaldeadrows = 0.0;
02414 
02415     /* We've retrieved all living tuples from foreign server. */
02416     *totalrows = astate.samplerows;
02417 
02418     /*
02419      * Emit some interesting relation info
02420      */
02421     ereport(elevel,
02422             (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
02423                     RelationGetRelationName(relation),
02424                     astate.samplerows, astate.numrows)));
02425 
02426     return astate.numrows;
02427 }
02428 
02429 /*
02430  * Collect sample rows from the result of query.
02431  *   - Use all tuples in sample until target # of samples are collected.
02432  *   - Subsequently, replace already-sampled tuples randomly.
02433  */
02434 static void
02435 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
02436 {
02437     int         targrows = astate->targrows;
02438     int         pos;            /* array index to store tuple in */
02439     MemoryContext oldcontext;
02440 
02441     /* Always increment sample row counter. */
02442     astate->samplerows += 1;
02443 
02444     /*
02445      * Determine the slot where this sample row should be stored.  Set pos to
02446      * negative value to indicate the row should be skipped.
02447      */
02448     if (astate->numrows < targrows)
02449     {
02450         /* First targrows rows are always included into the sample */
02451         pos = astate->numrows++;
02452     }
02453     else
02454     {
02455         /*
02456          * Now we start replacing tuples in the sample until we reach the end
02457          * of the relation.  Same algorithm as in acquire_sample_rows in
02458          * analyze.c; see Jeff Vitter's paper.
02459          */
02460         if (astate->rowstoskip < 0)
02461             astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
02462                                                 &astate->rstate);
02463 
02464         if (astate->rowstoskip <= 0)
02465         {
02466             /* Choose a random reservoir element to replace. */
02467             pos = (int) (targrows * anl_random_fract());
02468             Assert(pos >= 0 && pos < targrows);
02469             heap_freetuple(astate->rows[pos]);
02470         }
02471         else
02472         {
02473             /* Skip this tuple. */
02474             pos = -1;
02475         }
02476 
02477         astate->rowstoskip -= 1;
02478     }
02479 
02480     if (pos >= 0)
02481     {
02482         /*
02483          * Create sample tuple from current result row, and store it in the
02484          * position determined above.  The tuple has to be created in anl_cxt.
02485          */
02486         oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
02487 
02488         astate->rows[pos] = make_tuple_from_result_row(res, row,
02489                                                        astate->rel,
02490                                                        astate->attinmeta,
02491                                                        astate->retrieved_attrs,
02492                                                        astate->temp_cxt);
02493 
02494         MemoryContextSwitchTo(oldcontext);
02495     }
02496 }
02497 
02498 /*
02499  * Create a tuple from the specified row of the PGresult.
02500  *
02501  * rel is the local representation of the foreign table, attinmeta is
02502  * conversion data for the rel's tupdesc, and retrieved_attrs is an
02503  * integer list of the table column numbers present in the PGresult.
02504  * temp_context is a working context that can be reset after each tuple.
02505  */
02506 static HeapTuple
02507 make_tuple_from_result_row(PGresult *res,
02508                            int row,
02509                            Relation rel,
02510                            AttInMetadata *attinmeta,
02511                            List *retrieved_attrs,
02512                            MemoryContext temp_context)
02513 {
02514     HeapTuple   tuple;
02515     TupleDesc   tupdesc = RelationGetDescr(rel);
02516     Datum      *values;
02517     bool       *nulls;
02518     ItemPointer ctid = NULL;
02519     ConversionLocation errpos;
02520     ErrorContextCallback errcallback;
02521     MemoryContext oldcontext;
02522     ListCell   *lc;
02523     int         j;
02524 
02525     Assert(row < PQntuples(res));
02526 
02527     /*
02528      * Do the following work in a temp context that we reset after each tuple.
02529      * This cleans up not only the data we have direct access to, but any
02530      * cruft the I/O functions might leak.
02531      */
02532     oldcontext = MemoryContextSwitchTo(temp_context);
02533 
02534     values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
02535     nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
02536     /* Initialize to nulls for any columns not present in result */
02537     memset(nulls, true, tupdesc->natts * sizeof(bool));
02538 
02539     /*
02540      * Set up and install callback to report where conversion error occurs.
02541      */
02542     errpos.rel = rel;
02543     errpos.cur_attno = 0;
02544     errcallback.callback = conversion_error_callback;
02545     errcallback.arg = (void *) &errpos;
02546     errcallback.previous = error_context_stack;
02547     error_context_stack = &errcallback;
02548 
02549     /*
02550      * i indexes columns in the relation, j indexes columns in the PGresult.
02551      */
02552     j = 0;
02553     foreach(lc, retrieved_attrs)
02554     {
02555         int         i = lfirst_int(lc);
02556         char       *valstr;
02557 
02558         /* fetch next column's textual value */
02559         if (PQgetisnull(res, row, j))
02560             valstr = NULL;
02561         else
02562             valstr = PQgetvalue(res, row, j);
02563 
02564         /* convert value to internal representation */
02565         if (i > 0)
02566         {
02567             /* ordinary column */
02568             Assert(i <= tupdesc->natts);
02569             nulls[i - 1] = (valstr == NULL);
02570             /* Apply the input function even to nulls, to support domains */
02571             errpos.cur_attno = i;
02572             values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
02573                                               valstr,
02574                                               attinmeta->attioparams[i - 1],
02575                                               attinmeta->atttypmods[i - 1]);
02576             errpos.cur_attno = 0;
02577         }
02578         else if (i == SelfItemPointerAttributeNumber)
02579         {
02580             /* ctid --- note we ignore any other system column in result */
02581             if (valstr != NULL)
02582             {
02583                 Datum       datum;
02584 
02585                 datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
02586                 ctid = (ItemPointer) DatumGetPointer(datum);
02587             }
02588         }
02589 
02590         j++;
02591     }
02592 
02593     /* Uninstall error context callback. */
02594     error_context_stack = errcallback.previous;
02595 
02596     /*
02597      * Check we got the expected number of columns.  Note: j == 0 and
02598      * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
02599      */
02600     if (j > 0 && j != PQnfields(res))
02601         elog(ERROR, "remote query result does not match the foreign table");
02602 
02603     /*
02604      * Build the result tuple in caller's memory context.
02605      */
02606     MemoryContextSwitchTo(oldcontext);
02607 
02608     tuple = heap_form_tuple(tupdesc, values, nulls);
02609 
02610     if (ctid)
02611         tuple->t_self = *ctid;
02612 
02613     /* Clean up */
02614     MemoryContextReset(temp_context);
02615 
02616     return tuple;
02617 }
02618 
02619 /*
02620  * Callback function which is called when error occurs during column value
02621  * conversion.  Print names of column and relation.
02622  */
02623 static void
02624 conversion_error_callback(void *arg)
02625 {
02626     ConversionLocation *errpos = (ConversionLocation *) arg;
02627     TupleDesc   tupdesc = RelationGetDescr(errpos->rel);
02628 
02629     if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
02630         errcontext("column \"%s\" of foreign table \"%s\"",
02631                    NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
02632                    RelationGetRelationName(errpos->rel));
02633 }