Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Enumerations | Functions | Variables

postgres_fdw.c File Reference

#include "postgres.h"
#include "postgres_fdw.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
Include dependency graph for postgres_fdw.c:

Go to the source code of this file.

Data Structures

struct  PgFdwRelationInfo
struct  PgFdwScanState
struct  PgFdwModifyState
struct  PgFdwAnalyzeState
struct  ConversionLocation
struct  ec_member_foreign_arg

Defines

#define DEFAULT_FDW_STARTUP_COST   100.0
#define DEFAULT_FDW_TUPLE_COST   0.01

Typedefs

typedef struct PgFdwRelationInfo PgFdwRelationInfo
typedef struct PgFdwScanState PgFdwScanState
typedef struct PgFdwModifyState PgFdwModifyState
typedef struct PgFdwAnalyzeState PgFdwAnalyzeState
typedef struct ConversionLocation ConversionLocation

Enumerations

enum  FdwScanPrivateIndex { FdwScanPrivateSelectSql, FdwScanPrivateRetrievedAttrs }
enum  FdwModifyPrivateIndex { FdwModifyPrivateUpdateSql, FdwModifyPrivateTargetAttnums, FdwModifyPrivateHasReturning, FdwModifyPrivateRetrievedAttrs }

Functions

Datum postgres_fdw_handler (PG_FUNCTION_ARGS)
 PG_FUNCTION_INFO_V1 (postgres_fdw_handler)
static void postgresGetForeignRelSize (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
static void postgresGetForeignPaths (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
static ForeignScan * postgresGetForeignPlan (PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses)
static void postgresBeginForeignScan (ForeignScanState *node, int eflags)
static TupleTableSlot * postgresIterateForeignScan (ForeignScanState *node)
static void postgresReScanForeignScan (ForeignScanState *node)
static void postgresEndForeignScan (ForeignScanState *node)
static void postgresAddForeignUpdateTargets (Query *parsetree, RangeTblEntry *target_rte, Relation target_relation)
static List * postgresPlanForeignModify (PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index)
static void postgresBeginForeignModify (ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags)
static TupleTableSlot * postgresExecForeignInsert (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static TupleTableSlot * postgresExecForeignUpdate (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static TupleTableSlot * postgresExecForeignDelete (EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot)
static void postgresEndForeignModify (EState *estate, ResultRelInfo *resultRelInfo)
static void postgresExplainForeignScan (ForeignScanState *node, ExplainState *es)
static void postgresExplainForeignModify (ModifyTableState *mtstate, ResultRelInfo *rinfo, List *fdw_private, int subplan_index, ExplainState *es)
static bool postgresAnalyzeForeignTable (Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages)
static void estimate_path_cost_size (PlannerInfo *root, RelOptInfo *baserel, List *join_conds, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost)
static void get_remote_estimate (const char *sql, PGconn *conn, double *rows, int *width, Cost *startup_cost, Cost *total_cost)
static bool ec_member_matches_foreign (PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg)
static void create_cursor (ForeignScanState *node)
static void fetch_more_data (ForeignScanState *node)
static void close_cursor (PGconn *conn, unsigned int cursor_number)
static void prepare_foreign_modify (PgFdwModifyState *fmstate)
static const char ** convert_prep_stmt_params (PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
static void store_returning_result (PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res)
static int postgresAcquireSampleRowsFunc (Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, double *totaldeadrows)
static void analyze_row_processor (PGresult *res, int row, PgFdwAnalyzeState *astate)
static HeapTuple make_tuple_from_result_row (PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, MemoryContext temp_context)
static void conversion_error_callback (void *arg)
int set_transmission_modes (void)
void reset_transmission_modes (int nestlevel)

Variables

 PG_MODULE_MAGIC

Define Documentation

#define DEFAULT_FDW_STARTUP_COST   100.0

Definition at line 44 of file postgres_fdw.c.

#define DEFAULT_FDW_TUPLE_COST   0.01

Definition at line 47 of file postgres_fdw.c.


Typedef Documentation


Enumeration Type Documentation

Enumerator:
FdwModifyPrivateUpdateSql 
FdwModifyPrivateTargetAttnums 
FdwModifyPrivateHasReturning 
FdwModifyPrivateRetrievedAttrs 

Definition at line 114 of file postgres_fdw.c.

{
    /*
     * NOTE(review): judging by the enum name (FdwModifyPrivateIndex), each
     * enumerator is a position within the fdw_private list attached to a
     * foreign-table modify plan, so the declaration order would have to
     * match the order in which that list is built -- confirm against the
     * code that constructs and reads the list.
     */
    /* SQL statement to execute remotely (as a String node) */
    FdwModifyPrivateUpdateSql,
    /* Integer list of target attribute numbers for INSERT/UPDATE */
    FdwModifyPrivateTargetAttnums,
    /* has-returning flag (as an integer Value node) */
    FdwModifyPrivateHasReturning,
    /* Integer list of attribute numbers retrieved by RETURNING */
    FdwModifyPrivateRetrievedAttrs
};

Enumerator:
FdwScanPrivateSelectSql 
FdwScanPrivateRetrievedAttrs 

Definition at line 96 of file postgres_fdw.c.

{
    /*
     * NOTE(review): judging by the enum name (FdwScanPrivateIndex), each
     * enumerator is a position within the fdw_private list attached to a
     * foreign scan plan, so the declaration order would have to match the
     * order in which that list is built -- confirm against the code that
     * constructs and reads the list.
     */
    /* SQL statement to execute remotely (as a String node) */
    FdwScanPrivateSelectSql,
    /* Integer list of attribute numbers retrieved by the SELECT */
    FdwScanPrivateRetrievedAttrs
};


Function Documentation

static void analyze_row_processor ( PGresult *  res,
int  row,
PgFdwAnalyzeState *  astate 
) [static]

Definition at line 2435 of file postgres_fdw.c.

References PgFdwAnalyzeState::anl_cxt, anl_get_next_S(), anl_random_fract(), Assert, PgFdwAnalyzeState::attinmeta, heap_freetuple(), make_tuple_from_result_row(), MemoryContextSwitchTo(), PgFdwAnalyzeState::numrows, PgFdwAnalyzeState::rel, PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, PgFdwAnalyzeState::samplerows, PgFdwAnalyzeState::targrows, and PgFdwAnalyzeState::temp_cxt.

Referenced by postgresAcquireSampleRowsFunc().

{
    /*
     * Process one row (index "row") of the sample query result "res":
     * count it, decide whether it belongs in the sample array astate->rows,
     * and if so convert it to a tuple and store it there.
     *
     * res:    result set holding the row
     * row:    index of the row within res
     * astate: sampling state, updated in place (samplerows, numrows,
     *         rowstoskip, rows[])
     */
    int         targrows = astate->targrows;
    int         pos;            /* array index to store tuple in */
    MemoryContext oldcontext;

    /* Always increment sample row counter. */
    astate->samplerows += 1;

    /*
     * Determine the slot where this sample row should be stored.  Set pos to
     * negative value to indicate the row should be skipped.
     */
    if (astate->numrows < targrows)
    {
        /* First targrows rows are always included into the sample */
        pos = astate->numrows++;
    }
    else
    {
        /*
         * Now we start replacing tuples in the sample until we reach the end
         * of the relation.  Same algorithm as in acquire_sample_rows in
         * analyze.c; see Jeff Vitter's paper.
         */
        /* A negative rowstoskip means no skip count is pending: draw one. */
        if (astate->rowstoskip < 0)
            astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
                                                &astate->rstate);

        if (astate->rowstoskip <= 0)
        {
            /* Choose a random reservoir element to replace. */
            pos = (int) (targrows * anl_random_fract());
            Assert(pos >= 0 && pos < targrows);
            /* Free the evicted tuple before its slot is overwritten below. */
            heap_freetuple(astate->rows[pos]);
        }
        else
        {
            /* Skip this tuple. */
            pos = -1;
        }

        /*
         * Consume one unit of the skip count.  After a replacement (count was
         * zero) this leaves it negative, so the next call draws a new count.
         */
        astate->rowstoskip -= 1;
    }

    if (pos >= 0)
    {
        /*
         * Create sample tuple from current result row, and store it in the
         * position determined above.  The tuple has to be created in anl_cxt.
         */
        oldcontext = MemoryContextSwitchTo(astate->anl_cxt);

        astate->rows[pos] = make_tuple_from_result_row(res, row,
                                                       astate->rel,
                                                       astate->attinmeta,
                                                       astate->retrieved_attrs,
                                                       astate->temp_cxt);

        MemoryContextSwitchTo(oldcontext);
    }
}

static void close_cursor ( PGconn *  conn,
unsigned int  cursor_number 
) [static]

Definition at line 2059 of file postgres_fdw.c.

References ERROR, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), and snprintf().

Referenced by postgresAcquireSampleRowsFunc(), and postgresEndForeignScan().

{
    /*
     * Close the remote cursor named "c<cursor_number>" on connection "conn".
     * A result status other than PGRES_COMMAND_OK is handed to
     * pgfdw_report_error() at ERROR level.
     */
    PGresult   *result;
    char        command[64];

    snprintf(command, sizeof(command), "CLOSE c%u", cursor_number);

    /*
     * There is no PG_TRY block around this, so no error may be thrown
     * between PQexec and PQclear without the PGresult being released.
     */
    result = PQexec(conn, command);

    if (PQresultStatus(result) == PGRES_COMMAND_OK)
    {
        PQclear(result);
        return;
    }

    pgfdw_report_error(ERROR, result, true, command);
    PQclear(result);
}

static void conversion_error_callback ( void *  arg  )  [static]

Definition at line 2624 of file postgres_fdw.c.

References tupleDesc::attrs, ConversionLocation::cur_attno, errcontext, NameStr, tupleDesc::natts, ConversionLocation::rel, RelationGetDescr, and RelationGetRelationName.

{
    /*
     * Error-context callback: "arg" points to a ConversionLocation.  While
     * its cur_attno names a valid column of the relation (1..natts), add a
     * context line identifying that column and the foreign table; otherwise
     * add nothing.
     */
    ConversionLocation *loc = (ConversionLocation *) arg;
    TupleDesc   desc = RelationGetDescr(loc->rel);

    /* No column is currently being converted, or the number is out of range. */
    if (loc->cur_attno <= 0 || loc->cur_attno > desc->natts)
        return;

    errcontext("column \"%s\" of foreign table \"%s\"",
               NameStr(desc->attrs[loc->cur_attno - 1]->attname),
               RelationGetRelationName(loc->rel));
}

static const char ** convert_prep_stmt_params ( PgFdwModifyState *  fmstate,
ItemPointer  tupleid,
TupleTableSlot *  slot 
) [static]

Definition at line 2126 of file postgres_fdw.c.

References Assert, lfirst_int, MemoryContextSwitchTo(), NIL, NULL, OutputFunctionCall(), PgFdwModifyState::p_flinfo, PgFdwModifyState::p_nums, palloc(), PointerGetDatum, reset_transmission_modes(), set_transmission_modes(), slot_getattr(), PgFdwModifyState::target_attrs, PgFdwModifyState::temp_cxt, and value.

Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().

{
    /*
     * Build the array of text-format parameter values for a prepared
     * modify statement.
     *
     * tupleid: if not NULL, its text form becomes the first parameter
     * slot:    if not NULL (and fmstate->target_attrs is not NIL), each
     *          attribute listed in target_attrs supplies one parameter,
     *          with SQL NULLs represented by NULL pointers
     *
     * The array and its strings are allocated in fmstate->temp_cxt; the
     * caller's memory context is restored before returning.
     */
    MemoryContext caller_cxt = MemoryContextSwitchTo(fmstate->temp_cxt);
    const char **p_values;
    int         pindex = 0;

    p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);

    /* The ctid, when supplied, always occupies the first parameter. */
    if (tupleid != NULL)
    {
        /* TID output needs no set_transmission_modes */
        p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
                                              PointerGetDatum(tupleid));
        pindex++;
    }

    /* The remaining parameters are taken from the slot, in list order. */
    if (slot != NULL && fmstate->target_attrs != NIL)
    {
        ListCell   *cell;
        int         guc_nestlevel = set_transmission_modes();

        foreach(cell, fmstate->target_attrs)
        {
            bool        attr_isnull;
            Datum       attr_value;

            attr_value = slot_getattr(slot, lfirst_int(cell), &attr_isnull);

            if (!attr_isnull)
                p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
                                                      attr_value);
            else
                p_values[pindex] = NULL;

            pindex++;
        }

        reset_transmission_modes(guc_nestlevel);
    }

    /* Every parameter position must have been filled. */
    Assert(pindex == fmstate->p_nums);

    MemoryContextSwitchTo(caller_cxt);

    return p_values;
}

