# Pre-beta mechanical code beautification.
# [pgsql.git] / src / test / recovery / t / 040_standby_failover_slots_sync.pl
# blob f0bf0ddc121f10936a3cd89e91d09c9d0786de5a
# Copyright (c) 2024, PostgreSQL Global Development Group

use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

##################################################
# Test that when a subscription with failover enabled is created, it will alter
# the failover property of the corresponding slot on the publisher.
##################################################

# Create publisher
my $publisher = PostgreSQL::Test::Cluster->new('publisher');
# Make sure pg_hba.conf is set up to allow connections from repl_role.
# This is only needed on Windows machines that don't use UNIX sockets.
$publisher->init(
	allows_streaming => 'logical',
	auth_extra => [ '--create-role', 'repl_role' ]);
# Disable autovacuum to avoid generating xid during stats update as otherwise
# the new XID could then be replicated to standby at some random point making
# slots at primary lag behind standby during slot sync.
$publisher->append_conf('postgresql.conf', 'autovacuum = off');
$publisher->start;

$publisher->safe_psql('postgres',
	"CREATE PUBLICATION regress_mypub FOR ALL TABLES;");

my $publisher_connstr = $publisher->connstr . ' dbname=postgres';

# Create a subscriber node
my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
$subscriber1->init;
$subscriber1->start;

# Capture the time before the logical failover slot is created on the
# primary. (This publisher node is referred to as the primary later on.)
my $slot_creation_time_on_primary = $publisher->safe_psql(
	'postgres', qq[
    SELECT current_timestamp;
]);

# Create a subscription that enables failover.
$subscriber1->safe_psql('postgres',
	"CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, copy_data = false, failover = true, enabled = false);"
);

# Confirm that the failover flag on the slot is turned on
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot has failover true on the publisher');
##################################################
# Test that changing the failover property of a subscription updates the
# corresponding failover property of the slot.
##################################################

# Disable failover
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = false)");

# Confirm that the failover flag on the slot has now been turned off
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"f",
	'logical slot has failover false on the publisher');

# Enable failover
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = true)");

# Confirm that the failover flag on the slot has now been turned on
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot has failover true on the publisher');
##################################################
# Test that the failover option cannot be changed for enabled subscriptions.
##################################################

# Enable subscription
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");

# Disable failover for enabled subscription. psql() is used (not safe_psql())
# because the command is expected to fail; its stderr is inspected below.
# Note that psql reports errors as "ERROR:  <message>" (two spaces).
my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = false)");
like(
	$stderr,
	qr/ERROR:  cannot set failover for enabled subscription/,
	"altering failover is not allowed for enabled subscription");
##################################################
# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
##################################################

# The call is expected to fail, so inspect the error reported on stderr.
($result, $stdout, $stderr) =
  $publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
like(
	$stderr,
	qr/ERROR:  replication slots can only be synchronized to a standby server/,
	"cannot sync slots on a non-standby server");
##################################################
# Test logical failover slots corresponding to different plugins can be
# synced to the standby.
#
# Configure standby1 to replicate and synchronize logical slots configured
# for failover on the primary
#
#              failover slot lsub1_slot     | output_plugin: pgoutput
#              failover slot lsub2_slot     | output_plugin: test_decoding
# primary ---> |
#              physical slot sb1_slot --->  | ----> standby1 (connected via streaming replication)
#                                           |                lsub1_slot, lsub2_slot (synced_slot)
##################################################

my $primary = $publisher;
my $backup_name = 'backup';
$primary->backup($backup_name);

# Create a standby
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup(
	$primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

# Increase the log_min_messages setting to DEBUG2 on both the standby and
# primary to debug test failures, if any.
my $connstr_1 = $primary->connstr;
$standby1->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'sb1_slot'
primary_conninfo = '$connstr_1 dbname=postgres'
log_min_messages = 'debug2'
));

$primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
$primary->reload;

# Drop the subscription to prevent further advancement of the restart_lsn for
# the lsub1_slot.
$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1;");

# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the lsub1_slot. The slots below must exist for the rest of the test, so use
# safe_psql() to stop immediately if any of them cannot be created.
$primary->safe_psql('postgres',
	q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);}
);

$primary->safe_psql('postgres',
	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
);

$primary->safe_psql('postgres',
	q{SELECT pg_create_physical_replication_slot('sb1_slot');});

# Start the standby so that slot syncing can begin
$standby1->start;
# Capture the inactive_since of the slot from the primary. Note that the slot
# is inactive because it was just re-created and no subscription uses it.
my $inactive_since_on_primary =
  $primary->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the logical failover slots are created on the standby and are
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced AND NOT temporary;}
	),
	"t",
	'logical slots have synced as true on standby');

# Capture the inactive_since of the synced slot on the standby
my $inactive_since_on_standby =
  $standby1->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Synced slot on the standby must get its own inactive_since
is( $standby1->safe_psql(
		'postgres',
		"SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
	),
	"t",
	'synchronized slot has got its own inactive_since');
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
##################################################

# The drop must succeed for the check below to be meaningful.
$primary->safe_psql('postgres',
	"SELECT pg_drop_replication_slot('lsub2_slot');");

$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
	),
	"t",
	'synchronized slot has been dropped');
##################################################
# Test that if the synchronized slot is invalidated while the remote slot is
# still valid, the slot will be dropped and re-created on the standby by
# executing pg_sync_replication_slots() again.
##################################################

# Configure the max_slot_wal_keep_size so that the synced slot can be
# invalidated due to wal removal.
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
$standby1->reload;

# Generate some activity and switch WAL file on the primary
$primary->advance_wal(1);
$primary->safe_psql('postgres', "CHECKPOINT");
$primary->wait_for_replay_catchup($standby1);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$standby1->safe_psql('postgres', "CHECKPOINT");

# Check if the synced slot is invalidated
is( $standby1->safe_psql(
		'postgres',
		q{SELECT invalidation_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'synchronized slot has been invalidated');

# Reset max_slot_wal_keep_size to avoid further wal removal
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;
# Capture the time before the logical failover slot is created on the primary.
$slot_creation_time_on_primary = $primary->safe_psql(
	'postgres', qq[
    SELECT current_timestamp;
]);

# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the lsub1_slot.
$primary->safe_psql(
	'postgres', qq[
    SELECT pg_drop_replication_slot('lsub1_slot');
    SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
]);

# Capture the inactive_since of the slot from the primary. Note that the slot
# is inactive because it was just re-created and no subscription uses it.
$inactive_since_on_primary =
  $primary->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Remember the current end of the standby log so that wait_for_log() below only
# looks at messages emitted from here on.
my $log_offset = -s $standby1->logfile;

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the invalidated slot has been dropped.
$standby1->wait_for_log(
	qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/, $log_offset);

# Confirm that the logical slot has been re-created on the standby and is
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT invalidation_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot is re-synced');

# Reset the log_min_messages to the default value.
$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$primary->reload;

$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$standby1->reload;
##################################################
# Test that a synchronized slot cannot be decoded, altered or dropped by the
# user
##################################################

# Each command below is expected to fail; psql() returns stderr so the error
# text can be checked. psql reports errors as "ERROR:  <message>" (two spaces).

# Attempting to perform logical decoding on a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
like(
	$stderr,
	qr/ERROR:  cannot use replication slot "lsub1_slot" for logical decoding/,
	"logical decoding is not allowed on synced slot");

# Attempting to alter a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql(
	'postgres',
	qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
	replication => 'database');
like(
	$stderr,
	qr/ERROR:  cannot alter replication slot "lsub1_slot"/,
	"synced slot on standby cannot be altered");

# Attempting to drop a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"SELECT pg_drop_replication_slot('lsub1_slot');");
like(
	$stderr,
	qr/ERROR:  cannot drop replication slot "lsub1_slot"/,
	"synced slot on standby cannot be dropped");
##################################################
# Test that we cannot synchronize slots if dbname is not specified in the
# primary_conninfo.
##################################################

$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
$standby1->reload;

($result, $stdout, $stderr) =
  $standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
like(
	$stderr,
	qr/ERROR:  slot synchronization requires dbname to be specified in primary_conninfo/,
	"cannot sync slots if dbname is not specified in primary_conninfo");

# Add the dbname back to the primary_conninfo for further tests
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=postgres'");
$standby1->reload;
##################################################
# Test that we cannot synchronize slots to a cascading standby server.
##################################################

# Create a cascading standby
$backup_name = 'backup2';
$standby1->backup($backup_name);

my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
$cascading_standby->init_from_backup(
	$standby1, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

my $cascading_connstr = $standby1->connstr;
$cascading_standby->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'cascading_sb_slot'
primary_conninfo = '$cascading_connstr dbname=postgres'
));

