require_auth: prepare for multiple SASL mechanisms
[pgsql.git] / src / include / replication / worker_internal.h
blob30b2775952c38e36540875dc2a3f2d89dd33939c
/*-------------------------------------------------------------------------
 *
 * worker_internal.h
 *	  Internal headers shared by logical replication workers.
 *
 * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
 *
 * src/include/replication/worker_internal.h
 *
 *-------------------------------------------------------------------------
 */
12 #ifndef WORKER_INTERNAL_H
13 #define WORKER_INTERNAL_H
15 #include "access/xlogdefs.h"
16 #include "catalog/pg_subscription.h"
17 #include "datatype/timestamp.h"
18 #include "miscadmin.h"
19 #include "replication/logicalrelation.h"
20 #include "replication/walreceiver.h"
21 #include "storage/buffile.h"
22 #include "storage/fileset.h"
23 #include "storage/lock.h"
24 #include "storage/shm_mq.h"
25 #include "storage/shm_toc.h"
26 #include "storage/spin.h"
/*
 * Different types of worker.
 *
 * WORKERTYPE_UNKNOWN is pinned to 0; the remaining values follow in
 * declaration order.
 */
typedef enum LogicalRepWorkerType
{
	WORKERTYPE_UNKNOWN = 0,
	WORKERTYPE_TABLESYNC,
	WORKERTYPE_APPLY,
	WORKERTYPE_PARALLEL_APPLY,
} LogicalRepWorkerType;
37 typedef struct LogicalRepWorker
39 /* What type of worker is this? */
40 LogicalRepWorkerType type;
42 /* Time at which this worker was launched. */
43 TimestampTz launch_time;
45 /* Indicates if this slot is used or free. */
46 bool in_use;
48 /* Increased every time the slot is taken by new worker. */
49 uint16 generation;
51 /* Pointer to proc array. NULL if not running. */
52 PGPROC *proc;
54 /* Database id to connect to. */
55 Oid dbid;
57 /* User to use for connection (will be same as owner of subscription). */
58 Oid userid;
60 /* Subscription id for the worker. */
61 Oid subid;
63 /* Used for initial table synchronization. */
64 Oid relid;
65 char relstate;
66 XLogRecPtr relstate_lsn;
67 slock_t relmutex;
70 * Used to create the changes and subxact files for the streaming
71 * transactions. Upon the arrival of the first streaming transaction or
72 * when the first-time leader apply worker times out while sending changes
73 * to the parallel apply worker, the fileset will be initialized, and it
74 * will be deleted when the worker exits. Under this, separate buffiles
75 * would be created for each transaction which will be deleted after the
76 * transaction is finished.
78 FileSet *stream_fileset;
81 * PID of leader apply worker if this slot is used for a parallel apply
82 * worker, InvalidPid otherwise.
84 pid_t leader_pid;
86 /* Indicates whether apply can be performed in parallel. */
87 bool parallel_apply;
89 /* Stats. */
90 XLogRecPtr last_lsn;
91 TimestampTz last_send_time;
92 TimestampTz last_recv_time;
93 XLogRecPtr reply_lsn;
94 TimestampTz reply_time;
95 } LogicalRepWorker;
/*
 * State of the transaction in parallel apply worker.
 *
 * The enum values must have the same order as the transaction state
 * transitions.
 */
typedef enum ParallelTransState
{
	PARALLEL_TRANS_UNKNOWN,
	PARALLEL_TRANS_STARTED,
	PARALLEL_TRANS_FINISHED,
} ParallelTransState;
/*
 * State of fileset used to communicate changes from leader to parallel
 * apply worker.
 *
 * FS_EMPTY indicates an initial state where the leader doesn't need to use
 * the file to communicate with the parallel apply worker.
 *
 * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
 * to the file.
 *
 * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
 * the file.
 *
 * FS_READY indicates that it is now ok for a parallel apply worker to
 * read the file.
 */
typedef enum PartialFileSetState
{
	FS_EMPTY,
	FS_SERIALIZE_IN_PROGRESS,
	FS_SERIALIZE_DONE,
	FS_READY,
} PartialFileSetState;
135 * Struct for sharing information between leader apply worker and parallel
136 * apply workers.
138 typedef struct ParallelApplyWorkerShared
140 slock_t mutex;
142 TransactionId xid;
145 * State used to ensure commit ordering.
147 * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
148 * handling the transaction finish commands while the apply leader will
149 * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
150 * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
151 * STREAM_ABORT).
153 ParallelTransState xact_state;
155 /* Information from the corresponding LogicalRepWorker slot. */
156 uint16 logicalrep_worker_generation;
157 int logicalrep_worker_slot_no;
160 * Indicates whether there are pending streaming blocks in the queue. The
161 * parallel apply worker will check it before starting to wait.
163 pg_atomic_uint32 pending_stream_count;
166 * XactLastCommitEnd from the parallel apply worker. This is required by
167 * the leader worker so it can update the lsn_mappings.
169 XLogRecPtr last_commit_end;
172 * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
173 * serialize changes to the file, and share the fileset with the parallel
174 * apply worker when processing the transaction finish command. Then the
175 * parallel apply worker will apply all the spooled messages.
177 * FileSet is used here instead of SharedFileSet because we need it to
178 * survive after releasing the shared memory so that the leader apply
179 * worker can re-use the same fileset for the next streaming transaction.
181 PartialFileSetState fileset_state;
182 FileSet fileset;
183 } ParallelApplyWorkerShared;
186 * Information which is used to manage the parallel apply worker.
188 typedef struct ParallelApplyWorkerInfo
191 * This queue is used to send changes from the leader apply worker to the
192 * parallel apply worker.
194 shm_mq_handle *mq_handle;
197 * This queue is used to transfer error messages from the parallel apply
198 * worker to the leader apply worker.
200 shm_mq_handle *error_mq_handle;
202 dsm_segment *dsm_seg;
205 * Indicates whether the leader apply worker needs to serialize the
206 * remaining changes to a file due to timeout when attempting to send data
207 * to the parallel apply worker via shared memory.
209 bool serialize_changes;
212 * True if the worker is being used to process a parallel apply
213 * transaction. False indicates this worker is available for re-use.
215 bool in_use;
217 ParallelApplyWorkerShared *shared;
218 } ParallelApplyWorkerInfo;
220 /* Main memory context for apply worker. Permanent during worker lifetime. */
221 extern PGDLLIMPORT MemoryContext ApplyContext;
223 extern PGDLLIMPORT MemoryContext ApplyMessageContext;
225 extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
227 extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
229 /* libpqreceiver connection */
230 extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
232 /* Worker and subscription objects. */
233 extern PGDLLIMPORT Subscription *MySubscription;
234 extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
236 extern PGDLLIMPORT bool in_remote_transaction;
238 extern PGDLLIMPORT bool InitializingApplyWorker;
240 extern void logicalrep_worker_attach(int slot);
241 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
242 bool only_running);
243 extern List *logicalrep_workers_find(Oid subid, bool only_running,
244 bool acquire_lock);
245 extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
246 Oid dbid, Oid subid, const char *subname,
247 Oid userid, Oid relid,
248 dsm_handle subworker_dsm);
249 extern void logicalrep_worker_stop(Oid subid, Oid relid);
250 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
251 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
252 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
254 extern int logicalrep_sync_worker_count(Oid subid);
256 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
257 char *originname, Size szoriginname);
259 extern bool AllTablesyncsReady(void);
260 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
262 extern void process_syncing_tables(XLogRecPtr current_lsn);
263 extern void invalidate_syncing_table_states(Datum arg, int cacheid,
264 uint32 hashvalue);
266 extern void stream_start_internal(TransactionId xid, bool first_segment);
267 extern void stream_stop_internal(TransactionId xid);
269 /* Common streaming function to apply all the spooled messages */
270 extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
271 XLogRecPtr lsn);
273 extern void apply_dispatch(StringInfo s);
275 extern void maybe_reread_subscription(void);
277 extern void stream_cleanup_files(Oid subid, TransactionId xid);
279 extern void set_stream_options(WalRcvStreamOptions *options,
280 char *slotname,
281 XLogRecPtr *origin_startpos);
283 extern void start_apply(XLogRecPtr origin_startpos);
285 extern void InitializeLogRepWorker(void);
287 extern void SetupApplyOrSyncWorker(int worker_slot);
289 extern void DisableSubscriptionAndExit(void);
291 extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
293 /* Function for apply error callback */
294 extern void apply_error_callback(void *arg);
295 extern void set_apply_error_context_origin(char *originname);
297 /* Parallel apply worker setup and interactions */
298 extern void pa_allocate_worker(TransactionId xid);
299 extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
300 extern void pa_detach_all_error_mq(void);
302 extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
303 const void *data);
304 extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
305 bool stream_locked);
307 extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
308 ParallelTransState xact_state);
309 extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
311 extern void pa_start_subtrans(TransactionId current_xid,
312 TransactionId top_xid);
313 extern void pa_reset_subtrans(void);
314 extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
315 extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
316 PartialFileSetState fileset_state);
318 extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
319 extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
321 extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
322 extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
324 extern void pa_decr_and_wait_stream_block(void);
326 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
327 XLogRecPtr remote_lsn);
/*
 * True when the given worker slot is in use and its type is
 * WORKERTYPE_PARALLEL_APPLY.  The argument is evaluated twice, so it must
 * be free of side effects.
 */
#define isParallelApplyWorker(worker) ((worker)->in_use && \
									   (worker)->type == WORKERTYPE_PARALLEL_APPLY)
/*
 * True when the given worker slot is in use and its type is
 * WORKERTYPE_TABLESYNC.  The argument is evaluated twice, so it must be
 * free of side effects.
 */
#define isTablesyncWorker(worker) ((worker)->in_use && \
								   (worker)->type == WORKERTYPE_TABLESYNC)
334 static inline bool
335 am_tablesync_worker(void)
337 return isTablesyncWorker(MyLogicalRepWorker);
340 static inline bool
341 am_leader_apply_worker(void)
343 Assert(MyLogicalRepWorker->in_use);
344 return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
347 static inline bool
348 am_parallel_apply_worker(void)
350 Assert(MyLogicalRepWorker->in_use);
351 return isParallelApplyWorker(MyLogicalRepWorker);
354 #endif /* WORKER_INTERNAL_H */