static void create_cursor ( ForeignScanState *  node  )  [static]

Definition at line 1844 of file postgres_fdw.c.

References appendStringInfo(), buf, PgFdwScanState::conn, conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ExecEvalExpr, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, i, initStringInfo(), lfirst, MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, PgFdwScanState::numParams, OutputFunctionCall(), PgFdwScanState::param_exprs, PgFdwScanState::param_flinfo, PgFdwScanState::param_values, pfree(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexecParams(), PQresultStatus(), ScanState::ps, PlanState::ps_ExprContext, PgFdwScanState::query, reset_transmission_modes(), set_transmission_modes(), ForeignScanState::ss, PgFdwScanState::tuples, and values.

Referenced by postgresIterateForeignScan().

{
    /*
     * Create the remote cursor for a foreign scan: evaluate the scan's
     * parameter expressions to text, issue "DECLARE c<N> CURSOR FOR <query>"
     * on the scan's connection, and reset the tuple-fetch bookkeeping in the
     * scan state (node->fdw_state).
     */
    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
    ExprContext *econtext = node->ss.ps.ps_ExprContext;
    int         numParams = fsstate->numParams;
    const char **values = fsstate->param_values;
    PGconn     *conn = fsstate->conn;
    StringInfoData buf;
    PGresult   *res;

    /*
     * Construct array of query parameter values in text format.  We do the
     * conversions in the short-lived per-tuple context, so as not to cause a
     * memory leak over repeated scans.
     */
    if (numParams > 0)
    {
        int         nestlevel;
        MemoryContext oldcontext;
        int         i;
        ListCell   *lc;

        oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

        nestlevel = set_transmission_modes();

        /* i is the position in values[] / param_flinfo[] for each expression */
        i = 0;
        foreach(lc, fsstate->param_exprs)
        {
            ExprState  *expr_state = (ExprState *) lfirst(lc);
            Datum       expr_value;
            bool        isNull;

            /* Evaluate the parameter expression */
            expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);

            /*
             * Get string representation of each parameter value by invoking
             * type-specific output function, unless the value is null.
             */
            if (isNull)
                values[i] = NULL;
            else
                values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
                                               expr_value);
            i++;
        }

        reset_transmission_modes(nestlevel);

        MemoryContextSwitchTo(oldcontext);
    }

    /* Construct the DECLARE CURSOR command */
    initStringInfo(&buf);
    appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
                     fsstate->cursor_number, fsstate->query);

    /*
     * Notice that we pass NULL for paramTypes, thus forcing the remote server
     * to infer types for all parameters.  Since we explicitly cast every
     * parameter (see deparse.c), the "inference" is trivial and will produce
     * the desired result.  This allows us to avoid assuming that the remote
     * server has the same OIDs we do for the parameters' types.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
    res = PQexecParams(conn, buf.data, numParams, NULL, values,
                       NULL, NULL, 0);
    /* On failure the scan's own query text is reported, not the DECLARE. */
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, true, fsstate->query);
    PQclear(res);

    /* Mark the cursor as created, and show no tuples have been retrieved */
    fsstate->cursor_exists = true;
    fsstate->tuples = NULL;
    fsstate->num_tuples = 0;
    fsstate->next_tuple = 0;
    fsstate->fetch_ct_2 = 0;
    fsstate->eof_reached = false;

    /* Clean up */
    pfree(buf.data);
}

static bool ec_member_matches_foreign ( PlannerInfo *  root,
RelOptInfo *  rel,
EquivalenceClass *  ec,
EquivalenceMember *  em,
void *  arg 
) [static]

Definition at line 1815 of file postgres_fdw.c.

References ec_member_foreign_arg::already_used, ec_member_foreign_arg::current, EquivalenceMember::em_expr, equal(), list_member(), and NULL.

Referenced by postgresGetForeignPaths().

{
    /*
     * Decide whether equivalence-class member "em" should be matched.
     * "arg" points to an ec_member_foreign_arg holding the expression
     * currently being processed (current) and a list of expressions
     * handled earlier (already_used).
     *
     * Returns true if em's expression is the current target, or becomes the
     * new current target; false otherwise.
     */
    ec_member_foreign_arg *match_state = (ec_member_foreign_arg *) arg;
    Expr       *member_expr = em->em_expr;

    if (match_state->current == NULL)
    {
        /* No target chosen yet: anything already handled is not eligible. */
        if (list_member(match_state->already_used, member_expr))
            return false;

        /* Adopt this expression as the target for the current scan. */
        match_state->current = member_expr;
        return true;
    }

    /* A target is already chosen; only that exact expression matches. */
    return equal(member_expr, match_state->current);
}

static void estimate_path_cost_size ( PlannerInfo *  root,
RelOptInfo *  baserel,
List *  join_conds,
double *  p_rows,
int *  p_width,
Cost *  p_startup_cost,
Cost *  p_total_cost 
) [static]

Definition at line 1646 of file postgres_fdw.c.

References appendStringInfoString(), appendWhereClause(), Assert, PgFdwRelationInfo::attrs_used, RelOptInfo::baserestrictcost, clamp_row_est(), conn, cpu_tuple_cost, StringInfoData::data, deparseSelectSql(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, get_remote_estimate(), GetConnection(), initStringInfo(), PgFdwRelationInfo::local_conds_cost, PgFdwRelationInfo::local_conds_sel, Min, NIL, NULL, RelOptInfo::pages, QualCost::per_tuple, ReleaseConnection(), PgFdwRelationInfo::remote_conds, RelOptInfo::rows, seq_page_cost, PgFdwRelationInfo::server, QualCost::startup, RelOptInfo::tuples, PgFdwRelationInfo::use_remote_estimate, PgFdwRelationInfo::user, and RelOptInfo::width.

Referenced by postgresGetForeignPaths(), and postgresGetForeignRelSize().

{
    /*
     * Estimate the row count, row width, startup cost and total cost of
     * scanning foreign relation "baserel" with the extra conditions in
     * "join_conds", and return them through p_rows, p_width, p_startup_cost
     * and p_total_cost.
     *
     * Which estimation method is used depends on
     * fpinfo->use_remote_estimate (fpinfo being baserel->fdw_private).
     */
    PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
    double      rows;
    double      retrieved_rows;
    int         width;
    Cost        startup_cost;
    Cost        total_cost;
    Cost        run_cost;
    Cost        cpu_per_tuple;

    /*
     * If the table or the server is configured to use remote estimates,
     * connect to the foreign server and execute EXPLAIN to estimate the
     * number of rows selected by the restriction+join clauses.  Otherwise,
     * estimate rows using whatever statistics we have locally, in a way
     * similar to ordinary tables.
     */
    if (fpinfo->use_remote_estimate)
    {
        StringInfoData sql;
        List       *retrieved_attrs;
        PGconn     *conn;

        /*
         * Construct EXPLAIN query including the desired SELECT, FROM, and
         * WHERE clauses.  Params and other-relation Vars are replaced by
         * dummy values.
         */
        initStringInfo(&sql);
        appendStringInfoString(&sql, "EXPLAIN ");
        deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used,
                         &retrieved_attrs);
        if (fpinfo->remote_conds)
            appendWhereClause(&sql, root, baserel, fpinfo->remote_conds,
                              true, NULL);
        /*
         * NOTE(review): the fifth argument looks like an "is first clause"
         * flag -- true above, and here true only when no remote_conds were
         * emitted -- confirm against appendWhereClause().
         */
        if (join_conds)
            appendWhereClause(&sql, root, baserel, join_conds,
                              (fpinfo->remote_conds == NIL), NULL);

        /* Get the remote estimate */
        conn = GetConnection(fpinfo->server, fpinfo->user, false);
        get_remote_estimate(sql.data, conn, &rows, &width,
                            &startup_cost, &total_cost);
        ReleaseConnection(conn);

        /* Remember the remote row count before local filtering is applied. */
        retrieved_rows = rows;

        /* Factor in the selectivity of the local_conds */
        rows = clamp_row_est(rows * fpinfo->local_conds_sel);

        /* Add in the eval cost of the local_conds */
        startup_cost += fpinfo->local_conds_cost.startup;
        total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
    }
    else
    {
        /*
         * We don't support join conditions in this mode (hence, no
         * parameterized paths can be made).
         */
        Assert(join_conds == NIL);

        /* Use rows/width estimates made by set_baserel_size_estimates. */
        rows = baserel->rows;
        width = baserel->width;

        /*
         * Back into an estimate of the number of retrieved rows.  Just in
         * case this is nuts, clamp to at most baserel->tuples.
         */
        retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
        retrieved_rows = Min(retrieved_rows, baserel->tuples);

        /*
         * Cost as though this were a seqscan, which is pessimistic.  We
         * effectively imagine the local_conds are being evaluated remotely,
         * too.
         */
        startup_cost = 0;
        run_cost = 0;
        run_cost += seq_page_cost * baserel->pages;

        startup_cost += baserel->baserestrictcost.startup;
        cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
        run_cost += cpu_per_tuple * baserel->tuples;

        total_cost = startup_cost + run_cost;
    }

    /*
     * Add some additional cost factors to account for connection overhead
     * (fdw_startup_cost), transferring data across the network
     * (fdw_tuple_cost per retrieved row), and local manipulation of the data
     * (cpu_tuple_cost per retrieved row).
     */
    startup_cost += fpinfo->fdw_startup_cost;
    total_cost += fpinfo->fdw_startup_cost;
    total_cost += fpinfo->fdw_tuple_cost * retrieved_rows;
    total_cost += cpu_tuple_cost * retrieved_rows;

    /* Return results. */
    *p_rows = rows;
    *p_width = width;
    *p_startup_cost = startup_cost;
    *p_total_cost = total_cost;
}

static void fetch_more_data ( ForeignScanState *  node  )  [static]

Definition at line 1934 of file postgres_fdw.c.

References PgFdwScanState::attinmeta, PgFdwScanState::batch_cxt, PgFdwScanState::conn, conn, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, i, make_tuple_from_result_row(), MemoryContextReset(), MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQntuples(), PQresultStatus(), PgFdwScanState::query, PgFdwScanState::rel, PgFdwScanState::retrieved_attrs, snprintf(), PgFdwScanState::temp_cxt, and PgFdwScanState::tuples.

Referenced by postgresIterateForeignScan().

