Header And Logo

PostgreSQL
| The world's most advanced open source database.

Functions

postgres_fdw.h File Reference

#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/relation.h"
#include "utils/rel.h"
#include "libpq-fe.h"
Include dependency graph for postgres_fdw.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

int set_transmission_modes (void)
void reset_transmission_modes (int nestlevel)
PGconnGetConnection (ForeignServer *server, UserMapping *user, bool will_prep_stmt)
void ReleaseConnection (PGconn *conn)
unsigned int GetCursorNumber (PGconn *conn)
unsigned int GetPrepStmtNumber (PGconn *conn)
void pgfdw_report_error (int elevel, PGresult *res, bool clear, const char *sql)
int ExtractConnectionOptions (List *defelems, const char **keywords, const char **values)
void classifyConditions (PlannerInfo *root, RelOptInfo *baserel, List **remote_conds, List **local_conds)
bool is_foreign_expr (PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
void deparseSelectSql (StringInfo buf, PlannerInfo *root, RelOptInfo *baserel, Bitmapset *attrs_used, List **retrieved_attrs)
void appendWhereClause (StringInfo buf, PlannerInfo *root, RelOptInfo *baserel, List *exprs, bool is_first, List **params)
void deparseInsertSql (StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
void deparseUpdateSql (StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs)
void deparseDeleteSql (StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs)
void deparseAnalyzeSizeSql (StringInfo buf, Relation rel)
void deparseAnalyzeSql (StringInfo buf, Relation rel, List **retrieved_attrs)

Function Documentation

void appendWhereClause ( StringInfo  buf,
PlannerInfo root,
RelOptInfo baserel,
List exprs,
bool  is_first,
List **  params 
)

Definition at line 779 of file deparse.c.

References appendStringInfoChar(), appendStringInfoString(), deparse_expr_cxt::buf, RestrictInfo::clause, deparseExpr(), deparse_expr_cxt::foreignrel, lfirst, deparse_expr_cxt::params_list, reset_transmission_modes(), deparse_expr_cxt::root, and set_transmission_modes().

Referenced by estimate_path_cost_size(), and postgresGetForeignPlan().

{
    deparse_expr_cxt context;
    int         nestlevel;
    ListCell   *lc;

    if (params)
        *params = NIL;          /* initialize result list to empty */

    /* Set up context struct for recursion */
    context.root = root;
    context.foreignrel = baserel;
    context.buf = buf;
    context.params_list = params;

    /* Make sure any constants in the exprs are printed portably */
    nestlevel = set_transmission_modes();

    foreach(lc, exprs)
    {
        RestrictInfo *ri = (RestrictInfo *) lfirst(lc);

        /* Connect expressions with "AND" and parenthesize each condition. */
        if (is_first)
            appendStringInfoString(buf, " WHERE ");
        else
            appendStringInfoString(buf, " AND ");

        appendStringInfoChar(buf, '(');
        deparseExpr(ri->clause, &context);
        appendStringInfoChar(buf, ')');

        is_first = false;
    }

    reset_transmission_modes(nestlevel);
}

void classifyConditions ( PlannerInfo root,
RelOptInfo baserel,
List **  remote_conds,
List **  local_conds 
)

Definition at line 143 of file deparse.c.

References RelOptInfo::baserestrictinfo, RestrictInfo::clause, is_foreign_expr(), lappend(), and lfirst.

Referenced by postgresGetForeignRelSize().

{
    ListCell   *lc;

    *remote_conds = NIL;
    *local_conds = NIL;

    foreach(lc, baserel->baserestrictinfo)
    {
        RestrictInfo *ri = (RestrictInfo *) lfirst(lc);

        if (is_foreign_expr(root, baserel, ri->clause))
            *remote_conds = lappend(*remote_conds, ri);
        else
            *local_conds = lappend(*local_conds, ri);
    }
}

void deparseAnalyzeSizeSql ( StringInfo  buf,
Relation  rel 
)

Definition at line 984 of file deparse.c.

References appendStringInfo(), StringInfoData::data, deparseRelation(), deparseStringLiteral(), and initStringInfo().

Referenced by postgresAnalyzeForeignTable().

{
    StringInfoData relname;

    /* We'll need the remote relation name as a literal. */
    initStringInfo(&relname);
    deparseRelation(&relname, rel);

    appendStringInfo(buf, "SELECT pg_catalog.pg_relation_size(");
    deparseStringLiteral(buf, relname.data);
    appendStringInfo(buf, "::pg_catalog.regclass) / %d", BLCKSZ);
}

void deparseAnalyzeSql ( StringInfo  buf,
Relation  rel,
List **  retrieved_attrs 
)

Definition at line 1004 of file deparse.c.

References appendStringInfoString(), tupleDesc::attrs, defGetString(), DefElem::defname, deparseRelation(), GetForeignColumnOptions(), i, lappend_int(), lfirst, NameStr, tupleDesc::natts, quote_identifier(), RelationGetDescr, and RelationGetRelid.

Referenced by postgresAcquireSampleRowsFunc().

{
    Oid         relid = RelationGetRelid(rel);
    TupleDesc   tupdesc = RelationGetDescr(rel);
    int         i;
    char       *colname;
    List       *options;
    ListCell   *lc;
    bool        first = true;

    *retrieved_attrs = NIL;

    appendStringInfoString(buf, "SELECT ");
    for (i = 0; i < tupdesc->natts; i++)
    {
        /* Ignore dropped columns. */
        if (tupdesc->attrs[i]->attisdropped)
            continue;

        if (!first)
            appendStringInfoString(buf, ", ");
        first = false;

        /* Use attribute name or column_name option. */
        colname = NameStr(tupdesc->attrs[i]->attname);
        options = GetForeignColumnOptions(relid, i + 1);

        foreach(lc, options)
        {
            DefElem    *def = (DefElem *) lfirst(lc);

            if (strcmp(def->defname, "column_name") == 0)
            {
                colname = defGetString(def);
                break;
            }
        }

        appendStringInfoString(buf, quote_identifier(colname));

        *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
    }

    /* Don't generate bad syntax for zero-column relation. */
    if (first)
        appendStringInfoString(buf, "NULL");

    /*
     * Construct FROM clause
     */
    appendStringInfoString(buf, " FROM ");
    deparseRelation(buf, rel);
}

void deparseDeleteSql ( StringInfo  buf,
PlannerInfo root,
Index  rtindex,
Relation  rel,
List returningList,
List **  retrieved_attrs 
)

Definition at line 936 of file deparse.c.

References appendStringInfoString(), deparseRelation(), and deparseReturningList().

Referenced by postgresPlanForeignModify().

{
    appendStringInfoString(buf, "DELETE FROM ");
    deparseRelation(buf, rel);
    appendStringInfoString(buf, " WHERE ctid = $1");

    if (returningList)
        deparseReturningList(buf, root, rtindex, rel, returningList,
                             retrieved_attrs);
    else
        *retrieved_attrs = NIL;
}

void deparseInsertSql ( StringInfo  buf,
PlannerInfo root,
Index  rtindex,
Relation  rel,
List targetAttrs,
List returningList,
List **  retrieved_attrs 
)

Definition at line 830 of file deparse.c.

References appendStringInfo(), appendStringInfoString(), deparseColumnRef(), deparseRelation(), deparseReturningList(), and lfirst_int.

Referenced by postgresPlanForeignModify().

{
    AttrNumber  pindex;
    bool        first;
    ListCell   *lc;

    appendStringInfoString(buf, "INSERT INTO ");
    deparseRelation(buf, rel);

    if (targetAttrs)
    {
        appendStringInfoString(buf, "(");

        first = true;
        foreach(lc, targetAttrs)
        {
            int         attnum = lfirst_int(lc);

            if (!first)
                appendStringInfoString(buf, ", ");
            first = false;

            deparseColumnRef(buf, rtindex, attnum, root);
        }

        appendStringInfoString(buf, ") VALUES (");

        pindex = 1;
        first = true;
        foreach(lc, targetAttrs)
        {
            if (!first)
                appendStringInfoString(buf, ", ");
            first = false;

            appendStringInfo(buf, "$%d", pindex);
            pindex++;
        }

        appendStringInfoString(buf, ")");
    }
    else
        appendStringInfoString(buf, " DEFAULT VALUES");

    if (returningList)
        deparseReturningList(buf, root, rtindex, rel, returningList,
                             retrieved_attrs);
    else
        *retrieved_attrs = NIL;
}

void deparseSelectSql ( StringInfo  buf,
PlannerInfo root,
RelOptInfo baserel,
Bitmapset attrs_used,
List **  retrieved_attrs 
)

Definition at line 662 of file deparse.c.

References appendStringInfoString(), deparseRelation(), deparseTargetList(), heap_close, heap_open(), NoLock, planner_rt_fetch, RangeTblEntry::relid, and RelOptInfo::relid.

Referenced by estimate_path_cost_size(), and postgresGetForeignPlan().

{
    RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
    Relation    rel;

    /*
     * Core code already has some lock on each rel being planned, so we can
     * use NoLock here.
     */
    rel = heap_open(rte->relid, NoLock);

    /*
     * Construct SELECT list
     */
    appendStringInfoString(buf, "SELECT ");
    deparseTargetList(buf, root, baserel->relid, rel, attrs_used,
                      retrieved_attrs);

    /*
     * Construct FROM clause
     */
    appendStringInfoString(buf, " FROM ");
    deparseRelation(buf, rel);

    heap_close(rel, NoLock);
}

void deparseUpdateSql ( StringInfo  buf,
PlannerInfo root,
Index  rtindex,
Relation  rel,
List targetAttrs,
List returningList,
List **  retrieved_attrs 
)

Definition at line 892 of file deparse.c.

References appendStringInfo(), appendStringInfoString(), deparseColumnRef(), deparseRelation(), deparseReturningList(), and lfirst_int.

Referenced by postgresPlanForeignModify().

{
    AttrNumber  pindex;
    bool        first;
    ListCell   *lc;

    appendStringInfoString(buf, "UPDATE ");
    deparseRelation(buf, rel);
    appendStringInfoString(buf, " SET ");

    pindex = 2;                 /* ctid is always the first param */
    first = true;
    foreach(lc, targetAttrs)
    {
        int         attnum = lfirst_int(lc);

        if (!first)
            appendStringInfoString(buf, ", ");
        first = false;

        deparseColumnRef(buf, rtindex, attnum, root);
        appendStringInfo(buf, " = $%d", pindex);
        pindex++;
    }
    appendStringInfoString(buf, " WHERE ctid = $1");

    if (returningList)
        deparseReturningList(buf, root, rtindex, rel, returningList,
                             retrieved_attrs);
    else
        *retrieved_attrs = NIL;
}

int ExtractConnectionOptions ( List defelems,
const char **  keywords,
const char **  values 
)

Definition at line 271 of file option.c.

References defGetString(), DefElem::defname, i, InitPgFdwOptions(), is_libpq_option(), and lfirst.

Referenced by connect_pg_server().

{
    ListCell   *lc;
    int         i;

    /* Build our options lists if we didn't yet. */
    InitPgFdwOptions();

    i = 0;
    foreach(lc, defelems)
    {
        DefElem    *d = (DefElem *) lfirst(lc);

        if (is_libpq_option(d->defname))
        {
            keywords[i] = d->defname;
            values[i] = defGetString(d);
            i++;
        }
    }
    return i;
}

PGconn* GetConnection ( ForeignServer server,
UserMapping user,
bool  will_prep_stmt 
)

Definition at line 97 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, elog, HASHCTL::entrysize, HASHCTL::hash, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, HASHCTL::keysize, MemSet, NULL, pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), ForeignServer::serverid, ConnCacheKey::serverid, ForeignServer::servername, UserMapping::userid, ConnCacheKey::userid, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by BaseBackup(), dumpBlobs(), dumpDatabase(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginForeignModify(), postgresBeginForeignScan(), setup_connection(), StartLogStreamer(), and StreamLog().

{
    bool        found;
    ConnCacheEntry *entry;
    ConnCacheKey key;

    /* First time through, initialize connection cache hashtable */
    if (ConnectionHash == NULL)
    {
        HASHCTL     ctl;

        MemSet(&ctl, 0, sizeof(ctl));
        ctl.keysize = sizeof(ConnCacheKey);
        ctl.entrysize = sizeof(ConnCacheEntry);
        ctl.hash = tag_hash;
        /* allocate ConnectionHash in the cache context */
        ctl.hcxt = CacheMemoryContext;
        ConnectionHash = hash_create("postgres_fdw connections", 8,
                                     &ctl,
                                   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

        /*
         * Register some callback functions that manage connection cleanup.
         * This should be done just once in each backend.
         */
        RegisterXactCallback(pgfdw_xact_callback, NULL);
        RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
    }

    /* Set flag that we did GetConnection during the current transaction */
    xact_got_connection = true;

    /* Create hash key for the entry.  Assume no pad bytes in key struct */
    key.serverid = server->serverid;
    key.userid = user->userid;

    /*
     * Find or create cached entry for requested connection.
     */
    entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
    if (!found)
    {
        /* initialize new hashtable entry (key is already filled in) */
        entry->conn = NULL;
        entry->xact_depth = 0;
        entry->have_prep_stmt = false;
        entry->have_error = false;
    }

    /*
     * We don't check the health of cached connection here, because it would
     * require some overhead.  Broken connection will be detected when the
     * connection is actually used.
     */

    /*
     * If cache entry doesn't have a connection, we have to establish a new
     * connection.  (If connect_pg_server throws an error, the cache entry
     * will be left in a valid empty state.)
     */
    if (entry->conn == NULL)
    {
        entry->xact_depth = 0;  /* just to be sure */
        entry->have_prep_stmt = false;
        entry->have_error = false;
        entry->conn = connect_pg_server(server, user);
        elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
             entry->conn, server->servername);
    }

    /*
     * Start a new transaction or subtransaction if needed.
     */
    begin_remote_xact(entry);

    /* Remember if caller will prepare statements */
    entry->have_prep_stmt |= will_prep_stmt;

    return entry->conn;
}

unsigned int GetCursorNumber ( PGconn conn  ) 

Definition at line 433 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

{
    return ++cursor_number;
}

unsigned int GetPrepStmtNumber ( PGconn conn  ) 

Definition at line 447 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

{
    return ++prep_stmt_number;
}

bool is_foreign_expr ( PlannerInfo root,
RelOptInfo baserel,
Expr expr 
)

Definition at line 168 of file deparse.c.

References Assert, foreign_loc_cxt::collation, contain_mutable_functions(), FDW_COLLATE_NONE, foreign_expr_walker(), foreign_glob_cxt::foreignrel, InvalidOid, foreign_glob_cxt::root, and foreign_loc_cxt::state.

Referenced by classifyConditions(), postgresGetForeignPaths(), and postgresGetForeignPlan().

{
    foreign_glob_cxt glob_cxt;
    foreign_loc_cxt loc_cxt;

    /*
     * Check that the expression consists of nodes that are safe to execute
     * remotely.
     */
    glob_cxt.root = root;
    glob_cxt.foreignrel = baserel;
    loc_cxt.collation = InvalidOid;
    loc_cxt.state = FDW_COLLATE_NONE;
    if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt))
        return false;

    /* Expressions examined here should be boolean, ie noncollatable */
    Assert(loc_cxt.collation == InvalidOid);
    Assert(loc_cxt.state == FDW_COLLATE_NONE);

    /*
     * An expression which includes any mutable functions can't be sent over
     * because its result is not stable.  For example, sending now() remote
     * side could cause confusion from clock offsets.  Future versions might
     * be able to make this choice with more granularity.  (We check this last
     * because it requires a lot of expensive catalog lookups.)
     */
    if (contain_mutable_functions((Node *) expr))
        return false;

    /* OK to evaluate on the remote server */
    return true;
}

