00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "postgres.h"
00016
00017 #include <ctype.h>
00018 #include <unistd.h>
00019 #include <sys/stat.h>
00020 #include <netinet/in.h>
00021 #include <arpa/inet.h>
00022
00023 #include "access/heapam.h"
00024 #include "access/htup_details.h"
00025 #include "access/sysattr.h"
00026 #include "access/xact.h"
00027 #include "catalog/namespace.h"
00028 #include "catalog/pg_type.h"
00029 #include "commands/copy.h"
00030 #include "commands/defrem.h"
00031 #include "commands/trigger.h"
00032 #include "executor/executor.h"
00033 #include "libpq/libpq.h"
00034 #include "libpq/pqformat.h"
00035 #include "mb/pg_wchar.h"
00036 #include "miscadmin.h"
00037 #include "optimizer/clauses.h"
00038 #include "optimizer/planner.h"
00039 #include "parser/parse_relation.h"
00040 #include "rewrite/rewriteHandler.h"
00041 #include "storage/fd.h"
00042 #include "tcop/tcopprot.h"
00043 #include "utils/acl.h"
00044 #include "utils/builtins.h"
00045 #include "utils/lsyscache.h"
00046 #include "utils/memutils.h"
00047 #include "utils/portal.h"
00048 #include "utils/rel.h"
00049 #include "utils/snapmgr.h"
00050
00051
/* Nonzero when c is one of the characters '0' through '7'. */
#define ISOCTAL(c)  ((c) >= '0' && (c) <= '7')
/* Offset of character c from '0', i.e. the numeric value of a digit. */
#define OCTVALUE(c) ((c) - '0')
00054
00055
00056
00057
00058
/*
 * Where COPY data is read from or written to.  The value selects the I/O
 * path taken by CopySendEndOfRow() and CopyGetData().
 */
typedef enum CopyDest
{
    COPY_FILE = 0,              /* stdio stream (cstate->copy_file) */
    COPY_OLD_FE = 1,            /* frontend speaking protocol major < 3 */
    COPY_NEW_FE = 2             /* frontend speaking protocol major >= 3 */
} CopyDest;
00065
00066
00067
00068
/*
 * Line-ending style.
 * NOTE(review): presumably records which end-of-line convention the input
 * uses (NL = "\n", CR = "\r", CRNL = "\r\n"); the code that sets and reads
 * it is not in this part of the file -- confirm there.
 */
typedef enum EolType
{
    EOL_UNKNOWN = 0,
    EOL_NL = 1,
    EOL_CR = 2,
    EOL_CRNL = 3
} EolType;
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 typedef struct CopyStateData
00094 {
00095
00096 CopyDest copy_dest;
00097 FILE *copy_file;
00098 StringInfo fe_msgbuf;
00099
00100 bool fe_eof;
00101 EolType eol_type;
00102 int file_encoding;
00103 bool need_transcoding;
00104 bool encoding_embeds_ascii;
00105
00106
00107 Relation rel;
00108 QueryDesc *queryDesc;
00109 List *attnumlist;
00110 char *filename;
00111 bool is_program;
00112 bool binary;
00113 bool oids;
00114 bool freeze;
00115 bool csv_mode;
00116 bool header_line;
00117 char *null_print;
00118 int null_print_len;
00119 char *null_print_client;
00120 char *delim;
00121 char *quote;
00122 char *escape;
00123 List *force_quote;
00124 bool force_quote_all;
00125 bool *force_quote_flags;
00126 List *force_notnull;
00127 bool *force_notnull_flags;
00128 bool convert_selectively;
00129 List *convert_select;
00130 bool *convert_select_flags;
00131
00132
00133 const char *cur_relname;
00134 int cur_lineno;
00135 const char *cur_attname;
00136 const char *cur_attval;
00137
00138
00139
00140
00141 MemoryContext copycontext;
00142
00143
00144
00145
00146 FmgrInfo *out_functions;
00147 MemoryContext rowcontext;
00148
00149
00150
00151
00152 AttrNumber num_defaults;
00153 bool file_has_oids;
00154 FmgrInfo oid_in_function;
00155 Oid oid_typioparam;
00156 FmgrInfo *in_functions;
00157 Oid *typioparams;
00158 int *defmap;
00159 ExprState **defexprs;
00160 bool volatile_defexprs;
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170 StringInfoData attribute_buf;
00171
00172
00173
00174 int max_fields;
00175 char **raw_fields;
00176
00177
00178
00179
00180
00181
00182
00183
00184 StringInfoData line_buf;
00185 bool line_buf_converted;
00186
00187
00188
00189
00190
00191
00192
00193
00194 #define RAW_BUF_SIZE 65536
00195 char *raw_buf;
00196 int raw_buf_index;
00197 int raw_buf_len;
00198 } CopyStateData;
00199
00200
00201 typedef struct
00202 {
00203 DestReceiver pub;
00204 CopyState cstate;
00205 uint64 processed;
00206 } DR_copy;
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
/*
 * Used inside a scanning loop that has the locals raw_buf_ptr,
 * prev_raw_ptr, copy_buf_len, hit_eof and need_data in scope.
 *
 * If fewer than extralen + 1 bytes remain in the buffer and EOF has not
 * been seen, rewind raw_buf_ptr to prev_raw_ptr, request more data and
 * restart the enclosing loop.
 *
 * The "if ... else ((void) 0)" shape (rather than do { } while (0)) is
 * required: the embedded "continue" must bind to the caller's loop, and the
 * trailing else keeps the macro safe in an unbraced if/else.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
{ \
    raw_buf_ptr = prev_raw_ptr; /* undo the fetch */ \
    need_data = true; \
    continue; \
} else ((void) 0)
00235
00236
/*
 * Companion of IF_NEED_REFILL_AND_NOT_EOF_CONTINUE for the case where EOF
 * has already been seen.  Expects raw_buf_ptr, copy_buf_len, hit_eof and
 * result in scope.
 *
 * If fewer than extralen + 1 bytes remain and EOF was hit: when extralen is
 * nonzero, advance raw_buf_ptr to the end of the buffer; then set result to
 * true and leave the enclosing loop (or switch).
 *
 * As above, this must not be a do { } while (0) wrapper, because the
 * "break" has to act on the caller's construct.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
{ \
    if (extralen) \
        raw_buf_ptr = copy_buf_len; /* swallow the partial tail */ \
    result = true; \
    break; \
} else ((void) 0)
00249
00250
00251
00252
00253
/*
 * Move the bytes scanned so far, raw_buf[raw_buf_index .. raw_buf_ptr - 1],
 * onto the end of cstate->line_buf and mark them consumed.  Does nothing
 * when no bytes are pending.  Expects cstate and raw_buf_ptr in scope.
 */
#define REFILL_LINEBUF \
if (raw_buf_ptr > cstate->raw_buf_index) \
{ \
    appendBinaryStringInfo(&cstate->line_buf, \
                           cstate->raw_buf + cstate->raw_buf_index, \
                           raw_buf_ptr - cstate->raw_buf_index); \
    cstate->raw_buf_index = raw_buf_ptr; \
} else ((void) 0)
00265
00266
/*
 * Back up to just after the byte at prev_raw_ptr and jump to the caller's
 * "not_end_of_copy" label.  Expects raw_buf_ptr and prev_raw_ptr in scope
 * and the label to exist in the enclosing function.
 */
#define NO_END_OF_COPY_GOTO \
do { \
    raw_buf_ptr = prev_raw_ptr + 1; \
    goto not_end_of_copy; \
} while (0)
00273
/*
 * 11-byte signature for binary COPY data: "PGCOPY", newline, 0xFF (octal
 * 377), carriage return, newline, NUL.  The array holds exactly these 11
 * bytes with no additional terminator.
 */
