<!-- doc/src/sgml/logicaldecoding.sgml -->
 <chapter id="logicaldecoding">
  <title>Logical Decoding</title>
  <indexterm zone="logicaldecoding">
   <primary>Logical Decoding</primary>
  </indexterm>
  <para>
   PostgreSQL provides infrastructure to stream the modifications performed
   via SQL to external consumers. This functionality can be used for a
   variety of purposes, including replication solutions and auditing.
  </para>

  <para>
   Changes are sent out in streams identified by logical replication slots.
  </para>

  <para>
   The format in which those changes are streamed is determined by the output
   plugin used. An example plugin is provided in the PostgreSQL distribution.
   Additional plugins can be written to extend the choice of available formats
   without modifying any core code.
   Every output plugin has access to each individual new row produced
   by <command>INSERT</command> and the new row version created
   by <command>UPDATE</command>. Availability of old row versions for
   <command>UPDATE</command> and <command>DELETE</command> depends on
   the configured replica identity (see <xref linkend="sql-altertable-replica-identity"/>).
  </para>

  <para>
   Changes can be consumed either using the streaming replication protocol
   (see <xref linkend="protocol-replication"/> and
   <xref linkend="logicaldecoding-walsender"/>), or by calling functions
   via SQL (see <xref linkend="logicaldecoding-sql"/>). It is also possible
   to write additional methods of consuming the output of a replication slot
   without modifying core code
   (see <xref linkend="logicaldecoding-writer"/>).
  </para>
 <sect1 id="logicaldecoding-example">
  <title>Logical Decoding Examples</title>

  <para>
   The following example demonstrates controlling logical decoding using the
   SQL interface.
  </para>

  <para>
   Before you can use logical decoding, you must set
   <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
   <xref linkend="guc-max-replication-slots"/> to at least 1. Then, you
   should connect to the target database (in the example
   below, <literal>postgres</literal>) as a superuser.
  </para>
<programlisting>
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
    slot_name    |    lsn
-----------------+-----------
 regression_slot | 0/16B1970
(1 row)

postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+-------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 0/16A4408   | 0/16A4440
(1 row)

postgres=# -- There are no changes to see yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL isn't replicated, so all you'll see is the transaction
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |     data
-----------+-------+--------------
 0/BA2DA58 | 10297 | BEGIN 10297
 0/BA5A5A0 | 10297 | COMMIT 10297
(2 rows)

postgres=# -- Once changes are read, they're consumed and not emitted
postgres=# -- in a subsequent call:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('1');
postgres=*# INSERT INTO data(data) VALUES('2');
postgres=*# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A688 | 10298 | BEGIN 10298
 0/BA5A6F0 | 10298 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 0/BA5A7F8 | 10298 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 0/BA5A8A8 | 10298 | COMMIT 10298
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- You can also peek ahead in the change stream without consuming changes
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299
(3 rows)

postgres=# -- Options can be passed to the output plugin to influence the formatting
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
    lsn    |  xid  |                          data
-----------+-------+---------------------------------------------------------
 0/BA5A8E0 | 10299 | BEGIN 10299
 0/BA5A8E0 | 10299 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/BA5A990 | 10299 | COMMIT 10299 (at 2017-05-10 12:07:21.272494-04)
(3 rows)

postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
postgres=# -- server resources:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------

(1 row)
</programlisting>
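  <para>
   In the session above, <function>pg_logical_slot_get_changes</function> and
   <function>pg_logical_slot_peek_changes</function> differ only in whether the slot's
   position advances. The C sketch below is a hypothetical in-memory model of that
   behavior, not PostgreSQL code. The names <literal>ModelSlot</literal>,
   <literal>model_slot_append</literal> and <literal>model_slot_read</literal> are invented
   for illustration. The model also ignores transaction boundaries and the
   <literal>upto_lsn</literal>/<literal>upto_nchanges</literal> limits of the real functions.
  </para>

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MODEL_MAX_CHANGES 16

/* Hypothetical in-memory model of a logical slot (not PostgreSQL code). */
typedef struct ModelSlot
{
    const char *changes[MODEL_MAX_CHANGES]; /* decoded change records */
    size_t      nchanges;                   /* records decoded so far */
    size_t      confirmed;                  /* records already consumed */
} ModelSlot;

/* Record a decoded change, as if a committed transaction had been decoded. */
static void
model_slot_append(ModelSlot *slot, const char *change)
{
    assert(slot->nchanges < MODEL_MAX_CHANGES);
    slot->changes[slot->nchanges++] = change;
}

/*
 * Copy up to max pending changes into out[] and return how many were copied.
 * consume == true models "get": the slot's position moves past them.
 * consume == false models "peek": the position is left unchanged, so the
 * next call returns the same changes again.
 */
static size_t
model_slot_read(ModelSlot *slot, const char **out, size_t max, bool consume)
{
    size_t n = 0;

    for (size_t i = slot->confirmed; i < slot->nchanges && n < max; i++)
        out[n++] = slot->changes[i];
    if (consume)
        slot->confirmed += n;
    return n;
}
```

  <para>
   Two reads with <literal>consume</literal> set to false return the same changes, mirroring
   the two identical peek calls above. A read with <literal>consume</literal> set to true
   returns them once. After that the model slot is empty, just as a repeated get call
   returns zero rows.
  </para>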
  <para>
   The following examples show how logical decoding is controlled over the
   streaming replication protocol, using the
   program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
   distribution. This requires that client authentication is set up to allow
   replication connections
   (see <xref linkend="streaming-replication-authentication"/>) and
   that <varname>max_wal_senders</varname> is set sufficiently high to allow
   an additional connection. The second example shows how to stream two-phase
   transactions. Before you use two-phase commands, you must set
   <xref linkend="guc-max-prepared-transactions"/> to at least 1.
  </para>
<programlisting>
Example 1:
$ pg_recvlogical -d postgres --slot=test --create-slot
$ pg_recvlogical -d postgres --slot=test --start -f -
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot

Example 2:
$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
$ pg_recvlogical -d postgres --slot=test --start -f -
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
$ fg
BEGIN 694
table public.data: INSERT: id[integer]:5 data[text]:'5'
PREPARE TRANSACTION 'test', txid 694
<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
$ psql -d postgres -c "COMMIT PREPARED 'test';"
$ fg
COMMIT PREPARED 'test', txid 694
<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
$ pg_recvlogical -d postgres --slot=test --drop-slot
</programlisting>
  <para>
   The following example shows the SQL interface that can be used to decode prepared
   transactions. Before you use two-phase commit commands, you must set
   <varname>max_prepared_transactions</varname> to at least 1. You must also have
   set the two-phase parameter to true when creating the slot using
   <function>pg_create_logical_replication_slot</function>, as the first example
   above does with its fourth argument.
   Note that the entire transaction will be streamed after the commit if it
   has not already been decoded.
  </para>
<programlisting>
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('5');
postgres=*# PREPARE TRANSACTION 'test_prepared1';

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    | xid |                          data
-----------+-----+---------------------------------------------------------
 0/1689DC0 | 529 | BEGIN 529
 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
(3 rows)

postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    | xid |                    data
-----------+-----+--------------------------------------------
 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(1 row)

postgres=# -- you can also roll back a prepared transaction
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('6');
postgres=*# PREPARE TRANSACTION 'test_prepared2';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    | xid |                          data
-----------+-----+---------------------------------------------------------
 0/168A180 | 530 | BEGIN 530
 0/168A1E8 | 530 | table public.data: INSERT: id[integer]:4 data[text]:'6'
 0/168A430 | 530 | PREPARE TRANSACTION 'test_prepared2', txid 530
(3 rows)

postgres=# ROLLBACK PREPARED 'test_prepared2';
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn    | xid |                     data
-----------+-----+----------------------------------------------
 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
(1 row)
</programlisting>
 </sect1>
 <sect1 id="logicaldecoding-explanation">
  <title>Logical Decoding Concepts</title>
  <sect2 id="logicaldecoding-explanation-log-dec">
   <title>Logical Decoding</title>

   <indexterm>
    <primary>Logical Decoding</primary>
   </indexterm>

   <para>
    Logical decoding is the process of extracting all persistent changes
    to a database's tables into a coherent, easy-to-understand format which
    can be interpreted without detailed knowledge of the database's internal
    state.
   </para>

   <para>
    In <productname>PostgreSQL</productname>, logical decoding is implemented
    by decoding the contents of the <link linkend="wal">write-ahead
    log</link>, which describe changes at the storage level, into an
    application-specific form such as a stream of tuples or SQL statements.
   </para>
  </sect2>
  <sect2 id="logicaldecoding-replication-slots">
   <title>Replication Slots</title>

   <indexterm>
    <primary>replication slot</primary>
    <secondary>logical replication</secondary>
   </indexterm>

   <para>
    In the context of logical replication, a slot represents a stream of
    changes that can be replayed to a client in the order they were made on
    the origin server. Each slot streams a sequence of changes from a single
    database.
   </para>

   <note>
    <para><productname>PostgreSQL</productname> also has streaming replication slots
     (see <xref linkend="streaming-replication"/>), but they are used somewhat
     differently there.
    </para>
   </note>

   <para>
    A replication slot has an identifier that is unique across all databases
    in a <productname>PostgreSQL</productname> cluster. Slots persist
    independently of the connection using them and are crash-safe.
   </para>

   <para>
    A logical slot will emit each change just once in normal operation.
    The current position of each slot is persisted only at checkpoint, so in
    the case of a crash the slot may return to an earlier LSN, which will
    then cause recent changes to be sent again when the server restarts.
    Logical decoding clients are responsible for avoiding ill effects from
    handling the same message more than once. Clients may wish to record
    the last LSN they saw when decoding and skip over any repeated data or
    (when using the replication protocol) request that decoding start from
    that LSN rather than letting the server determine the start point.
    The Replication Progress Tracking feature is designed for this purpose;
    refer to <link linkend="replication-origins">replication origins</link>.
   </para>
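   <para>
    A client that follows this advice has to compare LSNs. An LSN is printed as two
    hexadecimal 32-bit halves separated by a slash (for example
    <literal>0/BA5A8A8</literal>), so it can be parsed into a 64-bit integer and compared
    numerically. The C sketch below is a minimal, hypothetical client-side filter. It
    assumes that the client buffers each transaction and decides at commit time, using
    the LSN reported with the transaction's <literal>COMMIT</literal> row. The decision is
    made per transaction because, as the example output earlier in this chapter shows, a
    <literal>BEGIN</literal> and the first change of a transaction can carry the same LSN.
    The type and function names are illustrative only. The remembered LSN must also be
    stored durably together with the applied data, or the filter will not survive a client
    crash.
   </para>

```c
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse an LSN such as "0/BA5A8A8" (high/low 32-bit halves, in hex). */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t hi, lo;

    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2)
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* Hypothetical client-side progress record; must be stored durably. */
typedef struct ClientProgress
{
    uint64_t last_commit_lsn;   /* commit LSN of the last applied transaction */
} ClientProgress;

/*
 * Called once a whole transaction has been received, with the LSN of its
 * commit. Returns true if the transaction should be applied, or false if it
 * was already applied and is only being resent because the slot restarted
 * from an earlier position after a server crash.
 */
static bool
should_apply_commit(ClientProgress *progress, uint64_t commit_lsn)
{
    if (commit_lsn <= progress->last_commit_lsn)
        return false;
    progress->last_commit_lsn = commit_lsn;
    return true;
}
```

   <para>
    Once the commits at <literal>0/BA5A5A0</literal> and <literal>0/BA5A8A8</literal> have been
    applied, a resent copy of either transaction is reported as already applied and can be
    skipped.
   </para>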
   <para>
    Multiple independent slots may exist for a single database. Each slot has
    its own state, allowing different consumers to receive changes from
    different points in the database change stream. For most applications, a
    separate slot will be required for each consumer.
   </para>

   <para>
    A logical replication slot knows nothing about the state of the
    receiver(s). It's even possible to have multiple different receivers using
    the same slot at different times; they'll just get the changes following
    on from when the last receiver stopped consuming them. Only one receiver
    may consume changes from a slot at any given time.
   </para>
   <para>
    A logical replication slot can also be created on a hot standby. To prevent
    <command>VACUUM</command> from removing required rows from the system
    catalogs, <varname>hot_standby_feedback</varname> should be set on the
    standby. Even so, if any required rows get removed, the slot gets
    invalidated. It's highly recommended to use a physical slot between the
    primary and the standby. Otherwise, <varname>hot_standby_feedback</varname>
    will work, but only while the connection is alive (for example, a node
    restart would break it). In that case, the primary may delete system catalog rows
    that could be needed by the logical decoding on the standby (as it does
    not know about the <literal>catalog_xmin</literal> on the standby).
    Existing logical slots on a standby also get invalidated if
    <varname>wal_level</varname> on the primary is reduced to less than
    <literal>logical</literal>.
    This is done as soon as the standby detects such a change in the WAL stream.
    It means that, for walsenders that are lagging (if any), some WAL records up
    to the <varname>wal_level</varname> parameter change on the primary won't be
    decoded.
   </para>
   <para>
    Creation of a logical slot requires information about all the currently
    running transactions. On the primary, this information is available
    directly, but on a standby, this information has to be obtained from the
    primary. Thus, slot creation may need to wait for some activity to happen
    on the primary. If the primary is idle, creating a logical slot on a
    standby may take noticeable time. This can be sped up by calling the
    <function>pg_log_standby_snapshot</function> function on the primary.
   </para>
   <caution>
    <para>
     Replication slots persist across crashes and know nothing about the state
     of their consumer(s). They will prevent removal of required resources
     even when there is no connection using them. This consumes storage
     because, as long as a replication slot requires them, neither the required
     WAL can be removed nor the required rows from the system catalogs can be
     removed by <command>VACUUM</command>. In extreme cases this could cause the
     database to shut down to prevent transaction ID wraparound
     (see <xref linkend="vacuum-for-wraparound"/>).
     So if a slot is no longer required, it should be dropped.
    </para>
   </caution>
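   <para>
    One way to see how much WAL a slot is holding back is to compare its
    <structfield>restart_lsn</structfield> in <structname>pg_replication_slots</structname>
    with the current WAL position. Inside the server,
    <literal>pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)</literal> computes this
    distance. The C sketch below does the same arithmetic on the textual LSNs and is
    intended as a hypothetical client-side monitoring helper. Its result is approximate,
    since WAL is retained in whole segment files rather than byte by byte.
   </para>

```c
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse an LSN such as "0/16A4408" (high/low 32-bit halves, in hex). */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t hi, lo;

    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2)
        return false;
    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/*
 * Approximate bytes of WAL a slot forces the server to keep: the distance
 * from the slot's restart_lsn to the current WAL position.
 * Returns -1 if either LSN is malformed, 0 if restart_lsn is not behind.
 */
static int64_t
wal_retained_bytes(const char *current_lsn, const char *restart_lsn)
{
    uint64_t current, restart;

    if (!parse_lsn(current_lsn, &current) || !parse_lsn(restart_lsn, &restart))
        return -1;
    if (restart >= current)
        return 0;
    return (int64_t) (current - restart);
}
```

   <para>
    As an example of the arithmetic, the two LSNs reported in the first example session,
    <literal>0/16B1970</literal> and <literal>0/16A4408</literal>, are 54632 bytes
    (0xD568) apart.
   </para>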
  </sect2>

  <sect2 id="logicaldecoding-replication-slots-synchronization">
   <title>Replication Slot Synchronization</title>
   <para>
    The logical replication slots on the primary can be synchronized to
    the hot standby by using the <literal>failover</literal> parameter of
    <link linkend="pg-create-logical-replication-slot">
    <function>pg_create_logical_replication_slot</function></link>, or by
    using the <link linkend="sql-createsubscription-params-with-failover">
    <literal>failover</literal></link> option of
    <command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
    <link linkend="pg-sync-replication-slots">
    <function>pg_sync_replication_slots</function></link>
    on the standby. By setting <link linkend="guc-sync-replication-slots">
    <varname>sync_replication_slots</varname></link>
    on the standby, the failover slots can be synchronized periodically in
    the slotsync worker. For the synchronization to work, it is mandatory to
    have a physical replication slot between the primary and the standby (i.e.,
    <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
    should be configured on the standby), and
    <link linkend="guc-hot-standby-feedback"><varname>hot_standby_feedback</varname></link>
    must be enabled on the standby. It is also necessary to specify a valid
    <literal>dbname</literal> in the
    <link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
    It's highly recommended that this physical replication slot be named in the
    <link linkend="guc-synchronized-standby-slots"><varname>synchronized_standby_slots</varname></link>
    list on the primary, to prevent the subscriber from consuming changes
    faster than the hot standby. Even when correctly configured, some latency
    is expected when sending changes to logical subscribers due to waiting
    on the slots named in
    <link linkend="guc-synchronized-standby-slots"><varname>synchronized_standby_slots</varname></link>.
    When <varname>synchronized_standby_slots</varname> is utilized, the
    primary server will not completely shut down until the corresponding
    standbys, associated with the physical replication slots specified
    in <varname>synchronized_standby_slots</varname>, have confirmed
    receiving the WAL up to the latest flushed position on the primary server.
   </para>
   <para>
    The ability to resume logical replication after failover depends upon the
    <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
    value for the synchronized slots on the standby at the time of failover.
    Only persistent slots whose <structfield>synced</structfield> value was already true on
    the standby before failover can be used for logical replication after failover.
    Temporary synced slots cannot be used for logical decoding; therefore,
    logical replication for those slots cannot be resumed. For example, if the
    synchronized slot could not become persistent on the standby due to a
    disabled subscription, then the subscription cannot be resumed after
    failover even when it is enabled.
   </para>
   <para>
    To resume logical replication after failover from the synced logical
    slots, the subscription's <literal>conninfo</literal> must be altered to point to the
    new primary server. This is done using
    <link linkend="sql-altersubscription-params-connection"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>.
    It is recommended that subscriptions are first disabled before promoting
    the standby and are re-enabled after altering the connection string.
   </para>
   <caution>
    <para>
     There is a chance that the old primary comes back up during the promotion,
     and if subscriptions are not disabled, the logical subscribers may
     continue to receive data from the old primary server even after promotion
     until the connection string is altered. This might result in data
     inconsistency issues, preventing the logical subscribers from being
     able to continue replication from the new primary server.
    </para>
   </caution>
  </sect2>

  <sect2 id="logicaldecoding-explanation-output-plugins">
   <title>Output Plugins</title>
   <para>
    Output plugins transform the data from the write-ahead log's internal
    representation into the format the consumer of a replication slot desires.
   </para>
  </sect2>
  <sect2 id="logicaldecoding-explanation-exported-snapshots">
   <title>Exported Snapshots</title>
   <para>
    When a new replication slot is created using the streaming replication
    interface (see <xref linkend="protocol-replication-create-replication-slot"/>), a
    snapshot is exported
    (see <xref linkend="functions-snapshot-synchronization"/>), which will show
    exactly the state of the database after which all changes will be
    included in the change stream. This can be used to create a new replica by
    using <link linkend="sql-set-transaction"><literal>SET TRANSACTION
    SNAPSHOT</literal></link> to read the state of the database at the moment
    the slot was created. This transaction can then be used to dump the
    database's state at that point in time, which afterwards can be updated
    using the slot's contents without losing any changes.
   </para>
   <para>
    Creation of a snapshot is not always possible. In particular, it will
    fail when connected to a hot standby. Applications that do not require
    snapshot export may suppress it with the <literal>NOEXPORT_SNAPSHOT</literal>
    option.
   </para>
  </sect2>
 </sect1>
 <sect1 id="logicaldecoding-walsender">
  <title>Streaming Replication Protocol Interface</title>

  <para>
   The commands
   <itemizedlist>
    <listitem>
     <para><literal>CREATE_REPLICATION_SLOT <replaceable>slot_name</replaceable> LOGICAL <replaceable>output_plugin</replaceable></literal></para>
    </listitem>

    <listitem>
     <para><literal>DROP_REPLICATION_SLOT <replaceable>slot_name</replaceable></literal> <optional> <literal>WAIT</literal> </optional></para>
    </listitem>

    <listitem>
     <para><literal>START_REPLICATION SLOT <replaceable>slot_name</replaceable> LOGICAL ...</literal></para>
    </listitem>
   </itemizedlist>
   are used to create, drop, and stream changes from a replication
   slot, respectively. These commands are only available over a replication
   connection; they cannot be used via SQL.
   See <xref linkend="protocol-replication"/> for details on these commands.
  </para>
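  <para>
   To illustrate the shape of a <literal>START_REPLICATION</literal> command for a logical
   slot, the C sketch below assembles one into a buffer. The command contains the slot
   name as a quoted identifier, then the start LSN in the same hexadecimal
   <literal>XXX/XXX</literal> form shown elsewhere in this chapter, then an optional
   parenthesized list of plugin options, each a quoted name followed by a single-quoted
   value. The helper <literal>build_start_replication</literal> is hypothetical. It
   rejects quote characters instead of escaping them, so it is only suitable for simple,
   trusted input. Consult <xref linkend="protocol-replication"/> for the exact grammar.
  </para>

```c
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write a START_REPLICATION command for a logical slot
 * into buf. names[i]/values[i] are the nopts plugin options.
 * Returns the command length, or -1 if buf is too small or an argument
 * contains a quote character (no escaping is attempted).
 */
static int
build_start_replication(char *buf, size_t buflen, const char *slot,
                        uint64_t start_lsn, const char *const *names,
                        const char *const *values, size_t nopts)
{
    size_t pos;
    int    n;

    if (strchr(slot, '"') != NULL)
        return -1;

    /* The LSN is written as two hex halves, like 0/16B1970. */
    n = snprintf(buf, buflen, "START_REPLICATION SLOT \"%s\" LOGICAL %" PRIX32 "/%" PRIX32,
                 slot, (uint32_t) (start_lsn >> 32), (uint32_t) start_lsn);
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    pos = (size_t) n;

    /* Options are rendered as: ("name" 'value', "name2" 'value2') */
    for (size_t i = 0; i < nopts; i++)
    {
        if (strchr(names[i], '"') != NULL || strchr(values[i], '\'') != NULL)
            return -1;
        n = snprintf(buf + pos, buflen - pos, "%s\"%s\" '%s'",
                     i == 0 ? " (" : ", ", names[i], values[i]);
        if (n < 0 || (size_t) n >= buflen - pos)
            return -1;
        pos += (size_t) n;
    }
    if (nopts > 0)
    {
        if (pos + 1 >= buflen)
            return -1;          /* no room for ")" and the terminator */
        buf[pos++] = ')';
        buf[pos] = '\0';
    }
    return (int) pos;
}
```

  <para>
   With slot <literal>test</literal>, start position <literal>0/0</literal> and the option
   <literal>include-timestamp</literal> set to <literal>on</literal>, the helper produces
   <literal>START_REPLICATION SLOT "test" LOGICAL 0/0 ("include-timestamp" 'on')</literal>.
   Such a command must be sent over a replication connection (with libpq, one opened
   with <literal>replication=database</literal>), which is what
   <xref linkend="app-pgrecvlogical"/> takes care of.
  </para>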
  <para>
   The command <xref linkend="app-pgrecvlogical"/> can be used to control
   logical decoding over a streaming replication connection. (It uses
   these commands internally.)
  </para>
 </sect1>

 <sect1 id="logicaldecoding-sql">
  <title>Logical Decoding <acronym>SQL</acronym> Interface</title>

  <para>
   See <xref linkend="functions-replication"/> for detailed documentation on
   the SQL-level API for interacting with logical decoding.
  </para>

  <para>
   Synchronous replication (see <xref linkend="synchronous-replication"/>) is
   only supported on replication slots used over the streaming replication interface. The
   function interface and additional, non-core interfaces do not support
   synchronous replication.
  </para>
 </sect1>
 <sect1 id="logicaldecoding-catalogs">
  <title>System Catalogs Related to Logical Decoding</title>

  <para>
   The <link linkend="view-pg-replication-slots"><structname>pg_replication_slots</structname></link>
   view and the
   <link linkend="monitoring-pg-stat-replication-view">
   <structname>pg_stat_replication</structname></link>
   view provide information about the current state of replication slots and
   streaming replication connections, respectively. These views apply to both physical and
   logical replication. The
   <link linkend="monitoring-pg-stat-replication-slots-view">
   <structname>pg_stat_replication_slots</structname></link>
   view provides statistics about logical replication slots.
  </para>
 </sect1>
 <sect1 id="logicaldecoding-output-plugin">
  <title>Logical Decoding Output Plugins</title>
  <para>
   An example output plugin can be found in the
   <link linkend="test-decoding">
    <filename>contrib/test_decoding</filename>
   </link>
   subdirectory of the PostgreSQL source tree.
  </para>
  <sect2 id="logicaldecoding-output-init">
   <title>Initialization Function</title>
   <indexterm zone="logicaldecoding-output-init">
    <primary>_PG_output_plugin_init</primary>
   </indexterm>
   <para>
    An output plugin is loaded by dynamically loading a shared library with
    the output plugin's name as the library base name. The normal library
    search path is used to locate the library. To provide the required output
    plugin callbacks and to indicate that the library is actually an output
    plugin, it needs to provide a function named
    <function>_PG_output_plugin_init</function>. This function is passed a
    struct that needs to be filled with the callback function pointers for
    individual actions.
<programlisting>
typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeTruncateCB truncate_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeShutdownCB shutdown_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodeBeginPrepareCB begin_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
    LogicalDecodeStreamStartCB stream_start_cb;
    LogicalDecodeStreamStopCB stream_stop_cb;
    LogicalDecodeStreamAbortCB stream_abort_cb;
    LogicalDecodeStreamPrepareCB stream_prepare_cb;
    LogicalDecodeStreamCommitCB stream_commit_cb;
    LogicalDecodeStreamChangeCB stream_change_cb;
    LogicalDecodeStreamMessageCB stream_message_cb;
    LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
</programlisting>
    The <function>begin_cb</function>, <function>change_cb</function>
    and <function>commit_cb</function> callbacks are required,
    while <function>startup_cb</function>, <function>truncate_cb</function>,
    <function>message_cb</function>, <function>filter_by_origin_cb</function>,
    and <function>shutdown_cb</function> are optional.
    If <function>truncate_cb</function> is not set but a
    <command>TRUNCATE</command> is to be decoded, the action will be ignored.
   </para>
   <para>
    An output plugin may also define functions to support streaming of large,
    in-progress transactions. The <function>stream_start_cb</function>,
    <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
    <function>stream_commit_cb</function>, and <function>stream_change_cb</function>
    are required, while <function>stream_message_cb</function> and
    <function>stream_truncate_cb</function> are optional. The
    <function>stream_prepare_cb</function> is also required if the output
    plugin also supports two-phase commits.
   </para>

   <para>
    An output plugin may also define functions to support two-phase commits,
    which allow actions to be decoded at <command>PREPARE TRANSACTION</command> time.
    The <function>begin_prepare_cb</function>, <function>prepare_cb</function>,
    <function>commit_prepared_cb</function> and <function>rollback_prepared_cb</function>
    callbacks are required, while <function>filter_prepare_cb</function> is optional.
    The <function>stream_prepare_cb</function> is also required if the output plugin
    also supports the streaming of large in-progress transactions.
   </para>
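   <para>
    The rules above about which callbacks must be set together can be summarized as a
    small check. The C sketch below uses a hypothetical, simplified stand-in for
    <structname>OutputPluginCallbacks</structname> in which every member is an untyped
    function pointer. It is not the server's own validation code. For illustration, it
    assumes that setting any streaming callback means the plugin intends to support
    streaming. It likewise assumes that setting any two-phase callback (including
    <function>filter_prepare_cb</function> or <function>stream_prepare_cb</function>) means
    the plugin intends to support two-phase commits.
   </para>

```c
#include <stdbool.h>

/* Generic function pointer standing in for each real callback type. */
typedef void (*MockCallback) (void);

/* Hypothetical, simplified mirror of OutputPluginCallbacks. */
typedef struct MockCallbacks
{
    MockCallback startup_cb, begin_cb, change_cb, truncate_cb, commit_cb;
    MockCallback message_cb, filter_by_origin_cb, shutdown_cb;
    MockCallback filter_prepare_cb, begin_prepare_cb, prepare_cb;
    MockCallback commit_prepared_cb, rollback_prepared_cb;
    MockCallback stream_start_cb, stream_stop_cb, stream_abort_cb;
    MockCallback stream_prepare_cb, stream_commit_cb, stream_change_cb;
    MockCallback stream_message_cb, stream_truncate_cb;
} MockCallbacks;

typedef enum CallbackCheck
{
    CALLBACKS_OK = 0,
    MISSING_BASIC_CB,           /* begin_cb, change_cb or commit_cb */
    MISSING_STREAM_CB,          /* a required stream_* callback */
    MISSING_TWO_PHASE_CB,       /* a required two-phase callback */
    MISSING_STREAM_PREPARE_CB   /* streaming + two-phase without stream_prepare_cb */
} CallbackCheck;

/* Placeholder used to mark a callback as "set" when trying the check. */
static void
noop_cb(void)
{
}

static CallbackCheck
check_callbacks(const MockCallbacks *cb)
{
    bool streaming, two_phase;

    if (!cb->begin_cb || !cb->change_cb || !cb->commit_cb)
        return MISSING_BASIC_CB;

    /* Any stream_* callback signals intent to support streaming. */
    streaming = cb->stream_start_cb || cb->stream_stop_cb ||
        cb->stream_abort_cb || cb->stream_commit_cb || cb->stream_change_cb ||
        cb->stream_message_cb || cb->stream_truncate_cb || cb->stream_prepare_cb;
    if (streaming && !(cb->stream_start_cb && cb->stream_stop_cb &&
                       cb->stream_abort_cb && cb->stream_commit_cb &&
                       cb->stream_change_cb))
        return MISSING_STREAM_CB;

    /* Any two-phase callback signals intent to support two-phase commits. */
    two_phase = cb->begin_prepare_cb || cb->prepare_cb ||
        cb->commit_prepared_cb || cb->rollback_prepared_cb ||
        cb->filter_prepare_cb || cb->stream_prepare_cb;
    if (two_phase && !(cb->begin_prepare_cb && cb->prepare_cb &&
                       cb->commit_prepared_cb && cb->rollback_prepared_cb))
        return MISSING_TWO_PHASE_CB;

    /* Supporting both features additionally requires stream_prepare_cb. */
    if (streaming && two_phase && !cb->stream_prepare_cb)
        return MISSING_STREAM_PREPARE_CB;

    return CALLBACKS_OK;
}
```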
  </sect2>

  <sect2 id="logicaldecoding-capabilities">
   <title>Capabilities</title>

   <para>
    To decode, format and output changes, output plugins can use most of the
    backend's normal infrastructure, including calling output functions. Read-only
    access to relations is permitted as long as the only relations accessed
    are those that either have been created by <command>initdb</command> in
    the <literal>pg_catalog</literal> schema, or have been marked as user-provided
    catalog tables using
<programlisting>
ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
</programlisting>
    Note that access to user catalog tables or regular system catalog tables
    in the output plugins has to be done via the <literal>systable_*</literal>
    scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
    error out. Additionally, any actions leading to transaction ID assignment
    are prohibited. That, among others, includes writing to tables, performing
    DDL changes, and calling <literal>pg_current_xact_id()</literal>.
   </para>
  </sect2>

  <sect2 id="logicaldecoding-output-mode">
   <title>Output Modes</title>

   <para>
    Output plugin callbacks can pass data to the consumer in nearly arbitrary
    formats. For some use cases, like viewing the changes via SQL, returning
    data in a data type that can contain arbitrary data (e.g., <type>bytea</type>) is
    cumbersome. If the output plugin only outputs textual data in the
    server's encoding, it can declare that by
    setting <literal>OutputPluginOptions.output_type</literal>
    to <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal> instead
    of <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal> in
    the <link linkend="logicaldecoding-output-plugin-startup">startup
    callback</link>. In that case, all the data has to be in the server's encoding
    so that a <type>text</type> datum can contain it. This is checked in assertion-enabled
    builds.
   </para>
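   <para>
    Before declaring <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>, a plugin author may
    want to confirm that everything the plugin emits could be stored in a
    <type>text</type> datum. The C sketch below checks this under one stated assumption:
    that the server encoding is <literal>UTF8</literal>. It accepts only well-formed UTF-8
    and rejects NUL bytes, which a <type>text</type> value cannot contain. The function name
    is hypothetical. A real plugin would rely on the server's own encoding support rather
    than a hand-written validator.
   </para>

```c
#include <stdbool.h>
#include <stddef.h>

/*
 * Return true if data[0..len) could be emitted as textual output, assuming
 * the server encoding is UTF8: well-formed UTF-8 and no NUL bytes.
 */
static bool
is_textual_utf8(const unsigned char *data, size_t len)
{
    size_t i = 0;

    while (i < len)
    {
        unsigned char c = data[i];
        size_t       need;      /* continuation bytes expected */
        unsigned int cp;        /* decoded code point */
        unsigned int min;       /* smallest code point allowed for this length */

        if (c == 0)
            return false;       /* a text datum cannot hold NUL */
        if (c < 0x80)
        {
            i++;
            continue;
        }
        if ((c & 0xE0) == 0xC0)
        {
            need = 1; cp = c & 0x1F; min = 0x80;
        }
        else if ((c & 0xF0) == 0xE0)
        {
            need = 2; cp = c & 0x0F; min = 0x800;
        }
        else if ((c & 0xF8) == 0xF0)
        {
            need = 3; cp = c & 0x07; min = 0x10000;
        }
        else
            return false;       /* stray continuation byte or invalid lead byte */

        if (len - i - 1 < need)
            return false;       /* sequence truncated at end of buffer */
        for (size_t k = 1; k <= need; k++)
        {
            unsigned char cc = data[i + k];

            if ((cc & 0xC0) != 0x80)
                return false;   /* expected a continuation byte */
            cp = (cp << 6) | (cc & 0x3F);
        }
        /* Reject overlong forms, values past U+10FFFF, and surrogates. */
        if (cp < min || cp > 0x10FFFF || (cp >= 0xD800 && cp <= 0xDFFF))
            return false;
        i += need + 1;
    }
    return true;
}
```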
  </sect2>

  <sect2 id="logicaldecoding-output-plugin-callbacks">
   <title>Output Plugin Callbacks</title>

   <para>
    An output plugin gets notified about changes that are happening via
    various callbacks it needs to provide.
   </para>

   <para>
    Concurrent transactions are decoded in commit order, and only changes
    belonging to a specific transaction are decoded between
    the <literal>begin</literal> and <literal>commit</literal>
    callbacks. Transactions that were rolled back explicitly or implicitly
    never get decoded. Successful savepoints are
    folded into the transaction containing them in the order they were
    executed within that transaction. A transaction that is prepared for
    a two-phase commit using <command>PREPARE TRANSACTION</command> will
    also be decoded if the output plugin callbacks needed for decoding
    it are provided. It is possible that the prepared transaction currently
    being decoded is aborted concurrently via a
    <command>ROLLBACK PREPARED</command> command. In that case, the logical
    decoding of this transaction will be aborted too. All the changes of such
    a transaction are skipped once the abort is detected, and the
    <function>prepare_cb</function> callback is invoked. Thus, even in case of
    a concurrent abort, enough information is provided to the output plugin
    for it to properly deal with <command>ROLLBACK PREPARED</command> once
    that is decoded.
   </para>
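   <para>
    The ordering rules above can be illustrated with a toy model of the buffering that
    happens between reading WAL and invoking the callbacks. Those rules are: changes are
    grouped by transaction, transactions are emitted in commit order, and rolled-back
    transactions are dropped. The C sketch below is a hypothetical simplification, not the
    server's implementation. It tracks a fixed number of transactions, emits only change
    records (no begin or commit callbacks), and does not model subtransactions,
    savepoints, or two-phase commit.
   </para>

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MODEL_MAX_TXNS    8
#define MODEL_MAX_CHANGES 8
#define MODEL_MAX_OUTPUT  64

/* Changes buffered for one in-progress transaction. */
typedef struct ModelTxn
{
    bool         in_use;
    unsigned int xid;
    const char  *changes[MODEL_MAX_CHANGES];
    size_t       nchanges;
} ModelTxn;

/* Hypothetical toy model of decoding-side buffering (not PostgreSQL code). */
typedef struct ReorderModel
{
    ModelTxn     txns[MODEL_MAX_TXNS];
    const char  *out[MODEL_MAX_OUTPUT];     /* changes handed to the plugin */
    size_t       nout;
} ReorderModel;

static void
model_init(ReorderModel *rb)
{
    memset(rb, 0, sizeof(*rb));
}

/* Find the buffer for xid, allocating a free one if create is true. */
static ModelTxn *
model_find(ReorderModel *rb, unsigned int xid, bool create)
{
    ModelTxn *free_txn = NULL;

    for (size_t i = 0; i < MODEL_MAX_TXNS; i++)
    {
        if (rb->txns[i].in_use && rb->txns[i].xid == xid)
            return &rb->txns[i];
        if (!rb->txns[i].in_use && free_txn == NULL)
            free_txn = &rb->txns[i];
    }
    if (!create)
        return NULL;
    assert(free_txn != NULL);
    free_txn->in_use = true;
    free_txn->xid = xid;
    free_txn->nchanges = 0;
    return free_txn;
}

/* A change record read from WAL: buffer it; nothing is emitted yet. */
static void
model_change(ReorderModel *rb, unsigned int xid, const char *change)
{
    ModelTxn *txn = model_find(rb, xid, true);

    assert(txn->nchanges < MODEL_MAX_CHANGES);
    txn->changes[txn->nchanges++] = change;
}

/* A commit record: emit the whole transaction now, so output follows commit order. */
static void
model_commit(ReorderModel *rb, unsigned int xid)
{
    ModelTxn *txn = model_find(rb, xid, false);

    if (txn == NULL)
        return;
    for (size_t i = 0; i < txn->nchanges; i++)
    {
        assert(rb->nout < MODEL_MAX_OUTPUT);
        rb->out[rb->nout++] = txn->changes[i];
    }
    txn->in_use = false;
}

/* An abort record: discard the buffered changes; they are never emitted. */
static void
model_abort(ReorderModel *rb, unsigned int xid)
{
    ModelTxn *txn = model_find(rb, xid, false);

    if (txn != NULL)
        txn->in_use = false;
}
```

   <para>
    Suppose the model receives interleaved changes from transactions 100, 101 and 102,
    followed by a commit of 101, an abort of 102, and a commit of 100. It then emits the
    change from 101 first and both changes from 100 afterwards, while the change from
    102 never appears.
   </para>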
   <note>
    <para>
     Only transactions that have already been safely flushed to disk will be
     decoded. That can lead to a <command>COMMIT</command> not immediately being decoded in a
     directly following <literal>pg_logical_slot_get_changes()</literal> call
     when <varname>synchronous_commit</varname> is set
     to <literal>off</literal>.
    </para>
   </note>

<sect3 id="logicaldecoding-output-plugin-startup">
<title>Startup Callback</title>
<para>
The optional <function>startup_cb</function> callback is called whenever
a replication slot is created or asked to stream changes, independent
of the number of changes that are ready to be put out.
<programlisting>
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);
</programlisting>
The <literal>is_init</literal> parameter will be true when the
replication slot is being created and false
otherwise. <parameter>options</parameter> points to a struct of options
that output plugins can set:
<programlisting>
typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    bool        receive_rewrites;
} OutputPluginOptions;
</programlisting>
<literal>output_type</literal> has to be set to either
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
<xref linkend="logicaldecoding-output-mode"/>.
If <literal>receive_rewrites</literal> is true, the output plugin will
also be called for changes made by heap rewrites during certain DDL
operations. These are of interest to plugins that handle DDL
replication, but they require special handling.
</para>

<para>
The startup callback should validate the options present in
<literal>ctx-&gt;output_plugin_options</literal>. If the output plugin
needs to maintain state, it can
use <literal>ctx-&gt;output_plugin_private</literal> to store it.
</para>
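
<para>
As an illustration of the two paragraphs above, here is a minimal standalone
sketch. It is not code from the server or from any shipped plugin. The types
are simplified stand-ins: <literal>DemoOption</literal> replaces the
<literal>DefElem</literal> list found in
<literal>ctx-&gt;output_plugin_options</literal>, and
<literal>DemoContext</literal> replaces <literal>LogicalDecodingContext</literal>.
The <literal>include-xids</literal> option and the <literal>DemoState</literal>
struct are hypothetical. Errors are reported through a return code rather than
by raising an error. The caller supplies the state storage that a real plugin
would allocate with <function>palloc</function>.
<programlisting>
/* Simplified stand-ins for the server's types (assumptions, not the real headers). */
typedef enum OutputPluginOutputType
{
    OUTPUT_PLUGIN_BINARY_OUTPUT,
    OUTPUT_PLUGIN_TEXTUAL_OUTPUT
} OutputPluginOutputType;

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
    int         receive_rewrites;   /* bool in the real struct */
} OutputPluginOptions;

typedef struct DemoOption           /* hypothetical: one name=value option */
{
    const char *name;
    const char *value;
} DemoOption;

typedef struct DemoState            /* hypothetical per-slot plugin state */
{
    int         include_xids;
} DemoState;

typedef struct DemoContext          /* stand-in for LogicalDecodingContext */
{
    const DemoOption *output_plugin_options;
    int         noptions;
    void       *output_plugin_private;
} DemoContext;

/* Return 1 if the two NUL-terminated strings are equal, else 0. */
static int
str_equal(const char *a, const char *b)
{
    while (*a != '\0' || *b != '\0')
    {
        if (*a != *b)
            return 0;
        a++;
        b++;
    }
    return 1;
}

/*
 * Startup sketch: request textual output, opt out of heap rewrites,
 * validate the options and remember the result in output_plugin_private.
 * Returns 0 on success and -1 on a bad option.
 */
static int
demo_startup(DemoContext *ctx, OutputPluginOptions *options,
             DemoState *state, int is_init)
{
    int         i;

    (void) is_init;             /* same behavior at slot creation and later */
    options->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
    options->receive_rewrites = 0;

    state->include_xids = 1;    /* hypothetical default */
    for (i = 0; i != ctx->noptions; i++)
    {
        const DemoOption *opt = ctx->output_plugin_options + i;

        if (!str_equal(opt->name, "include-xids"))
            return -1;          /* unknown option name */
        if (str_equal(opt->value, "on"))
            state->include_xids = 1;
        else if (str_equal(opt->value, "off"))
            state->include_xids = 0;
        else
            return -1;          /* unrecognized value */
    }
    ctx->output_plugin_private = state;
    return 0;
}
</programlisting>
The string comparison is written out by hand only so that the sketch compiles
without any headers.
</para>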
</sect3>

<sect3 id="logicaldecoding-output-plugin-shutdown">
<title>Shutdown Callback</title>

<para>
The optional <function>shutdown_cb</function> callback is called
whenever a formerly active replication slot is no longer used, and can
be used to deallocate resources private to the output plugin. The slot
isn't necessarily being dropped; streaming is just being stopped.
<programlisting>
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-begin">
<title>Transaction Begin Callback</title>

<para>
The required <function>begin_cb</function> callback is called whenever the
start of a committed transaction has been decoded. Aborted transactions
and their contents never get decoded.
<programlisting>
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);
</programlisting>
The <parameter>txn</parameter> parameter contains meta information about
the transaction, like the time stamp at which it has been committed and
its XID.
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-commit">
<title>Transaction End Callback</title>

<para>
The required <function>commit_cb</function> callback is called whenever
a transaction commit has been
decoded. The <function>change_cb</function> callbacks for all modified
rows will have been called before this, if there have been any modified
rows.
<programlisting>
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-change">
<title>Change Callback</title>

<para>
The required <function>change_cb</function> callback is called for every
individual row modification inside a transaction, whether it is
an <command>INSERT</command>, <command>UPDATE</command>,
or <command>DELETE</command>. Even if the original command modified
several rows at once, the callback will be called individually for each
row. The <function>change_cb</function> callback may access system or
user catalog tables to aid in the process of outputting the row
modification details. When decoding a prepared (but not yet committed)
transaction or an uncommitted transaction, this
change callback might also error out due to a simultaneous rollback of
this very same transaction. In that case, the logical decoding of this
aborted transaction is stopped gracefully.
<programlisting>
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);
</programlisting>
The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
have the same contents as for the <function>begin_cb</function>
and <function>commit_cb</function> callbacks, but additionally the
relation descriptor <parameter>relation</parameter> points to the
relation the row belongs to and a struct
<parameter>change</parameter> describing the row modification are passed
in.
</para>
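
<para>
A change callback typically dispatches on the kind of row modification. The
standalone sketch below shows that shape. The enum is a subset of the server's
<literal>ReorderBufferChangeType</literal>. <literal>DemoChange</literal> and
<literal>DemoCounts</literal> are hypothetical stand-ins. In the server, the
tuples live in <literal>change-&gt;data.tp</literal>, and whether an old tuple
is present depends on the replica identity.
<programlisting>
/* Subset of the server's ReorderBufferChangeType (stand-in). */
typedef enum ReorderBufferChangeType
{
    REORDER_BUFFER_CHANGE_INSERT,
    REORDER_BUFFER_CHANGE_UPDATE,
    REORDER_BUFFER_CHANGE_DELETE
} ReorderBufferChangeType;

typedef struct DemoChange       /* hypothetical stand-in for ReorderBufferChange */
{
    ReorderBufferChangeType action;
    int         has_old_tuple;  /* depends on the table's replica identity */
} DemoChange;

typedef struct DemoCounts       /* hypothetical per-transaction tally */
{
    int         inserts;
    int         updates;
    int         deletes;
    int         old_tuples;
} DemoCounts;

/*
 * Called once per modified row.  Returns the label a text plugin might
 * print for the row and updates the tally.
 */
static const char *
demo_change(DemoCounts *counts, const DemoChange *change)
{
    if (change->has_old_tuple)
        counts->old_tuples++;

    switch (change->action)
    {
        case REORDER_BUFFER_CHANGE_INSERT:
            counts->inserts++;
            return "INSERT";
        case REORDER_BUFFER_CHANGE_UPDATE:
            counts->updates++;
            return "UPDATE";
        case REORDER_BUFFER_CHANGE_DELETE:
            counts->deletes++;
            return "DELETE";
    }
    return "UNKNOWN";           /* other change kinds are not modeled */
}
</programlisting>
Because the callback runs once per row, a single <command>DELETE</command>
that removes three rows yields three calls and three <literal>DELETE</literal>
labels.
</para>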

<note>
<para>
Only changes in user-defined tables that are not unlogged
(see <xref linkend="sql-createtable-unlogged"/>) and not temporary
(see <xref linkend="sql-createtable-temporary"/>) can be extracted using
logical decoding.
</para>
</note>
</sect3>

<sect3 id="logicaldecoding-output-plugin-truncate">
<title>Truncate Callback</title>

<para>
The optional <function>truncate_cb</function> callback is called for a
<command>TRUNCATE</command> command.
<programlisting>
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
                                         ReorderBufferTXN *txn,
                                         int nrelations,
                                         Relation relations[],
                                         ReorderBufferChange *change);
</programlisting>
The parameters are analogous to the <function>change_cb</function>
callback. However, because <command>TRUNCATE</command> actions on
tables connected by foreign keys need to be executed together, this
callback receives an array of relations instead of just a single one.
See the description of the <xref linkend="sql-truncate"/> statement for
details.
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-filter-origin">
<title>Origin Filter Callback</title>

<para>
The optional <function>filter_by_origin_cb</function> callback
is called to determine whether data that has been replayed
from <parameter>origin_id</parameter> is of interest to the
output plugin.
<programlisting>
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);
</programlisting>
The <parameter>ctx</parameter> parameter has the same contents
as for the other callbacks. No information but the origin is
available. To signal that changes originating on the passed-in
node are irrelevant, return true, causing them to be filtered
away; return false otherwise. The other callbacks will not be called
for transactions and changes that have been filtered away.
</para>
<para>
This is useful when implementing cascading or multidirectional
replication solutions. Filtering by the origin makes it possible to
prevent the same changes from being replicated back and forth in such
setups. While transactions and changes also carry information
about the origin, filtering via this callback is noticeably
more efficient.
</para>
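<para>
For example, a plugin that should emit only changes made locally on this node
can filter out everything that carries a replication origin. The standalone
sketch below follows the server's conventions: <literal>RepOriginId</literal>
is a 16-bit identifier, and <literal>InvalidRepOriginId</literal> (zero) marks
changes that were not replayed from elsewhere. <literal>DemoFilterState</literal>
and its <literal>only_local</literal> flag are hypothetical. A real plugin
would read the flag from its private state rather than receive it as a
parameter. <filename>contrib/test_decoding</filename> offers a comparable
<literal>only-local</literal> option.
<programlisting>
typedef unsigned short RepOriginId;     /* uint16 in the server */
#define InvalidRepOriginId 0            /* change was not replayed from elsewhere */

typedef struct DemoFilterState          /* hypothetical private state */
{
    int         only_local;             /* emit locally made changes only? */
} DemoFilterState;

/*
 * Return nonzero to filter the changes away, zero to keep them.
 * With only_local set, anything replayed from another origin is dropped,
 * which stops changes from bouncing back to the node they came from.
 */
static int
demo_filter_by_origin(const DemoFilterState *state, RepOriginId origin_id)
{
    if (state->only_local)
        return origin_id != InvalidRepOriginId;
    return 0;
}
</programlisting>
</para>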
</sect3>

<sect3 id="logicaldecoding-output-plugin-message">
<title>Generic Message Callback</title>

<para>
The optional <function>message_cb</function> callback is called whenever
a logical decoding message has been decoded.
<programlisting>
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);
</programlisting>
The <parameter>txn</parameter> parameter contains meta information about
the transaction, like the time stamp at which it has been committed and
its XID. Note, however, that it can be NULL when the message is
non-transactional and the XID was not yet assigned in the transaction
that logged the message. The <parameter>message_lsn</parameter> parameter
holds the WAL location of the message. The <parameter>transactional</parameter>
parameter says whether the message was sent as transactional or not. Similar
to the change callback, when decoding a prepared (but not yet committed)
transaction or an uncommitted transaction, this message
callback might also error out due to a simultaneous rollback of
this very same transaction. In that case, the logical decoding of this
aborted transaction is stopped gracefully.
The <parameter>prefix</parameter> is an arbitrary null-terminated prefix
which can be used for identifying interesting messages for the current
plugin. And finally the <parameter>message</parameter> parameter holds
the actual message of <parameter>message_size</parameter> size.
</para>
<para>
Extra care should be taken to ensure that the prefix the output plugin
considers interesting is unique. Using the name of the extension or the
output plugin itself is often a good choice.
</para>
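<para>
A message callback usually starts by discarding messages whose prefix belongs
to somebody else, and it has to cope with a NULL <parameter>txn</parameter>.
The standalone sketch below assumes a hypothetical prefix
<literal>demo_plugin</literal> and a hypothetical <literal>DemoTxn</literal>
stand-in for <literal>ReorderBufferTXN</literal>. When no transaction is
available, it reports XID 0 (<literal>InvalidTransactionId</literal>). Messages
carrying this prefix could be produced with
<function>pg_logical_emit_message</function>.
<programlisting>
#define DEMO_MESSAGE_PREFIX "demo_plugin"   /* hypothetical; e.g. the extension name */

typedef struct DemoTxn          /* hypothetical stand-in for ReorderBufferTXN */
{
    unsigned int xid;
} DemoTxn;

/* Return 1 if the two NUL-terminated strings are equal, else 0. */
static int
str_equal(const char *a, const char *b)
{
    while (*a != '\0' || *b != '\0')
    {
        if (*a != *b)
            return 0;
        a++;
        b++;
    }
    return 1;
}

/*
 * Return 1 if the message is ours, storing the XID to report in *xid
 * (0, i.e. InvalidTransactionId, when txn is NULL); return 0 to ignore it.
 */
static int
demo_message(const DemoTxn *txn, const char *prefix, unsigned int *xid)
{
    if (!str_equal(prefix, DEMO_MESSAGE_PREFIX))
        return 0;               /* another plugin's message */
    *xid = txn ? txn->xid : 0;  /* txn may be NULL for non-transactional messages */
    return 1;
}
</programlisting>
Matching the whole prefix exactly, rather than a leading substring, keeps
<literal>demo_plugin_x</literal> from being mistaken for this plugin's prefix.
</para>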
</sect3>

<sect3 id="logicaldecoding-output-plugin-filter-prepare">
<title>Prepare Filter Callback</title>

<para>
The optional <function>filter_prepare_cb</function> callback
is called to determine whether data that is part of the current
two-phase commit transaction should be considered for decoding
at this prepare stage or later as a regular one-phase transaction at
<command>COMMIT PREPARED</command> time. To signal that
decoding should be skipped, return <literal>true</literal>;
<literal>false</literal> otherwise. When the callback is not
defined, <literal>false</literal> is assumed (i.e., no filtering; all
transactions using two-phase commit are decoded in two phases as well).
<programlisting>
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              TransactionId xid,
                                              const char *gid);
</programlisting>
The <parameter>ctx</parameter> parameter has the same contents as for
the other callbacks. The parameters <parameter>xid</parameter>
and <parameter>gid</parameter> provide two different ways to identify
the transaction. The later <command>COMMIT PREPARED</command> or
<command>ROLLBACK PREPARED</command> carries both identifiers,
providing an output plugin the choice of what to use.
</para>
<para>
The callback may be invoked multiple times per transaction to decode
and must provide the same static answer for a given pair of
<parameter>xid</parameter> and <parameter>gid</parameter> every time
it is called.
</para>
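<para>
One way to satisfy the requirement of a stable answer is to base the decision
only on the GID. The standalone sketch below skips two-phase decoding for
transactions whose GID starts with a hypothetical <literal>nodecode_</literal>
prefix. <literal>TransactionId</literal> is a stand-in for the server's 32-bit
type, and the <parameter>ctx</parameter> parameter is omitted.
<filename>contrib/test_decoding</filename> uses a comparable GID-based rule in
its own filter.
<programlisting>
typedef unsigned int TransactionId;     /* 32-bit in the server */

#define DEMO_SKIP_GID_PREFIX "nodecode_"   /* hypothetical naming convention */

/* Return 1 if s begins with prefix, else 0. */
static int
starts_with(const char *s, const char *prefix)
{
    while (*prefix != '\0')
    {
        if (*s != *prefix)
            return 0;
        s++;
        prefix++;
    }
    return 1;
}

/*
 * Return nonzero to skip decoding at PREPARE time (the transaction is then
 * decoded as a one-phase transaction at COMMIT PREPARED), zero to decode
 * it in two phases.  The answer depends only on the GID, so repeated
 * calls for the same transaction agree.
 */
static int
demo_filter_prepare(TransactionId xid, const char *gid)
{
    (void) xid;                 /* this rule does not look at the XID */
    return starts_with(gid, DEMO_SKIP_GID_PREFIX);
}
</programlisting>
</para>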
</sect3>

<sect3 id="logicaldecoding-output-plugin-begin-prepare">
<title>Transaction Begin Prepare Callback</title>

<para>
The required <function>begin_prepare_cb</function> callback is called
whenever the start of a prepared transaction has been decoded. The
<parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback to
check if the plugin has already received this <command>PREPARE</command>,
in which case it can either error out or skip the remaining changes of
the transaction.
<programlisting>
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-prepare">
<title>Transaction Prepare Callback</title>

<para>
The required <function>prepare_cb</function> callback is called whenever
a transaction which is prepared for two-phase commit has been
decoded. The <function>change_cb</function> callbacks for all modified
rows will have been called before this, if there have been any modified
rows. The <parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback.
<programlisting>
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-commit-prepared">
<title>Transaction Commit Prepared Callback</title>

<para>
The required <function>commit_prepared_cb</function> callback is called
whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
The <parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback.
<programlisting>
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-rollback-prepared">
<title>Transaction Rollback Prepared Callback</title>

<para>
The required <function>rollback_prepared_cb</function> callback is called
whenever a transaction <command>ROLLBACK PREPARED</command> has been
decoded. The <parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback. The
parameters <parameter>prepare_end_lsn</parameter> and
<parameter>prepare_time</parameter> can be used to check if the plugin
has received this <command>PREPARE TRANSACTION</command>, in which case
it can apply the rollback; otherwise, it can skip the rollback operation. The
<parameter>gid</parameter> alone is not sufficient because the downstream
node can have a prepared transaction with the same identifier.
<programlisting>
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
                                                 ReorderBufferTXN *txn,
                                                 XLogRecPtr prepare_end_lsn,
                                                 TimestampTz prepare_time);
</programlisting>
</para>
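<para>
On the receiving side, the check described above might look like the
standalone sketch below. It assumes the downstream keeps a hypothetical list of
the PREPAREs it has received (<literal>DemoReceivedPrepare</literal>), recording
the GID together with <parameter>prepare_end_lsn</parameter> and
<parameter>prepare_time</parameter>. <literal>XLogRecPtr</literal> and
<literal>TimestampTz</literal> are stand-ins for the server's 64-bit types.
<programlisting>
typedef unsigned long long XLogRecPtr;  /* uint64 in the server */
typedef long long TimestampTz;          /* int64 in the server */

typedef struct DemoReceivedPrepare      /* hypothetical downstream bookkeeping */
{
    const char *gid;
    XLogRecPtr  prepare_end_lsn;
    TimestampTz prepare_time;
} DemoReceivedPrepare;

/* Return 1 if the two NUL-terminated strings are equal, else 0. */
static int
str_equal(const char *a, const char *b)
{
    while (*a != '\0' || *b != '\0')
    {
        if (*a != *b)
            return 0;
        a++;
        b++;
    }
    return 1;
}

/*
 * Return 1 if the ROLLBACK PREPARED should be applied, 0 if it should be
 * skipped because this particular PREPARE was never received.  The GID
 * alone is not trusted: an unrelated prepared transaction with the same
 * GID may exist downstream, so the LSN and timestamp must match too.
 */
static int
demo_should_apply_rollback(const DemoReceivedPrepare *received, int nreceived,
                           const char *gid, XLogRecPtr prepare_end_lsn,
                           TimestampTz prepare_time)
{
    int         i;

    for (i = 0; i != nreceived; i++)
    {
        const DemoReceivedPrepare *p = received + i;

        if (!str_equal(p->gid, gid))
            continue;
        if (p->prepare_end_lsn != prepare_end_lsn)
            continue;
        if (p->prepare_time == prepare_time)
            return 1;
    }
    return 0;
}
</programlisting>
</para>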
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-start">
<title>Stream Start Callback</title>
<para>
The required <function>stream_start_cb</function> callback is called when
opening a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-stop">
<title>Stream Stop Callback</title>
<para>
The required <function>stream_stop_cb</function> callback is called when
closing a block of streamed changes from an in-progress transaction.
<programlisting>
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
                                           ReorderBufferTXN *txn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-abort">
<title>Stream Abort Callback</title>
<para>
The required <function>stream_abort_cb</function> callback is called to
abort a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
                                            ReorderBufferTXN *txn,
                                            XLogRecPtr abort_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-prepare">
<title>Stream Prepare Callback</title>
<para>
The <function>stream_prepare_cb</function> callback is called to prepare
a previously streamed transaction as part of a two-phase commit. This
callback is required when the output plugin supports both the streaming
of large in-progress transactions and two-phase commits.
<programlisting>
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr prepare_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-commit">
<title>Stream Commit Callback</title>
<para>
The required <function>stream_commit_cb</function> callback is called to
commit a previously streamed transaction.
<programlisting>
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             XLogRecPtr commit_lsn);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-change">
<title>Stream Change Callback</title>
<para>
The required <function>stream_change_cb</function> callback is called
when sending a change in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
The actual changes are not displayed as the transaction can abort at a later
point in time and we don't decode changes for aborted transactions.
<programlisting>
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
                                             ReorderBufferTXN *txn,
                                             Relation relation,
                                             ReorderBufferChange *change);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-message">
<title>Stream Message Callback</title>
<para>
The optional <function>stream_message_cb</function> callback is called when
sending a generic message in a block of streamed changes (demarcated by
<function>stream_start_cb</function> and <function>stream_stop_cb</function> calls).
The message contents for transactional messages are not displayed as the transaction
can abort at a later point in time and we don't decode changes for aborted
transactions.
<programlisting>
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr message_lsn,
                                              bool transactional,
                                              const char *prefix,
                                              Size message_size,
                                              const char *message);
</programlisting>
</para>
</sect3>

<sect3 id="logicaldecoding-output-plugin-stream-truncate">
<title>Stream Truncate Callback</title>
<para>
The optional <function>stream_truncate_cb</function> callback is called
for a <command>TRUNCATE</command> command in a block of streamed changes
(demarcated by <function>stream_start_cb</function> and
<function>stream_stop_cb</function> calls).
<programlisting>
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               int nrelations,
                                               Relation relations[],
                                               ReorderBufferChange *change);
</programlisting>
The parameters are analogous to the <function>stream_change_cb</function>
callback. However, because <command>TRUNCATE</command> actions on
tables connected by foreign keys need to be executed together, this
callback receives an array of relations instead of just a single one.
See the description of the <xref linkend="sql-truncate"/> statement for
details.
</para>
</sect3>

</sect2>

<sect2 id="logicaldecoding-output-plugin-output">
<title>Functions for Producing Output</title>

<para>
To actually produce output, output plugins can write data to
the <literal>StringInfo</literal> output buffer
in <literal>ctx-&gt;out</literal> when inside
the <function>begin_cb</function>, <function>commit_cb</function>,
or <function>change_cb</function> callbacks. Before writing to the output
buffer, <function>OutputPluginPrepareWrite(ctx, last_write)</function> has
to be called, and after finishing writing to the
buffer, <function>OutputPluginWrite(ctx, last_write)</function> has to be
called to perform the write. The <parameter>last_write</parameter>
parameter indicates whether a particular write was the callback's last write.
</para>

<para>
The following example shows how to output data to the consumer of an
output plugin:
<programlisting>
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);
</programlisting>
</para>
</sect2>
</sect1>

<sect1 id="logicaldecoding-writer">
<title>Logical Decoding Output Writers</title>

<para>
It is possible to add more output methods for logical decoding.
For details, see
<filename>src/backend/replication/logical/logicalfuncs.c</filename>.
Essentially, three functions need to be provided: one to read WAL, one to
prepare writing output, and one to write the output
(see <xref linkend="logicaldecoding-output-plugin-output"/>).
</para>
</sect1>

<sect1 id="logicaldecoding-synchronous">
<title>Synchronous Replication Support for Logical Decoding</title>
<sect2 id="logicaldecoding-synchronous-overview">
<title>Overview</title>

<para>
Logical decoding can be used to build
<link linkend="synchronous-replication">synchronous
replication</link> solutions with the same user interface as synchronous
replication for <link linkend="streaming-replication">streaming
replication</link>. To do this, the streaming replication interface
(see <xref linkend="logicaldecoding-walsender"/>) must be used to stream out
data. Clients have to send <literal>Standby status update (F)</literal>
(see <xref linkend="protocol-replication"/>) messages, just like streaming
replication clients do.
</para>

<note>
<para>
A synchronous replica receiving changes via logical decoding will work in
the scope of a single database. Since, in contrast to
that, <parameter>synchronous_standby_names</parameter> is currently
server-wide, this technique will not work properly if more
than one database is actively used.
</para>
</note>
</sect2>

<sect2 id="logicaldecoding-synchronous-caveats">
<title>Caveats</title>

<para>
In a synchronous replication setup, a deadlock can happen if the transaction
has locked [user] catalog tables exclusively. See
<xref linkend="logicaldecoding-capabilities"/> for information on user
catalog tables. This is because logical decoding of transactions can lock
catalog tables to access them. To avoid this, users must refrain from taking
an exclusive lock on [user] catalog tables. This can happen in the following
ways:

<itemizedlist>
<listitem>
<para>
Issuing an explicit <command>LOCK</command> on <structname>pg_class</structname>
in a transaction.
</para>
</listitem>

<listitem>
<para>
Performing <command>CLUSTER</command> on <structname>pg_class</structname> in
a transaction.
</para>
</listitem>

<listitem>
<para>
<command>PREPARE TRANSACTION</command> after a <command>LOCK</command> command
on <structname>pg_class</structname> while allowing logical decoding of two-phase
transactions.
</para>
</listitem>

<listitem>
<para>
<command>PREPARE TRANSACTION</command> after a <command>CLUSTER</command>
command on <structname>pg_trigger</structname> while allowing logical decoding of
two-phase transactions. This will lead to a deadlock only when the published table
has a trigger.
</para>
</listitem>

<listitem>
<para>
Executing <command>TRUNCATE</command> on a [user] catalog table in a
transaction.
</para>
</listitem>
</itemizedlist>

Note that these commands can cause deadlocks not only for the system
catalog tables listed above but also for other catalog tables.
</para>
</sect2>
</sect1>

<sect1 id="logicaldecoding-streaming">
<title>Streaming of Large Transactions for Logical Decoding</title>

<para>
The basic output plugin callbacks (e.g., <function>begin_cb</function>,
<function>change_cb</function>, <function>commit_cb</function> and
<function>message_cb</function>) are only invoked when the transaction
actually commits. The changes are still decoded from the transaction
log, but are only passed to the output plugin at commit (and discarded
if the transaction aborts).
</para>

<para>
This means that while the decoding happens incrementally, and may spill
to disk to keep memory usage under control, all the decoded changes have
to be transmitted when the transaction finally commits (or more precisely,
when the commit is decoded from the transaction log). Depending on the
size of the transaction and network bandwidth, the transfer time may
significantly increase the apply lag.
</para>

<para>
To reduce the apply lag caused by large transactions, an output plugin
may provide additional callbacks to support incremental streaming of
in-progress transactions. There are multiple required streaming callbacks
(<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
and <function>stream_change_cb</function>) and two optional callbacks
(<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
Also, if streaming of two-phase commands is to be supported, then additional
callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
for details.)
</para>

<para>
When streaming an in-progress transaction, the changes (and messages) are
streamed in blocks demarcated by <function>stream_start_cb</function>
and <function>stream_stop_cb</function> callbacks. Once all the decoded
changes are transmitted, the transaction can be committed using the
<function>stream_commit_cb</function> callback
(or possibly aborted using the <function>stream_abort_cb</function> callback).
If two-phase commits are supported, the transaction can be prepared using the
<function>stream_prepare_cb</function> callback,
committed (<command>COMMIT PREPARED</command>) using the
<function>commit_prepared_cb</function> callback, or aborted using the
<function>rollback_prepared_cb</function> callback.
</para>

<para>
One example sequence of streaming callback calls for one transaction may
look like this:
<programlisting>
stream_start_cb(...);     &lt;-- start of first block of changes
  stream_change_cb(...);
  stream_change_cb(...);
  stream_message_cb(...);
  stream_change_cb(...);
  ...
  stream_change_cb(...);
stream_stop_cb(...);      &lt;-- end of first block of changes

stream_start_cb(...);     &lt;-- start of second block of changes
  stream_change_cb(...);
  stream_change_cb(...);
  stream_change_cb(...);
  ...
  stream_message_cb(...);
  stream_change_cb(...);
stream_stop_cb(...);      &lt;-- end of second block of changes

[a. when using normal commit]
stream_commit_cb(...);    &lt;-- commit of the streamed transaction

[b. when using two-phase commit]
stream_prepare_cb(...);   &lt;-- prepare the streamed transaction
commit_prepared_cb(...);  &lt;-- commit of the prepared transaction
</programlisting>
</para>
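
<para>
The demarcation rules in the sequence above can be checked mechanically, which
may be handy in a plugin's own tests. The standalone sketch below covers a
single transaction and uses a hypothetical <literal>DemoStreamEvent</literal>
enum in place of real callback invocations. It enforces three rules. Data
callbacks must fall inside a start/stop block. Blocks do not nest. The
<function>stream_commit_cb</function>, <function>stream_abort_cb</function>,
and <function>stream_prepare_cb</function> calls come between blocks.
Interleaving of several transactions is not modeled.
<programlisting>
typedef enum DemoStreamEvent    /* hypothetical: one entry per callback call */
{
    STREAM_START,
    STREAM_STOP,
    STREAM_CHANGE,
    STREAM_MESSAGE,
    STREAM_TRUNCATE,
    STREAM_ABORT,
    STREAM_PREPARE,
    STREAM_COMMIT
} DemoStreamEvent;

/* Return 1 if the events respect the block demarcation, else 0. */
static int
demo_stream_sequence_ok(const DemoStreamEvent *events, int nevents)
{
    int         in_block = 0;
    int         i;

    for (i = 0; i != nevents; i++)
    {
        switch (events[i])
        {
            case STREAM_START:
                if (in_block)
                    return 0;   /* blocks do not nest */
                in_block = 1;
                break;
            case STREAM_STOP:
                if (!in_block)
                    return 0;   /* stop without a matching start */
                in_block = 0;
                break;
            case STREAM_CHANGE:
            case STREAM_MESSAGE:
            case STREAM_TRUNCATE:
                if (!in_block)
                    return 0;   /* data only inside a block */
                break;
            case STREAM_ABORT:
            case STREAM_PREPARE:
            case STREAM_COMMIT:
                if (in_block)
                    return 0;   /* these come between blocks */
                break;
        }
    }
    return !in_block;           /* the last block must have been closed */
}
</programlisting>
</para>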

<para>
The actual sequence of callback calls may be more complicated, of course.
There may be blocks for multiple streamed transactions, some of the
transactions may get aborted, etc.
</para>

<para>
Similar to spill-to-disk behavior, streaming is triggered when the total
amount of changes decoded from the WAL (for all in-progress transactions)
exceeds the limit defined by the <varname>logical_decoding_work_mem</varname> setting.
At that point, the largest top-level transaction (measured by the amount of memory
currently used for decoded changes) is selected and streamed. However, in
some cases we still have to spill to disk even if streaming is enabled
because we exceed the memory threshold but still have not decoded the
complete tuple, e.g., when only the TOAST table insert has been decoded
but not the main table insert.
</para>
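
<para>
The selection step described above amounts to summing the memory used by all
in-progress transactions and picking the biggest one once the total exceeds
the limit. The standalone sketch below models only that decision.
<literal>DemoTxnSize</literal> is hypothetical, sizes are plain byte counts,
and <parameter>limit</parameter> stands for
<varname>logical_decoding_work_mem</varname> converted to bytes. The
spill-to-disk fallback for incomplete tuples is not modeled.
<programlisting>
typedef struct DemoTxnSize          /* hypothetical: one top-level transaction */
{
    unsigned int  xid;
    unsigned long size;             /* bytes of decoded changes held in memory */
} DemoTxnSize;

/*
 * Return the index of the transaction to stream, or -1 while the total
 * memory used by all in-progress transactions is still within the limit.
 */
static int
demo_pick_txn_to_stream(const DemoTxnSize *txns, int ntxns,
                        unsigned long limit)
{
    unsigned long total = 0;
    int         largest = -1;
    int         i;

    for (i = 0; i != ntxns; i++)
    {
        total += txns[i].size;
        if (largest == -1 || txns[i].size > txns[largest].size)
            largest = i;
    }
    if (total > limit)
        return largest;
    return -1;
}
</programlisting>
</para>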

<para>
Even when streaming large transactions, the changes are still applied in
commit order, preserving the same guarantees as the non-streaming mode.
</para>

</sect1>

<sect1 id="logicaldecoding-two-phase-commits">
<title>Two-phase Commit Support for Logical Decoding</title>

<para>
With the basic output plugin callbacks (e.g., <function>begin_cb</function>,
<function>change_cb</function>, <function>commit_cb</function> and
<function>message_cb</function>) two-phase commit commands like
<command>PREPARE TRANSACTION</command>, <command>COMMIT PREPARED</command>
and <command>ROLLBACK PREPARED</command> are not decoded. While the
<command>PREPARE TRANSACTION</command> is ignored,
<command>COMMIT PREPARED</command> is decoded as a <command>COMMIT</command>
and <command>ROLLBACK PREPARED</command> is decoded as a
<command>ROLLBACK</command>.
</para>

<para>
To support the streaming of two-phase commands, an output plugin needs to
provide additional callbacks. There are multiple two-phase commit callbacks
that are required (<function>begin_prepare_cb</function>,
<function>prepare_cb</function>, <function>commit_prepared_cb</function>,
<function>rollback_prepared_cb</function> and
<function>stream_prepare_cb</function>) and an optional callback
(<function>filter_prepare_cb</function>).
</para>

<para>
If the output plugin callbacks for decoding two-phase commit commands are
provided, then on <command>PREPARE TRANSACTION</command>, the changes of
that transaction are decoded, passed to the output plugin, and the
<function>prepare_cb</function> callback is invoked. This differs from the
basic decoding setup where changes are only passed to the output plugin
when a transaction is committed. The start of a prepared transaction is
indicated by the <function>begin_prepare_cb</function> callback.
</para>

<para>
When a prepared transaction is rolled back using
<command>ROLLBACK PREPARED</command>, the
<function>rollback_prepared_cb</function> callback is invoked, and when the
prepared transaction is committed using <command>COMMIT PREPARED</command>,
the <function>commit_prepared_cb</function> callback is invoked.
</para>
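
<para>
The behavior described in this section can be summarized as a mapping from
each two-phase command to the callback that concludes the decoded transaction.
The standalone sketch below encodes that mapping. The
<literal>DemoTwoPhaseCommand</literal> enum is hypothetical. The
<parameter>two_phase</parameter> flag covers the case where the two-phase
callbacks are provided and <function>filter_prepare_cb</function>, if defined,
did not filter the transaction. The string <literal>none</literal> means that
nothing reaches the output plugin.
<programlisting>
typedef enum DemoTwoPhaseCommand    /* hypothetical */
{
    DEMO_PREPARE_TRANSACTION,
    DEMO_COMMIT_PREPARED,
    DEMO_ROLLBACK_PREPARED
} DemoTwoPhaseCommand;

/*
 * Name of the callback that concludes the decoded transaction for cmd,
 * or "none" when nothing is passed to the output plugin.  two_phase is
 * nonzero when the two-phase callbacks are provided and the transaction
 * was not filtered by filter_prepare_cb.
 */
static const char *
demo_concluding_callback(DemoTwoPhaseCommand cmd, int two_phase)
{
    switch (cmd)
    {
        case DEMO_PREPARE_TRANSACTION:
            /* without two-phase decoding, PREPARE TRANSACTION is ignored */
            return two_phase ? "prepare_cb" : "none";
        case DEMO_COMMIT_PREPARED:
            /* otherwise decoded like a plain COMMIT */
            return two_phase ? "commit_prepared_cb" : "commit_cb";
        case DEMO_ROLLBACK_PREPARED:
            /* otherwise decoded like a ROLLBACK, so nothing is sent */
            return two_phase ? "rollback_prepared_cb" : "none";
    }
    return "none";
}
</programlisting>
</para>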

<para>
Optionally the output plugin can define filtering rules via
<function>filter_prepare_cb</function> to decode only specific transactions
in two phases. This can be achieved by pattern matching on the
<parameter>gid</parameter> or via lookups using the
<parameter>xid</parameter>.
</para>

<para>
Users who want to decode prepared transactions need to be careful about
the following points:

<itemizedlist>
<listitem>
<para>
If the prepared transaction has locked [user] catalog tables exclusively,
then decoding the prepare can block until the main transaction is committed.
</para>
</listitem>

<listitem>
<para>
The logical replication solution that builds distributed two-phase commit
using this feature can deadlock if the prepared transaction has locked
[user] catalog tables exclusively. To avoid this, users must refrain from
having locks on catalog tables (e.g., an explicit <command>LOCK</command> command)
in such transactions.
See <xref linkend="logicaldecoding-synchronous-caveats"/> for the details.
</para>
</listitem>
</itemizedlist>
</para>

</sect1>
</chapter>