void pgfdw_report_error ( int  elevel,
PGresult res,
bool  clear,
const char *  sql 
)

Definition at line 465 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, PG_CATCH, PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), fetch_more_data(), get_remote_estimate(), pgfdw_subxact_callback(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), postgresReScanForeignScan(), and prepare_foreign_modify().

{
    /* If requested, PGresult must be released before leaving this function. */
    PG_TRY();
    {
        char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
        char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
        char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
        char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
        char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
        int         sqlstate;

        if (diag_sqlstate)
            sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
                                     diag_sqlstate[1],
                                     diag_sqlstate[2],
                                     diag_sqlstate[3],
                                     diag_sqlstate[4]);
        else
            sqlstate = ERRCODE_CONNECTION_FAILURE;

        ereport(elevel,
                (errcode(sqlstate),
                 message_primary ? errmsg_internal("%s", message_primary) :
                 errmsg("unknown error"),
               message_detail ? errdetail_internal("%s", message_detail) : 0,
                 message_hint ? errhint("%s", message_hint) : 0,
                 message_context ? errcontext("%s", message_context) : 0,
                 sql ? errcontext("Remote SQL command: %s", sql) : 0));
    }
    PG_CATCH();
    {
        if (clear)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();
    if (clear)
        PQclear(res);
}