static const char BinarySignature[11] = {
    'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
00275
00276
00277
00278 static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
00279 const char *queryString, List *attnamelist, List *options);
00280 static void EndCopy(CopyState cstate);
00281 static void ClosePipeToProgram(CopyState cstate);
00282 static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
00283 const char *filename, bool is_program, List *attnamelist,
00284 List *options);
00285 static void EndCopyTo(CopyState cstate);
00286 static uint64 DoCopyTo(CopyState cstate);
00287 static uint64 CopyTo(CopyState cstate);
00288 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
00289 Datum *values, bool *nulls);
00290 static uint64 CopyFrom(CopyState cstate);
00291 static void CopyFromInsertBatch(CopyState cstate, EState *estate,
00292 CommandId mycid, int hi_options,
00293 ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
00294 BulkInsertState bistate,
00295 int nBufferedTuples, HeapTuple *bufferedTuples);
00296 static bool CopyReadLine(CopyState cstate);
00297 static bool CopyReadLineText(CopyState cstate);
00298 static int CopyReadAttributesText(CopyState cstate);
00299 static int CopyReadAttributesCSV(CopyState cstate);
00300 static Datum CopyReadBinaryAttribute(CopyState cstate,
00301 int column_no, FmgrInfo *flinfo,
00302 Oid typioparam, int32 typmod,
00303 bool *isnull);
00304 static void CopyAttributeOutText(CopyState cstate, char *string);
00305 static void CopyAttributeOutCSV(CopyState cstate, char *string,
00306 bool use_quote, bool single_attr);
00307 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
00308 List *attnamelist);
00309 static char *limit_printout_length(const char *str);
00310
00311
00312 static void SendCopyBegin(CopyState cstate);
00313 static void ReceiveCopyBegin(CopyState cstate);
00314 static void SendCopyEnd(CopyState cstate);
00315 static void CopySendData(CopyState cstate, const void *databuf, int datasize);
00316 static void CopySendString(CopyState cstate, const char *str);
00317 static void CopySendChar(CopyState cstate, char c);
00318 static void CopySendEndOfRow(CopyState cstate);
00319 static int CopyGetData(CopyState cstate, void *databuf,
00320 int minread, int maxread);
00321 static void CopySendInt32(CopyState cstate, int32 val);
00322 static bool CopyGetInt32(CopyState cstate, int32 *val);
00323 static void CopySendInt16(CopyState cstate, int16 val);
00324 static bool CopyGetInt16(CopyState cstate, int16 *val);
00325
00326
00327
00328
00329
00330
00331 static void
00332 SendCopyBegin(CopyState cstate)
00333 {
00334 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
00335 {
00336
00337 StringInfoData buf;
00338 int natts = list_length(cstate->attnumlist);
00339 int16 format = (cstate->binary ? 1 : 0);
00340 int i;
00341
00342 pq_beginmessage(&buf, 'H');
00343 pq_sendbyte(&buf, format);
00344 pq_sendint(&buf, natts, 2);
00345 for (i = 0; i < natts; i++)
00346 pq_sendint(&buf, format, 2);
00347 pq_endmessage(&buf);
00348 cstate->copy_dest = COPY_NEW_FE;
00349 }
00350 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
00351 {
00352
00353 if (cstate->binary)
00354 ereport(ERROR,
00355 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00356 errmsg("COPY BINARY is not supported to stdout or from stdin")));
00357 pq_putemptymessage('H');
00358
00359 pq_startcopyout();
00360 cstate->copy_dest = COPY_OLD_FE;
00361 }
00362 else
00363 {
00364
00365 if (cstate->binary)
00366 ereport(ERROR,
00367 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00368 errmsg("COPY BINARY is not supported to stdout or from stdin")));
00369 pq_putemptymessage('B');
00370
00371 pq_startcopyout();
00372 cstate->copy_dest = COPY_OLD_FE;
00373 }
00374 }
00375
00376 static void
00377 ReceiveCopyBegin(CopyState cstate)
00378 {
00379 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
00380 {
00381
00382 StringInfoData buf;
00383 int natts = list_length(cstate->attnumlist);
00384 int16 format = (cstate->binary ? 1 : 0);
00385 int i;
00386
00387 pq_beginmessage(&buf, 'G');
00388 pq_sendbyte(&buf, format);
00389 pq_sendint(&buf, natts, 2);
00390 for (i = 0; i < natts; i++)
00391 pq_sendint(&buf, format, 2);
00392 pq_endmessage(&buf);
00393 cstate->copy_dest = COPY_NEW_FE;
00394 cstate->fe_msgbuf = makeStringInfo();
00395 }
00396 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
00397 {
00398
00399 if (cstate->binary)
00400 ereport(ERROR,
00401 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00402 errmsg("COPY BINARY is not supported to stdout or from stdin")));
00403 pq_putemptymessage('G');
00404 cstate->copy_dest = COPY_OLD_FE;
00405 }
00406 else
00407 {
00408
00409 if (cstate->binary)
00410 ereport(ERROR,
00411 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00412 errmsg("COPY BINARY is not supported to stdout or from stdin")));
00413 pq_putemptymessage('D');
00414 cstate->copy_dest = COPY_OLD_FE;
00415 }
00416
00417 pq_flush();
00418 }
00419
00420 static void
00421 SendCopyEnd(CopyState cstate)
00422 {
00423 if (cstate->copy_dest == COPY_NEW_FE)
00424 {
00425
00426 Assert(cstate->fe_msgbuf->len == 0);
00427
00428 pq_putemptymessage('c');
00429 }
00430 else
00431 {
00432 CopySendData(cstate, "\\.", 2);
00433
00434 CopySendEndOfRow(cstate);
00435 pq_endcopyout(false);
00436 }
00437 }
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449 static void
00450 CopySendData(CopyState cstate, const void *databuf, int datasize)
00451 {
00452 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
00453 }
00454
00455 static void
00456 CopySendString(CopyState cstate, const char *str)
00457 {
00458 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
00459 }
00460
00461 static void
00462 CopySendChar(CopyState cstate, char c)
00463 {
00464 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
00465 }
00466
00467 static void
00468 CopySendEndOfRow(CopyState cstate)
00469 {
00470 StringInfo fe_msgbuf = cstate->fe_msgbuf;
00471
00472 switch (cstate->copy_dest)
00473 {
00474 case COPY_FILE:
00475 if (!cstate->binary)
00476 {
00477
00478 #ifndef WIN32
00479 CopySendChar(cstate, '\n');
00480 #else
00481 CopySendString(cstate, "\r\n");
00482 #endif
00483 }
00484
00485 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
00486 cstate->copy_file) != 1 ||
00487 ferror(cstate->copy_file))
00488 {
00489 if (cstate->is_program)
00490 {
00491 if (errno == EPIPE)
00492 {
00493
00494
00495
00496
00497
00498
00499 ClosePipeToProgram(cstate);
00500
00501
00502
00503
00504
00505
00506 errno = EPIPE;
00507 }
00508 ereport(ERROR,
00509 (errcode_for_file_access(),
00510 errmsg("could not write to COPY program: %m")));
00511 }
00512 else
00513 ereport(ERROR,
00514 (errcode_for_file_access(),
00515 errmsg("could not write to COPY file: %m")));
00516 }
00517 break;
00518 case COPY_OLD_FE:
00519
00520 if (!cstate->binary)
00521 CopySendChar(cstate, '\n');
00522
00523 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
00524 {
00525
00526 ereport(FATAL,
00527 (errcode(ERRCODE_CONNECTION_FAILURE),
00528 errmsg("connection lost during COPY to stdout")));
00529 }
00530 break;
00531 case COPY_NEW_FE:
00532
00533 if (!cstate->binary)
00534 CopySendChar(cstate, '\n');
00535
00536
00537 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
00538 break;
00539 }
00540
00541 resetStringInfo(fe_msgbuf);
00542 }
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554
00555
00556
00557 static int
00558 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
00559 {
00560 int bytesread = 0;
00561
00562 switch (cstate->copy_dest)
00563 {
00564 case COPY_FILE:
00565 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
00566 if (ferror(cstate->copy_file))
00567 ereport(ERROR,
00568 (errcode_for_file_access(),
00569 errmsg("could not read from COPY file: %m")));
00570 break;
00571 case COPY_OLD_FE:
00572
00573
00574
00575
00576
00577
00578
00579
00580 if (pq_getbytes((char *) databuf, minread))
00581 {
00582
00583 ereport(ERROR,
00584 (errcode(ERRCODE_CONNECTION_FAILURE),
00585 errmsg("unexpected EOF on client connection with an open transaction")));
00586 }
00587 bytesread = minread;
00588 break;
00589 case COPY_NEW_FE:
00590 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
00591 {
00592 int avail;
00593
00594 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
00595 {
00596
00597 int mtype;
00598
00599 readmessage:
00600 mtype = pq_getbyte();
00601 if (mtype == EOF)
00602 ereport(ERROR,
00603 (errcode(ERRCODE_CONNECTION_FAILURE),
00604 errmsg("unexpected EOF on client connection with an open transaction")));
00605 if (pq_getmessage(cstate->fe_msgbuf, 0))
00606 ereport(ERROR,
00607 (errcode(ERRCODE_CONNECTION_FAILURE),
00608 errmsg("unexpected EOF on client connection with an open transaction")));
00609 switch (mtype)
00610 {
00611 case 'd':
00612 break;
00613 case 'c':
00614
00615 cstate->fe_eof = true;
00616 return bytesread;
00617 case 'f':
00618 ereport(ERROR,
00619 (errcode(ERRCODE_QUERY_CANCELED),
00620 errmsg("COPY from stdin failed: %s",
00621 pq_getmsgstring(cstate->fe_msgbuf))));
00622 break;
00623 case 'H':
00624 case 'S':
00625
00626
00627
00628
00629
00630
00631
00632 goto readmessage;
00633 default:
00634 ereport(ERROR,
00635 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00636 errmsg("unexpected message type 0x%02X during COPY from stdin",
00637 mtype)));
00638 break;
00639 }
00640 }
00641 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
00642 if (avail > maxread)
00643 avail = maxread;
00644 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
00645 databuf = (void *) ((char *) databuf + avail);
00646 maxread -= avail;
00647 bytesread += avail;
00648 }
00649 break;
00650 }
00651
00652 return bytesread;
00653 }
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663 static void
00664 CopySendInt32(CopyState cstate, int32 val)
00665 {
00666 uint32 buf;
00667
00668 buf = htonl((uint32) val);
00669 CopySendData(cstate, &buf, sizeof(buf));
00670 }
00671
00672
00673
00674
00675
00676
00677 static bool
00678 CopyGetInt32(CopyState cstate, int32 *val)
00679 {
00680 uint32 buf;
00681
00682 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
00683 {
00684 *val = 0;
00685 return false;
00686 }
00687 *val = (int32) ntohl(buf);
00688 return true;
00689 }
00690
00691
00692
00693
00694 static void
00695 CopySendInt16(CopyState cstate, int16 val)
00696 {
00697 uint16 buf;
00698
00699 buf = htons((uint16) val);
00700 CopySendData(cstate, &buf, sizeof(buf));
00701 }
00702
00703
00704
00705
00706 static bool
00707 CopyGetInt16(CopyState cstate, int16 *val)
00708 {
00709 uint16 buf;
00710
00711 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
00712 {
00713 *val = 0;
00714 return false;
00715 }
00716 *val = (int16) ntohs(buf);
00717 return true;
00718 }
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731 static bool
00732 CopyLoadRawBuf(CopyState cstate)
00733 {
00734 int nbytes;
00735 int inbytes;
00736
00737 if (cstate->raw_buf_index < cstate->raw_buf_len)
00738 {
00739
00740 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
00741 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
00742 nbytes);
00743 }
00744 else
00745 nbytes = 0;
00746
00747 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
00748 1, RAW_BUF_SIZE - nbytes);
00749 nbytes += inbytes;
00750 cstate->raw_buf[nbytes] = '\0';
00751 cstate->raw_buf_index = 0;
00752 cstate->raw_buf_len = nbytes;
00753 return (inbytes > 0);
00754 }
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775 Oid
00776 DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
00777 {
00778 CopyState cstate;
00779 bool is_from = stmt->is_from;
00780 bool pipe = (stmt->filename == NULL);
00781 Relation rel;
00782 Oid relid;
00783
00784
00785 if (!pipe && !superuser())
00786 {
00787 if (stmt->is_program)
00788 ereport(ERROR,
00789 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00790 errmsg("must be superuser to COPY to or from an external program"),
00791 errhint("Anyone can COPY to stdout or from stdin. "
00792 "psql's \\copy command also works for anyone.")));
00793 else
00794 ereport(ERROR,
00795 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
00796 errmsg("must be superuser to COPY to or from a file"),
00797 errhint("Anyone can COPY to stdout or from stdin. "
00798 "psql's \\copy command also works for anyone.")));
00799 }
00800
00801 if (stmt->relation)
00802 {
00803 TupleDesc tupDesc;
00804 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
00805 RangeTblEntry *rte;
00806 List *attnums;
00807 ListCell *cur;
00808
00809 Assert(!stmt->query);
00810
00811
00812 rel = heap_openrv(stmt->relation,
00813 (is_from ? RowExclusiveLock : AccessShareLock));
00814
00815 relid = RelationGetRelid(rel);
00816
00817 rte = makeNode(RangeTblEntry);
00818 rte->rtekind = RTE_RELATION;
00819 rte->relid = RelationGetRelid(rel);
00820 rte->relkind = rel->rd_rel->relkind;
00821 rte->requiredPerms = required_access;
00822
00823 tupDesc = RelationGetDescr(rel);
00824 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
00825 foreach(cur, attnums)
00826 {
00827 int attno = lfirst_int(cur) -
00828 FirstLowInvalidHeapAttributeNumber;
00829
00830 if (is_from)
00831 rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
00832 else
00833 rte->selectedCols = bms_add_member(rte->selectedCols, attno);
00834 }
00835 ExecCheckRTPerms(list_make1(rte), true);
00836 }
00837 else
00838 {
00839 Assert(stmt->query);
00840
00841 relid = InvalidOid;
00842 rel = NULL;
00843 }
00844
00845 if (is_from)
00846 {
00847 Assert(rel);
00848
00849
00850 if (XactReadOnly && !rel->rd_islocaltemp)
00851 PreventCommandIfReadOnly("COPY FROM");
00852
00853 cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
00854 stmt->attlist, stmt->options);
00855 *processed = CopyFrom(cstate);
00856 EndCopyFrom(cstate);
00857 }
00858 else
00859 {
00860 cstate = BeginCopyTo(rel, stmt->query, queryString,
00861 stmt->filename, stmt->is_program,
00862 stmt->attlist, stmt->options);
00863 *processed = DoCopyTo(cstate);
00864 EndCopyTo(cstate);
00865 }
00866
00867
00868
00869
00870
00871
00872 if (rel != NULL)
00873 heap_close(rel, (is_from ? NoLock : AccessShareLock));
00874
00875 return relid;
00876 }
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895 void
00896 ProcessCopyOptions(CopyState cstate,
00897 bool is_from,
00898 List *options)
00899 {
00900 bool format_specified = false;
00901 ListCell *option;
00902
00903
00904 if (cstate == NULL)
00905 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
00906
00907 cstate->file_encoding = -1;
00908
00909
00910 foreach(option, options)
00911 {
00912 DefElem *defel = (DefElem *) lfirst(option);
00913
00914 if (strcmp(defel->defname, "format") == 0)
00915 {
00916 char *fmt = defGetString(defel);
00917
00918 if (format_specified)
00919 ereport(ERROR,
00920 (errcode(ERRCODE_SYNTAX_ERROR),
00921 errmsg("conflicting or redundant options")));
00922 format_specified = true;
00923 if (strcmp(fmt, "text") == 0)
00924 ;
00925 else if (strcmp(fmt, "csv") == 0)
00926 cstate->csv_mode = true;
00927 else if (strcmp(fmt, "binary") == 0)
00928 cstate->binary = true;
00929 else
00930 ereport(ERROR,
00931 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
00932 errmsg("COPY format \"%s\" not recognized", fmt)));
00933 }
00934 else if (strcmp(defel->defname, "oids") == 0)
00935 {
00936 if (cstate->oids)
00937 ereport(ERROR,
00938 (errcode(ERRCODE_SYNTAX_ERROR),
00939 errmsg("conflicting or redundant options")));
00940 cstate->oids = defGetBoolean(defel);
00941 }
00942 else if (strcmp(defel->defname, "freeze") == 0)
00943 {
00944 if (cstate->freeze)
00945 ereport(ERROR,
00946 (errcode(ERRCODE_SYNTAX_ERROR),
00947 errmsg("conflicting or redundant options")));
00948 cstate->freeze = defGetBoolean(defel);
00949 }
00950 else if (strcmp(defel->defname, "delimiter") == 0)
00951 {
00952 if (cstate->delim)
00953 ereport(ERROR,
00954 (errcode(ERRCODE_SYNTAX_ERROR),
00955 errmsg("conflicting or redundant options")));
00956 cstate->delim = defGetString(defel);
00957 }
00958 else if (strcmp(defel->defname, "null") == 0)
00959 {
00960 if (cstate->null_print)
00961 ereport(ERROR,
00962 (errcode(ERRCODE_SYNTAX_ERROR),
00963 errmsg("conflicting or redundant options")));
00964 cstate->null_print = defGetString(defel);
00965 }
00966 else if (strcmp(defel->defname, "header") == 0)
00967 {
00968 if (cstate->header_line)
00969 ereport(ERROR,
00970 (errcode(ERRCODE_SYNTAX_ERROR),
00971 errmsg("conflicting or redundant options")));
00972 cstate->header_line = defGetBoolean(defel);
00973 }
00974 else if (strcmp(defel->defname, "quote") == 0)
00975 {
00976 if (cstate->quote)
00977 ereport(ERROR,
00978 (errcode(ERRCODE_SYNTAX_ERROR),
00979 errmsg("conflicting or redundant options")));
00980 cstate->quote = defGetString(defel);
00981 }
00982 else if (strcmp(defel->defname, "escape") == 0)
00983 {
00984 if (cstate->escape)
00985 ereport(ERROR,
00986 (errcode(ERRCODE_SYNTAX_ERROR),
00987 errmsg("conflicting or redundant options")));
00988 cstate->escape = defGetString(defel);
00989 }
00990 else if (strcmp(defel->defname, "force_quote") == 0)
00991 {
00992 if (cstate->force_quote || cstate->force_quote_all)
00993 ereport(ERROR,
00994 (errcode(ERRCODE_SYNTAX_ERROR),
00995 errmsg("conflicting or redundant options")));
00996 if (defel->arg && IsA(defel->arg, A_Star))
00997 cstate->force_quote_all = true;
00998 else if (defel->arg && IsA(defel->arg, List))
00999 cstate->force_quote = (List *) defel->arg;
01000 else
01001 ereport(ERROR,
01002 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01003 errmsg("argument to option \"%s\" must be a list of column names",
01004 defel->defname)));
01005 }
01006 else if (strcmp(defel->defname, "force_not_null") == 0)
01007 {
01008 if (cstate->force_notnull)
01009 ereport(ERROR,
01010 (errcode(ERRCODE_SYNTAX_ERROR),
01011 errmsg("conflicting or redundant options")));
01012 if (defel->arg && IsA(defel->arg, List))
01013 cstate->force_notnull = (List *) defel->arg;
01014 else
01015 ereport(ERROR,
01016 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01017 errmsg("argument to option \"%s\" must be a list of column names",
01018 defel->defname)));
01019 }
01020 else if (strcmp(defel->defname, "convert_selectively") == 0)
01021 {
01022
01023
01024
01025
01026
01027 if (cstate->convert_selectively)
01028 ereport(ERROR,
01029 (errcode(ERRCODE_SYNTAX_ERROR),
01030 errmsg("conflicting or redundant options")));
01031 cstate->convert_selectively = true;
01032 if (defel->arg == NULL || IsA(defel->arg, List))
01033 cstate->convert_select = (List *) defel->arg;
01034 else
01035 ereport(ERROR,
01036 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01037 errmsg("argument to option \"%s\" must be a list of column names",
01038 defel->defname)));
01039 }
01040 else if (strcmp(defel->defname, "encoding") == 0)
01041 {
01042 if (cstate->file_encoding >= 0)
01043 ereport(ERROR,
01044 (errcode(ERRCODE_SYNTAX_ERROR),
01045 errmsg("conflicting or redundant options")));
01046 cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
01047 if (cstate->file_encoding < 0)
01048 ereport(ERROR,
01049 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01050 errmsg("argument to option \"%s\" must be a valid encoding name",
01051 defel->defname)));
01052 }
01053 else
01054 ereport(ERROR,
01055 (errcode(ERRCODE_SYNTAX_ERROR),
01056 errmsg("option \"%s\" not recognized",
01057 defel->defname)));
01058 }
01059
01060
01061
01062
01063
01064 if (cstate->binary && cstate->delim)
01065 ereport(ERROR,
01066 (errcode(ERRCODE_SYNTAX_ERROR),
01067 errmsg("cannot specify DELIMITER in BINARY mode")));
01068
01069 if (cstate->binary && cstate->null_print)
01070 ereport(ERROR,
01071 (errcode(ERRCODE_SYNTAX_ERROR),
01072 errmsg("cannot specify NULL in BINARY mode")));
01073
01074
01075 if (!cstate->delim)
01076 cstate->delim = cstate->csv_mode ? "," : "\t";
01077
01078 if (!cstate->null_print)
01079 cstate->null_print = cstate->csv_mode ? "" : "\\N";
01080 cstate->null_print_len = strlen(cstate->null_print);
01081
01082 if (cstate->csv_mode)
01083 {
01084 if (!cstate->quote)
01085 cstate->quote = "\"";
01086 if (!cstate->escape)
01087 cstate->escape = cstate->quote;
01088 }
01089
01090
01091 if (strlen(cstate->delim) != 1)
01092 ereport(ERROR,
01093 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01094 errmsg("COPY delimiter must be a single one-byte character")));
01095
01096
01097 if (strchr(cstate->delim, '\r') != NULL ||
01098 strchr(cstate->delim, '\n') != NULL)
01099 ereport(ERROR,
01100 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01101 errmsg("COPY delimiter cannot be newline or carriage return")));
01102
01103 if (strchr(cstate->null_print, '\r') != NULL ||
01104 strchr(cstate->null_print, '\n') != NULL)
01105 ereport(ERROR,
01106 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01107 errmsg("COPY null representation cannot use newline or carriage return")));
01108
01109
01110
01111
01112
01113
01114
01115
01116
01117
01118
01119 if (!cstate->csv_mode &&
01120 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
01121 cstate->delim[0]) != NULL)
01122 ereport(ERROR,
01123 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01124 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
01125
01126
01127 if (!cstate->csv_mode && cstate->header_line)
01128 ereport(ERROR,
01129 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01130 errmsg("COPY HEADER available only in CSV mode")));
01131
01132
01133 if (!cstate->csv_mode && cstate->quote != NULL)
01134 ereport(ERROR,
01135 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01136 errmsg("COPY quote available only in CSV mode")));
01137
01138 if (cstate->csv_mode && strlen(cstate->quote) != 1)
01139 ereport(ERROR,
01140 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01141 errmsg("COPY quote must be a single one-byte character")));
01142
01143 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
01144 ereport(ERROR,
01145 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
01146 errmsg("COPY delimiter and quote must be different")));
01147
01148
01149 if (!cstate->csv_mode && cstate->escape != NULL)
01150 ereport(ERROR,
01151 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01152 errmsg("COPY escape available only in CSV mode")));
01153
01154 if (cstate->csv_mode && strlen(cstate->escape) != 1)
01155 ereport(ERROR,
01156 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01157 errmsg("COPY escape must be a single one-byte character")));
01158
01159
01160 if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
01161 ereport(ERROR,
01162 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01163 errmsg("COPY force quote available only in CSV mode")));
01164 if ((cstate->force_quote || cstate->force_quote_all) && is_from)
01165 ereport(ERROR,
01166 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01167 errmsg("COPY force quote only available using COPY TO")));
01168
01169
01170 if (!cstate->csv_mode && cstate->force_notnull != NIL)
01171 ereport(ERROR,
01172 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01173 errmsg("COPY force not null available only in CSV mode")));
01174 if (cstate->force_notnull != NIL && !is_from)
01175 ereport(ERROR,
01176 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01177 errmsg("COPY force not null only available using COPY FROM")));
01178
01179
01180 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
01181 ereport(ERROR,
01182 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01183 errmsg("COPY delimiter must not appear in the NULL specification")));
01184
01185
01186 if (cstate->csv_mode &&
01187 strchr(cstate->null_print, cstate->quote[0]) != NULL)
01188 ereport(ERROR,
01189 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01190 errmsg("CSV quote character must not appear in the NULL specification")));
01191 }
01192
01193
01194
01195
01196
01197
01198
01199
01200
01201
01202
01203
01204
01205
01206
01207
01208 static CopyState
01209 BeginCopy(bool is_from,
01210 Relation rel,
01211 Node *raw_query,
01212 const char *queryString,
01213 List *attnamelist,
01214 List *options)
01215 {
01216 CopyState cstate;
01217 TupleDesc tupDesc;
01218 int num_phys_attrs;
01219 MemoryContext oldcontext;
01220
01221
01222 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
01223
01224
01225
01226
01227
01228 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
01229 "COPY",
01230 ALLOCSET_DEFAULT_MINSIZE,
01231 ALLOCSET_DEFAULT_INITSIZE,
01232 ALLOCSET_DEFAULT_MAXSIZE);
01233
01234 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
01235
01236
01237 ProcessCopyOptions(cstate, is_from, options);
01238
01239
01240 if (rel)
01241 {
01242 Assert(!raw_query);
01243
01244 cstate->rel = rel;
01245
01246 tupDesc = RelationGetDescr(cstate->rel);
01247
01248
01249 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
01250 ereport(ERROR,
01251 (errcode(ERRCODE_UNDEFINED_COLUMN),
01252 errmsg("table \"%s\" does not have OIDs",
01253 RelationGetRelationName(cstate->rel))));
01254 }
01255 else
01256 {
01257 List *rewritten;
01258 Query *query;
01259 PlannedStmt *plan;
01260 DestReceiver *dest;
01261
01262 Assert(!is_from);
01263 cstate->rel = NULL;
01264
01265
01266 if (cstate->oids)
01267 ereport(ERROR,
01268 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01269 errmsg("COPY (SELECT) WITH OIDS is not supported")));
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280
01281 rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
01282 queryString, NULL, 0);
01283
01284
01285 if (list_length(rewritten) != 1)
01286 elog(ERROR, "unexpected rewrite result");
01287
01288 query = (Query *) linitial(rewritten);
01289
01290
01291 if (query->utilityStmt != NULL &&
01292 IsA(query->utilityStmt, CreateTableAsStmt))
01293 ereport(ERROR,
01294 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01295 errmsg("COPY (SELECT INTO) is not supported")));
01296
01297 Assert(query->commandType == CMD_SELECT);
01298 Assert(query->utilityStmt == NULL);
01299
01300
01301 plan = planner(query, 0, NULL);
01302
01303
01304
01305
01306
01307 PushCopiedSnapshot(GetActiveSnapshot());
01308 UpdateActiveSnapshotCommandId();
01309
01310
01311 dest = CreateDestReceiver(DestCopyOut);
01312 ((DR_copy *) dest)->cstate = cstate;
01313
01314
01315 cstate->queryDesc = CreateQueryDesc(plan, queryString,
01316 GetActiveSnapshot(),
01317 InvalidSnapshot,
01318 dest, NULL, 0);
01319
01320
01321
01322
01323
01324
01325 ExecutorStart(cstate->queryDesc, 0);
01326
01327 tupDesc = cstate->queryDesc->tupDesc;
01328 }
01329
01330
01331 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
01332
01333 num_phys_attrs = tupDesc->natts;
01334
01335
01336 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
01337 if (cstate->force_quote_all)
01338 {
01339 int i;
01340
01341 for (i = 0; i < num_phys_attrs; i++)
01342 cstate->force_quote_flags[i] = true;
01343 }
01344 else if (cstate->force_quote)
01345 {
01346 List *attnums;
01347 ListCell *cur;
01348
01349 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
01350
01351 foreach(cur, attnums)
01352 {
01353 int attnum = lfirst_int(cur);
01354
01355 if (!list_member_int(cstate->attnumlist, attnum))
01356 ereport(ERROR,
01357 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
01358 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
01359 NameStr(tupDesc->attrs[attnum - 1]->attname))));
01360 cstate->force_quote_flags[attnum - 1] = true;
01361 }
01362 }
01363
01364
01365 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
01366 if (cstate->force_notnull)
01367 {
01368 List *attnums;
01369 ListCell *cur;
01370
01371 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
01372
01373 foreach(cur, attnums)
01374 {
01375 int attnum = lfirst_int(cur);
01376
01377 if (!list_member_int(cstate->attnumlist, attnum))
01378 ereport(ERROR,
01379 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
01380 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
01381 NameStr(tupDesc->attrs[attnum - 1]->attname))));
01382 cstate->force_notnull_flags[attnum - 1] = true;
01383 }
01384 }
01385
01386
01387 if (cstate->convert_selectively)
01388 {
01389 List *attnums;
01390 ListCell *cur;
01391
01392 cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
01393
01394 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
01395
01396 foreach(cur, attnums)
01397 {
01398 int attnum = lfirst_int(cur);
01399
01400 if (!list_member_int(cstate->attnumlist, attnum))
01401 ereport(ERROR,
01402 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
01403 errmsg_internal("selected column \"%s\" not referenced by COPY",
01404 NameStr(tupDesc->attrs[attnum - 1]->attname))));
01405 cstate->convert_select_flags[attnum - 1] = true;
01406 }
01407 }
01408
01409
01410 if (cstate->file_encoding < 0)
01411 cstate->file_encoding = pg_get_client_encoding();
01412
01413
01414
01415
01416
01417
01418 cstate->need_transcoding =
01419 (cstate->file_encoding != GetDatabaseEncoding() ||
01420 pg_database_encoding_max_length() > 1);
01421
01422 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
01423
01424 cstate->copy_dest = COPY_FILE;
01425
01426 MemoryContextSwitchTo(oldcontext);
01427
01428 return cstate;
01429 }
01430
01431
01432
01433
01434 static void
01435 ClosePipeToProgram(CopyState cstate)
01436 {
01437 int pclose_rc;
01438
01439 Assert(cstate->is_program);
01440
01441 pclose_rc = ClosePipeStream(cstate->copy_file);
01442 if (pclose_rc == -1)
01443 ereport(ERROR,
01444 (errmsg("could not close pipe to external command: %m")));
01445 else if (pclose_rc != 0)
01446 ereport(ERROR,
01447 (errmsg("program \"%s\" failed",
01448 cstate->filename),
01449 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
01450 }
01451
01452
01453
01454
01455 static void
01456 EndCopy(CopyState cstate)
01457 {
01458 if (cstate->is_program)
01459 {
01460 ClosePipeToProgram(cstate);
01461 }
01462 else
01463 {
01464 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
01465 ereport(ERROR,
01466 (errcode_for_file_access(),
01467 errmsg("could not close file \"%s\": %m",
01468 cstate->filename)));
01469 }
01470
01471 MemoryContextDelete(cstate->copycontext);
01472 pfree(cstate);
01473 }
01474
01475
01476
01477
01478 static CopyState
01479 BeginCopyTo(Relation rel,
01480 Node *query,
01481 const char *queryString,
01482 const char *filename,
01483 bool is_program,
01484 List *attnamelist,
01485 List *options)
01486 {
01487 CopyState cstate;
01488 bool pipe = (filename == NULL);
01489 MemoryContext oldcontext;
01490
01491 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
01492 {
01493 if (rel->rd_rel->relkind == RELKIND_VIEW)
01494 ereport(ERROR,
01495 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01496 errmsg("cannot copy from view \"%s\"",
01497 RelationGetRelationName(rel)),
01498 errhint("Try the COPY (SELECT ...) TO variant.")));
01499 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
01500 ereport(ERROR,
01501 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01502 errmsg("cannot copy from materialized view \"%s\"",
01503 RelationGetRelationName(rel)),
01504 errhint("Try the COPY (SELECT ...) TO variant.")));
01505 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
01506 ereport(ERROR,
01507 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01508 errmsg("cannot copy from foreign table \"%s\"",
01509 RelationGetRelationName(rel)),
01510 errhint("Try the COPY (SELECT ...) TO variant.")));
01511 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
01512 ereport(ERROR,
01513 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01514 errmsg("cannot copy from sequence \"%s\"",
01515 RelationGetRelationName(rel))));
01516 else
01517 ereport(ERROR,
01518 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01519 errmsg("cannot copy from non-table relation \"%s\"",
01520 RelationGetRelationName(rel))));
01521 }
01522
01523 cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
01524 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
01525
01526 if (pipe)
01527 {
01528 Assert(!is_program);
01529 if (whereToSendOutput != DestRemote)
01530 cstate->copy_file = stdout;
01531 }
01532 else
01533 {
01534 cstate->filename = pstrdup(filename);
01535 cstate->is_program = is_program;
01536
01537 if (is_program)
01538 {
01539 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
01540 if (cstate->copy_file == NULL)
01541 ereport(ERROR,
01542 (errmsg("could not execute command \"%s\": %m",
01543 cstate->filename)));
01544 }
01545 else
01546 {
01547 mode_t oumask;
01548 struct stat st;
01549
01550
01551
01552
01553
01554 if (!is_absolute_path(filename))
01555 ereport(ERROR,
01556 (errcode(ERRCODE_INVALID_NAME),
01557 errmsg("relative path not allowed for COPY to file")));
01558
01559 oumask = umask(S_IWGRP | S_IWOTH);
01560 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
01561 umask(oumask);
01562 if (cstate->copy_file == NULL)
01563 ereport(ERROR,
01564 (errcode_for_file_access(),
01565 errmsg("could not open file \"%s\" for writing: %m",
01566 cstate->filename)));
01567
01568 fstat(fileno(cstate->copy_file), &st);
01569 if (S_ISDIR(st.st_mode))
01570 ereport(ERROR,
01571 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
01572 errmsg("\"%s\" is a directory", cstate->filename)));
01573 }
01574 }
01575
01576 MemoryContextSwitchTo(oldcontext);
01577
01578 return cstate;
01579 }
01580
01581
01582
01583
01584
01585 static uint64
01586 DoCopyTo(CopyState cstate)
01587 {
01588 bool pipe = (cstate->filename == NULL);
01589 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
01590 uint64 processed;
01591
01592 PG_TRY();
01593 {
01594 if (fe_copy)
01595 SendCopyBegin(cstate);
01596
01597 processed = CopyTo(cstate);
01598
01599 if (fe_copy)
01600 SendCopyEnd(cstate);
01601 }
01602 PG_CATCH();
01603 {
01604
01605
01606
01607
01608
01609 pq_endcopyout(true);
01610 PG_RE_THROW();
01611 }
01612 PG_END_TRY();
01613
01614 return processed;
01615 }
01616
01617
01618
01619
01620 static void
01621 EndCopyTo(CopyState cstate)
01622 {
01623 if (cstate->queryDesc != NULL)
01624 {
01625
01626 ExecutorFinish(cstate->queryDesc);
01627 ExecutorEnd(cstate->queryDesc);
01628 FreeQueryDesc(cstate->queryDesc);
01629 PopActiveSnapshot();
01630 }
01631
01632
01633 EndCopy(cstate);
01634 }
01635
01636
01637
01638
01639 static uint64
01640 CopyTo(CopyState cstate)
01641 {
01642 TupleDesc tupDesc;
01643 int num_phys_attrs;
01644 Form_pg_attribute *attr;
01645 ListCell *cur;
01646 uint64 processed;
01647
01648 if (cstate->rel)
01649 tupDesc = RelationGetDescr(cstate->rel);
01650 else
01651 tupDesc = cstate->queryDesc->tupDesc;
01652 attr = tupDesc->attrs;
01653 num_phys_attrs = tupDesc->natts;
01654 cstate->null_print_client = cstate->null_print;
01655
01656
01657 cstate->fe_msgbuf = makeStringInfo();
01658
01659
01660 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
01661 foreach(cur, cstate->attnumlist)
01662 {
01663 int attnum = lfirst_int(cur);
01664 Oid out_func_oid;
01665 bool isvarlena;
01666
01667 if (cstate->binary)
01668 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
01669 &out_func_oid,
01670 &isvarlena);
01671 else
01672 getTypeOutputInfo(attr[attnum - 1]->atttypid,
01673 &out_func_oid,
01674 &isvarlena);
01675 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
01676 }
01677
01678
01679
01680
01681
01682
01683
01684 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
01685 "COPY TO",
01686 ALLOCSET_DEFAULT_MINSIZE,
01687 ALLOCSET_DEFAULT_INITSIZE,
01688 ALLOCSET_DEFAULT_MAXSIZE);
01689
01690 if (cstate->binary)
01691 {
01692
01693 int32 tmp;
01694
01695
01696 CopySendData(cstate, BinarySignature, 11);
01697
01698 tmp = 0;
01699 if (cstate->oids)
01700 tmp |= (1 << 16);
01701 CopySendInt32(cstate, tmp);
01702
01703 tmp = 0;
01704 CopySendInt32(cstate, tmp);
01705 }
01706 else
01707 {
01708
01709
01710
01711
01712 if (cstate->need_transcoding)
01713 cstate->null_print_client = pg_server_to_any(cstate->null_print,
01714 cstate->null_print_len,
01715 cstate->file_encoding);
01716
01717
01718 if (cstate->header_line)
01719 {
01720 bool hdr_delim = false;
01721
01722 foreach(cur, cstate->attnumlist)
01723 {
01724 int attnum = lfirst_int(cur);
01725 char *colname;
01726
01727 if (hdr_delim)
01728 CopySendChar(cstate, cstate->delim[0]);
01729 hdr_delim = true;
01730
01731 colname = NameStr(attr[attnum - 1]->attname);
01732
01733 CopyAttributeOutCSV(cstate, colname, false,
01734 list_length(cstate->attnumlist) == 1);
01735 }
01736
01737 CopySendEndOfRow(cstate);
01738 }
01739 }
01740
01741 if (cstate->rel)
01742 {
01743 Datum *values;
01744 bool *nulls;
01745 HeapScanDesc scandesc;
01746 HeapTuple tuple;
01747
01748 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
01749 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
01750
01751 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
01752
01753 processed = 0;
01754 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
01755 {
01756 CHECK_FOR_INTERRUPTS();
01757
01758
01759 heap_deform_tuple(tuple, tupDesc, values, nulls);
01760
01761
01762 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
01763 processed++;
01764 }
01765
01766 heap_endscan(scandesc);
01767
01768 pfree(values);
01769 pfree(nulls);
01770 }
01771 else
01772 {
01773
01774 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
01775 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
01776 }
01777
01778 if (cstate->binary)
01779 {
01780
01781 CopySendInt16(cstate, -1);
01782
01783 CopySendEndOfRow(cstate);
01784 }
01785
01786 MemoryContextDelete(cstate->rowcontext);
01787
01788 return processed;
01789 }
01790
01791
01792
01793
01794 static void
01795 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
01796 {
01797 bool need_delim = false;
01798 FmgrInfo *out_functions = cstate->out_functions;
01799 MemoryContext oldcontext;
01800 ListCell *cur;
01801 char *string;
01802
01803 MemoryContextReset(cstate->rowcontext);
01804 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
01805
01806 if (cstate->binary)
01807 {
01808
01809 CopySendInt16(cstate, list_length(cstate->attnumlist));
01810
01811 if (cstate->oids)
01812 {
01813
01814 CopySendInt32(cstate, sizeof(int32));
01815 CopySendInt32(cstate, tupleOid);
01816 }
01817 }
01818 else
01819 {
01820
01821
01822 if (cstate->oids)
01823 {
01824 string = DatumGetCString(DirectFunctionCall1(oidout,
01825 ObjectIdGetDatum(tupleOid)));
01826 CopySendString(cstate, string);
01827 need_delim = true;
01828 }
01829 }
01830
01831 foreach(cur, cstate->attnumlist)
01832 {
01833 int attnum = lfirst_int(cur);
01834 Datum value = values[attnum - 1];
01835 bool isnull = nulls[attnum - 1];
01836
01837 if (!cstate->binary)
01838 {
01839 if (need_delim)
01840 CopySendChar(cstate, cstate->delim[0]);
01841 need_delim = true;
01842 }
01843
01844 if (isnull)
01845 {
01846 if (!cstate->binary)
01847 CopySendString(cstate, cstate->null_print_client);
01848 else
01849 CopySendInt32(cstate, -1);
01850 }
01851 else
01852 {
01853 if (!cstate->binary)
01854 {
01855 string = OutputFunctionCall(&out_functions[attnum - 1],
01856 value);
01857 if (cstate->csv_mode)
01858 CopyAttributeOutCSV(cstate, string,
01859 cstate->force_quote_flags[attnum - 1],
01860 list_length(cstate->attnumlist) == 1);
01861 else
01862 CopyAttributeOutText(cstate, string);
01863 }
01864 else
01865 {
01866 bytea *outputbytes;
01867
01868 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
01869 value);
01870 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
01871 CopySendData(cstate, VARDATA(outputbytes),
01872 VARSIZE(outputbytes) - VARHDRSZ);
01873 }
01874 }
01875 }
01876
01877 CopySendEndOfRow(cstate);
01878
01879 MemoryContextSwitchTo(oldcontext);
01880 }
01881
01882
01883
01884
01885
01886
01887
01888 void
01889 CopyFromErrorCallback(void *arg)
01890 {
01891 CopyState cstate = (CopyState) arg;
01892
01893 if (cstate->binary)
01894 {
01895
01896 if (cstate->cur_attname)
01897 errcontext("COPY %s, line %d, column %s",
01898 cstate->cur_relname, cstate->cur_lineno,
01899 cstate->cur_attname);
01900 else
01901 errcontext("COPY %s, line %d",
01902 cstate->cur_relname, cstate->cur_lineno);
01903 }
01904 else
01905 {
01906 if (cstate->cur_attname && cstate->cur_attval)
01907 {
01908
01909 char *attval;
01910
01911 attval = limit_printout_length(cstate->cur_attval);
01912 errcontext("COPY %s, line %d, column %s: \"%s\"",
01913 cstate->cur_relname, cstate->cur_lineno,
01914 cstate->cur_attname, attval);
01915 pfree(attval);
01916 }
01917 else if (cstate->cur_attname)
01918 {
01919
01920 errcontext("COPY %s, line %d, column %s: null input",
01921 cstate->cur_relname, cstate->cur_lineno,
01922 cstate->cur_attname);
01923 }
01924 else
01925 {
01926
01927 if (cstate->line_buf_converted || !cstate->need_transcoding)
01928 {
01929 char *lineval;
01930
01931 lineval = limit_printout_length(cstate->line_buf.data);
01932 errcontext("COPY %s, line %d: \"%s\"",
01933 cstate->cur_relname, cstate->cur_lineno, lineval);
01934 pfree(lineval);
01935 }
01936 else
01937 {
01938
01939
01940
01941
01942
01943
01944
01945
01946 errcontext("COPY %s, line %d",
01947 cstate->cur_relname, cstate->cur_lineno);
01948 }
01949 }
01950 }
01951 }
01952
01953
01954
01955
01956
01957
01958
01959
01960
01961
/*
 * limit_printout_length - make a display-sized copy of a string.
 *
 * Returns a palloc'd copy of str.  Strings longer than
 * MAX_COPY_DATA_DISPLAY bytes are cut to the length chosen by
 * pg_mbcliplen() and get "..." appended.  The caller frees the result.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			clip_len;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Let pg_mbcliplen pick the cut point within the byte limit. */
		clip_len = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the clipped text, the three dots and the terminator. */
		clipped = (char *) palloc(clip_len + 4);
		memcpy(clipped, str, clip_len);
		memcpy(clipped + clip_len, "...", 4);

		return clipped;
	}

	/* Short enough: hand back an unmodified copy. */
	return pstrdup(str);
}
01987
01988
01989
01990
01991 static uint64
01992 CopyFrom(CopyState cstate)
01993 {
01994 HeapTuple tuple;
01995 TupleDesc tupDesc;
01996 Datum *values;
01997 bool *nulls;
01998 ResultRelInfo *resultRelInfo;
01999 EState *estate = CreateExecutorState();
02000 ExprContext *econtext;
02001 TupleTableSlot *myslot;
02002 MemoryContext oldcontext = CurrentMemoryContext;
02003
02004 ErrorContextCallback errcallback;
02005 CommandId mycid = GetCurrentCommandId(true);
02006 int hi_options = 0;
02007 BulkInsertState bistate;
02008 uint64 processed = 0;
02009 bool useHeapMultiInsert;
02010 int nBufferedTuples = 0;
02011
02012 #define MAX_BUFFERED_TUPLES 1000
02013 HeapTuple *bufferedTuples = NULL;
02014 Size bufferedTuplesSize = 0;
02015
02016 Assert(cstate->rel);
02017
02018 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
02019 {
02020 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
02021 ereport(ERROR,
02022 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02023 errmsg("cannot copy to view \"%s\"",
02024 RelationGetRelationName(cstate->rel))));
02025 else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
02026 ereport(ERROR,
02027 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02028 errmsg("cannot copy to materialized view \"%s\"",
02029 RelationGetRelationName(cstate->rel))));
02030 else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
02031 ereport(ERROR,
02032 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02033 errmsg("cannot copy to foreign table \"%s\"",
02034 RelationGetRelationName(cstate->rel))));
02035 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
02036 ereport(ERROR,
02037 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02038 errmsg("cannot copy to sequence \"%s\"",
02039 RelationGetRelationName(cstate->rel))));
02040 else
02041 ereport(ERROR,
02042 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02043 errmsg("cannot copy to non-table relation \"%s\"",
02044 RelationGetRelationName(cstate->rel))));
02045 }
02046
02047 tupDesc = RelationGetDescr(cstate->rel);
02048
02049
02050
02051
02052
02053
02054
02055
02056
02057
02058
02059
02060
02061
02062
02063
02064
02065
02066
02067
02068
02069
02070
02071
02072
02073
02074
02075
02076
02077
02078
02079
02080
02081
02082
02083
02084
02085 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
02086 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
02087 {
02088 hi_options |= HEAP_INSERT_SKIP_FSM;
02089 if (!XLogIsNeeded())
02090 hi_options |= HEAP_INSERT_SKIP_WAL;
02091 }
02092
02093
02094
02095
02096
02097
02098
02099
02100
02101
02102 if (cstate->freeze)
02103 {
02104 if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
02105 ereport(ERROR,
02106 (ERRCODE_INVALID_TRANSACTION_STATE,
02107 errmsg("cannot perform FREEZE because of prior transaction activity")));
02108
02109 if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
02110 cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
02111 ereport(ERROR,
02112 (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE,
02113 errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
02114
02115 hi_options |= HEAP_INSERT_FROZEN;
02116 }
02117
02118
02119
02120
02121
02122
02123 resultRelInfo = makeNode(ResultRelInfo);
02124 InitResultRelInfo(resultRelInfo,
02125 cstate->rel,
02126 1,
02127 0);
02128
02129 ExecOpenIndices(resultRelInfo);
02130
02131 estate->es_result_relations = resultRelInfo;
02132 estate->es_num_result_relations = 1;
02133 estate->es_result_relation_info = resultRelInfo;
02134
02135
02136 myslot = ExecInitExtraTupleSlot(estate);
02137 ExecSetSlotDescriptor(myslot, tupDesc);
02138
02139 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
02140
02141
02142
02143
02144
02145
02146
02147
02148
02149
02150 if ((resultRelInfo->ri_TrigDesc != NULL &&
02151 (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
02152 resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
02153 cstate->volatile_defexprs)
02154 {
02155 useHeapMultiInsert = false;
02156 }
02157 else
02158 {
02159 useHeapMultiInsert = true;
02160 bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
02161 }
02162
02163
02164 AfterTriggerBeginQuery();
02165
02166
02167
02168
02169
02170
02171
02172 ExecBSInsertTriggers(estate, resultRelInfo);
02173
02174 values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
02175 nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
02176
02177 bistate = GetBulkInsertState();
02178 econtext = GetPerTupleExprContext(estate);
02179
02180
02181 errcallback.callback = CopyFromErrorCallback;
02182 errcallback.arg = (void *) cstate;
02183 errcallback.previous = error_context_stack;
02184 error_context_stack = &errcallback;
02185
02186 for (;;)
02187 {
02188 TupleTableSlot *slot;
02189 bool skip_tuple;
02190 Oid loaded_oid = InvalidOid;
02191
02192 CHECK_FOR_INTERRUPTS();
02193
02194 if (nBufferedTuples == 0)
02195 {
02196
02197
02198
02199
02200
02201 ResetPerTupleExprContext(estate);
02202 }
02203
02204
02205 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
02206
02207 if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
02208 break;
02209
02210
02211 tuple = heap_form_tuple(tupDesc, values, nulls);
02212
02213 if (loaded_oid != InvalidOid)
02214 HeapTupleSetOid(tuple, loaded_oid);
02215
02216
02217 MemoryContextSwitchTo(oldcontext);
02218
02219
02220 slot = myslot;
02221 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
02222
02223 skip_tuple = false;
02224
02225
02226 if (resultRelInfo->ri_TrigDesc &&
02227 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
02228 {
02229 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
02230
02231 if (slot == NULL)
02232 skip_tuple = true;
02233 else
02234 tuple = ExecMaterializeSlot(slot);
02235 }
02236
02237 if (!skip_tuple)
02238 {
02239
02240 if (cstate->rel->rd_att->constr)
02241 ExecConstraints(resultRelInfo, slot, estate);
02242
02243 if (useHeapMultiInsert)
02244 {
02245
02246 bufferedTuples[nBufferedTuples++] = tuple;
02247 bufferedTuplesSize += tuple->t_len;
02248
02249
02250
02251
02252
02253
02254
02255 if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
02256 bufferedTuplesSize > 65535)
02257 {
02258 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
02259 resultRelInfo, myslot, bistate,
02260 nBufferedTuples, bufferedTuples);
02261 nBufferedTuples = 0;
02262 bufferedTuplesSize = 0;
02263 }
02264 }
02265 else
02266 {
02267 List *recheckIndexes = NIL;
02268
02269
02270 heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
02271
02272 if (resultRelInfo->ri_NumIndices > 0)
02273 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
02274 estate);
02275
02276
02277 ExecARInsertTriggers(estate, resultRelInfo, tuple,
02278 recheckIndexes);
02279
02280 list_free(recheckIndexes);
02281 }
02282
02283
02284
02285
02286
02287
02288 processed++;
02289 }
02290 }
02291
02292
02293 if (nBufferedTuples > 0)
02294 CopyFromInsertBatch(cstate, estate, mycid, hi_options,
02295 resultRelInfo, myslot, bistate,
02296 nBufferedTuples, bufferedTuples);
02297
02298
02299 error_context_stack = errcallback.previous;
02300
02301 FreeBulkInsertState(bistate);
02302
02303 MemoryContextSwitchTo(oldcontext);
02304
02305
02306 ExecASInsertTriggers(estate, resultRelInfo);
02307
02308
02309 AfterTriggerEndQuery(estate);
02310
02311 pfree(values);
02312 pfree(nulls);
02313
02314 ExecResetTupleTable(estate->es_tupleTable, false);
02315
02316 ExecCloseIndices(resultRelInfo);
02317
02318 FreeExecutorState(estate);
02319
02320
02321
02322
02323
02324 if (hi_options & HEAP_INSERT_SKIP_WAL)
02325 heap_sync(cstate->rel);
02326
02327 return processed;
02328 }
02329
02330
02331
02332
02333
02334
02335 static void
02336 CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
02337 int hi_options, ResultRelInfo *resultRelInfo,
02338 TupleTableSlot *myslot, BulkInsertState bistate,
02339 int nBufferedTuples, HeapTuple *bufferedTuples)
02340 {
02341 MemoryContext oldcontext;
02342 int i;
02343
02344
02345
02346
02347
02348 oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
02349 heap_multi_insert(cstate->rel,
02350 bufferedTuples,
02351 nBufferedTuples,
02352 mycid,
02353 hi_options,
02354 bistate);
02355 MemoryContextSwitchTo(oldcontext);
02356
02357
02358
02359
02360
02361 if (resultRelInfo->ri_NumIndices > 0)
02362 {
02363 for (i = 0; i < nBufferedTuples; i++)
02364 {
02365 List *recheckIndexes;
02366
02367 ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
02368 recheckIndexes =
02369 ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
02370 estate);
02371 ExecARInsertTriggers(estate, resultRelInfo,
02372 bufferedTuples[i],
02373 recheckIndexes);
02374 list_free(recheckIndexes);
02375 }
02376 }
02377
02378
02379
02380
02381
02382 else if (resultRelInfo->ri_TrigDesc != NULL &&
02383 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
02384 {
02385 for (i = 0; i < nBufferedTuples; i++)
02386 ExecARInsertTriggers(estate, resultRelInfo,
02387 bufferedTuples[i],
02388 NIL);
02389 }
02390 }
02391
02392
02393
02394
02395
02396
02397
02398
02399
02400
02401
02402 CopyState
02403 BeginCopyFrom(Relation rel,
02404 const char *filename,
02405 bool is_program,
02406 List *attnamelist,
02407 List *options)
02408 {
02409 CopyState cstate;
02410 bool pipe = (filename == NULL);
02411 TupleDesc tupDesc;
02412 Form_pg_attribute *attr;
02413 AttrNumber num_phys_attrs,
02414 num_defaults;
02415 FmgrInfo *in_functions;
02416 Oid *typioparams;
02417 int attnum;
02418 Oid in_func_oid;
02419 int *defmap;
02420 ExprState **defexprs;
02421 MemoryContext oldcontext;
02422 bool volatile_defexprs;
02423
02424 cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
02425 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
02426
02427
02428 cstate->fe_eof = false;
02429 cstate->eol_type = EOL_UNKNOWN;
02430 cstate->cur_relname = RelationGetRelationName(cstate->rel);
02431 cstate->cur_lineno = 0;
02432 cstate->cur_attname = NULL;
02433 cstate->cur_attval = NULL;
02434
02435
02436 initStringInfo(&cstate->attribute_buf);
02437 initStringInfo(&cstate->line_buf);
02438 cstate->line_buf_converted = false;
02439 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
02440 cstate->raw_buf_index = cstate->raw_buf_len = 0;
02441
02442 tupDesc = RelationGetDescr(cstate->rel);
02443 attr = tupDesc->attrs;
02444 num_phys_attrs = tupDesc->natts;
02445 num_defaults = 0;
02446 volatile_defexprs = false;
02447
02448
02449
02450
02451
02452
02453
02454 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
02455 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
02456 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
02457 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
02458
02459 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
02460 {
02461
02462 if (attr[attnum - 1]->attisdropped)
02463 continue;
02464
02465
02466 if (cstate->binary)
02467 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
02468 &in_func_oid, &typioparams[attnum - 1]);
02469 else
02470 getTypeInputInfo(attr[attnum - 1]->atttypid,
02471 &in_func_oid, &typioparams[attnum - 1]);
02472 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
02473
02474
02475 if (!list_member_int(cstate->attnumlist, attnum))
02476 {
02477
02478
02479 Node *defexpr = build_column_default(cstate->rel, attnum);
02480
02481 if (defexpr != NULL)
02482 {
02483
02484 defexprs[num_defaults] = ExecInitExpr(
02485 expression_planner((Expr *) defexpr), NULL);
02486 defmap[num_defaults] = attnum - 1;
02487 num_defaults++;
02488
02489 if (!volatile_defexprs)
02490 volatile_defexprs = contain_volatile_functions(defexpr);
02491 }
02492 }
02493 }
02494
02495
02496 cstate->in_functions = in_functions;
02497 cstate->typioparams = typioparams;
02498 cstate->defmap = defmap;
02499 cstate->defexprs = defexprs;
02500 cstate->volatile_defexprs = volatile_defexprs;
02501 cstate->num_defaults = num_defaults;
02502 cstate->is_program = is_program;
02503
02504 if (pipe)
02505 {
02506 Assert(!is_program);
02507 if (whereToSendOutput == DestRemote)
02508 ReceiveCopyBegin(cstate);
02509 else
02510 cstate->copy_file = stdin;
02511 }
02512 else
02513 {
02514 cstate->filename = pstrdup(filename);
02515
02516 if (cstate->is_program)
02517 {
02518 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
02519 if (cstate->copy_file == NULL)
02520 ereport(ERROR,
02521 (errmsg("could not execute command \"%s\": %m",
02522 cstate->filename)));
02523 }
02524 else
02525 {
02526 struct stat st;
02527
02528 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
02529 if (cstate->copy_file == NULL)
02530 ereport(ERROR,
02531 (errcode_for_file_access(),
02532 errmsg("could not open file \"%s\" for reading: %m",
02533 cstate->filename)));
02534
02535 fstat(fileno(cstate->copy_file), &st);
02536 if (S_ISDIR(st.st_mode))
02537 ereport(ERROR,
02538 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
02539 errmsg("\"%s\" is a directory", cstate->filename)));
02540 }
02541 }
02542
02543 if (!cstate->binary)
02544 {
02545
02546 cstate->file_has_oids = cstate->oids;
02547 }
02548 else
02549 {
02550
02551 char readSig[11];
02552 int32 tmp;
02553
02554
02555 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
02556 memcmp(readSig, BinarySignature, 11) != 0)
02557 ereport(ERROR,
02558 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02559 errmsg("COPY file signature not recognized")));
02560
02561 if (!CopyGetInt32(cstate, &tmp))
02562 ereport(ERROR,
02563 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02564 errmsg("invalid COPY file header (missing flags)")));
02565 cstate->file_has_oids = (tmp & (1 << 16)) != 0;
02566 tmp &= ~(1 << 16);
02567 if ((tmp >> 16) != 0)
02568 ereport(ERROR,
02569 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02570 errmsg("unrecognized critical flags in COPY file header")));
02571
02572 if (!CopyGetInt32(cstate, &tmp) ||
02573 tmp < 0)
02574 ereport(ERROR,
02575 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02576 errmsg("invalid COPY file header (missing length)")));
02577
02578 while (tmp-- > 0)
02579 {
02580 if (CopyGetData(cstate, readSig, 1, 1) != 1)
02581 ereport(ERROR,
02582 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02583 errmsg("invalid COPY file header (wrong length)")));
02584 }
02585 }
02586
02587 if (cstate->file_has_oids && cstate->binary)
02588 {
02589 getTypeBinaryInputInfo(OIDOID,
02590 &in_func_oid, &cstate->oid_typioparam);
02591 fmgr_info(in_func_oid, &cstate->oid_in_function);
02592 }
02593
02594
02595 if (!cstate->binary)
02596 {
02597 AttrNumber attr_count = list_length(cstate->attnumlist);
02598 int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
02599
02600 cstate->max_fields = nfields;
02601 cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
02602 }
02603
02604 MemoryContextSwitchTo(oldcontext);
02605
02606 return cstate;
02607 }
02608
02609
02610
02611
02612
02613
02614
02615
02616
02617
02618
02619
02620 bool
02621 NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
02622 {
02623 int fldct;
02624 bool done;
02625
02626
02627 Assert(!cstate->binary);
02628
02629
02630 if (cstate->cur_lineno == 0 && cstate->header_line)
02631 {
02632 cstate->cur_lineno++;
02633 if (CopyReadLine(cstate))
02634 return false;
02635 }
02636
02637 cstate->cur_lineno++;
02638
02639
02640 done = CopyReadLine(cstate);
02641
02642
02643
02644
02645
02646
02647 if (done && cstate->line_buf.len == 0)
02648 return false;
02649
02650
02651 if (cstate->csv_mode)
02652 fldct = CopyReadAttributesCSV(cstate);
02653 else
02654 fldct = CopyReadAttributesText(cstate);
02655
02656 *fields = cstate->raw_fields;
02657 *nfields = fldct;
02658 return true;
02659 }
02660
02661
02662
02663
02664
02665
02666
02667
02668
02669
02670
02671
02672 bool
02673 NextCopyFrom(CopyState cstate, ExprContext *econtext,
02674 Datum *values, bool *nulls, Oid *tupleOid)
02675 {
02676 TupleDesc tupDesc;
02677 Form_pg_attribute *attr;
02678 AttrNumber num_phys_attrs,
02679 attr_count,
02680 num_defaults = cstate->num_defaults;
02681 FmgrInfo *in_functions = cstate->in_functions;
02682 Oid *typioparams = cstate->typioparams;
02683 int i;
02684 int nfields;
02685 bool isnull;
02686 bool file_has_oids = cstate->file_has_oids;
02687 int *defmap = cstate->defmap;
02688 ExprState **defexprs = cstate->defexprs;
02689
02690 tupDesc = RelationGetDescr(cstate->rel);
02691 attr = tupDesc->attrs;
02692 num_phys_attrs = tupDesc->natts;
02693 attr_count = list_length(cstate->attnumlist);
02694 nfields = file_has_oids ? (attr_count + 1) : attr_count;
02695
02696
02697 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
02698 MemSet(nulls, true, num_phys_attrs * sizeof(bool));
02699
02700 if (!cstate->binary)
02701 {
02702 char **field_strings;
02703 ListCell *cur;
02704 int fldct;
02705 int fieldno;
02706 char *string;
02707
02708
02709 if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
02710 return false;
02711
02712
02713 if (nfields > 0 && fldct > nfields)
02714 ereport(ERROR,
02715 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02716 errmsg("extra data after last expected column")));
02717
02718 fieldno = 0;
02719
02720
02721 if (file_has_oids)
02722 {
02723 if (fieldno >= fldct)
02724 ereport(ERROR,
02725 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02726 errmsg("missing data for OID column")));
02727 string = field_strings[fieldno++];
02728
02729 if (string == NULL)
02730 ereport(ERROR,
02731 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02732 errmsg("null OID in COPY data")));
02733 else if (cstate->oids && tupleOid != NULL)
02734 {
02735 cstate->cur_attname = "oid";
02736 cstate->cur_attval = string;
02737 *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
02738 CStringGetDatum(string)));
02739 if (*tupleOid == InvalidOid)
02740 ereport(ERROR,
02741 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02742 errmsg("invalid OID in COPY data")));
02743 cstate->cur_attname = NULL;
02744 cstate->cur_attval = NULL;
02745 }
02746 }
02747
02748
02749 foreach(cur, cstate->attnumlist)
02750 {
02751 int attnum = lfirst_int(cur);
02752 int m = attnum - 1;
02753
02754 if (fieldno >= fldct)
02755 ereport(ERROR,
02756 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02757 errmsg("missing data for column \"%s\"",
02758 NameStr(attr[m]->attname))));
02759 string = field_strings[fieldno++];
02760
02761 if (cstate->convert_select_flags &&
02762 !cstate->convert_select_flags[m])
02763 {
02764
02765 continue;
02766 }
02767
02768 if (cstate->csv_mode && string == NULL &&
02769 cstate->force_notnull_flags[m])
02770 {
02771
02772 string = cstate->null_print;
02773 }
02774
02775 cstate->cur_attname = NameStr(attr[m]->attname);
02776 cstate->cur_attval = string;
02777 values[m] = InputFunctionCall(&in_functions[m],
02778 string,
02779 typioparams[m],
02780 attr[m]->atttypmod);
02781 if (string != NULL)
02782 nulls[m] = false;
02783 cstate->cur_attname = NULL;
02784 cstate->cur_attval = NULL;
02785 }
02786
02787 Assert(fieldno == nfields);
02788 }
02789 else
02790 {
02791
02792 int16 fld_count;
02793 ListCell *cur;
02794
02795 cstate->cur_lineno++;
02796
02797 if (!CopyGetInt16(cstate, &fld_count))
02798 {
02799
02800 return false;
02801 }
02802
02803 if (fld_count == -1)
02804 {
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817 char dummy;
02818
02819 if (cstate->copy_dest != COPY_OLD_FE &&
02820 CopyGetData(cstate, &dummy, 1, 1) > 0)
02821 ereport(ERROR,
02822 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02823 errmsg("received copy data after EOF marker")));
02824 return false;
02825 }
02826
02827 if (fld_count != attr_count)
02828 ereport(ERROR,
02829 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02830 errmsg("row field count is %d, expected %d",
02831 (int) fld_count, attr_count)));
02832
02833 if (file_has_oids)
02834 {
02835 Oid loaded_oid;
02836
02837 cstate->cur_attname = "oid";
02838 loaded_oid =
02839 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
02840 0,
02841 &cstate->oid_in_function,
02842 cstate->oid_typioparam,
02843 -1,
02844 &isnull));
02845 if (isnull || loaded_oid == InvalidOid)
02846 ereport(ERROR,
02847 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
02848 errmsg("invalid OID in COPY data")));
02849 cstate->cur_attname = NULL;
02850 if (cstate->oids && tupleOid != NULL)
02851 *tupleOid = loaded_oid;
02852 }
02853
02854 i = 0;
02855 foreach(cur, cstate->attnumlist)
02856 {
02857 int attnum = lfirst_int(cur);
02858 int m = attnum - 1;
02859
02860 cstate->cur_attname = NameStr(attr[m]->attname);
02861 i++;
02862 values[m] = CopyReadBinaryAttribute(cstate,
02863 i,
02864 &in_functions[m],
02865 typioparams[m],
02866 attr[m]->atttypmod,
02867 &nulls[m]);
02868 cstate->cur_attname = NULL;
02869 }
02870 }
02871
02872
02873
02874
02875
02876
02877 for (i = 0; i < num_defaults; i++)
02878 {
02879
02880
02881
02882
02883 Assert(econtext != NULL);
02884 Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
02885
02886 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
02887 &nulls[defmap[i]], NULL);
02888 }
02889
02890 return true;
02891 }
02892
02893
02894
02895
02896 void
02897 EndCopyFrom(CopyState cstate)
02898 {
02899
02900
02901 EndCopy(cstate);
02902 }
02903
02904
02905
02906
02907
02908
02909
02910
02911
02912 static bool
02913 CopyReadLine(CopyState cstate)
02914 {
02915 bool result;
02916
02917 resetStringInfo(&cstate->line_buf);
02918
02919
02920 cstate->line_buf_converted = false;
02921
02922
02923 result = CopyReadLineText(cstate);
02924
02925 if (result)
02926 {
02927
02928
02929
02930
02931
02932 if (cstate->copy_dest == COPY_NEW_FE)
02933 {
02934 do
02935 {
02936 cstate->raw_buf_index = cstate->raw_buf_len;
02937 } while (CopyLoadRawBuf(cstate));
02938 }
02939 }
02940 else
02941 {
02942
02943
02944
02945
02946 switch (cstate->eol_type)
02947 {
02948 case EOL_NL:
02949 Assert(cstate->line_buf.len >= 1);
02950 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
02951 cstate->line_buf.len--;
02952 cstate->line_buf.data[cstate->line_buf.len] = '\0';
02953 break;
02954 case EOL_CR:
02955 Assert(cstate->line_buf.len >= 1);
02956 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
02957 cstate->line_buf.len--;
02958 cstate->line_buf.data[cstate->line_buf.len] = '\0';
02959 break;
02960 case EOL_CRNL:
02961 Assert(cstate->line_buf.len >= 2);
02962 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
02963 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
02964 cstate->line_buf.len -= 2;
02965 cstate->line_buf.data[cstate->line_buf.len] = '\0';
02966 break;
02967 case EOL_UNKNOWN:
02968
02969 Assert(false);
02970 break;
02971 }
02972 }
02973
02974
02975 if (cstate->need_transcoding)
02976 {
02977 char *cvt;
02978
02979 cvt = pg_any_to_server(cstate->line_buf.data,
02980 cstate->line_buf.len,
02981 cstate->file_encoding);
02982 if (cvt != cstate->line_buf.data)
02983 {
02984
02985 resetStringInfo(&cstate->line_buf);
02986 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
02987 pfree(cvt);
02988 }
02989 }
02990
02991
02992 cstate->line_buf_converted = true;
02993
02994 return result;
02995 }
02996
02997 /*
02998 * CopyReadLineText - scan cstate->raw_buf for the end of the current text/CSV line; returns false when a line terminator ended the scan, true when no buffered data remained or the backslash-dot end-of-copy marker was accepted.
02999 */
03000 static bool
03001 CopyReadLineText(CopyState cstate)
03002 {
03003 char *copy_raw_buf;
03004 int raw_buf_ptr;
03005 int copy_buf_len;
03006 bool need_data = false;
03007 bool hit_eof = false;
03008 bool result = false;
03009 char mblen_str[2];
03010
03011 /* CSV-only scan state; quotec/escapec stay '\0' outside CSV mode */
03012 bool first_char_in_line = true;
03013 bool in_quote = false,
03014 last_was_esc = false;
03015 char quotec = '\0';
03016 char escapec = '\0';
03017
03018 if (cstate->csv_mode)
03019 {
03020 quotec = cstate->quote[0];
03021 escapec = cstate->escape[0];
03022 /* when quote and escape are the same character, track it only as a quote */
03023 if (quotec == escapec)
03024 escapec = '\0';
03025 }
03026
03027 mblen_str[1] = '\0';
03028
03029 /*
03030 * The scan below works on local copies of the raw buffer pointer, read
03031 * position and length.  raw_buf_ptr is the local read position; the only
03032 * direct assignment to cstate->raw_buf_index in this function is the one
03033 * made when the end-of-copy marker is accepted.
03034 *
03035 * NOTE(review): REFILL_LINEBUF, IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
03036 * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are macros defined
03037 * outside this function.  Judging by their names and their use here they
03038 * presumably move scanned bytes into line_buf, restart or leave the loop
03039 * when more lookahead is needed, and jump to the not_end_of_copy label.
03040 * need_data and hit_eof are never read directly below, so presumably the
03041 * macros use them -- confirm against the definitions before reordering.
03042 */
03043
03044
03045
03046
03047
03048
03049
03050 copy_raw_buf = cstate->raw_buf;
03051 raw_buf_ptr = cstate->raw_buf_index;
03052 copy_buf_len = cstate->raw_buf_len;
03053
03054 for (;;)
03055 {
03056 int prev_raw_ptr;
03057 char c;
03058
03059 /*
03060 * Load more data when the local read position has reached the end of
03061 * the buffered bytes, or when need_data has been set.
03062 */
03063
03064
03065
03066
03067
03068
03069 if (raw_buf_ptr >= copy_buf_len || need_data)
03070 {
03071 REFILL_LINEBUF;
03072
03073 /*
03074 * Try to load more data; a false return is remembered in hit_eof.
03075 * The local position and length are re-synchronised afterwards.
03076 */
03077 if (!CopyLoadRawBuf(cstate))
03078 hit_eof = true;
03079 raw_buf_ptr = 0;
03080 copy_buf_len = cstate->raw_buf_len;
03081
03082 /*
03083 * No buffered bytes remain after the load attempt: stop scanning
03084 * and report true to the caller.
03085 */
03086 if (copy_buf_len <= 0)
03087 {
03088 result = true;
03089 break;
03090 }
03091 need_data = false;
03092 }
03093
03094 /* Fetch the next byte, remembering the position it was read from */
03095 prev_raw_ptr = raw_buf_ptr;
03096 c = copy_raw_buf[raw_buf_ptr++];
03097
03098 if (cstate->csv_mode)
03099 {
03100 /*
03101 * For a backslash or carriage return, invoke the lookahead macro
03102 * with argument 0 before updating the quote state (presumably it
03103 * makes sure the following byte is buffered unless at EOF --
03104 * confirm against the macro definition).
03105 */
03106
03107
03108
03109 if (c == '\\' || c == '\r')
03110 {
03111 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
03112 }
03113
03114 /*
03115 * Quote tracking: an escape character seen inside quotes toggles
03116 * last_was_esc; a quote character toggles in_quote unless the
03117 * previous character was an unconsumed escape; any character other
03118 * than the escape character clears last_was_esc.
03119 */
03120
03121
03122 if (in_quote && c == escapec)
03123 last_was_esc = !last_was_esc;
03124 if (c == quotec && !last_was_esc)
03125 in_quote = !in_quote;
03126 if (c != escapec)
03127 last_was_esc = false;
03128
03129 /*
03130 * Inside a quoted field, count an embedded line ending towards
03131 * cur_lineno: '\n' when the detected EOL style is EOL_NL, '\r' for
03132 * every other style.
03133 */
03134
03135 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
03136 cstate->cur_lineno++;
03137 }
03138
03139 /* Carriage return outside of a CSV quoted field */
03140 if (c == '\r' && (!cstate->csv_mode || !in_quote))
03141 {
03142 /* EOL style not yet known, or known to be CR+LF: look at the next byte */
03143 if (cstate->eol_type == EOL_UNKNOWN ||
03144 cstate->eol_type == EOL_CRNL)
03145 {
03146
03147
03148
03149
03150
03151
03152 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
03153
03154 /* peek at the following byte without consuming it */
03155 c = copy_raw_buf[raw_buf_ptr];
03156
03157 if (c == '\n')
03158 {
03159 raw_buf_ptr++;
03160 cstate->eol_type = EOL_CRNL;
03161 }
03162 else
03163 {
03164 /* CR not followed by LF: an error if CR+LF was already established */
03165 if (cstate->eol_type == EOL_CRNL)
03166 ereport(ERROR,
03167 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03168 !cstate->csv_mode ?
03169 errmsg("literal carriage return found in data") :
03170 errmsg("unquoted carriage return found in data"),
03171 !cstate->csv_mode ?
03172 errhint("Use \"\\r\" to represent carriage return.") :
03173 errhint("Use quoted CSV field to represent carriage return.")));
03174
03175 /*
03176 * Otherwise the style was still unknown: a lone CR becomes the
03177 * line terminator from now on.
03178 */
03179 cstate->eol_type = EOL_CR;
03180 }
03181 }
03182 else if (cstate->eol_type == EOL_NL)
03183 ereport(ERROR,
03184 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03185 !cstate->csv_mode ?
03186 errmsg("literal carriage return found in data") :
03187 errmsg("unquoted carriage return found in data"),
03188 !cstate->csv_mode ?
03189 errhint("Use \"\\r\" to represent carriage return.") :
03190 errhint("Use quoted CSV field to represent carriage return.")));
03191 /* The carriage return (plus LF, if consumed above) ended the line */
03192 break;
03193 }
03194
03195 /* Newline outside of a CSV quoted field */
03196 if (c == '\n' && (!cstate->csv_mode || !in_quote))
03197 {
03198 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
03199 ereport(ERROR,
03200 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03201 !cstate->csv_mode ?
03202 errmsg("literal newline found in data") :
03203 errmsg("unquoted newline found in data"),
03204 !cstate->csv_mode ?
03205 errhint("Use \"\\n\" to represent newline.") :
03206 errhint("Use quoted CSV field to represent newline.")));
03207 cstate->eol_type = EOL_NL;
03208 /* The newline ended the line */
03209 break;
03210 }
03211
03212 /*
03213 * Backslash handling: applied to every backslash in text mode, but in
03214 * CSV mode only to a backslash that is the first character of the line.
03215 */
03216 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
03217 {
03218 char c2;
03219
03220 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
03221 IF_NEED_REFILL_AND_EOF_BREAK(0);
03222
03223 /* Peek at the byte after the backslash without consuming it */
03224
03225
03226
03227
03228
03229 c2 = copy_raw_buf[raw_buf_ptr];
03230
03231 if (c2 == '.')
03232 {
03233 raw_buf_ptr++;
03234
03235 /*
03236 * Backslash-dot seen.  When the established EOL style is CR+LF
03237 * the marker has to be followed by CR and then LF, so the CR is
03238 * checked first here.
03239 */
03240 if (cstate->eol_type == EOL_CRNL)
03241 {
03242 /* Get the next character */
03243 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
03244
03245 c2 = copy_raw_buf[raw_buf_ptr++];
03246
03247 if (c2 == '\n')
03248 {
03249 if (!cstate->csv_mode)
03250 ereport(ERROR,
03251 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03252 errmsg("end-of-copy marker does not match previous newline style")));
03253 else
03254 NO_END_OF_COPY_GOTO;
03255 }
03256 else if (c2 != '\r')
03257 {
03258 if (!cstate->csv_mode)
03259 ereport(ERROR,
03260 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03261 errmsg("end-of-copy marker corrupt")));
03262 else
03263 NO_END_OF_COPY_GOTO;
03264 }
03265 }
03266
03267 /* Get the next character; it must be a line terminator */
03268 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
03269
03270 c2 = copy_raw_buf[raw_buf_ptr++];
03271
03272 if (c2 != '\r' && c2 != '\n')
03273 {
03274 if (!cstate->csv_mode)
03275 ereport(ERROR,
03276 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03277 errmsg("end-of-copy marker corrupt")));
03278 else
03279 NO_END_OF_COPY_GOTO;
03280 }
03281 /* The terminator after the marker must agree with an already-established EOL style */
03282 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
03283 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
03284 (cstate->eol_type == EOL_CR && c2 != '\r'))
03285 {
03286 ereport(ERROR,
03287 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03288 errmsg("end-of-copy marker does not match previous newline style")));
03289 }
03290
03291 /*
03292 * Marker accepted: keep any bytes that preceded the backslash,
03293 * mark the marker itself as consumed, and report true.
03294 */
03295 if (prev_raw_ptr > cstate->raw_buf_index)
03296 appendBinaryStringInfo(&cstate->line_buf,
03297 cstate->raw_buf + cstate->raw_buf_index,
03298 prev_raw_ptr - cstate->raw_buf_index);
03299 cstate->raw_buf_index = raw_buf_ptr;
03300 result = true;
03301 break;
03302 }
03303 else if (!cstate->csv_mode)
03304 /*
03305 * Text mode, backslash not followed by a dot: step over the
03306 * following byte as well, so it is not examined by the line
03307 * terminator or backslash checks on the next iteration.
03308 */
03309
03310
03311
03312
03313
03314
03315
03316 raw_buf_ptr++;
03317 }
03318
03319 /*
03320 * Label used by the CSV-mode branches above that invoke
03321 * NO_END_OF_COPY_GOTO (presumably a goto to this label -- confirm).
03322 */
03323
03324
03325 not_end_of_copy:
03326
03327 /*
03328 * For file encodings flagged encoding_embeds_ascii, a byte with the
03329 * high bit set starts a multibyte character: ask for its length and
03330 * step over its remaining bytes as a unit, after making sure (via the
03331 * lookahead macros) that mblen - 1 further bytes are available.
03332 */
03333
03334 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
03335 {
03336 int mblen;
03337
03338 mblen_str[0] = c;
03339 /* mblen_str holds just this lead byte followed by a NUL */
03340 mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
03341 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
03342 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
03343 raw_buf_ptr += mblen - 1;
03344 }
03345 first_char_in_line = false;
03346 }
03347
03348 /*
03349 * Final REFILL_LINEBUF before returning (presumably transfers the bytes scanned so far into line_buf -- confirm).
03350 */
03351 REFILL_LINEBUF;
03352
03353 return result;
03354 }
03355
03356
03357
03358
/*
 * GetDecimalFromHex - numeric value of a single hexadecimal digit.
 *
 * Decimal digits map to 0..9; anything else is folded to lower case and
 * measured from 'a', so 'a'/'A' yields 10 and 'f'/'F' yields 15.  No
 * validation is done here: the result for a character that is not a hex
 * digit is simply whatever that arithmetic produces.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char digit = (unsigned char) hex;

	if (!isdigit(digit))
		return tolower(digit) - 'a' + 10;

	return hex - '0';
}
03367
03368
03369
03370
03371
03372
03373
03374
03375
03376
03377
03378
03379
03380
03381
03382
03383
03384
03385
03386
03387 static int
03388 CopyReadAttributesText(CopyState cstate)
03389 {
03390 char delimc = cstate->delim[0];
03391 int fieldno;
03392 char *output_ptr;
03393 char *cur_ptr;
03394 char *line_end_ptr;
03395
03396
03397
03398
03399
03400 if (cstate->max_fields <= 0)
03401 {
03402 if (cstate->line_buf.len != 0)
03403 ereport(ERROR,
03404 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03405 errmsg("extra data after last expected column")));
03406 return 0;
03407 }
03408
03409 resetStringInfo(&cstate->attribute_buf);
03410
03411
03412
03413
03414
03415
03416
03417
03418 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
03419 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
03420 output_ptr = cstate->attribute_buf.data;
03421
03422
03423 cur_ptr = cstate->line_buf.data;
03424 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
03425
03426
03427 fieldno = 0;
03428 for (;;)
03429 {
03430 bool found_delim = false;
03431 char *start_ptr;
03432 char *end_ptr;
03433 int input_len;
03434 bool saw_non_ascii = false;
03435
03436
03437 if (fieldno >= cstate->max_fields)
03438 {
03439 cstate->max_fields *= 2;
03440 cstate->raw_fields =
03441 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
03442 }
03443
03444
03445 start_ptr = cur_ptr;
03446 cstate->raw_fields[fieldno] = output_ptr;
03447
03448
03449
03450
03451
03452
03453
03454
03455
03456
03457
03458
03459 for (;;)
03460 {
03461 char c;
03462
03463 end_ptr = cur_ptr;
03464 if (cur_ptr >= line_end_ptr)
03465 break;
03466 c = *cur_ptr++;
03467 if (c == delimc)
03468 {
03469 found_delim = true;
03470 break;
03471 }
03472 if (c == '\\')
03473 {
03474 if (cur_ptr >= line_end_ptr)
03475 break;
03476 c = *cur_ptr++;
03477 switch (c)
03478 {
03479 case '0':
03480 case '1':
03481 case '2':
03482 case '3':
03483 case '4':
03484 case '5':
03485 case '6':
03486 case '7':
03487 {
03488
03489 int val;
03490
03491 val = OCTVALUE(c);
03492 if (cur_ptr < line_end_ptr)
03493 {
03494 c = *cur_ptr;
03495 if (ISOCTAL(c))
03496 {
03497 cur_ptr++;
03498 val = (val << 3) + OCTVALUE(c);
03499 if (cur_ptr < line_end_ptr)
03500 {
03501 c = *cur_ptr;
03502 if (ISOCTAL(c))
03503 {
03504 cur_ptr++;
03505 val = (val << 3) + OCTVALUE(c);
03506 }
03507 }
03508 }
03509 }
03510 c = val & 0377;
03511 if (c == '\0' || IS_HIGHBIT_SET(c))
03512 saw_non_ascii = true;
03513 }
03514 break;
03515 case 'x':
03516
03517 if (cur_ptr < line_end_ptr)
03518 {
03519 char hexchar = *cur_ptr;
03520
03521 if (isxdigit((unsigned char) hexchar))
03522 {
03523 int val = GetDecimalFromHex(hexchar);
03524
03525 cur_ptr++;
03526 if (cur_ptr < line_end_ptr)
03527 {
03528 hexchar = *cur_ptr;
03529 if (isxdigit((unsigned char) hexchar))
03530 {
03531 cur_ptr++;
03532 val = (val << 4) + GetDecimalFromHex(hexchar);
03533 }
03534 }
03535 c = val & 0xff;
03536 if (c == '\0' || IS_HIGHBIT_SET(c))
03537 saw_non_ascii = true;
03538 }
03539 }
03540 break;
03541 case 'b':
03542 c = '\b';
03543 break;
03544 case 'f':
03545 c = '\f';
03546 break;
03547 case 'n':
03548 c = '\n';
03549 break;
03550 case 'r':
03551 c = '\r';
03552 break;
03553 case 't':
03554 c = '\t';
03555 break;
03556 case 'v':
03557 c = '\v';
03558 break;
03559
03560
03561
03562
03563
03564 }
03565 }
03566
03567
03568 *output_ptr++ = c;
03569 }
03570
03571
03572 input_len = end_ptr - start_ptr;
03573 if (input_len == cstate->null_print_len &&
03574 strncmp(start_ptr, cstate->null_print, input_len) == 0)
03575 cstate->raw_fields[fieldno] = NULL;
03576 else
03577 {
03578
03579
03580
03581
03582
03583
03584 if (saw_non_ascii)
03585 {
03586 char *fld = cstate->raw_fields[fieldno];
03587
03588 pg_verifymbstr(fld, output_ptr - fld, false);
03589 }
03590 }
03591
03592
03593 *output_ptr++ = '\0';
03594
03595 fieldno++;
03596
03597 if (!found_delim)
03598 break;
03599 }
03600
03601
03602 output_ptr--;
03603 Assert(*output_ptr == '\0');
03604 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
03605
03606 return fieldno;
03607 }
03608
03609
03610
03611
03612
03613
03614
03615 static int
03616 CopyReadAttributesCSV(CopyState cstate)
03617 {
03618 char delimc = cstate->delim[0];
03619 char quotec = cstate->quote[0];
03620 char escapec = cstate->escape[0];
03621 int fieldno;
03622 char *output_ptr;
03623 char *cur_ptr;
03624 char *line_end_ptr;
03625
03626
03627
03628
03629
03630 if (cstate->max_fields <= 0)
03631 {
03632 if (cstate->line_buf.len != 0)
03633 ereport(ERROR,
03634 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03635 errmsg("extra data after last expected column")));
03636 return 0;
03637 }
03638
03639 resetStringInfo(&cstate->attribute_buf);
03640
03641
03642
03643
03644
03645
03646
03647
03648 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
03649 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
03650 output_ptr = cstate->attribute_buf.data;
03651
03652
03653 cur_ptr = cstate->line_buf.data;
03654 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
03655
03656
03657 fieldno = 0;
03658 for (;;)
03659 {
03660 bool found_delim = false;
03661 bool saw_quote = false;
03662 char *start_ptr;
03663 char *end_ptr;
03664 int input_len;
03665
03666
03667 if (fieldno >= cstate->max_fields)
03668 {
03669 cstate->max_fields *= 2;
03670 cstate->raw_fields =
03671 repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
03672 }
03673
03674
03675 start_ptr = cur_ptr;
03676 cstate->raw_fields[fieldno] = output_ptr;
03677
03678
03679
03680
03681
03682
03683
03684
03685 for (;;)
03686 {
03687 char c;
03688
03689
03690 for (;;)
03691 {
03692 end_ptr = cur_ptr;
03693 if (cur_ptr >= line_end_ptr)
03694 goto endfield;
03695 c = *cur_ptr++;
03696
03697 if (c == delimc)
03698 {
03699 found_delim = true;
03700 goto endfield;
03701 }
03702
03703 if (c == quotec)
03704 {
03705 saw_quote = true;
03706 break;
03707 }
03708
03709 *output_ptr++ = c;
03710 }
03711
03712
03713 for (;;)
03714 {
03715 end_ptr = cur_ptr;
03716 if (cur_ptr >= line_end_ptr)
03717 ereport(ERROR,
03718 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03719 errmsg("unterminated CSV quoted field")));
03720
03721 c = *cur_ptr++;
03722
03723
03724 if (c == escapec)
03725 {
03726
03727
03728
03729
03730 if (cur_ptr < line_end_ptr)
03731 {
03732 char nextc = *cur_ptr;
03733
03734 if (nextc == escapec || nextc == quotec)
03735 {
03736 *output_ptr++ = nextc;
03737 cur_ptr++;
03738 continue;
03739 }
03740 }
03741 }
03742
03743
03744
03745
03746
03747
03748 if (c == quotec)
03749 break;
03750
03751
03752 *output_ptr++ = c;
03753 }
03754 }
03755 endfield:
03756
03757
03758 *output_ptr++ = '\0';
03759
03760
03761 input_len = end_ptr - start_ptr;
03762 if (!saw_quote && input_len == cstate->null_print_len &&
03763 strncmp(start_ptr, cstate->null_print, input_len) == 0)
03764 cstate->raw_fields[fieldno] = NULL;
03765
03766 fieldno++;
03767
03768 if (!found_delim)
03769 break;
03770 }
03771
03772
03773 output_ptr--;
03774 Assert(*output_ptr == '\0');
03775 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
03776
03777 return fieldno;
03778 }
03779
03780
03781
03782
03783
03784 static Datum
03785 CopyReadBinaryAttribute(CopyState cstate,
03786 int column_no, FmgrInfo *flinfo,
03787 Oid typioparam, int32 typmod,
03788 bool *isnull)
03789 {
03790 int32 fld_size;
03791 Datum result;
03792
03793 if (!CopyGetInt32(cstate, &fld_size))
03794 ereport(ERROR,
03795 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03796 errmsg("unexpected EOF in COPY data")));
03797 if (fld_size == -1)
03798 {
03799 *isnull = true;
03800 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
03801 }
03802 if (fld_size < 0)
03803 ereport(ERROR,
03804 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03805 errmsg("invalid field size")));
03806
03807
03808 resetStringInfo(&cstate->attribute_buf);
03809
03810 enlargeStringInfo(&cstate->attribute_buf, fld_size);
03811 if (CopyGetData(cstate, cstate->attribute_buf.data,
03812 fld_size, fld_size) != fld_size)
03813 ereport(ERROR,
03814 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
03815 errmsg("unexpected EOF in COPY data")));
03816
03817 cstate->attribute_buf.len = fld_size;
03818 cstate->attribute_buf.data[fld_size] = '\0';
03819
03820
03821 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
03822 typioparam, typmod);
03823
03824
03825 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
03826 ereport(ERROR,
03827 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
03828 errmsg("incorrect binary data format")));
03829
03830 *isnull = false;
03831 return result;
03832 }
03833
03834
03835
03836
/*
 * DUMPSOFAR - send the pending bytes in [start, ptr), if there are any.
 *
 * Expands in place and relies on the surrounding function having variables
 * named 'cstate', 'start' and 'ptr' in scope.
 */
