1 /*-------------------------------------------------------------------------
4 * send basebackup archives using COPY OUT
 * We send a result set with information about the tablespaces to be included
 * in the backup before starting COPY OUT. Then, we start a single COPY OUT
 * operation and transmit all the archives and the manifest, if present, during
 * the course of that single COPY OUT. Each CopyData message begins with a
 * type byte, allowing us to signal the start of a new archive, or the
 * manifest, by some means other than ending the COPY stream. This also allows
 * for future protocol extensions, since we can include arbitrary information
 * in the message stream as long as we're certain that the client will know
 * what to do with it.
16 * An older method that sent each archive using a separate COPY OUT
17 * operation is no longer supported.
19 * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
22 * src/backend/backup/basebackup_copy.c
24 *-------------------------------------------------------------------------
28 #include "access/tupdesc.h"
29 #include "backup/basebackup.h"
30 #include "backup/basebackup_sink.h"
31 #include "catalog/pg_type_d.h"
32 #include "executor/executor.h"
33 #include "libpq/libpq.h"
34 #include "libpq/pqformat.h"
35 #include "tcop/dest.h"
36 #include "utils/builtins.h"
37 #include "utils/timestamp.h"
39 typedef struct bbsink_copystream
41 /* Common information for all types of sink. */
44 /* Are we sending the archives to the client, or somewhere else? */
48 * Protocol message buffer. We assemble CopyData protocol messages by
49 * setting the first character of this buffer to 'd' (archive or manifest
50 * data) and then making base.bbs_buffer point to the second character so
51 * that the rest of the data gets copied into the message just where we
57 * When did we last report progress to the client, and how much progress
60 TimestampTz last_progress_report_time
;
61 uint64 bytes_done_at_last_time_check
;
/*
 * We don't want to send progress messages to the client excessively
 * frequently. Ideally, we'd like to send a message when the time since the
 * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
 * the system time every time we send a tiny bit of data seems too expensive.
 * So we only check it after the number of bytes since the last check reaches
 * PROGRESS_REPORT_BYTE_INTERVAL.
 */
/* Bytes that must go by since the last clock check before we check again. */
#define PROGRESS_REPORT_BYTE_INTERVAL			65536