# The cascading standby streams through this slot, so its creation must succeed.
$standby1->safe_psql('postgres',
	q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});

$cascading_standby->start;

($result, $stdout, $stderr) =
  $cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
like(
	$stderr,
	qr/ERROR:  cannot synchronize replication slots from a standby server/,
	"cannot sync slots to a cascading standby server");

$cascading_standby->stop;
##################################################
# Create a failover slot and advance the restart_lsn to a position where a
# running transaction exists. This setup is for testing that the synced slots
# can achieve the consistent snapshot state starting from the restart_lsn
# after promotion without losing any data that otherwise would have been
# received from the primary.
##################################################

$primary->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('snap_test_slot', 'test_decoding', false, false, true);"
);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Two xl_running_xacts logs are generated here. When decoding the first log, it
# only serializes the snapshot, without advancing the restart_lsn to the latest
# position. This is because if a transaction is running, the restart_lsn can
# only move to a position before that transaction. Hence, the second
# xl_running_xacts log is needed, the decoding for which allows the restart_lsn
# to advance to the last serialized snapshot's position (the first log).
$primary->safe_psql(
	'postgres', qq(
		BEGIN;
		SELECT txid_current();
		SELECT pg_log_standby_snapshot();
		COMMIT;
		BEGIN;
		SELECT txid_current();
		SELECT pg_log_standby_snapshot();
		COMMIT;
));

# Advance the restart_lsn to the position of the first xl_running_xacts log
# generated above. Note that there might be concurrent xl_running_xacts logs
# written by the bgwriter, which could cause the position to be advanced to an
# unexpected point, but that would be a rare scenario and doesn't affect the
# test results.
$primary->safe_psql('postgres',
	"SELECT pg_replication_slot_advance('snap_test_slot', pg_current_wal_lsn());"
);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Log a message that will be consumed on the standby after promotion using the
# synced slot. See the test that promotes standby1 to primary.
$primary->safe_psql('postgres',
	"SELECT pg_logical_emit_message(false, 'test', 'test');");

# Get the confirmed_flush_lsn for the logical slot snap_test_slot on the primary
my $confirmed_flush_lsn = $primary->safe_psql('postgres',
	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot';"
);

$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Verify that confirmed_flush_lsn of snap_test_slot slot is synced to the standby
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT '$confirmed_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot' AND synced AND NOT temporary;"
	),
	'confirmed_flush_lsn of slot snap_test_slot synced to standby');
##################################################
# Test to confirm that the slot synchronization is protected from malicious
# users.
##################################################

# The statements below connect to this database, so its creation must succeed.
$primary->safe_psql('postgres', "CREATE DATABASE slotsync_test_db");
$primary->wait_for_replay_catchup($standby1);

$standby1->stop;

