1 /*-------------------------------------------------------------------------
3 * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
6 * Author: Magnus Hagander <magnus@hagander.net>
8 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
11 * src/bin/pg_basebackup/streamutil.c
12 *-------------------------------------------------------------------------
15 #include "postgres_fe.h"
20 #include "access/xlog_internal.h"
21 #include "common/connect.h"
22 #include "common/fe_memutils.h"
23 #include "common/file_perm.h"
24 #include "common/logging.h"
25 #include "common/string.h"
26 #include "datatype/timestamp.h"
27 #include "port/pg_bswap.h"
28 #include "pqexpbuffer.h"
29 #include "receivelog.h"
30 #include "streamutil.h"
32 #define ERRCODE_DUPLICATE_OBJECT "42710"
36 static bool RetrieveDataDirCreatePerm(PGconn
*conn
);
38 /* SHOW command for replication connection was introduced in version 10 */
39 #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
42 * Group access is supported from version 11.
44 #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
47 char *connection_string
= NULL
;
52 int dbgetpassword
= 0; /* 0=auto, -1=never, 1=always */
53 static char *password
= NULL
;
57 * Connect to the server. Returns a valid PGconn pointer if connected,
58 * or NULL on non-permanent error. On permanent error, the function will
59 * call exit(1) directly.
65 int argcount
= 7; /* dbname, replication, fallback_app_name,
66 * host, user, port, password */
68 const char **keywords
;
72 PQconninfoOption
*conn_opts
= NULL
;
73 PQconninfoOption
*conn_opt
;
76 /* pg_recvlogical uses dbname only; others use connection_string only. */
77 Assert(dbname
== NULL
|| connection_string
== NULL
);
80 * Merge the connection info inputs given in form of connection string,
81 * options and default values (dbname=replication, replication=true, etc.)
82 * Explicitly discard any dbname value in the connection string;
83 * otherwise, PQconnectdbParams() would interpret that value as being
84 * itself a connection string.
87 if (connection_string
)
89 conn_opts
= PQconninfoParse(connection_string
, &err_msg
);
90 if (conn_opts
== NULL
)
92 pg_log_error("%s", err_msg
);
96 for (conn_opt
= conn_opts
; conn_opt
->keyword
!= NULL
; conn_opt
++)
98 if (conn_opt
->val
!= NULL
&& conn_opt
->val
[0] != '\0' &&
99 strcmp(conn_opt
->keyword
, "dbname") != 0)
103 keywords
= pg_malloc0((argcount
+ 1) * sizeof(*keywords
));
104 values
= pg_malloc0((argcount
+ 1) * sizeof(*values
));
106 for (conn_opt
= conn_opts
; conn_opt
->keyword
!= NULL
; conn_opt
++)
108 if (conn_opt
->val
!= NULL
&& conn_opt
->val
[0] != '\0' &&
109 strcmp(conn_opt
->keyword
, "dbname") != 0)
111 keywords
[i
] = conn_opt
->keyword
;
112 values
[i
] = conn_opt
->val
;
119 keywords
= pg_malloc0((argcount
+ 1) * sizeof(*keywords
));
120 values
= pg_malloc0((argcount
+ 1) * sizeof(*values
));
123 keywords
[i
] = "dbname";
124 values
[i
] = dbname
== NULL
? "replication" : dbname
;
126 keywords
[i
] = "replication";
127 values
[i
] = dbname
== NULL
? "true" : "database";
129 keywords
[i
] = "fallback_application_name";
130 values
[i
] = progname
;
135 keywords
[i
] = "host";
141 keywords
[i
] = "user";
147 keywords
[i
] = "port";
152 /* If -W was given, force prompt for password, but only the first time */
153 need_password
= (dbgetpassword
== 1 && !password
);
157 /* Get a new password if appropriate */
162 password
= simple_prompt("Password: ", false);
163 need_password
= false;
166 /* Use (or reuse, on a subsequent connection) password if we have it */
169 keywords
[i
] = "password";
170 values
[i
] = password
;
178 tmpconn
= PQconnectdbParams(keywords
, values
, true);
181 * If there is too little memory even to allocate the PGconn object
182 * and PQconnectdbParams returns NULL, we call exit(1) directly.
186 pg_log_error("could not connect to server");
190 /* If we need a password and -w wasn't given, loop back and get one */
191 if (PQstatus(tmpconn
) == CONNECTION_BAD
&&
192 PQconnectionNeedsPassword(tmpconn
) &&
196 need_password
= true;
199 while (need_password
);
201 if (PQstatus(tmpconn
) != CONNECTION_OK
)
203 pg_log_error("%s", PQerrorMessage(tmpconn
));
208 PQconninfoFree(conn_opts
);
216 PQconninfoFree(conn_opts
);
219 * Set always-secure search path, so malicious users can't get control.
220 * The capacity to run normal SQL queries was added in PostgreSQL 10, so
221 * the search path cannot be changed (by us or attackers) on earlier
224 if (dbname
!= NULL
&& PQserverVersion(tmpconn
) >= 100000)
228 res
= PQexec(tmpconn
, ALWAYS_SECURE_SEARCH_PATH_SQL
);
229 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
231 pg_log_error("could not clear search_path: %s",
232 PQerrorMessage(tmpconn
));
241 * Ensure we have the same value of integer_datetimes (now always "on") as
242 * the server we are connecting to.
244 tmpparam
= PQparameterStatus(tmpconn
, "integer_datetimes");
247 pg_log_error("could not determine server setting for integer_datetimes");
252 if (strcmp(tmpparam
, "on") != 0)
254 pg_log_error("integer_datetimes compile flag does not match server");
260 * Retrieve the source data directory mode and use it to construct a umask
261 * for creating directories and files.
263 if (!RetrieveDataDirCreatePerm(tmpconn
))
273 * From version 10, explicitly set wal segment size using SHOW wal_segment_size
274 * since ControlFile is not accessible here.
277 RetrieveWalSegSize(PGconn
*conn
)
284 /* check connection existence */
285 Assert(conn
!= NULL
);
287 /* for previous versions set the default xlog seg size */
288 if (PQserverVersion(conn
) < MINIMUM_VERSION_FOR_SHOW_CMD
)
290 WalSegSz
= DEFAULT_XLOG_SEG_SIZE
;
294 res
= PQexec(conn
, "SHOW wal_segment_size");
295 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
297 pg_log_error("could not send replication command \"%s\": %s",
298 "SHOW wal_segment_size", PQerrorMessage(conn
));
303 if (PQntuples(res
) != 1 || PQnfields(res
) < 1)
305 pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
306 PQntuples(res
), PQnfields(res
), 1, 1);
312 /* fetch xlog value and unit from the result */
313 if (sscanf(PQgetvalue(res
, 0, 0), "%d%s", &xlog_val
, xlog_unit
) != 2)
315 pg_log_error("WAL segment size could not be parsed");
322 /* set the multiplier based on unit to convert xlog_val to bytes */
323 if (strcmp(xlog_unit
, "MB") == 0)
324 multiplier
= 1024 * 1024;
325 else if (strcmp(xlog_unit
, "GB") == 0)
326 multiplier
= 1024 * 1024 * 1024;
328 /* convert and set WalSegSz */
329 WalSegSz
= xlog_val
* multiplier
;
331 if (!IsValidWalSegSize(WalSegSz
))
333 pg_log_error(ngettext("WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d byte",
334 "WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d bytes",
344 * RetrieveDataDirCreatePerm
346 * This function is used to determine the privileges on the server's PG data
347 * directory and, based on that, set what the permissions will be for
348 * directories and files we create.
350 * PG11 added support for (optionally) group read/execute rights to be set on
351 * the data directory. Prior to PG11, only the owner was allowed to have rights
352 * on the data directory.
355 RetrieveDataDirCreatePerm(PGconn
*conn
)
358 int data_directory_mode
;
360 /* check connection existence */
361 Assert(conn
!= NULL
);
363 /* for previous versions leave the default group access */
364 if (PQserverVersion(conn
) < MINIMUM_VERSION_FOR_GROUP_ACCESS
)
367 res
= PQexec(conn
, "SHOW data_directory_mode");
368 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
370 pg_log_error("could not send replication command \"%s\": %s",
371 "SHOW data_directory_mode", PQerrorMessage(conn
));
376 if (PQntuples(res
) != 1 || PQnfields(res
) < 1)
378 pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
379 PQntuples(res
), PQnfields(res
), 1, 1);
385 if (sscanf(PQgetvalue(res
, 0, 0), "%o", &data_directory_mode
) != 1)
387 pg_log_error("group access flag could not be parsed: %s",
388 PQgetvalue(res
, 0, 0));
394 SetDataDirectoryCreatePerm(data_directory_mode
);
401 * Run IDENTIFY_SYSTEM through a given connection and give back to caller
402 * some result information if requested:
403 * - System identifier
404 * - Current timeline ID
405 * - Start LSN position
406 * - Database name (NULL in servers prior to 9.4)
409 RunIdentifySystem(PGconn
*conn
, char **sysid
, TimeLineID
*starttli
,
410 XLogRecPtr
*startpos
, char **db_name
)
416 /* Check connection existence */
417 Assert(conn
!= NULL
);
419 res
= PQexec(conn
, "IDENTIFY_SYSTEM");
420 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
422 pg_log_error("could not send replication command \"%s\": %s",
423 "IDENTIFY_SYSTEM", PQerrorMessage(conn
));
428 if (PQntuples(res
) != 1 || PQnfields(res
) < 3)
430 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
431 PQntuples(res
), PQnfields(res
), 1, 3);
437 /* Get system identifier */
439 *sysid
= pg_strdup(PQgetvalue(res
, 0, 0));
441 /* Get timeline ID to start streaming from */
442 if (starttli
!= NULL
)
443 *starttli
= atoi(PQgetvalue(res
, 0, 1));
445 /* Get LSN start position if necessary */
446 if (startpos
!= NULL
)
448 if (sscanf(PQgetvalue(res
, 0, 2), "%X/%X", &hi
, &lo
) != 2)
450 pg_log_error("could not parse write-ahead log location \"%s\"",
451 PQgetvalue(res
, 0, 2));
456 *startpos
= ((uint64
) hi
) << 32 | lo
;
459 /* Get database name, only available in 9.4 and newer versions */
463 if (PQserverVersion(conn
) >= 90400)
465 if (PQnfields(res
) < 4)
467 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
468 PQntuples(res
), PQnfields(res
), 1, 4);
473 if (!PQgetisnull(res
, 0, 3))
474 *db_name
= pg_strdup(PQgetvalue(res
, 0, 3));
483 * Create a replication slot for the given connection. This function
484 * returns true in case of success.
487 CreateReplicationSlot(PGconn
*conn
, const char *slot_name
, const char *plugin
,
488 bool is_temporary
, bool is_physical
, bool reserve_wal
,
489 bool slot_exists_ok
, bool two_phase
)
494 query
= createPQExpBuffer();
496 Assert((is_physical
&& plugin
== NULL
) ||
497 (!is_physical
&& plugin
!= NULL
));
498 Assert(!(two_phase
&& is_physical
));
499 Assert(slot_name
!= NULL
);
502 appendPQExpBuffer(query
, "CREATE_REPLICATION_SLOT \"%s\"", slot_name
);
504 appendPQExpBufferStr(query
, " TEMPORARY");
507 appendPQExpBufferStr(query
, " PHYSICAL");
509 appendPQExpBufferStr(query
, " RESERVE_WAL");
513 appendPQExpBuffer(query
, " LOGICAL \"%s\"", plugin
);
514 if (two_phase
&& PQserverVersion(conn
) >= 150000)
515 appendPQExpBufferStr(query
, " TWO_PHASE");
517 if (PQserverVersion(conn
) >= 100000)
518 /* pg_recvlogical doesn't use an exported snapshot, so suppress */
519 appendPQExpBufferStr(query
, " NOEXPORT_SNAPSHOT");
522 res
= PQexec(conn
, query
->data
);
523 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
525 const char *sqlstate
= PQresultErrorField(res
, PG_DIAG_SQLSTATE
);
527 if (slot_exists_ok
&&
529 strcmp(sqlstate
, ERRCODE_DUPLICATE_OBJECT
) == 0)
531 destroyPQExpBuffer(query
);
537 pg_log_error("could not send replication command \"%s\": %s",
538 query
->data
, PQerrorMessage(conn
));
540 destroyPQExpBuffer(query
);
546 if (PQntuples(res
) != 1 || PQnfields(res
) != 4)
548 pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
550 PQntuples(res
), PQnfields(res
), 1, 4);
552 destroyPQExpBuffer(query
);
557 destroyPQExpBuffer(query
);
563 * Drop a replication slot for the given connection. This function
564 * returns true in case of success.
567 DropReplicationSlot(PGconn
*conn
, const char *slot_name
)
572 Assert(slot_name
!= NULL
);
574 query
= createPQExpBuffer();
577 appendPQExpBuffer(query
, "DROP_REPLICATION_SLOT \"%s\"",
579 res
= PQexec(conn
, query
->data
);
580 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
582 pg_log_error("could not send replication command \"%s\": %s",
583 query
->data
, PQerrorMessage(conn
));
585 destroyPQExpBuffer(query
);
590 if (PQntuples(res
) != 0 || PQnfields(res
) != 0)
592 pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
594 PQntuples(res
), PQnfields(res
), 0, 0);
596 destroyPQExpBuffer(query
);
601 destroyPQExpBuffer(query
);
608 * Frontend version of GetCurrentTimestamp(), since we are not linked with
612 feGetCurrentTimestamp(void)
617 gettimeofday(&tp
, NULL
);
619 result
= (TimestampTz
) tp
.tv_sec
-
620 ((POSTGRES_EPOCH_JDATE
- UNIX_EPOCH_JDATE
) * SECS_PER_DAY
);
621 result
= (result
* USECS_PER_SEC
) + tp
.tv_usec
;
627 * Frontend version of TimestampDifference(), since we are not linked with
631 feTimestampDifference(TimestampTz start_time
, TimestampTz stop_time
,
632 long *secs
, int *microsecs
)
634 TimestampTz diff
= stop_time
- start_time
;
643 *secs
= (long) (diff
/ USECS_PER_SEC
);
644 *microsecs
= (int) (diff
% USECS_PER_SEC
);
649 * Frontend version of TimestampDifferenceExceeds(), since we are not
650 * linked with backend code.
653 feTimestampDifferenceExceeds(TimestampTz start_time
,
654 TimestampTz stop_time
,
657 TimestampTz diff
= stop_time
- start_time
;
659 return (diff
>= msec
* INT64CONST(1000));
663 * Converts an int64 to network byte order.
666 fe_sendint64(int64 i
, char *buf
)
668 uint64 n64
= pg_hton64(i
);
670 memcpy(buf
, &n64
, sizeof(n64
));
674 * Converts an int64 from network byte order to native format.
677 fe_recvint64(char *buf
)
681 memcpy(&n64
, buf
, sizeof(n64
));
683 return pg_ntoh64(n64
);