{
    /*
     * Fetch the next batch of rows from the scan's remote cursor and store
     * them, converted to HeapTuples, in fsstate->tuples.  Also updates
     * num_tuples, next_tuple, fetch_ct_2 and eof_reached in the scan state.
     *
     * The previous batch is discarded first (batch_cxt is reset).
     */
    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
    /* Assigned inside PG_TRY and read in PG_CATCH, hence volatile. */
    PGresult   *volatile res = NULL;
    MemoryContext oldcontext;

    /*
     * We'll store the tuples in the batch_cxt.  First, flush the previous
     * batch.
     */
    fsstate->tuples = NULL;
    MemoryContextReset(fsstate->batch_cxt);
    oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);

    /* PGresult must be released before leaving this function. */
    PG_TRY();
    {
        PGconn     *conn = fsstate->conn;
        char        sql[64];
        int         fetch_size;
        int         numrows;
        int         i;

        /* The fetch size is arbitrary, but shouldn't be enormous. */
        fetch_size = 100;

        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                 fetch_size, fsstate->cursor_number);

        res = PQexec(conn, sql);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, false, fsstate->query);

        /* Convert the data into HeapTuples */
        numrows = PQntuples(res);
        fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
        fsstate->num_tuples = numrows;
        fsstate->next_tuple = 0;

        for (i = 0; i < numrows; i++)
        {
            fsstate->tuples[i] =
                make_tuple_from_result_row(res, i,
                                           fsstate->rel,
                                           fsstate->attinmeta,
                                           fsstate->retrieved_attrs,
                                           fsstate->temp_cxt);
        }

        /* Update fetch_ct_2 */
        /* (the counter saturates at 2 rather than counting every fetch) */
        if (fsstate->fetch_ct_2 < 2)
            fsstate->fetch_ct_2++;

        /* Must be EOF if we didn't get as many tuples as we asked for. */
        fsstate->eof_reached = (numrows < fetch_size);

        PQclear(res);
        res = NULL;
    }
    PG_CATCH();
    {
        /* Release the result, if one is still held, before re-throwing. */
        if (res)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Reached only on the non-error path; the catch block re-throws. */
    MemoryContextSwitchTo(oldcontext);
}

static void get_remote_estimate ( const char *  sql,
PGconn *  conn,
double *  rows,
int *  width,
Cost *  startup_cost,
Cost *  total_cost 
) [static]

Definition at line 1763 of file postgres_fdw.c.

References elog, ERROR, NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), and PQresultStatus().

Referenced by estimate_path_cost_size().

{
    /*
     * Execute "sql" (expected to be an EXPLAIN statement) on connection
     * "conn" and parse the cost figures of the topmost plan node from the
     * first row, first column of the result.
     *
     * Outputs (written only on success): *startup_cost, *total_cost, *rows
     * and *width, taken from the "(cost=S..T rows=R width=W)" annotation.
     *
     * Raises ERROR if the statement fails, returns no row/column, or the
     * first line does not contain a parsable cost annotation.  The PGresult
     * is released on every exit path.
     */
    /* Assigned inside PG_TRY and read in PG_CATCH, hence volatile. */
    PGresult   *volatile res = NULL;

    /* PGresult must be released before leaving this function. */
    PG_TRY();
    {
        char       *line;
        char       *p;
        int         n;

        /*
         * Execute EXPLAIN remotely.
         */
        res = PQexec(conn, sql);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, false, sql);

        /*
         * The remote end is not under our control: make sure there really is
         * a (0, 0) cell before reading it, rather than handing whatever
         * PQgetvalue yields for an out-of-range position to strrchr.
         */
        if (PQntuples(res) < 1 || PQnfields(res) < 1)
            elog(ERROR, "could not interpret EXPLAIN output: empty result");

        /*
         * Extract cost numbers for topmost plan node.  Note we search for a
         * left paren from the end of the line to avoid being confused by
         * other uses of parentheses.
         */
        line = PQgetvalue(res, 0, 0);
        p = strrchr(line, '(');
        if (p == NULL)
            elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
        n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
                   startup_cost, total_cost, rows, width);
        /* All four fields must have been converted. */
        if (n != 4)
            elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);

        PQclear(res);
        res = NULL;
    }
    PG_CATCH();
    {
        /* Release the result, if one is still held, before re-throwing. */
        if (res)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();
}

static HeapTuple make_tuple_from_result_row ( PGresult *  res,
int  row,
Relation  rel,
AttInMetadata *  attinmeta,
List *  retrieved_attrs,
MemoryContext  temp_context 
) [static]

Definition at line 2507 of file postgres_fdw.c.

References ErrorContextCallback::arg, Assert, AttInMetadata::attinfuncs, AttInMetadata::attioparams, AttInMetadata::atttypmods, ErrorContextCallback::callback, CStringGetDatum, ConversionLocation::cur_attno, DatumGetPointer, DirectFunctionCall1, elog, ERROR, error_context_stack, heap_form_tuple(), i, InputFunctionCall(), lfirst_int, MemoryContextReset(), MemoryContextSwitchTo(), tupleDesc::natts, NULL, palloc(), palloc0(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ErrorContextCallback::previous, ConversionLocation::rel, RelationGetDescr, SelfItemPointerAttributeNumber, HeapTupleData::t_self, tidin(), and values.

Referenced by analyze_row_processor(), fetch_more_data(), and store_returning_result().

{
    /*
     * Build a HeapTuple for relation "rel" from row "row" of "res".
     *
     * retrieved_attrs: integer list giving, for each result column in order,
     *                  the attribute number of rel that the column supplies;
     *                  attributes not listed are left null
     * attinmeta:       per-attribute input functions, I/O parameters and
     *                  typmods used to convert the text values
     * temp_context:    scratch context used for the conversion work; it is
     *                  reset before returning
     *
     * The returned tuple is formed in the memory context that was current
     * on entry.  Raises ERROR if the number of result columns does not match
     * the number of retrieved attributes (see the check below).
     */
    HeapTuple   tuple;
    TupleDesc   tupdesc = RelationGetDescr(rel);
    Datum      *values;
    bool       *nulls;
    ItemPointer ctid = NULL;
    ConversionLocation errpos;
    ErrorContextCallback errcallback;
    MemoryContext oldcontext;
    ListCell   *lc;
    int         j;

    Assert(row < PQntuples(res));

    /*
     * Do the following work in a temp context that we reset after each tuple.
     * This cleans up not only the data we have direct access to, but any
     * cruft the I/O functions might leak.
     */
    oldcontext = MemoryContextSwitchTo(temp_context);

    values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
    nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
    /* Initialize to nulls for any columns not present in result */
    memset(nulls, true, tupdesc->natts * sizeof(bool));

    /*
     * Set up and install callback to report where conversion error occurs.
     */
    errpos.rel = rel;
    errpos.cur_attno = 0;
    errcallback.callback = conversion_error_callback;
    errcallback.arg = (void *) &errpos;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    /*
     * i indexes columns in the relation, j indexes columns in the PGresult.
     */
    j = 0;
    foreach(lc, retrieved_attrs)
    {
        int         i = lfirst_int(lc);
        char       *valstr;

        /* fetch next column's textual value */
        if (PQgetisnull(res, row, j))
            valstr = NULL;
        else
            valstr = PQgetvalue(res, row, j);

        /* convert value to internal representation */
        if (i > 0)
        {
            /* ordinary column */
            Assert(i <= tupdesc->natts);
            nulls[i - 1] = (valstr == NULL);
            /* Apply the input function even to nulls, to support domains */
            /* cur_attno is set only for the duration of the conversion call */
            errpos.cur_attno = i;
            values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
                                              valstr,
                                              attinmeta->attioparams[i - 1],
                                              attinmeta->atttypmods[i - 1]);
            errpos.cur_attno = 0;
        }
        else if (i == SelfItemPointerAttributeNumber)
        {
            /* ctid --- note we ignore any other system column in result */
            if (valstr != NULL)
            {
                Datum       datum;

                datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
                ctid = (ItemPointer) DatumGetPointer(datum);
            }
        }

        j++;
    }

    /* Uninstall error context callback. */
    error_context_stack = errcallback.previous;

    /*
     * Check we got the expected number of columns.  Note: j == 0 and
     * PQnfields == 1 is expected, since deparse emits a NULL if no columns.
     */
    if (j > 0 && j != PQnfields(res))
        elog(ERROR, "remote query result does not match the foreign table");

    /*
     * Build the result tuple in caller's memory context.
     */
    MemoryContextSwitchTo(oldcontext);

    tuple = heap_form_tuple(tupdesc, values, nulls);

    /* Copy the ctid into the tuple now; temp_context is reset just below. */
    if (ctid)
        tuple->t_self = *ctid;

    /* Clean up */
    MemoryContextReset(temp_context);

    return tuple;
}

PG_FUNCTION_INFO_V1 ( postgres_fdw_handler   ) 
Datum postgres_fdw_handler ( PG_FUNCTION_ARGS   ) 

Definition at line 337 of file postgres_fdw.c.

References FdwRoutine::AddForeignUpdateTargets, FdwRoutine::AnalyzeForeignTable, FdwRoutine::BeginForeignModify, FdwRoutine::BeginForeignScan, FdwRoutine::EndForeignModify, FdwRoutine::EndForeignScan, FdwRoutine::ExecForeignDelete, FdwRoutine::ExecForeignInsert, FdwRoutine::ExecForeignUpdate, FdwRoutine::ExplainForeignModify, FdwRoutine::ExplainForeignScan, FdwRoutine::GetForeignPaths, FdwRoutine::GetForeignPlan, FdwRoutine::GetForeignRelSize, FdwRoutine::IterateForeignScan, makeNode, PG_RETURN_POINTER, FdwRoutine::PlanForeignModify, and FdwRoutine::ReScanForeignScan.

{
    /*
     * Foreign-data wrapper handler: allocate an FdwRoutine node, fill in
     * this module's callback functions, and return a pointer to it.
     */
    FdwRoutine *fdwroutine = makeNode(FdwRoutine);

    /* Scan support: planner callbacks */
    fdwroutine->GetForeignRelSize = postgresGetForeignRelSize;
    fdwroutine->GetForeignPaths = postgresGetForeignPaths;
    fdwroutine->GetForeignPlan = postgresGetForeignPlan;

    /* Scan support: executor callbacks */
    fdwroutine->BeginForeignScan = postgresBeginForeignScan;
    fdwroutine->IterateForeignScan = postgresIterateForeignScan;
    fdwroutine->ReScanForeignScan = postgresReScanForeignScan;
    fdwroutine->EndForeignScan = postgresEndForeignScan;

    /* Update support: rewrite/planner callbacks */
    fdwroutine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
    fdwroutine->PlanForeignModify = postgresPlanForeignModify;

    /* Update support: executor callbacks */
    fdwroutine->BeginForeignModify = postgresBeginForeignModify;
    fdwroutine->ExecForeignInsert = postgresExecForeignInsert;
    fdwroutine->ExecForeignUpdate = postgresExecForeignUpdate;
    fdwroutine->ExecForeignDelete = postgresExecForeignDelete;
    fdwroutine->EndForeignModify = postgresEndForeignModify;

    /* EXPLAIN support */
    fdwroutine->ExplainForeignScan = postgresExplainForeignScan;
    fdwroutine->ExplainForeignModify = postgresExplainForeignModify;

    /* ANALYZE support */
    fdwroutine->AnalyzeForeignTable = postgresAnalyzeForeignTable;

    PG_RETURN_POINTER(fdwroutine);
}

static int postgresAcquireSampleRowsFunc ( Relation  relation,
int  elevel,
HeapTuple rows,
int  targrows,
double *  totalrows,
double *  totaldeadrows 
) [static]

Definition at line 2298 of file postgres_fdw.c.

References ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), analyze_row_processor(), PgFdwAnalyzeState::anl_cxt, anl_init_selection_state(), appendStringInfo(), PgFdwAnalyzeState::attinmeta, CHECK_FOR_INTERRUPTS, close_cursor(), conn, CurrentMemoryContext, cursor_number, StringInfoData::data, deparseAnalyzeSql(), ereport, errmsg(), ERROR, GetConnection(), GetCursorNumber(), GetForeignServer(), GetForeignTable(), GetUserMapping(), i, initStringInfo(), PgFdwAnalyzeState::numrows, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQexec(), PQntuples(), PQresultStatus(), RelationData::rd_rel, PgFdwAnalyzeState::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, ReleaseConnection(), PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, PgFdwAnalyzeState::samplerows, ForeignServer::serverid, ForeignTable::serverid, snprintf(), PgFdwAnalyzeState::targrows, PgFdwAnalyzeState::temp_cxt, TupleDescGetAttInMetadata(), and user.

