/*-------------------------------------------------------------------------
 *
 * publication C API manipulation
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *-------------------------------------------------------------------------
 */
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/partition.h"
28 #include "catalog/objectaccess.h"
29 #include "catalog/objectaddress.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_publication.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/publicationcmds.h"
36 #include "miscadmin.h"
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
47 * Check if relation can be in given publication and throws appropriate
51 check_publication_add_relation(Relation targetrel
)
53 /* Must be a regular or partitioned table */
54 if (RelationGetForm(targetrel
)->relkind
!= RELKIND_RELATION
&&
55 RelationGetForm(targetrel
)->relkind
!= RELKIND_PARTITIONED_TABLE
)
57 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
58 errmsg("cannot add relation \"%s\" to publication",
59 RelationGetRelationName(targetrel
)),
60 errdetail_relkind_not_supported(RelationGetForm(targetrel
)->relkind
)));
62 /* Can't be system table */
63 if (IsCatalogRelation(targetrel
))
65 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
66 errmsg("cannot add relation \"%s\" to publication",
67 RelationGetRelationName(targetrel
)),
68 errdetail("This operation is not supported for system tables.")));
70 /* UNLOGGED and TEMP relations cannot be part of publication. */
71 if (!RelationIsPermanent(targetrel
))
73 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
74 errmsg("cannot add relation \"%s\" to publication",
75 RelationGetRelationName(targetrel
)),
76 errdetail("Temporary and unlogged relations cannot be replicated.")));
80 * Returns if relation represented by oid and Form_pg_class entry
83 * Does same checks as the above, but does not need relation to be opened
84 * and also does not throw errors.
86 * XXX This also excludes all tables with relid < FirstNormalObjectId,
87 * ie all tables created during initdb. This mainly affects the preinstalled
88 * information_schema. IsCatalogRelationOid() only excludes tables with
89 * relid < FirstUnpinnedObjectId, making that test rather redundant,
90 * but really we should get rid of the FirstNormalObjectId test not
91 * IsCatalogRelationOid. We can't do so today because we don't want
92 * information_schema tables to be considered publishable; but this test
93 * is really inadequate for that, since the information_schema could be
94 * dropped and reloaded and then it'll be considered publishable. The best
95 * long-term solution may be to add a "relispublishable" bool to pg_class,
96 * and depend on that instead of OID checks.
99 is_publishable_class(Oid relid
, Form_pg_class reltuple
)
101 return (reltuple
->relkind
== RELKIND_RELATION
||
102 reltuple
->relkind
== RELKIND_PARTITIONED_TABLE
) &&
103 !IsCatalogRelationOid(relid
) &&
104 reltuple
->relpersistence
== RELPERSISTENCE_PERMANENT
&&
105 relid
>= FirstNormalObjectId
;
109 * Another variant of this, taking a Relation.
112 is_publishable_relation(Relation rel
)
114 return is_publishable_class(RelationGetRelid(rel
), rel
->rd_rel
);
119 * SQL-callable variant of the above
121 * This returns null when the relation does not exist. This is intended to be
122 * used for example in psql to avoid gratuitous errors when there are
123 * concurrent catalog changes.
126 pg_relation_is_publishable(PG_FUNCTION_ARGS
)
128 Oid relid
= PG_GETARG_OID(0);
132 tuple
= SearchSysCache1(RELOID
, ObjectIdGetDatum(relid
));
133 if (!HeapTupleIsValid(tuple
))
135 result
= is_publishable_class(relid
, (Form_pg_class
) GETSTRUCT(tuple
));
136 ReleaseSysCache(tuple
);
137 PG_RETURN_BOOL(result
);
141 * Gets the relations based on the publication partition option for a specified
145 GetPubPartitionOptionRelations(List
*result
, PublicationPartOpt pub_partopt
,
148 if (get_rel_relkind(relid
) == RELKIND_PARTITIONED_TABLE
&&
149 pub_partopt
!= PUBLICATION_PART_ROOT
)
151 List
*all_parts
= find_all_inheritors(relid
, NoLock
,
154 if (pub_partopt
== PUBLICATION_PART_ALL
)
155 result
= list_concat(result
, all_parts
);
156 else if (pub_partopt
== PUBLICATION_PART_LEAF
)
160 foreach(lc
, all_parts
)
162 Oid partOid
= lfirst_oid(lc
);
164 if (get_rel_relkind(partOid
) != RELKIND_PARTITIONED_TABLE
)
165 result
= lappend_oid(result
, partOid
);
172 result
= lappend_oid(result
, relid
);
178 * Insert new publication / relation mapping.
181 publication_add_relation(Oid pubid
, PublicationRelInfo
*targetrel
,
186 Datum values
[Natts_pg_publication_rel
];
187 bool nulls
[Natts_pg_publication_rel
];
188 Oid relid
= RelationGetRelid(targetrel
->relation
);
190 Publication
*pub
= GetPublication(pubid
);
191 ObjectAddress myself
,
195 rel
= table_open(PublicationRelRelationId
, RowExclusiveLock
);
198 * Check for duplicates. Note that this does not really prevent
199 * duplicates, it's here just to provide nicer error message in common
200 * case. The real protection is the unique key on the catalog.
202 if (SearchSysCacheExists2(PUBLICATIONRELMAP
, ObjectIdGetDatum(relid
),
203 ObjectIdGetDatum(pubid
)))
205 table_close(rel
, RowExclusiveLock
);
208 return InvalidObjectAddress
;
211 (errcode(ERRCODE_DUPLICATE_OBJECT
),
212 errmsg("relation \"%s\" is already member of publication \"%s\"",
213 RelationGetRelationName(targetrel
->relation
), pub
->name
)));
216 check_publication_add_relation(targetrel
->relation
);
219 memset(values
, 0, sizeof(values
));
220 memset(nulls
, false, sizeof(nulls
));
222 prrelid
= GetNewOidWithIndex(rel
, PublicationRelObjectIndexId
,
223 Anum_pg_publication_rel_oid
);
224 values
[Anum_pg_publication_rel_oid
- 1] = ObjectIdGetDatum(prrelid
);
225 values
[Anum_pg_publication_rel_prpubid
- 1] =
226 ObjectIdGetDatum(pubid
);
227 values
[Anum_pg_publication_rel_prrelid
- 1] =
228 ObjectIdGetDatum(relid
);
230 tup
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
232 /* Insert tuple into catalog. */
233 CatalogTupleInsert(rel
, tup
);
236 ObjectAddressSet(myself
, PublicationRelRelationId
, prrelid
);
238 /* Add dependency on the publication */
239 ObjectAddressSet(referenced
, PublicationRelationId
, pubid
);
240 recordDependencyOn(&myself
, &referenced
, DEPENDENCY_AUTO
);
242 /* Add dependency on the relation */
243 ObjectAddressSet(referenced
, RelationRelationId
, relid
);
244 recordDependencyOn(&myself
, &referenced
, DEPENDENCY_AUTO
);
246 /* Close the table. */
247 table_close(rel
, RowExclusiveLock
);
250 * Invalidate relcache so that publication info is rebuilt.
252 * For the partitioned tables, we must invalidate all partitions contained
253 * in the respective partition hierarchies, not just the one explicitly
254 * mentioned in the publication. This is required because we implicitly
255 * publish the child tables when the parent table is published.
257 relids
= GetPubPartitionOptionRelations(relids
, PUBLICATION_PART_ALL
,
260 InvalidatePublicationRels(relids
);
265 /* Gets list of publication oids for a relation */
267 GetRelationPublications(Oid relid
)
270 CatCList
*pubrellist
;
273 /* Find all publications associated with the relation. */
274 pubrellist
= SearchSysCacheList1(PUBLICATIONRELMAP
,
275 ObjectIdGetDatum(relid
));
276 for (i
= 0; i
< pubrellist
->n_members
; i
++)
278 HeapTuple tup
= &pubrellist
->members
[i
]->tuple
;
279 Oid pubid
= ((Form_pg_publication_rel
) GETSTRUCT(tup
))->prpubid
;
281 result
= lappend_oid(result
, pubid
);
284 ReleaseSysCacheList(pubrellist
);
290 * Gets list of relation oids for a publication.
292 * This should only be used FOR TABLE publications, the FOR ALL TABLES
293 * should use GetAllTablesPublicationRelations().
296 GetPublicationRelations(Oid pubid
, PublicationPartOpt pub_partopt
)
304 /* Find all publications associated with the relation. */
305 pubrelsrel
= table_open(PublicationRelRelationId
, AccessShareLock
);
307 ScanKeyInit(&scankey
,
308 Anum_pg_publication_rel_prpubid
,
309 BTEqualStrategyNumber
, F_OIDEQ
,
310 ObjectIdGetDatum(pubid
));
312 scan
= systable_beginscan(pubrelsrel
, PublicationRelPrrelidPrpubidIndexId
,
313 true, NULL
, 1, &scankey
);
316 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
318 Form_pg_publication_rel pubrel
;
320 pubrel
= (Form_pg_publication_rel
) GETSTRUCT(tup
);
321 result
= GetPubPartitionOptionRelations(result
, pub_partopt
,
325 systable_endscan(scan
);
326 table_close(pubrelsrel
, AccessShareLock
);
332 * Gets list of publication oids for publications marked as FOR ALL TABLES.
335 GetAllTablesPublications(void)
343 /* Find all publications that are marked as for all tables. */
344 rel
= table_open(PublicationRelationId
, AccessShareLock
);
346 ScanKeyInit(&scankey
,
347 Anum_pg_publication_puballtables
,
348 BTEqualStrategyNumber
, F_BOOLEQ
,
351 scan
= systable_beginscan(rel
, InvalidOid
, false,
355 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
357 Oid oid
= ((Form_pg_publication
) GETSTRUCT(tup
))->oid
;
359 result
= lappend_oid(result
, oid
);
362 systable_endscan(scan
);
363 table_close(rel
, AccessShareLock
);
369 * Gets list of all relation published by FOR ALL TABLES publication(s).
371 * If the publication publishes partition changes via their respective root
372 * partitioned tables, we must exclude partitions in favor of including the
373 * root partitioned tables.
376 GetAllTablesPublicationRelations(bool pubviaroot
)
384 classRel
= table_open(RelationRelationId
, AccessShareLock
);
387 Anum_pg_class_relkind
,
388 BTEqualStrategyNumber
, F_CHAREQ
,
389 CharGetDatum(RELKIND_RELATION
));
391 scan
= table_beginscan_catalog(classRel
, 1, key
);
393 while ((tuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
395 Form_pg_class relForm
= (Form_pg_class
) GETSTRUCT(tuple
);
396 Oid relid
= relForm
->oid
;
398 if (is_publishable_class(relid
, relForm
) &&
399 !(relForm
->relispartition
&& pubviaroot
))
400 result
= lappend_oid(result
, relid
);
408 Anum_pg_class_relkind
,
409 BTEqualStrategyNumber
, F_CHAREQ
,
410 CharGetDatum(RELKIND_PARTITIONED_TABLE
));
412 scan
= table_beginscan_catalog(classRel
, 1, key
);
414 while ((tuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
416 Form_pg_class relForm
= (Form_pg_class
) GETSTRUCT(tuple
);
417 Oid relid
= relForm
->oid
;
419 if (is_publishable_class(relid
, relForm
) &&
420 !relForm
->relispartition
)
421 result
= lappend_oid(result
, relid
);
427 table_close(classRel
, AccessShareLock
);
432 * Get publication using oid
434 * The Publication struct and its data are palloc'ed here.
437 GetPublication(Oid pubid
)
441 Form_pg_publication pubform
;
443 tup
= SearchSysCache1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
444 if (!HeapTupleIsValid(tup
))
445 elog(ERROR
, "cache lookup failed for publication %u", pubid
);
447 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
449 pub
= (Publication
*) palloc(sizeof(Publication
));
451 pub
->name
= pstrdup(NameStr(pubform
->pubname
));
452 pub
->alltables
= pubform
->puballtables
;
453 pub
->pubactions
.pubinsert
= pubform
->pubinsert
;
454 pub
->pubactions
.pubupdate
= pubform
->pubupdate
;
455 pub
->pubactions
.pubdelete
= pubform
->pubdelete
;
456 pub
->pubactions
.pubtruncate
= pubform
->pubtruncate
;
457 pub
->pubviaroot
= pubform
->pubviaroot
;
459 ReleaseSysCache(tup
);
466 * Get Publication using name.
469 GetPublicationByName(const char *pubname
, bool missing_ok
)
473 oid
= get_publication_oid(pubname
, missing_ok
);
475 return OidIsValid(oid
) ? GetPublication(oid
) : NULL
;
479 * get_publication_oid - given a publication name, look up the OID
481 * If missing_ok is false, throw an error if name not found. If true, just
485 get_publication_oid(const char *pubname
, bool missing_ok
)
489 oid
= GetSysCacheOid1(PUBLICATIONNAME
, Anum_pg_publication_oid
,
490 CStringGetDatum(pubname
));
491 if (!OidIsValid(oid
) && !missing_ok
)
493 (errcode(ERRCODE_UNDEFINED_OBJECT
),
494 errmsg("publication \"%s\" does not exist", pubname
)));
499 * get_publication_name - given a publication Oid, look up the name
501 * If missing_ok is false, throw an error if name not found. If true, just
505 get_publication_name(Oid pubid
, bool missing_ok
)
509 Form_pg_publication pubform
;
511 tup
= SearchSysCache1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
513 if (!HeapTupleIsValid(tup
))
516 elog(ERROR
, "cache lookup failed for publication %u", pubid
);
520 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
521 pubname
= pstrdup(NameStr(pubform
->pubname
));
523 ReleaseSysCache(tup
);
529 * Returns Oids of tables in a publication.
532 pg_get_publication_tables(PG_FUNCTION_ARGS
)
534 FuncCallContext
*funcctx
;
535 char *pubname
= text_to_cstring(PG_GETARG_TEXT_PP(0));
536 Publication
*publication
;
539 /* stuff done only on the first call of the function */
540 if (SRF_IS_FIRSTCALL())
542 MemoryContext oldcontext
;
544 /* create a function context for cross-call persistence */
545 funcctx
= SRF_FIRSTCALL_INIT();
547 /* switch to memory context appropriate for multiple function calls */
548 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
550 publication
= GetPublicationByName(pubname
, false);
553 * Publications support partitioned tables, although all changes are
554 * replicated using leaf partition identity and schema, so we only
557 if (publication
->alltables
)
558 tables
= GetAllTablesPublicationRelations(publication
->pubviaroot
);
560 tables
= GetPublicationRelations(publication
->oid
,
561 publication
->pubviaroot
?
562 PUBLICATION_PART_ROOT
:
563 PUBLICATION_PART_LEAF
);
564 funcctx
->user_fctx
= (void *) tables
;
566 MemoryContextSwitchTo(oldcontext
);
569 /* stuff done on every call of the function */
570 funcctx
= SRF_PERCALL_SETUP();
571 tables
= (List
*) funcctx
->user_fctx
;
573 if (funcctx
->call_cntr
< list_length(tables
))
575 Oid relid
= list_nth_oid(tables
, funcctx
->call_cntr
);
577 SRF_RETURN_NEXT(funcctx
, ObjectIdGetDatum(relid
));
580 SRF_RETURN_DONE(funcctx
);