1 /*-------------------------------------------------------------------------
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
6 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
9 * src/bin/pg_basebackup/pg_createsubscriber.c
11 *-------------------------------------------------------------------------
14 #include "postgres_fe.h"
21 #include "common/connect.h"
22 #include "common/controldata_utils.h"
23 #include "common/logging.h"
24 #include "common/pg_prng.h"
25 #include "common/restricted_token.h"
26 #include "fe_utils/recovery_gen.h"
27 #include "fe_utils/simple_list.h"
28 #include "fe_utils/string_utils.h"
29 #include "getopt_long.h"
31 #define DEFAULT_SUB_PORT "50432"
33 /* Command-line options */
34 struct CreateSubscriberOptions
36 char *config_file
; /* configuration file */
37 char *pub_conninfo_str
; /* publisher connection string */
38 char *socket_dir
; /* directory for Unix-domain socket, if any */
39 char *sub_port
; /* subscriber port number */
40 const char *sub_username
; /* subscriber username */
41 SimpleStringList database_names
; /* list of database names */
42 SimpleStringList pub_names
; /* list of publication names */
43 SimpleStringList sub_names
; /* list of subscription names */
44 SimpleStringList replslot_names
; /* list of replication slot names */
45 int recovery_timeout
; /* stop recovery after this time */
50 char *dbname
; /* database name */
51 char *pubconninfo
; /* publisher connection string */
52 char *subconninfo
; /* subscriber connection string */
53 char *pubname
; /* publication name */
54 char *subname
; /* subscription name */
55 char *replslotname
; /* replication slot name */
57 bool made_replslot
; /* replication slot was created */
58 bool made_publication
; /* publication was created */
61 static void cleanup_objects_atexit(void);
63 static char *get_base_conninfo(const char *conninfo
, char **dbname
);
64 static char *get_sub_conninfo(const struct CreateSubscriberOptions
*opt
);
65 static char *get_exec_path(const char *argv0
, const char *progname
);
66 static void check_data_directory(const char *datadir
);
67 static char *concat_conninfo_dbname(const char *conninfo
, const char *dbname
);
68 static struct LogicalRepInfo
*store_pub_sub_info(const struct CreateSubscriberOptions
*opt
,
69 const char *pub_base_conninfo
,
70 const char *sub_base_conninfo
);
71 static PGconn
*connect_database(const char *conninfo
, bool exit_on_error
);
72 static void disconnect_database(PGconn
*conn
, bool exit_on_error
);
73 static uint64
get_primary_sysid(const char *conninfo
);
74 static uint64
get_standby_sysid(const char *datadir
);
75 static void modify_subscriber_sysid(const struct CreateSubscriberOptions
*opt
);
76 static bool server_is_in_recovery(PGconn
*conn
);
77 static char *generate_object_name(PGconn
*conn
);
78 static void check_publisher(const struct LogicalRepInfo
*dbinfo
);
79 static char *setup_publisher(struct LogicalRepInfo
*dbinfo
);
80 static void check_subscriber(const struct LogicalRepInfo
*dbinfo
);
81 static void setup_subscriber(struct LogicalRepInfo
*dbinfo
,
82 const char *consistent_lsn
);
83 static void setup_recovery(const struct LogicalRepInfo
*dbinfo
, const char *datadir
,
85 static void drop_primary_replication_slot(struct LogicalRepInfo
*dbinfo
,
86 const char *slotname
);
87 static void drop_failover_replication_slots(struct LogicalRepInfo
*dbinfo
);
88 static char *create_logical_replication_slot(PGconn
*conn
,
89 struct LogicalRepInfo
*dbinfo
);
90 static void drop_replication_slot(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
,
91 const char *slot_name
);
92 static void pg_ctl_status(const char *pg_ctl_cmd
, int rc
);
93 static void start_standby_server(const struct CreateSubscriberOptions
*opt
,
94 bool restricted_access
,
95 bool restrict_logical_worker
);
96 static void stop_standby_server(const char *datadir
);
97 static void wait_for_end_recovery(const char *conninfo
,
98 const struct CreateSubscriberOptions
*opt
);
99 static void create_publication(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
);
100 static void drop_publication(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
);
101 static void create_subscription(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
);
102 static void set_replication_progress(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
,
104 static void enable_subscription(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
);
105 static void check_and_drop_existing_subscriptions(PGconn
*conn
,
106 const struct LogicalRepInfo
*dbinfo
);
107 static void drop_existing_subscriptions(PGconn
*conn
, const char *subname
,
110 #define USEC_PER_SEC 1000000
111 #define WAIT_INTERVAL 1 /* 1 second */
113 static const char *progname
;
115 static char *primary_slot_name
= NULL
;
116 static bool dry_run
= false;
118 static bool success
= false;
120 static struct LogicalRepInfo
*dbinfo
;
121 static int num_dbs
= 0; /* number of specified databases */
122 static int num_pubs
= 0; /* number of specified publications */
123 static int num_subs
= 0; /* number of specified subscriptions */
124 static int num_replslots
= 0; /* number of specified replication slots */
126 static pg_prng_state prng_state
;
128 static char *pg_ctl_path
= NULL
;
129 static char *pg_resetwal_path
= NULL
;
131 /* standby / subscriber data directory */
132 static char *subscriber_dir
= NULL
;
134 static bool recovery_ended
= false;
135 static bool standby_running
= false;
140 POSTMASTER_STILL_STARTING
145 * Cleanup objects that were created by pg_createsubscriber if there is an
148 * Publications and replication slots are created on the primary. Depending on
149 * the step at which it failed, the objects already created are removed if
150 * possible (sometimes this won't work due to a connection issue).
151 * There is no cleanup on the target server. The steps on the target server are
152 * executed *after* promotion, hence, at this point, a failure means recreate
153 * the physical replica and start again.
156 cleanup_objects_atexit(void)
162 * If the server is promoted, there is no way to use the current setup
163 * again. Warn the user that a new replication setup should be done before
168 pg_log_warning("failed after the end of recovery");
169 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
170 "You must recreate the physical replica before continuing.");
173 for (int i
= 0; i
< num_dbs
; i
++)
175 if (dbinfo
[i
].made_publication
|| dbinfo
[i
].made_replslot
)
179 conn
= connect_database(dbinfo
[i
].pubconninfo
, false);
182 if (dbinfo
[i
].made_publication
)
183 drop_publication(conn
, &dbinfo
[i
]);
184 if (dbinfo
[i
].made_replslot
)
185 drop_replication_slot(conn
, &dbinfo
[i
], dbinfo
[i
].replslotname
);
186 disconnect_database(conn
, false);
191 * If a connection could not be established, inform the user
192 * that some objects were left on primary and should be
193 * removed before trying again.
195 if (dbinfo
[i
].made_publication
)
197 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
198 dbinfo
[i
].pubname
, dbinfo
[i
].dbname
);
199 pg_log_warning_hint("Drop this publication before trying again.");
201 if (dbinfo
[i
].made_replslot
)
203 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
204 dbinfo
[i
].replslotname
, dbinfo
[i
].dbname
);
205 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
212 stop_standby_server(subscriber_dir
);
218 printf(_("%s creates a new logical replica from a standby server.\n\n"),
220 printf(_("Usage:\n"));
221 printf(_(" %s [OPTION]...\n"), progname
);
222 printf(_("\nOptions:\n"));
223 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
224 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
225 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
226 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT
);
227 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
230 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231 printf(_(" -v, --verbose output verbose messages\n"));
232 printf(_(" --config-file=FILENAME use specified main server configuration\n"
233 " file when running target cluster\n"));
234 printf(_(" --publication=NAME publication name\n"));
235 printf(_(" --replication-slot=NAME replication slot name\n"));
236 printf(_(" --subscription=NAME subscription name\n"));
237 printf(_(" -V, --version output version information, then exit\n"));
238 printf(_(" -?, --help show this help, then exit\n"));
239 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT
);
240 printf(_("%s home page: <%s>\n"), PACKAGE_NAME
, PACKAGE_URL
);
244 * Subroutine to append "keyword=value" to a connection string,
245 * with proper quoting of the value. (We assume keywords don't need that.)
248 appendConnStrItem(PQExpBuffer buf
, const char *keyword
, const char *val
)
251 appendPQExpBufferChar(buf
, ' ');
252 appendPQExpBufferStr(buf
, keyword
);
253 appendPQExpBufferChar(buf
, '=');
254 appendConnStrVal(buf
, val
);
258 * Validate a connection string. Returns a base connection string that is a
259 * connection string without a database name.
261 * Since we might process multiple databases, each database name will be
262 * appended to this base connection string to provide a final connection
263 * string. If the second argument (dbname) is not null, returns dbname if the
264 * provided connection string contains it.
266 * It is the caller's responsibility to free the returned connection string and
270 get_base_conninfo(const char *conninfo
, char **dbname
)
273 PQconninfoOption
*conn_opts
;
274 PQconninfoOption
*conn_opt
;
278 conn_opts
= PQconninfoParse(conninfo
, &errmsg
);
279 if (conn_opts
== NULL
)
281 pg_log_error("could not parse connection string: %s", errmsg
);
286 buf
= createPQExpBuffer();
287 for (conn_opt
= conn_opts
; conn_opt
->keyword
!= NULL
; conn_opt
++)
289 if (conn_opt
->val
!= NULL
&& conn_opt
->val
[0] != '\0')
291 if (strcmp(conn_opt
->keyword
, "dbname") == 0)
294 *dbname
= pg_strdup(conn_opt
->val
);
297 appendConnStrItem(buf
, conn_opt
->keyword
, conn_opt
->val
);
301 ret
= pg_strdup(buf
->data
);
303 destroyPQExpBuffer(buf
);
304 PQconninfoFree(conn_opts
);
310 * Build a subscriber connection string. Only a few parameters are supported
311 * since it starts a server with restricted access.
314 get_sub_conninfo(const struct CreateSubscriberOptions
*opt
)
316 PQExpBuffer buf
= createPQExpBuffer();
319 appendConnStrItem(buf
, "port", opt
->sub_port
);
321 appendConnStrItem(buf
, "host", opt
->socket_dir
);
323 if (opt
->sub_username
!= NULL
)
324 appendConnStrItem(buf
, "user", opt
->sub_username
);
325 appendConnStrItem(buf
, "fallback_application_name", progname
);
327 ret
= pg_strdup(buf
->data
);
329 destroyPQExpBuffer(buf
);
335 * Verify if a PostgreSQL binary (progname) is available in the same directory as
336 * pg_createsubscriber and it has the same version. It returns the absolute
337 * path of the progname.
340 get_exec_path(const char *argv0
, const char *progname
)
346 versionstr
= psprintf("%s (PostgreSQL) %s\n", progname
, PG_VERSION
);
347 exec_path
= pg_malloc(MAXPGPATH
);
348 ret
= find_other_exec(argv0
, progname
, versionstr
, exec_path
);
352 char full_path
[MAXPGPATH
];
354 if (find_my_exec(argv0
, full_path
) < 0)
355 strlcpy(full_path
, progname
, sizeof(full_path
));
358 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
359 progname
, "pg_createsubscriber", full_path
);
361 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
362 progname
, full_path
, "pg_createsubscriber");
365 pg_log_debug("%s path is: %s", progname
, exec_path
);
371 * Is it a cluster directory? These are preliminary checks. It is far from
372 * making an accurate check. If it is not a clone from the publisher, it will
373 * eventually fail in a future step.
376 check_data_directory(const char *datadir
)
379 char versionfile
[MAXPGPATH
];
381 pg_log_info("checking if directory \"%s\" is a cluster data directory",
384 if (stat(datadir
, &statbuf
) != 0)
387 pg_fatal("data directory \"%s\" does not exist", datadir
);
389 pg_fatal("could not access directory \"%s\": %m", datadir
);
392 snprintf(versionfile
, MAXPGPATH
, "%s/PG_VERSION", datadir
);
393 if (stat(versionfile
, &statbuf
) != 0 && errno
== ENOENT
)
395 pg_fatal("directory \"%s\" is not a database cluster directory",
401 * Append database name into a base connection string.
403 * dbname is the only parameter that changes so it is not included in the base
404 * connection string. This function concatenates dbname to build a "real"
408 concat_conninfo_dbname(const char *conninfo
, const char *dbname
)
410 PQExpBuffer buf
= createPQExpBuffer();
413 Assert(conninfo
!= NULL
);
415 appendPQExpBufferStr(buf
, conninfo
);
416 appendConnStrItem(buf
, "dbname", dbname
);
418 ret
= pg_strdup(buf
->data
);
419 destroyPQExpBuffer(buf
);
425 * Store publication and subscription information.
427 * If publication, replication slot and subscription names were specified,
428 * store it here. Otherwise, a generated name will be assigned to the object in
431 static struct LogicalRepInfo
*
432 store_pub_sub_info(const struct CreateSubscriberOptions
*opt
,
433 const char *pub_base_conninfo
,
434 const char *sub_base_conninfo
)
436 struct LogicalRepInfo
*dbinfo
;
437 SimpleStringListCell
*pubcell
= NULL
;
438 SimpleStringListCell
*subcell
= NULL
;
439 SimpleStringListCell
*replslotcell
= NULL
;
442 dbinfo
= pg_malloc_array(struct LogicalRepInfo
, num_dbs
);
445 pubcell
= opt
->pub_names
.head
;
447 subcell
= opt
->sub_names
.head
;
448 if (num_replslots
> 0)
449 replslotcell
= opt
->replslot_names
.head
;
451 for (SimpleStringListCell
*cell
= opt
->database_names
.head
; cell
; cell
= cell
->next
)
455 /* Fill publisher attributes */
456 conninfo
= concat_conninfo_dbname(pub_base_conninfo
, cell
->val
);
457 dbinfo
[i
].pubconninfo
= conninfo
;
458 dbinfo
[i
].dbname
= cell
->val
;
460 dbinfo
[i
].pubname
= pubcell
->val
;
462 dbinfo
[i
].pubname
= NULL
;
463 if (num_replslots
> 0)
464 dbinfo
[i
].replslotname
= replslotcell
->val
;
466 dbinfo
[i
].replslotname
= NULL
;
467 dbinfo
[i
].made_replslot
= false;
468 dbinfo
[i
].made_publication
= false;
469 /* Fill subscriber attributes */
470 conninfo
= concat_conninfo_dbname(sub_base_conninfo
, cell
->val
);
471 dbinfo
[i
].subconninfo
= conninfo
;
473 dbinfo
[i
].subname
= subcell
->val
;
475 dbinfo
[i
].subname
= NULL
;
476 /* Other fields will be filled later */
478 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i
,
479 dbinfo
[i
].pubname
? dbinfo
[i
].pubname
: "(auto)",
480 dbinfo
[i
].replslotname
? dbinfo
[i
].replslotname
: "(auto)",
481 dbinfo
[i
].pubconninfo
);
482 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i
,
483 dbinfo
[i
].subname
? dbinfo
[i
].subname
: "(auto)",
484 dbinfo
[i
].subconninfo
);
487 pubcell
= pubcell
->next
;
489 subcell
= subcell
->next
;
490 if (num_replslots
> 0)
491 replslotcell
= replslotcell
->next
;
500 * Open a new connection. If exit_on_error is true, it has an undesired
501 * condition and it should exit immediately.
504 connect_database(const char *conninfo
, bool exit_on_error
)
509 conn
= PQconnectdb(conninfo
);
510 if (PQstatus(conn
) != CONNECTION_OK
)
512 pg_log_error("connection to database failed: %s",
513 PQerrorMessage(conn
));
521 /* Secure search_path */
522 res
= PQexec(conn
, ALWAYS_SECURE_SEARCH_PATH_SQL
);
523 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
525 pg_log_error("could not clear \"search_path\": %s",
526 PQresultErrorMessage(res
));
540 * Close the connection. If exit_on_error is true, it has an undesired
541 * condition and it should exit immediately.
544 disconnect_database(PGconn
*conn
, bool exit_on_error
)
546 Assert(conn
!= NULL
);
555 * Obtain the system identifier using the provided connection. It will be used
556 * to compare if a data directory is a clone of another one.
559 get_primary_sysid(const char *conninfo
)
565 pg_log_info("getting system identifier from publisher");
567 conn
= connect_database(conninfo
, true);
569 res
= PQexec(conn
, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
570 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
572 pg_log_error("could not get system identifier: %s",
573 PQresultErrorMessage(res
));
574 disconnect_database(conn
, true);
576 if (PQntuples(res
) != 1)
578 pg_log_error("could not get system identifier: got %d rows, expected %d row",
580 disconnect_database(conn
, true);
583 sysid
= strtou64(PQgetvalue(res
, 0, 0), NULL
, 10);
585 pg_log_info("system identifier is %llu on publisher",
586 (unsigned long long) sysid
);
589 disconnect_database(conn
, false);
595 * Obtain the system identifier from control file. It will be used to compare
596 * if a data directory is a clone of another one. This routine is used locally
597 * and avoids a connection.
600 get_standby_sysid(const char *datadir
)
606 pg_log_info("getting system identifier from subscriber");
608 cf
= get_controlfile(datadir
, &crc_ok
);
610 pg_fatal("control file appears to be corrupt");
612 sysid
= cf
->system_identifier
;
614 pg_log_info("system identifier is %llu on subscriber",
615 (unsigned long long) sysid
);
623 * Modify the system identifier. Since a standby server preserves the system
624 * identifier, it makes sense to change it to avoid situations in which WAL
625 * files from one of the systems might be used in the other one.
628 modify_subscriber_sysid(const struct CreateSubscriberOptions
*opt
)
636 pg_log_info("modifying system identifier of subscriber");
638 cf
= get_controlfile(subscriber_dir
, &crc_ok
);
640 pg_fatal("control file appears to be corrupt");
643 * Select a new system identifier.
645 * XXX this code was extracted from BootStrapXLOG().
647 gettimeofday(&tv
, NULL
);
648 cf
->system_identifier
= ((uint64
) tv
.tv_sec
) << 32;
649 cf
->system_identifier
|= ((uint64
) tv
.tv_usec
) << 12;
650 cf
->system_identifier
|= getpid() & 0xFFF;
653 update_controlfile(subscriber_dir
, cf
, true);
655 pg_log_info("system identifier is %llu on subscriber",
656 (unsigned long long) cf
->system_identifier
);
658 pg_log_info("running pg_resetwal on the subscriber");
660 cmd_str
= psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path
,
661 subscriber_dir
, DEVNULL
);
663 pg_log_debug("pg_resetwal command is: %s", cmd_str
);
667 int rc
= system(cmd_str
);
670 pg_log_info("subscriber successfully changed the system identifier");
672 pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc
));
679 * Generate an object name using a prefix, database oid and a random integer.
680 * It is used in case the user does not specify an object name (publication,
681 * subscription, replication slot).
684 generate_object_name(PGconn
*conn
)
692 "SELECT oid FROM pg_catalog.pg_database "
693 "WHERE datname = pg_catalog.current_database()");
694 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
696 pg_log_error("could not obtain database OID: %s",
697 PQresultErrorMessage(res
));
698 disconnect_database(conn
, true);
701 if (PQntuples(res
) != 1)
703 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
705 disconnect_database(conn
, true);
709 oid
= strtoul(PQgetvalue(res
, 0, 0), NULL
, 10);
713 /* Random unsigned integer */
714 rand
= pg_prng_uint32(&prng_state
);
717 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
718 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
721 objname
= psprintf("pg_createsubscriber_%u_%x", oid
, rand
);
727 * Create the publications and replication slots in preparation for logical
728 * replication. Returns the LSN from latest replication slot. It will be the
729 * replication start point that is used to adjust the subscriptions (see
730 * set_replication_progress).
733 setup_publisher(struct LogicalRepInfo
*dbinfo
)
737 pg_prng_seed(&prng_state
, (uint64
) (getpid() ^ time(NULL
)));
739 for (int i
= 0; i
< num_dbs
; i
++)
742 char *genname
= NULL
;
744 conn
= connect_database(dbinfo
[i
].pubconninfo
, true);
747 * If an object name was not specified as command-line options, assign
748 * a generated object name. The replication slot has a different rule.
749 * The subscription name is assigned to the replication slot name if
750 * no replication slot is specified. It follows the same rule as
751 * CREATE SUBSCRIPTION.
753 if (num_pubs
== 0 || num_subs
== 0 || num_replslots
== 0)
754 genname
= generate_object_name(conn
);
756 dbinfo
[i
].pubname
= pg_strdup(genname
);
758 dbinfo
[i
].subname
= pg_strdup(genname
);
759 if (num_replslots
== 0)
760 dbinfo
[i
].replslotname
= pg_strdup(dbinfo
[i
].subname
);
763 * Create publication on publisher. This step should be executed
764 * *before* promoting the subscriber to avoid any transactions between
765 * consistent LSN and the new publication rows (such transactions
766 * wouldn't see the new publication rows resulting in an error).
768 create_publication(conn
, &dbinfo
[i
]);
770 /* Create replication slot on publisher */
773 lsn
= create_logical_replication_slot(conn
, &dbinfo
[i
]);
774 if (lsn
!= NULL
|| dry_run
)
775 pg_log_info("create replication slot \"%s\" on publisher",
776 dbinfo
[i
].replslotname
);
781 * Since we are using the LSN returned by the last replication slot as
782 * recovery_target_lsn, this LSN is ahead of the current WAL position
783 * and the recovery waits until the publisher writes a WAL record to
784 * reach the target and ends the recovery. On idle systems, this wait
785 * time is unpredictable and could lead to failure in promoting the
786 * subscriber. To avoid that, insert a harmless WAL record.
788 if (i
== num_dbs
- 1 && !dry_run
)
792 res
= PQexec(conn
, "SELECT pg_log_standby_snapshot()");
793 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
795 pg_log_error("could not write an additional WAL record: %s",
796 PQresultErrorMessage(res
));
797 disconnect_database(conn
, true);
802 disconnect_database(conn
, false);
809 * Is recovery still in progress?
812 server_is_in_recovery(PGconn
*conn
)
817 res
= PQexec(conn
, "SELECT pg_catalog.pg_is_in_recovery()");
819 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
821 pg_log_error("could not obtain recovery progress: %s",
822 PQresultErrorMessage(res
));
823 disconnect_database(conn
, true);
827 ret
= strcmp("t", PQgetvalue(res
, 0, 0));
835 * Is the primary server ready for logical replication?
837 * XXX Does it not allow a synchronous replica?
840 check_publisher(const struct LogicalRepInfo
*dbinfo
)
851 int max_prepared_transactions
;
853 pg_log_info("checking settings on publisher");
855 conn
= connect_database(dbinfo
[0].pubconninfo
, true);
858 * If the primary server is in recovery (i.e. cascading replication),
859 * objects (publication) cannot be created because it is read only.
861 if (server_is_in_recovery(conn
))
863 pg_log_error("primary server cannot be in recovery");
864 disconnect_database(conn
, true);
867 /*------------------------------------------------------------------------
868 * Logical replication requires a few parameters to be set on publisher.
869 * Since these parameters are not a requirement for physical replication,
870 * we should check it to make sure it won't fail.
872 * - wal_level = logical
873 * - max_replication_slots >= current + number of dbs to be converted
874 * - max_wal_senders >= current + number of dbs to be converted
875 * -----------------------------------------------------------------------
878 "SELECT pg_catalog.current_setting('wal_level'),"
879 " pg_catalog.current_setting('max_replication_slots'),"
880 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
881 " pg_catalog.current_setting('max_wal_senders'),"
882 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
883 " pg_catalog.current_setting('max_prepared_transactions')");
885 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
887 pg_log_error("could not obtain publisher settings: %s",
888 PQresultErrorMessage(res
));
889 disconnect_database(conn
, true);
892 wal_level
= pg_strdup(PQgetvalue(res
, 0, 0));
893 max_repslots
= atoi(PQgetvalue(res
, 0, 1));
894 cur_repslots
= atoi(PQgetvalue(res
, 0, 2));
895 max_walsenders
= atoi(PQgetvalue(res
, 0, 3));
896 cur_walsenders
= atoi(PQgetvalue(res
, 0, 4));
897 max_prepared_transactions
= atoi(PQgetvalue(res
, 0, 5));
901 pg_log_debug("publisher: wal_level: %s", wal_level
);
902 pg_log_debug("publisher: max_replication_slots: %d", max_repslots
);
903 pg_log_debug("publisher: current replication slots: %d", cur_repslots
);
904 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders
);
905 pg_log_debug("publisher: current wal senders: %d", cur_walsenders
);
906 pg_log_debug("publisher: max_prepared_transactions: %d",
907 max_prepared_transactions
);
909 disconnect_database(conn
, false);
911 if (strcmp(wal_level
, "logical") != 0)
913 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
917 if (max_repslots
- cur_repslots
< num_dbs
)
919 pg_log_error("publisher requires %d replication slots, but only %d remain",
920 num_dbs
, max_repslots
- cur_repslots
);
921 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
922 "max_replication_slots", cur_repslots
+ num_dbs
);
926 if (max_walsenders
- cur_walsenders
< num_dbs
)
928 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
929 num_dbs
, max_walsenders
- cur_walsenders
);
930 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
931 "max_wal_senders", cur_walsenders
+ num_dbs
);
935 if (max_prepared_transactions
!= 0)
937 pg_log_warning("two_phase option will not be enabled for replication slots");
938 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
939 "Prepared transactions will be replicated at COMMIT PREPARED.");
949 * Is the standby server ready for logical replication?
951 * XXX Does it not allow a time-delayed replica?
953 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
954 * is S, it cannot detect there is a replica (server C) because server S starts
955 * accepting only local connections and server C cannot connect to it. Hence,
956 * there is not a reliable way to provide a suitable error saying the server C
957 * will be broken at the end of this process (due to pg_resetwal).
960 check_subscriber(const struct LogicalRepInfo
*dbinfo
)
970 pg_log_info("checking settings on subscriber");
972 conn
= connect_database(dbinfo
[0].subconninfo
, true);
974 /* The target server must be a standby */
975 if (!server_is_in_recovery(conn
))
977 pg_log_error("target server must be a standby");
978 disconnect_database(conn
, true);
981 /*------------------------------------------------------------------------
982 * Logical replication requires a few parameters to be set on subscriber.
983 * Since these parameters are not a requirement for physical replication,
984 * we should check it to make sure it won't fail.
986 * - max_replication_slots >= number of dbs to be converted
987 * - max_logical_replication_workers >= number of dbs to be converted
988 * - max_worker_processes >= 1 + number of dbs to be converted
989 *------------------------------------------------------------------------
992 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
993 "'max_logical_replication_workers', "
994 "'max_replication_slots', "
995 "'max_worker_processes', "
996 "'primary_slot_name') "
999 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1001 pg_log_error("could not obtain subscriber settings: %s",
1002 PQresultErrorMessage(res
));
1003 disconnect_database(conn
, true);
1006 max_lrworkers
= atoi(PQgetvalue(res
, 0, 0));
1007 max_repslots
= atoi(PQgetvalue(res
, 1, 0));
1008 max_wprocs
= atoi(PQgetvalue(res
, 2, 0));
1009 if (strcmp(PQgetvalue(res
, 3, 0), "") != 0)
1010 primary_slot_name
= pg_strdup(PQgetvalue(res
, 3, 0));
1012 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1014 pg_log_debug("subscriber: max_replication_slots: %d", max_repslots
);
1015 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs
);
1016 if (primary_slot_name
)
1017 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name
);
1021 disconnect_database(conn
, false);
1023 if (max_repslots
< num_dbs
)
1025 pg_log_error("subscriber requires %d replication slots, but only %d remain",
1026 num_dbs
, max_repslots
);
1027 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1028 "max_replication_slots", num_dbs
);
1032 if (max_lrworkers
< num_dbs
)
1034 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1035 num_dbs
, max_lrworkers
);
1036 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1037 "max_logical_replication_workers", num_dbs
);
1041 if (max_wprocs
< num_dbs
+ 1)
1043 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1044 num_dbs
+ 1, max_wprocs
);
1045 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1046 "max_worker_processes", num_dbs
+ 1);
1055 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1056 * the primary (publisher node) and the newly created subscriber. We
1057 * shouldn't drop the associated slot as that would be used by the publisher
1061 drop_existing_subscriptions(PGconn
*conn
, const char *subname
, const char *dbname
)
1063 PQExpBuffer query
= createPQExpBuffer();
1066 Assert(conn
!= NULL
);
1069 * Construct a query string. These commands are allowed to be executed
1070 * within a transaction.
1072 appendPQExpBuffer(query
, "ALTER SUBSCRIPTION %s DISABLE;",
1074 appendPQExpBuffer(query
, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1076 appendPQExpBuffer(query
, " DROP SUBSCRIPTION %s;", subname
);
1078 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1083 res
= PQexec(conn
, query
->data
);
1085 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1087 pg_log_error("could not drop subscription \"%s\": %s",
1088 subname
, PQresultErrorMessage(res
));
1089 disconnect_database(conn
, true);
1095 destroyPQExpBuffer(query
);
1099 * Retrieve and drop the pre-existing subscriptions.
1102 check_and_drop_existing_subscriptions(PGconn
*conn
,
1103 const struct LogicalRepInfo
*dbinfo
)
1105 PQExpBuffer query
= createPQExpBuffer();
1109 Assert(conn
!= NULL
);
1111 dbname
= PQescapeLiteral(conn
, dbinfo
->dbname
, strlen(dbinfo
->dbname
));
1113 appendPQExpBuffer(query
,
1114 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1115 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1116 "WHERE d.datname = %s",
1118 res
= PQexec(conn
, query
->data
);
1120 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1122 pg_log_error("could not obtain pre-existing subscriptions: %s",
1123 PQresultErrorMessage(res
));
1124 disconnect_database(conn
, true);
1127 for (int i
= 0; i
< PQntuples(res
); i
++)
1128 drop_existing_subscriptions(conn
, PQgetvalue(res
, i
, 0),
1132 destroyPQExpBuffer(query
);
1136 * Create the subscriptions, adjust the initial location for logical
1137 * replication and enable the subscriptions. That's the last step for logical
1138 * replication setup.
1141 setup_subscriber(struct LogicalRepInfo
*dbinfo
, const char *consistent_lsn
)
1143 for (int i
= 0; i
< num_dbs
; i
++)
1147 /* Connect to subscriber. */
1148 conn
= connect_database(dbinfo
[i
].subconninfo
, true);
1151 * We don't need the pre-existing subscriptions on the newly formed
1152 * subscriber. They can connect to other publisher nodes and either
1153 * get some unwarranted data or can lead to ERRORs in connecting to
1156 check_and_drop_existing_subscriptions(conn
, &dbinfo
[i
]);
1159 * Since the publication was created before the consistent LSN, it is
1160 * available on the subscriber when the physical replica is promoted.
1161 * Remove publications from the subscriber because it has no use.
1163 drop_publication(conn
, &dbinfo
[i
]);
1165 create_subscription(conn
, &dbinfo
[i
]);
1167 /* Set the replication progress to the correct LSN */
1168 set_replication_progress(conn
, &dbinfo
[i
], consistent_lsn
);
1170 /* Enable subscription */
1171 enable_subscription(conn
, &dbinfo
[i
]);
1173 disconnect_database(conn
, false);
1178 * Write the required recovery parameters.
1181 setup_recovery(const struct LogicalRepInfo
*dbinfo
, const char *datadir
, const char *lsn
)
1184 PQExpBuffer recoveryconfcontents
;
1187 * Despite of the recovery parameters will be written to the subscriber,
1188 * use a publisher connection. The primary_conninfo is generated using the
1189 * connection settings.
1191 conn
= connect_database(dbinfo
[0].pubconninfo
, true);
1194 * Write recovery parameters.
1196 * The subscriber is not running yet. In dry run mode, the recovery
1197 * parameters *won't* be written. An invalid LSN is used for printing
1198 * purposes. Additional recovery parameters are added here. It avoids
1199 * unexpected behavior such as end of recovery as soon as a consistent
1200 * state is reached (recovery_target) and failure due to multiple recovery
1201 * targets (name, time, xid, LSN).
1203 recoveryconfcontents
= GenerateRecoveryConfig(conn
, NULL
, NULL
);
1204 appendPQExpBuffer(recoveryconfcontents
, "recovery_target = ''\n");
1205 appendPQExpBuffer(recoveryconfcontents
,
1206 "recovery_target_timeline = 'latest'\n");
1207 appendPQExpBuffer(recoveryconfcontents
,
1208 "recovery_target_inclusive = true\n");
1209 appendPQExpBuffer(recoveryconfcontents
,
1210 "recovery_target_action = promote\n");
1211 appendPQExpBuffer(recoveryconfcontents
, "recovery_target_name = ''\n");
1212 appendPQExpBuffer(recoveryconfcontents
, "recovery_target_time = ''\n");
1213 appendPQExpBuffer(recoveryconfcontents
, "recovery_target_xid = ''\n");
1217 appendPQExpBuffer(recoveryconfcontents
, "# dry run mode");
1218 appendPQExpBuffer(recoveryconfcontents
,
1219 "recovery_target_lsn = '%X/%X'\n",
1220 LSN_FORMAT_ARGS((XLogRecPtr
) InvalidXLogRecPtr
));
1224 appendPQExpBuffer(recoveryconfcontents
, "recovery_target_lsn = '%s'\n",
1226 WriteRecoveryConfig(conn
, datadir
, recoveryconfcontents
);
1228 disconnect_database(conn
, false);
1230 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents
->data
);
1234 * Drop physical replication slot on primary if the standby was using it. After
1235 * the transformation, it has no use.
1237 * XXX we might not fail here. Instead, we provide a warning so the user
1238 * eventually drops this replication slot later.
1241 drop_primary_replication_slot(struct LogicalRepInfo
*dbinfo
, const char *slotname
)
1245 /* Replication slot does not exist, do nothing */
1246 if (!primary_slot_name
)
1249 conn
= connect_database(dbinfo
[0].pubconninfo
, false);
1252 drop_replication_slot(conn
, &dbinfo
[0], slotname
);
1253 disconnect_database(conn
, false);
1257 pg_log_warning("could not drop replication slot \"%s\" on primary",
1259 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1264 * Drop failover replication slots on subscriber. After the transformation,
1267 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1271 drop_failover_replication_slots(struct LogicalRepInfo
*dbinfo
)
1276 conn
= connect_database(dbinfo
[0].subconninfo
, false);
1279 /* Get failover replication slot names */
1281 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1283 if (PQresultStatus(res
) == PGRES_TUPLES_OK
)
1285 /* Remove failover replication slots from subscriber */
1286 for (int i
= 0; i
< PQntuples(res
); i
++)
1287 drop_replication_slot(conn
, &dbinfo
[0], PQgetvalue(res
, i
, 0));
1291 pg_log_warning("could not obtain failover replication slot information: %s",
1292 PQresultErrorMessage(res
));
1293 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1297 disconnect_database(conn
, false);
1301 pg_log_warning("could not drop failover replication slot");
1302 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1307 * Create a logical replication slot and returns a LSN.
1309 * CreateReplicationSlot() is not used because it does not provide the one-row
1310 * result set that contains the LSN.
1313 create_logical_replication_slot(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
)
1315 PQExpBuffer str
= createPQExpBuffer();
1316 PGresult
*res
= NULL
;
1317 const char *slot_name
= dbinfo
->replslotname
;
1318 char *slot_name_esc
;
1321 Assert(conn
!= NULL
);
1323 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1324 slot_name
, dbinfo
->dbname
);
1326 slot_name_esc
= PQescapeLiteral(conn
, slot_name
, strlen(slot_name
));
1328 appendPQExpBuffer(str
,
1329 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1332 pg_free(slot_name_esc
);
1334 pg_log_debug("command is: %s", str
->data
);
1338 res
= PQexec(conn
, str
->data
);
1339 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1341 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1342 slot_name
, dbinfo
->dbname
,
1343 PQresultErrorMessage(res
));
1345 destroyPQExpBuffer(str
);
1349 lsn
= pg_strdup(PQgetvalue(res
, 0, 0));
1353 /* For cleanup purposes */
1354 dbinfo
->made_replslot
= true;
1356 destroyPQExpBuffer(str
);
1362 drop_replication_slot(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
,
1363 const char *slot_name
)
1365 PQExpBuffer str
= createPQExpBuffer();
1366 char *slot_name_esc
;
1369 Assert(conn
!= NULL
);
1371 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1372 slot_name
, dbinfo
->dbname
);
1374 slot_name_esc
= PQescapeLiteral(conn
, slot_name
, strlen(slot_name
));
1376 appendPQExpBuffer(str
, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc
);
1378 pg_free(slot_name_esc
);
1380 pg_log_debug("command is: %s", str
->data
);
1384 res
= PQexec(conn
, str
->data
);
1385 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1387 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1388 slot_name
, dbinfo
->dbname
, PQresultErrorMessage(res
));
1389 dbinfo
->made_replslot
= false; /* don't try again. */
1395 destroyPQExpBuffer(str
);
/*
 * Reports a suitable message if pg_ctl fails.
 *
 * pg_ctl_cmd is the command line that was run and rc is the status value
 * obtained for it. A zero rc is treated as success and nothing is reported;
 * any other value is logged together with the command, followed by exit(1).
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Success: nothing to report */
	if (rc == 0)
		return;

	if (WIFSIGNALED(rc) && !WIFEXITED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else if (WIFEXITED(rc))
	{
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	}
	else
	{
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);
	}

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1432 start_standby_server(const struct CreateSubscriberOptions
*opt
, bool restricted_access
,
1433 bool restrict_logical_worker
)
1435 PQExpBuffer pg_ctl_cmd
= createPQExpBuffer();
1438 appendPQExpBuffer(pg_ctl_cmd
, "\"%s\" start -D ", pg_ctl_path
);
1439 appendShellString(pg_ctl_cmd
, subscriber_dir
);
1440 appendPQExpBuffer(pg_ctl_cmd
, " -s -o \"-c sync_replication_slots=off\"");
1441 if (restricted_access
)
1443 appendPQExpBuffer(pg_ctl_cmd
, " -o \"-p %s\"", opt
->sub_port
);
1447 * An empty listen_addresses list means the server does not listen on
1448 * any IP interfaces; only Unix-domain sockets can be used to connect
1449 * to the server. Prevent external connections to minimize the chance
1452 appendPQExpBufferStr(pg_ctl_cmd
, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1453 if (opt
->socket_dir
)
1454 appendPQExpBuffer(pg_ctl_cmd
, " -c unix_socket_directories='%s'",
1456 appendPQExpBufferChar(pg_ctl_cmd
, '"');
1459 if (opt
->config_file
!= NULL
)
1460 appendPQExpBuffer(pg_ctl_cmd
, " -o \"-c config_file=%s\"",
1463 /* Suppress to start logical replication if requested */
1464 if (restrict_logical_worker
)
1465 appendPQExpBuffer(pg_ctl_cmd
, " -o \"-c max_logical_replication_workers=0\"");
1467 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd
->data
);
1468 rc
= system(pg_ctl_cmd
->data
);
1469 pg_ctl_status(pg_ctl_cmd
->data
, rc
);
1470 standby_running
= true;
1471 destroyPQExpBuffer(pg_ctl_cmd
);
1472 pg_log_info("server was started");
1476 stop_standby_server(const char *datadir
)
1481 pg_ctl_cmd
= psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path
,
1483 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd
);
1484 rc
= system(pg_ctl_cmd
);
1485 pg_ctl_status(pg_ctl_cmd
, rc
);
1486 standby_running
= false;
1487 pg_log_info("server was stopped");
1491 * Returns after the server finishes the recovery process.
1493 * If recovery_timeout option is set, terminate abnormally without finishing
1494 * the recovery process. By default, it waits forever.
1496 * XXX Is the recovery process still in progress? When recovery process has a
1497 * better progress reporting mechanism, it should be added here.
1500 wait_for_end_recovery(const char *conninfo
, const struct CreateSubscriberOptions
*opt
)
1503 int status
= POSTMASTER_STILL_STARTING
;
1506 pg_log_info("waiting for the target server to reach the consistent state");
1508 conn
= connect_database(conninfo
, true);
1512 bool in_recovery
= server_is_in_recovery(conn
);
1515 * Does the recovery process finish? In dry run mode, there is no
1516 * recovery mode. Bail out as the recovery process has ended.
1518 if (!in_recovery
|| dry_run
)
1520 status
= POSTMASTER_READY
;
1521 recovery_ended
= true;
1525 /* Bail out after recovery_timeout seconds if this option is set */
1526 if (opt
->recovery_timeout
> 0 && timer
>= opt
->recovery_timeout
)
1528 stop_standby_server(subscriber_dir
);
1529 pg_log_error("recovery timed out");
1530 disconnect_database(conn
, true);
1534 pg_usleep(WAIT_INTERVAL
* USEC_PER_SEC
);
1536 timer
+= WAIT_INTERVAL
;
1539 disconnect_database(conn
, false);
1541 if (status
== POSTMASTER_STILL_STARTING
)
1542 pg_fatal("server did not end recovery");
1544 pg_log_info("target server reached the consistent state");
1545 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1549 * Create a publication that includes all tables in the database.
1552 create_publication(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
)
1554 PQExpBuffer str
= createPQExpBuffer();
1559 Assert(conn
!= NULL
);
1561 ipubname_esc
= PQescapeIdentifier(conn
, dbinfo
->pubname
, strlen(dbinfo
->pubname
));
1562 spubname_esc
= PQescapeLiteral(conn
, dbinfo
->pubname
, strlen(dbinfo
->pubname
));
1564 /* Check if the publication already exists */
1565 appendPQExpBuffer(str
,
1566 "SELECT 1 FROM pg_catalog.pg_publication "
1567 "WHERE pubname = %s",
1569 res
= PQexec(conn
, str
->data
);
1570 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1572 pg_log_error("could not obtain publication information: %s",
1573 PQresultErrorMessage(res
));
1574 disconnect_database(conn
, true);
1577 if (PQntuples(res
) == 1)
1580 * Unfortunately, if it reaches this code path, it will always fail
1581 * (unless you decide to change the existing publication name). That's
1582 * bad but it is very unlikely that the user will choose a name with
1583 * pg_createsubscriber_ prefix followed by the exact database oid and
1586 pg_log_error("publication \"%s\" already exists", dbinfo
->pubname
);
1587 pg_log_error_hint("Consider renaming this publication before continuing.");
1588 disconnect_database(conn
, true);
1592 resetPQExpBuffer(str
);
1594 pg_log_info("creating publication \"%s\" in database \"%s\"",
1595 dbinfo
->pubname
, dbinfo
->dbname
);
1597 appendPQExpBuffer(str
, "CREATE PUBLICATION %s FOR ALL TABLES",
1600 pg_log_debug("command is: %s", str
->data
);
1604 res
= PQexec(conn
, str
->data
);
1605 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1607 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1608 dbinfo
->pubname
, dbinfo
->dbname
, PQresultErrorMessage(res
));
1609 disconnect_database(conn
, true);
1614 /* For cleanup purposes */
1615 dbinfo
->made_publication
= true;
1617 pg_free(ipubname_esc
);
1618 pg_free(spubname_esc
);
1619 destroyPQExpBuffer(str
);
1623 * Remove publication if it couldn't finish all steps.
1626 drop_publication(PGconn
*conn
, struct LogicalRepInfo
*dbinfo
)
1628 PQExpBuffer str
= createPQExpBuffer();
1632 Assert(conn
!= NULL
);
1634 pubname_esc
= PQescapeIdentifier(conn
, dbinfo
->pubname
, strlen(dbinfo
->pubname
));
1636 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1637 dbinfo
->pubname
, dbinfo
->dbname
);
1639 appendPQExpBuffer(str
, "DROP PUBLICATION %s", pubname_esc
);
1641 pg_free(pubname_esc
);
1643 pg_log_debug("command is: %s", str
->data
);
1647 res
= PQexec(conn
, str
->data
);
1648 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1650 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1651 dbinfo
->pubname
, dbinfo
->dbname
, PQresultErrorMessage(res
));
1652 dbinfo
->made_publication
= false; /* don't try again. */
1655 * Don't disconnect and exit here. This routine is used by primary
1656 * (cleanup publication / replication slot due to an error) and
1657 * subscriber (remove the replicated publications). In both cases,
1658 * it can continue and provide instructions for the user to remove
1659 * it later if cleanup fails.
1665 destroyPQExpBuffer(str
);
1669 * Create a subscription with some predefined options.
1671 * A replication slot was already created in a previous step. Let's use it. It
1672 * is not required to copy data. The subscription will be created but it will
1673 * not be enabled now. That's because the replication progress must be set and
1674 * the replication origin name (one of the function arguments) contains the
1675 * subscription OID in its name. Once the subscription is created,
1676 * set_replication_progress() can obtain the chosen origin name and set up its
1680 create_subscription(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
)
1682 PQExpBuffer str
= createPQExpBuffer();
1686 char *pubconninfo_esc
;
1687 char *replslotname_esc
;
1689 Assert(conn
!= NULL
);
1691 pubname_esc
= PQescapeIdentifier(conn
, dbinfo
->pubname
, strlen(dbinfo
->pubname
));
1692 subname_esc
= PQescapeIdentifier(conn
, dbinfo
->subname
, strlen(dbinfo
->subname
));
1693 pubconninfo_esc
= PQescapeLiteral(conn
, dbinfo
->pubconninfo
, strlen(dbinfo
->pubconninfo
));
1694 replslotname_esc
= PQescapeLiteral(conn
, dbinfo
->replslotname
, strlen(dbinfo
->replslotname
));
1696 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1697 dbinfo
->subname
, dbinfo
->dbname
);
1699 appendPQExpBuffer(str
,
1700 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1701 "WITH (create_slot = false, enabled = false, "
1702 "slot_name = %s, copy_data = false)",
1703 subname_esc
, pubconninfo_esc
, pubname_esc
, replslotname_esc
);
1705 pg_free(pubname_esc
);
1706 pg_free(subname_esc
);
1707 pg_free(pubconninfo_esc
);
1708 pg_free(replslotname_esc
);
1710 pg_log_debug("command is: %s", str
->data
);
1714 res
= PQexec(conn
, str
->data
);
1715 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1717 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1718 dbinfo
->subname
, dbinfo
->dbname
, PQresultErrorMessage(res
));
1719 disconnect_database(conn
, true);
1724 destroyPQExpBuffer(str
);
1728 * Sets the replication progress to the consistent LSN.
1730 * The subscriber caught up to the consistent LSN provided by the last
1731 * replication slot that was created. The goal is to set up the initial
1732 * location for the logical replication that is the exact LSN that the
1733 * subscriber was promoted. Once the subscription is enabled it will start
1734 * streaming from that location onwards. In dry run mode, the subscription OID
1735 * and LSN are set to invalid values for printing purposes.
1738 set_replication_progress(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
, const char *lsn
)
1740 PQExpBuffer str
= createPQExpBuffer();
1748 Assert(conn
!= NULL
);
1750 subname
= PQescapeLiteral(conn
, dbinfo
->subname
, strlen(dbinfo
->subname
));
1751 dbname
= PQescapeLiteral(conn
, dbinfo
->dbname
, strlen(dbinfo
->dbname
));
1753 appendPQExpBuffer(str
,
1754 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1755 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1756 "WHERE s.subname = %s AND d.datname = %s",
1759 res
= PQexec(conn
, str
->data
);
1760 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1762 pg_log_error("could not obtain subscription OID: %s",
1763 PQresultErrorMessage(res
));
1764 disconnect_database(conn
, true);
1767 if (PQntuples(res
) != 1 && !dry_run
)
1769 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1771 disconnect_database(conn
, true);
1776 suboid
= InvalidOid
;
1777 lsnstr
= psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr
) InvalidXLogRecPtr
));
1781 suboid
= strtoul(PQgetvalue(res
, 0, 0), NULL
, 10);
1782 lsnstr
= psprintf("%s", lsn
);
1788 * The origin name is defined as pg_%u. %u is the subscription OID. See
1789 * ApplyWorkerMain().
1791 originname
= psprintf("pg_%u", suboid
);
1793 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1794 originname
, lsnstr
, dbinfo
->dbname
);
1796 resetPQExpBuffer(str
);
1797 appendPQExpBuffer(str
,
1798 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1799 originname
, lsnstr
);
1801 pg_log_debug("command is: %s", str
->data
);
1805 res
= PQexec(conn
, str
->data
);
1806 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1808 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1809 dbinfo
->subname
, PQresultErrorMessage(res
));
1810 disconnect_database(conn
, true);
1817 pg_free(originname
);
1819 destroyPQExpBuffer(str
);
1823 * Enables the subscription.
1825 * The subscription was created in a previous step but it was disabled. After
1826 * adjusting the initial logical replication location, enable the subscription.
1829 enable_subscription(PGconn
*conn
, const struct LogicalRepInfo
*dbinfo
)
1831 PQExpBuffer str
= createPQExpBuffer();
1835 Assert(conn
!= NULL
);
1837 subname
= PQescapeIdentifier(conn
, dbinfo
->subname
, strlen(dbinfo
->subname
));
1839 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1840 dbinfo
->subname
, dbinfo
->dbname
);
1842 appendPQExpBuffer(str
, "ALTER SUBSCRIPTION %s ENABLE", subname
);
1844 pg_log_debug("command is: %s", str
->data
);
1848 res
= PQexec(conn
, str
->data
);
1849 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1851 pg_log_error("could not enable subscription \"%s\": %s",
1852 dbinfo
->subname
, PQresultErrorMessage(res
));
1853 disconnect_database(conn
, true);
1860 destroyPQExpBuffer(str
);
1864 main(int argc
, char **argv
)
1866 static struct option long_options
[] =
1868 {"database", required_argument
, NULL
, 'd'},
1869 {"pgdata", required_argument
, NULL
, 'D'},
1870 {"dry-run", no_argument
, NULL
, 'n'},
1871 {"subscriber-port", required_argument
, NULL
, 'p'},
1872 {"publisher-server", required_argument
, NULL
, 'P'},
1873 {"socketdir", required_argument
, NULL
, 's'},
1874 {"recovery-timeout", required_argument
, NULL
, 't'},
1875 {"subscriber-username", required_argument
, NULL
, 'U'},
1876 {"verbose", no_argument
, NULL
, 'v'},
1877 {"version", no_argument
, NULL
, 'V'},
1878 {"help", no_argument
, NULL
, '?'},
1879 {"config-file", required_argument
, NULL
, 1},
1880 {"publication", required_argument
, NULL
, 2},
1881 {"replication-slot", required_argument
, NULL
, 3},
1882 {"subscription", required_argument
, NULL
, 4},
1886 struct CreateSubscriberOptions opt
= {0};
1891 char *pub_base_conninfo
;
1892 char *sub_base_conninfo
;
1893 char *dbname_conninfo
= NULL
;
1897 struct stat statbuf
;
1899 char *consistent_lsn
;
1901 char pidfile
[MAXPGPATH
];
1903 pg_logging_init(argv
[0]);
1904 pg_logging_set_level(PG_LOG_WARNING
);
1905 progname
= get_progname(argv
[0]);
1906 set_pglocale_pgservice(argv
[0], PG_TEXTDOMAIN("pg_basebackup"));
1910 if (strcmp(argv
[1], "--help") == 0 || strcmp(argv
[1], "-?") == 0)
1915 else if (strcmp(argv
[1], "-V") == 0
1916 || strcmp(argv
[1], "--version") == 0)
1918 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION
);
1923 /* Default settings */
1924 subscriber_dir
= NULL
;
1925 opt
.config_file
= NULL
;
1926 opt
.pub_conninfo_str
= NULL
;
1927 opt
.socket_dir
= NULL
;
1928 opt
.sub_port
= DEFAULT_SUB_PORT
;
1929 opt
.sub_username
= NULL
;
1930 opt
.database_names
= (SimpleStringList
)
1934 opt
.recovery_timeout
= 0;
1937 * Don't allow it to be run as root. It uses pg_ctl which does not allow
1943 pg_log_error("cannot be executed by \"root\"");
1944 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1950 get_restricted_token();
1952 while ((c
= getopt_long(argc
, argv
, "d:D:np:P:s:t:U:v",
1953 long_options
, &option_index
)) != -1)
1958 if (!simple_string_list_member(&opt
.database_names
, optarg
))
1960 simple_string_list_append(&opt
.database_names
, optarg
);
1965 pg_log_error("database \"%s\" specified more than once", optarg
);
1970 subscriber_dir
= pg_strdup(optarg
);
1971 canonicalize_path(subscriber_dir
);
1977 opt
.sub_port
= pg_strdup(optarg
);
1980 opt
.pub_conninfo_str
= pg_strdup(optarg
);
1983 opt
.socket_dir
= pg_strdup(optarg
);
1984 canonicalize_path(opt
.socket_dir
);
1987 opt
.recovery_timeout
= atoi(optarg
);
1990 opt
.sub_username
= pg_strdup(optarg
);
1993 pg_logging_increase_verbosity();
1996 opt
.config_file
= pg_strdup(optarg
);
1999 if (!simple_string_list_member(&opt
.pub_names
, optarg
))
2001 simple_string_list_append(&opt
.pub_names
, optarg
);
2006 pg_log_error("publication \"%s\" specified more than once", optarg
);
2011 if (!simple_string_list_member(&opt
.replslot_names
, optarg
))
2013 simple_string_list_append(&opt
.replslot_names
, optarg
);
2018 pg_log_error("replication slot \"%s\" specified more than once", optarg
);
2023 if (!simple_string_list_member(&opt
.sub_names
, optarg
))
2025 simple_string_list_append(&opt
.sub_names
, optarg
);
2030 pg_log_error("subscription \"%s\" specified more than once", optarg
);
2035 /* getopt_long already emitted a complaint */
2036 pg_log_error_hint("Try \"%s --help\" for more information.", progname
);
2041 /* Any non-option arguments? */
2044 pg_log_error("too many command-line arguments (first is \"%s\")",
2046 pg_log_error_hint("Try \"%s --help\" for more information.", progname
);
2050 /* Required arguments */
2051 if (subscriber_dir
== NULL
)
2053 pg_log_error("no subscriber data directory specified");
2054 pg_log_error_hint("Try \"%s --help\" for more information.", progname
);
2058 /* If socket directory is not provided, use the current directory */
2059 if (opt
.socket_dir
== NULL
)
2061 char cwd
[MAXPGPATH
];
2063 if (!getcwd(cwd
, MAXPGPATH
))
2064 pg_fatal("could not determine current directory");
2065 opt
.socket_dir
= pg_strdup(cwd
);
2066 canonicalize_path(opt
.socket_dir
);
2070 * Parse connection string. Build a base connection string that might be
2071 * reused by multiple databases.
2073 if (opt
.pub_conninfo_str
== NULL
)
2076 * TODO use primary_conninfo (if available) from subscriber and
2077 * extract publisher connection string. Assume that there are
2078 * identical entries for physical and logical replication. If there is
2079 * not, we would fail anyway.
2081 pg_log_error("no publisher connection string specified");
2082 pg_log_error_hint("Try \"%s --help\" for more information.", progname
);
2085 pg_log_info("validating publisher connection string");
2086 pub_base_conninfo
= get_base_conninfo(opt
.pub_conninfo_str
,
2088 if (pub_base_conninfo
== NULL
)
2091 pg_log_info("validating subscriber connection string");
2092 sub_base_conninfo
= get_sub_conninfo(&opt
);
2094 if (opt
.database_names
.head
== NULL
)
2096 pg_log_info("no database was specified");
2099 * If --database option is not provided, try to obtain the dbname from
2100 * the publisher conninfo. If dbname parameter is not available, error
2103 if (dbname_conninfo
)
2105 simple_string_list_append(&opt
.database_names
, dbname_conninfo
);
2108 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2113 pg_log_error("no database name specified");
2114 pg_log_error_hint("Try \"%s --help\" for more information.",
2120 /* Number of object names must match number of databases */
2121 if (num_pubs
> 0 && num_pubs
!= num_dbs
)
2123 pg_log_error("wrong number of publication names specified");
2124 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2128 if (num_subs
> 0 && num_subs
!= num_dbs
)
2130 pg_log_error("wrong number of subscription names specified");
2131 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2135 if (num_replslots
> 0 && num_replslots
!= num_dbs
)
2137 pg_log_error("wrong number of replication slot names specified");
2138 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2139 num_replslots
, num_dbs
);
2143 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2144 pg_ctl_path
= get_exec_path(argv
[0], "pg_ctl");
2145 pg_resetwal_path
= get_exec_path(argv
[0], "pg_resetwal");
2147 /* Rudimentary check for a data directory */
2148 check_data_directory(subscriber_dir
);
2151 * Store database information for publisher and subscriber. It should be
2152 * called before atexit() because its return is used in the
2153 * cleanup_objects_atexit().
2155 dbinfo
= store_pub_sub_info(&opt
, pub_base_conninfo
, sub_base_conninfo
);
2157 /* Register a function to clean up objects in case of failure */
2158 atexit(cleanup_objects_atexit
);
2161 * Check if the subscriber data directory has the same system identifier
2162 * than the publisher data directory.
2164 pub_sysid
= get_primary_sysid(dbinfo
[0].pubconninfo
);
2165 sub_sysid
= get_standby_sysid(subscriber_dir
);
2166 if (pub_sysid
!= sub_sysid
)
2167 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2169 /* Subscriber PID file */
2170 snprintf(pidfile
, MAXPGPATH
, "%s/postmaster.pid", subscriber_dir
);
2173 * The standby server must not be running. If the server is started under
2174 * service manager and pg_createsubscriber stops it, the service manager
2175 * might react to this action and start the server again. Therefore,
2176 * refuse to proceed if the server is running to avoid possible failures.
2178 if (stat(pidfile
, &statbuf
) == 0)
2180 pg_log_error("standby server is running");
2181 pg_log_error_hint("Stop the standby server and try again.");
2186 * Start a short-lived standby server with temporary parameters (provided
2187 * by command-line options). The goal is to avoid connections during the
2188 * transformation steps.
2190 pg_log_info("starting the standby server with command-line options");
2191 start_standby_server(&opt
, true, false);
2193 /* Check if the standby server is ready for logical replication */
2194 check_subscriber(dbinfo
);
2196 /* Check if the primary server is ready for logical replication */
2197 check_publisher(dbinfo
);
2200 * Stop the target server. The recovery process requires that the server
2201 * reaches a consistent state before targeting the recovery stop point.
2202 * Make sure a consistent state is reached (stop the target server
2203 * guarantees it) *before* creating the replication slots in
2204 * setup_publisher().
2206 pg_log_info("stopping the subscriber");
2207 stop_standby_server(subscriber_dir
);
2209 /* Create the required objects for each database on publisher */
2210 consistent_lsn
= setup_publisher(dbinfo
);
2212 /* Write the required recovery parameters */
2213 setup_recovery(dbinfo
, subscriber_dir
, consistent_lsn
);
2216 * Start subscriber so the recovery parameters will take effect. Wait
2217 * until accepting connections. We don't want to start logical replication
2220 pg_log_info("starting the subscriber");
2221 start_standby_server(&opt
, true, true);
2223 /* Waiting the subscriber to be promoted */
2224 wait_for_end_recovery(dbinfo
[0].subconninfo
, &opt
);
2227 * Create the subscription for each database on subscriber. It does not
2228 * enable it immediately because it needs to adjust the replication start
2229 * point to the LSN reported by setup_publisher(). It also cleans up
2230 * publications created by this tool and replication to the standby.
2232 setup_subscriber(dbinfo
, consistent_lsn
);
2234 /* Remove primary_slot_name if it exists on primary */
2235 drop_primary_replication_slot(dbinfo
, primary_slot_name
);
2237 /* Remove failover replication slots if they exist on subscriber */
2238 drop_failover_replication_slots(dbinfo
);
2240 /* Stop the subscriber */
2241 pg_log_info("stopping the subscriber");
2242 stop_standby_server(subscriber_dir
);
2244 /* Change system identifier from subscriber */
2245 modify_subscriber_sysid(&opt
);
2249 pg_log_info("Done!");