Move routines to manipulate WAL into PostgreSQL::Test::Cluster
[pgsql.git] / src / bin / pg_basebackup / pg_createsubscriber.c
blobfaf18ccf1312c3976de26a652ef3e8c7b2f9bbfc
/*-------------------------------------------------------------------------
 *
 * pg_createsubscriber.c
 *	  Create a new logical replica from a standby server
 *
 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/bin/pg_basebackup/pg_createsubscriber.c
 *
 *-------------------------------------------------------------------------
 */
14 #include "postgres_fe.h"
16 #include <sys/stat.h>
17 #include <sys/time.h>
18 #include <sys/wait.h>
19 #include <time.h>
21 #include "common/connect.h"
22 #include "common/controldata_utils.h"
23 #include "common/logging.h"
24 #include "common/pg_prng.h"
25 #include "common/restricted_token.h"
26 #include "fe_utils/recovery_gen.h"
27 #include "fe_utils/simple_list.h"
28 #include "fe_utils/string_utils.h"
29 #include "getopt_long.h"
31 #define DEFAULT_SUB_PORT "50432"
33 /* Command-line options */
34 struct CreateSubscriberOptions
36 char *config_file; /* configuration file */
37 char *pub_conninfo_str; /* publisher connection string */
38 char *socket_dir; /* directory for Unix-domain socket, if any */
39 char *sub_port; /* subscriber port number */
40 const char *sub_username; /* subscriber username */
41 SimpleStringList database_names; /* list of database names */
42 SimpleStringList pub_names; /* list of publication names */
43 SimpleStringList sub_names; /* list of subscription names */
44 SimpleStringList replslot_names; /* list of replication slot names */
45 int recovery_timeout; /* stop recovery after this time */
/*
 * Per-database state: the connection strings and object names used to
 * convert one database, plus flags recording which publisher-side objects
 * this run has created (consulted by cleanup_objects_atexit()).
 */
struct LogicalRepInfo
{
	/* database name */
	char	   *dbname;
	/* publisher connection string */
	char	   *pubconninfo;
	/* subscriber connection string */
	char	   *subconninfo;
	/* publication name */
	char	   *pubname;
	/* subscription name */
	char	   *subname;
	/* replication slot name */
	char	   *replslotname;

	/* replication slot was created */
	bool		made_replslot;
	/* publication was created */
	bool		made_publication;
};
61 static void cleanup_objects_atexit(void);
62 static void usage();
63 static char *get_base_conninfo(const char *conninfo, char **dbname);
64 static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
65 static char *get_exec_path(const char *argv0, const char *progname);
66 static void check_data_directory(const char *datadir);
67 static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
68 static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
69 const char *pub_base_conninfo,
70 const char *sub_base_conninfo);
71 static PGconn *connect_database(const char *conninfo, bool exit_on_error);
72 static void disconnect_database(PGconn *conn, bool exit_on_error);
73 static uint64 get_primary_sysid(const char *conninfo);
74 static uint64 get_standby_sysid(const char *datadir);
75 static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
76 static bool server_is_in_recovery(PGconn *conn);
77 static char *generate_object_name(PGconn *conn);
78 static void check_publisher(const struct LogicalRepInfo *dbinfo);
79 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
80 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
81 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
82 const char *consistent_lsn);
83 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
84 const char *lsn);
85 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
86 const char *slotname);
87 static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
88 static char *create_logical_replication_slot(PGconn *conn,
89 struct LogicalRepInfo *dbinfo);
90 static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
91 const char *slot_name);
92 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
93 static void start_standby_server(const struct CreateSubscriberOptions *opt,
94 bool restricted_access,
95 bool restrict_logical_worker);
96 static void stop_standby_server(const char *datadir);
97 static void wait_for_end_recovery(const char *conninfo,
98 const struct CreateSubscriberOptions *opt);
99 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
100 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
101 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
102 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
103 const char *lsn);
104 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
105 static void check_and_drop_existing_subscriptions(PGconn *conn,
106 const struct LogicalRepInfo *dbinfo);
107 static void drop_existing_subscriptions(PGconn *conn, const char *subname,
108 const char *dbname);
110 #define USEC_PER_SEC 1000000
111 #define WAIT_INTERVAL 1 /* 1 second */
113 static const char *progname;
115 static char *primary_slot_name = NULL;
116 static bool dry_run = false;
118 static bool success = false;
120 static struct LogicalRepInfo *dbinfo;
121 static int num_dbs = 0; /* number of specified databases */
122 static int num_pubs = 0; /* number of specified publications */
123 static int num_subs = 0; /* number of specified subscriptions */
124 static int num_replslots = 0; /* number of specified replication slots */
126 static pg_prng_state prng_state;
128 static char *pg_ctl_path = NULL;
129 static char *pg_resetwal_path = NULL;
131 /* standby / subscriber data directory */
132 static char *subscriber_dir = NULL;
134 static bool recovery_ended = false;
135 static bool standby_running = false;
/*
 * Postmaster startup states.  The code that consumes these values is not
 * part of this section of the file.
 */
