Move routines to manipulate WAL into PostgreSQL::Test::Cluster
[pgsql.git] / src / backend / commands / schemacmds.c
blob546160f09410e1e9199dda4a224ee410daa7b8d2
1 /*-------------------------------------------------------------------------
3 * schemacmds.c
4 * schema creation/manipulation commands
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/backend/commands/schemacmds.c
13 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/pg_authid.h"
26 #include "catalog/pg_database.h"
27 #include "catalog/pg_namespace.h"
28 #include "commands/dbcommands.h"
29 #include "commands/event_trigger.h"
30 #include "commands/schemacmds.h"
31 #include "miscadmin.h"
32 #include "parser/parse_utilcmd.h"
33 #include "parser/scansup.h"
34 #include "tcop/utility.h"
35 #include "utils/acl.h"
36 #include "utils/builtins.h"
37 #include "utils/rel.h"
38 #include "utils/syscache.h"
40 static void AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId);
43 * CREATE SCHEMA
45 * Note: caller should pass in location information for the whole
46 * CREATE SCHEMA statement, which in turn we pass down as the location
47 * of the component commands. This comports with our general plan of
48 * reporting location/len for the whole command even when executing
49 * a subquery.
51 Oid
52 CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
53 int stmt_location, int stmt_len)
55 const char *schemaName = stmt->schemaname;
56 Oid namespaceId;
57 List *parsetree_list;
58 ListCell *parsetree_item;
59 Oid owner_uid;
60 Oid saved_uid;
61 int save_sec_context;
62 int save_nestlevel;
63 char *nsp = namespace_search_path;
64 AclResult aclresult;
65 ObjectAddress address;
66 StringInfoData pathbuf;
68 GetUserIdAndSecContext(&saved_uid, &save_sec_context);
71 * Who is supposed to own the new schema?
73 if (stmt->authrole)
74 owner_uid = get_rolespec_oid(stmt->authrole, false);
75 else
76 owner_uid = saved_uid;
78 /* fill schema name with the user name if not specified */
79 if (!schemaName)
81 HeapTuple tuple;
83 tuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(owner_uid));
84 if (!HeapTupleIsValid(tuple))
85 elog(ERROR, "cache lookup failed for role %u", owner_uid);
86 schemaName =
87 pstrdup(NameStr(((Form_pg_authid) GETSTRUCT(tuple))->rolname));
88 ReleaseSysCache(tuple);
92 * To create a schema, must have schema-create privilege on the current
93 * database and must be able to become the target role (this does not
94 * imply that the target role itself must have create-schema privilege).
95 * The latter provision guards against "giveaway" attacks. Note that a
96 * superuser will always have both of these privileges a fortiori.
98 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, saved_uid, ACL_CREATE);
99 if (aclresult != ACLCHECK_OK)
100 aclcheck_error(aclresult, OBJECT_DATABASE,
101 get_database_name(MyDatabaseId));
103 check_can_set_role(saved_uid, owner_uid);
105 /* Additional check to protect reserved schema names */
106 if (!allowSystemTableMods && IsReservedName(schemaName))
107 ereport(ERROR,
108 (errcode(ERRCODE_RESERVED_NAME),
109 errmsg("unacceptable schema name \"%s\"", schemaName),
110 errdetail("The prefix \"pg_\" is reserved for system schemas.")));
113 * If if_not_exists was given and the schema already exists, bail out.
114 * (Note: we needn't check this when not if_not_exists, because
115 * NamespaceCreate will complain anyway.) We could do this before making
116 * the permissions checks, but since CREATE TABLE IF NOT EXISTS makes its
117 * creation-permission check first, we do likewise.
119 if (stmt->if_not_exists)
121 namespaceId = get_namespace_oid(schemaName, true);
122 if (OidIsValid(namespaceId))
125 * If we are in an extension script, insist that the pre-existing
126 * object be a member of the extension, to avoid security risks.
128 ObjectAddressSet(address, NamespaceRelationId, namespaceId);
129 checkMembershipInCurrentExtension(&address);
131 /* OK to skip */
132 ereport(NOTICE,
133 (errcode(ERRCODE_DUPLICATE_SCHEMA),
134 errmsg("schema \"%s\" already exists, skipping",
135 schemaName)));
136 return InvalidOid;
141 * If the requested authorization is different from the current user,
142 * temporarily set the current user so that the object(s) will be created
143 * with the correct ownership.
145 * (The setting will be restored at the end of this routine, or in case of
146 * error, transaction abort will clean things up.)
148 if (saved_uid != owner_uid)
149 SetUserIdAndSecContext(owner_uid,
150 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
152 /* Create the schema's namespace */
153 namespaceId = NamespaceCreate(schemaName, owner_uid, false);
155 /* Advance cmd counter to make the namespace visible */
156 CommandCounterIncrement();
159 * Prepend the new schema to the current search path.
161 * We use the equivalent of a function SET option to allow the setting to
162 * persist for exactly the duration of the schema creation. guc.c also
163 * takes care of undoing the setting on error.
165 save_nestlevel = NewGUCNestLevel();
167 initStringInfo(&pathbuf);
168 appendStringInfoString(&pathbuf, quote_identifier(schemaName));
170 while (scanner_isspace(*nsp))
171 nsp++;
173 if (*nsp != '\0')
174 appendStringInfo(&pathbuf, ", %s", nsp);
176 (void) set_config_option("search_path", pathbuf.data,
177 PGC_USERSET, PGC_S_SESSION,
178 GUC_ACTION_SAVE, true, 0, false);
181 * Report the new schema to possibly interested event triggers. Note we
182 * must do this here and not in ProcessUtilitySlow because otherwise the
183 * objects created below are reported before the schema, which would be
184 * wrong.
186 ObjectAddressSet(address, NamespaceRelationId, namespaceId);
187 EventTriggerCollectSimpleCommand(address, InvalidObjectAddress,
188 (Node *) stmt);
191 * Examine the list of commands embedded in the CREATE SCHEMA command, and
192 * reorganize them into a sequentially executable order with no forward
193 * references. Note that the result is still a list of raw parsetrees ---
194 * we cannot, in general, run parse analysis on one statement until we
195 * have actually executed the prior ones.
197 parsetree_list = transformCreateSchemaStmtElements(stmt->schemaElts,
198 schemaName);
201 * Execute each command contained in the CREATE SCHEMA. Since the grammar
202 * allows only utility commands in CREATE SCHEMA, there is no need to pass
203 * them through parse_analyze_*() or the rewriter; we can just hand them
204 * straight to ProcessUtility.
206 foreach(parsetree_item, parsetree_list)
208 Node *stmt = (Node *) lfirst(parsetree_item);
209 PlannedStmt *wrapper;
211 /* need to make a wrapper PlannedStmt */
212 wrapper = makeNode(PlannedStmt);
213 wrapper->commandType = CMD_UTILITY;
214 wrapper->canSetTag = false;
215 wrapper->utilityStmt = stmt;
216 wrapper->stmt_location = stmt_location;
217 wrapper->stmt_len = stmt_len;
219 /* do this step */
220 ProcessUtility(wrapper,
221 queryString,
222 false,
223 PROCESS_UTILITY_SUBCOMMAND,
224 NULL,
225 NULL,
226 None_Receiver,
227 NULL);
229 /* make sure later steps can see the object created here */
230 CommandCounterIncrement();
234 * Restore the GUC variable search_path we set above.
236 AtEOXact_GUC(true, save_nestlevel);
238 /* Reset current user and security context */
239 SetUserIdAndSecContext(saved_uid, save_sec_context);
241 return namespaceId;
246 * Rename schema
248 ObjectAddress
249 RenameSchema(const char *oldname, const char *newname)
251 Oid nspOid;
252 HeapTuple tup;
253 Relation rel;
254 AclResult aclresult;
255 ObjectAddress address;
256 Form_pg_namespace nspform;
258 rel = table_open(NamespaceRelationId, RowExclusiveLock);
260 tup = SearchSysCacheCopy1(NAMESPACENAME, CStringGetDatum(oldname));
261 if (!HeapTupleIsValid(tup))
262 ereport(ERROR,
263 (errcode(ERRCODE_UNDEFINED_SCHEMA),
264 errmsg("schema \"%s\" does not exist", oldname)));
266 nspform = (Form_pg_namespace) GETSTRUCT(tup);
267 nspOid = nspform->oid;
269 /* make sure the new name doesn't exist */
270 if (OidIsValid(get_namespace_oid(newname, true)))
271 ereport(ERROR,
272 (errcode(ERRCODE_DUPLICATE_SCHEMA),
273 errmsg("schema \"%s\" already exists", newname)));
275 /* must be owner */
276 if (!object_ownercheck(NamespaceRelationId, nspOid, GetUserId()))
277 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
278 oldname);
280 /* must have CREATE privilege on database */
281 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
282 if (aclresult != ACLCHECK_OK)
283 aclcheck_error(aclresult, OBJECT_DATABASE,
284 get_database_name(MyDatabaseId));
286 if (!allowSystemTableMods && IsReservedName(newname))
287 ereport(ERROR,
288 (errcode(ERRCODE_RESERVED_NAME),
289 errmsg("unacceptable schema name \"%s\"", newname),
290 errdetail("The prefix \"pg_\" is reserved for system schemas.")));
292 /* rename */
293 namestrcpy(&nspform->nspname, newname);
294 CatalogTupleUpdate(rel, &tup->t_self, tup);
296 InvokeObjectPostAlterHook(NamespaceRelationId, nspOid, 0);
298 ObjectAddressSet(address, NamespaceRelationId, nspOid);
300 table_close(rel, NoLock);
301 heap_freetuple(tup);
303 return address;
306 void
307 AlterSchemaOwner_oid(Oid schemaoid, Oid newOwnerId)
309 HeapTuple tup;
310 Relation rel;
312 rel = table_open(NamespaceRelationId, RowExclusiveLock);
314 tup = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaoid));
315 if (!HeapTupleIsValid(tup))
316 elog(ERROR, "cache lookup failed for schema %u", schemaoid);
318 AlterSchemaOwner_internal(tup, rel, newOwnerId);
320 ReleaseSysCache(tup);
322 table_close(rel, RowExclusiveLock);
327 * Change schema owner
329 ObjectAddress
330 AlterSchemaOwner(const char *name, Oid newOwnerId)
332 Oid nspOid;
333 HeapTuple tup;
334 Relation rel;
335 ObjectAddress address;
336 Form_pg_namespace nspform;
338 rel = table_open(NamespaceRelationId, RowExclusiveLock);
340 tup = SearchSysCache1(NAMESPACENAME, CStringGetDatum(name));
341 if (!HeapTupleIsValid(tup))
342 ereport(ERROR,
343 (errcode(ERRCODE_UNDEFINED_SCHEMA),
344 errmsg("schema \"%s\" does not exist", name)));
346 nspform = (Form_pg_namespace) GETSTRUCT(tup);
347 nspOid = nspform->oid;
349 AlterSchemaOwner_internal(tup, rel, newOwnerId);
351 ObjectAddressSet(address, NamespaceRelationId, nspOid);
353 ReleaseSysCache(tup);
355 table_close(rel, RowExclusiveLock);
357 return address;
360 static void
361 AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId)
363 Form_pg_namespace nspForm;
365 Assert(tup->t_tableOid == NamespaceRelationId);
366 Assert(RelationGetRelid(rel) == NamespaceRelationId);
368 nspForm = (Form_pg_namespace) GETSTRUCT(tup);
371 * If the new owner is the same as the existing owner, consider the
372 * command to have succeeded. This is for dump restoration purposes.
374 if (nspForm->nspowner != newOwnerId)
376 Datum repl_val[Natts_pg_namespace];
377 bool repl_null[Natts_pg_namespace];
378 bool repl_repl[Natts_pg_namespace];
379 Acl *newAcl;
380 Datum aclDatum;
381 bool isNull;
382 HeapTuple newtuple;
383 AclResult aclresult;
385 /* Otherwise, must be owner of the existing object */
386 if (!object_ownercheck(NamespaceRelationId, nspForm->oid, GetUserId()))
387 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
388 NameStr(nspForm->nspname));
390 /* Must be able to become new owner */
391 check_can_set_role(GetUserId(), newOwnerId);
394 * must have create-schema rights
396 * NOTE: This is different from other alter-owner checks in that the
397 * current user is checked for create privileges instead of the
398 * destination owner. This is consistent with the CREATE case for
399 * schemas. Because superusers will always have this right, we need
400 * no special case for them.
402 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(),
403 ACL_CREATE);
404 if (aclresult != ACLCHECK_OK)
405 aclcheck_error(aclresult, OBJECT_DATABASE,
406 get_database_name(MyDatabaseId));
408 memset(repl_null, false, sizeof(repl_null));
409 memset(repl_repl, false, sizeof(repl_repl));
411 repl_repl[Anum_pg_namespace_nspowner - 1] = true;
412 repl_val[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(newOwnerId);
415 * Determine the modified ACL for the new owner. This is only
416 * necessary when the ACL is non-null.
418 aclDatum = SysCacheGetAttr(NAMESPACENAME, tup,
419 Anum_pg_namespace_nspacl,
420 &isNull);
421 if (!isNull)
423 newAcl = aclnewowner(DatumGetAclP(aclDatum),
424 nspForm->nspowner, newOwnerId);
425 repl_repl[Anum_pg_namespace_nspacl - 1] = true;
426 repl_val[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(newAcl);
429 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
431 CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
433 heap_freetuple(newtuple);
435 /* Update owner dependency reference */
436 changeDependencyOnOwner(NamespaceRelationId, nspForm->oid,
437 newOwnerId);
440 InvokeObjectPostAlterHook(NamespaceRelationId,
441 nspForm->oid, 0);