1 /*-------------------------------------------------------------------------
4 * Functions for fetching files from a remote server via libpq.
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
8 *-------------------------------------------------------------------------
10 #include "postgres_fe.h"
12 #include "catalog/pg_type_d.h"
13 #include "common/connect.h"
16 #include "lib/stringinfo.h"
17 #include "pg_rewind.h"
18 #include "port/pg_bswap.h"
19 #include "rewind_source.h"
/*
 * Remote files are read in pieces of at most MAX_CHUNK_SIZE bytes (1 MB),
 * and no single query asks for more than MAX_CHUNKS_PER_QUERY pieces.
 */
#define MAX_CHUNK_SIZE			(1 << 20)
#define MAX_CHUNKS_PER_QUERY	1000
/*
 * One queued read against the source server: 'length' bytes starting at
 * byte 'offset' of the file 'path'.
 */
typedef struct
{
	const char *path;			/* relative to the data directory root */
	off_t		offset;			/* first byte to read */
	size_t		length;			/* number of bytes to read */
} fetch_range_request;
38 rewind_source common
; /* common interface functions */
43 * Queue of chunks that have been requested with the queue_fetch_range()
44 * function, but have not been fetched from the remote server yet.
47 fetch_range_request request_queue
[MAX_CHUNKS_PER_QUERY
];
49 /* temporary space for process_queued_fetch_requests() */
51 StringInfoData offsets
;
52 StringInfoData lengths
;
55 static void init_libpq_conn(PGconn
*conn
);
56 static char *run_simple_query(PGconn
*conn
, const char *sql
);
57 static void run_simple_command(PGconn
*conn
, const char *sql
);
58 static void appendArrayEscapedString(StringInfo buf
, const char *str
);
60 static void process_queued_fetch_requests(libpq_source
*src
);
62 /* public interface functions */
63 static void libpq_traverse_files(rewind_source
*source
,
64 process_file_callback_t callback
);
65 static void libpq_queue_fetch_file(rewind_source
*source
, const char *path
, size_t len
);
66 static void libpq_queue_fetch_range(rewind_source
*source
, const char *path
,
67 off_t off
, size_t len
);
68 static void libpq_finish_fetch(rewind_source
*source
);
69 static char *libpq_fetch_file(rewind_source
*source
, const char *path
,
71 static XLogRecPtr
libpq_get_current_wal_insert_lsn(rewind_source
*source
);
72 static void libpq_destroy(rewind_source
*source
);
75 * Create a new libpq source.
77 * The caller has already established the connection, but should not try
78 * to use it while the source is active.
81 init_libpq_source(PGconn
*conn
)
85 init_libpq_conn(conn
);
87 src
= pg_malloc0(sizeof(libpq_source
));
89 src
->common
.traverse_files
= libpq_traverse_files
;
90 src
->common
.fetch_file
= libpq_fetch_file
;
91 src
->common
.queue_fetch_file
= libpq_queue_fetch_file
;
92 src
->common
.queue_fetch_range
= libpq_queue_fetch_range
;
93 src
->common
.finish_fetch
= libpq_finish_fetch
;
94 src
->common
.get_current_wal_insert_lsn
= libpq_get_current_wal_insert_lsn
;
95 src
->common
.destroy
= libpq_destroy
;
99 initStringInfo(&src
->paths
);
100 initStringInfo(&src
->offsets
);
101 initStringInfo(&src
->lengths
);
107 * Initialize a libpq connection for use.
110 init_libpq_conn(PGconn
*conn
)
115 /* disable all types of timeouts */
116 run_simple_command(conn
, "SET statement_timeout = 0");
117 run_simple_command(conn
, "SET lock_timeout = 0");
118 run_simple_command(conn
, "SET idle_in_transaction_session_timeout = 0");
119 run_simple_command(conn
, "SET transaction_timeout = 0");
122 * we don't intend to do any updates, put the connection in read-only mode
125 run_simple_command(conn
, "SET default_transaction_read_only = on");
127 /* secure search_path */
128 res
= PQexec(conn
, ALWAYS_SECURE_SEARCH_PATH_SQL
);
129 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
130 pg_fatal("could not clear \"search_path\": %s",
131 PQresultErrorMessage(res
));
135 * Also check that full_page_writes is enabled. We can get torn pages if
136 * a page is modified while we read it with pg_read_binary_file(), and we
137 * rely on full page images to fix them.
139 str
= run_simple_query(conn
, "SHOW full_page_writes");
140 if (strcmp(str
, "on") != 0)
141 pg_fatal("\"full_page_writes\" must be enabled in the source server");
144 /* Prepare a statement we'll use to fetch files */
145 res
= PQprepare(conn
, "fetch_chunks_stmt",
146 "SELECT path, begin,\n"
147 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
148 "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
151 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
152 pg_fatal("could not prepare statement to fetch file contents: %s",
153 PQresultErrorMessage(res
));
158 * Run a query that returns a single value.
160 * The result should be pg_free'd after use.
163 run_simple_query(PGconn
*conn
, const char *sql
)
168 res
= PQexec(conn
, sql
);
170 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
171 pg_fatal("error running query (%s) on source server: %s",
172 sql
, PQresultErrorMessage(res
));
174 /* sanity check the result set */
175 if (PQnfields(res
) != 1 || PQntuples(res
) != 1 || PQgetisnull(res
, 0, 0))
176 pg_fatal("unexpected result set from query");
178 result
= pg_strdup(PQgetvalue(res
, 0, 0));
188 * In the event of a failure, exit immediately.
191 run_simple_command(PGconn
*conn
, const char *sql
)
195 res
= PQexec(conn
, sql
);
197 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
198 pg_fatal("error running query (%s) in source server: %s",
199 sql
, PQresultErrorMessage(res
));
205 * Call the pg_current_wal_insert_lsn() function in the remote system.
208 libpq_get_current_wal_insert_lsn(rewind_source
*source
)
210 PGconn
*conn
= ((libpq_source
*) source
)->conn
;
216 val
= run_simple_query(conn
, "SELECT pg_current_wal_insert_lsn()");
218 if (sscanf(val
, "%X/%X", &hi
, &lo
) != 2)
219 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val
);
221 result
= ((uint64
) hi
) << 32 | lo
;
229 * Get a list of all files in the data directory.
232 libpq_traverse_files(rewind_source
*source
, process_file_callback_t callback
)
234 PGconn
*conn
= ((libpq_source
*) source
)->conn
;
240 * Create a recursive directory listing of the whole data directory.
242 * The WITH RECURSIVE part does most of the work. The second part gets the
243 * targets of the symlinks in pg_tblspc directory.
245 * XXX: There is no backend function to get a symbolic link's target in
246 * general, so if the admin has put any custom symbolic links in the data
247 * directory, they won't be copied correctly.
250 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
251 " SELECT '' AS path, filename, size, isdir FROM\n"
252 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
253 " pg_stat_file(fn.filename, true) AS this\n"
255 " SELECT parent.path || parent.filename || '/' AS path,\n"
256 " fn, this.size, this.isdir\n"
257 " FROM files AS parent,\n"
258 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
259 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
260 " WHERE parent.isdir = 't'\n"
262 "SELECT path || filename, size, isdir,\n"
263 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
265 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
266 " AND oid::text = files.filename\n";
267 res
= PQexec(conn
, sql
);
269 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
270 pg_fatal("could not fetch file list: %s",
271 PQresultErrorMessage(res
));
273 /* sanity check the result set */
274 if (PQnfields(res
) != 4)
275 pg_fatal("unexpected result set while fetching file list");
277 /* Read result to local variables */
278 for (i
= 0; i
< PQntuples(res
); i
++)
286 if (PQgetisnull(res
, i
, 1))
289 * The file was removed from the server while the query was
290 * running. Ignore it.
295 path
= PQgetvalue(res
, i
, 0);
296 filesize
= atoll(PQgetvalue(res
, i
, 1));
297 isdir
= (strcmp(PQgetvalue(res
, i
, 2), "t") == 0);
298 link_target
= PQgetvalue(res
, i
, 3);
303 * In-place tablespaces are directories located in pg_tblspc/ with
306 if (is_absolute_path(link_target
))
307 type
= FILE_TYPE_SYMLINK
;
309 type
= FILE_TYPE_DIRECTORY
;
312 type
= FILE_TYPE_DIRECTORY
;
314 type
= FILE_TYPE_REGULAR
;
316 callback(path
, type
, filesize
, link_target
);
322 * Queue up a request to fetch a file from remote system.
325 libpq_queue_fetch_file(rewind_source
*source
, const char *path
, size_t len
)
328 * Truncate the target file immediately, and queue a request to fetch it
329 * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
330 * request fetching a full-sized chunk anyway, so that if the file has
331 * become larger in the source system, after we scanned the source
332 * directory, we still fetch the whole file. This only works for files up
333 * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
334 * and such that are changed every now and then, but not WAL-logged. For
335 * larger files, we fetch up to the original size.
337 * Even with that mechanism, there is an inherent race condition if the
338 * file is modified at the same instant that we're copying it, so that we
339 * might copy a torn version of the file with one half from the old
340 * version and another half from the new. But pg_basebackup has the same
341 * problem, and it hasn't been a problem in practice.
343 * It might seem more natural to truncate the file later, when we receive
344 * it from the source server, but then we'd need to track which
345 * fetch-requests are for a whole file.
347 open_target_file(path
, true);
348 libpq_queue_fetch_range(source
, path
, 0, Max(len
, MAX_CHUNK_SIZE
));
352 * Queue up a request to fetch a piece of a file from remote system.
355 libpq_queue_fetch_range(rewind_source
*source
, const char *path
, off_t off
,
358 libpq_source
*src
= (libpq_source
*) source
;
361 * Does this request happen to be a continuation of the previous chunk? If
362 * so, merge it with the previous one.
364 * XXX: We use pointer equality to compare the path. That's good enough
365 * for our purposes; the caller always passes the same pointer for the
366 * same filename. If it didn't, we would fail to merge requests, but it
367 * wouldn't affect correctness.
369 if (src
->num_requests
> 0)
371 fetch_range_request
*prev
= &src
->request_queue
[src
->num_requests
- 1];
373 if (prev
->offset
+ prev
->length
== off
&&
374 prev
->length
< MAX_CHUNK_SIZE
&&
378 * Extend the previous request to cover as much of this new
379 * request as possible, without exceeding MAX_CHUNK_SIZE.
383 thislen
= Min(len
, MAX_CHUNK_SIZE
- prev
->length
);
384 prev
->length
+= thislen
;
390 * Fall through to create new requests for any remaining 'len'
391 * that didn't fit in the previous chunk.
396 /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
401 /* if the queue is full, perform all the work queued up so far */
402 if (src
->num_requests
== MAX_CHUNKS_PER_QUERY
)
403 process_queued_fetch_requests(src
);
405 thislen
= Min(len
, MAX_CHUNK_SIZE
);
406 src
->request_queue
[src
->num_requests
].path
= path
;
407 src
->request_queue
[src
->num_requests
].offset
= off
;
408 src
->request_queue
[src
->num_requests
].length
= thislen
;
417 * Fetch all the queued chunks and write them to the target data directory.
420 libpq_finish_fetch(rewind_source
*source
)
422 process_queued_fetch_requests((libpq_source
*) source
);
426 process_queued_fetch_requests(libpq_source
*src
)
428 const char *params
[3];
432 if (src
->num_requests
== 0)
435 pg_log_debug("getting %d file chunks", src
->num_requests
);
438 * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
439 * the same length as parameters: paths, offsets and lengths. Construct
440 * the string representations of them.
442 resetStringInfo(&src
->paths
);
443 resetStringInfo(&src
->offsets
);
444 resetStringInfo(&src
->lengths
);
446 appendStringInfoChar(&src
->paths
, '{');
447 appendStringInfoChar(&src
->offsets
, '{');
448 appendStringInfoChar(&src
->lengths
, '{');
449 for (int i
= 0; i
< src
->num_requests
; i
++)
451 fetch_range_request
*rq
= &src
->request_queue
[i
];
455 appendStringInfoChar(&src
->paths
, ',');
456 appendStringInfoChar(&src
->offsets
, ',');
457 appendStringInfoChar(&src
->lengths
, ',');
460 appendArrayEscapedString(&src
->paths
, rq
->path
);
461 appendStringInfo(&src
->offsets
, INT64_FORMAT
, (int64
) rq
->offset
);
462 appendStringInfo(&src
->lengths
, INT64_FORMAT
, (int64
) rq
->length
);
464 appendStringInfoChar(&src
->paths
, '}');
465 appendStringInfoChar(&src
->offsets
, '}');
466 appendStringInfoChar(&src
->lengths
, '}');
469 * Execute the prepared statement.
471 params
[0] = src
->paths
.data
;
472 params
[1] = src
->offsets
.data
;
473 params
[2] = src
->lengths
.data
;
475 if (PQsendQueryPrepared(src
->conn
, "fetch_chunks_stmt", 3, params
, NULL
, NULL
, 1) != 1)
476 pg_fatal("could not send query: %s", PQerrorMessage(src
->conn
));
478 if (PQsetSingleRowMode(src
->conn
) != 1)
479 pg_fatal("could not set libpq connection to single row mode");
482 * The result set is of format:
484 * path text -- path in the data directory, e.g "base/1/123"
485 * begin int8 -- offset within the file
486 * chunk bytea -- file content
490 while ((res
= PQgetResult(src
->conn
)) != NULL
)
492 fetch_range_request
*rq
= &src
->request_queue
[chunkno
];
499 switch (PQresultStatus(res
))
501 case PGRES_SINGLE_TUPLE
:
504 case PGRES_TUPLES_OK
:
506 continue; /* final zero-row result */
509 pg_fatal("unexpected result while fetching remote files: %s",
510 PQresultErrorMessage(res
));
513 if (chunkno
> src
->num_requests
)
514 pg_fatal("received more data chunks than requested");
516 /* sanity check the result set */
517 if (PQnfields(res
) != 3 || PQntuples(res
) != 1)
518 pg_fatal("unexpected result set size while fetching remote files");
520 if (PQftype(res
, 0) != TEXTOID
||
521 PQftype(res
, 1) != INT8OID
||
522 PQftype(res
, 2) != BYTEAOID
)
524 pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
525 PQftype(res
, 0), PQftype(res
, 1), PQftype(res
, 2));
528 if (PQfformat(res
, 0) != 1 &&
529 PQfformat(res
, 1) != 1 &&
530 PQfformat(res
, 2) != 1)
532 pg_fatal("unexpected result format while fetching remote files");
535 if (PQgetisnull(res
, 0, 0) ||
536 PQgetisnull(res
, 0, 1))
538 pg_fatal("unexpected null values in result while fetching remote files");
541 if (PQgetlength(res
, 0, 1) != sizeof(int64
))
542 pg_fatal("unexpected result length while fetching remote files");
544 /* Read result set to local variables */
545 memcpy(&chunkoff
, PQgetvalue(res
, 0, 1), sizeof(int64
));
546 chunkoff
= pg_ntoh64(chunkoff
);
547 chunksize
= PQgetlength(res
, 0, 2);
549 filenamelen
= PQgetlength(res
, 0, 0);
550 filename
= pg_malloc(filenamelen
+ 1);
551 memcpy(filename
, PQgetvalue(res
, 0, 0), filenamelen
);
552 filename
[filenamelen
] = '\0';
554 chunk
= PQgetvalue(res
, 0, 2);
557 * If a file has been deleted on the source, remove it on the target
558 * as well. Note that multiple unlink() calls may happen on the same
559 * file if multiple data chunks are associated with it, hence ignore
560 * unconditionally anything missing.
562 if (PQgetisnull(res
, 0, 2))
564 pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
566 remove_target_file(filename
, true);
570 pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
571 filename
, (long long int) chunkoff
, chunksize
);
573 if (strcmp(filename
, rq
->path
) != 0)
575 pg_fatal("received data for file \"%s\", when requested for \"%s\"",
578 if (chunkoff
!= rq
->offset
)
579 pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
580 (long long int) chunkoff
, rq
->path
, (long long int) rq
->offset
);
583 * We should not receive more data than we requested, or
584 * pg_read_binary_file() messed up. We could receive less,
585 * though, if the file was truncated in the source after we
586 * checked its size. That's OK, there should be a WAL record of
587 * the truncation, which will get replayed when you start the
588 * target system for the first time after pg_rewind has completed.
590 if (chunksize
> rq
->length
)
591 pg_fatal("received more than requested for file \"%s\"", rq
->path
);
593 open_target_file(filename
, false);
595 write_target_range(chunk
, chunkoff
, chunksize
);
603 if (chunkno
!= src
->num_requests
)
604 pg_fatal("unexpected number of data chunks received");
606 src
->num_requests
= 0;
610 * Escape a string to be used as element in a text array constant
613 appendArrayEscapedString(StringInfo buf
, const char *str
)
615 appendStringInfoCharMacro(buf
, '\"');
620 if (ch
== '"' || ch
== '\\')
621 appendStringInfoCharMacro(buf
, '\\');
623 appendStringInfoCharMacro(buf
, ch
);
627 appendStringInfoCharMacro(buf
, '\"');
631 * Fetch a single file as a malloc'd buffer.
634 libpq_fetch_file(rewind_source
*source
, const char *path
, size_t *filesize
)
636 PGconn
*conn
= ((libpq_source
*) source
)->conn
;
640 const char *paramValues
[1];
642 paramValues
[0] = path
;
643 res
= PQexecParams(conn
, "SELECT pg_read_binary_file($1)",
644 1, NULL
, paramValues
, NULL
, NULL
, 1);
646 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
647 pg_fatal("could not fetch remote file \"%s\": %s",
648 path
, PQresultErrorMessage(res
));
650 /* sanity check the result set */
651 if (PQntuples(res
) != 1 || PQgetisnull(res
, 0, 0))
652 pg_fatal("unexpected result set while fetching remote file \"%s\"",
655 /* Read result to local variables */
656 len
= PQgetlength(res
, 0, 0);
657 result
= pg_malloc(len
+ 1);
658 memcpy(result
, PQgetvalue(res
, 0, 0), len
);
663 pg_log_debug("fetched file \"%s\", length %d", path
, len
);
671 * Close a libpq source.
674 libpq_destroy(rewind_source
*source
)
676 libpq_source
*src
= (libpq_source
*) source
;
678 pfree(src
->paths
.data
);
679 pfree(src
->offsets
.data
);
680 pfree(src
->lengths
.data
);
683 /* NOTE: we don't close the connection here, as it was not opened by us. */