enum WaitPMResult
{
	POSTMASTER_READY,
	POSTMASTER_STILL_STARTING
};
145 * Cleanup objects that were created by pg_createsubscriber if there is an
146 * error.
148 * Publications and replication slots are created on primary. Depending on the
149 * step it failed, it should remove the already created objects if it is
150 * possible (sometimes it won't work due to a connection issue).
151 * There is no cleanup on the target server. The steps on the target server are
152 * executed *after* promotion, hence, at this point, a failure means recreate
153 * the physical replica and start again.
155 static void
156 cleanup_objects_atexit(void)
158 if (success)
159 return;
162 * If the server is promoted, there is no way to use the current setup
163 * again. Warn the user that a new replication setup should be done before
164 * trying again.
166 if (recovery_ended)
168 pg_log_warning("failed after the end of recovery");
169 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
170 "You must recreate the physical replica before continuing.");
173 for (int i = 0; i < num_dbs; i++)
175 if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
177 PGconn *conn;
179 conn = connect_database(dbinfo[i].pubconninfo, false);
180 if (conn != NULL)
182 if (dbinfo[i].made_publication)
183 drop_publication(conn, &dbinfo[i]);
184 if (dbinfo[i].made_replslot)
185 drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
186 disconnect_database(conn, false);
188 else
191 * If a connection could not be established, inform the user
192 * that some objects were left on primary and should be
193 * removed before trying again.
195 if (dbinfo[i].made_publication)
197 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
198 dbinfo[i].pubname, dbinfo[i].dbname);
199 pg_log_warning_hint("Drop this publication before trying again.");
201 if (dbinfo[i].made_replslot)
203 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
204 dbinfo[i].replslotname, dbinfo[i].dbname);
205 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
211 if (standby_running)
212 stop_standby_server(subscriber_dir);
215 static void
216 usage(void)
218 printf(_("%s creates a new logical replica from a standby server.\n\n"),
219 progname);
220 printf(_("Usage:\n"));
221 printf(_(" %s [OPTION]...\n"), progname);
222 printf(_("\nOptions:\n"));
223 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
224 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
225 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
226 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
227 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
230 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231 printf(_(" -v, --verbose output verbose messages\n"));
232 printf(_(" --config-file=FILENAME use specified main server configuration\n"
233 " file when running target cluster\n"));
234 printf(_(" --publication=NAME publication name\n"));
235 printf(_(" --replication-slot=NAME replication slot name\n"));
236 printf(_(" --subscription=NAME subscription name\n"));
237 printf(_(" -V, --version output version information, then exit\n"));
238 printf(_(" -?, --help show this help, then exit\n"));
239 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
240 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
244 * Subroutine to append "keyword=value" to a connection string,
245 * with proper quoting of the value. (We assume keywords don't need that.)
247 static void
248 appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
250 if (buf->len > 0)
251 appendPQExpBufferChar(buf, ' ');
252 appendPQExpBufferStr(buf, keyword);
253 appendPQExpBufferChar(buf, '=');
254 appendConnStrVal(buf, val);
258 * Validate a connection string. Returns a base connection string that is a
259 * connection string without a database name.
261 * Since we might process multiple databases, each database name will be
262 * appended to this base connection string to provide a final connection
263 * string. If the second argument (dbname) is not null, returns dbname if the
264 * provided connection string contains it.
266 * It is the caller's responsibility to free the returned connection string and
267 * dbname.
269 static char *
270 get_base_conninfo(const char *conninfo, char **dbname)
272 PQExpBuffer buf;
273 PQconninfoOption *conn_opts;
274 PQconninfoOption *conn_opt;
275 char *errmsg = NULL;
276 char *ret;
278 conn_opts = PQconninfoParse(conninfo, &errmsg);
279 if (conn_opts == NULL)
281 pg_log_error("could not parse connection string: %s", errmsg);
282 PQfreemem(errmsg);
283 return NULL;
286 buf = createPQExpBuffer();
287 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
289 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
291 if (strcmp(conn_opt->keyword, "dbname") == 0)
293 if (dbname)
294 *dbname = pg_strdup(conn_opt->val);
295 continue;
297 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
301 ret = pg_strdup(buf->data);
303 destroyPQExpBuffer(buf);
304 PQconninfoFree(conn_opts);
306 return ret;
310 * Build a subscriber connection string. Only a few parameters are supported
311 * since it starts a server with restricted access.
313 static char *
314 get_sub_conninfo(const struct CreateSubscriberOptions *opt)
316 PQExpBuffer buf = createPQExpBuffer();
317 char *ret;
319 appendConnStrItem(buf, "port", opt->sub_port);
320 #if !defined(WIN32)
321 appendConnStrItem(buf, "host", opt->socket_dir);
322 #endif
323 if (opt->sub_username != NULL)
324 appendConnStrItem(buf, "user", opt->sub_username);
325 appendConnStrItem(buf, "fallback_application_name", progname);
327 ret = pg_strdup(buf->data);
329 destroyPQExpBuffer(buf);
331 return ret;
335 * Verify if a PostgreSQL binary (progname) is available in the same directory as
336 * pg_createsubscriber and it has the same version. It returns the absolute
337 * path of the progname.
339 static char *
340 get_exec_path(const char *argv0, const char *progname)
342 char *versionstr;
343 char *exec_path;
344 int ret;
346 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
347 exec_path = pg_malloc(MAXPGPATH);
348 ret = find_other_exec(argv0, progname, versionstr, exec_path);
350 if (ret < 0)
352 char full_path[MAXPGPATH];
354 if (find_my_exec(argv0, full_path) < 0)
355 strlcpy(full_path, progname, sizeof(full_path));
357 if (ret == -1)
358 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
359 progname, "pg_createsubscriber", full_path);
360 else
361 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
362 progname, full_path, "pg_createsubscriber");
365 pg_log_debug("%s path is: %s", progname, exec_path);
367 return exec_path;
371 * Is it a cluster directory? These are preliminary checks. It is far from
372 * making an accurate check. If it is not a clone from the publisher, it will
373 * eventually fail in a future step.
375 static void
376 check_data_directory(const char *datadir)
378 struct stat statbuf;
379 char versionfile[MAXPGPATH];
381 pg_log_info("checking if directory \"%s\" is a cluster data directory",
382 datadir);
384 if (stat(datadir, &statbuf) != 0)
386 if (errno == ENOENT)
387 pg_fatal("data directory \"%s\" does not exist", datadir);
388 else
389 pg_fatal("could not access directory \"%s\": %m", datadir);
392 snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
393 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
395 pg_fatal("directory \"%s\" is not a database cluster directory",
396 datadir);
401 * Append database name into a base connection string.
403 * dbname is the only parameter that changes so it is not included in the base
404 * connection string. This function concatenates dbname to build a "real"
405 * connection string.
407 static char *
408 concat_conninfo_dbname(const char *conninfo, const char *dbname)
410 PQExpBuffer buf = createPQExpBuffer();
411 char *ret;
413 Assert(conninfo != NULL);
415 appendPQExpBufferStr(buf, conninfo);
416 appendConnStrItem(buf, "dbname", dbname);
418 ret = pg_strdup(buf->data);
419 destroyPQExpBuffer(buf);
421 return ret;
425 * Store publication and subscription information.
427 * If publication, replication slot and subscription names were specified,
428 * store it here. Otherwise, a generated name will be assigned to the object in
429 * setup_publisher().
431 static struct LogicalRepInfo *
432 store_pub_sub_info(const struct CreateSubscriberOptions *opt,
433 const char *pub_base_conninfo,
434 const char *sub_base_conninfo)
436 struct LogicalRepInfo *dbinfo;
437 SimpleStringListCell *pubcell = NULL;
438 SimpleStringListCell *subcell = NULL;
439 SimpleStringListCell *replslotcell = NULL;
440 int i = 0;
442 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
444 if (num_pubs > 0)
445 pubcell = opt->pub_names.head;
446 if (num_subs > 0)
447 subcell = opt->sub_names.head;
448 if (num_replslots > 0)
449 replslotcell = opt->replslot_names.head;
451 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
453 char *conninfo;
455 /* Fill publisher attributes */
456 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
457 dbinfo[i].pubconninfo = conninfo;
458 dbinfo[i].dbname = cell->val;
459 if (num_pubs > 0)
460 dbinfo[i].pubname = pubcell->val;
461 else
462 dbinfo[i].pubname = NULL;
463 if (num_replslots > 0)
464 dbinfo[i].replslotname = replslotcell->val;
465 else
466 dbinfo[i].replslotname = NULL;
467 dbinfo[i].made_replslot = false;
468 dbinfo[i].made_publication = false;
469 /* Fill subscriber attributes */
470 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
471 dbinfo[i].subconninfo = conninfo;
472 if (num_subs > 0)
473 dbinfo[i].subname = subcell->val;
474 else
475 dbinfo[i].subname = NULL;
476 /* Other fields will be filled later */
478 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
479 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
480 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
481 dbinfo[i].pubconninfo);
482 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
483 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
484 dbinfo[i].subconninfo);
486 if (num_pubs > 0)
487 pubcell = pubcell->next;
488 if (num_subs > 0)
489 subcell = subcell->next;
490 if (num_replslots > 0)
491 replslotcell = replslotcell->next;
493 i++;
496 return dbinfo;
500 * Open a new connection. If exit_on_error is true, it has an undesired
501 * condition and it should exit immediately.
503 static PGconn *
504 connect_database(const char *conninfo, bool exit_on_error)
506 PGconn *conn;
507 PGresult *res;
509 conn = PQconnectdb(conninfo);
510 if (PQstatus(conn) != CONNECTION_OK)
512 pg_log_error("connection to database failed: %s",
513 PQerrorMessage(conn));
514 PQfinish(conn);
516 if (exit_on_error)
517 exit(1);
518 return NULL;
521 /* Secure search_path */
522 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
523 if (PQresultStatus(res) != PGRES_TUPLES_OK)
525 pg_log_error("could not clear \"search_path\": %s",
526 PQresultErrorMessage(res));
527 PQclear(res);
528 PQfinish(conn);
530 if (exit_on_error)
531 exit(1);
532 return NULL;
534 PQclear(res);
536 return conn;
540 * Close the connection. If exit_on_error is true, it has an undesired
541 * condition and it should exit immediately.
543 static void
544 disconnect_database(PGconn *conn, bool exit_on_error)
546 Assert(conn != NULL);
548 PQfinish(conn);
550 if (exit_on_error)
551 exit(1);
555 * Obtain the system identifier using the provided connection. It will be used
556 * to compare if a data directory is a clone of another one.
558 static uint64
559 get_primary_sysid(const char *conninfo)
561 PGconn *conn;
562 PGresult *res;
563 uint64 sysid;
565 pg_log_info("getting system identifier from publisher");
567 conn = connect_database(conninfo, true);
569 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
570 if (PQresultStatus(res) != PGRES_TUPLES_OK)
572 pg_log_error("could not get system identifier: %s",
573 PQresultErrorMessage(res));
574 disconnect_database(conn, true);
576 if (PQntuples(res) != 1)
578 pg_log_error("could not get system identifier: got %d rows, expected %d row",
579 PQntuples(res), 1);
580 disconnect_database(conn, true);
583 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
585 pg_log_info("system identifier is %llu on publisher",
586 (unsigned long long) sysid);
588 PQclear(res);
589 disconnect_database(conn, false);
591 return sysid;
595 * Obtain the system identifier from control file. It will be used to compare
596 * if a data directory is a clone of another one. This routine is used locally
597 * and avoids a connection.
599 static uint64
600 get_standby_sysid(const char *datadir)
602 ControlFileData *cf;
603 bool crc_ok;
604 uint64 sysid;
606 pg_log_info("getting system identifier from subscriber");
608 cf = get_controlfile(datadir, &crc_ok);
609 if (!crc_ok)
610 pg_fatal("control file appears to be corrupt");
612 sysid = cf->system_identifier;
614 pg_log_info("system identifier is %llu on subscriber",
615 (unsigned long long) sysid);
617 pg_free(cf);
619 return sysid;
623 * Modify the system identifier. Since a standby server preserves the system
624 * identifier, it makes sense to change it to avoid situations in which WAL
625 * files from one of the systems might be used in the other one.
627 static void
628 modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
630 ControlFileData *cf;
631 bool crc_ok;
632 struct timeval tv;
634 char *cmd_str;
636 pg_log_info("modifying system identifier of subscriber");
638 cf = get_controlfile(subscriber_dir, &crc_ok);
639 if (!crc_ok)
640 pg_fatal("control file appears to be corrupt");
643 * Select a new system identifier.
645 * XXX this code was extracted from BootStrapXLOG().
647 gettimeofday(&tv, NULL);
648 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
649 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
650 cf->system_identifier |= getpid() & 0xFFF;
652 if (!dry_run)
653 update_controlfile(subscriber_dir, cf, true);
655 pg_log_info("system identifier is %llu on subscriber",
656 (unsigned long long) cf->system_identifier);
658 pg_log_info("running pg_resetwal on the subscriber");
660 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
661 subscriber_dir, DEVNULL);
663 pg_log_debug("pg_resetwal command is: %s", cmd_str);
665 if (!dry_run)
667 int rc = system(cmd_str);
669 if (rc == 0)
670 pg_log_info("subscriber successfully changed the system identifier");
671 else
672 pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
675 pg_free(cf);
679 * Generate an object name using a prefix, database oid and a random integer.
680 * It is used in case the user does not specify an object name (publication,
681 * subscription, replication slot).
683 static char *
684 generate_object_name(PGconn *conn)
686 PGresult *res;
687 Oid oid;
688 uint32 rand;
689 char *objname;
691 res = PQexec(conn,
692 "SELECT oid FROM pg_catalog.pg_database "
693 "WHERE datname = pg_catalog.current_database()");
694 if (PQresultStatus(res) != PGRES_TUPLES_OK)
696 pg_log_error("could not obtain database OID: %s",
697 PQresultErrorMessage(res));
698 disconnect_database(conn, true);
701 if (PQntuples(res) != 1)
703 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
704 PQntuples(res), 1);
705 disconnect_database(conn, true);
708 /* Database OID */
709 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
711 PQclear(res);
713 /* Random unsigned integer */
714 rand = pg_prng_uint32(&prng_state);
717 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
718 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
719 * '\0').
721 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
723 return objname;
727 * Create the publications and replication slots in preparation for logical
728 * replication. Returns the LSN from latest replication slot. It will be the
729 * replication start point that is used to adjust the subscriptions (see
730 * set_replication_progress).
732 static char *
733 setup_publisher(struct LogicalRepInfo *dbinfo)
735 char *lsn = NULL;
737 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
739 for (int i = 0; i < num_dbs; i++)
741 PGconn *conn;
742 char *genname = NULL;
744 conn = connect_database(dbinfo[i].pubconninfo, true);
747 * If an object name was not specified as command-line options, assign
748 * a generated object name. The replication slot has a different rule.
749 * The subscription name is assigned to the replication slot name if
750 * no replication slot is specified. It follows the same rule as
751 * CREATE SUBSCRIPTION.
753 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
754 genname = generate_object_name(conn);
755 if (num_pubs == 0)
756 dbinfo[i].pubname = pg_strdup(genname);
757 if (num_subs == 0)
758 dbinfo[i].subname = pg_strdup(genname);
759 if (num_replslots == 0)
760 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
763 * Create publication on publisher. This step should be executed
764 * *before* promoting the subscriber to avoid any transactions between
765 * consistent LSN and the new publication rows (such transactions
766 * wouldn't see the new publication rows resulting in an error).
768 create_publication(conn, &dbinfo[i]);
770 /* Create replication slot on publisher */
771 if (lsn)
772 pg_free(lsn);
773 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
774 if (lsn != NULL || dry_run)
775 pg_log_info("create replication slot \"%s\" on publisher",
776 dbinfo[i].replslotname);
777 else
778 exit(1);
781 * Since we are using the LSN returned by the last replication slot as
782 * recovery_target_lsn, this LSN is ahead of the current WAL position
783 * and the recovery waits until the publisher writes a WAL record to
784 * reach the target and ends the recovery. On idle systems, this wait
785 * time is unpredictable and could lead to failure in promoting the
786 * subscriber. To avoid that, insert a harmless WAL record.
788 if (i == num_dbs - 1 && !dry_run)
790 PGresult *res;
792 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
793 if (PQresultStatus(res) != PGRES_TUPLES_OK)
795 pg_log_error("could not write an additional WAL record: %s",
796 PQresultErrorMessage(res));
797 disconnect_database(conn, true);
799 PQclear(res);
802 disconnect_database(conn, false);
805 return lsn;
809 * Is recovery still in progress?
811 static bool
812 server_is_in_recovery(PGconn *conn)
814 PGresult *res;
815 int ret;
817 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
819 if (PQresultStatus(res) != PGRES_TUPLES_OK)
821 pg_log_error("could not obtain recovery progress: %s",
822 PQresultErrorMessage(res));
823 disconnect_database(conn, true);
827 ret = strcmp("t", PQgetvalue(res, 0, 0));
829 PQclear(res);
831 return ret == 0;
835 * Is the primary server ready for logical replication?
837 * XXX Does it not allow a synchronous replica?
839 static void
840 check_publisher(const struct LogicalRepInfo *dbinfo)
842 PGconn *conn;
843 PGresult *res;
844 bool failed = false;
846 char *wal_level;
847 int max_repslots;
848 int cur_repslots;
849 int max_walsenders;
850 int cur_walsenders;
851 int max_prepared_transactions;
853 pg_log_info("checking settings on publisher");
855 conn = connect_database(dbinfo[0].pubconninfo, true);
858 * If the primary server is in recovery (i.e. cascading replication),
859 * objects (publication) cannot be created because it is read only.
861 if (server_is_in_recovery(conn))
863 pg_log_error("primary server cannot be in recovery");
864 disconnect_database(conn, true);
867 /*------------------------------------------------------------------------
868 * Logical replication requires a few parameters to be set on publisher.
869 * Since these parameters are not a requirement for physical replication,
870 * we should check it to make sure it won't fail.
872 * - wal_level = logical
873 * - max_replication_slots >= current + number of dbs to be converted
874 * - max_wal_senders >= current + number of dbs to be converted
875 * -----------------------------------------------------------------------
877 res = PQexec(conn,
878 "SELECT pg_catalog.current_setting('wal_level'),"
879 " pg_catalog.current_setting('max_replication_slots'),"
880 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
881 " pg_catalog.current_setting('max_wal_senders'),"
882 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
883 " pg_catalog.current_setting('max_prepared_transactions')");
885 if (PQresultStatus(res) != PGRES_TUPLES_OK)
887 pg_log_error("could not obtain publisher settings: %s",
888 PQresultErrorMessage(res));
889 disconnect_database(conn, true);
892 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
893 max_repslots = atoi(PQgetvalue(res, 0, 1));
894 cur_repslots = atoi(PQgetvalue(res, 0, 2));
895 max_walsenders = atoi(PQgetvalue(res, 0, 3));
896 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
897 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
899 PQclear(res);
901 pg_log_debug("publisher: wal_level: %s", wal_level);
902 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
903 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
904 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
905 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
906 pg_log_debug("publisher: max_prepared_transactions: %d",
907 max_prepared_transactions);
909 disconnect_database(conn, false);
911 if (strcmp(wal_level, "logical") != 0)
913 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
914 failed = true;
917 if (max_repslots - cur_repslots < num_dbs)
919 pg_log_error("publisher requires %d replication slots, but only %d remain",
920 num_dbs, max_repslots - cur_repslots);
921 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
922 "max_replication_slots", cur_repslots + num_dbs);
923 failed = true;
926 if (max_walsenders - cur_walsenders < num_dbs)
928 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
929 num_dbs, max_walsenders - cur_walsenders);
930 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
931 "max_wal_senders", cur_walsenders + num_dbs);
932 failed = true;
935 if (max_prepared_transactions != 0)
937 pg_log_warning("two_phase option will not be enabled for replication slots");
938 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
939 "Prepared transactions will be replicated at COMMIT PREPARED.");
942 pg_free(wal_level);
944 if (failed)
945 exit(1);
949 * Is the standby server ready for logical replication?
951 * XXX Does it not allow a time-delayed replica?
953 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
954 * is S, it cannot detect there is a replica (server C) because server S starts
955 * accepting only local connections and server C cannot connect to it. Hence,
956 * there is not a reliable way to provide a suitable error saying the server C
957 * will be broken at the end of this process (due to pg_resetwal).
959 static void
960 check_subscriber(const struct LogicalRepInfo *dbinfo)
962 PGconn *conn;
963 PGresult *res;
964 bool failed = false;
966 int max_lrworkers;
967 int max_repslots;
968 int max_wprocs;
970 pg_log_info("checking settings on subscriber");
972 conn = connect_database(dbinfo[0].subconninfo, true);
974 /* The target server must be a standby */
975 if (!server_is_in_recovery(conn))
977 pg_log_error("target server must be a standby");
978 disconnect_database(conn, true);
981 /*------------------------------------------------------------------------
982 * Logical replication requires a few parameters to be set on subscriber.
983 * Since these parameters are not a requirement for physical replication,
984 * we should check it to make sure it won't fail.
986 * - max_replication_slots >= number of dbs to be converted
987 * - max_logical_replication_workers >= number of dbs to be converted
988 * - max_worker_processes >= 1 + number of dbs to be converted
989 *------------------------------------------------------------------------
991 res = PQexec(conn,
992 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
993 "'max_logical_replication_workers', "
994 "'max_replication_slots', "
995 "'max_worker_processes', "
996 "'primary_slot_name') "
997 "ORDER BY name");
999 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1001 pg_log_error("could not obtain subscriber settings: %s",
1002 PQresultErrorMessage(res));
1003 disconnect_database(conn, true);
1006 max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1007 max_repslots = atoi(PQgetvalue(res, 1, 0));
1008 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1009 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1010 primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1012 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1013 max_lrworkers);
1014 pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1015 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1016 if (primary_slot_name)
1017 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1019 PQclear(res);
1021 disconnect_database(conn, false);
1023 if (max_repslots < num_dbs)
1025 pg_log_error("subscriber requires %d replication slots, but only %d remain",
1026 num_dbs, max_repslots);
1027 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1028 "max_replication_slots", num_dbs);
1029 failed = true;
1032 if (max_lrworkers < num_dbs)
1034 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1035 num_dbs, max_lrworkers);
1036 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1037 "max_logical_replication_workers", num_dbs);
1038 failed = true;
1041 if (max_wprocs < num_dbs + 1)
1043 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1044 num_dbs + 1, max_wprocs);
1045 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1046 "max_worker_processes", num_dbs + 1);
1047 failed = true;
1050 if (failed)
1051 exit(1);
1055 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1056 * the primary (publisher node) and the newly created subscriber. We
1057 * shouldn't drop the associated slot as that would be used by the publisher
1058 * node.
1060 static void
1061 drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
1063 PQExpBuffer query = createPQExpBuffer();
1064 PGresult *res;
1066 Assert(conn != NULL);
1069 * Construct a query string. These commands are allowed to be executed
1070 * within a transaction.
1072 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1073 subname);
1074 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1075 subname);
1076 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1078 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1079 subname, dbname);
1081 if (!dry_run)
1083 res = PQexec(conn, query->data);
1085 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1087 pg_log_error("could not drop subscription \"%s\": %s",
1088 subname, PQresultErrorMessage(res));
1089 disconnect_database(conn, true);
1092 PQclear(res);
1095 destroyPQExpBuffer(query);
1099 * Retrieve and drop the pre-existing subscriptions.
1101 static void
1102 check_and_drop_existing_subscriptions(PGconn *conn,
1103 const struct LogicalRepInfo *dbinfo)
1105 PQExpBuffer query = createPQExpBuffer();
1106 char *dbname;
1107 PGresult *res;
1109 Assert(conn != NULL);
1111 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1113 appendPQExpBuffer(query,
1114 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1115 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1116 "WHERE d.datname = %s",
1117 dbname);
1118 res = PQexec(conn, query->data);
1120 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1122 pg_log_error("could not obtain pre-existing subscriptions: %s",
1123 PQresultErrorMessage(res));
1124 disconnect_database(conn, true);
1127 for (int i = 0; i < PQntuples(res); i++)
1128 drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
1129 dbinfo->dbname);
1131 PQclear(res);
1132 destroyPQExpBuffer(query);
1136 * Create the subscriptions, adjust the initial location for logical
1137 * replication and enable the subscriptions. That's the last step for logical
1138 * replication setup.
1140 static void
1141 setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1143 for (int i = 0; i < num_dbs; i++)
1145 PGconn *conn;
1147 /* Connect to subscriber. */
1148 conn = connect_database(dbinfo[i].subconninfo, true);
1151 * We don't need the pre-existing subscriptions on the newly formed
1152 * subscriber. They can connect to other publisher nodes and either
1153 * get some unwarranted data or can lead to ERRORs in connecting to
1154 * such nodes.
1156 check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1159 * Since the publication was created before the consistent LSN, it is
1160 * available on the subscriber when the physical replica is promoted.
1161 * Remove publications from the subscriber because it has no use.
1163 drop_publication(conn, &dbinfo[i]);
1165 create_subscription(conn, &dbinfo[i]);
1167 /* Set the replication progress to the correct LSN */
1168 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1170 /* Enable subscription */
1171 enable_subscription(conn, &dbinfo[i]);
1173 disconnect_database(conn, false);
1178 * Write the required recovery parameters.
1180 static void
1181 setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1183 PGconn *conn;
1184 PQExpBuffer recoveryconfcontents;
1187 * Despite of the recovery parameters will be written to the subscriber,
1188 * use a publisher connection. The primary_conninfo is generated using the
1189 * connection settings.
1191 conn = connect_database(dbinfo[0].pubconninfo, true);
1194 * Write recovery parameters.
1196 * The subscriber is not running yet. In dry run mode, the recovery
1197 * parameters *won't* be written. An invalid LSN is used for printing
1198 * purposes. Additional recovery parameters are added here. It avoids
1199 * unexpected behavior such as end of recovery as soon as a consistent
1200 * state is reached (recovery_target) and failure due to multiple recovery
1201 * targets (name, time, xid, LSN).
1203 recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1204 appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1205 appendPQExpBuffer(recoveryconfcontents,
1206 "recovery_target_timeline = 'latest'\n");
1207 appendPQExpBuffer(recoveryconfcontents,
1208 "recovery_target_inclusive = true\n");
1209 appendPQExpBuffer(recoveryconfcontents,
1210 "recovery_target_action = promote\n");
1211 appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1212 appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1213 appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1215 if (dry_run)
1217 appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1218 appendPQExpBuffer(recoveryconfcontents,
1219 "recovery_target_lsn = '%X/%X'\n",
1220 LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1222 else
1224 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1225 lsn);
1226 WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1228 disconnect_database(conn, false);
1230 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1234 * Drop physical replication slot on primary if the standby was using it. After
1235 * the transformation, it has no use.
1237 * XXX we might not fail here. Instead, we provide a warning so the user
1238 * eventually drops this replication slot later.
1240 static void
1241 drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1243 PGconn *conn;
1245 /* Replication slot does not exist, do nothing */
1246 if (!primary_slot_name)
1247 return;
1249 conn = connect_database(dbinfo[0].pubconninfo, false);
1250 if (conn != NULL)
1252 drop_replication_slot(conn, &dbinfo[0], slotname);
1253 disconnect_database(conn, false);
1255 else
1257 pg_log_warning("could not drop replication slot \"%s\" on primary",
1258 slotname);
1259 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1264 * Drop failover replication slots on subscriber. After the transformation,
1265 * they have no use.
1267 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1268 * them later.
1270 static void
1271 drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
1273 PGconn *conn;
1274 PGresult *res;
1276 conn = connect_database(dbinfo[0].subconninfo, false);
1277 if (conn != NULL)
1279 /* Get failover replication slot names */
1280 res = PQexec(conn,
1281 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1283 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1285 /* Remove failover replication slots from subscriber */
1286 for (int i = 0; i < PQntuples(res); i++)
1287 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1289 else
1291 pg_log_warning("could not obtain failover replication slot information: %s",
1292 PQresultErrorMessage(res));
1293 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1296 PQclear(res);
1297 disconnect_database(conn, false);
1299 else
1301 pg_log_warning("could not drop failover replication slot");
1302 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1307 * Create a logical replication slot and returns a LSN.
1309 * CreateReplicationSlot() is not used because it does not provide the one-row
1310 * result set that contains the LSN.
1312 static char *
1313 create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1315 PQExpBuffer str = createPQExpBuffer();
1316 PGresult *res = NULL;
1317 const char *slot_name = dbinfo->replslotname;
1318 char *slot_name_esc;
1319 char *lsn = NULL;
1321 Assert(conn != NULL);
1323 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1324 slot_name, dbinfo->dbname);
1326 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1328 appendPQExpBuffer(str,
1329 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1330 slot_name_esc);
1332 pg_free(slot_name_esc);
1334 pg_log_debug("command is: %s", str->data);
1336 if (!dry_run)
1338 res = PQexec(conn, str->data);
1339 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1341 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1342 slot_name, dbinfo->dbname,
1343 PQresultErrorMessage(res));
1344 PQclear(res);
1345 destroyPQExpBuffer(str);
1346 return NULL;
1349 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1350 PQclear(res);
1353 /* For cleanup purposes */
1354 dbinfo->made_replslot = true;
1356 destroyPQExpBuffer(str);
1358 return lsn;
1361 static void
1362 drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1363 const char *slot_name)
1365 PQExpBuffer str = createPQExpBuffer();
1366 char *slot_name_esc;
1367 PGresult *res;
1369 Assert(conn != NULL);
1371 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1372 slot_name, dbinfo->dbname);
1374 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1376 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1378 pg_free(slot_name_esc);
1380 pg_log_debug("command is: %s", str->data);
1382 if (!dry_run)
1384 res = PQexec(conn, str->data);
1385 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1387 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1388 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1389 dbinfo->made_replslot = false; /* don't try again. */
1392 PQclear(res);
1395 destroyPQExpBuffer(str);
/*
 * Inspect the status returned by running pg_ctl.
 *
 * A zero status means success and the function simply returns. Any other
 * status is decoded into an error message, the failed command line
 * (pg_ctl_cmd) is reported as detail, and the program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* pg_ctl succeeded: nothing to report */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
	{
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	}
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
	{
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);
	}

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1431 static void
1432 start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1433 bool restrict_logical_worker)
1435 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1436 int rc;
1438 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1439 appendShellString(pg_ctl_cmd, subscriber_dir);
1440 appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1441 if (restricted_access)
1443 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1444 #if !defined(WIN32)
1447 * An empty listen_addresses list means the server does not listen on
1448 * any IP interfaces; only Unix-domain sockets can be used to connect
1449 * to the server. Prevent external connections to minimize the chance
1450 * of failure.
1452 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1453 if (opt->socket_dir)
1454 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1455 opt->socket_dir);
1456 appendPQExpBufferChar(pg_ctl_cmd, '"');
1457 #endif
1459 if (opt->config_file != NULL)
1460 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1461 opt->config_file);
1463 /* Suppress to start logical replication if requested */
1464 if (restrict_logical_worker)
1465 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1467 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1468 rc = system(pg_ctl_cmd->data);
1469 pg_ctl_status(pg_ctl_cmd->data, rc);
1470 standby_running = true;
1471 destroyPQExpBuffer(pg_ctl_cmd);
1472 pg_log_info("server was started");
1475 static void
1476 stop_standby_server(const char *datadir)
1478 char *pg_ctl_cmd;
1479 int rc;
1481 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1482 datadir);
1483 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1484 rc = system(pg_ctl_cmd);
1485 pg_ctl_status(pg_ctl_cmd, rc);
1486 standby_running = false;
1487 pg_log_info("server was stopped");
1491 * Returns after the server finishes the recovery process.
1493 * If recovery_timeout option is set, terminate abnormally without finishing
1494 * the recovery process. By default, it waits forever.
1496 * XXX Is the recovery process still in progress? When recovery process has a
1497 * better progress reporting mechanism, it should be added here.
1499 static void
1500 wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1502 PGconn *conn;
1503 int status = POSTMASTER_STILL_STARTING;
1504 int timer = 0;
1506 pg_log_info("waiting for the target server to reach the consistent state");
1508 conn = connect_database(conninfo, true);
1510 for (;;)
1512 bool in_recovery = server_is_in_recovery(conn);
1515 * Does the recovery process finish? In dry run mode, there is no
1516 * recovery mode. Bail out as the recovery process has ended.
1518 if (!in_recovery || dry_run)
1520 status = POSTMASTER_READY;
1521 recovery_ended = true;
1522 break;
1525 /* Bail out after recovery_timeout seconds if this option is set */
1526 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1528 stop_standby_server(subscriber_dir);
1529 pg_log_error("recovery timed out");
1530 disconnect_database(conn, true);
1533 /* Keep waiting */
1534 pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1536 timer += WAIT_INTERVAL;
1539 disconnect_database(conn, false);
1541 if (status == POSTMASTER_STILL_STARTING)
1542 pg_fatal("server did not end recovery");
1544 pg_log_info("target server reached the consistent state");
1545 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1549 * Create a publication that includes all tables in the database.
1551 static void
1552 create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1554 PQExpBuffer str = createPQExpBuffer();
1555 PGresult *res;
1556 char *ipubname_esc;
1557 char *spubname_esc;
1559 Assert(conn != NULL);
1561 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1562 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1564 /* Check if the publication already exists */
1565 appendPQExpBuffer(str,
1566 "SELECT 1 FROM pg_catalog.pg_publication "
1567 "WHERE pubname = %s",
1568 spubname_esc);
1569 res = PQexec(conn, str->data);
1570 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1572 pg_log_error("could not obtain publication information: %s",
1573 PQresultErrorMessage(res));
1574 disconnect_database(conn, true);
1577 if (PQntuples(res) == 1)
1580 * Unfortunately, if it reaches this code path, it will always fail
1581 * (unless you decide to change the existing publication name). That's
1582 * bad but it is very unlikely that the user will choose a name with
1583 * pg_createsubscriber_ prefix followed by the exact database oid and
1584 * a random number.
1586 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1587 pg_log_error_hint("Consider renaming this publication before continuing.");
1588 disconnect_database(conn, true);
1591 PQclear(res);
1592 resetPQExpBuffer(str);
1594 pg_log_info("creating publication \"%s\" in database \"%s\"",
1595 dbinfo->pubname, dbinfo->dbname);
1597 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1598 ipubname_esc);
1600 pg_log_debug("command is: %s", str->data);
1602 if (!dry_run)
1604 res = PQexec(conn, str->data);
1605 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1607 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1608 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1609 disconnect_database(conn, true);
1611 PQclear(res);
1614 /* For cleanup purposes */
1615 dbinfo->made_publication = true;
1617 pg_free(ipubname_esc);
1618 pg_free(spubname_esc);
1619 destroyPQExpBuffer(str);
1623 * Remove publication if it couldn't finish all steps.
1625 static void
1626 drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1628 PQExpBuffer str = createPQExpBuffer();
1629 PGresult *res;
1630 char *pubname_esc;
1632 Assert(conn != NULL);
1634 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1636 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1637 dbinfo->pubname, dbinfo->dbname);
1639 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1641 pg_free(pubname_esc);
1643 pg_log_debug("command is: %s", str->data);
1645 if (!dry_run)
1647 res = PQexec(conn, str->data);
1648 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1650 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1651 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1652 dbinfo->made_publication = false; /* don't try again. */
1655 * Don't disconnect and exit here. This routine is used by primary
1656 * (cleanup publication / replication slot due to an error) and
1657 * subscriber (remove the replicated publications). In both cases,
1658 * it can continue and provide instructions for the user to remove
1659 * it later if cleanup fails.
1662 PQclear(res);
1665 destroyPQExpBuffer(str);
1669 * Create a subscription with some predefined options.
1671 * A replication slot was already created in a previous step. Let's use it. It
1672 * is not required to copy data. The subscription will be created but it will
1673 * not be enabled now. That's because the replication progress must be set and
1674 * the replication origin name (one of the function arguments) contains the
1675 * subscription OID in its name. Once the subscription is created,
1676 * set_replication_progress() can obtain the chosen origin name and set up its
1677 * initial location.
1679 static void
1680 create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1682 PQExpBuffer str = createPQExpBuffer();
1683 PGresult *res;
1684 char *pubname_esc;
1685 char *subname_esc;
1686 char *pubconninfo_esc;
1687 char *replslotname_esc;
1689 Assert(conn != NULL);
1691 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1692 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1693 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1694 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1696 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1697 dbinfo->subname, dbinfo->dbname);
1699 appendPQExpBuffer(str,
1700 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1701 "WITH (create_slot = false, enabled = false, "
1702 "slot_name = %s, copy_data = false)",
1703 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1705 pg_free(pubname_esc);
1706 pg_free(subname_esc);
1707 pg_free(pubconninfo_esc);
1708 pg_free(replslotname_esc);
1710 pg_log_debug("command is: %s", str->data);
1712 if (!dry_run)
1714 res = PQexec(conn, str->data);
1715 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1717 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1718 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1719 disconnect_database(conn, true);
1721 PQclear(res);
1724 destroyPQExpBuffer(str);
1728 * Sets the replication progress to the consistent LSN.
1730 * The subscriber caught up to the consistent LSN provided by the last
1731 * replication slot that was created. The goal is to set up the initial
1732 * location for the logical replication that is the exact LSN that the
1733 * subscriber was promoted. Once the subscription is enabled it will start
1734 * streaming from that location onwards. In dry run mode, the subscription OID
1735 * and LSN are set to invalid values for printing purposes.
1737 static void
1738 set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1740 PQExpBuffer str = createPQExpBuffer();
1741 PGresult *res;
1742 Oid suboid;
1743 char *subname;
1744 char *dbname;
1745 char *originname;
1746 char *lsnstr;
1748 Assert(conn != NULL);
1750 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1751 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1753 appendPQExpBuffer(str,
1754 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1755 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1756 "WHERE s.subname = %s AND d.datname = %s",
1757 subname, dbname);
1759 res = PQexec(conn, str->data);
1760 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1762 pg_log_error("could not obtain subscription OID: %s",
1763 PQresultErrorMessage(res));
1764 disconnect_database(conn, true);
1767 if (PQntuples(res) != 1 && !dry_run)
1769 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1770 PQntuples(res), 1);
1771 disconnect_database(conn, true);
1774 if (dry_run)
1776 suboid = InvalidOid;
1777 lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1779 else
1781 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1782 lsnstr = psprintf("%s", lsn);
1785 PQclear(res);
1788 * The origin name is defined as pg_%u. %u is the subscription OID. See
1789 * ApplyWorkerMain().
1791 originname = psprintf("pg_%u", suboid);
1793 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1794 originname, lsnstr, dbinfo->dbname);
1796 resetPQExpBuffer(str);
1797 appendPQExpBuffer(str,
1798 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1799 originname, lsnstr);
1801 pg_log_debug("command is: %s", str->data);
1803 if (!dry_run)
1805 res = PQexec(conn, str->data);
1806 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1808 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1809 dbinfo->subname, PQresultErrorMessage(res));
1810 disconnect_database(conn, true);
1812 PQclear(res);
1815 pg_free(subname);
1816 pg_free(dbname);
1817 pg_free(originname);
1818 pg_free(lsnstr);
1819 destroyPQExpBuffer(str);
1823 * Enables the subscription.
1825 * The subscription was created in a previous step but it was disabled. After
1826 * adjusting the initial logical replication location, enable the subscription.
1828 static void
1829 enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1831 PQExpBuffer str = createPQExpBuffer();
1832 PGresult *res;
1833 char *subname;
1835 Assert(conn != NULL);
1837 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1839 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1840 dbinfo->subname, dbinfo->dbname);
1842 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1844 pg_log_debug("command is: %s", str->data);
1846 if (!dry_run)
1848 res = PQexec(conn, str->data);
1849 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1851 pg_log_error("could not enable subscription \"%s\": %s",
1852 dbinfo->subname, PQresultErrorMessage(res));
1853 disconnect_database(conn, true);
1856 PQclear(res);
1859 pg_free(subname);
1860 destroyPQExpBuffer(str);
1864 main(int argc, char **argv)
1866 static struct option long_options[] =
1868 {"database", required_argument, NULL, 'd'},
1869 {"pgdata", required_argument, NULL, 'D'},
1870 {"dry-run", no_argument, NULL, 'n'},
1871 {"subscriber-port", required_argument, NULL, 'p'},
1872 {"publisher-server", required_argument, NULL, 'P'},
1873 {"socketdir", required_argument, NULL, 's'},
1874 {"recovery-timeout", required_argument, NULL, 't'},
1875 {"subscriber-username", required_argument, NULL, 'U'},
1876 {"verbose", no_argument, NULL, 'v'},
1877 {"version", no_argument, NULL, 'V'},
1878 {"help", no_argument, NULL, '?'},
1879 {"config-file", required_argument, NULL, 1},
1880 {"publication", required_argument, NULL, 2},
1881 {"replication-slot", required_argument, NULL, 3},
1882 {"subscription", required_argument, NULL, 4},
1883 {NULL, 0, NULL, 0}
1886 struct CreateSubscriberOptions opt = {0};
1888 int c;
1889 int option_index;
1891 char *pub_base_conninfo;
1892 char *sub_base_conninfo;
1893 char *dbname_conninfo = NULL;
1895 uint64 pub_sysid;
1896 uint64 sub_sysid;
1897 struct stat statbuf;
1899 char *consistent_lsn;
1901 char pidfile[MAXPGPATH];
1903 pg_logging_init(argv[0]);
1904 pg_logging_set_level(PG_LOG_WARNING);
1905 progname = get_progname(argv[0]);
1906 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
1908 if (argc > 1)
1910 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1912 usage();
1913 exit(0);
1915 else if (strcmp(argv[1], "-V") == 0
1916 || strcmp(argv[1], "--version") == 0)
1918 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1919 exit(0);
1923 /* Default settings */
1924 subscriber_dir = NULL;
1925 opt.config_file = NULL;
1926 opt.pub_conninfo_str = NULL;
1927 opt.socket_dir = NULL;
1928 opt.sub_port = DEFAULT_SUB_PORT;
1929 opt.sub_username = NULL;
1930 opt.database_names = (SimpleStringList)
1934 opt.recovery_timeout = 0;
1937 * Don't allow it to be run as root. It uses pg_ctl which does not allow
1938 * it either.
1940 #ifndef WIN32
1941 if (geteuid() == 0)
1943 pg_log_error("cannot be executed by \"root\"");
1944 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1945 progname);
1946 exit(1);
1948 #endif
1950 get_restricted_token();
1952 while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1953 long_options, &option_index)) != -1)
1955 switch (c)
1957 case 'd':
1958 if (!simple_string_list_member(&opt.database_names, optarg))
1960 simple_string_list_append(&opt.database_names, optarg);
1961 num_dbs++;
1963 else
1965 pg_log_error("database \"%s\" specified more than once", optarg);
1966 exit(1);
1968 break;
1969 case 'D':
1970 subscriber_dir = pg_strdup(optarg);
1971 canonicalize_path(subscriber_dir);
1972 break;
1973 case 'n':
1974 dry_run = true;
1975 break;
1976 case 'p':
1977 opt.sub_port = pg_strdup(optarg);
1978 break;
1979 case 'P':
1980 opt.pub_conninfo_str = pg_strdup(optarg);
1981 break;
1982 case 's':
1983 opt.socket_dir = pg_strdup(optarg);
1984 canonicalize_path(opt.socket_dir);
1985 break;
1986 case 't':
1987 opt.recovery_timeout = atoi(optarg);
1988 break;
1989 case 'U':
1990 opt.sub_username = pg_strdup(optarg);
1991 break;
1992 case 'v':
1993 pg_logging_increase_verbosity();
1994 break;
1995 case 1:
1996 opt.config_file = pg_strdup(optarg);
1997 break;
1998 case 2:
1999 if (!simple_string_list_member(&opt.pub_names, optarg))
2001 simple_string_list_append(&opt.pub_names, optarg);
2002 num_pubs++;
2004 else
2006 pg_log_error("publication \"%s\" specified more than once", optarg);
2007 exit(1);
2009 break;
2010 case 3:
2011 if (!simple_string_list_member(&opt.replslot_names, optarg))
2013 simple_string_list_append(&opt.replslot_names, optarg);
2014 num_replslots++;
2016 else
2018 pg_log_error("replication slot \"%s\" specified more than once", optarg);
2019 exit(1);
2021 break;
2022 case 4:
2023 if (!simple_string_list_member(&opt.sub_names, optarg))
2025 simple_string_list_append(&opt.sub_names, optarg);
2026 num_subs++;
2028 else
2030 pg_log_error("subscription \"%s\" specified more than once", optarg);
2031 exit(1);
2033 break;
2034 default:
2035 /* getopt_long already emitted a complaint */
2036 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2037 exit(1);
2041 /* Any non-option arguments? */
2042 if (optind < argc)
2044 pg_log_error("too many command-line arguments (first is \"%s\")",
2045 argv[optind]);
2046 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2047 exit(1);
2050 /* Required arguments */
2051 if (subscriber_dir == NULL)
2053 pg_log_error("no subscriber data directory specified");
2054 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2055 exit(1);
2058 /* If socket directory is not provided, use the current directory */
2059 if (opt.socket_dir == NULL)
2061 char cwd[MAXPGPATH];
2063 if (!getcwd(cwd, MAXPGPATH))
2064 pg_fatal("could not determine current directory");
2065 opt.socket_dir = pg_strdup(cwd);
2066 canonicalize_path(opt.socket_dir);
2070 * Parse connection string. Build a base connection string that might be
2071 * reused by multiple databases.
2073 if (opt.pub_conninfo_str == NULL)
2076 * TODO use primary_conninfo (if available) from subscriber and
2077 * extract publisher connection string. Assume that there are
2078 * identical entries for physical and logical replication. If there is
2079 * not, we would fail anyway.
2081 pg_log_error("no publisher connection string specified");
2082 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2083 exit(1);
2085 pg_log_info("validating publisher connection string");
2086 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2087 &dbname_conninfo);
2088 if (pub_base_conninfo == NULL)
2089 exit(1);
2091 pg_log_info("validating subscriber connection string");
2092 sub_base_conninfo = get_sub_conninfo(&opt);
2094 if (opt.database_names.head == NULL)
2096 pg_log_info("no database was specified");
2099 * If --database option is not provided, try to obtain the dbname from
2100 * the publisher conninfo. If dbname parameter is not available, error
2101 * out.
2103 if (dbname_conninfo)
2105 simple_string_list_append(&opt.database_names, dbname_conninfo);
2106 num_dbs++;
2108 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2109 dbname_conninfo);
2111 else
2113 pg_log_error("no database name specified");
2114 pg_log_error_hint("Try \"%s --help\" for more information.",
2115 progname);
2116 exit(1);
2120 /* Number of object names must match number of databases */
2121 if (num_pubs > 0 && num_pubs != num_dbs)
2123 pg_log_error("wrong number of publication names specified");
2124 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2125 num_pubs, num_dbs);
2126 exit(1);
2128 if (num_subs > 0 && num_subs != num_dbs)
2130 pg_log_error("wrong number of subscription names specified");
2131 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2132 num_subs, num_dbs);
2133 exit(1);
2135 if (num_replslots > 0 && num_replslots != num_dbs)
2137 pg_log_error("wrong number of replication slot names specified");
2138 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2139 num_replslots, num_dbs);
2140 exit(1);
2143 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2144 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2145 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2147 /* Rudimentary check for a data directory */
2148 check_data_directory(subscriber_dir);
2151 * Store database information for publisher and subscriber. It should be
2152 * called before atexit() because its return value is used in
2153 * cleanup_objects_atexit().
2155 dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2157 /* Register a function to clean up objects in case of failure */
2158 atexit(cleanup_objects_atexit);
2161 * Check if the subscriber data directory has the same system identifier
2162 * as the publisher data directory.
2164 pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2165 sub_sysid = get_standby_sysid(subscriber_dir);
2166 if (pub_sysid != sub_sysid)
2167 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2169 /* Subscriber PID file */
2170 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2173 * The standby server must not be running. If the server is started under
2174 * a service manager and pg_createsubscriber stops it, the service manager
2175 * might react to this action and start the server again. Therefore,
2176 * refuse to proceed if the server is running to avoid possible failures.
2178 if (stat(pidfile, &statbuf) == 0)
2180 pg_log_error("standby server is running");
2181 pg_log_error_hint("Stop the standby server and try again.");
2182 exit(1);
2186 * Start a short-lived standby server with temporary parameters (provided
2187 * by command-line options). The goal is to avoid connections during the
2188 * transformation steps.
2190 pg_log_info("starting the standby server with command-line options");
2191 start_standby_server(&opt, true, false);
2193 /* Check if the standby server is ready for logical replication */
2194 check_subscriber(dbinfo);
2196 /* Check if the primary server is ready for logical replication */
2197 check_publisher(dbinfo);
2200 * Stop the target server. The recovery process requires that the server
2201 * reaches a consistent state before targeting the recovery stop point.
2202 * Make sure a consistent state is reached (stopping the target server
2203 * guarantees it) *before* creating the replication slots in
2204 * setup_publisher().
2206 pg_log_info("stopping the subscriber");
2207 stop_standby_server(subscriber_dir);
2209 /* Create the required objects for each database on publisher */
2210 consistent_lsn = setup_publisher(dbinfo);
2212 /* Write the required recovery parameters */
2213 setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2216 * Start subscriber so the recovery parameters will take effect. Wait
2217 * until accepting connections. We don't want to start logical replication
2218 * during setup.
2220 pg_log_info("starting the subscriber");
2221 start_standby_server(&opt, true, true);
2223 /* Wait for the subscriber to be promoted */
2224 wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2227 * Create the subscription for each database on subscriber. It does not
2228 * enable it immediately because it needs to adjust the replication start
2229 * point to the LSN reported by setup_publisher(). It also cleans up
2230 * publications created by this tool and replication to the standby.
2232 setup_subscriber(dbinfo, consistent_lsn);
2234 /* Remove primary_slot_name if it exists on primary */
2235 drop_primary_replication_slot(dbinfo, primary_slot_name);
2237 /* Remove failover replication slots if they exist on subscriber */
2238 drop_failover_replication_slots(dbinfo);
2240 /* Stop the subscriber */
2241 pg_log_info("stopping the subscriber");
2242 stop_standby_server(subscriber_dir);
2244 /* Change the system identifier of the subscriber */
2245 modify_subscriber_sysid(&opt);
2247 success = true;
2249 pg_log_info("Done!");
2251 return 0;