Allow non-btree speculative insertion indexes
[pgsql.git] / src / backend / commands / publicationcmds.c
blob150a768d16f4e13ff9c9553c3f27789a3aead1b6
1 /*-------------------------------------------------------------------------
3 * publicationcmds.c
4 * publication manipulation
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/objectaccess.h"
24 #include "catalog/objectaddress.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_proc.h"
29 #include "catalog/pg_publication.h"
30 #include "catalog/pg_publication_namespace.h"
31 #include "catalog/pg_publication_rel.h"
32 #include "commands/dbcommands.h"
33 #include "commands/defrem.h"
34 #include "commands/event_trigger.h"
35 #include "commands/publicationcmds.h"
36 #include "miscadmin.h"
37 #include "nodes/nodeFuncs.h"
38 #include "parser/parse_clause.h"
39 #include "parser/parse_collate.h"
40 #include "parser/parse_relation.h"
41 #include "rewrite/rewriteHandler.h"
42 #include "storage/lmgr.h"
43 #include "utils/acl.h"
44 #include "utils/builtins.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/rel.h"
48 #include "utils/syscache.h"
49 #include "utils/varlena.h"
53 * Information used to validate the columns in the row filter expression. See
54 * contain_invalid_rfcolumn_walker for details.
56 typedef struct rf_context
58 Bitmapset *bms_replident; /* bitset of replica identity columns */
59 bool pubviaroot; /* true if we are validating the parent
60 * relation's row filter */
61 Oid relid; /* relid of the relation */
62 Oid parentid; /* relid of the parent relation */
63 } rf_context;
65 static List *OpenTableList(List *tables);
66 static void CloseTableList(List *rels);
67 static void LockSchemaList(List *schemalist);
68 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69 AlterPublicationStmt *stmt);
70 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
71 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72 AlterPublicationStmt *stmt);
73 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
74 static char defGetGeneratedColsOption(DefElem *def);
77 static void
78 parse_publication_options(ParseState *pstate,
79 List *options,
80 bool *publish_given,
81 PublicationActions *pubactions,
82 bool *publish_via_partition_root_given,
83 bool *publish_via_partition_root,
84 bool *publish_generated_columns_given,
85 char *publish_generated_columns)
87 ListCell *lc;
89 *publish_given = false;
90 *publish_via_partition_root_given = false;
91 *publish_generated_columns_given = false;
93 /* defaults */
94 pubactions->pubinsert = true;
95 pubactions->pubupdate = true;
96 pubactions->pubdelete = true;
97 pubactions->pubtruncate = true;
98 *publish_via_partition_root = false;
99 *publish_generated_columns = PUBLISH_GENCOLS_NONE;
101 /* Parse options */
102 foreach(lc, options)
104 DefElem *defel = (DefElem *) lfirst(lc);
106 if (strcmp(defel->defname, "publish") == 0)
108 char *publish;
109 List *publish_list;
110 ListCell *lc2;
112 if (*publish_given)
113 errorConflictingDefElem(defel, pstate);
116 * If publish option was given only the explicitly listed actions
117 * should be published.
119 pubactions->pubinsert = false;
120 pubactions->pubupdate = false;
121 pubactions->pubdelete = false;
122 pubactions->pubtruncate = false;
124 *publish_given = true;
125 publish = defGetString(defel);
127 if (!SplitIdentifierString(publish, ',', &publish_list))
128 ereport(ERROR,
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("invalid list syntax in parameter \"%s\"",
131 "publish")));
133 /* Process the option list. */
134 foreach(lc2, publish_list)
136 char *publish_opt = (char *) lfirst(lc2);
138 if (strcmp(publish_opt, "insert") == 0)
139 pubactions->pubinsert = true;
140 else if (strcmp(publish_opt, "update") == 0)
141 pubactions->pubupdate = true;
142 else if (strcmp(publish_opt, "delete") == 0)
143 pubactions->pubdelete = true;
144 else if (strcmp(publish_opt, "truncate") == 0)
145 pubactions->pubtruncate = true;
146 else
147 ereport(ERROR,
148 (errcode(ERRCODE_SYNTAX_ERROR),
149 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 "publish", publish_opt)));
153 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
155 if (*publish_via_partition_root_given)
156 errorConflictingDefElem(defel, pstate);
157 *publish_via_partition_root_given = true;
158 *publish_via_partition_root = defGetBoolean(defel);
160 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
162 if (*publish_generated_columns_given)
163 errorConflictingDefElem(defel, pstate);
164 *publish_generated_columns_given = true;
165 *publish_generated_columns = defGetGeneratedColsOption(defel);
167 else
168 ereport(ERROR,
169 (errcode(ERRCODE_SYNTAX_ERROR),
170 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 * Convert the PublicationObjSpecType list into schema oid list and
176 * PublicationTable list.
178 static void
179 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
180 List **rels, List **schemas)
182 ListCell *cell;
183 PublicationObjSpec *pubobj;
185 if (!pubobjspec_list)
186 return;
188 foreach(cell, pubobjspec_list)
190 Oid schemaid;
191 List *search_path;
193 pubobj = (PublicationObjSpec *) lfirst(cell);
195 switch (pubobj->pubobjtype)
197 case PUBLICATIONOBJ_TABLE:
198 *rels = lappend(*rels, pubobj->pubtable);
199 break;
200 case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
201 schemaid = get_namespace_oid(pubobj->name, false);
203 /* Filter out duplicates if user specifies "sch1, sch1" */
204 *schemas = list_append_unique_oid(*schemas, schemaid);
205 break;
206 case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
207 search_path = fetch_search_path(false);
208 if (search_path == NIL) /* nothing valid in search_path? */
209 ereport(ERROR,
210 errcode(ERRCODE_UNDEFINED_SCHEMA),
211 errmsg("no schema has been selected for CURRENT_SCHEMA"));
213 schemaid = linitial_oid(search_path);
214 list_free(search_path);
216 /* Filter out duplicates if user specifies "sch1, sch1" */
217 *schemas = list_append_unique_oid(*schemas, schemaid);
218 break;
219 default:
220 /* shouldn't happen */
221 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
222 break;
228 * Returns true if any of the columns used in the row filter WHERE expression is
229 * not part of REPLICA IDENTITY, false otherwise.
231 static bool
232 contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
234 if (node == NULL)
235 return false;
237 if (IsA(node, Var))
239 Var *var = (Var *) node;
240 AttrNumber attnum = var->varattno;
243 * If pubviaroot is true, we are validating the row filter of the
244 * parent table, but the bitmap contains the replica identity
245 * information of the child table. So, get the column number of the
246 * child table as parent and child column order could be different.
248 if (context->pubviaroot)
250 char *colname = get_attname(context->parentid, attnum, false);
252 attnum = get_attnum(context->relid, colname);
255 if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
256 context->bms_replident))
257 return true;
260 return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
261 context);
265 * Check if all columns referenced in the filter expression are part of the
266 * REPLICA IDENTITY index or not.
268 * Returns true if any invalid column is found.
270 bool
271 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
272 bool pubviaroot)
274 HeapTuple rftuple;
275 Oid relid = RelationGetRelid(relation);
276 Oid publish_as_relid = RelationGetRelid(relation);
277 bool result = false;
278 Datum rfdatum;
279 bool rfisnull;
282 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
283 * allowed in the row filter and we can skip the validation.
285 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
286 return false;
289 * For a partition, if pubviaroot is true, find the topmost ancestor that
290 * is published via this publication as we need to use its row filter
291 * expression to filter the partition's changes.
293 * Note that even though the row filter used is for an ancestor, the
294 * REPLICA IDENTITY used will be for the actual child table.
296 if (pubviaroot && relation->rd_rel->relispartition)
298 publish_as_relid
299 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
301 if (!OidIsValid(publish_as_relid))
302 publish_as_relid = relid;
305 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
306 ObjectIdGetDatum(publish_as_relid),
307 ObjectIdGetDatum(pubid));
309 if (!HeapTupleIsValid(rftuple))
310 return false;
312 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
313 Anum_pg_publication_rel_prqual,
314 &rfisnull);
316 if (!rfisnull)
318 rf_context context = {0};
319 Node *rfnode;
320 Bitmapset *bms = NULL;
322 context.pubviaroot = pubviaroot;
323 context.parentid = publish_as_relid;
324 context.relid = relid;
326 /* Remember columns that are part of the REPLICA IDENTITY */
327 bms = RelationGetIndexAttrBitmap(relation,
328 INDEX_ATTR_BITMAP_IDENTITY_KEY);
330 context.bms_replident = bms;
331 rfnode = stringToNode(TextDatumGetCString(rfdatum));
332 result = contain_invalid_rfcolumn_walker(rfnode, &context);
335 ReleaseSysCache(rftuple);
337 return result;
341 * Check for invalid columns in the publication table definition.
343 * This function evaluates two conditions:
345 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
346 * by the column list. If any column is missing, *invalid_column_list is set
347 * to true.
348 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
349 * are published, either by being explicitly named in the column list or, if
350 * no column list is specified, by setting the option
351 * publish_generated_columns to stored. If any unpublished
352 * generated column is found, *invalid_gen_col is set to true.
354 * Returns true if any of the above conditions are not met.
356 bool
357 pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
358 bool pubviaroot, char pubgencols_type,
359 bool *invalid_column_list,
360 bool *invalid_gen_col)
362 Oid relid = RelationGetRelid(relation);
363 Oid publish_as_relid = RelationGetRelid(relation);
364 Bitmapset *idattrs;
365 Bitmapset *columns = NULL;
366 TupleDesc desc = RelationGetDescr(relation);
367 Publication *pub;
368 int x;
370 *invalid_column_list = false;
371 *invalid_gen_col = false;
374 * For a partition, if pubviaroot is true, find the topmost ancestor that
375 * is published via this publication as we need to use its column list for
376 * the changes.
378 * Note that even though the column list used is for an ancestor, the
379 * REPLICA IDENTITY used will be for the actual child table.
381 if (pubviaroot && relation->rd_rel->relispartition)
383 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
385 if (!OidIsValid(publish_as_relid))
386 publish_as_relid = relid;
389 /* Fetch the column list */
390 pub = GetPublication(pubid);
391 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
393 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
395 /* With REPLICA IDENTITY FULL, no column list is allowed. */
396 *invalid_column_list = (columns != NULL);
399 * As we don't allow a column list with REPLICA IDENTITY FULL, the
400 * publish_generated_columns option must be set to stored if the table
401 * has any stored generated columns.
403 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
404 relation->rd_att->constr &&
405 relation->rd_att->constr->has_generated_stored)
406 *invalid_gen_col = true;
409 * Virtual generated columns are currently not supported for logical
410 * replication at all.
412 if (relation->rd_att->constr &&
413 relation->rd_att->constr->has_generated_virtual)
414 *invalid_gen_col = true;
416 if (*invalid_gen_col && *invalid_column_list)
417 return true;
420 /* Remember columns that are part of the REPLICA IDENTITY */
421 idattrs = RelationGetIndexAttrBitmap(relation,
422 INDEX_ATTR_BITMAP_IDENTITY_KEY);
425 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
426 * (to handle system columns the usual way), while column list does not
427 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
428 * over the idattrs and check all of them are in the list.
430 x = -1;
431 while ((x = bms_next_member(idattrs, x)) >= 0)
433 AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
434 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
436 if (columns == NULL)
439 * The publish_generated_columns option must be set to stored if
440 * the REPLICA IDENTITY contains any stored generated column.
442 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
444 *invalid_gen_col = true;
445 break;
449 * The equivalent setting for virtual generated columns does not
450 * exist yet.
452 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
454 *invalid_gen_col = true;
455 break;
458 /* Skip validating the column list since it is not defined */
459 continue;
463 * If pubviaroot is true, we are validating the column list of the
464 * parent table, but the bitmap contains the replica identity
465 * information of the child table. The parent/child attnums may not
466 * match, so translate them to the parent - get the attname from the
467 * child, and look it up in the parent.
469 if (pubviaroot)
471 /* attribute name in the child table */
472 char *colname = get_attname(relid, attnum, false);
475 * Determine the attnum for the attribute name in parent (we are
476 * using the column list defined on the parent).
478 attnum = get_attnum(publish_as_relid, colname);
481 /* replica identity column, not covered by the column list */
482 *invalid_column_list |= !bms_is_member(attnum, columns);
484 if (*invalid_column_list && *invalid_gen_col)
485 break;
488 bms_free(columns);
489 bms_free(idattrs);
491 return *invalid_column_list || *invalid_gen_col;
494 /* check_functions_in_node callback */
495 static bool
496 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
498 return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
499 func_id >= FirstNormalObjectId);
503 * The row filter walker checks if the row filter expression is a "simple
504 * expression".
506 * It allows only simple or compound expressions such as:
507 * - (Var Op Const)
508 * - (Var Op Var)
509 * - (Var Op Const) AND/OR (Var Op Const)
510 * - etc
511 * (where Var is a column of the table this filter belongs to)
513 * The simple expression has the following restrictions:
514 * - User-defined operators are not allowed;
515 * - User-defined functions are not allowed;
516 * - User-defined types are not allowed;
517 * - User-defined collations are not allowed;
518 * - Non-immutable built-in functions are not allowed;
519 * - System columns are not allowed.
521 * NOTES
523 * We don't allow user-defined functions/operators/types/collations because
524 * (a) if a user drops a user-defined object used in a row filter expression or
525 * if there is any other error while using it, the logical decoding
526 * infrastructure won't be able to recover from such an error even if the
527 * object is recreated again because a historic snapshot is used to evaluate
528 * the row filter;
529 * (b) a user-defined function can be used to access tables that could have
530 * unpleasant results because a historic snapshot is used. That's why only
531 * immutable built-in functions are allowed in row filter expressions.
533 * We don't allow system columns because currently, we don't have that
534 * information in the tuple passed to downstream. Also, as we don't replicate
535 * those to subscribers, there doesn't seem to be a need for a filter on those
536 * columns.
538 * We can allow other node types after more analysis and testing.
540 static bool
541 check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
543 char *errdetail_msg = NULL;
545 if (node == NULL)
546 return false;
548 switch (nodeTag(node))
550 case T_Var:
551 /* System columns are not allowed. */
552 if (((Var *) node)->varattno < InvalidAttrNumber)
553 errdetail_msg = _("System columns are not allowed.");
554 break;
555 case T_OpExpr:
556 case T_DistinctExpr:
557 case T_NullIfExpr:
558 /* OK, except user-defined operators are not allowed. */
559 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
560 errdetail_msg = _("User-defined operators are not allowed.");
561 break;
562 case T_ScalarArrayOpExpr:
563 /* OK, except user-defined operators are not allowed. */
564 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
565 errdetail_msg = _("User-defined operators are not allowed.");
568 * We don't need to check the hashfuncid and negfuncid of
569 * ScalarArrayOpExpr as those functions are only built for a
570 * subquery.
572 break;
573 case T_RowCompareExpr:
575 ListCell *opid;
577 /* OK, except user-defined operators are not allowed. */
578 foreach(opid, ((RowCompareExpr *) node)->opnos)
580 if (lfirst_oid(opid) >= FirstNormalObjectId)
582 errdetail_msg = _("User-defined operators are not allowed.");
583 break;
587 break;
588 case T_Const:
589 case T_FuncExpr:
590 case T_BoolExpr:
591 case T_RelabelType:
592 case T_CollateExpr:
593 case T_CaseExpr:
594 case T_CaseTestExpr:
595 case T_ArrayExpr:
596 case T_RowExpr:
597 case T_CoalesceExpr:
598 case T_MinMaxExpr:
599 case T_XmlExpr:
600 case T_NullTest:
601 case T_BooleanTest:
602 case T_List:
603 /* OK, supported */
604 break;
605 default:
606 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
607 break;
611 * For all the supported nodes, if we haven't already found a problem,
612 * check the types, functions, and collations used in it. We check List
613 * by walking through each element.
615 if (!errdetail_msg && !IsA(node, List))
617 if (exprType(node) >= FirstNormalObjectId)
618 errdetail_msg = _("User-defined types are not allowed.");
619 else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
620 pstate))
621 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
622 else if (exprCollation(node) >= FirstNormalObjectId ||
623 exprInputCollation(node) >= FirstNormalObjectId)
624 errdetail_msg = _("User-defined collations are not allowed.");
628 * If we found a problem in this node, throw error now. Otherwise keep
629 * going.
631 if (errdetail_msg)
632 ereport(ERROR,
633 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
634 errmsg("invalid publication WHERE expression"),
635 errdetail_internal("%s", errdetail_msg),
636 parser_errposition(pstate, exprLocation(node))));
638 return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
639 pstate);
643 * Check if the row filter expression is a "simple expression".
645 * See check_simple_rowfilter_expr_walker for details.
647 static bool
648 check_simple_rowfilter_expr(Node *node, ParseState *pstate)
650 return check_simple_rowfilter_expr_walker(node, pstate);
654 * Transform the publication WHERE expression for all the relations in the list,
655 * ensuring it is coerced to boolean and necessary collation information is
656 * added if required, and add a new nsitem/RTE for the associated relation to
657 * the ParseState's namespace list.
659 * Also check the publication row filter expression and throw an error if
660 * anything not permitted or unexpected is encountered.
662 static void
663 TransformPubWhereClauses(List *tables, const char *queryString,
664 bool pubviaroot)
666 ListCell *lc;
668 foreach(lc, tables)
670 ParseNamespaceItem *nsitem;
671 Node *whereclause = NULL;
672 ParseState *pstate;
673 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
675 if (pri->whereClause == NULL)
676 continue;
679 * If the publication doesn't publish changes via the root partitioned
680 * table, the partition's row filter will be used. So disallow using
681 * WHERE clause on partitioned table in this case.
683 if (!pubviaroot &&
684 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
685 ereport(ERROR,
686 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
687 errmsg("cannot use publication WHERE clause for relation \"%s\"",
688 RelationGetRelationName(pri->relation)),
689 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
690 "publish_via_partition_root")));
693 * A fresh pstate is required so that we only have "this" table in its
694 * rangetable
696 pstate = make_parsestate(NULL);
697 pstate->p_sourcetext = queryString;
698 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
699 AccessShareLock, NULL,
700 false, false);
701 addNSItemToQuery(pstate, nsitem, false, true, true);
703 whereclause = transformWhereClause(pstate,
704 copyObject(pri->whereClause),
705 EXPR_KIND_WHERE,
706 "PUBLICATION WHERE");
708 /* Fix up collation information */
709 assign_expr_collations(pstate, whereclause);
711 whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
714 * We allow only simple expressions in row filters. See
715 * check_simple_rowfilter_expr_walker.
717 check_simple_rowfilter_expr(whereclause, pstate);
719 free_parsestate(pstate);
721 pri->whereClause = whereclause;
727 * Given a list of tables that are going to be added to a publication,
728 * verify that they fulfill the necessary preconditions, namely: no tables
729 * have a column list if any schema is published; and partitioned tables do
730 * not have column lists if publish_via_partition_root is not set.
732 * 'publish_schema' indicates that the publication contains any TABLES IN
733 * SCHEMA elements (newly added in this command, or preexisting).
734 * 'pubviaroot' is the value of publish_via_partition_root.
736 static void
737 CheckPubRelationColumnList(char *pubname, List *tables,
738 bool publish_schema, bool pubviaroot)
740 ListCell *lc;
742 foreach(lc, tables)
744 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
746 if (pri->columns == NIL)
747 continue;
750 * Disallow specifying column list if any schema is in the
751 * publication.
753 * XXX We could instead just forbid the case when the publication
754 * tries to publish the table with a column list and a schema for that
755 * table. However, if we do that then we need a restriction during
756 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
757 * seem to be a good idea.
759 if (publish_schema)
760 ereport(ERROR,
761 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
762 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
763 get_namespace_name(RelationGetNamespace(pri->relation)),
764 RelationGetRelationName(pri->relation), pubname),
765 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
768 * If the publication doesn't publish changes via the root partitioned
769 * table, the partition's column list will be used. So disallow using
770 * a column list on the partitioned table in this case.
772 if (!pubviaroot &&
773 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
774 ereport(ERROR,
775 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
776 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
777 get_namespace_name(RelationGetNamespace(pri->relation)),
778 RelationGetRelationName(pri->relation), pubname),
779 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
780 "publish_via_partition_root")));
785 * Create new publication.
787 ObjectAddress
788 CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
790 Relation rel;
791 ObjectAddress myself;
792 Oid puboid;
793 bool nulls[Natts_pg_publication];
794 Datum values[Natts_pg_publication];
795 HeapTuple tup;
796 bool publish_given;
797 PublicationActions pubactions;
798 bool publish_via_partition_root_given;
799 bool publish_via_partition_root;
800 bool publish_generated_columns_given;
801 char publish_generated_columns;
802 AclResult aclresult;
803 List *relations = NIL;
804 List *schemaidlist = NIL;
806 /* must have CREATE privilege on database */
807 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
808 if (aclresult != ACLCHECK_OK)
809 aclcheck_error(aclresult, OBJECT_DATABASE,
810 get_database_name(MyDatabaseId));
812 /* FOR ALL TABLES requires superuser */
813 if (stmt->for_all_tables && !superuser())
814 ereport(ERROR,
815 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
816 errmsg("must be superuser to create FOR ALL TABLES publication")));
818 rel = table_open(PublicationRelationId, RowExclusiveLock);
820 /* Check if name is used */
821 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
822 CStringGetDatum(stmt->pubname));
823 if (OidIsValid(puboid))
824 ereport(ERROR,
825 (errcode(ERRCODE_DUPLICATE_OBJECT),
826 errmsg("publication \"%s\" already exists",
827 stmt->pubname)));
829 /* Form a tuple. */
830 memset(values, 0, sizeof(values));
831 memset(nulls, false, sizeof(nulls));
833 values[Anum_pg_publication_pubname - 1] =
834 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
835 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
837 parse_publication_options(pstate,
838 stmt->options,
839 &publish_given, &pubactions,
840 &publish_via_partition_root_given,
841 &publish_via_partition_root,
842 &publish_generated_columns_given,
843 &publish_generated_columns);
845 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
846 Anum_pg_publication_oid);
847 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
848 values[Anum_pg_publication_puballtables - 1] =
849 BoolGetDatum(stmt->for_all_tables);
850 values[Anum_pg_publication_pubinsert - 1] =
851 BoolGetDatum(pubactions.pubinsert);
852 values[Anum_pg_publication_pubupdate - 1] =
853 BoolGetDatum(pubactions.pubupdate);
854 values[Anum_pg_publication_pubdelete - 1] =
855 BoolGetDatum(pubactions.pubdelete);
856 values[Anum_pg_publication_pubtruncate - 1] =
857 BoolGetDatum(pubactions.pubtruncate);
858 values[Anum_pg_publication_pubviaroot - 1] =
859 BoolGetDatum(publish_via_partition_root);
860 values[Anum_pg_publication_pubgencols - 1] =
861 CharGetDatum(publish_generated_columns);
863 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
865 /* Insert tuple into catalog. */
866 CatalogTupleInsert(rel, tup);
867 heap_freetuple(tup);
869 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
871 ObjectAddressSet(myself, PublicationRelationId, puboid);
873 /* Make the changes visible. */
874 CommandCounterIncrement();
876 /* Associate objects with the publication. */
877 if (stmt->for_all_tables)
879 /* Invalidate relcache so that publication info is rebuilt. */
880 CacheInvalidateRelcacheAll();
882 else
884 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
885 &schemaidlist);
887 /* FOR TABLES IN SCHEMA requires superuser */
888 if (schemaidlist != NIL && !superuser())
889 ereport(ERROR,
890 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
891 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
893 if (relations != NIL)
895 List *rels;
897 rels = OpenTableList(relations);
898 TransformPubWhereClauses(rels, pstate->p_sourcetext,
899 publish_via_partition_root);
901 CheckPubRelationColumnList(stmt->pubname, rels,
902 schemaidlist != NIL,
903 publish_via_partition_root);
905 PublicationAddTables(puboid, rels, true, NULL);
906 CloseTableList(rels);
909 if (schemaidlist != NIL)
912 * Schema lock is held until the publication is created to prevent
913 * concurrent schema deletion.
915 LockSchemaList(schemaidlist);
916 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
920 table_close(rel, RowExclusiveLock);
922 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
924 if (wal_level != WAL_LEVEL_LOGICAL)
925 ereport(WARNING,
926 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
927 errmsg("\"wal_level\" is insufficient to publish logical changes"),
928 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
930 return myself;
934 * Change options of a publication.
936 static void
937 AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
938 Relation rel, HeapTuple tup)
940 bool nulls[Natts_pg_publication];
941 bool replaces[Natts_pg_publication];
942 Datum values[Natts_pg_publication];
943 bool publish_given;
944 PublicationActions pubactions;
945 bool publish_via_partition_root_given;
946 bool publish_via_partition_root;
947 bool publish_generated_columns_given;
948 char publish_generated_columns;
949 ObjectAddress obj;
950 Form_pg_publication pubform;
951 List *root_relids = NIL;
952 ListCell *lc;
954 parse_publication_options(pstate,
955 stmt->options,
956 &publish_given, &pubactions,
957 &publish_via_partition_root_given,
958 &publish_via_partition_root,
959 &publish_generated_columns_given,
960 &publish_generated_columns);
962 pubform = (Form_pg_publication) GETSTRUCT(tup);
965 * If the publication doesn't publish changes via the root partitioned
966 * table, the partition's row filter and column list will be used. So
967 * disallow using WHERE clause and column lists on partitioned table in
968 * this case.
970 if (!pubform->puballtables && publish_via_partition_root_given &&
971 !publish_via_partition_root)
974 * Lock the publication so nobody else can do anything with it. This
975 * prevents concurrent alter to add partitioned table(s) with WHERE
976 * clause(s) and/or column lists which we don't allow when not
977 * publishing via root.
979 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
980 AccessShareLock);
982 root_relids = GetPublicationRelations(pubform->oid,
983 PUBLICATION_PART_ROOT);
985 foreach(lc, root_relids)
987 Oid relid = lfirst_oid(lc);
988 HeapTuple rftuple;
989 char relkind;
990 char *relname;
991 bool has_rowfilter;
992 bool has_collist;
995 * Beware: we don't have lock on the relations, so cope silently
996 * with the cache lookups returning NULL.
999 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1000 ObjectIdGetDatum(relid),
1001 ObjectIdGetDatum(pubform->oid));
1002 if (!HeapTupleIsValid(rftuple))
1003 continue;
1004 has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1005 has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1006 if (!has_rowfilter && !has_collist)
1008 ReleaseSysCache(rftuple);
1009 continue;
1012 relkind = get_rel_relkind(relid);
1013 if (relkind != RELKIND_PARTITIONED_TABLE)
1015 ReleaseSysCache(rftuple);
1016 continue;
1018 relname = get_rel_name(relid);
1019 if (relname == NULL) /* table concurrently dropped */
1021 ReleaseSysCache(rftuple);
1022 continue;
1025 if (has_rowfilter)
1026 ereport(ERROR,
1027 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1028 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1029 "publish_via_partition_root",
1030 stmt->pubname),
1031 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1032 relname, "publish_via_partition_root")));
1033 Assert(has_collist);
1034 ereport(ERROR,
1035 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1036 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1037 "publish_via_partition_root",
1038 stmt->pubname),
1039 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1040 relname, "publish_via_partition_root")));
1044 /* Everything ok, form a new tuple. */
1045 memset(values, 0, sizeof(values));
1046 memset(nulls, false, sizeof(nulls));
1047 memset(replaces, false, sizeof(replaces));
1049 if (publish_given)
1051 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1052 replaces[Anum_pg_publication_pubinsert - 1] = true;
1054 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1055 replaces[Anum_pg_publication_pubupdate - 1] = true;
1057 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1058 replaces[Anum_pg_publication_pubdelete - 1] = true;
1060 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1061 replaces[Anum_pg_publication_pubtruncate - 1] = true;
1064 if (publish_via_partition_root_given)
1066 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1067 replaces[Anum_pg_publication_pubviaroot - 1] = true;
1070 if (publish_generated_columns_given)
1072 values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1073 replaces[Anum_pg_publication_pubgencols - 1] = true;
1076 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1077 replaces);
1079 /* Update the catalog. */
1080 CatalogTupleUpdate(rel, &tup->t_self, tup);
1082 CommandCounterIncrement();
1084 pubform = (Form_pg_publication) GETSTRUCT(tup);
1086 /* Invalidate the relcache. */
1087 if (pubform->puballtables)
1089 CacheInvalidateRelcacheAll();
1091 else
1093 List *relids = NIL;
1094 List *schemarelids = NIL;
1097 * For any partitioned tables contained in the publication, we must
1098 * invalidate all partitions contained in the respective partition
1099 * trees, not just those explicitly mentioned in the publication.
1101 if (root_relids == NIL)
1102 relids = GetPublicationRelations(pubform->oid,
1103 PUBLICATION_PART_ALL);
1104 else
1107 * We already got tables explicitly mentioned in the publication.
1108 * Now get all partitions for the partitioned table in the list.
1110 foreach(lc, root_relids)
1111 relids = GetPubPartitionOptionRelations(relids,
1112 PUBLICATION_PART_ALL,
1113 lfirst_oid(lc));
1116 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1117 PUBLICATION_PART_ALL);
1118 relids = list_concat_unique_oid(relids, schemarelids);
1120 InvalidatePublicationRels(relids);
1123 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1124 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1125 (Node *) stmt);
1127 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1131 * Invalidate the relations.
1133 void
1134 InvalidatePublicationRels(List *relids)
1137 * We don't want to send too many individual messages, at some point it's
1138 * cheaper to just reset whole relcache.
1140 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1142 ListCell *lc;
1144 foreach(lc, relids)
1145 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1147 else
1148 CacheInvalidateRelcacheAll();
1152 * Add or remove table to/from publication.
1154 static void
1155 AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1156 List *tables, const char *queryString,
1157 bool publish_schema)
1159 List *rels = NIL;
1160 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1161 Oid pubid = pubform->oid;
1164 * Nothing to do if no objects, except in SET: for that it is quite
1165 * possible that user has not specified any tables in which case we need
1166 * to remove all the existing tables.
1168 if (!tables && stmt->action != AP_SetObjects)
1169 return;
1171 rels = OpenTableList(tables);
1173 if (stmt->action == AP_AddObjects)
1175 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1177 publish_schema |= is_schema_publication(pubid);
1179 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1180 pubform->pubviaroot);
1182 PublicationAddTables(pubid, rels, false, stmt);
1184 else if (stmt->action == AP_DropObjects)
1185 PublicationDropTables(pubid, rels, false);
1186 else /* AP_SetObjects */
1188 List *oldrelids = GetPublicationRelations(pubid,
1189 PUBLICATION_PART_ROOT);
1190 List *delrels = NIL;
1191 ListCell *oldlc;
1193 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1195 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1196 pubform->pubviaroot);
1199 * To recreate the relation list for the publication, look for
1200 * existing relations that do not need to be dropped.
1202 foreach(oldlc, oldrelids)
1204 Oid oldrelid = lfirst_oid(oldlc);
1205 ListCell *newlc;
1206 PublicationRelInfo *oldrel;
1207 bool found = false;
1208 HeapTuple rftuple;
1209 Node *oldrelwhereclause = NULL;
1210 Bitmapset *oldcolumns = NULL;
1212 /* look up the cache for the old relmap */
1213 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1214 ObjectIdGetDatum(oldrelid),
1215 ObjectIdGetDatum(pubid));
1218 * See if the existing relation currently has a WHERE clause or a
1219 * column list. We need to compare those too.
1221 if (HeapTupleIsValid(rftuple))
1223 bool isnull = true;
1224 Datum whereClauseDatum;
1225 Datum columnListDatum;
1227 /* Load the WHERE clause for this table. */
1228 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1229 Anum_pg_publication_rel_prqual,
1230 &isnull);
1231 if (!isnull)
1232 oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1234 /* Transform the int2vector column list to a bitmap. */
1235 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1236 Anum_pg_publication_rel_prattrs,
1237 &isnull);
1239 if (!isnull)
1240 oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1242 ReleaseSysCache(rftuple);
1245 foreach(newlc, rels)
1247 PublicationRelInfo *newpubrel;
1248 Oid newrelid;
1249 Bitmapset *newcolumns = NULL;
1251 newpubrel = (PublicationRelInfo *) lfirst(newlc);
1252 newrelid = RelationGetRelid(newpubrel->relation);
1255 * Validate the column list. If the column list or WHERE
1256 * clause changes, then the validation done here will be
1257 * duplicated inside PublicationAddTables(). The validation
1258 * is cheap enough that that seems harmless.
1260 newcolumns = pub_collist_validate(newpubrel->relation,
1261 newpubrel->columns);
1264 * Check if any of the new set of relations matches with the
1265 * existing relations in the publication. Additionally, if the
1266 * relation has an associated WHERE clause, check the WHERE
1267 * expressions also match. Same for the column list. Drop the
1268 * rest.
1270 if (newrelid == oldrelid)
1272 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1273 bms_equal(oldcolumns, newcolumns))
1275 found = true;
1276 break;
1282 * Add the non-matched relations to a list so that they can be
1283 * dropped.
1285 if (!found)
1287 oldrel = palloc(sizeof(PublicationRelInfo));
1288 oldrel->whereClause = NULL;
1289 oldrel->columns = NIL;
1290 oldrel->relation = table_open(oldrelid,
1291 ShareUpdateExclusiveLock);
1292 delrels = lappend(delrels, oldrel);
1296 /* And drop them. */
1297 PublicationDropTables(pubid, delrels, true);
1300 * Don't bother calculating the difference for adding, we'll catch and
1301 * skip existing ones when doing catalog update.
1303 PublicationAddTables(pubid, rels, true, stmt);
1305 CloseTableList(delrels);
1308 CloseTableList(rels);
1312 * Alter the publication schemas.
1314 * Add or remove schemas to/from publication.
1316 static void
1317 AlterPublicationSchemas(AlterPublicationStmt *stmt,
1318 HeapTuple tup, List *schemaidlist)
1320 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1323 * Nothing to do if no objects, except in SET: for that it is quite
1324 * possible that user has not specified any schemas in which case we need
1325 * to remove all the existing schemas.
1327 if (!schemaidlist && stmt->action != AP_SetObjects)
1328 return;
1331 * Schema lock is held until the publication is altered to prevent
1332 * concurrent schema deletion.
1334 LockSchemaList(schemaidlist);
1335 if (stmt->action == AP_AddObjects)
1337 ListCell *lc;
1338 List *reloids;
1340 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1342 foreach(lc, reloids)
1344 HeapTuple coltuple;
1346 coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1347 ObjectIdGetDatum(lfirst_oid(lc)),
1348 ObjectIdGetDatum(pubform->oid));
1350 if (!HeapTupleIsValid(coltuple))
1351 continue;
1354 * Disallow adding schema if column list is already part of the
1355 * publication. See CheckPubRelationColumnList.
1357 if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1358 ereport(ERROR,
1359 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1360 errmsg("cannot add schema to publication \"%s\"",
1361 stmt->pubname),
1362 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1364 ReleaseSysCache(coltuple);
1367 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1369 else if (stmt->action == AP_DropObjects)
1370 PublicationDropSchemas(pubform->oid, schemaidlist, false);
1371 else /* AP_SetObjects */
1373 List *oldschemaids = GetPublicationSchemas(pubform->oid);
1374 List *delschemas = NIL;
1376 /* Identify which schemas should be dropped */
1377 delschemas = list_difference_oid(oldschemaids, schemaidlist);
1380 * Schema lock is held until the publication is altered to prevent
1381 * concurrent schema deletion.
1383 LockSchemaList(delschemas);
1385 /* And drop them */
1386 PublicationDropSchemas(pubform->oid, delschemas, true);
1389 * Don't bother calculating the difference for adding, we'll catch and
1390 * skip existing ones when doing catalog update.
1392 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1397 * Check if relations and schemas can be in a given publication and throw
1398 * appropriate error if not.
1400 static void
1401 CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1402 List *tables, List *schemaidlist)
1404 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1406 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1407 schemaidlist && !superuser())
1408 ereport(ERROR,
1409 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1410 errmsg("must be superuser to add or set schemas")));
1413 * Check that user is allowed to manipulate the publication tables in
1414 * schema
1416 if (schemaidlist && pubform->puballtables)
1417 ereport(ERROR,
1418 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1420 NameStr(pubform->pubname)),
1421 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1423 /* Check that user is allowed to manipulate the publication tables. */
1424 if (tables && pubform->puballtables)
1425 ereport(ERROR,
1426 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1427 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1428 NameStr(pubform->pubname)),
1429 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1433 * Alter the existing publication.
1435 * This is dispatcher function for AlterPublicationOptions,
1436 * AlterPublicationSchemas and AlterPublicationTables.
1438 void
1439 AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1441 Relation rel;
1442 HeapTuple tup;
1443 Form_pg_publication pubform;
1445 rel = table_open(PublicationRelationId, RowExclusiveLock);
1447 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1448 CStringGetDatum(stmt->pubname));
1450 if (!HeapTupleIsValid(tup))
1451 ereport(ERROR,
1452 (errcode(ERRCODE_UNDEFINED_OBJECT),
1453 errmsg("publication \"%s\" does not exist",
1454 stmt->pubname)));
1456 pubform = (Form_pg_publication) GETSTRUCT(tup);
1458 /* must be owner */
1459 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1460 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1461 stmt->pubname);
1463 if (stmt->options)
1464 AlterPublicationOptions(pstate, stmt, rel, tup);
1465 else
1467 List *relations = NIL;
1468 List *schemaidlist = NIL;
1469 Oid pubid = pubform->oid;
1471 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1472 &schemaidlist);
1474 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1476 heap_freetuple(tup);
1478 /* Lock the publication so nobody else can do anything with it. */
1479 LockDatabaseObject(PublicationRelationId, pubid, 0,
1480 AccessExclusiveLock);
1483 * It is possible that by the time we acquire the lock on publication,
1484 * concurrent DDL has removed it. We can test this by checking the
1485 * existence of publication. We get the tuple again to avoid the risk
1486 * of any publication option getting changed.
1488 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1489 if (!HeapTupleIsValid(tup))
1490 ereport(ERROR,
1491 errcode(ERRCODE_UNDEFINED_OBJECT),
1492 errmsg("publication \"%s\" does not exist",
1493 stmt->pubname));
1495 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1496 schemaidlist != NIL);
1497 AlterPublicationSchemas(stmt, tup, schemaidlist);
1500 /* Cleanup. */
1501 heap_freetuple(tup);
1502 table_close(rel, RowExclusiveLock);
1506 * Remove relation from publication by mapping OID.
1508 void
1509 RemovePublicationRelById(Oid proid)
1511 Relation rel;
1512 HeapTuple tup;
1513 Form_pg_publication_rel pubrel;
1514 List *relids = NIL;
1516 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1518 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1520 if (!HeapTupleIsValid(tup))
1521 elog(ERROR, "cache lookup failed for publication table %u",
1522 proid);
1524 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1527 * Invalidate relcache so that publication info is rebuilt.
1529 * For the partitioned tables, we must invalidate all partitions contained
1530 * in the respective partition hierarchies, not just the one explicitly
1531 * mentioned in the publication. This is required because we implicitly
1532 * publish the child tables when the parent table is published.
1534 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1535 pubrel->prrelid);
1537 InvalidatePublicationRels(relids);
1539 CatalogTupleDelete(rel, &tup->t_self);
1541 ReleaseSysCache(tup);
1543 table_close(rel, RowExclusiveLock);
1547 * Remove the publication by mapping OID.
1549 void
1550 RemovePublicationById(Oid pubid)
1552 Relation rel;
1553 HeapTuple tup;
1554 Form_pg_publication pubform;
1556 rel = table_open(PublicationRelationId, RowExclusiveLock);
1558 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1559 if (!HeapTupleIsValid(tup))
1560 elog(ERROR, "cache lookup failed for publication %u", pubid);
1562 pubform = (Form_pg_publication) GETSTRUCT(tup);
1564 /* Invalidate relcache so that publication info is rebuilt. */
1565 if (pubform->puballtables)
1566 CacheInvalidateRelcacheAll();
1568 CatalogTupleDelete(rel, &tup->t_self);
1570 ReleaseSysCache(tup);
1572 table_close(rel, RowExclusiveLock);
1576 * Remove schema from publication by mapping OID.
1578 void
1579 RemovePublicationSchemaById(Oid psoid)
1581 Relation rel;
1582 HeapTuple tup;
1583 List *schemaRels = NIL;
1584 Form_pg_publication_namespace pubsch;
1586 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1588 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1590 if (!HeapTupleIsValid(tup))
1591 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1593 pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1596 * Invalidate relcache so that publication info is rebuilt. See
1597 * RemovePublicationRelById for why we need to consider all the
1598 * partitions.
1600 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1601 PUBLICATION_PART_ALL);
1602 InvalidatePublicationRels(schemaRels);
1604 CatalogTupleDelete(rel, &tup->t_self);
1606 ReleaseSysCache(tup);
1608 table_close(rel, RowExclusiveLock);
1612 * Open relations specified by a PublicationTable list.
1613 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1614 * add them to a publication.
1616 static List *
1617 OpenTableList(List *tables)
1619 List *relids = NIL;
1620 List *rels = NIL;
1621 ListCell *lc;
1622 List *relids_with_rf = NIL;
1623 List *relids_with_collist = NIL;
1626 * Open, share-lock, and check all the explicitly-specified relations
1628 foreach(lc, tables)
1630 PublicationTable *t = lfirst_node(PublicationTable, lc);
1631 bool recurse = t->relation->inh;
1632 Relation rel;
1633 Oid myrelid;
1634 PublicationRelInfo *pub_rel;
1636 /* Allow query cancel in case this takes a long time */
1637 CHECK_FOR_INTERRUPTS();
1639 rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1640 myrelid = RelationGetRelid(rel);
1643 * Filter out duplicates if user specifies "foo, foo".
1645 * Note that this algorithm is known to not be very efficient (O(N^2))
1646 * but given that it only works on list of tables given to us by user
1647 * it's deemed acceptable.
1649 if (list_member_oid(relids, myrelid))
1651 /* Disallow duplicate tables if there are any with row filters. */
1652 if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1653 ereport(ERROR,
1654 (errcode(ERRCODE_DUPLICATE_OBJECT),
1655 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1656 RelationGetRelationName(rel))));
1658 /* Disallow duplicate tables if there are any with column lists. */
1659 if (t->columns || list_member_oid(relids_with_collist, myrelid))
1660 ereport(ERROR,
1661 (errcode(ERRCODE_DUPLICATE_OBJECT),
1662 errmsg("conflicting or redundant column lists for table \"%s\"",
1663 RelationGetRelationName(rel))));
1665 table_close(rel, ShareUpdateExclusiveLock);
1666 continue;
1669 pub_rel = palloc(sizeof(PublicationRelInfo));
1670 pub_rel->relation = rel;
1671 pub_rel->whereClause = t->whereClause;
1672 pub_rel->columns = t->columns;
1673 rels = lappend(rels, pub_rel);
1674 relids = lappend_oid(relids, myrelid);
1676 if (t->whereClause)
1677 relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1679 if (t->columns)
1680 relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1683 * Add children of this rel, if requested, so that they too are added
1684 * to the publication. A partitioned table can't have any inheritance
1685 * children other than its partitions, which need not be explicitly
1686 * added to the publication.
1688 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1690 List *children;
1691 ListCell *child;
1693 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1694 NULL);
1696 foreach(child, children)
1698 Oid childrelid = lfirst_oid(child);
1700 /* Allow query cancel in case this takes a long time */
1701 CHECK_FOR_INTERRUPTS();
1704 * Skip duplicates if user specified both parent and child
1705 * tables.
1707 if (list_member_oid(relids, childrelid))
1710 * We don't allow to specify row filter for both parent
1711 * and child table at the same time as it is not very
1712 * clear which one should be given preference.
1714 if (childrelid != myrelid &&
1715 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1716 ereport(ERROR,
1717 (errcode(ERRCODE_DUPLICATE_OBJECT),
1718 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1719 RelationGetRelationName(rel))));
1722 * We don't allow to specify column list for both parent
1723 * and child table at the same time as it is not very
1724 * clear which one should be given preference.
1726 if (childrelid != myrelid &&
1727 (t->columns || list_member_oid(relids_with_collist, childrelid)))
1728 ereport(ERROR,
1729 (errcode(ERRCODE_DUPLICATE_OBJECT),
1730 errmsg("conflicting or redundant column lists for table \"%s\"",
1731 RelationGetRelationName(rel))));
1733 continue;
1736 /* find_all_inheritors already got lock */
1737 rel = table_open(childrelid, NoLock);
1738 pub_rel = palloc(sizeof(PublicationRelInfo));
1739 pub_rel->relation = rel;
1740 /* child inherits WHERE clause from parent */
1741 pub_rel->whereClause = t->whereClause;
1743 /* child inherits column list from parent */
1744 pub_rel->columns = t->columns;
1745 rels = lappend(rels, pub_rel);
1746 relids = lappend_oid(relids, childrelid);
1748 if (t->whereClause)
1749 relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1751 if (t->columns)
1752 relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1757 list_free(relids);
1758 list_free(relids_with_rf);
1760 return rels;
1764 * Close all relations in the list.
1766 static void
1767 CloseTableList(List *rels)
1769 ListCell *lc;
1771 foreach(lc, rels)
1773 PublicationRelInfo *pub_rel;
1775 pub_rel = (PublicationRelInfo *) lfirst(lc);
1776 table_close(pub_rel->relation, NoLock);
1779 list_free_deep(rels);
1783 * Lock the schemas specified in the schema list in AccessShareLock mode in
1784 * order to prevent concurrent schema deletion.
1786 static void
1787 LockSchemaList(List *schemalist)
1789 ListCell *lc;
1791 foreach(lc, schemalist)
1793 Oid schemaid = lfirst_oid(lc);
1795 /* Allow query cancel in case this takes a long time */
1796 CHECK_FOR_INTERRUPTS();
1797 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1800 * It is possible that by the time we acquire the lock on schema,
1801 * concurrent DDL has removed it. We can test this by checking the
1802 * existence of schema.
1804 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1805 ereport(ERROR,
1806 errcode(ERRCODE_UNDEFINED_SCHEMA),
1807 errmsg("schema with OID %u does not exist", schemaid));
1812 * Add listed tables to the publication.
1814 static void
1815 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1816 AlterPublicationStmt *stmt)
1818 ListCell *lc;
1820 Assert(!stmt || !stmt->for_all_tables);
1822 foreach(lc, rels)
1824 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1825 Relation rel = pub_rel->relation;
1826 ObjectAddress obj;
1828 /* Must be owner of the table or superuser. */
1829 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1830 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1831 RelationGetRelationName(rel));
1833 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1834 if (stmt)
1836 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1837 (Node *) stmt);
1839 InvokeObjectPostCreateHook(PublicationRelRelationId,
1840 obj.objectId, 0);
1846 * Remove listed tables from the publication.
1848 static void
1849 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1851 ObjectAddress obj;
1852 ListCell *lc;
1853 Oid prid;
1855 foreach(lc, rels)
1857 PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1858 Relation rel = pubrel->relation;
1859 Oid relid = RelationGetRelid(rel);
1861 if (pubrel->columns)
1862 ereport(ERROR,
1863 errcode(ERRCODE_SYNTAX_ERROR),
1864 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1866 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1867 ObjectIdGetDatum(relid),
1868 ObjectIdGetDatum(pubid));
1869 if (!OidIsValid(prid))
1871 if (missing_ok)
1872 continue;
1874 ereport(ERROR,
1875 (errcode(ERRCODE_UNDEFINED_OBJECT),
1876 errmsg("relation \"%s\" is not part of the publication",
1877 RelationGetRelationName(rel))));
1880 if (pubrel->whereClause)
1881 ereport(ERROR,
1882 (errcode(ERRCODE_SYNTAX_ERROR),
1883 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1885 ObjectAddressSet(obj, PublicationRelRelationId, prid);
1886 performDeletion(&obj, DROP_CASCADE, 0);
1891 * Add listed schemas to the publication.
1893 static void
1894 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1895 AlterPublicationStmt *stmt)
1897 ListCell *lc;
1899 Assert(!stmt || !stmt->for_all_tables);
1901 foreach(lc, schemas)
1903 Oid schemaid = lfirst_oid(lc);
1904 ObjectAddress obj;
1906 obj = publication_add_schema(pubid, schemaid, if_not_exists);
1907 if (stmt)
1909 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1910 (Node *) stmt);
1912 InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1913 obj.objectId, 0);
1919 * Remove listed schemas from the publication.
1921 static void
1922 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1924 ObjectAddress obj;
1925 ListCell *lc;
1926 Oid psid;
1928 foreach(lc, schemas)
1930 Oid schemaid = lfirst_oid(lc);
1932 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1933 Anum_pg_publication_namespace_oid,
1934 ObjectIdGetDatum(schemaid),
1935 ObjectIdGetDatum(pubid));
1936 if (!OidIsValid(psid))
1938 if (missing_ok)
1939 continue;
1941 ereport(ERROR,
1942 (errcode(ERRCODE_UNDEFINED_OBJECT),
1943 errmsg("tables from schema \"%s\" are not part of the publication",
1944 get_namespace_name(schemaid))));
1947 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1948 performDeletion(&obj, DROP_CASCADE, 0);
1953 * Internal workhorse for changing a publication owner
1955 static void
1956 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1958 Form_pg_publication form;
1960 form = (Form_pg_publication) GETSTRUCT(tup);
1962 if (form->pubowner == newOwnerId)
1963 return;
1965 if (!superuser())
1967 AclResult aclresult;
1969 /* Must be owner */
1970 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1971 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1972 NameStr(form->pubname));
1974 /* Must be able to become new owner */
1975 check_can_set_role(GetUserId(), newOwnerId);
1977 /* New owner must have CREATE privilege on database */
1978 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1979 if (aclresult != ACLCHECK_OK)
1980 aclcheck_error(aclresult, OBJECT_DATABASE,
1981 get_database_name(MyDatabaseId));
1983 if (form->puballtables && !superuser_arg(newOwnerId))
1984 ereport(ERROR,
1985 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1986 errmsg("permission denied to change owner of publication \"%s\"",
1987 NameStr(form->pubname)),
1988 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1990 if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1991 ereport(ERROR,
1992 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1993 errmsg("permission denied to change owner of publication \"%s\"",
1994 NameStr(form->pubname)),
1995 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1998 form->pubowner = newOwnerId;
1999 CatalogTupleUpdate(rel, &tup->t_self, tup);
2001 /* Update owner dependency reference */
2002 changeDependencyOnOwner(PublicationRelationId,
2003 form->oid,
2004 newOwnerId);
2006 InvokeObjectPostAlterHook(PublicationRelationId,
2007 form->oid, 0);
2011 * Change publication owner -- by name
2013 ObjectAddress
2014 AlterPublicationOwner(const char *name, Oid newOwnerId)
2016 Oid subid;
2017 HeapTuple tup;
2018 Relation rel;
2019 ObjectAddress address;
2020 Form_pg_publication pubform;
2022 rel = table_open(PublicationRelationId, RowExclusiveLock);
2024 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2026 if (!HeapTupleIsValid(tup))
2027 ereport(ERROR,
2028 (errcode(ERRCODE_UNDEFINED_OBJECT),
2029 errmsg("publication \"%s\" does not exist", name)));
2031 pubform = (Form_pg_publication) GETSTRUCT(tup);
2032 subid = pubform->oid;
2034 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2036 ObjectAddressSet(address, PublicationRelationId, subid);
2038 heap_freetuple(tup);
2040 table_close(rel, RowExclusiveLock);
2042 return address;
2046 * Change publication owner -- by OID
2048 void
2049 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
2051 HeapTuple tup;
2052 Relation rel;
2054 rel = table_open(PublicationRelationId, RowExclusiveLock);
2056 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2058 if (!HeapTupleIsValid(tup))
2059 ereport(ERROR,
2060 (errcode(ERRCODE_UNDEFINED_OBJECT),
2061 errmsg("publication with OID %u does not exist", subid)));
2063 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2065 heap_freetuple(tup);
2067 table_close(rel, RowExclusiveLock);
2071 * Extract the publish_generated_columns option value from a DefElem. "stored"
2072 * and "none" values are accepted.
2074 static char
2075 defGetGeneratedColsOption(DefElem *def)
2077 char *sval;
2080 * If no parameter value given, assume "stored" is meant.
2082 if (!def->arg)
2083 return PUBLISH_GENCOLS_STORED;
2085 sval = defGetString(def);
2087 if (pg_strcasecmp(sval, "none") == 0)
2088 return PUBLISH_GENCOLS_NONE;
2089 if (pg_strcasecmp(sval, "stored") == 0)
2090 return PUBLISH_GENCOLS_STORED;
2092 ereport(ERROR,
2093 errcode(ERRCODE_SYNTAX_ERROR),
2094 errmsg("%s requires a \"none\" or \"stored\" value",
2095 def->defname));
2097 return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */