/*-------------------------------------------------------------------------
 * applyparallelworker.c
 *     Support routines for applying xacts by parallel apply workers
 *
 * Copyright (c) 2023-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *     src/backend/replication/logical/applyparallelworker.c
 *
 * This file contains the code to launch, set up, and tear down a parallel
 * apply worker which receives the changes from the leader worker and invokes
 * routines to apply those on the subscriber database. Additionally, this file
 * contains routines that are intended to support setting up, using, and
 * tearing down a ParallelApplyWorkerInfo which is required so the leader
 * worker and parallel apply workers can communicate with each other.
 *
 * The parallel apply workers are assigned (if available) as soon as an xact's
 * first stream is received for subscriptions that have set their 'streaming'
 * option to parallel. The leader apply worker will send changes to this new
 * worker via shared memory. We keep this worker assigned till the transaction
 * commit is received and also wait for the worker to finish at commit. This
 * preserves commit ordering and avoids file I/O in most cases, although we
 * still need to spill to a file if there is no worker available. See comments
 * atop logical/worker.c to know more about streamed xacts whose changes are
 * spilled to disk. It is important to maintain commit order to avoid failures
 * due to: (a) transaction dependencies - say, if we insert a row in the first
 * transaction and update it in the second transaction on the publisher, then
 * allowing the subscriber to apply both in parallel can lead to failure in the
 * update; (b) deadlocks - allowing transactions that update the same set of
 * rows/tables in the opposite order to be applied in parallel can lead to
 * deadlocks.
 *
 * A worker pool is used to avoid restarting workers for each streaming
 * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
 * in the ParallelApplyWorkerPool. After successfully launching a new worker,
 * its information is added to the ParallelApplyWorkerPool. Once the worker
 * finishes applying the transaction, it is marked as available for re-use.
 * Now, before starting a new worker to apply the streaming transaction, we
 * check the list for any available worker. Note that we retain a maximum of
 * half of max_parallel_apply_workers_per_subscription workers in the pool and
 * after that, we simply exit the worker after applying the transaction.
 *
 * XXX This worker pool threshold is arbitrary and we can provide a GUC
 * variable for this in the future if required.
 *
 * The leader apply worker will create a separate dynamic shared memory segment
 * when each parallel apply worker starts. The reason for this design is that
 * we cannot predict how many workers will be needed. It may be possible to
 * allocate enough shared memory in one segment based on the maximum number of
 * parallel apply workers (max_parallel_apply_workers_per_subscription), but
 * this would waste memory if no process is actually started.
 *
 * The dynamic shared memory segment contains: (a) a shm_mq that is used to
 * send changes in the transaction from the leader apply worker to the parallel
 * apply worker; (b) another shm_mq that is used to send errors (and other
 * messages reported via elog/ereport) from the parallel apply worker to the
 * leader apply worker; (c) necessary information to be shared among parallel
 * apply workers and the leader apply worker (i.e. members of
 * ParallelApplyWorkerShared).
 *
 * Locking Considerations
 * ----------------------
 * We have a risk of deadlock due to concurrently applying the transactions in
 * parallel mode that were independent on the publisher side but became
 * dependent on the subscriber side due to the different database structures
 * (like schema of subscription tables, constraints, etc.) on each side. This
 * can happen even without parallel mode when there are concurrent operations
 * on the subscriber. In order to detect the deadlocks among the leader (LA)
 * and parallel apply (PA) workers, we use lmgr locks when the PA waits for the
 * next stream (set of changes) and the LA waits for the PA to finish the
 * transaction. An alternative approach could be to not allow parallelism when
 * the schema of tables is different between the publisher and subscriber, but
 * that would be too restrictive and would require the publisher to send much
 * more information than it is currently sending.
 *
 * Consider a case where the subscribed table does not have a unique key on the
 * publisher and has a unique key on the subscriber. The deadlock can happen in
 * the following ways:
 *
 * 1) Deadlock between the leader apply worker and a parallel apply worker
 *
 * Consider that the parallel apply worker (PA) is executing TX-1 and the
 * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
 * Now, LA is waiting for PA because of the unique key constraint of the
 * subscribed table while PA is waiting for LA to send the next stream of
 * changes or the transaction finish command message.
 *
 * In order for lmgr to detect this, we have LA acquire a session lock on the
 * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
 * trying to receive the next stream of changes. Specifically, LA will acquire
 * the lock in AccessExclusive mode before sending the STREAM_STOP and will
 * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
 * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
 * acquire the lock in AccessShare mode after processing STREAM_STOP and
 * STREAM_ABORT (for subtransaction) and then release the lock immediately
 * after acquiring it.
 *
 * The lock graph for the above example will look as follows:
 * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
 * acquire the stream lock) -> LA
 *
 * This way, when PA is waiting for LA for the next stream of changes, we can
 * have a wait-edge from PA to LA in lmgr, which will make us detect the
 * deadlock between LA and PA.
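 *
 * Illustrative call sequence for the stream lock (only a sketch of the
 * protocol described above, using the helpers defined later in this file):
 *
 *    LA: pa_lock_stream(xid, AccessExclusiveLock);    before sending STREAM_STOP
 *    PA: pa_lock_stream(xid, AccessShareLock);        after STREAM_STOP; blocks until LA releases
 *    LA: pa_unlock_stream(xid, AccessExclusiveLock);  after sending the next STREAM_START or finish command
 *    PA: pa_unlock_stream(xid, AccessShareLock);      released immediately after acquiring it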
 *
 * 2) Deadlock between the leader apply worker and parallel apply workers
 *
 * This scenario is similar to the first case but TX-1 and TX-2 are executed by
 * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
 * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
 * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
 * transaction in order to preserve the commit order. There is a deadlock among
 * the three processes.
 *
 * In order for lmgr to detect this, we have PA acquire a session lock (this is
 * a different lock than referred to in the previous case, see
 * pa_lock_transaction()) on the transaction being applied and have LA wait on
 * the lock before proceeding with the transaction finish commands.
 * Specifically, PA will acquire this lock in AccessExclusive mode before
 * executing the first message of the transaction and release it at the xact
 * end. LA will acquire this lock in AccessShare mode at transaction finish
 * commands and release it immediately.
 *
 * The lock graph for the above example will look as follows:
 * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
 * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
 * lock) -> LA
 *
 * This way, when LA is waiting to finish the transaction end command to
 * preserve the commit order, we will be able to detect deadlock, if any.
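 *
 * Again only a sketch, mirroring the description above (see
 * pa_lock_transaction()/pa_unlock_transaction() later in this file):
 *
 *    PA: pa_lock_transaction(xid, AccessExclusiveLock);  before applying the first message
 *    LA: pa_lock_transaction(xid, AccessShareLock);      at the transaction finish command; blocks until PA releases
 *    PA: pa_unlock_transaction(xid, AccessExclusiveLock);  at the end of the xact
 *    LA: pa_unlock_transaction(xid, AccessShareLock);    released immediately after acquiring it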
 *
 * One might think we can use XactLockTableWait(), but XactLockTableWait()
 * considers PREPARED TRANSACTION as still in progress which means the lock
 * won't be released even after the parallel apply worker has prepared the
 * transaction.
 *
 * 3) Deadlock when the shm_mq buffer is full
 *
 * In the previous scenario (i.e. PA-1 and PA-2 are executing transactions
 * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
 * wait to send messages, and this wait doesn't appear in lmgr.
 *
 * To avoid this wait, we use a non-blocking write and wait with a timeout. If
 * the timeout is exceeded, the LA will serialize all the pending messages to
 * a file and indicate to PA-2 that it needs to read that file for the
 * remaining messages. Then LA will start waiting for commit as in the previous
 * case, which will detect deadlock, if any. See pa_send_data() and
 * enum TransApplyAction.
 *
 * Both the stream lock and the transaction lock mentioned above are
 * session-level locks because both locks could be acquired outside the
 * transaction, and the stream lock in the leader needs to persist across
 * transaction boundaries, i.e. until the end of the streaming transaction.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "libpq/pqformat.h"
#include "libpq/pqmq.h"

#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"

#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

/*
 * DSM keys for parallel apply worker. Unlike other parallel execution code,
 * since we don't need to worry about DSM keys conflicting with plan_node_id we
 * can use small integers.
 */
#define PARALLEL_APPLY_KEY_SHARED          1
#define PARALLEL_APPLY_KEY_MQ              2
#define PARALLEL_APPLY_KEY_ERROR_QUEUE     3

/* Queue size of DSM, 16 MB for now. */
#define DSM_QUEUE_SIZE    (16 * 1024 * 1024)

/*
 * Error queue size of DSM. It is desirable to make it large enough that a
 * typical ErrorResponse can be sent without blocking. That way, a worker that
 * errors out can write the whole message into the queue and terminate without
 * waiting for the user backend.
 */
#define DSM_ERROR_QUEUE_SIZE    (16 * 1024)

/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * The type of session-level lock on a transaction being applied on a logical
 * replication subscriber.
 */
#define PARALLEL_APPLY_LOCK_STREAM    0
#define PARALLEL_APPLY_LOCK_XACT      1
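
/*
 * For illustration only (a sketch, not additional protocol): the helpers at
 * the bottom of this file pass these constants as locktag_field4, e.g.
 *
 *    LockApplyTransactionForSession(subid, xid, PARALLEL_APPLY_LOCK_STREAM, lockmode);
 *    LockApplyTransactionForSession(subid, xid, PARALLEL_APPLY_LOCK_XACT, lockmode);
 *
 * so the stream lock and the transaction lock on the same xid are distinct
 * locks and never conflict with each other.
 */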

/*
 * Hash table entry to map xid to the parallel apply worker state.
 */
typedef struct ParallelApplyWorkerEntry
{
    TransactionId xid;          /* Hash key -- must be first */
    ParallelApplyWorkerInfo *winfo;
} ParallelApplyWorkerEntry;

/*
 * A hash table used to cache the state of streaming transactions being applied
 * by the parallel apply workers.
 */
static HTAB *ParallelApplyTxnHash = NULL;

/*
 * A list (pool) of active parallel apply workers. The information for
 * the new worker is added to the list after successfully launching it. The
 * list entry is removed if there are already enough workers in the worker
 * pool at the end of the transaction. For more information about the worker
 * pool, see comments atop this file.
 */
static List *ParallelApplyWorkerPool = NIL;

/*
 * Information shared between leader apply worker and parallel apply worker.
 */
ParallelApplyWorkerShared *MyParallelShared = NULL;

/*
 * Is there a message sent by a parallel apply worker that the leader apply
 * worker needs to receive?
 */
volatile sig_atomic_t ParallelApplyMessagePending = false;

/*
 * Cache the parallel apply worker information required for applying the
 * current streaming transaction. It is used to save the cost of searching the
 * hash table when applying the changes between STREAM_START and STREAM_STOP.
 */
static ParallelApplyWorkerInfo *stream_apply_worker = NULL;

/* A list to maintain subtransactions, if any. */
static List *subxactlist = NIL;

static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
static PartialFileSetState pa_get_fileset_state(void);

/*
 * Returns true if it is OK to start a parallel apply worker, false otherwise.
 */
static bool
pa_can_start(void)
{
    /* Only leader apply workers can start parallel apply workers. */
    if (!am_leader_apply_worker())
        return false;

    /*
     * It is good to check for any change in the subscription parameters to
     * avoid the case where for a very long time the change doesn't get
     * reflected. This can happen when there is a constant flow of streaming
     * transactions that are handled by parallel apply workers.
     *
     * It is better to do it before the below checks so that the latest values
     * of the subscription can be used for the checks.
     */
    maybe_reread_subscription();

    /*
     * Don't start a new parallel apply worker if the subscription is not
     * using parallel streaming mode, or if the publisher does not support
     * parallel apply.
     */
    if (!MyLogicalRepWorker->parallel_apply)
        return false;

    /*
     * Don't start a new parallel worker if the user has set skiplsn, as it's
     * possible that they want to skip the streaming transaction. For
     * streaming transactions, we need to serialize the transaction to a file
     * so that we can get the last LSN of the transaction to judge whether to
     * skip before starting to apply the change.
     *
     * One might think that we could allow parallelism if the first lsn of the
     * transaction is greater than skiplsn, but we don't send it with the
     * STREAM START message, and it doesn't seem worth sending the extra eight
     * bytes with the STREAM START to enable parallelism for this case.
     */
    if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
        return false;

    /*
     * For streaming transactions that are being applied using a parallel
     * apply worker, we cannot decide whether to apply the change for a
     * relation that is not in the READY state (see
     * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     * time. So, we don't start the new parallel apply worker in this case.
     */
    if (!AllTablesyncsReady())
        return false;

    return true;
}

/*
 * Set up a dynamic shared memory segment.
 *
 * We set up a control region that contains a fixed-size worker info
 * (ParallelApplyWorkerShared), a message queue, and an error queue.
 *
 * Returns true on success, false on failure.
 */
static bool
pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
{
    shm_toc_estimator e;
    Size        segsize;
    dsm_segment *seg;
    shm_toc    *toc;
    ParallelApplyWorkerShared *shared;
    shm_mq     *mq;
    Size        queue_size = DSM_QUEUE_SIZE;
    Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;

    /*
     * Estimate how much shared memory we need.
     *
     * Because the TOC machinery may choose to insert padding of oddly-sized
     * requests, we must estimate each chunk separately.
     *
     * We need one key to register the location of the header, and two other
     * keys to track the locations of the message queue and the error message
     * queue.
     */
    shm_toc_initialize_estimator(&e);
    shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
    shm_toc_estimate_chunk(&e, queue_size);
    shm_toc_estimate_chunk(&e, error_queue_size);

    shm_toc_estimate_keys(&e, 3);
    segsize = shm_toc_estimate(&e);

    /* Create the shared memory segment and establish a table of contents. */
    seg = dsm_create(shm_toc_estimate(&e), 0);
    if (!seg)
        return false;

    toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
                         segsize);

    /* Set up the header region. */
    shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
    SpinLockInit(&shared->mutex);

    shared->xact_state = PARALLEL_TRANS_UNKNOWN;
    pg_atomic_init_u32(&(shared->pending_stream_count), 0);
    shared->last_commit_end = InvalidXLogRecPtr;
    shared->fileset_state = FS_EMPTY;

    shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);

    /* Set up message queue for the worker. */
    mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
    shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
    shm_mq_set_sender(mq, MyProc);

    /* Attach the queue. */
    winfo->mq_handle = shm_mq_attach(mq, seg, NULL);

    /* Set up error queue for the worker. */
    mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
                       error_queue_size);
    shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
    shm_mq_set_receiver(mq, MyProc);

    /* Attach the queue. */
    winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);

    /* Return results to caller. */
    winfo->dsm_seg = seg;
    winfo->shared = shared;

    return true;
}

/*
 * Try to get a parallel apply worker from the pool. If none is available then
 * start a new one.
 */
static ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
    MemoryContext oldcontext;
    bool        launched;
    ParallelApplyWorkerInfo *winfo;
    ListCell   *lc;

    /* Try to get an available parallel apply worker from the worker pool. */
    foreach(lc, ParallelApplyWorkerPool)
    {
        winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

        if (!winfo->in_use)
            return winfo;
    }

    /*
     * Start a new parallel apply worker.
     *
     * The worker info can be used for the lifetime of the worker process, so
     * create it in a permanent context.
     */
    oldcontext = MemoryContextSwitchTo(ApplyContext);

    winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));

    /* Setup shared memory. */
    if (!pa_setup_dsm(winfo))
    {
        MemoryContextSwitchTo(oldcontext);
        pfree(winfo);
        return NULL;
    }

    launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
                                        MyLogicalRepWorker->dbid,
                                        MySubscription->oid,
                                        MySubscription->name,
                                        MyLogicalRepWorker->userid,
                                        InvalidOid,
                                        dsm_segment_handle(winfo->dsm_seg));

    if (launched)
    {
        ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
    }
    else
    {
        pa_free_worker_info(winfo);
        winfo = NULL;
    }

    MemoryContextSwitchTo(oldcontext);

    return winfo;
}

/*
 * Allocate a parallel apply worker that will be used for the specified xid.
 *
 * We first try to get an available worker from the pool, if any, and then try
 * to launch a new worker. On successful allocation, remember the worker
 * information in the hash table so that we can get it later for processing the
 * transaction changes.
 */
void
pa_allocate_worker(TransactionId xid)
{
    bool        found;
    ParallelApplyWorkerInfo *winfo = NULL;
    ParallelApplyWorkerEntry *entry;

    if (!pa_can_start())
        return;

    winfo = pa_launch_parallel_worker();
    if (!winfo)
        return;

    /* First time through, initialize parallel apply worker state hashtable. */
    if (!ParallelApplyTxnHash)
    {
        HASHCTL     ctl;

        MemSet(&ctl, 0, sizeof(ctl));
        ctl.keysize = sizeof(TransactionId);
        ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
        ctl.hcxt = ApplyContext;

        ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
                                           16, &ctl,
                                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    }

    /* Create an entry for the requested transaction. */
    entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
    if (found)
        elog(ERROR, "hash table corrupted");

    /* Update the transaction information in shared memory. */
    SpinLockAcquire(&winfo->shared->mutex);
    winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
    winfo->shared->xid = xid;
    SpinLockRelease(&winfo->shared->mutex);

    winfo->in_use = true;
    winfo->serialize_changes = false;
    entry->winfo = winfo;
}

/*
 * Find the assigned worker for the given transaction, if any.
 */
ParallelApplyWorkerInfo *
pa_find_worker(TransactionId xid)
{
    bool        found;
    ParallelApplyWorkerEntry *entry;

    if (!TransactionIdIsValid(xid))
        return NULL;

    if (!ParallelApplyTxnHash)
        return NULL;

    /* Return the cached parallel apply worker if valid. */
    if (stream_apply_worker)
        return stream_apply_worker;

    /* Find an entry for the requested transaction. */
    entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
    if (found)
    {
        /* The worker must not have exited. */
        Assert(entry->winfo->in_use);
        return entry->winfo;
    }

    return NULL;
}

/*
 * Makes the worker available for reuse.
 *
 * This removes the parallel apply worker entry from the hash table so that it
 * can't be used. If there are enough workers in the pool, it stops the worker
 * and frees the corresponding info. Otherwise it just marks the worker as
 * available for reuse.
 *
 * For more information about the worker pool, see comments atop this file.
 */
static void
pa_free_worker(ParallelApplyWorkerInfo *winfo)
{
    Assert(!am_parallel_apply_worker());
    Assert(winfo->in_use);
    Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);

    if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
        elog(ERROR, "hash table corrupted");

    /*
     * Stop the worker if there are enough workers in the pool.
     *
     * XXX Additionally, we also stop the worker if the leader apply worker
     * serialized part of the transaction data due to a send timeout. This is
     * because the message could be partially written to the queue and there
     * is no way to clean the queue other than resending the message until it
     * succeeds. Instead of trying to send the data which anyway would have
     * been serialized and then letting the parallel apply worker deal with
     * the spurious message, we stop the worker.
     */
    if (winfo->serialize_changes ||
        list_length(ParallelApplyWorkerPool) >
        (max_parallel_apply_workers_per_subscription / 2))
    {
        logicalrep_pa_worker_stop(winfo);
        pa_free_worker_info(winfo);

        return;
    }

    winfo->in_use = false;
    winfo->serialize_changes = false;
}

/*
 * Free the parallel apply worker information and unlink the files with
 * serialized changes if any.
 */
static void
pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
{
    Assert(winfo);

    if (winfo->mq_handle)
        shm_mq_detach(winfo->mq_handle);

    if (winfo->error_mq_handle)
        shm_mq_detach(winfo->error_mq_handle);

    /* Unlink the files with serialized changes. */
    if (winfo->serialize_changes)
        stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

    if (winfo->dsm_seg)
        dsm_detach(winfo->dsm_seg);

    /* Remove from the worker pool. */
    ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);

    pfree(winfo);
}

/*
 * Detach the error queue for all parallel apply workers.
 */
void
pa_detach_all_error_mq(void)
{
    ListCell   *lc;

    foreach(lc, ParallelApplyWorkerPool)
    {
        ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

        if (winfo->error_mq_handle)
        {
            shm_mq_detach(winfo->error_mq_handle);
            winfo->error_mq_handle = NULL;
        }
    }
}

/*
 * Check if there are any pending spooled messages.
 */
static bool
pa_has_spooled_message_pending()
{
    PartialFileSetState fileset_state;

    fileset_state = pa_get_fileset_state();

    return (fileset_state != FS_EMPTY);
}

/*
 * Replay the spooled messages once the leader apply worker has finished
 * serializing changes to the file.
 *
 * Returns false if there aren't any pending spooled messages, true otherwise.
 */
static bool
pa_process_spooled_messages_if_required(void)
{
    PartialFileSetState fileset_state;

    fileset_state = pa_get_fileset_state();

    if (fileset_state == FS_EMPTY)
        return false;

    /*
     * If the leader apply worker is busy serializing the partial changes then
     * acquire the stream lock now and wait for the leader worker to finish
     * serializing the changes. Otherwise, the parallel apply worker won't get
     * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     * the leader has serialized all changes, which can lead to undetected
     * deadlock.
     *
     * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     * worker has finished serializing the changes.
     */
    if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
    {
        pa_lock_stream(MyParallelShared->xid, AccessShareLock);
        pa_unlock_stream(MyParallelShared->xid, AccessShareLock);

        fileset_state = pa_get_fileset_state();
    }

    /*
     * We cannot read the file immediately after the leader has serialized all
     * changes to the file because there may still be messages in the memory
     * queue. We will apply all spooled messages the next time we call this
     * function and that will ensure there are no messages left in the memory
     * queue.
     */
    if (fileset_state == FS_SERIALIZE_DONE)
    {
        pa_set_fileset_state(MyParallelShared, FS_READY);
    }
    else if (fileset_state == FS_READY)
    {
        apply_spooled_messages(&MyParallelShared->fileset,
                               MyParallelShared->xid,
                               InvalidXLogRecPtr);
        pa_set_fileset_state(MyParallelShared, FS_EMPTY);
    }

    return true;
}

/*
 * Interrupt handler for main loop of parallel apply worker.
 */
static void
ProcessParallelApplyInterrupts(void)
{
    CHECK_FOR_INTERRUPTS();

    if (ShutdownRequestPending)
    {
        ereport(LOG,
                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
                        MySubscription->name)));

        proc_exit(0);
    }

    if (ConfigReloadPending)
    {
        ConfigReloadPending = false;
        ProcessConfigFile(PGC_SIGHUP);
    }
}

/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
{
    shm_mq_result shmq_res;
    ErrorContextCallback errcallback;
    MemoryContext oldcxt = CurrentMemoryContext;

    /*
     * Init the ApplyMessageContext which we clean up after each replication
     * protocol message.
     */
    ApplyMessageContext = AllocSetContextCreate(ApplyContext,
                                                "ApplyMessageContext",
                                                ALLOCSET_DEFAULT_SIZES);

    /*
     * Push apply error context callback. Fields will be filled while applying
     * a change.
     */
    errcallback.callback = apply_error_callback;
    errcallback.previous = error_context_stack;
    error_context_stack = &errcallback;

    for (;;)
    {
        void       *data;
        Size        len;

        ProcessParallelApplyInterrupts();

        /* Ensure we are reading the data into our memory context. */
        MemoryContextSwitchTo(ApplyMessageContext);

        shmq_res = shm_mq_receive(mqh, &len, &data, true);

        if (shmq_res == SHM_MQ_SUCCESS)
        {
            StringInfoData s;
            int         c;

            if (len == 0)
                elog(ERROR, "invalid message length");

            initReadOnlyStringInfo(&s, data, len);

            /*
             * The first byte of messages sent from leader apply worker to
             * parallel apply workers can only be 'w'.
             */
            c = pq_getmsgbyte(&s);
            if (c != 'w')
                elog(ERROR, "unexpected message \"%c\"", c);

            /*
             * Ignore statistics fields that have been updated by the leader
             * apply worker.
             *
             * XXX We can avoid sending the statistics fields from the leader
             * apply worker but for that, it needs to rebuild the entire
             * message by removing these fields which could be more work than
             * simply ignoring these fields in the parallel apply worker.
             */
            s.cursor += SIZE_STATS_MESSAGE;

            apply_dispatch(&s);
        }
        else if (shmq_res == SHM_MQ_WOULD_BLOCK)
        {
            /* Replay the changes from the file, if any. */
            if (!pa_process_spooled_messages_if_required())
            {
                int         rc;

                /* Wait for more work. */
                rc = WaitLatch(MyLatch,
                               WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1000L,
                               WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);

                if (rc & WL_LATCH_SET)
                    ResetLatch(MyLatch);
            }
        }
        else
        {
            Assert(shmq_res == SHM_MQ_DETACHED);

            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("lost connection to the logical replication apply worker")));
        }

        MemoryContextReset(ApplyMessageContext);
        MemoryContextSwitchTo(oldcxt);
    }

    /* Pop the error context stack. */
    error_context_stack = errcallback.previous;

    MemoryContextSwitchTo(oldcxt);
}

/*
 * Make sure the leader apply worker tries to read from our error queue one more
 * time. This guards against the case where we exit uncleanly without sending
 * an ErrorResponse, for example because some code calls proc_exit directly.
 *
 * Also explicitly detach from the dsm segment to invoke on_dsm_detach callbacks,
 * if any. See ParallelWorkerShutdown for details.
 */
static void
pa_shutdown(int code, Datum arg)
{
    SendProcSignal(MyLogicalRepWorker->leader_pid,
                   PROCSIG_PARALLEL_APPLY_MESSAGE,
                   INVALID_PROC_NUMBER);

    dsm_detach((dsm_segment *) DatumGetPointer(arg));
}

/*
 * Parallel apply worker entry point.
 */
void
ParallelApplyWorkerMain(Datum main_arg)
{
    ParallelApplyWorkerShared *shared;
    dsm_handle  handle;
    dsm_segment *seg;
    shm_toc    *toc;
    shm_mq     *mq;
    shm_mq_handle *mqh;
    shm_mq_handle *error_mqh;
    RepOriginId originid;
    int         worker_slot = DatumGetInt32(main_arg);
    char        originname[NAMEDATALEN];

    InitializingApplyWorker = true;

    /* Setup signal handling. */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGINT, SignalHandlerForShutdownRequest);
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /*
     * Attach to the dynamic shared memory segment for the parallel apply, and
     * find its table of contents.
     *
     * Like parallel query, we don't need resource owner by this time. See
     * ParallelWorkerMain.
     */
    memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
    seg = dsm_attach(handle);
    if (!seg)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not map dynamic shared memory segment")));

    toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
    if (!toc)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("invalid magic number in dynamic shared memory segment")));

    /* Look up the shared information. */
    shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
    MyParallelShared = shared;

    /*
     * Attach to the message queue.
     */
    mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
    shm_mq_set_receiver(mq, MyProc);
    mqh = shm_mq_attach(mq, seg, NULL);

    /*
     * Primary initialization is complete. Now, we can attach to our slot.
     * This is to ensure that the leader apply worker does not write data to
     * the uninitialized memory queue.
     */
    logicalrep_worker_attach(worker_slot);

    /*
     * Register the shutdown callback after we are attached to the worker
     * slot. This is to ensure that MyLogicalRepWorker remains valid when this
     * callback is invoked.
     */
    before_shmem_exit(pa_shutdown, PointerGetDatum(seg));

    SpinLockAcquire(&MyParallelShared->mutex);
    MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
    MyParallelShared->logicalrep_worker_slot_no = worker_slot;
    SpinLockRelease(&MyParallelShared->mutex);

    /*
     * Attach to the error queue.
     */
    mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
    shm_mq_set_sender(mq, MyProc);
    error_mqh = shm_mq_attach(mq, seg, NULL);

    pq_redirect_to_shm_mq(seg, error_mqh);
    pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
                           INVALID_PROC_NUMBER);

    MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
        MyLogicalRepWorker->reply_time = 0;

    InitializeLogRepWorker();

    InitializingApplyWorker = false;

    /* Setup replication origin tracking. */
    StartTransactionCommand();
    ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
                                       originname, sizeof(originname));
    originid = replorigin_by_name(originname, false);

    /*
     * The parallel apply worker doesn't need to monopolize this replication
     * origin which was already acquired by its leader process.
     */
    replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
    replorigin_session_origin = originid;
    CommitTransactionCommand();

    /*
     * Setup callback for syscache so that we know when something changes in
     * the subscription relation state.
     */
    CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
                                  invalidate_syncing_table_states,
                                  (Datum) 0);

    set_apply_error_context_origin(originname);

    LogicalParallelApplyLoop(mqh);

    /*
     * The parallel apply worker must not get here because the parallel apply
     * worker will only stop when it receives a SIGTERM or SIGINT from the
     * leader, or when there is an error. None of these cases will allow the
     * code to reach here.
     */
}

/*
 * Handle receipt of an interrupt indicating a parallel apply worker message.
 *
 * Note: this is called within a signal handler! All we can do is set a flag
 * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelApplyMessages().
 */
void
HandleParallelApplyMessageInterrupt(void)
{
    InterruptPending = true;
    ParallelApplyMessagePending = true;
    SetLatch(MyLatch);
}

/*
 * Handle a single protocol message received from a single parallel apply
 * worker.
 */
static void
HandleParallelApplyMessage(StringInfo msg)
{
    char        msgtype;

    msgtype = pq_getmsgbyte(msg);

    switch (msgtype)
    {
        case 'E':               /* ErrorResponse */
            {
                ErrorData   edata;

                /* Parse ErrorResponse. */
                pq_parse_errornotice(msg, &edata);

                /*
                 * If desired, add a context line to show that this is a
                 * message propagated from a parallel apply worker. Otherwise,
                 * it can sometimes be confusing to understand what actually
                 * happened.
                 */
                if (edata.context)
                    edata.context = psprintf("%s\n%s", edata.context,
                                             _("logical replication parallel apply worker"));
                else
                    edata.context = pstrdup(_("logical replication parallel apply worker"));

                /*
                 * Context beyond that should use the error context callbacks
                 * that were in effect in LogicalRepApplyLoop().
                 */
                error_context_stack = apply_error_context_stack;

                /*
                 * The actual error must have been reported by the parallel
                 * apply worker.
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                         errmsg("logical replication parallel apply worker exited due to error"),
                         errcontext("%s", edata.context)));
            }

            /*
             * Don't need to do anything about NoticeResponse and
             * NotifyResponse as the logical replication worker doesn't need
             * to send messages to the client.
             */
        case 'N':
        case 'A':
            break;

        default:
            elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
                 msgtype, msg->len);
    }
}

/*
 * Handle any queued protocol messages received from parallel apply workers.
 */
void
HandleParallelApplyMessages(void)
{
    ListCell   *lc;
    MemoryContext oldcontext;

    static MemoryContext hpam_context = NULL;

    /*
     * This is invoked from ProcessInterrupts(), and since some of the
     * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
     * for recursive calls if more signals are received while this runs. It's
     * unclear that recursive entry would be safe, and it doesn't seem useful
     * even if it is safe, so let's block interrupts until done.
     */
    HOLD_INTERRUPTS();

    /*
     * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
     * don't want to risk leaking data into long-lived contexts, so let's do
     * our work here in a private context that we can reset on each use.
     */
    if (!hpam_context)          /* first time through? */
        hpam_context = AllocSetContextCreate(TopMemoryContext,
                                             "HandleParallelApplyMessages",
                                             ALLOCSET_DEFAULT_SIZES);
    else
        MemoryContextReset(hpam_context);

    oldcontext = MemoryContextSwitchTo(hpam_context);

    ParallelApplyMessagePending = false;

    foreach(lc, ParallelApplyWorkerPool)
    {
        shm_mq_result res;
        Size        nbytes;
        void       *data;
        ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

        /*
         * The leader will detach from the error queue and set it to NULL
         * before preparing to stop all parallel apply workers, so we don't
         * need to handle error messages anymore. See
         * logicalrep_worker_detach.
         */
        if (!winfo->error_mq_handle)
            continue;

        res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);

        if (res == SHM_MQ_WOULD_BLOCK)
            continue;
        else if (res == SHM_MQ_SUCCESS)
        {
            StringInfoData msg;

            initStringInfo(&msg);
            appendBinaryStringInfo(&msg, data, nbytes);
            HandleParallelApplyMessage(&msg);
            pfree(msg.data);
        }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("lost connection to the logical replication parallel apply worker")));
    }

    MemoryContextSwitchTo(oldcontext);

    /* Might as well clear the context on our way out */
    MemoryContextReset(hpam_context);

    RESUME_INTERRUPTS();
}

/*
 * Send the data to the specified parallel apply worker via the shared-memory
 * queue.
 *
 * Returns false if the attempt to send data via shared memory times out, true
 * otherwise.
 */
bool
pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
{
    int         rc;
    shm_mq_result result;
    TimestampTz startTime = 0;

    Assert(!IsTransactionState());
    Assert(!winfo->serialize_changes);

    /*
     * We don't try to send data to the parallel worker in 'immediate' mode.
     * This is primarily used for testing purposes.
     */
    if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
        return false;

/*
 * This timeout is a bit arbitrary but testing revealed that it is sufficient
 * to send the message unless the parallel apply worker is waiting on some
 * lock or there is a serious resource crunch. See the comments atop this file
 * to know why we are using a non-blocking way to send the message.
 */
#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)

    for (;;)
    {
        result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);

        if (result == SHM_MQ_SUCCESS)
            return true;
        else if (result == SHM_MQ_DETACHED)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("could not send data to shared-memory queue")));

        Assert(result == SHM_MQ_WOULD_BLOCK);

        /* Wait before retrying. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       SHM_SEND_RETRY_INTERVAL_MS,
                       WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        if (startTime == 0)
            startTime = GetCurrentTimestamp();
        else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
                                            SHM_SEND_TIMEOUT_MS))
            return false;
    }
}

/*
 * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
 * that the current data and any subsequent data for this transaction will be
 * serialized to a file. This is done to prevent possible deadlocks with
 * another parallel apply worker (refer to the comments atop this file).
 */
void
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                               bool stream_locked)
{
    ereport(LOG,
            (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
                    winfo->shared->xid)));

    /*
     * The parallel apply worker could be stuck for some reason (say, waiting
     * on some lock held by another backend), so stop trying to send data
     * directly to it and start serializing data to the file instead.
     */
    winfo->serialize_changes = true;

    /* Initialize the stream fileset. */
    stream_start_internal(winfo->shared->xid, true);

    /*
     * Acquire the stream lock, if not already acquired, to make sure that the
     * parallel apply worker will wait for the leader to release the stream
     * lock until the end of the transaction.
     */
    if (!stream_locked)
        pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

    pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
}

/*
 * Wait until the parallel apply worker's transaction state has reached or
 * exceeded the given xact_state.
 */
static void
pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
                       ParallelTransState xact_state)
{
    for (;;)
    {
        /*
         * Stop if the transaction state has reached or exceeded the given
         * xact_state.
         */
        if (pa_get_xact_state(winfo->shared) >= xact_state)
            break;

        /* Wait to be signalled. */
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         10L,
                         WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Wait until the parallel apply worker's transaction finishes.
 */
static void
pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
{
    /*
     * Wait until the parallel apply worker sets the state to
     * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
     * lock. This is to prevent the leader apply worker from acquiring the
     * transaction lock earlier than the parallel apply worker.
     */
    pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);

    /*
     * Wait for the transaction lock to be released. This is required to
     * detect deadlock among leader and parallel apply workers. Refer to the
     * comments atop this file.
     */
    pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    pa_unlock_transaction(winfo->shared->xid, AccessShareLock);

    /*
     * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
     * apply worker failed while applying changes, causing the lock to be
     * released.
     */
    if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("lost connection to the logical replication parallel apply worker")));
}

/*
 * Set the transaction state for a given parallel apply worker.
 */
void
pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                  ParallelTransState xact_state)
{
    SpinLockAcquire(&wshared->mutex);
    wshared->xact_state = xact_state;
    SpinLockRelease(&wshared->mutex);
}

/*
 * Get the transaction state for a given parallel apply worker.
 */
static ParallelTransState
pa_get_xact_state(ParallelApplyWorkerShared *wshared)
{
    ParallelTransState xact_state;

    SpinLockAcquire(&wshared->mutex);
    xact_state = wshared->xact_state;
    SpinLockRelease(&wshared->mutex);

    return xact_state;
}

/*
 * Cache the parallel apply worker information.
 */
void
pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
{
    stream_apply_worker = winfo;
}

/*
 * Form a unique savepoint name for the streaming transaction.
 *
 * Note that different subscriptions for publications on different nodes can
 * receive the same remote xid, so we need to use the subscription id along
 * with it.
 *
 * Returns the name in the supplied buffer.
 */
static void
pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
{
    snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
}

/*
 * Define a savepoint for a subxact in parallel apply worker if needed.
 *
 * The parallel apply worker can figure out if a new subtransaction was
 * started by checking if the new change arrived with a different xid. In that
 * case define a named savepoint, so that we are able to roll back to it
 * separately later.
 */
void
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
{
    if (current_xid != top_xid &&
        !list_member_xid(subxactlist, current_xid))
    {
        MemoryContext oldctx;
        char        spname[NAMEDATALEN];

        pa_savepoint_name(MySubscription->oid, current_xid,
                          spname, sizeof(spname));

        elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);

        /* We must be in a transaction block to define the SAVEPOINT. */
        if (!IsTransactionBlock())
        {
            if (!IsTransactionState())
                StartTransactionCommand();

            BeginTransactionBlock();
            CommitTransactionCommand();
        }

        DefineSavepoint(spname);

        /*
         * CommitTransactionCommand is needed to start a subtransaction after
         * issuing a SAVEPOINT inside a transaction block (see
         * StartSubTransaction()).
         */
        CommitTransactionCommand();

        oldctx = MemoryContextSwitchTo(TopTransactionContext);
        subxactlist = lappend_xid(subxactlist, current_xid);
        MemoryContextSwitchTo(oldctx);
    }
}

/* Reset the list that maintains subtransactions. */
void
pa_reset_subtrans(void)
{
    /*
     * We don't need to free this explicitly as the allocated memory will be
     * freed at the transaction end.
     */
    subxactlist = NIL;
}

/*
 * Handle STREAM ABORT message when the transaction was applied in a parallel
 * apply worker.
 */
void
pa_stream_abort(LogicalRepStreamAbortData *abort_data)
{
    TransactionId xid = abort_data->xid;
    TransactionId subxid = abort_data->subxid;

    /*
     * Update origin state so we can restart streaming from the correct
     * position in case of crash.
     */
    replorigin_session_origin_lsn = abort_data->abort_lsn;
    replorigin_session_origin_timestamp = abort_data->abort_time;

    /*
     * If the two XIDs are the same, it's in fact abort of the toplevel xact,
     * so just free the subxactlist.
     */
    if (subxid == xid)
    {
        pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);

        /*
         * Release the lock as we might be processing an empty streaming
         * transaction in which case the lock won't be released during
         * transaction rollback.
         *
         * Note that it's ok to release the transaction lock before aborting
         * the transaction because even if the parallel apply worker dies due
         * to crash or some other reason, such a transaction would still be
         * considered aborted.
         */
        pa_unlock_transaction(xid, AccessExclusiveLock);

        AbortCurrentTransaction();

        if (IsTransactionBlock())
        {
            EndTransactionBlock(false);
            CommitTransactionCommand();
        }

        pa_reset_subtrans();

        pgstat_report_activity(STATE_IDLE, NULL);
    }
    else
    {
        /* OK, so it's a subxact. Roll back to the savepoint. */
        int         i;
        char        spname[NAMEDATALEN];

        pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));

        elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);

        /*
         * Search the subxactlist, determine the offset tracked for the
         * subxact, and truncate the list.
         *
         * Note that for an empty sub-transaction we won't find the subxid
         * here.
         */
        for (i = list_length(subxactlist) - 1; i >= 0; i--)
        {
            TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));

            if (xid_tmp == subxid)
            {
                RollbackToSavepoint(spname);
                CommitTransactionCommand();
                subxactlist = list_truncate(subxactlist, i);
                break;
            }
        }
    }
}

/*
 * Set the fileset state for a particular parallel apply worker. The fileset
 * will be set once the leader worker has serialized all changes to the file
 * so that it can be used by the parallel apply worker.
 */
void
pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                     PartialFileSetState fileset_state)
{
    SpinLockAcquire(&wshared->mutex);
    wshared->fileset_state = fileset_state;

    if (fileset_state == FS_SERIALIZE_DONE)
    {
        Assert(am_leader_apply_worker());
        Assert(MyLogicalRepWorker->stream_fileset);
        wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    }

    SpinLockRelease(&wshared->mutex);
}

/*
 * Get the fileset state for the current parallel apply worker.
 */
static PartialFileSetState
pa_get_fileset_state(void)
{
    PartialFileSetState fileset_state;

    Assert(am_parallel_apply_worker());

    SpinLockAcquire(&MyParallelShared->mutex);
    fileset_state = MyParallelShared->fileset_state;
    SpinLockRelease(&MyParallelShared->mutex);

    return fileset_state;
}

/*
 * Helper functions to acquire and release a lock for each stream block.
 *
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
 * stream lock.
 *
 * Refer to the comments atop this file to see how the stream lock is used.
 */
void
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
{
    LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                   PARALLEL_APPLY_LOCK_STREAM, lockmode);
}

void
pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
{
    UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                     PARALLEL_APPLY_LOCK_STREAM, lockmode);
}

/*
 * Helper functions to acquire and release a lock for each local transaction
 * apply.
 *
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
 * transaction lock.
 *
 * Note that all the callers must pass a remote transaction ID instead of a
 * local transaction ID as xid. This is because the local transaction ID will
 * only be assigned while applying the first change in the parallel apply, but
 * it's possible that the first change in the parallel apply worker is blocked
 * by a concurrently executing transaction in another parallel apply worker.
 * We can only communicate the local transaction id to the leader after
 * applying the first change, so the leader wouldn't be able to use it to wait
 * after sending the xact finish command.
 *
 * Refer to the comments atop this file to see how the transaction lock is
 * used.
 */
void
pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
{
    LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                   PARALLEL_APPLY_LOCK_XACT, lockmode);
}

void
pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
{
    UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                                     PARALLEL_APPLY_LOCK_XACT, lockmode);
}

/*
 * Decrement the number of pending streaming blocks and wait on the stream lock
 * if there is no pending block available.
 */
void
pa_decr_and_wait_stream_block(void)
{
    Assert(am_parallel_apply_worker());

    /*
     * It is only possible to not have any pending stream chunks when we are
     * applying spooled messages.
     */
    if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    {
        if (pa_has_spooled_message_pending())
            return;

        elog(ERROR, "invalid pending streaming chunk 0");
    }

    if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    {
        pa_lock_stream(MyParallelShared->xid, AccessShareLock);
        pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    }
}

/*
 * Finish processing the streaming transaction in the leader apply worker.
 */
void
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
{
    Assert(am_leader_apply_worker());

    /*
     * Unlock the shared object lock so that the parallel apply worker can
     * continue to receive and apply changes.
     */
    pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

    /*
     * Wait for that worker to finish. This is necessary to maintain commit
     * order, which avoids failures due to transaction dependencies and
     * deadlocks.
     */
    pa_wait_for_xact_finish(winfo);

    if (!XLogRecPtrIsInvalid(remote_lsn))
        store_flush_position(remote_lsn, winfo->shared->last_commit_end);

    pa_free_worker(winfo);
}