Check that connection limit is within valid range. IOW, not < -1.
[PostgreSQL.git] / src / backend / commands / dbcommands.c
blobd08e3d0853999fadbc24def415f4aea8b3dcf607
1 /*-------------------------------------------------------------------------
3 * dbcommands.c
4 * Database management commands (create/drop database).
6 * Note: database creation/destruction commands use exclusive locks on
7 * the database objects (as expressed by LockSharedObject()) to avoid
8 * stepping on each others' toes. Formerly we used table-level locks
9 * on pg_database, but that's too coarse-grained.
11 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
15 * IDENTIFICATION
16 * $PostgreSQL$
18 *-------------------------------------------------------------------------
20 #include "postgres.h"
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/xact.h"
30 #include "access/xlogutils.h"
31 #include "catalog/catalog.h"
32 #include "catalog/dependency.h"
33 #include "catalog/indexing.h"
34 #include "catalog/pg_authid.h"
35 #include "catalog/pg_database.h"
36 #include "catalog/pg_tablespace.h"
37 #include "commands/comment.h"
38 #include "commands/dbcommands.h"
39 #include "commands/tablespace.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/bufmgr.h"
45 #include "storage/fd.h"
46 #include "storage/lmgr.h"
47 #include "storage/ipc.h"
48 #include "storage/procarray.h"
49 #include "storage/smgr.h"
50 #include "utils/acl.h"
51 #include "utils/builtins.h"
52 #include "utils/flatfiles.h"
53 #include "utils/fmgroids.h"
54 #include "utils/guc.h"
55 #include "utils/lsyscache.h"
56 #include "utils/pg_locale.h"
57 #include "utils/snapmgr.h"
58 #include "utils/syscache.h"
59 #include "utils/tqual.h"
62 typedef struct
64 Oid src_dboid; /* source (template) DB */
65 Oid dest_dboid; /* DB we are trying to create */
66 } createdb_failure_params;
68 typedef struct
70 Oid dest_dboid; /* DB we are trying to move */
71 Oid dest_tsoid; /* tablespace we are trying to move to */
72 } movedb_failure_params;
74 /* non-export function prototypes */
75 static void createdb_failure_callback(int code, Datum arg);
76 static void movedb(const char *dbname, const char *tblspcname);
77 static void movedb_failure_callback(int code, Datum arg);
78 static bool get_db_info(const char *name, LOCKMODE lockmode,
79 Oid *dbIdP, Oid *ownerIdP,
80 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
81 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
82 Oid *dbTablespace, char **dbCollate, char **dbCtype);
83 static bool have_createdb_privilege(void);
84 static void remove_dbtablespaces(Oid db_id);
85 static bool check_db_file_conflict(Oid db_id);
86 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
90 * CREATE DATABASE
92 void
93 createdb(const CreatedbStmt *stmt)
95 HeapScanDesc scan;
96 Relation rel;
97 Oid src_dboid;
98 Oid src_owner;
99 int src_encoding;
100 char *src_collate;
101 char *src_ctype;
102 bool src_istemplate;
103 bool src_allowconn;
104 Oid src_lastsysoid;
105 TransactionId src_frozenxid;
106 Oid src_deftablespace;
107 volatile Oid dst_deftablespace;
108 Relation pg_database_rel;
109 HeapTuple tuple;
110 Datum new_record[Natts_pg_database];
111 bool new_record_nulls[Natts_pg_database];
112 Oid dboid;
113 Oid datdba;
114 ListCell *option;
115 DefElem *dtablespacename = NULL;
116 DefElem *downer = NULL;
117 DefElem *dtemplate = NULL;
118 DefElem *dencoding = NULL;
119 DefElem *dcollate = NULL;
120 DefElem *dctype = NULL;
121 DefElem *dconnlimit = NULL;
122 char *dbname = stmt->dbname;
123 char *dbowner = NULL;
124 const char *dbtemplate = NULL;
125 char *dbcollate = NULL;
126 char *dbctype = NULL;
127 int encoding = -1;
128 int dbconnlimit = -1;
129 int ctype_encoding;
130 int collate_encoding;
131 int notherbackends;
132 int npreparedxacts;
133 createdb_failure_params fparms;
135 /* Extract options from the statement node tree */
136 foreach(option, stmt->options)
138 DefElem *defel = (DefElem *) lfirst(option);
140 if (strcmp(defel->defname, "tablespace") == 0)
142 if (dtablespacename)
143 ereport(ERROR,
144 (errcode(ERRCODE_SYNTAX_ERROR),
145 errmsg("conflicting or redundant options")));
146 dtablespacename = defel;
148 else if (strcmp(defel->defname, "owner") == 0)
150 if (downer)
151 ereport(ERROR,
152 (errcode(ERRCODE_SYNTAX_ERROR),
153 errmsg("conflicting or redundant options")));
154 downer = defel;
156 else if (strcmp(defel->defname, "template") == 0)
158 if (dtemplate)
159 ereport(ERROR,
160 (errcode(ERRCODE_SYNTAX_ERROR),
161 errmsg("conflicting or redundant options")));
162 dtemplate = defel;
164 else if (strcmp(defel->defname, "encoding") == 0)
166 if (dencoding)
167 ereport(ERROR,
168 (errcode(ERRCODE_SYNTAX_ERROR),
169 errmsg("conflicting or redundant options")));
170 dencoding = defel;
172 else if (strcmp(defel->defname, "collate") == 0)
174 if (dcollate)
175 ereport(ERROR,
176 (errcode(ERRCODE_SYNTAX_ERROR),
177 errmsg("conflicting or redundant options")));
178 dcollate = defel;
180 else if (strcmp(defel->defname, "ctype") == 0)
182 if (dctype)
183 ereport(ERROR,
184 (errcode(ERRCODE_SYNTAX_ERROR),
185 errmsg("conflicting or redundant options")));
186 dctype = defel;
188 else if (strcmp(defel->defname, "connectionlimit") == 0)
190 if (dconnlimit)
191 ereport(ERROR,
192 (errcode(ERRCODE_SYNTAX_ERROR),
193 errmsg("conflicting or redundant options")));
194 dconnlimit = defel;
196 else if (strcmp(defel->defname, "location") == 0)
198 ereport(WARNING,
199 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200 errmsg("LOCATION is not supported anymore"),
201 errhint("Consider using tablespaces instead.")));
203 else
204 elog(ERROR, "option \"%s\" not recognized",
205 defel->defname);
208 if (downer && downer->arg)
209 dbowner = strVal(downer->arg);
210 if (dtemplate && dtemplate->arg)
211 dbtemplate = strVal(dtemplate->arg);
212 if (dencoding && dencoding->arg)
214 const char *encoding_name;
216 if (IsA(dencoding->arg, Integer))
218 encoding = intVal(dencoding->arg);
219 encoding_name = pg_encoding_to_char(encoding);
220 if (strcmp(encoding_name, "") == 0 ||
221 pg_valid_server_encoding(encoding_name) < 0)
222 ereport(ERROR,
223 (errcode(ERRCODE_UNDEFINED_OBJECT),
224 errmsg("%d is not a valid encoding code",
225 encoding)));
227 else if (IsA(dencoding->arg, String))
229 encoding_name = strVal(dencoding->arg);
230 encoding = pg_valid_server_encoding(encoding_name);
231 if (encoding < 0)
232 ereport(ERROR,
233 (errcode(ERRCODE_UNDEFINED_OBJECT),
234 errmsg("%s is not a valid encoding name",
235 encoding_name)));
237 else
238 elog(ERROR, "unrecognized node type: %d",
239 nodeTag(dencoding->arg));
241 if (dcollate && dcollate->arg)
242 dbcollate = strVal(dcollate->arg);
243 if (dctype && dctype->arg)
244 dbctype = strVal(dctype->arg);
246 if (dconnlimit && dconnlimit->arg)
248 dbconnlimit = intVal(dconnlimit->arg);
249 if (dbconnlimit < -1)
250 ereport(ERROR,
251 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
252 errmsg("invalid connection limit: %d", dbconnlimit)));
255 /* obtain OID of proposed owner */
256 if (dbowner)
257 datdba = get_roleid_checked(dbowner);
258 else
259 datdba = GetUserId();
262 * To create a database, must have createdb privilege and must be able to
263 * become the target role (this does not imply that the target role itself
264 * must have createdb privilege). The latter provision guards against
265 * "giveaway" attacks. Note that a superuser will always have both of
266 * these privileges a fortiori.
268 if (!have_createdb_privilege())
269 ereport(ERROR,
270 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
271 errmsg("permission denied to create database")));
273 check_is_member_of_role(GetUserId(), datdba);
276 * Lookup database (template) to be cloned, and obtain share lock on it.
277 * ShareLock allows two CREATE DATABASEs to work from the same template
278 * concurrently, while ensuring no one is busy dropping it in parallel
279 * (which would be Very Bad since we'd likely get an incomplete copy
280 * without knowing it). This also prevents any new connections from being
281 * made to the source until we finish copying it, so we can be sure it
282 * won't change underneath us.
284 if (!dbtemplate)
285 dbtemplate = "template1"; /* Default template database name */
287 if (!get_db_info(dbtemplate, ShareLock,
288 &src_dboid, &src_owner, &src_encoding,
289 &src_istemplate, &src_allowconn, &src_lastsysoid,
290 &src_frozenxid, &src_deftablespace,
291 &src_collate, &src_ctype))
292 ereport(ERROR,
293 (errcode(ERRCODE_UNDEFINED_DATABASE),
294 errmsg("template database \"%s\" does not exist",
295 dbtemplate)));
298 * Permission check: to copy a DB that's not marked datistemplate, you
299 * must be superuser or the owner thereof.
301 if (!src_istemplate)
303 if (!pg_database_ownercheck(src_dboid, GetUserId()))
304 ereport(ERROR,
305 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
306 errmsg("permission denied to copy database \"%s\"",
307 dbtemplate)));
310 /* If encoding or locales are defaulted, use source's setting */
311 if (encoding < 0)
312 encoding = src_encoding;
313 if (dbcollate == NULL)
314 dbcollate = src_collate;
315 if (dbctype == NULL)
316 dbctype = src_ctype;
318 /* Some encodings are client only */
319 if (!PG_VALID_BE_ENCODING(encoding))
320 ereport(ERROR,
321 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
322 errmsg("invalid server encoding %d", encoding)));
324 /* Check that the chosen locales are valid */
325 if (!check_locale(LC_COLLATE, dbcollate))
326 ereport(ERROR,
327 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
328 errmsg("invalid locale name %s", dbcollate)));
329 if (!check_locale(LC_CTYPE, dbctype))
330 ereport(ERROR,
331 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
332 errmsg("invalid locale name %s", dbctype)));
335 * Check whether encoding matches server locale settings. We allow
336 * mismatch in three cases:
338 * 1. ctype_encoding = SQL_ASCII, which means either that the locale is
339 * C/POSIX which works with any encoding, or that we couldn't determine
340 * the locale's encoding and have to trust the user to get it right.
342 * 2. selected encoding is SQL_ASCII, but only if you're a superuser. This
343 * is risky but we have historically allowed it --- notably, the
344 * regression tests require it.
346 * 3. selected encoding is UTF8 and platform is win32. This is because
347 * UTF8 is a pseudo codepage that is supported in all locales since it's
348 * converted to UTF16 before being used.
350 * Note: if you change this policy, fix initdb to match.
352 ctype_encoding = pg_get_encoding_from_locale(dbctype);
353 collate_encoding = pg_get_encoding_from_locale(dbcollate);
355 if (!(ctype_encoding == encoding ||
356 ctype_encoding == PG_SQL_ASCII ||
357 #ifdef WIN32
358 encoding == PG_UTF8 ||
359 #endif
360 (encoding == PG_SQL_ASCII && superuser())))
361 ereport(ERROR,
362 (errmsg("encoding %s does not match locale %s",
363 pg_encoding_to_char(encoding),
364 dbctype),
365 errdetail("The chosen CTYPE setting requires encoding %s.",
366 pg_encoding_to_char(ctype_encoding))));
368 if (!(collate_encoding == encoding ||
369 collate_encoding == PG_SQL_ASCII ||
370 #ifdef WIN32
371 encoding == PG_UTF8 ||
372 #endif
373 (encoding == PG_SQL_ASCII && superuser())))
374 ereport(ERROR,
375 (errmsg("encoding %s does not match locale %s",
376 pg_encoding_to_char(encoding),
377 dbcollate),
378 errdetail("The chosen COLLATE setting requires encoding %s.",
379 pg_encoding_to_char(collate_encoding))));
382 * Check that the new locale is compatible with the source database.
384 * We know that template0 doesn't contain any indexes that depend on
385 * collation or ctype, so template0 can be used as template for
386 * any locale.
388 if (strcmp(dbtemplate, "template0") != 0)
390 if (strcmp(dbcollate, src_collate))
391 ereport(ERROR,
392 (errmsg("new collation is incompatible with the collation of the template database (%s)", src_collate),
393 errhint("Use the same collation as in the template database, or use template0 as template")));
395 if (strcmp(dbctype, src_ctype))
396 ereport(ERROR,
397 (errmsg("new ctype is incompatible with the ctype of the template database (%s)", src_ctype),
398 errhint("Use the same ctype as in the template database, or use template0 as template")));
401 /* Resolve default tablespace for new database */
402 if (dtablespacename && dtablespacename->arg)
404 char *tablespacename;
405 AclResult aclresult;
407 tablespacename = strVal(dtablespacename->arg);
408 dst_deftablespace = get_tablespace_oid(tablespacename);
409 if (!OidIsValid(dst_deftablespace))
410 ereport(ERROR,
411 (errcode(ERRCODE_UNDEFINED_OBJECT),
412 errmsg("tablespace \"%s\" does not exist",
413 tablespacename)));
414 /* check permissions */
415 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
416 ACL_CREATE);
417 if (aclresult != ACLCHECK_OK)
418 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
419 tablespacename);
421 /* pg_global must never be the default tablespace */
422 if (dst_deftablespace == GLOBALTABLESPACE_OID)
423 ereport(ERROR,
424 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
425 errmsg("pg_global cannot be used as default tablespace")));
428 * If we are trying to change the default tablespace of the template,
429 * we require that the template not have any files in the new default
430 * tablespace. This is necessary because otherwise the copied
431 * database would contain pg_class rows that refer to its default
432 * tablespace both explicitly (by OID) and implicitly (as zero), which
433 * would cause problems. For example another CREATE DATABASE using
434 * the copied database as template, and trying to change its default
435 * tablespace again, would yield outright incorrect results (it would
436 * improperly move tables to the new default tablespace that should
437 * stay in the same tablespace).
439 if (dst_deftablespace != src_deftablespace)
441 char *srcpath;
442 struct stat st;
444 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
446 if (stat(srcpath, &st) == 0 &&
447 S_ISDIR(st.st_mode) &&
448 !directory_is_empty(srcpath))
449 ereport(ERROR,
450 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
451 errmsg("cannot assign new default tablespace \"%s\"",
452 tablespacename),
453 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
454 dbtemplate)));
455 pfree(srcpath);
458 else
460 /* Use template database's default tablespace */
461 dst_deftablespace = src_deftablespace;
462 /* Note there is no additional permission check in this path */
466 * Check for db name conflict. This is just to give a more friendly error
467 * message than "unique index violation". There's a race condition but
468 * we're willing to accept the less friendly message in that case.
470 if (OidIsValid(get_database_oid(dbname)))
471 ereport(ERROR,
472 (errcode(ERRCODE_DUPLICATE_DATABASE),
473 errmsg("database \"%s\" already exists", dbname)));
476 * The source DB can't have any active backends, except this one
477 * (exception is to allow CREATE DB while connected to template1).
478 * Otherwise we might copy inconsistent data.
480 * This should be last among the basic error checks, because it involves
481 * potential waiting; we may as well throw an error first if we're gonna
482 * throw one.
484 if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
485 ereport(ERROR,
486 (errcode(ERRCODE_OBJECT_IN_USE),
487 errmsg("source database \"%s\" is being accessed by other users",
488 dbtemplate),
489 errdetail_busy_db(notherbackends, npreparedxacts)));
492 * Select an OID for the new database, checking that it doesn't have a
493 * filename conflict with anything already existing in the tablespace
494 * directories.
496 pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
500 dboid = GetNewOid(pg_database_rel);
501 } while (check_db_file_conflict(dboid));
504 * Insert a new tuple into pg_database. This establishes our ownership of
505 * the new database name (anyone else trying to insert the same name will
506 * block on the unique index, and fail after we commit).
509 /* Form tuple */
510 MemSet(new_record, 0, sizeof(new_record));
511 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
513 new_record[Anum_pg_database_datname - 1] =
514 DirectFunctionCall1(namein, CStringGetDatum(dbname));
515 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
516 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
517 new_record[Anum_pg_database_datcollate - 1] =
518 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
519 new_record[Anum_pg_database_datctype - 1] =
520 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
521 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
522 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
523 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
524 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
525 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
526 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
529 * We deliberately set datconfig and datacl to defaults (NULL), rather
530 * than copying them from the template database. Copying datacl would be
531 * a bad idea when the owner is not the same as the template's owner. It's
532 * more debatable whether datconfig should be copied.
534 new_record_nulls[Anum_pg_database_datconfig - 1] = true;
535 new_record_nulls[Anum_pg_database_datacl - 1] = true;
537 tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
538 new_record, new_record_nulls);
540 HeapTupleSetOid(tuple, dboid);
542 simple_heap_insert(pg_database_rel, tuple);
544 /* Update indexes */
545 CatalogUpdateIndexes(pg_database_rel, tuple);
548 * Now generate additional catalog entries associated with the new DB
551 /* Register owner dependency */
552 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
554 /* Create pg_shdepend entries for objects within database */
555 copyTemplateDependencies(src_dboid, dboid);
558 * Force a checkpoint before starting the copy. This will force dirty
559 * buffers out to disk, to ensure source database is up-to-date on disk
560 * for the copy. FlushDatabaseBuffers() would suffice for that, but we
561 * also want to process any pending unlink requests. Otherwise, if a
562 * checkpoint happened while we're copying files, a file might be deleted
563 * just when we're about to copy it, causing the lstat() call in copydir()
564 * to fail with ENOENT.
566 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
569 * Once we start copying subdirectories, we need to be able to clean 'em
570 * up if we fail. Use an ENSURE block to make sure this happens. (This
571 * is not a 100% solution, because of the possibility of failure during
572 * transaction commit after we leave this routine, but it should handle
573 * most scenarios.)
575 fparms.src_dboid = src_dboid;
576 fparms.dest_dboid = dboid;
577 PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
578 PointerGetDatum(&fparms));
581 * Iterate through all tablespaces of the template database, and copy
582 * each one to the new database.
584 rel = heap_open(TableSpaceRelationId, AccessShareLock);
585 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
586 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
588 Oid srctablespace = HeapTupleGetOid(tuple);
589 Oid dsttablespace;
590 char *srcpath;
591 char *dstpath;
592 struct stat st;
594 /* No need to copy global tablespace */
595 if (srctablespace == GLOBALTABLESPACE_OID)
596 continue;
598 srcpath = GetDatabasePath(src_dboid, srctablespace);
600 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
601 directory_is_empty(srcpath))
603 /* Assume we can ignore it */
604 pfree(srcpath);
605 continue;
608 if (srctablespace == src_deftablespace)
609 dsttablespace = dst_deftablespace;
610 else
611 dsttablespace = srctablespace;
613 dstpath = GetDatabasePath(dboid, dsttablespace);
616 * Copy this subdirectory to the new location
618 * We don't need to copy subdirectories
620 copydir(srcpath, dstpath, false);
622 /* Record the filesystem change in XLOG */
624 xl_dbase_create_rec xlrec;
625 XLogRecData rdata[1];
627 xlrec.db_id = dboid;
628 xlrec.tablespace_id = dsttablespace;
629 xlrec.src_db_id = src_dboid;
630 xlrec.src_tablespace_id = srctablespace;
632 rdata[0].data = (char *) &xlrec;
633 rdata[0].len = sizeof(xl_dbase_create_rec);
634 rdata[0].buffer = InvalidBuffer;
635 rdata[0].next = NULL;
637 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
640 heap_endscan(scan);
641 heap_close(rel, AccessShareLock);
644 * We force a checkpoint before committing. This effectively means
645 * that committed XLOG_DBASE_CREATE operations will never need to be
646 * replayed (at least not in ordinary crash recovery; we still have to
647 * make the XLOG entry for the benefit of PITR operations). This
648 * avoids two nasty scenarios:
650 * #1: When PITR is off, we don't XLOG the contents of newly created
651 * indexes; therefore the drop-and-recreate-whole-directory behavior
652 * of DBASE_CREATE replay would lose such indexes.
654 * #2: Since we have to recopy the source database during DBASE_CREATE
655 * replay, we run the risk of copying changes in it that were
656 * committed after the original CREATE DATABASE command but before the
657 * system crash that led to the replay. This is at least unexpected
658 * and at worst could lead to inconsistencies, eg duplicate table
659 * names.
661 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
663 * In PITR replay, the first of these isn't an issue, and the second
664 * is only a risk if the CREATE DATABASE and subsequent template
665 * database change both occur while a base backup is being taken.
666 * There doesn't seem to be much we can do about that except document
667 * it as a limitation.
669 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
670 * we can avoid this.
672 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
675 * Close pg_database, but keep lock till commit (this is important to
676 * prevent any risk of deadlock failure while updating flat file)
678 heap_close(pg_database_rel, NoLock);
681 * Set flag to update flat database file at commit. Note: this also
682 * forces synchronous commit, which minimizes the window between
683 * creation of the database files and commital of the transaction. If
684 * we crash before committing, we'll have a DB that's taking up disk
685 * space but is not in pg_database, which is not good.
687 database_file_update_needed();
689 PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
690 PointerGetDatum(&fparms));
693 /* Error cleanup callback for createdb */
694 static void
695 createdb_failure_callback(int code, Datum arg)
697 createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
700 * Release lock on source database before doing recursive remove.
701 * This is not essential but it seems desirable to release the lock
702 * as soon as possible.
704 UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
706 /* Throw away any successfully copied subdirectories */
707 remove_dbtablespaces(fparms->dest_dboid);
712 * DROP DATABASE
714 void
715 dropdb(const char *dbname, bool missing_ok)
717 Oid db_id;
718 bool db_istemplate;
719 Relation pgdbrel;
720 HeapTuple tup;
721 int notherbackends;
722 int npreparedxacts;
725 * Look up the target database's OID, and get exclusive lock on it. We
726 * need this to ensure that no new backend starts up in the target
727 * database while we are deleting it (see postinit.c), and that no one is
728 * using it as a CREATE DATABASE template or trying to delete it for
729 * themselves.
731 pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
733 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
734 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
736 if (!missing_ok)
738 ereport(ERROR,
739 (errcode(ERRCODE_UNDEFINED_DATABASE),
740 errmsg("database \"%s\" does not exist", dbname)));
742 else
744 /* Close pg_database, release the lock, since we changed nothing */
745 heap_close(pgdbrel, RowExclusiveLock);
746 ereport(NOTICE,
747 (errmsg("database \"%s\" does not exist, skipping",
748 dbname)));
749 return;
754 * Permission checks
756 if (!pg_database_ownercheck(db_id, GetUserId()))
757 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
758 dbname);
761 * Disallow dropping a DB that is marked istemplate. This is just to
762 * prevent people from accidentally dropping template0 or template1; they
763 * can do so if they're really determined ...
765 if (db_istemplate)
766 ereport(ERROR,
767 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
768 errmsg("cannot drop a template database")));
770 /* Obviously can't drop my own database */
771 if (db_id == MyDatabaseId)
772 ereport(ERROR,
773 (errcode(ERRCODE_OBJECT_IN_USE),
774 errmsg("cannot drop the currently open database")));
777 * Check for other backends in the target database. (Because we hold the
778 * database lock, no new ones can start after this.)
780 * As in CREATE DATABASE, check this after other error conditions.
782 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
783 ereport(ERROR,
784 (errcode(ERRCODE_OBJECT_IN_USE),
785 errmsg("database \"%s\" is being accessed by other users",
786 dbname),
787 errdetail_busy_db(notherbackends, npreparedxacts)));
790 * Remove the database's tuple from pg_database.
792 tup = SearchSysCache(DATABASEOID,
793 ObjectIdGetDatum(db_id),
794 0, 0, 0);
795 if (!HeapTupleIsValid(tup))
796 elog(ERROR, "cache lookup failed for database %u", db_id);
798 simple_heap_delete(pgdbrel, &tup->t_self);
800 ReleaseSysCache(tup);
803 * Delete any comments associated with the database.
805 DeleteSharedComments(db_id, DatabaseRelationId);
808 * Remove shared dependency references for the database.
810 dropDatabaseDependencies(db_id);
813 * Drop pages for this database that are in the shared buffer cache. This
814 * is important to ensure that no remaining backend tries to write out a
815 * dirty buffer to the dead database later...
817 DropDatabaseBuffers(db_id);
820 * Tell the stats collector to forget it immediately, too.
822 pgstat_drop_database(db_id);
825 * Tell bgwriter to forget any pending fsync and unlink requests for files
826 * in the database; else the fsyncs will fail at next checkpoint, or worse,
827 * it will delete files that belong to a newly created database with the
828 * same OID.
830 ForgetDatabaseFsyncRequests(db_id);
833 * Force a checkpoint to make sure the bgwriter has received the message
834 * sent by ForgetDatabaseFsyncRequests. On Windows, this also ensures that
835 * the bgwriter doesn't hold any open files, which would cause rmdir() to
836 * fail.
838 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
841 * Remove all tablespace subdirs belonging to the database.
843 remove_dbtablespaces(db_id);
846 * Close pg_database, but keep lock till commit (this is important to
847 * prevent any risk of deadlock failure while updating flat file)
849 heap_close(pgdbrel, NoLock);
852 * Set flag to update flat database file at commit. Note: this also
853 * forces synchronous commit, which minimizes the window between removal
854 * of the database files and commital of the transaction. If we crash
855 * before committing, we'll have a DB that's gone on disk but still there
856 * according to pg_database, which is not good.
858 database_file_update_needed();
863 * Rename database
865 void
866 RenameDatabase(const char *oldname, const char *newname)
868 Oid db_id;
869 HeapTuple newtup;
870 Relation rel;
871 int notherbackends;
872 int npreparedxacts;
875 * Look up the target database's OID, and get exclusive lock on it. We
876 * need this for the same reasons as DROP DATABASE.
878 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
880 if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
881 NULL, NULL, NULL, NULL, NULL, NULL, NULL))
882 ereport(ERROR,
883 (errcode(ERRCODE_UNDEFINED_DATABASE),
884 errmsg("database \"%s\" does not exist", oldname)));
886 /* must be owner */
887 if (!pg_database_ownercheck(db_id, GetUserId()))
888 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
889 oldname);
891 /* must have createdb rights */
892 if (!have_createdb_privilege())
893 ereport(ERROR,
894 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
895 errmsg("permission denied to rename database")));
898 * Make sure the new name doesn't exist. See notes for same error in
899 * CREATE DATABASE.
901 if (OidIsValid(get_database_oid(newname)))
902 ereport(ERROR,
903 (errcode(ERRCODE_DUPLICATE_DATABASE),
904 errmsg("database \"%s\" already exists", newname)));
907 * XXX Client applications probably store the current database somewhere,
908 * so renaming it could cause confusion. On the other hand, there may not
909 * be an actual problem besides a little confusion, so think about this
910 * and decide.
912 if (db_id == MyDatabaseId)
913 ereport(ERROR,
914 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
915 errmsg("current database cannot be renamed")));
918 * Make sure the database does not have active sessions. This is the same
919 * concern as above, but applied to other sessions.
921 * As in CREATE DATABASE, check this after other error conditions.
923 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
924 ereport(ERROR,
925 (errcode(ERRCODE_OBJECT_IN_USE),
926 errmsg("database \"%s\" is being accessed by other users",
927 oldname),
928 errdetail_busy_db(notherbackends, npreparedxacts)));
930 /* rename */
931 newtup = SearchSysCacheCopy(DATABASEOID,
932 ObjectIdGetDatum(db_id),
933 0, 0, 0);
934 if (!HeapTupleIsValid(newtup))
935 elog(ERROR, "cache lookup failed for database %u", db_id);
936 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
937 simple_heap_update(rel, &newtup->t_self, newtup);
938 CatalogUpdateIndexes(rel, newtup);
941 * Close pg_database, but keep lock till commit (this is important to
942 * prevent any risk of deadlock failure while updating flat file)
944 heap_close(rel, NoLock);
947 * Set flag to update flat database file at commit.
949 database_file_update_needed();
954 * ALTER DATABASE SET TABLESPACE
956 static void
957 movedb(const char *dbname, const char *tblspcname)
959 Oid db_id;
960 Relation pgdbrel;
961 int notherbackends;
962 int npreparedxacts;
963 HeapTuple oldtuple, newtuple;
964 Oid src_tblspcoid, dst_tblspcoid;
965 Datum new_record[Natts_pg_database];
966 bool new_record_nulls[Natts_pg_database];
967 bool new_record_repl[Natts_pg_database];
968 ScanKeyData scankey;
969 SysScanDesc sysscan;
970 AclResult aclresult;
971 char *src_dbpath;
972 char *dst_dbpath;
973 DIR *dstdir;
974 struct dirent *xlde;
975 movedb_failure_params fparms;
978 * Look up the target database's OID, and get exclusive lock on it. We
979 * need this to ensure that no new backend starts up in the database while
980 * we are moving it, and that no one is using it as a CREATE DATABASE
981 * template or trying to delete it.
983 pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
985 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
986 NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
987 ereport(ERROR,
988 (errcode(ERRCODE_UNDEFINED_DATABASE),
989 errmsg("database \"%s\" does not exist", dbname)));
992 * We actually need a session lock, so that the lock will persist across
993 * the commit/restart below. (We could almost get away with letting the
994 * lock be released at commit, except that someone could try to move
995 * relations of the DB back into the old directory while we rmtree() it.)
997 LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
998 AccessExclusiveLock);
1001 * Permission checks
1003 if (!pg_database_ownercheck(db_id, GetUserId()))
1004 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1005 dbname);
1008 * Obviously can't move the tables of my own database
1010 if (db_id == MyDatabaseId)
1011 ereport(ERROR,
1012 (errcode(ERRCODE_OBJECT_IN_USE),
1013 errmsg("cannot change the tablespace of the currently open database")));
1016 * Get tablespace's oid
1018 dst_tblspcoid = get_tablespace_oid(tblspcname);
1019 if (dst_tblspcoid == InvalidOid)
1020 ereport(ERROR,
1021 (errcode(ERRCODE_UNDEFINED_DATABASE),
1022 errmsg("tablespace \"%s\" does not exist", tblspcname)));
1025 * Permission checks
1027 aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1028 ACL_CREATE);
1029 if (aclresult != ACLCHECK_OK)
1030 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
1031 tblspcname);
1034 * pg_global must never be the default tablespace
1036 if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1037 ereport(ERROR,
1038 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1039 errmsg("pg_global cannot be used as default tablespace")));
1042 * No-op if same tablespace
1044 if (src_tblspcoid == dst_tblspcoid)
1046 heap_close(pgdbrel, NoLock);
1047 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1048 AccessExclusiveLock);
1049 return;
1053 * Check for other backends in the target database. (Because we hold the
1054 * database lock, no new ones can start after this.)
1056 * As in CREATE DATABASE, check this after other error conditions.
1058 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1059 ereport(ERROR,
1060 (errcode(ERRCODE_OBJECT_IN_USE),
1061 errmsg("database \"%s\" is being accessed by other users",
1062 dbname),
1063 errdetail_busy_db(notherbackends, npreparedxacts)));
1066 * Get old and new database paths
1068 src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1069 dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1072 * Force a checkpoint before proceeding. This will force dirty buffers out
1073 * to disk, to ensure source database is up-to-date on disk for the
1074 * copy. FlushDatabaseBuffers() would suffice for that, but we also want
1075 * to process any pending unlink requests. Otherwise, the check for
1076 * existing files in the target directory might fail unnecessarily, not to
1077 * mention that the copy might fail due to source files getting deleted
1078 * under it. On Windows, this also ensures that the bgwriter doesn't hold
1079 * any open files, which would cause rmdir() to fail.
1081 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1084 * Check for existence of files in the target directory, i.e., objects of
1085 * this database that are already in the target tablespace. We can't
1086 * allow the move in such a case, because we would need to change those
1087 * relations' pg_class.reltablespace entries to zero, and we don't have
1088 * access to the DB's pg_class to do so.
1090 dstdir = AllocateDir(dst_dbpath);
1091 if (dstdir != NULL)
1093 while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1095 if (strcmp(xlde->d_name, ".") == 0 ||
1096 strcmp(xlde->d_name, "..") == 0)
1097 continue;
1099 ereport(ERROR,
1100 (errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1101 dbname, tblspcname),
1102 errhint("You must move them back to the database's default tablespace before using this command.")));
1105 FreeDir(dstdir);
1108 * The directory exists but is empty.
1109 * We must remove it before using the copydir function.
1111 if (rmdir(dst_dbpath) != 0)
1112 elog(ERROR, "could not remove directory \"%s\": %m",
1113 dst_dbpath);
1117 * Use an ENSURE block to make sure we remove the debris if the copy fails
1118 * (eg, due to out-of-disk-space). This is not a 100% solution, because
1119 * of the possibility of failure during transaction commit, but it should
1120 * handle most scenarios.
1122 fparms.dest_dboid = db_id;
1123 fparms.dest_tsoid = dst_tblspcoid;
1124 PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1125 PointerGetDatum(&fparms));
1128 * Copy files from the old tablespace to the new one
1130 copydir(src_dbpath, dst_dbpath, false);
1133 * Record the filesystem change in XLOG
1136 xl_dbase_create_rec xlrec;
1137 XLogRecData rdata[1];
1139 xlrec.db_id = db_id;
1140 xlrec.tablespace_id = dst_tblspcoid;
1141 xlrec.src_db_id = db_id;
1142 xlrec.src_tablespace_id = src_tblspcoid;
1144 rdata[0].data = (char *) &xlrec;
1145 rdata[0].len = sizeof(xl_dbase_create_rec);
1146 rdata[0].buffer = InvalidBuffer;
1147 rdata[0].next = NULL;
1149 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
1153 * Update the database's pg_database tuple
1155 ScanKeyInit(&scankey,
1156 Anum_pg_database_datname,
1157 BTEqualStrategyNumber, F_NAMEEQ,
1158 NameGetDatum(dbname));
1159 sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1160 SnapshotNow, 1, &scankey);
1161 oldtuple = systable_getnext(sysscan);
1162 if (!HeapTupleIsValid(oldtuple)) /* shouldn't happen... */
1163 ereport(ERROR,
1164 (errcode(ERRCODE_UNDEFINED_DATABASE),
1165 errmsg("database \"%s\" does not exist", dbname)));
1167 MemSet(new_record, 0, sizeof(new_record));
1168 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1169 MemSet(new_record_repl, false, sizeof(new_record_repl));
1171 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1172 new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1174 newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1175 new_record,
1176 new_record_nulls, new_record_repl);
1177 simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);
1179 /* Update indexes */
1180 CatalogUpdateIndexes(pgdbrel, newtuple);
1182 systable_endscan(sysscan);
1185 * Force another checkpoint here. As in CREATE DATABASE, this is to
1186 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1187 * operation, which would cause us to lose any unlogged operations
1188 * done in the new DB tablespace before the next checkpoint.
1190 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1193 * Set flag to update flat database file at commit. Note: this also
1194 * forces synchronous commit, which minimizes the window between
1195 * copying the database files and commital of the transaction. If we
1196 * crash before committing, we'll leave an orphaned set of files on
1197 * disk, which is not fatal but not good either.
1199 database_file_update_needed();
1202 * Close pg_database, but keep lock till commit (this is important to
1203 * prevent any risk of deadlock failure while updating flat file)
1205 heap_close(pgdbrel, NoLock);
1207 PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1208 PointerGetDatum(&fparms));
1211 * Commit the transaction so that the pg_database update is committed.
1212 * If we crash while removing files, the database won't be corrupt,
1213 * we'll just leave some orphaned files in the old directory.
1215 * (This is OK because we know we aren't inside a transaction block.)
1217 * XXX would it be safe/better to do this inside the ensure block? Not
1218 * convinced it's a good idea; consider elog just after the transaction
1219 * really commits.
1221 PopActiveSnapshot();
1222 CommitTransactionCommand();
1224 /* Start new transaction for the remaining work; don't need a snapshot */
1225 StartTransactionCommand();
1228 * Remove files from the old tablespace
1230 if (!rmtree(src_dbpath, true))
1231 ereport(WARNING,
1232 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1233 src_dbpath)));
1236 * Record the filesystem change in XLOG
1239 xl_dbase_drop_rec xlrec;
1240 XLogRecData rdata[1];
1242 xlrec.db_id = db_id;
1243 xlrec.tablespace_id = src_tblspcoid;
1245 rdata[0].data = (char *) &xlrec;
1246 rdata[0].len = sizeof(xl_dbase_drop_rec);
1247 rdata[0].buffer = InvalidBuffer;
1248 rdata[0].next = NULL;
1250 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1253 /* Now it's safe to release the database lock */
1254 UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1255 AccessExclusiveLock);
1258 /* Error cleanup callback for movedb */
1259 static void
1260 movedb_failure_callback(int code, Datum arg)
1262 movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1263 char *dstpath;
1265 /* Get rid of anything we managed to copy to the target directory */
1266 dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1268 (void) rmtree(dstpath, true);
1273 * ALTER DATABASE name ...
1275 void
1276 AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
1278 Relation rel;
1279 HeapTuple tuple,
1280 newtuple;
1281 ScanKeyData scankey;
1282 SysScanDesc scan;
1283 ListCell *option;
1284 int connlimit = -1;
1285 DefElem *dconnlimit = NULL;
1286 DefElem *dtablespace = NULL;
1287 Datum new_record[Natts_pg_database];
1288 bool new_record_nulls[Natts_pg_database];
1289 bool new_record_repl[Natts_pg_database];
1291 /* Extract options from the statement node tree */
1292 foreach(option, stmt->options)
1294 DefElem *defel = (DefElem *) lfirst(option);
1296 if (strcmp(defel->defname, "connectionlimit") == 0)
1298 if (dconnlimit)
1299 ereport(ERROR,
1300 (errcode(ERRCODE_SYNTAX_ERROR),
1301 errmsg("conflicting or redundant options")));
1302 dconnlimit = defel;
1304 else if (strcmp(defel->defname, "tablespace") == 0)
1306 if (dtablespace)
1307 ereport(ERROR,
1308 (errcode(ERRCODE_SYNTAX_ERROR),
1309 errmsg("conflicting or redundant options")));
1310 dtablespace = defel;
1312 else
1313 elog(ERROR, "option \"%s\" not recognized",
1314 defel->defname);
1317 if (dtablespace)
1319 /* currently, can't be specified along with any other options */
1320 Assert(!dconnlimit);
1321 /* this case isn't allowed within a transaction block */
1322 PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1323 movedb(stmt->dbname, strVal(dtablespace->arg));
1324 return;
1327 if (dconnlimit)
1329 connlimit = intVal(dconnlimit->arg);
1330 if (connlimit < -1)
1331 ereport(ERROR,
1332 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1333 errmsg("invalid connection limit: %d", connlimit)));
1337 * Get the old tuple. We don't need a lock on the database per se,
1338 * because we're not going to do anything that would mess up incoming
1339 * connections.
1341 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1342 ScanKeyInit(&scankey,
1343 Anum_pg_database_datname,
1344 BTEqualStrategyNumber, F_NAMEEQ,
1345 NameGetDatum(stmt->dbname));
1346 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1347 SnapshotNow, 1, &scankey);
1348 tuple = systable_getnext(scan);
1349 if (!HeapTupleIsValid(tuple))
1350 ereport(ERROR,
1351 (errcode(ERRCODE_UNDEFINED_DATABASE),
1352 errmsg("database \"%s\" does not exist", stmt->dbname)));
1354 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1355 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1356 stmt->dbname);
1359 * Build an updated tuple, perusing the information just obtained
1361 MemSet(new_record, 0, sizeof(new_record));
1362 MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1363 MemSet(new_record_repl, false, sizeof(new_record_repl));
1365 if (dconnlimit)
1367 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1368 new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1371 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1372 new_record_nulls, new_record_repl);
1373 simple_heap_update(rel, &tuple->t_self, newtuple);
1375 /* Update indexes */
1376 CatalogUpdateIndexes(rel, newtuple);
1378 systable_endscan(scan);
1380 /* Close pg_database, but keep lock till commit */
1381 heap_close(rel, NoLock);
1384 * We don't bother updating the flat file since the existing options for
1385 * ALTER DATABASE don't affect it.
1391 * ALTER DATABASE name SET ...
1393 void
1394 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1396 char *valuestr;
1397 HeapTuple tuple,
1398 newtuple;
1399 Relation rel;
1400 ScanKeyData scankey;
1401 SysScanDesc scan;
1402 Datum repl_val[Natts_pg_database];
1403 bool repl_null[Natts_pg_database];
1404 bool repl_repl[Natts_pg_database];
1406 valuestr = ExtractSetVariableArgs(stmt->setstmt);
1409 * Get the old tuple. We don't need a lock on the database per se,
1410 * because we're not going to do anything that would mess up incoming
1411 * connections.
1413 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1414 ScanKeyInit(&scankey,
1415 Anum_pg_database_datname,
1416 BTEqualStrategyNumber, F_NAMEEQ,
1417 NameGetDatum(stmt->dbname));
1418 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1419 SnapshotNow, 1, &scankey);
1420 tuple = systable_getnext(scan);
1421 if (!HeapTupleIsValid(tuple))
1422 ereport(ERROR,
1423 (errcode(ERRCODE_UNDEFINED_DATABASE),
1424 errmsg("database \"%s\" does not exist", stmt->dbname)));
1426 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1427 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1428 stmt->dbname);
1430 memset(repl_repl, false, sizeof(repl_repl));
1431 repl_repl[Anum_pg_database_datconfig - 1] = true;
1433 if (stmt->setstmt->kind == VAR_RESET_ALL)
1435 /* RESET ALL, so just set datconfig to null */
1436 repl_null[Anum_pg_database_datconfig - 1] = true;
1437 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
1439 else
1441 Datum datum;
1442 bool isnull;
1443 ArrayType *a;
1445 repl_null[Anum_pg_database_datconfig - 1] = false;
1447 /* Extract old value of datconfig */
1448 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
1449 RelationGetDescr(rel), &isnull);
1450 a = isnull ? NULL : DatumGetArrayTypeP(datum);
1452 /* Update (valuestr is NULL in RESET cases) */
1453 if (valuestr)
1454 a = GUCArrayAdd(a, stmt->setstmt->name, valuestr);
1455 else
1456 a = GUCArrayDelete(a, stmt->setstmt->name);
1458 if (a)
1459 repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
1460 else
1461 repl_null[Anum_pg_database_datconfig - 1] = true;
1464 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
1465 repl_val, repl_null, repl_repl);
1466 simple_heap_update(rel, &tuple->t_self, newtuple);
1468 /* Update indexes */
1469 CatalogUpdateIndexes(rel, newtuple);
1471 systable_endscan(scan);
1473 /* Close pg_database, but keep lock till commit */
1474 heap_close(rel, NoLock);
1477 * We don't bother updating the flat file since ALTER DATABASE SET doesn't
1478 * affect it.
1484 * ALTER DATABASE name OWNER TO newowner
1486 void
1487 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1489 HeapTuple tuple;
1490 Relation rel;
1491 ScanKeyData scankey;
1492 SysScanDesc scan;
1493 Form_pg_database datForm;
1496 * Get the old tuple. We don't need a lock on the database per se,
1497 * because we're not going to do anything that would mess up incoming
1498 * connections.
1500 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1501 ScanKeyInit(&scankey,
1502 Anum_pg_database_datname,
1503 BTEqualStrategyNumber, F_NAMEEQ,
1504 NameGetDatum(dbname));
1505 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1506 SnapshotNow, 1, &scankey);
1507 tuple = systable_getnext(scan);
1508 if (!HeapTupleIsValid(tuple))
1509 ereport(ERROR,
1510 (errcode(ERRCODE_UNDEFINED_DATABASE),
1511 errmsg("database \"%s\" does not exist", dbname)));
1513 datForm = (Form_pg_database) GETSTRUCT(tuple);
1516 * If the new owner is the same as the existing owner, consider the
1517 * command to have succeeded. This is to be consistent with other
1518 * objects.
1520 if (datForm->datdba != newOwnerId)
1522 Datum repl_val[Natts_pg_database];
1523 bool repl_null[Natts_pg_database];
1524 bool repl_repl[Natts_pg_database];
1525 Acl *newAcl;
1526 Datum aclDatum;
1527 bool isNull;
1528 HeapTuple newtuple;
1530 /* Otherwise, must be owner of the existing object */
1531 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1532 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1533 dbname);
1535 /* Must be able to become new owner */
1536 check_is_member_of_role(GetUserId(), newOwnerId);
1539 * must have createdb rights
1541 * NOTE: This is different from other alter-owner checks in that the
1542 * current user is checked for createdb privileges instead of the
1543 * destination owner. This is consistent with the CREATE case for
1544 * databases. Because superusers will always have this right, we need
1545 * no special case for them.
1547 if (!have_createdb_privilege())
1548 ereport(ERROR,
1549 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1550 errmsg("permission denied to change owner of database")));
1552 memset(repl_null, false, sizeof(repl_null));
1553 memset(repl_repl, false, sizeof(repl_repl));
1555 repl_repl[Anum_pg_database_datdba - 1] = true;
1556 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1559 * Determine the modified ACL for the new owner. This is only
1560 * necessary when the ACL is non-null.
1562 aclDatum = heap_getattr(tuple,
1563 Anum_pg_database_datacl,
1564 RelationGetDescr(rel),
1565 &isNull);
1566 if (!isNull)
1568 newAcl = aclnewowner(DatumGetAclP(aclDatum),
1569 datForm->datdba, newOwnerId);
1570 repl_repl[Anum_pg_database_datacl - 1] = true;
1571 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1574 newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1575 simple_heap_update(rel, &newtuple->t_self, newtuple);
1576 CatalogUpdateIndexes(rel, newtuple);
1578 heap_freetuple(newtuple);
1580 /* Update owner dependency reference */
1581 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1582 newOwnerId);
1585 systable_endscan(scan);
1587 /* Close pg_database, but keep lock till commit */
1588 heap_close(rel, NoLock);
1591 * We don't bother updating the flat file since ALTER DATABASE OWNER
1592 * doesn't affect it.
1598 * Helper functions
1602 * Look up info about the database named "name". If the database exists,
1603 * obtain the specified lock type on it, fill in any of the remaining
1604 * parameters that aren't NULL, and return TRUE. If no such database,
1605 * return FALSE.
1607 static bool
1608 get_db_info(const char *name, LOCKMODE lockmode,
1609 Oid *dbIdP, Oid *ownerIdP,
1610 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1611 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1612 Oid *dbTablespace, char **dbCollate, char **dbCtype)
1614 bool result = false;
1615 Relation relation;
1617 AssertArg(name);
1619 /* Caller may wish to grab a better lock on pg_database beforehand... */
1620 relation = heap_open(DatabaseRelationId, AccessShareLock);
1623 * Loop covers the rare case where the database is renamed before we can
1624 * lock it. We try again just in case we can find a new one of the same
1625 * name.
1627 for (;;)
1629 ScanKeyData scanKey;
1630 SysScanDesc scan;
1631 HeapTuple tuple;
1632 Oid dbOid;
1635 * there's no syscache for database-indexed-by-name, so must do it the
1636 * hard way
1638 ScanKeyInit(&scanKey,
1639 Anum_pg_database_datname,
1640 BTEqualStrategyNumber, F_NAMEEQ,
1641 NameGetDatum(name));
1643 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1644 SnapshotNow, 1, &scanKey);
1646 tuple = systable_getnext(scan);
1648 if (!HeapTupleIsValid(tuple))
1650 /* definitely no database of that name */
1651 systable_endscan(scan);
1652 break;
1655 dbOid = HeapTupleGetOid(tuple);
1657 systable_endscan(scan);
1660 * Now that we have a database OID, we can try to lock the DB.
1662 if (lockmode != NoLock)
1663 LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1666 * And now, re-fetch the tuple by OID. If it's still there and still
1667 * the same name, we win; else, drop the lock and loop back to try
1668 * again.
1670 tuple = SearchSysCache(DATABASEOID,
1671 ObjectIdGetDatum(dbOid),
1672 0, 0, 0);
1673 if (HeapTupleIsValid(tuple))
1675 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1677 if (strcmp(name, NameStr(dbform->datname)) == 0)
1679 /* oid of the database */
1680 if (dbIdP)
1681 *dbIdP = dbOid;
1682 /* oid of the owner */
1683 if (ownerIdP)
1684 *ownerIdP = dbform->datdba;
1685 /* character encoding */
1686 if (encodingP)
1687 *encodingP = dbform->encoding;
1688 /* allowed as template? */
1689 if (dbIsTemplateP)
1690 *dbIsTemplateP = dbform->datistemplate;
1691 /* allowing connections? */
1692 if (dbAllowConnP)
1693 *dbAllowConnP = dbform->datallowconn;
1694 /* last system OID used in database */
1695 if (dbLastSysOidP)
1696 *dbLastSysOidP = dbform->datlastsysoid;
1697 /* limit of frozen XIDs */
1698 if (dbFrozenXidP)
1699 *dbFrozenXidP = dbform->datfrozenxid;
1700 /* default tablespace for this database */
1701 if (dbTablespace)
1702 *dbTablespace = dbform->dattablespace;
1703 /* default locale settings for this database */
1704 if (dbCollate)
1705 *dbCollate = pstrdup(NameStr(dbform->datcollate));
1706 if (dbCtype)
1707 *dbCtype = pstrdup(NameStr(dbform->datctype));
1708 ReleaseSysCache(tuple);
1709 result = true;
1710 break;
1712 /* can only get here if it was just renamed */
1713 ReleaseSysCache(tuple);
1716 if (lockmode != NoLock)
1717 UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1720 heap_close(relation, AccessShareLock);
1722 return result;
1725 /* Check if current user has createdb privileges */
1726 static bool
1727 have_createdb_privilege(void)
1729 bool result = false;
1730 HeapTuple utup;
1732 /* Superusers can always do everything */
1733 if (superuser())
1734 return true;
1736 utup = SearchSysCache(AUTHOID,
1737 ObjectIdGetDatum(GetUserId()),
1738 0, 0, 0);
1739 if (HeapTupleIsValid(utup))
1741 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1742 ReleaseSysCache(utup);
1744 return result;
1748 * Remove tablespace directories
1750 * We don't know what tablespaces db_id is using, so iterate through all
1751 * tablespaces removing <tablespace>/db_id
1753 static void
1754 remove_dbtablespaces(Oid db_id)
1756 Relation rel;
1757 HeapScanDesc scan;
1758 HeapTuple tuple;
1760 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1761 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1762 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1764 Oid dsttablespace = HeapTupleGetOid(tuple);
1765 char *dstpath;
1766 struct stat st;
1768 /* Don't mess with the global tablespace */
1769 if (dsttablespace == GLOBALTABLESPACE_OID)
1770 continue;
1772 dstpath = GetDatabasePath(db_id, dsttablespace);
1774 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1776 /* Assume we can ignore it */
1777 pfree(dstpath);
1778 continue;
1781 if (!rmtree(dstpath, true))
1782 ereport(WARNING,
1783 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1784 dstpath)));
1786 /* Record the filesystem change in XLOG */
1788 xl_dbase_drop_rec xlrec;
1789 XLogRecData rdata[1];
1791 xlrec.db_id = db_id;
1792 xlrec.tablespace_id = dsttablespace;
1794 rdata[0].data = (char *) &xlrec;
1795 rdata[0].len = sizeof(xl_dbase_drop_rec);
1796 rdata[0].buffer = InvalidBuffer;
1797 rdata[0].next = NULL;
1799 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1802 pfree(dstpath);
1805 heap_endscan(scan);
1806 heap_close(rel, AccessShareLock);
1810 * Check for existing files that conflict with a proposed new DB OID;
1811 * return TRUE if there are any
1813 * If there were a subdirectory in any tablespace matching the proposed new
1814 * OID, we'd get a create failure due to the duplicate name ... and then we'd
1815 * try to remove that already-existing subdirectory during the cleanup in
1816 * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1817 * instead we make this extra check before settling on the OID of the new
1818 * database. This exactly parallels what GetNewRelFileNode() does for table
1819 * relfilenode values.
1821 static bool
1822 check_db_file_conflict(Oid db_id)
1824 bool result = false;
1825 Relation rel;
1826 HeapScanDesc scan;
1827 HeapTuple tuple;
1829 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1830 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1831 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1833 Oid dsttablespace = HeapTupleGetOid(tuple);
1834 char *dstpath;
1835 struct stat st;
1837 /* Don't mess with the global tablespace */
1838 if (dsttablespace == GLOBALTABLESPACE_OID)
1839 continue;
1841 dstpath = GetDatabasePath(db_id, dsttablespace);
1843 if (lstat(dstpath, &st) == 0)
1845 /* Found a conflicting file (or directory, whatever) */
1846 pfree(dstpath);
1847 result = true;
1848 break;
1851 pfree(dstpath);
1854 heap_endscan(scan);
1855 heap_close(rel, AccessShareLock);
1856 return result;
1860 * Issue a suitable errdetail message for a busy database
1862 static int
1863 errdetail_busy_db(int notherbackends, int npreparedxacts)
1866 * We don't worry about singular versus plural here, since the English
1867 * rules for that don't translate very well. But we can at least avoid
1868 * the case of zero items.
1870 if (notherbackends > 0 && npreparedxacts > 0)
1871 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
1872 notherbackends, npreparedxacts);
1873 else if (notherbackends > 0)
1874 errdetail("There are %d other session(s) using the database.",
1875 notherbackends);
1876 else
1877 errdetail("There are %d prepared transaction(s) using the database.",
1878 npreparedxacts);
1879 return 0; /* just to keep ereport macro happy */
1883 * get_database_oid - given a database name, look up the OID
1885 * Returns InvalidOid if database name not found.
1888 get_database_oid(const char *dbname)
1890 Relation pg_database;
1891 ScanKeyData entry[1];
1892 SysScanDesc scan;
1893 HeapTuple dbtuple;
1894 Oid oid;
1897 * There's no syscache for pg_database indexed by name, so we must look
1898 * the hard way.
1900 pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1901 ScanKeyInit(&entry[0],
1902 Anum_pg_database_datname,
1903 BTEqualStrategyNumber, F_NAMEEQ,
1904 CStringGetDatum(dbname));
1905 scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1906 SnapshotNow, 1, entry);
1908 dbtuple = systable_getnext(scan);
1910 /* We assume that there can be at most one matching tuple */
1911 if (HeapTupleIsValid(dbtuple))
1912 oid = HeapTupleGetOid(dbtuple);
1913 else
1914 oid = InvalidOid;
1916 systable_endscan(scan);
1917 heap_close(pg_database, AccessShareLock);
1919 return oid;
1924 * get_database_name - given a database OID, look up the name
1926 * Returns a palloc'd string, or NULL if no such database.
1928 char *
1929 get_database_name(Oid dbid)
1931 HeapTuple dbtuple;
1932 char *result;
1934 dbtuple = SearchSysCache(DATABASEOID,
1935 ObjectIdGetDatum(dbid),
1936 0, 0, 0);
1937 if (HeapTupleIsValid(dbtuple))
1939 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1940 ReleaseSysCache(dbtuple);
1942 else
1943 result = NULL;
1945 return result;
1949 * DATABASE resource manager's routines
1951 void
1952 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1954 uint8 info = record->xl_info & ~XLR_INFO_MASK;
1956 /* Backup blocks are not used in dbase records */
1957 Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
1959 if (info == XLOG_DBASE_CREATE)
1961 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1962 char *src_path;
1963 char *dst_path;
1964 struct stat st;
1966 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1967 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1970 * Our theory for replaying a CREATE is to forcibly drop the target
1971 * subdirectory if present, then re-copy the source data. This may be
1972 * more work than needed, but it is simple to implement.
1974 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1976 if (!rmtree(dst_path, true))
1977 ereport(WARNING,
1978 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1979 dst_path)));
1983 * Force dirty buffers out to disk, to ensure source database is
1984 * up-to-date for the copy.
1986 FlushDatabaseBuffers(xlrec->src_db_id);
1989 * Copy this subdirectory to the new location
1991 * We don't need to copy subdirectories
1993 copydir(src_path, dst_path, false);
1995 else if (info == XLOG_DBASE_DROP)
1997 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1998 char *dst_path;
2000 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2002 /* Drop pages for this database that are in the shared buffer cache */
2003 DropDatabaseBuffers(xlrec->db_id);
2005 /* Also, clean out any fsync requests that might be pending in md.c */
2006 ForgetDatabaseFsyncRequests(xlrec->db_id);
2008 /* Clean out the xlog relcache too */
2009 XLogDropDatabase(xlrec->db_id);
2011 /* And remove the physical files */
2012 if (!rmtree(dst_path, true))
2013 ereport(WARNING,
2014 (errmsg("some useless files may be left behind in old database directory \"%s\"",
2015 dst_path)));
2017 else
2018 elog(PANIC, "dbase_redo: unknown op code %u", info);
2021 void
2022 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
2024 uint8 info = xl_info & ~XLR_INFO_MASK;
2026 if (info == XLOG_DBASE_CREATE)
2028 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
2030 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
2031 xlrec->src_db_id, xlrec->src_tablespace_id,
2032 xlrec->db_id, xlrec->tablespace_id);
2034 else if (info == XLOG_DBASE_DROP)
2036 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
2038 appendStringInfo(buf, "drop db: dir %u/%u",
2039 xlrec->db_id, xlrec->tablespace_id);
2041 else
2042 appendStringInfo(buf, "UNKNOWN");