Make nbtree split REDO locking match original execution.
[pgsql.git] / src / backend / catalog / heap.c
blobf2ca686397ebd04f93b542a535900b02a4c4b3b4
1 /*-------------------------------------------------------------------------
3 * heap.c
4 * code to create and destroy POSTGRES heap relations
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/backend/catalog/heap.c
14 * INTERFACE ROUTINES
15 * heap_create() - Create an uncataloged heap relation
16 * heap_create_with_catalog() - Create a cataloged relation
17 * heap_drop_with_catalog() - Removes named relation from catalogs
19 * NOTES
20 * this code taken from access/heap/create.c, which contains
21 * the old heap_create_with_catalog, amcreate, and amdestroy.
22 * those routines will soon call these routines using the function
23 * manager,
24 * just like the poorly named "NewXXX" routines do. The
25 * "New" routines are all going to die soon, once and for all!
26 * -cim 1/13/91
28 *-------------------------------------------------------------------------
30 #include "postgres.h"
32 #include "access/genam.h"
33 #include "access/htup_details.h"
34 #include "access/multixact.h"
35 #include "access/relation.h"
36 #include "access/sysattr.h"
37 #include "access/table.h"
38 #include "access/tableam.h"
39 #include "access/transam.h"
40 #include "access/xact.h"
41 #include "access/xlog.h"
42 #include "catalog/binary_upgrade.h"
43 #include "catalog/catalog.h"
44 #include "catalog/dependency.h"
45 #include "catalog/heap.h"
46 #include "catalog/index.h"
47 #include "catalog/objectaccess.h"
48 #include "catalog/partition.h"
49 #include "catalog/pg_am.h"
50 #include "catalog/pg_attrdef.h"
51 #include "catalog/pg_collation.h"
52 #include "catalog/pg_constraint.h"
53 #include "catalog/pg_foreign_table.h"
54 #include "catalog/pg_inherits.h"
55 #include "catalog/pg_namespace.h"
56 #include "catalog/pg_opclass.h"
57 #include "catalog/pg_partitioned_table.h"
58 #include "catalog/pg_statistic.h"
59 #include "catalog/pg_subscription_rel.h"
60 #include "catalog/pg_tablespace.h"
61 #include "catalog/pg_type.h"
62 #include "catalog/storage.h"
63 #include "catalog/storage_xlog.h"
64 #include "commands/tablecmds.h"
65 #include "commands/typecmds.h"
66 #include "executor/executor.h"
67 #include "miscadmin.h"
68 #include "nodes/nodeFuncs.h"
69 #include "optimizer/optimizer.h"
70 #include "parser/parse_coerce.h"
71 #include "parser/parse_collate.h"
72 #include "parser/parse_expr.h"
73 #include "parser/parse_relation.h"
74 #include "parser/parsetree.h"
75 #include "partitioning/partdesc.h"
76 #include "storage/lmgr.h"
77 #include "storage/predicate.h"
78 #include "storage/smgr.h"
79 #include "utils/acl.h"
80 #include "utils/builtins.h"
81 #include "utils/datum.h"
82 #include "utils/fmgroids.h"
83 #include "utils/inval.h"
84 #include "utils/lsyscache.h"
85 #include "utils/partcache.h"
86 #include "utils/ruleutils.h"
87 #include "utils/snapmgr.h"
88 #include "utils/syscache.h"
91 /* Potentially set by pg_upgrade_support functions */
92 Oid binary_upgrade_next_heap_pg_class_oid = InvalidOid;
93 Oid binary_upgrade_next_toast_pg_class_oid = InvalidOid;
95 static void AddNewRelationTuple(Relation pg_class_desc,
96 Relation new_rel_desc,
97 Oid new_rel_oid,
98 Oid new_type_oid,
99 Oid reloftype,
100 Oid relowner,
101 char relkind,
102 TransactionId relfrozenxid,
103 TransactionId relminmxid,
104 Datum relacl,
105 Datum reloptions);
106 static ObjectAddress AddNewRelationType(const char *typeName,
107 Oid typeNamespace,
108 Oid new_rel_oid,
109 char new_rel_kind,
110 Oid ownerid,
111 Oid new_row_type,
112 Oid new_array_type);
113 static void RelationRemoveInheritance(Oid relid);
114 static Oid StoreRelCheck(Relation rel, const char *ccname, Node *expr,
115 bool is_validated, bool is_local, int inhcount,
116 bool is_no_inherit, bool is_internal);
117 static void StoreConstraints(Relation rel, List *cooked_constraints,
118 bool is_internal);
119 static bool MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
120 bool allow_merge, bool is_local,
121 bool is_initially_valid,
122 bool is_no_inherit);
123 static void SetRelationNumChecks(Relation rel, int numchecks);
124 static Node *cookConstraint(ParseState *pstate,
125 Node *raw_constraint,
126 char *relname);
129 /* ----------------------------------------------------------------
130 * XXX UGLY HARD CODED BADNESS FOLLOWS XXX
132 * these should all be moved to someplace in the lib/catalog
133 * module, if not obliterated first.
134 * ----------------------------------------------------------------
139 * Note:
140 * Should the system special case these attributes in the future?
141 * Advantage: consume much less space in the ATTRIBUTE relation.
142 * Disadvantage: special cases will be all over the place.
146 * The initializers below do not include trailing variable length fields,
147 * but that's OK - we're never going to reference anything beyond the
148 * fixed-size portion of the structure anyway.
151 static const FormData_pg_attribute a1 = {
152 .attname = {"ctid"},
153 .atttypid = TIDOID,
154 .attlen = sizeof(ItemPointerData),
155 .attnum = SelfItemPointerAttributeNumber,
156 .attcacheoff = -1,
157 .atttypmod = -1,
158 .attbyval = false,
159 .attstorage = TYPSTORAGE_PLAIN,
160 .attalign = TYPALIGN_SHORT,
161 .attnotnull = true,
162 .attislocal = true,
165 static const FormData_pg_attribute a2 = {
166 .attname = {"xmin"},
167 .atttypid = XIDOID,
168 .attlen = sizeof(TransactionId),
169 .attnum = MinTransactionIdAttributeNumber,
170 .attcacheoff = -1,
171 .atttypmod = -1,
172 .attbyval = true,
173 .attstorage = TYPSTORAGE_PLAIN,
174 .attalign = TYPALIGN_INT,
175 .attnotnull = true,
176 .attislocal = true,
179 static const FormData_pg_attribute a3 = {
180 .attname = {"cmin"},
181 .atttypid = CIDOID,
182 .attlen = sizeof(CommandId),
183 .attnum = MinCommandIdAttributeNumber,
184 .attcacheoff = -1,
185 .atttypmod = -1,
186 .attbyval = true,
187 .attstorage = TYPSTORAGE_PLAIN,
188 .attalign = TYPALIGN_INT,
189 .attnotnull = true,
190 .attislocal = true,
193 static const FormData_pg_attribute a4 = {
194 .attname = {"xmax"},
195 .atttypid = XIDOID,
196 .attlen = sizeof(TransactionId),
197 .attnum = MaxTransactionIdAttributeNumber,
198 .attcacheoff = -1,
199 .atttypmod = -1,
200 .attbyval = true,
201 .attstorage = TYPSTORAGE_PLAIN,
202 .attalign = TYPALIGN_INT,
203 .attnotnull = true,
204 .attislocal = true,
207 static const FormData_pg_attribute a5 = {
208 .attname = {"cmax"},
209 .atttypid = CIDOID,
210 .attlen = sizeof(CommandId),
211 .attnum = MaxCommandIdAttributeNumber,
212 .attcacheoff = -1,
213 .atttypmod = -1,
214 .attbyval = true,
215 .attstorage = TYPSTORAGE_PLAIN,
216 .attalign = TYPALIGN_INT,
217 .attnotnull = true,
218 .attislocal = true,
222 * We decided to call this attribute "tableoid" rather than say
223 * "classoid" on the basis that in the future there may be more than one
224 * table of a particular class/type. In any case table is still the word
225 * used in SQL.
227 static const FormData_pg_attribute a6 = {
228 .attname = {"tableoid"},
229 .atttypid = OIDOID,
230 .attlen = sizeof(Oid),
231 .attnum = TableOidAttributeNumber,
232 .attcacheoff = -1,
233 .atttypmod = -1,
234 .attbyval = true,
235 .attstorage = TYPSTORAGE_PLAIN,
236 .attalign = TYPALIGN_INT,
237 .attnotnull = true,
238 .attislocal = true,
241 static const FormData_pg_attribute *SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6};
244 * This function returns a Form_pg_attribute pointer for a system attribute.
245 * Note that we elog if the presented attno is invalid, which would only
246 * happen if there's a problem upstream.
248 const FormData_pg_attribute *
249 SystemAttributeDefinition(AttrNumber attno)
251 if (attno >= 0 || attno < -(int) lengthof(SysAtt))
252 elog(ERROR, "invalid system attribute number %d", attno);
253 return SysAtt[-attno - 1];
257 * If the given name is a system attribute name, return a Form_pg_attribute
258 * pointer for a prototype definition. If not, return NULL.
260 const FormData_pg_attribute *
261 SystemAttributeByName(const char *attname)
263 int j;
265 for (j = 0; j < (int) lengthof(SysAtt); j++)
267 const FormData_pg_attribute *att = SysAtt[j];
269 if (strcmp(NameStr(att->attname), attname) == 0)
270 return att;
273 return NULL;
277 /* ----------------------------------------------------------------
278 * XXX END OF UGLY HARD CODED BADNESS XXX
279 * ---------------------------------------------------------------- */
282 /* ----------------------------------------------------------------
283 * heap_create - Create an uncataloged heap relation
285 * Note API change: the caller must now always provide the OID
286 * to use for the relation. The relfilenode may (and, normally,
287 * should) be left unspecified.
289 * rel->rd_rel is initialized by RelationBuildLocalRelation,
290 * and is mostly zeroes at return.
291 * ----------------------------------------------------------------
293 Relation
294 heap_create(const char *relname,
295 Oid relnamespace,
296 Oid reltablespace,
297 Oid relid,
298 Oid relfilenode,
299 Oid accessmtd,
300 TupleDesc tupDesc,
301 char relkind,
302 char relpersistence,
303 bool shared_relation,
304 bool mapped_relation,
305 bool allow_system_table_mods,
306 TransactionId *relfrozenxid,
307 MultiXactId *relminmxid)
309 bool create_storage;
310 Relation rel;
312 /* The caller must have provided an OID for the relation. */
313 Assert(OidIsValid(relid));
316 * Don't allow creating relations in pg_catalog directly, even though it
317 * is allowed to move user defined relations there. Semantics with search
318 * paths including pg_catalog are too confusing for now.
320 * But allow creating indexes on relations in pg_catalog even if
321 * allow_system_table_mods = off, upper layers already guarantee it's on a
322 * user defined relation, not a system one.
324 if (!allow_system_table_mods &&
325 ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) ||
326 IsToastNamespace(relnamespace)) &&
327 IsNormalProcessingMode())
328 ereport(ERROR,
329 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
330 errmsg("permission denied to create \"%s.%s\"",
331 get_namespace_name(relnamespace), relname),
332 errdetail("System catalog modifications are currently disallowed.")));
334 *relfrozenxid = InvalidTransactionId;
335 *relminmxid = InvalidMultiXactId;
337 /* Handle reltablespace for specific relkinds. */
338 switch (relkind)
340 case RELKIND_VIEW:
341 case RELKIND_COMPOSITE_TYPE:
342 case RELKIND_FOREIGN_TABLE:
345 * Force reltablespace to zero if the relation has no physical
346 * storage. This is mainly just for cleanliness' sake.
348 * Partitioned tables and indexes don't have physical storage
349 * either, but we want to keep their tablespace settings so that
350 * their children can inherit it.
352 reltablespace = InvalidOid;
353 break;
355 case RELKIND_SEQUENCE:
358 * Force reltablespace to zero for sequences, since we don't
359 * support moving them around into different tablespaces.
361 reltablespace = InvalidOid;
362 break;
363 default:
364 break;
368 * Decide whether to create storage. If caller passed a valid relfilenode,
369 * storage is already created, so don't do it here. Also don't create it
370 * for relkinds without physical storage.
372 if (!RELKIND_HAS_STORAGE(relkind) || OidIsValid(relfilenode))
373 create_storage = false;
374 else
376 create_storage = true;
377 relfilenode = relid;
381 * Never allow a pg_class entry to explicitly specify the database's
382 * default tablespace in reltablespace; force it to zero instead. This
383 * ensures that if the database is cloned with a different default
384 * tablespace, the pg_class entry will still match where CREATE DATABASE
385 * will put the physically copied relation.
387 * Yes, this is a bit of a hack.
389 if (reltablespace == MyDatabaseTableSpace)
390 reltablespace = InvalidOid;
393 * build the relcache entry.
395 rel = RelationBuildLocalRelation(relname,
396 relnamespace,
397 tupDesc,
398 relid,
399 accessmtd,
400 relfilenode,
401 reltablespace,
402 shared_relation,
403 mapped_relation,
404 relpersistence,
405 relkind);
408 * Have the storage manager create the relation's disk file, if needed.
410 * For relations the callback creates both the main and the init fork, for
411 * indexes only the main fork is created. The other forks will be created
412 * on demand.
414 if (create_storage)
416 RelationOpenSmgr(rel);
418 switch (rel->rd_rel->relkind)
420 case RELKIND_VIEW:
421 case RELKIND_COMPOSITE_TYPE:
422 case RELKIND_FOREIGN_TABLE:
423 case RELKIND_PARTITIONED_TABLE:
424 case RELKIND_PARTITIONED_INDEX:
425 Assert(false);
426 break;
428 case RELKIND_INDEX:
429 case RELKIND_SEQUENCE:
430 RelationCreateStorage(rel->rd_node, relpersistence);
431 break;
433 case RELKIND_RELATION:
434 case RELKIND_TOASTVALUE:
435 case RELKIND_MATVIEW:
436 table_relation_set_new_filenode(rel, &rel->rd_node,
437 relpersistence,
438 relfrozenxid, relminmxid);
439 break;
443 return rel;
446 /* ----------------------------------------------------------------
447 * heap_create_with_catalog - Create a cataloged relation
449 * this is done in multiple steps:
451 * 1) CheckAttributeNamesTypes() is used to make certain the tuple
452 * descriptor contains a valid set of attribute names and types
454 * 2) pg_class is opened and get_relname_relid()
455 * performs a scan to ensure that no relation with the
456 * same name already exists.
458 * 3) heap_create() is called to create the new relation on disk.
460 * 4) TypeCreate() is called to define a new type corresponding
461 * to the new relation.
463 * 5) AddNewRelationTuple() is called to register the
464 * relation in pg_class.
466 * 6) AddNewAttributeTuples() is called to register the
467 * new relation's schema in pg_attribute.
469 * 7) StoreConstraints() is called - vadim 08/22/97
471 * 8) the relations are closed and the new relation's oid
472 * is returned.
474 * ----------------------------------------------------------------
477 /* --------------------------------
478 * CheckAttributeNamesTypes
480 * this is used to make certain the tuple descriptor contains a
481 * valid set of attribute names and datatypes. a problem simply
482 * generates ereport(ERROR) which aborts the current transaction.
484 * relkind is the relkind of the relation to be created.
485 * flags controls which datatypes are allowed, cf CheckAttributeType.
486 * --------------------------------
488 void
489 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
490 int flags)
492 int i;
493 int j;
494 int natts = tupdesc->natts;
496 /* Sanity check on column count */
497 if (natts < 0 || natts > MaxHeapAttributeNumber)
498 ereport(ERROR,
499 (errcode(ERRCODE_TOO_MANY_COLUMNS),
500 errmsg("tables can have at most %d columns",
501 MaxHeapAttributeNumber)));
504 * first check for collision with system attribute names
506 * Skip this for a view or type relation, since those don't have system
507 * attributes.
509 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
511 for (i = 0; i < natts; i++)
513 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
515 if (SystemAttributeByName(NameStr(attr->attname)) != NULL)
516 ereport(ERROR,
517 (errcode(ERRCODE_DUPLICATE_COLUMN),
518 errmsg("column name \"%s\" conflicts with a system column name",
519 NameStr(attr->attname))));
524 * next check for repeated attribute names
526 for (i = 1; i < natts; i++)
528 for (j = 0; j < i; j++)
530 if (strcmp(NameStr(TupleDescAttr(tupdesc, j)->attname),
531 NameStr(TupleDescAttr(tupdesc, i)->attname)) == 0)
532 ereport(ERROR,
533 (errcode(ERRCODE_DUPLICATE_COLUMN),
534 errmsg("column name \"%s\" specified more than once",
535 NameStr(TupleDescAttr(tupdesc, j)->attname))));
540 * next check the attribute types
542 for (i = 0; i < natts; i++)
544 CheckAttributeType(NameStr(TupleDescAttr(tupdesc, i)->attname),
545 TupleDescAttr(tupdesc, i)->atttypid,
546 TupleDescAttr(tupdesc, i)->attcollation,
547 NIL, /* assume we're creating a new rowtype */
548 flags);
552 /* --------------------------------
553 * CheckAttributeType
555 * Verify that the proposed datatype of an attribute is legal.
556 * This is needed mainly because there are types (and pseudo-types)
557 * in the catalogs that we do not support as elements of real tuples.
558 * We also check some other properties required of a table column.
560 * If the attribute is being proposed for addition to an existing table or
561 * composite type, pass a one-element list of the rowtype OID as
562 * containing_rowtypes. When checking a to-be-created rowtype, it's
563 * sufficient to pass NIL, because there could not be any recursive reference
564 * to a not-yet-existing rowtype.
566 * flags is a bitmask controlling which datatypes we allow. For the most
567 * part, pseudo-types are disallowed as attribute types, but there are some
568 * exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
569 * in some cases. (This works because values of those type classes are
570 * self-identifying to some extent. However, RECORDOID and RECORDARRAYOID
571 * are reliably identifiable only within a session, since the identity info
572 * may use a typmod that is only locally assigned. The caller is expected
573 * to know whether these cases are safe.)
575 * flags can also control the phrasing of the error messages. If
576 * CHKATYPE_IS_PARTKEY is specified, "attname" should be a partition key
577 * column number as text, not a real column name.
578 * --------------------------------
580 void
581 CheckAttributeType(const char *attname,
582 Oid atttypid, Oid attcollation,
583 List *containing_rowtypes,
584 int flags)
586 char att_typtype = get_typtype(atttypid);
587 Oid att_typelem;
589 if (att_typtype == TYPTYPE_PSEUDO)
592 * We disallow pseudo-type columns, with the exception of ANYARRAY,
593 * RECORD, and RECORD[] when the caller says that those are OK.
595 * We don't need to worry about recursive containment for RECORD and
596 * RECORD[] because (a) no named composite type should be allowed to
597 * contain those, and (b) two "anonymous" record types couldn't be
598 * considered to be the same type, so infinite recursion isn't
599 * possible.
601 if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
602 (atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
603 (atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
605 if (flags & CHKATYPE_IS_PARTKEY)
606 ereport(ERROR,
607 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
608 /* translator: first %s is an integer not a name */
609 errmsg("partition key column %s has pseudo-type %s",
610 attname, format_type_be(atttypid))));
611 else
612 ereport(ERROR,
613 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
614 errmsg("column \"%s\" has pseudo-type %s",
615 attname, format_type_be(atttypid))));
618 else if (att_typtype == TYPTYPE_DOMAIN)
621 * If it's a domain, recurse to check its base type.
623 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
624 containing_rowtypes,
625 flags);
627 else if (att_typtype == TYPTYPE_COMPOSITE)
630 * For a composite type, recurse into its attributes.
632 Relation relation;
633 TupleDesc tupdesc;
634 int i;
637 * Check for self-containment. Eventually we might be able to allow
638 * this (just return without complaint, if so) but it's not clear how
639 * many other places would require anti-recursion defenses before it
640 * would be safe to allow tables to contain their own rowtype.
642 if (list_member_oid(containing_rowtypes, atttypid))
643 ereport(ERROR,
644 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
645 errmsg("composite type %s cannot be made a member of itself",
646 format_type_be(atttypid))));
648 containing_rowtypes = lappend_oid(containing_rowtypes, atttypid);
650 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
652 tupdesc = RelationGetDescr(relation);
654 for (i = 0; i < tupdesc->natts; i++)
656 Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
658 if (attr->attisdropped)
659 continue;
660 CheckAttributeType(NameStr(attr->attname),
661 attr->atttypid, attr->attcollation,
662 containing_rowtypes,
663 flags & ~CHKATYPE_IS_PARTKEY);
666 relation_close(relation, AccessShareLock);
668 containing_rowtypes = list_delete_last(containing_rowtypes);
670 else if (att_typtype == TYPTYPE_RANGE)
673 * If it's a range, recurse to check its subtype.
675 CheckAttributeType(attname, get_range_subtype(atttypid),
676 get_range_collation(atttypid),
677 containing_rowtypes,
678 flags);
680 else if (OidIsValid((att_typelem = get_element_type(atttypid))))
683 * Must recurse into array types, too, in case they are composite.
685 CheckAttributeType(attname, att_typelem, attcollation,
686 containing_rowtypes,
687 flags);
691 * This might not be strictly invalid per SQL standard, but it is pretty
692 * useless, and it cannot be dumped, so we must disallow it.
694 if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
696 if (flags & CHKATYPE_IS_PARTKEY)
697 ereport(ERROR,
698 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
699 /* translator: first %s is an integer not a name */
700 errmsg("no collation was derived for partition key column %s with collatable type %s",
701 attname, format_type_be(atttypid)),
702 errhint("Use the COLLATE clause to set the collation explicitly.")));
703 else
704 ereport(ERROR,
705 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
706 errmsg("no collation was derived for column \"%s\" with collatable type %s",
707 attname, format_type_be(atttypid)),
708 errhint("Use the COLLATE clause to set the collation explicitly.")));
/*
 * Upper bound, in bytes, used by InsertPgAttributeTuples() to size the set
 * of slots it allocates for one batch of pg_attribute rows.
 */