{
    /*
     * Collect sample rows for ANALYZE from the remote table.
     *
     * A cursor over the query built by deparseAnalyzeSql is declared on the
     * remote server and drained in fixed-size batches; every fetched row is
     * passed to analyze_row_processor together with the workspace "astate".
     *
     * Outputs:
     *   *totalrows     - set to astate.samplerows
     *   *totaldeadrows - always set to 0
     *   return value   - astate.numrows
     *
     * A summary message is emitted at level "elevel" before returning.
     */
    PgFdwAnalyzeState astate;
    ForeignTable *table;
    ForeignServer *server;
    UserMapping *user;
    PGconn     *conn;
    unsigned int cursor_number;
    StringInfoData sql;
    /* volatile: assigned inside PG_TRY and inspected inside PG_CATCH */
    PGresult   *volatile res = NULL;

    /* Initialize workspace state */
    astate.rel = relation;
    astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));

    /* Caller-supplied output array and its capacity */
    astate.rows = rows;
    astate.targrows = targrows;
    astate.numrows = 0;
    astate.samplerows = 0;
    astate.rowstoskip = -1;     /* -1 means not set yet */
    astate.rstate = anl_init_selection_state(targrows);

    /* Remember ANALYZE context, and create a per-tuple temp context */
    astate.anl_cxt = CurrentMemoryContext;
    astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
                                            "postgres_fdw temporary data",
                                            ALLOCSET_SMALL_MINSIZE,
                                            ALLOCSET_SMALL_INITSIZE,
                                            ALLOCSET_SMALL_MAXSIZE);

    /*
     * Get the connection to use.  We do the remote access as the table's
     * owner, even if the ANALYZE was started by some other user.
     */
    table = GetForeignTable(RelationGetRelid(relation));
    server = GetForeignServer(table->serverid);
    user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
    conn = GetConnection(server, user, false);

    /*
     * Construct cursor that retrieves whole rows from remote.  The cursor is
     * named "c<cursor_number>"; the same number is reused in the FETCH
     * commands below.
     */
    cursor_number = GetCursorNumber(conn);
    initStringInfo(&sql);
    appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
    deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);

    /* In what follows, do not risk leaking any PGresults. */
    PG_TRY();
    {
        /* Declare the cursor; DECLARE must report PGRES_COMMAND_OK */
        res = PQexec(conn, sql.data);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            pgfdw_report_error(ERROR, res, false, sql.data);
        PQclear(res);
        /* Reset so the PG_CATCH handler does not clear it a second time */
        res = NULL;

        /* Retrieve and process rows a batch at a time. */
        for (;;)
        {
            char        fetch_sql[64];
            int         fetch_size;
            int         numrows;
            int         i;

            /* Allow users to cancel long query */
            CHECK_FOR_INTERRUPTS();

            /*
             * XXX possible future improvement: if rowstoskip is large, we
             * could issue a MOVE rather than physically fetching the rows,
             * then just adjust rowstoskip and samplerows appropriately.
             */

            /* The fetch size is arbitrary, but shouldn't be enormous. */
            fetch_size = 100;

            /* Fetch some rows */
            snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
                     fetch_size, cursor_number);

            res = PQexec(conn, fetch_sql);
            /* On error, report the original query, not the FETCH. */
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
                pgfdw_report_error(ERROR, res, false, sql.data);

            /* Process whatever we got. */
            numrows = PQntuples(res);
            for (i = 0; i < numrows; i++)
                analyze_row_processor(res, i, &astate);

            PQclear(res);
            /* As above: keep PG_CATCH from clearing an already-freed result */
            res = NULL;

            /* Must be EOF if we didn't get all the rows requested. */
            if (numrows < fetch_size)
                break;
        }

        /* Close the cursor, just to be tidy. */
        close_cursor(conn, cursor_number);
    }
    PG_CATCH();
    {
        /* Free any result still held, then propagate the error */
        if (res)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Only reached on success; the PG_CATCH path re-throws before this */
    ReleaseConnection(conn);

    /* We assume that we have no dead tuple. */
    *totaldeadrows = 0.0;

    /* We've retrieved all living tuples from foreign server. */
    *totalrows = astate.samplerows;

    /*
     * Emit some interesting relation info
     */
    ereport(elevel,
            (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
                    RelationGetRelationName(relation),
                    astate.samplerows, astate.numrows)));

    return astate.numrows;
}

static void postgresAddForeignUpdateTargets ( Query parsetree,
RangeTblEntry target_rte,
Relation  target_relation 
) [static]

Definition at line 1106 of file postgres_fdw.c.

References InvalidOid, lappend(), list_length(), makeTargetEntry(), makeVar(), pstrdup(), Query::resultRelation, SelfItemPointerAttributeNumber, Query::targetList, and TIDOID.

{
    Var        *ctid_var;
    TargetEntry *ctid_tle;
    int         next_resno;

    /*
     * postgres_fdw needs the ctid, same as a regular table does.  Build a
     * Var for the result relation's ctid system column (type TID).
     */
    ctid_var = makeVar(parsetree->resultRelation,
                       SelfItemPointerAttributeNumber,
                       TIDOID,
                       -1,
                       InvalidOid,
                       0);

    /* The new entry is numbered one past the current end of the targetlist */
    next_resno = list_length(parsetree->targetList) + 1;

    /* Wrap the Var in a resjunk target entry named "ctid" */
    ctid_tle = makeTargetEntry((Expr *) ctid_var,
                               next_resno,
                               pstrdup("ctid"),
                               true);

    /* Finally, append the entry to the query's targetlist */
    parsetree->targetList = lappend(parsetree->targetList, ctid_tle);
}

static bool postgresAnalyzeForeignTable ( Relation  relation,
AcquireSampleRowsFunc func,
BlockNumber totalpages 
) [static]

Definition at line 2218 of file postgres_fdw.c.

References conn, StringInfoData::data, deparseAnalyzeSizeSql(), elog, ERROR, GetConnection(), GetForeignServer(), GetForeignTable(), GetUserMapping(), initStringInfo(), NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), RelationData::rd_rel, RelationGetRelid, ReleaseConnection(), ForeignServer::serverid, ForeignTable::serverid, and user.

{
    /*
     * Prepare for ANALYZE of a foreign table.
     *
     * Stores postgresAcquireSampleRowsFunc in *func, runs the query built by
     * deparseAnalyzeSizeSql on the remote server, and stores the single value
     * it returns (parsed as a base-10 unsigned number) in *totalpages.
     * Always returns true; failures are reported by raising an error.
     */
    ForeignTable *table;
    ForeignServer *server;
    UserMapping *user;
    PGconn     *conn;
    StringInfoData sql;
    /* volatile: assigned inside PG_TRY and inspected inside PG_CATCH */
    PGresult   *volatile res = NULL;

    /* Return the row-analysis function pointer */
    *func = postgresAcquireSampleRowsFunc;

    /*
     * Now we have to get the number of pages.  It's annoying that the ANALYZE
     * API requires us to return that now, because it forces some duplication
     * of effort between this routine and postgresAcquireSampleRowsFunc.  But
     * it's probably not worth redefining that API at this point.
     */

    /*
     * Get the connection to use.  We do the remote access as the table's
     * owner, even if the ANALYZE was started by some other user.
     */
    table = GetForeignTable(RelationGetRelid(relation));
    server = GetForeignServer(table->serverid);
    user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
    conn = GetConnection(server, user, false);

    /*
     * Construct command to get page count for relation.
     */
    initStringInfo(&sql);
    deparseAnalyzeSizeSql(&sql, relation);

    /* In what follows, do not risk leaking any PGresults. */
    PG_TRY();
    {
        res = PQexec(conn, sql.data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, false, sql.data);

        /* The size query must yield exactly one row with one column */
        if (PQntuples(res) != 1 || PQnfields(res) != 1)
            elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
        /*
         * NOTE(review): strtoul is called without an end pointer or errno
         * check, so a non-numeric value would be stored as 0 without any
         * diagnostic.
         */
        *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

        PQclear(res);
        /* Reset so the PG_CATCH handler does not clear it a second time */
        res = NULL;
    }
    PG_CATCH();
    {
        /* Free any result still held, then propagate the error */
        if (res)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();

    /* Only reached on success; the PG_CATCH path re-throws before this */
    ReleaseConnection(conn);

    return true;
}

static void postgresBeginForeignModify ( ModifyTableState mtstate,
ResultRelInfo resultRelInfo,
List fdw_private,
int  subplan_index,
int  eflags 
) [static]

Definition at line 1253 of file postgres_fdw.c.

References ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), Assert, PgFdwModifyState::attinmeta, AttributeNumberIsValid, RangeTblEntry::checkAsUser, CMD_DELETE, CMD_INSERT, CMD_UPDATE, PgFdwModifyState::conn, PgFdwModifyState::ctidAttno, elog, ERROR, EState::es_query_cxt, EState::es_range_table, EXEC_FLAG_EXPLAIN_ONLY, ExecFindJunkAttributeInTlist(), FdwModifyPrivateHasReturning, FdwModifyPrivateRetrievedAttrs, FdwModifyPrivateTargetAttnums, FdwModifyPrivateUpdateSql, fmgr_info(), GetConnection(), GetForeignServer(), GetForeignTable(), getTypeOutputInfo(), GetUserId(), GetUserMapping(), PgFdwModifyState::has_returning, intVal, lfirst_int, list_length(), list_nth(), ModifyTableState::mt_plans, ModifyTableState::operation, PgFdwModifyState::p_flinfo, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, palloc0(), PlanState::plan, ModifyTableState::ps, PgFdwModifyState::query, PgFdwModifyState::rel, RelationGetDescr, RelationGetRelid, PgFdwModifyState::retrieved_attrs, ResultRelInfo::ri_FdwState, ResultRelInfo::ri_RangeTableIndex, ResultRelInfo::ri_RelationDesc, rt_fetch, ForeignServer::serverid, ForeignTable::serverid, PlanState::state, strVal, PgFdwModifyState::target_attrs, Plan::targetlist, PgFdwModifyState::temp_cxt, TIDOID, TupleDescGetAttInMetadata(), and user.

