1 /*-------------------------------------------------------------------------
4 * publication manipulation
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/objectaccess.h"
24 #include "catalog/objectaddress.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_proc.h"
29 #include "catalog/pg_publication.h"
30 #include "catalog/pg_publication_namespace.h"
31 #include "catalog/pg_publication_rel.h"
32 #include "commands/dbcommands.h"
33 #include "commands/defrem.h"
34 #include "commands/event_trigger.h"
35 #include "commands/publicationcmds.h"
36 #include "miscadmin.h"
37 #include "nodes/nodeFuncs.h"
38 #include "parser/parse_clause.h"
39 #include "parser/parse_collate.h"
40 #include "parser/parse_relation.h"
41 #include "storage/lmgr.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/inval.h"
45 #include "utils/lsyscache.h"
46 #include "utils/rel.h"
47 #include "utils/syscache.h"
48 #include "utils/varlena.h"
52 * Information used to validate the columns in the row filter expression. See
53 * contain_invalid_rfcolumn_walker for details.
55 typedef struct rf_context
57 Bitmapset
*bms_replident
; /* bitset of replica identity columns */
58 bool pubviaroot
; /* true if we are validating the parent
59 * relation's row filter */
60 Oid relid
; /* relid of the relation */
61 Oid parentid
; /* relid of the parent relation */
64 static List
*OpenTableList(List
*tables
);
65 static void CloseTableList(List
*rels
);
66 static void LockSchemaList(List
*schemalist
);
67 static void PublicationAddTables(Oid pubid
, List
*rels
, bool if_not_exists
,
68 AlterPublicationStmt
*stmt
);
69 static void PublicationDropTables(Oid pubid
, List
*rels
, bool missing_ok
);
70 static void PublicationAddSchemas(Oid pubid
, List
*schemas
, bool if_not_exists
,
71 AlterPublicationStmt
*stmt
);
72 static void PublicationDropSchemas(Oid pubid
, List
*schemas
, bool missing_ok
);
76 parse_publication_options(ParseState
*pstate
,
79 PublicationActions
*pubactions
,
80 bool *publish_via_partition_root_given
,
81 bool *publish_via_partition_root
,
82 bool *publish_generated_columns_given
,
83 bool *publish_generated_columns
)
87 *publish_given
= false;
88 *publish_via_partition_root_given
= false;
89 *publish_generated_columns_given
= false;
92 pubactions
->pubinsert
= true;
93 pubactions
->pubupdate
= true;
94 pubactions
->pubdelete
= true;
95 pubactions
->pubtruncate
= true;
96 *publish_via_partition_root
= false;
97 *publish_generated_columns
= false;
102 DefElem
*defel
= (DefElem
*) lfirst(lc
);
104 if (strcmp(defel
->defname
, "publish") == 0)
111 errorConflictingDefElem(defel
, pstate
);
114 * If publish option was given only the explicitly listed actions
115 * should be published.
117 pubactions
->pubinsert
= false;
118 pubactions
->pubupdate
= false;
119 pubactions
->pubdelete
= false;
120 pubactions
->pubtruncate
= false;
122 *publish_given
= true;
123 publish
= defGetString(defel
);
125 if (!SplitIdentifierString(publish
, ',', &publish_list
))
127 (errcode(ERRCODE_SYNTAX_ERROR
),
128 errmsg("invalid list syntax in parameter \"%s\"",
131 /* Process the option list. */
132 foreach(lc2
, publish_list
)
134 char *publish_opt
= (char *) lfirst(lc2
);
136 if (strcmp(publish_opt
, "insert") == 0)
137 pubactions
->pubinsert
= true;
138 else if (strcmp(publish_opt
, "update") == 0)
139 pubactions
->pubupdate
= true;
140 else if (strcmp(publish_opt
, "delete") == 0)
141 pubactions
->pubdelete
= true;
142 else if (strcmp(publish_opt
, "truncate") == 0)
143 pubactions
->pubtruncate
= true;
146 (errcode(ERRCODE_SYNTAX_ERROR
),
147 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
148 "publish", publish_opt
)));
151 else if (strcmp(defel
->defname
, "publish_via_partition_root") == 0)
153 if (*publish_via_partition_root_given
)
154 errorConflictingDefElem(defel
, pstate
);
155 *publish_via_partition_root_given
= true;
156 *publish_via_partition_root
= defGetBoolean(defel
);
158 else if (strcmp(defel
->defname
, "publish_generated_columns") == 0)
160 if (*publish_generated_columns_given
)
161 errorConflictingDefElem(defel
, pstate
);
162 *publish_generated_columns_given
= true;
163 *publish_generated_columns
= defGetBoolean(defel
);
167 (errcode(ERRCODE_SYNTAX_ERROR
),
168 errmsg("unrecognized publication parameter: \"%s\"", defel
->defname
)));
173 * Convert the PublicationObjSpecType list into schema oid list and
174 * PublicationTable list.
177 ObjectsInPublicationToOids(List
*pubobjspec_list
, ParseState
*pstate
,
178 List
**rels
, List
**schemas
)
181 PublicationObjSpec
*pubobj
;
183 if (!pubobjspec_list
)
186 foreach(cell
, pubobjspec_list
)
191 pubobj
= (PublicationObjSpec
*) lfirst(cell
);
193 switch (pubobj
->pubobjtype
)
195 case PUBLICATIONOBJ_TABLE
:
196 *rels
= lappend(*rels
, pubobj
->pubtable
);
198 case PUBLICATIONOBJ_TABLES_IN_SCHEMA
:
199 schemaid
= get_namespace_oid(pubobj
->name
, false);
201 /* Filter out duplicates if user specifies "sch1, sch1" */
202 *schemas
= list_append_unique_oid(*schemas
, schemaid
);
204 case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
:
205 search_path
= fetch_search_path(false);
206 if (search_path
== NIL
) /* nothing valid in search_path? */
208 errcode(ERRCODE_UNDEFINED_SCHEMA
),
209 errmsg("no schema has been selected for CURRENT_SCHEMA"));
211 schemaid
= linitial_oid(search_path
);
212 list_free(search_path
);
214 /* Filter out duplicates if user specifies "sch1, sch1" */
215 *schemas
= list_append_unique_oid(*schemas
, schemaid
);
218 /* shouldn't happen */
219 elog(ERROR
, "invalid publication object type %d", pubobj
->pubobjtype
);
226 * Returns true if any of the columns used in the row filter WHERE expression is
227 * not part of REPLICA IDENTITY, false otherwise.
230 contain_invalid_rfcolumn_walker(Node
*node
, rf_context
*context
)
237 Var
*var
= (Var
*) node
;
238 AttrNumber attnum
= var
->varattno
;
241 * If pubviaroot is true, we are validating the row filter of the
242 * parent table, but the bitmap contains the replica identity
243 * information of the child table. So, get the column number of the
244 * child table as parent and child column order could be different.
246 if (context
->pubviaroot
)
248 char *colname
= get_attname(context
->parentid
, attnum
, false);
250 attnum
= get_attnum(context
->relid
, colname
);
253 if (!bms_is_member(attnum
- FirstLowInvalidHeapAttributeNumber
,
254 context
->bms_replident
))
258 return expression_tree_walker(node
, contain_invalid_rfcolumn_walker
,
263 * Check if all columns referenced in the filter expression are part of the
264 * REPLICA IDENTITY index or not.
266 * Returns true if any invalid column is found.
269 pub_rf_contains_invalid_column(Oid pubid
, Relation relation
, List
*ancestors
,
273 Oid relid
= RelationGetRelid(relation
);
274 Oid publish_as_relid
= RelationGetRelid(relation
);
280 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
281 * allowed in the row filter and we can skip the validation.
283 if (relation
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
287 * For a partition, if pubviaroot is true, find the topmost ancestor that
288 * is published via this publication as we need to use its row filter
289 * expression to filter the partition's changes.
291 * Note that even though the row filter used is for an ancestor, the
292 * REPLICA IDENTITY used will be for the actual child table.
294 if (pubviaroot
&& relation
->rd_rel
->relispartition
)
297 = GetTopMostAncestorInPublication(pubid
, ancestors
, NULL
);
299 if (!OidIsValid(publish_as_relid
))
300 publish_as_relid
= relid
;
303 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
304 ObjectIdGetDatum(publish_as_relid
),
305 ObjectIdGetDatum(pubid
));
307 if (!HeapTupleIsValid(rftuple
))
310 rfdatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
311 Anum_pg_publication_rel_prqual
,
316 rf_context context
= {0};
318 Bitmapset
*bms
= NULL
;
320 context
.pubviaroot
= pubviaroot
;
321 context
.parentid
= publish_as_relid
;
322 context
.relid
= relid
;
324 /* Remember columns that are part of the REPLICA IDENTITY */
325 bms
= RelationGetIndexAttrBitmap(relation
,
326 INDEX_ATTR_BITMAP_IDENTITY_KEY
);
328 context
.bms_replident
= bms
;
329 rfnode
= stringToNode(TextDatumGetCString(rfdatum
));
330 result
= contain_invalid_rfcolumn_walker(rfnode
, &context
);
333 ReleaseSysCache(rftuple
);
339 * Check for invalid columns in the publication table definition.
341 * This function evaluates two conditions:
343 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
344 * by the column list. If any column is missing, *invalid_column_list is set
346 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
347 * are published either by listing them in the column list or by enabling
348 * publish_generated_columns option. If any unpublished generated column is
349 * found, *invalid_gen_col is set to true.
351 * Returns true if any of the above conditions are not met.
354 pub_contains_invalid_column(Oid pubid
, Relation relation
, List
*ancestors
,
355 bool pubviaroot
, bool pubgencols
,
356 bool *invalid_column_list
,
357 bool *invalid_gen_col
)
359 Oid relid
= RelationGetRelid(relation
);
360 Oid publish_as_relid
= RelationGetRelid(relation
);
362 Bitmapset
*columns
= NULL
;
363 TupleDesc desc
= RelationGetDescr(relation
);
367 *invalid_column_list
= false;
368 *invalid_gen_col
= false;
371 * For a partition, if pubviaroot is true, find the topmost ancestor that
372 * is published via this publication as we need to use its column list for
375 * Note that even though the column list used is for an ancestor, the
376 * REPLICA IDENTITY used will be for the actual child table.
378 if (pubviaroot
&& relation
->rd_rel
->relispartition
)
380 publish_as_relid
= GetTopMostAncestorInPublication(pubid
, ancestors
, NULL
);
382 if (!OidIsValid(publish_as_relid
))
383 publish_as_relid
= relid
;
386 /* Fetch the column list */
387 pub
= GetPublication(pubid
);
388 check_and_fetch_column_list(pub
, publish_as_relid
, NULL
, &columns
);
390 if (relation
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
392 /* With REPLICA IDENTITY FULL, no column list is allowed. */
393 *invalid_column_list
= (columns
!= NULL
);
396 * As we don't allow a column list with REPLICA IDENTITY FULL, the
397 * publish_generated_columns option must be set to true if the table
398 * has any stored generated columns.
401 relation
->rd_att
->constr
&&
402 relation
->rd_att
->constr
->has_generated_stored
)
403 *invalid_gen_col
= true;
405 if (*invalid_gen_col
&& *invalid_column_list
)
409 /* Remember columns that are part of the REPLICA IDENTITY */
410 idattrs
= RelationGetIndexAttrBitmap(relation
,
411 INDEX_ATTR_BITMAP_IDENTITY_KEY
);
414 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
415 * (to handle system columns the usual way), while column list does not
416 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
417 * over the idattrs and check all of them are in the list.
420 while ((x
= bms_next_member(idattrs
, x
)) >= 0)
422 AttrNumber attnum
= (x
+ FirstLowInvalidHeapAttributeNumber
);
423 Form_pg_attribute att
= TupleDescAttr(desc
, attnum
- 1);
428 * The publish_generated_columns option must be set to true if the
429 * REPLICA IDENTITY contains any stored generated column.
431 if (!pubgencols
&& att
->attgenerated
)
433 *invalid_gen_col
= true;
437 /* Skip validating the column list since it is not defined */
442 * If pubviaroot is true, we are validating the column list of the
443 * parent table, but the bitmap contains the replica identity
444 * information of the child table. The parent/child attnums may not
445 * match, so translate them to the parent - get the attname from the
446 * child, and look it up in the parent.
450 /* attribute name in the child table */
451 char *colname
= get_attname(relid
, attnum
, false);
454 * Determine the attnum for the attribute name in parent (we are
455 * using the column list defined on the parent).
457 attnum
= get_attnum(publish_as_relid
, colname
);
460 /* replica identity column, not covered by the column list */
461 *invalid_column_list
|= !bms_is_member(attnum
, columns
);
463 if (*invalid_column_list
&& *invalid_gen_col
)
470 return *invalid_column_list
|| *invalid_gen_col
;
473 /* check_functions_in_node callback */
475 contain_mutable_or_user_functions_checker(Oid func_id
, void *context
)
477 return (func_volatile(func_id
) != PROVOLATILE_IMMUTABLE
||
478 func_id
>= FirstNormalObjectId
);
482 * The row filter walker checks if the row filter expression is a "simple
485 * It allows only simple or compound expressions such as:
488 * - (Var Op Const) AND/OR (Var Op Const)
490 * (where Var is a column of the table this filter belongs to)
492 * The simple expression has the following restrictions:
493 * - User-defined operators are not allowed;
494 * - User-defined functions are not allowed;
495 * - User-defined types are not allowed;
496 * - User-defined collations are not allowed;
497 * - Non-immutable built-in functions are not allowed;
498 * - System columns are not allowed.
502 * We don't allow user-defined functions/operators/types/collations because
503 * (a) if a user drops a user-defined object used in a row filter expression or
504 * if there is any other error while using it, the logical decoding
505 * infrastructure won't be able to recover from such an error even if the
506 * object is recreated again because a historic snapshot is used to evaluate
508 * (b) a user-defined function can be used to access tables that could have
509 * unpleasant results because a historic snapshot is used. That's why only
510 * immutable built-in functions are allowed in row filter expressions.
512 * We don't allow system columns because currently, we don't have that
513 * information in the tuple passed to downstream. Also, as we don't replicate
514 * those to subscribers, there doesn't seem to be a need for a filter on those
517 * We can allow other node types after more analysis and testing.
520 check_simple_rowfilter_expr_walker(Node
*node
, ParseState
*pstate
)
522 char *errdetail_msg
= NULL
;
527 switch (nodeTag(node
))
530 /* System columns are not allowed. */
531 if (((Var
*) node
)->varattno
< InvalidAttrNumber
)
532 errdetail_msg
= _("System columns are not allowed.");
537 /* OK, except user-defined operators are not allowed. */
538 if (((OpExpr
*) node
)->opno
>= FirstNormalObjectId
)
539 errdetail_msg
= _("User-defined operators are not allowed.");
541 case T_ScalarArrayOpExpr
:
542 /* OK, except user-defined operators are not allowed. */
543 if (((ScalarArrayOpExpr
*) node
)->opno
>= FirstNormalObjectId
)
544 errdetail_msg
= _("User-defined operators are not allowed.");
547 * We don't need to check the hashfuncid and negfuncid of
548 * ScalarArrayOpExpr as those functions are only built for a
552 case T_RowCompareExpr
:
556 /* OK, except user-defined operators are not allowed. */
557 foreach(opid
, ((RowCompareExpr
*) node
)->opnos
)
559 if (lfirst_oid(opid
) >= FirstNormalObjectId
)
561 errdetail_msg
= _("User-defined operators are not allowed.");
585 errdetail_msg
= _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
590 * For all the supported nodes, if we haven't already found a problem,
591 * check the types, functions, and collations used in it. We check List
592 * by walking through each element.
594 if (!errdetail_msg
&& !IsA(node
, List
))
596 if (exprType(node
) >= FirstNormalObjectId
)
597 errdetail_msg
= _("User-defined types are not allowed.");
598 else if (check_functions_in_node(node
, contain_mutable_or_user_functions_checker
,
600 errdetail_msg
= _("User-defined or built-in mutable functions are not allowed.");
601 else if (exprCollation(node
) >= FirstNormalObjectId
||
602 exprInputCollation(node
) >= FirstNormalObjectId
)
603 errdetail_msg
= _("User-defined collations are not allowed.");
607 * If we found a problem in this node, throw error now. Otherwise keep
612 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
613 errmsg("invalid publication WHERE expression"),
614 errdetail_internal("%s", errdetail_msg
),
615 parser_errposition(pstate
, exprLocation(node
))));
617 return expression_tree_walker(node
, check_simple_rowfilter_expr_walker
,
622 * Check if the row filter expression is a "simple expression".
624 * See check_simple_rowfilter_expr_walker for details.
627 check_simple_rowfilter_expr(Node
*node
, ParseState
*pstate
)
629 return check_simple_rowfilter_expr_walker(node
, pstate
);
633 * Transform the publication WHERE expression for all the relations in the list,
634 * ensuring it is coerced to boolean and necessary collation information is
635 * added if required, and add a new nsitem/RTE for the associated relation to
636 * the ParseState's namespace list.
638 * Also check the publication row filter expression and throw an error if
639 * anything not permitted or unexpected is encountered.
642 TransformPubWhereClauses(List
*tables
, const char *queryString
,
649 ParseNamespaceItem
*nsitem
;
650 Node
*whereclause
= NULL
;
652 PublicationRelInfo
*pri
= (PublicationRelInfo
*) lfirst(lc
);
654 if (pri
->whereClause
== NULL
)
658 * If the publication doesn't publish changes via the root partitioned
659 * table, the partition's row filter will be used. So disallow using
660 * WHERE clause on partitioned table in this case.
663 pri
->relation
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
665 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
666 errmsg("cannot use publication WHERE clause for relation \"%s\"",
667 RelationGetRelationName(pri
->relation
)),
668 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
669 "publish_via_partition_root")));
672 * A fresh pstate is required so that we only have "this" table in its
675 pstate
= make_parsestate(NULL
);
676 pstate
->p_sourcetext
= queryString
;
677 nsitem
= addRangeTableEntryForRelation(pstate
, pri
->relation
,
678 AccessShareLock
, NULL
,
680 addNSItemToQuery(pstate
, nsitem
, false, true, true);
682 whereclause
= transformWhereClause(pstate
,
683 copyObject(pri
->whereClause
),
685 "PUBLICATION WHERE");
687 /* Fix up collation information */
688 assign_expr_collations(pstate
, whereclause
);
691 * We allow only simple expressions in row filters. See
692 * check_simple_rowfilter_expr_walker.
694 check_simple_rowfilter_expr(whereclause
, pstate
);
696 free_parsestate(pstate
);
698 pri
->whereClause
= whereclause
;
704 * Given a list of tables that are going to be added to a publication,
705 * verify that they fulfill the necessary preconditions, namely: no tables
706 * have a column list if any schema is published; and partitioned tables do
707 * not have column lists if publish_via_partition_root is not set.
709 * 'publish_schema' indicates that the publication contains any TABLES IN
710 * SCHEMA elements (newly added in this command, or preexisting).
711 * 'pubviaroot' is the value of publish_via_partition_root.
714 CheckPubRelationColumnList(char *pubname
, List
*tables
,
715 bool publish_schema
, bool pubviaroot
)
721 PublicationRelInfo
*pri
= (PublicationRelInfo
*) lfirst(lc
);
723 if (pri
->columns
== NIL
)
727 * Disallow specifying column list if any schema is in the
730 * XXX We could instead just forbid the case when the publication
731 * tries to publish the table with a column list and a schema for that
732 * table. However, if we do that then we need a restriction during
733 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
734 * seem to be a good idea.
738 errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
739 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
740 get_namespace_name(RelationGetNamespace(pri
->relation
)),
741 RelationGetRelationName(pri
->relation
), pubname
),
742 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
745 * If the publication doesn't publish changes via the root partitioned
746 * table, the partition's column list will be used. So disallow using
747 * a column list on the partitioned table in this case.
750 pri
->relation
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
752 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
753 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
754 get_namespace_name(RelationGetNamespace(pri
->relation
)),
755 RelationGetRelationName(pri
->relation
), pubname
),
756 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
757 "publish_via_partition_root")));
762 * Create new publication.
765 CreatePublication(ParseState
*pstate
, CreatePublicationStmt
*stmt
)
768 ObjectAddress myself
;
770 bool nulls
[Natts_pg_publication
];
771 Datum values
[Natts_pg_publication
];
774 PublicationActions pubactions
;
775 bool publish_via_partition_root_given
;
776 bool publish_via_partition_root
;
777 bool publish_generated_columns_given
;
778 bool publish_generated_columns
;
780 List
*relations
= NIL
;
781 List
*schemaidlist
= NIL
;
783 /* must have CREATE privilege on database */
784 aclresult
= object_aclcheck(DatabaseRelationId
, MyDatabaseId
, GetUserId(), ACL_CREATE
);
785 if (aclresult
!= ACLCHECK_OK
)
786 aclcheck_error(aclresult
, OBJECT_DATABASE
,
787 get_database_name(MyDatabaseId
));
789 /* FOR ALL TABLES requires superuser */
790 if (stmt
->for_all_tables
&& !superuser())
792 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
793 errmsg("must be superuser to create FOR ALL TABLES publication")));
795 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
797 /* Check if name is used */
798 puboid
= GetSysCacheOid1(PUBLICATIONNAME
, Anum_pg_publication_oid
,
799 CStringGetDatum(stmt
->pubname
));
800 if (OidIsValid(puboid
))
802 (errcode(ERRCODE_DUPLICATE_OBJECT
),
803 errmsg("publication \"%s\" already exists",
807 memset(values
, 0, sizeof(values
));
808 memset(nulls
, false, sizeof(nulls
));
810 values
[Anum_pg_publication_pubname
- 1] =
811 DirectFunctionCall1(namein
, CStringGetDatum(stmt
->pubname
));
812 values
[Anum_pg_publication_pubowner
- 1] = ObjectIdGetDatum(GetUserId());
814 parse_publication_options(pstate
,
816 &publish_given
, &pubactions
,
817 &publish_via_partition_root_given
,
818 &publish_via_partition_root
,
819 &publish_generated_columns_given
,
820 &publish_generated_columns
);
822 puboid
= GetNewOidWithIndex(rel
, PublicationObjectIndexId
,
823 Anum_pg_publication_oid
);
824 values
[Anum_pg_publication_oid
- 1] = ObjectIdGetDatum(puboid
);
825 values
[Anum_pg_publication_puballtables
- 1] =
826 BoolGetDatum(stmt
->for_all_tables
);
827 values
[Anum_pg_publication_pubinsert
- 1] =
828 BoolGetDatum(pubactions
.pubinsert
);
829 values
[Anum_pg_publication_pubupdate
- 1] =
830 BoolGetDatum(pubactions
.pubupdate
);
831 values
[Anum_pg_publication_pubdelete
- 1] =
832 BoolGetDatum(pubactions
.pubdelete
);
833 values
[Anum_pg_publication_pubtruncate
- 1] =
834 BoolGetDatum(pubactions
.pubtruncate
);
835 values
[Anum_pg_publication_pubviaroot
- 1] =
836 BoolGetDatum(publish_via_partition_root
);
837 values
[Anum_pg_publication_pubgencols
- 1] =
838 BoolGetDatum(publish_generated_columns
);
840 tup
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
842 /* Insert tuple into catalog. */
843 CatalogTupleInsert(rel
, tup
);
846 recordDependencyOnOwner(PublicationRelationId
, puboid
, GetUserId());
848 ObjectAddressSet(myself
, PublicationRelationId
, puboid
);
850 /* Make the changes visible. */
851 CommandCounterIncrement();
853 /* Associate objects with the publication. */
854 if (stmt
->for_all_tables
)
856 /* Invalidate relcache so that publication info is rebuilt. */
857 CacheInvalidateRelcacheAll();
861 ObjectsInPublicationToOids(stmt
->pubobjects
, pstate
, &relations
,
864 /* FOR TABLES IN SCHEMA requires superuser */
865 if (schemaidlist
!= NIL
&& !superuser())
867 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
868 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
870 if (relations
!= NIL
)
874 rels
= OpenTableList(relations
);
875 TransformPubWhereClauses(rels
, pstate
->p_sourcetext
,
876 publish_via_partition_root
);
878 CheckPubRelationColumnList(stmt
->pubname
, rels
,
880 publish_via_partition_root
);
882 PublicationAddTables(puboid
, rels
, true, NULL
);
883 CloseTableList(rels
);
886 if (schemaidlist
!= NIL
)
889 * Schema lock is held until the publication is created to prevent
890 * concurrent schema deletion.
892 LockSchemaList(schemaidlist
);
893 PublicationAddSchemas(puboid
, schemaidlist
, true, NULL
);
897 table_close(rel
, RowExclusiveLock
);
899 InvokeObjectPostCreateHook(PublicationRelationId
, puboid
, 0);
901 if (wal_level
!= WAL_LEVEL_LOGICAL
)
903 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
904 errmsg("\"wal_level\" is insufficient to publish logical changes"),
905 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
911 * Change options of a publication.
914 AlterPublicationOptions(ParseState
*pstate
, AlterPublicationStmt
*stmt
,
915 Relation rel
, HeapTuple tup
)
917 bool nulls
[Natts_pg_publication
];
918 bool replaces
[Natts_pg_publication
];
919 Datum values
[Natts_pg_publication
];
921 PublicationActions pubactions
;
922 bool publish_via_partition_root_given
;
923 bool publish_via_partition_root
;
924 bool publish_generated_columns_given
;
925 bool publish_generated_columns
;
927 Form_pg_publication pubform
;
928 List
*root_relids
= NIL
;
931 parse_publication_options(pstate
,
933 &publish_given
, &pubactions
,
934 &publish_via_partition_root_given
,
935 &publish_via_partition_root
,
936 &publish_generated_columns_given
,
937 &publish_generated_columns
);
939 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
942 * If the publication doesn't publish changes via the root partitioned
943 * table, the partition's row filter and column list will be used. So
944 * disallow using WHERE clause and column lists on partitioned table in
947 if (!pubform
->puballtables
&& publish_via_partition_root_given
&&
948 !publish_via_partition_root
)
951 * Lock the publication so nobody else can do anything with it. This
952 * prevents concurrent alter to add partitioned table(s) with WHERE
953 * clause(s) and/or column lists which we don't allow when not
954 * publishing via root.
956 LockDatabaseObject(PublicationRelationId
, pubform
->oid
, 0,
959 root_relids
= GetPublicationRelations(pubform
->oid
,
960 PUBLICATION_PART_ROOT
);
962 foreach(lc
, root_relids
)
964 Oid relid
= lfirst_oid(lc
);
972 * Beware: we don't have lock on the relations, so cope silently
973 * with the cache lookups returning NULL.
976 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
977 ObjectIdGetDatum(relid
),
978 ObjectIdGetDatum(pubform
->oid
));
979 if (!HeapTupleIsValid(rftuple
))
981 has_rowfilter
= !heap_attisnull(rftuple
, Anum_pg_publication_rel_prqual
, NULL
);
982 has_collist
= !heap_attisnull(rftuple
, Anum_pg_publication_rel_prattrs
, NULL
);
983 if (!has_rowfilter
&& !has_collist
)
985 ReleaseSysCache(rftuple
);
989 relkind
= get_rel_relkind(relid
);
990 if (relkind
!= RELKIND_PARTITIONED_TABLE
)
992 ReleaseSysCache(rftuple
);
995 relname
= get_rel_name(relid
);
996 if (relname
== NULL
) /* table concurrently dropped */
998 ReleaseSysCache(rftuple
);
1004 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1005 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1006 "publish_via_partition_root",
1008 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1009 relname
, "publish_via_partition_root")));
1010 Assert(has_collist
);
1012 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1013 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1014 "publish_via_partition_root",
1016 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1017 relname
, "publish_via_partition_root")));
1021 /* Everything ok, form a new tuple. */
1022 memset(values
, 0, sizeof(values
));
1023 memset(nulls
, false, sizeof(nulls
));
1024 memset(replaces
, false, sizeof(replaces
));
1028 values
[Anum_pg_publication_pubinsert
- 1] = BoolGetDatum(pubactions
.pubinsert
);
1029 replaces
[Anum_pg_publication_pubinsert
- 1] = true;
1031 values
[Anum_pg_publication_pubupdate
- 1] = BoolGetDatum(pubactions
.pubupdate
);
1032 replaces
[Anum_pg_publication_pubupdate
- 1] = true;
1034 values
[Anum_pg_publication_pubdelete
- 1] = BoolGetDatum(pubactions
.pubdelete
);
1035 replaces
[Anum_pg_publication_pubdelete
- 1] = true;
1037 values
[Anum_pg_publication_pubtruncate
- 1] = BoolGetDatum(pubactions
.pubtruncate
);
1038 replaces
[Anum_pg_publication_pubtruncate
- 1] = true;
1041 if (publish_via_partition_root_given
)
1043 values
[Anum_pg_publication_pubviaroot
- 1] = BoolGetDatum(publish_via_partition_root
);
1044 replaces
[Anum_pg_publication_pubviaroot
- 1] = true;
1047 if (publish_generated_columns_given
)
1049 values
[Anum_pg_publication_pubgencols
- 1] = BoolGetDatum(publish_generated_columns
);
1050 replaces
[Anum_pg_publication_pubgencols
- 1] = true;
1053 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
), values
, nulls
,
1056 /* Update the catalog. */
1057 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
1059 CommandCounterIncrement();
1061 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1063 /* Invalidate the relcache. */
1064 if (pubform
->puballtables
)
1066 CacheInvalidateRelcacheAll();
1071 List
*schemarelids
= NIL
;
1074 * For any partitioned tables contained in the publication, we must
1075 * invalidate all partitions contained in the respective partition
1076 * trees, not just those explicitly mentioned in the publication.
1078 if (root_relids
== NIL
)
1079 relids
= GetPublicationRelations(pubform
->oid
,
1080 PUBLICATION_PART_ALL
);
1084 * We already got tables explicitly mentioned in the publication.
1085 * Now get all partitions for the partitioned table in the list.
1087 foreach(lc
, root_relids
)
1088 relids
= GetPubPartitionOptionRelations(relids
,
1089 PUBLICATION_PART_ALL
,
1093 schemarelids
= GetAllSchemaPublicationRelations(pubform
->oid
,
1094 PUBLICATION_PART_ALL
);
1095 relids
= list_concat_unique_oid(relids
, schemarelids
);
1097 InvalidatePublicationRels(relids
);
1100 ObjectAddressSet(obj
, PublicationRelationId
, pubform
->oid
);
1101 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1104 InvokeObjectPostAlterHook(PublicationRelationId
, pubform
->oid
, 0);
1108 * Invalidate the relations.
1111 InvalidatePublicationRels(List
*relids
)
1114 * We don't want to send too many individual messages, at some point it's
1115 * cheaper to just reset whole relcache.
1117 if (list_length(relids
) < MAX_RELCACHE_INVAL_MSGS
)
1122 CacheInvalidateRelcacheByRelid(lfirst_oid(lc
));
1125 CacheInvalidateRelcacheAll();
1129 * Add or remove table to/from publication.
1132 AlterPublicationTables(AlterPublicationStmt
*stmt
, HeapTuple tup
,
1133 List
*tables
, const char *queryString
,
1134 bool publish_schema
)
1137 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1138 Oid pubid
= pubform
->oid
;
1141 * Nothing to do if no objects, except in SET: for that it is quite
1142 * possible that user has not specified any tables in which case we need
1143 * to remove all the existing tables.
1145 if (!tables
&& stmt
->action
!= AP_SetObjects
)
1148 rels
= OpenTableList(tables
);
1150 if (stmt
->action
== AP_AddObjects
)
1152 TransformPubWhereClauses(rels
, queryString
, pubform
->pubviaroot
);
1154 publish_schema
|= is_schema_publication(pubid
);
1156 CheckPubRelationColumnList(stmt
->pubname
, rels
, publish_schema
,
1157 pubform
->pubviaroot
);
1159 PublicationAddTables(pubid
, rels
, false, stmt
);
1161 else if (stmt
->action
== AP_DropObjects
)
1162 PublicationDropTables(pubid
, rels
, false);
1163 else /* AP_SetObjects */
1165 List
*oldrelids
= GetPublicationRelations(pubid
,
1166 PUBLICATION_PART_ROOT
);
1167 List
*delrels
= NIL
;
1170 TransformPubWhereClauses(rels
, queryString
, pubform
->pubviaroot
);
1172 CheckPubRelationColumnList(stmt
->pubname
, rels
, publish_schema
,
1173 pubform
->pubviaroot
);
1176 * To recreate the relation list for the publication, look for
1177 * existing relations that do not need to be dropped.
1179 foreach(oldlc
, oldrelids
)
1181 Oid oldrelid
= lfirst_oid(oldlc
);
1183 PublicationRelInfo
*oldrel
;
1186 Node
*oldrelwhereclause
= NULL
;
1187 Bitmapset
*oldcolumns
= NULL
;
1189 /* look up the cache for the old relmap */
1190 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
1191 ObjectIdGetDatum(oldrelid
),
1192 ObjectIdGetDatum(pubid
));
1195 * See if the existing relation currently has a WHERE clause or a
1196 * column list. We need to compare those too.
1198 if (HeapTupleIsValid(rftuple
))
1201 Datum whereClauseDatum
;
1202 Datum columnListDatum
;
1204 /* Load the WHERE clause for this table. */
1205 whereClauseDatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
1206 Anum_pg_publication_rel_prqual
,
1209 oldrelwhereclause
= stringToNode(TextDatumGetCString(whereClauseDatum
));
1211 /* Transform the int2vector column list to a bitmap. */
1212 columnListDatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
1213 Anum_pg_publication_rel_prattrs
,
1217 oldcolumns
= pub_collist_to_bitmapset(NULL
, columnListDatum
, NULL
);
1219 ReleaseSysCache(rftuple
);
1222 foreach(newlc
, rels
)
1224 PublicationRelInfo
*newpubrel
;
1226 Bitmapset
*newcolumns
= NULL
;
1228 newpubrel
= (PublicationRelInfo
*) lfirst(newlc
);
1229 newrelid
= RelationGetRelid(newpubrel
->relation
);
1232 * Validate the column list. If the column list or WHERE
1233 * clause changes, then the validation done here will be
1234 * duplicated inside PublicationAddTables(). The validation
1235 * is cheap enough that that seems harmless.
1237 newcolumns
= pub_collist_validate(newpubrel
->relation
,
1238 newpubrel
->columns
);
1241 * Check if any of the new set of relations matches with the
1242 * existing relations in the publication. Additionally, if the
1243 * relation has an associated WHERE clause, check the WHERE
1244 * expressions also match. Same for the column list. Drop the
1247 if (newrelid
== oldrelid
)
1249 if (equal(oldrelwhereclause
, newpubrel
->whereClause
) &&
1250 bms_equal(oldcolumns
, newcolumns
))
1259 * Add the non-matched relations to a list so that they can be
1264 oldrel
= palloc(sizeof(PublicationRelInfo
));
1265 oldrel
->whereClause
= NULL
;
1266 oldrel
->columns
= NIL
;
1267 oldrel
->relation
= table_open(oldrelid
,
1268 ShareUpdateExclusiveLock
);
1269 delrels
= lappend(delrels
, oldrel
);
1273 /* And drop them. */
1274 PublicationDropTables(pubid
, delrels
, true);
1277 * Don't bother calculating the difference for adding, we'll catch and
1278 * skip existing ones when doing catalog update.
1280 PublicationAddTables(pubid
, rels
, true, stmt
);
1282 CloseTableList(delrels
);
1285 CloseTableList(rels
);
1289 * Alter the publication schemas.
1291 * Add or remove schemas to/from publication.
1294 AlterPublicationSchemas(AlterPublicationStmt
*stmt
,
1295 HeapTuple tup
, List
*schemaidlist
)
1297 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1300 * Nothing to do if no objects, except in SET: for that it is quite
1301 * possible that user has not specified any schemas in which case we need
1302 * to remove all the existing schemas.
1304 if (!schemaidlist
&& stmt
->action
!= AP_SetObjects
)
1308 * Schema lock is held until the publication is altered to prevent
1309 * concurrent schema deletion.
1311 LockSchemaList(schemaidlist
);
1312 if (stmt
->action
== AP_AddObjects
)
1317 reloids
= GetPublicationRelations(pubform
->oid
, PUBLICATION_PART_ROOT
);
1319 foreach(lc
, reloids
)
1323 coltuple
= SearchSysCache2(PUBLICATIONRELMAP
,
1324 ObjectIdGetDatum(lfirst_oid(lc
)),
1325 ObjectIdGetDatum(pubform
->oid
));
1327 if (!HeapTupleIsValid(coltuple
))
1331 * Disallow adding schema if column list is already part of the
1332 * publication. See CheckPubRelationColumnList.
1334 if (!heap_attisnull(coltuple
, Anum_pg_publication_rel_prattrs
, NULL
))
1336 errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1337 errmsg("cannot add schema to publication \"%s\"",
1339 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1341 ReleaseSysCache(coltuple
);
1344 PublicationAddSchemas(pubform
->oid
, schemaidlist
, false, stmt
);
1346 else if (stmt
->action
== AP_DropObjects
)
1347 PublicationDropSchemas(pubform
->oid
, schemaidlist
, false);
1348 else /* AP_SetObjects */
1350 List
*oldschemaids
= GetPublicationSchemas(pubform
->oid
);
1351 List
*delschemas
= NIL
;
1353 /* Identify which schemas should be dropped */
1354 delschemas
= list_difference_oid(oldschemaids
, schemaidlist
);
1357 * Schema lock is held until the publication is altered to prevent
1358 * concurrent schema deletion.
1360 LockSchemaList(delschemas
);
1363 PublicationDropSchemas(pubform
->oid
, delschemas
, true);
1366 * Don't bother calculating the difference for adding, we'll catch and
1367 * skip existing ones when doing catalog update.
1369 PublicationAddSchemas(pubform
->oid
, schemaidlist
, true, stmt
);
1374 * Check if relations and schemas can be in a given publication and throw
1375 * appropriate error if not.
1378 CheckAlterPublication(AlterPublicationStmt
*stmt
, HeapTuple tup
,
1379 List
*tables
, List
*schemaidlist
)
1381 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1383 if ((stmt
->action
== AP_AddObjects
|| stmt
->action
== AP_SetObjects
) &&
1384 schemaidlist
&& !superuser())
1386 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1387 errmsg("must be superuser to add or set schemas")));
1390 * Check that user is allowed to manipulate the publication tables in
1393 if (schemaidlist
&& pubform
->puballtables
)
1395 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1396 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1397 NameStr(pubform
->pubname
)),
1398 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1400 /* Check that user is allowed to manipulate the publication tables. */
1401 if (tables
&& pubform
->puballtables
)
1403 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1404 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1405 NameStr(pubform
->pubname
)),
1406 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1410 * Alter the existing publication.
1412 * This is dispatcher function for AlterPublicationOptions,
1413 * AlterPublicationSchemas and AlterPublicationTables.
1416 AlterPublication(ParseState
*pstate
, AlterPublicationStmt
*stmt
)
1420 Form_pg_publication pubform
;
1422 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1424 tup
= SearchSysCacheCopy1(PUBLICATIONNAME
,
1425 CStringGetDatum(stmt
->pubname
));
1427 if (!HeapTupleIsValid(tup
))
1429 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1430 errmsg("publication \"%s\" does not exist",
1433 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1436 if (!object_ownercheck(PublicationRelationId
, pubform
->oid
, GetUserId()))
1437 aclcheck_error(ACLCHECK_NOT_OWNER
, OBJECT_PUBLICATION
,
1441 AlterPublicationOptions(pstate
, stmt
, rel
, tup
);
1444 List
*relations
= NIL
;
1445 List
*schemaidlist
= NIL
;
1446 Oid pubid
= pubform
->oid
;
1448 ObjectsInPublicationToOids(stmt
->pubobjects
, pstate
, &relations
,
1451 CheckAlterPublication(stmt
, tup
, relations
, schemaidlist
);
1453 heap_freetuple(tup
);
1455 /* Lock the publication so nobody else can do anything with it. */
1456 LockDatabaseObject(PublicationRelationId
, pubid
, 0,
1457 AccessExclusiveLock
);
1460 * It is possible that by the time we acquire the lock on publication,
1461 * concurrent DDL has removed it. We can test this by checking the
1462 * existence of publication. We get the tuple again to avoid the risk
1463 * of any publication option getting changed.
1465 tup
= SearchSysCacheCopy1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
1466 if (!HeapTupleIsValid(tup
))
1468 errcode(ERRCODE_UNDEFINED_OBJECT
),
1469 errmsg("publication \"%s\" does not exist",
1472 AlterPublicationTables(stmt
, tup
, relations
, pstate
->p_sourcetext
,
1473 schemaidlist
!= NIL
);
1474 AlterPublicationSchemas(stmt
, tup
, schemaidlist
);
1478 heap_freetuple(tup
);
1479 table_close(rel
, RowExclusiveLock
);
1483 * Remove relation from publication by mapping OID.
1486 RemovePublicationRelById(Oid proid
)
1490 Form_pg_publication_rel pubrel
;
1493 rel
= table_open(PublicationRelRelationId
, RowExclusiveLock
);
1495 tup
= SearchSysCache1(PUBLICATIONREL
, ObjectIdGetDatum(proid
));
1497 if (!HeapTupleIsValid(tup
))
1498 elog(ERROR
, "cache lookup failed for publication table %u",
1501 pubrel
= (Form_pg_publication_rel
) GETSTRUCT(tup
);
1504 * Invalidate relcache so that publication info is rebuilt.
1506 * For the partitioned tables, we must invalidate all partitions contained
1507 * in the respective partition hierarchies, not just the one explicitly
1508 * mentioned in the publication. This is required because we implicitly
1509 * publish the child tables when the parent table is published.
1511 relids
= GetPubPartitionOptionRelations(relids
, PUBLICATION_PART_ALL
,
1514 InvalidatePublicationRels(relids
);
1516 CatalogTupleDelete(rel
, &tup
->t_self
);
1518 ReleaseSysCache(tup
);
1520 table_close(rel
, RowExclusiveLock
);
1524 * Remove the publication by mapping OID.
1527 RemovePublicationById(Oid pubid
)
1531 Form_pg_publication pubform
;
1533 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1535 tup
= SearchSysCache1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
1536 if (!HeapTupleIsValid(tup
))
1537 elog(ERROR
, "cache lookup failed for publication %u", pubid
);
1539 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1541 /* Invalidate relcache so that publication info is rebuilt. */
1542 if (pubform
->puballtables
)
1543 CacheInvalidateRelcacheAll();
1545 CatalogTupleDelete(rel
, &tup
->t_self
);
1547 ReleaseSysCache(tup
);
1549 table_close(rel
, RowExclusiveLock
);
1553 * Remove schema from publication by mapping OID.
1556 RemovePublicationSchemaById(Oid psoid
)
1560 List
*schemaRels
= NIL
;
1561 Form_pg_publication_namespace pubsch
;
1563 rel
= table_open(PublicationNamespaceRelationId
, RowExclusiveLock
);
1565 tup
= SearchSysCache1(PUBLICATIONNAMESPACE
, ObjectIdGetDatum(psoid
));
1567 if (!HeapTupleIsValid(tup
))
1568 elog(ERROR
, "cache lookup failed for publication schema %u", psoid
);
1570 pubsch
= (Form_pg_publication_namespace
) GETSTRUCT(tup
);
1573 * Invalidate relcache so that publication info is rebuilt. See
1574 * RemovePublicationRelById for why we need to consider all the
1577 schemaRels
= GetSchemaPublicationRelations(pubsch
->pnnspid
,
1578 PUBLICATION_PART_ALL
);
1579 InvalidatePublicationRels(schemaRels
);
1581 CatalogTupleDelete(rel
, &tup
->t_self
);
1583 ReleaseSysCache(tup
);
1585 table_close(rel
, RowExclusiveLock
);
1589 * Open relations specified by a PublicationTable list.
1590 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1591 * add them to a publication.
1594 OpenTableList(List
*tables
)
1599 List
*relids_with_rf
= NIL
;
1600 List
*relids_with_collist
= NIL
;
1603 * Open, share-lock, and check all the explicitly-specified relations
1607 PublicationTable
*t
= lfirst_node(PublicationTable
, lc
);
1608 bool recurse
= t
->relation
->inh
;
1611 PublicationRelInfo
*pub_rel
;
1613 /* Allow query cancel in case this takes a long time */
1614 CHECK_FOR_INTERRUPTS();
1616 rel
= table_openrv(t
->relation
, ShareUpdateExclusiveLock
);
1617 myrelid
= RelationGetRelid(rel
);
1620 * Filter out duplicates if user specifies "foo, foo".
1622 * Note that this algorithm is known to not be very efficient (O(N^2))
1623 * but given that it only works on list of tables given to us by user
1624 * it's deemed acceptable.
1626 if (list_member_oid(relids
, myrelid
))
1628 /* Disallow duplicate tables if there are any with row filters. */
1629 if (t
->whereClause
|| list_member_oid(relids_with_rf
, myrelid
))
1631 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1632 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1633 RelationGetRelationName(rel
))));
1635 /* Disallow duplicate tables if there are any with column lists. */
1636 if (t
->columns
|| list_member_oid(relids_with_collist
, myrelid
))
1638 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1639 errmsg("conflicting or redundant column lists for table \"%s\"",
1640 RelationGetRelationName(rel
))));
1642 table_close(rel
, ShareUpdateExclusiveLock
);
1646 pub_rel
= palloc(sizeof(PublicationRelInfo
));
1647 pub_rel
->relation
= rel
;
1648 pub_rel
->whereClause
= t
->whereClause
;
1649 pub_rel
->columns
= t
->columns
;
1650 rels
= lappend(rels
, pub_rel
);
1651 relids
= lappend_oid(relids
, myrelid
);
1654 relids_with_rf
= lappend_oid(relids_with_rf
, myrelid
);
1657 relids_with_collist
= lappend_oid(relids_with_collist
, myrelid
);
1660 * Add children of this rel, if requested, so that they too are added
1661 * to the publication. A partitioned table can't have any inheritance
1662 * children other than its partitions, which need not be explicitly
1663 * added to the publication.
1665 if (recurse
&& rel
->rd_rel
->relkind
!= RELKIND_PARTITIONED_TABLE
)
1670 children
= find_all_inheritors(myrelid
, ShareUpdateExclusiveLock
,
1673 foreach(child
, children
)
1675 Oid childrelid
= lfirst_oid(child
);
1677 /* Allow query cancel in case this takes a long time */
1678 CHECK_FOR_INTERRUPTS();
1681 * Skip duplicates if user specified both parent and child
1684 if (list_member_oid(relids
, childrelid
))
1687 * We don't allow to specify row filter for both parent
1688 * and child table at the same time as it is not very
1689 * clear which one should be given preference.
1691 if (childrelid
!= myrelid
&&
1692 (t
->whereClause
|| list_member_oid(relids_with_rf
, childrelid
)))
1694 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1695 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1696 RelationGetRelationName(rel
))));
1699 * We don't allow to specify column list for both parent
1700 * and child table at the same time as it is not very
1701 * clear which one should be given preference.
1703 if (childrelid
!= myrelid
&&
1704 (t
->columns
|| list_member_oid(relids_with_collist
, childrelid
)))
1706 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1707 errmsg("conflicting or redundant column lists for table \"%s\"",
1708 RelationGetRelationName(rel
))));
1713 /* find_all_inheritors already got lock */
1714 rel
= table_open(childrelid
, NoLock
);
1715 pub_rel
= palloc(sizeof(PublicationRelInfo
));
1716 pub_rel
->relation
= rel
;
1717 /* child inherits WHERE clause from parent */
1718 pub_rel
->whereClause
= t
->whereClause
;
1720 /* child inherits column list from parent */
1721 pub_rel
->columns
= t
->columns
;
1722 rels
= lappend(rels
, pub_rel
);
1723 relids
= lappend_oid(relids
, childrelid
);
1726 relids_with_rf
= lappend_oid(relids_with_rf
, childrelid
);
1729 relids_with_collist
= lappend_oid(relids_with_collist
, childrelid
);
1735 list_free(relids_with_rf
);
1741 * Close all relations in the list.
1744 CloseTableList(List
*rels
)
1750 PublicationRelInfo
*pub_rel
;
1752 pub_rel
= (PublicationRelInfo
*) lfirst(lc
);
1753 table_close(pub_rel
->relation
, NoLock
);
1756 list_free_deep(rels
);
1760 * Lock the schemas specified in the schema list in AccessShareLock mode in
1761 * order to prevent concurrent schema deletion.
1764 LockSchemaList(List
*schemalist
)
1768 foreach(lc
, schemalist
)
1770 Oid schemaid
= lfirst_oid(lc
);
1772 /* Allow query cancel in case this takes a long time */
1773 CHECK_FOR_INTERRUPTS();
1774 LockDatabaseObject(NamespaceRelationId
, schemaid
, 0, AccessShareLock
);
1777 * It is possible that by the time we acquire the lock on schema,
1778 * concurrent DDL has removed it. We can test this by checking the
1779 * existence of schema.
1781 if (!SearchSysCacheExists1(NAMESPACEOID
, ObjectIdGetDatum(schemaid
)))
1783 errcode(ERRCODE_UNDEFINED_SCHEMA
),
1784 errmsg("schema with OID %u does not exist", schemaid
));
1789 * Add listed tables to the publication.
1792 PublicationAddTables(Oid pubid
, List
*rels
, bool if_not_exists
,
1793 AlterPublicationStmt
*stmt
)
1797 Assert(!stmt
|| !stmt
->for_all_tables
);
1801 PublicationRelInfo
*pub_rel
= (PublicationRelInfo
*) lfirst(lc
);
1802 Relation rel
= pub_rel
->relation
;
1805 /* Must be owner of the table or superuser. */
1806 if (!object_ownercheck(RelationRelationId
, RelationGetRelid(rel
), GetUserId()))
1807 aclcheck_error(ACLCHECK_NOT_OWNER
, get_relkind_objtype(rel
->rd_rel
->relkind
),
1808 RelationGetRelationName(rel
));
1810 obj
= publication_add_relation(pubid
, pub_rel
, if_not_exists
);
1813 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1816 InvokeObjectPostCreateHook(PublicationRelRelationId
,
1823 * Remove listed tables from the publication.
1826 PublicationDropTables(Oid pubid
, List
*rels
, bool missing_ok
)
1834 PublicationRelInfo
*pubrel
= (PublicationRelInfo
*) lfirst(lc
);
1835 Relation rel
= pubrel
->relation
;
1836 Oid relid
= RelationGetRelid(rel
);
1838 if (pubrel
->columns
)
1840 errcode(ERRCODE_SYNTAX_ERROR
),
1841 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1843 prid
= GetSysCacheOid2(PUBLICATIONRELMAP
, Anum_pg_publication_rel_oid
,
1844 ObjectIdGetDatum(relid
),
1845 ObjectIdGetDatum(pubid
));
1846 if (!OidIsValid(prid
))
1852 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1853 errmsg("relation \"%s\" is not part of the publication",
1854 RelationGetRelationName(rel
))));
1857 if (pubrel
->whereClause
)
1859 (errcode(ERRCODE_SYNTAX_ERROR
),
1860 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1862 ObjectAddressSet(obj
, PublicationRelRelationId
, prid
);
1863 performDeletion(&obj
, DROP_CASCADE
, 0);
1868 * Add listed schemas to the publication.
1871 PublicationAddSchemas(Oid pubid
, List
*schemas
, bool if_not_exists
,
1872 AlterPublicationStmt
*stmt
)
1876 Assert(!stmt
|| !stmt
->for_all_tables
);
1878 foreach(lc
, schemas
)
1880 Oid schemaid
= lfirst_oid(lc
);
1883 obj
= publication_add_schema(pubid
, schemaid
, if_not_exists
);
1886 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1889 InvokeObjectPostCreateHook(PublicationNamespaceRelationId
,
1896 * Remove listed schemas from the publication.
1899 PublicationDropSchemas(Oid pubid
, List
*schemas
, bool missing_ok
)
1905 foreach(lc
, schemas
)
1907 Oid schemaid
= lfirst_oid(lc
);
1909 psid
= GetSysCacheOid2(PUBLICATIONNAMESPACEMAP
,
1910 Anum_pg_publication_namespace_oid
,
1911 ObjectIdGetDatum(schemaid
),
1912 ObjectIdGetDatum(pubid
));
1913 if (!OidIsValid(psid
))
1919 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1920 errmsg("tables from schema \"%s\" are not part of the publication",
1921 get_namespace_name(schemaid
))));
1924 ObjectAddressSet(obj
, PublicationNamespaceRelationId
, psid
);
1925 performDeletion(&obj
, DROP_CASCADE
, 0);
1930 * Internal workhorse for changing a publication owner
1933 AlterPublicationOwner_internal(Relation rel
, HeapTuple tup
, Oid newOwnerId
)
1935 Form_pg_publication form
;
1937 form
= (Form_pg_publication
) GETSTRUCT(tup
);
1939 if (form
->pubowner
== newOwnerId
)
1944 AclResult aclresult
;
1947 if (!object_ownercheck(PublicationRelationId
, form
->oid
, GetUserId()))
1948 aclcheck_error(ACLCHECK_NOT_OWNER
, OBJECT_PUBLICATION
,
1949 NameStr(form
->pubname
));
1951 /* Must be able to become new owner */
1952 check_can_set_role(GetUserId(), newOwnerId
);
1954 /* New owner must have CREATE privilege on database */
1955 aclresult
= object_aclcheck(DatabaseRelationId
, MyDatabaseId
, newOwnerId
, ACL_CREATE
);
1956 if (aclresult
!= ACLCHECK_OK
)
1957 aclcheck_error(aclresult
, OBJECT_DATABASE
,
1958 get_database_name(MyDatabaseId
));
1960 if (form
->puballtables
&& !superuser_arg(newOwnerId
))
1962 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1963 errmsg("permission denied to change owner of publication \"%s\"",
1964 NameStr(form
->pubname
)),
1965 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1967 if (!superuser_arg(newOwnerId
) && is_schema_publication(form
->oid
))
1969 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1970 errmsg("permission denied to change owner of publication \"%s\"",
1971 NameStr(form
->pubname
)),
1972 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1975 form
->pubowner
= newOwnerId
;
1976 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
1978 /* Update owner dependency reference */
1979 changeDependencyOnOwner(PublicationRelationId
,
1983 InvokeObjectPostAlterHook(PublicationRelationId
,
1988 * Change publication owner -- by name
1991 AlterPublicationOwner(const char *name
, Oid newOwnerId
)
1996 ObjectAddress address
;
1997 Form_pg_publication pubform
;
1999 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
2001 tup
= SearchSysCacheCopy1(PUBLICATIONNAME
, CStringGetDatum(name
));
2003 if (!HeapTupleIsValid(tup
))
2005 (errcode(ERRCODE_UNDEFINED_OBJECT
),
2006 errmsg("publication \"%s\" does not exist", name
)));
2008 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
2009 subid
= pubform
->oid
;
2011 AlterPublicationOwner_internal(rel
, tup
, newOwnerId
);
2013 ObjectAddressSet(address
, PublicationRelationId
, subid
);
2015 heap_freetuple(tup
);
2017 table_close(rel
, RowExclusiveLock
);
2023 * Change publication owner -- by OID
2026 AlterPublicationOwner_oid(Oid subid
, Oid newOwnerId
)
2031 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
2033 tup
= SearchSysCacheCopy1(PUBLICATIONOID
, ObjectIdGetDatum(subid
));
2035 if (!HeapTupleIsValid(tup
))
2037 (errcode(ERRCODE_UNDEFINED_OBJECT
),
2038 errmsg("publication with OID %u does not exist", subid
)));
2040 AlterPublicationOwner_internal(rel
, tup
, newOwnerId
);
2042 heap_freetuple(tup
);
2044 table_close(rel
, RowExclusiveLock
);