00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "postgres.h"
00014
00015 #include <sys/stat.h>
00016 #include <unistd.h>
00017
00018 #include "access/htup_details.h"
00019 #include "access/reloptions.h"
00020 #include "access/sysattr.h"
00021 #include "catalog/pg_foreign_table.h"
00022 #include "commands/copy.h"
00023 #include "commands/defrem.h"
00024 #include "commands/explain.h"
00025 #include "commands/vacuum.h"
00026 #include "foreign/fdwapi.h"
00027 #include "foreign/foreign.h"
00028 #include "miscadmin.h"
00029 #include "nodes/makefuncs.h"
00030 #include "optimizer/cost.h"
00031 #include "optimizer/pathnode.h"
00032 #include "optimizer/planmain.h"
00033 #include "optimizer/restrictinfo.h"
00034 #include "optimizer/var.h"
00035 #include "utils/memutils.h"
00036 #include "utils/rel.h"
00037
00038 PG_MODULE_MAGIC;
00039
00040
00041
00042
/*
 * Describes one option that file_fdw accepts: the option's name together
 * with the OID of the catalog on whose objects the option may be set.
 */
struct FileFdwOption
{
	const char *optname;		/* option name; NULL marks the end of a table */
	Oid			optcontext;		/* OID of the catalog the option is valid for */
};
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
/*
 * Table of every option file_fdw accepts, paired with the catalog in which
 * the option may appear.  is_valid_option() and file_fdw_validator() walk
 * this array until they reach the entry whose optname is NULL.
 *
 * "filename" and "force_not_null" are interpreted by this file; the
 * validator hands all remaining options to ProcessCopyOptions().
 */
static const struct FileFdwOption valid_options[] = {
	/* The file to read */
	{"filename", ForeignTableRelationId},

	/* Options forwarded to ProcessCopyOptions() by the validator */
	{"format", ForeignTableRelationId},
	{"header", ForeignTableRelationId},
	{"delimiter", ForeignTableRelationId},
	{"quote", ForeignTableRelationId},
	{"escape", ForeignTableRelationId},
	{"null", ForeignTableRelationId},
	{"encoding", ForeignTableRelationId},

	/* Per-column boolean option (attached to attributes, not to the table) */
	{"force_not_null", AttributeRelationId},

	/* Sentinel: terminates the table */
	{NULL, InvalidOid}
};
00080
00081
00082
00083
/*
 * Planning-time state for one foreign table.  It is allocated in
 * fileGetForeignRelSize(), stored in baserel->fdw_private, and read back in
 * fileGetForeignPaths().
 */
typedef struct FileFdwPlanState
{
	char	   *filename;		/* file to read, as returned by fileGetOptions() */
	List	   *options;		/* remaining merged options, excluding filename */
	BlockNumber pages;			/* page-count estimate set by estimate_size() */
	double		ntuples;		/* tuple-count estimate set by estimate_size() */
} FileFdwPlanState;
00091
00092
00093
00094
/*
 * Execution-time state for one foreign scan.  It is built in
 * fileBeginForeignScan(), stored in node->fdw_state, and used by the
 * iterate, rescan and end callbacks.
 */
typedef struct FileFdwExecutionState
{
	char	   *filename;		/* file being read */
	List	   *options;		/* options passed to BeginCopyFrom(); kept for rescans */
	CopyState	cstate;			/* COPY state returned by BeginCopyFrom() */
} FileFdwExecutionState;
00101
00102
00103
00104
/*
 * Entry points of this module that use the PG_FUNCTION_ARGS calling
 * convention: the FDW handler and the option validator.
 */
extern Datum file_fdw_handler(PG_FUNCTION_ARGS);
extern Datum file_fdw_validator(PG_FUNCTION_ARGS);

/* Emit the function-info records for both entry points. */
PG_FUNCTION_INFO_V1(file_fdw_handler);
PG_FUNCTION_INFO_V1(file_fdw_validator);
00110
00111
00112
00113
/*
 * FDW callback routines.  file_fdw_handler() installs each of these into the
 * FdwRoutine node it returns.
 */

/* Planner callbacks */
static void fileGetForeignRelSize(PlannerInfo *root,
					  RelOptInfo *baserel,
					  Oid foreigntableid);
static void fileGetForeignPaths(PlannerInfo *root,
					RelOptInfo *baserel,
					Oid foreigntableid);