{
    /*
     * Set up the per-relation modify state (PgFdwModifyState) for an INSERT,
     * UPDATE or DELETE on a foreign table and store it in
     * resultRelInfo->ri_FdwState.  Nothing is done for plain EXPLAIN.
     */
    EState     *estate = mtstate->ps.state;
    CmdType     operation = mtstate->operation;
    Relation    rel = resultRelInfo->ri_RelationDesc;
    bool        sends_ctid;
    bool        sends_columns;
    PgFdwModifyState *fmstate;
    RangeTblEntry *rte;
    Oid         remote_userid;
    ForeignTable *ftable;
    ForeignServer *fserver;
    UserMapping *mapping;
    AttrNumber  max_params;
    Oid         outfuncoid;
    bool        is_varlena;
    ListCell   *cell;

    /*
     * EXPLAIN without ANALYZE: leave resultRelInfo->ri_FdwState as NULL and
     * do no work at all.
     */
    if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
        return;

    /* Start building the state struct; palloc0 leaves every field zeroed */
    fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
    fmstate->rel = rel;

    /*
     * Pick the user for the remote access.  This should match what
     * ExecCheckRTEPerms() does: checkAsUser if set, else the current user.
     */
    rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
    if (rte->checkAsUser)
        remote_userid = rte->checkAsUser;
    else
        remote_userid = GetUserId();

    /* Look up foreign table, its server, and the user mapping */
    ftable = GetForeignTable(RelationGetRelid(rel));
    fserver = GetForeignServer(ftable->serverid);
    mapping = GetUserMapping(remote_userid, fserver->serverid);

    /* Open connection; report that we'll create a prepared statement */
    fmstate->conn = GetConnection(fserver, mapping, true);
    fmstate->p_name = NULL;     /* prepared statement not made yet */

    /* Unpack the items stored in fdw_private */
    fmstate->query = strVal(list_nth(fdw_private,
                                     FdwModifyPrivateUpdateSql));
    fmstate->target_attrs = (List *) list_nth(fdw_private,
                                              FdwModifyPrivateTargetAttnums);
    fmstate->has_returning = intVal(list_nth(fdw_private,
                                             FdwModifyPrivateHasReturning));
    fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
                                                 FdwModifyPrivateRetrievedAttrs);

    /* Memory context for per-tuple temporary workspace */
    fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                              "postgres_fdw temporary data",
                                              ALLOCSET_SMALL_MINSIZE,
                                              ALLOCSET_SMALL_INITSIZE,
                                              ALLOCSET_SMALL_MAXSIZE);

    /* Input-conversion metadata is only needed when RETURNING is used */
    if (fmstate->has_returning)
        fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));

    /*
     * Output-conversion info for the prepared statement's parameters: room
     * for one slot per target attribute plus one more for the ctid.
     */
    max_params = list_length(fmstate->target_attrs) + 1;
    fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * max_params);
    fmstate->p_nums = 0;

    sends_ctid = (operation == CMD_UPDATE || operation == CMD_DELETE);
    sends_columns = (operation == CMD_INSERT || operation == CMD_UPDATE);

    if (sends_ctid)
    {
        /* Locate the resjunk ctid column in the subplan's targetlist */
        Plan       *subplan = mtstate->mt_plans[subplan_index]->plan;

        fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
                                                          "ctid");
        if (!AttributeNumberIsValid(fmstate->ctidAttno))
            elog(ERROR, "could not find junk ctid column");

        /* ctid is the first transmittable parameter */
        getTypeOutputInfo(TIDOID, &outfuncoid, &is_varlena);
        fmgr_info(outfuncoid, &fmstate->p_flinfo[fmstate->p_nums++]);
    }

    if (sends_columns)
    {
        /* One further parameter per target attribute, in list order */
        foreach(cell, fmstate->target_attrs)
        {
            int         attnum = lfirst_int(cell);
            Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];

            Assert(!attr->attisdropped);

            getTypeOutputInfo(attr->atttypid, &outfuncoid, &is_varlena);
            fmgr_info(outfuncoid, &fmstate->p_flinfo[fmstate->p_nums++]);
        }
    }

    Assert(fmstate->p_nums <= max_params);

    resultRelInfo->ri_FdwState = fmstate;
}

static void postgresBeginForeignScan ( ForeignScanState node,
int  eflags 
) [static]

Definition at line 871 of file postgres_fdw.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), EXEC_FLAG_EXPLAIN_ONLY, ExecInitExpr(), exprType(), ForeignScan::fdw_exprs, ForeignScan::fdw_private, ForeignScanState::fdw_state, FdwScanPrivateRetrievedAttrs, FdwScanPrivateSelectSql, fmgr_info(), GetConnection(), GetCursorNumber(), GetForeignServer(), GetForeignTable(), getTypeOutputInfo(), GetUserId(), GetUserMapping(), i, lfirst, list_length(), list_nth(), palloc0(), PlanState::plan, ScanState::ps, RelationGetDescr, RelationGetRelid, rt_fetch, ForeignScan::scan, Scan::scanrelid, ForeignScanState::ss, ScanState::ss_currentRelation, PlanState::state, strVal, TupleDescGetAttInMetadata(), and user.

{
    /*
     * Set up the scan state (PgFdwScanState) for a foreign scan and store it
     * in node->fdw_state.  Nothing is done for plain EXPLAIN.
     */
    ForeignScan *scanplan = (ForeignScan *) node->ss.ps.plan;
    EState     *estate = node->ss.ps.state;
    PgFdwScanState *fsstate;
    RangeTblEntry *rte;
    Oid         remote_userid;
    ForeignTable *ftable;
    ForeignServer *fserver;
    UserMapping *mapping;
    int         nparams;
    int         param_idx;
    ListCell   *cell;

    /* EXPLAIN without ANALYZE: node->fdw_state stays NULL */
    if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
        return;

    /* Private state lives in node->fdw_state; palloc0 zeroes every field */
    fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
    node->fdw_state = (void *) fsstate;

    /*
     * Pick the user for the remote access.  This should match what
     * ExecCheckRTEPerms() does: checkAsUser if set, else the current user.
     */
    rte = rt_fetch(scanplan->scan.scanrelid, estate->es_range_table);
    if (rte->checkAsUser)
        remote_userid = rte->checkAsUser;
    else
        remote_userid = GetUserId();

    /* Look up foreign table, its server, and the user mapping */
    fsstate->rel = node->ss.ss_currentRelation;
    ftable = GetForeignTable(RelationGetRelid(fsstate->rel));
    fserver = GetForeignServer(ftable->serverid);
    mapping = GetUserMapping(remote_userid, fserver->serverid);

    /*
     * Obtain a connection to the foreign server; the connection manager
     * establishes a new one if necessary.
     */
    fsstate->conn = GetConnection(fserver, mapping, false);

    /* Reserve a unique cursor ID; the cursor itself does not exist yet */
    fsstate->cursor_number = GetCursorNumber(fsstate->conn);
    fsstate->cursor_exists = false;

    /* Pull out the private info the planner functions created */
    fsstate->query = strVal(list_nth(scanplan->fdw_private,
                                     FdwScanPrivateSelectSql));
    fsstate->retrieved_attrs = (List *) list_nth(scanplan->fdw_private,
                                                 FdwScanPrivateRetrievedAttrs);

    /* One context for batches of tuples, one for per-tuple temp workspace */
    fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                               "postgres_fdw tuple data",
                                               ALLOCSET_DEFAULT_MINSIZE,
                                               ALLOCSET_DEFAULT_INITSIZE,
                                               ALLOCSET_DEFAULT_MAXSIZE);
    fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                              "postgres_fdw temporary data",
                                              ALLOCSET_SMALL_MINSIZE,
                                              ALLOCSET_SMALL_INITSIZE,
                                              ALLOCSET_SMALL_MAXSIZE);

    /* Metadata needed for input data conversion */
    fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));

    /* Output-function info for each parameter used in the remote query */
    nparams = list_length(scanplan->fdw_exprs);
    fsstate->numParams = nparams;
    fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * nparams);

    param_idx = 0;
    foreach(cell, scanplan->fdw_exprs)
    {
        Node       *expr = (Node *) lfirst(cell);
        Oid         outfuncoid;
        bool        is_varlena;

        getTypeOutputInfo(exprType(expr), &outfuncoid, &is_varlena);
        fmgr_info(outfuncoid, &fsstate->param_flinfo[param_idx]);
        param_idx++;
    }

    /*
     * Ready the remote-parameter expressions for evaluation.  (In practice
     * all of them are expected to be plain Params, so something cheaper than
     * the full expression-eval machinery would be possible.  The gain would
     * likely be small, though, and postgres_fdw would have to know more
     * about Param evaluation than is desirable.)
     */
    fsstate->param_exprs = (List *)
        ExecInitExpr((Expr *) scanplan->fdw_exprs,
                     (PlanState *) node);

    /* Buffer for the text form of the query parameters, if there are any */
    fsstate->param_values = (nparams > 0)
        ? (const char **) palloc0(nparams * sizeof(char *))
        : NULL;
}

static void postgresEndForeignModify ( EState estate,
ResultRelInfo resultRelInfo 
) [static]

Definition at line 1566 of file postgres_fdw.c.

References PgFdwModifyState::conn, ERROR, NULL, PgFdwModifyState::p_name, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), ReleaseConnection(), ResultRelInfo::ri_FdwState, and snprintf().

{
    /*
     * Finish a foreign-table modify: drop the remote prepared statement if
     * one was created, then release the connection.
     */
    PgFdwModifyState *state = (PgFdwModifyState *) resultRelInfo->ri_FdwState;

    /* A NULL state means we are in EXPLAIN, so there is nothing to undo */
    if (!state)
        return;

    /* Destroy the prepared statement, if we made one */
    if (state->p_name != NULL)
    {
        char        dealloc_cmd[64];
        PGresult   *result;

        snprintf(dealloc_cmd, sizeof(dealloc_cmd), "DEALLOCATE %s", state->p_name);

        /*
         * There is no PG_TRY block here, so take care not to throw an error
         * without releasing the PGresult.
         */
        result = PQexec(state->conn, dealloc_cmd);
        if (PQresultStatus(result) != PGRES_COMMAND_OK)
            pgfdw_report_error(ERROR, result, true, dealloc_cmd);
        PQclear(result);
        state->p_name = NULL;
    }

    /* Give the remote connection back */
    ReleaseConnection(state->conn);
    state->conn = NULL;
}

static void postgresEndForeignScan ( ForeignScanState node  )  [static]

Definition at line 1082 of file postgres_fdw.c.

References close_cursor(), PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, ForeignScanState::fdw_state, NULL, and ReleaseConnection().

{
    /*
     * Finish a foreign scan: close the remote cursor if one is open, then
     * release the connection.
     */
    PgFdwScanState *state = (PgFdwScanState *) node->fdw_state;

    /* A NULL state means we are in EXPLAIN, so there is nothing to undo */
    if (!state)
        return;

    /* Close an open cursor so that cursors do not accumulate */
    if (state->cursor_exists)
        close_cursor(state->conn, state->cursor_number);

    /* Give the remote connection back */
    ReleaseConnection(state->conn);
    state->conn = NULL;

    /* The memory contexts get deleted automatically. */
}

static TupleTableSlot * postgresExecForeignDelete ( EState estate,
ResultRelInfo resultRelInfo,
TupleTableSlot slot,
TupleTableSlot planSlot 
) [static]

Definition at line 1496 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

{
    /*
     * Delete one row on the remote server via the prepared statement,
     * identifying it by the ctid carried in planSlot.  Returns slot when at
     * least one remote row was affected, otherwise NULL.
     */
    PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
    Datum       ctid_datum;
    bool        ctid_is_null;
    const char **param_values;
    PGresult   *result;
    bool        status_ok;
    int         affected;

    /* Create the remote prepared statement on first use */
    if (fmstate->p_name == NULL)
        prepare_foreign_modify(fmstate);

    /* Pick up the ctid passed up as a resjunk column; it must not be null */
    ctid_datum = ExecGetJunkAttribute(planSlot,
                                      fmstate->ctidAttno,
                                      &ctid_is_null);
    if (ctid_is_null)
        elog(ERROR, "ctid is NULL");

    /* Text-form parameters for the prepared statement (ctid only) */
    param_values = convert_prep_stmt_params(fmstate,
                                            (ItemPointer) DatumGetPointer(ctid_datum),
                                            NULL);

    /*
     * Run the prepared statement and verify that it succeeded.
     *
     * There is no PG_TRY block here, so take care not to throw an error
     * without releasing the PGresult.
     */
    result = PQexecPrepared(fmstate->conn,
                            fmstate->p_name,
                            fmstate->p_nums,
                            param_values,
                            NULL,
                            NULL,
                            0);

    /* With RETURNING we expect tuples back, otherwise a plain command tag */
    if (fmstate->has_returning)
        status_ok = (PQresultStatus(result) == PGRES_TUPLES_OK);
    else
        status_ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
    if (!status_ok)
        pgfdw_report_error(ERROR, result, true, fmstate->query);

    /* Determine the affected-row count; store the RETURNING tuple if any */
    if (!fmstate->has_returning)
        affected = atoi(PQcmdTuples(result));
    else
    {
        affected = PQntuples(result);
        if (affected > 0)
            store_returning_result(fmstate, slot, result);
    }

    /* Clean up the result and the per-tuple workspace */
    PQclear(result);

    MemoryContextReset(fmstate->temp_cxt);

    /* NULL tells the caller that nothing was deleted on the remote end */
    if (affected <= 0)
        return NULL;
    return slot;
}

static TupleTableSlot * postgresExecForeignInsert ( EState estate,
ResultRelInfo resultRelInfo,
TupleTableSlot slot,
TupleTableSlot planSlot 
) [static]

Definition at line 1368 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), ERROR, PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