# On the primary server, create '=' operator in another schema mapped to
# inequality function and redirect the queries to use new operator by setting
# search_path. The new '=' operator is created with leftarg as 'bigint' and
# rightarg as 'int' to redirect 'count(*) = 1' in slot sync's query to use
# new '=' operator.
$primary->safe_psql(
	'slotsync_test_db', q{

CREATE ROLE repl_role REPLICATION LOGIN;
CREATE SCHEMA myschema;

CREATE FUNCTION myschema.myintne(bigint, int) RETURNS bool as $$
		BEGIN
		  RETURN $1 <> $2;
		END;
	  $$ LANGUAGE plpgsql immutable;

CREATE OPERATOR myschema.= (
	  leftarg    = bigint,
	  rightarg   = int,
	  procedure  = myschema.myintne);

ALTER DATABASE slotsync_test_db SET SEARCH_PATH TO myschema,pg_catalog;
GRANT USAGE on SCHEMA myschema TO repl_role;
});

# Start the standby with changed primary_conninfo.
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=slotsync_test_db user=repl_role'");
$standby1->start;

# Run the synchronization function. If the sync flow was not prepared
# to handle such attacks, it would have failed during the validation
# of the primary_slot_name itself resulting in
# ERROR:  slot synchronization requires valid primary_slot_name
$standby1->safe_psql('slotsync_test_db',
	"SELECT pg_sync_replication_slots();");

# Reset the dbname and user in primary_conninfo to the earlier values.
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=postgres'");
$standby1->reload;

# Drop the newly created database.
$primary->psql('postgres', q{DROP DATABASE slotsync_test_db;});
##################################################
# Test to confirm that the slot sync worker exits on invalid GUC(s) and
# gets started again on valid GUC(s).
##################################################

$log_offset = -s $standby1->logfile;

# Enable slot sync worker.
$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
$standby1->reload;

# Confirm that the slot sync worker is able to start.
$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);

$log_offset = -s $standby1->logfile;

# Disable another GUC required for slot sync.
$standby1->append_conf('postgresql.conf', qq(hot_standby_feedback = off));
$standby1->reload;

# Confirm that the slot sync worker acknowledges the GUC change and logs the
# message about the wrong configuration.
$standby1->wait_for_log(
	qr/slot sync worker will restart because of a parameter change/,
	$log_offset);
$standby1->wait_for_log(
	qr/slot synchronization requires hot_standby_feedback to be enabled/,
	$log_offset);

$log_offset = -s $standby1->logfile;

# Re-enable the required GUC
$standby1->append_conf('postgresql.conf', "hot_standby_feedback = on");
$standby1->reload;

# Confirm that the slot sync worker is able to start now.
$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);
##################################################
# Test to confirm that confirmed_flush_lsn of the logical slot on the primary
# is synced to the standby via the slot sync worker.
##################################################

# Insert data on the primary
$primary->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	INSERT INTO tab_int SELECT generate_series(1, 10);
]);

# Subscribe to the new table data and wait for it to arrive
$subscriber1->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, create_slot = false);
]);

$subscriber1->wait_for_subscription_sync;

# Do not allow any further advancement of the confirmed_flush_lsn for the
# lsub1_slot.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher. A timeout
# must not go unnoticed, since the confirmed_flush_lsn captured below would
# then still be moving.
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
	1)
  or die "timed out waiting for lsub1_slot to become inactive on the publisher";

# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
my $primary_flush_lsn = $primary->safe_psql('postgres',
	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
);

# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
	),
	'confirmed_flush_lsn of slot lsub1_slot synced to standby');
##################################################
# Test that logical failover replication slots wait for the specified
# physical replication slots to receive the changes first. It uses the
# following set up:
#
#                 (physical standbys)
#                 | ----> standby1 (primary_slot_name = sb1_slot)
#                 | ----> standby2 (primary_slot_name = sb2_slot)
# primary -----   |
#                 (logical replication)
#                 | ----> subscriber1 (failover = true, slot_name = lsub1_slot)
#                 | ----> subscriber2 (failover = false, slot_name = lsub2_slot)
#
# standby_slot_names = 'sb1_slot'
#
# The setup is configured in such a way that the logical slot of subscriber1 is
# enabled for failover, and thus the subscriber1 will wait for the physical
# slot of standby1(sb1_slot) to catch up before receiving the decoded changes.
##################################################

