Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / commands / copy.c
blob be91132a3ec37a2a6b0013a4d7ea76dee16c64a2
/*-------------------------------------------------------------------------
 *
 * copy.c
 *		Implements the COPY utility command
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
23 #include "access/heapam.h"
24 #include "access/xact.h"
25 #include "catalog/namespace.h"
26 #include "catalog/pg_type.h"
27 #include "commands/copy.h"
28 #include "commands/trigger.h"
29 #include "executor/executor.h"
30 #include "libpq/libpq.h"
31 #include "libpq/pqformat.h"
32 #include "mb/pg_wchar.h"
33 #include "miscadmin.h"
34 #include "optimizer/planner.h"
35 #include "parser/parse_relation.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "storage/fd.h"
38 #include "tcop/tcopprot.h"
39 #include "utils/acl.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/snapmgr.h"
/*
 * Octal-digit helpers.
 *
 * ISOCTAL(c) is nonzero when c is one of the characters '0'..'7'.  Note that
 * the argument is evaluated twice, so it must not have side effects.
 * OCTVALUE(c) converts such a character to its numeric value (0..7); the
 * result is meaningful only when ISOCTAL(c) holds.
 */
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
/*
 * Represents the different source/dest cases we need to worry about at
 * the bottom level.  The same value is used for both directions of COPY.
 */
typedef enum CopyDest
{
	COPY_FILE,					/* to/from file */
	COPY_OLD_FE,				/* to/from frontend (2.0 protocol) */
	COPY_NEW_FE					/* to/from frontend (3.0 protocol) */
} CopyDest;
/*
 * Represents the end-of-line terminator type of the input.
 *
 * EOL_UNKNOWN is the first enumerator and therefore has value zero.
 */
typedef enum EolType
{
	EOL_UNKNOWN,
	EOL_NL,
	EOL_CR,
	EOL_CRNL
} EolType;
72 * This struct contains all the state variables used throughout a COPY
73 * operation. For simplicity, we use the same struct for all variants of COPY,
74 * even though some fields are used in only some cases.
76 * Multi-byte encodings: all supported client-side encodings encode multi-byte
77 * characters by having the first byte's high bit set. Subsequent bytes of the
78 * character can have the high bit not set. When scanning data in such an
79 * encoding to look for a match to a single-byte (ie ASCII) character, we must
80 * use the full pg_encoding_mblen() machinery to skip over multibyte
81 * characters, else we might find a false match to a trailing byte. In
82 * supported server encodings, there is no possibility of a false match, and
83 * it's faster to make useless comparisons to trailing bytes than it is to
84 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
85 * when we have to do it the hard way.
87 typedef struct CopyStateData
89 /* low-level state data */
90 CopyDest copy_dest; /* type of copy source/destination */
91 FILE *copy_file; /* used if copy_dest == COPY_FILE */
92 StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
93 * dest == COPY_NEW_FE in COPY FROM */
94 bool fe_copy; /* true for all FE copy dests */
95 bool fe_eof; /* true if detected end of copy data */
96 EolType eol_type; /* EOL type of input */
97 int client_encoding; /* remote side's character encoding */
98 bool need_transcoding; /* client encoding diff from server? */
99 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
100 uint64 processed; /* # of tuples processed */
102 /* parameters from the COPY command */
103 Relation rel; /* relation to copy to or from */
104 QueryDesc *queryDesc; /* executable query to copy from */
105 List *attnumlist; /* integer list of attnums to copy */
106 char *filename; /* filename, or NULL for STDIN/STDOUT */
107 bool binary; /* binary format? */
108 bool oids; /* include OIDs? */
109 bool csv_mode; /* Comma Separated Value format? */
110 bool header_line; /* CSV header line? */
111 char *null_print; /* NULL marker string (server encoding!) */
112 int null_print_len; /* length of same */
113 char *null_print_client; /* same converted to client encoding */
114 char *delim; /* column delimiter (must be 1 byte) */
115 char *quote; /* CSV quote char (must be 1 byte) */
116 char *escape; /* CSV escape char (must be 1 byte) */
117 bool *force_quote_flags; /* per-column CSV FQ flags */
118 bool *force_notnull_flags; /* per-column CSV FNN flags */
120 /* these are just for error messages, see copy_in_error_callback */
121 const char *cur_relname; /* table name for error messages */
122 int cur_lineno; /* line number for error messages */
123 const char *cur_attname; /* current att for error messages */
124 const char *cur_attval; /* current att value for error messages */
127 * Working state for COPY TO
129 FmgrInfo *out_functions; /* lookup info for output functions */
130 MemoryContext rowcontext; /* per-row evaluation context */
133 * These variables are used to reduce overhead in textual COPY FROM.
135 * attribute_buf holds the separated, de-escaped text for each field of
136 * the current line. The CopyReadAttributes functions return arrays of
137 * pointers into this buffer. We avoid palloc/pfree overhead by re-using
138 * the buffer on each cycle.
140 StringInfoData attribute_buf;
143 * Similarly, line_buf holds the whole input line being processed. The
144 * input cycle is first to read the whole line into line_buf, convert it
145 * to server encoding there, and then extract the individual attribute
146 * fields into attribute_buf. line_buf is preserved unmodified so that we
147 * can display it in error messages if appropriate.
149 StringInfoData line_buf;
150 bool line_buf_converted; /* converted to server encoding? */
153 * Finally, raw_buf holds raw data read from the data source (file or
154 * client connection). CopyReadLine parses this data sufficiently to
155 * locate line boundaries, then transfers the data to line_buf and
156 * converts it. Note: we guarantee that there is a \0 at
157 * raw_buf[raw_buf_len].
159 #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
160 char *raw_buf;
161 int raw_buf_index; /* next byte to process */
162 int raw_buf_len; /* total # of bytes stored */
163 } CopyStateData;
165 typedef CopyStateData *CopyState;
167 /* DestReceiver for COPY (SELECT) TO */
168 typedef struct
170 DestReceiver pub; /* publicly-known function pointers */
171 CopyState cstate; /* CopyStateData for the command */
172 } DR_copy;
/*
 * These macros centralize code used to process line_buf and raw_buf buffers.
 * They are macros because they often do continue/break control and to avoid
 * function call overhead in tight COPY loops.
 *
 * We must use "if (1)" because "do {} while(0)" overrides the continue/break
 * processing.  See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
 */

/*
 * This keeps the character read at the top of the loop in the buffer
 * even if there is more than one read-ahead.
 *
 * Expects these names in the invoking scope: raw_buf_ptr, prev_raw_ptr,
 * copy_buf_len, hit_eof and need_data.  When the buffer cannot supply
 * "extralen" more bytes and EOF has not been seen, the fetch is undone,
 * need_data is set, and the enclosing loop is continued.  Must be invoked
 * as a statement followed by ';' (which completes the trailing "else").
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
		need_data = true; \
		continue; \
	} \
} else
/*
 * This consumes the remainder of the buffer and breaks.
 *
 * Expects these names in the invoking scope: raw_buf_ptr, copy_buf_len,
 * hit_eof and result.  When the buffer cannot supply "extralen" more bytes
 * and EOF has been reached, result is set to true and the enclosing loop
 * (or switch) is left via break; for a nonzero extralen the read pointer is
 * first advanced to the end of the buffer.  Must be invoked as a statement
 * followed by ';' (which completes the trailing "else").
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		if (extralen) \
			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		break; \
	} \
} else
/*
 * Transfer any approved data to line_buf; must do this to be sure
 * there is some room in raw_buf.
 *
 * Uses "cstate" and "raw_buf_ptr" from the invoking scope: the bytes of
 * raw_buf between cstate->raw_buf_index and raw_buf_ptr are appended to
 * cstate->line_buf, and raw_buf_index is then advanced to raw_buf_ptr.
 * Nothing happens when there are no such bytes.  Must be invoked as a
 * statement followed by ';' (which completes the trailing "else").
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else
/*
 * Undo any read-ahead and jump out of the block.
 *
 * Uses "raw_buf_ptr" and "prev_raw_ptr" from the invoking scope and needs a
 * label named not_end_of_copy in the invoking function: the read pointer is
 * reset to just past the character fetched at prev_raw_ptr, then control
 * transfers to that label.  Must be invoked as a statement followed by ';'
 * (which completes the trailing "else").
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else
/*
 * Fixed 11-byte signature used with the binary COPY format.  The array is
 * sized to exactly 11 bytes, so the string literal's implicit terminating
 * NUL is not stored; the final byte is the explicit \0 in the literal.
 */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