#define MAX_PGATTRIBUTE_INSERT_BYTES 65535
719 * InsertPgAttributeTuples
720 * Construct and insert a set of tuples in pg_attribute.
722 * Caller has already opened and locked pg_attribute. tupdesc contains the
723 * attributes to insert. attcacheoff is always initialized to -1, attacl,
724 * attfdwoptions and attmissingval are always initialized to NULL. attoptions
725 * must contain the same number of elements as tupdesc, or be NULL.
727 * indstate is the index state for CatalogTupleInsertWithInfo. It can be
728 * passed as NULL, in which case we'll fetch the necessary info. (Don't do
729 * this when inserting multiple attributes, because it's a tad more
730 * expensive.)
732 * new_rel_oid is the relation OID assigned to the attributes inserted.
733 * If set to InvalidOid, the relation OID from tupdesc is used instead.
735 void
736 InsertPgAttributeTuples(Relation pg_attribute_rel,
737 TupleDesc tupdesc,
738 Oid new_rel_oid,
739 Datum *attoptions,
740 CatalogIndexState indstate)
742 TupleTableSlot **slot;
743 TupleDesc td;
744 int nslots;
745 int natts = 0;
746 int slotCount = 0;
747 bool close_index = false;
749 td = RelationGetDescr(pg_attribute_rel);
751 /* Initialize the number of slots to use */
752 nslots = Min(tupdesc->natts,
753 (MAX_PGATTRIBUTE_INSERT_BYTES / sizeof(FormData_pg_attribute)));
754 slot = palloc(sizeof(TupleTableSlot *) * nslots);
755 for (int i = 0; i < nslots; i++)
756 slot[i] = MakeSingleTupleTableSlot(td, &TTSOpsHeapTuple);
758 while (natts < tupdesc->natts)
760 Form_pg_attribute attrs = TupleDescAttr(tupdesc, natts);
762 ExecClearTuple(slot[slotCount]);
764 if (new_rel_oid != InvalidOid)
765 slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_rel_oid);
766 else
767 slot[slotCount]->tts_values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(attrs->attrelid);
769 slot[slotCount]->tts_values[Anum_pg_attribute_attname - 1] = NameGetDatum(&attrs->attname);
770 slot[slotCount]->tts_values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(attrs->atttypid);
771 slot[slotCount]->tts_values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(attrs->attstattarget);
772 slot[slotCount]->tts_values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(attrs->attlen);
773 slot[slotCount]->tts_values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(attrs->attnum);
774 slot[slotCount]->tts_values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(attrs->attndims);
775 slot[slotCount]->tts_values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(-1);
776 slot[slotCount]->tts_values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(attrs->atttypmod);
777 slot[slotCount]->tts_values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(attrs->attbyval);
778 slot[slotCount]->tts_values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(attrs->attstorage);
779 slot[slotCount]->tts_values[Anum_pg_attribute_attalign - 1] = CharGetDatum(attrs->attalign);
780 slot[slotCount]->tts_values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(attrs->attnotnull);
781 slot[slotCount]->tts_values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(attrs->atthasdef);
782 slot[slotCount]->tts_values[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(attrs->atthasmissing);
783 slot[slotCount]->tts_values[Anum_pg_attribute_attidentity - 1] = CharGetDatum(attrs->attidentity);
784 slot[slotCount]->tts_values[Anum_pg_attribute_attgenerated - 1] = CharGetDatum(attrs->attgenerated);
785 slot[slotCount]->tts_values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(attrs->attisdropped);
786 slot[slotCount]->tts_values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(attrs->attislocal);
787 slot[slotCount]->tts_values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(attrs->attinhcount);
788 slot[slotCount]->tts_values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(attrs->attcollation);
789 if (attoptions && attoptions[natts] != (Datum) 0)
790 slot[slotCount]->tts_values[Anum_pg_attribute_attoptions - 1] = attoptions[natts];
791 else
792 slot[slotCount]->tts_isnull[Anum_pg_attribute_attoptions - 1] = true;
794 /* start out with empty permissions and empty options */
795 slot[slotCount]->tts_isnull[Anum_pg_attribute_attacl - 1] = true;
796 slot[slotCount]->tts_isnull[Anum_pg_attribute_attfdwoptions - 1] = true;
797 slot[slotCount]->tts_isnull[Anum_pg_attribute_attmissingval - 1] = true;
799 ExecStoreVirtualTuple(slot[slotCount]);
800 slotCount++;
803 * If slots are full or the end of processing has been reached, insert
804 * a batch of tuples.
806 if (slotCount == nslots || natts == tupdesc->natts - 1)
808 /* fetch index info only when we know we need it */
809 if (!indstate)
811 indstate = CatalogOpenIndexes(pg_attribute_rel);
812 close_index = true;
815 /* insert the new tuples and update the indexes */
816 CatalogTuplesMultiInsertWithInfo(pg_attribute_rel, slot, slotCount,
817 indstate);
818 slotCount = 0;
821 natts++;
824 if (close_index)
825 CatalogCloseIndexes(indstate);
826 for (int i = 0; i < nslots; i++)
827 ExecDropSingleTupleTableSlot(slot[i]);
828 pfree(slot);
831 /* --------------------------------
832 * AddNewAttributeTuples
834 * this registers the new relation's schema by adding
835 * tuples to pg_attribute.
836 * --------------------------------
838 static void
839 AddNewAttributeTuples(Oid new_rel_oid,
840 TupleDesc tupdesc,
841 char relkind)
843 Relation rel;
844 CatalogIndexState indstate;
845 int natts = tupdesc->natts;
846 ObjectAddress myself,
847 referenced;
850 * open pg_attribute and its indexes.
852 rel = table_open(AttributeRelationId, RowExclusiveLock);
854 indstate = CatalogOpenIndexes(rel);
856 /* set stats detail level to a sane default */
857 for (int i = 0; i < natts; i++)
858 tupdesc->attrs[i].attstattarget = -1;
859 InsertPgAttributeTuples(rel, tupdesc, new_rel_oid, NULL, indstate);
861 /* add dependencies on their datatypes and collations */
862 for (int i = 0; i < natts; i++)
864 /* Add dependency info */
865 ObjectAddressSubSet(myself, RelationRelationId, new_rel_oid, i + 1);
866 ObjectAddressSet(referenced, TypeRelationId,
867 tupdesc->attrs[i].atttypid);
868 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
870 /* The default collation is pinned, so don't bother recording it */
871 if (OidIsValid(tupdesc->attrs[i].attcollation) &&
872 tupdesc->attrs[i].attcollation != DEFAULT_COLLATION_OID)
874 ObjectAddressSet(referenced, CollationRelationId,
875 tupdesc->attrs[i].attcollation);
876 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
881 * Next we add the system attributes. Skip OID if rel has no OIDs. Skip
882 * all for a view or type relation. We don't bother with making datatype
883 * dependencies here, since presumably all these types are pinned.
885 if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
887 TupleDesc td;
889 td = CreateTupleDesc(lengthof(SysAtt), (FormData_pg_attribute **) &SysAtt);
891 InsertPgAttributeTuples(rel, td, new_rel_oid, NULL, indstate);
892 FreeTupleDesc(td);
896 * clean up
898 CatalogCloseIndexes(indstate);
900 table_close(rel, RowExclusiveLock);
903 /* --------------------------------
904 * InsertPgClassTuple
906 * Construct and insert a new tuple in pg_class.
908 * Caller has already opened and locked pg_class.
909 * Tuple data is taken from new_rel_desc->rd_rel, except for the
910 * variable-width fields which are not present in a cached reldesc.
911 * relacl and reloptions are passed in Datum form (to avoid having
912 * to reference the data types in heap.h). Pass (Datum) 0 to set them
913 * to NULL.
914 * --------------------------------
916 void
917 InsertPgClassTuple(Relation pg_class_desc,
918 Relation new_rel_desc,
919 Oid new_rel_oid,
920 Datum relacl,
921 Datum reloptions)
923 Form_pg_class rd_rel = new_rel_desc->rd_rel;
924 Datum values[Natts_pg_class];
925 bool nulls[Natts_pg_class];
926 HeapTuple tup;
928 /* This is a tad tedious, but way cleaner than what we used to do... */
929 memset(values, 0, sizeof(values));
930 memset(nulls, false, sizeof(nulls));
932 values[Anum_pg_class_oid - 1] = ObjectIdGetDatum(new_rel_oid);
933 values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
934 values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
935 values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
936 values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
937 values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
938 values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
939 values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
940 values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
941 values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
942 values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
943 values[Anum_pg_class_relallvisible - 1] = Int32GetDatum(rd_rel->relallvisible);
944 values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
945 values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
946 values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
947 values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
948 values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
949 values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
950 values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
951 values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
952 values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
953 values[Anum_pg_class_relrowsecurity - 1] = BoolGetDatum(rd_rel->relrowsecurity);
954 values[Anum_pg_class_relforcerowsecurity - 1] = BoolGetDatum(rd_rel->relforcerowsecurity);
955 values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
956 values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
957 values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
958 values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
959 values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
960 values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
961 values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
962 if (relacl != (Datum) 0)
963 values[Anum_pg_class_relacl - 1] = relacl;
964 else
965 nulls[Anum_pg_class_relacl - 1] = true;
966 if (reloptions != (Datum) 0)
967 values[Anum_pg_class_reloptions - 1] = reloptions;
968 else
969 nulls[Anum_pg_class_reloptions - 1] = true;
971 /* relpartbound is set by updating this tuple, if necessary */
972 nulls[Anum_pg_class_relpartbound - 1] = true;
974 tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
976 /* finally insert the new tuple, update the indexes, and clean up */
977 CatalogTupleInsert(pg_class_desc, tup);
979 heap_freetuple(tup);
982 /* --------------------------------
983 * AddNewRelationTuple
985 * this registers the new relation in the catalogs by
986 * adding a tuple to pg_class.
987 * --------------------------------
989 static void
990 AddNewRelationTuple(Relation pg_class_desc,
991 Relation new_rel_desc,
992 Oid new_rel_oid,
993 Oid new_type_oid,
994 Oid reloftype,
995 Oid relowner,
996 char relkind,
997 TransactionId relfrozenxid,
998 TransactionId relminmxid,
999 Datum relacl,
1000 Datum reloptions)
1002 Form_pg_class new_rel_reltup;
1005 * first we update some of the information in our uncataloged relation's
1006 * relation descriptor.
1008 new_rel_reltup = new_rel_desc->rd_rel;
1010 switch (relkind)
1012 case RELKIND_RELATION:
1013 case RELKIND_MATVIEW:
1014 case RELKIND_INDEX:
1015 case RELKIND_TOASTVALUE:
1016 /* The relation is real, but as yet empty */
1017 new_rel_reltup->relpages = 0;
1018 new_rel_reltup->reltuples = 0;
1019 new_rel_reltup->relallvisible = 0;
1020 break;
1021 case RELKIND_SEQUENCE:
1022 /* Sequences always have a known size */
1023 new_rel_reltup->relpages = 1;
1024 new_rel_reltup->reltuples = 1;
1025 new_rel_reltup->relallvisible = 0;
1026 break;
1027 default:
1028 /* Views, etc, have no disk storage */
1029 new_rel_reltup->relpages = 0;
1030 new_rel_reltup->reltuples = 0;
1031 new_rel_reltup->relallvisible = 0;
1032 break;
1035 new_rel_reltup->relfrozenxid = relfrozenxid;
1036 new_rel_reltup->relminmxid = relminmxid;
1037 new_rel_reltup->relowner = relowner;
1038 new_rel_reltup->reltype = new_type_oid;
1039 new_rel_reltup->reloftype = reloftype;
1041 /* relispartition is always set by updating this tuple later */
1042 new_rel_reltup->relispartition = false;
1044 /* fill rd_att's type ID with something sane even if reltype is zero */
1045 new_rel_desc->rd_att->tdtypeid = new_type_oid ? new_type_oid : RECORDOID;
1046 new_rel_desc->rd_att->tdtypmod = -1;
1048 /* Now build and insert the tuple */
1049 InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
1050 relacl, reloptions);
1054 /* --------------------------------
1055 * AddNewRelationType -
1057 * define a composite type corresponding to the new relation
1058 * --------------------------------
1060 static ObjectAddress
1061 AddNewRelationType(const char *typeName,
1062 Oid typeNamespace,
1063 Oid new_rel_oid,
1064 char new_rel_kind,
1065 Oid ownerid,
1066 Oid new_row_type,
1067 Oid new_array_type)
1069 return
1070 TypeCreate(new_row_type, /* optional predetermined OID */
1071 typeName, /* type name */
1072 typeNamespace, /* type namespace */
1073 new_rel_oid, /* relation oid */
1074 new_rel_kind, /* relation kind */
1075 ownerid, /* owner's ID */
1076 -1, /* internal size (varlena) */
1077 TYPTYPE_COMPOSITE, /* type-type (composite) */
1078 TYPCATEGORY_COMPOSITE, /* type-category (ditto) */
1079 false, /* composite types are never preferred */
1080 DEFAULT_TYPDELIM, /* default array delimiter */
1081 F_RECORD_IN, /* input procedure */
1082 F_RECORD_OUT, /* output procedure */
1083 F_RECORD_RECV, /* receive procedure */
1084 F_RECORD_SEND, /* send procedure */
1085 InvalidOid, /* typmodin procedure - none */
1086 InvalidOid, /* typmodout procedure - none */
1087 InvalidOid, /* analyze procedure - default */
1088 InvalidOid, /* array element type - irrelevant */
1089 false, /* this is not an array type */
1090 new_array_type, /* array type if any */
1091 InvalidOid, /* domain base type - irrelevant */
1092 NULL, /* default value - none */
1093 NULL, /* default binary representation */
1094 false, /* passed by reference */
1095 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1096 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1097 -1, /* typmod */
1098 0, /* array dimensions for typBaseType */
1099 false, /* Type NOT NULL */
1100 InvalidOid); /* rowtypes never have a collation */
1103 /* --------------------------------
1104 * heap_create_with_catalog
1106 * creates a new cataloged relation. see comments above.
1108 * Arguments:
1109 * relname: name to give to new rel
1110 * relnamespace: OID of namespace it goes in
1111 * reltablespace: OID of tablespace it goes in
1112 * relid: OID to assign to new rel, or InvalidOid to select a new OID
1113 * reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
1114 * reloftypeid: if a typed table, OID of underlying type; else InvalidOid
1115 * ownerid: OID of new rel's owner
1116 * tupdesc: tuple descriptor (source of column definitions)
1117 * cooked_constraints: list of precooked check constraints and defaults
1118 * relkind: relkind for new rel
1119 * relpersistence: rel's persistence status (permanent, temp, or unlogged)
1120 * shared_relation: true if it's to be a shared relation
1121 * mapped_relation: true if the relation will use the relfilenode map
1122 * oncommit: ON COMMIT marking (only relevant if it's a temp table)
1123 * reloptions: reloptions in Datum form, or (Datum) 0 if none
1124 * use_user_acl: true if should look for user-defined default permissions;
1125 * if false, relacl is always set NULL
1126 * allow_system_table_mods: true to allow creation in system namespaces
1127 * is_internal: is this a system-generated catalog?
1129 * Output parameters:
1130 * typaddress: if not null, gets the object address of the new pg_type entry
1131 * (this must be null if the relkind is one that doesn't get a pg_type entry)
1133 * Returns the OID of the new relation
1134 * --------------------------------
1137 heap_create_with_catalog(const char *relname,
1138 Oid relnamespace,
1139 Oid reltablespace,
1140 Oid relid,
1141 Oid reltypeid,
1142 Oid reloftypeid,
1143 Oid ownerid,
1144 Oid accessmtd,
1145 TupleDesc tupdesc,
1146 List *cooked_constraints,
1147 char relkind,
1148 char relpersistence,
1149 bool shared_relation,
1150 bool mapped_relation,
1151 OnCommitAction oncommit,
1152 Datum reloptions,
1153 bool use_user_acl,
1154 bool allow_system_table_mods,
1155 bool is_internal,
1156 Oid relrewrite,
1157 ObjectAddress *typaddress)
1159 Relation pg_class_desc;
1160 Relation new_rel_desc;
1161 Acl *relacl;
1162 Oid existing_relid;
1163 Oid old_type_oid;
1164 Oid new_type_oid;
1165 TransactionId relfrozenxid;
1166 MultiXactId relminmxid;
1168 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1171 * sanity checks
1173 Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1176 * Validate proposed tupdesc for the desired relkind. If
1177 * allow_system_table_mods is on, allow ANYARRAY to be used; this is a
1178 * hack to allow creating pg_statistic and cloning it during VACUUM FULL.
1180 CheckAttributeNamesTypes(tupdesc, relkind,
1181 allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
1184 * This would fail later on anyway, if the relation already exists. But
1185 * by catching it here we can emit a nicer error message.
1187 existing_relid = get_relname_relid(relname, relnamespace);
1188 if (existing_relid != InvalidOid)
1189 ereport(ERROR,
1190 (errcode(ERRCODE_DUPLICATE_TABLE),
1191 errmsg("relation \"%s\" already exists", relname)));
1194 * Since we are going to create a rowtype as well, also check for
1195 * collision with an existing type name. If there is one and it's an
1196 * autogenerated array, we can rename it out of the way; otherwise we can
1197 * at least give a good error message.
1199 old_type_oid = GetSysCacheOid2(TYPENAMENSP, Anum_pg_type_oid,
1200 CStringGetDatum(relname),
1201 ObjectIdGetDatum(relnamespace));
1202 if (OidIsValid(old_type_oid))
1204 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1205 ereport(ERROR,
1206 (errcode(ERRCODE_DUPLICATE_OBJECT),
1207 errmsg("type \"%s\" already exists", relname),
1208 errhint("A relation has an associated type of the same name, "
1209 "so you must use a name that doesn't conflict "
1210 "with any existing type.")));
1214 * Shared relations must be in pg_global (last-ditch check)
1216 if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1217 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1220 * Allocate an OID for the relation, unless we were told what to use.
1222 * The OID will be the relfilenode as well, so make sure it doesn't
1223 * collide with either pg_class OIDs or existing physical files.
1225 if (!OidIsValid(relid))
1227 /* Use binary-upgrade override for pg_class.oid/relfilenode? */
1228 if (IsBinaryUpgrade &&
1229 (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1230 relkind == RELKIND_VIEW || relkind == RELKIND_MATVIEW ||
1231 relkind == RELKIND_COMPOSITE_TYPE || relkind == RELKIND_FOREIGN_TABLE ||
1232 relkind == RELKIND_PARTITIONED_TABLE))
1234 if (!OidIsValid(binary_upgrade_next_heap_pg_class_oid))
1235 ereport(ERROR,
1236 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1237 errmsg("pg_class heap OID value not set when in binary upgrade mode")));
1239 relid = binary_upgrade_next_heap_pg_class_oid;
1240 binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1242 /* There might be no TOAST table, so we have to test for it. */
1243 else if (IsBinaryUpgrade &&
1244 OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1245 relkind == RELKIND_TOASTVALUE)
1247 relid = binary_upgrade_next_toast_pg_class_oid;
1248 binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1250 else
1251 relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1252 relpersistence);
1256 * Determine the relation's initial permissions.
1258 if (use_user_acl)
1260 switch (relkind)
1262 case RELKIND_RELATION:
1263 case RELKIND_VIEW:
1264 case RELKIND_MATVIEW:
1265 case RELKIND_FOREIGN_TABLE:
1266 case RELKIND_PARTITIONED_TABLE:
1267 relacl = get_user_default_acl(OBJECT_TABLE, ownerid,
1268 relnamespace);
1269 break;
1270 case RELKIND_SEQUENCE:
1271 relacl = get_user_default_acl(OBJECT_SEQUENCE, ownerid,
1272 relnamespace);
1273 break;
1274 default:
1275 relacl = NULL;
1276 break;
1279 else
1280 relacl = NULL;
1283 * Create the relcache entry (mostly dummy at this point) and the physical
1284 * disk file. (If we fail further down, it's the smgr's responsibility to
1285 * remove the disk file again.)
1287 new_rel_desc = heap_create(relname,
1288 relnamespace,
1289 reltablespace,
1290 relid,
1291 InvalidOid,
1292 accessmtd,
1293 tupdesc,
1294 relkind,
1295 relpersistence,
1296 shared_relation,
1297 mapped_relation,
1298 allow_system_table_mods,
1299 &relfrozenxid,
1300 &relminmxid);
1302 Assert(relid == RelationGetRelid(new_rel_desc));
1304 new_rel_desc->rd_rel->relrewrite = relrewrite;
1307 * Decide whether to create a pg_type entry for the relation's rowtype.
1308 * These types are made except where the use of a relation as such is an
1309 * implementation detail: toast tables, sequences and indexes.
1311 if (!(relkind == RELKIND_SEQUENCE ||
1312 relkind == RELKIND_TOASTVALUE ||
1313 relkind == RELKIND_INDEX ||
1314 relkind == RELKIND_PARTITIONED_INDEX))
1316 Oid new_array_oid;
1317 ObjectAddress new_type_addr;
1318 char *relarrayname;
1321 * We'll make an array over the composite type, too. For largely
1322 * historical reasons, the array type's OID is assigned first.
1324 new_array_oid = AssignTypeArrayOid();
1327 * Make the pg_type entry for the composite type. The OID of the
1328 * composite type can be preselected by the caller, but if reltypeid
1329 * is InvalidOid, we'll generate a new OID for it.
1331 * NOTE: we could get a unique-index failure here, in case someone
1332 * else is creating the same type name in parallel but hadn't
1333 * committed yet when we checked for a duplicate name above.
1335 new_type_addr = AddNewRelationType(relname,
1336 relnamespace,
1337 relid,
1338 relkind,
1339 ownerid,
1340 reltypeid,
1341 new_array_oid);
1342 new_type_oid = new_type_addr.objectId;
1343 if (typaddress)
1344 *typaddress = new_type_addr;
1346 /* Now create the array type. */
1347 relarrayname = makeArrayTypeName(relname, relnamespace);
1349 TypeCreate(new_array_oid, /* force the type's OID to this */
1350 relarrayname, /* Array type name */
1351 relnamespace, /* Same namespace as parent */
1352 InvalidOid, /* Not composite, no relationOid */
1353 0, /* relkind, also N/A here */
1354 ownerid, /* owner's ID */
1355 -1, /* Internal size (varlena) */
1356 TYPTYPE_BASE, /* Not composite - typelem is */
1357 TYPCATEGORY_ARRAY, /* type-category (array) */
1358 false, /* array types are never preferred */
1359 DEFAULT_TYPDELIM, /* default array delimiter */
1360 F_ARRAY_IN, /* array input proc */
1361 F_ARRAY_OUT, /* array output proc */
1362 F_ARRAY_RECV, /* array recv (bin) proc */
1363 F_ARRAY_SEND, /* array send (bin) proc */
1364 InvalidOid, /* typmodin procedure - none */
1365 InvalidOid, /* typmodout procedure - none */
1366 F_ARRAY_TYPANALYZE, /* array analyze procedure */
1367 new_type_oid, /* array element type - the rowtype */
1368 true, /* yes, this is an array type */
1369 InvalidOid, /* this has no array type */
1370 InvalidOid, /* domain base type - irrelevant */
1371 NULL, /* default value - none */
1372 NULL, /* default binary representation */
1373 false, /* passed by reference */
1374 TYPALIGN_DOUBLE, /* alignment - must be the largest! */
1375 TYPSTORAGE_EXTENDED, /* fully TOASTable */
1376 -1, /* typmod */
1377 0, /* array dimensions for typBaseType */
1378 false, /* Type NOT NULL */
1379 InvalidOid); /* rowtypes never have a collation */
1381 pfree(relarrayname);
1383 else
1385 /* Caller should not be expecting a type to be created. */
1386 Assert(reltypeid == InvalidOid);
1387 Assert(typaddress == NULL);
1389 new_type_oid = InvalidOid;
1393 * now create an entry in pg_class for the relation.
1395 * NOTE: we could get a unique-index failure here, in case someone else is
1396 * creating the same relation name in parallel but hadn't committed yet
1397 * when we checked for a duplicate name above.
1399 AddNewRelationTuple(pg_class_desc,
1400 new_rel_desc,
1401 relid,
1402 new_type_oid,
1403 reloftypeid,
1404 ownerid,
1405 relkind,
1406 relfrozenxid,
1407 relminmxid,
1408 PointerGetDatum(relacl),
1409 reloptions);
1412 * now add tuples to pg_attribute for the attributes in our new relation.
1414 AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind);
1417 * Make a dependency link to force the relation to be deleted if its
1418 * namespace is. Also make a dependency link to its owner, as well as
1419 * dependencies for any roles mentioned in the default ACL.
1421 * For composite types, these dependencies are tracked for the pg_type
1422 * entry, so we needn't record them here. Likewise, TOAST tables don't
1423 * need a namespace dependency (they live in a pinned namespace) nor an
1424 * owner dependency (they depend indirectly through the parent table), nor
1425 * should they have any ACL entries. The same applies for extension
1426 * dependencies.
1428 * Also, skip this in bootstrap mode, since we don't make dependencies
1429 * while bootstrapping.
1431 if (relkind != RELKIND_COMPOSITE_TYPE &&
1432 relkind != RELKIND_TOASTVALUE &&
1433 !IsBootstrapProcessingMode())
1435 ObjectAddress myself,
1436 referenced;
1438 myself.classId = RelationRelationId;
1439 myself.objectId = relid;
1440 myself.objectSubId = 0;
1442 referenced.classId = NamespaceRelationId;
1443 referenced.objectId = relnamespace;
1444 referenced.objectSubId = 0;
1445 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1447 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1449 recordDependencyOnNewAcl(RelationRelationId, relid, 0, ownerid, relacl);
1451 recordDependencyOnCurrentExtension(&myself, false);
1453 if (reloftypeid)
1455 referenced.classId = TypeRelationId;
1456 referenced.objectId = reloftypeid;
1457 referenced.objectSubId = 0;
1458 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1462 * Make a dependency link to force the relation to be deleted if its
1463 * access method is. Do this only for relation and materialized views.
1465 * No need to add an explicit dependency for the toast table, as the
1466 * main table depends on it.
1468 if (relkind == RELKIND_RELATION ||
1469 relkind == RELKIND_MATVIEW)
1471 referenced.classId = AccessMethodRelationId;
1472 referenced.objectId = accessmtd;
1473 referenced.objectSubId = 0;
1474 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1478 /* Post creation hook for new relation */
1479 InvokeObjectPostCreateHookArg(RelationRelationId, relid, 0, is_internal);
1482 * Store any supplied constraints and defaults.
1484 * NB: this may do a CommandCounterIncrement and rebuild the relcache
1485 * entry, so the relation must be valid and self-consistent at this point.
1486 * In particular, there are not yet constraints and defaults anywhere.
1488 StoreConstraints(new_rel_desc, cooked_constraints, is_internal);
1491 * If there's a special on-commit action, remember it
1493 if (oncommit != ONCOMMIT_NOOP)
1494 register_on_commit_action(relid, oncommit);
1497 * ok, the relation has been cataloged, so close our relations and return
1498 * the OID of the newly created relation.
1500 table_close(new_rel_desc, NoLock); /* do not unlock till end of xact */
1501 table_close(pg_class_desc, RowExclusiveLock);
1503 return relid;
1507 * RelationRemoveInheritance
1509 * Formerly, this routine checked for child relations and aborted the
1510 * deletion if any were found. Now we rely on the dependency mechanism
1511 * to check for or delete child relations. By the time we get here,
1512 * there are no children and we need only remove any pg_inherits rows
1513 * linking this relation to its parent(s).
1515 static void
1516 RelationRemoveInheritance(Oid relid)
1518 Relation catalogRelation;
1519 SysScanDesc scan;
1520 ScanKeyData key;
1521 HeapTuple tuple;
1523 catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
1525 ScanKeyInit(&key,
1526 Anum_pg_inherits_inhrelid,
1527 BTEqualStrategyNumber, F_OIDEQ,
1528 ObjectIdGetDatum(relid));
1530 scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1531 NULL, 1, &key);
1533 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1534 CatalogTupleDelete(catalogRelation, &tuple->t_self);
1536 systable_endscan(scan);
1537 table_close(catalogRelation, RowExclusiveLock);
1541 * DeleteRelationTuple
1543 * Remove pg_class row for the given relid.
1545 * Note: this is shared by relation deletion and index deletion. It's
1546 * not intended for use anyplace else.
1548 void
1549 DeleteRelationTuple(Oid relid)
1551 Relation pg_class_desc;
1552 HeapTuple tup;
1554 /* Grab an appropriate lock on the pg_class relation */
1555 pg_class_desc = table_open(RelationRelationId, RowExclusiveLock);
1557 tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1558 if (!HeapTupleIsValid(tup))
1559 elog(ERROR, "cache lookup failed for relation %u", relid);
1561 /* delete the relation tuple from pg_class, and finish up */
1562 CatalogTupleDelete(pg_class_desc, &tup->t_self);
1564 ReleaseSysCache(tup);
1566 table_close(pg_class_desc, RowExclusiveLock);
1570 * DeleteAttributeTuples
1572 * Remove pg_attribute rows for the given relid.
1574 * Note: this is shared by relation deletion and index deletion. It's
1575 * not intended for use anyplace else.
1577 void
1578 DeleteAttributeTuples(Oid relid)
1580 Relation attrel;
1581 SysScanDesc scan;
1582 ScanKeyData key[1];
1583 HeapTuple atttup;
1585 /* Grab an appropriate lock on the pg_attribute relation */
1586 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1588 /* Use the index to scan only attributes of the target relation */
1589 ScanKeyInit(&key[0],
1590 Anum_pg_attribute_attrelid,
1591 BTEqualStrategyNumber, F_OIDEQ,
1592 ObjectIdGetDatum(relid));
1594 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1595 NULL, 1, key);
1597 /* Delete all the matching tuples */
1598 while ((atttup = systable_getnext(scan)) != NULL)
1599 CatalogTupleDelete(attrel, &atttup->t_self);
1601 /* Clean up after the scan */
1602 systable_endscan(scan);
1603 table_close(attrel, RowExclusiveLock);
1607 * DeleteSystemAttributeTuples
1609 * Remove pg_attribute rows for system columns of the given relid.
1611 * Note: this is only used when converting a table to a view. Views don't
1612 * have system columns, so we should remove them from pg_attribute.
1614 void
1615 DeleteSystemAttributeTuples(Oid relid)
1617 Relation attrel;
1618 SysScanDesc scan;
1619 ScanKeyData key[2];
1620 HeapTuple atttup;
1622 /* Grab an appropriate lock on the pg_attribute relation */
1623 attrel = table_open(AttributeRelationId, RowExclusiveLock);
1625 /* Use the index to scan only system attributes of the target relation */
1626 ScanKeyInit(&key[0],
1627 Anum_pg_attribute_attrelid,
1628 BTEqualStrategyNumber, F_OIDEQ,
1629 ObjectIdGetDatum(relid));
1630 ScanKeyInit(&key[1],
1631 Anum_pg_attribute_attnum,
1632 BTLessEqualStrategyNumber, F_INT2LE,
1633 Int16GetDatum(0));
1635 scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1636 NULL, 2, key);
1638 /* Delete all the matching tuples */
1639 while ((atttup = systable_getnext(scan)) != NULL)
1640 CatalogTupleDelete(attrel, &atttup->t_self);
1642 /* Clean up after the scan */
1643 systable_endscan(scan);
1644 table_close(attrel, RowExclusiveLock);
1648 * RemoveAttributeById
1650 * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1651 * deleted in pg_attribute. We also remove pg_statistic entries for it.
1652 * (Everything else needed, such as getting rid of any pg_attrdef entry,
1653 * is handled by dependency.c.)
1655 void
1656 RemoveAttributeById(Oid relid, AttrNumber attnum)
1658 Relation rel;
1659 Relation attr_rel;
1660 HeapTuple tuple;
1661 Form_pg_attribute attStruct;
1662 char newattname[NAMEDATALEN];
1665 * Grab an exclusive lock on the target table, which we will NOT release
1666 * until end of transaction. (In the simple case where we are directly
1667 * dropping this column, ATExecDropColumn already did this ... but when
1668 * cascading from a drop of some other object, we may not have any lock.)
1670 rel = relation_open(relid, AccessExclusiveLock);
1672 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1674 tuple = SearchSysCacheCopy2(ATTNUM,
1675 ObjectIdGetDatum(relid),
1676 Int16GetDatum(attnum));
1677 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1678 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1679 attnum, relid);
1680 attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1682 if (attnum < 0)
1684 /* System attribute (probably OID) ... just delete the row */
1686 CatalogTupleDelete(attr_rel, &tuple->t_self);
1688 else
1690 /* Dropping user attributes is lots harder */
1692 /* Mark the attribute as dropped */
1693 attStruct->attisdropped = true;
1696 * Set the type OID to invalid. A dropped attribute's type link
1697 * cannot be relied on (once the attribute is dropped, the type might
1698 * be too). Fortunately we do not need the type row --- the only
1699 * really essential information is the type's typlen and typalign,
1700 * which are preserved in the attribute's attlen and attalign. We set
1701 * atttypid to zero here as a means of catching code that incorrectly
1702 * expects it to be valid.
1704 attStruct->atttypid = InvalidOid;
1706 /* Remove any NOT NULL constraint the column may have */
1707 attStruct->attnotnull = false;
1709 /* We don't want to keep stats for it anymore */
1710 attStruct->attstattarget = 0;
1712 /* Unset this so no one tries to look up the generation expression */
1713 attStruct->attgenerated = '\0';
1716 * Change the column name to something that isn't likely to conflict
1718 snprintf(newattname, sizeof(newattname),
1719 "........pg.dropped.%d........", attnum);
1720 namestrcpy(&(attStruct->attname), newattname);
1722 /* clear the missing value if any */
1723 if (attStruct->atthasmissing)
1725 Datum valuesAtt[Natts_pg_attribute];
1726 bool nullsAtt[Natts_pg_attribute];
1727 bool replacesAtt[Natts_pg_attribute];
1729 /* update the tuple - set atthasmissing and attmissingval */
1730 MemSet(valuesAtt, 0, sizeof(valuesAtt));
1731 MemSet(nullsAtt, false, sizeof(nullsAtt));
1732 MemSet(replacesAtt, false, sizeof(replacesAtt));
1734 valuesAtt[Anum_pg_attribute_atthasmissing - 1] =
1735 BoolGetDatum(false);
1736 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
1737 valuesAtt[Anum_pg_attribute_attmissingval - 1] = (Datum) 0;
1738 nullsAtt[Anum_pg_attribute_attmissingval - 1] = true;
1739 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
1741 tuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
1742 valuesAtt, nullsAtt, replacesAtt);
1745 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1749 * Because updating the pg_attribute row will trigger a relcache flush for
1750 * the target relation, we need not do anything else to notify other
1751 * backends of the change.
1754 table_close(attr_rel, RowExclusiveLock);
1756 if (attnum > 0)
1757 RemoveStatistics(relid, attnum);
1759 relation_close(rel, NoLock);
1763 * RemoveAttrDefault
1765 * If the specified relation/attribute has a default, remove it.
1766 * (If no default, raise error if complain is true, else return quietly.)
1768 void
1769 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1770 DropBehavior behavior, bool complain, bool internal)
1772 Relation attrdef_rel;
1773 ScanKeyData scankeys[2];
1774 SysScanDesc scan;
1775 HeapTuple tuple;
1776 bool found = false;
1778 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1780 ScanKeyInit(&scankeys[0],
1781 Anum_pg_attrdef_adrelid,
1782 BTEqualStrategyNumber, F_OIDEQ,
1783 ObjectIdGetDatum(relid));
1784 ScanKeyInit(&scankeys[1],
1785 Anum_pg_attrdef_adnum,
1786 BTEqualStrategyNumber, F_INT2EQ,
1787 Int16GetDatum(attnum));
1789 scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1790 NULL, 2, scankeys);
1792 /* There should be at most one matching tuple, but we loop anyway */
1793 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1795 ObjectAddress object;
1796 Form_pg_attrdef attrtuple = (Form_pg_attrdef) GETSTRUCT(tuple);
1798 object.classId = AttrDefaultRelationId;
1799 object.objectId = attrtuple->oid;
1800 object.objectSubId = 0;
1802 performDeletion(&object, behavior,
1803 internal ? PERFORM_DELETION_INTERNAL : 0);
1805 found = true;
1808 systable_endscan(scan);
1809 table_close(attrdef_rel, RowExclusiveLock);
1811 if (complain && !found)
1812 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1813 relid, attnum);
1817 * RemoveAttrDefaultById
1819 * Remove a pg_attrdef entry specified by OID. This is the guts of
1820 * attribute-default removal. Note it should be called via performDeletion,
1821 * not directly.
1823 void
1824 RemoveAttrDefaultById(Oid attrdefId)
1826 Relation attrdef_rel;
1827 Relation attr_rel;
1828 Relation myrel;
1829 ScanKeyData scankeys[1];
1830 SysScanDesc scan;
1831 HeapTuple tuple;
1832 Oid myrelid;
1833 AttrNumber myattnum;
1835 /* Grab an appropriate lock on the pg_attrdef relation */
1836 attrdef_rel = table_open(AttrDefaultRelationId, RowExclusiveLock);
1838 /* Find the pg_attrdef tuple */
1839 ScanKeyInit(&scankeys[0],
1840 Anum_pg_attrdef_oid,
1841 BTEqualStrategyNumber, F_OIDEQ,
1842 ObjectIdGetDatum(attrdefId));
1844 scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1845 NULL, 1, scankeys);
1847 tuple = systable_getnext(scan);
1848 if (!HeapTupleIsValid(tuple))
1849 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1851 myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1852 myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1854 /* Get an exclusive lock on the relation owning the attribute */
1855 myrel = relation_open(myrelid, AccessExclusiveLock);
1857 /* Now we can delete the pg_attrdef row */
1858 CatalogTupleDelete(attrdef_rel, &tuple->t_self);
1860 systable_endscan(scan);
1861 table_close(attrdef_rel, RowExclusiveLock);
1863 /* Fix the pg_attribute row */
1864 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
1866 tuple = SearchSysCacheCopy2(ATTNUM,
1867 ObjectIdGetDatum(myrelid),
1868 Int16GetDatum(myattnum));
1869 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
1870 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1871 myattnum, myrelid);
1873 ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1875 CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple);
1878 * Our update of the pg_attribute row will force a relcache rebuild, so
1879 * there's nothing else to do here.
1881 table_close(attr_rel, RowExclusiveLock);
1883 /* Keep lock on attribute's rel until end of xact */
1884 relation_close(myrel, NoLock);
1888 * heap_drop_with_catalog - removes specified relation from catalogs
1890 * Note that this routine is not responsible for dropping objects that are
1891 * linked to the pg_class entry via dependencies (for example, indexes and
1892 * constraints). Those are deleted by the dependency-tracing logic in
1893 * dependency.c before control gets here. In general, therefore, this routine
1894 * should never be called directly; go through performDeletion() instead.
1896 void
1897 heap_drop_with_catalog(Oid relid)
1899 Relation rel;
1900 HeapTuple tuple;
1901 Oid parentOid = InvalidOid,
1902 defaultPartOid = InvalidOid;
1905 * To drop a partition safely, we must grab exclusive lock on its parent,
1906 * because another backend might be about to execute a query on the parent
1907 * table. If it relies on previously cached partition descriptor, then it
1908 * could attempt to access the just-dropped relation as its partition. We
1909 * must therefore take a table lock strong enough to prevent all queries
1910 * on the table from proceeding until we commit and send out a
1911 * shared-cache-inval notice that will make them update their partition
1912 * descriptors.
1914 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1915 if (!HeapTupleIsValid(tuple))
1916 elog(ERROR, "cache lookup failed for relation %u", relid);
1917 if (((Form_pg_class) GETSTRUCT(tuple))->relispartition)
1919 parentOid = get_partition_parent(relid);
1920 LockRelationOid(parentOid, AccessExclusiveLock);
1923 * If this is not the default partition, dropping it will change the
1924 * default partition's partition constraint, so we must lock it.
1926 defaultPartOid = get_default_partition_oid(parentOid);
1927 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
1928 LockRelationOid(defaultPartOid, AccessExclusiveLock);
1931 ReleaseSysCache(tuple);
1934 * Open and lock the relation.
1936 rel = relation_open(relid, AccessExclusiveLock);
1939 * There can no longer be anyone *else* touching the relation, but we
1940 * might still have open queries or cursors, or pending trigger events, in
1941 * our own session.
1943 CheckTableNotInUse(rel, "DROP TABLE");
1946 * This effectively deletes all rows in the table, and may be done in a
1947 * serializable transaction. In that case we must record a rw-conflict in
1948 * to this transaction from each transaction holding a predicate lock on
1949 * the table.
1951 CheckTableForSerializableConflictIn(rel);
1954 * Delete pg_foreign_table tuple first.
1956 if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1958 Relation rel;
1959 HeapTuple tuple;
1961 rel = table_open(ForeignTableRelationId, RowExclusiveLock);
1963 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1964 if (!HeapTupleIsValid(tuple))
1965 elog(ERROR, "cache lookup failed for foreign table %u", relid);
1967 CatalogTupleDelete(rel, &tuple->t_self);
1969 ReleaseSysCache(tuple);
1970 table_close(rel, RowExclusiveLock);
1974 * If a partitioned table, delete the pg_partitioned_table tuple.
1976 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1977 RemovePartitionKeyByRelId(relid);
1980 * If the relation being dropped is the default partition itself,
1981 * invalidate its entry in pg_partitioned_table.
1983 if (relid == defaultPartOid)
1984 update_default_partition_oid(parentOid, InvalidOid);
1987 * Schedule unlinking of the relation's physical files at commit.
1989 if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
1990 RelationDropStorage(rel);
1993 * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1994 * until transaction commit. This ensures no one else will try to do
1995 * something with the doomed relation.
1997 relation_close(rel, NoLock);
2000 * Remove any associated relation synchronization states.
2002 RemoveSubscriptionRel(InvalidOid, relid);
2005 * Forget any ON COMMIT action for the rel
2007 remove_on_commit_action(relid);
2010 * Flush the relation from the relcache. We want to do this before
2011 * starting to remove catalog entries, just to be certain that no relcache
2012 * entry rebuild will happen partway through. (That should not really
2013 * matter, since we don't do CommandCounterIncrement here, but let's be
2014 * safe.)
2016 RelationForgetRelation(relid);
2019 * remove inheritance information
2021 RelationRemoveInheritance(relid);
2024 * delete statistics
2026 RemoveStatistics(relid, 0);
2029 * delete attribute tuples
2031 DeleteAttributeTuples(relid);
2034 * delete relation tuple
2036 DeleteRelationTuple(relid);
2038 if (OidIsValid(parentOid))
2041 * If this is not the default partition, the partition constraint of
2042 * the default partition has changed to include the portion of the key
2043 * space previously covered by the dropped partition.
2045 if (OidIsValid(defaultPartOid) && relid != defaultPartOid)
2046 CacheInvalidateRelcacheByRelid(defaultPartOid);
2049 * Invalidate the parent's relcache so that the partition is no longer
2050 * included in its partition descriptor.
2052 CacheInvalidateRelcacheByRelid(parentOid);
2053 /* keep the lock */
2059 * RelationClearMissing
2061 * Set atthasmissing and attmissingval to false/null for all attributes
2062 * where they are currently set. This can be safely and usefully done if
2063 * the table is rewritten (e.g. by VACUUM FULL or CLUSTER) where we know there
2064 * are no rows left with less than a full complement of attributes.
2066 * The caller must have an AccessExclusive lock on the relation.
2068 void
2069 RelationClearMissing(Relation rel)
2071 Relation attr_rel;
2072 Oid relid = RelationGetRelid(rel);
2073 int natts = RelationGetNumberOfAttributes(rel);
2074 int attnum;
2075 Datum repl_val[Natts_pg_attribute];
2076 bool repl_null[Natts_pg_attribute];
2077 bool repl_repl[Natts_pg_attribute];
2078 Form_pg_attribute attrtuple;
2079 HeapTuple tuple,
2080 newtuple;
2082 memset(repl_val, 0, sizeof(repl_val));
2083 memset(repl_null, false, sizeof(repl_null));
2084 memset(repl_repl, false, sizeof(repl_repl));
2086 repl_val[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(false);
2087 repl_null[Anum_pg_attribute_attmissingval - 1] = true;
2089 repl_repl[Anum_pg_attribute_atthasmissing - 1] = true;
2090 repl_repl[Anum_pg_attribute_attmissingval - 1] = true;
2093 /* Get a lock on pg_attribute */
2094 attr_rel = table_open(AttributeRelationId, RowExclusiveLock);
2096 /* process each non-system attribute, including any dropped columns */
2097 for (attnum = 1; attnum <= natts; attnum++)
2099 tuple = SearchSysCache2(ATTNUM,
2100 ObjectIdGetDatum(relid),
2101 Int16GetDatum(attnum));
2102 if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
2103 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2104 attnum, relid);
2106 attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
2108 /* ignore any where atthasmissing is not true */
2109 if (attrtuple->atthasmissing)
2111 newtuple = heap_modify_tuple(tuple, RelationGetDescr(attr_rel),
2112 repl_val, repl_null, repl_repl);
2114 CatalogTupleUpdate(attr_rel, &newtuple->t_self, newtuple);
2116 heap_freetuple(newtuple);
2119 ReleaseSysCache(tuple);
2123 * Our update of the pg_attribute rows will force a relcache rebuild, so
2124 * there's nothing else to do here.
2126 table_close(attr_rel, RowExclusiveLock);
2130 * SetAttrMissing
2132 * Set the missing value of a single attribute. This should only be used by
2133 * binary upgrade. Takes an AccessExclusive lock on the relation owning the
2134 * attribute.
2136 void
2137 SetAttrMissing(Oid relid, char *attname, char *value)
2139 Datum valuesAtt[Natts_pg_attribute];
2140 bool nullsAtt[Natts_pg_attribute];
2141 bool replacesAtt[Natts_pg_attribute];
2142 Datum missingval;
2143 Form_pg_attribute attStruct;
2144 Relation attrrel,
2145 tablerel;
2146 HeapTuple atttup,
2147 newtup;
2149 /* lock the table the attribute belongs to */
2150 tablerel = table_open(relid, AccessExclusiveLock);
2152 /* Lock the attribute row and get the data */
2153 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2154 atttup = SearchSysCacheAttName(relid, attname);
2155 if (!HeapTupleIsValid(atttup))
2156 elog(ERROR, "cache lookup failed for attribute %s of relation %u",
2157 attname, relid);
2158 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2160 /* get an array value from the value string */
2161 missingval = OidFunctionCall3(F_ARRAY_IN,
2162 CStringGetDatum(value),
2163 ObjectIdGetDatum(attStruct->atttypid),
2164 Int32GetDatum(attStruct->atttypmod));
2166 /* update the tuple - set atthasmissing and attmissingval */
2167 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2168 MemSet(nullsAtt, false, sizeof(nullsAtt));
2169 MemSet(replacesAtt, false, sizeof(replacesAtt));
2171 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = BoolGetDatum(true);
2172 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2173 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2174 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2176 newtup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2177 valuesAtt, nullsAtt, replacesAtt);
2178 CatalogTupleUpdate(attrrel, &newtup->t_self, newtup);
2180 /* clean up */
2181 ReleaseSysCache(atttup);
2182 table_close(attrrel, RowExclusiveLock);
2183 table_close(tablerel, AccessExclusiveLock);
2187 * Store a default expression for column attnum of relation rel.
2189 * Returns the OID of the new pg_attrdef tuple.
2191 * add_column_mode must be true if we are storing the default for a new
2192 * attribute, and false if it's for an already existing attribute. The reason
2193 * for this is that the missing value must never be updated after it is set,
2194 * which can only be when a column is added to the table. Otherwise we would
2195 * in effect be changing existing tuples.
2198 StoreAttrDefault(Relation rel, AttrNumber attnum,
2199 Node *expr, bool is_internal, bool add_column_mode)
2201 char *adbin;
2202 Relation adrel;
2203 HeapTuple tuple;
2204 Datum values[4];
2205 static bool nulls[4] = {false, false, false, false};
2206 Relation attrrel;
2207 HeapTuple atttup;
2208 Form_pg_attribute attStruct;
2209 char attgenerated;
2210 Oid attrdefOid;
2211 ObjectAddress colobject,
2212 defobject;
2214 adrel = table_open(AttrDefaultRelationId, RowExclusiveLock);
2217 * Flatten expression to string form for storage.
2219 adbin = nodeToString(expr);
2222 * Make the pg_attrdef entry.
2224 attrdefOid = GetNewOidWithIndex(adrel, AttrDefaultOidIndexId,
2225 Anum_pg_attrdef_oid);
2226 values[Anum_pg_attrdef_oid - 1] = ObjectIdGetDatum(attrdefOid);
2227 values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
2228 values[Anum_pg_attrdef_adnum - 1] = attnum;
2229 values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
2231 tuple = heap_form_tuple(adrel->rd_att, values, nulls);
2232 CatalogTupleInsert(adrel, tuple);
2234 defobject.classId = AttrDefaultRelationId;
2235 defobject.objectId = attrdefOid;
2236 defobject.objectSubId = 0;
2238 table_close(adrel, RowExclusiveLock);
2240 /* now can free some of the stuff allocated above */
2241 pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
2242 heap_freetuple(tuple);
2243 pfree(adbin);
2246 * Update the pg_attribute entry for the column to show that a default
2247 * exists.
2249 attrrel = table_open(AttributeRelationId, RowExclusiveLock);
2250 atttup = SearchSysCacheCopy2(ATTNUM,
2251 ObjectIdGetDatum(RelationGetRelid(rel)),
2252 Int16GetDatum(attnum));
2253 if (!HeapTupleIsValid(atttup))
2254 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
2255 attnum, RelationGetRelid(rel));
2256 attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
2257 attgenerated = attStruct->attgenerated;
2258 if (!attStruct->atthasdef)
2260 Form_pg_attribute defAttStruct;
2262 ExprState *exprState;
2263 Expr *expr2 = (Expr *) expr;
2264 EState *estate = NULL;
2265 ExprContext *econtext;
2266 Datum valuesAtt[Natts_pg_attribute];
2267 bool nullsAtt[Natts_pg_attribute];
2268 bool replacesAtt[Natts_pg_attribute];
2269 Datum missingval = (Datum) 0;
2270 bool missingIsNull = true;
2272 MemSet(valuesAtt, 0, sizeof(valuesAtt));
2273 MemSet(nullsAtt, false, sizeof(nullsAtt));
2274 MemSet(replacesAtt, false, sizeof(replacesAtt));
2275 valuesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2276 replacesAtt[Anum_pg_attribute_atthasdef - 1] = true;
2278 if (add_column_mode && !attgenerated)
2280 expr2 = expression_planner(expr2);
2281 estate = CreateExecutorState();
2282 exprState = ExecPrepareExpr(expr2, estate);
2283 econtext = GetPerTupleExprContext(estate);
2285 missingval = ExecEvalExpr(exprState, econtext,
2286 &missingIsNull);
2288 FreeExecutorState(estate);
2290 defAttStruct = TupleDescAttr(rel->rd_att, attnum - 1);
2292 if (missingIsNull)
2294 /* if the default evaluates to NULL, just store a NULL array */
2295 missingval = (Datum) 0;
2297 else
2299 /* otherwise make a one-element array of the value */
2300 missingval = PointerGetDatum(construct_array(&missingval,
2302 defAttStruct->atttypid,
2303 defAttStruct->attlen,
2304 defAttStruct->attbyval,
2305 defAttStruct->attalign));
2308 valuesAtt[Anum_pg_attribute_atthasmissing - 1] = !missingIsNull;
2309 replacesAtt[Anum_pg_attribute_atthasmissing - 1] = true;
2310 valuesAtt[Anum_pg_attribute_attmissingval - 1] = missingval;
2311 replacesAtt[Anum_pg_attribute_attmissingval - 1] = true;
2312 nullsAtt[Anum_pg_attribute_attmissingval - 1] = missingIsNull;
2314 atttup = heap_modify_tuple(atttup, RelationGetDescr(attrrel),
2315 valuesAtt, nullsAtt, replacesAtt);
2317 CatalogTupleUpdate(attrrel, &atttup->t_self, atttup);
2319 if (!missingIsNull)
2320 pfree(DatumGetPointer(missingval));
2323 table_close(attrrel, RowExclusiveLock);
2324 heap_freetuple(atttup);
2327 * Make a dependency so that the pg_attrdef entry goes away if the column
2328 * (or whole table) is deleted.
2330 colobject.classId = RelationRelationId;
2331 colobject.objectId = RelationGetRelid(rel);
2332 colobject.objectSubId = attnum;
2334 recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
2337 * Record dependencies on objects used in the expression, too.
2339 if (attgenerated)
2342 * Generated column: Dropping anything that the generation expression
2343 * refers to automatically drops the generated column.
2345 recordDependencyOnSingleRelExpr(&colobject, expr, RelationGetRelid(rel),
2346 DEPENDENCY_AUTO,
2347 DEPENDENCY_AUTO, false);
2349 else
2352 * Normal default: Dropping anything that the default refers to
2353 * requires CASCADE and drops the default only.
2355 recordDependencyOnSingleRelExpr(&defobject, expr, RelationGetRelid(rel),
2356 DEPENDENCY_NORMAL,
2357 DEPENDENCY_NORMAL, false);
2361 * Post creation hook for attribute defaults.
2363 * XXX. ALTER TABLE ALTER COLUMN SET/DROP DEFAULT is implemented with a
2364 * couple of deletion/creation of the attribute's default entry, so the
2365 * callee should check existence of an older version of this entry if it
2366 * needs to distinguish.
2368 InvokeObjectPostCreateHookArg(AttrDefaultRelationId,
2369 RelationGetRelid(rel), attnum, is_internal);
2371 return attrdefOid;
2375 * Store a check-constraint expression for the given relation.
2377 * Caller is responsible for updating the count of constraints
2378 * in the pg_class entry for the relation.
2380 * The OID of the new constraint is returned.
2382 static Oid
2383 StoreRelCheck(Relation rel, const char *ccname, Node *expr,
2384 bool is_validated, bool is_local, int inhcount,
2385 bool is_no_inherit, bool is_internal)
2387 char *ccbin;
2388 List *varList;
2389 int keycount;
2390 int16 *attNos;
2391 Oid constrOid;
2394 * Flatten expression to string form for storage.
2396 ccbin = nodeToString(expr);
2399 * Find columns of rel that are used in expr
2401 * NB: pull_var_clause is okay here only because we don't allow subselects
2402 * in check constraints; it would fail to examine the contents of
2403 * subselects.
2405 varList = pull_var_clause(expr, 0);
2406 keycount = list_length(varList);
2408 if (keycount > 0)
2410 ListCell *vl;
2411 int i = 0;
2413 attNos = (int16 *) palloc(keycount * sizeof(int16));
2414 foreach(vl, varList)
2416 Var *var = (Var *) lfirst(vl);
2417 int j;
2419 for (j = 0; j < i; j++)
2420 if (attNos[j] == var->varattno)
2421 break;
2422 if (j == i)
2423 attNos[i++] = var->varattno;
2425 keycount = i;
2427 else
2428 attNos = NULL;
2431 * Partitioned tables do not contain any rows themselves, so a NO INHERIT
2432 * constraint makes no sense.
2434 if (is_no_inherit &&
2435 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2436 ereport(ERROR,
2437 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
2438 errmsg("cannot add NO INHERIT constraint to partitioned table \"%s\"",
2439 RelationGetRelationName(rel))));
2442 * Create the Check Constraint
2444 constrOid =
2445 CreateConstraintEntry(ccname, /* Constraint Name */
2446 RelationGetNamespace(rel), /* namespace */
2447 CONSTRAINT_CHECK, /* Constraint Type */
2448 false, /* Is Deferrable */
2449 false, /* Is Deferred */
2450 is_validated,
2451 InvalidOid, /* no parent constraint */
2452 RelationGetRelid(rel), /* relation */
2453 attNos, /* attrs in the constraint */
2454 keycount, /* # key attrs in the constraint */
2455 keycount, /* # total attrs in the constraint */
2456 InvalidOid, /* not a domain constraint */
2457 InvalidOid, /* no associated index */
2458 InvalidOid, /* Foreign key fields */
2459 NULL,
2460 NULL,
2461 NULL,
2462 NULL,
2464 ' ',
2465 ' ',
2466 ' ',
2467 NULL, /* not an exclusion constraint */
2468 expr, /* Tree form of check constraint */
2469 ccbin, /* Binary form of check constraint */
2470 is_local, /* conislocal */
2471 inhcount, /* coninhcount */
2472 is_no_inherit, /* connoinherit */
2473 is_internal); /* internally constructed? */
2475 pfree(ccbin);
2477 return constrOid;
2481 * Store defaults and constraints (passed as a list of CookedConstraint).
2483 * Each CookedConstraint struct is modified to store the new catalog tuple OID.
2485 * NOTE: only pre-cooked expressions will be passed this way, which is to
2486 * say expressions inherited from an existing relation. Newly parsed
2487 * expressions can be added later, by direct calls to StoreAttrDefault
2488 * and StoreRelCheck (see AddRelationNewConstraints()).
2490 static void
2491 StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
2493 int numchecks = 0;
2494 ListCell *lc;
2496 if (cooked_constraints == NIL)
2497 return; /* nothing to do */
2500 * Deparsing of constraint expressions will fail unless the just-created
2501 * pg_attribute tuples for this relation are made visible. So, bump the
2502 * command counter. CAUTION: this will cause a relcache entry rebuild.
2504 CommandCounterIncrement();
2506 foreach(lc, cooked_constraints)
2508 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
2510 switch (con->contype)
2512 case CONSTR_DEFAULT:
2513 con->conoid = StoreAttrDefault(rel, con->attnum, con->expr,
2514 is_internal, false);
2515 break;
2516 case CONSTR_CHECK:
2517 con->conoid =
2518 StoreRelCheck(rel, con->name, con->expr,
2519 !con->skip_validation, con->is_local,
2520 con->inhcount, con->is_no_inherit,
2521 is_internal);
2522 numchecks++;
2523 break;
2524 default:
2525 elog(ERROR, "unrecognized constraint type: %d",
2526 (int) con->contype);
2530 if (numchecks > 0)
2531 SetRelationNumChecks(rel, numchecks);
2535 * AddRelationNewConstraints
2537 * Add new column default expressions and/or constraint check expressions
2538 * to an existing relation. This is defined to do both for efficiency in
2539 * DefineRelation, but of course you can do just one or the other by passing
2540 * empty lists.
2542 * rel: relation to be modified
2543 * newColDefaults: list of RawColumnDefault structures
2544 * newConstraints: list of Constraint nodes
2545 * allow_merge: true if check constraints may be merged with existing ones
2546 * is_local: true if definition is local, false if it's inherited
2547 * is_internal: true if result of some internal process, not a user request
2549 * All entries in newColDefaults will be processed. Entries in newConstraints
2550 * will be processed only if they are CONSTR_CHECK type.
2552 * Returns a list of CookedConstraint nodes that shows the cooked form of
2553 * the default and constraint expressions added to the relation.
2555 * NB: caller should have opened rel with AccessExclusiveLock, and should
2556 * hold that lock till end of transaction. Also, we assume the caller has
2557 * done a CommandCounterIncrement if necessary to make the relation's catalog
2558 * tuples visible.
2560 List *
2561 AddRelationNewConstraints(Relation rel,
2562 List *newColDefaults,
2563 List *newConstraints,
2564 bool allow_merge,
2565 bool is_local,
2566 bool is_internal,
2567 const char *queryString)
2569 List *cookedConstraints = NIL;
2570 TupleDesc tupleDesc;
2571 TupleConstr *oldconstr;
2572 int numoldchecks;
2573 ParseState *pstate;
2574 ParseNamespaceItem *nsitem;
2575 int numchecks;
2576 List *checknames;
2577 ListCell *cell;
2578 Node *expr;
2579 CookedConstraint *cooked;
2582 * Get info about existing constraints.
2584 tupleDesc = RelationGetDescr(rel);
2585 oldconstr = tupleDesc->constr;
2586 if (oldconstr)
2587 numoldchecks = oldconstr->num_check;
2588 else
2589 numoldchecks = 0;
2592 * Create a dummy ParseState and insert the target relation as its sole
2593 * rangetable entry. We need a ParseState for transformExpr.
2595 pstate = make_parsestate(NULL);
2596 pstate->p_sourcetext = queryString;
2597 nsitem = addRangeTableEntryForRelation(pstate,
2598 rel,
2599 AccessShareLock,
2600 NULL,
2601 false,
2602 true);
2603 addNSItemToQuery(pstate, nsitem, true, true, true);
2606 * Process column default expressions.
2608 foreach(cell, newColDefaults)
2610 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2611 Form_pg_attribute atp = TupleDescAttr(rel->rd_att, colDef->attnum - 1);
2612 Oid defOid;
2614 expr = cookDefault(pstate, colDef->raw_default,
2615 atp->atttypid, atp->atttypmod,
2616 NameStr(atp->attname),
2617 atp->attgenerated);
2620 * If the expression is just a NULL constant, we do not bother to make
2621 * an explicit pg_attrdef entry, since the default behavior is
2622 * equivalent. This applies to column defaults, but not for
2623 * generation expressions.
2625 * Note a nonobvious property of this test: if the column is of a
2626 * domain type, what we'll get is not a bare null Const but a
2627 * CoerceToDomain expr, so we will not discard the default. This is
2628 * critical because the column default needs to be retained to
2629 * override any default that the domain might have.
2631 if (expr == NULL ||
2632 (!colDef->generated &&
2633 IsA(expr, Const) &&
2634 castNode(Const, expr)->constisnull))
2635 continue;
2637 /* If the DEFAULT is volatile we cannot use a missing value */
2638 if (colDef->missingMode && contain_volatile_functions((Node *) expr))
2639 colDef->missingMode = false;
2641 defOid = StoreAttrDefault(rel, colDef->attnum, expr, is_internal,
2642 colDef->missingMode);
2644 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2645 cooked->contype = CONSTR_DEFAULT;
2646 cooked->conoid = defOid;
2647 cooked->name = NULL;
2648 cooked->attnum = colDef->attnum;
2649 cooked->expr = expr;
2650 cooked->skip_validation = false;
2651 cooked->is_local = is_local;
2652 cooked->inhcount = is_local ? 0 : 1;
2653 cooked->is_no_inherit = false;
2654 cookedConstraints = lappend(cookedConstraints, cooked);
2658 * Process constraint expressions.
2660 numchecks = numoldchecks;
2661 checknames = NIL;
2662 foreach(cell, newConstraints)
2664 Constraint *cdef = (Constraint *) lfirst(cell);
2665 char *ccname;
2666 Oid constrOid;
2668 if (cdef->contype != CONSTR_CHECK)
2669 continue;
2671 if (cdef->raw_expr != NULL)
2673 Assert(cdef->cooked_expr == NULL);
2676 * Transform raw parsetree to executable expression, and verify
2677 * it's valid as a CHECK constraint.
2679 expr = cookConstraint(pstate, cdef->raw_expr,
2680 RelationGetRelationName(rel));
2682 else
2684 Assert(cdef->cooked_expr != NULL);
2687 * Here, we assume the parser will only pass us valid CHECK
2688 * expressions, so we do no particular checking.
2690 expr = stringToNode(cdef->cooked_expr);
2694 * Check name uniqueness, or generate a name if none was given.
2696 if (cdef->conname != NULL)
2698 ListCell *cell2;
2700 ccname = cdef->conname;
2701 /* Check against other new constraints */
2702 /* Needed because we don't do CommandCounterIncrement in loop */
2703 foreach(cell2, checknames)
2705 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2706 ereport(ERROR,
2707 (errcode(ERRCODE_DUPLICATE_OBJECT),
2708 errmsg("check constraint \"%s\" already exists",
2709 ccname)));
2712 /* save name for future checks */
2713 checknames = lappend(checknames, ccname);
2716 * Check against pre-existing constraints. If we are allowed to
2717 * merge with an existing constraint, there's no more to do here.
2718 * (We omit the duplicate constraint from the result, which is
2719 * what ATAddCheckConstraint wants.)
2721 if (MergeWithExistingConstraint(rel, ccname, expr,
2722 allow_merge, is_local,
2723 cdef->initially_valid,
2724 cdef->is_no_inherit))
2725 continue;
2727 else
2730 * When generating a name, we want to create "tab_col_check" for a
2731 * column constraint and "tab_check" for a table constraint. We
2732 * no longer have any info about the syntactic positioning of the
2733 * constraint phrase, so we approximate this by seeing whether the
2734 * expression references more than one column. (If the user
2735 * played by the rules, the result is the same...)
2737 * Note: pull_var_clause() doesn't descend into sublinks, but we
2738 * eliminated those above; and anyway this only needs to be an
2739 * approximate answer.
2741 List *vars;
2742 char *colname;
2744 vars = pull_var_clause(expr, 0);
2746 /* eliminate duplicates */
2747 vars = list_union(NIL, vars);
2749 if (list_length(vars) == 1)
2750 colname = get_attname(RelationGetRelid(rel),
2751 ((Var *) linitial(vars))->varattno,
2752 true);
2753 else
2754 colname = NULL;
2756 ccname = ChooseConstraintName(RelationGetRelationName(rel),
2757 colname,
2758 "check",
2759 RelationGetNamespace(rel),
2760 checknames);
2762 /* save name for future checks */
2763 checknames = lappend(checknames, ccname);
2767 * OK, store it.
2769 constrOid =
2770 StoreRelCheck(rel, ccname, expr, cdef->initially_valid, is_local,
2771 is_local ? 0 : 1, cdef->is_no_inherit, is_internal);
2773 numchecks++;
2775 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2776 cooked->contype = CONSTR_CHECK;
2777 cooked->conoid = constrOid;
2778 cooked->name = ccname;
2779 cooked->attnum = 0;
2780 cooked->expr = expr;
2781 cooked->skip_validation = cdef->skip_validation;
2782 cooked->is_local = is_local;
2783 cooked->inhcount = is_local ? 0 : 1;
2784 cooked->is_no_inherit = cdef->is_no_inherit;
2785 cookedConstraints = lappend(cookedConstraints, cooked);
2789 * Update the count of constraints in the relation's pg_class tuple. We do
2790 * this even if there was no change, in order to ensure that an SI update
2791 * message is sent out for the pg_class tuple, which will force other
2792 * backends to rebuild their relcache entries for the rel. (This is
2793 * critical if we added defaults but not constraints.)
2795 SetRelationNumChecks(rel, numchecks);
2797 return cookedConstraints;
2801 * Check for a pre-existing check constraint that conflicts with a proposed
2802 * new one, and either adjust its conislocal/coninhcount settings or throw
2803 * error as needed.
2805 * Returns true if merged (constraint is a duplicate), or false if it's
2806 * got a so-far-unique name, or throws error if conflict.
2808 * XXX See MergeConstraintsIntoExisting too if you change this code.
2810 static bool
2811 MergeWithExistingConstraint(Relation rel, const char *ccname, Node *expr,
2812 bool allow_merge, bool is_local,
2813 bool is_initially_valid,
2814 bool is_no_inherit)
2816 bool found;
2817 Relation conDesc;
2818 SysScanDesc conscan;
2819 ScanKeyData skey[3];
2820 HeapTuple tup;
2822 /* Search for a pg_constraint entry with same name and relation */
2823 conDesc = table_open(ConstraintRelationId, RowExclusiveLock);
2825 found = false;
2827 ScanKeyInit(&skey[0],
2828 Anum_pg_constraint_conrelid,
2829 BTEqualStrategyNumber, F_OIDEQ,
2830 ObjectIdGetDatum(RelationGetRelid(rel)));
2831 ScanKeyInit(&skey[1],
2832 Anum_pg_constraint_contypid,
2833 BTEqualStrategyNumber, F_OIDEQ,
2834 ObjectIdGetDatum(InvalidOid));
2835 ScanKeyInit(&skey[2],
2836 Anum_pg_constraint_conname,
2837 BTEqualStrategyNumber, F_NAMEEQ,
2838 CStringGetDatum(ccname));
2840 conscan = systable_beginscan(conDesc, ConstraintRelidTypidNameIndexId, true,
2841 NULL, 3, skey);
2843 /* There can be at most one matching row */
2844 if (HeapTupleIsValid(tup = systable_getnext(conscan)))
2846 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2848 /* Found it. Conflicts if not identical check constraint */
2849 if (con->contype == CONSTRAINT_CHECK)
2851 Datum val;
2852 bool isnull;
2854 val = fastgetattr(tup,
2855 Anum_pg_constraint_conbin,
2856 conDesc->rd_att, &isnull);
2857 if (isnull)
2858 elog(ERROR, "null conbin for rel %s",
2859 RelationGetRelationName(rel));
2860 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2861 found = true;
2865 * If the existing constraint is purely inherited (no local
2866 * definition) then interpret addition of a local constraint as a
2867 * legal merge. This allows ALTER ADD CONSTRAINT on parent and child
2868 * tables to be given in either order with same end state. However if
2869 * the relation is a partition, all inherited constraints are always
2870 * non-local, including those that were merged.
2872 if (is_local && !con->conislocal && !rel->rd_rel->relispartition)
2873 allow_merge = true;
2875 if (!found || !allow_merge)
2876 ereport(ERROR,
2877 (errcode(ERRCODE_DUPLICATE_OBJECT),
2878 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2879 ccname, RelationGetRelationName(rel))));
2881 /* If the child constraint is "no inherit" then cannot merge */
2882 if (con->connoinherit)
2883 ereport(ERROR,
2884 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2885 errmsg("constraint \"%s\" conflicts with non-inherited constraint on relation \"%s\"",
2886 ccname, RelationGetRelationName(rel))));
2889 * Must not change an existing inherited constraint to "no inherit"
2890 * status. That's because inherited constraints should be able to
2891 * propagate to lower-level children.
2893 if (con->coninhcount > 0 && is_no_inherit)
2894 ereport(ERROR,
2895 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2896 errmsg("constraint \"%s\" conflicts with inherited constraint on relation \"%s\"",
2897 ccname, RelationGetRelationName(rel))));
2900 * If the child constraint is "not valid" then cannot merge with a
2901 * valid parent constraint.
2903 if (is_initially_valid && !con->convalidated)
2904 ereport(ERROR,
2905 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2906 errmsg("constraint \"%s\" conflicts with NOT VALID constraint on relation \"%s\"",
2907 ccname, RelationGetRelationName(rel))));
2909 /* OK to update the tuple */
2910 ereport(NOTICE,
2911 (errmsg("merging constraint \"%s\" with inherited definition",
2912 ccname)));
2914 tup = heap_copytuple(tup);
2915 con = (Form_pg_constraint) GETSTRUCT(tup);
2918 * In case of partitions, an inherited constraint must be inherited
2919 * only once since it cannot have multiple parents and it is never
2920 * considered local.
2922 if (rel->rd_rel->relispartition)
2924 con->coninhcount = 1;
2925 con->conislocal = false;
2927 else
2929 if (is_local)
2930 con->conislocal = true;
2931 else
2932 con->coninhcount++;
2935 if (is_no_inherit)
2937 Assert(is_local);
2938 con->connoinherit = true;
2941 CatalogTupleUpdate(conDesc, &tup->t_self, tup);
2944 systable_endscan(conscan);
2945 table_close(conDesc, RowExclusiveLock);
2947 return found;
2951 * Update the count of constraints in the relation's pg_class tuple.
2953 * Caller had better hold exclusive lock on the relation.
2955 * An important side effect is that a SI update message will be sent out for
2956 * the pg_class tuple, which will force other backends to rebuild their
2957 * relcache entries for the rel. Also, this backend will rebuild its
2958 * own relcache entry at the next CommandCounterIncrement.
2960 static void
2961 SetRelationNumChecks(Relation rel, int numchecks)
2963 Relation relrel;
2964 HeapTuple reltup;
2965 Form_pg_class relStruct;
2967 relrel = table_open(RelationRelationId, RowExclusiveLock);
2968 reltup = SearchSysCacheCopy1(RELOID,
2969 ObjectIdGetDatum(RelationGetRelid(rel)));
2970 if (!HeapTupleIsValid(reltup))
2971 elog(ERROR, "cache lookup failed for relation %u",
2972 RelationGetRelid(rel));
2973 relStruct = (Form_pg_class) GETSTRUCT(reltup);
2975 if (relStruct->relchecks != numchecks)
2977 relStruct->relchecks = numchecks;
2979 CatalogTupleUpdate(relrel, &reltup->t_self, reltup);
2981 else
2983 /* Skip the disk update, but force relcache inval anyway */
2984 CacheInvalidateRelcache(rel);
2987 heap_freetuple(reltup);
2988 table_close(relrel, RowExclusiveLock);
2992 * Check for references to generated columns
2994 static bool
2995 check_nested_generated_walker(Node *node, void *context)
2997 ParseState *pstate = context;
2999 if (node == NULL)
3000 return false;
3001 else if (IsA(node, Var))
3003 Var *var = (Var *) node;
3004 Oid relid;
3005 AttrNumber attnum;
3007 relid = rt_fetch(var->varno, pstate->p_rtable)->relid;
3008 attnum = var->varattno;
3010 if (OidIsValid(relid) && AttributeNumberIsValid(attnum) && get_attgenerated(relid, attnum))
3011 ereport(ERROR,
3012 (errcode(ERRCODE_SYNTAX_ERROR),
3013 errmsg("cannot use generated column \"%s\" in column generation expression",
3014 get_attname(relid, attnum, false)),
3015 errdetail("A generated column cannot reference another generated column."),
3016 parser_errposition(pstate, var->location)));
3018 return false;
3020 else
3021 return expression_tree_walker(node, check_nested_generated_walker,
3022 (void *) context);
3025 static void
3026 check_nested_generated(ParseState *pstate, Node *node)
3028 check_nested_generated_walker(node, pstate);
3032 * Take a raw default and convert it to a cooked format ready for
3033 * storage.
3035 * Parse state should be set up to recognize any vars that might appear
3036 * in the expression. (Even though we plan to reject vars, it's more
3037 * user-friendly to give the correct error message than "unknown var".)
3039 * If atttypid is not InvalidOid, coerce the expression to the specified
3040 * type (and typmod atttypmod). attname is only needed in this case:
3041 * it is used in the error message, if any.
3043 Node *
3044 cookDefault(ParseState *pstate,
3045 Node *raw_default,
3046 Oid atttypid,
3047 int32 atttypmod,
3048 const char *attname,
3049 char attgenerated)
3051 Node *expr;
3053 Assert(raw_default != NULL);
3056 * Transform raw parsetree to executable expression.
3058 expr = transformExpr(pstate, raw_default, attgenerated ? EXPR_KIND_GENERATED_COLUMN : EXPR_KIND_COLUMN_DEFAULT);
3060 if (attgenerated)
3062 check_nested_generated(pstate, expr);
3064 if (contain_mutable_functions(expr))
3065 ereport(ERROR,
3066 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
3067 errmsg("generation expression is not immutable")));
3069 else
3072 * For a default expression, transformExpr() should have rejected
3073 * column references.
3075 Assert(!contain_var_clause(expr));
3079 * Coerce the expression to the correct type and typmod, if given. This
3080 * should match the parser's processing of non-defaulted expressions ---
3081 * see transformAssignedExpr().
3083 if (OidIsValid(atttypid))
3085 Oid type_id = exprType(expr);
3087 expr = coerce_to_target_type(pstate, expr, type_id,
3088 atttypid, atttypmod,
3089 COERCION_ASSIGNMENT,
3090 COERCE_IMPLICIT_CAST,
3091 -1);
3092 if (expr == NULL)
3093 ereport(ERROR,
3094 (errcode(ERRCODE_DATATYPE_MISMATCH),
3095 errmsg("column \"%s\" is of type %s"
3096 " but default expression is of type %s",
3097 attname,
3098 format_type_be(atttypid),
3099 format_type_be(type_id)),
3100 errhint("You will need to rewrite or cast the expression.")));
3104 * Finally, take care of collations in the finished expression.
3106 assign_expr_collations(pstate, expr);
3108 return expr;
3112 * Take a raw CHECK constraint expression and convert it to a cooked format
3113 * ready for storage.
3115 * Parse state must be set up to recognize any vars that might appear
3116 * in the expression.
3118 static Node *
3119 cookConstraint(ParseState *pstate,
3120 Node *raw_constraint,
3121 char *relname)
3123 Node *expr;
3126 * Transform raw parsetree to executable expression.
3128 expr = transformExpr(pstate, raw_constraint, EXPR_KIND_CHECK_CONSTRAINT);
3131 * Make sure it yields a boolean result.
3133 expr = coerce_to_boolean(pstate, expr, "CHECK");
3136 * Take care of collations.
3138 assign_expr_collations(pstate, expr);
3141 * Make sure no outside relations are referred to (this is probably dead
3142 * code now that add_missing_from is history).
3144 if (list_length(pstate->p_rtable) != 1)
3145 ereport(ERROR,
3146 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
3147 errmsg("only table \"%s\" can be referenced in check constraint",
3148 relname)));
3150 return expr;
3155 * RemoveStatistics --- remove entries in pg_statistic for a rel or column
3157 * If attnum is zero, remove all entries for rel; else remove only the one(s)
3158 * for that column.
3160 void
3161 RemoveStatistics(Oid relid, AttrNumber attnum)
3163 Relation pgstatistic;
3164 SysScanDesc scan;
3165 ScanKeyData key[2];
3166 int nkeys;
3167 HeapTuple tuple;
3169 pgstatistic = table_open(StatisticRelationId, RowExclusiveLock);
3171 ScanKeyInit(&key[0],
3172 Anum_pg_statistic_starelid,
3173 BTEqualStrategyNumber, F_OIDEQ,
3174 ObjectIdGetDatum(relid));
3176 if (attnum == 0)
3177 nkeys = 1;
3178 else
3180 ScanKeyInit(&key[1],
3181 Anum_pg_statistic_staattnum,
3182 BTEqualStrategyNumber, F_INT2EQ,
3183 Int16GetDatum(attnum));
3184 nkeys = 2;
3187 scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
3188 NULL, nkeys, key);
3190 /* we must loop even when attnum != 0, in case of inherited stats */
3191 while (HeapTupleIsValid(tuple = systable_getnext(scan)))
3192 CatalogTupleDelete(pgstatistic, &tuple->t_self);
3194 systable_endscan(scan);
3196 table_close(pgstatistic, RowExclusiveLock);
3201 * RelationTruncateIndexes - truncate all indexes associated
3202 * with the heap relation to zero tuples.
3204 * The routine will truncate and then reconstruct the indexes on
3205 * the specified relation. Caller must hold exclusive lock on rel.
3207 static void
3208 RelationTruncateIndexes(Relation heapRelation)
3210 ListCell *indlist;
3212 /* Ask the relcache to produce a list of the indexes of the rel */
3213 foreach(indlist, RelationGetIndexList(heapRelation))
3215 Oid indexId = lfirst_oid(indlist);
3216 Relation currentIndex;
3217 IndexInfo *indexInfo;
3219 /* Open the index relation; use exclusive lock, just to be sure */
3220 currentIndex = index_open(indexId, AccessExclusiveLock);
3223 * Fetch info needed for index_build. Since we know there are no
3224 * tuples that actually need indexing, we can use a dummy IndexInfo.
3225 * This is slightly cheaper to build, but the real point is to avoid
3226 * possibly running user-defined code in index expressions or
3227 * predicates. We might be getting invoked during ON COMMIT
3228 * processing, and we don't want to run any such code then.
3230 indexInfo = BuildDummyIndexInfo(currentIndex);
3233 * Now truncate the actual file (and discard buffers).
3235 RelationTruncate(currentIndex, 0);
3237 /* Initialize the index and rebuild */
3238 /* Note: we do not need to re-establish pkey setting */
3239 index_build(heapRelation, currentIndex, indexInfo, true, false);
3241 /* We're done with this index */
3242 index_close(currentIndex, NoLock);
3247 * heap_truncate
3249 * This routine deletes all data within all the specified relations.
3251 * This is not transaction-safe! There is another, transaction-safe
3252 * implementation in commands/tablecmds.c. We now use this only for
3253 * ON COMMIT truncation of temporary tables, where it doesn't matter.
3255 void
3256 heap_truncate(List *relids)
3258 List *relations = NIL;
3259 ListCell *cell;
3261 /* Open relations for processing, and grab exclusive access on each */
3262 foreach(cell, relids)
3264 Oid rid = lfirst_oid(cell);
3265 Relation rel;
3267 rel = table_open(rid, AccessExclusiveLock);
3268 relations = lappend(relations, rel);
3271 /* Don't allow truncate on tables that are referenced by foreign keys */
3272 heap_truncate_check_FKs(relations, true);
3274 /* OK to do it */
3275 foreach(cell, relations)
3277 Relation rel = lfirst(cell);
3279 /* Truncate the relation */
3280 heap_truncate_one_rel(rel);
3282 /* Close the relation, but keep exclusive lock on it until commit */
3283 table_close(rel, NoLock);
3288 * heap_truncate_one_rel
3290 * This routine deletes all data within the specified relation.
3292 * This is not transaction-safe, because the truncation is done immediately
3293 * and cannot be rolled back later. Caller is responsible for having
3294 * checked permissions etc, and must have obtained AccessExclusiveLock.
3296 void
3297 heap_truncate_one_rel(Relation rel)
3299 Oid toastrelid;
3302 * Truncate the relation. Partitioned tables have no storage, so there is
3303 * nothing to do for them here.
3305 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3306 return;
3308 /* Truncate the underlying relation */
3309 table_relation_nontransactional_truncate(rel);
3311 /* If the relation has indexes, truncate the indexes too */
3312 RelationTruncateIndexes(rel);
3314 /* If there is a toast table, truncate that too */
3315 toastrelid = rel->rd_rel->reltoastrelid;
3316 if (OidIsValid(toastrelid))
3318 Relation toastrel = table_open(toastrelid, AccessExclusiveLock);
3320 table_relation_nontransactional_truncate(toastrel);
3321 RelationTruncateIndexes(toastrel);
3322 /* keep the lock... */
3323 table_close(toastrel, NoLock);
3328 * heap_truncate_check_FKs
3329 * Check for foreign keys referencing a list of relations that
3330 * are to be truncated, and raise error if there are any
3332 * We disallow such FKs (except self-referential ones) since the whole point
3333 * of TRUNCATE is to not scan the individual rows to be thrown away.
3335 * This is split out so it can be shared by both implementations of truncate.
3336 * Caller should already hold a suitable lock on the relations.
3338 * tempTables is only used to select an appropriate error message.
3340 void
3341 heap_truncate_check_FKs(List *relations, bool tempTables)
3343 List *oids = NIL;
3344 List *dependents;
3345 ListCell *cell;
3348 * Build a list of OIDs of the interesting relations.
3350 * If a relation has no triggers, then it can neither have FKs nor be
3351 * referenced by a FK from another table, so we can ignore it. For
3352 * partitioned tables, FKs have no triggers, so we must include them
3353 * anyway.
3355 foreach(cell, relations)
3357 Relation rel = lfirst(cell);
3359 if (rel->rd_rel->relhastriggers ||
3360 rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3361 oids = lappend_oid(oids, RelationGetRelid(rel));
3365 * Fast path: if no relation has triggers, none has FKs either.
3367 if (oids == NIL)
3368 return;
3371 * Otherwise, must scan pg_constraint. We make one pass with all the
3372 * relations considered; if this finds nothing, then all is well.
3374 dependents = heap_truncate_find_FKs(oids);
3375 if (dependents == NIL)
3376 return;
3379 * Otherwise we repeat the scan once per relation to identify a particular
3380 * pair of relations to complain about. This is pretty slow, but
3381 * performance shouldn't matter much in a failure path. The reason for
3382 * doing things this way is to ensure that the message produced is not
3383 * dependent on chance row locations within pg_constraint.
3385 foreach(cell, oids)
3387 Oid relid = lfirst_oid(cell);
3388 ListCell *cell2;
3390 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
3392 foreach(cell2, dependents)
3394 Oid relid2 = lfirst_oid(cell2);
3396 if (!list_member_oid(oids, relid2))
3398 char *relname = get_rel_name(relid);
3399 char *relname2 = get_rel_name(relid2);
3401 if (tempTables)
3402 ereport(ERROR,
3403 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3404 errmsg("unsupported ON COMMIT and foreign key combination"),
3405 errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
3406 relname2, relname)));
3407 else
3408 ereport(ERROR,
3409 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3410 errmsg("cannot truncate a table referenced in a foreign key constraint"),
3411 errdetail("Table \"%s\" references \"%s\".",
3412 relname2, relname),
3413 errhint("Truncate table \"%s\" at the same time, "
3414 "or use TRUNCATE ... CASCADE.",
3415 relname2)));
3422 * heap_truncate_find_FKs
3423 * Find relations having foreign keys referencing any of the given rels
3425 * Input and result are both lists of relation OIDs. The result contains
3426 * no duplicates, does *not* include any rels that were already in the input
3427 * list, and is sorted in OID order. (The last property is enforced mainly
3428 * to guarantee consistent behavior in the regression tests; we don't want
3429 * behavior to change depending on chance locations of rows in pg_constraint.)
3431 * Note: caller should already have appropriate lock on all rels mentioned
3432 * in relationIds. Since adding or dropping an FK requires exclusive lock
3433 * on both rels, this ensures that the answer will be stable.
3435 List *
3436 heap_truncate_find_FKs(List *relationIds)
3438 List *result = NIL;
3439 List *oids = list_copy(relationIds);
3440 List *parent_cons;
3441 ListCell *cell;
3442 ScanKeyData key;
3443 Relation fkeyRel;
3444 SysScanDesc fkeyScan;
3445 HeapTuple tuple;
3446 bool restart;
3448 oids = list_copy(relationIds);
3451 * Must scan pg_constraint. Right now, it is a seqscan because there is
3452 * no available index on confrelid.
3454 fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
3456 restart:
3457 restart = false;
3458 parent_cons = NIL;
3460 fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
3461 NULL, 0, NULL);
3463 while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
3465 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3467 /* Not a foreign key */
3468 if (con->contype != CONSTRAINT_FOREIGN)
3469 continue;
3471 /* Not referencing one of our list of tables */
3472 if (!list_member_oid(oids, con->confrelid))
3473 continue;
3476 * If this constraint has a parent constraint which we have not seen
3477 * yet, keep track of it for the second loop, below. Tracking parent
3478 * constraints allows us to climb up to the top-level level constraint
3479 * and look for all possible relations referencing the partitioned
3480 * table.
3482 if (OidIsValid(con->conparentid) &&
3483 !list_member_oid(parent_cons, con->conparentid))
3484 parent_cons = lappend_oid(parent_cons, con->conparentid);
3487 * Add referencer to result, unless present in input list. (Don't
3488 * worry about dupes: we'll fix that below).
3490 if (!list_member_oid(relationIds, con->conrelid))
3491 result = lappend_oid(result, con->conrelid);
3494 systable_endscan(fkeyScan);
3497 * Process each parent constraint we found to add the list of referenced
3498 * relations by them to the oids list. If we do add any new such
3499 * relations, redo the first loop above. Also, if we see that the parent
3500 * constraint in turn has a parent, add that so that we process all
3501 * relations in a single additional pass.
3503 foreach(cell, parent_cons)
3505 Oid parent = lfirst_oid(cell);
3507 ScanKeyInit(&key,
3508 Anum_pg_constraint_oid,
3509 BTEqualStrategyNumber, F_OIDEQ,
3510 ObjectIdGetDatum(parent));
3512 fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
3513 true, NULL, 1, &key);
3515 tuple = systable_getnext(fkeyScan);
3516 if (HeapTupleIsValid(tuple))
3518 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
3521 * pg_constraint rows always appear for partitioned hierarchies
3522 * this way: on the each side of the constraint, one row appears
3523 * for each partition that points to the top-most table on the
3524 * other side.
3526 * Because of this arrangement, we can correctly catch all
3527 * relevant relations by adding to 'parent_cons' all rows with
3528 * valid conparentid, and to the 'oids' list all rows with a zero
3529 * conparentid. If any oids are added to 'oids', redo the first
3530 * loop above by setting 'restart'.
3532 if (OidIsValid(con->conparentid))
3533 parent_cons = list_append_unique_oid(parent_cons,
3534 con->conparentid);
3535 else if (!list_member_oid(oids, con->confrelid))
3537 oids = lappend_oid(oids, con->confrelid);
3538 restart = true;
3542 systable_endscan(fkeyScan);
3545 list_free(parent_cons);
3546 if (restart)
3547 goto restart;
3549 table_close(fkeyRel, AccessShareLock);
3550 list_free(oids);
3552 /* Now sort and de-duplicate the result list */
3553 list_sort(result, list_oid_cmp);
3554 list_deduplicate_oid(result);
3556 return result;
3560 * StorePartitionKey
3561 * Store information about the partition key rel into the catalog
3563 void
3564 StorePartitionKey(Relation rel,
3565 char strategy,
3566 int16 partnatts,
3567 AttrNumber *partattrs,
3568 List *partexprs,
3569 Oid *partopclass,
3570 Oid *partcollation)
3572 int i;
3573 int2vector *partattrs_vec;
3574 oidvector *partopclass_vec;
3575 oidvector *partcollation_vec;
3576 Datum partexprDatum;
3577 Relation pg_partitioned_table;
3578 HeapTuple tuple;
3579 Datum values[Natts_pg_partitioned_table];
3580 bool nulls[Natts_pg_partitioned_table];
3581 ObjectAddress myself;
3582 ObjectAddress referenced;
3584 Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
3586 /* Copy the partition attribute numbers, opclass OIDs into arrays */
3587 partattrs_vec = buildint2vector(partattrs, partnatts);
3588 partopclass_vec = buildoidvector(partopclass, partnatts);
3589 partcollation_vec = buildoidvector(partcollation, partnatts);
3591 /* Convert the expressions (if any) to a text datum */
3592 if (partexprs)
3594 char *exprString;
3596 exprString = nodeToString(partexprs);
3597 partexprDatum = CStringGetTextDatum(exprString);
3598 pfree(exprString);
3600 else
3601 partexprDatum = (Datum) 0;
3603 pg_partitioned_table = table_open(PartitionedRelationId, RowExclusiveLock);
3605 MemSet(nulls, false, sizeof(nulls));
3607 /* Only this can ever be NULL */
3608 if (!partexprDatum)
3609 nulls[Anum_pg_partitioned_table_partexprs - 1] = true;
3611 values[Anum_pg_partitioned_table_partrelid - 1] = ObjectIdGetDatum(RelationGetRelid(rel));
3612 values[Anum_pg_partitioned_table_partstrat - 1] = CharGetDatum(strategy);
3613 values[Anum_pg_partitioned_table_partnatts - 1] = Int16GetDatum(partnatts);
3614 values[Anum_pg_partitioned_table_partdefid - 1] = ObjectIdGetDatum(InvalidOid);
3615 values[Anum_pg_partitioned_table_partattrs - 1] = PointerGetDatum(partattrs_vec);
3616 values[Anum_pg_partitioned_table_partclass - 1] = PointerGetDatum(partopclass_vec);
3617 values[Anum_pg_partitioned_table_partcollation - 1] = PointerGetDatum(partcollation_vec);
3618 values[Anum_pg_partitioned_table_partexprs - 1] = partexprDatum;
3620 tuple = heap_form_tuple(RelationGetDescr(pg_partitioned_table), values, nulls);
3622 CatalogTupleInsert(pg_partitioned_table, tuple);
3623 table_close(pg_partitioned_table, RowExclusiveLock);
3625 /* Mark this relation as dependent on a few things as follows */
3626 myself.classId = RelationRelationId;
3627 myself.objectId = RelationGetRelid(rel);
3628 myself.objectSubId = 0;
3630 /* Operator class and collation per key column */
3631 for (i = 0; i < partnatts; i++)
3633 referenced.classId = OperatorClassRelationId;
3634 referenced.objectId = partopclass[i];
3635 referenced.objectSubId = 0;
3637 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3639 /* The default collation is pinned, so don't bother recording it */
3640 if (OidIsValid(partcollation[i]) &&
3641 partcollation[i] != DEFAULT_COLLATION_OID)
3643 referenced.classId = CollationRelationId;
3644 referenced.objectId = partcollation[i];
3645 referenced.objectSubId = 0;
3647 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
3652 * The partitioning columns are made internally dependent on the table,
3653 * because we cannot drop any of them without dropping the whole table.
3654 * (ATExecDropColumn independently enforces that, but it's not bulletproof
3655 * so we need the dependencies too.)
3657 for (i = 0; i < partnatts; i++)
3659 if (partattrs[i] == 0)
3660 continue; /* ignore expressions here */
3662 referenced.classId = RelationRelationId;
3663 referenced.objectId = RelationGetRelid(rel);
3664 referenced.objectSubId = partattrs[i];
3666 recordDependencyOn(&referenced, &myself, DEPENDENCY_INTERNAL);
3670 * Also consider anything mentioned in partition expressions. External
3671 * references (e.g. functions) get NORMAL dependencies. Table columns
3672 * mentioned in the expressions are handled the same as plain partitioning
3673 * columns, i.e. they become internally dependent on the whole table.
3675 if (partexprs)
3676 recordDependencyOnSingleRelExpr(&myself,
3677 (Node *) partexprs,
3678 RelationGetRelid(rel),
3679 DEPENDENCY_NORMAL,
3680 DEPENDENCY_INTERNAL,
3681 true /* reverse the self-deps */ );
3684 * We must invalidate the relcache so that the next
3685 * CommandCounterIncrement() will cause the same to be rebuilt using the
3686 * information in just created catalog entry.
3688 CacheInvalidateRelcache(rel);
3692 * RemovePartitionKeyByRelId
3693 * Remove pg_partitioned_table entry for a relation
3695 void
3696 RemovePartitionKeyByRelId(Oid relid)
3698 Relation rel;
3699 HeapTuple tuple;
3701 rel = table_open(PartitionedRelationId, RowExclusiveLock);
3703 tuple = SearchSysCache1(PARTRELID, ObjectIdGetDatum(relid));
3704 if (!HeapTupleIsValid(tuple))
3705 elog(ERROR, "cache lookup failed for partition key of relation %u",
3706 relid);
3708 CatalogTupleDelete(rel, &tuple->t_self);
3710 ReleaseSysCache(tuple);
3711 table_close(rel, RowExclusiveLock);
3715 * StorePartitionBound
3716 * Update pg_class tuple of rel to store the partition bound and set
3717 * relispartition to true
3719 * If this is the default partition, also update the default partition OID in
3720 * pg_partitioned_table.
3722 * Also, invalidate the parent's relcache, so that the next rebuild will load
3723 * the new partition's info into its partition descriptor. If there is a
3724 * default partition, we must invalidate its relcache entry as well.
3726 void
3727 StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
3729 Relation classRel;
3730 HeapTuple tuple,
3731 newtuple;
3732 Datum new_val[Natts_pg_class];
3733 bool new_null[Natts_pg_class],
3734 new_repl[Natts_pg_class];
3735 Oid defaultPartOid;
3737 /* Update pg_class tuple */
3738 classRel = table_open(RelationRelationId, RowExclusiveLock);
3739 tuple = SearchSysCacheCopy1(RELOID,
3740 ObjectIdGetDatum(RelationGetRelid(rel)));
3741 if (!HeapTupleIsValid(tuple))
3742 elog(ERROR, "cache lookup failed for relation %u",
3743 RelationGetRelid(rel));
3745 #ifdef USE_ASSERT_CHECKING
3747 Form_pg_class classForm;
3748 bool isnull;
3750 classForm = (Form_pg_class) GETSTRUCT(tuple);
3751 Assert(!classForm->relispartition);
3752 (void) SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relpartbound,
3753 &isnull);
3754 Assert(isnull);
3756 #endif
3758 /* Fill in relpartbound value */
3759 memset(new_val, 0, sizeof(new_val));
3760 memset(new_null, false, sizeof(new_null));
3761 memset(new_repl, false, sizeof(new_repl));
3762 new_val[Anum_pg_class_relpartbound - 1] = CStringGetTextDatum(nodeToString(bound));
3763 new_null[Anum_pg_class_relpartbound - 1] = false;
3764 new_repl[Anum_pg_class_relpartbound - 1] = true;
3765 newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
3766 new_val, new_null, new_repl);
3767 /* Also set the flag */
3768 ((Form_pg_class) GETSTRUCT(newtuple))->relispartition = true;
3769 CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
3770 heap_freetuple(newtuple);
3771 table_close(classRel, RowExclusiveLock);
3774 * If we're storing bounds for the default partition, update
3775 * pg_partitioned_table too.
3777 if (bound->is_default)
3778 update_default_partition_oid(RelationGetRelid(parent),
3779 RelationGetRelid(rel));
3781 /* Make these updates visible */
3782 CommandCounterIncrement();
3785 * The partition constraint for the default partition depends on the
3786 * partition bounds of every other partition, so we must invalidate the
3787 * relcache entry for that partition every time a partition is added or
3788 * removed.
3790 defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
3791 if (OidIsValid(defaultPartOid))
3792 CacheInvalidateRelcacheByRelid(defaultPartOid);
3794 CacheInvalidateRelcache(parent);