1 /*-------------------------------------------------------------------------
3 * PostgreSQL Logical Decode Plugin Interface
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *-------------------------------------------------------------------------
9 #ifndef OUTPUT_PLUGIN_H
10 #define OUTPUT_PLUGIN_H
12 #include "replication/reorderbuffer.h"
14 struct LogicalDecodingContext
;
15 struct OutputPluginCallbacks
;
17 typedef enum OutputPluginOutputType
19 OUTPUT_PLUGIN_BINARY_OUTPUT
,
20 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
,
21 } OutputPluginOutputType
;
24 * Options set by the output plugin, in the startup callback.
26 typedef struct OutputPluginOptions
28 OutputPluginOutputType output_type
;
29 bool receive_rewrites
;
30 } OutputPluginOptions
;
33 * Type of the shared library symbol _PG_output_plugin_init that is looked up
34 * when loading an output plugin shared library.
36 typedef void (*LogicalOutputPluginInit
) (struct OutputPluginCallbacks
*cb
);
38 extern PGDLLEXPORT
void _PG_output_plugin_init(struct OutputPluginCallbacks
*cb
);
41 * Callback that gets called in a user-defined plugin. ctx->private_data can
42 * be set to some private data.
44 * "is_init" will be set to "true" if the decoding slot just got defined. When
45 * the same slot is used from there one, it will be "false".
47 typedef void (*LogicalDecodeStartupCB
) (struct LogicalDecodingContext
*ctx
,
48 OutputPluginOptions
*options
,
52 * Callback called for every (explicit or implicit) BEGIN of a successful
55 typedef void (*LogicalDecodeBeginCB
) (struct LogicalDecodingContext
*ctx
,
56 ReorderBufferTXN
*txn
);
59 * Callback for every individual change in a successful transaction.
61 typedef void (*LogicalDecodeChangeCB
) (struct LogicalDecodingContext
*ctx
,
62 ReorderBufferTXN
*txn
,
64 ReorderBufferChange
*change
);
67 * Callback for every TRUNCATE in a successful transaction.
69 typedef void (*LogicalDecodeTruncateCB
) (struct LogicalDecodingContext
*ctx
,
70 ReorderBufferTXN
*txn
,
73 ReorderBufferChange
*change
);
76 * Called for every (explicit or implicit) COMMIT of a successful transaction.
78 typedef void (*LogicalDecodeCommitCB
) (struct LogicalDecodingContext
*ctx
,
79 ReorderBufferTXN
*txn
,
80 XLogRecPtr commit_lsn
);
83 * Called for the generic logical decoding messages.
85 typedef void (*LogicalDecodeMessageCB
) (struct LogicalDecodingContext
*ctx
,
86 ReorderBufferTXN
*txn
,
87 XLogRecPtr message_lsn
,
94 * Filter changes by origin.
96 typedef bool (*LogicalDecodeFilterByOriginCB
) (struct LogicalDecodingContext
*ctx
,
97 RepOriginId origin_id
);
100 * Called to shutdown an output plugin.
102 typedef void (*LogicalDecodeShutdownCB
) (struct LogicalDecodingContext
*ctx
);
105 * Called before decoding of PREPARE record to decide whether this
106 * transaction should be decoded with separate calls to prepare and
107 * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
108 * and sent as usual transaction.
110 typedef bool (*LogicalDecodeFilterPrepareCB
) (struct LogicalDecodingContext
*ctx
,
115 * Callback called for every BEGIN of a prepared transaction.
117 typedef void (*LogicalDecodeBeginPrepareCB
) (struct LogicalDecodingContext
*ctx
,
118 ReorderBufferTXN
*txn
);
121 * Called for PREPARE record unless it was filtered by filter_prepare()
124 typedef void (*LogicalDecodePrepareCB
) (struct LogicalDecodingContext
*ctx
,
125 ReorderBufferTXN
*txn
,
126 XLogRecPtr prepare_lsn
);
129 * Called for COMMIT PREPARED.
131 typedef void (*LogicalDecodeCommitPreparedCB
) (struct LogicalDecodingContext
*ctx
,
132 ReorderBufferTXN
*txn
,
133 XLogRecPtr commit_lsn
);
136 * Called for ROLLBACK PREPARED.
138 typedef void (*LogicalDecodeRollbackPreparedCB
) (struct LogicalDecodingContext
*ctx
,
139 ReorderBufferTXN
*txn
,
140 XLogRecPtr prepare_end_lsn
,
141 TimestampTz prepare_time
);
145 * Called when starting to stream a block of changes from in-progress
146 * transaction (may be called repeatedly, if it's streamed in multiple
149 typedef void (*LogicalDecodeStreamStartCB
) (struct LogicalDecodingContext
*ctx
,
150 ReorderBufferTXN
*txn
);
153 * Called when stopping to stream a block of changes from in-progress
154 * transaction to a remote node (may be called repeatedly, if it's streamed
155 * in multiple chunks).
157 typedef void (*LogicalDecodeStreamStopCB
) (struct LogicalDecodingContext
*ctx
,
158 ReorderBufferTXN
*txn
);
161 * Called to discard changes streamed to remote node from in-progress
164 typedef void (*LogicalDecodeStreamAbortCB
) (struct LogicalDecodingContext
*ctx
,
165 ReorderBufferTXN
*txn
,
166 XLogRecPtr abort_lsn
);
169 * Called to prepare changes streamed to remote node from in-progress
170 * transaction. This is called as part of a two-phase commit.
172 typedef void (*LogicalDecodeStreamPrepareCB
) (struct LogicalDecodingContext
*ctx
,
173 ReorderBufferTXN
*txn
,
174 XLogRecPtr prepare_lsn
);
177 * Called to apply changes streamed to remote node from in-progress
180 typedef void (*LogicalDecodeStreamCommitCB
) (struct LogicalDecodingContext
*ctx
,
181 ReorderBufferTXN
*txn
,
182 XLogRecPtr commit_lsn
);
185 * Callback for streaming individual changes from in-progress transactions.
187 typedef void (*LogicalDecodeStreamChangeCB
) (struct LogicalDecodingContext
*ctx
,
188 ReorderBufferTXN
*txn
,
190 ReorderBufferChange
*change
);
193 * Callback for streaming generic logical decoding messages from in-progress
196 typedef void (*LogicalDecodeStreamMessageCB
) (struct LogicalDecodingContext
*ctx
,
197 ReorderBufferTXN
*txn
,
198 XLogRecPtr message_lsn
,
202 const char *message
);
205 * Callback for streaming truncates from in-progress transactions.
207 typedef void (*LogicalDecodeStreamTruncateCB
) (struct LogicalDecodingContext
*ctx
,
208 ReorderBufferTXN
*txn
,
210 Relation relations
[],
211 ReorderBufferChange
*change
);
214 * Output plugin callbacks
216 typedef struct OutputPluginCallbacks
218 LogicalDecodeStartupCB startup_cb
;
219 LogicalDecodeBeginCB begin_cb
;
220 LogicalDecodeChangeCB change_cb
;
221 LogicalDecodeTruncateCB truncate_cb
;
222 LogicalDecodeCommitCB commit_cb
;
223 LogicalDecodeMessageCB message_cb
;
224 LogicalDecodeFilterByOriginCB filter_by_origin_cb
;
225 LogicalDecodeShutdownCB shutdown_cb
;
227 /* streaming of changes at prepare time */
228 LogicalDecodeFilterPrepareCB filter_prepare_cb
;
229 LogicalDecodeBeginPrepareCB begin_prepare_cb
;
230 LogicalDecodePrepareCB prepare_cb
;
231 LogicalDecodeCommitPreparedCB commit_prepared_cb
;
232 LogicalDecodeRollbackPreparedCB rollback_prepared_cb
;
234 /* streaming of changes */
235 LogicalDecodeStreamStartCB stream_start_cb
;
236 LogicalDecodeStreamStopCB stream_stop_cb
;
237 LogicalDecodeStreamAbortCB stream_abort_cb
;
238 LogicalDecodeStreamPrepareCB stream_prepare_cb
;
239 LogicalDecodeStreamCommitCB stream_commit_cb
;
240 LogicalDecodeStreamChangeCB stream_change_cb
;
241 LogicalDecodeStreamMessageCB stream_message_cb
;
242 LogicalDecodeStreamTruncateCB stream_truncate_cb
;
243 } OutputPluginCallbacks
;
245 /* Functions in replication/logical/logical.c */
246 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext
*ctx
, bool last_write
);
247 extern void OutputPluginWrite(struct LogicalDecodingContext
*ctx
, bool last_write
);
248 extern void OutputPluginUpdateProgress(struct LogicalDecodingContext
*ctx
, bool skipped_xact
);
250 #endif /* OUTPUT_PLUGIN_H */