static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
				   RelOptInfo *baserel,
				   Oid foreigntableid,
				   ForeignPath *best_path,
				   List *tlist,
				   List *scan_clauses);

/* EXPLAIN callback */
static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);

/* Executor callbacks */
static void fileBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
static void fileReScanForeignScan(ForeignScanState *node);
static void fileEndForeignScan(ForeignScanState *node);

/* ANALYZE callback */
static bool fileAnalyzeForeignTable(Relation relation,
						AcquireSampleRowsFunc *func,
						BlockNumber *totalpages);
00134
00135
00136
00137
/*
 * Helper functions private to this file.
 */

/* Option handling */
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
			   char **filename, List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);

/* Planner support */
static bool check_selective_binary_conversion(RelOptInfo *baserel,
								  Oid foreigntableid,
								  List **columns);
static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
			  FileFdwPlanState *fdw_private);
static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
			   FileFdwPlanState *fdw_private,
			   Cost *startup_cost, Cost *total_cost);

/* Row sampler handed out by fileAnalyzeForeignTable() */
static int file_acquire_sample_rows(Relation onerel, int elevel,
						 HeapTuple *rows, int targrows,
						 double *totalrows, double *totaldeadrows);
00153
00154
00155
00156
00157
00158
00159 Datum
00160 file_fdw_handler(PG_FUNCTION_ARGS)
00161 {
00162 FdwRoutine *fdwroutine = makeNode(FdwRoutine);
00163
00164 fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
00165 fdwroutine->GetForeignPaths = fileGetForeignPaths;
00166 fdwroutine->GetForeignPlan = fileGetForeignPlan;
00167 fdwroutine->ExplainForeignScan = fileExplainForeignScan;
00168 fdwroutine->BeginForeignScan = fileBeginForeignScan;
00169 fdwroutine->IterateForeignScan = fileIterateForeignScan;
00170 fdwroutine->ReScanForeignScan = fileReScanForeignScan;
00171 fdwroutine->EndForeignScan = fileEndForeignScan;
00172 fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
00173
00174 PG_RETURN_POINTER(fdwroutine);
00175 }
00176
00177
00178
00179
00180
00181
00182
00183 Datum
00184 file_fdw_validator(PG_FUNCTION_ARGS)
00185 {
00186 List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
00187 Oid catalog = PG_GETARG_OID(1);
00188 char *filename = NULL;
00189 DefElem *force_not_null = NULL;
00190 List *other_options = NIL;
00191 ListCell *cell;
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206 if (catalog == ForeignTableRelationId && !superuser())
00207 ereport(ERROR,
00208 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00209 errmsg("only superuser can change options of a file_fdw foreign table")));
00210
00211
00212
00213
00214
00215 foreach(cell, options_list)
00216 {
00217 DefElem *def = (DefElem *) lfirst(cell);
00218
00219 if (!is_valid_option(def->defname, catalog))
00220 {
00221 const struct FileFdwOption *opt;
00222 StringInfoData buf;
00223
00224
00225
00226
00227
00228 initStringInfo(&buf);
00229 for (opt = valid_options; opt->optname; opt++)
00230 {
00231 if (catalog == opt->optcontext)
00232 appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
00233 opt->optname);
00234 }
00235
00236 ereport(ERROR,
00237 (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
00238 errmsg("invalid option \"%s\"", def->defname),
00239 buf.len > 0
00240 ? errhint("Valid options in this context are: %s",
00241 buf.data)
00242 : errhint("There are no valid options in this context.")));
00243 }
00244
00245
00246
00247
00248
00249
00250 if (strcmp(def->defname, "filename") == 0)
00251 {
00252 if (filename)
00253 ereport(ERROR,
00254 (errcode(ERRCODE_SYNTAX_ERROR),
00255 errmsg("conflicting or redundant options")));
00256 filename = defGetString(def);
00257 }
00258 else if (strcmp(def->defname, "force_not_null") == 0)
00259 {
00260 if (force_not_null)
00261 ereport(ERROR,
00262 (errcode(ERRCODE_SYNTAX_ERROR),
00263 errmsg("conflicting or redundant options")));
00264 force_not_null = def;
00265
00266 (void) defGetBoolean(def);
00267 }
00268 else
00269 other_options = lappend(other_options, def);
00270 }
00271
00272
00273
00274
00275 ProcessCopyOptions(NULL, true, other_options);
00276
00277
00278
00279
00280 if (catalog == ForeignTableRelationId && filename == NULL)
00281 ereport(ERROR,
00282 (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
00283 errmsg("filename is required for file_fdw foreign tables")));
00284
00285 PG_RETURN_VOID();
00286 }
00287
00288
00289
00290
00291
00292 static bool
00293 is_valid_option(const char *option, Oid context)
00294 {
00295 const struct FileFdwOption *opt;
00296
00297 for (opt = valid_options; opt->optname; opt++)
00298 {
00299 if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
00300 return true;
00301 }
00302 return false;
00303 }
00304
00305
00306
00307
00308
00309
00310
00311 static void
00312 fileGetOptions(Oid foreigntableid,
00313 char **filename, List **other_options)
00314 {
00315 ForeignTable *table;
00316 ForeignServer *server;
00317 ForeignDataWrapper *wrapper;
00318 List *options;
00319 ListCell *lc,
00320 *prev;
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330 table = GetForeignTable(foreigntableid);
00331 server = GetForeignServer(table->serverid);
00332 wrapper = GetForeignDataWrapper(server->fdwid);
00333
00334 options = NIL;
00335 options = list_concat(options, wrapper->options);
00336 options = list_concat(options, server->options);
00337 options = list_concat(options, table->options);
00338 options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
00339
00340
00341
00342
00343 *filename = NULL;
00344 prev = NULL;
00345 foreach(lc, options)
00346 {
00347 DefElem *def = (DefElem *) lfirst(lc);
00348
00349 if (strcmp(def->defname, "filename") == 0)
00350 {
00351 *filename = defGetString(def);
00352 options = list_delete_cell(options, lc, prev);
00353 break;
00354 }
00355 prev = lc;
00356 }
00357
00358
00359
00360
00361
00362 if (*filename == NULL)
00363 elog(ERROR, "filename is required for file_fdw foreign tables");
00364
00365 *other_options = options;
00366 }
00367
00368
00369
00370
00371
00372
00373
00374
00375 static List *
00376 get_file_fdw_attribute_options(Oid relid)
00377 {
00378 Relation rel;
00379 TupleDesc tupleDesc;
00380 AttrNumber natts;
00381 AttrNumber attnum;
00382 List *fnncolumns = NIL;
00383
00384 rel = heap_open(relid, AccessShareLock);
00385 tupleDesc = RelationGetDescr(rel);
00386 natts = tupleDesc->natts;
00387
00388
00389 for (attnum = 1; attnum <= natts; attnum++)
00390 {
00391 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
00392 List *options;
00393 ListCell *lc;
00394
00395
00396 if (attr->attisdropped)
00397 continue;
00398
00399 options = GetForeignColumnOptions(relid, attnum);
00400 foreach(lc, options)
00401 {
00402 DefElem *def = (DefElem *) lfirst(lc);
00403
00404 if (strcmp(def->defname, "force_not_null") == 0)
00405 {
00406 if (defGetBoolean(def))
00407 {
00408 char *attname = pstrdup(NameStr(attr->attname));
00409
00410 fnncolumns = lappend(fnncolumns, makeString(attname));
00411 }
00412 }
00413
00414 }
00415 }
00416
00417 heap_close(rel, AccessShareLock);
00418
00419
00420 if (fnncolumns != NIL)
00421 return list_make1(makeDefElem("force_not_null", (Node *) fnncolumns));
00422 else
00423 return NIL;
00424 }
00425
00426
00427
00428
00429
00430 static void
00431 fileGetForeignRelSize(PlannerInfo *root,
00432 RelOptInfo *baserel,
00433 Oid foreigntableid)
00434 {
00435 FileFdwPlanState *fdw_private;
00436
00437
00438
00439
00440
00441 fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
00442 fileGetOptions(foreigntableid,
00443 &fdw_private->filename, &fdw_private->options);
00444 baserel->fdw_private = (void *) fdw_private;
00445
00446
00447 estimate_size(root, baserel, fdw_private);
00448 }
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458 static void
00459 fileGetForeignPaths(PlannerInfo *root,
00460 RelOptInfo *baserel,
00461 Oid foreigntableid)
00462 {
00463 FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
00464 Cost startup_cost;
00465 Cost total_cost;
00466 List *columns;
00467 List *coptions = NIL;
00468
00469
00470 if (check_selective_binary_conversion(baserel,
00471 foreigntableid,
00472 &columns))
00473 coptions = list_make1(makeDefElem("convert_selectively",
00474 (Node *) columns));
00475
00476
00477 estimate_costs(root, baserel, fdw_private,
00478 &startup_cost, &total_cost);
00479
00480
00481
00482
00483
00484
00485 add_path(baserel, (Path *)
00486 create_foreignscan_path(root, baserel,
00487 baserel->rows,
00488 startup_cost,
00489 total_cost,
00490 NIL,
00491 NULL,
00492 coptions));
00493
00494
00495
00496
00497
00498
00499 }
00500
00501
00502
00503
00504
00505 static ForeignScan *
00506 fileGetForeignPlan(PlannerInfo *root,
00507 RelOptInfo *baserel,
00508 Oid foreigntableid,
00509 ForeignPath *best_path,
00510 List *tlist,
00511 List *scan_clauses)
00512 {
00513 Index scan_relid = baserel->relid;
00514
00515
00516
00517
00518
00519
00520
00521
00522 scan_clauses = extract_actual_clauses(scan_clauses, false);
00523
00524
00525 return make_foreignscan(tlist,
00526 scan_clauses,
00527 scan_relid,
00528 NIL,
00529 best_path->fdw_private);
00530 }
00531
00532
00533
00534
00535
00536 static void
00537 fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
00538 {
00539 char *filename;
00540 List *options;
00541
00542
00543 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
00544 &filename, &options);
00545
00546 ExplainPropertyText("Foreign File", filename, es);
00547
00548
00549 if (es->costs)
00550 {
00551 struct stat stat_buf;
00552
00553 if (stat(filename, &stat_buf) == 0)
00554 ExplainPropertyLong("Foreign File Size", (long) stat_buf.st_size,
00555 es);
00556 }
00557 }
00558
00559
00560
00561
00562
00563 static void
00564 fileBeginForeignScan(ForeignScanState *node, int eflags)
00565 {
00566 ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
00567 char *filename;
00568 List *options;
00569 CopyState cstate;
00570 FileFdwExecutionState *festate;
00571
00572
00573
00574
00575 if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
00576 return;
00577
00578
00579 fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
00580 &filename, &options);
00581
00582
00583 options = list_concat(options, plan->fdw_private);
00584
00585
00586
00587
00588
00589 cstate = BeginCopyFrom(node->ss.ss_currentRelation,
00590 filename,
00591 false,
00592 NIL,
00593 options);
00594
00595
00596
00597
00598
00599 festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
00600 festate->filename = filename;
00601 festate->options = options;
00602 festate->cstate = cstate;
00603
00604 node->fdw_state = (void *) festate;
00605 }
00606
00607
00608
00609
00610
00611
00612 static TupleTableSlot *
00613 fileIterateForeignScan(ForeignScanState *node)
00614 {
00615 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00616 TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
00617 bool found;
00618 ErrorContextCallback errcallback;
00619
00620
00621 errcallback.callback = CopyFromErrorCallback;
00622 errcallback.arg = (void *) festate->cstate;
00623 errcallback.previous = error_context_stack;
00624 error_context_stack = &errcallback;
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638 ExecClearTuple(slot);
00639 found = NextCopyFrom(festate->cstate, NULL,
00640 slot->tts_values, slot->tts_isnull,
00641 NULL);
00642 if (found)
00643 ExecStoreVirtualTuple(slot);
00644
00645
00646 error_context_stack = errcallback.previous;
00647
00648 return slot;
00649 }
00650
00651
00652
00653
00654
00655 static void
00656 fileReScanForeignScan(ForeignScanState *node)
00657 {
00658 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00659
00660 EndCopyFrom(festate->cstate);
00661
00662 festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
00663 festate->filename,
00664 false,
00665 NIL,
00666 festate->options);
00667 }
00668
00669
00670
00671
00672
00673 static void
00674 fileEndForeignScan(ForeignScanState *node)
00675 {
00676 FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
00677
00678
00679 if (festate)
00680 EndCopyFrom(festate->cstate);
00681 }
00682
00683
00684
00685
00686
00687 static bool
00688 fileAnalyzeForeignTable(Relation relation,
00689 AcquireSampleRowsFunc *func,
00690 BlockNumber *totalpages)
00691 {
00692 char *filename;
00693 List *options;
00694 struct stat stat_buf;
00695
00696
00697 fileGetOptions(RelationGetRelid(relation), &filename, &options);
00698
00699
00700
00701
00702
00703 if (stat(filename, &stat_buf) < 0)
00704 ereport(ERROR,
00705 (errcode_for_file_access(),
00706 errmsg("could not stat file \"%s\": %m",
00707 filename)));
00708
00709
00710
00711
00712
00713 *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
00714 if (*totalpages < 1)
00715 *totalpages = 1;
00716
00717 *func = file_acquire_sample_rows;
00718
00719 return true;
00720 }
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731 static bool
00732 check_selective_binary_conversion(RelOptInfo *baserel,
00733 Oid foreigntableid,
00734 List **columns)
00735 {
00736 ForeignTable *table;
00737 ListCell *lc;
00738 Relation rel;
00739 TupleDesc tupleDesc;
00740 AttrNumber attnum;
00741 Bitmapset *attrs_used = NULL;
00742 bool has_wholerow = false;
00743 int numattrs;
00744 int i;
00745
00746 *columns = NIL;
00747
00748
00749
00750
00751 table = GetForeignTable(foreigntableid);
00752 foreach(lc, table->options)
00753 {
00754 DefElem *def = (DefElem *) lfirst(lc);
00755
00756 if (strcmp(def->defname, "format") == 0)
00757 {
00758 char *format = defGetString(def);
00759
00760 if (strcmp(format, "binary") == 0)
00761 return false;
00762 break;
00763 }
00764 }
00765
00766
00767 pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
00768 &attrs_used);
00769
00770
00771 foreach(lc, baserel->baserestrictinfo)
00772 {
00773 RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
00774
00775 pull_varattnos((Node *) rinfo->clause, baserel->relid,
00776 &attrs_used);
00777 }
00778
00779
00780 rel = heap_open(foreigntableid, AccessShareLock);
00781 tupleDesc = RelationGetDescr(rel);
00782
00783 while ((attnum = bms_first_member(attrs_used)) >= 0)
00784 {
00785
00786 attnum += FirstLowInvalidHeapAttributeNumber;
00787
00788 if (attnum == 0)
00789 {
00790 has_wholerow = true;
00791 break;
00792 }
00793
00794
00795 if (attnum < 0)
00796 continue;
00797
00798
00799 if (attnum > 0)
00800 {
00801 Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
00802 char *attname = NameStr(attr->attname);
00803
00804
00805 if (attr->attisdropped)
00806 continue;
00807 *columns = lappend(*columns, makeString(pstrdup(attname)));
00808 }
00809 }
00810
00811
00812 numattrs = 0;
00813 for (i = 0; i < tupleDesc->natts; i++)
00814 {
00815 Form_pg_attribute attr = tupleDesc->attrs[i];
00816
00817 if (attr->attisdropped)
00818 continue;
00819 numattrs++;
00820 }
00821
00822 heap_close(rel, AccessShareLock);
00823
00824
00825 if (has_wholerow)
00826 {
00827 *columns = NIL;
00828 return false;
00829 }
00830
00831
00832 if (numattrs == list_length(*columns))
00833 {
00834 *columns = NIL;
00835 return false;
00836 }
00837
00838 return true;
00839 }
00840
00841
00842
00843
00844
00845
00846
00847
00848 static void
00849 estimate_size(PlannerInfo *root, RelOptInfo *baserel,
00850 FileFdwPlanState *fdw_private)
00851 {
00852 struct stat stat_buf;
00853 BlockNumber pages;
00854 double ntuples;
00855 double nrows;
00856
00857
00858
00859
00860
00861 if (stat(fdw_private->filename, &stat_buf) < 0)
00862 stat_buf.st_size = 10 * BLCKSZ;
00863
00864
00865
00866
00867 pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
00868 if (pages < 1)
00869 pages = 1;
00870 fdw_private->pages = pages;
00871
00872
00873
00874
00875 if (baserel->pages > 0)
00876 {
00877
00878
00879
00880
00881
00882 double density;
00883
00884 density = baserel->tuples / (double) baserel->pages;
00885 ntuples = clamp_row_est(density * (double) pages);
00886 }
00887 else
00888 {
00889
00890
00891
00892
00893
00894
00895
00896
00897 int tuple_width;
00898
00899 tuple_width = MAXALIGN(baserel->width) +
00900 MAXALIGN(sizeof(HeapTupleHeaderData));
00901 ntuples = clamp_row_est((double) stat_buf.st_size /
00902 (double) tuple_width);
00903 }
00904 fdw_private->ntuples = ntuples;
00905
00906
00907
00908
00909
00910 nrows = ntuples *
00911 clauselist_selectivity(root,
00912 baserel->baserestrictinfo,
00913 0,
00914 JOIN_INNER,
00915 NULL);
00916
00917 nrows = clamp_row_est(nrows);
00918
00919
00920 baserel->rows = nrows;
00921 }
00922
00923
00924
00925
00926
00927
00928 static void
00929 estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
00930 FileFdwPlanState *fdw_private,
00931 Cost *startup_cost, Cost *total_cost)
00932 {
00933 BlockNumber pages = fdw_private->pages;
00934 double ntuples = fdw_private->ntuples;
00935 Cost run_cost = 0;
00936 Cost cpu_per_tuple;
00937
00938
00939
00940
00941
00942
00943
00944 run_cost += seq_page_cost * pages;
00945
00946 *startup_cost = baserel->baserestrictcost.startup;
00947 cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
00948 run_cost += cpu_per_tuple * ntuples;
00949 *total_cost = *startup_cost + run_cost;
00950 }
00951
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966 static int
00967 file_acquire_sample_rows(Relation onerel, int elevel,
00968 HeapTuple *rows, int targrows,
00969 double *totalrows, double *totaldeadrows)
00970 {
00971 int numrows = 0;
00972 double rowstoskip = -1;
00973 double rstate;
00974 TupleDesc tupDesc;
00975 Datum *values;
00976 bool *nulls;
00977 bool found;
00978 char *filename;
00979 List *options;
00980 CopyState cstate;
00981 ErrorContextCallback errcallback;
00982 MemoryContext oldcontext = CurrentMemoryContext;
00983 MemoryContext tupcontext;
00984
00985 Assert(onerel);
00986 Assert(targrows > 0);
00987
00988 tupDesc = RelationGetDescr(onerel);
00989 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
00990 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
00991
00992
00993 fileGetOptions(RelationGetRelid(onerel), &filename, &options);
00994
00995
00996
00997
00998 cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
00999
01000
01001
01002
01003
01004 tupcontext = AllocSetContextCreate(CurrentMemoryContext,
01005 "file_fdw temporary context",
01006 ALLOCSET_DEFAULT_MINSIZE,
01007 ALLOCSET_DEFAULT_INITSIZE,
01008 ALLOCSET_DEFAULT_MAXSIZE);
01009
01010
01011 rstate = anl_init_selection_state(targrows);
01012
01013
01014 errcallback.callback = CopyFromErrorCallback;
01015 errcallback.arg = (void *) cstate;
01016 errcallback.previous = error_context_stack;
01017 error_context_stack = &errcallback;
01018
01019 *totalrows = 0;
01020 *totaldeadrows = 0;
01021 for (;;)
01022 {
01023
01024 vacuum_delay_point();
01025
01026
01027 MemoryContextReset(tupcontext);
01028 MemoryContextSwitchTo(tupcontext);
01029
01030 found = NextCopyFrom(cstate, NULL, values, nulls, NULL);
01031
01032 MemoryContextSwitchTo(oldcontext);
01033
01034 if (!found)
01035 break;
01036
01037
01038
01039
01040
01041
01042
01043 if (numrows < targrows)
01044 {
01045 rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
01046 }
01047 else
01048 {
01049
01050
01051
01052
01053
01054 if (rowstoskip < 0)
01055 rowstoskip = anl_get_next_S(*totalrows, targrows, &rstate);
01056
01057 if (rowstoskip <= 0)
01058 {
01059
01060
01061
01062
01063 int k = (int) (targrows * anl_random_fract());
01064
01065 Assert(k >= 0 && k < targrows);
01066 heap_freetuple(rows[k]);
01067 rows[k] = heap_form_tuple(tupDesc, values, nulls);
01068 }
01069
01070 rowstoskip -= 1;
01071 }
01072
01073 *totalrows += 1;
01074 }
01075
01076
01077 error_context_stack = errcallback.previous;
01078
01079
01080 MemoryContextDelete(tupcontext);
01081
01082 EndCopyFrom(cstate);
01083
01084 pfree(values);
01085 pfree(nulls);
01086
01087
01088
01089
01090 ereport(elevel,
01091 (errmsg("\"%s\": file contains %.0f rows; "
01092 "%d rows in sample",
01093 RelationGetRelationName(onerel),
01094 *totalrows, numrows)));
01095
01096 return numrows;
01097 }