1 /*-------------------------------------------------------------------------
4 * COPY <table> TO file/program/client
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/backend/commands/copyto.c
13 *-------------------------------------------------------------------------
21 #include "access/tableam.h"
22 #include "commands/copy.h"
23 #include "commands/progress.h"
24 #include "executor/execdesc.h"
25 #include "executor/executor.h"
26 #include "executor/tuptable.h"
27 #include "libpq/libpq.h"
28 #include "libpq/pqformat.h"
29 #include "mb/pg_wchar.h"
30 #include "miscadmin.h"
32 #include "storage/fd.h"
33 #include "tcop/tcopprot.h"
34 #include "utils/lsyscache.h"
35 #include "utils/memutils.h"
36 #include "utils/rel.h"
37 #include "utils/snapmgr.h"
40 * Represents the different dest cases we need to worry about at
45 COPY_FILE
, /* to file (or a piped program) */
46 COPY_FRONTEND
, /* to frontend */
47 COPY_CALLBACK
, /* to callback function */
51 * This struct contains all the state variables used throughout a COPY TO
54 * Multi-byte encodings: all supported client-side encodings encode multi-byte
55 * characters by having the first byte's high bit set. Subsequent bytes of the
56 * character can have the high bit not set. When scanning data in such an
57 * encoding to look for a match to a single-byte (ie ASCII) character, we must
58 * use the full pg_encoding_mblen() machinery to skip over multibyte
59 * characters, else we might find a false match to a trailing byte. In
60 * supported server encodings, there is no possibility of a false match, and
61 * it's faster to make useless comparisons to trailing bytes than it is to
62 * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
63 * when we have to do it the hard way.
65 typedef struct CopyToStateData
67 /* low-level state data */
68 CopyDest copy_dest
; /* type of copy source/destination */
69 FILE *copy_file
; /* used if copy_dest == COPY_FILE */
70 StringInfo fe_msgbuf
; /* used for all dests during COPY TO */
72 int file_encoding
; /* file or remote side's character encoding */
73 bool need_transcoding
; /* file encoding diff from server? */
74 bool encoding_embeds_ascii
; /* ASCII can be non-first byte? */
76 /* parameters from the COPY command */
77 Relation rel
; /* relation to copy to */
78 QueryDesc
*queryDesc
; /* executable query to copy from */
79 List
*attnumlist
; /* integer list of attnums to copy */
80 char *filename
; /* filename, or NULL for STDOUT */
81 bool is_program
; /* is 'filename' a program to popen? */
82 copy_data_dest_cb data_dest_cb
; /* function for writing data */
84 CopyFormatOptions opts
;
85 Node
*whereClause
; /* WHERE condition (or NULL) */
90 MemoryContext copycontext
; /* per-copy execution context */
92 FmgrInfo
*out_functions
; /* lookup info for output functions */
93 MemoryContext rowcontext
; /* per-row evaluation context */
94 uint64 bytes_processed
; /* number of bytes processed so far */
97 /* DestReceiver for COPY (query) TO */
100 DestReceiver pub
; /* publicly-known function pointers */
101 CopyToState cstate
; /* CopyToStateData for the command */
102 uint64 processed
; /* # of tuples processed */
105 /* NOTE: there's a copy of this in copyfromparse.c */
106 static const char BinarySignature
[11] = "PGCOPY\n\377\r\n\0";
109 /* non-export function prototypes */
110 static void EndCopy(CopyToState cstate
);
111 static void ClosePipeToProgram(CopyToState cstate
);
112 static void CopyOneRowTo(CopyToState cstate
, TupleTableSlot
*slot
);
113 static void CopyAttributeOutText(CopyToState cstate
, const char *string
);
114 static void CopyAttributeOutCSV(CopyToState cstate
, const char *string
,
117 /* Low-level communications functions */
118 static void SendCopyBegin(CopyToState cstate
);
119 static void SendCopyEnd(CopyToState cstate
);
120 static void CopySendData(CopyToState cstate
, const void *databuf
, int datasize
);
121 static void CopySendString(CopyToState cstate
, const char *str
);
122 static void CopySendChar(CopyToState cstate
, char c
);
123 static void CopySendEndOfRow(CopyToState cstate
);
124 static void CopySendInt32(CopyToState cstate
, int32 val
);
125 static void CopySendInt16(CopyToState cstate
, int16 val
);
129 * Send copy start/stop messages for frontend copies. These have changed
130 * in past protocol redesigns.
133 SendCopyBegin(CopyToState cstate
)
136 int natts
= list_length(cstate
->attnumlist
);
137 int16 format
= (cstate
->opts
.binary
? 1 : 0);
140 pq_beginmessage(&buf
, PqMsg_CopyOutResponse
);
141 pq_sendbyte(&buf
, format
); /* overall format */
142 pq_sendint16(&buf
, natts
);
143 for (i
= 0; i
< natts
; i
++)
144 pq_sendint16(&buf
, format
); /* per-column formats */
146 cstate
->copy_dest
= COPY_FRONTEND
;
150 SendCopyEnd(CopyToState cstate
)
152 /* Shouldn't have any unsent data */
153 Assert(cstate
->fe_msgbuf
->len
== 0);
154 /* Send Copy Done message */
155 pq_putemptymessage(PqMsg_CopyDone
);
159 * CopySendData sends output data to the destination (file or frontend)
160 * CopySendString does the same for null-terminated strings
161 * CopySendChar does the same for single characters
162 * CopySendEndOfRow does the appropriate thing at end of each data row
163 * (data is not actually flushed except by CopySendEndOfRow)
165 * NB: no data conversion is applied by these functions
169 CopySendData(CopyToState cstate
, const void *databuf
, int datasize
)
171 appendBinaryStringInfo(cstate
->fe_msgbuf
, databuf
, datasize
);
175 CopySendString(CopyToState cstate
, const char *str
)
177 appendBinaryStringInfo(cstate
->fe_msgbuf
, str
, strlen(str
));
181 CopySendChar(CopyToState cstate
, char c
)
183 appendStringInfoCharMacro(cstate
->fe_msgbuf
, c
);
187 CopySendEndOfRow(CopyToState cstate
)
189 StringInfo fe_msgbuf
= cstate
->fe_msgbuf
;
191 switch (cstate
->copy_dest
)
194 if (!cstate
->opts
.binary
)
196 /* Default line termination depends on platform */
198 CopySendChar(cstate
, '\n');
200 CopySendString(cstate
, "\r\n");
204 if (fwrite(fe_msgbuf
->data
, fe_msgbuf
->len
, 1,
205 cstate
->copy_file
) != 1 ||
206 ferror(cstate
->copy_file
))
208 if (cstate
->is_program
)
213 * The pipe will be closed automatically on error at
214 * the end of transaction, but we might get a better
215 * error message from the subprocess' exit code than
218 ClosePipeToProgram(cstate
);
221 * If ClosePipeToProgram() didn't throw an error, the
222 * program terminated normally, but closed the pipe
223 * first. Restore errno, and throw an error.
228 (errcode_for_file_access(),
229 errmsg("could not write to COPY program: %m")));
233 (errcode_for_file_access(),
234 errmsg("could not write to COPY file: %m")));
238 /* The FE/BE protocol uses \n as newline for all platforms */
239 if (!cstate
->opts
.binary
)
240 CopySendChar(cstate
, '\n');
242 /* Dump the accumulated row as one CopyData message */
243 (void) pq_putmessage(PqMsg_CopyData
, fe_msgbuf
->data
, fe_msgbuf
->len
);
246 cstate
->data_dest_cb(fe_msgbuf
->data
, fe_msgbuf
->len
);
250 /* Update the progress */
251 cstate
->bytes_processed
+= fe_msgbuf
->len
;
252 pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED
, cstate
->bytes_processed
);
254 resetStringInfo(fe_msgbuf
);
258 * These functions do apply some data conversion
262 * CopySendInt32 sends an int32 in network byte order
265 CopySendInt32(CopyToState cstate
, int32 val
)
269 buf
= pg_hton32((uint32
) val
);
270 CopySendData(cstate
, &buf
, sizeof(buf
));
274 * CopySendInt16 sends an int16 in network byte order
277 CopySendInt16(CopyToState cstate
, int16 val
)
281 buf
= pg_hton16((uint16
) val
);
282 CopySendData(cstate
, &buf
, sizeof(buf
));
286 * Closes the pipe to an external program, checking the pclose() return code.
289 ClosePipeToProgram(CopyToState cstate
)
293 Assert(cstate
->is_program
);
295 pclose_rc
= ClosePipeStream(cstate
->copy_file
);
298 (errcode_for_file_access(),
299 errmsg("could not close pipe to external command: %m")));
300 else if (pclose_rc
!= 0)
303 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION
),
304 errmsg("program \"%s\" failed",
306 errdetail_internal("%s", wait_result_to_str(pclose_rc
))));
311 * Release resources allocated in a cstate for COPY TO/FROM.
314 EndCopy(CopyToState cstate
)
316 if (cstate
->is_program
)
318 ClosePipeToProgram(cstate
);
322 if (cstate
->filename
!= NULL
&& FreeFile(cstate
->copy_file
))
324 (errcode_for_file_access(),
325 errmsg("could not close file \"%s\": %m",
329 pgstat_progress_end_command();
331 MemoryContextDelete(cstate
->copycontext
);
336 * Setup CopyToState to read tuples from a table or a query for COPY TO.
338 * 'rel': Relation to be copied
339 * 'raw_query': Query whose results are to be copied
340 * 'queryRelId': OID of base relation to convert to a query (for RLS)
341 * 'filename': Name of server-local file to write, NULL for STDOUT
342 * 'is_program': true if 'filename' is program to execute
343 * 'data_dest_cb': Callback that processes the output data
344 * 'attnamelist': List of char *, columns to include. NIL selects all cols.
345 * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
347 * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
350 BeginCopyTo(ParseState
*pstate
,
354 const char *filename
,
356 copy_data_dest_cb data_dest_cb
,
361 bool pipe
= (filename
== NULL
&& data_dest_cb
== NULL
);
364 MemoryContext oldcontext
;
365 const int progress_cols
[] = {
366 PROGRESS_COPY_COMMAND
,
369 int64 progress_vals
[] = {
370 PROGRESS_COPY_COMMAND_TO
,
374 if (rel
!= NULL
&& rel
->rd_rel
->relkind
!= RELKIND_RELATION
)
376 if (rel
->rd_rel
->relkind
== RELKIND_VIEW
)
378 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
379 errmsg("cannot copy from view \"%s\"",
380 RelationGetRelationName(rel
)),
381 errhint("Try the COPY (SELECT ...) TO variant.")));
382 else if (rel
->rd_rel
->relkind
== RELKIND_MATVIEW
)
384 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
385 errmsg("cannot copy from materialized view \"%s\"",
386 RelationGetRelationName(rel
)),
387 errhint("Try the COPY (SELECT ...) TO variant.")));
388 else if (rel
->rd_rel
->relkind
== RELKIND_FOREIGN_TABLE
)
390 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
391 errmsg("cannot copy from foreign table \"%s\"",
392 RelationGetRelationName(rel
)),
393 errhint("Try the COPY (SELECT ...) TO variant.")));
394 else if (rel
->rd_rel
->relkind
== RELKIND_SEQUENCE
)
396 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
397 errmsg("cannot copy from sequence \"%s\"",
398 RelationGetRelationName(rel
))));
399 else if (rel
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
401 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
402 errmsg("cannot copy from partitioned table \"%s\"",
403 RelationGetRelationName(rel
)),
404 errhint("Try the COPY (SELECT ...) TO variant.")));
407 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
408 errmsg("cannot copy from non-table relation \"%s\"",
409 RelationGetRelationName(rel
))));
413 /* Allocate workspace and zero all fields */
414 cstate
= (CopyToStateData
*) palloc0(sizeof(CopyToStateData
));
417 * We allocate everything used by a cstate in a new memory context. This
418 * avoids memory leaks during repeated use of COPY in a query.
420 cstate
->copycontext
= AllocSetContextCreate(CurrentMemoryContext
,
422 ALLOCSET_DEFAULT_SIZES
);
424 oldcontext
= MemoryContextSwitchTo(cstate
->copycontext
);
426 /* Extract options from the statement node tree */
427 ProcessCopyOptions(pstate
, &cstate
->opts
, false /* is_from */ , options
);
429 /* Process the source/target relation or query */
436 tupDesc
= RelationGetDescr(cstate
->rel
);
448 * Run parse analysis and rewrite. Note this also acquires sufficient
449 * locks on the source table(s).
451 rewritten
= pg_analyze_and_rewrite_fixedparams(raw_query
,
452 pstate
->p_sourcetext
, NULL
, 0,
455 /* check that we got back something we can work with */
456 if (rewritten
== NIL
)
459 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
460 errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
462 else if (list_length(rewritten
) > 1)
466 /* examine queries to determine which error message to issue */
467 foreach(lc
, rewritten
)
469 Query
*q
= lfirst_node(Query
, lc
);
471 if (q
->querySource
== QSRC_QUAL_INSTEAD_RULE
)
473 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
474 errmsg("conditional DO INSTEAD rules are not supported for COPY")));
475 if (q
->querySource
== QSRC_NON_INSTEAD_RULE
)
477 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
478 errmsg("DO ALSO rules are not supported for COPY")));
482 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
483 errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
486 query
= linitial_node(Query
, rewritten
);
488 /* The grammar allows SELECT INTO, but we don't support that */
489 if (query
->utilityStmt
!= NULL
&&
490 IsA(query
->utilityStmt
, CreateTableAsStmt
))
492 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
493 errmsg("COPY (SELECT INTO) is not supported")));
495 /* The only other utility command we could see is NOTIFY */
496 if (query
->utilityStmt
!= NULL
)
498 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
499 errmsg("COPY query must not be a utility command")));
502 * Similarly the grammar doesn't enforce the presence of a RETURNING
503 * clause, but this is required here.
505 if (query
->commandType
!= CMD_SELECT
&&
506 query
->returningList
== NIL
)
508 Assert(query
->commandType
== CMD_INSERT
||
509 query
->commandType
== CMD_UPDATE
||
510 query
->commandType
== CMD_DELETE
||
511 query
->commandType
== CMD_MERGE
);
514 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
515 errmsg("COPY query must have a RETURNING clause")));
519 plan
= pg_plan_query(query
, pstate
->p_sourcetext
,
520 CURSOR_OPT_PARALLEL_OK
, NULL
);
523 * With row-level security and a user using "COPY relation TO", we
524 * have to convert the "COPY relation TO" to a query-based COPY (eg:
525 * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
526 * add in any RLS clauses.
528 * When this happens, we are passed in the relid of the originally
529 * found relation (which we have locked). As the planner will look up
530 * the relation again, we double-check here to make sure it found the
531 * same one that we have locked.
533 if (queryRelId
!= InvalidOid
)
536 * Note that with RLS involved there may be multiple relations,
537 * and while the one we need is almost certainly first, we don't
538 * make any guarantees of that in the planner, so check the whole
539 * list and make sure we find the original relation.
541 if (!list_member_oid(plan
->relationOids
, queryRelId
))
543 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
544 errmsg("relation referenced by COPY statement has changed")));
548 * Use a snapshot with an updated command ID to ensure this query sees
549 * results of any previously executed queries.
551 PushCopiedSnapshot(GetActiveSnapshot());
552 UpdateActiveSnapshotCommandId();
554 /* Create dest receiver for COPY OUT */
555 dest
= CreateDestReceiver(DestCopyOut
);
556 ((DR_copy
*) dest
)->cstate
= cstate
;
558 /* Create a QueryDesc requesting no output */
559 cstate
->queryDesc
= CreateQueryDesc(plan
, pstate
->p_sourcetext
,
562 dest
, NULL
, NULL
, 0);
565 * Call ExecutorStart to prepare the plan for execution.
567 * ExecutorStart computes a result tupdesc for us
569 ExecutorStart(cstate
->queryDesc
, 0);
571 tupDesc
= cstate
->queryDesc
->tupDesc
;
574 /* Generate or convert list of attributes to process */
575 cstate
->attnumlist
= CopyGetAttnums(tupDesc
, cstate
->rel
, attnamelist
);
577 num_phys_attrs
= tupDesc
->natts
;
579 /* Convert FORCE_QUOTE name list to per-column flags, check validity */
580 cstate
->opts
.force_quote_flags
= (bool *) palloc0(num_phys_attrs
* sizeof(bool));
581 if (cstate
->opts
.force_quote_all
)
583 MemSet(cstate
->opts
.force_quote_flags
, true, num_phys_attrs
* sizeof(bool));
585 else if (cstate
->opts
.force_quote
)
590 attnums
= CopyGetAttnums(tupDesc
, cstate
->rel
, cstate
->opts
.force_quote
);
592 foreach(cur
, attnums
)
594 int attnum
= lfirst_int(cur
);
595 Form_pg_attribute attr
= TupleDescAttr(tupDesc
, attnum
- 1);
597 if (!list_member_int(cstate
->attnumlist
, attnum
))
599 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE
),
600 /*- translator: %s is the name of a COPY option, e.g. FORCE_NOT_NULL */
601 errmsg("%s column \"%s\" not referenced by COPY",
602 "FORCE_QUOTE", NameStr(attr
->attname
))));
603 cstate
->opts
.force_quote_flags
[attnum
- 1] = true;
607 /* Use client encoding when ENCODING option is not specified. */
608 if (cstate
->opts
.file_encoding
< 0)
609 cstate
->file_encoding
= pg_get_client_encoding();
611 cstate
->file_encoding
= cstate
->opts
.file_encoding
;
614 * Set up encoding conversion info if the file and server encodings differ
615 * (see also pg_server_to_any).
617 if (cstate
->file_encoding
== GetDatabaseEncoding() ||
618 cstate
->file_encoding
== PG_SQL_ASCII
)
619 cstate
->need_transcoding
= false;
621 cstate
->need_transcoding
= true;
623 /* See Multibyte encoding comment above */
624 cstate
->encoding_embeds_ascii
= PG_ENCODING_IS_CLIENT_ONLY(cstate
->file_encoding
);
626 cstate
->copy_dest
= COPY_FILE
; /* default */
630 progress_vals
[1] = PROGRESS_COPY_TYPE_CALLBACK
;
631 cstate
->copy_dest
= COPY_CALLBACK
;
632 cstate
->data_dest_cb
= data_dest_cb
;
636 progress_vals
[1] = PROGRESS_COPY_TYPE_PIPE
;
638 Assert(!is_program
); /* the grammar does not allow this */
639 if (whereToSendOutput
!= DestRemote
)
640 cstate
->copy_file
= stdout
;
644 cstate
->filename
= pstrdup(filename
);
645 cstate
->is_program
= is_program
;
649 progress_vals
[1] = PROGRESS_COPY_TYPE_PROGRAM
;
650 cstate
->copy_file
= OpenPipeStream(cstate
->filename
, PG_BINARY_W
);
651 if (cstate
->copy_file
== NULL
)
653 (errcode_for_file_access(),
654 errmsg("could not execute command \"%s\": %m",
659 mode_t oumask
; /* Pre-existing umask value */
662 progress_vals
[1] = PROGRESS_COPY_TYPE_FILE
;
665 * Prevent write to relative path ... too easy to shoot oneself in
666 * the foot by overwriting a database file ...
668 if (!is_absolute_path(filename
))
670 (errcode(ERRCODE_INVALID_NAME
),
671 errmsg("relative path not allowed for COPY to file")));
673 oumask
= umask(S_IWGRP
| S_IWOTH
);
676 cstate
->copy_file
= AllocateFile(cstate
->filename
, PG_BINARY_W
);
683 if (cstate
->copy_file
== NULL
)
685 /* copy errno because ereport subfunctions might change it */
686 int save_errno
= errno
;
689 (errcode_for_file_access(),
690 errmsg("could not open file \"%s\" for writing: %m",
692 (save_errno
== ENOENT
|| save_errno
== EACCES
) ?
693 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
694 "You may want a client-side facility such as psql's \\copy.") : 0));
697 if (fstat(fileno(cstate
->copy_file
), &st
))
699 (errcode_for_file_access(),
700 errmsg("could not stat file \"%s\": %m",
703 if (S_ISDIR(st
.st_mode
))
705 (errcode(ERRCODE_WRONG_OBJECT_TYPE
),
706 errmsg("\"%s\" is a directory", cstate
->filename
)));
710 /* initialize progress */
711 pgstat_progress_start_command(PROGRESS_COMMAND_COPY
,
712 cstate
->rel
? RelationGetRelid(cstate
->rel
) : InvalidOid
);
713 pgstat_progress_update_multi_param(2, progress_cols
, progress_vals
);
715 cstate
->bytes_processed
= 0;
717 MemoryContextSwitchTo(oldcontext
);
723 * Clean up storage and release resources for COPY TO.
726 EndCopyTo(CopyToState cstate
)
728 if (cstate
->queryDesc
!= NULL
)
730 /* Close down the query and free resources. */
731 ExecutorFinish(cstate
->queryDesc
);
732 ExecutorEnd(cstate
->queryDesc
);
733 FreeQueryDesc(cstate
->queryDesc
);
737 /* Clean up storage */
742 * Copy from relation or query TO file.
744 * Returns the number of rows processed.
747 DoCopyTo(CopyToState cstate
)
749 bool pipe
= (cstate
->filename
== NULL
&& cstate
->data_dest_cb
== NULL
);
750 bool fe_copy
= (pipe
&& whereToSendOutput
== DestRemote
);
757 SendCopyBegin(cstate
);
760 tupDesc
= RelationGetDescr(cstate
->rel
);
762 tupDesc
= cstate
->queryDesc
->tupDesc
;
763 num_phys_attrs
= tupDesc
->natts
;
764 cstate
->opts
.null_print_client
= cstate
->opts
.null_print
; /* default */
766 /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
767 cstate
->fe_msgbuf
= makeStringInfo();
769 /* Get info about the columns we need to process. */
770 cstate
->out_functions
= (FmgrInfo
*) palloc(num_phys_attrs
* sizeof(FmgrInfo
));
771 foreach(cur
, cstate
->attnumlist
)
773 int attnum
= lfirst_int(cur
);
776 Form_pg_attribute attr
= TupleDescAttr(tupDesc
, attnum
- 1);
778 if (cstate
->opts
.binary
)
779 getTypeBinaryOutputInfo(attr
->atttypid
,
783 getTypeOutputInfo(attr
->atttypid
,
786 fmgr_info(out_func_oid
, &cstate
->out_functions
[attnum
- 1]);
790 * Create a temporary memory context that we can reset once per row to
791 * recover palloc'd memory. This avoids any problems with leaks inside
792 * datatype output routines, and should be faster than retail pfree's
793 * anyway. (We don't need a whole econtext as CopyFrom does.)
795 cstate
->rowcontext
= AllocSetContextCreate(CurrentMemoryContext
,
797 ALLOCSET_DEFAULT_SIZES
);
799 if (cstate
->opts
.binary
)
801 /* Generate header for a binary copy */
805 CopySendData(cstate
, BinarySignature
, 11);
808 CopySendInt32(cstate
, tmp
);
809 /* No header extension */
811 CopySendInt32(cstate
, tmp
);
816 * For non-binary copy, we need to convert null_print to file
817 * encoding, because it will be sent directly with CopySendString.
819 if (cstate
->need_transcoding
)
820 cstate
->opts
.null_print_client
= pg_server_to_any(cstate
->opts
.null_print
,
821 cstate
->opts
.null_print_len
,
822 cstate
->file_encoding
);
824 /* if a header has been requested send the line */
825 if (cstate
->opts
.header_line
)
827 bool hdr_delim
= false;
829 foreach(cur
, cstate
->attnumlist
)
831 int attnum
= lfirst_int(cur
);
835 CopySendChar(cstate
, cstate
->opts
.delim
[0]);
838 colname
= NameStr(TupleDescAttr(tupDesc
, attnum
- 1)->attname
);
840 if (cstate
->opts
.csv_mode
)
841 CopyAttributeOutCSV(cstate
, colname
, false);
843 CopyAttributeOutText(cstate
, colname
);
846 CopySendEndOfRow(cstate
);
852 TupleTableSlot
*slot
;
853 TableScanDesc scandesc
;
855 scandesc
= table_beginscan(cstate
->rel
, GetActiveSnapshot(), 0, NULL
);
856 slot
= table_slot_create(cstate
->rel
, NULL
);
859 while (table_scan_getnextslot(scandesc
, ForwardScanDirection
, slot
))
861 CHECK_FOR_INTERRUPTS();
863 /* Deconstruct the tuple ... */
864 slot_getallattrs(slot
);
866 /* Format and send the data */
867 CopyOneRowTo(cstate
, slot
);
870 * Increment the number of processed tuples, and report the
873 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED
,
877 ExecDropSingleTupleTableSlot(slot
);
878 table_endscan(scandesc
);
882 /* run the plan --- the dest receiver will send tuples */
883 ExecutorRun(cstate
->queryDesc
, ForwardScanDirection
, 0);
884 processed
= ((DR_copy
*) cstate
->queryDesc
->dest
)->processed
;
887 if (cstate
->opts
.binary
)
889 /* Generate trailer for a binary copy */
890 CopySendInt16(cstate
, -1);
891 /* Need to flush out the trailer */
892 CopySendEndOfRow(cstate
);
895 MemoryContextDelete(cstate
->rowcontext
);
904 * Emit one row during DoCopyTo().
907 CopyOneRowTo(CopyToState cstate
, TupleTableSlot
*slot
)
909 FmgrInfo
*out_functions
= cstate
->out_functions
;
910 MemoryContext oldcontext
;
912 MemoryContextReset(cstate
->rowcontext
);
913 oldcontext
= MemoryContextSwitchTo(cstate
->rowcontext
);
915 if (cstate
->opts
.binary
)
917 /* Binary per-tuple header */
918 CopySendInt16(cstate
, list_length(cstate
->attnumlist
));
921 /* Make sure the tuple is fully deconstructed */
922 slot_getallattrs(slot
);
924 if (!cstate
->opts
.binary
)
926 bool need_delim
= false;
928 foreach_int(attnum
, cstate
->attnumlist
)
930 Datum value
= slot
->tts_values
[attnum
- 1];
931 bool isnull
= slot
->tts_isnull
[attnum
- 1];
935 CopySendChar(cstate
, cstate
->opts
.delim
[0]);
939 CopySendString(cstate
, cstate
->opts
.null_print_client
);
942 string
= OutputFunctionCall(&out_functions
[attnum
- 1],
944 if (cstate
->opts
.csv_mode
)
945 CopyAttributeOutCSV(cstate
, string
,
946 cstate
->opts
.force_quote_flags
[attnum
- 1]);
948 CopyAttributeOutText(cstate
, string
);
954 foreach_int(attnum
, cstate
->attnumlist
)
956 Datum value
= slot
->tts_values
[attnum
- 1];
957 bool isnull
= slot
->tts_isnull
[attnum
- 1];
961 CopySendInt32(cstate
, -1);
964 outputbytes
= SendFunctionCall(&out_functions
[attnum
- 1],
966 CopySendInt32(cstate
, VARSIZE(outputbytes
) - VARHDRSZ
);
967 CopySendData(cstate
, VARDATA(outputbytes
),
968 VARSIZE(outputbytes
) - VARHDRSZ
);
973 CopySendEndOfRow(cstate
);
975 MemoryContextSwitchTo(oldcontext
);
979 * Send text representation of one attribute, with conversion and escaping
981 #define DUMPSOFAR() \
984 CopySendData(cstate, start, ptr - start); \
988 CopyAttributeOutText(CopyToState cstate
, const char *string
)
993 char delimc
= cstate
->opts
.delim
[0];
995 if (cstate
->need_transcoding
)
996 ptr
= pg_server_to_any(string
, strlen(string
), cstate
->file_encoding
);
1001 * We have to grovel through the string searching for control characters
1002 * and instances of the delimiter character. In most cases, though, these
1003 * are infrequent. To avoid overhead from calling CopySendData once per
1004 * character, we dump out all characters between escaped characters in a
1005 * single call. The loop invariant is that the data from "start" to "ptr"
1006 * can be sent literally, but hasn't yet been.
1008 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1009 * in valid backend encodings, extra bytes of a multibyte character never
1010 * look like ASCII. This loop is sufficiently performance-critical that
1011 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1012 * of the normal safe-encoding path.
1014 if (cstate
->encoding_embeds_ascii
)
1017 while ((c
= *ptr
) != '\0')
1019 if ((unsigned char) c
< (unsigned char) 0x20)
1022 * \r and \n must be escaped, the others are traditional. We
1023 * prefer to dump these using the C-like notation, rather than
1024 * a backslash and the literal character, because it makes the
1025 * dump file a bit more proof against Microsoftish data
1049 /* If it's the delimiter, must backslash it */
1052 /* All ASCII control chars are length 1 */
1054 continue; /* fall to end of loop */
1056 /* if we get here, we need to convert the control char */
1058 CopySendChar(cstate
, '\\');
1059 CopySendChar(cstate
, c
);
1060 start
= ++ptr
; /* do not include char in next run */
1062 else if (c
== '\\' || c
== delimc
)
1065 CopySendChar(cstate
, '\\');
1066 start
= ptr
++; /* we include char in next run */
1068 else if (IS_HIGHBIT_SET(c
))
1069 ptr
+= pg_encoding_mblen(cstate
->file_encoding
, ptr
);
1077 while ((c
= *ptr
) != '\0')
1079 if ((unsigned char) c
< (unsigned char) 0x20)
1082 * \r and \n must be escaped, the others are traditional. We
1083 * prefer to dump these using the C-like notation, rather than
1084 * a backslash and the literal character, because it makes the
1085 * dump file a bit more proof against Microsoftish data
1109 /* If it's the delimiter, must backslash it */
1112 /* All ASCII control chars are length 1 */
1114 continue; /* fall to end of loop */
1116 /* if we get here, we need to convert the control char */
1118 CopySendChar(cstate
, '\\');
1119 CopySendChar(cstate
, c
);
1120 start
= ++ptr
; /* do not include char in next run */
1122 else if (c
== '\\' || c
== delimc
)
1125 CopySendChar(cstate
, '\\');
1126 start
= ptr
++; /* we include char in next run */
1137 * Send text representation of one attribute, with conversion and
1138 * CSV-style escaping
1141 CopyAttributeOutCSV(CopyToState cstate
, const char *string
,
1147 char delimc
= cstate
->opts
.delim
[0];
1148 char quotec
= cstate
->opts
.quote
[0];
1149 char escapec
= cstate
->opts
.escape
[0];
1150 bool single_attr
= (list_length(cstate
->attnumlist
) == 1);
1152 /* force quoting if it matches null_print (before conversion!) */
1153 if (!use_quote
&& strcmp(string
, cstate
->opts
.null_print
) == 0)
1156 if (cstate
->need_transcoding
)
1157 ptr
= pg_server_to_any(string
, strlen(string
), cstate
->file_encoding
);
1162 * Make a preliminary pass to discover if it needs quoting
1167 * Quote '\.' if it appears alone on a line, so that it will not be
1168 * interpreted as an end-of-data marker. (PG 18 and up will not
1169 * interpret '\.' in CSV that way, except in embedded-in-SQL data; but
1170 * we want the data to be loadable by older versions too. Also, this
1171 * avoids breaking clients that are still using PQgetline().)
1173 if (single_attr
&& strcmp(ptr
, "\\.") == 0)
1177 const char *tptr
= ptr
;
1179 while ((c
= *tptr
) != '\0')
1181 if (c
== delimc
|| c
== quotec
|| c
== '\n' || c
== '\r')
1186 if (IS_HIGHBIT_SET(c
) && cstate
->encoding_embeds_ascii
)
1187 tptr
+= pg_encoding_mblen(cstate
->file_encoding
, tptr
);
1196 CopySendChar(cstate
, quotec
);
1199 * We adopt the same optimization strategy as in CopyAttributeOutText
1202 while ((c
= *ptr
) != '\0')
1204 if (c
== quotec
|| c
== escapec
)
1207 CopySendChar(cstate
, escapec
);
1208 start
= ptr
; /* we include char in next run */
1210 if (IS_HIGHBIT_SET(c
) && cstate
->encoding_embeds_ascii
)
1211 ptr
+= pg_encoding_mblen(cstate
->file_encoding
, ptr
);
1217 CopySendChar(cstate
, quotec
);
1221 /* If it doesn't need quoting, we can just dump it as-is */
1222 CopySendString(cstate
, ptr
);
1227 * copy_dest_startup --- executor startup
1230 copy_dest_startup(DestReceiver
*self
, int operation
, TupleDesc typeinfo
)
1236 * copy_dest_receive --- receive one tuple
1239 copy_dest_receive(TupleTableSlot
*slot
, DestReceiver
*self
)
1241 DR_copy
*myState
= (DR_copy
*) self
;
1242 CopyToState cstate
= myState
->cstate
;
1245 CopyOneRowTo(cstate
, slot
);
1247 /* Increment the number of processed tuples, and report the progress */
1248 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED
,
1249 ++myState
->processed
);
1255 * copy_dest_shutdown --- executor end
1258 copy_dest_shutdown(DestReceiver
*self
)
1264 * copy_dest_destroy --- release DestReceiver object
1267 copy_dest_destroy(DestReceiver
*self
)
1273 * CreateCopyDestReceiver -- create a suitable DestReceiver object
1276 CreateCopyDestReceiver(void)
1278 DR_copy
*self
= (DR_copy
*) palloc(sizeof(DR_copy
));
1280 self
->pub
.receiveSlot
= copy_dest_receive
;
1281 self
->pub
.rStartup
= copy_dest_startup
;
1282 self
->pub
.rShutdown
= copy_dest_shutdown
;
1283 self
->pub
.rDestroy
= copy_dest_destroy
;
1284 self
->pub
.mydest
= DestCopyOut
;
1286 self
->cstate
= NULL
; /* will be set later */
1287 self
->processed
= 0;
1289 return (DestReceiver
*) self
;