$backup_name = 'backup3';

# standby2 streams through this slot, so its creation must succeed.
$primary->safe_psql('postgres',
	q{SELECT pg_create_physical_replication_slot('sb2_slot');});

$primary->backup($backup_name);

# Create another standby
my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
$standby2->init_from_backup(
	$primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$standby2->append_conf(
	'postgresql.conf', qq(
primary_slot_name = 'sb2_slot'
));
$standby2->start;
$primary->wait_for_replay_catchup($standby2);

# Configure primary to disallow any logical slots that have enabled failover
# from getting ahead of the specified physical replication slot (sb1_slot).
$primary->append_conf(
	'postgresql.conf', qq(
standby_slot_names = 'sb1_slot'
));
$primary->reload;

# Create another subscriber node without enabling failover, wait for sync to
# complete
my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
$subscriber2->init;
$subscriber2->start;
$subscriber2->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);
]);

$subscriber2->wait_for_subscription_sync;

$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");
# Remember the current end of the primary log for the wait_for_log() below.
my $offset = -s $primary->logfile;

# Stop the standby associated with the specified physical replication slot
# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive
# changes until the standby comes up.
$standby1->stop;

# Create some data on the primary
my $primary_row_count = 20;
$primary->safe_psql('postgres',
	"INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);");

# Wait until the standby2 that's still running gets the data from the primary
$primary->wait_for_replay_catchup($standby2);
$result = $standby2->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "standby2 gets data from primary");

# Wait for regress_mysub2 to get the data from the primary. This subscription
# was not enabled for failover so it gets the data without waiting for any
# standbys.
$primary->wait_for_catchup('regress_mysub2');
$result = $subscriber2->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "subscriber2 gets data from primary");

# Wait until the primary server logs a warning indicating that it is waiting
# for the sb1_slot to catch up.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
	$offset);

# The regress_mysub1 was enabled for failover so it doesn't get the data from
# primary and keeps waiting for the standby specified in standby_slot_names
# (sb1_slot aka standby1).
$result =
  $subscriber1->safe_psql('postgres',
	"SELECT count(*) <> $primary_row_count FROM tab_int;");
is($result, 't',
	"subscriber1 doesn't get data from primary until standby1 acknowledges changes"
);

# Start the standby specified in standby_slot_names (sb1_slot aka standby1) and
# wait for it to catch up with the primary.
$standby1->start;
$primary->wait_for_replay_catchup($standby1);
$result = $standby1->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "standby1 gets data from primary");

# Now that the standby specified in standby_slot_names is up and running, the
# primary can send the decoded changes to the subscription enabled for failover
# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
# receive any data from the primary, i.e. the primary didn't allow it to go
# ahead of the standby.
$primary->wait_for_catchup('regress_mysub1');
$result = $subscriber1->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't',
	"subscriber1 gets data from primary after standby1 acknowledges changes");
##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
# standby_slot_names to catch up.
##################################################

# Stop the standby associated with the specified physical replication slot so
# that the logical replication slot won't receive changes until the standby
# slot's restart_lsn is advanced or the slot is removed from the
# standby_slot_names list.
$primary->safe_psql('postgres', "TRUNCATE tab_int;");
$primary->wait_for_catchup('regress_mysub1');
$standby1->stop;

# Disable the regress_mysub1 to prevent the logical walsender from generating
# more warnings.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher. Die on
# timeout rather than continuing with an active walsender.
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
	1)
  or die "timed out waiting for lsub1_slot to become inactive on the publisher";

# Create a logical 'test_decoding' replication slot with failover enabled
$primary->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
);

my $back_q = $primary->background_psql(
	'postgres',
	on_error_stop => 0,
	timeout => $PostgreSQL::Test::Utils::timeout_default);

# pg_logical_slot_get_changes will be blocked until the standby catches up,
# hence it needs to be executed in a background session.
$offset = -s $primary->logfile;
$back_q->query_until(
	qr/logical_slot_get_changes/, q(
   \echo logical_slot_get_changes
   SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
));

