Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Typedefs | Functions | Variables

connection.c File Reference

#include "postgres.h"
#include "postgres_fdw.h"
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
Include dependency graph for connection.c:

Go to the source code of this file.

Data Structures

struct  ConnCacheKey
struct  ConnCacheEntry

Typedefs

typedef struct ConnCacheKey ConnCacheKey
typedef struct ConnCacheEntry ConnCacheEntry

Functions

static PGconn * connect_pg_server (ForeignServer *server, UserMapping *user)
static void check_conn_params (const char **keywords, const char **values)
static void configure_remote_session (PGconn *conn)
static void do_sql_command (PGconn *conn, const char *sql)
static void begin_remote_xact (ConnCacheEntry *entry)
static void pgfdw_xact_callback (XactEvent event, void *arg)
static void pgfdw_subxact_callback (SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg)
PGconn * GetConnection (ForeignServer *server, UserMapping *user, bool will_prep_stmt)
void ReleaseConnection (PGconn *conn)
unsigned int GetCursorNumber (PGconn *conn)
unsigned int GetPrepStmtNumber (PGconn *conn)
void pgfdw_report_error (int elevel, PGresult *res, bool clear, const char *sql)

Variables

static HTAB * ConnectionHash = NULL
static unsigned int cursor_number = 0
static unsigned int prep_stmt_number = 0
static bool xact_got_connection = false

Typedef Documentation

typedef struct ConnCacheKey ConnCacheKey

Function Documentation

static void begin_remote_xact ( ConnCacheEntry *  entry  )  [static]

Definition at line 373 of file connection.c.

References ConnCacheEntry::conn, DEBUG3, do_sql_command(), elog, GetCurrentTransactionNestLevel(), IsolationIsSerializable, snprintf(), and ConnCacheEntry::xact_depth.

Referenced by GetConnection().

/*
 * Bring the remote session's transaction nesting in line with the local
 * one: open the remote top-level transaction if this cache entry has none
 * yet, then issue one SAVEPOINT per missing nesting level until the entry's
 * xact_depth equals the local transaction nest level.
 */
{
    const int   local_depth = GetCurrentTransactionNestLevel();

    /* No remote transaction is open yet on this entry: start one. */
    if (entry->xact_depth <= 0)
    {
        const char *start_cmd;

        elog(DEBUG3, "starting remote transaction on connection %p",
             entry->conn);

        /* Remote isolation is SERIALIZABLE only if the local one is. */
        start_cmd = IsolationIsSerializable()
            ? "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"
            : "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
        do_sql_command(entry->conn, start_cmd);
        entry->xact_depth = 1;
    }

    /*
     * Inside a local subtransaction, push savepoints named s<level> so that
     * an aborting subtransaction can be undone remotely on its own.  The
     * depth counter advances only after the SAVEPOINT command has been
     * issued for that level.
     */
    for (; entry->xact_depth < local_depth; entry->xact_depth++)
    {
        char        savepoint_cmd[64];

        snprintf(savepoint_cmd, sizeof(savepoint_cmd),
                 "SAVEPOINT s%d", entry->xact_depth + 1);
        do_sql_command(entry->conn, savepoint_cmd);
    }
}

static void check_conn_params ( const char **  keywords,
const char **  values 
) [static]

Definition at line 283 of file connection.c.

References ereport, errcode(), errdetail(), errmsg(), ERROR, i, and superuser().

Referenced by connect_pg_server().

/*
 * Reject a connection attempt by a non-superuser unless the parameter
 * arrays carry a "password" keyword with a non-empty value.  keywords and
 * values are parallel arrays; the scan stops at the first NULL keyword.
 * Returns normally when the check passes, otherwise raises ERROR.
 */
{
    const char **kw = keywords;
    const char **val = values;

    /* Superusers are exempt from the password requirement. */
    if (superuser())
        return;

    /* Walk both arrays in step, looking for a usable password entry. */
    while (*kw != NULL)
    {
        if (strcmp(*kw, "password") == 0 && (*val)[0] != '\0')
            return;
        kw++;
        val++;
    }

    ereport(ERROR,
            (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
             errmsg("password is required"),
             errdetail("Non-superusers must provide a password in the user mapping.")));
}

static void configure_remote_session ( PGconn *  conn  )  [static]

Definition at line 316 of file connection.c.

References do_sql_command(), and PQserverVersion().

Referenced by connect_pg_server().