void ReleaseConnection ( PGconn conn  ) 

Definition at line 412 of file connection.c.

Referenced by estimate_path_cost_size(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), and postgresEndForeignScan().

{
    /*
     * Currently, we don't actually track connection references because all
     * cleanup is managed on a transaction or subtransaction basis instead. So
     * there's nothing to do here.
     */
}

void reset_transmission_modes ( int  nestlevel  ) 

Definition at line 2050 of file postgres_fdw.c.

References AtEOXact_GUC().

Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().

{
    AtEOXact_GUC(true, nestlevel);
}

int set_transmission_modes ( void   ) 

Definition at line 2022 of file postgres_fdw.c.

References DateStyle, extra_float_digits, GUC_ACTION_SAVE, IntervalStyle, INTSTYLE_POSTGRES, NewGUCNestLevel(), PGC_S_SESSION, PGC_USERSET, set_config_option(), and USE_ISO_DATES.

Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().

{
    int         nestlevel = NewGUCNestLevel();

    /*
     * The values set here should match what pg_dump does.  See also
     * configure_remote_session in connection.c.
     */
    if (DateStyle != USE_ISO_DATES)
        (void) set_config_option("datestyle", "ISO",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);
    if (IntervalStyle != INTSTYLE_POSTGRES)
        (void) set_config_option("intervalstyle", "postgres",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);
    if (extra_float_digits < 3)
        (void) set_config_option("extra_float_digits", "3",
                                 PGC_USERSET, PGC_S_SESSION,
                                 GUC_ACTION_SAVE, true, 0);

    return nestlevel;
}