Make nbtree split REDO locking match original execution.
[pgsql.git] / src / backend / catalog / pg_constraint.c
blobfdc63e7dea16b3ab41b29af4837b0270a84b1d7c
/*-------------------------------------------------------------------------
 *
 * pg_constraint.c
 *	  routines to support manipulation of the pg_constraint relation
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/catalog/pg_constraint.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/sysattr.h"
20 #include "access/table.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/objectaccess.h"
26 #include "catalog/pg_constraint.h"
27 #include "catalog/pg_operator.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/tablecmds.h"
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
40 * CreateConstraintEntry
41 * Create a constraint table entry.
43 * Subsidiary records (such as triggers or indexes to implement the
44 * constraint) are *not* created here. But we do make dependency links
45 * from the constraint to the things it depends on.
47 * The new constraint's OID is returned.
49 Oid
50 CreateConstraintEntry(const char *constraintName,
51 Oid constraintNamespace,
52 char constraintType,
53 bool isDeferrable,
54 bool isDeferred,
55 bool isValidated,
56 Oid parentConstrId,
57 Oid relId,
58 const int16 *constraintKey,
59 int constraintNKeys,
60 int constraintNTotalKeys,
61 Oid domainId,
62 Oid indexRelId,
63 Oid foreignRelId,
64 const int16 *foreignKey,
65 const Oid *pfEqOp,
66 const Oid *ppEqOp,
67 const Oid *ffEqOp,
68 int foreignNKeys,
69 char foreignUpdateType,
70 char foreignDeleteType,
71 char foreignMatchType,
72 const Oid *exclOp,
73 Node *conExpr,
74 const char *conBin,
75 bool conIsLocal,
76 int conInhCount,
77 bool conNoInherit,
78 bool is_internal)
80 Relation conDesc;
81 Oid conOid;
82 HeapTuple tup;
83 bool nulls[Natts_pg_constraint];
84 Datum values[Natts_pg_constraint];
85 ArrayType *conkeyArray;
86 ArrayType *confkeyArray;
87 ArrayType *conpfeqopArray;
88 ArrayType *conppeqopArray;
89 ArrayType *conffeqopArray;
90 ArrayType *conexclopArray;
91 NameData cname;
92 int i;
93 ObjectAddress conobject;
95 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
97 Assert(constraintName);
98 namestrcpy(&cname, constraintName);
101 * Convert C arrays into Postgres arrays.
103 if (constraintNKeys > 0)
105 Datum *conkey;
107 conkey = (Datum *) palloc(constraintNKeys * sizeof(Datum));
108 for (i = 0; i < constraintNKeys; i++)
109 conkey[i] = Int16GetDatum(constraintKey[i]);
110 conkeyArray = construct_array(conkey, constraintNKeys,
111 INT2OID, 2, true, TYPALIGN_SHORT);
113 else
114 conkeyArray = NULL;
116 if (foreignNKeys > 0)
118 Datum *fkdatums;
120 fkdatums = (Datum *) palloc(foreignNKeys * sizeof(Datum));
121 for (i = 0; i < foreignNKeys; i++)
122 fkdatums[i] = Int16GetDatum(foreignKey[i]);
123 confkeyArray = construct_array(fkdatums, foreignNKeys,
124 INT2OID, 2, true, TYPALIGN_SHORT);
125 for (i = 0; i < foreignNKeys; i++)
126 fkdatums[i] = ObjectIdGetDatum(pfEqOp[i]);
127 conpfeqopArray = construct_array(fkdatums, foreignNKeys,
128 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
129 for (i = 0; i < foreignNKeys; i++)
130 fkdatums[i] = ObjectIdGetDatum(ppEqOp[i]);
131 conppeqopArray = construct_array(fkdatums, foreignNKeys,
132 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
133 for (i = 0; i < foreignNKeys; i++)
134 fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]);
135 conffeqopArray = construct_array(fkdatums, foreignNKeys,
136 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
138 else
140 confkeyArray = NULL;
141 conpfeqopArray = NULL;
142 conppeqopArray = NULL;
143 conffeqopArray = NULL;
146 if (exclOp != NULL)
148 Datum *opdatums;
150 opdatums = (Datum *) palloc(constraintNKeys * sizeof(Datum));
151 for (i = 0; i < constraintNKeys; i++)
152 opdatums[i] = ObjectIdGetDatum(exclOp[i]);
153 conexclopArray = construct_array(opdatums, constraintNKeys,
154 OIDOID, sizeof(Oid), true, TYPALIGN_INT);
156 else
157 conexclopArray = NULL;
159 /* initialize nulls and values */
160 for (i = 0; i < Natts_pg_constraint; i++)
162 nulls[i] = false;
163 values[i] = (Datum) NULL;
166 conOid = GetNewOidWithIndex(conDesc, ConstraintOidIndexId,
167 Anum_pg_constraint_oid);
168 values[Anum_pg_constraint_oid - 1] = ObjectIdGetDatum(conOid);
169 values[Anum_pg_constraint_conname - 1] = NameGetDatum(&cname);
170 values[Anum_pg_constraint_connamespace - 1] = ObjectIdGetDatum(constraintNamespace);
171 values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
172 values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
173 values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
174 values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
175 values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
176 values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
177 values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
178 values[Anum_pg_constraint_conparentid - 1] = ObjectIdGetDatum(parentConstrId);
179 values[Anum_pg_constraint_confrelid - 1] = ObjectIdGetDatum(foreignRelId);
180 values[Anum_pg_constraint_confupdtype - 1] = CharGetDatum(foreignUpdateType);
181 values[Anum_pg_constraint_confdeltype - 1] = CharGetDatum(foreignDeleteType);
182 values[Anum_pg_constraint_confmatchtype - 1] = CharGetDatum(foreignMatchType);
183 values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal);
184 values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount);
185 values[Anum_pg_constraint_connoinherit - 1] = BoolGetDatum(conNoInherit);
187 if (conkeyArray)
188 values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray);
189 else
190 nulls[Anum_pg_constraint_conkey - 1] = true;
192 if (confkeyArray)
193 values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
194 else
195 nulls[Anum_pg_constraint_confkey - 1] = true;
197 if (conpfeqopArray)
198 values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray);
199 else
200 nulls[Anum_pg_constraint_conpfeqop - 1] = true;
202 if (conppeqopArray)
203 values[Anum_pg_constraint_conppeqop - 1] = PointerGetDatum(conppeqopArray);
204 else
205 nulls[Anum_pg_constraint_conppeqop - 1] = true;
207 if (conffeqopArray)
208 values[Anum_pg_constraint_conffeqop - 1] = PointerGetDatum(conffeqopArray);
209 else
210 nulls[Anum_pg_constraint_conffeqop - 1] = true;
212 if (conexclopArray)
213 values[Anum_pg_constraint_conexclop - 1] = PointerGetDatum(conexclopArray);
214 else
215 nulls[Anum_pg_constraint_conexclop - 1] = true;
217 if (conBin)
218 values[Anum_pg_constraint_conbin - 1] = CStringGetTextDatum(conBin);
219 else
220 nulls[Anum_pg_constraint_conbin - 1] = true;
222 tup = heap_form_tuple(RelationGetDescr(conDesc), values, nulls);
224 CatalogTupleInsert(conDesc, tup);
226 ObjectAddressSet(conobject, ConstraintRelationId, conOid);
228 table_close(conDesc, RowExclusiveLock);
230 if (OidIsValid(relId))
233 * Register auto dependency from constraint to owning relation, or to
234 * specific column(s) if any are mentioned.
236 ObjectAddress relobject;
238 if (constraintNTotalKeys > 0)
240 for (i = 0; i < constraintNTotalKeys; i++)
242 ObjectAddressSubSet(relobject, RelationRelationId, relId,
243 constraintKey[i]);
244 recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
247 else
249 ObjectAddressSet(relobject, RelationRelationId, relId);
250 recordDependencyOn(&conobject, &relobject, DEPENDENCY_AUTO);
254 if (OidIsValid(domainId))
257 * Register auto dependency from constraint to owning domain
259 ObjectAddress domobject;
261 ObjectAddressSet(domobject, TypeRelationId, domainId);
262 recordDependencyOn(&conobject, &domobject, DEPENDENCY_AUTO);
265 if (OidIsValid(foreignRelId))
268 * Register normal dependency from constraint to foreign relation, or
269 * to specific column(s) if any are mentioned.
271 ObjectAddress relobject;
273 if (foreignNKeys > 0)
275 for (i = 0; i < foreignNKeys; i++)
277 ObjectAddressSubSet(relobject, RelationRelationId,
278 foreignRelId, foreignKey[i]);
279 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
282 else
284 ObjectAddressSet(relobject, RelationRelationId, foreignRelId);
285 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
289 if (OidIsValid(indexRelId) && constraintType == CONSTRAINT_FOREIGN)
292 * Register normal dependency on the unique index that supports a
293 * foreign-key constraint. (Note: for indexes associated with unique
294 * or primary-key constraints, the dependency runs the other way, and
295 * is not made here.)
297 ObjectAddress relobject;
299 ObjectAddressSet(relobject, RelationRelationId, indexRelId);
300 recordDependencyOn(&conobject, &relobject, DEPENDENCY_NORMAL);
303 if (foreignNKeys > 0)
306 * Register normal dependencies on the equality operators that support
307 * a foreign-key constraint. If the PK and FK types are the same then
308 * all three operators for a column are the same; otherwise they are
309 * different.
311 ObjectAddress oprobject;
313 oprobject.classId = OperatorRelationId;
314 oprobject.objectSubId = 0;
316 for (i = 0; i < foreignNKeys; i++)
318 oprobject.objectId = pfEqOp[i];
319 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
320 if (ppEqOp[i] != pfEqOp[i])
322 oprobject.objectId = ppEqOp[i];
323 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
325 if (ffEqOp[i] != pfEqOp[i])
327 oprobject.objectId = ffEqOp[i];
328 recordDependencyOn(&conobject, &oprobject, DEPENDENCY_NORMAL);
334 * We don't bother to register dependencies on the exclusion operators of
335 * an exclusion constraint. We assume they are members of the opclass
336 * supporting the index, so there's an indirect dependency via that. (This
337 * would be pretty dicey for cross-type operators, but exclusion operators
338 * can never be cross-type.)
341 if (conExpr != NULL)
344 * Register dependencies from constraint to objects mentioned in CHECK
345 * expression.
347 recordDependencyOnSingleRelExpr(&conobject, conExpr, relId,
348 DEPENDENCY_NORMAL,
349 DEPENDENCY_NORMAL, false);
352 /* Post creation hook for new constraint */
353 InvokeObjectPostCreateHookArg(ConstraintRelationId, conOid, 0,
354 is_internal);
356 return conOid;
360 * Test whether given name is currently used as a constraint name
361 * for the given object (relation or domain).
363 * This is used to decide whether to accept a user-specified constraint name.
364 * It is deliberately not the same test as ChooseConstraintName uses to decide
365 * whether an auto-generated name is OK: here, we will allow it unless there
366 * is an identical constraint name in use *on the same object*.
368 * NB: Caller should hold exclusive lock on the given object, else
369 * this test can be fooled by concurrent additions.
371 bool
372 ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
373 const char *conname)
375 bool found;
376 Relation conDesc;
377 SysScanDesc conscan;
378 ScanKeyData skey[3];
380 conDesc = table_open(ConstraintRelationId, AccessShareLock);
382 ScanKeyInit(&skey[0],
383 Anum_pg_constraint_conrelid,
384 BTEqualStrategyNumber, F_OIDEQ,
385 ObjectIdGetDatum((conCat == CONSTRAINT_RELATION)
386 ? objId : InvalidOid));
387 ScanKeyInit(&skey[1],
388 Anum_pg_constraint_contypid,
389 BTEqualStrategyNumber, F_OIDEQ,
390 ObjectIdGetDatum((conCat == CONSTRAINT_DOMAIN)
391 ? objId : InvalidOid));
392 ScanKeyInit(&skey[2],
393 Anum_pg_constraint_conname,
394 BTEqualStrategyNumber, F_NAMEEQ,
395 CStringGetDatum(conname));
397 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId,
398 true, NULL, 3, skey);
400 /* There can be at most one matching row */
401 found = (HeapTupleIsValid(systable_getnext(conscan)));
403 systable_endscan(conscan);
404 table_close(conDesc, AccessShareLock);
406 return found;
410 * Does any constraint of the given name exist in the given namespace?
412 * This is used for code that wants to match ChooseConstraintName's rule
413 * that we should avoid autogenerating duplicate constraint names within a
414 * namespace.
416 bool
417 ConstraintNameExists(const char *conname, Oid namespaceid)
419 bool found;
420 Relation conDesc;
421 SysScanDesc conscan;
422 ScanKeyData skey[2];
424 conDesc = table_open(ConstraintRelationId, AccessShareLock);
426 ScanKeyInit(&skey[0],
427 Anum_pg_constraint_conname,
428 BTEqualStrategyNumber, F_NAMEEQ,
429 CStringGetDatum(conname));
431 ScanKeyInit(&skey[1],
432 Anum_pg_constraint_connamespace,
433 BTEqualStrategyNumber, F_OIDEQ,
434 ObjectIdGetDatum(namespaceid));
436 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
437 NULL, 2, skey);
439 found = (HeapTupleIsValid(systable_getnext(conscan)));
441 systable_endscan(conscan);
442 table_close(conDesc, AccessShareLock);
444 return found;
448 * Select a nonconflicting name for a new constraint.
450 * The objective here is to choose a name that is unique within the
451 * specified namespace. Postgres does not require this, but the SQL
452 * spec does, and some apps depend on it. Therefore we avoid choosing
453 * default names that so conflict.
455 * name1, name2, and label are used the same way as for makeObjectName(),
456 * except that the label can't be NULL; digits will be appended to the label
457 * if needed to create a name that is unique within the specified namespace.
459 * 'others' can be a list of string names already chosen within the current
460 * command (but not yet reflected into the catalogs); we will not choose
461 * a duplicate of one of these either.
463 * Note: it is theoretically possible to get a collision anyway, if someone
464 * else chooses the same name concurrently. This is fairly unlikely to be
465 * a problem in practice, especially if one is holding an exclusive lock on
466 * the relation identified by name1.
468 * Returns a palloc'd string.
470 char *
471 ChooseConstraintName(const char *name1, const char *name2,
472 const char *label, Oid namespaceid,
473 List *others)
475 int pass = 0;
476 char *conname = NULL;
477 char modlabel[NAMEDATALEN];
478 Relation conDesc;
479 SysScanDesc conscan;
480 ScanKeyData skey[2];
481 bool found;
482 ListCell *l;
484 conDesc = table_open(ConstraintRelationId, AccessShareLock);
486 /* try the unmodified label first */
487 StrNCpy(modlabel, label, sizeof(modlabel));
489 for (;;)
491 conname = makeObjectName(name1, name2, modlabel);
493 found = false;
495 foreach(l, others)
497 if (strcmp((char *) lfirst(l), conname) == 0)
499 found = true;
500 break;
504 if (!found)
506 ScanKeyInit(&skey[0],
507 Anum_pg_constraint_conname,
508 BTEqualStrategyNumber, F_NAMEEQ,
509 CStringGetDatum(conname));
511 ScanKeyInit(&skey[1],
512 Anum_pg_constraint_connamespace,
513 BTEqualStrategyNumber, F_OIDEQ,
514 ObjectIdGetDatum(namespaceid));
516 conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
517 NULL, 2, skey);
519 found = (HeapTupleIsValid(systable_getnext(conscan)));
521 systable_endscan(conscan);
524 if (!found)
525 break;
527 /* found a conflict, so try a new name component */
528 pfree(conname);
529 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
532 table_close(conDesc, AccessShareLock);
534 return conname;
538 * Delete a single constraint record.
540 void
541 RemoveConstraintById(Oid conId)
543 Relation conDesc;
544 HeapTuple tup;
545 Form_pg_constraint con;
547 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
549 tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conId));
550 if (!HeapTupleIsValid(tup)) /* should not happen */
551 elog(ERROR, "cache lookup failed for constraint %u", conId);
552 con = (Form_pg_constraint) GETSTRUCT(tup);
555 * Special processing depending on what the constraint is for.
557 if (OidIsValid(con->conrelid))
559 Relation rel;
562 * If the constraint is for a relation, open and exclusive-lock the
563 * relation it's for.
565 rel = table_open(con->conrelid, AccessExclusiveLock);
568 * We need to update the relchecks count if it is a check constraint
569 * being dropped. This update will force backends to rebuild relcache
570 * entries when we commit.
572 if (con->contype == CONSTRAINT_CHECK)
574 Relation pgrel;
575 HeapTuple relTup;
576 Form_pg_class classForm;
578 pgrel = table_open(RelationRelationId, RowExclusiveLock);
579 relTup = SearchSysCacheCopy1(RELOID,
580 ObjectIdGetDatum(con->conrelid));
581 if (!HeapTupleIsValid(relTup))
582 elog(ERROR, "cache lookup failed for relation %u",
583 con->conrelid);
584 classForm = (Form_pg_class) GETSTRUCT(relTup);
586 if (classForm->relchecks == 0) /* should not happen */
587 elog(ERROR, "relation \"%s\" has relchecks = 0",
588 RelationGetRelationName(rel));
589 classForm->relchecks--;
591 CatalogTupleUpdate(pgrel, &relTup->t_self, relTup);
593 heap_freetuple(relTup);
595 table_close(pgrel, RowExclusiveLock);
598 /* Keep lock on constraint's rel until end of xact */
599 table_close(rel, NoLock);
601 else if (OidIsValid(con->contypid))
604 * XXX for now, do nothing special when dropping a domain constraint
606 * Probably there should be some form of locking on the domain type,
607 * but we have no such concept at the moment.
610 else
611 elog(ERROR, "constraint %u is not of a known type", conId);
613 /* Fry the constraint itself */
614 CatalogTupleDelete(conDesc, &tup->t_self);
616 /* Clean up */
617 ReleaseSysCache(tup);
618 table_close(conDesc, RowExclusiveLock);
622 * RenameConstraintById
623 * Rename a constraint.
625 * Note: this isn't intended to be a user-exposed function; it doesn't check
626 * permissions etc. Currently this is only invoked when renaming an index
627 * that is associated with a constraint, but it's made a little more general
628 * than that with the expectation of someday having ALTER TABLE RENAME
629 * CONSTRAINT.
631 void
632 RenameConstraintById(Oid conId, const char *newname)
634 Relation conDesc;
635 HeapTuple tuple;
636 Form_pg_constraint con;
638 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
640 tuple = SearchSysCacheCopy1(CONSTROID, ObjectIdGetDatum(conId));
641 if (!HeapTupleIsValid(tuple))
642 elog(ERROR, "cache lookup failed for constraint %u", conId);
643 con = (Form_pg_constraint) GETSTRUCT(tuple);
646 * For user-friendliness, check whether the name is already in use.
648 if (OidIsValid(con->conrelid) &&
649 ConstraintNameIsUsed(CONSTRAINT_RELATION,
650 con->conrelid,
651 newname))
652 ereport(ERROR,
653 (errcode(ERRCODE_DUPLICATE_OBJECT),
654 errmsg("constraint \"%s\" for relation \"%s\" already exists",
655 newname, get_rel_name(con->conrelid))));
656 if (OidIsValid(con->contypid) &&
657 ConstraintNameIsUsed(CONSTRAINT_DOMAIN,
658 con->contypid,
659 newname))
660 ereport(ERROR,
661 (errcode(ERRCODE_DUPLICATE_OBJECT),
662 errmsg("constraint \"%s\" for domain %s already exists",
663 newname, format_type_be(con->contypid))));
665 /* OK, do the rename --- tuple is a copy, so OK to scribble on it */
666 namestrcpy(&(con->conname), newname);
668 CatalogTupleUpdate(conDesc, &tuple->t_self, tuple);
670 InvokeObjectPostAlterHook(ConstraintRelationId, conId, 0);
672 heap_freetuple(tuple);
673 table_close(conDesc, RowExclusiveLock);
677 * AlterConstraintNamespaces
678 * Find any constraints belonging to the specified object,
679 * and move them to the specified new namespace.
681 * isType indicates whether the owning object is a type or a relation.
683 void
684 AlterConstraintNamespaces(Oid ownerId, Oid oldNspId,
685 Oid newNspId, bool isType, ObjectAddresses *objsMoved)
687 Relation conRel;
688 ScanKeyData key[2];
689 SysScanDesc scan;
690 HeapTuple tup;
692 conRel = table_open(ConstraintRelationId, RowExclusiveLock);
694 ScanKeyInit(&key[0],
695 Anum_pg_constraint_conrelid,
696 BTEqualStrategyNumber, F_OIDEQ,
697 ObjectIdGetDatum(isType ? InvalidOid : ownerId));
698 ScanKeyInit(&key[1],
699 Anum_pg_constraint_contypid,
700 BTEqualStrategyNumber, F_OIDEQ,
701 ObjectIdGetDatum(isType ? ownerId : InvalidOid));
703 scan = systable_beginscan(conRel, ConstraintRelidTypidNameIndexId, true,
704 NULL, 2, key);
706 while (HeapTupleIsValid((tup = systable_getnext(scan))))
708 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(tup);
709 ObjectAddress thisobj;
711 ObjectAddressSet(thisobj, ConstraintRelationId, conform->oid);
713 if (object_address_present(&thisobj, objsMoved))
714 continue;
716 /* Don't update if the object is already part of the namespace */
717 if (conform->connamespace == oldNspId && oldNspId != newNspId)
719 tup = heap_copytuple(tup);
720 conform = (Form_pg_constraint) GETSTRUCT(tup);
722 conform->connamespace = newNspId;
724 CatalogTupleUpdate(conRel, &tup->t_self, tup);
727 * Note: currently, the constraint will not have its own
728 * dependency on the namespace, so we don't need to do
729 * changeDependencyFor().
733 InvokeObjectPostAlterHook(ConstraintRelationId, thisobj.objectId, 0);
735 add_exact_object_address(&thisobj, objsMoved);
738 systable_endscan(scan);
740 table_close(conRel, RowExclusiveLock);
744 * ConstraintSetParentConstraint
745 * Set a partition's constraint as child of its parent constraint,
746 * or remove the linkage if parentConstrId is InvalidOid.
748 * This updates the constraint's pg_constraint row to show it as inherited, and
749 * adds PARTITION dependencies to prevent the constraint from being deleted
750 * on its own. Alternatively, reverse that.
752 void
753 ConstraintSetParentConstraint(Oid childConstrId,
754 Oid parentConstrId,
755 Oid childTableId)
757 Relation constrRel;
758 Form_pg_constraint constrForm;
759 HeapTuple tuple,
760 newtup;
761 ObjectAddress depender;
762 ObjectAddress referenced;
764 constrRel = table_open(ConstraintRelationId, RowExclusiveLock);
765 tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(childConstrId));
766 if (!HeapTupleIsValid(tuple))
767 elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
768 newtup = heap_copytuple(tuple);
769 constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
770 if (OidIsValid(parentConstrId))
772 /* don't allow setting parent for a constraint that already has one */
773 Assert(constrForm->coninhcount == 0);
774 if (constrForm->conparentid != InvalidOid)
775 elog(ERROR, "constraint %u already has a parent constraint",
776 childConstrId);
778 constrForm->conislocal = false;
779 constrForm->coninhcount++;
780 constrForm->conparentid = parentConstrId;
782 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
784 ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
786 ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
787 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_PRI);
789 ObjectAddressSet(referenced, RelationRelationId, childTableId);
790 recordDependencyOn(&depender, &referenced, DEPENDENCY_PARTITION_SEC);
792 else
794 constrForm->coninhcount--;
795 constrForm->conislocal = true;
796 constrForm->conparentid = InvalidOid;
798 /* Make sure there's no further inheritance. */
799 Assert(constrForm->coninhcount == 0);
801 CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
803 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
804 ConstraintRelationId,
805 DEPENDENCY_PARTITION_PRI);
806 deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
807 RelationRelationId,
808 DEPENDENCY_PARTITION_SEC);
811 ReleaseSysCache(tuple);
812 table_close(constrRel, RowExclusiveLock);
817 * get_relation_constraint_oid
818 * Find a constraint on the specified relation with the specified name.
819 * Returns constraint's OID.
822 get_relation_constraint_oid(Oid relid, const char *conname, bool missing_ok)
824 Relation pg_constraint;
825 HeapTuple tuple;
826 SysScanDesc scan;
827 ScanKeyData skey[3];
828 Oid conOid = InvalidOid;
830 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
832 ScanKeyInit(&skey[0],
833 Anum_pg_constraint_conrelid,
834 BTEqualStrategyNumber, F_OIDEQ,
835 ObjectIdGetDatum(relid));
836 ScanKeyInit(&skey[1],
837 Anum_pg_constraint_contypid,
838 BTEqualStrategyNumber, F_OIDEQ,
839 ObjectIdGetDatum(InvalidOid));
840 ScanKeyInit(&skey[2],
841 Anum_pg_constraint_conname,
842 BTEqualStrategyNumber, F_NAMEEQ,
843 CStringGetDatum(conname));
845 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
846 NULL, 3, skey);
848 /* There can be at most one matching row */
849 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
850 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
852 systable_endscan(scan);
854 /* If no such constraint exists, complain */
855 if (!OidIsValid(conOid) && !missing_ok)
856 ereport(ERROR,
857 (errcode(ERRCODE_UNDEFINED_OBJECT),
858 errmsg("constraint \"%s\" for table \"%s\" does not exist",
859 conname, get_rel_name(relid))));
861 table_close(pg_constraint, AccessShareLock);
863 return conOid;
867 * get_relation_constraint_attnos
868 * Find a constraint on the specified relation with the specified name
869 * and return the constrained columns.
871 * Returns a Bitmapset of the column attnos of the constrained columns, with
872 * attnos being offset by FirstLowInvalidHeapAttributeNumber so that system
873 * columns can be represented.
875 * *constraintOid is set to the OID of the constraint, or InvalidOid on
876 * failure.
878 Bitmapset *
879 get_relation_constraint_attnos(Oid relid, const char *conname,
880 bool missing_ok, Oid *constraintOid)
882 Bitmapset *conattnos = NULL;
883 Relation pg_constraint;
884 HeapTuple tuple;
885 SysScanDesc scan;
886 ScanKeyData skey[3];
888 /* Set *constraintOid, to avoid complaints about uninitialized vars */
889 *constraintOid = InvalidOid;
891 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
893 ScanKeyInit(&skey[0],
894 Anum_pg_constraint_conrelid,
895 BTEqualStrategyNumber, F_OIDEQ,
896 ObjectIdGetDatum(relid));
897 ScanKeyInit(&skey[1],
898 Anum_pg_constraint_contypid,
899 BTEqualStrategyNumber, F_OIDEQ,
900 ObjectIdGetDatum(InvalidOid));
901 ScanKeyInit(&skey[2],
902 Anum_pg_constraint_conname,
903 BTEqualStrategyNumber, F_NAMEEQ,
904 CStringGetDatum(conname));
906 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
907 NULL, 3, skey);
909 /* There can be at most one matching row */
910 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
912 Datum adatum;
913 bool isNull;
915 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
917 /* Extract the conkey array, ie, attnums of constrained columns */
918 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
919 RelationGetDescr(pg_constraint), &isNull);
920 if (!isNull)
922 ArrayType *arr;
923 int numcols;
924 int16 *attnums;
925 int i;
927 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
928 numcols = ARR_DIMS(arr)[0];
929 if (ARR_NDIM(arr) != 1 ||
930 numcols < 0 ||
931 ARR_HASNULL(arr) ||
932 ARR_ELEMTYPE(arr) != INT2OID)
933 elog(ERROR, "conkey is not a 1-D smallint array");
934 attnums = (int16 *) ARR_DATA_PTR(arr);
936 /* Construct the result value */
937 for (i = 0; i < numcols; i++)
939 conattnos = bms_add_member(conattnos,
940 attnums[i] - FirstLowInvalidHeapAttributeNumber);
945 systable_endscan(scan);
947 /* If no such constraint exists, complain */
948 if (!OidIsValid(*constraintOid) && !missing_ok)
949 ereport(ERROR,
950 (errcode(ERRCODE_UNDEFINED_OBJECT),
951 errmsg("constraint \"%s\" for table \"%s\" does not exist",
952 conname, get_rel_name(relid))));
954 table_close(pg_constraint, AccessShareLock);
956 return conattnos;
960 * Return the OID of the constraint associated with the given index in the
961 * given relation; or InvalidOid if no such index is catalogued.
964 get_relation_idx_constraint_oid(Oid relationId, Oid indexId)
966 Relation pg_constraint;
967 SysScanDesc scan;
968 ScanKeyData key;
969 HeapTuple tuple;
970 Oid constraintId = InvalidOid;
972 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
974 ScanKeyInit(&key,
975 Anum_pg_constraint_conrelid,
976 BTEqualStrategyNumber,
977 F_OIDEQ,
978 ObjectIdGetDatum(relationId));
979 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId,
980 true, NULL, 1, &key);
981 while ((tuple = systable_getnext(scan)) != NULL)
983 Form_pg_constraint constrForm;
985 constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
986 if (constrForm->conindid == indexId)
988 constraintId = constrForm->oid;
989 break;
992 systable_endscan(scan);
994 table_close(pg_constraint, AccessShareLock);
995 return constraintId;
999 * get_domain_constraint_oid
1000 * Find a constraint on the specified domain with the specified name.
1001 * Returns constraint's OID.
1004 get_domain_constraint_oid(Oid typid, const char *conname, bool missing_ok)
1006 Relation pg_constraint;
1007 HeapTuple tuple;
1008 SysScanDesc scan;
1009 ScanKeyData skey[3];
1010 Oid conOid = InvalidOid;
1012 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1014 ScanKeyInit(&skey[0],
1015 Anum_pg_constraint_conrelid,
1016 BTEqualStrategyNumber, F_OIDEQ,
1017 ObjectIdGetDatum(InvalidOid));
1018 ScanKeyInit(&skey[1],
1019 Anum_pg_constraint_contypid,
1020 BTEqualStrategyNumber, F_OIDEQ,
1021 ObjectIdGetDatum(typid));
1022 ScanKeyInit(&skey[2],
1023 Anum_pg_constraint_conname,
1024 BTEqualStrategyNumber, F_NAMEEQ,
1025 CStringGetDatum(conname));
1027 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1028 NULL, 3, skey);
1030 /* There can be at most one matching row */
1031 if (HeapTupleIsValid(tuple = systable_getnext(scan)))
1032 conOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1034 systable_endscan(scan);
1036 /* If no such constraint exists, complain */
1037 if (!OidIsValid(conOid) && !missing_ok)
1038 ereport(ERROR,
1039 (errcode(ERRCODE_UNDEFINED_OBJECT),
1040 errmsg("constraint \"%s\" for domain %s does not exist",
1041 conname, format_type_be(typid))));
1043 table_close(pg_constraint, AccessShareLock);
1045 return conOid;
1049 * get_primary_key_attnos
1050 * Identify the columns in a relation's primary key, if any.
1052 * Returns a Bitmapset of the column attnos of the primary key's columns,
1053 * with attnos being offset by FirstLowInvalidHeapAttributeNumber so that
1054 * system columns can be represented.
1056 * If there is no primary key, return NULL. We also return NULL if the pkey
1057 * constraint is deferrable and deferrableOk is false.
1059 * *constraintOid is set to the OID of the pkey constraint, or InvalidOid
1060 * on failure.
1062 Bitmapset *
1063 get_primary_key_attnos(Oid relid, bool deferrableOk, Oid *constraintOid)
1065 Bitmapset *pkattnos = NULL;
1066 Relation pg_constraint;
1067 HeapTuple tuple;
1068 SysScanDesc scan;
1069 ScanKeyData skey[1];
1071 /* Set *constraintOid, to avoid complaints about uninitialized vars */
1072 *constraintOid = InvalidOid;
1074 /* Scan pg_constraint for constraints of the target rel */
1075 pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
1077 ScanKeyInit(&skey[0],
1078 Anum_pg_constraint_conrelid,
1079 BTEqualStrategyNumber, F_OIDEQ,
1080 ObjectIdGetDatum(relid));
1082 scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
1083 NULL, 1, skey);
1085 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1087 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
1088 Datum adatum;
1089 bool isNull;
1090 ArrayType *arr;
1091 int16 *attnums;
1092 int numkeys;
1093 int i;
1095 /* Skip constraints that are not PRIMARY KEYs */
1096 if (con->contype != CONSTRAINT_PRIMARY)
1097 continue;
1100 * If the primary key is deferrable, but we've been instructed to
1101 * ignore deferrable constraints, then we might as well give up
1102 * searching, since there can only be a single primary key on a table.
1104 if (con->condeferrable && !deferrableOk)
1105 break;
1107 /* Extract the conkey array, ie, attnums of PK's columns */
1108 adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
1109 RelationGetDescr(pg_constraint), &isNull);
1110 if (isNull)
1111 elog(ERROR, "null conkey for constraint %u",
1112 ((Form_pg_constraint) GETSTRUCT(tuple))->oid);
1113 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1114 numkeys = ARR_DIMS(arr)[0];
1115 if (ARR_NDIM(arr) != 1 ||
1116 numkeys < 0 ||
1117 ARR_HASNULL(arr) ||
1118 ARR_ELEMTYPE(arr) != INT2OID)
1119 elog(ERROR, "conkey is not a 1-D smallint array");
1120 attnums = (int16 *) ARR_DATA_PTR(arr);
1122 /* Construct the result value */
1123 for (i = 0; i < numkeys; i++)
1125 pkattnos = bms_add_member(pkattnos,
1126 attnums[i] - FirstLowInvalidHeapAttributeNumber);
1128 *constraintOid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1130 /* No need to search further */
1131 break;
1134 systable_endscan(scan);
1136 table_close(pg_constraint, AccessShareLock);
1138 return pkattnos;
1142 * Extract data from the pg_constraint tuple of a foreign-key constraint.
1144 * All arguments save the first are output arguments; the last three of them
1145 * can be passed as NULL if caller doesn't need them.
1147 void
1148 DeconstructFkConstraintRow(HeapTuple tuple, int *numfks,
1149 AttrNumber *conkey, AttrNumber *confkey,
1150 Oid *pf_eq_oprs, Oid *pp_eq_oprs, Oid *ff_eq_oprs)
1152 Oid constrId;
1153 Datum adatum;
1154 bool isNull;
1155 ArrayType *arr;
1156 int numkeys;
1158 constrId = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
1161 * We expect the arrays to be 1-D arrays of the right types; verify that.
1162 * We don't need to use deconstruct_array() since the array data is just
1163 * going to look like a C array of values.
1165 adatum = SysCacheGetAttr(CONSTROID, tuple,
1166 Anum_pg_constraint_conkey, &isNull);
1167 if (isNull)
1168 elog(ERROR, "null conkey for constraint %u", constrId);
1169 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1170 if (ARR_NDIM(arr) != 1 ||
1171 ARR_HASNULL(arr) ||
1172 ARR_ELEMTYPE(arr) != INT2OID)
1173 elog(ERROR, "conkey is not a 1-D smallint array");
1174 numkeys = ARR_DIMS(arr)[0];
1175 if (numkeys <= 0 || numkeys > INDEX_MAX_KEYS)
1176 elog(ERROR, "foreign key constraint cannot have %d columns", numkeys);
1177 memcpy(conkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1178 if ((Pointer) arr != DatumGetPointer(adatum))
1179 pfree(arr); /* free de-toasted copy, if any */
1181 adatum = SysCacheGetAttr(CONSTROID, tuple,
1182 Anum_pg_constraint_confkey, &isNull);
1183 if (isNull)
1184 elog(ERROR, "null confkey for constraint %u", constrId);
1185 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1186 if (ARR_NDIM(arr) != 1 ||
1187 ARR_DIMS(arr)[0] != numkeys ||
1188 ARR_HASNULL(arr) ||
1189 ARR_ELEMTYPE(arr) != INT2OID)
1190 elog(ERROR, "confkey is not a 1-D smallint array");
1191 memcpy(confkey, ARR_DATA_PTR(arr), numkeys * sizeof(int16));
1192 if ((Pointer) arr != DatumGetPointer(adatum))
1193 pfree(arr); /* free de-toasted copy, if any */
1195 if (pf_eq_oprs)
1197 adatum = SysCacheGetAttr(CONSTROID, tuple,
1198 Anum_pg_constraint_conpfeqop, &isNull);
1199 if (isNull)
1200 elog(ERROR, "null conpfeqop for constraint %u", constrId);
1201 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1202 /* see TryReuseForeignKey if you change the test below */
1203 if (ARR_NDIM(arr) != 1 ||
1204 ARR_DIMS(arr)[0] != numkeys ||
1205 ARR_HASNULL(arr) ||
1206 ARR_ELEMTYPE(arr) != OIDOID)
1207 elog(ERROR, "conpfeqop is not a 1-D Oid array");
1208 memcpy(pf_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1209 if ((Pointer) arr != DatumGetPointer(adatum))
1210 pfree(arr); /* free de-toasted copy, if any */
1213 if (pp_eq_oprs)
1215 adatum = SysCacheGetAttr(CONSTROID, tuple,
1216 Anum_pg_constraint_conppeqop, &isNull);
1217 if (isNull)
1218 elog(ERROR, "null conppeqop for constraint %u", constrId);
1219 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1220 if (ARR_NDIM(arr) != 1 ||
1221 ARR_DIMS(arr)[0] != numkeys ||
1222 ARR_HASNULL(arr) ||
1223 ARR_ELEMTYPE(arr) != OIDOID)
1224 elog(ERROR, "conppeqop is not a 1-D Oid array");
1225 memcpy(pp_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1226 if ((Pointer) arr != DatumGetPointer(adatum))
1227 pfree(arr); /* free de-toasted copy, if any */
1230 if (ff_eq_oprs)
1232 adatum = SysCacheGetAttr(CONSTROID, tuple,
1233 Anum_pg_constraint_conffeqop, &isNull);
1234 if (isNull)
1235 elog(ERROR, "null conffeqop for constraint %u", constrId);
1236 arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
1237 if (ARR_NDIM(arr) != 1 ||
1238 ARR_DIMS(arr)[0] != numkeys ||
1239 ARR_HASNULL(arr) ||
1240 ARR_ELEMTYPE(arr) != OIDOID)
1241 elog(ERROR, "conffeqop is not a 1-D Oid array");
1242 memcpy(ff_eq_oprs, ARR_DATA_PTR(arr), numkeys * sizeof(Oid));
1243 if ((Pointer) arr != DatumGetPointer(adatum))
1244 pfree(arr); /* free de-toasted copy, if any */
1247 *numfks = numkeys;
1251 * Determine whether a relation can be proven functionally dependent on
1252 * a set of grouping columns. If so, return true and add the pg_constraint
1253 * OIDs of the constraints needed for the proof to the *constraintDeps list.
1255 * grouping_columns is a list of grouping expressions, in which columns of
1256 * the rel of interest are Vars with the indicated varno/varlevelsup.
1258 * Currently we only check to see if the rel has a primary key that is a
1259 * subset of the grouping_columns. We could also use plain unique constraints
1260 * if all their columns are known not null, but there's a problem: we need
1261 * to be able to represent the not-null-ness as part of the constraints added
1262 * to *constraintDeps. FIXME whenever not-null constraints get represented
1263 * in pg_constraint.
1265 bool
1266 check_functional_grouping(Oid relid,
1267 Index varno, Index varlevelsup,
1268 List *grouping_columns,
1269 List **constraintDeps)
1271 Bitmapset *pkattnos;
1272 Bitmapset *groupbyattnos;
1273 Oid constraintOid;
1274 ListCell *gl;
1276 /* If the rel has no PK, then we can't prove functional dependency */
1277 pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
1278 if (pkattnos == NULL)
1279 return false;
1281 /* Identify all the rel's columns that appear in grouping_columns */
1282 groupbyattnos = NULL;
1283 foreach(gl, grouping_columns)
1285 Var *gvar = (Var *) lfirst(gl);
1287 if (IsA(gvar, Var) &&
1288 gvar->varno == varno &&
1289 gvar->varlevelsup == varlevelsup)
1290 groupbyattnos = bms_add_member(groupbyattnos,
1291 gvar->varattno - FirstLowInvalidHeapAttributeNumber);
1294 if (bms_is_subset(pkattnos, groupbyattnos))
1296 /* The PK is a subset of grouping_columns, so we win */
1297 *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
1298 return true;
1301 return false;