/*
 * Issue the SET commands that put a freshly opened remote session into the
 * state the rest of postgres_fdw relies on.  Commands are sent one at a
 * time through do_sql_command, in a fixed order; two of them depend on the
 * remote server version reported by libpq.
 */
{
    const int   server_version = PQserverVersion(conn);
    const char *float_digits_cmd;

    /* Restrict the search path to pg_catalog only (see deparse.c). */
    do_sql_command(conn, "SET search_path = pg_catalog");

    /*
     * Pin the remote timezone to UTC.  This is mostly cosmetic, because
     * timestamptz values sent and received should name their zone anyway,
     * but it keeps regression output stable.  The local zone is not
     * propagated since the remote may have a different timezone database.
     * The value is quoted because very old servers are picky about case.
     */
    do_sql_command(conn, "SET timezone = 'UTC'");

    /*
     * Settings that make the remote's data output unambiguous.  Keep this
     * in step with pg_dump, and with set_transmission_modes in
     * postgres_fdw.c.
     */
    do_sql_command(conn, "SET datestyle = ISO");

    /* intervalstyle is only set on servers reporting 8.4 or later. */
    if (server_version >= 80400)
        do_sql_command(conn, "SET intervalstyle = postgres");

    /* Servers reporting 9.0 or later get 3 extra float digits, older get 2. */
    float_digits_cmd = (server_version >= 90000)
        ? "SET extra_float_digits = 3"
        : "SET extra_float_digits = 2";
    do_sql_command(conn, float_digits_cmd);
}

static PGconn * connect_pg_server ( ForeignServer *  server,
UserMapping *  user 
) [static]

Definition at line 183 of file connection.c.

References check_conn_params(), configure_remote_session(), conn, CONNECTION_OK, ereport, errcode(), errdetail(), errdetail_internal(), errhint(), errmsg(), ERROR, ExtractConnectionOptions(), GetDatabaseEncodingName(), list_length(), UserMapping::options, ForeignServer::options, palloc(), pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQconnectdbParams(), PQconnectionUsedPassword(), PQerrorMessage(), PQfinish(), PQstatus(), pstrdup(), ForeignServer::servername, superuser(), and values.

Referenced by GetConnection().

/*
 * Open a new libpq connection to the given foreign server on behalf of the
 * given user mapping and return it.
 *
 * Connection parameters are gathered from the generic options of both the
 * server and the user mapping, plus a fallback application name and the
 * local database encoding as client_encoding.  Non-superusers must supply a
 * password and the remote server must actually have used it.  On success
 * the remote session has been configured via configure_remote_session().
 *
 * Any error raised while connecting or configuring closes the PGconn (if
 * one was created) and is then re-thrown to the caller.
 */
{
    /* Assigned inside PG_TRY and inspected in PG_CATCH, hence volatile. */
    PGconn     *volatile conn = NULL;

    /*
     * Use PG_TRY block to ensure closing connection on error.
     */
    PG_TRY();
    {
        const char **keywords;
        const char **values;
        int         n;

        /*
         * Construct connection params from generic options of ForeignServer
         * and UserMapping.  (Some of them might not be libpq options, in
         * which case we'll just waste a few array slots.)  Add 3 extra slots
         * for fallback_application_name, client_encoding, end marker.
         */
        n = list_length(server->options) + list_length(user->options) + 3;
        keywords = (const char **) palloc(n * sizeof(char *));
        values = (const char **) palloc(n * sizeof(char *));

        /* From here on n is the index of the next free slot, not the size. */
        n = 0;
        n += ExtractConnectionOptions(server->options,
                                      keywords + n, values + n);
        n += ExtractConnectionOptions(user->options,
                                      keywords + n, values + n);

        /* Use "postgres_fdw" as fallback_application_name. */
        keywords[n] = "fallback_application_name";
        values[n] = "postgres_fdw";
        n++;

        /* Set client_encoding so that libpq can convert encoding properly. */
        keywords[n] = "client_encoding";
        values[n] = GetDatabaseEncodingName();
        n++;

        /* NULL end marker terminates both parallel arrays. */
        keywords[n] = values[n] = NULL;

        /* verify connection parameters and make connection */
        check_conn_params(keywords, values);

        /* Third argument is libpq's expand_dbname flag; it is disabled here. */
        conn = PQconnectdbParams(keywords, values, false);
        if (!conn || PQstatus(conn) != CONNECTION_OK)
        {
            char       *connmessage;
            int         msglen;

            /* libpq typically appends a newline, strip that */
            connmessage = pstrdup(PQerrorMessage(conn));
            msglen = strlen(connmessage);
            if (msglen > 0 && connmessage[msglen - 1] == '\n')
                connmessage[msglen - 1] = '\0';
            ereport(ERROR,
               (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
                errmsg("could not connect to server \"%s\"",
                       server->servername),
                errdetail_internal("%s", connmessage)));
        }

        /*
         * Check that non-superuser has used password to establish connection;
         * otherwise, he's piggybacking on the postgres server's user
         * identity. See also dblink_security_check() in contrib/dblink.
         */
        if (!superuser() && !PQconnectionUsedPassword(conn))
            ereport(ERROR,
                  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                   errmsg("password is required"),
                   errdetail("Non-superuser cannot connect if the server does not request a password."),
                   errhint("Target server's authentication method must be changed.")));

        /* Prepare new session for use */
        configure_remote_session(conn);

        /*
         * Success path only: the parameter arrays are released here.  On the
         * error paths above they are not explicitly freed in this function.
         */
        pfree(keywords);
        pfree(values);
    }
    PG_CATCH();
    {
        /* Release PGconn data structure if we managed to create one */
        if (conn)
            PQfinish(conn);
        PG_RE_THROW();
    }
    PG_END_TRY();

    return conn;
}

