Move routines to manipulate WAL into PostgreSQL::Test::Cluster
[pgsql.git] / src / backend / commands / copyto.c
blob99cb23cb347f72dd1a5c4a58a011ce4f4a499a76
/*-------------------------------------------------------------------------
 *
 * copyto.c
 *		COPY <table> TO file/program/client
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/commands/copyto.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include <ctype.h>
18 #include <unistd.h>
19 #include <sys/stat.h>
21 #include "access/tableam.h"
22 #include "commands/copy.h"
23 #include "commands/progress.h"
24 #include "executor/execdesc.h"
25 #include "executor/executor.h"
26 #include "executor/tuptable.h"
27 #include "libpq/libpq.h"
28 #include "libpq/pqformat.h"
29 #include "mb/pg_wchar.h"
30 #include "miscadmin.h"
31 #include "pgstat.h"
32 #include "storage/fd.h"
33 #include "tcop/tcopprot.h"
34 #include "utils/lsyscache.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/snapmgr.h"
/*
 * Where formatted COPY TO output is delivered at the lowest level.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* a server-side file, or a pipe to a program */
	COPY_FRONTEND,				/* the client, via the FE/BE protocol */
	COPY_CALLBACK,				/* a caller-supplied data_dest_cb function */
} CopyDest;
51 * This struct contains all the state variables used throughout a COPY TO
52 * operation.
54 * Multi-byte encodings: all supported client-side encodings encode multi-byte
55 * characters by having the first byte's high bit set. Subsequent bytes of the
56 * character can have the high bit not set. When scanning data in such an
57 * encoding to look for a match to a single-byte (ie ASCII) character, we must
58 * use the full pg_encoding_mblen() machinery to skip over multibyte
59 * characters, else we might find a false match to a trailing byte. In
60 * supported server encodings, there is no possibility of a false match, and
61 * it's faster to make useless comparisons to trailing bytes than it is to
62 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
63 * when we have to do it the hard way.
65 typedef struct CopyToStateData
67 /* low-level state data */
68 CopyDest copy_dest; /* type of copy source/destination */
69 FILE *copy_file; /* used if copy_dest == COPY_FILE */
70 StringInfo fe_msgbuf; /* used for all dests during COPY TO */
72 int file_encoding; /* file or remote side's character encoding */
73 bool need_transcoding; /* file encoding diff from server? */
74 bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
76 /* parameters from the COPY command */
77 Relation rel; /* relation to copy to */
78 QueryDesc *queryDesc; /* executable query to copy from */
79 List *attnumlist; /* integer list of attnums to copy */
80 char *filename; /* filename, or NULL for STDOUT */
81 bool is_program; /* is 'filename' a program to popen? */
82 copy_data_dest_cb data_dest_cb; /* function for writing data */
84 CopyFormatOptions opts;
85 Node *whereClause; /* WHERE condition (or NULL) */
88 * Working state
90 MemoryContext copycontext; /* per-copy execution context */
92 FmgrInfo *out_functions; /* lookup info for output functions */
93 MemoryContext rowcontext; /* per-row evaluation context */
94 uint64 bytes_processed; /* number of bytes processed so far */
95 } CopyToStateData;
97 /* DestReceiver for COPY (query) TO */
98 typedef struct
100 DestReceiver pub; /* publicly-known function pointers */
101 CopyToState cstate; /* CopyToStateData for the command */
102 uint64 processed; /* # of tuples processed */
103 } DR_copy;
/*
 * Fixed 11-byte signature that starts every binary-format COPY file.
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
109 /* non-export function prototypes */
110 static void EndCopy(CopyToState cstate);
111 static void ClosePipeToProgram(CopyToState cstate);
112 static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
113 static void CopyAttributeOutText(CopyToState cstate, const char *string);
114 static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
115 bool use_quote);
117 /* Low-level communications functions */
118 static void SendCopyBegin(CopyToState cstate);
119 static void SendCopyEnd(CopyToState cstate);
120 static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
121 static void CopySendString(CopyToState cstate, const char *str);
122 static void CopySendChar(CopyToState cstate, char c);
123 static void CopySendEndOfRow(CopyToState cstate);
124 static void CopySendInt32(CopyToState cstate, int32 val);
125 static void CopySendInt16(CopyToState cstate, int16 val);
129 * Send copy start/stop messages for frontend copies. These have changed
130 * in past protocol redesigns.
132 static void
133 SendCopyBegin(CopyToState cstate)
135 StringInfoData buf;
136 int natts = list_length(cstate->attnumlist);
137 int16 format = (cstate->opts.binary ? 1 : 0);
138 int i;
140 pq_beginmessage(&buf, PqMsg_CopyOutResponse);
141 pq_sendbyte(&buf, format); /* overall format */
142 pq_sendint16(&buf, natts);
143 for (i = 0; i < natts; i++)
144 pq_sendint16(&buf, format); /* per-column formats */
145 pq_endmessage(&buf);
146 cstate->copy_dest = COPY_FRONTEND;
149 static void
150 SendCopyEnd(CopyToState cstate)
152 /* Shouldn't have any unsent data */
153 Assert(cstate->fe_msgbuf->len == 0);
154 /* Send Copy Done message */
155 pq_putemptymessage(PqMsg_CopyDone);
158 /*----------
159 * CopySendData sends output data to the destination (file or frontend)
160 * CopySendString does the same for null-terminated strings
161 * CopySendChar does the same for single characters
162 * CopySendEndOfRow does the appropriate thing at end of each data row
163 * (data is not actually flushed except by CopySendEndOfRow)
165 * NB: no data conversion is applied by these functions
166 *----------
168 static void
169 CopySendData(CopyToState cstate, const void *databuf, int datasize)
171 appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
174 static void
175 CopySendString(CopyToState cstate, const char *str)
177 appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
180 static void
181 CopySendChar(CopyToState cstate, char c)
183 appendStringInfoCharMacro(cstate->fe_msgbuf, c);
186 static void
187 CopySendEndOfRow(CopyToState cstate)
189 StringInfo fe_msgbuf = cstate->fe_msgbuf;
191 switch (cstate->copy_dest)
193 case COPY_FILE:
194 if (!cstate->opts.binary)
196 /* Default line termination depends on platform */
197 #ifndef WIN32
198 CopySendChar(cstate, '\n');
199 #else
200 CopySendString(cstate, "\r\n");
201 #endif
204 if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
205 cstate->copy_file) != 1 ||
206 ferror(cstate->copy_file))
208 if (cstate->is_program)
210 if (errno == EPIPE)
213 * The pipe will be closed automatically on error at
214 * the end of transaction, but we might get a better
215 * error message from the subprocess' exit code than
216 * just "Broken Pipe"
218 ClosePipeToProgram(cstate);
221 * If ClosePipeToProgram() didn't throw an error, the
222 * program terminated normally, but closed the pipe
223 * first. Restore errno, and throw an error.
225 errno = EPIPE;
227 ereport(ERROR,
228 (errcode_for_file_access(),
229 errmsg("could not write to COPY program: %m")));
231 else
232 ereport(ERROR,
233 (errcode_for_file_access(),
234 errmsg("could not write to COPY file: %m")));
236 break;
237 case COPY_FRONTEND:
238 /* The FE/BE protocol uses \n as newline for all platforms */
239 if (!cstate->opts.binary)
240 CopySendChar(cstate, '\n');
242 /* Dump the accumulated row as one CopyData message */
243 (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
244 break;
245 case COPY_CALLBACK:
246 cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
247 break;
250 /* Update the progress */
251 cstate->bytes_processed += fe_msgbuf->len;
252 pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
254 resetStringInfo(fe_msgbuf);
258 * These functions do apply some data conversion
262 * CopySendInt32 sends an int32 in network byte order
264 static inline void
265 CopySendInt32(CopyToState cstate, int32 val)
267 uint32 buf;
269 buf = pg_hton32((uint32) val);
270 CopySendData(cstate, &buf, sizeof(buf));
274 * CopySendInt16 sends an int16 in network byte order
276 static inline void
277 CopySendInt16(CopyToState cstate, int16 val)
279 uint16 buf;
281 buf = pg_hton16((uint16) val);
282 CopySendData(cstate, &buf, sizeof(buf));
286 * Closes the pipe to an external program, checking the pclose() return code.
288 static void
289 ClosePipeToProgram(CopyToState cstate)
291 int pclose_rc;
293 Assert(cstate->is_program);
295 pclose_rc = ClosePipeStream(cstate->copy_file);
296 if (pclose_rc == -1)
297 ereport(ERROR,
298 (errcode_for_file_access(),
299 errmsg("could not close pipe to external command: %m")));
300 else if (pclose_rc != 0)
302 ereport(ERROR,
303 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
304 errmsg("program \"%s\" failed",
305 cstate->filename),
306 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
311 * Release resources allocated in a cstate for COPY TO/FROM.
313 static void
314 EndCopy(CopyToState cstate)
316 if (cstate->is_program)
318 ClosePipeToProgram(cstate);
320 else
322 if (cstate->filename != NULL && FreeFile(cstate->copy_file))
323 ereport(ERROR,
324 (errcode_for_file_access(),
325 errmsg("could not close file \"%s\": %m",
326 cstate->filename)));
329 pgstat_progress_end_command();
331 MemoryContextDelete(cstate->copycontext);
332 pfree(cstate);
336 * Setup CopyToState to read tuples from a table or a query for COPY TO.
338 * 'rel': Relation to be copied
339 * 'raw_query': Query whose results are to be copied
340 * 'queryRelId': OID of base relation to convert to a query (for RLS)
341 * 'filename': Name of server-local file to write, NULL for STDOUT
342 * 'is_program': true if 'filename' is program to execute
343 * 'data_dest_cb': Callback that processes the output data
344 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
345 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
347 * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
349 CopyToState
350 BeginCopyTo(ParseState *pstate,
351 Relation rel,
352 RawStmt *raw_query,
353 Oid queryRelId,
354 const char *filename,
355 bool is_program,
356 copy_data_dest_cb data_dest_cb,
357 List *attnamelist,
358 List *options)
360 CopyToState cstate;
361 bool pipe = (filename == NULL && data_dest_cb == NULL);
362 TupleDesc tupDesc;
363 int num_phys_attrs;
364 MemoryContext oldcontext;
365 const int progress_cols[] = {
366 PROGRESS_COPY_COMMAND,
367 PROGRESS_COPY_TYPE
369 int64 progress_vals[] = {
370 PROGRESS_COPY_COMMAND_TO,
374 if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
376 if (rel->rd_rel->relkind == RELKIND_VIEW)
377 ereport(ERROR,
378 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
379 errmsg("cannot copy from view \"%s\"",
380 RelationGetRelationName(rel)),
381 errhint("Try the COPY (SELECT ...) TO variant.")));
382 else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
383 ereport(ERROR,
384 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
385 errmsg("cannot copy from materialized view \"%s\"",
386 RelationGetRelationName(rel)),
387 errhint("Try the COPY (SELECT ...) TO variant.")));
388 else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
389 ereport(ERROR,
390 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
391 errmsg("cannot copy from foreign table \"%s\"",
392 RelationGetRelationName(rel)),
393 errhint("Try the COPY (SELECT ...) TO variant.")));
394 else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
395 ereport(ERROR,
396 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
397 errmsg("cannot copy from sequence \"%s\"",
398 RelationGetRelationName(rel))));
399 else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
400 ereport(ERROR,
401 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
402 errmsg("cannot copy from partitioned table \"%s\"",
403 RelationGetRelationName(rel)),
404 errhint("Try the COPY (SELECT ...) TO variant.")));
405 else
406 ereport(ERROR,
407 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
408 errmsg("cannot copy from non-table relation \"%s\"",
409 RelationGetRelationName(rel))));
413 /* Allocate workspace and zero all fields */
414 cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
417 * We allocate everything used by a cstate in a new memory context. This
418 * avoids memory leaks during repeated use of COPY in a query.
420 cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
421 "COPY",
422 ALLOCSET_DEFAULT_SIZES);
424 oldcontext = MemoryContextSwitchTo(cstate->copycontext);
426 /* Extract options from the statement node tree */
427 ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
429 /* Process the source/target relation or query */
430 if (rel)
432 Assert(!raw_query);
434 cstate->rel = rel;
436 tupDesc = RelationGetDescr(cstate->rel);
438 else
440 List *rewritten;
441 Query *query;
442 PlannedStmt *plan;
443 DestReceiver *dest;
445 cstate->rel = NULL;
448 * Run parse analysis and rewrite. Note this also acquires sufficient
449 * locks on the source table(s).
451 rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
452 pstate->p_sourcetext, NULL, 0,
453 NULL);
455 /* check that we got back something we can work with */
456 if (rewritten == NIL)
458 ereport(ERROR,
459 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
460 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
462 else if (list_length(rewritten) > 1)
464 ListCell *lc;
466 /* examine queries to determine which error message to issue */
467 foreach(lc, rewritten)
469 Query *q = lfirst_node(Query, lc);
471 if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
472 ereport(ERROR,
473 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
474 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
475 if (q->querySource == QSRC_NON_INSTEAD_RULE)
476 ereport(ERROR,
477 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
478 errmsg("DO ALSO rules are not supported for COPY")));
481 ereport(ERROR,
482 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
483 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
486 query = linitial_node(Query, rewritten);
488 /* The grammar allows SELECT INTO, but we don't support that */
489 if (query->utilityStmt != NULL &&
490 IsA(query->utilityStmt, CreateTableAsStmt))
491 ereport(ERROR,
492 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
493 errmsg("COPY (SELECT INTO) is not supported")));
495 /* The only other utility command we could see is NOTIFY */
496 if (query->utilityStmt != NULL)
497 ereport(ERROR,
498 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
499 errmsg("COPY query must not be a utility command")));
502 * Similarly the grammar doesn't enforce the presence of a RETURNING
503 * clause, but this is required here.
505 if (query->commandType != CMD_SELECT &&
506 query->returningList == NIL)
508 Assert(query->commandType == CMD_INSERT ||
509 query->commandType == CMD_UPDATE ||
510 query->commandType == CMD_DELETE ||
511 query->commandType == CMD_MERGE);
513 ereport(ERROR,
514 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
515 errmsg("COPY query must have a RETURNING clause")));
518 /* plan the query */
519 plan = pg_plan_query(query, pstate->p_sourcetext,
520 CURSOR_OPT_PARALLEL_OK, NULL);
523 * With row-level security and a user using "COPY relation TO", we
524 * have to convert the "COPY relation TO" to a query-based COPY (eg:
525 * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
526 * add in any RLS clauses.
528 * When this happens, we are passed in the relid of the originally
529 * found relation (which we have locked). As the planner will look up
530 * the relation again, we double-check here to make sure it found the
531 * same one that we have locked.
533 if (queryRelId != InvalidOid)
536 * Note that with RLS involved there may be multiple relations,
537 * and while the one we need is almost certainly first, we don't
538 * make any guarantees of that in the planner, so check the whole
539 * list and make sure we find the original relation.
541 if (!list_member_oid(plan->relationOids, queryRelId))
542 ereport(ERROR,
543 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
544 errmsg("relation referenced by COPY statement has changed")));
548 * Use a snapshot with an updated command ID to ensure this query sees
549 * results of any previously executed queries.
551 PushCopiedSnapshot(GetActiveSnapshot());
552 UpdateActiveSnapshotCommandId();
554 /* Create dest receiver for COPY OUT */
555 dest = CreateDestReceiver(DestCopyOut);
556 ((DR_copy *) dest)->cstate = cstate;
558 /* Create a QueryDesc requesting no output */
559 cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
560 GetActiveSnapshot(),
561 InvalidSnapshot,
562 dest, NULL, NULL, 0);
565 * Call ExecutorStart to prepare the plan for execution.
567 * ExecutorStart computes a result tupdesc for us
569 ExecutorStart(cstate->queryDesc, 0);
571 tupDesc = cstate->queryDesc->tupDesc;
574 /* Generate or convert list of attributes to process */
575 cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
577 num_phys_attrs = tupDesc->natts;
579 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
580 cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
581 if (cstate->opts.force_quote_all)
583 MemSet(cstate->opts.force_quote_flags, true, num_phys_attrs * sizeof(bool));
585 else if (cstate->opts.force_quote)
587 List *attnums;
588 ListCell *cur;
590 attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
592 foreach(cur, attnums)
594 int attnum = lfirst_int(cur);
595 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
597 if (!list_member_int(cstate->attnumlist, attnum))
598 ereport(ERROR,
599 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
600 /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
601 errmsg("%s column \"%s\" not referenced by COPY",
602 "FORCE_QUOTE", NameStr(attr->attname))));
603 cstate->opts.force_quote_flags[attnum - 1] = true;
607 /* Use client encoding when ENCODING option is not specified. */
608 if (cstate->opts.file_encoding < 0)
609 cstate->file_encoding = pg_get_client_encoding();
610 else
611 cstate->file_encoding = cstate->opts.file_encoding;
614 * Set up encoding conversion info if the file and server encodings differ
615 * (see also pg_server_to_any).
617 if (cstate->file_encoding == GetDatabaseEncoding() ||
618 cstate->file_encoding == PG_SQL_ASCII)
619 cstate->need_transcoding = false;
620 else
621 cstate->need_transcoding = true;
623 /* See Multibyte encoding comment above */
624 cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
626 cstate->copy_dest = COPY_FILE; /* default */
628 if (data_dest_cb)
630 progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
631 cstate->copy_dest = COPY_CALLBACK;
632 cstate->data_dest_cb = data_dest_cb;
634 else if (pipe)
636 progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
638 Assert(!is_program); /* the grammar does not allow this */
639 if (whereToSendOutput != DestRemote)
640 cstate->copy_file = stdout;
642 else
644 cstate->filename = pstrdup(filename);
645 cstate->is_program = is_program;
647 if (is_program)
649 progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
650 cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
651 if (cstate->copy_file == NULL)
652 ereport(ERROR,
653 (errcode_for_file_access(),
654 errmsg("could not execute command \"%s\": %m",
655 cstate->filename)));
657 else
659 mode_t oumask; /* Pre-existing umask value */
660 struct stat st;
662 progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
665 * Prevent write to relative path ... too easy to shoot oneself in
666 * the foot by overwriting a database file ...
668 if (!is_absolute_path(filename))
669 ereport(ERROR,
670 (errcode(ERRCODE_INVALID_NAME),
671 errmsg("relative path not allowed for COPY to file")));
673 oumask = umask(S_IWGRP | S_IWOTH);
674 PG_TRY();
676 cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
678 PG_FINALLY();
680 umask(oumask);
682 PG_END_TRY();
683 if (cstate->copy_file == NULL)
685 /* copy errno because ereport subfunctions might change it */
686 int save_errno = errno;
688 ereport(ERROR,
689 (errcode_for_file_access(),
690 errmsg("could not open file \"%s\" for writing: %m",
691 cstate->filename),
692 (save_errno == ENOENT || save_errno == EACCES) ?
693 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
694 "You may want a client-side facility such as psql's \\copy.") : 0));
697 if (fstat(fileno(cstate->copy_file), &st))
698 ereport(ERROR,
699 (errcode_for_file_access(),
700 errmsg("could not stat file \"%s\": %m",
701 cstate->filename)));
703 if (S_ISDIR(st.st_mode))
704 ereport(ERROR,
705 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
706 errmsg("\"%s\" is a directory", cstate->filename)));
710 /* initialize progress */
711 pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
712 cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
713 pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
715 cstate->bytes_processed = 0;
717 MemoryContextSwitchTo(oldcontext);
719 return cstate;
723 * Clean up storage and release resources for COPY TO.
725 void
726 EndCopyTo(CopyToState cstate)
728 if (cstate->queryDesc != NULL)
730 /* Close down the query and free resources. */
731 ExecutorFinish(cstate->queryDesc);
732 ExecutorEnd(cstate->queryDesc);
733 FreeQueryDesc(cstate->queryDesc);
734 PopActiveSnapshot();
737 /* Clean up storage */
738 EndCopy(cstate);
742 * Copy from relation or query TO file.
744 * Returns the number of rows processed.
746 uint64
747 DoCopyTo(CopyToState cstate)
749 bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
750 bool fe_copy = (pipe && whereToSendOutput == DestRemote);
751 TupleDesc tupDesc;
752 int num_phys_attrs;
753 ListCell *cur;
754 uint64 processed;
756 if (fe_copy)
757 SendCopyBegin(cstate);
759 if (cstate->rel)
760 tupDesc = RelationGetDescr(cstate->rel);
761 else
762 tupDesc = cstate->queryDesc->tupDesc;
763 num_phys_attrs = tupDesc->natts;
764 cstate->opts.null_print_client = cstate->opts.null_print; /* default */
766 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
767 cstate->fe_msgbuf = makeStringInfo();
769 /* Get info about the columns we need to process. */
770 cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
771 foreach(cur, cstate->attnumlist)
773 int attnum = lfirst_int(cur);
774 Oid out_func_oid;
775 bool isvarlena;
776 Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
778 if (cstate->opts.binary)
779 getTypeBinaryOutputInfo(attr->atttypid,
780 &out_func_oid,
781 &isvarlena);
782 else
783 getTypeOutputInfo(attr->atttypid,
784 &out_func_oid,
785 &isvarlena);
786 fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
790 * Create a temporary memory context that we can reset once per row to
791 * recover palloc'd memory. This avoids any problems with leaks inside
792 * datatype output routines, and should be faster than retail pfree's
793 * anyway. (We don't need a whole econtext as CopyFrom does.)
795 cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
796 "COPY TO",
797 ALLOCSET_DEFAULT_SIZES);
799 if (cstate->opts.binary)
801 /* Generate header for a binary copy */
802 int32 tmp;
804 /* Signature */
805 CopySendData(cstate, BinarySignature, 11);
806 /* Flags field */
807 tmp = 0;
808 CopySendInt32(cstate, tmp);
809 /* No header extension */
810 tmp = 0;
811 CopySendInt32(cstate, tmp);
813 else
816 * For non-binary copy, we need to convert null_print to file
817 * encoding, because it will be sent directly with CopySendString.
819 if (cstate->need_transcoding)
820 cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
821 cstate->opts.null_print_len,
822 cstate->file_encoding);
824 /* if a header has been requested send the line */
825 if (cstate->opts.header_line)
827 bool hdr_delim = false;
829 foreach(cur, cstate->attnumlist)
831 int attnum = lfirst_int(cur);
832 char *colname;
834 if (hdr_delim)
835 CopySendChar(cstate, cstate->opts.delim[0]);
836 hdr_delim = true;
838 colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
840 if (cstate->opts.csv_mode)
841 CopyAttributeOutCSV(cstate, colname, false);
842 else
843 CopyAttributeOutText(cstate, colname);
846 CopySendEndOfRow(cstate);
850 if (cstate->rel)
852 TupleTableSlot *slot;
853 TableScanDesc scandesc;
855 scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
856 slot = table_slot_create(cstate->rel, NULL);
858 processed = 0;
859 while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
861 CHECK_FOR_INTERRUPTS();
863 /* Deconstruct the tuple ... */
864 slot_getallattrs(slot);
866 /* Format and send the data */
867 CopyOneRowTo(cstate, slot);
870 * Increment the number of processed tuples, and report the
871 * progress.
873 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
874 ++processed);
877 ExecDropSingleTupleTableSlot(slot);
878 table_endscan(scandesc);
880 else
882 /* run the plan --- the dest receiver will send tuples */
883 ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0);
884 processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
887 if (cstate->opts.binary)
889 /* Generate trailer for a binary copy */
890 CopySendInt16(cstate, -1);
891 /* Need to flush out the trailer */
892 CopySendEndOfRow(cstate);
895 MemoryContextDelete(cstate->rowcontext);
897 if (fe_copy)
898 SendCopyEnd(cstate);
900 return processed;
904 * Emit one row during DoCopyTo().
906 static void
907 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
909 FmgrInfo *out_functions = cstate->out_functions;
910 MemoryContext oldcontext;
912 MemoryContextReset(cstate->rowcontext);
913 oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
915 if (cstate->opts.binary)
917 /* Binary per-tuple header */
918 CopySendInt16(cstate, list_length(cstate->attnumlist));
921 /* Make sure the tuple is fully deconstructed */
922 slot_getallattrs(slot);
924 if (!cstate->opts.binary)
926 bool need_delim = false;
928 foreach_int(attnum, cstate->attnumlist)
930 Datum value = slot->tts_values[attnum - 1];
931 bool isnull = slot->tts_isnull[attnum - 1];
932 char *string;
934 if (need_delim)
935 CopySendChar(cstate, cstate->opts.delim[0]);
936 need_delim = true;
938 if (isnull)
939 CopySendString(cstate, cstate->opts.null_print_client);
940 else
942 string = OutputFunctionCall(&out_functions[attnum - 1],
943 value);
944 if (cstate->opts.csv_mode)
945 CopyAttributeOutCSV(cstate, string,
946 cstate->opts.force_quote_flags[attnum - 1]);
947 else
948 CopyAttributeOutText(cstate, string);
952 else
954 foreach_int(attnum, cstate->attnumlist)
956 Datum value = slot->tts_values[attnum - 1];
957 bool isnull = slot->tts_isnull[attnum - 1];
958 bytea *outputbytes;
960 if (isnull)
961 CopySendInt32(cstate, -1);
962 else
964 outputbytes = SendFunctionCall(&out_functions[attnum - 1],
965 value);
966 CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
967 CopySendData(cstate, VARDATA(outputbytes),
968 VARSIZE(outputbytes) - VARHDRSZ);
973 CopySendEndOfRow(cstate);
975 MemoryContextSwitchTo(oldcontext);
979 * Send text representation of one attribute, with conversion and escaping
981 #define DUMPSOFAR() \
982 do { \
983 if (ptr > start) \
984 CopySendData(cstate, start, ptr - start); \
985 } while (0)
987 static void
988 CopyAttributeOutText(CopyToState cstate, const char *string)
990 const char *ptr;
991 const char *start;
992 char c;
993 char delimc = cstate->opts.delim[0];
995 if (cstate->need_transcoding)
996 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
997 else
998 ptr = string;
1001 * We have to grovel through the string searching for control characters
1002 * and instances of the delimiter character. In most cases, though, these
1003 * are infrequent. To avoid overhead from calling CopySendData once per
1004 * character, we dump out all characters between escaped characters in a
1005 * single call. The loop invariant is that the data from "start" to "ptr"
1006 * can be sent literally, but hasn't yet been.
1008 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1009 * in valid backend encodings, extra bytes of a multibyte character never
1010 * look like ASCII. This loop is sufficiently performance-critical that
1011 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1012 * of the normal safe-encoding path.
1014 if (cstate->encoding_embeds_ascii)
1016 start = ptr;
1017 while ((c = *ptr) != '\0')
1019 if ((unsigned char) c < (unsigned char) 0x20)
1022 * \r and \n must be escaped, the others are traditional. We
1023 * prefer to dump these using the C-like notation, rather than
1024 * a backslash and the literal character, because it makes the
1025 * dump file a bit more proof against Microsoftish data
1026 * mangling.
1028 switch (c)
1030 case '\b':
1031 c = 'b';
1032 break;
1033 case '\f':
1034 c = 'f';
1035 break;
1036 case '\n':
1037 c = 'n';
1038 break;
1039 case '\r':
1040 c = 'r';
1041 break;
1042 case '\t':
1043 c = 't';
1044 break;
1045 case '\v':
1046 c = 'v';
1047 break;
1048 default:
1049 /* If it's the delimiter, must backslash it */
1050 if (c == delimc)
1051 break;
1052 /* All ASCII control chars are length 1 */
1053 ptr++;
1054 continue; /* fall to end of loop */
1056 /* if we get here, we need to convert the control char */
1057 DUMPSOFAR();
1058 CopySendChar(cstate, '\\');
1059 CopySendChar(cstate, c);
1060 start = ++ptr; /* do not include char in next run */
1062 else if (c == '\\' || c == delimc)
1064 DUMPSOFAR();
1065 CopySendChar(cstate, '\\');
1066 start = ptr++; /* we include char in next run */
1068 else if (IS_HIGHBIT_SET(c))
1069 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1070 else
1071 ptr++;
1074 else
1076 start = ptr;
1077 while ((c = *ptr) != '\0')
1079 if ((unsigned char) c < (unsigned char) 0x20)
1082 * \r and \n must be escaped, the others are traditional. We
1083 * prefer to dump these using the C-like notation, rather than
1084 * a backslash and the literal character, because it makes the
1085 * dump file a bit more proof against Microsoftish data
1086 * mangling.
1088 switch (c)
1090 case '\b':
1091 c = 'b';
1092 break;
1093 case '\f':
1094 c = 'f';
1095 break;
1096 case '\n':
1097 c = 'n';
1098 break;
1099 case '\r':
1100 c = 'r';
1101 break;
1102 case '\t':
1103 c = 't';
1104 break;
1105 case '\v':
1106 c = 'v';
1107 break;
1108 default:
1109 /* If it's the delimiter, must backslash it */
1110 if (c == delimc)
1111 break;
1112 /* All ASCII control chars are length 1 */
1113 ptr++;
1114 continue; /* fall to end of loop */
1116 /* if we get here, we need to convert the control char */
1117 DUMPSOFAR();
1118 CopySendChar(cstate, '\\');
1119 CopySendChar(cstate, c);
1120 start = ++ptr; /* do not include char in next run */
1122 else if (c == '\\' || c == delimc)
1124 DUMPSOFAR();
1125 CopySendChar(cstate, '\\');
1126 start = ptr++; /* we include char in next run */
1128 else
1129 ptr++;
1133 DUMPSOFAR();
1137 * Send text representation of one attribute, with conversion and
1138 * CSV-style escaping
1140 static void
1141 CopyAttributeOutCSV(CopyToState cstate, const char *string,
1142 bool use_quote)
1144 const char *ptr;
1145 const char *start;
1146 char c;
1147 char delimc = cstate->opts.delim[0];
1148 char quotec = cstate->opts.quote[0];
1149 char escapec = cstate->opts.escape[0];
1150 bool single_attr = (list_length(cstate->attnumlist) == 1);
1152 /* force quoting if it matches null_print (before conversion!) */
1153 if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
1154 use_quote = true;
1156 if (cstate->need_transcoding)
1157 ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1158 else
1159 ptr = string;
1162 * Make a preliminary pass to discover if it needs quoting
1164 if (!use_quote)
1167 * Quote '\.' if it appears alone on a line, so that it will not be
1168 * interpreted as an end-of-data marker. (PG 18 and up will not
1169 * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1170 * we want the data to be loadable by older versions too. Also, this
1171 * avoids breaking clients that are still using PQgetline().)
1173 if (single_attr && strcmp(ptr, "\\.") == 0)
1174 use_quote = true;
1175 else
1177 const char *tptr = ptr;
1179 while ((c = *tptr) != '\0')
1181 if (c == delimc || c == quotec || c == '\n' || c == '\r')
1183 use_quote = true;
1184 break;
1186 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1187 tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1188 else
1189 tptr++;
1194 if (use_quote)
1196 CopySendChar(cstate, quotec);
1199 * We adopt the same optimization strategy as in CopyAttributeOutText
1201 start = ptr;
1202 while ((c = *ptr) != '\0')
1204 if (c == quotec || c == escapec)
1206 DUMPSOFAR();
1207 CopySendChar(cstate, escapec);
1208 start = ptr; /* we include char in next run */
1210 if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
1211 ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
1212 else
1213 ptr++;
1215 DUMPSOFAR();
1217 CopySendChar(cstate, quotec);
1219 else
1221 /* If it doesn't need quoting, we can just dump it as-is */
1222 CopySendString(cstate, ptr);
1227 * copy_dest_startup --- executor startup
1229 static void
1230 copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1232 /* no-op */
1236 * copy_dest_receive --- receive one tuple
1238 static bool
1239 copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
1241 DR_copy *myState = (DR_copy *) self;
1242 CopyToState cstate = myState->cstate;
1244 /* Send the data */
1245 CopyOneRowTo(cstate, slot);
1247 /* Increment the number of processed tuples, and report the progress */
1248 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1249 ++myState->processed);
1251 return true;
1255 * copy_dest_shutdown --- executor end
1257 static void
1258 copy_dest_shutdown(DestReceiver *self)
1260 /* no-op */
1264 * copy_dest_destroy --- release DestReceiver object
1266 static void
1267 copy_dest_destroy(DestReceiver *self)
1269 pfree(self);
1273 * CreateCopyDestReceiver -- create a suitable DestReceiver object
1275 DestReceiver *
1276 CreateCopyDestReceiver(void)
1278 DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
1280 self->pub.receiveSlot = copy_dest_receive;
1281 self->pub.rStartup = copy_dest_startup;
1282 self->pub.rShutdown = copy_dest_shutdown;
1283 self->pub.rDestroy = copy_dest_destroy;
1284 self->pub.mydest = DestCopyOut;
1286 self->cstate = NULL; /* will be set later */
1287 self->processed = 0;
1289 return (DestReceiver *) self;