Header And Logo

PostgreSQL
The world's most advanced open source database.

file_fdw.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * file_fdw.c
00004  *        foreign-data wrapper for server-side flat files.
00005  *
00006  * Copyright (c) 2010-2013, PostgreSQL Global Development Group
00007  *
00008  * IDENTIFICATION
00009  *        contrib/file_fdw/file_fdw.c
00010  *
00011  *-------------------------------------------------------------------------
00012  */
00013 #include "postgres.h"
00014 
00015 #include <sys/stat.h>
00016 #include <unistd.h>
00017 
00018 #include "access/htup_details.h"
00019 #include "access/reloptions.h"
00020 #include "access/sysattr.h"
00021 #include "catalog/pg_foreign_table.h"
00022 #include "commands/copy.h"
00023 #include "commands/defrem.h"
00024 #include "commands/explain.h"
00025 #include "commands/vacuum.h"
00026 #include "foreign/fdwapi.h"
00027 #include "foreign/foreign.h"
00028 #include "miscadmin.h"
00029 #include "nodes/makefuncs.h"
00030 #include "optimizer/cost.h"
00031 #include "optimizer/pathnode.h"
00032 #include "optimizer/planmain.h"
00033 #include "optimizer/restrictinfo.h"
00034 #include "optimizer/var.h"
00035 #include "utils/memutils.h"
00036 #include "utils/rel.h"
00037 
00038 PG_MODULE_MAGIC;
00039 
00040 /*
00041  * Describes the valid options for objects that use this wrapper.
00042  */
00043 struct FileFdwOption
00044 {
00045     const char *optname;
00046     Oid         optcontext;     /* Oid of catalog in which option may appear */
00047 };
00048 
00049 /*
00050  * Valid options for file_fdw.
00051  * These options are based on the options for COPY FROM command.
00052  * But note that force_not_null is handled as a boolean option attached to
00053  * each column, not as a table option.
00054  *
00055  * Note: If you are adding new option for user mapping, you need to modify
00056  * fileGetOptions(), which currently doesn't bother to look at user mappings.
00057  */
00058 static const struct FileFdwOption valid_options[] = {
00059     /* File options */
00060     {"filename", ForeignTableRelationId},
00061 
00062     /* Format options */
00063     /* oids option is not supported */
00064     {"format", ForeignTableRelationId},
00065     {"header", ForeignTableRelationId},
00066     {"delimiter", ForeignTableRelationId},
00067     {"quote", ForeignTableRelationId},
00068     {"escape", ForeignTableRelationId},
00069     {"null", ForeignTableRelationId},
00070     {"encoding", ForeignTableRelationId},
00071     {"force_not_null", AttributeRelationId},
00072 
00073     /*
00074      * force_quote is not supported by file_fdw because it's for COPY TO.
00075      */
00076 
00077     /* Sentinel */
00078     {NULL, InvalidOid}
00079 };
00080 
00081 /*
00082  * FDW-specific information for RelOptInfo.fdw_private.
00083  */
00084 typedef struct FileFdwPlanState
00085 {
00086     char       *filename;       /* file to read */
00087     List       *options;        /* merged COPY options, excluding filename */
00088     BlockNumber pages;          /* estimate of file's physical size */
00089     double      ntuples;        /* estimate of number of rows in file */
00090 } FileFdwPlanState;
00091 
00092 /*
00093  * FDW-specific information for ForeignScanState.fdw_state.
00094  */
00095 typedef struct FileFdwExecutionState
00096 {
00097     char       *filename;       /* file to read */
00098     List       *options;        /* merged COPY options, excluding filename */
00099     CopyState   cstate;         /* state of reading file */
00100 } FileFdwExecutionState;
00101 
00102 /*
00103  * SQL functions
00104  */
00105 extern Datum file_fdw_handler(PG_FUNCTION_ARGS);
00106 extern Datum file_fdw_validator(PG_FUNCTION_ARGS);
00107 
00108 PG_FUNCTION_INFO_V1(file_fdw_handler);
00109 PG_FUNCTION_INFO_V1(file_fdw_validator);
00110 
00111 /*
00112  * FDW callback routines
00113  */
00114 static void fileGetForeignRelSize(PlannerInfo *root,
00115                       RelOptInfo *baserel,
00116                       Oid foreigntableid);
00117 static void fileGetForeignPaths(PlannerInfo *root,
00118                     RelOptInfo *baserel,
00119                     Oid foreigntableid);
00120 static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
00121                    RelOptInfo *baserel,
00122                    Oid foreigntableid,
00123                    ForeignPath *best_path,
00124                    List *tlist,
00125                    List *scan_clauses);
00126 static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
00127 static void fileBeginForeignScan(ForeignScanState *node, int eflags);
00128 static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
00129 static void fileReScanForeignScan(ForeignScanState *node);
00130 static void fileEndForeignScan(ForeignScanState *node);
00131 static bool fileAnalyzeForeignTable(Relation relation,
00132                         AcquireSampleRowsFunc *func,
00133                         BlockNumber *totalpages);
00134 
00135 /*
00136  * Helper functions
00137  */
00138 static bool is_valid_option(const char *option, Oid context);
00139 static void fileGetOptions(Oid foreigntableid,
00140                char **filename, List **other_options);
00141 static List *get_file_fdw_attribute_options(Oid relid);
00142 static bool check_selective_binary_conversion(RelOptInfo *baserel,
00143                                               Oid foreigntableid,
00144                                               List **columns);
00145 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
00146               FileFdwPlanState *fdw_private);
00147 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
00148                FileFdwPlanState *fdw_private,
00149                Cost *startup_cost, Cost *total_cost);
00150 static int file_acquire_sample_rows(Relation onerel, int elevel,
00151                          HeapTuple *rows, int targrows,
00152                          double *totalrows, double *totaldeadrows);
00153 
00154 
00155 /*
00156  * Foreign-data wrapper handler function: return a struct with pointers
00157  * to my callback routines.
00158  */
00159 Datum
00160 file_fdw_handler(PG_FUNCTION_ARGS)
00161 {
00162     FdwRoutine *fdwroutine = makeNode(FdwRoutine);
00163 
00164     fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
00165     fdwroutine->GetForeignPaths = fileGetForeignPaths;
00166     fdwroutine->GetForeignPlan = fileGetForeignPlan;
00167     fdwroutine->ExplainForeignScan = fileExplainForeignScan;
00168     fdwroutine->BeginForeignScan = fileBeginForeignScan;
00169     fdwroutine->IterateForeignScan = fileIterateForeignScan;
00170     fdwroutine->ReScanForeignScan = fileReScanForeignScan;
00171     fdwroutine->EndForeignScan = fileEndForeignScan;
00172     fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
00173 
00174     PG_RETURN_POINTER(fdwroutine);
00175 }
00176 
00177 /*
00178  * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
00179  * USER MAPPING or FOREIGN TABLE that uses file_fdw.
00180  *
00181  * Raise an ERROR if the option or its value is considered invalid.
00182  */
00183 Datum
00184 file_fdw_validator(PG_FUNCTION_ARGS)
00185 {
00186     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
00187     Oid         catalog = PG_GETARG_OID(1);
00188     char       *filename = NULL;
00189     DefElem    *force_not_null = NULL;
00190     List       *other_options = NIL;
00191     ListCell   *cell;
00192 
00193     /*
00194      * Only superusers are allowed to set options of a file_fdw foreign table.
00195      * This is because the filename is one of those options, and we don't want
00196      * non-superusers to be able to determine which file gets read.
00197      *
00198      * Putting this sort of permissions check in a validator is a bit of a
00199      * crock, but there doesn't seem to be any other place that can enforce
00200      * the check more cleanly.
00201      *
00202      * Note that the valid_options[] array disallows setting filename at any
00203      * options level other than foreign table --- otherwise there'd still be a
00204      * security hole.
00205      */
00206     if (catalog == ForeignTableRelationId && !superuser())
00207         ereport(ERROR,
00208                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00209                  errmsg("only superuser can change options of a file_fdw foreign table")));
00210 
00211     /*
00212      * Check that only options supported by file_fdw, and allowed for the
00213      * current object type, are given.
00214      */
00215     foreach(cell, options_list)
00216     {
00217         DefElem    *def = (DefElem *) lfirst(cell);
00218 
00219         if (!is_valid_option(def->defname, catalog))
00220         {
00221             const struct FileFdwOption *opt;
00222             StringInfoData buf;
00223 
00224             /*
00225              * Unknown option specified, complain about it. Provide a hint
00226              * with list of valid options for the object.
00227              */
00228             initStringInfo(&buf);
00229             for (opt = valid_options; opt->optname; opt++)
00230             {
00231                 if (catalog == opt->optcontext)
00232                     appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
00233                                      opt->optname);
00234             }
00235 
00236             ereport(ERROR,
00237                     (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
00238                      errmsg("invalid option \"%s\"", def->defname),
00239                      buf.len > 0
00240                      ? errhint("Valid options in this context are: %s",
00241                                buf.data)
00242                   : errhint("There are no valid options in this context.")));
00243         }
00244 
00245         /*
00246          * Separate out filename and force_not_null, since ProcessCopyOptions
00247          * won't accept them.  (force_not_null only comes in a boolean
00248          * per-column flavor here.)
00249          */
00250         if (strcmp(def->defname, "filename") == 0)
00251         {
00252             if (filename)
00253                 ereport(ERROR,
00254                         (errcode(ERRCODE_SYNTAX_ERROR),
00255                          errmsg("conflicting or redundant options")));
00256             filename = defGetString(def);
00257         }
00258         else if (strcmp(def->defname, "force_not_null") == 0)
00259         {
00260             if (force_not_null)
00261                 ereport(ERROR,
00262                         (errcode(ERRCODE_SYNTAX_ERROR),
00263                          errmsg("conflicting or redundant options")));
00264             force_not_null = def;
00265             /* Don't care what the value is, as long as it's a legal boolean */
00266             (void) defGetBoolean(def);
00267         }
00268         else
00269             other_options = lappend(other_options, def);
00270     }
00271 
00272     /*
00273      * Now apply the core COPY code's validation logic for more checks.
00274      */
00275     ProcessCopyOptions(NULL, true, other_options);
00276 
00277     /*
00278      * Filename option is required for file_fdw foreign tables.
00279      */
00280     if (catalog == ForeignTableRelationId && filename == NULL)
00281         ereport(ERROR,
00282                 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
00283                  errmsg("filename is required for file_fdw foreign tables")));
00284 
00285     PG_RETURN_VOID();
00286 }
00287 
00288 /*
00289  * Check if the provided option is one of the valid options.
00290  * context is the Oid of the catalog holding the object the option is for.
00291  */
00292 static bool
00293 is_valid_option(const char *option, Oid context)
00294 {
00295     const struct FileFdwOption *opt;
00296 
00297     for (opt = valid_options; opt->optname; opt++)
00298     {
00299         if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
00300             return true;
00301     }
00302     return false;
00303 }
00304 
00305 /*
00306  * Fetch the options for a file_fdw foreign table.
00307  *
00308  * We have to separate out "filename" from the other options because
00309  * it must not appear in the options list passed to the core COPY code.
00310  */
00311 static void
00312 fileGetOptions(Oid foreigntableid,
00313                char **filename, List **other_options)
00314 {
00315     ForeignTable *table;
00316     ForeignServer *server;
00317     ForeignDataWrapper *wrapper;
00318     List       *options;
00319     ListCell   *lc,
00320                *prev;
00321 
00322     /*
00323      * Extract options from FDW objects.  We ignore user mappings because
00324      * file_fdw doesn't have any options that can be specified there.
00325      *
00326      * (XXX Actually, given the current contents of valid_options[], there's
00327      * no point in examining anything except the foreign table's own options.
00328      * Simplify?)
00329      */
00330     table = GetForeignTable(foreigntableid);
00331     server = GetForeignServer(table->serverid);
00332     wrapper = GetForeignDataWrapper(server->fdwid);
00333 
00334     options = NIL;
00335     options = list_concat(options, wrapper->options);
00336     options = list_concat(options, server->options);
00337     options = list_concat(options, table->options);
00338     options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
00339 
00340     /*
00341      * Separate out the filename.
00342      */
00343     *filename = NULL;
00344     prev = NULL;
00345     foreach(lc, options)
00346     {
00347         DefElem    *def = (DefElem *) lfirst(lc);
00348 
00349         if (strcmp(def->defname, "filename") == 0)
00350         {
00351             *filename = defGetString(def);
00352             options = list_delete_cell(options, lc, prev);
00353             break;
00354         }
00355         prev = lc;
00356     }
00357 
00358     /*
00359      * The validator should have checked that a filename was included in the
00360      * options, but check again, just in case.
00361      */
00362     if (*filename == NULL)
00363         elog(ERROR, "filename is required for file_fdw foreign tables");
00364 
00365     *other_options = options;
00366 }
00367 
00368 /*
00369  * Retrieve per-column generic options from pg_attribute and construct a list
00370  * of DefElems representing them.
00371  *
00372  * At the moment we only have "force_not_null", which should be combined into
00373  * a single DefElem listing all such columns, since that's what COPY expects.
00374  */
00375 static List *
00376 get_file_fdw_attribute_options(Oid relid)
00377 {
00378     Relation    rel;
00379     TupleDesc   tupleDesc;
00380     AttrNumber  natts;
00381     AttrNumber  attnum;
00382     List       *fnncolumns = NIL;
00383 
00384     rel = heap_open(relid, AccessShareLock);
00385     tupleDesc = RelationGetDescr(rel);
00386     natts = tupleDesc->natts;
00387 
00388     /* Retrieve FDW options for all user-defined attributes. */
00389     for (attnum = 1; attnum <= natts; attnum++)
00390     {
00391         Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
00392         List       *options;
00393         ListCell   *lc;
00394 
00395         /* Skip dropped attributes. */
00396         if (attr->attisdropped)
00397             continue;
00398 
00399         options = GetForeignColumnOptions(relid, attnum);
00400         foreach(lc, options)
00401         {
00402             DefElem    *def = (DefElem *) lfirst(lc);
00403 
00404             if (strcmp(def->defname, "force_not_null") == 0)
00405             {
00406                 if (defGetBoolean(def))
00407                 {
00408                     char       *attname = pstrdup(NameStr(attr->attname));
00409 
00410                     fnncolumns = lappend(fnncolumns, makeString(attname));
00411                 }
00412             }
00413             /* maybe in future handle other options here */
00414         }
00415     }
00416 
00417     heap_close(rel, AccessShareLock);
00418 
00419     /* Return DefElem only when some column(s) have force_not_null */
00420     if (fnncolumns != NIL)
00421         return list_make1(makeDefElem("force_not_null", (Node *) fnncolumns));
00422     else
00423         return NIL;
00424 }
00425 
00426 /*
00427  * fileGetForeignRelSize
00428  *      Obtain relation size estimates for a foreign table
00429  */
00430 static void
00431 fileGetForeignRelSize(PlannerInfo *root,
00432                       RelOptInfo *baserel,
00433                       Oid foreigntableid)
00434 {
00435     FileFdwPlanState *fdw_private;
00436 
00437     /*
00438      * Fetch options.  We only need filename at this point, but we might as
00439      * well get everything and not need to re-fetch it later in planning.
00440      */
00441     fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
00442     fileGetOptions(foreigntableid,
00443                    &fdw_private->filename, &fdw_private->options);
00444     baserel->fdw_private = (void *) fdw_private;
00445 
00446     /* Estimate relation size */
00447     estimate_size(root, baserel, fdw_private);
00448 }
00449 
00450 /*
00451  * fileGetForeignPaths
00452  *      Create possible access paths for a scan on the foreign table
00453  *
00454  *      Currently we don't support any push-down feature, so there is only one
00455  *      possible access path, which simply returns all records in the order in
00456  *      the data file.
00457  */
00458 static void
00459 fileGetForeignPaths(PlannerInfo *root,
00460                     RelOptInfo *baserel,
00461                     Oid foreigntableid)
00462 {
00463     FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
00464     Cost        startup_cost;
00465     Cost        total_cost;
00466     List       *columns;
00467     List       *coptions = NIL;
00468 
00469     /* Decide whether to selectively perform binary conversion */
00470     if (check_selective_binary_conversion(baserel,
00471                                           foreigntableid,
00472                                           &columns))
00473         coptions = list_make1(makeDefElem("convert_selectively",
00474                                           (Node *) columns));
00475 
00476     /* Estimate costs */
00477     estimate_costs(root, baserel, fdw_private,
00478                    &startup_cost, &total_cost);
00479 
00480     /*
00481      * Create a ForeignPath node and add it as only possible path.  We use the
00482      * fdw_private list of the path to carry the convert_selectively option;
00483      * it will be propagated into the fdw_private list of the Plan node.
00484      */
00485     add_path(baserel, (Path *)
00486              create_foreignscan_path(root, baserel,
00487                                      baserel->rows,
00488                                      startup_cost,
00489                                      total_cost,
00490                                      NIL,       /* no pathkeys */
00491                                      NULL,      /* no outer rel either */
00492                                      coptions));
00493 
00494     /*
00495      * If data file was sorted, and we knew it somehow, we could insert
00496      * appropriate pathkeys into the ForeignPath node to tell the planner
00497      * that.
00498      */
00499 }
00500 
00501 /*
00502  * fileGetForeignPlan
00503  *      Create a ForeignScan plan node for scanning the foreign table
00504  */
00505 static ForeignScan *
00506 fileGetForeignPlan(PlannerInfo *root,
00507                    RelOptInfo *baserel,
00508                    Oid foreigntableid,
00509                    ForeignPath *best_path,
00510                    List *tlist,
00511                    List *scan_clauses)
00512 {
00513     Index       scan_relid = baserel->relid;
00514 
00515     /*
00516      * We have no native ability to evaluate restriction clauses, so we just
00517      * put all the scan_clauses into the plan node's qual list for the
00518      * executor to check.  So all we have to do here is strip RestrictInfo
00519      * nodes from the clauses and ignore pseudoconstants (which will be
00520      * handled elsewhere).
00521      */
00522     scan_clauses = extract_actual_clauses(scan_clauses, false);
00523 
00524     /* Create the ForeignScan node */
00525     return make_foreignscan(tlist,
00526                             scan_clauses,
00527                             scan_relid,
00528                             NIL,    /* no expressions to evaluate */
00529                             best_path->fdw_private);
00530 }
00531 
00532 /*
00533  * fileExplainForeignScan
00534  *      Produce extra output for EXPLAIN
00535  */
00536 static void
00537 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
00538 {
00539     char       *filename;
00540     List       *options;
00541 
00542     /* Fetch options --- we only need filename at this point */
00543     fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
00544                    &filename, &options);
00545 
00546     ExplainPropertyText("Foreign File", filename, es);
00547 
00548     /* Suppress file size if we're not showing cost details */
00549     if (es->costs)
00550     {
00551         struct stat stat_buf;
00552 
00553         if (stat(filename, &stat_buf) == 0)
00554             ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
00555                                 es);
00556     }
00557 }
00558 
00559 /*
00560  * fileBeginForeignScan
00561  *      Initiate access to the file by creating CopyState
00562  */
00563 static void
00564 fileBeginForeignScan(ForeignScanState *node, int eflags)
00565 {
00566     ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
00567     char       *filename;
00568     List       *options;
00569     CopyState   cstate;
00570     FileFdwExecutionState *festate;
00571 
00572     /*
00573      * Do nothing in EXPLAIN (no ANALYZE) case.  node->fdw_state stays NULL.
00574      */
00575     if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
00576         return;
00577 
00578     /* Fetch options of foreign table */
00579     fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
00580                    &filename, &options);
00581 
00582     /* Add any options from the plan (currently only convert_selectively) */
00583     options = list_concat(options, plan->fdw_private);
00584 
00585     /*
00586      * Create CopyState from FDW options.  We always acquire all columns, so
00587      * as to match the expected ScanTupleSlot signature.
00588      */
00589     cstate = BeginCopyFrom(node->ss.ss_currentRelation,
00590                            filename,
00591                            false,
00592                            NIL,
00593                            options);
00594 
00595     /*
00596      * Save state in node->fdw_state.  We must save enough information to call
00597      * BeginCopyFrom() again.
00598      */
00599     festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
00600     festate->filename = filename;
00601     festate->options = options;
00602     festate->cstate = cstate;
00603 
00604     node->fdw_state = (void *) festate;
00605 }
00606 
00607 /*
00608  * fileIterateForeignScan
00609  *      Read next record from the data file and store it into the
00610  *      ScanTupleSlot as a virtual tuple
00611  */
00612 static TupleTableSlot *
00613 fileIterateForeignScan(ForeignScanState *node)
00614 {
00615     FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00616     TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
00617     bool        found;
00618     ErrorContextCallback errcallback;
00619 
00620     /* Set up callback to identify error line number. */
00621     errcallback.callback = CopyFromErrorCallback;
00622     errcallback.arg = (void *) festate->cstate;
00623     errcallback.previous = error_context_stack;
00624     error_context_stack = &errcallback;
00625 
00626     /*
00627      * The protocol for loading a virtual tuple into a slot is first
00628      * ExecClearTuple, then fill the values/isnull arrays, then
00629      * ExecStoreVirtualTuple.  If we don't find another row in the file, we
00630      * just skip the last step, leaving the slot empty as required.
00631      *
00632      * We can pass ExprContext = NULL because we read all columns from the
00633      * file, so no need to evaluate default expressions.
00634      *
00635      * We can also pass tupleOid = NULL because we don't allow oids for
00636      * foreign tables.
00637      */
00638     ExecClearTuple(slot);
00639     found = NextCopyFrom(festate->cstate, NULL,
00640                          slot->tts_values, slot->tts_isnull,
00641                          NULL);
00642     if (found)
00643         ExecStoreVirtualTuple(slot);
00644 
00645     /* Remove error callback. */
00646     error_context_stack = errcallback.previous;
00647 
00648     return slot;
00649 }
00650 
00651 /*
00652  * fileReScanForeignScan
00653  *      Rescan table, possibly with new parameters
00654  */
00655 static void
00656 fileReScanForeignScan(ForeignScanState *node)
00657 {
00658     FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00659 
00660     EndCopyFrom(festate->cstate);
00661 
00662     festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
00663                                     festate->filename,
00664                                     false,
00665                                     NIL,
00666                                     festate->options);
00667 }
00668 
00669 /*
00670  * fileEndForeignScan
00671  *      Finish scanning foreign table and dispose objects used for this scan
00672  */
00673 static void
00674 fileEndForeignScan(ForeignScanState *node)
00675 {
00676     FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00677 
00678     /* if festate is NULL, we are in EXPLAIN; nothing to do */
00679     if (festate)
00680         EndCopyFrom(festate->cstate);
00681 }
00682 
00683 /*
00684  * fileAnalyzeForeignTable
00685  *      Test whether analyzing this foreign table is supported
00686  */
00687 static bool
00688 fileAnalyzeForeignTable(Relation relation,
00689                         AcquireSampleRowsFunc *func,
00690                         BlockNumber *totalpages)
00691 {
00692     char       *filename;
00693     List       *options;
00694     struct stat stat_buf;
00695 
00696     /* Fetch options of foreign table */
00697     fileGetOptions(RelationGetRelid(relation), &filename, &options);
00698 
00699     /*
00700      * Get size of the file.  (XXX if we fail here, would it be better to just
00701      * return false to skip analyzing the table?)
00702      */
00703     if (stat(filename, &stat_buf) < 0)
00704         ereport(ERROR,
00705                 (errcode_for_file_access(),
00706                  errmsg("could not stat file \"%s\": %m",
00707                         filename)));
00708 
00709     /*
00710      * Convert size to pages.  Must return at least 1 so that we can tell
00711      * later on that pg_class.relpages is not default.
00712      */
00713     *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
00714     if (*totalpages < 1)
00715         *totalpages = 1;
00716 
00717     *func = file_acquire_sample_rows;
00718 
00719     return true;
00720 }
00721 
00722 /*
00723  * check_selective_binary_conversion
00724  *
00725  * Check to see if it's useful to convert only a subset of the file's columns
00726  * to binary.  If so, construct a list of the column names to be converted,
00727  * return that at *columns, and return TRUE.  (Note that it's possible to
00728  * determine that no columns need be converted, for instance with a COUNT(*)
00729  * query.  So we can't use returning a NIL list to indicate failure.)
00730  */
00731 static bool
00732 check_selective_binary_conversion(RelOptInfo *baserel,
00733                                   Oid foreigntableid,
00734                                   List **columns)
00735 {
00736     ForeignTable *table;
00737     ListCell   *lc;
00738     Relation    rel;
00739     TupleDesc   tupleDesc;
00740     AttrNumber  attnum;
00741     Bitmapset  *attrs_used = NULL;
00742     bool        has_wholerow = false;
00743     int         numattrs;
00744     int         i;
00745 
00746     *columns = NIL;             /* default result */
00747 
00748     /*
00749      * Check format of the file.  If binary format, this is irrelevant.
00750      */
00751     table = GetForeignTable(foreigntableid);
00752     foreach(lc, table->options)
00753     {
00754         DefElem    *def = (DefElem *) lfirst(lc);
00755 
00756         if (strcmp(def->defname, "format") == 0)
00757         {
00758             char       *format = defGetString(def);
00759 
00760             if (strcmp(format, "binary") == 0)
00761                 return false;
00762             break;
00763         }
00764     }
00765 
00766     /* Collect all the attributes needed for joins or final output. */
00767     pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
00768                    &attrs_used);
00769 
00770     /* Add all the attributes used by restriction clauses. */
00771     foreach(lc, baserel->baserestrictinfo)
00772     {
00773         RestrictInfo   *rinfo = (RestrictInfo *) lfirst(lc);
00774 
00775         pull_varattnos((Node *) rinfo->clause, baserel->relid,
00776                        &attrs_used);
00777     }
00778 
00779     /* Convert attribute numbers to column names. */
00780     rel = heap_open(foreigntableid, AccessShareLock);
00781     tupleDesc = RelationGetDescr(rel);
00782 
00783     while ((attnum = bms_first_member(attrs_used)) >= 0)
00784     {
00785         /* Adjust for system attributes. */
00786         attnum += FirstLowInvalidHeapAttributeNumber;
00787 
00788         if (attnum == 0)
00789         {
00790             has_wholerow = true;
00791             break;
00792         }
00793 
00794         /* Ignore system attributes. */
00795         if (attnum < 0)
00796             continue;
00797 
00798         /* Get user attributes. */
00799         if (attnum > 0)
00800         {
00801             Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
00802             char       *attname = NameStr(attr->attname);
00803 
00804             /* Skip dropped attributes (probably shouldn't see any here). */
00805             if (attr->attisdropped)
00806                 continue;
00807             *columns = lappend(*columns, makeString(pstrdup(attname)));
00808         }
00809     }
00810 
00811     /* Count non-dropped user attributes while we have the tupdesc. */
00812     numattrs = 0;
00813     for (i = 0; i < tupleDesc->natts; i++)
00814     {
00815         Form_pg_attribute attr = tupleDesc->attrs[i];
00816 
00817         if (attr->attisdropped)
00818             continue;
00819         numattrs++;
00820     }
00821 
00822     heap_close(rel, AccessShareLock);
00823 
00824     /* If there's a whole-row reference, fail: we need all the columns. */
00825     if (has_wholerow)
00826     {
00827         *columns = NIL;
00828         return false;
00829     }
00830 
00831     /* If all the user attributes are needed, fail. */
00832     if (numattrs == list_length(*columns))
00833     {
00834         *columns = NIL;
00835         return false;
00836     }
00837 
00838     return true;
00839 }
00840 
00841 /*
00842  * Estimate size of a foreign table.
00843  *
00844  * The main result is returned in baserel->rows.  We also set
00845  * fdw_private->pages and fdw_private->ntuples for later use in the cost
00846  * calculation.
00847  */
00848 static void
00849 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
00850               FileFdwPlanState *fdw_private)
00851 {
00852     struct stat stat_buf;
00853     BlockNumber pages;
00854     double      ntuples;
00855     double      nrows;
00856 
00857     /*
00858      * Get size of the file.  It might not be there at plan time, though, in
00859      * which case we have to use a default estimate.
00860      */
00861     if (stat(fdw_private->filename, &stat_buf) < 0)
00862         stat_buf.st_size = 10 * BLCKSZ;
00863 
00864     /*
00865      * Convert size to pages for use in I/O cost estimate later.
00866      */
00867     pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
00868     if (pages < 1)
00869         pages = 1;
00870     fdw_private->pages = pages;
00871 
00872     /*
00873      * Estimate the number of tuples in the file.
00874      */
00875     if (baserel->pages > 0)
00876     {
00877         /*
00878          * We have # of pages and # of tuples from pg_class (that is, from a
00879          * previous ANALYZE), so compute a tuples-per-page estimate and scale
00880          * that by the current file size.
00881          */
00882         double      density;
00883 
00884         density = baserel->tuples / (double) baserel->pages;
00885         ntuples = clamp_row_est(density * (double) pages);
00886     }
00887     else
00888     {
00889         /*
00890          * Otherwise we have to fake it.  We back into this estimate using the
00891          * planner's idea of the relation width; which is bogus if not all
00892          * columns are being read, not to mention that the text representation
00893          * of a row probably isn't the same size as its internal
00894          * representation.  Possibly we could do something better, but the
00895          * real answer to anyone who complains is "ANALYZE" ...
00896          */
00897         int         tuple_width;
00898 
00899         tuple_width = MAXALIGN(baserel->width) +
00900             MAXALIGN(sizeof(HeapTupleHeaderData));
00901         ntuples = clamp_row_est((double) stat_buf.st_size /
00902                                 (double) tuple_width);
00903     }
00904     fdw_private->ntuples = ntuples;
00905 
00906     /*
00907      * Now estimate the number of rows returned by the scan after applying the
00908      * baserestrictinfo quals.
00909      */
00910     nrows = ntuples *
00911         clauselist_selectivity(root,
00912                                baserel->baserestrictinfo,
00913                                0,
00914                                JOIN_INNER,
00915                                NULL);
00916 
00917     nrows = clamp_row_est(nrows);
00918 
00919     /* Save the output-rows estimate for the planner */
00920     baserel->rows = nrows;
00921 }
00922 
00923 /*
00924  * Estimate costs of scanning a foreign table.
00925  *
00926  * Results are returned in *startup_cost and *total_cost.
00927  */
00928 static void
00929 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
00930                FileFdwPlanState *fdw_private,
00931                Cost *startup_cost, Cost *total_cost)
00932 {
00933     BlockNumber pages = fdw_private->pages;
00934     double      ntuples = fdw_private->ntuples;
00935     Cost        run_cost = 0;
00936     Cost        cpu_per_tuple;
00937 
00938     /*
00939      * We estimate costs almost the same way as cost_seqscan(), thus assuming
00940      * that I/O costs are equivalent to a regular table file of the same size.
00941      * However, we take per-tuple CPU costs as 10x of a seqscan, to account
00942      * for the cost of parsing records.
00943      */
00944     run_cost += seq_page_cost * pages;
00945 
00946     *startup_cost = baserel->baserestrictcost.startup;
00947     cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
00948     run_cost += cpu_per_tuple * ntuples;
00949     *total_cost = *startup_cost + run_cost;
00950 }
00951 
00952 /*
00953  * file_acquire_sample_rows -- acquire a random sample of rows from the table
00954  *
00955  * Selected rows are returned in the caller-allocated array rows[],
00956  * which must have at least targrows entries.
00957  * The actual number of rows selected is returned as the function result.
00958  * We also count the total number of rows in the file and return it into
00959  * *totalrows.  Note that *totaldeadrows is always set to 0.
00960  *
00961  * Note that the returned list of rows is not always in order by physical
00962  * position in the file.  Therefore, correlation estimates derived later
00963  * may be meaningless, but it's OK because we don't use the estimates
00964  * currently (the planner only pays attention to correlation for indexscans).
00965  */
00966 static int
00967 file_acquire_sample_rows(Relation onerel, int elevel,
00968                          HeapTuple *rows, int targrows,
00969                          double *totalrows, double *totaldeadrows)
00970 {
00971     int         numrows = 0;
00972     double      rowstoskip = -1;    /* -1 means not set yet */
00973     double      rstate;
00974     TupleDesc   tupDesc;
00975     Datum      *values;
00976     bool       *nulls;
00977     bool        found;
00978     char       *filename;
00979     List       *options;
00980     CopyState   cstate;
00981     ErrorContextCallback errcallback;
00982     MemoryContext oldcontext = CurrentMemoryContext;
00983     MemoryContext tupcontext;
00984 
00985     Assert(onerel);
00986     Assert(targrows > 0);
00987 
00988     tupDesc = RelationGetDescr(onerel);
00989     values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
00990     nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
00991 
00992     /* Fetch options of foreign table */
00993     fileGetOptions(RelationGetRelid(onerel), &filename, &options);
00994 
00995     /*
00996      * Create CopyState from FDW options.
00997      */
00998     cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
00999 
01000     /*
01001      * Use per-tuple memory context to prevent leak of memory used to read
01002      * rows from the file with Copy routines.
01003      */
01004     tupcontext = AllocSetContextCreate(CurrentMemoryContext,
01005                                        "file_fdw temporary context",
01006                                        ALLOCSET_DEFAULT_MINSIZE,
01007                                        ALLOCSET_DEFAULT_INITSIZE,
01008                                        ALLOCSET_DEFAULT_MAXSIZE);
01009 
01010     /* Prepare for sampling rows */
01011     rstate = anl_init_selection_state(targrows);
01012 
01013     /* Set up callback to identify error line number. */
01014     errcallback.callback = CopyFromErrorCallback;
01015     errcallback.arg = (void *) cstate;
01016     errcallback.previous = error_context_stack;
01017     error_context_stack = &errcallback;
01018 
01019     *totalrows = 0;
01020     *totaldeadrows = 0;
01021     for (;;)
01022     {
01023         /* Check for user-requested abort or sleep */
01024         vacuum_delay_point();
01025 
01026         /* Fetch next row */
01027         MemoryContextReset(tupcontext);
01028         MemoryContextSwitchTo(tupcontext);
01029 
01030         found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
01031 
01032         MemoryContextSwitchTo(oldcontext);
01033 
01034         if (!found)
01035             break;
01036 
01037         /*
01038          * The first targrows sample rows are simply copied into the
01039          * reservoir.  Then we start replacing tuples in the sample until we
01040          * reach the end of the relation. This algorithm is from Jeff Vitter's
01041          * paper (see more info in commands/analyze.c).
01042          */
01043         if (numrows < targrows)
01044         {
01045             rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
01046         }
01047         else
01048         {
01049             /*
01050              * t in Vitter's paper is the number of records already processed.
01051              * If we need to compute a new S value, we must use the
01052              * not-yet-incremented value of totalrows as t.
01053              */
01054             if (rowstoskip < 0)
01055                 rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
01056 
01057             if (rowstoskip <= 0)
01058             {
01059                 /*
01060                  * Found a suitable tuple, so save it, replacing one old tuple
01061                  * at random
01062                  */
01063                 int         k = (int) (targrows * anl_random_fract());
01064 
01065                 Assert(k >= 0 && k < targrows);
01066                 heap_freetuple(rows[k]);
01067                 rows[k] = heap_form_tuple(tupDesc, values, nulls);
01068             }
01069 
01070             rowstoskip -= 1;
01071         }
01072 
01073         *totalrows += 1;
01074     }
01075 
01076     /* Remove error callback. */
01077     error_context_stack = errcallback.previous;
01078 
01079     /* Clean up. */
01080     MemoryContextDelete(tupcontext);
01081 
01082     EndCopyFrom(cstate);
01083 
01084     pfree(values);
01085     pfree(nulls);
01086 
01087     /*
01088      * Emit some interesting relation info
01089      */
01090     ereport(elevel,
01091             (errmsg("\"%s\": file contains %.0f rows; "
01092                     "%d rows in sample",
01093                     RelationGetRelationName(onerel),
01094                     *totalrows, numrows)));
01095 
01096     return numrows;
01097 }