static void do_sql_command ( PGconn *  conn,
const char *  sql 
) [static]

PGconn* GetConnection ( ForeignServer *  server,
UserMapping *  user,
bool  will_prep_stmt 
)

Definition at line 97 of file connection.c.

References begin_remote_xact(), CacheMemoryContext, ConnCacheEntry::conn, connect_pg_server(), DEBUG3, elog, HASHCTL::entrysize, HASHCTL::hash, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, HASH_FUNCTION, hash_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, HASHCTL::hcxt, HASHCTL::keysize, MemSet, NULL, pgfdw_subxact_callback(), pgfdw_xact_callback(), RegisterSubXactCallback(), RegisterXactCallback(), ForeignServer::serverid, ConnCacheKey::serverid, ForeignServer::servername, UserMapping::userid, ConnCacheKey::userid, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by BaseBackup(), dumpBlobs(), dumpDatabase(), dumpTableData_copy(), estimate_path_cost_size(), expand_schema_name_patterns(), expand_table_name_patterns(), getTables(), main(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresBeginForeignModify(), postgresBeginForeignScan(), setup_connection(), StartLogStreamer(), and StreamLog().

/*
 * Return the cached PGconn for the (server, user) pair, creating the cache
 * and/or the connection on demand, and make sure a remote transaction (and
 * any needed savepoints) is open on it before returning.
 *
 * will_prep_stmt tells the cache that the caller intends to create prepared
 * statements on the connection; the flag is sticky once set.
 */
{
    ConnCacheKey cache_key;
    ConnCacheEntry *cached;
    bool        already_cached;

    /* Build the connection cache the first time anyone asks for one. */
    if (ConnectionHash == NULL)
    {
        HASHCTL     hash_info;

        MemSet(&hash_info, 0, sizeof(hash_info));
        hash_info.hash = tag_hash;
        hash_info.keysize = sizeof(ConnCacheKey);
        hash_info.entrysize = sizeof(ConnCacheEntry);
        /* The table itself lives in the cache memory context. */
        hash_info.hcxt = CacheMemoryContext;
        ConnectionHash = hash_create("postgres_fdw connections", 8,
                                     &hash_info,
                                     HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

        /*
         * Hook the cleanup callbacks into transaction and subtransaction
         * processing.  Doing it here means it happens once per backend.
         */
        RegisterXactCallback(pgfdw_xact_callback, NULL);
        RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
    }

    /* Note for the callbacks that this transaction touched a connection. */
    xact_got_connection = true;

    /* Fill in the lookup key; the key struct is assumed to have no padding. */
    cache_key.userid = user->userid;
    cache_key.serverid = server->serverid;

    /* Look the entry up, inserting a fresh one if none exists yet. */
    cached = hash_search(ConnectionHash, &cache_key, HASH_ENTER,
                         &already_cached);
    if (!already_cached)
    {
        /* Brand-new entry: only the key is set, so blank out the rest. */
        cached->have_error = false;
        cached->have_prep_stmt = false;
        cached->xact_depth = 0;
        cached->conn = NULL;
    }

    /*
     * A cached connection is deliberately not health-checked here, to avoid
     * the overhead; a broken one shows up when it is actually used.
     */

    /*
     * No live connection in the entry: open one.  The bookkeeping fields
     * are reset before connecting, so that an error thrown by
     * connect_pg_server leaves the entry in a valid, empty state.
     */
    if (cached->conn == NULL)
    {
        cached->have_error = false;
        cached->have_prep_stmt = false;
        cached->xact_depth = 0; /* just to be sure */
        cached->conn = connect_pg_server(server, user);
        elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
             cached->conn, server->servername);
    }

    /* Open the remote transaction / savepoints as required. */
    begin_remote_xact(cached);

    /* Record (stickily) that prepared statements may exist on this conn. */
    cached->have_prep_stmt |= will_prep_stmt;

    return cached->conn;
}

unsigned int GetCursorNumber ( PGconn *  conn  ) 

Definition at line 433 of file connection.c.

References cursor_number.

Referenced by postgresAcquireSampleRowsFunc(), and postgresBeginForeignScan().

/*
 * Advance the file-level cursor counter and return its new value.  The conn
 * argument is not consulted: one counter serves every connection.
 */
{
    cursor_number += 1;
    return cursor_number;
}

unsigned int GetPrepStmtNumber ( PGconn *  conn  ) 

Definition at line 447 of file connection.c.

References prep_stmt_number.

Referenced by prepare_foreign_modify().

/*
 * Advance the file-level prepared-statement counter and return its new
 * value.  The conn argument is not consulted: one counter serves every
 * connection.
 */
{
    prep_stmt_number += 1;
    return prep_stmt_number;
}

void pgfdw_report_error ( int  elevel,
PGresult *  res,
bool  clear,
const char *  sql 
)

Definition at line 465 of file connection.c.

References ereport, errcode(), errcontext, errdetail_internal(), errhint(), errmsg(), errmsg_internal(), MAKE_SQLSTATE, PG_CATCH, PG_DIAG_CONTEXT, PG_DIAG_MESSAGE_DETAIL, PG_DIAG_MESSAGE_HINT, PG_DIAG_MESSAGE_PRIMARY, PG_DIAG_SQLSTATE, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), and PQresultErrorField().

