Move routines to manipulate WAL into PostgreSQL::Test::Cluster
[pgsql.git] / src / backend / commands / publicationcmds.c
blob35747b3df5f2d9942d734a7bb7dae5b8b918068f
1 /*-------------------------------------------------------------------------
3 * publicationcmds.c
4 * publication manipulation
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/objectaccess.h"
24 #include "catalog/objectaddress.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_proc.h"
29 #include "catalog/pg_publication.h"
30 #include "catalog/pg_publication_namespace.h"
31 #include "catalog/pg_publication_rel.h"
32 #include "commands/dbcommands.h"
33 #include "commands/defrem.h"
34 #include "commands/event_trigger.h"
35 #include "commands/publicationcmds.h"
36 #include "miscadmin.h"
37 #include "nodes/nodeFuncs.h"
38 #include "parser/parse_clause.h"
39 #include "parser/parse_collate.h"
40 #include "parser/parse_relation.h"
41 #include "storage/lmgr.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/inval.h"
45 #include "utils/lsyscache.h"
46 #include "utils/rel.h"
47 #include "utils/syscache.h"
48 #include "utils/varlena.h"
52 * Information used to validate the columns in the row filter expression. See
53 * contain_invalid_rfcolumn_walker for details.
55 typedef struct rf_context
57 Bitmapset *bms_replident; /* bitset of replica identity columns */
58 bool pubviaroot; /* true if we are validating the parent
59 * relation's row filter */
60 Oid relid; /* relid of the relation */
61 Oid parentid; /* relid of the parent relation */
62 } rf_context;
64 static List *OpenTableList(List *tables);
65 static void CloseTableList(List *rels);
66 static void LockSchemaList(List *schemalist);
67 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 AlterPublicationStmt *stmt);
69 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 AlterPublicationStmt *stmt);
72 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
75 static void
76 parse_publication_options(ParseState *pstate,
77 List *options,
78 bool *publish_given,
79 PublicationActions *pubactions,
80 bool *publish_via_partition_root_given,
81 bool *publish_via_partition_root,
82 bool *publish_generated_columns_given,
83 bool *publish_generated_columns)
85 ListCell *lc;
87 *publish_given = false;
88 *publish_via_partition_root_given = false;
89 *publish_generated_columns_given = false;
91 /* defaults */
92 pubactions->pubinsert = true;
93 pubactions->pubupdate = true;
94 pubactions->pubdelete = true;
95 pubactions->pubtruncate = true;
96 *publish_via_partition_root = false;
97 *publish_generated_columns = false;
99 /* Parse options */
100 foreach(lc, options)
102 DefElem *defel = (DefElem *) lfirst(lc);
104 if (strcmp(defel->defname, "publish") == 0)
106 char *publish;
107 List *publish_list;
108 ListCell *lc2;
110 if (*publish_given)
111 errorConflictingDefElem(defel, pstate);
114 * If publish option was given only the explicitly listed actions
115 * should be published.
117 pubactions->pubinsert = false;
118 pubactions->pubupdate = false;
119 pubactions->pubdelete = false;
120 pubactions->pubtruncate = false;
122 *publish_given = true;
123 publish = defGetString(defel);
125 if (!SplitIdentifierString(publish, ',', &publish_list))
126 ereport(ERROR,
127 (errcode(ERRCODE_SYNTAX_ERROR),
128 errmsg("invalid list syntax in parameter \"%s\"",
129 "publish")));
131 /* Process the option list. */
132 foreach(lc2, publish_list)
134 char *publish_opt = (char *) lfirst(lc2);
136 if (strcmp(publish_opt, "insert") == 0)
137 pubactions->pubinsert = true;
138 else if (strcmp(publish_opt, "update") == 0)
139 pubactions->pubupdate = true;
140 else if (strcmp(publish_opt, "delete") == 0)
141 pubactions->pubdelete = true;
142 else if (strcmp(publish_opt, "truncate") == 0)
143 pubactions->pubtruncate = true;
144 else
145 ereport(ERROR,
146 (errcode(ERRCODE_SYNTAX_ERROR),
147 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
148 "publish", publish_opt)));
151 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
153 if (*publish_via_partition_root_given)
154 errorConflictingDefElem(defel, pstate);
155 *publish_via_partition_root_given = true;
156 *publish_via_partition_root = defGetBoolean(defel);
158 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
160 if (*publish_generated_columns_given)
161 errorConflictingDefElem(defel, pstate);
162 *publish_generated_columns_given = true;
163 *publish_generated_columns = defGetBoolean(defel);
165 else
166 ereport(ERROR,
167 (errcode(ERRCODE_SYNTAX_ERROR),
168 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
173 * Convert the PublicationObjSpecType list into schema oid list and
174 * PublicationTable list.
176 static void
177 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
178 List **rels, List **schemas)
180 ListCell *cell;
181 PublicationObjSpec *pubobj;
183 if (!pubobjspec_list)
184 return;
186 foreach(cell, pubobjspec_list)
188 Oid schemaid;
189 List *search_path;
191 pubobj = (PublicationObjSpec *) lfirst(cell);
193 switch (pubobj->pubobjtype)
195 case PUBLICATIONOBJ_TABLE:
196 *rels = lappend(*rels, pubobj->pubtable);
197 break;
198 case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
199 schemaid = get_namespace_oid(pubobj->name, false);
201 /* Filter out duplicates if user specifies "sch1, sch1" */
202 *schemas = list_append_unique_oid(*schemas, schemaid);
203 break;
204 case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
205 search_path = fetch_search_path(false);
206 if (search_path == NIL) /* nothing valid in search_path? */
207 ereport(ERROR,
208 errcode(ERRCODE_UNDEFINED_SCHEMA),
209 errmsg("no schema has been selected for CURRENT_SCHEMA"));
211 schemaid = linitial_oid(search_path);
212 list_free(search_path);
214 /* Filter out duplicates if user specifies "sch1, sch1" */
215 *schemas = list_append_unique_oid(*schemas, schemaid);
216 break;
217 default:
218 /* shouldn't happen */
219 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
220 break;
226 * Returns true if any of the columns used in the row filter WHERE expression is
227 * not part of REPLICA IDENTITY, false otherwise.
229 static bool
230 contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
232 if (node == NULL)
233 return false;
235 if (IsA(node, Var))
237 Var *var = (Var *) node;
238 AttrNumber attnum = var->varattno;
241 * If pubviaroot is true, we are validating the row filter of the
242 * parent table, but the bitmap contains the replica identity
243 * information of the child table. So, get the column number of the
244 * child table as parent and child column order could be different.
246 if (context->pubviaroot)
248 char *colname = get_attname(context->parentid, attnum, false);
250 attnum = get_attnum(context->relid, colname);
253 if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
254 context->bms_replident))
255 return true;
258 return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
259 context);
263 * Check if all columns referenced in the filter expression are part of the
264 * REPLICA IDENTITY index or not.
266 * Returns true if any invalid column is found.
268 bool
269 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
270 bool pubviaroot)
272 HeapTuple rftuple;
273 Oid relid = RelationGetRelid(relation);
274 Oid publish_as_relid = RelationGetRelid(relation);
275 bool result = false;
276 Datum rfdatum;
277 bool rfisnull;
280 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
281 * allowed in the row filter and we can skip the validation.
283 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
284 return false;
287 * For a partition, if pubviaroot is true, find the topmost ancestor that
288 * is published via this publication as we need to use its row filter
289 * expression to filter the partition's changes.
291 * Note that even though the row filter used is for an ancestor, the
292 * REPLICA IDENTITY used will be for the actual child table.
294 if (pubviaroot && relation->rd_rel->relispartition)
296 publish_as_relid
297 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
299 if (!OidIsValid(publish_as_relid))
300 publish_as_relid = relid;
303 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
304 ObjectIdGetDatum(publish_as_relid),
305 ObjectIdGetDatum(pubid));
307 if (!HeapTupleIsValid(rftuple))
308 return false;
310 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
311 Anum_pg_publication_rel_prqual,
312 &rfisnull);
314 if (!rfisnull)
316 rf_context context = {0};
317 Node *rfnode;
318 Bitmapset *bms = NULL;
320 context.pubviaroot = pubviaroot;
321 context.parentid = publish_as_relid;
322 context.relid = relid;
324 /* Remember columns that are part of the REPLICA IDENTITY */
325 bms = RelationGetIndexAttrBitmap(relation,
326 INDEX_ATTR_BITMAP_IDENTITY_KEY);
328 context.bms_replident = bms;
329 rfnode = stringToNode(TextDatumGetCString(rfdatum));
330 result = contain_invalid_rfcolumn_walker(rfnode, &context);
333 ReleaseSysCache(rftuple);
335 return result;
339 * Check for invalid columns in the publication table definition.
341 * This function evaluates two conditions:
343 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
344 * by the column list. If any column is missing, *invalid_column_list is set
345 * to true.
346 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
347 * are published either by listing them in the column list or by enabling
348 * publish_generated_columns option. If any unpublished generated column is
349 * found, *invalid_gen_col is set to true.
351 * Returns true if any of the above conditions are not met.
353 bool
354 pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
355 bool pubviaroot, bool pubgencols,
356 bool *invalid_column_list,
357 bool *invalid_gen_col)
359 Oid relid = RelationGetRelid(relation);
360 Oid publish_as_relid = RelationGetRelid(relation);
361 Bitmapset *idattrs;
362 Bitmapset *columns = NULL;
363 TupleDesc desc = RelationGetDescr(relation);
364 Publication *pub;
365 int x;
367 *invalid_column_list = false;
368 *invalid_gen_col = false;
371 * For a partition, if pubviaroot is true, find the topmost ancestor that
372 * is published via this publication as we need to use its column list for
373 * the changes.
375 * Note that even though the column list used is for an ancestor, the
376 * REPLICA IDENTITY used will be for the actual child table.
378 if (pubviaroot && relation->rd_rel->relispartition)
380 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
382 if (!OidIsValid(publish_as_relid))
383 publish_as_relid = relid;
386 /* Fetch the column list */
387 pub = GetPublication(pubid);
388 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
390 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
392 /* With REPLICA IDENTITY FULL, no column list is allowed. */
393 *invalid_column_list = (columns != NULL);
396 * As we don't allow a column list with REPLICA IDENTITY FULL, the
397 * publish_generated_columns option must be set to true if the table
398 * has any stored generated columns.
400 if (!pubgencols &&
401 relation->rd_att->constr &&
402 relation->rd_att->constr->has_generated_stored)
403 *invalid_gen_col = true;
405 if (*invalid_gen_col && *invalid_column_list)
406 return true;
409 /* Remember columns that are part of the REPLICA IDENTITY */
410 idattrs = RelationGetIndexAttrBitmap(relation,
411 INDEX_ATTR_BITMAP_IDENTITY_KEY);
414 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
415 * (to handle system columns the usual way), while column list does not
416 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
417 * over the idattrs and check all of them are in the list.
419 x = -1;
420 while ((x = bms_next_member(idattrs, x)) >= 0)
422 AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
423 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
425 if (columns == NULL)
428 * The publish_generated_columns option must be set to true if the
429 * REPLICA IDENTITY contains any stored generated column.
431 if (!pubgencols && att->attgenerated)
433 *invalid_gen_col = true;
434 break;
437 /* Skip validating the column list since it is not defined */
438 continue;
442 * If pubviaroot is true, we are validating the column list of the
443 * parent table, but the bitmap contains the replica identity
444 * information of the child table. The parent/child attnums may not
445 * match, so translate them to the parent - get the attname from the
446 * child, and look it up in the parent.
448 if (pubviaroot)
450 /* attribute name in the child table */
451 char *colname = get_attname(relid, attnum, false);
454 * Determine the attnum for the attribute name in parent (we are
455 * using the column list defined on the parent).
457 attnum = get_attnum(publish_as_relid, colname);
460 /* replica identity column, not covered by the column list */
461 *invalid_column_list |= !bms_is_member(attnum, columns);
463 if (*invalid_column_list && *invalid_gen_col)
464 break;
467 bms_free(columns);
468 bms_free(idattrs);
470 return *invalid_column_list || *invalid_gen_col;
473 /* check_functions_in_node callback */
474 static bool
475 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
477 return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
478 func_id >= FirstNormalObjectId);
482 * The row filter walker checks if the row filter expression is a "simple
483 * expression".
485 * It allows only simple or compound expressions such as:
486 * - (Var Op Const)
487 * - (Var Op Var)
488 * - (Var Op Const) AND/OR (Var Op Const)
489 * - etc
490 * (where Var is a column of the table this filter belongs to)
492 * The simple expression has the following restrictions:
493 * - User-defined operators are not allowed;
494 * - User-defined functions are not allowed;
495 * - User-defined types are not allowed;
496 * - User-defined collations are not allowed;
497 * - Non-immutable built-in functions are not allowed;
498 * - System columns are not allowed.
500 * NOTES
502 * We don't allow user-defined functions/operators/types/collations because
503 * (a) if a user drops a user-defined object used in a row filter expression or
504 * if there is any other error while using it, the logical decoding
505 * infrastructure won't be able to recover from such an error even if the
506 * object is recreated again because a historic snapshot is used to evaluate
507 * the row filter;
508 * (b) a user-defined function can be used to access tables that could have
509 * unpleasant results because a historic snapshot is used. That's why only
510 * immutable built-in functions are allowed in row filter expressions.
512 * We don't allow system columns because currently, we don't have that
513 * information in the tuple passed to downstream. Also, as we don't replicate
514 * those to subscribers, there doesn't seem to be a need for a filter on those
515 * columns.
517 * We can allow other node types after more analysis and testing.
519 static bool
520 check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
522 char *errdetail_msg = NULL;
524 if (node == NULL)
525 return false;
527 switch (nodeTag(node))
529 case T_Var:
530 /* System columns are not allowed. */
531 if (((Var *) node)->varattno < InvalidAttrNumber)
532 errdetail_msg = _("System columns are not allowed.");
533 break;
534 case T_OpExpr:
535 case T_DistinctExpr:
536 case T_NullIfExpr:
537 /* OK, except user-defined operators are not allowed. */
538 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
539 errdetail_msg = _("User-defined operators are not allowed.");
540 break;
541 case T_ScalarArrayOpExpr:
542 /* OK, except user-defined operators are not allowed. */
543 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
544 errdetail_msg = _("User-defined operators are not allowed.");
547 * We don't need to check the hashfuncid and negfuncid of
548 * ScalarArrayOpExpr as those functions are only built for a
549 * subquery.
551 break;
552 case T_RowCompareExpr:
554 ListCell *opid;
556 /* OK, except user-defined operators are not allowed. */
557 foreach(opid, ((RowCompareExpr *) node)->opnos)
559 if (lfirst_oid(opid) >= FirstNormalObjectId)
561 errdetail_msg = _("User-defined operators are not allowed.");
562 break;
566 break;
567 case T_Const:
568 case T_FuncExpr:
569 case T_BoolExpr:
570 case T_RelabelType:
571 case T_CollateExpr:
572 case T_CaseExpr:
573 case T_CaseTestExpr:
574 case T_ArrayExpr:
575 case T_RowExpr:
576 case T_CoalesceExpr:
577 case T_MinMaxExpr:
578 case T_XmlExpr:
579 case T_NullTest:
580 case T_BooleanTest:
581 case T_List:
582 /* OK, supported */
583 break;
584 default:
585 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
586 break;
590 * For all the supported nodes, if we haven't already found a problem,
591 * check the types, functions, and collations used in it. We check List
592 * by walking through each element.
594 if (!errdetail_msg && !IsA(node, List))
596 if (exprType(node) >= FirstNormalObjectId)
597 errdetail_msg = _("User-defined types are not allowed.");
598 else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
599 pstate))
600 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
601 else if (exprCollation(node) >= FirstNormalObjectId ||
602 exprInputCollation(node) >= FirstNormalObjectId)
603 errdetail_msg = _("User-defined collations are not allowed.");
607 * If we found a problem in this node, throw error now. Otherwise keep
608 * going.
610 if (errdetail_msg)
611 ereport(ERROR,
612 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
613 errmsg("invalid publication WHERE expression"),
614 errdetail_internal("%s", errdetail_msg),
615 parser_errposition(pstate, exprLocation(node))));
617 return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
618 pstate);
622 * Check if the row filter expression is a "simple expression".
624 * See check_simple_rowfilter_expr_walker for details.
626 static bool
627 check_simple_rowfilter_expr(Node *node, ParseState *pstate)
629 return check_simple_rowfilter_expr_walker(node, pstate);
633 * Transform the publication WHERE expression for all the relations in the list,
634 * ensuring it is coerced to boolean and necessary collation information is
635 * added if required, and add a new nsitem/RTE for the associated relation to
636 * the ParseState's namespace list.
638 * Also check the publication row filter expression and throw an error if
639 * anything not permitted or unexpected is encountered.
641 static void
642 TransformPubWhereClauses(List *tables, const char *queryString,
643 bool pubviaroot)
645 ListCell *lc;
647 foreach(lc, tables)
649 ParseNamespaceItem *nsitem;
650 Node *whereclause = NULL;
651 ParseState *pstate;
652 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
654 if (pri->whereClause == NULL)
655 continue;
658 * If the publication doesn't publish changes via the root partitioned
659 * table, the partition's row filter will be used. So disallow using
660 * WHERE clause on partitioned table in this case.
662 if (!pubviaroot &&
663 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
664 ereport(ERROR,
665 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
666 errmsg("cannot use publication WHERE clause for relation \"%s\"",
667 RelationGetRelationName(pri->relation)),
668 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
669 "publish_via_partition_root")));
672 * A fresh pstate is required so that we only have "this" table in its
673 * rangetable
675 pstate = make_parsestate(NULL);
676 pstate->p_sourcetext = queryString;
677 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
678 AccessShareLock, NULL,
679 false, false);
680 addNSItemToQuery(pstate, nsitem, false, true, true);
682 whereclause = transformWhereClause(pstate,
683 copyObject(pri->whereClause),
684 EXPR_KIND_WHERE,
685 "PUBLICATION WHERE");
687 /* Fix up collation information */
688 assign_expr_collations(pstate, whereclause);
691 * We allow only simple expressions in row filters. See
692 * check_simple_rowfilter_expr_walker.
694 check_simple_rowfilter_expr(whereclause, pstate);
696 free_parsestate(pstate);
698 pri->whereClause = whereclause;
704 * Given a list of tables that are going to be added to a publication,
705 * verify that they fulfill the necessary preconditions, namely: no tables
706 * have a column list if any schema is published; and partitioned tables do
707 * not have column lists if publish_via_partition_root is not set.
709 * 'publish_schema' indicates that the publication contains any TABLES IN
710 * SCHEMA elements (newly added in this command, or preexisting).
711 * 'pubviaroot' is the value of publish_via_partition_root.
713 static void
714 CheckPubRelationColumnList(char *pubname, List *tables,
715 bool publish_schema, bool pubviaroot)
717 ListCell *lc;
719 foreach(lc, tables)
721 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
723 if (pri->columns == NIL)
724 continue;
727 * Disallow specifying column list if any schema is in the
728 * publication.
730 * XXX We could instead just forbid the case when the publication
731 * tries to publish the table with a column list and a schema for that
732 * table. However, if we do that then we need a restriction during
733 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
734 * seem to be a good idea.
736 if (publish_schema)
737 ereport(ERROR,
738 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
739 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
740 get_namespace_name(RelationGetNamespace(pri->relation)),
741 RelationGetRelationName(pri->relation), pubname),
742 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
745 * If the publication doesn't publish changes via the root partitioned
746 * table, the partition's column list will be used. So disallow using
747 * a column list on the partitioned table in this case.
749 if (!pubviaroot &&
750 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
751 ereport(ERROR,
752 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
753 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
754 get_namespace_name(RelationGetNamespace(pri->relation)),
755 RelationGetRelationName(pri->relation), pubname),
756 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
757 "publish_via_partition_root")));
762 * Create new publication.
764 ObjectAddress
765 CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
767 Relation rel;
768 ObjectAddress myself;
769 Oid puboid;
770 bool nulls[Natts_pg_publication];
771 Datum values[Natts_pg_publication];
772 HeapTuple tup;
773 bool publish_given;
774 PublicationActions pubactions;
775 bool publish_via_partition_root_given;
776 bool publish_via_partition_root;
777 bool publish_generated_columns_given;
778 bool publish_generated_columns;
779 AclResult aclresult;
780 List *relations = NIL;
781 List *schemaidlist = NIL;
783 /* must have CREATE privilege on database */
784 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
785 if (aclresult != ACLCHECK_OK)
786 aclcheck_error(aclresult, OBJECT_DATABASE,
787 get_database_name(MyDatabaseId));
789 /* FOR ALL TABLES requires superuser */
790 if (stmt->for_all_tables && !superuser())
791 ereport(ERROR,
792 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
793 errmsg("must be superuser to create FOR ALL TABLES publication")));
795 rel = table_open(PublicationRelationId, RowExclusiveLock);
797 /* Check if name is used */
798 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
799 CStringGetDatum(stmt->pubname));
800 if (OidIsValid(puboid))
801 ereport(ERROR,
802 (errcode(ERRCODE_DUPLICATE_OBJECT),
803 errmsg("publication \"%s\" already exists",
804 stmt->pubname)));
806 /* Form a tuple. */
807 memset(values, 0, sizeof(values));
808 memset(nulls, false, sizeof(nulls));
810 values[Anum_pg_publication_pubname - 1] =
811 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
812 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
814 parse_publication_options(pstate,
815 stmt->options,
816 &publish_given, &pubactions,
817 &publish_via_partition_root_given,
818 &publish_via_partition_root,
819 &publish_generated_columns_given,
820 &publish_generated_columns);
822 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
823 Anum_pg_publication_oid);
824 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
825 values[Anum_pg_publication_puballtables - 1] =
826 BoolGetDatum(stmt->for_all_tables);
827 values[Anum_pg_publication_pubinsert - 1] =
828 BoolGetDatum(pubactions.pubinsert);
829 values[Anum_pg_publication_pubupdate - 1] =
830 BoolGetDatum(pubactions.pubupdate);
831 values[Anum_pg_publication_pubdelete - 1] =
832 BoolGetDatum(pubactions.pubdelete);
833 values[Anum_pg_publication_pubtruncate - 1] =
834 BoolGetDatum(pubactions.pubtruncate);
835 values[Anum_pg_publication_pubviaroot - 1] =
836 BoolGetDatum(publish_via_partition_root);
837 values[Anum_pg_publication_pubgencols - 1] =
838 BoolGetDatum(publish_generated_columns);
840 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
842 /* Insert tuple into catalog. */
843 CatalogTupleInsert(rel, tup);
844 heap_freetuple(tup);
846 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
848 ObjectAddressSet(myself, PublicationRelationId, puboid);
850 /* Make the changes visible. */
851 CommandCounterIncrement();
853 /* Associate objects with the publication. */
854 if (stmt->for_all_tables)
856 /* Invalidate relcache so that publication info is rebuilt. */
857 CacheInvalidateRelcacheAll();
859 else
861 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
862 &schemaidlist);
864 /* FOR TABLES IN SCHEMA requires superuser */
865 if (schemaidlist != NIL && !superuser())
866 ereport(ERROR,
867 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
868 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
870 if (relations != NIL)
872 List *rels;
874 rels = OpenTableList(relations);
875 TransformPubWhereClauses(rels, pstate->p_sourcetext,
876 publish_via_partition_root);
878 CheckPubRelationColumnList(stmt->pubname, rels,
879 schemaidlist != NIL,
880 publish_via_partition_root);
882 PublicationAddTables(puboid, rels, true, NULL);
883 CloseTableList(rels);
886 if (schemaidlist != NIL)
889 * Schema lock is held until the publication is created to prevent
890 * concurrent schema deletion.
892 LockSchemaList(schemaidlist);
893 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
897 table_close(rel, RowExclusiveLock);
899 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
901 if (wal_level != WAL_LEVEL_LOGICAL)
902 ereport(WARNING,
903 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 errmsg("\"wal_level\" is insufficient to publish logical changes"),
905 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
907 return myself;
911 * Change options of a publication.
913 static void
914 AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
915 Relation rel, HeapTuple tup)
917 bool nulls[Natts_pg_publication];
918 bool replaces[Natts_pg_publication];
919 Datum values[Natts_pg_publication];
920 bool publish_given;
921 PublicationActions pubactions;
922 bool publish_via_partition_root_given;
923 bool publish_via_partition_root;
924 bool publish_generated_columns_given;
925 bool publish_generated_columns;
926 ObjectAddress obj;
927 Form_pg_publication pubform;
928 List *root_relids = NIL;
929 ListCell *lc;
931 parse_publication_options(pstate,
932 stmt->options,
933 &publish_given, &pubactions,
934 &publish_via_partition_root_given,
935 &publish_via_partition_root,
936 &publish_generated_columns_given,
937 &publish_generated_columns);
939 pubform = (Form_pg_publication) GETSTRUCT(tup);
942 * If the publication doesn't publish changes via the root partitioned
943 * table, the partition's row filter and column list will be used. So
944 * disallow using WHERE clause and column lists on partitioned table in
945 * this case.
947 if (!pubform->puballtables && publish_via_partition_root_given &&
948 !publish_via_partition_root)
951 * Lock the publication so nobody else can do anything with it. This
952 * prevents concurrent alter to add partitioned table(s) with WHERE
953 * clause(s) and/or column lists which we don't allow when not
954 * publishing via root.
956 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
957 AccessShareLock);
959 root_relids = GetPublicationRelations(pubform->oid,
960 PUBLICATION_PART_ROOT);
962 foreach(lc, root_relids)
964 Oid relid = lfirst_oid(lc);
965 HeapTuple rftuple;
966 char relkind;
967 char *relname;
968 bool has_rowfilter;
969 bool has_collist;
972 * Beware: we don't have lock on the relations, so cope silently
973 * with the cache lookups returning NULL.
976 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
977 ObjectIdGetDatum(relid),
978 ObjectIdGetDatum(pubform->oid));
979 if (!HeapTupleIsValid(rftuple))
980 continue;
981 has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
982 has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
983 if (!has_rowfilter && !has_collist)
985 ReleaseSysCache(rftuple);
986 continue;
989 relkind = get_rel_relkind(relid);
990 if (relkind != RELKIND_PARTITIONED_TABLE)
992 ReleaseSysCache(rftuple);
993 continue;
995 relname = get_rel_name(relid);
996 if (relname == NULL) /* table concurrently dropped */
998 ReleaseSysCache(rftuple);
999 continue;
1002 if (has_rowfilter)
1003 ereport(ERROR,
1004 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1005 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1006 "publish_via_partition_root",
1007 stmt->pubname),
1008 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1009 relname, "publish_via_partition_root")));
1010 Assert(has_collist);
1011 ereport(ERROR,
1012 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1013 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1014 "publish_via_partition_root",
1015 stmt->pubname),
1016 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1017 relname, "publish_via_partition_root")));
1021 /* Everything ok, form a new tuple. */
1022 memset(values, 0, sizeof(values));
1023 memset(nulls, false, sizeof(nulls));
1024 memset(replaces, false, sizeof(replaces));
1026 if (publish_given)
1028 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1029 replaces[Anum_pg_publication_pubinsert - 1] = true;
1031 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1032 replaces[Anum_pg_publication_pubupdate - 1] = true;
1034 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1035 replaces[Anum_pg_publication_pubdelete - 1] = true;
1037 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1038 replaces[Anum_pg_publication_pubtruncate - 1] = true;
1041 if (publish_via_partition_root_given)
1043 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1044 replaces[Anum_pg_publication_pubviaroot - 1] = true;
1047 if (publish_generated_columns_given)
1049 values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
1050 replaces[Anum_pg_publication_pubgencols - 1] = true;
1053 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1054 replaces);
1056 /* Update the catalog. */
1057 CatalogTupleUpdate(rel, &tup->t_self, tup);
1059 CommandCounterIncrement();
1061 pubform = (Form_pg_publication) GETSTRUCT(tup);
1063 /* Invalidate the relcache. */
1064 if (pubform->puballtables)
1066 CacheInvalidateRelcacheAll();
1068 else
1070 List *relids = NIL;
1071 List *schemarelids = NIL;
1074 * For any partitioned tables contained in the publication, we must
1075 * invalidate all partitions contained in the respective partition
1076 * trees, not just those explicitly mentioned in the publication.
1078 if (root_relids == NIL)
1079 relids = GetPublicationRelations(pubform->oid,
1080 PUBLICATION_PART_ALL);
1081 else
1084 * We already got tables explicitly mentioned in the publication.
1085 * Now get all partitions for the partitioned table in the list.
1087 foreach(lc, root_relids)
1088 relids = GetPubPartitionOptionRelations(relids,
1089 PUBLICATION_PART_ALL,
1090 lfirst_oid(lc));
1093 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1094 PUBLICATION_PART_ALL);
1095 relids = list_concat_unique_oid(relids, schemarelids);
1097 InvalidatePublicationRels(relids);
1100 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1101 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1102 (Node *) stmt);
1104 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1108 * Invalidate the relations.
1110 void
1111 InvalidatePublicationRels(List *relids)
1114 * We don't want to send too many individual messages, at some point it's
1115 * cheaper to just reset whole relcache.
1117 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1119 ListCell *lc;
1121 foreach(lc, relids)
1122 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1124 else
1125 CacheInvalidateRelcacheAll();
1129 * Add or remove table to/from publication.
1131 static void
1132 AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1133 List *tables, const char *queryString,
1134 bool publish_schema)
1136 List *rels = NIL;
1137 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1138 Oid pubid = pubform->oid;
1141 * Nothing to do if no objects, except in SET: for that it is quite
1142 * possible that user has not specified any tables in which case we need
1143 * to remove all the existing tables.
1145 if (!tables && stmt->action != AP_SetObjects)
1146 return;
1148 rels = OpenTableList(tables);
1150 if (stmt->action == AP_AddObjects)
1152 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1154 publish_schema |= is_schema_publication(pubid);
1156 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1157 pubform->pubviaroot);
1159 PublicationAddTables(pubid, rels, false, stmt);
1161 else if (stmt->action == AP_DropObjects)
1162 PublicationDropTables(pubid, rels, false);
1163 else /* AP_SetObjects */
1165 List *oldrelids = GetPublicationRelations(pubid,
1166 PUBLICATION_PART_ROOT);
1167 List *delrels = NIL;
1168 ListCell *oldlc;
1170 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1172 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1173 pubform->pubviaroot);
1176 * To recreate the relation list for the publication, look for
1177 * existing relations that do not need to be dropped.
1179 foreach(oldlc, oldrelids)
1181 Oid oldrelid = lfirst_oid(oldlc);
1182 ListCell *newlc;
1183 PublicationRelInfo *oldrel;
1184 bool found = false;
1185 HeapTuple rftuple;
1186 Node *oldrelwhereclause = NULL;
1187 Bitmapset *oldcolumns = NULL;
1189 /* look up the cache for the old relmap */
1190 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1191 ObjectIdGetDatum(oldrelid),
1192 ObjectIdGetDatum(pubid));
1195 * See if the existing relation currently has a WHERE clause or a
1196 * column list. We need to compare those too.
1198 if (HeapTupleIsValid(rftuple))
1200 bool isnull = true;
1201 Datum whereClauseDatum;
1202 Datum columnListDatum;
1204 /* Load the WHERE clause for this table. */
1205 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1206 Anum_pg_publication_rel_prqual,
1207 &isnull);
1208 if (!isnull)
1209 oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1211 /* Transform the int2vector column list to a bitmap. */
1212 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1213 Anum_pg_publication_rel_prattrs,
1214 &isnull);
1216 if (!isnull)
1217 oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1219 ReleaseSysCache(rftuple);
1222 foreach(newlc, rels)
1224 PublicationRelInfo *newpubrel;
1225 Oid newrelid;
1226 Bitmapset *newcolumns = NULL;
1228 newpubrel = (PublicationRelInfo *) lfirst(newlc);
1229 newrelid = RelationGetRelid(newpubrel->relation);
1232 * Validate the column list. If the column list or WHERE
1233 * clause changes, then the validation done here will be
1234 * duplicated inside PublicationAddTables(). The validation
1235 * is cheap enough that that seems harmless.
1237 newcolumns = pub_collist_validate(newpubrel->relation,
1238 newpubrel->columns);
1241 * Check if any of the new set of relations matches with the
1242 * existing relations in the publication. Additionally, if the
1243 * relation has an associated WHERE clause, check the WHERE
1244 * expressions also match. Same for the column list. Drop the
1245 * rest.
1247 if (newrelid == oldrelid)
1249 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1250 bms_equal(oldcolumns, newcolumns))
1252 found = true;
1253 break;
1259 * Add the non-matched relations to a list so that they can be
1260 * dropped.
1262 if (!found)
1264 oldrel = palloc(sizeof(PublicationRelInfo));
1265 oldrel->whereClause = NULL;
1266 oldrel->columns = NIL;
1267 oldrel->relation = table_open(oldrelid,
1268 ShareUpdateExclusiveLock);
1269 delrels = lappend(delrels, oldrel);
1273 /* And drop them. */
1274 PublicationDropTables(pubid, delrels, true);
1277 * Don't bother calculating the difference for adding, we'll catch and
1278 * skip existing ones when doing catalog update.
1280 PublicationAddTables(pubid, rels, true, stmt);
1282 CloseTableList(delrels);
1285 CloseTableList(rels);
1289 * Alter the publication schemas.
1291 * Add or remove schemas to/from publication.
1293 static void
1294 AlterPublicationSchemas(AlterPublicationStmt *stmt,
1295 HeapTuple tup, List *schemaidlist)
1297 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1300 * Nothing to do if no objects, except in SET: for that it is quite
1301 * possible that user has not specified any schemas in which case we need
1302 * to remove all the existing schemas.
1304 if (!schemaidlist && stmt->action != AP_SetObjects)
1305 return;
1308 * Schema lock is held until the publication is altered to prevent
1309 * concurrent schema deletion.
1311 LockSchemaList(schemaidlist);
1312 if (stmt->action == AP_AddObjects)
1314 ListCell *lc;
1315 List *reloids;
1317 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1319 foreach(lc, reloids)
1321 HeapTuple coltuple;
1323 coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1324 ObjectIdGetDatum(lfirst_oid(lc)),
1325 ObjectIdGetDatum(pubform->oid));
1327 if (!HeapTupleIsValid(coltuple))
1328 continue;
1331 * Disallow adding schema if column list is already part of the
1332 * publication. See CheckPubRelationColumnList.
1334 if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1335 ereport(ERROR,
1336 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337 errmsg("cannot add schema to publication \"%s\"",
1338 stmt->pubname),
1339 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1341 ReleaseSysCache(coltuple);
1344 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1346 else if (stmt->action == AP_DropObjects)
1347 PublicationDropSchemas(pubform->oid, schemaidlist, false);
1348 else /* AP_SetObjects */
1350 List *oldschemaids = GetPublicationSchemas(pubform->oid);
1351 List *delschemas = NIL;
1353 /* Identify which schemas should be dropped */
1354 delschemas = list_difference_oid(oldschemaids, schemaidlist);
1357 * Schema lock is held until the publication is altered to prevent
1358 * concurrent schema deletion.
1360 LockSchemaList(delschemas);
1362 /* And drop them */
1363 PublicationDropSchemas(pubform->oid, delschemas, true);
1366 * Don't bother calculating the difference for adding, we'll catch and
1367 * skip existing ones when doing catalog update.
1369 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1374 * Check if relations and schemas can be in a given publication and throw
1375 * appropriate error if not.
1377 static void
1378 CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1379 List *tables, List *schemaidlist)
1381 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1383 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1384 schemaidlist && !superuser())
1385 ereport(ERROR,
1386 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1387 errmsg("must be superuser to add or set schemas")));
1390 * Check that user is allowed to manipulate the publication tables in
1391 * schema
1393 if (schemaidlist && pubform->puballtables)
1394 ereport(ERROR,
1395 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1396 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1397 NameStr(pubform->pubname)),
1398 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1400 /* Check that user is allowed to manipulate the publication tables. */
1401 if (tables && pubform->puballtables)
1402 ereport(ERROR,
1403 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1404 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1405 NameStr(pubform->pubname)),
1406 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1410 * Alter the existing publication.
1412 * This is dispatcher function for AlterPublicationOptions,
1413 * AlterPublicationSchemas and AlterPublicationTables.
1415 void
1416 AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1418 Relation rel;
1419 HeapTuple tup;
1420 Form_pg_publication pubform;
1422 rel = table_open(PublicationRelationId, RowExclusiveLock);
1424 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1425 CStringGetDatum(stmt->pubname));
1427 if (!HeapTupleIsValid(tup))
1428 ereport(ERROR,
1429 (errcode(ERRCODE_UNDEFINED_OBJECT),
1430 errmsg("publication \"%s\" does not exist",
1431 stmt->pubname)));
1433 pubform = (Form_pg_publication) GETSTRUCT(tup);
1435 /* must be owner */
1436 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1437 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1438 stmt->pubname);
1440 if (stmt->options)
1441 AlterPublicationOptions(pstate, stmt, rel, tup);
1442 else
1444 List *relations = NIL;
1445 List *schemaidlist = NIL;
1446 Oid pubid = pubform->oid;
1448 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1449 &schemaidlist);
1451 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1453 heap_freetuple(tup);
1455 /* Lock the publication so nobody else can do anything with it. */
1456 LockDatabaseObject(PublicationRelationId, pubid, 0,
1457 AccessExclusiveLock);
1460 * It is possible that by the time we acquire the lock on publication,
1461 * concurrent DDL has removed it. We can test this by checking the
1462 * existence of publication. We get the tuple again to avoid the risk
1463 * of any publication option getting changed.
1465 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1466 if (!HeapTupleIsValid(tup))
1467 ereport(ERROR,
1468 errcode(ERRCODE_UNDEFINED_OBJECT),
1469 errmsg("publication \"%s\" does not exist",
1470 stmt->pubname));
1472 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1473 schemaidlist != NIL);
1474 AlterPublicationSchemas(stmt, tup, schemaidlist);
1477 /* Cleanup. */
1478 heap_freetuple(tup);
1479 table_close(rel, RowExclusiveLock);
1483 * Remove relation from publication by mapping OID.
1485 void
1486 RemovePublicationRelById(Oid proid)
1488 Relation rel;
1489 HeapTuple tup;
1490 Form_pg_publication_rel pubrel;
1491 List *relids = NIL;
1493 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1495 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1497 if (!HeapTupleIsValid(tup))
1498 elog(ERROR, "cache lookup failed for publication table %u",
1499 proid);
1501 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1504 * Invalidate relcache so that publication info is rebuilt.
1506 * For the partitioned tables, we must invalidate all partitions contained
1507 * in the respective partition hierarchies, not just the one explicitly
1508 * mentioned in the publication. This is required because we implicitly
1509 * publish the child tables when the parent table is published.
1511 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1512 pubrel->prrelid);
1514 InvalidatePublicationRels(relids);
1516 CatalogTupleDelete(rel, &tup->t_self);
1518 ReleaseSysCache(tup);
1520 table_close(rel, RowExclusiveLock);
1524 * Remove the publication by mapping OID.
1526 void
1527 RemovePublicationById(Oid pubid)
1529 Relation rel;
1530 HeapTuple tup;
1531 Form_pg_publication pubform;
1533 rel = table_open(PublicationRelationId, RowExclusiveLock);
1535 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1536 if (!HeapTupleIsValid(tup))
1537 elog(ERROR, "cache lookup failed for publication %u", pubid);
1539 pubform = (Form_pg_publication) GETSTRUCT(tup);
1541 /* Invalidate relcache so that publication info is rebuilt. */
1542 if (pubform->puballtables)
1543 CacheInvalidateRelcacheAll();
1545 CatalogTupleDelete(rel, &tup->t_self);
1547 ReleaseSysCache(tup);
1549 table_close(rel, RowExclusiveLock);
1553 * Remove schema from publication by mapping OID.
1555 void
1556 RemovePublicationSchemaById(Oid psoid)
1558 Relation rel;
1559 HeapTuple tup;
1560 List *schemaRels = NIL;
1561 Form_pg_publication_namespace pubsch;
1563 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1565 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1567 if (!HeapTupleIsValid(tup))
1568 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1570 pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1573 * Invalidate relcache so that publication info is rebuilt. See
1574 * RemovePublicationRelById for why we need to consider all the
1575 * partitions.
1577 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1578 PUBLICATION_PART_ALL);
1579 InvalidatePublicationRels(schemaRels);
1581 CatalogTupleDelete(rel, &tup->t_self);
1583 ReleaseSysCache(tup);
1585 table_close(rel, RowExclusiveLock);
1589 * Open relations specified by a PublicationTable list.
1590 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1591 * add them to a publication.
1593 static List *
1594 OpenTableList(List *tables)
1596 List *relids = NIL;
1597 List *rels = NIL;
1598 ListCell *lc;
1599 List *relids_with_rf = NIL;
1600 List *relids_with_collist = NIL;
1603 * Open, share-lock, and check all the explicitly-specified relations
1605 foreach(lc, tables)
1607 PublicationTable *t = lfirst_node(PublicationTable, lc);
1608 bool recurse = t->relation->inh;
1609 Relation rel;
1610 Oid myrelid;
1611 PublicationRelInfo *pub_rel;
1613 /* Allow query cancel in case this takes a long time */
1614 CHECK_FOR_INTERRUPTS();
1616 rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1617 myrelid = RelationGetRelid(rel);
1620 * Filter out duplicates if user specifies "foo, foo".
1622 * Note that this algorithm is known to not be very efficient (O(N^2))
1623 * but given that it only works on list of tables given to us by user
1624 * it's deemed acceptable.
1626 if (list_member_oid(relids, myrelid))
1628 /* Disallow duplicate tables if there are any with row filters. */
1629 if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1630 ereport(ERROR,
1631 (errcode(ERRCODE_DUPLICATE_OBJECT),
1632 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1633 RelationGetRelationName(rel))));
1635 /* Disallow duplicate tables if there are any with column lists. */
1636 if (t->columns || list_member_oid(relids_with_collist, myrelid))
1637 ereport(ERROR,
1638 (errcode(ERRCODE_DUPLICATE_OBJECT),
1639 errmsg("conflicting or redundant column lists for table \"%s\"",
1640 RelationGetRelationName(rel))));
1642 table_close(rel, ShareUpdateExclusiveLock);
1643 continue;
1646 pub_rel = palloc(sizeof(PublicationRelInfo));
1647 pub_rel->relation = rel;
1648 pub_rel->whereClause = t->whereClause;
1649 pub_rel->columns = t->columns;
1650 rels = lappend(rels, pub_rel);
1651 relids = lappend_oid(relids, myrelid);
1653 if (t->whereClause)
1654 relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1656 if (t->columns)
1657 relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1660 * Add children of this rel, if requested, so that they too are added
1661 * to the publication. A partitioned table can't have any inheritance
1662 * children other than its partitions, which need not be explicitly
1663 * added to the publication.
1665 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1667 List *children;
1668 ListCell *child;
1670 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1671 NULL);
1673 foreach(child, children)
1675 Oid childrelid = lfirst_oid(child);
1677 /* Allow query cancel in case this takes a long time */
1678 CHECK_FOR_INTERRUPTS();
1681 * Skip duplicates if user specified both parent and child
1682 * tables.
1684 if (list_member_oid(relids, childrelid))
1687 * We don't allow to specify row filter for both parent
1688 * and child table at the same time as it is not very
1689 * clear which one should be given preference.
1691 if (childrelid != myrelid &&
1692 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1693 ereport(ERROR,
1694 (errcode(ERRCODE_DUPLICATE_OBJECT),
1695 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1696 RelationGetRelationName(rel))));
1699 * We don't allow to specify column list for both parent
1700 * and child table at the same time as it is not very
1701 * clear which one should be given preference.
1703 if (childrelid != myrelid &&
1704 (t->columns || list_member_oid(relids_with_collist, childrelid)))
1705 ereport(ERROR,
1706 (errcode(ERRCODE_DUPLICATE_OBJECT),
1707 errmsg("conflicting or redundant column lists for table \"%s\"",
1708 RelationGetRelationName(rel))));
1710 continue;
1713 /* find_all_inheritors already got lock */
1714 rel = table_open(childrelid, NoLock);
1715 pub_rel = palloc(sizeof(PublicationRelInfo));
1716 pub_rel->relation = rel;
1717 /* child inherits WHERE clause from parent */
1718 pub_rel->whereClause = t->whereClause;
1720 /* child inherits column list from parent */
1721 pub_rel->columns = t->columns;
1722 rels = lappend(rels, pub_rel);
1723 relids = lappend_oid(relids, childrelid);
1725 if (t->whereClause)
1726 relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1728 if (t->columns)
1729 relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1734 list_free(relids);
1735 list_free(relids_with_rf);
1737 return rels;
1741 * Close all relations in the list.
1743 static void
1744 CloseTableList(List *rels)
1746 ListCell *lc;
1748 foreach(lc, rels)
1750 PublicationRelInfo *pub_rel;
1752 pub_rel = (PublicationRelInfo *) lfirst(lc);
1753 table_close(pub_rel->relation, NoLock);
1756 list_free_deep(rels);
1760 * Lock the schemas specified in the schema list in AccessShareLock mode in
1761 * order to prevent concurrent schema deletion.
1763 static void
1764 LockSchemaList(List *schemalist)
1766 ListCell *lc;
1768 foreach(lc, schemalist)
1770 Oid schemaid = lfirst_oid(lc);
1772 /* Allow query cancel in case this takes a long time */
1773 CHECK_FOR_INTERRUPTS();
1774 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1777 * It is possible that by the time we acquire the lock on schema,
1778 * concurrent DDL has removed it. We can test this by checking the
1779 * existence of schema.
1781 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1782 ereport(ERROR,
1783 errcode(ERRCODE_UNDEFINED_SCHEMA),
1784 errmsg("schema with OID %u does not exist", schemaid));
1789 * Add listed tables to the publication.
1791 static void
1792 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1793 AlterPublicationStmt *stmt)
1795 ListCell *lc;
1797 Assert(!stmt || !stmt->for_all_tables);
1799 foreach(lc, rels)
1801 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1802 Relation rel = pub_rel->relation;
1803 ObjectAddress obj;
1805 /* Must be owner of the table or superuser. */
1806 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1807 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1808 RelationGetRelationName(rel));
1810 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1811 if (stmt)
1813 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1814 (Node *) stmt);
1816 InvokeObjectPostCreateHook(PublicationRelRelationId,
1817 obj.objectId, 0);
1823 * Remove listed tables from the publication.
1825 static void
1826 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1828 ObjectAddress obj;
1829 ListCell *lc;
1830 Oid prid;
1832 foreach(lc, rels)
1834 PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1835 Relation rel = pubrel->relation;
1836 Oid relid = RelationGetRelid(rel);
1838 if (pubrel->columns)
1839 ereport(ERROR,
1840 errcode(ERRCODE_SYNTAX_ERROR),
1841 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1843 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1844 ObjectIdGetDatum(relid),
1845 ObjectIdGetDatum(pubid));
1846 if (!OidIsValid(prid))
1848 if (missing_ok)
1849 continue;
1851 ereport(ERROR,
1852 (errcode(ERRCODE_UNDEFINED_OBJECT),
1853 errmsg("relation \"%s\" is not part of the publication",
1854 RelationGetRelationName(rel))));
1857 if (pubrel->whereClause)
1858 ereport(ERROR,
1859 (errcode(ERRCODE_SYNTAX_ERROR),
1860 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1862 ObjectAddressSet(obj, PublicationRelRelationId, prid);
1863 performDeletion(&obj, DROP_CASCADE, 0);
1868 * Add listed schemas to the publication.
1870 static void
1871 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1872 AlterPublicationStmt *stmt)
1874 ListCell *lc;
1876 Assert(!stmt || !stmt->for_all_tables);
1878 foreach(lc, schemas)
1880 Oid schemaid = lfirst_oid(lc);
1881 ObjectAddress obj;
1883 obj = publication_add_schema(pubid, schemaid, if_not_exists);
1884 if (stmt)
1886 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1887 (Node *) stmt);
1889 InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1890 obj.objectId, 0);
1896 * Remove listed schemas from the publication.
1898 static void
1899 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1901 ObjectAddress obj;
1902 ListCell *lc;
1903 Oid psid;
1905 foreach(lc, schemas)
1907 Oid schemaid = lfirst_oid(lc);
1909 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1910 Anum_pg_publication_namespace_oid,
1911 ObjectIdGetDatum(schemaid),
1912 ObjectIdGetDatum(pubid));
1913 if (!OidIsValid(psid))
1915 if (missing_ok)
1916 continue;
1918 ereport(ERROR,
1919 (errcode(ERRCODE_UNDEFINED_OBJECT),
1920 errmsg("tables from schema \"%s\" are not part of the publication",
1921 get_namespace_name(schemaid))));
1924 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1925 performDeletion(&obj, DROP_CASCADE, 0);
1930 * Internal workhorse for changing a publication owner
1932 static void
1933 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1935 Form_pg_publication form;
1937 form = (Form_pg_publication) GETSTRUCT(tup);
1939 if (form->pubowner == newOwnerId)
1940 return;
1942 if (!superuser())
1944 AclResult aclresult;
1946 /* Must be owner */
1947 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1948 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1949 NameStr(form->pubname));
1951 /* Must be able to become new owner */
1952 check_can_set_role(GetUserId(), newOwnerId);
1954 /* New owner must have CREATE privilege on database */
1955 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1956 if (aclresult != ACLCHECK_OK)
1957 aclcheck_error(aclresult, OBJECT_DATABASE,
1958 get_database_name(MyDatabaseId));
1960 if (form->puballtables && !superuser_arg(newOwnerId))
1961 ereport(ERROR,
1962 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1963 errmsg("permission denied to change owner of publication \"%s\"",
1964 NameStr(form->pubname)),
1965 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1967 if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1968 ereport(ERROR,
1969 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1970 errmsg("permission denied to change owner of publication \"%s\"",
1971 NameStr(form->pubname)),
1972 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1975 form->pubowner = newOwnerId;
1976 CatalogTupleUpdate(rel, &tup->t_self, tup);
1978 /* Update owner dependency reference */
1979 changeDependencyOnOwner(PublicationRelationId,
1980 form->oid,
1981 newOwnerId);
1983 InvokeObjectPostAlterHook(PublicationRelationId,
1984 form->oid, 0);
1988 * Change publication owner -- by name
1990 ObjectAddress
1991 AlterPublicationOwner(const char *name, Oid newOwnerId)
1993 Oid subid;
1994 HeapTuple tup;
1995 Relation rel;
1996 ObjectAddress address;
1997 Form_pg_publication pubform;
1999 rel = table_open(PublicationRelationId, RowExclusiveLock);
2001 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2003 if (!HeapTupleIsValid(tup))
2004 ereport(ERROR,
2005 (errcode(ERRCODE_UNDEFINED_OBJECT),
2006 errmsg("publication \"%s\" does not exist", name)));
2008 pubform = (Form_pg_publication) GETSTRUCT(tup);
2009 subid = pubform->oid;
2011 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2013 ObjectAddressSet(address, PublicationRelationId, subid);
2015 heap_freetuple(tup);
2017 table_close(rel, RowExclusiveLock);
2019 return address;
2023 * Change publication owner -- by OID
2025 void
2026 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2028 HeapTuple tup;
2029 Relation rel;
2031 rel = table_open(PublicationRelationId, RowExclusiveLock);
2033 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2035 if (!HeapTupleIsValid(tup))
2036 ereport(ERROR,
2037 (errcode(ERRCODE_UNDEFINED_OBJECT),
2038 errmsg("publication with OID %u does not exist", subid)));
2040 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2042 heap_freetuple(tup);
2044 table_close(rel, RowExclusiveLock);