#define DUMPSOFAR() \
	do { \
		if (start < ptr) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
03842
03843 static void
03844 CopyAttributeOutText(CopyState cstate, char *string)
03845 {
03846 char *ptr;
03847 char *start;
03848 char c;
03849 char delimc = cstate->delim[0];
03850
03851 if (cstate->need_transcoding)
03852 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
03853 else
03854 ptr = string;
03855
03856
03857
03858
03859
03860
03861
03862
03863
03864
03865
03866
03867
03868
03869
03870 if (cstate->encoding_embeds_ascii)
03871 {
03872 start = ptr;
03873 while ((c = *ptr) != '\0')
03874 {
03875 if ((unsigned char) c < (unsigned char) 0x20)
03876 {
03877
03878
03879
03880
03881
03882
03883
03884 switch (c)
03885 {
03886 case '\b':
03887 c = 'b';
03888 break;
03889 case '\f':
03890 c = 'f';
03891 break;
03892 case '\n':
03893 c = 'n';
03894 break;
03895 case '\r':
03896 c = 'r';
03897 break;
03898 case '\t':
03899 c = 't';
03900 break;
03901 case '\v':
03902 c = 'v';
03903 break;
03904 default:
03905
03906 if (c == delimc)
03907 break;
03908
03909 ptr++;
03910 continue;
03911 }
03912
03913 DUMPSOFAR();
03914 CopySendChar(cstate, '\\');
03915 CopySendChar(cstate, c);
03916 start = ++ptr;
03917 }
03918 else if (c == '\\' || c == delimc)
03919 {
03920 DUMPSOFAR();
03921 CopySendChar(cstate, '\\');
03922 start = ptr++;
03923 }
03924 else if (IS_HIGHBIT_SET(c))
03925 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
03926 else
03927 ptr++;
03928 }
03929 }
03930 else
03931 {
03932 start = ptr;
03933 while ((c = *ptr) != '\0')
03934 {
03935 if ((unsigned char) c < (unsigned char) 0x20)
03936 {
03937
03938
03939
03940
03941
03942
03943
03944 switch (c)
03945 {
03946 case '\b':
03947 c = 'b';
03948 break;
03949 case '\f':
03950 c = 'f';
03951 break;
03952 case '\n':
03953 c = 'n';
03954 break;
03955 case '\r':
03956 c = 'r';
03957 break;
03958 case '\t':
03959 c = 't';
03960 break;
03961 case '\v':
03962 c = 'v';
03963 break;
03964 default:
03965
03966 if (c == delimc)
03967 break;
03968
03969 ptr++;
03970 continue;
03971 }
03972
03973 DUMPSOFAR();
03974 CopySendChar(cstate, '\\');
03975 CopySendChar(cstate, c);
03976 start = ++ptr;
03977 }
03978 else if (c == '\\' || c == delimc)
03979 {
03980 DUMPSOFAR();
03981 CopySendChar(cstate, '\\');
03982 start = ptr++;
03983 }
03984 else
03985 ptr++;
03986 }
03987 }
03988
03989 DUMPSOFAR();
03990 }
03991
03992
03993
03994
03995
03996 static void
03997 CopyAttributeOutCSV(CopyState cstate, char *string,
03998 bool use_quote, bool single_attr)
03999 {
04000 char *ptr;
04001 char *start;
04002 char c;
04003 char delimc = cstate->delim[0];
04004 char quotec = cstate->quote[0];
04005 char escapec = cstate->escape[0];
04006
04007
04008 if (!use_quote && strcmp(string, cstate->null_print) == 0)
04009 use_quote = true;
04010
04011 if (cstate->need_transcoding)
04012 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
04013 else
04014 ptr = string;
04015
04016
04017
04018
04019 if (!use_quote)
04020 {
04021
04022
04023
04024
04025 if (single_attr && strcmp(ptr, "\\.") == 0)
04026 use_quote = true;
04027 else
04028 {
04029 char *tptr = ptr;
04030
04031 while ((c = *tptr) != '\0')
04032 {
04033 if (c == delimc || c == quotec || c == '\n' || c == '\r')
04034 {
04035 use_quote = true;
04036 break;
04037 }
04038 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
04039 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
04040 else
04041 tptr++;
04042 }
04043 }
04044 }
04045
04046 if (use_quote)
04047 {
04048 CopySendChar(cstate, quotec);
04049
04050
04051
04052
04053 start = ptr;
04054 while ((c = *ptr) != '\0')
04055 {
04056 if (c == quotec || c == escapec)
04057 {
04058 DUMPSOFAR();
04059 CopySendChar(cstate, escapec);
04060 start = ptr;
04061 }
04062 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
04063 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
04064 else
04065 ptr++;
04066 }
04067 DUMPSOFAR();
04068
04069 CopySendChar(cstate, quotec);
04070 }
04071 else
04072 {
04073
04074 CopySendString(cstate, ptr);
04075 }
04076 }
04077
04078
04079
04080
04081
04082
04083
04084
04085
04086
04087 static List *
04088 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
04089 {
04090 List *attnums = NIL;
04091
04092 if (attnamelist == NIL)
04093 {
04094
04095 Form_pg_attribute *attr = tupDesc->attrs;
04096 int attr_count = tupDesc->natts;
04097 int i;
04098
04099 for (i = 0; i < attr_count; i++)
04100 {
04101 if (attr[i]->attisdropped)
04102 continue;
04103 attnums = lappend_int(attnums, i + 1);
04104 }
04105 }
04106 else
04107 {
04108
04109 ListCell *l;
04110
04111 foreach(l, attnamelist)
04112 {
04113 char *name = strVal(lfirst(l));
04114 int attnum;
04115 int i;
04116
04117
04118 attnum = InvalidAttrNumber;
04119 for (i = 0; i < tupDesc->natts; i++)
04120 {
04121 if (tupDesc->attrs[i]->attisdropped)
04122 continue;
04123 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
04124 {
04125 attnum = tupDesc->attrs[i]->attnum;
04126 break;
04127 }
04128 }
04129 if (attnum == InvalidAttrNumber)
04130 {
04131 if (rel != NULL)
04132 ereport(ERROR,
04133 (errcode(ERRCODE_UNDEFINED_COLUMN),
04134 errmsg("column \"%s\" of relation \"%s\" does not exist",
04135 name, RelationGetRelationName(rel))));
04136 else
04137 ereport(ERROR,
04138 (errcode(ERRCODE_UNDEFINED_COLUMN),
04139 errmsg("column \"%s\" does not exist",
04140 name)));
04141 }
04142
04143 if (list_member_int(attnums, attnum))
04144 ereport(ERROR,
04145 (errcode(ERRCODE_DUPLICATE_COLUMN),
04146 errmsg("column \"%s\" specified more than once",
04147 name)));
04148 attnums = lappend_int(attnums, attnum);
04149 }
04150 }
04151
04152 return attnums;
04153 }
04154
04155
04156
04157
04158
04159 static void
04160 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
04161 {
04162
04163 }
04164
04165
04166
04167
04168 static void
04169 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
04170 {
04171 DR_copy *myState = (DR_copy *) self;
04172 CopyState cstate = myState->cstate;
04173
04174
04175 slot_getallattrs(slot);
04176
04177
04178 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
04179 myState->processed++;
04180 }
04181
04182
04183
04184
04185 static void
04186 copy_dest_shutdown(DestReceiver *self)
04187 {
04188
04189 }
04190
04191
04192
04193
04194 static void
04195 copy_dest_destroy(DestReceiver *self)
04196 {
04197 pfree(self);
04198 }
04199
04200
04201
04202
04203 DestReceiver *
04204 CreateCopyDestReceiver(void)
04205 {
04206 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
04207
04208 self->pub.receiveSlot = copy_dest_receive;
04209 self->pub.rStartup = copy_dest_startup;
04210 self->pub.rShutdown = copy_dest_shutdown;
04211 self->pub.rDestroy = copy_dest_destroy;
04212 self->pub.mydest = DestCopyOut;
04213
04214 self->cstate = NULL;
04215 self->processed = 0;
04216
04217 return (DestReceiver *) self;
04218 }