Referenced by close_cursor(), create_cursor(), do_sql_command(), fetch_more_data(), get_remote_estimate(), pgfdw_subxact_callback(), pgfdw_xact_callback(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), postgresExecForeignDelete(), postgresExecForeignInsert(), postgresExecForeignUpdate(), postgresReScanForeignScan(), and prepare_foreign_modify().

/*
 * Re-report, at level elevel, the error described by a libpq result.
 *
 * elevel: error level passed straight to ereport.
 * res:    result whose error fields (SQLSTATE, primary message, detail,
 *         hint, context) are copied into the local report.
 * clear:  if true, res is PQclear'ed before this function is left, both
 *         when ereport returns and when it throws.
 * sql:    if not NULL, added as a "Remote SQL command" context line.
 */
{
    /* If requested, PGresult must be released before leaving this function. */
    PG_TRY();
    {
        /* Each field is NULL when the result does not carry it. */
        char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
        char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
        char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
        char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
        char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
        int         sqlstate;

        /*
         * Pack the five SQLSTATE characters into an error code; without a
         * SQLSTATE, report the problem as a connection failure.
         */
        if (diag_sqlstate)
            sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
                                     diag_sqlstate[1],
                                     diag_sqlstate[2],
                                     diag_sqlstate[3],
                                     diag_sqlstate[4]);
        else
            sqlstate = ERRCODE_CONNECTION_FAILURE;

        /* Optional parts are skipped by passing 0 in their place. */
        ereport(elevel,
                (errcode(sqlstate),
                 message_primary ? errmsg_internal("%s", message_primary) :
                 errmsg("unknown error"),
               message_detail ? errdetail_internal("%s", message_detail) : 0,
                 message_hint ? errhint("%s", message_hint) : 0,
                 message_context ? errcontext("%s", message_context) : 0,
                 sql ? errcontext("Remote SQL command: %s", sql) : 0));
    }
    PG_CATCH();
    {
        /* ereport threw: release the result before propagating the error. */
        if (clear)
            PQclear(res);
        PG_RE_THROW();
    }
    PG_END_TRY();
    /* ereport returned: release the result on the normal path. */
    if (clear)
        PQclear(res);
}

static void pgfdw_subxact_callback ( SubXactEvent  event,
SubTransactionId  mySubid,
SubTransactionId  parentSubid,
void *  arg 
) [static]

Definition at line 638 of file connection.c.