# Wait until the primary server logs a warning indicating that it is waiting
# for the sb1_slot to catch up.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
	$offset);

# Remove the standby from the standby_slot_names list and reload the
# configuration.
$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
$primary->reload;

# Since there are no slots in standby_slot_names, the function
# pg_logical_slot_get_changes should now return, and the session can be
# stopped.
$back_q->quit;

$primary->safe_psql('postgres',
	"SELECT pg_drop_replication_slot('test_slot');");

# Add the physical slot (sb1_slot) back to the standby_slot_names for further
# tests.
$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'");
$primary->reload;

# Enable the regress_mysub1 for further tests
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");
##################################################
# Test that logical replication will wait for the user-created inactive
# physical slot to catch up until we remove the slot from standby_slot_names.
##################################################

$offset = -s $primary->logfile;

# Create some data on the primary
$primary_row_count = 10;
$primary->safe_psql('postgres',
	"INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");

# Wait until the primary server logs a warning indicating that it is waiting
# for the sb1_slot to catch up.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
	$offset);

# The regress_mysub1 doesn't get the data from primary because the specified
# standby slot (sb1_slot) in standby_slot_names is inactive.
$result =
  $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;");
is($result, 't',
	"subscriber1 doesn't get data as the sb1_slot doesn't catch up");

# Remove the standby from the standby_slot_names list and reload the
# configuration.
$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
$primary->reload;

# Since there are no slots in standby_slot_names, the primary server should now
# send the decoded changes to the subscription.
$primary->wait_for_catchup('regress_mysub1');
$result = $subscriber1->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't',
	"subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
);

# Add the physical slot (sb1_slot) back to the standby_slot_names for further
# tests.
$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'");
$primary->reload;
##################################################
# Promote the standby1 to primary. Confirm that:
# a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
# b) logical replication for regress_mysub1 is resumed successfully after failover
# c) changes can be consumed from the synced slot 'snap_test_slot'
##################################################
$standby1->start;
$primary->wait_for_replay_catchup($standby1);

# Capture the time before the standby is promoted
my $promotion_time_on_primary = $standby1->safe_psql(
	'postgres', qq[
    SELECT current_timestamp;
]);

$standby1->promote;

# Capture the inactive_since of the synced slot after the promotion.
# The expectation here is that the slot gets its inactive_since as part of the
# promotion. We do this check before the slot is enabled on the new primary
# below, otherwise, the slot gets active setting inactive_since to NULL.
my $inactive_since_on_new_primary =
  $standby1->validate_slot_inactive_since('lsub1_slot',
	$promotion_time_on_primary);

is( $standby1->safe_psql(
		'postgres',
		"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
	),
	"t",
	'synchronized slot has got its own inactive_since on the new primary after promotion'
);

# Update subscription with the new primary's connection info
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';");

# Confirm the synced slots 'lsub1_slot' and 'snap_test_slot' are retained on
# the new primary
is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;}
	),
	't',
	'synced slot retained on the new primary');

# Insert data on the new primary
$standby1->safe_psql('postgres',
	"INSERT INTO tab_int SELECT generate_series(11, 20);");
$standby1->wait_for_catchup('regress_mysub1');

# Confirm that data in tab_int replicated on the subscriber
is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
	"20", 'data replicated from the new primary');

# Consume the data from the snap_test_slot. The synced slot should reach a
# consistent point by restoring the snapshot at the restart_lsn serialized
# during slot synchronization. Only the row describing the logical message
# emitted earlier (test_decoding prints it as "message: ...") is counted; the
# pattern is anchored rather than using the glob-like 'message*', which as a
# POSIX regex means "messag" followed by any number of 'e's.
$result = $standby1->safe_psql('postgres',
	"SELECT count(*) FROM pg_logical_slot_get_changes('snap_test_slot', NULL, NULL) WHERE data ~ '^message:';"
);

is($result, '1', "data can be consumed using snap_test_slot");

done_testing();