Fix a memory leak in dumping functions with TRANSFORMs
[pgsql.git] / src / backend / catalog / pg_subscription.c
blob89bf5ec9337ec9e5d3aef65a9dac3dfa3ab89ff8
1 /*-------------------------------------------------------------------------
3 * pg_subscription.c
4 * replication subscriptions
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "catalog/indexing.h"
22 #include "catalog/pg_subscription.h"
23 #include "catalog/pg_subscription_rel.h"
24 #include "catalog/pg_type.h"
25 #include "miscadmin.h"
26 #include "storage/lmgr.h"
27 #include "utils/array.h"
28 #include "utils/builtins.h"
29 #include "utils/fmgroids.h"
30 #include "utils/lsyscache.h"
31 #include "utils/pg_lsn.h"
32 #include "utils/rel.h"
33 #include "utils/syscache.h"
35 static List *textarray_to_stringlist(ArrayType *textarray);
38 * Add a comma-separated list of publication names to the 'dest' string.
40 void
41 GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
43 ListCell *lc;
44 bool first = true;
46 Assert(publications != NIL);
48 foreach(lc, publications)
50 char *pubname = strVal(lfirst(lc));
52 if (first)
53 first = false;
54 else
55 appendStringInfoString(dest, ", ");
57 if (quote_literal)
58 appendStringInfoString(dest, quote_literal_cstr(pubname));
59 else
61 appendStringInfoChar(dest, '"');
62 appendStringInfoString(dest, pubname);
63 appendStringInfoChar(dest, '"');
69 * Fetch the subscription from the syscache.
71 Subscription *
72 GetSubscription(Oid subid, bool missing_ok)
74 HeapTuple tup;
75 Subscription *sub;
76 Form_pg_subscription subform;
77 Datum datum;
78 bool isnull;
80 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
82 if (!HeapTupleIsValid(tup))
84 if (missing_ok)
85 return NULL;
87 elog(ERROR, "cache lookup failed for subscription %u", subid);
90 subform = (Form_pg_subscription) GETSTRUCT(tup);
92 sub = (Subscription *) palloc(sizeof(Subscription));
93 sub->oid = subid;
94 sub->dbid = subform->subdbid;
95 sub->skiplsn = subform->subskiplsn;
96 sub->name = pstrdup(NameStr(subform->subname));
97 sub->owner = subform->subowner;
98 sub->enabled = subform->subenabled;
99 sub->binary = subform->subbinary;
100 sub->stream = subform->substream;
101 sub->twophasestate = subform->subtwophasestate;
102 sub->disableonerr = subform->subdisableonerr;
103 sub->passwordrequired = subform->subpasswordrequired;
104 sub->runasowner = subform->subrunasowner;
105 sub->failover = subform->subfailover;
107 /* Get conninfo */
108 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
109 tup,
110 Anum_pg_subscription_subconninfo);
111 sub->conninfo = TextDatumGetCString(datum);
113 /* Get slotname */
114 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
115 tup,
116 Anum_pg_subscription_subslotname,
117 &isnull);
118 if (!isnull)
119 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
120 else
121 sub->slotname = NULL;
123 /* Get synccommit */
124 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
125 tup,
126 Anum_pg_subscription_subsynccommit);
127 sub->synccommit = TextDatumGetCString(datum);
129 /* Get publications */
130 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
131 tup,
132 Anum_pg_subscription_subpublications);
133 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
135 /* Get origin */
136 datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
137 tup,
138 Anum_pg_subscription_suborigin);
139 sub->origin = TextDatumGetCString(datum);
141 /* Is the subscription owner a superuser? */
142 sub->ownersuperuser = superuser_arg(sub->owner);
144 ReleaseSysCache(tup);
146 return sub;
150 * Return number of subscriptions defined in given database.
151 * Used by dropdb() to check if database can indeed be dropped.
154 CountDBSubscriptions(Oid dbid)
156 int nsubs = 0;
157 Relation rel;
158 ScanKeyData scankey;
159 SysScanDesc scan;
160 HeapTuple tup;
162 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
164 ScanKeyInit(&scankey,
165 Anum_pg_subscription_subdbid,
166 BTEqualStrategyNumber, F_OIDEQ,
167 ObjectIdGetDatum(dbid));
169 scan = systable_beginscan(rel, InvalidOid, false,
170 NULL, 1, &scankey);
172 while (HeapTupleIsValid(tup = systable_getnext(scan)))
173 nsubs++;
175 systable_endscan(scan);
177 table_close(rel, NoLock);
179 return nsubs;
183 * Free memory allocated by subscription struct.
185 void
186 FreeSubscription(Subscription *sub)
188 pfree(sub->name);
189 pfree(sub->conninfo);
190 if (sub->slotname)
191 pfree(sub->slotname);
192 list_free_deep(sub->publications);
193 pfree(sub);
197 * Disable the given subscription.
199 void
200 DisableSubscription(Oid subid)
202 Relation rel;
203 bool nulls[Natts_pg_subscription];
204 bool replaces[Natts_pg_subscription];
205 Datum values[Natts_pg_subscription];
206 HeapTuple tup;
208 /* Look up the subscription in the catalog */
209 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
210 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
212 if (!HeapTupleIsValid(tup))
213 elog(ERROR, "cache lookup failed for subscription %u", subid);
215 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
217 /* Form a new tuple. */
218 memset(values, 0, sizeof(values));
219 memset(nulls, false, sizeof(nulls));
220 memset(replaces, false, sizeof(replaces));
222 /* Set the subscription to disabled. */
223 values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
224 replaces[Anum_pg_subscription_subenabled - 1] = true;
226 /* Update the catalog */
227 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
228 replaces);
229 CatalogTupleUpdate(rel, &tup->t_self, tup);
230 heap_freetuple(tup);
232 table_close(rel, NoLock);
236 * Convert text array to list of strings.
238 * Note: the resulting list of strings is pallocated here.
240 static List *
241 textarray_to_stringlist(ArrayType *textarray)
243 Datum *elems;
244 int nelems,
246 List *res = NIL;
248 deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
250 if (nelems == 0)
251 return NIL;
253 for (i = 0; i < nelems; i++)
254 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
256 return res;
260 * Add new state record for a subscription table.
262 * If retain_lock is true, then don't release the locks taken in this function.
263 * We normally release the locks at the end of transaction but in binary-upgrade
264 * mode, we expect to release those immediately.
266 void
267 AddSubscriptionRelState(Oid subid, Oid relid, char state,
268 XLogRecPtr sublsn, bool retain_lock)
270 Relation rel;
271 HeapTuple tup;
272 bool nulls[Natts_pg_subscription_rel];
273 Datum values[Natts_pg_subscription_rel];
275 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
277 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
279 /* Try finding existing mapping. */
280 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
281 ObjectIdGetDatum(relid),
282 ObjectIdGetDatum(subid));
283 if (HeapTupleIsValid(tup))
284 elog(ERROR, "subscription table %u in subscription %u already exists",
285 relid, subid);
287 /* Form the tuple. */
288 memset(values, 0, sizeof(values));
289 memset(nulls, false, sizeof(nulls));
290 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
291 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
292 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
293 if (sublsn != InvalidXLogRecPtr)
294 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
295 else
296 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
298 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
300 /* Insert tuple into catalog. */
301 CatalogTupleInsert(rel, tup);
303 heap_freetuple(tup);
305 /* Cleanup. */
306 if (retain_lock)
308 table_close(rel, NoLock);
310 else
312 table_close(rel, RowExclusiveLock);
313 UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
318 * Update the state of a subscription table.
320 void
321 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
322 XLogRecPtr sublsn)
324 Relation rel;
325 HeapTuple tup;
326 bool nulls[Natts_pg_subscription_rel];
327 Datum values[Natts_pg_subscription_rel];
328 bool replaces[Natts_pg_subscription_rel];
330 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
332 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
334 /* Try finding existing mapping. */
335 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
336 ObjectIdGetDatum(relid),
337 ObjectIdGetDatum(subid));
338 if (!HeapTupleIsValid(tup))
339 elog(ERROR, "subscription table %u in subscription %u does not exist",
340 relid, subid);
342 /* Update the tuple. */
343 memset(values, 0, sizeof(values));
344 memset(nulls, false, sizeof(nulls));
345 memset(replaces, false, sizeof(replaces));
347 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
348 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
350 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
351 if (sublsn != InvalidXLogRecPtr)
352 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
353 else
354 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
356 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
357 replaces);
359 /* Update the catalog. */
360 CatalogTupleUpdate(rel, &tup->t_self, tup);
362 /* Cleanup. */
363 table_close(rel, NoLock);
367 * Get state of subscription table.
369 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
371 char
372 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
374 HeapTuple tup;
375 char substate;
376 bool isnull;
377 Datum d;
378 Relation rel;
381 * This is to avoid the race condition with AlterSubscription which tries
382 * to remove this relstate.
384 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
386 /* Try finding the mapping. */
387 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
388 ObjectIdGetDatum(relid),
389 ObjectIdGetDatum(subid));
391 if (!HeapTupleIsValid(tup))
393 table_close(rel, AccessShareLock);
394 *sublsn = InvalidXLogRecPtr;
395 return SUBREL_STATE_UNKNOWN;
398 /* Get the state. */
399 substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
401 /* Get the LSN */
402 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
403 Anum_pg_subscription_rel_srsublsn, &isnull);
404 if (isnull)
405 *sublsn = InvalidXLogRecPtr;
406 else
407 *sublsn = DatumGetLSN(d);
409 /* Cleanup */
410 ReleaseSysCache(tup);
412 table_close(rel, AccessShareLock);
414 return substate;
418 * Drop subscription relation mapping. These can be for a particular
419 * subscription, or for a particular relation, or both.
421 void
422 RemoveSubscriptionRel(Oid subid, Oid relid)
424 Relation rel;
425 TableScanDesc scan;
426 ScanKeyData skey[2];
427 HeapTuple tup;
428 int nkeys = 0;
430 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
432 if (OidIsValid(subid))
434 ScanKeyInit(&skey[nkeys++],
435 Anum_pg_subscription_rel_srsubid,
436 BTEqualStrategyNumber,
437 F_OIDEQ,
438 ObjectIdGetDatum(subid));
441 if (OidIsValid(relid))
443 ScanKeyInit(&skey[nkeys++],
444 Anum_pg_subscription_rel_srrelid,
445 BTEqualStrategyNumber,
446 F_OIDEQ,
447 ObjectIdGetDatum(relid));
450 /* Do the search and delete what we found. */
451 scan = table_beginscan_catalog(rel, nkeys, skey);
452 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
454 Form_pg_subscription_rel subrel;
456 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
459 * We don't allow to drop the relation mapping when the table
460 * synchronization is in progress unless the caller updates the
461 * corresponding subscription as well. This is to ensure that we don't
462 * leave tablesync slots or origins in the system when the
463 * corresponding table is dropped.
465 if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
467 ereport(ERROR,
468 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
469 errmsg("could not drop relation mapping for subscription \"%s\"",
470 get_subscription_name(subrel->srsubid, false)),
471 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
472 get_rel_name(relid), subrel->srsubstate),
475 * translator: first %s is a SQL ALTER command and second %s is a
476 * SQL DROP command
478 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
479 "ALTER SUBSCRIPTION ... ENABLE",
480 "DROP SUBSCRIPTION ...")));
483 CatalogTupleDelete(rel, &tup->t_self);
485 table_endscan(scan);
487 table_close(rel, RowExclusiveLock);
491 * Does the subscription have any relations?
493 * Use this function only to know true/false, and when you have no need for the
494 * List returned by GetSubscriptionRelations.
496 bool
497 HasSubscriptionRelations(Oid subid)
499 Relation rel;
500 ScanKeyData skey[1];
501 SysScanDesc scan;
502 bool has_subrels;
504 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
506 ScanKeyInit(&skey[0],
507 Anum_pg_subscription_rel_srsubid,
508 BTEqualStrategyNumber, F_OIDEQ,
509 ObjectIdGetDatum(subid));
511 scan = systable_beginscan(rel, InvalidOid, false,
512 NULL, 1, skey);
514 /* If even a single tuple exists then the subscription has tables. */
515 has_subrels = HeapTupleIsValid(systable_getnext(scan));
517 /* Cleanup */
518 systable_endscan(scan);
519 table_close(rel, AccessShareLock);
521 return has_subrels;
525 * Get the relations for the subscription.
527 * If not_ready is true, return only the relations that are not in a ready
528 * state, otherwise return all the relations of the subscription. The
529 * returned list is palloc'ed in the current memory context.
531 List *
532 GetSubscriptionRelations(Oid subid, bool not_ready)
534 List *res = NIL;
535 Relation rel;
536 HeapTuple tup;
537 int nkeys = 0;
538 ScanKeyData skey[2];
539 SysScanDesc scan;
541 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
543 ScanKeyInit(&skey[nkeys++],
544 Anum_pg_subscription_rel_srsubid,
545 BTEqualStrategyNumber, F_OIDEQ,
546 ObjectIdGetDatum(subid));
548 if (not_ready)
549 ScanKeyInit(&skey[nkeys++],
550 Anum_pg_subscription_rel_srsubstate,
551 BTEqualStrategyNumber, F_CHARNE,
552 CharGetDatum(SUBREL_STATE_READY));
554 scan = systable_beginscan(rel, InvalidOid, false,
555 NULL, nkeys, skey);
557 while (HeapTupleIsValid(tup = systable_getnext(scan)))
559 Form_pg_subscription_rel subrel;
560 SubscriptionRelState *relstate;
561 Datum d;
562 bool isnull;
564 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
566 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
567 relstate->relid = subrel->srrelid;
568 relstate->state = subrel->srsubstate;
569 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
570 Anum_pg_subscription_rel_srsublsn, &isnull);
571 if (isnull)
572 relstate->lsn = InvalidXLogRecPtr;
573 else
574 relstate->lsn = DatumGetLSN(d);
576 res = lappend(res, relstate);
579 /* Cleanup */
580 systable_endscan(scan);
581 table_close(rel, AccessShareLock);
583 return res;