Add REJECT_LIMIT option to the COPY command.
[pgsql.git] / src / backend / commands / copy.c
blobbefab92074e6e854d440a4de37c8998e11b1aacd
1 /*-------------------------------------------------------------------------
3 * copy.c
4 * Implements the COPY utility command
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/backend/commands/copy.c
13 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
21 #include "access/sysattr.h"
22 #include "access/table.h"
23 #include "access/xact.h"
24 #include "catalog/pg_authid.h"
25 #include "commands/copy.h"
26 #include "commands/defrem.h"
27 #include "executor/executor.h"
28 #include "mb/pg_wchar.h"
29 #include "miscadmin.h"
30 #include "nodes/makefuncs.h"
31 #include "optimizer/optimizer.h"
32 #include "parser/parse_coerce.h"
33 #include "parser/parse_collate.h"
34 #include "parser/parse_expr.h"
35 #include "parser/parse_relation.h"
36 #include "utils/acl.h"
37 #include "utils/builtins.h"
38 #include "utils/lsyscache.h"
39 #include "utils/rel.h"
40 #include "utils/rls.h"
43 * DoCopy executes the SQL COPY statement
45 * Either unload or reload contents of table <relation>, depending on <from>.
46 * (<from> = true means we are inserting into the table.) In the "TO" case
47 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
48 * or DELETE query.
50 * If <pipe> is false, transfer is between the table and the file named
51 * <filename>. Otherwise, transfer is between the table and our regular
52 * input/output stream. The latter could be either stdin/stdout or a
53 * socket, depending on whether we're running under Postmaster control.
55 * Do not allow a Postgres user without the 'pg_read_server_files' or
56 * 'pg_write_server_files' role to read from or write to a file.
58 * Do not allow the copy if user doesn't have proper permission to access
59 * the table or the specifically requested columns.
61 void
62 DoCopy(ParseState *pstate, const CopyStmt *stmt,
63 int stmt_location, int stmt_len,
64 uint64 *processed)
66 bool is_from = stmt->is_from;
67 bool pipe = (stmt->filename == NULL);
68 Relation rel;
69 Oid relid;
70 RawStmt *query = NULL;
71 Node *whereClause = NULL;
74 * Disallow COPY to/from file or program except to users with the
75 * appropriate role.
77 if (!pipe)
79 if (stmt->is_program)
81 if (!has_privs_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
82 ereport(ERROR,
83 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
84 errmsg("permission denied to COPY to or from an external program"),
85 errdetail("Only roles with privileges of the \"%s\" role may COPY to or from an external program.",
86 "pg_execute_server_program"),
87 errhint("Anyone can COPY to stdout or from stdin. "
88 "psql's \\copy command also works for anyone.")));
90 else
92 if (is_from && !has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
93 ereport(ERROR,
94 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
95 errmsg("permission denied to COPY from a file"),
96 errdetail("Only roles with privileges of the \"%s\" role may COPY from a file.",
97 "pg_read_server_files"),
98 errhint("Anyone can COPY to stdout or from stdin. "
99 "psql's \\copy command also works for anyone.")));
101 if (!is_from && !has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
102 ereport(ERROR,
103 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
104 errmsg("permission denied to COPY to a file"),
105 errdetail("Only roles with privileges of the \"%s\" role may COPY to a file.",
106 "pg_write_server_files"),
107 errhint("Anyone can COPY to stdout or from stdin. "
108 "psql's \\copy command also works for anyone.")));
112 if (stmt->relation)
114 LOCKMODE lockmode = is_from ? RowExclusiveLock : AccessShareLock;
115 ParseNamespaceItem *nsitem;
116 RTEPermissionInfo *perminfo;
117 TupleDesc tupDesc;
118 List *attnums;
119 ListCell *cur;
121 Assert(!stmt->query);
123 /* Open and lock the relation, using the appropriate lock type. */
124 rel = table_openrv(stmt->relation, lockmode);
126 relid = RelationGetRelid(rel);
128 nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
129 NULL, false, false);
131 perminfo = nsitem->p_perminfo;
132 perminfo->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
134 if (stmt->whereClause)
136 /* add nsitem to query namespace */
137 addNSItemToQuery(pstate, nsitem, false, true, true);
139 /* Transform the raw expression tree */
140 whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
142 /* Make sure it yields a boolean result. */
143 whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
145 /* we have to fix its collations too */
146 assign_expr_collations(pstate, whereClause);
148 whereClause = eval_const_expressions(NULL, whereClause);
150 whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
151 whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
154 tupDesc = RelationGetDescr(rel);
155 attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
156 foreach(cur, attnums)
158 int attno;
159 Bitmapset **bms;
161 attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
162 bms = is_from ? &perminfo->insertedCols : &perminfo->selectedCols;
164 *bms = bms_add_member(*bms, attno);
166 ExecCheckPermissions(pstate->p_rtable, list_make1(perminfo), true);
169 * Permission check for row security policies.
171 * check_enable_rls will ereport(ERROR) if the user has requested
172 * something invalid and will otherwise indicate if we should enable
173 * RLS (returns RLS_ENABLED) or not for this COPY statement.
175 * If the relation has a row security policy and we are to apply it
176 * then perform a "query" copy and allow the normal query processing
177 * to handle the policies.
179 * If RLS is not enabled for this, then just fall through to the
180 * normal non-filtering relation handling.
182 if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
184 SelectStmt *select;
185 ColumnRef *cr;
186 ResTarget *target;
187 RangeVar *from;
188 List *targetList = NIL;
190 if (is_from)
191 ereport(ERROR,
192 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
193 errmsg("COPY FROM not supported with row-level security"),
194 errhint("Use INSERT statements instead.")));
197 * Build target list
199 * If no columns are specified in the attribute list of the COPY
200 * command, then the target list is 'all' columns. Therefore, '*'
201 * should be used as the target list for the resulting SELECT
202 * statement.
204 * In the case that columns are specified in the attribute list,
205 * create a ColumnRef and ResTarget for each column and add them
206 * to the target list for the resulting SELECT statement.
208 if (!stmt->attlist)
210 cr = makeNode(ColumnRef);
211 cr->fields = list_make1(makeNode(A_Star));
212 cr->location = -1;
214 target = makeNode(ResTarget);
215 target->name = NULL;
216 target->indirection = NIL;
217 target->val = (Node *) cr;
218 target->location = -1;
220 targetList = list_make1(target);
222 else
224 ListCell *lc;
226 foreach(lc, stmt->attlist)
229 * Build the ColumnRef for each column. The ColumnRef
230 * 'fields' property is a String node that corresponds to
231 * the column name respectively.
233 cr = makeNode(ColumnRef);
234 cr->fields = list_make1(lfirst(lc));
235 cr->location = -1;
237 /* Build the ResTarget and add the ColumnRef to it. */
238 target = makeNode(ResTarget);
239 target->name = NULL;
240 target->indirection = NIL;
241 target->val = (Node *) cr;
242 target->location = -1;
244 /* Add each column to the SELECT statement's target list */
245 targetList = lappend(targetList, target);
250 * Build RangeVar for from clause, fully qualified based on the
251 * relation which we have opened and locked. Use "ONLY" so that
252 * COPY retrieves rows from only the target table not any
253 * inheritance children, the same as when RLS doesn't apply.
255 from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
256 pstrdup(RelationGetRelationName(rel)),
257 -1);
258 from->inh = false; /* apply ONLY */
260 /* Build query */
261 select = makeNode(SelectStmt);
262 select->targetList = targetList;
263 select->fromClause = list_make1(from);
265 query = makeNode(RawStmt);
266 query->stmt = (Node *) select;
267 query->stmt_location = stmt_location;
268 query->stmt_len = stmt_len;
271 * Close the relation for now, but keep the lock on it to prevent
272 * changes between now and when we start the query-based COPY.
274 * We'll reopen it later as part of the query-based COPY.
276 table_close(rel, NoLock);
277 rel = NULL;
280 else
282 Assert(stmt->query);
284 query = makeNode(RawStmt);
285 query->stmt = stmt->query;
286 query->stmt_location = stmt_location;
287 query->stmt_len = stmt_len;
289 relid = InvalidOid;
290 rel = NULL;
293 if (is_from)
295 CopyFromState cstate;
297 Assert(rel);
299 /* check read-only transaction and parallel mode */
300 if (XactReadOnly && !rel->rd_islocaltemp)
301 PreventCommandIfReadOnly("COPY FROM");
303 cstate = BeginCopyFrom(pstate, rel, whereClause,
304 stmt->filename, stmt->is_program,
305 NULL, stmt->attlist, stmt->options);
306 *processed = CopyFrom(cstate); /* copy from file to database */
307 EndCopyFrom(cstate);
309 else
311 CopyToState cstate;
313 cstate = BeginCopyTo(pstate, rel, query, relid,
314 stmt->filename, stmt->is_program,
315 NULL, stmt->attlist, stmt->options);
316 *processed = DoCopyTo(cstate); /* copy from database to file */
317 EndCopyTo(cstate);
320 if (rel != NULL)
321 table_close(rel, NoLock);
325 * Extract a CopyHeaderChoice value from a DefElem. This is like
326 * defGetBoolean() but also accepts the special value "match".
328 static CopyHeaderChoice
329 defGetCopyHeaderChoice(DefElem *def, bool is_from)
332 * If no parameter value given, assume "true" is meant.
334 if (def->arg == NULL)
335 return COPY_HEADER_TRUE;
338 * Allow 0, 1, "true", "false", "on", "off", or "match".
340 switch (nodeTag(def->arg))
342 case T_Integer:
343 switch (intVal(def->arg))
345 case 0:
346 return COPY_HEADER_FALSE;
347 case 1:
348 return COPY_HEADER_TRUE;
349 default:
350 /* otherwise, error out below */
351 break;
353 break;
354 default:
356 char *sval = defGetString(def);
359 * The set of strings accepted here should match up with the
360 * grammar's opt_boolean_or_string production.
362 if (pg_strcasecmp(sval, "true") == 0)
363 return COPY_HEADER_TRUE;
364 if (pg_strcasecmp(sval, "false") == 0)
365 return COPY_HEADER_FALSE;
366 if (pg_strcasecmp(sval, "on") == 0)
367 return COPY_HEADER_TRUE;
368 if (pg_strcasecmp(sval, "off") == 0)
369 return COPY_HEADER_FALSE;
370 if (pg_strcasecmp(sval, "match") == 0)
372 if (!is_from)
373 ereport(ERROR,
374 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
375 errmsg("cannot use \"%s\" with HEADER in COPY TO",
376 sval)));
377 return COPY_HEADER_MATCH;
380 break;
382 ereport(ERROR,
383 (errcode(ERRCODE_SYNTAX_ERROR),
384 errmsg("%s requires a Boolean value or \"match\"",
385 def->defname)));
386 return COPY_HEADER_FALSE; /* keep compiler quiet */
390 * Extract a CopyOnErrorChoice value from a DefElem.
392 static CopyOnErrorChoice
393 defGetCopyOnErrorChoice(DefElem *def, ParseState *pstate, bool is_from)
395 char *sval = defGetString(def);
397 if (!is_from)
398 ereport(ERROR,
399 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
400 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
401 second %s is a COPY with direction, e.g. COPY TO */
402 errmsg("COPY %s cannot be used with %s", "ON_ERROR", "COPY TO"),
403 parser_errposition(pstate, def->location)));
406 * Allow "stop", or "ignore" values.
408 if (pg_strcasecmp(sval, "stop") == 0)
409 return COPY_ON_ERROR_STOP;
410 if (pg_strcasecmp(sval, "ignore") == 0)
411 return COPY_ON_ERROR_IGNORE;
413 ereport(ERROR,
414 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
415 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */
416 errmsg("COPY %s \"%s\" not recognized", "ON_ERROR", sval),
417 parser_errposition(pstate, def->location)));
418 return COPY_ON_ERROR_STOP; /* keep compiler quiet */
422 * Extract REJECT_LIMIT value from a DefElem.
424 static int64
425 defGetCopyRejectLimitOption(DefElem *def)
427 int64 reject_limit = defGetInt64(def);
429 if (reject_limit <= 0)
430 ereport(ERROR,
431 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 errmsg("REJECT_LIMIT (%lld) must be greater than zero",
433 (long long) reject_limit)));
435 return reject_limit;
439 * Extract a CopyLogVerbosityChoice value from a DefElem.
441 static CopyLogVerbosityChoice
442 defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
444 char *sval;
447 * Allow "silent", "default", or "verbose" values.
449 sval = defGetString(def);
450 if (pg_strcasecmp(sval, "silent") == 0)
451 return COPY_LOG_VERBOSITY_SILENT;
452 if (pg_strcasecmp(sval, "default") == 0)
453 return COPY_LOG_VERBOSITY_DEFAULT;
454 if (pg_strcasecmp(sval, "verbose") == 0)
455 return COPY_LOG_VERBOSITY_VERBOSE;
457 ereport(ERROR,
458 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
459 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR */
460 errmsg("COPY %s \"%s\" not recognized", "LOG_VERBOSITY", sval),
461 parser_errposition(pstate, def->location)));
462 return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */
466 * Process the statement option list for COPY.
468 * Scan the options list (a list of DefElem) and transpose the information
469 * into *opts_out, applying appropriate error checking.
471 * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially.
473 * This is exported so that external users of the COPY API can sanity-check
474 * a list of options. In that usage, 'opts_out' can be passed as NULL and
475 * the collected data is just leaked until CurrentMemoryContext is reset.
477 * Note that additional checking, such as whether column names listed in FORCE
478 * QUOTE actually exist, has to be applied later. This just checks for
479 * self-consistency of the options list.
481 void
482 ProcessCopyOptions(ParseState *pstate,
483 CopyFormatOptions *opts_out,
484 bool is_from,
485 List *options)
487 bool format_specified = false;
488 bool freeze_specified = false;
489 bool header_specified = false;
490 bool on_error_specified = false;
491 bool log_verbosity_specified = false;
492 bool reject_limit_specified = false;
493 ListCell *option;
495 /* Support external use for option sanity checking */
496 if (opts_out == NULL)
497 opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions));
499 opts_out->file_encoding = -1;
501 /* Extract options from the statement node tree */
502 foreach(option, options)
504 DefElem *defel = lfirst_node(DefElem, option);
506 if (strcmp(defel->defname, "format") == 0)
508 char *fmt = defGetString(defel);
510 if (format_specified)
511 errorConflictingDefElem(defel, pstate);
512 format_specified = true;
513 if (strcmp(fmt, "text") == 0)
514 /* default format */ ;
515 else if (strcmp(fmt, "csv") == 0)
516 opts_out->csv_mode = true;
517 else if (strcmp(fmt, "binary") == 0)
518 opts_out->binary = true;
519 else
520 ereport(ERROR,
521 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
522 errmsg("COPY format \"%s\" not recognized", fmt),
523 parser_errposition(pstate, defel->location)));
525 else if (strcmp(defel->defname, "freeze") == 0)
527 if (freeze_specified)
528 errorConflictingDefElem(defel, pstate);
529 freeze_specified = true;
530 opts_out->freeze = defGetBoolean(defel);
532 else if (strcmp(defel->defname, "delimiter") == 0)
534 if (opts_out->delim)
535 errorConflictingDefElem(defel, pstate);
536 opts_out->delim = defGetString(defel);
538 else if (strcmp(defel->defname, "null") == 0)
540 if (opts_out->null_print)
541 errorConflictingDefElem(defel, pstate);
542 opts_out->null_print = defGetString(defel);
544 else if (strcmp(defel->defname, "default") == 0)
546 if (opts_out->default_print)
547 errorConflictingDefElem(defel, pstate);
548 opts_out->default_print = defGetString(defel);
550 else if (strcmp(defel->defname, "header") == 0)
552 if (header_specified)
553 errorConflictingDefElem(defel, pstate);
554 header_specified = true;
555 opts_out->header_line = defGetCopyHeaderChoice(defel, is_from);
557 else if (strcmp(defel->defname, "quote") == 0)
559 if (opts_out->quote)
560 errorConflictingDefElem(defel, pstate);
561 opts_out->quote = defGetString(defel);
563 else if (strcmp(defel->defname, "escape") == 0)
565 if (opts_out->escape)
566 errorConflictingDefElem(defel, pstate);
567 opts_out->escape = defGetString(defel);
569 else if (strcmp(defel->defname, "force_quote") == 0)
571 if (opts_out->force_quote || opts_out->force_quote_all)
572 errorConflictingDefElem(defel, pstate);
573 if (defel->arg && IsA(defel->arg, A_Star))
574 opts_out->force_quote_all = true;
575 else if (defel->arg && IsA(defel->arg, List))
576 opts_out->force_quote = castNode(List, defel->arg);
577 else
578 ereport(ERROR,
579 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
580 errmsg("argument to option \"%s\" must be a list of column names",
581 defel->defname),
582 parser_errposition(pstate, defel->location)));
584 else if (strcmp(defel->defname, "force_not_null") == 0)
586 if (opts_out->force_notnull || opts_out->force_notnull_all)
587 errorConflictingDefElem(defel, pstate);
588 if (defel->arg && IsA(defel->arg, A_Star))
589 opts_out->force_notnull_all = true;
590 else if (defel->arg && IsA(defel->arg, List))
591 opts_out->force_notnull = castNode(List, defel->arg);
592 else
593 ereport(ERROR,
594 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
595 errmsg("argument to option \"%s\" must be a list of column names",
596 defel->defname),
597 parser_errposition(pstate, defel->location)));
599 else if (strcmp(defel->defname, "force_null") == 0)
601 if (opts_out->force_null || opts_out->force_null_all)
602 errorConflictingDefElem(defel, pstate);
603 if (defel->arg && IsA(defel->arg, A_Star))
604 opts_out->force_null_all = true;
605 else if (defel->arg && IsA(defel->arg, List))
606 opts_out->force_null = castNode(List, defel->arg);
607 else
608 ereport(ERROR,
609 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
610 errmsg("argument to option \"%s\" must be a list of column names",
611 defel->defname),
612 parser_errposition(pstate, defel->location)));
614 else if (strcmp(defel->defname, "convert_selectively") == 0)
617 * Undocumented, not-accessible-from-SQL option: convert only the
618 * named columns to binary form, storing the rest as NULLs. It's
619 * allowed for the column list to be NIL.
621 if (opts_out->convert_selectively)
622 errorConflictingDefElem(defel, pstate);
623 opts_out->convert_selectively = true;
624 if (defel->arg == NULL || IsA(defel->arg, List))
625 opts_out->convert_select = castNode(List, defel->arg);
626 else
627 ereport(ERROR,
628 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 errmsg("argument to option \"%s\" must be a list of column names",
630 defel->defname),
631 parser_errposition(pstate, defel->location)));
633 else if (strcmp(defel->defname, "encoding") == 0)
635 if (opts_out->file_encoding >= 0)
636 errorConflictingDefElem(defel, pstate);
637 opts_out->file_encoding = pg_char_to_encoding(defGetString(defel));
638 if (opts_out->file_encoding < 0)
639 ereport(ERROR,
640 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
641 errmsg("argument to option \"%s\" must be a valid encoding name",
642 defel->defname),
643 parser_errposition(pstate, defel->location)));
645 else if (strcmp(defel->defname, "on_error") == 0)
647 if (on_error_specified)
648 errorConflictingDefElem(defel, pstate);
649 on_error_specified = true;
650 opts_out->on_error = defGetCopyOnErrorChoice(defel, pstate, is_from);
652 else if (strcmp(defel->defname, "log_verbosity") == 0)
654 if (log_verbosity_specified)
655 errorConflictingDefElem(defel, pstate);
656 log_verbosity_specified = true;
657 opts_out->log_verbosity = defGetCopyLogVerbosityChoice(defel, pstate);
659 else if (strcmp(defel->defname, "reject_limit") == 0)
661 if (reject_limit_specified)
662 errorConflictingDefElem(defel, pstate);
663 reject_limit_specified = true;
664 opts_out->reject_limit = defGetCopyRejectLimitOption(defel);
666 else
667 ereport(ERROR,
668 (errcode(ERRCODE_SYNTAX_ERROR),
669 errmsg("option \"%s\" not recognized",
670 defel->defname),
671 parser_errposition(pstate, defel->location)));
675 * Check for incompatible options (must do these two before inserting
676 * defaults)
678 if (opts_out->binary && opts_out->delim)
679 ereport(ERROR,
680 (errcode(ERRCODE_SYNTAX_ERROR),
681 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
682 errmsg("cannot specify %s in BINARY mode", "DELIMITER")));
684 if (opts_out->binary && opts_out->null_print)
685 ereport(ERROR,
686 (errcode(ERRCODE_SYNTAX_ERROR),
687 errmsg("cannot specify %s in BINARY mode", "NULL")));
689 if (opts_out->binary && opts_out->default_print)
690 ereport(ERROR,
691 (errcode(ERRCODE_SYNTAX_ERROR),
692 errmsg("cannot specify %s in BINARY mode", "DEFAULT")));
694 if (opts_out->binary && opts_out->on_error != COPY_ON_ERROR_STOP)
695 ereport(ERROR,
696 (errcode(ERRCODE_SYNTAX_ERROR),
697 errmsg("only ON_ERROR STOP is allowed in BINARY mode")));
699 /* Set defaults for omitted options */
700 if (!opts_out->delim)
701 opts_out->delim = opts_out->csv_mode ? "," : "\t";
703 if (!opts_out->null_print)
704 opts_out->null_print = opts_out->csv_mode ? "" : "\\N";
705 opts_out->null_print_len = strlen(opts_out->null_print);
707 if (opts_out->csv_mode)
709 if (!opts_out->quote)
710 opts_out->quote = "\"";
711 if (!opts_out->escape)
712 opts_out->escape = opts_out->quote;
715 /* Only single-byte delimiter strings are supported. */
716 if (strlen(opts_out->delim) != 1)
717 ereport(ERROR,
718 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
719 errmsg("COPY delimiter must be a single one-byte character")));
721 /* Disallow end-of-line characters */
722 if (strchr(opts_out->delim, '\r') != NULL ||
723 strchr(opts_out->delim, '\n') != NULL)
724 ereport(ERROR,
725 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
726 errmsg("COPY delimiter cannot be newline or carriage return")));
728 if (strchr(opts_out->null_print, '\r') != NULL ||
729 strchr(opts_out->null_print, '\n') != NULL)
730 ereport(ERROR,
731 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
732 errmsg("COPY null representation cannot use newline or carriage return")));
734 if (opts_out->default_print)
736 opts_out->default_print_len = strlen(opts_out->default_print);
738 if (strchr(opts_out->default_print, '\r') != NULL ||
739 strchr(opts_out->default_print, '\n') != NULL)
740 ereport(ERROR,
741 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
742 errmsg("COPY default representation cannot use newline or carriage return")));
746 * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
747 * backslash because it would be ambiguous. We can't allow the other
748 * cases because data characters matching the delimiter must be
749 * backslashed, and certain backslash combinations are interpreted
750 * non-literally by COPY IN. Disallowing all lower case ASCII letters is
751 * more than strictly necessary, but seems best for consistency and
752 * future-proofing. Likewise we disallow all digits though only octal
753 * digits are actually dangerous.
755 if (!opts_out->csv_mode &&
756 strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
757 opts_out->delim[0]) != NULL)
758 ereport(ERROR,
759 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
760 errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim)));
762 /* Check header */
763 if (opts_out->binary && opts_out->header_line)
764 ereport(ERROR,
765 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
766 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
767 errmsg("cannot specify %s in BINARY mode", "HEADER")));
769 /* Check quote */
770 if (!opts_out->csv_mode && opts_out->quote != NULL)
771 ereport(ERROR,
772 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
773 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
774 errmsg("COPY %s requires CSV mode", "QUOTE")));
776 if (opts_out->csv_mode && strlen(opts_out->quote) != 1)
777 ereport(ERROR,
778 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
779 errmsg("COPY quote must be a single one-byte character")));
781 if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0])
782 ereport(ERROR,
783 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
784 errmsg("COPY delimiter and quote must be different")));
786 /* Check escape */
787 if (!opts_out->csv_mode && opts_out->escape != NULL)
788 ereport(ERROR,
789 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
790 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
791 errmsg("COPY %s requires CSV mode", "ESCAPE")));
793 if (opts_out->csv_mode && strlen(opts_out->escape) != 1)
794 ereport(ERROR,
795 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
796 errmsg("COPY escape must be a single one-byte character")));
798 /* Check force_quote */
799 if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all))
800 ereport(ERROR,
801 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
802 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
803 errmsg("COPY %s requires CSV mode", "FORCE_QUOTE")));
804 if ((opts_out->force_quote || opts_out->force_quote_all) && is_from)
805 ereport(ERROR,
806 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
807 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
808 second %s is a COPY with direction, e.g. COPY TO */
809 errmsg("COPY %s cannot be used with %s", "FORCE_QUOTE",
810 "COPY FROM")));
812 /* Check force_notnull */
813 if (!opts_out->csv_mode && opts_out->force_notnull != NIL)
814 ereport(ERROR,
815 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
817 errmsg("COPY %s requires CSV mode", "FORCE_NOT_NULL")));
818 if (opts_out->force_notnull != NIL && !is_from)
819 ereport(ERROR,
820 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
821 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
822 second %s is a COPY with direction, e.g. COPY TO */
823 errmsg("COPY %s cannot be used with %s", "FORCE_NOT_NULL",
824 "COPY TO")));
826 /* Check force_null */
827 if (!opts_out->csv_mode && opts_out->force_null != NIL)
828 ereport(ERROR,
829 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
830 /*- translator: %s is the name of a COPY option, e.g. ON_ERROR */
831 errmsg("COPY %s requires CSV mode", "FORCE_NULL")));
833 if (opts_out->force_null != NIL && !is_from)
834 ereport(ERROR,
835 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
836 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
837 second %s is a COPY with direction, e.g. COPY TO */
838 errmsg("COPY %s cannot be used with %s", "FORCE_NULL",
839 "COPY TO")));
841 /* Don't allow the delimiter to appear in the null string. */
842 if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL)
843 ereport(ERROR,
844 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
845 /*- translator: %s is the name of a COPY option, e.g. NULL */
846 errmsg("COPY delimiter character must not appear in the %s specification",
847 "NULL")));
849 /* Don't allow the CSV quote char to appear in the null string. */
850 if (opts_out->csv_mode &&
851 strchr(opts_out->null_print, opts_out->quote[0]) != NULL)
852 ereport(ERROR,
853 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
854 /*- translator: %s is the name of a COPY option, e.g. NULL */
855 errmsg("CSV quote character must not appear in the %s specification",
856 "NULL")));
858 /* Check freeze */
859 if (opts_out->freeze && !is_from)
860 ereport(ERROR,
861 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
862 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
863 second %s is a COPY with direction, e.g. COPY TO */
864 errmsg("COPY %s cannot be used with %s", "FREEZE",
865 "COPY TO")));
867 if (opts_out->default_print)
869 if (!is_from)
870 ereport(ERROR,
871 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 /*- translator: first %s is the name of a COPY option, e.g. ON_ERROR,
873 second %s is a COPY with direction, e.g. COPY TO */
874 errmsg("COPY %s cannot be used with %s", "DEFAULT",
875 "COPY TO")));
877 /* Don't allow the delimiter to appear in the default string. */
878 if (strchr(opts_out->default_print, opts_out->delim[0]) != NULL)
879 ereport(ERROR,
880 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
881 /*- translator: %s is the name of a COPY option, e.g. NULL */
882 errmsg("COPY delimiter character must not appear in the %s specification",
883 "DEFAULT")));
885 /* Don't allow the CSV quote char to appear in the default string. */
886 if (opts_out->csv_mode &&
887 strchr(opts_out->default_print, opts_out->quote[0]) != NULL)
888 ereport(ERROR,
889 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
890 /*- translator: %s is the name of a COPY option, e.g. NULL */
891 errmsg("CSV quote character must not appear in the %s specification",
892 "DEFAULT")));
894 /* Don't allow the NULL and DEFAULT string to be the same */
895 if (opts_out->null_print_len == opts_out->default_print_len &&
896 strncmp(opts_out->null_print, opts_out->default_print,
897 opts_out->null_print_len) == 0)
898 ereport(ERROR,
899 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
900 errmsg("NULL specification and DEFAULT specification cannot be the same")));
902 /* Check on_error */
903 if (opts_out->reject_limit && !opts_out->on_error)
904 ereport(ERROR,
905 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
906 /*- translator: first and second %s are the names of COPY option, e.g.
907 * ON_ERROR, third is the value of the COPY option, e.g. IGNORE */
908 errmsg("COPY %s requires %s to be set to %s",
909 "REJECT_LIMIT", "ON_ERROR", "IGNORE")));
913 * CopyGetAttnums - build an integer list of attnums to be copied
915 * The input attnamelist is either the user-specified column list,
916 * or NIL if there was none (in which case we want all the non-dropped
917 * columns).
919 * We don't include generated columns in the generated full list and we don't
920 * allow them to be specified explicitly. They don't make sense for COPY
921 * FROM, but we could possibly allow them for COPY TO. But this way it's at
922 * least ensured that whatever we copy out can be copied back in.
924 * rel can be NULL ... it's only used for error reports.
926 List *
927 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
929 List *attnums = NIL;
931 if (attnamelist == NIL)
933 /* Generate default column list */
934 int attr_count = tupDesc->natts;
935 int i;
937 for (i = 0; i < attr_count; i++)
939 if (TupleDescAttr(tupDesc, i)->attisdropped)
940 continue;
941 if (TupleDescAttr(tupDesc, i)->attgenerated)
942 continue;
943 attnums = lappend_int(attnums, i + 1);
946 else
948 /* Validate the user-supplied list and extract attnums */
949 ListCell *l;
951 foreach(l, attnamelist)
953 char *name = strVal(lfirst(l));
954 int attnum;
955 int i;
957 /* Lookup column name */
958 attnum = InvalidAttrNumber;
959 for (i = 0; i < tupDesc->natts; i++)
961 Form_pg_attribute att = TupleDescAttr(tupDesc, i);
963 if (att->attisdropped)
964 continue;
965 if (namestrcmp(&(att->attname), name) == 0)
967 if (att->attgenerated)
968 ereport(ERROR,
969 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
970 errmsg("column \"%s\" is a generated column",
971 name),
972 errdetail("Generated columns cannot be used in COPY.")));
973 attnum = att->attnum;
974 break;
977 if (attnum == InvalidAttrNumber)
979 if (rel != NULL)
980 ereport(ERROR,
981 (errcode(ERRCODE_UNDEFINED_COLUMN),
982 errmsg("column \"%s\" of relation \"%s\" does not exist",
983 name, RelationGetRelationName(rel))));
984 else
985 ereport(ERROR,
986 (errcode(ERRCODE_UNDEFINED_COLUMN),
987 errmsg("column \"%s\" does not exist",
988 name)));
990 /* Check for duplicates */
991 if (list_member_int(attnums, attnum))
992 ereport(ERROR,
993 (errcode(ERRCODE_DUPLICATE_COLUMN),
994 errmsg("column \"%s\" specified more than once",
995 name)));
996 attnums = lappend_int(attnums, attnum);
1000 return attnums;