1 /*-------------------------------------------------------------------------
3 * PostgreSQL logical replication: initial table data synchronization
5 * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 * src/backend/replication/logical/tablesync.c
11 * This file contains code for initial table data synchronization for
12 * logical replication.
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the leader apply worker.
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
29 * The stream position synchronization works in multiple steps:
30 * - Apply worker requests a tablesync worker to start, setting the new
31 * table state to INIT.
32 * - Tablesync worker starts; changes table state from INIT to DATASYNC while copying.
34 * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 * worker specific) state to indicate when the copy phase has completed, so
36 * if the worker crashes with this (non-memory) state then the copy will not be re-attempted.
38 * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 * until either the table state is set to SYNCDONE or the sync worker exits.
43 * - After the sync worker has seen the state change to CATCHUP, it will
44 * read the stream and apply changes (acting like an apply worker) until
45 * it catches up to the specified stream position. Then it sets the
46 * state to SYNCDONE. There might be zero changes applied between
47 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the apply worker.
49 * - Once the state is set to SYNCDONE, the apply will continue tracking
50 * the table until it reaches the SYNCDONE stream position, at which
51 * point it sets state to READY and stops tracking. Again, there might
52 * be zero changes in between.
54 * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
57 * The catalog pg_subscription_rel is used to keep information about
58 * subscribed tables and their state. The catalog holds all states
59 * except SYNCWAIT and CATCHUP which are only in shared memory.
61 * Example flows look like this:
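62 * - Apply is in front:
63 * sync:8
64 * -> set in catalog FINISHEDCOPY
65 * -> set in memory SYNCWAIT
66 * apply:10
67 * -> set in memory CATCHUP
68 * -> enter wait-loop
69 * sync:10
70 * -> set in catalog SYNCDONE
71 * -> exit
72 * apply:10
73 * -> exit wait-loop
74 * -> continue rep
75 * apply:11
76 * -> set in catalog READY
78 * - Sync is in front:
79 * sync:10
80 * -> set in catalog FINISHEDCOPY
81 * -> set in memory SYNCWAIT
82 * apply:8
83 * -> set in memory CATCHUP
84 * -> continue per-table filtering
85 * sync:10
86 * -> set in catalog SYNCDONE
87 * -> exit
88 * apply:10
89 * -> set in catalog READY
90 * -> stop per-table filtering
91 * -> continue rep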
92 *-------------------------------------------------------------------------
97 #include "access/table.h"
98 #include "access/xact.h"
99 #include "catalog/indexing.h"
100 #include "catalog/pg_subscription_rel.h"
101 #include "catalog/pg_type.h"
102 #include "commands/copy.h"
103 #include "miscadmin.h"
104 #include "nodes/makefuncs.h"
105 #include "parser/parse_relation.h"
107 #include "replication/logicallauncher.h"
108 #include "replication/logicalrelation.h"
109 #include "replication/logicalworker.h"
110 #include "replication/origin.h"
111 #include "replication/slot.h"
112 #include "replication/walreceiver.h"
113 #include "replication/worker_internal.h"
114 #include "storage/ipc.h"
115 #include "storage/lmgr.h"
116 #include "utils/acl.h"
117 #include "utils/array.h"
118 #include "utils/builtins.h"
119 #include "utils/lsyscache.h"
120 #include "utils/memutils.h"
121 #include "utils/rls.h"
122 #include "utils/snapmgr.h"
123 #include "utils/syscache.h"
124 #include "utils/usercontext.h"
/*
 * Validity states for the cached list of not-ready tables.  The syscache
 * invalidation callback invalidate_syncing_table_states() resets
 * table_states_validity to SYNC_TABLE_STATE_NEEDS_REBUILD.
 *
 * NOTE(review): the opening of this typedef is not part of this listing.
 */
128 SYNC_TABLE_STATE_NEEDS_REBUILD
,
129 SYNC_TABLE_STATE_REBUILD_STARTED
,
130 SYNC_TABLE_STATE_VALID
,
131 } SyncingTablesState
;
/* Current validity of table_states_not_ready; starts out needing a rebuild. */
133 static SyncingTablesState table_states_validity
= SYNC_TABLE_STATE_NEEDS_REBUILD
;
/*
 * List walked by process_syncing_tables_for_apply(), which reads each element
 * as a SubscriptionRelState pointer.
 */
134 static List
*table_states_not_ready
= NIL
;
/*
 * Forward declaration; the definition is outside this listing.  Called by
 * process_syncing_tables_for_apply() to obtain up-to-date sync state
 * information before it walks the list above.
 */
135 static bool FetchTableStates(bool *started_tx
);
/*
 * Buffer shared between copy_table(), which allocates it with
 * makeStringInfo(), and the copy_read_data() callback, which reads from it.
 */
137 static StringInfo copybuf
= NULL
;
140 * Exit routine for synchronization worker.
/*
 * finish_sync_worker
 *
 * Takes no arguments and is declared pg_attribute_noreturn().
 *
 * Visible steps, in order:
 *   - commit the current transaction when IsTransactionState() is true and
 *     report statistics with pgstat_report_stat(true);
 *   - flush WAL up to GetXLogWriteRecPtr();
 *   - inside a fresh transaction, emit the "has finished" message naming the
 *     subscription and the relation returned by get_rel_name();
 *   - wake the worker registered for this subscription with InvalidOid as the
 *     relation, which the existing comment identifies as the leader apply
 *     worker.
 *
 * NOTE(review): braces, the ereport() opening and the final exit call are
 * not present in this listing; only the statements shown are described.
 */
143 pg_attribute_noreturn()
144 finish_sync_worker(void)
147 * Commit any outstanding transaction. This is the usual case, unless
148 * there was nothing to do for the table.
150 if (IsTransactionState())
152 CommitTransactionCommand();
153 pgstat_report_stat(true);
156 /* And flush all writes. */
157 XLogFlush(GetXLogWriteRecPtr());
/*
 * The message below calls get_rel_name(), so it is wrapped in its own
 * transaction (presumably because that lookup needs one -- TODO confirm).
 */
159 StartTransactionCommand();
161 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
162 MySubscription
->name
,
163 get_rel_name(MyLogicalRepWorker
->relid
))));
164 CommitTransactionCommand();
166 /* Find the leader apply worker and signal it. */
167 logicalrep_worker_wakeup(MyLogicalRepWorker
->subid
, InvalidOid
);
169 /* Stop gracefully */
174 * Wait until the relation sync state is set in the catalog to the expected
175 * one; return true when it happens.
177 * Returns false if the table sync worker or the table itself have
178 * disappeared, or the table state has been reset.
180 * Currently, this is used in the apply worker when transitioning from
181 * CATCHUP state to SYNCDONE.
/*
 * wait_for_relation_state_change
 *
 * relid:          relation to wait for; also used to look up its sync worker
 *                 with logicalrep_worker_find().
 * expected_state: state value the caller is waiting to see.
 *
 * Each pass, as far as this listing shows: check for interrupts, invalidate
 * the catalog snapshot, re-read the state with GetSubscriptionRelState(),
 * compare it against SUBREL_STATE_UNKNOWN and against expected_state, look
 * up the sync worker while holding LogicalRepWorkerLock in shared mode, and
 * finally wait on MyLatch with a timeout argument of 1000L.
 *
 * NOTE(review): the loop construct, the return statements, one declaration
 * and some call arguments are missing from this listing, so the exact exit
 * conditions cannot be read off here.
 */
184 wait_for_relation_state_change(Oid relid
, char expected_state
)
190 LogicalRepWorker
*worker
;
193 CHECK_FOR_INTERRUPTS();
/*
 * Invalidate the catalog snapshot before the state is read again
 * (presumably so the read is not served from a stale snapshot).
 */
195 InvalidateCatalogSnapshot();
196 state
= GetSubscriptionRelState(MyLogicalRepWorker
->subid
,
199 if (state
== SUBREL_STATE_UNKNOWN
)
202 if (state
== expected_state
)
205 /* Check if the sync worker is still running and bail if not. */
206 LWLockAcquire(LogicalRepWorkerLock
, LW_SHARED
);
207 worker
= logicalrep_worker_find(MyLogicalRepWorker
->subid
, relid
,
209 LWLockRelease(LogicalRepWorkerLock
);
/* Sleep until the latch is set, the timeout passes, or the postmaster dies. */
213 (void) WaitLatch(MyLatch
,
214 WL_LATCH_SET
| WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
215 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
);
224 * Wait until the apply worker changes the state of our synchronization
225 * worker to the expected one.
227 * Used when transitioning from SYNCWAIT state to CATCHUP.
229 * Returns false if the apply worker has disappeared.
/*
 * wait_for_worker_state_change
 *
 * expected_state: value that MyLogicalRepWorker->relstate must reach.
 *
 * Each pass, as far as this listing shows: check for interrupts, compare the
 * shared relstate with expected_state without taking a lock, look up a worker
 * with logicalrep_worker_find() under LogicalRepWorkerLock (shared) and wake
 * it when it has a proc, then wait on MyLatch with a timeout argument of
 * 1000L and test the result for WL_LATCH_SET.
 *
 * NOTE(review): the loop construct, the return statements, the declaration
 * of rc and the remaining logicalrep_worker_find() arguments are missing from
 * this listing; the action taken when WL_LATCH_SET is reported is not visible
 * either.
 */
232 wait_for_worker_state_change(char expected_state
)
238 LogicalRepWorker
*worker
;
240 CHECK_FOR_INTERRUPTS();
243 * Done if already in correct state. (We assume this fetch is atomic
244 * enough to not give a misleading answer if we do it with no lock.)
246 if (MyLogicalRepWorker
->relstate
== expected_state
)
250 * Bail out if the apply worker has died, else signal it we're
253 LWLockAcquire(LogicalRepWorkerLock
, LW_SHARED
);
254 worker
= logicalrep_worker_find(MyLogicalRepWorker
->subid
,
256 if (worker
&& worker
->proc
)
257 logicalrep_worker_wakeup_ptr(worker
);
258 LWLockRelease(LogicalRepWorkerLock
);
263 * Wait. We expect to get a latch signal back from the apply worker,
264 * but use a timeout in case it dies without sending one.
266 rc
= WaitLatch(MyLatch
,
267 WL_LATCH_SET
| WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
268 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
);
270 if (rc
& WL_LATCH_SET
)
278 * Callback from syscache invalidation.
/*
 * invalidate_syncing_table_states
 *
 * Marks the cached table state list as stale by setting
 * table_states_validity to SYNC_TABLE_STATE_NEEDS_REBUILD.
 *
 * arg, cacheid, hashvalue: none of these is referenced by the statement
 * shown; they are presumably dictated by the callback signature.
 */
281 invalidate_syncing_table_states(Datum arg
, int cacheid
, uint32 hashvalue
)
283 table_states_validity
= SYNC_TABLE_STATE_NEEDS_REBUILD
;
287 * Handle table synchronization cooperation from the synchronization worker.
290 * If the sync worker is in CATCHUP state and reached (or passed) the
291 * predetermined synchronization point in the WAL stream, mark the table as
292 * SYNCDONE and finish.
/*
 * process_syncing_tables_for_sync
 *
 * current_lsn: stream position compared with the shared relstate_lsn.
 *
 * With relmutex held, tests whether this worker is in SUBREL_STATE_CATCHUP
 * and current_lsn has reached MyLogicalRepWorker->relstate_lsn.  When both
 * hold, the visible statements:
 *   - store SUBREL_STATE_SYNCDONE and current_lsn in the shared worker entry
 *     and release the spinlock;
 *   - write that state to the catalog with UpdateSubscriptionRelState(),
 *     starting a transaction first if none is in progress;
 *   - end streaming on LogRepWorkerWalRcvConn and drop the tablesync slot on
 *     the publisher, passing false as the last argument (missing_ok);
 *   - commit, report statistics, and in a new transaction reset the
 *     replication origin session variables and drop the tablesync origin if
 *     it exists;
 *   - call finish_sync_worker().
 * The trailing SpinLockRelease() is presumably the path taken when the
 * condition does not hold.
 *
 * NOTE(review): braces, the declaration of tli and some call arguments are
 * missing from this listing; only the statements shown are described.
 */
295 process_syncing_tables_for_sync(XLogRecPtr current_lsn
)
297 SpinLockAcquire(&MyLogicalRepWorker
->relmutex
);
299 if (MyLogicalRepWorker
->relstate
== SUBREL_STATE_CATCHUP
&&
300 current_lsn
>= MyLogicalRepWorker
->relstate_lsn
)
/* Zero-initialised name buffers for the slot and origin dropped below. */
303 char syncslotname
[NAMEDATALEN
] = {0};
304 char originname
[NAMEDATALEN
] = {0};
306 MyLogicalRepWorker
->relstate
= SUBREL_STATE_SYNCDONE
;
307 MyLogicalRepWorker
->relstate_lsn
= current_lsn
;
/* Shared entry updated; the catalog update below runs outside the spinlock. */
309 SpinLockRelease(&MyLogicalRepWorker
->relmutex
);
312 * UpdateSubscriptionRelState must be called within a transaction.
314 if (!IsTransactionState())
315 StartTransactionCommand();
317 UpdateSubscriptionRelState(MyLogicalRepWorker
->subid
,
318 MyLogicalRepWorker
->relid
,
319 MyLogicalRepWorker
->relstate
,
320 MyLogicalRepWorker
->relstate_lsn
);
323 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
326 walrcv_endstreaming(LogRepWorkerWalRcvConn
, &tli
);
329 * Cleanup the tablesync slot.
331 * This has to be done after updating the state because otherwise if
332 * there is an error while doing the database operations we won't be
333 * able to rollback dropped slot.
335 ReplicationSlotNameForTablesync(MyLogicalRepWorker
->subid
,
336 MyLogicalRepWorker
->relid
,
338 sizeof(syncslotname
));
341 * It is important to give an error if we are unable to drop the slot,
342 * otherwise, it won't be dropped till the corresponding subscription
343 * is dropped. So passing missing_ok = false.
345 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn
, syncslotname
, false);
347 CommitTransactionCommand();
348 pgstat_report_stat(false);
351 * Start a new transaction to clean up the tablesync origin tracking.
352 * This transaction will be ended within the finish_sync_worker().
353 * Now, even, if we fail to remove this here, the apply worker will
354 * ensure to clean it up afterward.
356 * We need to do this after the table state is set to SYNCDONE.
357 * Otherwise, if an error occurs while performing the database
358 * operation, the worker will be restarted and the in-memory state of
359 * replication progress (remote_lsn) won't be rolled-back which would
360 * have been cleared before restart. So, the restarted worker will use
361 * invalid replication progress state resulting in replay of
362 * transactions that have already been applied.
364 StartTransactionCommand();
366 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker
->subid
,
367 MyLogicalRepWorker
->relid
,
372 * Resetting the origin session removes the ownership of the slot.
373 * This is needed to allow the origin to be dropped.
375 replorigin_session_reset();
376 replorigin_session_origin
= InvalidRepOriginId
;
377 replorigin_session_origin_lsn
= InvalidXLogRecPtr
;
378 replorigin_session_origin_timestamp
= 0;
381 * Drop the tablesync's origin tracking if exists.
383 * There is a chance that the user is concurrently performing refresh
384 * for the subscription where we remove the table state and its origin
385 * or the apply worker would have removed this origin. So passing
388 replorigin_drop_by_name(originname
, true, false);
390 finish_sync_worker();
393 SpinLockRelease(&MyLogicalRepWorker
->relmutex
);
397 * Handle table synchronization cooperation from the apply worker.
399 * Walk over all subscription tables that are individually tracked by the
400 * apply process (currently, all that have state other than
401 * SUBREL_STATE_READY) and manage synchronization for them.
403 * If there are tables that need synchronizing and are not being synchronized
404 * yet, start sync workers for them (if there are free slots for sync
405 * workers). To prevent starting the sync worker for the same relation at a
406 * high frequency after a failure, we store its last start time with each sync
407 * state info. We start the sync worker for the same relation after waiting
408 * at least wal_retrieve_retry_interval.
410 * For tables that are being synchronized already, check if sync workers
411 * either need action from the apply worker or have finished. This is the
412 * SYNCWAIT to CATCHUP transition.
414 * If the synchronization position is reached (SYNCDONE), then the table can
415 * be marked as READY and is no longer tracked.
/*
 * process_syncing_tables_for_apply
 *
 * current_lsn: stream position of the apply worker, compared with per-table
 *              LSNs and handed to sync workers as the catch-up target.
 *
 * Outline of the visible statements:
 *   1. Assert that no transaction is open, then refresh the state list via
 *      FetchTableStates(), which reports through started_tx.
 *   2. Maintain last_start_times, a function-static hash table keyed by Oid
 *      with entries of struct tablesync_start_time_mapping: create it when
 *      there are not-ready tables and it does not exist yet, destroy it when
 *      the list is empty.
 *   3. For each SubscriptionRelState in table_states_not_ready:
 *      - state SYNCDONE and current_lsn has reached rstate->lsn: mark the
 *        entry READY, drop the tablesync origin if it exists, and store the
 *        new state with UpdateSubscriptionRelState();
 *      - otherwise look up the sync worker under LogicalRepWorkerLock.  When
 *        one exists, copy its relstate and relstate_lsn under relmutex; if it
 *        is in SYNCWAIT, switch it to CATCHUP with relstate_lsn set to the
 *        larger of its own value and current_lsn, wake it, commit the open
 *        transaction, and wait for SYNCDONE with
 *        wait_for_relation_state_change();
 *      - when no worker exists, count the running sync workers and, if the
 *        count is below max_sync_workers_per_subscription and
 *        wal_retrieve_retry_interval has passed since the recorded
 *        last_start_time, launch a WORKERTYPE_TABLESYNC worker and record the
 *        current time.
 *   4. If two_phase is still pending and AllTablesyncsReady() holds, log that
 *      the apply worker will restart; the listing then shows a commit, a
 *      statistics report and ApplyLauncherForgetWorkerStartTime().
 *
 * NOTE(review): braces, else keywords, several declarations and call
 * arguments, and the code that consumes should_exit are missing from this
 * listing, so branch boundaries above are inferred from statement order.
 */
418 process_syncing_tables_for_apply(XLogRecPtr current_lsn
)
420 struct tablesync_start_time_mapping
423 TimestampTz last_start_time
;
425 static HTAB
*last_start_times
= NULL
;
427 bool started_tx
= false;
428 bool should_exit
= false;
/* The function manages its own transactions, so none may be open on entry. */
430 Assert(!IsTransactionState());
432 /* We need up-to-date sync state info for subscription tables here. */
433 FetchTableStates(&started_tx
);
436 * Prepare a hash table for tracking last start times of workers, to avoid
437 * immediate restarts. We don't need it if there are no tables that need
440 if (table_states_not_ready
!= NIL
&& !last_start_times
)
444 ctl
.keysize
= sizeof(Oid
);
445 ctl
.entrysize
= sizeof(struct tablesync_start_time_mapping
);
446 last_start_times
= hash_create("Logical replication table sync worker start times",
447 256, &ctl
, HASH_ELEM
| HASH_BLOBS
);
451 * Clean up the hash table when we're done with all tables (just to
452 * release the bit of memory).
454 else if (table_states_not_ready
== NIL
&& last_start_times
)
456 hash_destroy(last_start_times
);
457 last_start_times
= NULL
;
461 * Process all tables that are being synchronized.
463 foreach(lc
, table_states_not_ready
)
465 SubscriptionRelState
*rstate
= (SubscriptionRelState
*) lfirst(lc
);
467 if (rstate
->state
== SUBREL_STATE_SYNCDONE
)
470 * Apply has caught up to the position where the table sync has
471 * finished. Mark the table as ready so that the apply will just
472 * continue to replicate it normally.
474 if (current_lsn
>= rstate
->lsn
)
476 char originname
[NAMEDATALEN
];
478 rstate
->state
= SUBREL_STATE_READY
;
479 rstate
->lsn
= current_lsn
;
482 StartTransactionCommand();
487 * Remove the tablesync origin tracking if exists.
489 * There is a chance that the user is concurrently performing
490 * refresh for the subscription where we remove the table
491 * state and its origin or the tablesync worker would have
492 * already removed this origin. We can't rely on tablesync
493 * worker to remove the origin tracking as if there is any
494 * error while dropping we won't restart it to drop the
495 * origin. So passing missing_ok = true.
497 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker
->subid
,
501 replorigin_drop_by_name(originname
, true, false);
504 * Update the state to READY only after the origin cleanup.
506 UpdateSubscriptionRelState(MyLogicalRepWorker
->subid
,
507 rstate
->relid
, rstate
->state
,
513 LogicalRepWorker
*syncworker
;
516 * Look for a sync worker for this relation.
518 LWLockAcquire(LogicalRepWorkerLock
, LW_SHARED
);
520 syncworker
= logicalrep_worker_find(MyLogicalRepWorker
->subid
,
521 rstate
->relid
, false);
525 /* Found one, update our copy of its state */
526 SpinLockAcquire(&syncworker
->relmutex
);
527 rstate
->state
= syncworker
->relstate
;
528 rstate
->lsn
= syncworker
->relstate_lsn
;
529 if (rstate
->state
== SUBREL_STATE_SYNCWAIT
)
532 * Sync worker is waiting for apply. Tell sync worker it
535 syncworker
->relstate
= SUBREL_STATE_CATCHUP
;
536 syncworker
->relstate_lsn
=
537 Max(syncworker
->relstate_lsn
, current_lsn
);
539 SpinLockRelease(&syncworker
->relmutex
);
541 /* If we told worker to catch up, wait for it. */
542 if (rstate
->state
== SUBREL_STATE_SYNCWAIT
)
544 /* Signal the sync worker, as it may be waiting for us. */
545 if (syncworker
->proc
)
546 logicalrep_worker_wakeup_ptr(syncworker
);
548 /* Now safe to release the LWLock */
549 LWLockRelease(LogicalRepWorkerLock
);
554 * We must commit the existing transaction to release
555 * the existing locks before entering a busy loop.
556 * This is required to avoid any undetected deadlocks
557 * due to any existing lock as deadlock detector won't
558 * be able to detect the waits on the latch.
560 CommitTransactionCommand();
561 pgstat_report_stat(false);
565 * Enter busy loop and wait for synchronization worker to
566 * reach expected state (or die trying).
568 StartTransactionCommand();
571 wait_for_relation_state_change(rstate
->relid
,
572 SUBREL_STATE_SYNCDONE
);
575 LWLockRelease(LogicalRepWorkerLock
);
580 * If there is no sync worker for this table yet, count
581 * running sync workers for this subscription, while we have
585 logicalrep_sync_worker_count(MyLogicalRepWorker
->subid
);
587 /* Now safe to release the LWLock */
588 LWLockRelease(LogicalRepWorkerLock
);
591 * If there are free sync worker slot(s), start a new sync
592 * worker for the table.
594 if (nsyncworkers
< max_sync_workers_per_subscription
)
596 TimestampTz now
= GetCurrentTimestamp();
597 struct tablesync_start_time_mapping
*hentry
;
600 hentry
= hash_search(last_start_times
, &rstate
->relid
,
604 TimestampDifferenceExceeds(hentry
->last_start_time
, now
,
605 wal_retrieve_retry_interval
))
607 logicalrep_worker_launch(WORKERTYPE_TABLESYNC
,
608 MyLogicalRepWorker
->dbid
,
610 MySubscription
->name
,
611 MyLogicalRepWorker
->userid
,
614 hentry
->last_start_time
= now
;
624 * Even when the two_phase mode is requested by the user, it remains
625 * as 'pending' until all tablesyncs have reached READY state.
627 * When this happens, we restart the apply worker and (if the
628 * conditions are still ok) then the two_phase tri-state will become
629 * 'enabled' at that time.
631 * Note: If the subscription has no tables then leave the state as
632 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
635 if (MySubscription
->twophasestate
== LOGICALREP_TWOPHASE_STATE_PENDING
)
637 CommandCounterIncrement(); /* make updates visible */
638 if (AllTablesyncsReady())
641 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
642 MySubscription
->name
)));
647 CommitTransactionCommand();
648 pgstat_report_stat(true);
654 * Reset the last-start time for this worker so that the launcher will
655 * restart it without waiting for wal_retrieve_retry_interval.
657 ApplyLauncherForgetWorkerStartTime(MySubscription
->oid
);
664 * Process possible state change(s) of tables that are being synchronized.
/*
 * process_syncing_tables
 *
 * current_lsn: forwarded unchanged to the worker-type specific handler.
 *
 * Dispatches on MyLogicalRepWorker->type:
 *   WORKERTYPE_PARALLEL_APPLY - nothing is called in the visible code;
 *   WORKERTYPE_TABLESYNC      - process_syncing_tables_for_sync();
 *   WORKERTYPE_APPLY          - process_syncing_tables_for_apply();
 *   WORKERTYPE_UNKNOWN        - raises an error through elog(ERROR, ...).
 *
 * NOTE(review): braces and the break statements between cases are not
 * present in this listing.
 */
667 process_syncing_tables(XLogRecPtr current_lsn
)
669 switch (MyLogicalRepWorker
->type
)
671 case WORKERTYPE_PARALLEL_APPLY
:
674 * Skip for parallel apply workers because they only operate on
675 * tables that are in a READY state. See pa_can_start() and
676 * should_apply_changes_for_rel().
680 case WORKERTYPE_TABLESYNC
:
681 process_syncing_tables_for_sync(current_lsn
);
684 case WORKERTYPE_APPLY
:
685 process_syncing_tables_for_apply(current_lsn
);
688 case WORKERTYPE_UNKNOWN
:
689 /* Should never happen. */
690 elog(ERROR
, "Unknown worker type");
695 * Create list of columns for COPY based on logical relation mapping.
/*
 * make_copy_attnamelist
 *
 * rel: relation map entry; only rel->remoterel.natts and
 *      rel->remoterel.attnames are read.
 *
 * Builds a List by appending makeString(attnames[i]) for every index from 0
 * up to natts - 1, so the list order follows the remote attribute order.
 * The list elements reference the attnames strings as passed to makeString().
 *
 * NOTE(review): the return statement is not present in this listing; the
 * list is presumably returned to the caller (copy_table() assigns the result
 * of this function to attnamelist).
 */
698 make_copy_attnamelist(LogicalRepRelMapEntry
*rel
)
700 List
*attnamelist
= NIL
;
703 for (i
= 0; i
< rel
->remoterel
.natts
; i
++)
705 attnamelist
= lappend(attnamelist
,
706 makeString(rel
->remoterel
.attnames
[i
]));
714 * Data source callback for the COPY FROM, which reads from the remote
715 * connection and passes the data back to our local COPY.
/*
 * copy_read_data
 *
 * outbuf:  destination buffer for the copied bytes.
 * minread: the receive loop continues while fewer than this many bytes have
 *          been delivered.
 * maxread: remaining capacity; the receive loop also requires it to be
 *          positive.
 *
 * Visible behaviour: bytes left over in copybuf (len - cursor) are copied to
 * outbuf first.  Then, while maxread > 0 and bytesread < minread, data is
 * fetched with walrcv_receive() on LogRepWorkerWalRcvConn, copied out of
 * copybuf, and outbuf and the copybuf cursor are advanced.  When the inner
 * test (maxread <= 0 or bytesread >= minread) does not end the call, the
 * function waits on the socket and MyLatch with a timeout argument of 1000L.
 *
 * NOTE(review): the two memcpy source operands read ©buf->data[...]; that
 * looks like a mis-encoded address-of copybuf -- confirm against the
 * upstream file before compiling.
 * NOTE(review): in the leftover-data branch no statement advancing outbuf is
 * visible, unlike the receive branch; check whether a later copy in the same
 * call can overwrite those bytes when minread exceeds the leftover amount.
 * NOTE(review): declarations, the updates of maxread and bytesread, the
 * handling of the walrcv_receive() result and the return statements are
 * missing from this listing.
 */
718 copy_read_data(void *outbuf
, int minread
, int maxread
)
723 /* If there are some leftover data from previous read, use it. */
724 avail
= copybuf
->len
- copybuf
->cursor
;
729 memcpy(outbuf
, ©buf
->data
[copybuf
->cursor
], avail
);
730 copybuf
->cursor
+= avail
;
735 while (maxread
> 0 && bytesread
< minread
)
737 pgsocket fd
= PGINVALID_SOCKET
;
743 /* Try read the data. */
744 len
= walrcv_receive(LogRepWorkerWalRcvConn
, &buf
, &fd
);
746 CHECK_FOR_INTERRUPTS();
754 /* Process the data */
759 avail
= copybuf
->len
- copybuf
->cursor
;
762 memcpy(outbuf
, ©buf
->data
[copybuf
->cursor
], avail
);
763 outbuf
= (char *) outbuf
+ avail
;
764 copybuf
->cursor
+= avail
;
769 if (maxread
<= 0 || bytesread
>= minread
)
774 * Wait for more data or latch.
776 (void) WaitLatchOrSocket(MyLatch
,
777 WL_SOCKET_READABLE
| WL_LATCH_SET
|
778 WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
779 fd
, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA
);
789 * Get information about remote relation in similar fashion the RELATION
790 * message provides during replication.
792 * This function also returns (a) the relation qualifications to be used in
793 * the COPY command, and (b) whether the remote relation has published any generated column.
/*
 * fetch_remote_table_info
 *
 * nspname, relname: schema and table name looked up on the publisher; both
 *                   pointers are also stored in lrel as given.
 * lrel:             output; receives remoteid, replident, relkind, attnames,
 *                   atttyps and attkeys.
 * qual:             output list that collects row filter expressions as
 *                   String nodes.
 * gencol_published: output flag, set from the fifth result column when the
 *                   publisher version is at least 180000.
 *
 * Queries issued over LogRepWorkerWalRcvConn, in order:
 *   1. pg_class joined with pg_namespace for oid, relreplident and relkind.
 *      A non-tuple result raises ERRCODE_CONNECTION_FAILURE; an empty result
 *      raises ERRCODE_UNDEFINED_OBJECT.
 *   2. Publisher version >= 150000 only: column lists from
 *      pg_get_publication_tables() for the subscribed publications.  More
 *      than one row raises ERRCODE_FEATURE_NOT_SUPPORTED; otherwise the
 *      attribute numbers of the row are collected in included_cols.
 *   3. pg_attribute joined with the replica identity index.  For versions
 *      >= 180000 an extra column derived from a.attgenerated is selected; for
 *      versions from 120000 up to but not including 180000 generated columns
 *      are filtered out in the WHERE clause.
 *   4. Publisher version >= 150000 only: distinct row filter expressions via
 *      pg_get_expr().
 *
 * The attribute loop skips columns that are not members of included_cols
 * when that set is non-NULL, records key columns in lrel->attkeys, and raises
 * an error once natt reaches MaxTupleAttributeNumber.
 *
 * NOTE(review): braces, several declarations, parts of the query text and
 * some call arguments are missing from this listing (for example the store
 * into lrel->natts is not visible); only the statements shown are described.
 */
797 fetch_remote_table_info(char *nspname
, char *relname
, LogicalRepRelation
*lrel
,
798 List
**qual
, bool *gencol_published
)
800 WalRcvExecResult
*res
;
802 TupleTableSlot
*slot
;
/* Expected result column types for the three catalog queries below. */
803 Oid tableRow
[] = {OIDOID
, CHAROID
, CHAROID
};
804 Oid attrRow
[] = {INT2OID
, TEXTOID
, OIDOID
, BOOLOID
, BOOLOID
};
805 Oid qualRow
[] = {TEXTOID
};
808 StringInfo pub_names
= NULL
;
809 Bitmapset
*included_cols
= NULL
;
810 int server_version
= walrcv_server_version(LogRepWorkerWalRcvConn
);
812 lrel
->nspname
= nspname
;
813 lrel
->relname
= relname
;
815 /* First fetch Oid and replica identity. */
816 initStringInfo(&cmd
);
817 appendStringInfo(&cmd
, "SELECT c.oid, c.relreplident, c.relkind"
818 " FROM pg_catalog.pg_class c"
819 " INNER JOIN pg_catalog.pg_namespace n"
820 " ON (c.relnamespace = n.oid)"
821 " WHERE n.nspname = %s"
822 " AND c.relname = %s",
823 quote_literal_cstr(nspname
),
824 quote_literal_cstr(relname
));
825 res
= walrcv_exec(LogRepWorkerWalRcvConn
, cmd
.data
,
826 lengthof(tableRow
), tableRow
);
828 if (res
->status
!= WALRCV_OK_TUPLES
)
830 (errcode(ERRCODE_CONNECTION_FAILURE
),
831 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
832 nspname
, relname
, res
->err
)));
834 slot
= MakeSingleTupleTableSlot(res
->tupledesc
, &TTSOpsMinimalTuple
);
835 if (!tuplestore_gettupleslot(res
->tuplestore
, true, false, slot
))
837 (errcode(ERRCODE_UNDEFINED_OBJECT
),
838 errmsg("table \"%s.%s\" not found on publisher",
/* Result columns 1..3 are oid, relreplident and relkind, per the query above. */
841 lrel
->remoteid
= DatumGetObjectId(slot_getattr(slot
, 1, &isnull
));
843 lrel
->replident
= DatumGetChar(slot_getattr(slot
, 2, &isnull
));
845 lrel
->relkind
= DatumGetChar(slot_getattr(slot
, 3, &isnull
));
848 ExecDropSingleTupleTableSlot(slot
);
849 walrcv_clear_result(res
);
853 * Get column lists for each relation.
855 * We need to do this before fetching info about column names and types,
856 * so that we can skip columns that should not be replicated.
858 if (server_version
>= 150000)
860 WalRcvExecResult
*pubres
;
861 TupleTableSlot
*tslot
;
862 Oid attrsRow
[] = {INT2VECTOROID
};
864 /* Build the pub_names comma-separated string. */
865 pub_names
= makeStringInfo();
866 GetPublicationsStr(MySubscription
->publications
, pub_names
, true);
869 * Fetch info about column lists for the relation (from all the
872 resetStringInfo(&cmd
);
873 appendStringInfo(&cmd
,
875 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
876 " THEN NULL ELSE gpt.attrs END)"
877 " FROM pg_publication p,"
878 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
880 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
881 " AND p.pubname IN ( %s )",
885 pubres
= walrcv_exec(LogRepWorkerWalRcvConn
, cmd
.data
,
886 lengthof(attrsRow
), attrsRow
);
888 if (pubres
->status
!= WALRCV_OK_TUPLES
)
890 (errcode(ERRCODE_CONNECTION_FAILURE
),
891 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
892 nspname
, relname
, pubres
->err
)));
895 * We don't support the case where the column list is different for
896 * the same table when combining publications. See comments atop
897 * fetch_table_list. So there should be only one row returned.
898 * Although we already checked this when creating the subscription, we
899 * still need to check here in case the column list was changed after
900 * creating the subscription and before the sync worker is started.
902 if (tuplestore_tuple_count(pubres
->tuplestore
) > 1)
904 errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
905 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
909 * Get the column list and build a single bitmap with the attnums.
911 * If we find a NULL value, it means all the columns should be
914 tslot
= MakeSingleTupleTableSlot(pubres
->tupledesc
, &TTSOpsMinimalTuple
);
915 if (tuplestore_gettupleslot(pubres
->tuplestore
, true, false, tslot
))
917 Datum cfval
= slot_getattr(tslot
, 1, &isnull
);
925 arr
= DatumGetArrayTypeP(cfval
);
926 nelems
= ARR_DIMS(arr
)[0];
927 elems
= (int16
*) ARR_DATA_PTR(arr
);
929 for (natt
= 0; natt
< nelems
; natt
++)
930 included_cols
= bms_add_member(included_cols
, elems
[natt
]);
933 ExecClearTuple(tslot
);
935 ExecDropSingleTupleTableSlot(tslot
);
937 walrcv_clear_result(pubres
);
941 * Now fetch column names and types.
943 resetStringInfo(&cmd
);
944 appendStringInfo(&cmd
,
948 " a.attnum = ANY(i.indkey)");
950 /* Generated columns can be replicated since version 18. */
951 if (server_version
>= 180000)
952 appendStringInfo(&cmd
, ", a.attgenerated != ''");
954 appendStringInfo(&cmd
,
955 " FROM pg_catalog.pg_attribute a"
956 " LEFT JOIN pg_catalog.pg_index i"
957 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
958 " WHERE a.attnum > 0::pg_catalog.int2"
959 " AND NOT a.attisdropped %s"
960 " AND a.attrelid = %u"
961 " ORDER BY a.attnum",
963 (server_version
>= 120000 && server_version
< 180000 ?
964 "AND a.attgenerated = ''" : ""),
966 res
= walrcv_exec(LogRepWorkerWalRcvConn
, cmd
.data
,
967 server_version
>= 180000 ? lengthof(attrRow
) : lengthof(attrRow
) - 1, attrRow
);
969 if (res
->status
!= WALRCV_OK_TUPLES
)
971 (errcode(ERRCODE_CONNECTION_FAILURE
),
972 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
973 nspname
, relname
, res
->err
)));
975 /* We don't know the number of rows coming, so allocate enough space. */
976 lrel
->attnames
= palloc0(MaxTupleAttributeNumber
* sizeof(char *));
977 lrel
->atttyps
= palloc0(MaxTupleAttributeNumber
* sizeof(Oid
));
978 lrel
->attkeys
= NULL
;
981 * Store the columns as a list of names. Ignore those that are not
982 * present in the column list, if there is one.
985 slot
= MakeSingleTupleTableSlot(res
->tupledesc
, &TTSOpsMinimalTuple
);
986 while (tuplestore_gettupleslot(res
->tuplestore
, true, false, slot
))
991 attnum
= DatumGetInt16(slot_getattr(slot
, 1, &isnull
));
994 /* If the column is not in the column list, skip it. */
995 if (included_cols
!= NULL
&& !bms_is_member(attnum
, included_cols
))
997 ExecClearTuple(slot
);
1001 rel_colname
= TextDatumGetCString(slot_getattr(slot
, 2, &isnull
));
1004 lrel
->attnames
[natt
] = rel_colname
;
1005 lrel
->atttyps
[natt
] = DatumGetObjectId(slot_getattr(slot
, 3, &isnull
));
1008 if (DatumGetBool(slot_getattr(slot
, 4, &isnull
)))
1009 lrel
->attkeys
= bms_add_member(lrel
->attkeys
, natt
);
1011 /* Remember if the remote table has published any generated column. */
1012 if (server_version
>= 180000 && !(*gencol_published
))
1014 *gencol_published
= DatumGetBool(slot_getattr(slot
, 5, &isnull
));
1018 /* Should never happen. */
1019 if (++natt
>= MaxTupleAttributeNumber
)
1020 elog(ERROR
, "too many columns in remote table \"%s.%s\"",
1023 ExecClearTuple(slot
);
1025 ExecDropSingleTupleTableSlot(slot
);
1029 walrcv_clear_result(res
);
1032 * Get relation's row filter expressions. DISTINCT avoids the same
1033 * expression of a table in multiple publications from being included
1034 * multiple times in the final expression.
1036 * We need to copy the row even if it matches just one of the
1037 * publications, so we later combine all the quals with OR.
1039 * For initial synchronization, row filtering can be ignored in following
1042 * 1) one of the subscribed publications for the table hasn't specified
1045 * 2) one of the subscribed publications has puballtables set to true
1047 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1048 * that includes this relation
1050 if (server_version
>= 150000)
1052 /* Reuse the already-built pub_names. */
1053 Assert(pub_names
!= NULL
);
1055 /* Check for row filters. */
1056 resetStringInfo(&cmd
);
1057 appendStringInfo(&cmd
,
1058 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1059 " FROM pg_publication p,"
1060 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1061 " WHERE gpt.relid = %u"
1062 " AND p.pubname IN ( %s )",
1066 res
= walrcv_exec(LogRepWorkerWalRcvConn
, cmd
.data
, 1, qualRow
);
1068 if (res
->status
!= WALRCV_OK_TUPLES
)
1070 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1071 nspname
, relname
, res
->err
)));
1074 * Multiple row filter expressions for the same table will be combined
1075 * by COPY using OR. If any of the filter expressions for this table
1076 * are null, it means the whole table will be copied. In this case it
1077 * is not necessary to construct a unified row filter expression at
1080 slot
= MakeSingleTupleTableSlot(res
->tupledesc
, &TTSOpsMinimalTuple
);
1081 while (tuplestore_gettupleslot(res
->tuplestore
, true, false, slot
))
1083 Datum rf
= slot_getattr(slot
, 1, &isnull
);
1086 *qual
= lappend(*qual
, makeString(TextDatumGetCString(rf
)));
1089 /* Ignore filters and cleanup as necessary. */
1092 list_free_deep(*qual
);
1098 ExecClearTuple(slot
);
1100 ExecDropSingleTupleTableSlot(slot
);
1102 walrcv_clear_result(res
);
1103 destroyStringInfo(pub_names
);
1110 * Copy existing data of a table from publisher.
1112 * Caller is responsible for locking the local relation.
/*
 * copy_table
 *
 * rel: local relation that receives the data; it is asserted to be the
 *      localrel of the relation map entry opened below.
 *
 * Visible steps:
 *   1. Obtain the publisher-side description with fetch_remote_table_info(),
 *      register it with logicalrep_relmap_update() and open the map entry
 *      with NoLock.
 *   2. Build the COPY command text.  For a RELKIND_RELATION with no row
 *      filter and no published generated column, a plain COPY of the
 *      qualified table name with a parenthesised column list is used.
 *      Otherwise the command is COPY (SELECT columns FROM [ONLY] table
 *      [WHERE filters joined with OR]) where ONLY is added for
 *      RELKIND_RELATION.  Both forms end in TO STDOUT.
 *   3. When the publisher version is at least 160000 and the subscription
 *      has binary set, request the binary format on both sides: appended to
 *      the command text and passed as a format option to the local COPY.
 *   4. Run the command; any status other than WALRCV_OK_COPY_OUT raises
 *      ERRCODE_CONNECTION_FAILURE.
 *   5. Allocate copybuf, set up a parse state with a range table entry for
 *      rel, and run BeginCopyFrom() / CopyFrom() with copy_read_data as the
 *      data source callback and the column list from
 *      make_copy_attnamelist().
 *   6. Close the relation map entry with NoLock.
 *
 * NOTE(review): braces, else keywords, several declarations and the
 * conditions guarding the column list and the filter list are missing from
 * this listing; branch boundaries above are inferred from statement order.
 */
1115 copy_table(Relation rel
)
1117 LogicalRepRelMapEntry
*relmapentry
;
1118 LogicalRepRelation lrel
;
1120 WalRcvExecResult
*res
;
1122 CopyFromState cstate
;
1125 List
*options
= NIL
;
1126 bool gencol_published
= false;
1128 /* Get the publisher relation info. */
1129 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel
)),
1130 RelationGetRelationName(rel
), &lrel
, &qual
,
1133 /* Put the relation into relmap. */
1134 logicalrep_relmap_update(&lrel
);
1136 /* Map the publisher relation to local one. */
1137 relmapentry
= logicalrep_rel_open(lrel
.remoteid
, NoLock
);
1138 Assert(rel
== relmapentry
->localrel
);
1140 /* Start copy on the publisher. */
1141 initStringInfo(&cmd
);
1143 /* Regular table with no row filter or generated columns */
1144 if (lrel
.relkind
== RELKIND_RELATION
&& qual
== NIL
&& !gencol_published
)
1146 appendStringInfo(&cmd
, "COPY %s",
1147 quote_qualified_identifier(lrel
.nspname
, lrel
.relname
));
1149 /* If the table has columns, then specify the columns */
1152 appendStringInfoString(&cmd
, " (");
1155 * XXX Do we need to list the columns in all cases? Maybe we're
1156 * replicating all columns?
1158 for (int i
= 0; i
< lrel
.natts
; i
++)
1161 appendStringInfoString(&cmd
, ", ");
1163 appendStringInfoString(&cmd
, quote_identifier(lrel
.attnames
[i
]));
1166 appendStringInfoChar(&cmd
, ')');
1169 appendStringInfoString(&cmd
, " TO STDOUT");
1174 * For non-tables and tables with row filters, we need to do COPY
1175 * (SELECT ...), but we can't just do SELECT * because we may need to
1176 * copy only subset of columns including generated columns. For tables
1177 * with any row filters, build a SELECT query with OR'ed row filters
1180 * We also need to use this same COPY (SELECT ...) syntax when
1181 * generated columns are published, because copy of generated columns
1182 * is not supported by the normal COPY.
1184 appendStringInfoString(&cmd
, "COPY (SELECT ");
1185 for (int i
= 0; i
< lrel
.natts
; i
++)
1187 appendStringInfoString(&cmd
, quote_identifier(lrel
.attnames
[i
]));
1188 if (i
< lrel
.natts
- 1)
1189 appendStringInfoString(&cmd
, ", ");
1192 appendStringInfoString(&cmd
, " FROM ");
1195 * For regular tables, make sure we don't copy data from a child that
1196 * inherits the named table as those will be copied separately.
1198 if (lrel
.relkind
== RELKIND_RELATION
)
1199 appendStringInfoString(&cmd
, "ONLY ");
1201 appendStringInfoString(&cmd
, quote_qualified_identifier(lrel
.nspname
, lrel
.relname
));
1202 /* list of OR'ed filters */
1206 char *q
= strVal(linitial(qual
));
1208 appendStringInfo(&cmd
, " WHERE %s", q
);
1209 for_each_from(lc
, qual
, 1)
1211 q
= strVal(lfirst(lc
));
1212 appendStringInfo(&cmd
, " OR %s", q
);
1214 list_free_deep(qual
);
1217 appendStringInfoString(&cmd
, ") TO STDOUT");
1221 * Prior to v16, initial table synchronization will use text format even
1222 * if the binary option is enabled for a subscription.
1224 if (walrcv_server_version(LogRepWorkerWalRcvConn
) >= 160000 &&
1225 MySubscription
->binary
)
1227 appendStringInfoString(&cmd
, " WITH (FORMAT binary)");
1228 options
= list_make1(makeDefElem("format",
1229 (Node
*) makeString("binary"), -1));
1232 res
= walrcv_exec(LogRepWorkerWalRcvConn
, cmd
.data
, 0, NULL
);
1234 if (res
->status
!= WALRCV_OK_COPY_OUT
)
1236 (errcode(ERRCODE_CONNECTION_FAILURE
),
1237 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1238 lrel
.nspname
, lrel
.relname
, res
->err
)));
1239 walrcv_clear_result(res
);
1241 copybuf
= makeStringInfo();
1243 pstate
= make_parsestate(NULL
);
1244 (void) addRangeTableEntryForRelation(pstate
, rel
, AccessShareLock
,
1245 NULL
, false, false);
1247 attnamelist
= make_copy_attnamelist(relmapentry
);
1248 cstate
= BeginCopyFrom(pstate
, rel
, NULL
, NULL
, false, copy_read_data
, attnamelist
, options
);
1251 (void) CopyFrom(cstate
);
1253 logicalrep_rel_close(relmapentry
, NoLock
);
1257 * Determine the tablesync slot name.
1259 * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1260 * on slot name length. We append system_identifier to avoid slot_name
1261 * collision with subscriptions in other clusters. With the current scheme
1262 * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1263 * length of slot_name will be 50.
1265 * The returned slot name is stored in the supplied buffer (syncslotname) with
1268 * Note: We don't use the subscription slot name as part of tablesync slot name
1269 * because we are responsible for cleaning up these slots and it could become
1270 * impossible to recalculate what name to cleanup if the subscription slot name
1274 ReplicationSlotNameForTablesync(Oid suboid
, Oid relid
,
1275 char *syncslotname
, Size szslot
)
1277 snprintf(syncslotname
, szslot
, "pg_%u_sync_%u_" UINT64_FORMAT
, suboid
,
1278 relid
, GetSystemIdentifier());
1282 * Start syncing the table in the sync worker.
1284 * If nothing needs to be done to sync the table, we exit the worker without
1285 * any further action.
1287 * The returned slot name is palloc'ed in current memory context.
1290 LogicalRepSyncTableStart(XLogRecPtr
*origin_startpos
)
1295 XLogRecPtr relstate_lsn
;
1297 AclResult aclresult
;
1298 WalRcvExecResult
*res
;
1299 char originname
[NAMEDATALEN
];
1300 RepOriginId originid
;
1302 bool must_use_password
;
1305 /* Check the state of the table synchronization. */
1306 StartTransactionCommand();
1307 relstate
= GetSubscriptionRelState(MyLogicalRepWorker
->subid
,
1308 MyLogicalRepWorker
->relid
,
1310 CommitTransactionCommand();
1312 /* Is the use of a password mandatory? */
1313 must_use_password
= MySubscription
->passwordrequired
&&
1314 !MySubscription
->ownersuperuser
;
1316 SpinLockAcquire(&MyLogicalRepWorker
->relmutex
);
1317 MyLogicalRepWorker
->relstate
= relstate
;
1318 MyLogicalRepWorker
->relstate_lsn
= relstate_lsn
;
1319 SpinLockRelease(&MyLogicalRepWorker
->relmutex
);
1322 * If synchronization is already done or no longer necessary, exit now
1323 * that we've updated shared memory state.
1327 case SUBREL_STATE_SYNCDONE
:
1328 case SUBREL_STATE_READY
:
1329 case SUBREL_STATE_UNKNOWN
:
1330 finish_sync_worker(); /* doesn't return */
1333 /* Calculate the name of the tablesync slot. */
1334 slotname
= (char *) palloc(NAMEDATALEN
);
1335 ReplicationSlotNameForTablesync(MySubscription
->oid
,
1336 MyLogicalRepWorker
->relid
,
1341 * Here we use the slot name instead of the subscription name as the
1342 * application_name, so that it is different from the leader apply worker,
1343 * so that synchronous replication can distinguish them.
1345 LogRepWorkerWalRcvConn
=
1346 walrcv_connect(MySubscription
->conninfo
, true, true,
1349 if (LogRepWorkerWalRcvConn
== NULL
)
1351 (errcode(ERRCODE_CONNECTION_FAILURE
),
1352 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1353 MySubscription
->name
, err
)));
1355 Assert(MyLogicalRepWorker
->relstate
== SUBREL_STATE_INIT
||
1356 MyLogicalRepWorker
->relstate
== SUBREL_STATE_DATASYNC
||
1357 MyLogicalRepWorker
->relstate
== SUBREL_STATE_FINISHEDCOPY
);
1359 /* Assign the origin tracking record name. */
1360 ReplicationOriginNameForLogicalRep(MySubscription
->oid
,
1361 MyLogicalRepWorker
->relid
,
1363 sizeof(originname
));
1365 if (MyLogicalRepWorker
->relstate
== SUBREL_STATE_DATASYNC
)
1368 * We have previously errored out before finishing the copy so the
1369 * replication slot might exist. We want to remove the slot if it
1370 * already exists and proceed.
1372 * XXX We could also instead try to drop the slot, last time we failed
1373 * but for that, we might need to clean up the copy state as it might
1374 * be in the middle of fetching the rows. Also, if there is a network
1375 * breakdown then it wouldn't have succeeded so trying it next time
1376 * seems like a better bet.
1378 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn
, slotname
, true);
1380 else if (MyLogicalRepWorker
->relstate
== SUBREL_STATE_FINISHEDCOPY
)
1383 * The COPY phase was previously done, but tablesync then crashed
1384 * before it was able to finish normally.
1386 StartTransactionCommand();
1389 * The origin tracking name must already exist. It was created first
1390 * time this tablesync was launched.
1392 originid
= replorigin_by_name(originname
, false);
1393 replorigin_session_setup(originid
, 0);
1394 replorigin_session_origin
= originid
;
1395 *origin_startpos
= replorigin_session_get_progress(false);
1397 CommitTransactionCommand();
1399 goto copy_table_done
;
1402 SpinLockAcquire(&MyLogicalRepWorker
->relmutex
);
1403 MyLogicalRepWorker
->relstate
= SUBREL_STATE_DATASYNC
;
1404 MyLogicalRepWorker
->relstate_lsn
= InvalidXLogRecPtr
;
1405 SpinLockRelease(&MyLogicalRepWorker
->relmutex
);
1407 /* Update the state and make it visible to others. */
1408 StartTransactionCommand();
1409 UpdateSubscriptionRelState(MyLogicalRepWorker
->subid
,
1410 MyLogicalRepWorker
->relid
,
1411 MyLogicalRepWorker
->relstate
,
1412 MyLogicalRepWorker
->relstate_lsn
);
1413 CommitTransactionCommand();
1414 pgstat_report_stat(true);
1416 StartTransactionCommand();
1419 * Use a standard write lock here. It might be better to disallow access
1420 * to the table while it's being synchronized. But we don't want to block
1421 * the main apply process from working and it has to open the relation in
1422 * RowExclusiveLock when remapping remote relation id to local one.
1424 rel
= table_open(MyLogicalRepWorker
->relid
, RowExclusiveLock
);
1427 * Start a transaction in the remote node in REPEATABLE READ mode. This
1428 * ensures that both the replication slot we create (see below) and the
1429 * COPY are consistent with each other.
1431 res
= walrcv_exec(LogRepWorkerWalRcvConn
,
1432 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1434 if (res
->status
!= WALRCV_OK_COMMAND
)
1436 (errcode(ERRCODE_CONNECTION_FAILURE
),
1437 errmsg("table copy could not start transaction on publisher: %s",
1439 walrcv_clear_result(res
);
1442 * Create a new permanent logical decoding slot. This slot will be used
1443 * for the catchup phase after COPY is done, so tell it to use the
1444 * snapshot to make the final data consistent.
1446 walrcv_create_slot(LogRepWorkerWalRcvConn
,
1447 slotname
, false /* permanent */ , false /* two_phase */ ,
1448 MySubscription
->failover
,
1449 CRS_USE_SNAPSHOT
, origin_startpos
);
1452 * Setup replication origin tracking. The purpose of doing this before the
1453 * copy is to avoid doing the copy again due to any error in setting up
1456 originid
= replorigin_by_name(originname
, true);
1457 if (!OidIsValid(originid
))
1460 * Origin tracking does not exist, so create it now.
1462 * Then advance to the LSN got from walrcv_create_slot. This is WAL
1463 * logged for the purpose of recovery. Locks are to prevent the
1464 * replication origin from vanishing while advancing.
1466 originid
= replorigin_create(originname
);
1468 LockRelationOid(ReplicationOriginRelationId
, RowExclusiveLock
);
1469 replorigin_advance(originid
, *origin_startpos
, InvalidXLogRecPtr
,
1470 true /* go backward */ , true /* WAL log */ );
1471 UnlockRelationOid(ReplicationOriginRelationId
, RowExclusiveLock
);
1473 replorigin_session_setup(originid
, 0);
1474 replorigin_session_origin
= originid
;
1479 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1480 errmsg("replication origin \"%s\" already exists",
1485 * Make sure that the copy command runs as the table owner, unless the
1486 * user has opted out of that behaviour.
1488 run_as_owner
= MySubscription
->runasowner
;
1490 SwitchToUntrustedUser(rel
->rd_rel
->relowner
, &ucxt
);
1493 * Check that our table sync worker has permission to insert into the
1496 aclresult
= pg_class_aclcheck(RelationGetRelid(rel
), GetUserId(),
1498 if (aclresult
!= ACLCHECK_OK
)
1499 aclcheck_error(aclresult
,
1500 get_relkind_objtype(rel
->rd_rel
->relkind
),
1501 RelationGetRelationName(rel
));
1504 * COPY FROM does not honor RLS policies. That is not a problem for
1505 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1506 * who has it implicitly), but other roles should not be able to
1507 * circumvent RLS. Disallow logical replication into RLS enabled
1508 * relations for such roles.
1510 if (check_enable_rls(RelationGetRelid(rel
), InvalidOid
, false) == RLS_ENABLED
)
1512 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
1513 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1514 GetUserNameFromId(GetUserId(), true),
1515 RelationGetRelationName(rel
))));
1517 /* Now do the initial data copy */
1518 PushActiveSnapshot(GetTransactionSnapshot());
1520 PopActiveSnapshot();
1522 res
= walrcv_exec(LogRepWorkerWalRcvConn
, "COMMIT", 0, NULL
);
1523 if (res
->status
!= WALRCV_OK_COMMAND
)
1525 (errcode(ERRCODE_CONNECTION_FAILURE
),
1526 errmsg("table copy could not finish transaction on publisher: %s",
1528 walrcv_clear_result(res
);
1531 RestoreUserContext(&ucxt
);
1533 table_close(rel
, NoLock
);
1535 /* Make the copy visible. */
1536 CommandCounterIncrement();
1539 * Update the persisted state to indicate the COPY phase is done; make it
1540 * visible to others.
1542 UpdateSubscriptionRelState(MyLogicalRepWorker
->subid
,
1543 MyLogicalRepWorker
->relid
,
1544 SUBREL_STATE_FINISHEDCOPY
,
1545 MyLogicalRepWorker
->relstate_lsn
);
1547 CommitTransactionCommand();
1552 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1553 originname
, LSN_FORMAT_ARGS(*origin_startpos
));
1556 * We are done with the initial data synchronization, update the state.
1558 SpinLockAcquire(&MyLogicalRepWorker
->relmutex
);
1559 MyLogicalRepWorker
->relstate
= SUBREL_STATE_SYNCWAIT
;
1560 MyLogicalRepWorker
->relstate_lsn
= *origin_startpos
;
1561 SpinLockRelease(&MyLogicalRepWorker
->relmutex
);
1564 * Finally, wait until the leader apply worker tells us to catch up and
1565 * then return to let LogicalRepApplyLoop do it.
1567 wait_for_worker_state_change(SUBREL_STATE_CATCHUP
);
1572 * Common code to fetch the up-to-date sync state info into the static lists.
1574 * Returns true if subscription has 1 or more tables, else false.
1576 * Note: If this function started the transaction (indicated by the parameter)
1577 * then it is the caller's responsibility to commit it.
1580 FetchTableStates(bool *started_tx
)
1582 static bool has_subrels
= false;
1584 *started_tx
= false;
1586 if (table_states_validity
!= SYNC_TABLE_STATE_VALID
)
1588 MemoryContext oldctx
;
1591 SubscriptionRelState
*rstate
;
1593 table_states_validity
= SYNC_TABLE_STATE_REBUILD_STARTED
;
1595 /* Clean the old lists. */
1596 list_free_deep(table_states_not_ready
);
1597 table_states_not_ready
= NIL
;
1599 if (!IsTransactionState())
1601 StartTransactionCommand();
1605 /* Fetch all non-ready tables. */
1606 rstates
= GetSubscriptionRelations(MySubscription
->oid
, true);
1608 /* Allocate the tracking info in a permanent memory context. */
1609 oldctx
= MemoryContextSwitchTo(CacheMemoryContext
);
1610 foreach(lc
, rstates
)
1612 rstate
= palloc(sizeof(SubscriptionRelState
));
1613 memcpy(rstate
, lfirst(lc
), sizeof(SubscriptionRelState
));
1614 table_states_not_ready
= lappend(table_states_not_ready
, rstate
);
1616 MemoryContextSwitchTo(oldctx
);
1619 * Does the subscription have tables?
1621 * If there were not-READY relations found then we know it does. But
1622 * if table_states_not_ready was empty we still need to check again to
1623 * see if there are 0 tables.
1625 has_subrels
= (table_states_not_ready
!= NIL
) ||
1626 HasSubscriptionRelations(MySubscription
->oid
);
1629 * If the subscription relation cache has been invalidated since we
1630 * entered this routine, we still use and return the relations we just
1631 * finished constructing, to avoid infinite loops, but we leave the
1632 * table states marked as stale so that we'll rebuild it again on next
1633 * access. Otherwise, we mark the table states as valid.
1635 if (table_states_validity
== SYNC_TABLE_STATE_REBUILD_STARTED
)
1636 table_states_validity
= SYNC_TABLE_STATE_VALID
;
1643 * Execute the initial sync with error handling. Disable the subscription,
1646 * Allocate the slot name in long-lived context on return. Note that we don't
1647 * handle FATAL errors which are probably because of system resource error and
1648 * are not repeatable.
1651 start_table_sync(XLogRecPtr
*origin_startpos
, char **slotname
)
1653 char *sync_slotname
= NULL
;
1655 Assert(am_tablesync_worker());
1659 /* Call initial sync. */
1660 sync_slotname
= LogicalRepSyncTableStart(origin_startpos
);
1664 if (MySubscription
->disableonerr
)
1665 DisableSubscriptionAndExit();
1669 * Report the worker failed during table synchronization. Abort
1670 * the current transaction so that the stats message is sent in an
1673 AbortOutOfAnyTransaction();
1674 pgstat_report_subscription_error(MySubscription
->oid
, false);
1681 /* allocate slot name in long-lived context */
1682 *slotname
= MemoryContextStrdup(ApplyContext
, sync_slotname
);
1683 pfree(sync_slotname
);
1687 * Runs the tablesync worker.
1689 * It starts syncing tables. After a successful sync, sets streaming options
1690 * and starts streaming to catchup with apply worker.
1693 run_tablesync_worker()
1695 char originname
[NAMEDATALEN
];
1696 XLogRecPtr origin_startpos
= InvalidXLogRecPtr
;
1697 char *slotname
= NULL
;
1698 WalRcvStreamOptions options
;
1700 start_table_sync(&origin_startpos
, &slotname
);
1702 ReplicationOriginNameForLogicalRep(MySubscription
->oid
,
1703 MyLogicalRepWorker
->relid
,
1705 sizeof(originname
));
1707 set_apply_error_context_origin(originname
);
1709 set_stream_options(&options
, slotname
, &origin_startpos
);
1711 walrcv_startstreaming(LogRepWorkerWalRcvConn
, &options
);
1713 /* Apply the changes till we catchup with the apply worker. */
1714 start_apply(origin_startpos
);
1717 /* Logical Replication Tablesync worker entry point */
1719 TablesyncWorkerMain(Datum main_arg
)
1721 int worker_slot
= DatumGetInt32(main_arg
);
1723 SetupApplyOrSyncWorker(worker_slot
);
1725 run_tablesync_worker();
1727 finish_sync_worker();
1731 * If the subscription has no tables then return false.
1733 * Otherwise, are all tablesyncs READY?
1735 * Note: This function is not suitable to be called from outside of apply or
1736 * tablesync workers because MySubscription needs to be already initialized.
1739 AllTablesyncsReady(void)
1741 bool started_tx
= false;
1742 bool has_subrels
= false;
1744 /* We need up-to-date sync state info for subscription tables here. */
1745 has_subrels
= FetchTableStates(&started_tx
);
1749 CommitTransactionCommand();
1750 pgstat_report_stat(true);
1754 * Return false when there are no tables in subscription or not all tables
1755 * are in ready state; true otherwise.
1757 return has_subrels
&& (table_states_not_ready
== NIL
);
1761 * Update the two_phase state of the specified subscription in pg_subscription.
1764 UpdateTwoPhaseState(Oid suboid
, char new_state
)
1768 bool nulls
[Natts_pg_subscription
];
1769 bool replaces
[Natts_pg_subscription
];
1770 Datum values
[Natts_pg_subscription
];
1772 Assert(new_state
== LOGICALREP_TWOPHASE_STATE_DISABLED
||
1773 new_state
== LOGICALREP_TWOPHASE_STATE_PENDING
||
1774 new_state
== LOGICALREP_TWOPHASE_STATE_ENABLED
);
1776 rel
= table_open(SubscriptionRelationId
, RowExclusiveLock
);
1777 tup
= SearchSysCacheCopy1(SUBSCRIPTIONOID
, ObjectIdGetDatum(suboid
));
1778 if (!HeapTupleIsValid(tup
))
1780 "cache lookup failed for subscription oid %u",
1783 /* Form a new tuple. */
1784 memset(values
, 0, sizeof(values
));
1785 memset(nulls
, false, sizeof(nulls
));
1786 memset(replaces
, false, sizeof(replaces
));
1788 /* And update/set two_phase state */
1789 values
[Anum_pg_subscription_subtwophasestate
- 1] = CharGetDatum(new_state
);
1790 replaces
[Anum_pg_subscription_subtwophasestate
- 1] = true;
1792 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
),
1793 values
, nulls
, replaces
);
1794 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
1796 heap_freetuple(tup
);
1797 table_close(rel
, RowExclusiveLock
);