/* Elapsed milliseconds after which a new progress report is due. */
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD	1000
75 static void bbsink_copystream_begin_backup(bbsink
*sink
);
76 static void bbsink_copystream_begin_archive(bbsink
*sink
,
77 const char *archive_name
);
78 static void bbsink_copystream_archive_contents(bbsink
*sink
, size_t len
);
79 static void bbsink_copystream_end_archive(bbsink
*sink
);
80 static void bbsink_copystream_begin_manifest(bbsink
*sink
);
81 static void bbsink_copystream_manifest_contents(bbsink
*sink
, size_t len
);
82 static void bbsink_copystream_end_manifest(bbsink
*sink
);
83 static void bbsink_copystream_end_backup(bbsink
*sink
, XLogRecPtr endptr
,
85 static void bbsink_copystream_cleanup(bbsink
*sink
);
87 static void SendCopyOutResponse(void);
88 static void SendCopyDone(void);
89 static void SendXlogRecPtrResult(XLogRecPtr ptr
, TimeLineID tli
);
90 static void SendTablespaceList(List
*tablespaces
);
92 static const bbsink_ops bbsink_copystream_ops
= {
93 .begin_backup
= bbsink_copystream_begin_backup
,
94 .begin_archive
= bbsink_copystream_begin_archive
,
95 .archive_contents
= bbsink_copystream_archive_contents
,
96 .end_archive
= bbsink_copystream_end_archive
,
97 .begin_manifest
= bbsink_copystream_begin_manifest
,
98 .manifest_contents
= bbsink_copystream_manifest_contents
,
99 .end_manifest
= bbsink_copystream_end_manifest
,
100 .end_backup
= bbsink_copystream_end_backup
,
101 .cleanup
= bbsink_copystream_cleanup
105 * Create a new 'copystream' bbsink.
108 bbsink_copystream_new(bool send_to_client
)
110 bbsink_copystream
*sink
= palloc0(sizeof(bbsink_copystream
));
112 *((const bbsink_ops
**) &sink
->base
.bbs_ops
) = &bbsink_copystream_ops
;
113 sink
->send_to_client
= send_to_client
;
115 /* Set up for periodic progress reporting. */
116 sink
->last_progress_report_time
= GetCurrentTimestamp();
117 sink
->bytes_done_at_last_time_check
= UINT64CONST(0);
123 * Send start-of-backup wire protocol messages.
126 bbsink_copystream_begin_backup(bbsink
*sink
)
128 bbsink_copystream
*mysink
= (bbsink_copystream
*) sink
;
129 bbsink_state
*state
= sink
->bbs_state
;
133 * Initialize buffer. We ultimately want to send the archive and manifest
134 * data by means of CopyData messages where the payload portion of each
135 * message begins with a type byte. However, basebackup.c expects the
136 * buffer to be aligned, so we can't just allocate one extra byte for the
137 * type byte. Instead, allocate enough extra bytes that the portion of the
138 * buffer we reveal to our callers can be aligned, while leaving room to
139 * slip the type byte in just beforehand. That will allow us to ship the
140 * data with a single call to pq_putmessage and without needing any extra
143 buf
= palloc(mysink
->base
.bbs_buffer_length
+ MAXIMUM_ALIGNOF
);
144 mysink
->msgbuffer
= buf
+ (MAXIMUM_ALIGNOF
- 1);
145 mysink
->base
.bbs_buffer
= buf
+ MAXIMUM_ALIGNOF
;
146 mysink
->msgbuffer
[0] = 'd'; /* archive or manifest data */
148 /* Tell client the backup start location. */
149 SendXlogRecPtrResult(state
->startptr
, state
->starttli
);
151 /* Send client a list of tablespaces. */
152 SendTablespaceList(state
->tablespaces
);
154 /* Send a CommandComplete message */
155 pq_puttextmessage(PqMsg_CommandComplete
, "SELECT");
157 /* Begin COPY stream. This will be used for all archives + manifest. */
158 SendCopyOutResponse();
162 * Send a CopyData message announcing the beginning of a new archive.
165 bbsink_copystream_begin_archive(bbsink
*sink
, const char *archive_name
)
167 bbsink_state
*state
= sink
->bbs_state
;
171 ti
= list_nth(state
->tablespaces
, state
->tablespace_num
);
172 pq_beginmessage(&buf
, PqMsg_CopyData
);
173 pq_sendbyte(&buf
, 'n'); /* New archive */
174 pq_sendstring(&buf
, archive_name
);
175 pq_sendstring(&buf
, ti
->path
== NULL
? "" : ti
->path
);
180 * Send a CopyData message containing a chunk of archive content.
183 bbsink_copystream_archive_contents(bbsink
*sink
, size_t len
)
185 bbsink_copystream
*mysink
= (bbsink_copystream
*) sink
;
186 bbsink_state
*state
= mysink
->base
.bbs_state
;
190 /* Send the archive content to the client, if appropriate. */
191 if (mysink
->send_to_client
)
193 /* Add one because we're also sending a leading type byte. */
194 pq_putmessage('d', mysink
->msgbuffer
, len
+ 1);
197 /* Consider whether to send a progress report to the client. */
198 targetbytes
= mysink
->bytes_done_at_last_time_check
199 + PROGRESS_REPORT_BYTE_INTERVAL
;
200 if (targetbytes
<= state
->bytes_done
)
202 TimestampTz now
= GetCurrentTimestamp();
206 * OK, we've sent a decent number of bytes, so check the system time
207 * to see whether we're due to send a progress report.
209 mysink
->bytes_done_at_last_time_check
= state
->bytes_done
;
210 ms
= TimestampDifferenceMilliseconds(mysink
->last_progress_report_time
,
214 * Send a progress report if enough time has passed. Also send one if
215 * the system clock was set backward, so that such occurrences don't
216 * have the effect of suppressing further progress messages.
218 if (ms
>= PROGRESS_REPORT_MILLISECOND_THRESHOLD
||
219 now
< mysink
->last_progress_report_time
)
221 mysink
->last_progress_report_time
= now
;
223 pq_beginmessage(&buf
, PqMsg_CopyData
);
224 pq_sendbyte(&buf
, 'p'); /* Progress report */
225 pq_sendint64(&buf
, state
->bytes_done
);
227 pq_flush_if_writable();
233 * We don't need to explicitly signal the end of the archive; the client
234 * will figure out that we've reached the end when we begin the next one,
235 * or begin the manifest, or end the COPY stream. However, this seems like
236 * a good time to force out a progress report. One reason for that is that
237 * if this is the last archive, and we don't force a progress report now,
238 * the client will never be told that we sent all the bytes.
241 bbsink_copystream_end_archive(bbsink
*sink
)
243 bbsink_copystream
*mysink
= (bbsink_copystream
*) sink
;
244 bbsink_state
*state
= mysink
->base
.bbs_state
;
247 mysink
->bytes_done_at_last_time_check
= state
->bytes_done
;
248 mysink
->last_progress_report_time
= GetCurrentTimestamp();
249 pq_beginmessage(&buf
, PqMsg_CopyData
);
250 pq_sendbyte(&buf
, 'p'); /* Progress report */
251 pq_sendint64(&buf
, state
->bytes_done
);
253 pq_flush_if_writable();
257 * Send a CopyData message announcing the beginning of the backup manifest.
260 bbsink_copystream_begin_manifest(bbsink
*sink
)
264 pq_beginmessage(&buf
, PqMsg_CopyData
);
265 pq_sendbyte(&buf
, 'm'); /* Manifest */
270 * Each chunk of manifest data is sent using a CopyData message.
273 bbsink_copystream_manifest_contents(bbsink
*sink
, size_t len
)
275 bbsink_copystream
*mysink
= (bbsink_copystream
*) sink
;
277 if (mysink
->send_to_client
)
279 /* Add one because we're also sending a leading type byte. */
280 pq_putmessage('d', mysink
->msgbuffer
, len
+ 1);
285 * We don't need an explicit terminator for the backup manifest.
288 bbsink_copystream_end_manifest(bbsink
*sink
)
294 * Send end-of-backup wire protocol messages.
297 bbsink_copystream_end_backup(bbsink
*sink
, XLogRecPtr endptr
,
301 SendXlogRecPtrResult(endptr
, endtli
);
308 bbsink_copystream_cleanup(bbsink
*sink
)
314 * Send a CopyOutResponse message.
317 SendCopyOutResponse(void)
321 pq_beginmessage(&buf
, PqMsg_CopyOutResponse
);
322 pq_sendbyte(&buf
, 0); /* overall format */
323 pq_sendint16(&buf
, 0); /* natts */
328 * Send a CopyDone message.
333 pq_putemptymessage(PqMsg_CopyDone
);
337 * Send a single resultset containing just a single
338 * XLogRecPtr record (in text format)
341 SendXlogRecPtrResult(XLogRecPtr ptr
, TimeLineID tli
)
344 TupOutputState
*tstate
;
349 dest
= CreateDestReceiver(DestRemoteSimple
);
351 tupdesc
= CreateTemplateTupleDesc(2);
352 TupleDescInitBuiltinEntry(tupdesc
, (AttrNumber
) 1, "recptr", TEXTOID
, -1, 0);
355 * int8 may seem like a surprising data type for this, but in theory int4
356 * would not be wide enough for this, as TimeLineID is unsigned.
358 TupleDescInitBuiltinEntry(tupdesc
, (AttrNumber
) 2, "tli", INT8OID
, -1, 0);
360 /* send RowDescription */
361 tstate
= begin_tup_output_tupdesc(dest
, tupdesc
, &TTSOpsVirtual
);
364 values
[0] = CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr
)));
365 values
[1] = Int64GetDatum(tli
);
366 do_tup_output(tstate
, values
, nulls
);
368 end_tup_output(tstate
);
370 /* Send a CommandComplete message */
371 pq_puttextmessage(PqMsg_CommandComplete
, "SELECT");
375 * Send a result set via libpq describing the tablespace list.
378 SendTablespaceList(List
*tablespaces
)
381 TupOutputState
*tstate
;
385 dest
= CreateDestReceiver(DestRemoteSimple
);
387 tupdesc
= CreateTemplateTupleDesc(3);
388 TupleDescInitBuiltinEntry(tupdesc
, (AttrNumber
) 1, "spcoid", OIDOID
, -1, 0);
389 TupleDescInitBuiltinEntry(tupdesc
, (AttrNumber
) 2, "spclocation", TEXTOID
, -1, 0);
390 TupleDescInitBuiltinEntry(tupdesc
, (AttrNumber
) 3, "size", INT8OID
, -1, 0);
392 /* send RowDescription */
393 tstate
= begin_tup_output_tupdesc(dest
, tupdesc
, &TTSOpsVirtual
);
395 /* Construct and send the directory information */
396 foreach(lc
, tablespaces
)
398 tablespaceinfo
*ti
= lfirst(lc
);
402 /* Send one datarow message */
403 if (ti
->path
== NULL
)
410 values
[0] = ObjectIdGetDatum(ti
->oid
);
411 values
[1] = CStringGetTextDatum(ti
->path
);
414 values
[2] = Int64GetDatum(ti
->size
/ 1024);
418 do_tup_output(tstate
, values
, nulls
);
421 end_tup_output(tstate
);