#include "postgres.h"
#include "postgres_fdw.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
Go to the source code of this file.
#define DEFAULT_FDW_STARTUP_COST 100.0 |
Definition at line 44 of file postgres_fdw.c.
#define DEFAULT_FDW_TUPLE_COST 0.01 |
Definition at line 47 of file postgres_fdw.c.
typedef struct ConversionLocation ConversionLocation |
typedef struct PgFdwAnalyzeState PgFdwAnalyzeState |
typedef struct PgFdwModifyState PgFdwModifyState |
typedef struct PgFdwRelationInfo PgFdwRelationInfo |
typedef struct PgFdwScanState PgFdwScanState |
FdwModifyPrivateUpdateSql | |
FdwModifyPrivateTargetAttnums | |
FdwModifyPrivateHasReturning | |
FdwModifyPrivateRetrievedAttrs |
Definition at line 114 of file postgres_fdw.c.
{ /* SQL statement to execute remotely (as a String node) */ FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ FdwModifyPrivateRetrievedAttrs };
enum FdwScanPrivateIndex |
Definition at line 96 of file postgres_fdw.c.
{ /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs };
static void analyze_row_processor | ( | PGresult * | res, | |
int | row, | |||
PgFdwAnalyzeState * | astate | |||
) | [static] |
Definition at line 2435 of file postgres_fdw.c.
References PgFdwAnalyzeState::anl_cxt, anl_get_next_S(), anl_random_fract(), Assert, PgFdwAnalyzeState::attinmeta, heap_freetuple(), make_tuple_from_result_row(), MemoryContextSwitchTo(), PgFdwAnalyzeState::numrows, PgFdwAnalyzeState::rel, PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, PgFdwAnalyzeState::samplerows, PgFdwAnalyzeState::targrows, and PgFdwAnalyzeState::temp_cxt.
Referenced by postgresAcquireSampleRowsFunc().
{ int targrows = astate->targrows; int pos; /* array index to store tuple in */ MemoryContext oldcontext; /* Always increment sample row counter. */ astate->samplerows += 1; /* * Determine the slot where this sample row should be stored. Set pos to * negative value to indicate the row should be skipped. */ if (astate->numrows < targrows) { /* First targrows rows are always included into the sample */ pos = astate->numrows++; } else { /* * Now we start replacing tuples in the sample until we reach the end * of the relation. Same algorithm as in acquire_sample_rows in * analyze.c; see Jeff Vitter's paper. */ if (astate->rowstoskip < 0) astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows, &astate->rstate); if (astate->rowstoskip <= 0) { /* Choose a random reservoir element to replace. */ pos = (int) (targrows * anl_random_fract()); Assert(pos >= 0 && pos < targrows); heap_freetuple(astate->rows[pos]); } else { /* Skip this tuple. */ pos = -1; } astate->rowstoskip -= 1; } if (pos >= 0) { /* * Create sample tuple from current result row, and store it in the * position determined above. The tuple has to be created in anl_cxt. */ oldcontext = MemoryContextSwitchTo(astate->anl_cxt); astate->rows[pos] = make_tuple_from_result_row(res, row, astate->rel, astate->attinmeta, astate->retrieved_attrs, astate->temp_cxt); MemoryContextSwitchTo(oldcontext); } }
static void close_cursor | ( | PGconn * | conn, | |
unsigned int | cursor_number | |||
) | [static] |
Definition at line 2059 of file postgres_fdw.c.
References ERROR, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), and snprintf().
Referenced by postgresAcquireSampleRowsFunc(), and postgresEndForeignScan().
{ char sql[64]; PGresult *res; snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexec(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, sql); PQclear(res); }
static void conversion_error_callback | ( | void * | arg | ) | [static] |
Definition at line 2624 of file postgres_fdw.c.
References tupleDesc::attrs, ConversionLocation::cur_attno, errcontext, NameStr, tupleDesc::natts, ConversionLocation::rel, RelationGetDescr, and RelationGetRelationName.
{ ConversionLocation *errpos = (ConversionLocation *) arg; TupleDesc tupdesc = RelationGetDescr(errpos->rel); if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) errcontext("column \"%s\" of foreign table \"%s\"", NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname), RelationGetRelationName(errpos->rel)); }
static const char ** convert_prep_stmt_params | ( | PgFdwModifyState * | fmstate, | |
ItemPointer | tupleid, | |||
TupleTableSlot * | slot | |||
) | [static] |
Definition at line 2126 of file postgres_fdw.c.
References Assert, lfirst_int, MemoryContextSwitchTo(), NIL, NULL, OutputFunctionCall(), PgFdwModifyState::p_flinfo, PgFdwModifyState::p_nums, palloc(), PointerGetDatum, reset_transmission_modes(), set_transmission_modes(), slot_getattr(), PgFdwModifyState::target_attrs, PgFdwModifyState::temp_cxt, and value.
Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().
{ const char **p_values; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } /* get following parameters from slot */ if (slot != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); foreach(lc, fmstate->target_attrs) { int attnum = lfirst_int(lc); Datum value; bool isnull; value = slot_getattr(slot, attnum, &isnull); if (isnull) p_values[pindex] = NULL; else p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], value); pindex++; } reset_transmission_modes(nestlevel); } Assert(pindex == fmstate->p_nums); MemoryContextSwitchTo(oldcontext); return p_values; }
static void create_cursor | ( | ForeignScanState * | node | ) | [static] |
Definition at line 1844 of file postgres_fdw.c.
References appendStringInfo(), buf, PgFdwScanState::conn, conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ExecEvalExpr, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, i, initStringInfo(), lfirst, MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, PgFdwScanState::numParams, OutputFunctionCall(), PgFdwScanState::param_exprs, PgFdwScanState::param_flinfo, PgFdwScanState::param_values, pfree(), pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexecParams(), PQresultStatus(), ScanState::ps, PlanState::ps_ExprContext, PgFdwScanState::query, reset_transmission_modes(), set_transmission_modes(), ForeignScanState::ss, PgFdwScanState::tuples, and values.
Referenced by postgresIterateForeignScan().
{ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; PGconn *conn = fsstate->conn; StringInfoData buf; PGresult *res; /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a * memory leak over repeated scans. */ if (numParams > 0) { int nestlevel; MemoryContext oldcontext; int i; ListCell *lc; oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); nestlevel = set_transmission_modes(); i = 0; foreach(lc, fsstate->param_exprs) { ExprState *expr_state = (ExprState *) lfirst(lc); Datum expr_value; bool isNull; /* Evaluate the parameter expression */ expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL); /* * Get string representation of each parameter value by invoking * type-specific output function, unless the value is null. */ if (isNull) values[i] = NULL; else values[i] = OutputFunctionCall(&fsstate->param_flinfo[i], expr_value); i++; } reset_transmission_modes(nestlevel); MemoryContextSwitchTo(oldcontext); } /* Construct the DECLARE CURSOR command */ initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", fsstate->cursor_number, fsstate->query); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every * parameter (see deparse.c), the "inference" is trivial and will produce * the desired result. This allows us to avoid assuming that the remote * server has the same OIDs we do for the parameters' types. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexecParams(conn, buf.data, numParams, NULL, values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, fsstate->query); PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ fsstate->cursor_exists = true; fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; fsstate->eof_reached = false; /* Clean up */ pfree(buf.data); }
static bool ec_member_matches_foreign | ( | PlannerInfo * | root, | |
RelOptInfo * | rel, | |||
EquivalenceClass * | ec, | |||
EquivalenceMember * | em, | |||
void * | arg | |||
) | [static] |
Definition at line 1815 of file postgres_fdw.c.
References ec_member_foreign_arg::already_used, ec_member_foreign_arg::current, EquivalenceMember::em_expr, equal(), list_member(), and NULL.
Referenced by postgresGetForeignPaths().
{ ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg; Expr *expr = em->em_expr; /* * If we've identified what we're processing in the current scan, we only * want to match that expression. */ if (state->current != NULL) return equal(expr, state->current); /* * Otherwise, ignore anything we've already processed. */ if (list_member(state->already_used, expr)) return false; /* This is the new target to process. */ state->current = expr; return true; }
static void estimate_path_cost_size | ( | PlannerInfo * | root, | |
RelOptInfo * | baserel, | |||
List * | join_conds, | |||
double * | p_rows, | |||
int * | p_width, | |||
Cost * | p_startup_cost, | |||
Cost * | p_total_cost | |||
) | [static] |
Definition at line 1646 of file postgres_fdw.c.
References appendStringInfoString(), appendWhereClause(), Assert, PgFdwRelationInfo::attrs_used, RelOptInfo::baserestrictcost, clamp_row_est(), conn, cpu_tuple_cost, StringInfoData::data, deparseSelectSql(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, get_remote_estimate(), GetConnection(), initStringInfo(), PgFdwRelationInfo::local_conds_cost, PgFdwRelationInfo::local_conds_sel, Min, NIL, NULL, RelOptInfo::pages, QualCost::per_tuple, ReleaseConnection(), PgFdwRelationInfo::remote_conds, RelOptInfo::rows, seq_page_cost, PgFdwRelationInfo::server, QualCost::startup, RelOptInfo::tuples, PgFdwRelationInfo::use_remote_estimate, PgFdwRelationInfo::user, and RelOptInfo::width.
Referenced by postgresGetForeignPaths(), and postgresGetForeignRelSize().
{ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; Cost run_cost; Cost cpu_per_tuple; /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction+join clauses. Otherwise, * estimate rows using whatever statistics we have locally, in a way * similar to ordinary tables. */ if (fpinfo->use_remote_estimate) { StringInfoData sql; List *retrieved_attrs; PGconn *conn; /* * Construct EXPLAIN query including the desired SELECT, FROM, and * WHERE clauses. Params and other-relation Vars are replaced by * dummy values. */ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used, &retrieved_attrs); if (fpinfo->remote_conds) appendWhereClause(&sql, root, baserel, fpinfo->remote_conds, true, NULL); if (join_conds) appendWhereClause(&sql, root, baserel, join_conds, (fpinfo->remote_conds == NIL), NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->server, fpinfo->user, false); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); retrieved_rows = rows; /* Factor in the selectivity of the local_conds */ rows = clamp_row_est(rows * fpinfo->local_conds_sel); /* Add in the eval cost of the local_conds */ startup_cost += fpinfo->local_conds_cost.startup; total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } else { /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ Assert(join_conds == NIL); /* Use rows/width estimates made by set_baserel_size_estimates. */ rows = baserel->rows; width = baserel->width; /* * Back into an estimate of the number of retrieved rows. Just in * case this is nuts, clamp to at most baserel->tuples. */ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); retrieved_rows = Min(retrieved_rows, baserel->tuples); /* * Cost as though this were a seqscan, which is pessimistic. We * effectively imagine the local_conds are being evaluated remotely, * too. */ startup_cost = 0; run_cost = 0; run_cost += seq_page_cost * baserel->pages; startup_cost += baserel->baserestrictcost.startup; cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple; run_cost += cpu_per_tuple * baserel->tuples; total_cost = startup_cost + run_cost; } /* * Add some additional cost factors to account for connection overhead * (fdw_startup_cost), transferring data across the network * (fdw_tuple_cost per retrieved row), and local manipulation of the data * (cpu_tuple_cost per retrieved row). */ startup_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_startup_cost; total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; total_cost += cpu_tuple_cost * retrieved_rows; /* Return results. */ *p_rows = rows; *p_width = width; *p_startup_cost = startup_cost; *p_total_cost = total_cost; }
static void fetch_more_data | ( | ForeignScanState * | node | ) | [static] |
Definition at line 1934 of file postgres_fdw.c.
References PgFdwScanState::attinmeta, PgFdwScanState::batch_cxt, PgFdwScanState::conn, conn, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, i, make_tuple_from_result_row(), MemoryContextReset(), MemoryContextSwitchTo(), PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQntuples(), PQresultStatus(), PgFdwScanState::query, PgFdwScanState::rel, PgFdwScanState::retrieved_attrs, snprintf(), PgFdwScanState::temp_cxt, and PgFdwScanState::tuples.
Referenced by postgresIterateForeignScan().
{ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; /* * We'll store the tuples in the batch_cxt. First, flush the previous * batch. */ fsstate->tuples = NULL; MemoryContextReset(fsstate->batch_cxt); oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { PGconn *conn = fsstate->conn; char sql[64]; int fetch_size; int numrows; int i; /* The fetch size is arbitrary, but shouldn't be enormous. */ fetch_size = 100; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", fetch_size, fsstate->cursor_number); res = PQexec(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, fsstate->query); /* Convert the data into HeapTuples */ numrows = PQntuples(res); fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); fsstate->num_tuples = numrows; fsstate->next_tuple = 0; for (i = 0; i < numrows; i++) { fsstate->tuples[i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, fsstate->temp_cxt); } /* Update fetch_ct_2 */ if (fsstate->fetch_ct_2 < 2) fsstate->fetch_ct_2++; /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fetch_size); PQclear(res); res = NULL; } PG_CATCH(); { if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); MemoryContextSwitchTo(oldcontext); }
static void get_remote_estimate | ( | const char * | sql, | |
PGconn * | conn, | |||
double * | rows, | |||
int * | width, | |||
Cost * | startup_cost, | |||
Cost * | total_cost | |||
) | [static] |
Definition at line 1763 of file postgres_fdw.c.
References elog, ERROR, NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), and PQresultStatus().
Referenced by estimate_path_cost_size().
{ PGresult *volatile res = NULL; /* PGresult must be released before leaving this function. */ PG_TRY(); { char *line; char *p; int n; /* * Execute EXPLAIN remotely. */ res = PQexec(conn, sql); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, sql); /* * Extract cost numbers for topmost plan node. Note we search for a * left paren from the end of the line to avoid being confused by * other uses of parentheses. */ line = PQgetvalue(res, 0, 0); p = strrchr(line, '('); if (p == NULL) elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", startup_cost, total_cost, rows, width); if (n != 4) elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); PQclear(res); res = NULL; } PG_CATCH(); { if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); }
static HeapTuple make_tuple_from_result_row | ( | PGresult * | res, | |
int | row, | |||
Relation | rel, | |||
AttInMetadata * | attinmeta, | |||
List * | retrieved_attrs, | |||
MemoryContext | temp_context | |||
) | [static] |
Definition at line 2507 of file postgres_fdw.c.
References ErrorContextCallback::arg, Assert, AttInMetadata::attinfuncs, AttInMetadata::attioparams, AttInMetadata::atttypmods, ErrorContextCallback::callback, CStringGetDatum, ConversionLocation::cur_attno, DatumGetPointer, DirectFunctionCall1, elog, ERROR, error_context_stack, heap_form_tuple(), i, InputFunctionCall(), lfirst_int, MemoryContextReset(), MemoryContextSwitchTo(), tupleDesc::natts, NULL, palloc(), palloc0(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), ErrorContextCallback::previous, ConversionLocation::rel, RelationGetDescr, SelfItemPointerAttributeNumber, HeapTupleData::t_self, tidin(), and values.
Referenced by analyze_row_processor(), fetch_more_data(), and store_returning_result().
{ HeapTuple tuple; TupleDesc tupdesc = RelationGetDescr(rel); Datum *values; bool *nulls; ItemPointer ctid = NULL; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; ListCell *lc; int j; Assert(row < PQntuples(res)); /* * Do the following work in a temp context that we reset after each tuple. * This cleans up not only the data we have direct access to, but any * cruft the I/O functions might leak. */ oldcontext = MemoryContextSwitchTo(temp_context); values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ memset(nulls, true, tupdesc->natts * sizeof(bool)); /* * Set up and install callback to report where conversion error occurs. */ errpos.rel = rel; errpos.cur_attno = 0; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; errcallback.previous = error_context_stack; error_context_stack = &errcallback; /* * i indexes columns in the relation, j indexes columns in the PGresult. */ j = 0; foreach(lc, retrieved_attrs) { int i = lfirst_int(lc); char *valstr; /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) valstr = NULL; else valstr = PQgetvalue(res, row, j); /* convert value to internal representation */ if (i > 0) { /* ordinary column */ Assert(i <= tupdesc->natts); nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ errpos.cur_attno = i; values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], valstr, attinmeta->attioparams[i - 1], attinmeta->atttypmods[i - 1]); errpos.cur_attno = 0; } else if (i == SelfItemPointerAttributeNumber) { /* ctid --- note we ignore any other system column in result */ if (valstr != NULL) { Datum datum; datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); ctid = (ItemPointer) DatumGetPointer(datum); } } j++; } /* Uninstall error context callback. */ error_context_stack = errcallback.previous; /* * Check we got the expected number of columns. Note: j == 0 and * PQnfields == 1 is expected, since deparse emits a NULL if no columns. */ if (j > 0 && j != PQnfields(res)) elog(ERROR, "remote query result does not match the foreign table"); /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); tuple = heap_form_tuple(tupdesc, values, nulls); if (ctid) tuple->t_self = *ctid; /* Clean up */ MemoryContextReset(temp_context); return tuple; }
PG_FUNCTION_INFO_V1 | ( | postgres_fdw_handler | ) |
Datum postgres_fdw_handler | ( | PG_FUNCTION_ARGS | ) |
Definition at line 337 of file postgres_fdw.c.
References FdwRoutine::AddForeignUpdateTargets, FdwRoutine::AnalyzeForeignTable, FdwRoutine::BeginForeignModify, FdwRoutine::BeginForeignScan, FdwRoutine::EndForeignModify, FdwRoutine::EndForeignScan, FdwRoutine::ExecForeignDelete, FdwRoutine::ExecForeignInsert, FdwRoutine::ExecForeignUpdate, FdwRoutine::ExplainForeignModify, FdwRoutine::ExplainForeignScan, FdwRoutine::GetForeignPaths, FdwRoutine::GetForeignPlan, FdwRoutine::GetForeignRelSize, FdwRoutine::IterateForeignScan, makeNode, PG_RETURN_POINTER, FdwRoutine::PlanForeignModify, and FdwRoutine::ReScanForeignScan.
{ FdwRoutine *routine = makeNode(FdwRoutine); /* Functions for scanning foreign tables */ routine->GetForeignRelSize = postgresGetForeignRelSize; routine->GetForeignPaths = postgresGetForeignPaths; routine->GetForeignPlan = postgresGetForeignPlan; routine->BeginForeignScan = postgresBeginForeignScan; routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; /* Functions for updating foreign tables */ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; PG_RETURN_POINTER(routine); }
static int postgresAcquireSampleRowsFunc | ( | Relation | relation, | |
int | elevel, | |||
HeapTuple * | rows, | |||
int | targrows, | |||
double * | totalrows, | |||
double * | totaldeadrows | |||
) | [static] |
Definition at line 2298 of file postgres_fdw.c.
References ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), analyze_row_processor(), PgFdwAnalyzeState::anl_cxt, anl_init_selection_state(), appendStringInfo(), PgFdwAnalyzeState::attinmeta, CHECK_FOR_INTERRUPTS, close_cursor(), conn, CurrentMemoryContext, cursor_number, StringInfoData::data, deparseAnalyzeSql(), ereport, errmsg(), ERROR, GetConnection(), GetCursorNumber(), GetForeignServer(), GetForeignTable(), GetUserMapping(), i, initStringInfo(), PgFdwAnalyzeState::numrows, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQexec(), PQntuples(), PQresultStatus(), RelationData::rd_rel, PgFdwAnalyzeState::rel, RelationGetDescr, RelationGetRelationName, RelationGetRelid, ReleaseConnection(), PgFdwAnalyzeState::retrieved_attrs, PgFdwAnalyzeState::rows, PgFdwAnalyzeState::rowstoskip, PgFdwAnalyzeState::rstate, PgFdwAnalyzeState::samplerows, ForeignServer::serverid, ForeignTable::serverid, snprintf(), PgFdwAnalyzeState::targrows, PgFdwAnalyzeState::temp_cxt, TupleDescGetAttInMetadata(), and user.
{ PgFdwAnalyzeState astate; ForeignTable *table; ForeignServer *server; UserMapping *user; PGconn *conn; unsigned int cursor_number; StringInfoData sql; PGresult *volatile res = NULL; /* Initialize workspace state */ astate.rel = relation; astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); astate.rows = rows; astate.targrows = targrows; astate.numrows = 0; astate.samplerows = 0; astate.rowstoskip = -1; /* -1 means not set yet */ astate.rstate = anl_init_selection_state(targrows); /* Remember ANALYZE context, and create a per-tuple temp context */ astate.anl_cxt = CurrentMemoryContext; astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* * Get the connection to use. We do the remote access as the table's * owner, even if the ANALYZE was started by some other user. */ table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); conn = GetConnection(server, user, false); /* * Construct cursor that retrieves whole rows from remote. */ cursor_number = GetCursorNumber(conn); initStringInfo(&sql); appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { res = PQexec(conn, sql.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, false, sql.data); PQclear(res); res = NULL; /* Retrieve and process rows a batch at a time. */ for (;;) { char fetch_sql[64]; int fetch_size; int numrows; int i; /* Allow users to cancel long query */ CHECK_FOR_INTERRUPTS(); /* * XXX possible future improvement: if rowstoskip is large, we * could issue a MOVE rather than physically fetching the rows, * then just adjust rowstoskip and samplerows appropriately. */ /* The fetch size is arbitrary, but shouldn't be enormous. */ fetch_size = 100; /* Fetch some rows */ snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", fetch_size, cursor_number); res = PQexec(conn, fetch_sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, sql.data); /* Process whatever we got. */ numrows = PQntuples(res); for (i = 0; i < numrows; i++) analyze_row_processor(res, i, &astate); PQclear(res); res = NULL; /* Must be EOF if we didn't get all the rows requested. */ if (numrows < fetch_size) break; } /* Close the cursor, just to be tidy. */ close_cursor(conn, cursor_number); } PG_CATCH(); { if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); ReleaseConnection(conn); /* We assume that we have no dead tuple. */ *totaldeadrows = 0.0; /* We've retrieved all living tuples from foreign server. */ *totalrows = astate.samplerows; /* * Emit some interesting relation info */ ereport(elevel, (errmsg("\"%s\": table contains %.0f rows, %d rows in sample", RelationGetRelationName(relation), astate.samplerows, astate.numrows))); return astate.numrows; }
static void postgresAddForeignUpdateTargets | ( | Query * | parsetree, | |
RangeTblEntry * | target_rte, | |||
Relation | target_relation | |||
) | [static] |
Definition at line 1106 of file postgres_fdw.c.
References InvalidOid, lappend(), list_length(), makeTargetEntry(), makeVar(), pstrdup(), Query::resultRelation, SelfItemPointerAttributeNumber, Query::targetList, and TIDOID.
{ Var *var; const char *attrname; TargetEntry *tle; /* * In postgres_fdw, what we need is the ctid, same as for a regular table. */ /* Make a Var representing the desired value */ var = makeVar(parsetree->resultRelation, SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, 0); /* Wrap it in a resjunk TLE with the right name ... */ attrname = "ctid"; tle = makeTargetEntry((Expr *) var, list_length(parsetree->targetList) + 1, pstrdup(attrname), true); /* ... and add it to the query's targetlist */ parsetree->targetList = lappend(parsetree->targetList, tle); }
static bool postgresAnalyzeForeignTable | ( | Relation | relation, | |
AcquireSampleRowsFunc * | func, | |||
BlockNumber * | totalpages | |||
) | [static] |
Definition at line 2218 of file postgres_fdw.c.
References conn, StringInfoData::data, deparseAnalyzeSizeSql(), elog, ERROR, GetConnection(), GetForeignServer(), GetForeignTable(), GetUserMapping(), initStringInfo(), NULL, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgfdw_report_error(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), RelationData::rd_rel, RelationGetRelid, ReleaseConnection(), ForeignServer::serverid, ForeignTable::serverid, and user.
{ ForeignTable *table; ForeignServer *server; UserMapping *user; PGconn *conn; StringInfoData sql; PGresult *volatile res = NULL; /* Return the row-analysis function pointer */ *func = postgresAcquireSampleRowsFunc; /* * Now we have to get the number of pages. It's annoying that the ANALYZE * API requires us to return that now, because it forces some duplication * of effort between this routine and postgresAcquireSampleRowsFunc. But * it's probably not worth redefining that API at this point. */ /* * Get the connection to use. We do the remote access as the table's * owner, even if the ANALYZE was started by some other user. */ table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); conn = GetConnection(server, user, false); /* * Construct command to get page count for relation. */ initStringInfo(&sql); deparseAnalyzeSizeSql(&sql, relation); /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { res = PQexec(conn, sql.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, sql.data); if (PQntuples(res) != 1 || PQnfields(res) != 1) elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); PQclear(res); res = NULL; } PG_CATCH(); { if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); ReleaseConnection(conn); return true; }
static void postgresBeginForeignModify | ( | ModifyTableState * | mtstate, | |
ResultRelInfo * | resultRelInfo, | |||
List * | fdw_private, | |||
int | subplan_index, | |||
int | eflags | |||
) | [static] |
Definition at line 1253 of file postgres_fdw.c.
References ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), Assert, PgFdwModifyState::attinmeta, AttributeNumberIsValid, RangeTblEntry::checkAsUser, CMD_DELETE, CMD_INSERT, CMD_UPDATE, PgFdwModifyState::conn, PgFdwModifyState::ctidAttno, elog, ERROR, EState::es_query_cxt, EState::es_range_table, EXEC_FLAG_EXPLAIN_ONLY, ExecFindJunkAttributeInTlist(), FdwModifyPrivateHasReturning, FdwModifyPrivateRetrievedAttrs, FdwModifyPrivateTargetAttnums, FdwModifyPrivateUpdateSql, fmgr_info(), GetConnection(), GetForeignServer(), GetForeignTable(), getTypeOutputInfo(), GetUserId(), GetUserMapping(), PgFdwModifyState::has_returning, intVal, lfirst_int, list_length(), list_nth(), ModifyTableState::mt_plans, ModifyTableState::operation, PgFdwModifyState::p_flinfo, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, palloc0(), PlanState::plan, ModifyTableState::ps, PgFdwModifyState::query, PgFdwModifyState::rel, RelationGetDescr, RelationGetRelid, PgFdwModifyState::retrieved_attrs, ResultRelInfo::ri_FdwState, ResultRelInfo::ri_RangeTableIndex, ResultRelInfo::ri_RelationDesc, rt_fetch, ForeignServer::serverid, ForeignTable::serverid, PlanState::state, strVal, PgFdwModifyState::target_attrs, Plan::targetlist, PgFdwModifyState::temp_cxt, TIDOID, TupleDescGetAttInMetadata(), and user.
{ PgFdwModifyState *fmstate; EState *estate = mtstate->ps.state; CmdType operation = mtstate->operation; Relation rel = resultRelInfo->ri_RelationDesc; RangeTblEntry *rte; Oid userid; ForeignTable *table; ForeignServer *server; UserMapping *user; AttrNumber n_params; Oid typefnoid; bool isvarlena; ListCell *lc; /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState * stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* Begin constructing PgFdwModifyState. */ fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); fmstate->rel = rel; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ table = GetForeignTable(RelationGetRelid(rel)); server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); /* Open connection; report that we'll create a prepared statement. */ fmstate->conn = GetConnection(server, user, true); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data. */ fmstate->query = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); fmstate->target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); fmstate->has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); fmstate->retrieved_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateRetrievedAttrs); /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Prepare for input conversion of RETURNING results. */ if (fmstate->has_returning) fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel)); /* Prepare for output conversion of parameters used in prepared stmt. */ n_params = list_length(fmstate->target_attrs) + 1; fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); fmstate->p_nums = 0; if (operation == CMD_UPDATE || operation == CMD_DELETE) { /* Find the ctid resjunk column in the subplan's result */ Plan *subplan = mtstate->mt_plans[subplan_index]->plan; fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid"); if (!AttributeNumberIsValid(fmstate->ctidAttno)) elog(ERROR, "could not find junk ctid column"); /* First transmittable parameter will be ctid */ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } if (operation == CMD_INSERT || operation == CMD_UPDATE) { /* Set up for remaining transmittable parameters */ foreach(lc, fmstate->target_attrs) { int attnum = lfirst_int(lc); Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1]; Assert(!attr->attisdropped); getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } } Assert(fmstate->p_nums <= n_params); resultRelInfo->ri_FdwState = fmstate; }
static void postgresBeginForeignScan | ( | ForeignScanState * | node, | |
int | eflags | |||
) | [static] |
Definition at line 871 of file postgres_fdw.c.
References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE, ALLOCSET_SMALL_MINSIZE, AllocSetContextCreate(), EXEC_FLAG_EXPLAIN_ONLY, ExecInitExpr(), exprType(), ForeignScan::fdw_exprs, ForeignScan::fdw_private, ForeignScanState::fdw_state, FdwScanPrivateRetrievedAttrs, FdwScanPrivateSelectSql, fmgr_info(), GetConnection(), GetCursorNumber(), GetForeignServer(), GetForeignTable(), getTypeOutputInfo(), GetUserId(), GetUserMapping(), i, lfirst, list_length(), list_nth(), palloc0(), PlanState::plan, ScanState::ps, RelationGetDescr, RelationGetRelid, rt_fetch, ForeignScan::scan, Scan::scanrelid, ForeignScanState::ss, ScanState::ss_currentRelation, PlanState::state, strVal, TupleDescGetAttInMetadata(), and user.
{ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; RangeTblEntry *rte; Oid userid; ForeignTable *table; ForeignServer *server; UserMapping *user; int numParams; int i; ListCell *lc; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; /* * We'll save private state in node->fdw_state. */ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); node->fdw_state = (void *) fsstate; /* * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ fsstate->rel = node->ss.ss_currentRelation; table = GetForeignTable(RelationGetRelid(fsstate->rel)); server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ fsstate->conn = GetConnection(server, user, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_exists = false; /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Get info we'll need for input data conversion. */ fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); /* Prepare for output conversion of parameters used in remote query. */ numParams = list_length(fsplan->fdw_exprs); fsstate->numParams = numParams; fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); i = 0; foreach(lc, fsplan->fdw_exprs) { Node *param_expr = (Node *) lfirst(lc); Oid typefnoid; bool isvarlena; getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); fmgr_info(typefnoid, &fsstate->param_flinfo[i]); i++; } /* * Prepare remote-parameter expressions for evaluation. (Note: in * practice, we expect that all these expressions will be just Params, so * we could possibly do something more efficient than using the full * expression-eval machinery for this. But probably there would be little * benefit, and it'd require postgres_fdw to know more than is desirable * about Param evaluation.) */ fsstate->param_exprs = (List *) ExecInitExpr((Expr *) fsplan->fdw_exprs, (PlanState *) node); /* * Allocate buffer for text form of query parameters, if any. */ if (numParams > 0) fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); else fsstate->param_values = NULL; }
static void postgresEndForeignModify | ( | EState * | estate, | |
ResultRelInfo * | resultRelInfo | |||
) | [static] |
Definition at line 1566 of file postgres_fdw.c.
References PgFdwModifyState::conn, ERROR, NULL, PgFdwModifyState::p_name, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), ReleaseConnection(), ResultRelInfo::ri_FdwState, and snprintf().
{ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ if (fmstate == NULL) return; /* If we created a prepared statement, destroy it */ if (fmstate->p_name) { char sql[64]; PGresult *res; snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexec(fmstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, sql); PQclear(res); fmstate->p_name = NULL; } /* Release remote connection */ ReleaseConnection(fmstate->conn); fmstate->conn = NULL; }
static void postgresEndForeignScan | ( | ForeignScanState * | node | ) | [static] |
Definition at line 1082 of file postgres_fdw.c.
References close_cursor(), PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, ForeignScanState::fdw_state, NULL, and ReleaseConnection().
{ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) return; /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); /* Release remote connection */ ReleaseConnection(fsstate->conn); fsstate->conn = NULL; /* MemoryContexts will be deleted automatically. */ }
static TupleTableSlot * postgresExecForeignDelete | ( | EState * | estate, | |
ResultRelInfo * | resultRelInfo, | |||
TupleTableSlot * | slot, | |||
TupleTableSlot * | planSlot | |||
) | [static] |
Definition at line 1496 of file postgres_fdw.c.
References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.
{ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; Datum datum; bool isNull; const char **p_values; PGresult *res; int n_rows; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, (ItemPointer) DatumGetPointer(datum), NULL); /* * Execute the prepared statement, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexecPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was deleted on the remote end */ return (n_rows > 0) ? slot : NULL; }
static TupleTableSlot * postgresExecForeignInsert | ( | EState * | estate, | |
ResultRelInfo * | resultRelInfo, | |||
TupleTableSlot * | slot, | |||
TupleTableSlot * | planSlot | |||
) | [static] |
Definition at line 1368 of file postgres_fdw.c.
References PgFdwModifyState::conn, convert_prep_stmt_params(), ERROR, PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.
{ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; const char **p_values; PGresult *res; int n_rows; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, NULL, slot); /* * Execute the prepared statement, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexecPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was inserted on the remote end */ return (n_rows > 0) ? slot : NULL; }
static TupleTableSlot * postgresExecForeignUpdate | ( | EState * | estate, | |
ResultRelInfo * | resultRelInfo, | |||
TupleTableSlot * | slot, | |||
TupleTableSlot * | planSlot | |||
) | [static] |
Definition at line 1426 of file postgres_fdw.c.
References PgFdwModifyState::conn, convert_prep_stmt_params(), PgFdwModifyState::ctidAttno, DatumGetPointer, elog, ERROR, ExecGetJunkAttribute(), PgFdwModifyState::has_returning, MemoryContextReset(), NULL, PgFdwModifyState::p_name, PgFdwModifyState::p_nums, pgfdw_report_error(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQcmdTuples(), PQexecPrepared(), PQntuples(), PQresultStatus(), prepare_foreign_modify(), PgFdwModifyState::query, ResultRelInfo::ri_FdwState, store_returning_result(), and PgFdwModifyState::temp_cxt.
{ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; Datum datum; bool isNull; const char **p_values; PGresult *res; int n_rows; /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ p_values = convert_prep_stmt_params(fmstate, (ItemPointer) DatumGetPointer(datum), slot); /* * Execute the prepared statement, and check for success. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexecPrepared(fmstate->conn, fmstate->p_name, fmstate->p_nums, p_values, NULL, NULL, 0); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, true, fmstate->query); /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { n_rows = PQntuples(res); if (n_rows > 0) store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); /* And clean up */ PQclear(res); MemoryContextReset(fmstate->temp_cxt); /* Return NULL if nothing was updated on the remote end */ return (n_rows > 0) ? slot : NULL; }
static void postgresExplainForeignModify | ( | ModifyTableState * | mtstate, | |
ResultRelInfo * | rinfo, | |||
List * | fdw_private, | |||
int | subplan_index, | |||
ExplainState * | es | |||
) | [static] |
Definition at line 1622 of file postgres_fdw.c.
References ExplainPropertyText(), FdwModifyPrivateUpdateSql, list_nth(), strVal, and ExplainState::verbose.
{ if (es->verbose) { char *sql = strVal(list_nth(fdw_private, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); } }
static void postgresExplainForeignScan | ( | ForeignScanState * | node, | |
ExplainState * | es | |||
) | [static] |
Definition at line 1604 of file postgres_fdw.c.
References ExplainPropertyText(), FdwScanPrivateSelectSql, list_nth(), PlanState::plan, ScanState::ps, ForeignScanState::ss, strVal, and ExplainState::verbose.
{ List *fdw_private; char *sql; if (es->verbose) { fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } }
static void postgresGetForeignPaths | ( | PlannerInfo * | root, | |
RelOptInfo * | baserel, | |||
Oid | foreigntableid | |||
) | [static] |
Definition at line 535 of file postgres_fdw.c.
References add_path(), ec_member_foreign_arg::already_used, arg, Assert, bms_add_member(), bms_del_member(), bms_is_empty(), bms_is_member(), bms_overlap(), bms_union(), RestrictInfo::clause, RestrictInfo::clause_relids, create_foreignscan_path(), ec_member_foreign_arg::current, ec_member_matches_foreign(), estimate_path_cost_size(), RelOptInfo::fdw_private, generate_implied_equalities_for_column(), RelOptInfo::has_eclass_joins, is_foreign_expr(), join_clause_is_movable_to(), RelOptInfo::joininfo, lappend(), PlannerInfo::lateral_info_list, LateralJoinInfo::lateral_lhs, RelOptInfo::lateral_relids, LateralJoinInfo::lateral_rhs, lfirst, list_make1, NIL, NULL, RelOptInfo::relid, PgFdwRelationInfo::rows, PgFdwRelationInfo::startup_cost, PgFdwRelationInfo::total_cost, and PgFdwRelationInfo::use_remote_estimate.
{ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; ForeignPath *path; Relids lateral_referencers; List *join_quals; Relids required_outer; double rows; int width; Cost startup_cost; Cost total_cost; ListCell *lc; /* * Create simplest ForeignScan path node and add it to baserel. This path * corresponds to SeqScan path of regular tables (though depending on what * baserestrict conditions we were able to send to remote, there might * actually be an indexscan happening there). We already did all the work * to estimate cost and size of this path. */ path = create_foreignscan_path(root, baserel, fpinfo->rows, fpinfo->startup_cost, fpinfo->total_cost, NIL, /* no pathkeys */ NULL, /* no outer rel either */ NIL); /* no fdw_private list */ add_path(baserel, (Path *) path); /* * If we're not using remote estimates, stop here. We have no way to * estimate whether any join clauses would be worth sending across, so * don't bother building parameterized paths. */ if (!fpinfo->use_remote_estimate) return; /* * As a crude first hack, we consider each available join clause and try * to make a parameterized path using just that clause. Later we should * consider combinations of clauses, probably. */ /* * If there are any rels that have LATERAL references to this one, we * cannot use join quals referencing them as remote quals for this one, * since such rels would have to be on the inside not the outside of a * nestloop join relative to this one. Create a Relids set listing all * such rels, for use in checks of potential join clauses. */ lateral_referencers = NULL; foreach(lc, root->lateral_info_list) { LateralJoinInfo *ljinfo = (LateralJoinInfo *) lfirst(lc); if (bms_is_member(baserel->relid, ljinfo->lateral_lhs)) lateral_referencers = bms_add_member(lateral_referencers, ljinfo->lateral_rhs); } /* Scan the rel's join clauses */ foreach(lc, baserel->joininfo) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); /* Check if clause can be moved to this rel */ if (!join_clause_is_movable_to(rinfo, baserel->relid)) continue; /* Not useful if it conflicts with any LATERAL references */ if (bms_overlap(rinfo->clause_relids, lateral_referencers)) continue; /* See if it is safe to send to remote */ if (!is_foreign_expr(root, baserel, rinfo->clause)) continue; /* * OK, get a cost estimate from the remote, and make a path. */ join_quals = list_make1(rinfo); estimate_path_cost_size(root, baserel, join_quals, &rows, &width, &startup_cost, &total_cost); /* Must calculate required outer rels for this path */ required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids); /* We do not want the foreign rel itself listed in required_outer */ required_outer = bms_del_member(required_outer, baserel->relid); /* Enforce convention that required_outer is exactly NULL if empty */ if (bms_is_empty(required_outer)) required_outer = NULL; path = create_foreignscan_path(root, baserel, rows, startup_cost, total_cost, NIL, /* no pathkeys */ required_outer, NIL); /* no fdw_private list */ add_path(baserel, (Path *) path); } /* * The above scan examined only "generic" join clauses, not those that * were absorbed into EquivalenceClauses. See if we can make anything out * of EquivalenceClauses. */ if (baserel->has_eclass_joins) { /* * We repeatedly scan the eclass list looking for column references * (or expressions) belonging to the foreign rel. Each time we find * one, we generate a list of equivalence joinclauses for it, and then * try to make those into foreign paths. Repeat till there are no * more candidate EC members. */ ec_member_foreign_arg arg; arg.already_used = NIL; for (;;) { List *clauses; /* Make clauses, skipping any that join to lateral_referencers */ arg.current = NULL; clauses = generate_implied_equalities_for_column(root, baserel, ec_member_matches_foreign, (void *) &arg, lateral_referencers); /* Done if there are no more expressions in the foreign rel */ if (arg.current == NULL) { Assert(clauses == NIL); break; } /* Scan the extracted join clauses */ foreach(lc, clauses) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); /* Check if clause can be moved to this rel */ if (!join_clause_is_movable_to(rinfo, baserel->relid)) continue; /* Shouldn't conflict with any LATERAL references */ Assert(!bms_overlap(rinfo->clause_relids, lateral_referencers)); /* See if it is safe to send to remote */ if (!is_foreign_expr(root, baserel, rinfo->clause)) continue; /* * OK, get a cost estimate from the remote, and make a path. */ join_quals = list_make1(rinfo); estimate_path_cost_size(root, baserel, join_quals, &rows, &width, &startup_cost, &total_cost); /* Must calculate required outer rels for this path */ required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids); required_outer = bms_del_member(required_outer, baserel->relid); if (bms_is_empty(required_outer)) required_outer = NULL; path = create_foreignscan_path(root, baserel, rows, startup_cost, total_cost, NIL, /* no pathkeys */ required_outer, NIL); /* no fdw_private */ add_path(baserel, (Path *) path); } /* Try again, now ignoring the expression we found this time */ arg.already_used = lappend(arg.already_used, arg.current); } } }
static ForeignScan * postgresGetForeignPlan | ( | PlannerInfo * | root, | |
RelOptInfo * | baserel, | |||
Oid | foreigntableid, | |||
ForeignPath * | best_path, | |||
List * | tlist, | |||
List * | scan_clauses | |||
) | [static] |
Definition at line 729 of file postgres_fdw.c.
References appendStringInfo(), appendWhereClause(), Assert, PgFdwRelationInfo::attrs_used, RestrictInfo::clause, CMD_DELETE, CMD_UPDATE, Query::commandType, deparseSelectSql(), RelOptInfo::fdw_private, get_parse_rowmark(), initStringInfo(), is_foreign_expr(), IsA, lappend(), LCS_FORKEYSHARE, LCS_FORNOKEYUPDATE, LCS_FORSHARE, LCS_FORUPDATE, lfirst, list_make2, list_member_ptr(), PgFdwRelationInfo::local_conds, make_foreignscan(), makeString(), NIL, PlannerInfo::parse, RestrictInfo::pseudoconstant, RelOptInfo::relid, PgFdwRelationInfo::remote_conds, Query::resultRelation, and RowMarkClause::strength.
{ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; Index scan_relid = baserel->relid; List *fdw_private; List *remote_conds = NIL; List *local_exprs = NIL; List *params_list = NIL; List *retrieved_attrs; StringInfoData sql; ListCell *lc; /* * Separate the scan_clauses into those that can be executed remotely and * those that can't. baserestrictinfo clauses that were previously * determined to be safe or unsafe by classifyClauses are shown in * fpinfo->remote_conds and fpinfo->local_conds. Anything else in the * scan_clauses list should be a join clause that was found safe by * postgresGetForeignPaths. * * Note: for clauses extracted from EquivalenceClasses, it's possible that * what we get here is a different representation of the clause than what * postgresGetForeignPaths saw; for example we might get a commuted * version of the clause. So we can't insist on simple equality as we do * for the baserestrictinfo clauses. * * This code must match "extract_actual_clauses(scan_clauses, false)" * except for the additional decision about remote versus local execution. * Note however that we only strip the RestrictInfo nodes from the * local_exprs list, since appendWhereClause expects a list of * RestrictInfos. */ foreach(lc, scan_clauses) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); Assert(IsA(rinfo, RestrictInfo)); /* Ignore any pseudoconstants, they're dealt with elsewhere */ if (rinfo->pseudoconstant) continue; if (list_member_ptr(fpinfo->remote_conds, rinfo)) remote_conds = lappend(remote_conds, rinfo); else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); else { Assert(is_foreign_expr(root, baserel, rinfo->clause)); remote_conds = lappend(remote_conds, rinfo); } } /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. */ initStringInfo(&sql); deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used, &retrieved_attrs); if (remote_conds) appendWhereClause(&sql, root, baserel, remote_conds, true, ¶ms_list); /* * Add FOR UPDATE/SHARE if appropriate. We apply locking during the * initial row fetch, rather than later on as is done for local tables. * The extra roundtrips involved in trying to duplicate the local * semantics exactly don't seem worthwhile (see also comments for * RowMarkType). * * Note: because we actually run the query as a cursor, this assumes that * DECLARE CURSOR ... FOR UPDATE is supported, which it isn't before 8.3. */ if (baserel->relid == root->parse->resultRelation && (root->parse->commandType == CMD_UPDATE || root->parse->commandType == CMD_DELETE)) { /* Relation is UPDATE/DELETE target, so use FOR UPDATE */ appendStringInfo(&sql, " FOR UPDATE"); } else { RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid); if (rc) { /* * Relation is specified as a FOR UPDATE/SHARE target, so handle * that. * * For now, just ignore any [NO] KEY specification, since (a) it's * not clear what that means for a remote table that we don't have * complete information about, and (b) it wouldn't work anyway on * older remote servers. Likewise, we don't worry about NOWAIT. */ switch (rc->strength) { case LCS_FORKEYSHARE: case LCS_FORSHARE: appendStringInfo(&sql, " FOR SHARE"); break; case LCS_FORNOKEYUPDATE: case LCS_FORUPDATE: appendStringInfo(&sql, " FOR UPDATE"); break; } } } /* * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ fdw_private = list_make2(makeString(sql.data), retrieved_attrs); /* * Create the ForeignScan node from target list, local filtering * expressions, remote parameter expressions, and FDW private information. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state * because then they wouldn't be subject to later planner processing. */ return make_foreignscan(tlist, local_exprs, scan_relid, params_list, fdw_private); }
static void postgresGetForeignRelSize | ( | PlannerInfo * | root, | |
RelOptInfo * | baserel, | |||
Oid | foreigntableid | |||
) | [static] |
Definition at line 377 of file postgres_fdw.c.
References PgFdwRelationInfo::attrs_used, RangeTblEntry::checkAsUser, classifyConditions(), RestrictInfo::clause, clauselist_selectivity(), cost_qual_eval(), defGetBoolean(), defGetString(), DefElem::defname, estimate_path_cost_size(), RelOptInfo::fdw_private, PgFdwRelationInfo::fdw_startup_cost, PgFdwRelationInfo::fdw_tuple_cost, GetForeignServer(), GetForeignTable(), GetUserId(), GetUserMapping(), JOIN_INNER, lfirst, PgFdwRelationInfo::local_conds, PgFdwRelationInfo::local_conds_cost, PgFdwRelationInfo::local_conds_sel, NIL, NULL, ForeignTable::options, ForeignServer::options, RelOptInfo::pages, palloc0(), planner_rt_fetch, pull_varattnos(), RelOptInfo::relid, RelOptInfo::reltargetlist, PgFdwRelationInfo::remote_conds, RelOptInfo::rows, PgFdwRelationInfo::rows, PgFdwRelationInfo::server, ForeignServer::serverid, ForeignTable::serverid, set_baserel_size_estimates(), PgFdwRelationInfo::startup_cost, PgFdwRelationInfo::table, PgFdwRelationInfo::total_cost, RelOptInfo::tuples, PgFdwRelationInfo::use_remote_estimate, PgFdwRelationInfo::user, RelOptInfo::width, and PgFdwRelationInfo::width.
{ PgFdwRelationInfo *fpinfo; ListCell *lc; /* * We use PgFdwRelationInfo to pass various information to subsequent * functions. */ fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); baserel->fdw_private = (void *) fpinfo; /* Look up foreign-table catalog info. */ fpinfo->table = GetForeignTable(foreigntableid); fpinfo->server = GetForeignServer(fpinfo->table->serverid); /* * Extract user-settable option values. Note that per-table setting of * use_remote_estimate overrides per-server setting. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; foreach(lc, fpinfo->server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fdw_startup_cost") == 0) fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL); else if (strcmp(def->defname, "fdw_tuple_cost") == 0) fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL); } foreach(lc, fpinfo->table->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "use_remote_estimate") == 0) { fpinfo->use_remote_estimate = defGetBoolean(def); break; /* only need the one value */ } } /* * If the table or the server is configured to use remote estimates, * identify which user to do remote access as during planning. This * should match what ExecCheckRTEPerms() does. If we fail due to lack of * permissions, the query would have failed at runtime anyway. */ if (fpinfo->use_remote_estimate) { RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); } else fpinfo->user = NULL; /* * Identify which baserestrictinfo clauses can be sent to the remote * server and which can't. */ classifyConditions(root, baserel, &fpinfo->remote_conds, &fpinfo->local_conds); /* * Identify which attributes will need to be retrieved from the remote * server. These include all attrs needed for joins or final output, plus * all attrs used in the local_conds. (Note: if we end up using a * parameterized scan, it's possible that some of the join clauses will be * sent to the remote and thus we wouldn't really need to retrieve the * columns used in them. Doesn't seem worth detecting that case though.) */ fpinfo->attrs_used = NULL; pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, &fpinfo->attrs_used); foreach(lc, fpinfo->local_conds) { RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); pull_varattnos((Node *) rinfo->clause, baserel->relid, &fpinfo->attrs_used); } /* * Compute the selectivity and cost of the local_conds, so we don't have * to do it over again for each path. The best we can do for these * conditions is to estimate selectivity on the basis of local statistics. */ fpinfo->local_conds_sel = clauselist_selectivity(root, fpinfo->local_conds, baserel->relid, JOIN_INNER, NULL); cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); /* * If the table or the server is configured to use remote estimates, * connect to the foreign server and execute EXPLAIN to estimate the * number of rows selected by the restriction clauses, as well as the * average row width. Otherwise, estimate using whatever statistics we * have locally, in a way similar to ordinary tables. */ if (fpinfo->use_remote_estimate) { /* * Get cost/size estimates with help of remote server. Save the * values in fpinfo so we don't need to do it again to generate the * basic foreign path. */ estimate_path_cost_size(root, baserel, NIL, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); /* Report estimated baserel size to planner. */ baserel->rows = fpinfo->rows; baserel->width = fpinfo->width; } else { /* * If the foreign table has never been ANALYZEd, it will have relpages * and reltuples equal to zero, which most likely has nothing to do * with reality. We can't do a whole lot about that if we're not * allowed to consult the remote server, but we can use a hack similar * to plancat.c's treatment of empty relations: use a minimum size * estimate of 10 pages, and divide by the column-datatype-based width * estimate to get the corresponding number of tuples. */ if (baserel->pages == 0 && baserel->tuples == 0) { baserel->pages = 10; baserel->tuples = (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData)); } /* Estimate baserel size as best we can with local statistics. */ set_baserel_size_estimates(root, baserel); /* Fill in basically-bogus cost estimates for use later. */ estimate_path_cost_size(root, baserel, NIL, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); } }
static TupleTableSlot * postgresIterateForeignScan | ( | ForeignScanState * | node | ) | [static] |
Definition at line 985 of file postgres_fdw.c.
References create_cursor(), PgFdwScanState::cursor_exists, PgFdwScanState::eof_reached, ExecClearTuple(), ExecStoreTuple(), ForeignScanState::fdw_state, fetch_more_data(), InvalidBuffer, PgFdwScanState::next_tuple, PgFdwScanState::num_tuples, ForeignScanState::ss, ScanState::ss_ScanTupleSlot, and PgFdwScanState::tuples.
{ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ if (!fsstate->cursor_exists) create_cursor(node); /* * Get some more tuples, if we've run out. */ if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); /* If we didn't get any tuples, must be end of data. */ if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); } /* * Return the next tuple. */ ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, false); return slot; }
static List * postgresPlanForeignModify | ( | PlannerInfo * | root, | |
ModifyTable * | plan, | |||
Index | resultRelation, | |||
int | subplan_index | |||
) | [static] |
Definition at line 1150 of file postgres_fdw.c.
References tupleDesc::attrs, bms_copy(), bms_first_member(), CMD_DELETE, CMD_INSERT, CMD_UPDATE, StringInfoData::data, deparseDeleteSql(), deparseInsertSql(), deparseUpdateSql(), elog, ERROR, heap_close, heap_open(), initStringInfo(), InvalidAttrNumber, lappend_int(), list_make4, list_nth(), makeInteger(), makeString(), RangeTblEntry::modifiedCols, tupleDesc::natts, NIL, NoLock, ModifyTable::operation, planner_rt_fetch, RelationGetDescr, RangeTblEntry::relid, and ModifyTable::returningLists.
{ CmdType operation = plan->operation; RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; StringInfoData sql; List *targetAttrs = NIL; List *returningList = NIL; List *retrieved_attrs = NIL; initStringInfo(&sql); /* * Core code already has some lock on each rel being planned, so we can * use NoLock here. */ rel = heap_open(rte->relid, NoLock); /* * In an INSERT, we transmit all columns that are defined in the foreign * table. In an UPDATE, we transmit only columns that were explicitly * targets of the UPDATE, so as to avoid unnecessary data transmission. * (We can't do that for INSERT since we would miss sending default values * for columns not listed in the source statement.) */ if (operation == CMD_INSERT) { TupleDesc tupdesc = RelationGetDescr(rel); int attnum; for (attnum = 1; attnum <= tupdesc->natts; attnum++) { Form_pg_attribute attr = tupdesc->attrs[attnum - 1]; if (!attr->attisdropped) targetAttrs = lappend_int(targetAttrs, attnum); } } else if (operation == CMD_UPDATE) { Bitmapset *tmpset = bms_copy(rte->modifiedCols); AttrNumber col; while ((col = bms_first_member(tmpset)) >= 0) { col += FirstLowInvalidHeapAttributeNumber; if (col <= InvalidAttrNumber) /* shouldn't happen */ elog(ERROR, "system-column update is not supported"); targetAttrs = lappend_int(targetAttrs, col); } } /* * Extract the relevant RETURNING list if any. */ if (plan->returningLists) returningList = (List *) list_nth(plan->returningLists, subplan_index); /* * Construct the SQL command string. */ switch (operation) { case CMD_INSERT: deparseInsertSql(&sql, root, resultRelation, rel, targetAttrs, returningList, &retrieved_attrs); break; case CMD_UPDATE: deparseUpdateSql(&sql, root, resultRelation, rel, targetAttrs, returningList, &retrieved_attrs); break; case CMD_DELETE: deparseDeleteSql(&sql, root, resultRelation, rel, returningList, &retrieved_attrs); break; default: elog(ERROR, "unexpected operation: %d", (int) operation); break; } heap_close(rel, NoLock); /* * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ return list_make4(makeString(sql.data), targetAttrs, makeInteger((returningList != NIL)), retrieved_attrs); }
static void postgresReScanForeignScan | ( | ForeignScanState * | node | ) | [static] |
Definition at line 1026 of file postgres_fdw.c.
References PlanState::chgParam, PgFdwScanState::conn, PgFdwScanState::cursor_exists, PgFdwScanState::cursor_number, PgFdwScanState::eof_reached, ERROR, ForeignScanState::fdw_state, PgFdwScanState::fetch_ct_2, PgFdwScanState::next_tuple, NULL, PgFdwScanState::num_tuples, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), ScanState::ps, snprintf(), ForeignScanState::ss, and PgFdwScanState::tuples.
{ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; char sql[64]; PGresult *res; /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; /* * If any internal parameters affecting this node have changed, we'd * better destroy and recreate the cursor. Otherwise, rewinding it should * be good enough. If we've only fetched zero or one batch, we needn't * even rewind the cursor, just rescan what we have. */ if (node->ss.ps.chgParam != NULL) { fsstate->cursor_exists = false; snprintf(sql, sizeof(sql), "CLOSE c%u", fsstate->cursor_number); } else if (fsstate->fetch_ct_2 > 1) { snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", fsstate->cursor_number); } else { /* Easy: just rescan what we already have in memory, if anything */ fsstate->next_tuple = 0; return; } /* * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQexec(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, sql); PQclear(res); /* Now force a fresh FETCH. */ fsstate->tuples = NULL; fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; fsstate->eof_reached = false; }
static void prepare_foreign_modify | ( | PgFdwModifyState * | fmstate | ) | [static] |
Definition at line 2081 of file postgres_fdw.c.
References PgFdwModifyState::conn, ERROR, GetPrepStmtNumber(), NULL, PgFdwModifyState::p_name, pgfdw_report_error(), PGRES_COMMAND_OK, PQclear(), PQprepare(), PQresultStatus(), pstrdup(), PgFdwModifyState::query, and snprintf().
Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().
{ char prep_name[NAMEDATALEN]; char *p_name; PGresult *res; /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); /* * We intentionally do not specify parameter types here, but leave the * remote server to derive them by default. This avoids possible problems * with the remote server using different type OIDs than we do. All of * the prepared statements we use in this module are simple enough that * the remote server will make the right choices. * * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ res = PQprepare(fmstate->conn, p_name, fmstate->query, 0, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ fmstate->p_name = p_name; }
void reset_transmission_modes | ( | int | nestlevel | ) |
Definition at line 2050 of file postgres_fdw.c.
References AtEOXact_GUC().
Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().
{ AtEOXact_GUC(true, nestlevel); }
int set_transmission_modes | ( | void | ) |
Definition at line 2022 of file postgres_fdw.c.
References DateStyle, extra_float_digits, GUC_ACTION_SAVE, IntervalStyle, INTSTYLE_POSTGRES, NewGUCNestLevel(), PGC_S_SESSION, PGC_USERSET, set_config_option(), and USE_ISO_DATES.
Referenced by appendWhereClause(), convert_prep_stmt_params(), and create_cursor().
{ int nestlevel = NewGUCNestLevel(); /* * The values set here should match what pg_dump does. See also * configure_remote_session in connection.c. */ if (DateStyle != USE_ISO_DATES) (void) set_config_option("datestyle", "ISO", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); if (IntervalStyle != INTSTYLE_POSTGRES) (void) set_config_option("intervalstyle", "postgres", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); if (extra_float_digits < 3) (void) set_config_option("extra_float_digits", "3", PGC_USERSET, PGC_S_SESSION, GUC_ACTION_SAVE, true, 0); return nestlevel; }
static void store_returning_result | ( | PgFdwModifyState * | fmstate, | |
TupleTableSlot * | slot, | |||
PGresult * | res | |||
) | [static] |
Definition at line 2188 of file postgres_fdw.c.
References PgFdwModifyState::attinmeta, ExecStoreTuple(), InvalidBuffer, make_tuple_from_result_row(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PQclear(), PgFdwModifyState::rel, PgFdwModifyState::retrieved_attrs, and PgFdwModifyState::temp_cxt.
Referenced by postgresExecForeignDelete(), postgresExecForeignInsert(), and postgresExecForeignUpdate().
{ /* PGresult must be released before leaving this function. */ PG_TRY(); { HeapTuple newtup; newtup = make_tuple_from_result_row(res, 0, fmstate->rel, fmstate->attinmeta, fmstate->retrieved_attrs, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } PG_CATCH(); { if (res) PQclear(res); PG_RE_THROW(); } PG_END_TRY(); }
Definition at line 41 of file postgres_fdw.c.