# Copyright (c) 2023-2024, PostgreSQL Global Development Group

# logical decoding on standby : test logical decoding,
# recovery conflict and standby promotion.

use strict;
use warnings FATAL => 'all';

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

my ($stdin, $stdout, $stderr,
	$cascading_stdout, $cascading_stderr, $subscriber_stdin,
	$subscriber_stdout, $subscriber_stderr, $ret,
	$handle, $slot);
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $node_cascading_standby =
  PostgreSQL::Test::Cluster->new('cascading_standby');
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
my $res;

# Name for the physical slot on primary
my $primary_slotname = 'primary_physical';
my $standby_physical_slotname = 'standby_physical';
# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
# given boolean condition to be true to ensure we've reached a quiescent state.
sub wait_for_xmins
{
	my ($node, $slotname, $check_expr) = @_;

	$node->poll_query_until(
		'postgres', qq[
		SELECT $check_expr
		FROM pg_catalog.pg_replication_slots
		WHERE slot_name = '$slotname';
	]) or die "Timed out waiting for slot xmins to advance";
}
# Create the required logical slots on standby.
sub create_logical_slots
{
	my ($node, $slot_prefix) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';
	$node->create_logical_slot_on_standby($node_primary, qq($inactive_slot),
		'testdb');
	$node->create_logical_slot_on_standby($node_primary, qq($active_slot),
		'testdb');
}
# Drop the logical slots on standby.
sub drop_logical_slots
{
	my ($slot_prefix) = @_;
	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	$node_standby->psql('postgres',
		qq[SELECT pg_drop_replication_slot('$inactive_slot')]);
	$node_standby->psql('postgres',
		qq[SELECT pg_drop_replication_slot('$active_slot')]);
}
# Acquire one of the standby logical slots created by create_logical_slots().
# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
# If wait is not true it means we are testing a known failure scenario.
sub make_slot_active
{
	my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
	my $slot_user_handle;

	my $active_slot = $slot_prefix . 'activeslot';

	# Start pg_recvlogical streaming from the slot; its output and errors are
	# captured through the caller-supplied scalar refs.
	$slot_user_handle = IPC::Run::start(
		[
			'pg_recvlogical', '-d',
			$node->connstr('testdb'), '-S',
			qq($active_slot), '-o',
			'include-xids=0', '-o',
			'skip-empty-xacts=1', '--no-loop',
			'--start', '-f',
			'-'
		],
		'>',
		$to_stdout,
		'2>',
		$to_stderr,
		IPC::Run::timeout($default_timeout));

	if ($wait)
	{
		# make sure activeslot is in use
		$node->poll_query_until('testdb',
			qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
		) or die "slot never became active";
	}

	return $slot_user_handle;
}
# Check pg_recvlogical stderr
sub check_pg_recvlogical_stderr
{
	my ($slot_user_handle, $check_stderr) = @_;
	my $return;

	# our client should've terminated in response to the walsender error
	$slot_user_handle->finish;
	$return = $?;
	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
	if ($return)
	{
		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
	}

	return 0;
}
# Check if all the slots on standby are dropped. These include the 'activeslot'
# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
sub check_slots_dropped
{
	my ($slot_prefix, $slot_user_handle) = @_;

	is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'},
		'', 'inactiveslot on standby dropped');
	is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'},
		'', 'activeslot on standby dropped');

	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
}
# Change hot_standby_feedback and check xmin and catalog_xmin values.
sub change_hot_standby_feedback_and_wait_for_xmins
{
	my ($hsf, $invalidated) = @_;

	$node_standby->append_conf(
		'postgresql.conf', qq[
	hot_standby_feedback = $hsf
	]);

	$node_standby->reload;

	if ($hsf && $invalidated)
	{
		# With hot_standby_feedback on, xmin should advance,
		# but catalog_xmin should still remain NULL since there is no logical slot.
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NOT NULL AND catalog_xmin IS NULL");
	}
	elsif ($hsf)
	{
		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
	}
	else
	{
		# Both should be NULL since hs_feedback is off
		wait_for_xmins($node_primary, $primary_slotname,
			"xmin IS NULL AND catalog_xmin IS NULL");
	}
}
# Check reason for conflict in pg_replication_slots.
sub check_slots_conflict_reason
{
	my ($slot_prefix, $reason) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	$res = $node_standby->safe_psql(
		'postgres', qq(
		select invalidation_reason from pg_replication_slots where slot_name = '$active_slot' and conflicting;));

	is($res, "$reason", "$active_slot reason for conflict is $reason");

	$res = $node_standby->safe_psql(
		'postgres', qq(
		select invalidation_reason from pg_replication_slots where slot_name = '$inactive_slot' and conflicting;));

	is($res, "$reason", "$inactive_slot reason for conflict is $reason");
}
# Drop the slots, re-create them, change hot_standby_feedback,
# check xmin and catalog_xmin values, make slot active and reset stat.
sub reactive_slots_change_hfs_and_wait_for_xmins
{
	my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;

	# drop the logical slots
	drop_logical_slots($previous_slot_prefix);

	# create the logical slots
	create_logical_slots($node_standby, $slot_prefix);

	change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);

	$handle =
	  make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);

	# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
	$node_standby->psql('testdb', q[select pg_stat_reset();]);
}
# Check invalidation in the logfile and in pg_stat_database_conflicts
sub check_for_invalidation
{
	my ($slot_prefix, $log_start, $test_name) = @_;

	my $active_slot = $slot_prefix . 'activeslot';
	my $inactive_slot = $slot_prefix . 'inactiveslot';

	# message should be issued
	ok( $node_standby->log_contains(
			"invalidating obsolete replication slot \"$inactive_slot\"",
			$log_start),
		"inactiveslot slot invalidation is logged $test_name");

	ok( $node_standby->log_contains(
			"invalidating obsolete replication slot \"$active_slot\"",
			$log_start),
		"activeslot slot invalidation is logged $test_name");

	# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
	ok( $node_standby->poll_query_until(
			'postgres',
			"select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'",
			't'),
		'confl_active_logicalslot updated'
	) or die "Timed out waiting for confl_active_logicalslot to be updated";
}
# Launch $sql query, wait for a new snapshot that has a newer horizon and
# launch a VACUUM. $vac_option is the set of options to be passed to the
# VACUUM command, $sql the sql to launch before triggering the vacuum and
# $to_vac the relation to vacuum.
#
# Note that pg_current_snapshot() is used to get the horizon. It does
# not generate a Transaction/COMMIT WAL record, decreasing the risk of
# seeing a xl_running_xacts that would advance an active replication slot's
# catalog_xmin. Advancing the active replication slot's catalog_xmin
# would break some tests that expect the active slot to conflict with
# the catalog xmin horizon.
sub wait_until_vacuum_can_remove
{
	my ($vac_option, $sql, $to_vac) = @_;

	# Get the current xid horizon.
	my $xid_horizon = $node_primary->safe_psql('testdb',
		qq[select pg_snapshot_xmin(pg_current_snapshot());]);

	# Launch our sql.
	$node_primary->safe_psql('testdb', qq[$sql]);

	# Wait until we get a newer horizon.
	$node_primary->poll_query_until('testdb',
		"SELECT (select pg_snapshot_xmin(pg_current_snapshot())::text::int - $xid_horizon) > 0"
	) or die "new snapshot does not have a newer horizon";

	# Launch the vacuum command and insert into flush_wal (see CREATE
	# TABLE flush_wal for the reason).
	$node_primary->safe_psql(
		'testdb', qq[VACUUM $vac_option verbose $to_vac;
					 INSERT INTO flush_wal DEFAULT VALUES;]);
}
########################
# Initialize primary node
########################

$node_primary->init(allows_streaming => 1, has_archiving => 1);
$node_primary->append_conf(
	'postgresql.conf', q{
wal_level = 'logical'
max_replication_slots = 4
});
$node_primary->dump_info;
$node_primary->start;
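# Create the database most of the tests below operate on.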
$node_primary->psql('postgres', q[CREATE DATABASE testdb]);

$node_primary->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
);

# Check conflicting is NULL for physical slot
$res = $node_primary->safe_psql(
	'postgres', qq[
	SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
);

is($res, 't', "Physical slot reports conflicting as NULL");

my $backup_name = 'b1';
$node_primary->backup($backup_name);

# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
# WAL. An insert into flush_wal outside a transaction guarantees a flush.
$node_primary->psql('testdb', q[CREATE TABLE flush_wal();]);
#######################
# Initialize standby node
#######################

$node_standby->init_from_backup(
	$node_primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$node_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$primary_slotname'
	max_replication_slots = 5]);
$node_standby->start;
$node_primary->wait_for_replay_catchup($node_standby);
#######################
# Initialize subscriber node
#######################
$node_subscriber->init;
$node_subscriber->start;

my %psql_subscriber = (
	'subscriber_stdin' => '',
	'subscriber_stdout' => '',
	'subscriber_stderr' => '');
$psql_subscriber{run} = IPC::Run::start(
	[ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
	'<',
	\$psql_subscriber{subscriber_stdin},
	'>',
	\$psql_subscriber{subscriber_stdout},
	'2>',
	\$psql_subscriber{subscriber_stderr},
	IPC::Run::timeout($default_timeout));
##################################################
# Test that logical decoding on the standby
# behaves correctly.
##################################################

# create the logical slots
create_logical_slots($node_standby, 'behaves_ok_');

$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
);

$node_primary->wait_for_replay_catchup($node_standby);

my $result = $node_standby->safe_psql('testdb',
	qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]
);

# test if basic decoding works
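# The expected 14 rows are the empty BEGIN/COMMIT pair emitted for the
# catalog-only CREATE TABLE transaction plus BEGIN, the 10 INSERTs and
# COMMIT for the data load (skip-empty-xacts is not used here).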
is(scalar(my @foobar = split /^/m, $result),
	14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
# Insert some rows and verify that we get the same results from pg_recvlogical
# and the SQL interface.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

my $expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT};

$node_primary->wait_for_replay_catchup($node_standby);
my $stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected, 'got expected output from SQL decoding session');
my $endpos = $node_standby->safe_psql('testdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);

# Insert some rows after $endpos, which we won't read.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
);
$node_primary->wait_for_replay_catchup($node_standby);

my $stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, $expected,
	'got same expected output from pg_recvlogical decoding session');
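# pg_recvlogical exits once it has read up to $endpos; wait for it to
# release the slot before starting another session on the same slot.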
$node_standby->poll_query_until('testdb',
	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
) or die "slot never became inactive";

$stdout_recv = $node_standby->pg_recvlogical_upto(
	'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
	'include-xids' => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');

# Wait for catchup to ensure that the new database is visible to other sessions
# on the standby.
$node_primary->wait_for_replay_catchup($node_standby);

($result, $stdout, $stderr) = $node_standby->psql('otherdb',
	"SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);
ok( $stderr =~
	  m/replication slot "behaves_ok_activeslot" was not created in this database/,
	"replaying logical slot from another database fails");
##################################################
# Test that we can subscribe on the standby with the publication
# created on the primary.
##################################################

# Create a table on the primary
$node_primary->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a table (same structure) on the subscriber node
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Create a publication on the primary
$node_primary->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub for table tab_rep");

$node_primary->wait_for_replay_catchup($node_standby);
456 my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
458 # Not using safe_psql() here as it would wait for activity on the primary
459 # and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
461 # psql_subscriber() allows to not wait synchronously.
462 $psql_subscriber{subscriber_stdin
} .= qq[CREATE SUBSCRIPTION tap_sub
463 CONNECTION
'$standby_connstr'
465 WITH
(copy_data
= off
);];
466 $psql_subscriber{subscriber_stdin
} .= "\n";
$psql_subscriber{run}->pump_nb();

# Log the standby snapshot to speed up the subscription creation
$node_primary->log_standby_snapshot($node_standby, 'tap_sub');

# Explicitly shut down psql instance gracefully - to avoid hangs
# or worse on Windows
$psql_subscriber{subscriber_stdin} .= "\\q\n";
$psql_subscriber{run}->finish;

$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
# Insert some rows on the primary
$node_primary->safe_psql('postgres',
	qq[INSERT INTO tab_rep select generate_series(1,10);]);

$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_catchup('tap_sub');

# Check that the subscriber can see the rows inserted in the primary
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(10), 'check replicated inserts after subscription on standby');

# We do not need the subscription and the subscriber anymore
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->stop;
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 1: hot_standby_feedback off and vacuum FULL
#
# In passing, ensure that replication slot stats are not removed when the
# active slot is invalidated.
##################################################

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum full on pg_class with hot_standby_feedback turned off on
# the standby.
reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_',
	0, 1);

# Ensure that replication slot stats are not empty before triggering the
# conflict.
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]);

$node_standby->poll_query_until('testdb',
	qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
) or die "replication slot stats of vacuum_full_activeslot not updated";
# This should trigger the conflict
wait_until_vacuum_can_remove(
	'full', 'CREATE TABLE conflict_test(x integer, y text);
			 DROP TABLE conflict_test;', 'pg_class');

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('vacuum_full_', 'rows_removed');
# Ensure that replication slot stats are not removed after invalidation.
is( $node_standby->safe_psql(
		'testdb',
		qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
	),
	't',
	'replication slot stats not removed after invalidation');

$handle =
  make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
);

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
##################################################
# Verify that invalidated logical slots stay invalidated across a restart.
##################################################
$node_standby->restart;

# Verify reason for conflict is retained across a restart.
check_slots_conflict_reason('vacuum_full_', 'rows_removed');

##################################################
# Verify that invalidated logical slots do not lead to retaining WAL.
##################################################

# Get the restart_lsn from an invalidated slot
my $restart_lsn = $node_standby->safe_psql(
	'postgres',
	"SELECT restart_lsn FROM pg_replication_slots
		WHERE slot_name = 'vacuum_full_activeslot' AND conflicting;"
);
# As pg_walfile_name() cannot be executed on the standby,
# get the WAL file name associated with this lsn from the primary
my $walfile_name = $node_primary->safe_psql('postgres',
	"SELECT pg_walfile_name('$restart_lsn')");

chomp($walfile_name);

# Generate some activity and switch WAL file on the primary
$node_primary->advance_wal(1);
$node_primary->safe_psql('postgres', "checkpoint;");

# Wait for the standby to catch up
$node_primary->wait_for_replay_catchup($node_standby);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$node_standby->safe_psql('postgres', 'checkpoint;');

# Verify that the WAL file has not been retained on the standby
my $standby_walfile = $node_standby->data_dir . '/pg_wal/' . $walfile_name;
ok(!-f "$standby_walfile",
	"invalidated logical slots do not lead to retaining WAL");
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 2: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
my $logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to create/drop a relation and
# launch a vacuum on pg_class with hot_standby_feedback turned off on the
# standby.
reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_',
	0, 1);

# This should trigger the conflict
wait_until_vacuum_can_remove(
	'', 'CREATE TABLE conflict_test(x integer, y text);
		DROP TABLE conflict_test;', 'pg_class');

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('row_removal_', 'rows_removed');

$handle =
  make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"row_removal_activeslot\""
);
##################################################
# Recovery conflict: Same as Scenario 2 but on a shared catalog table
# Scenario 3: conflict due to row removal with hot_standby_feedback off.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict on a shared catalog table is to
# create/drop a role and launch a vacuum on pg_authid with
# hot_standby_feedback turned off on the standby.
reactive_slots_change_hfs_and_wait_for_xmins('row_removal_',
	'shared_row_removal_', 0, 1);

# Trigger the conflict
wait_until_vacuum_can_remove(
	'', 'CREATE ROLE create_trash;
		DROP ROLE create_trash;', 'pg_authid');

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('shared_row_removal_', $logstart,
	'with vacuum on pg_authid');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('shared_row_removal_', 'rows_removed');

$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
	\$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
);
##################################################
# Recovery conflict: Same as Scenario 2 but on a non catalog table
# Scenario 4: No conflict expected.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_',
	'no_conflict_', 0, 1);

# This should not trigger a conflict
wait_until_vacuum_can_remove(
	'', 'CREATE TABLE conflict_test(x integer, y text);
		 INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
		 UPDATE conflict_test set x=1, y=1;', 'conflict_test');

$node_primary->wait_for_replay_catchup($node_standby);

# message should not be issued
ok( !$node_standby->log_contains(
		"invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
	'inactiveslot slot invalidation is not logged with vacuum on conflict_test'
);

ok( !$node_standby->log_contains(
		"invalidating obsolete slot \"no_conflict_activeslot\"", $logstart),
	'activeslot slot invalidation is not logged with vacuum on conflict_test'
);

# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
ok( $node_standby->poll_query_until(
		'postgres',
		"select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'",
		't'),
	'confl_active_logicalslot not updated'
) or die "Timed out waiting for confl_active_logicalslot to be updated";

# Verify slots are reported as non conflicting in pg_replication_slots
is( $node_standby->safe_psql(
		'postgres',
		q[select bool_or(conflicting) from
		  (select conflicting from pg_replication_slots
			where slot_type = 'logical')]),
	'f',
	'Logical slots are reported as non conflicting');

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 0);

# Restart the standby node to ensure no slots are still active
$node_standby->restart;
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 5: conflict due to on-access pruning.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# One way to produce recovery conflict is to trigger an on-access pruning
# on a relation marked as user_catalog_table.
reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0,
	0);

# This should trigger the conflict
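# The wide rows and low fillfactor make each UPDATE below leave dead tuples
# behind on the same page; a later access prunes them, and since prun is a
# user_catalog_table that pruning conflicts with the slots' catalog_xmin.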
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]
);
$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('pruning_', $logstart, 'with on-access pruning');

# Verify reason for conflict is 'rows_removed' in pg_replication_slots
check_slots_conflict_reason('pruning_', 'rows_removed');

$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"pruning_activeslot\"");

# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
##################################################
# Recovery conflict: Invalidate conflicting slots, including in-use slots
# Scenario 6: incorrect wal_level on primary.
##################################################

# get the position to search from in the standby logfile
$logstart = -s $node_standby->logfile;

# drop the logical slots
drop_logical_slots('pruning_');

# create the logical slots
create_logical_slots($node_standby, 'wal_level_');

$handle =
  make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);

# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
$node_standby->psql('testdb', q[select pg_stat_reset();]);

# Make primary wal_level replica. This will trigger slot conflict.
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'replica'
]);
$node_primary->restart;

$node_primary->wait_for_replay_catchup($node_standby);

# Check invalidation in the logfile and in pg_stat_database_conflicts
check_for_invalidation('wal_level_', $logstart, 'due to wal_level');

# Verify reason for conflict is 'wal_level_insufficient' in pg_replication_slots
check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);

# We are not able to read from the slot as it requires wal_level >= logical on the primary server
check_pg_recvlogical_stderr($handle,
	"logical decoding on standby requires wal_level >= logical on the primary"
);

# Restore primary wal_level
$node_primary->append_conf(
	'postgresql.conf', q[
wal_level = 'logical'
]);
$node_primary->restart;
$node_primary->wait_for_replay_catchup($node_standby);

$handle =
  make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);

# As the slot has been invalidated we should not be able to read
check_pg_recvlogical_stderr($handle,
	"can no longer get changes from replication slot \"wal_level_activeslot\""
);
##################################################
# DROP DATABASE should drop its slots, including active slots.
##################################################

# drop the logical slots
drop_logical_slots('wal_level_');

# create the logical slots
create_logical_slots($node_standby, 'drop_db_');

$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);

# Create a slot on a database that would not be dropped. This slot should not
# get dropped.
$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot',
	'postgres');

# dropdb on the primary to verify slots are dropped on standby
$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);

$node_primary->wait_for_replay_catchup($node_standby);

is( $node_standby->safe_psql(
		'postgres',
		q[SELECT EXISTS (SELECT 1 FROM pg_database WHERE datname = 'testdb')]),
	'f',
	'database dropped on standby');
check_slots_dropped('drop_db_', $handle);
is($node_standby->slot('otherslot')->{'slot_type'},
	'logical', 'otherslot on standby not dropped');

# Cleanup: manually drop the slot that was not dropped.
$node_standby->psql('postgres',
	q[SELECT pg_drop_replication_slot('otherslot')]);
##################################################
# Test standby promotion and logical decoding behavior
# after the standby gets promoted.
##################################################

$node_standby->reload;
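# Re-create the test objects; testdb was removed by the DROP DATABASE test
# above.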
$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
$node_primary->safe_psql('testdb',
	qq[CREATE TABLE decoding_test(x integer, y text);]);

# Wait for the standby to catch up before initializing the cascading standby
$node_primary->wait_for_replay_catchup($node_standby);

# Create a physical replication slot on the standby.
# Keep this step after the "Verify that invalidated logical slots do not lead
# to retaining WAL" test (as the physical slot on the standby could prevent the
# WAL file removal).
$node_standby->safe_psql('testdb',
	qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]
);

# Initialize cascading standby node
$node_standby->backup($backup_name);
$node_cascading_standby->init_from_backup(
	$node_standby, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$node_cascading_standby->append_conf(
	'postgresql.conf',
	qq[primary_slot_name = '$standby_physical_slotname'
	hot_standby_feedback = on]);
$node_cascading_standby->start;

# create the logical slots
create_logical_slots($node_standby, 'promotion_');

# Wait for the cascading standby to catch up before creating the slots
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# create the logical slots on the cascading standby too
create_logical_slots($node_cascading_standby, 'promotion_');

# Make slots active
$handle =
  make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
my $cascading_handle =
  make_slot_active($node_cascading_standby, 'promotion_', 1,
	\$cascading_stdout, \$cascading_stderr);
# Insert some rows before the promotion
$node_primary->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

# Wait for both standbys to catch up
$node_primary->wait_for_replay_catchup($node_standby);
$node_standby->wait_for_replay_catchup($node_cascading_standby,
	$node_primary);

# Promote the standby
$node_standby->promote;

# insert some rows on promoted standby
$node_standby->safe_psql('testdb',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
);

# Wait for the cascading standby to catch up
$node_standby->wait_for_replay_catchup($node_cascading_standby);
$expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT
BEGIN
table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
COMMIT};

# check that we are decoding pre and post promotion inserted rows
$stdout_sql = $node_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on promoted standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion
my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);

ok(pump_until($handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($stdout);
is($stdout, $expected,
	'got same expected output from pg_recvlogical decoding session');

# check that we are decoding pre and post promotion inserted rows on the cascading standby
$stdout_sql = $node_cascading_standby->safe_psql('testdb',
	qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);

is($stdout_sql, $expected,
	'got expected output from SQL decoding session on cascading standby');

# check that we are decoding pre and post promotion inserted rows
# with pg_recvlogical that has started before the promotion on the cascading standby
ok( pump_until(
		$cascading_handle, $pump_timeout,
		\$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
	'got 2 COMMIT from pg_recvlogical output');

chomp($cascading_stdout);
is($cascading_stdout, $expected,
	'got same expected output from pg_recvlogical decoding session on cascading standby'
);

done_testing();