References ConnCacheEntry::conn, do_sql_command(), elog, ERROR, GetCurrentTransactionNestLevel(), hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), snprintf(), SUBXACT_EVENT_ABORT_SUB, SUBXACT_EVENT_PRE_COMMIT_SUB, WARNING, ConnCacheEntry::xact_depth, and xact_got_connection.

Referenced by GetConnection().

/*
 * Subtransaction callback: close the remote savepoint that corresponds to
 * the local subtransaction being committed or aborted, on every cached
 * connection that has one open at the current nesting level.
 *
 * Only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are acted
 * on.  mySubid, parentSubid and arg are not used.
 */
{
    HASH_SEQ_STATUS scan;
    ConnCacheEntry *entry;
    int         curlevel;

    /* Nothing to do at subxact start, nor after commit. */
    if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
          event == SUBXACT_EVENT_ABORT_SUB))
        return;

    /* Quick exit if no connections were touched in this transaction. */
    if (!xact_got_connection)
        return;

    /*
     * Scan all connection cache entries to find open remote subtransactions
     * of the current level, and close them.
     */
    curlevel = GetCurrentTransactionNestLevel();
    hash_seq_init(&scan, ConnectionHash);
    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    {
        PGresult   *res;
        char        sql[100];

        /*
         * We only care about connections with open remote subtransactions of
         * the current level.
         */
        if (entry->conn == NULL || entry->xact_depth < curlevel)
            continue;

        /* A deeper remote level means an earlier cleanup was skipped. */
        if (entry->xact_depth > curlevel)
            elog(ERROR, "missed cleaning up remote subtransaction at level %d",
                 entry->xact_depth);

        if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
        {
            /* Commit all remote subtransactions during pre-commit */
            snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
            do_sql_command(entry->conn, sql);
        }
        else
        {
            /* Assume we might have lost track of prepared statements */
            entry->have_error = true;
            /* Rollback all remote subtransactions during abort */
            snprintf(sql, sizeof(sql),
                     "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
                     curlevel, curlevel);
            /*
             * On the abort path the command is run with PQexec directly and
             * a failure is reported only as a WARNING; the result is
             * released either by pgfdw_report_error (clear = true) or here.
             */
            res = PQexec(entry->conn, sql);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
                pgfdw_report_error(WARNING, res, true, sql);
            else
                PQclear(res);
        }

        /* OK, we're outta that level of subtransaction */
        entry->xact_depth--;
    }
}

static void pgfdw_xact_callback ( XactEvent  event,
void *  arg 
) [static]

Definition at line 510 of file connection.c.

References ConnCacheEntry::conn, CONNECTION_OK, cursor_number, DEBUG3, do_sql_command(), elog, ereport, errcode(), errmsg(), ERROR, hash_seq_init(), hash_seq_search(), ConnCacheEntry::have_error, ConnCacheEntry::have_prep_stmt, NULL, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQfinish(), PQresultStatus(), PQstatus(), PQTRANS_IDLE, PQtransactionStatus(), WARNING, ConnCacheEntry::xact_depth, XACT_EVENT_ABORT, XACT_EVENT_COMMIT, XACT_EVENT_PRE_COMMIT, XACT_EVENT_PRE_PREPARE, XACT_EVENT_PREPARE, and xact_got_connection.

Referenced by GetConnection().