243 /* non-export function prototypes */
244 static void DoCopyTo(CopyState cstate);
245 static void CopyTo(CopyState cstate);
246 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
247 Datum *values, bool *nulls);
248 static void CopyFrom(CopyState cstate);
249 static bool CopyReadLine(CopyState cstate);
250 static bool CopyReadLineText(CopyState cstate);
251 static int CopyReadAttributesText(CopyState cstate, int maxfields,
252 char **fieldvals);
253 static int CopyReadAttributesCSV(CopyState cstate, int maxfields,
254 char **fieldvals);
255 static Datum CopyReadBinaryAttribute(CopyState cstate,
256 int column_no, FmgrInfo *flinfo,
257 Oid typioparam, int32 typmod,
258 bool *isnull);
259 static void CopyAttributeOutText(CopyState cstate, char *string);
260 static void CopyAttributeOutCSV(CopyState cstate, char *string,
261 bool use_quote, bool single_attr);
262 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
263 List *attnamelist);
264 static char *limit_printout_length(const char *str);
266 /* Low-level communications functions */
267 static void SendCopyBegin(CopyState cstate);
268 static void ReceiveCopyBegin(CopyState cstate);
269 static void SendCopyEnd(CopyState cstate);
270 static void CopySendData(CopyState cstate, void *databuf, int datasize);
271 static void CopySendString(CopyState cstate, const char *str);
272 static void CopySendChar(CopyState cstate, char c);
273 static void CopySendEndOfRow(CopyState cstate);
274 static int CopyGetData(CopyState cstate, void *databuf,
275 int minread, int maxread);
276 static void CopySendInt32(CopyState cstate, int32 val);
277 static bool CopyGetInt32(CopyState cstate, int32 *val);
278 static void CopySendInt16(CopyState cstate, int16 val);
279 static bool CopyGetInt16(CopyState cstate, int16 *val);
283 * Send copy start/stop messages for frontend copies. These have changed
284 * in past protocol redesigns.
286 static void
287 SendCopyBegin(CopyState cstate)
289 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
291 /* new way */
292 StringInfoData buf;
293 int natts = list_length(cstate->attnumlist);
294 int16 format = (cstate->binary ? 1 : 0);
295 int i;
297 pq_beginmessage(&buf, 'H');
298 pq_sendbyte(&buf, format); /* overall format */
299 pq_sendint(&buf, natts, 2);
300 for (i = 0; i < natts; i++)
301 pq_sendint(&buf, format, 2); /* per-column formats */
302 pq_endmessage(&buf);
303 cstate->copy_dest = COPY_NEW_FE;
305 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
307 /* old way */
308 if (cstate->binary)
309 ereport(ERROR,
310 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
311 errmsg("COPY BINARY is not supported to stdout or from stdin")));
312 pq_putemptymessage('H');
313 /* grottiness needed for old COPY OUT protocol */
314 pq_startcopyout();
315 cstate->copy_dest = COPY_OLD_FE;
317 else
319 /* very old way */
320 if (cstate->binary)
321 ereport(ERROR,
322 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
323 errmsg("COPY BINARY is not supported to stdout or from stdin")));
324 pq_putemptymessage('B');
325 /* grottiness needed for old COPY OUT protocol */
326 pq_startcopyout();
327 cstate->copy_dest = COPY_OLD_FE;
331 static void
332 ReceiveCopyBegin(CopyState cstate)
334 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
336 /* new way */
337 StringInfoData buf;
338 int natts = list_length(cstate->attnumlist);
339 int16 format = (cstate->binary ? 1 : 0);
340 int i;
342 pq_beginmessage(&buf, 'G');
343 pq_sendbyte(&buf, format); /* overall format */
344 pq_sendint(&buf, natts, 2);
345 for (i = 0; i < natts; i++)
346 pq_sendint(&buf, format, 2); /* per-column formats */
347 pq_endmessage(&buf);
348 cstate->copy_dest = COPY_NEW_FE;
349 cstate->fe_msgbuf = makeStringInfo();
351 else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
353 /* old way */
354 if (cstate->binary)
355 ereport(ERROR,
356 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
357 errmsg("COPY BINARY is not supported to stdout or from stdin")));
358 pq_putemptymessage('G');
359 cstate->copy_dest = COPY_OLD_FE;
361 else
363 /* very old way */
364 if (cstate->binary)
365 ereport(ERROR,
366 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
367 errmsg("COPY BINARY is not supported to stdout or from stdin")));
368 pq_putemptymessage('D');
369 cstate->copy_dest = COPY_OLD_FE;
371 /* We *must* flush here to ensure FE knows it can send. */
372 pq_flush();
375 static void
376 SendCopyEnd(CopyState cstate)
378 if (cstate->copy_dest == COPY_NEW_FE)
380 /* Shouldn't have any unsent data */
381 Assert(cstate->fe_msgbuf->len == 0);
382 /* Send Copy Done message */
383 pq_putemptymessage('c');
385 else
387 CopySendData(cstate, "\\.", 2);
388 /* Need to flush out the trailer (this also appends a newline) */
389 CopySendEndOfRow(cstate);
390 pq_endcopyout(false);
394 /*----------
395 * CopySendData sends output data to the destination (file or frontend)
396 * CopySendString does the same for null-terminated strings
397 * CopySendChar does the same for single characters
398 * CopySendEndOfRow does the appropriate thing at end of each data row
399 * (data is not actually flushed except by CopySendEndOfRow)
401 * NB: no data conversion is applied by these functions
402 *----------
404 static void
405 CopySendData(CopyState cstate, void *databuf, int datasize)
407 appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
410 static void
411 CopySendString(CopyState cstate, const char *str)
413 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
416 static void
417 CopySendChar(CopyState cstate, char c)
419 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
422 static void
423 CopySendEndOfRow(CopyState cstate)
425 StringInfo fe_msgbuf = cstate->fe_msgbuf;
427 switch (cstate->copy_dest)
429 case COPY_FILE:
430 if (!cstate->binary)
432 /* Default line termination depends on platform */
433 #ifndef WIN32
434 CopySendChar(cstate, '\n');
435 #else
436 CopySendString(cstate, "\r\n");
437 #endif
440 (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
441 1, cstate->copy_file);
442 if (ferror(cstate->copy_file))
443 ereport(ERROR,
444 (errcode_for_file_access(),
445 errmsg("could not write to COPY file: %m")));
446 break;
447 case COPY_OLD_FE:
448 /* The FE/BE protocol uses \n as newline for all platforms */
449 if (!cstate->binary)
450 CopySendChar(cstate, '\n');
452 if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
454 /* no hope of recovering connection sync, so FATAL */
455 ereport(FATAL,
456 (errcode(ERRCODE_CONNECTION_FAILURE),
457 errmsg("connection lost during COPY to stdout")));
459 break;
460 case COPY_NEW_FE:
461 /* The FE/BE protocol uses \n as newline for all platforms */
462 if (!cstate->binary)
463 CopySendChar(cstate, '\n');
465 /* Dump the accumulated row as one CopyData message */
466 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
467 break;
470 resetStringInfo(fe_msgbuf);
474 * CopyGetData reads data from the source (file or frontend)
476 * We attempt to read at least minread, and at most maxread, bytes from
477 * the source. The actual number of bytes read is returned; if this is
478 * less than minread, EOF was detected.
480 * Note: when copying from the frontend, we expect a proper EOF mark per
481 * protocol; if the frontend simply drops the connection, we raise error.
482 * It seems unwise to allow the COPY IN to complete normally in that case.
484 * NB: no data conversion is applied here.
486 static int
487 CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
489 int bytesread = 0;
491 switch (cstate->copy_dest)
493 case COPY_FILE:
494 bytesread = fread(databuf, 1, maxread, cstate->copy_file);
495 if (ferror(cstate->copy_file))
496 ereport(ERROR,
497 (errcode_for_file_access(),
498 errmsg("could not read from COPY file: %m")));
499 break;
500 case COPY_OLD_FE:
503 * We cannot read more than minread bytes (which in practice is 1)
504 * because old protocol doesn't have any clear way of separating
505 * the COPY stream from following data. This is slow, but not any
506 * slower than the code path was originally, and we don't care
507 * much anymore about the performance of old protocol.
509 if (pq_getbytes((char *) databuf, minread))
511 /* Only a \. terminator is legal EOF in old protocol */
512 ereport(ERROR,
513 (errcode(ERRCODE_CONNECTION_FAILURE),
514 errmsg("unexpected EOF on client connection")));
516 bytesread = minread;
517 break;
518 case COPY_NEW_FE:
519 while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
521 int avail;
523 while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
525 /* Try to receive another message */
526 int mtype;
528 readmessage:
529 mtype = pq_getbyte();
530 if (mtype == EOF)
531 ereport(ERROR,
532 (errcode(ERRCODE_CONNECTION_FAILURE),
533 errmsg("unexpected EOF on client connection")));
534 if (pq_getmessage(cstate->fe_msgbuf, 0))
535 ereport(ERROR,
536 (errcode(ERRCODE_CONNECTION_FAILURE),
537 errmsg("unexpected EOF on client connection")));
538 switch (mtype)
540 case 'd': /* CopyData */
541 break;
542 case 'c': /* CopyDone */
543 /* COPY IN correctly terminated by frontend */
544 cstate->fe_eof = true;
545 return bytesread;
546 case 'f': /* CopyFail */
547 ereport(ERROR,
548 (errcode(ERRCODE_QUERY_CANCELED),
549 errmsg("COPY from stdin failed: %s",
550 pq_getmsgstring(cstate->fe_msgbuf))));
551 break;
552 case 'H': /* Flush */
553 case 'S': /* Sync */
556 * Ignore Flush/Sync for the convenience of client
557 * libraries (such as libpq) that may send those
558 * without noticing that the command they just
559 * sent was COPY.
561 goto readmessage;
562 default:
563 ereport(ERROR,
564 (errcode(ERRCODE_PROTOCOL_VIOLATION),
565 errmsg("unexpected message type 0x%02X during COPY from stdin",
566 mtype)));
567 break;
570 avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
571 if (avail > maxread)
572 avail = maxread;
573 pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
574 databuf = (void *) ((char *) databuf + avail);
575 maxread -= avail;
576 bytesread += avail;
578 break;
581 return bytesread;
586 * These functions do apply some data conversion
590 * CopySendInt32 sends an int32 in network byte order
592 static void
593 CopySendInt32(CopyState cstate, int32 val)
595 uint32 buf;
597 buf = htonl((uint32) val);
598 CopySendData(cstate, &buf, sizeof(buf));
602 * CopyGetInt32 reads an int32 that appears in network byte order
604 * Returns true if OK, false if EOF
606 static bool
607 CopyGetInt32(CopyState cstate, int32 *val)
609 uint32 buf;
611 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
613 *val = 0; /* suppress compiler warning */
614 return false;
616 *val = (int32) ntohl(buf);
617 return true;
621 * CopySendInt16 sends an int16 in network byte order
623 static void
624 CopySendInt16(CopyState cstate, int16 val)
626 uint16 buf;
628 buf = htons((uint16) val);
629 CopySendData(cstate, &buf, sizeof(buf));
633 * CopyGetInt16 reads an int16 that appears in network byte order
635 static bool
636 CopyGetInt16(CopyState cstate, int16 *val)
638 uint16 buf;
640 if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
642 *val = 0; /* suppress compiler warning */
643 return false;
645 *val = (int16) ntohs(buf);
646 return true;
651 * CopyLoadRawBuf loads some more data into raw_buf
653 * Returns TRUE if able to obtain at least one more byte, else FALSE.
655 * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
656 * down to the start of the buffer and then we load more data after that.
657 * This case is used only when a frontend multibyte character crosses a
658 * bufferload boundary.
660 static bool
661 CopyLoadRawBuf(CopyState cstate)
663 int nbytes;
664 int inbytes;
666 if (cstate->raw_buf_index < cstate->raw_buf_len)
668 /* Copy down the unprocessed data */
669 nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
670 memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
671 nbytes);
673 else
674 nbytes = 0; /* no data need be saved */
676 inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
677 1, RAW_BUF_SIZE - nbytes);
678 nbytes += inbytes;
679 cstate->raw_buf[nbytes] = '\0';
680 cstate->raw_buf_index = 0;
681 cstate->raw_buf_len = nbytes;
682 return (inbytes > 0);
687 * DoCopy executes the SQL COPY statement
689 * Either unload or reload contents of table <relation>, depending on <from>.
690 * (<from> = TRUE means we are inserting into the table.) In the "TO" case
691 * we also support copying the output of an arbitrary SELECT query.
693 * If <pipe> is false, transfer is between the table and the file named
694 * <filename>. Otherwise, transfer is between the table and our regular
695 * input/output stream. The latter could be either stdin/stdout or a
696 * socket, depending on whether we're running under Postmaster control.
698 * Iff <binary>, unload or reload in the binary format, as opposed to the
699 * more wasteful but more robust and portable text format.
701 * Iff <oids>, unload or reload the format that includes OID information.
702 * On input, we accept OIDs whether or not the table has an OID column,
703 * but silently drop them if it does not. On output, we report an error
704 * if the user asks for OIDs in a table that has none (not providing an
705 * OID column might seem friendlier, but could seriously confuse programs).
707 * If in the text format, delimit columns with delimiter <delim> and print
708 * NULL values as <null_print>.
710 * Do not allow a Postgres user without superuser privilege to read from
711 * or write to a file.
713 * Do not allow the copy if user doesn't have proper permission to access
714 * the table.
716 uint64
717 DoCopy(const CopyStmt *stmt, const char *queryString)
719 CopyState cstate;
720 bool is_from = stmt->is_from;
721 bool pipe = (stmt->filename == NULL);
722 List *attnamelist = stmt->attlist;
723 List *force_quote = NIL;
724 List *force_notnull = NIL;
725 AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
726 AclResult aclresult;
727 ListCell *option;
728 TupleDesc tupDesc;
729 int num_phys_attrs;
730 uint64 processed;
732 /* Allocate workspace and zero all fields */
733 cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
735 /* Extract options from the statement node tree */
736 foreach(option, stmt->options)
738 DefElem *defel = (DefElem *) lfirst(option);
740 if (strcmp(defel->defname, "binary") == 0)
742 if (cstate->binary)
743 ereport(ERROR,
744 (errcode(ERRCODE_SYNTAX_ERROR),
745 errmsg("conflicting or redundant options")));
746 cstate->binary = intVal(defel->arg);
748 else if (strcmp(defel->defname, "oids") == 0)
750 if (cstate->oids)
751 ereport(ERROR,
752 (errcode(ERRCODE_SYNTAX_ERROR),
753 errmsg("conflicting or redundant options")));
754 cstate->oids = intVal(defel->arg);
756 else if (strcmp(defel->defname, "delimiter") == 0)
758 if (cstate->delim)
759 ereport(ERROR,
760 (errcode(ERRCODE_SYNTAX_ERROR),
761 errmsg("conflicting or redundant options")));
762 cstate->delim = strVal(defel->arg);
764 else if (strcmp(defel->defname, "null") == 0)
766 if (cstate->null_print)
767 ereport(ERROR,
768 (errcode(ERRCODE_SYNTAX_ERROR),
769 errmsg("conflicting or redundant options")));
770 cstate->null_print = strVal(defel->arg);
772 else if (strcmp(defel->defname, "csv") == 0)
774 if (cstate->csv_mode)
775 ereport(ERROR,
776 (errcode(ERRCODE_SYNTAX_ERROR),
777 errmsg("conflicting or redundant options")));
778 cstate->csv_mode = intVal(defel->arg);
780 else if (strcmp(defel->defname, "header") == 0)
782 if (cstate->header_line)
783 ereport(ERROR,
784 (errcode(ERRCODE_SYNTAX_ERROR),
785 errmsg("conflicting or redundant options")));
786 cstate->header_line = intVal(defel->arg);
788 else if (strcmp(defel->defname, "quote") == 0)
790 if (cstate->quote)
791 ereport(ERROR,
792 (errcode(ERRCODE_SYNTAX_ERROR),
793 errmsg("conflicting or redundant options")));
794 cstate->quote = strVal(defel->arg);
796 else if (strcmp(defel->defname, "escape") == 0)
798 if (cstate->escape)
799 ereport(ERROR,
800 (errcode(ERRCODE_SYNTAX_ERROR),
801 errmsg("conflicting or redundant options")));
802 cstate->escape = strVal(defel->arg);
804 else if (strcmp(defel->defname, "force_quote") == 0)
806 if (force_quote)
807 ereport(ERROR,
808 (errcode(ERRCODE_SYNTAX_ERROR),
809 errmsg("conflicting or redundant options")));
810 force_quote = (List *) defel->arg;
812 else if (strcmp(defel->defname, "force_notnull") == 0)
814 if (force_notnull)
815 ereport(ERROR,
816 (errcode(ERRCODE_SYNTAX_ERROR),
817 errmsg("conflicting or redundant options")));
818 force_notnull = (List *) defel->arg;
820 else
821 elog(ERROR, "option \"%s\" not recognized",
822 defel->defname);
825 /* Check for incompatible options */
826 if (cstate->binary && cstate->delim)
827 ereport(ERROR,
828 (errcode(ERRCODE_SYNTAX_ERROR),
829 errmsg("cannot specify DELIMITER in BINARY mode")));
831 if (cstate->binary && cstate->csv_mode)
832 ereport(ERROR,
833 (errcode(ERRCODE_SYNTAX_ERROR),
834 errmsg("cannot specify CSV in BINARY mode")));
836 if (cstate->binary && cstate->null_print)
837 ereport(ERROR,
838 (errcode(ERRCODE_SYNTAX_ERROR),
839 errmsg("cannot specify NULL in BINARY mode")));
841 /* Set defaults for omitted options */
842 if (!cstate->delim)
843 cstate->delim = cstate->csv_mode ? "," : "\t";
845 if (!cstate->null_print)
846 cstate->null_print = cstate->csv_mode ? "" : "\\N";
847 cstate->null_print_len = strlen(cstate->null_print);
849 if (cstate->csv_mode)
851 if (!cstate->quote)
852 cstate->quote = "\"";
853 if (!cstate->escape)
854 cstate->escape = cstate->quote;
857 /* Only single-character delimiter strings are supported. */
858 if (strlen(cstate->delim) != 1)
859 ereport(ERROR,
860 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
861 errmsg("COPY delimiter must be a single ASCII character")));
863 /* Disallow end-of-line characters */
864 if (strchr(cstate->delim, '\r') != NULL ||
865 strchr(cstate->delim, '\n') != NULL)
866 ereport(ERROR,
867 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
868 errmsg("COPY delimiter cannot be newline or carriage return")));
870 if (strchr(cstate->null_print, '\r') != NULL ||
871 strchr(cstate->null_print, '\n') != NULL)
872 ereport(ERROR,
873 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
874 errmsg("COPY null representation cannot use newline or carriage return")));
877 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
878 * backslash because it would be ambiguous. We can't allow the other
879 * cases because data characters matching the delimiter must be
880 * backslashed, and certain backslash combinations are interpreted
881 * non-literally by COPY IN. Disallowing all lower case ASCII letters
882 * is more than strictly necessary, but seems best for consistency and
883 * future-proofing. Likewise we disallow all digits though only octal
884 * digits are actually dangerous.
886 if (!cstate->csv_mode &&
887 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
888 cstate->delim[0]) != NULL)
889 ereport(ERROR,
890 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
891 errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
893 /* Check header */
894 if (!cstate->csv_mode && cstate->header_line)
895 ereport(ERROR,
896 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
897 errmsg("COPY HEADER available only in CSV mode")));
899 /* Check quote */
900 if (!cstate->csv_mode && cstate->quote != NULL)
901 ereport(ERROR,
902 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
903 errmsg("COPY quote available only in CSV mode")));
905 if (cstate->csv_mode && strlen(cstate->quote) != 1)
906 ereport(ERROR,
907 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
908 errmsg("COPY quote must be a single ASCII character")));
910 if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
911 ereport(ERROR,
912 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
913 errmsg("COPY delimiter and quote must be different")));
915 /* Check escape */
916 if (!cstate->csv_mode && cstate->escape != NULL)
917 ereport(ERROR,
918 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
919 errmsg("COPY escape available only in CSV mode")));
921 if (cstate->csv_mode && strlen(cstate->escape) != 1)
922 ereport(ERROR,
923 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
924 errmsg("COPY escape must be a single ASCII character")));
926 /* Check force_quote */
927 if (!cstate->csv_mode && force_quote != NIL)
928 ereport(ERROR,
929 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
930 errmsg("COPY force quote available only in CSV mode")));
931 if (force_quote != NIL && is_from)
932 ereport(ERROR,
933 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
934 errmsg("COPY force quote only available using COPY TO")));
936 /* Check force_notnull */
937 if (!cstate->csv_mode && force_notnull != NIL)
938 ereport(ERROR,
939 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
940 errmsg("COPY force not null available only in CSV mode")));
941 if (force_notnull != NIL && !is_from)
942 ereport(ERROR,
943 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
944 errmsg("COPY force not null only available using COPY FROM")));
946 /* Don't allow the delimiter to appear in the null string. */
947 if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
948 ereport(ERROR,
949 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
950 errmsg("COPY delimiter must not appear in the NULL specification")));
952 /* Don't allow the CSV quote char to appear in the null string. */
953 if (cstate->csv_mode &&
954 strchr(cstate->null_print, cstate->quote[0]) != NULL)
955 ereport(ERROR,
956 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
957 errmsg("CSV quote character must not appear in the NULL specification")));
959 /* Disallow file COPY except to superusers. */
960 if (!pipe && !superuser())
961 ereport(ERROR,
962 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
963 errmsg("must be superuser to COPY to or from a file"),
964 errhint("Anyone can COPY to stdout or from stdin. "
965 "psql's \\copy command also works for anyone.")));
967 if (stmt->relation)
969 Assert(!stmt->query);
970 cstate->queryDesc = NULL;
972 /* Open and lock the relation, using the appropriate lock type. */
973 cstate->rel = heap_openrv(stmt->relation,
974 (is_from ? RowExclusiveLock : AccessShareLock));
976 /* Check relation permissions. */
977 aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel),
978 GetUserId(),
979 required_access);
980 if (aclresult != ACLCHECK_OK)
981 aclcheck_error(aclresult, ACL_KIND_CLASS,
982 RelationGetRelationName(cstate->rel));
984 /* check read-only transaction */
985 if (XactReadOnly && is_from &&
986 !isTempNamespace(RelationGetNamespace(cstate->rel)))
987 ereport(ERROR,
988 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
989 errmsg("transaction is read-only")));
991 /* Don't allow COPY w/ OIDs to or from a table without them */
992 if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
993 ereport(ERROR,
994 (errcode(ERRCODE_UNDEFINED_COLUMN),
995 errmsg("table \"%s\" does not have OIDs",
996 RelationGetRelationName(cstate->rel))));
998 tupDesc = RelationGetDescr(cstate->rel);
1000 else
1002 List *rewritten;
1003 Query *query;
1004 PlannedStmt *plan;
1005 DestReceiver *dest;
1007 Assert(!is_from);
1008 cstate->rel = NULL;
1010 /* Don't allow COPY w/ OIDs from a select */
1011 if (cstate->oids)
1012 ereport(ERROR,
1013 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1014 errmsg("COPY (SELECT) WITH OIDS is not supported")));
1017 * Run parse analysis and rewrite. Note this also acquires sufficient
1018 * locks on the source table(s).
1020 * Because the parser and planner tend to scribble on their input, we
1021 * make a preliminary copy of the source querytree. This prevents
1022 * problems in the case that the COPY is in a portal or plpgsql
1023 * function and is executed repeatedly. (See also the same hack in
1024 * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1026 rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
1027 queryString, NULL, 0);
1029 /* We don't expect more or less than one result query */
1030 if (list_length(rewritten) != 1)
1031 elog(ERROR, "unexpected rewrite result");
1033 query = (Query *) linitial(rewritten);
1034 Assert(query->commandType == CMD_SELECT);
1035 Assert(query->utilityStmt == NULL);
1037 /* Query mustn't use INTO, either */
1038 if (query->intoClause)
1039 ereport(ERROR,
1040 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1041 errmsg("COPY (SELECT INTO) is not supported")));
1043 /* plan the query */
1044 plan = planner(query, 0, NULL);
1047 * Use a snapshot with an updated command ID to ensure this query sees
1048 * results of any previously executed queries.
1050 PushUpdatedSnapshot(GetActiveSnapshot());
1052 /* Create dest receiver for COPY OUT */
1053 dest = CreateDestReceiver(DestCopyOut, NULL);
1054 ((DR_copy *) dest)->cstate = cstate;
1056 /* Create a QueryDesc requesting no output */
1057 cstate->queryDesc = CreateQueryDesc(plan, GetActiveSnapshot(),
1058 InvalidSnapshot,
1059 dest, NULL, false);
1062 * Call ExecutorStart to prepare the plan for execution.
1064 * ExecutorStart computes a result tupdesc for us
1066 ExecutorStart(cstate->queryDesc, 0);
1068 tupDesc = cstate->queryDesc->tupDesc;
1071 /* Generate or convert list of attributes to process */
1072 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1074 num_phys_attrs = tupDesc->natts;
1076 /* Convert FORCE QUOTE name list to per-column flags, check validity */
1077 cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1078 if (force_quote)
1080 List *attnums;
1081 ListCell *cur;
1083 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
1085 foreach(cur, attnums)
1087 int attnum = lfirst_int(cur);
1089 if (!list_member_int(cstate->attnumlist, attnum))
1090 ereport(ERROR,
1091 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1092 errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
1093 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1094 cstate->force_quote_flags[attnum - 1] = true;
1098 /* Convert FORCE NOT NULL name list to per-column flags, check validity */
1099 cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1100 if (force_notnull)
1102 List *attnums;
1103 ListCell *cur;
1105 attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
1107 foreach(cur, attnums)
1109 int attnum = lfirst_int(cur);
1111 if (!list_member_int(cstate->attnumlist, attnum))
1112 ereport(ERROR,
1113 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1114 errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
1115 NameStr(tupDesc->attrs[attnum - 1]->attname))));
1116 cstate->force_notnull_flags[attnum - 1] = true;
1120 /* Set up variables to avoid per-attribute overhead. */
1121 initStringInfo(&cstate->attribute_buf);
1122 initStringInfo(&cstate->line_buf);
1123 cstate->line_buf_converted = false;
1124 cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
1125 cstate->raw_buf_index = cstate->raw_buf_len = 0;
1126 cstate->processed = 0;
1129 * Set up encoding conversion info. Even if the client and server
1130 * encodings are the same, we must apply pg_client_to_server() to validate
1131 * data in multibyte encodings.
1133 cstate->client_encoding = pg_get_client_encoding();
1134 cstate->need_transcoding =
1135 (cstate->client_encoding != GetDatabaseEncoding() ||
1136 pg_database_encoding_max_length() > 1);
1137 /* See Multibyte encoding comment above */
1138 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
1140 cstate->copy_dest = COPY_FILE; /* default */
1141 cstate->filename = stmt->filename;
1143 if (is_from)
1144 CopyFrom(cstate); /* copy from file to database */
1145 else
1146 DoCopyTo(cstate); /* copy from database to file */
1149 * Close the relation or query. If reading, we can release the
1150 * AccessShareLock we got; if writing, we should hold the lock until end
1151 * of transaction to ensure that updates will be committed before lock is
1152 * released.
1154 if (cstate->rel)
1155 heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
1156 else
1158 /* Close down the query and free resources. */
1159 ExecutorEnd(cstate->queryDesc);
1160 FreeQueryDesc(cstate->queryDesc);
1161 PopActiveSnapshot();
1164 /* Clean up storage (probably not really necessary) */
1165 processed = cstate->processed;
1167 pfree(cstate->attribute_buf.data);
1168 pfree(cstate->line_buf.data);
1169 pfree(cstate->raw_buf);
1170 pfree(cstate);
1172 return processed;
1177 * This intermediate routine exists mainly to localize the effects of setjmp
1178 * so we don't need to plaster a lot of variables with "volatile".
1180 static void
1181 DoCopyTo(CopyState cstate)
1183 bool pipe = (cstate->filename == NULL);
1185 if (cstate->rel)
1187 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1189 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1190 ereport(ERROR,
1191 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1192 errmsg("cannot copy from view \"%s\"",
1193 RelationGetRelationName(cstate->rel)),
1194 errhint("Try the COPY (SELECT ...) TO variant.")));
1195 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1196 ereport(ERROR,
1197 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1198 errmsg("cannot copy from sequence \"%s\"",
1199 RelationGetRelationName(cstate->rel))));
1200 else
1201 ereport(ERROR,
1202 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1203 errmsg("cannot copy from non-table relation \"%s\"",
1204 RelationGetRelationName(cstate->rel))));
1208 if (pipe)
1210 if (whereToSendOutput == DestRemote)
1211 cstate->fe_copy = true;
1212 else
1213 cstate->copy_file = stdout;
1215 else
1217 mode_t oumask; /* Pre-existing umask value */
1218 struct stat st;
1221 * Prevent write to relative path ... too easy to shoot oneself in the
1222 * foot by overwriting a database file ...
1224 if (!is_absolute_path(cstate->filename))
1225 ereport(ERROR,
1226 (errcode(ERRCODE_INVALID_NAME),
1227 errmsg("relative path not allowed for COPY to file")));
1229 oumask = umask((mode_t) 022);
1230 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1231 umask(oumask);
1233 if (cstate->copy_file == NULL)
1234 ereport(ERROR,
1235 (errcode_for_file_access(),
1236 errmsg("could not open file \"%s\" for writing: %m",
1237 cstate->filename)));
1239 fstat(fileno(cstate->copy_file), &st);
1240 if (S_ISDIR(st.st_mode))
1241 ereport(ERROR,
1242 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1243 errmsg("\"%s\" is a directory", cstate->filename)));
1246 PG_TRY();
1248 if (cstate->fe_copy)
1249 SendCopyBegin(cstate);
1251 CopyTo(cstate);
1253 if (cstate->fe_copy)
1254 SendCopyEnd(cstate);
1256 PG_CATCH();
1259 * Make sure we turn off old-style COPY OUT mode upon error. It is
1260 * okay to do this in all cases, since it does nothing if the mode is
1261 * not on.
1263 pq_endcopyout(true);
1264 PG_RE_THROW();
1266 PG_END_TRY();
1268 if (!pipe)
1270 if (FreeFile(cstate->copy_file))
1271 ereport(ERROR,
1272 (errcode_for_file_access(),
1273 errmsg("could not write to file \"%s\": %m",
1274 cstate->filename)));
1279 * Copy from relation or query TO file.
1281 static void
1282 CopyTo(CopyState cstate)
1284 TupleDesc tupDesc;
1285 int num_phys_attrs;
1286 Form_pg_attribute *attr;
1287 ListCell *cur;
1289 if (cstate->rel)
1290 tupDesc = RelationGetDescr(cstate->rel);
1291 else
1292 tupDesc = cstate->queryDesc->tupDesc;
1293 attr = tupDesc->attrs;
1294 num_phys_attrs = tupDesc->natts;
1295 cstate->null_print_client = cstate->null_print; /* default */
1297 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1298 cstate->fe_msgbuf = makeStringInfo();
1300 /* Get info about the columns we need to process. */
1301 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1302 foreach(cur, cstate->attnumlist)
1304 int attnum = lfirst_int(cur);
1305 Oid out_func_oid;
1306 bool isvarlena;
1308 if (cstate->binary)
1309 getTypeBinaryOutputInfo(attr[attnum - 1]->atttypid,
1310 &out_func_oid,
1311 &isvarlena);
1312 else
1313 getTypeOutputInfo(attr[attnum - 1]->atttypid,
1314 &out_func_oid,
1315 &isvarlena);
1316 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1320 * Create a temporary memory context that we can reset once per row to
1321 * recover palloc'd memory. This avoids any problems with leaks inside
1322 * datatype output routines, and should be faster than retail pfree's
1323 * anyway. (We don't need a whole econtext as CopyFrom does.)
1325 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1326 "COPY TO",
1327 ALLOCSET_DEFAULT_MINSIZE,
1328 ALLOCSET_DEFAULT_INITSIZE,
1329 ALLOCSET_DEFAULT_MAXSIZE);
1331 if (cstate->binary)
1333 /* Generate header for a binary copy */
1334 int32 tmp;
1336 /* Signature */
1337 CopySendData(cstate, (char *) BinarySignature, 11);
1338 /* Flags field */
1339 tmp = 0;
1340 if (cstate->oids)
1341 tmp |= (1 << 16);
1342 CopySendInt32(cstate, tmp);
1343 /* No header extension */
1344 tmp = 0;
1345 CopySendInt32(cstate, tmp);
1347 else
1350 * For non-binary copy, we need to convert null_print to client
1351 * encoding, because it will be sent directly with CopySendString.
1353 if (cstate->need_transcoding)
1354 cstate->null_print_client = pg_server_to_client(cstate->null_print,
1355 cstate->null_print_len);
1357 /* if a header has been requested send the line */
1358 if (cstate->header_line)
1360 bool hdr_delim = false;
1362 foreach(cur, cstate->attnumlist)
1364 int attnum = lfirst_int(cur);
1365 char *colname;
1367 if (hdr_delim)
1368 CopySendChar(cstate, cstate->delim[0]);
1369 hdr_delim = true;
1371 colname = NameStr(attr[attnum - 1]->attname);
1373 CopyAttributeOutCSV(cstate, colname, false,
1374 list_length(cstate->attnumlist) == 1);
1377 CopySendEndOfRow(cstate);
1381 if (cstate->rel)
1383 Datum *values;
1384 bool *nulls;
1385 HeapScanDesc scandesc;
1386 HeapTuple tuple;
1388 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1389 nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
1391 scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
1393 while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
1395 CHECK_FOR_INTERRUPTS();
1397 /* Deconstruct the tuple ... faster than repeated heap_getattr */
1398 heap_deform_tuple(tuple, tupDesc, values, nulls);
1400 /* Format and send the data */
1401 CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
1404 heap_endscan(scandesc);
1406 else
1408 /* run the plan --- the dest receiver will send tuples */
1409 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
1412 if (cstate->binary)
1414 /* Generate trailer for a binary copy */
1415 CopySendInt16(cstate, -1);
1416 /* Need to flush out the trailer */
1417 CopySendEndOfRow(cstate);
1420 MemoryContextDelete(cstate->rowcontext);
1424 * Emit one row during CopyTo().
1426 static void
1427 CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
1429 bool need_delim = false;
1430 FmgrInfo *out_functions = cstate->out_functions;
1431 MemoryContext oldcontext;
1432 ListCell *cur;
1433 char *string;
1435 MemoryContextReset(cstate->rowcontext);
1436 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
1438 if (cstate->binary)
1440 /* Binary per-tuple header */
1441 CopySendInt16(cstate, list_length(cstate->attnumlist));
1442 /* Send OID if wanted --- note attnumlist doesn't include it */
1443 if (cstate->oids)
1445 /* Hack --- assume Oid is same size as int32 */
1446 CopySendInt32(cstate, sizeof(int32));
1447 CopySendInt32(cstate, tupleOid);
1450 else
1452 /* Text format has no per-tuple header, but send OID if wanted */
1453 /* Assume digits don't need any quoting or encoding conversion */
1454 if (cstate->oids)
1456 string = DatumGetCString(DirectFunctionCall1(oidout,
1457 ObjectIdGetDatum(tupleOid)));
1458 CopySendString(cstate, string);
1459 need_delim = true;
1463 foreach(cur, cstate->attnumlist)
1465 int attnum = lfirst_int(cur);
1466 Datum value = values[attnum - 1];
1467 bool isnull = nulls[attnum - 1];
1469 if (!cstate->binary)
1471 if (need_delim)
1472 CopySendChar(cstate, cstate->delim[0]);
1473 need_delim = true;
1476 if (isnull)
1478 if (!cstate->binary)
1479 CopySendString(cstate, cstate->null_print_client);
1480 else
1481 CopySendInt32(cstate, -1);
1483 else
1485 if (!cstate->binary)
1487 string = OutputFunctionCall(&out_functions[attnum - 1],
1488 value);
1489 if (cstate->csv_mode)
1490 CopyAttributeOutCSV(cstate, string,
1491 cstate->force_quote_flags[attnum - 1],
1492 list_length(cstate->attnumlist) == 1);
1493 else
1494 CopyAttributeOutText(cstate, string);
1496 else
1498 bytea *outputbytes;
1500 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
1501 value);
1502 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
1503 CopySendData(cstate, VARDATA(outputbytes),
1504 VARSIZE(outputbytes) - VARHDRSZ);
1509 CopySendEndOfRow(cstate);
1511 MemoryContextSwitchTo(oldcontext);
1513 cstate->processed++;
1518 * error context callback for COPY FROM
1520 static void
1521 copy_in_error_callback(void *arg)
1523 CopyState cstate = (CopyState) arg;
1525 if (cstate->binary)
1527 /* can't usefully display the data */
1528 if (cstate->cur_attname)
1529 errcontext("COPY %s, line %d, column %s",
1530 cstate->cur_relname, cstate->cur_lineno,
1531 cstate->cur_attname);
1532 else
1533 errcontext("COPY %s, line %d",
1534 cstate->cur_relname, cstate->cur_lineno);
1536 else
1538 if (cstate->cur_attname && cstate->cur_attval)
1540 /* error is relevant to a particular column */
1541 char *attval;
1543 attval = limit_printout_length(cstate->cur_attval);
1544 errcontext("COPY %s, line %d, column %s: \"%s\"",
1545 cstate->cur_relname, cstate->cur_lineno,
1546 cstate->cur_attname, attval);
1547 pfree(attval);
1549 else if (cstate->cur_attname)
1551 /* error is relevant to a particular column, value is NULL */
1552 errcontext("COPY %s, line %d, column %s: null input",
1553 cstate->cur_relname, cstate->cur_lineno,
1554 cstate->cur_attname);
1556 else
1558 /* error is relevant to a particular line */
1559 if (cstate->line_buf_converted || !cstate->need_transcoding)
1561 char *lineval;
1563 lineval = limit_printout_length(cstate->line_buf.data);
1564 errcontext("COPY %s, line %d: \"%s\"",
1565 cstate->cur_relname, cstate->cur_lineno, lineval);
1566 pfree(lineval);
1568 else
1571 * Here, the line buffer is still in a foreign encoding, and
1572 * indeed it's quite likely that the error is precisely a
1573 * failure to do encoding conversion (ie, bad data). We dare
1574 * not try to convert it, and at present there's no way to
1575 * regurgitate it without conversion. So we have to punt and
1576 * just report the line number.
1578 errcontext("COPY %s, line %d",
1579 cstate->cur_relname, cstate->cur_lineno);
/*
 * limit_printout_length
 *		Make sure we don't print an unreasonable amount of COPY data in a
 *		message.
 *
 * It would seem a lot easier to just use the sprintf "precision" limit to
 * truncate the string.  However, some versions of glibc have a bug/misfeature
 * that vsnprintf will always fail (return -1) if it is asked to truncate
 * a string that contains invalid byte sequences for the current encoding.
 * So, do our own truncation.
 *
 * The result is always freshly allocated: a pstrdup'd copy when the input is
 * short enough, otherwise a clipped copy with "..." appended.
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			full_len = strlen(str);
	int			keep;
	char	   *clipped;

	if (full_len > MAX_COPY_DATA_DISPLAY)
	{
		/* Apply encoding-dependent truncation */
		keep = pg_mbcliplen(str, full_len, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the "..." marker, and the terminator */
		clipped = (char *) palloc(keep + 4);
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Fast path if definitely okay */
	return pstrdup(str);
}
1621 * Copy FROM file to relation.
1623 static void
1624 CopyFrom(CopyState cstate)
1626 bool pipe = (cstate->filename == NULL);
1627 HeapTuple tuple;
1628 TupleDesc tupDesc;
1629 Form_pg_attribute *attr;
1630 AttrNumber num_phys_attrs,
1631 attr_count,
1632 num_defaults;
1633 FmgrInfo *in_functions;
1634 FmgrInfo oid_in_function;
1635 Oid *typioparams;
1636 Oid oid_typioparam;
1637 int attnum;
1638 int i;
1639 Oid in_func_oid;
1640 Datum *values;
1641 char *nulls;
1642 int nfields;
1643 char **field_strings;
1644 bool done = false;
1645 bool isnull;
1646 ResultRelInfo *resultRelInfo;
1647 EState *estate = CreateExecutorState(); /* for ExecConstraints() */
1648 TupleTableSlot *slot;
1649 bool file_has_oids;
1650 int *defmap;
1651 ExprState **defexprs; /* array of default att expressions */
1652 ExprContext *econtext; /* used for ExecEvalExpr for default atts */
1653 MemoryContext oldcontext = CurrentMemoryContext;
1654 ErrorContextCallback errcontext;
1655 CommandId mycid = GetCurrentCommandId(true);
1656 bool use_wal = true; /* by default, use WAL logging */
1657 bool use_fsm = true; /* by default, use FSM for free space */
1659 Assert(cstate->rel);
1661 if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
1663 if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
1664 ereport(ERROR,
1665 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1666 errmsg("cannot copy to view \"%s\"",
1667 RelationGetRelationName(cstate->rel))));
1668 else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
1669 ereport(ERROR,
1670 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1671 errmsg("cannot copy to sequence \"%s\"",
1672 RelationGetRelationName(cstate->rel))));
1673 else
1674 ereport(ERROR,
1675 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1676 errmsg("cannot copy to non-table relation \"%s\"",
1677 RelationGetRelationName(cstate->rel))));
1680 /*----------
1681 * Check to see if we can avoid writing WAL
1683 * If archive logging is not enabled *and* either
1684 * - table was created in same transaction as this COPY
1685 * - data is being written to relfilenode created in this transaction
1686 * then we can skip writing WAL. It's safe because if the transaction
1687 * doesn't commit, we'll discard the table (or the new relfilenode file).
1688 * If it does commit, we'll have done the heap_sync at the bottom of this
1689 * routine first.
1691 * As mentioned in comments in utils/rel.h, the in-same-transaction test
1692 * is not completely reliable, since in rare cases rd_createSubid or
1693 * rd_newRelfilenodeSubid can be cleared before the end of the transaction.
1694 * However this is OK since at worst we will fail to make the optimization.
1696 * Also, if the target file is new-in-transaction, we assume that checking
1697 * FSM for free space is a waste of time, even if we must use WAL because
1698 * of archiving. This could possibly be wrong, but it's unlikely.
1700 * The comments for heap_insert and RelationGetBufferForTuple specify that
1701 * skipping WAL logging is only safe if we ensure that our tuples do not
1702 * go into pages containing tuples from any other transactions --- but this
1703 * must be the case if we have a new table or new relfilenode, so we need
1704 * no additional work to enforce that.
1705 *----------
1707 if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
1708 cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1710 use_fsm = false;
1711 if (!XLogArchivingActive())
1712 use_wal = false;
1715 if (pipe)
1717 if (whereToSendOutput == DestRemote)
1718 ReceiveCopyBegin(cstate);
1719 else
1720 cstate->copy_file = stdin;
1722 else
1724 struct stat st;
1726 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1728 if (cstate->copy_file == NULL)
1729 ereport(ERROR,
1730 (errcode_for_file_access(),
1731 errmsg("could not open file \"%s\" for reading: %m",
1732 cstate->filename)));
1734 fstat(fileno(cstate->copy_file), &st);
1735 if (S_ISDIR(st.st_mode))
1736 ereport(ERROR,
1737 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1738 errmsg("\"%s\" is a directory", cstate->filename)));
1741 tupDesc = RelationGetDescr(cstate->rel);
1742 attr = tupDesc->attrs;
1743 num_phys_attrs = tupDesc->natts;
1744 attr_count = list_length(cstate->attnumlist);
1745 num_defaults = 0;
1748 * We need a ResultRelInfo so we can use the regular executor's
1749 * index-entry-making machinery. (There used to be a huge amount of code
1750 * here that basically duplicated execUtils.c ...)
1752 resultRelInfo = makeNode(ResultRelInfo);
1753 resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
1754 resultRelInfo->ri_RelationDesc = cstate->rel;
1755 resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
1756 if (resultRelInfo->ri_TrigDesc)
1757 resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
1758 palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
1759 resultRelInfo->ri_TrigInstrument = NULL;
1761 ExecOpenIndices(resultRelInfo);
1763 estate->es_result_relations = resultRelInfo;
1764 estate->es_num_result_relations = 1;
1765 estate->es_result_relation_info = resultRelInfo;
1767 /* Set up a tuple slot too */
1768 slot = MakeSingleTupleTableSlot(tupDesc);
1770 econtext = GetPerTupleExprContext(estate);
1773 * Pick up the required catalog information for each attribute in the
1774 * relation, including the input function, the element type (to pass to
1775 * the input function), and info about defaults and constraints. (Which
1776 * input function we use depends on text/binary format choice.)
1778 in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1779 typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1780 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1781 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1783 for (attnum = 1; attnum <= num_phys_attrs; attnum++)
1785 /* We don't need info for dropped attributes */
1786 if (attr[attnum - 1]->attisdropped)
1787 continue;
1789 /* Fetch the input function and typioparam info */
1790 if (cstate->binary)
1791 getTypeBinaryInputInfo(attr[attnum - 1]->atttypid,
1792 &in_func_oid, &typioparams[attnum - 1]);
1793 else
1794 getTypeInputInfo(attr[attnum - 1]->atttypid,
1795 &in_func_oid, &typioparams[attnum - 1]);
1796 fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1798 /* Get default info if needed */
1799 if (!list_member_int(cstate->attnumlist, attnum))
1801 /* attribute is NOT to be copied from input */
1802 /* use default value if one exists */
1803 Node *defexpr = build_column_default(cstate->rel, attnum);
1805 if (defexpr != NULL)
1807 defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
1808 estate);
1809 defmap[num_defaults] = attnum - 1;
1810 num_defaults++;
1815 /* Prepare to catch AFTER triggers. */
1816 AfterTriggerBeginQuery();
1819 * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
1820 * should do this for COPY, since it's not really an "INSERT" statement as
1821 * such. However, executing these triggers maintains consistency with the
1822 * EACH ROW triggers that we already fire on COPY.
1824 ExecBSInsertTriggers(estate, resultRelInfo);
1826 if (!cstate->binary)
1827 file_has_oids = cstate->oids; /* must rely on user to tell us... */
1828 else
1830 /* Read and verify binary header */
1831 char readSig[11];
1832 int32 tmp;
1834 /* Signature */
1835 if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
1836 memcmp(readSig, BinarySignature, 11) != 0)
1837 ereport(ERROR,
1838 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1839 errmsg("COPY file signature not recognized")));
1840 /* Flags field */
1841 if (!CopyGetInt32(cstate, &tmp))
1842 ereport(ERROR,
1843 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1844 errmsg("invalid COPY file header (missing flags)")));
1845 file_has_oids = (tmp & (1 << 16)) != 0;
1846 tmp &= ~(1 << 16);
1847 if ((tmp >> 16) != 0)
1848 ereport(ERROR,
1849 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1850 errmsg("unrecognized critical flags in COPY file header")));
1851 /* Header extension length */
1852 if (!CopyGetInt32(cstate, &tmp) ||
1853 tmp < 0)
1854 ereport(ERROR,
1855 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1856 errmsg("invalid COPY file header (missing length)")));
1857 /* Skip extension header, if present */
1858 while (tmp-- > 0)
1860 if (CopyGetData(cstate, readSig, 1, 1) != 1)
1861 ereport(ERROR,
1862 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1863 errmsg("invalid COPY file header (wrong length)")));
1867 if (file_has_oids && cstate->binary)
1869 getTypeBinaryInputInfo(OIDOID,
1870 &in_func_oid, &oid_typioparam);
1871 fmgr_info(in_func_oid, &oid_in_function);
1874 values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
1875 nulls = (char *) palloc(num_phys_attrs * sizeof(char));
1877 /* create workspace for CopyReadAttributes results */
1878 nfields = file_has_oids ? (attr_count + 1) : attr_count;
1879 field_strings = (char **) palloc(nfields * sizeof(char *));
1881 /* Initialize state variables */
1882 cstate->fe_eof = false;
1883 cstate->eol_type = EOL_UNKNOWN;
1884 cstate->cur_relname = RelationGetRelationName(cstate->rel);
1885 cstate->cur_lineno = 0;
1886 cstate->cur_attname = NULL;
1887 cstate->cur_attval = NULL;
1889 /* Set up callback to identify error line number */
1890 errcontext.callback = copy_in_error_callback;
1891 errcontext.arg = (void *) cstate;
1892 errcontext.previous = error_context_stack;
1893 error_context_stack = &errcontext;
1895 /* on input just throw the header line away */
1896 if (cstate->header_line)
1898 cstate->cur_lineno++;
1899 done = CopyReadLine(cstate);
1902 while (!done)
1904 bool skip_tuple;
1905 Oid loaded_oid = InvalidOid;
1907 CHECK_FOR_INTERRUPTS();
1909 cstate->cur_lineno++;
1911 /* Reset the per-tuple exprcontext */
1912 ResetPerTupleExprContext(estate);
1914 /* Switch into its memory context */
1915 MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1917 /* Initialize all values for row to NULL */
1918 MemSet(values, 0, num_phys_attrs * sizeof(Datum));
1919 MemSet(nulls, 'n', num_phys_attrs * sizeof(char));
1921 if (!cstate->binary)
1923 ListCell *cur;
1924 int fldct;
1925 int fieldno;
1926 char *string;
1928 /* Actually read the line into memory here */
1929 done = CopyReadLine(cstate);
1932 * EOF at start of line means we're done. If we see EOF after
1933 * some characters, we act as though it was newline followed by
1934 * EOF, ie, process the line and then exit loop on next iteration.
1936 if (done && cstate->line_buf.len == 0)
1937 break;
1939 /* Parse the line into de-escaped field values */
1940 if (cstate->csv_mode)
1941 fldct = CopyReadAttributesCSV(cstate, nfields, field_strings);
1942 else
1943 fldct = CopyReadAttributesText(cstate, nfields, field_strings);
1944 fieldno = 0;
1946 /* Read the OID field if present */
1947 if (file_has_oids)
1949 if (fieldno >= fldct)
1950 ereport(ERROR,
1951 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1952 errmsg("missing data for OID column")));
1953 string = field_strings[fieldno++];
1955 if (string == NULL)
1956 ereport(ERROR,
1957 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1958 errmsg("null OID in COPY data")));
1959 else
1961 cstate->cur_attname = "oid";
1962 cstate->cur_attval = string;
1963 loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
1964 CStringGetDatum(string)));
1965 if (loaded_oid == InvalidOid)
1966 ereport(ERROR,
1967 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1968 errmsg("invalid OID in COPY data")));
1969 cstate->cur_attname = NULL;
1970 cstate->cur_attval = NULL;
1974 /* Loop to read the user attributes on the line. */
1975 foreach(cur, cstate->attnumlist)
1977 int attnum = lfirst_int(cur);
1978 int m = attnum - 1;
1980 if (fieldno >= fldct)
1981 ereport(ERROR,
1982 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
1983 errmsg("missing data for column \"%s\"",
1984 NameStr(attr[m]->attname))));
1985 string = field_strings[fieldno++];
1987 if (cstate->csv_mode && string == NULL &&
1988 cstate->force_notnull_flags[m])
1990 /* Go ahead and read the NULL string */
1991 string = cstate->null_print;
1994 cstate->cur_attname = NameStr(attr[m]->attname);
1995 cstate->cur_attval = string;
1996 values[m] = InputFunctionCall(&in_functions[m],
1997 string,
1998 typioparams[m],
1999 attr[m]->atttypmod);
2000 if (string != NULL)
2001 nulls[m] = ' ';
2002 cstate->cur_attname = NULL;
2003 cstate->cur_attval = NULL;
2006 Assert(fieldno == nfields);
2008 else
2010 /* binary */
2011 int16 fld_count;
2012 ListCell *cur;
2014 if (!CopyGetInt16(cstate, &fld_count) ||
2015 fld_count == -1)
2017 done = true;
2018 break;
2021 if (fld_count != attr_count)
2022 ereport(ERROR,
2023 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2024 errmsg("row field count is %d, expected %d",
2025 (int) fld_count, attr_count)));
2027 if (file_has_oids)
2029 cstate->cur_attname = "oid";
2030 loaded_oid =
2031 DatumGetObjectId(CopyReadBinaryAttribute(cstate,
2033 &oid_in_function,
2034 oid_typioparam,
2036 &isnull));
2037 if (isnull || loaded_oid == InvalidOid)
2038 ereport(ERROR,
2039 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2040 errmsg("invalid OID in COPY data")));
2041 cstate->cur_attname = NULL;
2044 i = 0;
2045 foreach(cur, cstate->attnumlist)
2047 int attnum = lfirst_int(cur);
2048 int m = attnum - 1;
2050 cstate->cur_attname = NameStr(attr[m]->attname);
2051 i++;
2052 values[m] = CopyReadBinaryAttribute(cstate,
2054 &in_functions[m],
2055 typioparams[m],
2056 attr[m]->atttypmod,
2057 &isnull);
2058 nulls[m] = isnull ? 'n' : ' ';
2059 cstate->cur_attname = NULL;
2064 * Now compute and insert any defaults available for the columns not
2065 * provided by the input data. Anything not processed here or above
2066 * will remain NULL.
2068 for (i = 0; i < num_defaults; i++)
2070 values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
2071 &isnull, NULL);
2072 if (!isnull)
2073 nulls[defmap[i]] = ' ';
2076 /* And now we can form the input tuple. */
2077 tuple = heap_formtuple(tupDesc, values, nulls);
2079 if (cstate->oids && file_has_oids)
2080 HeapTupleSetOid(tuple, loaded_oid);
2082 /* Triggers and stuff need to be invoked in query context. */
2083 MemoryContextSwitchTo(oldcontext);
2085 skip_tuple = false;
2087 /* BEFORE ROW INSERT Triggers */
2088 if (resultRelInfo->ri_TrigDesc &&
2089 resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
2091 HeapTuple newtuple;
2093 newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
2095 if (newtuple == NULL) /* "do nothing" */
2096 skip_tuple = true;
2097 else if (newtuple != tuple) /* modified by Trigger(s) */
2099 heap_freetuple(tuple);
2100 tuple = newtuple;
2104 if (!skip_tuple)
2106 /* Place tuple in tuple slot */
2107 ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2109 /* Check the constraints of the tuple */
2110 if (cstate->rel->rd_att->constr)
2111 ExecConstraints(resultRelInfo, slot, estate);
2113 /* OK, store the tuple and create index entries for it */
2114 heap_insert(cstate->rel, tuple, mycid, use_wal, use_fsm);
2116 if (resultRelInfo->ri_NumIndices > 0)
2117 ExecInsertIndexTuples(slot, &(tuple->t_self), estate, false);
2119 /* AFTER ROW INSERT Triggers */
2120 ExecARInsertTriggers(estate, resultRelInfo, tuple);
2123 * We count only tuples not suppressed by a BEFORE INSERT trigger;
2124 * this is the same definition used by execMain.c for counting
2125 * tuples inserted by an INSERT command.
2127 cstate->processed++;
2131 /* Done, clean up */
2132 error_context_stack = errcontext.previous;
2134 MemoryContextSwitchTo(oldcontext);
2136 /* Execute AFTER STATEMENT insertion triggers */
2137 ExecASInsertTriggers(estate, resultRelInfo);
2139 /* Handle queued AFTER triggers */
2140 AfterTriggerEndQuery(estate);
2142 pfree(values);
2143 pfree(nulls);
2144 pfree(field_strings);
2146 pfree(in_functions);
2147 pfree(typioparams);
2148 pfree(defmap);
2149 pfree(defexprs);
2151 ExecDropSingleTupleTableSlot(slot);
2153 ExecCloseIndices(resultRelInfo);
2155 FreeExecutorState(estate);
2157 if (!pipe)
2159 if (FreeFile(cstate->copy_file))
2160 ereport(ERROR,
2161 (errcode_for_file_access(),
2162 errmsg("could not read from file \"%s\": %m",
2163 cstate->filename)));
2167 * If we skipped writing WAL, then we need to sync the heap (but not
2168 * indexes since those use WAL anyway)
2170 if (!use_wal)
2171 heap_sync(cstate->rel);
2176 * Read the next input line and stash it in line_buf, with conversion to
2177 * server encoding.
2179 * Result is true if read was terminated by EOF, false if terminated
2180 * by newline. The terminating newline or EOF marker is not included
2181 * in the final value of line_buf.
2183 static bool
2184 CopyReadLine(CopyState cstate)
2186 bool result;
2188 resetStringInfo(&cstate->line_buf);
2190 /* Mark that encoding conversion hasn't occurred yet */
2191 cstate->line_buf_converted = false;
2193 /* Parse data and transfer into line_buf */
2194 result = CopyReadLineText(cstate);
2196 if (result)
2199 * Reached EOF. In protocol version 3, we should ignore anything
2200 * after \. up to the protocol end of copy data. (XXX maybe better
2201 * not to treat \. as special?)
2203 if (cstate->copy_dest == COPY_NEW_FE)
2207 cstate->raw_buf_index = cstate->raw_buf_len;
2208 } while (CopyLoadRawBuf(cstate));
2211 else
2214 * If we didn't hit EOF, then we must have transferred the EOL marker
2215 * to line_buf along with the data. Get rid of it.
2217 switch (cstate->eol_type)
2219 case EOL_NL:
2220 Assert(cstate->line_buf.len >= 1);
2221 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2222 cstate->line_buf.len--;
2223 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2224 break;
2225 case EOL_CR:
2226 Assert(cstate->line_buf.len >= 1);
2227 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
2228 cstate->line_buf.len--;
2229 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2230 break;
2231 case EOL_CRNL:
2232 Assert(cstate->line_buf.len >= 2);
2233 Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
2234 Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
2235 cstate->line_buf.len -= 2;
2236 cstate->line_buf.data[cstate->line_buf.len] = '\0';
2237 break;
2238 case EOL_UNKNOWN:
2239 /* shouldn't get here */
2240 Assert(false);
2241 break;
2245 /* Done reading the line. Convert it to server encoding. */
2246 if (cstate->need_transcoding)
2248 char *cvt;
2250 cvt = pg_client_to_server(cstate->line_buf.data,
2251 cstate->line_buf.len);
2252 if (cvt != cstate->line_buf.data)
2254 /* transfer converted data back to line_buf */
2255 resetStringInfo(&cstate->line_buf);
2256 appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
2257 pfree(cvt);
2261 /* Now it's safe to use the buffer in error messages */
2262 cstate->line_buf_converted = true;
2264 return result;
2268 * CopyReadLineText - inner loop of CopyReadLine for text mode
2270 static bool
2271 CopyReadLineText(CopyState cstate)
2273 char *copy_raw_buf;
2274 int raw_buf_ptr;
2275 int copy_buf_len;
2276 bool need_data = false;
2277 bool hit_eof = false;
2278 bool result = false;
2279 char mblen_str[2];
2281 /* CSV variables */
2282 bool first_char_in_line = true;
2283 bool in_quote = false,
2284 last_was_esc = false;
2285 char quotec = '\0';
2286 char escapec = '\0';
2288 if (cstate->csv_mode)
2290 quotec = cstate->quote[0];
2291 escapec = cstate->escape[0];
2292 /* ignore special escape processing if it's the same as quotec */
2293 if (quotec == escapec)
2294 escapec = '\0';
2297 mblen_str[1] = '\0';
2300 * The objective of this loop is to transfer the entire next input line
2301 * into line_buf. Hence, we only care for detecting newlines (\r and/or
2302 * \n) and the end-of-copy marker (\.).
2304 * In CSV mode, \r and \n inside a quoted field are just part of the data
2305 * value and are put in line_buf. We keep just enough state to know if we
2306 * are currently in a quoted field or not.
2308 * These four characters, and the CSV escape and quote characters, are
2309 * assumed the same in frontend and backend encodings.
2311 * For speed, we try to move data from raw_buf to line_buf in chunks
2312 * rather than one character at a time. raw_buf_ptr points to the next
2313 * character to examine; any characters from raw_buf_index to raw_buf_ptr
2314 * have been determined to be part of the line, but not yet transferred to
2315 * line_buf.
2317 * For a little extra speed within the loop, we copy raw_buf and
2318 * raw_buf_len into local variables.
2320 copy_raw_buf = cstate->raw_buf;
2321 raw_buf_ptr = cstate->raw_buf_index;
2322 copy_buf_len = cstate->raw_buf_len;
2324 for (;;)
2326 int prev_raw_ptr;
2327 char c;
2330 * Load more data if needed. Ideally we would just force four bytes
2331 * of read-ahead and avoid the many calls to
2332 * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
2333 * does not allow us to read too far ahead or we might read into the
2334 * next data, so we read-ahead only as far we know we can. One
2335 * optimization would be to read-ahead four byte here if
2336 * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
2337 * considering the size of the buffer.
2339 if (raw_buf_ptr >= copy_buf_len || need_data)
2341 REFILL_LINEBUF;
2344 * Try to read some more data. This will certainly reset
2345 * raw_buf_index to zero, and raw_buf_ptr must go with it.
2347 if (!CopyLoadRawBuf(cstate))
2348 hit_eof = true;
2349 raw_buf_ptr = 0;
2350 copy_buf_len = cstate->raw_buf_len;
2353 * If we are completely out of data, break out of the loop,
2354 * reporting EOF.
2356 if (copy_buf_len <= 0)
2358 result = true;
2359 break;
2361 need_data = false;
2364 /* OK to fetch a character */
2365 prev_raw_ptr = raw_buf_ptr;
2366 c = copy_raw_buf[raw_buf_ptr++];
2368 if (cstate->csv_mode)
2371 * If character is '\\' or '\r', we may need to look ahead below.
2372 * Force fetch of the next character if we don't already have it.
2373 * We need to do this before changing CSV state, in case one of
2374 * these characters is also the quote or escape character.
2376 * Note: old-protocol does not like forced prefetch, but it's OK
2377 * here since we cannot validly be at EOF.
2379 if (c == '\\' || c == '\r')
2381 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2385 * Dealing with quotes and escapes here is mildly tricky. If the
2386 * quote char is also the escape char, there's no problem - we
2387 * just use the char as a toggle. If they are different, we need
2388 * to ensure that we only take account of an escape inside a
2389 * quoted field and immediately preceding a quote char, and not
2390 * the second in a escape-escape sequence.
2392 if (in_quote && c == escapec)
2393 last_was_esc = !last_was_esc;
2394 if (c == quotec && !last_was_esc)
2395 in_quote = !in_quote;
2396 if (c != escapec)
2397 last_was_esc = false;
2400 * Updating the line count for embedded CR and/or LF chars is
2401 * necessarily a little fragile - this test is probably about the
2402 * best we can do. (XXX it's arguable whether we should do this
2403 * at all --- is cur_lineno a physical or logical count?)
2405 if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
2406 cstate->cur_lineno++;
2409 /* Process \r */
2410 if (c == '\r' && (!cstate->csv_mode || !in_quote))
2412 /* Check for \r\n on first line, _and_ handle \r\n. */
2413 if (cstate->eol_type == EOL_UNKNOWN ||
2414 cstate->eol_type == EOL_CRNL)
2417 * If need more data, go back to loop top to load it.
2419 * Note that if we are at EOF, c will wind up as '\0' because
2420 * of the guaranteed pad of raw_buf.
2422 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2424 /* get next char */
2425 c = copy_raw_buf[raw_buf_ptr];
2427 if (c == '\n')
2429 raw_buf_ptr++; /* eat newline */
2430 cstate->eol_type = EOL_CRNL; /* in case not set yet */
2432 else
2434 /* found \r, but no \n */
2435 if (cstate->eol_type == EOL_CRNL)
2436 ereport(ERROR,
2437 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2438 !cstate->csv_mode ?
2439 errmsg("literal carriage return found in data") :
2440 errmsg("unquoted carriage return found in data"),
2441 !cstate->csv_mode ?
2442 errhint("Use \"\\r\" to represent carriage return.") :
2443 errhint("Use quoted CSV field to represent carriage return.")));
2446 * if we got here, it is the first line and we didn't find
2447 * \n, so don't consume the peeked character
2449 cstate->eol_type = EOL_CR;
2452 else if (cstate->eol_type == EOL_NL)
2453 ereport(ERROR,
2454 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2455 !cstate->csv_mode ?
2456 errmsg("literal carriage return found in data") :
2457 errmsg("unquoted carriage return found in data"),
2458 !cstate->csv_mode ?
2459 errhint("Use \"\\r\" to represent carriage return.") :
2460 errhint("Use quoted CSV field to represent carriage return.")));
2461 /* If reach here, we have found the line terminator */
2462 break;
2465 /* Process \n */
2466 if (c == '\n' && (!cstate->csv_mode || !in_quote))
2468 if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
2469 ereport(ERROR,
2470 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2471 !cstate->csv_mode ?
2472 errmsg("literal newline found in data") :
2473 errmsg("unquoted newline found in data"),
2474 !cstate->csv_mode ?
2475 errhint("Use \"\\n\" to represent newline.") :
2476 errhint("Use quoted CSV field to represent newline.")));
2477 cstate->eol_type = EOL_NL; /* in case not set yet */
2478 /* If reach here, we have found the line terminator */
2479 break;
2483 * In CSV mode, we only recognize \. alone on a line. This is because
2484 * \. is a valid CSV data value.
2486 if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
2488 char c2;
2490 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2491 IF_NEED_REFILL_AND_EOF_BREAK(0);
2493 /* -----
2494 * get next character
2495 * Note: we do not change c so if it isn't \., we can fall
2496 * through and continue processing for client encoding.
2497 * -----
2499 c2 = copy_raw_buf[raw_buf_ptr];
2501 if (c2 == '.')
2503 raw_buf_ptr++; /* consume the '.' */
2506 * Note: if we loop back for more data here, it does not
2507 * matter that the CSV state change checks are re-executed; we
2508 * will come back here with no important state changed.
2510 if (cstate->eol_type == EOL_CRNL)
2512 /* Get the next character */
2513 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2514 /* if hit_eof, c2 will become '\0' */
2515 c2 = copy_raw_buf[raw_buf_ptr++];
2517 if (c2 == '\n')
2519 if (!cstate->csv_mode)
2520 ereport(ERROR,
2521 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2522 errmsg("end-of-copy marker does not match previous newline style")));
2523 else
2524 NO_END_OF_COPY_GOTO;
2526 else if (c2 != '\r')
2528 if (!cstate->csv_mode)
2529 ereport(ERROR,
2530 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2531 errmsg("end-of-copy marker corrupt")));
2532 else
2533 NO_END_OF_COPY_GOTO;
2537 /* Get the next character */
2538 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
2539 /* if hit_eof, c2 will become '\0' */
2540 c2 = copy_raw_buf[raw_buf_ptr++];
2542 if (c2 != '\r' && c2 != '\n')
2544 if (!cstate->csv_mode)
2545 ereport(ERROR,
2546 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2547 errmsg("end-of-copy marker corrupt")));
2548 else
2549 NO_END_OF_COPY_GOTO;
2552 if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
2553 (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
2554 (cstate->eol_type == EOL_CR && c2 != '\r'))
2556 ereport(ERROR,
2557 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2558 errmsg("end-of-copy marker does not match previous newline style")));
2562 * Transfer only the data before the \. into line_buf, then
2563 * discard the data and the \. sequence.
2565 if (prev_raw_ptr > cstate->raw_buf_index)
2566 appendBinaryStringInfo(&cstate->line_buf,
2567 cstate->raw_buf + cstate->raw_buf_index,
2568 prev_raw_ptr - cstate->raw_buf_index);
2569 cstate->raw_buf_index = raw_buf_ptr;
2570 result = true; /* report EOF */
2571 break;
2573 else if (!cstate->csv_mode)
2576 * If we are here, it means we found a backslash followed by
2577 * something other than a period. In non-CSV mode, anything
2578 * after a backslash is special, so we skip over that second
2579 * character too. If we didn't do that \\. would be
2580 * considered an eof-of copy, while in non-CVS mode it is a
2581 * literal backslash followed by a period. In CSV mode,
2582 * backslashes are not special, so we want to process the
2583 * character after the backslash just like a normal character,
2584 * so we don't increment in those cases.
2586 raw_buf_ptr++;
2590 * This label is for CSV cases where \. appears at the start of a
2591 * line, but there is more text after it, meaning it was a data value.
2592 * We are more strict for \. in CSV mode because \. could be a data
2593 * value, while in non-CSV mode, \. cannot be a data value.
2595 not_end_of_copy:
2598 * Process all bytes of a multi-byte character as a group.
2600 * We only support multi-byte sequences where the first byte has the
2601 * high-bit set, so as an optimization we can avoid this block
2602 * entirely if it is not set.
2604 if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
2606 int mblen;
2608 mblen_str[0] = c;
2609 /* All our encodings only read the first byte to get the length */
2610 mblen = pg_encoding_mblen(cstate->client_encoding, mblen_str);
2611 IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
2612 IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
2613 raw_buf_ptr += mblen - 1;
2615 first_char_in_line = false;
2616 } /* end of outer loop */
2619 * Transfer any still-uncopied data to line_buf.
2621 REFILL_LINEBUF;
2623 return result;
/*
 * GetDecimalFromHex
 *		Map a single hexadecimal digit character to its numeric value.
 *
 * Decimal digits yield 0-9; any other character is folded to lower case and
 * measured from 'a' (so 'a'/'A' give 10).  No validation is performed here.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char digit = (unsigned char) hex;

	return isdigit(digit) ? (hex - '0') : (tolower(digit) - 'a' + 10);
}
2639 * Parse the current line into separate attributes (fields),
2640 * performing de-escaping as needed.
2642 * The input is in line_buf. We use attribute_buf to hold the result
2643 * strings. fieldvals[k] is set to point to the k'th attribute string,
2644 * or NULL when the input matches the null marker string. (Note that the
2645 * caller cannot check for nulls since the returned string would be the
2646 * post-de-escaping equivalent, which may look the same as some valid data
2647 * string.)
2649 * delim is the column delimiter string (must be just one byte for now).
2650 * null_print is the null marker string. Note that this is compared to
2651 * the pre-de-escaped input string.
2653 * The return value is the number of fields actually read. (We error out
2654 * if this would exceed maxfields, which is the length of fieldvals[].)
2656 static int
2657 CopyReadAttributesText(CopyState cstate, int maxfields, char **fieldvals)
2659 char delimc = cstate->delim[0];
2660 int fieldno;
2661 char *output_ptr;
2662 char *cur_ptr;
2663 char *line_end_ptr;
2666 * We need a special case for zero-column tables: check that the input
2667 * line is empty, and return.
2669 if (maxfields <= 0)
2671 if (cstate->line_buf.len != 0)
2672 ereport(ERROR,
2673 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2674 errmsg("extra data after last expected column")));
2675 return 0;
2678 resetStringInfo(&cstate->attribute_buf);
2681 * The de-escaped attributes will certainly not be longer than the input
2682 * data line, so we can just force attribute_buf to be large enough and
2683 * then transfer data without any checks for enough space. We need to do
2684 * it this way because enlarging attribute_buf mid-stream would invalidate
2685 * pointers already stored into fieldvals[].
2687 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2688 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2689 output_ptr = cstate->attribute_buf.data;
2691 /* set pointer variables for loop */
2692 cur_ptr = cstate->line_buf.data;
2693 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2695 /* Outer loop iterates over fields */
2696 fieldno = 0;
2697 for (;;)
2699 bool found_delim = false;
2700 char *start_ptr;
2701 char *end_ptr;
2702 int input_len;
2703 bool saw_high_bit = false;
2705 /* Make sure space remains in fieldvals[] */
2706 if (fieldno >= maxfields)
2707 ereport(ERROR,
2708 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2709 errmsg("extra data after last expected column")));
2711 /* Remember start of field on both input and output sides */
2712 start_ptr = cur_ptr;
2713 fieldvals[fieldno] = output_ptr;
2715 /* Scan data for field */
2716 for (;;)
2718 char c;
2720 end_ptr = cur_ptr;
2721 if (cur_ptr >= line_end_ptr)
2722 break;
2723 c = *cur_ptr++;
2724 if (c == delimc)
2726 found_delim = true;
2727 break;
2729 if (c == '\\')
2731 if (cur_ptr >= line_end_ptr)
2732 break;
2733 c = *cur_ptr++;
2734 switch (c)
2736 case '0':
2737 case '1':
2738 case '2':
2739 case '3':
2740 case '4':
2741 case '5':
2742 case '6':
2743 case '7':
2745 /* handle \013 */
2746 int val;
2748 val = OCTVALUE(c);
2749 if (cur_ptr < line_end_ptr)
2751 c = *cur_ptr;
2752 if (ISOCTAL(c))
2754 cur_ptr++;
2755 val = (val << 3) + OCTVALUE(c);
2756 if (cur_ptr < line_end_ptr)
2758 c = *cur_ptr;
2759 if (ISOCTAL(c))
2761 cur_ptr++;
2762 val = (val << 3) + OCTVALUE(c);
2767 c = val & 0377;
2768 if (IS_HIGHBIT_SET(c))
2769 saw_high_bit = true;
2771 break;
2772 case 'x':
2773 /* Handle \x3F */
2774 if (cur_ptr < line_end_ptr)
2776 char hexchar = *cur_ptr;
2778 if (isxdigit((unsigned char) hexchar))
2780 int val = GetDecimalFromHex(hexchar);
2782 cur_ptr++;
2783 if (cur_ptr < line_end_ptr)
2785 hexchar = *cur_ptr;
2786 if (isxdigit((unsigned char) hexchar))
2788 cur_ptr++;
2789 val = (val << 4) + GetDecimalFromHex(hexchar);
2792 c = val & 0xff;
2793 if (IS_HIGHBIT_SET(c))
2794 saw_high_bit = true;
2797 break;
2798 case 'b':
2799 c = '\b';
2800 break;
2801 case 'f':
2802 c = '\f';
2803 break;
2804 case 'n':
2805 c = '\n';
2806 break;
2807 case 'r':
2808 c = '\r';
2809 break;
2810 case 't':
2811 c = '\t';
2812 break;
2813 case 'v':
2814 c = '\v';
2815 break;
2818 * in all other cases, take the char after '\'
2819 * literally
2824 /* Add c to output string */
2825 *output_ptr++ = c;
2828 /* Terminate attribute value in output area */
2829 *output_ptr++ = '\0';
2832 * If we de-escaped a char with the high bit set, make sure we still
2833 * have valid data for the db encoding. Avoid calling strlen here for
2834 * the sake of efficiency.
2836 if (saw_high_bit)
2838 char *fld = fieldvals[fieldno];
2840 pg_verifymbstr(fld, output_ptr - (fld + 1), false);
2843 /* Check whether raw input matched null marker */
2844 input_len = end_ptr - start_ptr;
2845 if (input_len == cstate->null_print_len &&
2846 strncmp(start_ptr, cstate->null_print, input_len) == 0)
2847 fieldvals[fieldno] = NULL;
2849 fieldno++;
2850 /* Done if we hit EOL instead of a delim */
2851 if (!found_delim)
2852 break;
2855 /* Clean up state of attribute_buf */
2856 output_ptr--;
2857 Assert(*output_ptr == '\0');
2858 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
2860 return fieldno;
2864 * Parse the current line into separate attributes (fields),
2865 * performing de-escaping as needed. This has exactly the same API as
2866 * CopyReadAttributesText, except we parse the fields according to
2867 * "standard" (i.e. common) CSV usage.
2869 static int
2870 CopyReadAttributesCSV(CopyState cstate, int maxfields, char **fieldvals)
2872 char delimc = cstate->delim[0];
2873 char quotec = cstate->quote[0];
2874 char escapec = cstate->escape[0];
2875 int fieldno;
2876 char *output_ptr;
2877 char *cur_ptr;
2878 char *line_end_ptr;
2881 * We need a special case for zero-column tables: check that the input
2882 * line is empty, and return.
2884 if (maxfields <= 0)
2886 if (cstate->line_buf.len != 0)
2887 ereport(ERROR,
2888 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2889 errmsg("extra data after last expected column")));
2890 return 0;
2893 resetStringInfo(&cstate->attribute_buf);
2896 * The de-escaped attributes will certainly not be longer than the input
2897 * data line, so we can just force attribute_buf to be large enough and
2898 * then transfer data without any checks for enough space. We need to do
2899 * it this way because enlarging attribute_buf mid-stream would invalidate
2900 * pointers already stored into fieldvals[].
2902 if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
2903 enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
2904 output_ptr = cstate->attribute_buf.data;
2906 /* set pointer variables for loop */
2907 cur_ptr = cstate->line_buf.data;
2908 line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
2910 /* Outer loop iterates over fields */
2911 fieldno = 0;
2912 for (;;)
2914 bool found_delim = false;
2915 bool saw_quote = false;
2916 char *start_ptr;
2917 char *end_ptr;
2918 int input_len;
2920 /* Make sure space remains in fieldvals[] */
2921 if (fieldno >= maxfields)
2922 ereport(ERROR,
2923 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2924 errmsg("extra data after last expected column")));
2926 /* Remember start of field on both input and output sides */
2927 start_ptr = cur_ptr;
2928 fieldvals[fieldno] = output_ptr;
2930 /* Scan data for field,
2932 * The loop starts in "not quote" mode and then toggles between
2933 * that and "in quote" mode.
2934 * The loop exits normally if it is in "not quote" mode and a
2935 * delimiter or line end is seen.
2937 for (;;)
2939 char c;
2941 /* Not in quote */
2942 for (;;)
2944 end_ptr = cur_ptr;
2945 if (cur_ptr >= line_end_ptr)
2946 goto endfield;
2947 c = *cur_ptr++;
2948 /* unquoted field delimiter */
2949 if (c == delimc)
2951 found_delim = true;
2952 goto endfield;
2954 /* start of quoted field (or part of field) */
2955 if (c == quotec)
2957 saw_quote = true;
2958 break;
2960 /* Add c to output string */
2961 *output_ptr++ = c;
2964 /* In quote */
2965 for (;;)
2967 end_ptr = cur_ptr;
2968 if (cur_ptr >= line_end_ptr)
2969 ereport(ERROR,
2970 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
2971 errmsg("unterminated CSV quoted field")));
2973 c = *cur_ptr++;
2975 /* escape within a quoted field */
2976 if (c == escapec)
2979 * peek at the next char if available, and escape it if it is
2980 * an escape char or a quote char
2982 if (cur_ptr < line_end_ptr)
2984 char nextc = *cur_ptr;
2986 if (nextc == escapec || nextc == quotec)
2988 *output_ptr++ = nextc;
2989 cur_ptr++;
2990 continue;
2995 * end of quoted field. Must do this test after testing for escape
2996 * in case quote char and escape char are the same (which is the
2997 * common case).
2999 if (c == quotec)
3000 break;
3002 /* Add c to output string */
3003 *output_ptr++ = c;
3006 endfield:
3008 /* Terminate attribute value in output area */
3009 *output_ptr++ = '\0';
3011 /* Check whether raw input matched null marker */
3012 input_len = end_ptr - start_ptr;
3013 if (!saw_quote && input_len == cstate->null_print_len &&
3014 strncmp(start_ptr, cstate->null_print, input_len) == 0)
3015 fieldvals[fieldno] = NULL;
3017 fieldno++;
3018 /* Done if we hit EOL instead of a delim */
3019 if (!found_delim)
3020 break;
3023 /* Clean up state of attribute_buf */
3024 output_ptr--;
3025 Assert(*output_ptr == '\0');
3026 cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
3028 return fieldno;
3033 * Read a binary attribute
3035 static Datum
3036 CopyReadBinaryAttribute(CopyState cstate,
3037 int column_no, FmgrInfo *flinfo,
3038 Oid typioparam, int32 typmod,
3039 bool *isnull)
3041 int32 fld_size;
3042 Datum result;
3044 if (!CopyGetInt32(cstate, &fld_size))
3045 ereport(ERROR,
3046 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3047 errmsg("unexpected EOF in COPY data")));
3048 if (fld_size == -1)
3050 *isnull = true;
3051 return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
3053 if (fld_size < 0)
3054 ereport(ERROR,
3055 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3056 errmsg("invalid field size")));
3058 /* reset attribute_buf to empty, and load raw data in it */
3059 resetStringInfo(&cstate->attribute_buf);
3061 enlargeStringInfo(&cstate->attribute_buf, fld_size);
3062 if (CopyGetData(cstate, cstate->attribute_buf.data,
3063 fld_size, fld_size) != fld_size)
3064 ereport(ERROR,
3065 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3066 errmsg("unexpected EOF in COPY data")));
3068 cstate->attribute_buf.len = fld_size;
3069 cstate->attribute_buf.data[fld_size] = '\0';
3071 /* Call the column type's binary input converter */
3072 result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
3073 typioparam, typmod);
3075 /* Trouble if it didn't eat the whole buffer */
3076 if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
3077 ereport(ERROR,
3078 (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
3079 errmsg("incorrect binary data format")));
3081 *isnull = false;
3082 return result;
3086 * Send text representation of one attribute, with conversion and escaping
3088 #define DUMPSOFAR() \
3089 do { \
3090 if (ptr > start) \
3091 CopySendData(cstate, start, ptr - start); \
3092 } while (0)
3094 static void
3095 CopyAttributeOutText(CopyState cstate, char *string)
3097 char *ptr;
3098 char *start;
3099 char c;
3100 char delimc = cstate->delim[0];
3102 if (cstate->need_transcoding)
3103 ptr = pg_server_to_client(string, strlen(string));
3104 else
3105 ptr = string;
3108 * We have to grovel through the string searching for control characters
3109 * and instances of the delimiter character. In most cases, though, these
3110 * are infrequent. To avoid overhead from calling CopySendData once per
3111 * character, we dump out all characters between escaped characters in a
3112 * single call. The loop invariant is that the data from "start" to "ptr"
3113 * can be sent literally, but hasn't yet been.
3115 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
3116 * in valid backend encodings, extra bytes of a multibyte character never
3117 * look like ASCII. This loop is sufficiently performance-critical that
3118 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
3119 * of the normal safe-encoding path.
3121 if (cstate->encoding_embeds_ascii)
3123 start = ptr;
3124 while ((c = *ptr) != '\0')
3126 if ((unsigned char) c < (unsigned char) 0x20)
3129 * \r and \n must be escaped, the others are traditional.
3130 * We prefer to dump these using the C-like notation, rather
3131 * than a backslash and the literal character, because it
3132 * makes the dump file a bit more proof against Microsoftish
3133 * data mangling.
3135 switch (c)
3137 case '\b':
3138 c = 'b';
3139 break;
3140 case '\f':
3141 c = 'f';
3142 break;
3143 case '\n':
3144 c = 'n';
3145 break;
3146 case '\r':
3147 c = 'r';
3148 break;
3149 case '\t':
3150 c = 't';
3151 break;
3152 case '\v':
3153 c = 'v';
3154 break;
3155 default:
3156 /* If it's the delimiter, must backslash it */
3157 if (c == delimc)
3158 break;
3159 /* All ASCII control chars are length 1 */
3160 ptr++;
3161 continue; /* fall to end of loop */
3163 /* if we get here, we need to convert the control char */
3164 DUMPSOFAR();
3165 CopySendChar(cstate, '\\');
3166 CopySendChar(cstate, c);
3167 start = ++ptr; /* do not include char in next run */
3169 else if (c == '\\' || c == delimc)
3171 DUMPSOFAR();
3172 CopySendChar(cstate, '\\');
3173 start = ptr++; /* we include char in next run */
3175 else if (IS_HIGHBIT_SET(c))
3176 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3177 else
3178 ptr++;
3181 else
3183 start = ptr;
3184 while ((c = *ptr) != '\0')
3186 if ((unsigned char) c < (unsigned char) 0x20)
3189 * \r and \n must be escaped, the others are traditional.
3190 * We prefer to dump these using the C-like notation, rather
3191 * than a backslash and the literal character, because it
3192 * makes the dump file a bit more proof against Microsoftish
3193 * data mangling.
3195 switch (c)
3197 case '\b':
3198 c = 'b';
3199 break;
3200 case '\f':
3201 c = 'f';
3202 break;
3203 case '\n':
3204 c = 'n';
3205 break;
3206 case '\r':
3207 c = 'r';
3208 break;
3209 case '\t':
3210 c = 't';
3211 break;
3212 case '\v':
3213 c = 'v';
3214 break;
3215 default:
3216 /* If it's the delimiter, must backslash it */
3217 if (c == delimc)
3218 break;
3219 /* All ASCII control chars are length 1 */
3220 ptr++;
3221 continue; /* fall to end of loop */
3223 /* if we get here, we need to convert the control char */
3224 DUMPSOFAR();
3225 CopySendChar(cstate, '\\');
3226 CopySendChar(cstate, c);
3227 start = ++ptr; /* do not include char in next run */
3229 else if (c == '\\' || c == delimc)
3231 DUMPSOFAR();
3232 CopySendChar(cstate, '\\');
3233 start = ptr++; /* we include char in next run */
3235 else
3236 ptr++;
3240 DUMPSOFAR();
3244 * Send text representation of one attribute, with conversion and
3245 * CSV-style escaping
3247 static void
3248 CopyAttributeOutCSV(CopyState cstate, char *string,
3249 bool use_quote, bool single_attr)
3251 char *ptr;
3252 char *start;
3253 char c;
3254 char delimc = cstate->delim[0];
3255 char quotec = cstate->quote[0];
3256 char escapec = cstate->escape[0];
3258 /* force quoting if it matches null_print (before conversion!) */
3259 if (!use_quote && strcmp(string, cstate->null_print) == 0)
3260 use_quote = true;
3262 if (cstate->need_transcoding)
3263 ptr = pg_server_to_client(string, strlen(string));
3264 else
3265 ptr = string;
3268 * Make a preliminary pass to discover if it needs quoting
3270 if (!use_quote)
3273 * Because '\.' can be a data value, quote it if it appears alone on a
3274 * line so it is not interpreted as the end-of-data marker.
3276 if (single_attr && strcmp(ptr, "\\.") == 0)
3277 use_quote = true;
3278 else
3280 char *tptr = ptr;
3282 while ((c = *tptr) != '\0')
3284 if (c == delimc || c == quotec || c == '\n' || c == '\r')
3286 use_quote = true;
3287 break;
3289 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3290 tptr += pg_encoding_mblen(cstate->client_encoding, tptr);
3291 else
3292 tptr++;
3297 if (use_quote)
3299 CopySendChar(cstate, quotec);
3302 * We adopt the same optimization strategy as in CopyAttributeOutText
3304 start = ptr;
3305 while ((c = *ptr) != '\0')
3307 if (c == quotec || c == escapec)
3309 DUMPSOFAR();
3310 CopySendChar(cstate, escapec);
3311 start = ptr; /* we include char in next run */
3313 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
3314 ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
3315 else
3316 ptr++;
3318 DUMPSOFAR();
3320 CopySendChar(cstate, quotec);
3322 else
3324 /* If it doesn't need quoting, we can just dump it as-is */
3325 CopySendString(cstate, ptr);
3330 * CopyGetAttnums - build an integer list of attnums to be copied
3332 * The input attnamelist is either the user-specified column list,
3333 * or NIL if there was none (in which case we want all the non-dropped
3334 * columns).
3336 * rel can be NULL ... it's only used for error reports.
3338 static List *
3339 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
3341 List *attnums = NIL;
3343 if (attnamelist == NIL)
3345 /* Generate default column list */
3346 Form_pg_attribute *attr = tupDesc->attrs;
3347 int attr_count = tupDesc->natts;
3348 int i;
3350 for (i = 0; i < attr_count; i++)
3352 if (attr[i]->attisdropped)
3353 continue;
3354 attnums = lappend_int(attnums, i + 1);
3357 else
3359 /* Validate the user-supplied list and extract attnums */
3360 ListCell *l;
3362 foreach(l, attnamelist)
3364 char *name = strVal(lfirst(l));
3365 int attnum;
3366 int i;
3368 /* Lookup column name */
3369 attnum = InvalidAttrNumber;
3370 for (i = 0; i < tupDesc->natts; i++)
3372 if (tupDesc->attrs[i]->attisdropped)
3373 continue;
3374 if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
3376 attnum = tupDesc->attrs[i]->attnum;
3377 break;
3380 if (attnum == InvalidAttrNumber)
3382 if (rel != NULL)
3383 ereport(ERROR,
3384 (errcode(ERRCODE_UNDEFINED_COLUMN),
3385 errmsg("column \"%s\" of relation \"%s\" does not exist",
3386 name, RelationGetRelationName(rel))));
3387 else
3388 ereport(ERROR,
3389 (errcode(ERRCODE_UNDEFINED_COLUMN),
3390 errmsg("column \"%s\" does not exist",
3391 name)));
3393 /* Check for duplicates */
3394 if (list_member_int(attnums, attnum))
3395 ereport(ERROR,
3396 (errcode(ERRCODE_DUPLICATE_COLUMN),
3397 errmsg("column \"%s\" specified more than once",
3398 name)));
3399 attnums = lappend_int(attnums, attnum);
3403 return attnums;
3408 * copy_dest_startup --- executor startup
3410 static void
3411 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
3413 /* no-op */
3417 * copy_dest_receive --- receive one tuple
3419 static void
3420 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
3422 DR_copy *myState = (DR_copy *) self;
3423 CopyState cstate = myState->cstate;
3425 /* Make sure the tuple is fully deconstructed */
3426 slot_getallattrs(slot);
3428 /* And send the data */
3429 CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
3433 * copy_dest_shutdown --- executor end
3435 static void
3436 copy_dest_shutdown(DestReceiver *self)
3438 /* no-op */
3442 * copy_dest_destroy --- release DestReceiver object
3444 static void
3445 copy_dest_destroy(DestReceiver *self)
3447 pfree(self);
3451 * CreateCopyDestReceiver -- create a suitable DestReceiver object
3453 DestReceiver *
3454 CreateCopyDestReceiver(void)
3456 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
3458 self->pub.receiveSlot = copy_dest_receive;
3459 self->pub.rStartup = copy_dest_startup;
3460 self->pub.rShutdown = copy_dest_shutdown;
3461 self->pub.rDestroy = copy_dest_destroy;
3462 self->pub.mydest = DestCopyOut;
3464 self->cstate = NULL; /* will be set later */
3466 return (DestReceiver *) self;