1 /*-------------------------------------------------------------------------
4 * replication subscriptions
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/catalog/pg_subscription.c
12 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "catalog/indexing.h"
22 #include "catalog/pg_subscription.h"
23 #include "catalog/pg_subscription_rel.h"
24 #include "catalog/pg_type.h"
25 #include "miscadmin.h"
26 #include "storage/lmgr.h"
27 #include "utils/array.h"
28 #include "utils/builtins.h"
29 #include "utils/fmgroids.h"
30 #include "utils/lsyscache.h"
31 #include "utils/pg_lsn.h"
32 #include "utils/rel.h"
33 #include "utils/syscache.h"
35 static List
*textarray_to_stringlist(ArrayType
*textarray
);
38 * Add a comma-separated list of publication names to the 'dest' string.
41 GetPublicationsStr(List
*publications
, StringInfo dest
, bool quote_literal
)
46 Assert(publications
!= NIL
);
48 foreach(lc
, publications
)
50 char *pubname
= strVal(lfirst(lc
));
55 appendStringInfoString(dest
, ", ");
58 appendStringInfoString(dest
, quote_literal_cstr(pubname
));
61 appendStringInfoChar(dest
, '"');
62 appendStringInfoString(dest
, pubname
);
63 appendStringInfoChar(dest
, '"');
69 * Fetch the subscription from the syscache.
72 GetSubscription(Oid subid
, bool missing_ok
)
76 Form_pg_subscription subform
;
80 tup
= SearchSysCache1(SUBSCRIPTIONOID
, ObjectIdGetDatum(subid
));
82 if (!HeapTupleIsValid(tup
))
87 elog(ERROR
, "cache lookup failed for subscription %u", subid
);
90 subform
= (Form_pg_subscription
) GETSTRUCT(tup
);
92 sub
= (Subscription
*) palloc(sizeof(Subscription
));
94 sub
->dbid
= subform
->subdbid
;
95 sub
->skiplsn
= subform
->subskiplsn
;
96 sub
->name
= pstrdup(NameStr(subform
->subname
));
97 sub
->owner
= subform
->subowner
;
98 sub
->enabled
= subform
->subenabled
;
99 sub
->binary
= subform
->subbinary
;
100 sub
->stream
= subform
->substream
;
101 sub
->twophasestate
= subform
->subtwophasestate
;
102 sub
->disableonerr
= subform
->subdisableonerr
;
103 sub
->passwordrequired
= subform
->subpasswordrequired
;
104 sub
->runasowner
= subform
->subrunasowner
;
105 sub
->failover
= subform
->subfailover
;
108 datum
= SysCacheGetAttrNotNull(SUBSCRIPTIONOID
,
110 Anum_pg_subscription_subconninfo
);
111 sub
->conninfo
= TextDatumGetCString(datum
);
114 datum
= SysCacheGetAttr(SUBSCRIPTIONOID
,
116 Anum_pg_subscription_subslotname
,
119 sub
->slotname
= pstrdup(NameStr(*DatumGetName(datum
)));
121 sub
->slotname
= NULL
;
124 datum
= SysCacheGetAttrNotNull(SUBSCRIPTIONOID
,
126 Anum_pg_subscription_subsynccommit
);
127 sub
->synccommit
= TextDatumGetCString(datum
);
129 /* Get publications */
130 datum
= SysCacheGetAttrNotNull(SUBSCRIPTIONOID
,
132 Anum_pg_subscription_subpublications
);
133 sub
->publications
= textarray_to_stringlist(DatumGetArrayTypeP(datum
));
136 datum
= SysCacheGetAttrNotNull(SUBSCRIPTIONOID
,
138 Anum_pg_subscription_suborigin
);
139 sub
->origin
= TextDatumGetCString(datum
);
141 /* Is the subscription owner a superuser? */
142 sub
->ownersuperuser
= superuser_arg(sub
->owner
);
144 ReleaseSysCache(tup
);
150 * Return number of subscriptions defined in given database.
151 * Used by dropdb() to check if database can indeed be dropped.
154 CountDBSubscriptions(Oid dbid
)
162 rel
= table_open(SubscriptionRelationId
, RowExclusiveLock
);
164 ScanKeyInit(&scankey
,
165 Anum_pg_subscription_subdbid
,
166 BTEqualStrategyNumber
, F_OIDEQ
,
167 ObjectIdGetDatum(dbid
));
169 scan
= systable_beginscan(rel
, InvalidOid
, false,
172 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
175 systable_endscan(scan
);
177 table_close(rel
, NoLock
);
183 * Free memory allocated by subscription struct.
186 FreeSubscription(Subscription
*sub
)
189 pfree(sub
->conninfo
);
191 pfree(sub
->slotname
);
192 list_free_deep(sub
->publications
);
197 * Disable the given subscription.
200 DisableSubscription(Oid subid
)
203 bool nulls
[Natts_pg_subscription
];
204 bool replaces
[Natts_pg_subscription
];
205 Datum values
[Natts_pg_subscription
];
208 /* Look up the subscription in the catalog */
209 rel
= table_open(SubscriptionRelationId
, RowExclusiveLock
);
210 tup
= SearchSysCacheCopy1(SUBSCRIPTIONOID
, ObjectIdGetDatum(subid
));
212 if (!HeapTupleIsValid(tup
))
213 elog(ERROR
, "cache lookup failed for subscription %u", subid
);
215 LockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
217 /* Form a new tuple. */
218 memset(values
, 0, sizeof(values
));
219 memset(nulls
, false, sizeof(nulls
));
220 memset(replaces
, false, sizeof(replaces
));
222 /* Set the subscription to disabled. */
223 values
[Anum_pg_subscription_subenabled
- 1] = BoolGetDatum(false);
224 replaces
[Anum_pg_subscription_subenabled
- 1] = true;
226 /* Update the catalog */
227 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
), values
, nulls
,
229 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
232 table_close(rel
, NoLock
);
236 * Convert text array to list of strings.
238 * Note: the resulting list of strings is pallocated here.
241 textarray_to_stringlist(ArrayType
*textarray
)
248 deconstruct_array_builtin(textarray
, TEXTOID
, &elems
, NULL
, &nelems
);
253 for (i
= 0; i
< nelems
; i
++)
254 res
= lappend(res
, makeString(TextDatumGetCString(elems
[i
])));
260 * Add new state record for a subscription table.
262 * If retain_lock is true, then don't release the locks taken in this function.
263 * We normally release the locks at the end of transaction but in binary-upgrade
264 * mode, we expect to release those immediately.
267 AddSubscriptionRelState(Oid subid
, Oid relid
, char state
,
268 XLogRecPtr sublsn
, bool retain_lock
)
272 bool nulls
[Natts_pg_subscription_rel
];
273 Datum values
[Natts_pg_subscription_rel
];
275 LockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
277 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
279 /* Try finding existing mapping. */
280 tup
= SearchSysCacheCopy2(SUBSCRIPTIONRELMAP
,
281 ObjectIdGetDatum(relid
),
282 ObjectIdGetDatum(subid
));
283 if (HeapTupleIsValid(tup
))
284 elog(ERROR
, "subscription table %u in subscription %u already exists",
287 /* Form the tuple. */
288 memset(values
, 0, sizeof(values
));
289 memset(nulls
, false, sizeof(nulls
));
290 values
[Anum_pg_subscription_rel_srsubid
- 1] = ObjectIdGetDatum(subid
);
291 values
[Anum_pg_subscription_rel_srrelid
- 1] = ObjectIdGetDatum(relid
);
292 values
[Anum_pg_subscription_rel_srsubstate
- 1] = CharGetDatum(state
);
293 if (sublsn
!= InvalidXLogRecPtr
)
294 values
[Anum_pg_subscription_rel_srsublsn
- 1] = LSNGetDatum(sublsn
);
296 nulls
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
298 tup
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
300 /* Insert tuple into catalog. */
301 CatalogTupleInsert(rel
, tup
);
308 table_close(rel
, NoLock
);
312 table_close(rel
, RowExclusiveLock
);
313 UnlockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
318 * Update the state of a subscription table.
321 UpdateSubscriptionRelState(Oid subid
, Oid relid
, char state
,
326 bool nulls
[Natts_pg_subscription_rel
];
327 Datum values
[Natts_pg_subscription_rel
];
328 bool replaces
[Natts_pg_subscription_rel
];
330 LockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
332 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
334 /* Try finding existing mapping. */
335 tup
= SearchSysCacheCopy2(SUBSCRIPTIONRELMAP
,
336 ObjectIdGetDatum(relid
),
337 ObjectIdGetDatum(subid
));
338 if (!HeapTupleIsValid(tup
))
339 elog(ERROR
, "subscription table %u in subscription %u does not exist",
342 /* Update the tuple. */
343 memset(values
, 0, sizeof(values
));
344 memset(nulls
, false, sizeof(nulls
));
345 memset(replaces
, false, sizeof(replaces
));
347 replaces
[Anum_pg_subscription_rel_srsubstate
- 1] = true;
348 values
[Anum_pg_subscription_rel_srsubstate
- 1] = CharGetDatum(state
);
350 replaces
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
351 if (sublsn
!= InvalidXLogRecPtr
)
352 values
[Anum_pg_subscription_rel_srsublsn
- 1] = LSNGetDatum(sublsn
);
354 nulls
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
356 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
), values
, nulls
,
359 /* Update the catalog. */
360 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
363 table_close(rel
, NoLock
);
367 * Get state of subscription table.
369 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
372 GetSubscriptionRelState(Oid subid
, Oid relid
, XLogRecPtr
*sublsn
)
381 * This is to avoid the race condition with AlterSubscription which tries
382 * to remove this relstate.
384 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
386 /* Try finding the mapping. */
387 tup
= SearchSysCache2(SUBSCRIPTIONRELMAP
,
388 ObjectIdGetDatum(relid
),
389 ObjectIdGetDatum(subid
));
391 if (!HeapTupleIsValid(tup
))
393 table_close(rel
, AccessShareLock
);
394 *sublsn
= InvalidXLogRecPtr
;
395 return SUBREL_STATE_UNKNOWN
;
399 substate
= ((Form_pg_subscription_rel
) GETSTRUCT(tup
))->srsubstate
;
402 d
= SysCacheGetAttr(SUBSCRIPTIONRELMAP
, tup
,
403 Anum_pg_subscription_rel_srsublsn
, &isnull
);
405 *sublsn
= InvalidXLogRecPtr
;
407 *sublsn
= DatumGetLSN(d
);
410 ReleaseSysCache(tup
);
412 table_close(rel
, AccessShareLock
);
418 * Drop subscription relation mapping. These can be for a particular
419 * subscription, or for a particular relation, or both.
422 RemoveSubscriptionRel(Oid subid
, Oid relid
)
430 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
432 if (OidIsValid(subid
))
434 ScanKeyInit(&skey
[nkeys
++],
435 Anum_pg_subscription_rel_srsubid
,
436 BTEqualStrategyNumber
,
438 ObjectIdGetDatum(subid
));
441 if (OidIsValid(relid
))
443 ScanKeyInit(&skey
[nkeys
++],
444 Anum_pg_subscription_rel_srrelid
,
445 BTEqualStrategyNumber
,
447 ObjectIdGetDatum(relid
));
450 /* Do the search and delete what we found. */
451 scan
= table_beginscan_catalog(rel
, nkeys
, skey
);
452 while (HeapTupleIsValid(tup
= heap_getnext(scan
, ForwardScanDirection
)))
454 Form_pg_subscription_rel subrel
;
456 subrel
= (Form_pg_subscription_rel
) GETSTRUCT(tup
);
459 * We don't allow to drop the relation mapping when the table
460 * synchronization is in progress unless the caller updates the
461 * corresponding subscription as well. This is to ensure that we don't
462 * leave tablesync slots or origins in the system when the
463 * corresponding table is dropped.
465 if (!OidIsValid(subid
) && subrel
->srsubstate
!= SUBREL_STATE_READY
)
468 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
469 errmsg("could not drop relation mapping for subscription \"%s\"",
470 get_subscription_name(subrel
->srsubid
, false)),
471 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
472 get_rel_name(relid
), subrel
->srsubstate
),
475 * translator: first %s is a SQL ALTER command and second %s is a
478 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
479 "ALTER SUBSCRIPTION ... ENABLE",
480 "DROP SUBSCRIPTION ...")));
483 CatalogTupleDelete(rel
, &tup
->t_self
);
487 table_close(rel
, RowExclusiveLock
);
491 * Does the subscription have any relations?
493 * Use this function only to know true/false, and when you have no need for the
494 * List returned by GetSubscriptionRelations.
497 HasSubscriptionRelations(Oid subid
)
504 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
506 ScanKeyInit(&skey
[0],
507 Anum_pg_subscription_rel_srsubid
,
508 BTEqualStrategyNumber
, F_OIDEQ
,
509 ObjectIdGetDatum(subid
));
511 scan
= systable_beginscan(rel
, InvalidOid
, false,
514 /* If even a single tuple exists then the subscription has tables. */
515 has_subrels
= HeapTupleIsValid(systable_getnext(scan
));
518 systable_endscan(scan
);
519 table_close(rel
, AccessShareLock
);
525 * Get the relations for the subscription.
527 * If not_ready is true, return only the relations that are not in a ready
528 * state, otherwise return all the relations of the subscription. The
529 * returned list is palloc'ed in the current memory context.
532 GetSubscriptionRelations(Oid subid
, bool not_ready
)
541 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
543 ScanKeyInit(&skey
[nkeys
++],
544 Anum_pg_subscription_rel_srsubid
,
545 BTEqualStrategyNumber
, F_OIDEQ
,
546 ObjectIdGetDatum(subid
));
549 ScanKeyInit(&skey
[nkeys
++],
550 Anum_pg_subscription_rel_srsubstate
,
551 BTEqualStrategyNumber
, F_CHARNE
,
552 CharGetDatum(SUBREL_STATE_READY
));
554 scan
= systable_beginscan(rel
, InvalidOid
, false,
557 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
559 Form_pg_subscription_rel subrel
;
560 SubscriptionRelState
*relstate
;
564 subrel
= (Form_pg_subscription_rel
) GETSTRUCT(tup
);
566 relstate
= (SubscriptionRelState
*) palloc(sizeof(SubscriptionRelState
));
567 relstate
->relid
= subrel
->srrelid
;
568 relstate
->state
= subrel
->srsubstate
;
569 d
= SysCacheGetAttr(SUBSCRIPTIONRELMAP
, tup
,
570 Anum_pg_subscription_rel_srsublsn
, &isnull
);
572 relstate
->lsn
= InvalidXLogRecPtr
;
574 relstate
->lsn
= DatumGetLSN(d
);
576 res
= lappend(res
, relstate
);
580 systable_endscan(scan
);
581 table_close(rel
, AccessShareLock
);