/*
 * Transaction callback: finish the remote transaction on every cached
 * connection that has one open, according to the local transaction event.
 *
 *   PRE_COMMIT   - commit remotely, then drop prepared statements if an
 *                  error was seen while some were in use.
 *   PRE_PREPARE  - always raises ERROR (not supported).
 *   COMMIT,
 *   PREPARE      - raises ERROR; the pre-commit pass should already have
 *                  closed the remote transaction.
 *   ABORT        - abort remotely, reporting failures only as WARNING.
 *
 * After the scan, the per-transaction flag and the cursor counter are
 * reset.  arg is not used.
 */
{
    HASH_SEQ_STATUS scan;
    ConnCacheEntry *entry;

    /* Quick exit if no connections were touched in this transaction. */
    if (!xact_got_connection)
        return;

    /*
     * Scan all connection cache entries to find open remote transactions, and
     * close them.
     */
    hash_seq_init(&scan, ConnectionHash);
    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    {
        PGresult   *res;

        /* We only care about connections with open remote transactions */
        if (entry->conn == NULL || entry->xact_depth == 0)
            continue;

        elog(DEBUG3, "closing remote transaction on connection %p",
             entry->conn);

        switch (event)
        {
            case XACT_EVENT_PRE_COMMIT:
                /* Commit all remote transactions during pre-commit */
                do_sql_command(entry->conn, "COMMIT TRANSACTION");

                /*
                 * If there were any errors in subtransactions, and we made
                 * prepared statements, do a DEALLOCATE ALL to make sure we
                 * get rid of all prepared statements.  This is annoying and
                 * not terribly bulletproof, but it's probably not worth
                 * trying harder.
                 *
                 * DEALLOCATE ALL only exists in 8.3 and later, so this
                 * constrains how old a server postgres_fdw can communicate
                 * with.  We intentionally ignore errors in the DEALLOCATE, so
                 * that we can hobble along to some extent with older servers
                 * (leaking prepared statements as we go; but we don't really
                 * support update operations pre-8.3 anyway).
                 */
                if (entry->have_prep_stmt && entry->have_error)
                {
                    res = PQexec(entry->conn, "DEALLOCATE ALL");
                    PQclear(res);
                }
                entry->have_prep_stmt = false;
                entry->have_error = false;
                break;
            case XACT_EVENT_PRE_PREPARE:

                /*
                 * We disallow remote transactions that modified anything,
                 * since it's not really reasonable to hold them open until
                 * the prepared transaction is committed.  For the moment,
                 * throw error unconditionally; later we might allow read-only
                 * cases.  Note that the error will cause us to come right
                 * back here with event == XACT_EVENT_ABORT, so we'll clean up
                 * the connection state at that point.
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("cannot prepare a transaction that modified remote tables")));
                break;
            case XACT_EVENT_COMMIT:
            case XACT_EVENT_PREPARE:
                /* Should not get here -- pre-commit should have handled it */
                elog(ERROR, "missed cleaning up connection during pre-commit");
                break;
            case XACT_EVENT_ABORT:
                /* Assume we might have lost track of prepared statements */
                entry->have_error = true;
                /* If we're aborting, abort all remote transactions too */
                res = PQexec(entry->conn, "ABORT TRANSACTION");
                /* Note: can't throw ERROR, it would be infinite loop */
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                    pgfdw_report_error(WARNING, res, true,
                                       "ABORT TRANSACTION");
                else
                {
                    PQclear(res);
                    /*
                     * As above, make sure we've cleared any prepared stmts.
                     * have_error was set just above, so this test reduces to
                     * have_prep_stmt.  Note the two flags are reset only on
                     * this success branch; they are left as-is when ABORT
                     * TRANSACTION itself failed.
                     */
                    if (entry->have_prep_stmt && entry->have_error)
                    {
                        res = PQexec(entry->conn, "DEALLOCATE ALL");
                        PQclear(res);
                    }
                    entry->have_prep_stmt = false;
                    entry->have_error = false;
                }
                break;
        }

        /* Reset state to show we're out of a transaction */
        entry->xact_depth = 0;

        /*
         * If the connection isn't in a good idle state, discard it to
         * recover. Next GetConnection will open a new connection.
         */
        if (PQstatus(entry->conn) != CONNECTION_OK ||
            PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
        {
            elog(DEBUG3, "discarding connection %p", entry->conn);
            PQfinish(entry->conn);
            entry->conn = NULL;
        }
    }

    /*
     * Regardless of the event type, we can now mark ourselves as out of the
     * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
     * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
     */
    xact_got_connection = false;

    /* Also reset cursor numbering for next transaction */
    cursor_number = 0;
}

void ReleaseConnection ( PGconn *  conn  ) 

Definition at line 412 of file connection.c.

Referenced by estimate_path_cost_size(), postgresAcquireSampleRowsFunc(), postgresAnalyzeForeignTable(), postgresEndForeignModify(), and postgresEndForeignScan().

{
    /*
     * Intentionally a no-op; conn is not examined.
     *
     * Currently, we don't actually track connection references because all
     * cleanup is managed on a transaction or subtransaction basis instead. So
     * there's nothing to do here.
     */
}


Variable Documentation

HTAB* ConnectionHash = NULL [static]

Definition at line 57 of file connection.c.

unsigned int cursor_number = 0 [static]
unsigned int prep_stmt_number = 0 [static]

Definition at line 61 of file connection.c.

Referenced by GetPrepStmtNumber().

bool xact_got_connection = false [static]

Definition at line 64 of file connection.c.

Referenced by GetConnection(), pgfdw_subxact_callback(), and pgfdw_xact_callback().