postgres_fdw: Refactor transaction rollback code to avoid code duplication.
[pgsql.git] / src / backend / catalog / pg_publication.c
blob9cd0c82f93cb7ff15472ee2e614f6cd99a2fe0c0
1 /*-------------------------------------------------------------------------
3 * pg_publication.c
4 * publication C API manipulation
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * pg_publication.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/partition.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/objectaddress.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_publication.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/publicationcmds.h"
35 #include "funcapi.h"
36 #include "miscadmin.h"
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
47 * Check if relation can be in given publication and throws appropriate
48 * error if not.
50 static void
51 check_publication_add_relation(Relation targetrel)
53 /* Must be a regular or partitioned table */
54 if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
55 RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
56 ereport(ERROR,
57 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
58 errmsg("cannot add relation \"%s\" to publication",
59 RelationGetRelationName(targetrel)),
60 errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
62 /* Can't be system table */
63 if (IsCatalogRelation(targetrel))
64 ereport(ERROR,
65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 errmsg("cannot add relation \"%s\" to publication",
67 RelationGetRelationName(targetrel)),
68 errdetail("This operation is not supported for system tables.")));
70 /* UNLOGGED and TEMP relations cannot be part of publication. */
71 if (!RelationIsPermanent(targetrel))
72 ereport(ERROR,
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 errmsg("cannot add relation \"%s\" to publication",
75 RelationGetRelationName(targetrel)),
76 errdetail("Temporary and unlogged relations cannot be replicated.")));
80 * Returns if relation represented by oid and Form_pg_class entry
81 * is publishable.
83 * Does same checks as the above, but does not need relation to be opened
84 * and also does not throw errors.
86 * XXX This also excludes all tables with relid < FirstNormalObjectId,
87 * ie all tables created during initdb. This mainly affects the preinstalled
88 * information_schema. IsCatalogRelationOid() only excludes tables with
89 * relid < FirstUnpinnedObjectId, making that test rather redundant,
90 * but really we should get rid of the FirstNormalObjectId test not
91 * IsCatalogRelationOid. We can't do so today because we don't want
92 * information_schema tables to be considered publishable; but this test
93 * is really inadequate for that, since the information_schema could be
94 * dropped and reloaded and then it'll be considered publishable. The best
95 * long-term solution may be to add a "relispublishable" bool to pg_class,
96 * and depend on that instead of OID checks.
98 static bool
99 is_publishable_class(Oid relid, Form_pg_class reltuple)
101 return (reltuple->relkind == RELKIND_RELATION ||
102 reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
103 !IsCatalogRelationOid(relid) &&
104 reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105 relid >= FirstNormalObjectId;
109 * Another variant of this, taking a Relation.
111 bool
112 is_publishable_relation(Relation rel)
114 return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
119 * SQL-callable variant of the above
121 * This returns null when the relation does not exist. This is intended to be
122 * used for example in psql to avoid gratuitous errors when there are
123 * concurrent catalog changes.
125 Datum
126 pg_relation_is_publishable(PG_FUNCTION_ARGS)
128 Oid relid = PG_GETARG_OID(0);
129 HeapTuple tuple;
130 bool result;
132 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133 if (!HeapTupleIsValid(tuple))
134 PG_RETURN_NULL();
135 result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136 ReleaseSysCache(tuple);
137 PG_RETURN_BOOL(result);
141 * Gets the relations based on the publication partition option for a specified
142 * relation.
144 List *
145 GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
146 Oid relid)
148 if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
149 pub_partopt != PUBLICATION_PART_ROOT)
151 List *all_parts = find_all_inheritors(relid, NoLock,
152 NULL);
154 if (pub_partopt == PUBLICATION_PART_ALL)
155 result = list_concat(result, all_parts);
156 else if (pub_partopt == PUBLICATION_PART_LEAF)
158 ListCell *lc;
160 foreach(lc, all_parts)
162 Oid partOid = lfirst_oid(lc);
164 if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
165 result = lappend_oid(result, partOid);
168 else
169 Assert(false);
171 else
172 result = lappend_oid(result, relid);
174 return result;
178 * Insert new publication / relation mapping.
180 ObjectAddress
181 publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
182 bool if_not_exists)
184 Relation rel;
185 HeapTuple tup;
186 Datum values[Natts_pg_publication_rel];
187 bool nulls[Natts_pg_publication_rel];
188 Oid relid = RelationGetRelid(targetrel->relation);
189 Oid prrelid;
190 Publication *pub = GetPublication(pubid);
191 ObjectAddress myself,
192 referenced;
193 List *relids = NIL;
195 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
198 * Check for duplicates. Note that this does not really prevent
199 * duplicates, it's here just to provide nicer error message in common
200 * case. The real protection is the unique key on the catalog.
202 if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
203 ObjectIdGetDatum(pubid)))
205 table_close(rel, RowExclusiveLock);
207 if (if_not_exists)
208 return InvalidObjectAddress;
210 ereport(ERROR,
211 (errcode(ERRCODE_DUPLICATE_OBJECT),
212 errmsg("relation \"%s\" is already member of publication \"%s\"",
213 RelationGetRelationName(targetrel->relation), pub->name)));
216 check_publication_add_relation(targetrel->relation);
218 /* Form a tuple. */
219 memset(values, 0, sizeof(values));
220 memset(nulls, false, sizeof(nulls));
222 prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
223 Anum_pg_publication_rel_oid);
224 values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
225 values[Anum_pg_publication_rel_prpubid - 1] =
226 ObjectIdGetDatum(pubid);
227 values[Anum_pg_publication_rel_prrelid - 1] =
228 ObjectIdGetDatum(relid);
230 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
232 /* Insert tuple into catalog. */
233 CatalogTupleInsert(rel, tup);
234 heap_freetuple(tup);
236 ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
238 /* Add dependency on the publication */
239 ObjectAddressSet(referenced, PublicationRelationId, pubid);
240 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
242 /* Add dependency on the relation */
243 ObjectAddressSet(referenced, RelationRelationId, relid);
244 recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
246 /* Close the table. */
247 table_close(rel, RowExclusiveLock);
250 * Invalidate relcache so that publication info is rebuilt.
252 * For the partitioned tables, we must invalidate all partitions contained
253 * in the respective partition hierarchies, not just the one explicitly
254 * mentioned in the publication. This is required because we implicitly
255 * publish the child tables when the parent table is published.
257 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
258 relid);
260 InvalidatePublicationRels(relids);
262 return myself;
265 /* Gets list of publication oids for a relation */
266 List *
267 GetRelationPublications(Oid relid)
269 List *result = NIL;
270 CatCList *pubrellist;
271 int i;
273 /* Find all publications associated with the relation. */
274 pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
275 ObjectIdGetDatum(relid));
276 for (i = 0; i < pubrellist->n_members; i++)
278 HeapTuple tup = &pubrellist->members[i]->tuple;
279 Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
281 result = lappend_oid(result, pubid);
284 ReleaseSysCacheList(pubrellist);
286 return result;
290 * Gets list of relation oids for a publication.
292 * This should only be used FOR TABLE publications, the FOR ALL TABLES
293 * should use GetAllTablesPublicationRelations().
295 List *
296 GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
298 List *result;
299 Relation pubrelsrel;
300 ScanKeyData scankey;
301 SysScanDesc scan;
302 HeapTuple tup;
304 /* Find all publications associated with the relation. */
305 pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
307 ScanKeyInit(&scankey,
308 Anum_pg_publication_rel_prpubid,
309 BTEqualStrategyNumber, F_OIDEQ,
310 ObjectIdGetDatum(pubid));
312 scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
313 true, NULL, 1, &scankey);
315 result = NIL;
316 while (HeapTupleIsValid(tup = systable_getnext(scan)))
318 Form_pg_publication_rel pubrel;
320 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
321 result = GetPubPartitionOptionRelations(result, pub_partopt,
322 pubrel->prrelid);
325 systable_endscan(scan);
326 table_close(pubrelsrel, AccessShareLock);
328 return result;
332 * Gets list of publication oids for publications marked as FOR ALL TABLES.
334 List *
335 GetAllTablesPublications(void)
337 List *result;
338 Relation rel;
339 ScanKeyData scankey;
340 SysScanDesc scan;
341 HeapTuple tup;
343 /* Find all publications that are marked as for all tables. */
344 rel = table_open(PublicationRelationId, AccessShareLock);
346 ScanKeyInit(&scankey,
347 Anum_pg_publication_puballtables,
348 BTEqualStrategyNumber, F_BOOLEQ,
349 BoolGetDatum(true));
351 scan = systable_beginscan(rel, InvalidOid, false,
352 NULL, 1, &scankey);
354 result = NIL;
355 while (HeapTupleIsValid(tup = systable_getnext(scan)))
357 Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
359 result = lappend_oid(result, oid);
362 systable_endscan(scan);
363 table_close(rel, AccessShareLock);
365 return result;
369 * Gets list of all relation published by FOR ALL TABLES publication(s).
371 * If the publication publishes partition changes via their respective root
372 * partitioned tables, we must exclude partitions in favor of including the
373 * root partitioned tables.
375 List *
376 GetAllTablesPublicationRelations(bool pubviaroot)
378 Relation classRel;
379 ScanKeyData key[1];
380 TableScanDesc scan;
381 HeapTuple tuple;
382 List *result = NIL;
384 classRel = table_open(RelationRelationId, AccessShareLock);
386 ScanKeyInit(&key[0],
387 Anum_pg_class_relkind,
388 BTEqualStrategyNumber, F_CHAREQ,
389 CharGetDatum(RELKIND_RELATION));
391 scan = table_beginscan_catalog(classRel, 1, key);
393 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
395 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
396 Oid relid = relForm->oid;
398 if (is_publishable_class(relid, relForm) &&
399 !(relForm->relispartition && pubviaroot))
400 result = lappend_oid(result, relid);
403 table_endscan(scan);
405 if (pubviaroot)
407 ScanKeyInit(&key[0],
408 Anum_pg_class_relkind,
409 BTEqualStrategyNumber, F_CHAREQ,
410 CharGetDatum(RELKIND_PARTITIONED_TABLE));
412 scan = table_beginscan_catalog(classRel, 1, key);
414 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
416 Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
417 Oid relid = relForm->oid;
419 if (is_publishable_class(relid, relForm) &&
420 !relForm->relispartition)
421 result = lappend_oid(result, relid);
424 table_endscan(scan);
427 table_close(classRel, AccessShareLock);
428 return result;
432 * Get publication using oid
434 * The Publication struct and its data are palloc'ed here.
436 Publication *
437 GetPublication(Oid pubid)
439 HeapTuple tup;
440 Publication *pub;
441 Form_pg_publication pubform;
443 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
444 if (!HeapTupleIsValid(tup))
445 elog(ERROR, "cache lookup failed for publication %u", pubid);
447 pubform = (Form_pg_publication) GETSTRUCT(tup);
449 pub = (Publication *) palloc(sizeof(Publication));
450 pub->oid = pubid;
451 pub->name = pstrdup(NameStr(pubform->pubname));
452 pub->alltables = pubform->puballtables;
453 pub->pubactions.pubinsert = pubform->pubinsert;
454 pub->pubactions.pubupdate = pubform->pubupdate;
455 pub->pubactions.pubdelete = pubform->pubdelete;
456 pub->pubactions.pubtruncate = pubform->pubtruncate;
457 pub->pubviaroot = pubform->pubviaroot;
459 ReleaseSysCache(tup);
461 return pub;
466 * Get Publication using name.
468 Publication *
469 GetPublicationByName(const char *pubname, bool missing_ok)
471 Oid oid;
473 oid = get_publication_oid(pubname, missing_ok);
475 return OidIsValid(oid) ? GetPublication(oid) : NULL;
479 * get_publication_oid - given a publication name, look up the OID
481 * If missing_ok is false, throw an error if name not found. If true, just
482 * return InvalidOid.
485 get_publication_oid(const char *pubname, bool missing_ok)
487 Oid oid;
489 oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
490 CStringGetDatum(pubname));
491 if (!OidIsValid(oid) && !missing_ok)
492 ereport(ERROR,
493 (errcode(ERRCODE_UNDEFINED_OBJECT),
494 errmsg("publication \"%s\" does not exist", pubname)));
495 return oid;
499 * get_publication_name - given a publication Oid, look up the name
501 * If missing_ok is false, throw an error if name not found. If true, just
502 * return NULL.
504 char *
505 get_publication_name(Oid pubid, bool missing_ok)
507 HeapTuple tup;
508 char *pubname;
509 Form_pg_publication pubform;
511 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
513 if (!HeapTupleIsValid(tup))
515 if (!missing_ok)
516 elog(ERROR, "cache lookup failed for publication %u", pubid);
517 return NULL;
520 pubform = (Form_pg_publication) GETSTRUCT(tup);
521 pubname = pstrdup(NameStr(pubform->pubname));
523 ReleaseSysCache(tup);
525 return pubname;
529 * Returns Oids of tables in a publication.
531 Datum
532 pg_get_publication_tables(PG_FUNCTION_ARGS)
534 FuncCallContext *funcctx;
535 char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
536 Publication *publication;
537 List *tables;
539 /* stuff done only on the first call of the function */
540 if (SRF_IS_FIRSTCALL())
542 MemoryContext oldcontext;
544 /* create a function context for cross-call persistence */
545 funcctx = SRF_FIRSTCALL_INIT();
547 /* switch to memory context appropriate for multiple function calls */
548 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
550 publication = GetPublicationByName(pubname, false);
553 * Publications support partitioned tables, although all changes are
554 * replicated using leaf partition identity and schema, so we only
555 * need those.
557 if (publication->alltables)
558 tables = GetAllTablesPublicationRelations(publication->pubviaroot);
559 else
560 tables = GetPublicationRelations(publication->oid,
561 publication->pubviaroot ?
562 PUBLICATION_PART_ROOT :
563 PUBLICATION_PART_LEAF);
564 funcctx->user_fctx = (void *) tables;
566 MemoryContextSwitchTo(oldcontext);
569 /* stuff done on every call of the function */
570 funcctx = SRF_PERCALL_SETUP();
571 tables = (List *) funcctx->user_fctx;
573 if (funcctx->call_cntr < list_length(tables))
575 Oid relid = list_nth_oid(tables, funcctx->call_cntr);
577 SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
580 SRF_RETURN_DONE(funcctx);