{
    /*
     * Insert one row on the remote server via the prepared statement, taking
     * the column values from slot.  Returns slot when at least one remote
     * row was affected, otherwise NULL.
     */
    PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
    const char **param_values;
    PGresult   *result;
    bool        status_ok;
    int         affected;

    /* Create the remote prepared statement on first use */
    if (fmstate->p_name == NULL)
        prepare_foreign_modify(fmstate);

    /* Text-form parameters for the prepared statement (no ctid for INSERT) */
    param_values = convert_prep_stmt_params(fmstate, NULL, slot);

    /*
     * Run the prepared statement and verify that it succeeded.
     *
     * There is no PG_TRY block here, so take care not to throw an error
     * without releasing the PGresult.
     */
    result = PQexecPrepared(fmstate->conn,
                            fmstate->p_name,
                            fmstate->p_nums,
                            param_values,
                            NULL,
                            NULL,
                            0);

    /* With RETURNING we expect tuples back, otherwise a plain command tag */
    if (fmstate->has_returning)
        status_ok = (PQresultStatus(result) == PGRES_TUPLES_OK);
    else
        status_ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
    if (!status_ok)
        pgfdw_report_error(ERROR, result, true, fmstate->query);

    /* Determine the affected-row count; store the RETURNING tuple if any */
    if (!fmstate->has_returning)
        affected = atoi(PQcmdTuples(result));
    else
    {
        affected = PQntuples(result);
        if (affected > 0)
            store_returning_result(fmstate, slot, result);
    }

    /* Clean up the result and the per-tuple workspace */
    PQclear(result);

    MemoryContextReset(fmstate->temp_cxt);

    /* NULL tells the caller that nothing was inserted on the remote end */
    if (affected <= 0)
        return NULL;
    return slot;
}

static TupleTableSlot * postgresExecForeignUpdate ( EState estate,
ResultRelInfo resultRelInfo,
TupleTableSlot slot,
TupleTableSlot planSlot 
) [static]

Definition at line 1426 of file postgres_fdw.c.

References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.

{
    /*
     * Update one row on the remote server via the prepared statement,
     * identifying it by the ctid carried in planSlot and taking the new
     * column values from slot.  Returns slot when at least one remote row
     * was affected, otherwise NULL.
     */
    PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
    Datum       ctid_datum;
    bool        ctid_is_null;
    const char **param_values;
    PGresult   *result;
    bool        status_ok;
    int         affected;

    /* Create the remote prepared statement on first use */
    if (fmstate->p_name == NULL)
        prepare_foreign_modify(fmstate);

    /* Pick up the ctid passed up as a resjunk column; it must not be null */
    ctid_datum = ExecGetJunkAttribute(planSlot,
                                      fmstate->ctidAttno,
                                      &ctid_is_null);
    if (ctid_is_null)
        elog(ERROR, "ctid is NULL");

    /* Text-form parameters for the prepared statement (ctid plus columns) */
    param_values = convert_prep_stmt_params(fmstate,
                                            (ItemPointer) DatumGetPointer(ctid_datum),
                                            slot);

    /*
     * Run the prepared statement and verify that it succeeded.
     *
     * There is no PG_TRY block here, so take care not to throw an error
     * without releasing the PGresult.
     */
    result = PQexecPrepared(fmstate->conn,
                            fmstate->p_name,
                            fmstate->p_nums,
                            param_values,
                            NULL,
                            NULL,
                            0);

    /* With RETURNING we expect tuples back, otherwise a plain command tag */
    if (fmstate->has_returning)
        status_ok = (PQresultStatus(result) == PGRES_TUPLES_OK);
    else
        status_ok = (PQresultStatus(result) == PGRES_COMMAND_OK);
    if (!status_ok)
        pgfdw_report_error(ERROR, result, true, fmstate->query);

    /* Determine the affected-row count; store the RETURNING tuple if any */
    if (!fmstate->has_returning)
        affected = atoi(PQcmdTuples(result));
    else
    {
        affected = PQntuples(result);
        if (affected > 0)
            store_returning_result(fmstate, slot, result);
    }

    /* Clean up the result and the per-tuple workspace */
    PQclear(result);

    MemoryContextReset(fmstate->temp_cxt);

    /* NULL tells the caller that nothing was updated on the remote end */
    if (affected <= 0)
        return NULL;
    return slot;
}

static void postgresExplainForeignModify ( ModifyTableState mtstate,
ResultRelInfo rinfo,
List fdw_private,
int  subplan_index,
ExplainState es 
) [static]

Definition at line 1622 of file postgres_fdw.c.

References ExplainPropertyText(), FdwModifyPrivateUpdateSql, list_nth(), strVal, and ExplainState::verbose.

{
    char       *remote_sql;

    /* The remote statement is only reported in VERBOSE mode */
    if (!es->verbose)
        return;

    /* Emit the SQL text stored in fdw_private as the "Remote SQL" property */
    remote_sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql));
    ExplainPropertyText("Remote SQL", remote_sql, es);
}

static void postgresExplainForeignScan ( ForeignScanState node,
ExplainState es 
) [static]

Definition at line 1604 of file postgres_fdw.c.

References ExplainPropertyText(), FdwScanPrivateSelectSql, list_nth(), PlanState::plan, ScanState::ps, ForeignScanState::ss, strVal, and ExplainState::verbose.

{
    ForeignScan *scanplan;
    char       *remote_sql;

    /* The remote query is only reported in VERBOSE mode */
    if (!es->verbose)
        return;

    /* Emit the SQL text stored in the plan's fdw_private as "Remote SQL" */
    scanplan = (ForeignScan *) node->ss.ps.plan;
    remote_sql = strVal(list_nth(scanplan->fdw_private, FdwScanPrivateSelectSql));
    ExplainPropertyText("Remote SQL", remote_sql, es);
}

static void postgresGetForeignPaths ( PlannerInfo root,
RelOptInfo baserel,
Oid  foreigntableid 
) [static]

Definition at line 535 of file postgres_fdw.c.

References add_path(), ec_member_foreign_arg::already_used, arg, Assert, bms_add_member(), bms_del_member(), bms_is_empty(), bms_is_member(), bms_overlap(), bms_union(), RestrictInfo::clause, RestrictInfo::clause_relids, create_foreignscan_path(), ec_member_foreign_arg::current, ec_member_matches_foreign(), estimate_path_cost_size(), RelOptInfo::fdw_private, generate_implied_equalities_for_column(), RelOptInfo::has_eclass_joins, is_foreign_expr(), join_clause_is_movable_to(), RelOptInfo::joininfo, lappend(), PlannerInfo::lateral_info_list, LateralJoinInfo::lateral_lhs, RelOptInfo::lateral_relids, LateralJoinInfo::lateral_rhs, lfirst, list_make1, NIL, NULL, RelOptInfo::relid, PgFdwRelationInfo::rows, PgFdwRelationInfo::startup_cost, PgFdwRelationInfo::total_cost, and PgFdwRelationInfo::use_remote_estimate.

{
    /*
     * Summary: build candidate ForeignPath nodes for baserel and register each
     * one with add_path().
     *
     * 1. An unparameterized path is always added, using the row count and
     *    costs already cached in baserel->fdw_private (a PgFdwRelationInfo).
     * 2. Only when use_remote_estimate is set, one parameterized path is
     *    added per usable join clause: first for clauses in
     *    baserel->joininfo, then for clauses derived from equivalence
     *    classes.  A clause is usable when it is movable to this rel, does
     *    not involve a rel in lateral_referencers, and passes
     *    is_foreign_expr().
     *
     * The foreigntableid argument is not referenced in this body, and the
     * "width" output of estimate_path_cost_size() is collected but not used
     * afterwards.
     */
    PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
    ForeignPath *path;
    Relids      lateral_referencers;
    List       *join_quals;
    Relids      required_outer;
    double      rows;
    int         width;
    Cost        startup_cost;
    Cost        total_cost;
    ListCell   *lc;

    /*
     * Create simplest ForeignScan path node and add it to baserel.  This path
     * corresponds to SeqScan path of regular tables (though depending on what
     * baserestrict conditions we were able to send to remote, there might
     * actually be an indexscan happening there).  We already did all the work
     * to estimate cost and size of this path.
     */
    path = create_foreignscan_path(root, baserel,
                                   fpinfo->rows,
                                   fpinfo->startup_cost,
                                   fpinfo->total_cost,
                                   NIL, /* no pathkeys */
                                   NULL,        /* no outer rel either */
                                   NIL);        /* no fdw_private list */
    add_path(baserel, (Path *) path);

    /*
     * If we're not using remote estimates, stop here.  We have no way to
     * estimate whether any join clauses would be worth sending across, so
     * don't bother building parameterized paths.
     */
    if (!fpinfo->use_remote_estimate)
        return;

    /*
     * As a crude first hack, we consider each available join clause and try
     * to make a parameterized path using just that clause.  Later we should
     * consider combinations of clauses, probably.
     */

    /*
     * If there are any rels that have LATERAL references to this one, we
     * cannot use join quals referencing them as remote quals for this one,
     * since such rels would have to be on the inside not the outside of a
     * nestloop join relative to this one.  Create a Relids set listing all
     * such rels, for use in checks of potential join clauses.
     */
    lateral_referencers = NULL;
    foreach(lc, root->lateral_info_list)
    {
        LateralJoinInfo *ljinfo = (LateralJoinInfo *) lfirst(lc);

        if (bms_is_member(baserel->relid, ljinfo->lateral_lhs))
            lateral_referencers = bms_add_member(lateral_referencers,
                                                 ljinfo->lateral_rhs);
    }

    /* Scan the rel's join clauses */
    foreach(lc, baserel->joininfo)
    {
        RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);

        /* Check if clause can be moved to this rel */
        if (!join_clause_is_movable_to(rinfo, baserel->relid))
            continue;

        /* Not useful if it conflicts with any LATERAL references */
        if (bms_overlap(rinfo->clause_relids, lateral_referencers))
            continue;

        /* See if it is safe to send to remote */
        if (!is_foreign_expr(root, baserel, rinfo->clause))
            continue;

        /*
         * OK, get a cost estimate from the remote, and make a path.
         */
        join_quals = list_make1(rinfo);
        estimate_path_cost_size(root, baserel, join_quals,
                                &rows, &width,
                                &startup_cost, &total_cost);

        /* Must calculate required outer rels for this path */
        required_outer = bms_union(rinfo->clause_relids,
                                   baserel->lateral_relids);
        /* We do not want the foreign rel itself listed in required_outer */
        required_outer = bms_del_member(required_outer, baserel->relid);
        /* Enforce convention that required_outer is exactly NULL if empty */
        if (bms_is_empty(required_outer))
            required_outer = NULL;

        path = create_foreignscan_path(root, baserel,
                                       rows,
                                       startup_cost,
                                       total_cost,
                                       NIL,     /* no pathkeys */
                                       required_outer,
                                       NIL);    /* no fdw_private list */
        add_path(baserel, (Path *) path);
    }

    /*
     * The above scan examined only "generic" join clauses, not those that
     * were absorbed into EquivalenceClauses.  See if we can make anything out
     * of EquivalenceClauses.
     */
    if (baserel->has_eclass_joins)
    {
        /*
         * We repeatedly scan the eclass list looking for column references
         * (or expressions) belonging to the foreign rel.  Each time we find
         * one, we generate a list of equivalence joinclauses for it, and then
         * try to make those into foreign paths.  Repeat till there are no
         * more candidate EC members.
         */
        ec_member_foreign_arg arg;

        /* already_used accumulates every expression handled so far */
        arg.already_used = NIL;
        for (;;)
        {
            List       *clauses;

            /* Make clauses, skipping any that join to lateral_referencers */
            arg.current = NULL;
            clauses = generate_implied_equalities_for_column(root,
                                                             baserel,
                                                   ec_member_matches_foreign,
                                                             (void *) &arg,
                                                        lateral_referencers);

            /* Done if there are no more expressions in the foreign rel */
            if (arg.current == NULL)
            {
                Assert(clauses == NIL);
                break;
            }

            /* Scan the extracted join clauses */
            foreach(lc, clauses)
            {
                RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);

                /* Check if clause can be moved to this rel */
                if (!join_clause_is_movable_to(rinfo, baserel->relid))
                    continue;

                /* Shouldn't conflict with any LATERAL references */
                Assert(!bms_overlap(rinfo->clause_relids, lateral_referencers));

                /* See if it is safe to send to remote */
                if (!is_foreign_expr(root, baserel, rinfo->clause))
                    continue;

                /*
                 * OK, get a cost estimate from the remote, and make a path.
                 */
                join_quals = list_make1(rinfo);
                estimate_path_cost_size(root, baserel, join_quals,
                                        &rows, &width,
                                        &startup_cost, &total_cost);

                /*
                 * Must calculate required outer rels for this path; same
                 * steps as in the joininfo loop above: drop the foreign rel
                 * itself and normalize an empty set to NULL.
                 */
                required_outer = bms_union(rinfo->clause_relids,
                                           baserel->lateral_relids);
                required_outer = bms_del_member(required_outer, baserel->relid);
                if (bms_is_empty(required_outer))
                    required_outer = NULL;

                path = create_foreignscan_path(root, baserel,
                                               rows,
                                               startup_cost,
                                               total_cost,
                                               NIL,     /* no pathkeys */
                                               required_outer,
                                               NIL);    /* no fdw_private */
                add_path(baserel, (Path *) path);
            }

            /* Try again, now ignoring the expression we found this time */
            arg.already_used = lappend(arg.already_used, arg.current);
        }
    }
}

static ForeignScan * postgresGetForeignPlan ( PlannerInfo root,
RelOptInfo baserel,
Oid  foreigntableid,
ForeignPath best_path,
List tlist,
List scan_clauses 
) [static]

Definition at line 729 of file postgres_fdw.c.

References appendStringInfo(), appendWhereClause(), Assert, PgFdwRelationInfo::attrs_used, RestrictInfo::clause, CMD_DELETE, CMD_UPDATE, Query::commandType, deparseSelectSql(), RelOptInfo::fdw_private, get_parse_rowmark(), initStringInfo(), is_foreign_expr(), IsA, lappend(), LCS_FORKEYSHARE, LCS_FORNOKEYUPDATE, LCS_FORSHARE, LCS_FORUPDATE, lfirst, list_make2, list_member_ptr(), PgFdwRelationInfo::local_conds, make_foreignscan(), makeString(), NIL, PlannerInfo::parse, RestrictInfo::pseudoconstant, RelOptInfo::relid, PgFdwRelationInfo::remote_conds, Query::resultRelation, and RowMarkClause::strength.

{
    PgFdwRelationInfo *relinfo = (PgFdwRelationInfo *) baserel->fdw_private;
    List       *private_items;
    List       *remote_quals = NIL;
    List       *local_quals = NIL;
    List       *param_exprs = NIL;
    List       *fetched_attrs;
    StringInfoData query;
    ListCell   *cell;

    /*
     * Sort each scan clause into "evaluate remotely" or "evaluate locally".
     *
     * Restriction clauses were already classified and recorded in
     * relinfo->remote_conds / relinfo->local_conds, so those are recognized
     * by pointer identity.  Any clause found in neither list is expected to
     * be a join clause that postgresGetForeignPaths judged safe to ship; it
     * may be a different representation (e.g. commuted) of what was seen
     * there, so only an assertion re-checks it.
     *
     * The outcome has to agree with extract_actual_clauses(scan_clauses,
     * false) apart from the remote/local split.  Remote clauses stay wrapped
     * in their RestrictInfo because appendWhereClause wants RestrictInfos;
     * local ones are stripped down to the bare expression.
     */
    foreach(cell, scan_clauses)
    {
        RestrictInfo *ri = (RestrictInfo *) lfirst(cell);

        Assert(IsA(ri, RestrictInfo));

        /* Pseudoconstant quals are handled elsewhere; skip them. */
        if (ri->pseudoconstant)
            continue;

        if (list_member_ptr(relinfo->remote_conds, ri))
        {
            remote_quals = lappend(remote_quals, ri);
            continue;
        }

        if (list_member_ptr(relinfo->local_conds, ri))
        {
            local_quals = lappend(local_quals, ri->clause);
            continue;
        }

        /* Must be a shippable join clause. */
        Assert(is_foreign_expr(root, baserel, ri->clause));
        remote_quals = lappend(remote_quals, ri);
    }

    /*
     * Assemble the remote SELECT text.  Expressions that must travel as
     * query parameters are collected into param_exprs along the way.
     */
    initStringInfo(&query);
    deparseSelectSql(&query, root, baserel, relinfo->attrs_used,
                     &fetched_attrs);
    if (remote_quals != NIL)
        appendWhereClause(&query, root, baserel, remote_quals,
                          true, &param_exprs);

    /*
     * Append a row-locking clause when one is called for.  Locking is
     * applied during the initial fetch rather than later as for local
     * tables; duplicating local semantics exactly would cost extra round
     * trips (see also the comments for RowMarkType).
     *
     * Since the query is actually run as a cursor, this relies on DECLARE
     * CURSOR ... FOR UPDATE being supported, which is not so before 8.3.
     */
    if (baserel->relid == root->parse->resultRelation &&
        (root->parse->commandType == CMD_UPDATE ||
         root->parse->commandType == CMD_DELETE))
    {
        /* This rel is the target of the UPDATE/DELETE: lock for update. */
        appendStringInfo(&query, " FOR UPDATE");
    }
    else
    {
        RowMarkClause *mark = get_parse_rowmark(root->parse, baserel->relid);

        if (mark != NULL)
        {
            /*
             * The rel was named in FOR UPDATE/SHARE.  Any [NO] KEY qualifier
             * is deliberately folded into the plain form: its meaning for a
             * remote table we lack full information about is unclear, and
             * older remote servers would not accept it.  NOWAIT is likewise
             * not considered.
             */
            if (mark->strength == LCS_FORKEYSHARE ||
                mark->strength == LCS_FORSHARE)
                appendStringInfo(&query, " FOR SHARE");
            else if (mark->strength == LCS_FORNOKEYUPDATE ||
                     mark->strength == LCS_FORUPDATE)
                appendStringInfo(&query, " FOR UPDATE");
        }
    }

    /*
     * Package what the executor will need.  The item order must line up
     * with enum FdwScanPrivateIndex.
     */
    private_items = list_make2(makeString(query.data),
                               fetched_attrs);

    /*
     * Produce the ForeignScan node.  The parameter expressions go into the
     * node's fdw_exprs field rather than into private state, because
     * private state would not be subject to later planner processing.
     */
    return make_foreignscan(tlist,
                            local_quals,
                            baserel->relid,
                            param_exprs,
                            private_items);
}

static void postgresGetForeignRelSize ( PlannerInfo root,
RelOptInfo baserel,
Oid  foreigntableid 
) [static]

Definition at line 377 of file postgres_fdw.c.

References PgFdwRelationInfo::attrs_used, RangeTblEntry::checkAsUser, classifyConditions(), RestrictInfo::clause, clauselist_selectivity(), cost_qual_eval(), defGetBoolean(), defGetString(), DefElem::defname, estimate_path_cost_size(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, GetForeignServer(), GetForeignTable(), GetUserId(), GetUserMapping(), JOIN_INNER, lfirst, PgFdwRelationInfo::local_conds, PgFdwRelationInfo::local_conds_cost, PgFdwRelationInfo::local_conds_sel, NIL, NULL, ForeignTable::options, ForeignServer::options, RelOptInfo::pages, palloc0(), planner_rt_fetch, pull_varattnos(), RelOptInfo::relid, RelOptInfo::reltargetlist, PgFdwRelationInfo::remote_conds, RelOptInfo::rows, PgFdwRelationInfo::rows, PgFdwRelationInfo::server, ForeignServer::serverid, ForeignTable::serverid, set_baserel_size_estimates(), PgFdwRelationInfo::startup_cost, PgFdwRelationInfo::table, PgFdwRelationInfo::total_cost, RelOptInfo::tuples, PgFdwRelationInfo::use_remote_estimate, PgFdwRelationInfo::user, RelOptInfo::width, and PgFdwRelationInfo::width.

{
    PgFdwRelationInfo *relinfo;
    ListCell   *cell;

    /*
     * Allocate the zeroed per-relation scratch struct and hang it on the
     * RelOptInfo so later planning callbacks can reach it.
     */
    relinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
    baserel->fdw_private = (void *) relinfo;

    /* Fetch catalog entries for the foreign table and its server. */
    relinfo->table = GetForeignTable(foreigntableid);
    relinfo->server = GetForeignServer(relinfo->table->serverid);

    /*
     * Start from defaults, then apply server-level options, then let a
     * table-level use_remote_estimate override the server-level one.
     */
    relinfo->use_remote_estimate = false;
    relinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
    relinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;

    foreach(cell, relinfo->server->options)
    {
        DefElem    *opt = (DefElem *) lfirst(cell);

        if (strcmp(opt->defname, "use_remote_estimate") == 0)
            relinfo->use_remote_estimate = defGetBoolean(opt);
        else if (strcmp(opt->defname, "fdw_startup_cost") == 0)
            relinfo->fdw_startup_cost = strtod(defGetString(opt), NULL);
        else if (strcmp(opt->defname, "fdw_tuple_cost") == 0)
            relinfo->fdw_tuple_cost = strtod(defGetString(opt), NULL);
    }

    foreach(cell, relinfo->table->options)
    {
        DefElem    *opt = (DefElem *) lfirst(cell);

        if (strcmp(opt->defname, "use_remote_estimate") != 0)
            continue;

        /* Only this one option matters at table level; stop once found. */
        relinfo->use_remote_estimate = defGetBoolean(opt);
        break;
    }

    /*
     * A user mapping is needed only when the remote server will be
     * consulted during planning.  The choice of user is meant to match
     * ExecCheckRTEPerms(); a permissions failure here would have failed at
     * run time anyway.
     */
    relinfo->user = NULL;
    if (relinfo->use_remote_estimate)
    {
        RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
        Oid         userid = rte->checkAsUser;

        if (!userid)
            userid = GetUserId();

        relinfo->user = GetUserMapping(userid, relinfo->server->serverid);
    }

    /* Split the restriction clauses into remotely and locally evaluated. */
    classifyConditions(root, baserel,
                       &relinfo->remote_conds, &relinfo->local_conds);

    /*
     * Work out which remote columns have to be fetched: everything the
     * target list needs (joins and final output) plus everything used by
     * the locally-checked conditions.  Columns used only by join clauses
     * that later get shipped in a parameterized scan are fetched anyway;
     * detecting that case is not considered worthwhile.
     */
    relinfo->attrs_used = NULL;
    pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
                   &relinfo->attrs_used);
    foreach(cell, relinfo->local_conds)
    {
        RestrictInfo *ri = (RestrictInfo *) lfirst(cell);

        pull_varattnos((Node *) ri->clause, baserel->relid,
                       &relinfo->attrs_used);
    }

    /*
     * Cache selectivity and evaluation cost of the local conditions so they
     * are not recomputed for each path.  Only local statistics are
     * available for these.
     */
    relinfo->local_conds_sel = clauselist_selectivity(root,
                                                      relinfo->local_conds,
                                                      baserel->relid,
                                                      JOIN_INNER,
                                                      NULL);

    cost_qual_eval(&relinfo->local_conds_cost, relinfo->local_conds, root);

    if (!relinfo->use_remote_estimate)
    {
        /*
         * Local-statistics route.  A never-ANALYZEd foreign table shows
         * zero pages and zero tuples, which is most likely unrelated to
         * reality.  Borrow a hack similar to plancat.c's treatment of empty
         * relations: assume 10 pages and derive a tuple count from the
         * datatype-based width estimate.
         */
        if (baserel->pages == 0 && baserel->tuples == 0)
        {
            baserel->pages = 10;
            baserel->tuples =
                (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
        }

        /* Size the rel from whatever local statistics exist. */
        set_baserel_size_estimates(root, baserel);

        /* Record basically-bogus cost numbers for later use. */
        estimate_path_cost_size(root, baserel, NIL,
                                &relinfo->rows, &relinfo->width,
                                &relinfo->startup_cost, &relinfo->total_cost);
    }
    else
    {
        /*
         * Remote route: obtain row count, width and costs with the remote
         * server's help, and keep them in relinfo so the basic foreign path
         * can reuse them.
         */
        estimate_path_cost_size(root, baserel, NIL,
                                &relinfo->rows, &relinfo->width,
                                &relinfo->startup_cost, &relinfo->total_cost);

        /* Publish the size estimate to the planner. */
        baserel->rows = relinfo->rows;
        baserel->width = relinfo->width;
    }
}

static TupleTableSlot * postgresIterateForeignScan ( ForeignScanState node  )  [static]

Definition at line 985 of file postgres_fdw.c.

References create_cursor(), PgFdwScanState::cursor_exists, PgFdwScanState::eof_reached, ExecClearTuple(), ExecStoreTuple(), ForeignScanState::fdw_state, fetch_more_data(), InvalidBuffer, PgFdwScanState::next_tuple, PgFdwScanState::num_tuples, ForeignScanState::ss, ScanState::ss_ScanTupleSlot, and PgFdwScanState::tuples.

{
    PgFdwScanState *scan = (PgFdwScanState *) node->fdw_state;
    TupleTableSlot *result = node->ss.ss_ScanTupleSlot;

    /* First call after Begin or ReScan: open the remote cursor. */
    if (!scan->cursor_exists)
        create_cursor(node);

    /*
     * When the in-memory batch is used up, ask for another one, unless EOF
     * was already detected.
     */
    if (scan->next_tuple >= scan->num_tuples && !scan->eof_reached)
        fetch_more_data(node);

    /* Still nothing left to hand out: the scan is finished. */
    if (scan->next_tuple >= scan->num_tuples)
        return ExecClearTuple(result);

    /* Store the next buffered tuple in the scan slot and advance. */
    ExecStoreTuple(scan->tuples[scan->next_tuple++],
                   result,
                   InvalidBuffer,
                   false);

    return result;
}

static List * postgresPlanForeignModify ( PlannerInfo root,
ModifyTable plan,
Index  resultRelation,
int  subplan_index 
) [static]

Definition at line 1150 of file postgres_fdw.c.

References tupleDesc::attrs, bms_copy(), bms_first_member(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, StringInfoData::data, deparseDeleteSql(), deparseInsertSql(), deparseUpdateSql(), elog, ERROR, heap_close, heap_open(), initStringInfo(), InvalidAttrNumber, lappend_int(), list_make4, list_nth(), makeInteger(), makeString(), RangeTblEntry::modifiedCols, tupleDesc::natts, NIL, NoLock, ModifyTable::operation, planner_rt_fetch, RelationGetDescr, RangeTblEntry::relid, and ModifyTable::returningLists.

{
    RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
    CmdType     op = plan->operation;
    StringInfoData query;
    Relation    target;
    List       *send_attrs = NIL;
    List       *returning = NIL;
    List       *fetched_attrs = NIL;

    initStringInfo(&query);

    /*
     * NoLock suffices: core code already holds some lock on every relation
     * being planned.
     */
    target = heap_open(rte->relid, NoLock);

    /*
     * Choose the columns to transmit.  INSERT sends every non-dropped
     * column of the foreign table, since restricting to the listed columns
     * would miss default values for the others.  UPDATE sends only the
     * columns that were explicit targets, avoiding needless data transfer.
     * Other operations send none.
     */
    switch (op)
    {
        case CMD_INSERT:
            {
                TupleDesc   desc = RelationGetDescr(target);
                int         idx;

                for (idx = 0; idx < desc->natts; idx++)
                {
                    if (desc->attrs[idx]->attisdropped)
                        continue;
                    /* attribute numbers are 1-based */
                    send_attrs = lappend_int(send_attrs, idx + 1);
                }
            }
            break;

        case CMD_UPDATE:
            {
                /* Work on a copy of modifiedCols, not the RTE's own set. */
                Bitmapset  *pending = bms_copy(rte->modifiedCols);
                AttrNumber  col;

                for (;;)
                {
                    col = bms_first_member(pending);
                    if (col < 0)
                        break;

                    /* undo the offset applied to members of the set */
                    col += FirstLowInvalidHeapAttributeNumber;
                    if (col <= InvalidAttrNumber)   /* shouldn't happen */
                        elog(ERROR, "system-column update is not supported");
                    send_attrs = lappend_int(send_attrs, col);
                }
            }
            break;

        default:
            break;
    }

    /* Pick out this subplan's RETURNING list, when there is one. */
    if (plan->returningLists)
        returning = (List *) list_nth(plan->returningLists, subplan_index);

    /* Deparse the remote statement for the operation at hand. */
    if (op == CMD_INSERT)
        deparseInsertSql(&query, root, resultRelation, target,
                         send_attrs, returning,
                         &fetched_attrs);
    else if (op == CMD_UPDATE)
        deparseUpdateSql(&query, root, resultRelation, target,
                         send_attrs, returning,
                         &fetched_attrs);
    else if (op == CMD_DELETE)
        deparseDeleteSql(&query, root, resultRelation, target,
                         returning,
                         &fetched_attrs);
    else
        elog(ERROR, "unexpected operation: %d", (int) op);

    heap_close(target, NoLock);

    /*
     * Hand the executor what it needs.  The item order must line up with
     * enum FdwModifyPrivateIndex.
     */
    return list_make4(makeString(query.data),
                      send_attrs,
                      makeInteger((returning != NIL)),
                      fetched_attrs);
}

static void postgresReScanForeignScan ( ForeignScanState node  )  [static]

Definition at line 1026 of file postgres_fdw.c.

References PlanState::chgParam, PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), ScanState::ps, snprintf(), ForeignScanState::ss, and PgFdwScanState::tuples.

{
    PgFdwScanState *scan = (PgFdwScanState *) node->fdw_state;
    char        cmd[64];
    PGresult   *result;

    /* No cursor has been created yet, so there is nothing to reset. */
    if (!scan->cursor_exists)
        return;

    /*
     * Cheapest case: no parameters changed and at most one batch has been
     * fetched.  The tuples already in memory can simply be replayed without
     * touching the remote cursor.
     */
    if (node->ss.ps.chgParam == NULL && scan->fetch_ct_2 <= 1)
    {
        scan->next_tuple = 0;
        return;
    }

    /*
     * Otherwise the remote side has to be involved.  Changed parameters
     * mean the cursor must be destroyed and recreated; if only more than
     * one batch was fetched, rewinding it is good enough.
     */
    if (node->ss.ps.chgParam != NULL)
    {
        scan->cursor_exists = false;
        snprintf(cmd, sizeof(cmd), "CLOSE c%u",
                 scan->cursor_number);
    }
    else
    {
        snprintf(cmd, sizeof(cmd), "MOVE BACKWARD ALL IN c%u",
                 scan->cursor_number);
    }

    /*
     * There is no PG_TRY block here, so no error may be thrown without
     * first releasing the PGresult.
     */
    result = PQexec(scan->conn, cmd);
    if (PQresultStatus(result) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, result, true, cmd);
    PQclear(result);

    /* Discard buffered state so the next iteration issues a fresh FETCH. */
    scan->tuples = NULL;
    scan->num_tuples = 0;
    scan->next_tuple = 0;
    scan->fetch_ct_2 = 0;
    scan->eof_reached = false;
}

static void prepare_foreign_modify ( PgFdwModifyState fmstate  )  [static]

Definition at line 2081 of file postgres_fdw.c.

References PgFdwModifyState::conn, ERROR, GetPrepStmtNumber(), NULL, PgFdwModifyState::p_name, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQprepare(), PQresultStatus(), pstrdup(), PgFdwModifyState::query, and snprintf().

Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().

{
    char        name_buf[NAMEDATALEN];
    char       *stmt_name;
    PGresult   *result;

    /*
     * Build the prepared-statement name from the number handed out by
     * GetPrepStmtNumber() for this connection, then make a palloc'd copy.
     */
    snprintf(name_buf, sizeof(name_buf), "pgsql_fdw_prep_%u",
             GetPrepStmtNumber(fmstate->conn));
    stmt_name = pstrdup(name_buf);

    /*
     * Parameter types are intentionally left unspecified (count 0, NULL
     * array) so the remote server derives them itself.  That avoids trouble
     * if the remote uses different type OIDs than we do; the statements
     * prepared by this module are simple enough for the remote to choose
     * correctly.
     *
     * There is no PG_TRY block here, so no error may be thrown without
     * first releasing the PGresult.
     */
    result = PQprepare(fmstate->conn,
                       stmt_name,
                       fmstate->query,
                       0,
                       NULL);

    if (PQresultStatus(result) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, result, true, fmstate->query);
    PQclear(result);

    /* Setting p_name is what marks the statement as prepared. */
    fmstate->p_name = stmt_name;
}

void reset_transmission_modes ( int  nestlevel  ) 

Definition at line 2050 of file postgres_fdw.c.

References AtEOXact_GUC().

Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().

{
    /*
     * Counterpart of set_transmission_modes(): nestlevel is expected to be
     * the value that function returned.  Presumably AtEOXact_GUC() restores
     * the GUC values saved since that nest level was opened -- confirm
     * against the GUC machinery.
     */
    AtEOXact_GUC(true, nestlevel);
}

int set_transmission_modes ( void   ) 

Definition at line 2022 of file postgres_fdw.c.

References DateStyle, extra_float_digits, GUC_ACTION_SAVE, IntervalStyle, INTSTYLE_POSTGRES, NewGUCNestLevel(), PGC_S_SESSION, PGC_USERSET, set_config_option(), and USE_ISO_DATES.

Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().

{
    /* Open a new GUC nesting level; the caller gets it back to undo later. */
    int         guc_level = NewGUCNestLevel();

    /*
     * Force the settings that affect how values are rendered as text.  The
     * values chosen are meant to match what pg_dump does (see also
     * configure_remote_session in connection.c).  A setting that is already
     * acceptable is left untouched.
     */

    /* Dates in ISO style. */
    if (DateStyle != USE_ISO_DATES)
        (void) set_config_option("datestyle", "ISO",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);

    /* Intervals in "postgres" style. */
    if (IntervalStyle != INTSTYLE_POSTGRES)
        (void) set_config_option("intervalstyle", "postgres",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);

    /* Raise extra_float_digits to 3 when it is currently lower. */
    if (extra_float_digits < 3)
        (void) set_config_option("extra_float_digits", "3",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);

    return guc_level;
}

static void store_returning_result ( PgFdwModifyState fmstate,
TupleTableSlot slot,
PGresult res 
) [static]

Definition at line 2188 of file postgres_fdw.c.

References PgFdwModifyState::attinmeta, ExecStoreTuple(), InvalidBuffer, make_tuple_from_result_row(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), PgFdwModifyState::rel, PgFdwModifyState::retrieved_attrs, and PgFdwModifyState::temp_cxt.

Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().

{
    /*
     * Any error raised while converting the row must not leak the PGresult,
     * hence the PG_TRY wrapper: on failure the result is released before
     * the error is re-thrown.
     */
    PG_TRY();
    {
        HeapTuple   returned;

        /* Convert row 0 of the result into a local heap tuple. */
        returned = make_tuple_from_result_row(res, 0,
                                              fmstate->rel,
                                              fmstate->attinmeta,
                                              fmstate->retrieved_attrs,
                                              fmstate->temp_cxt);

        /*
         * Passing "true" lets the slot delete the tuple when the slot is
         * cleared.
         */
        ExecStoreTuple(returned, slot, InvalidBuffer, true);
    }
    PG_CATCH();
    {
        if (res != NULL)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();
}


Variable Documentation

Definition at line 41 of file postgres_fdw.c.