Fix pg_dump bug in the database-level collation patch. "datcollate" and
[PostgreSQL.git] / src / backend / commands / dbcommands.c
blobf31261db9eab0b23d7b135ef1d75d7244d46df4e
1 /*-------------------------------------------------------------------------
3 * dbcommands.c
4 * Database management commands (create/drop database).
6 * Note: database creation/destruction commands use exclusive locks on
7 * the database objects (as expressed by LockSharedObject()) to avoid
8 * stepping on each others' toes. Formerly we used table-level locks
9 * on pg_database, but that's too coarse-grained.
11 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
15 * IDENTIFICATION
16 * $PostgreSQL$
18 *-------------------------------------------------------------------------
20 #include "postgres.h"
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/xact.h"
30 #include "access/xlogutils.h"
31 #include "catalog/catalog.h"
32 #include "catalog/dependency.h"
33 #include "catalog/indexing.h"
34 #include "catalog/pg_authid.h"
35 #include "catalog/pg_database.h"
36 #include "catalog/pg_tablespace.h"
37 #include "commands/comment.h"
38 #include "commands/dbcommands.h"
39 #include "commands/tablespace.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/bufmgr.h"
45 #include "storage/lmgr.h"
46 #include "storage/freespace.h"
47 #include "storage/ipc.h"
48 #include "storage/procarray.h"
49 #include "storage/smgr.h"
50 #include "utils/acl.h"
51 #include "utils/builtins.h"
52 #include "utils/flatfiles.h"
53 #include "utils/fmgroids.h"
54 #include "utils/guc.h"
55 #include "utils/lsyscache.h"
56 #include "utils/pg_locale.h"
57 #include "utils/syscache.h"
58 #include "utils/tqual.h"
61 typedef struct
63 Oid src_dboid; /* source (template) DB */
64 Oid dest_dboid; /* DB we are trying to create */
65 } createdb_failure_params;
67 /* non-export function prototypes */
68 static void createdb_failure_callback(int code, Datum arg);
69 static bool get_db_info(const char *name, LOCKMODE lockmode,
70 Oid *dbIdP, Oid *ownerIdP,
71 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
72 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
73 Oid *dbTablespace, char **dbCollate, char **dbCtype);
74 static bool have_createdb_privilege(void);
75 static void remove_dbtablespaces(Oid db_id);
76 static bool check_db_file_conflict(Oid db_id);
77 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
81 * CREATE DATABASE
83 void
84 createdb(const CreatedbStmt *stmt)
86 HeapScanDesc scan;
87 Relation rel;
88 Oid src_dboid;
89 Oid src_owner;
90 int src_encoding;
91 char *src_collate;
92 char *src_ctype;
93 bool src_istemplate;
94 bool src_allowconn;
95 Oid src_lastsysoid;
96 TransactionId src_frozenxid;
97 Oid src_deftablespace;
98 volatile Oid dst_deftablespace;
99 Relation pg_database_rel;
100 HeapTuple tuple;
101 Datum new_record[Natts_pg_database];
102 char new_record_nulls[Natts_pg_database];
103 Oid dboid;
104 Oid datdba;
105 ListCell *option;
106 DefElem *dtablespacename = NULL;
107 DefElem *downer = NULL;
108 DefElem *dtemplate = NULL;
109 DefElem *dencoding = NULL;
110 DefElem *dcollate = NULL;
111 DefElem *dctype = NULL;
112 DefElem *dconnlimit = NULL;
113 char *dbname = stmt->dbname;
114 char *dbowner = NULL;
115 const char *dbtemplate = NULL;
116 char *dbcollate = NULL;
117 char *dbctype = NULL;
118 int encoding = -1;
119 int dbconnlimit = -1;
120 int ctype_encoding;
121 int collate_encoding;
122 int notherbackends;
123 int npreparedxacts;
124 createdb_failure_params fparms;
126 /* Extract options from the statement node tree */
127 foreach(option, stmt->options)
129 DefElem *defel = (DefElem *) lfirst(option);
131 if (strcmp(defel->defname, "tablespace") == 0)
133 if (dtablespacename)
134 ereport(ERROR,
135 (errcode(ERRCODE_SYNTAX_ERROR),
136 errmsg("conflicting or redundant options")));
137 dtablespacename = defel;
139 else if (strcmp(defel->defname, "owner") == 0)
141 if (downer)
142 ereport(ERROR,
143 (errcode(ERRCODE_SYNTAX_ERROR),
144 errmsg("conflicting or redundant options")));
145 downer = defel;
147 else if (strcmp(defel->defname, "template") == 0)
149 if (dtemplate)
150 ereport(ERROR,
151 (errcode(ERRCODE_SYNTAX_ERROR),
152 errmsg("conflicting or redundant options")));
153 dtemplate = defel;
155 else if (strcmp(defel->defname, "encoding") == 0)
157 if (dencoding)
158 ereport(ERROR,
159 (errcode(ERRCODE_SYNTAX_ERROR),
160 errmsg("conflicting or redundant options")));
161 dencoding = defel;
163 else if (strcmp(defel->defname, "collate") == 0)
165 if (dcollate)
166 ereport(ERROR,
167 (errcode(ERRCODE_SYNTAX_ERROR),
168 errmsg("conflicting or redundant options")));
169 dcollate = defel;
171 else if (strcmp(defel->defname, "ctype") == 0)
173 if (dctype)
174 ereport(ERROR,
175 (errcode(ERRCODE_SYNTAX_ERROR),
176 errmsg("conflicting or redundant options")));
177 dctype = defel;
179 else if (strcmp(defel->defname, "connectionlimit") == 0)
181 if (dconnlimit)
182 ereport(ERROR,
183 (errcode(ERRCODE_SYNTAX_ERROR),
184 errmsg("conflicting or redundant options")));
185 dconnlimit = defel;
187 else if (strcmp(defel->defname, "location") == 0)
189 ereport(WARNING,
190 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
191 errmsg("LOCATION is not supported anymore"),
192 errhint("Consider using tablespaces instead.")));
194 else
195 elog(ERROR, "option \"%s\" not recognized",
196 defel->defname);
199 if (downer && downer->arg)
200 dbowner = strVal(downer->arg);
201 if (dtemplate && dtemplate->arg)
202 dbtemplate = strVal(dtemplate->arg);
203 if (dencoding && dencoding->arg)
205 const char *encoding_name;
207 if (IsA(dencoding->arg, Integer))
209 encoding = intVal(dencoding->arg);
210 encoding_name = pg_encoding_to_char(encoding);
211 if (strcmp(encoding_name, "") == 0 ||
212 pg_valid_server_encoding(encoding_name) < 0)
213 ereport(ERROR,
214 (errcode(ERRCODE_UNDEFINED_OBJECT),
215 errmsg("%d is not a valid encoding code",
216 encoding)));
218 else if (IsA(dencoding->arg, String))
220 encoding_name = strVal(dencoding->arg);
221 encoding = pg_valid_server_encoding(encoding_name);
222 if (encoding < 0)
223 ereport(ERROR,
224 (errcode(ERRCODE_UNDEFINED_OBJECT),
225 errmsg("%s is not a valid encoding name",
226 encoding_name)));
228 else
229 elog(ERROR, "unrecognized node type: %d",
230 nodeTag(dencoding->arg));
232 if (dcollate && dcollate->arg)
233 dbcollate = strVal(dcollate->arg);
234 if (dctype && dctype->arg)
235 dbctype = strVal(dctype->arg);
237 if (dconnlimit && dconnlimit->arg)
238 dbconnlimit = intVal(dconnlimit->arg);
240 /* obtain OID of proposed owner */
241 if (dbowner)
242 datdba = get_roleid_checked(dbowner);
243 else
244 datdba = GetUserId();
247 * To create a database, must have createdb privilege and must be able to
248 * become the target role (this does not imply that the target role itself
249 * must have createdb privilege). The latter provision guards against
250 * "giveaway" attacks. Note that a superuser will always have both of
251 * these privileges a fortiori.
253 if (!have_createdb_privilege())
254 ereport(ERROR,
255 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
256 errmsg("permission denied to create database")));
258 check_is_member_of_role(GetUserId(), datdba);
261 * Lookup database (template) to be cloned, and obtain share lock on it.
262 * ShareLock allows two CREATE DATABASEs to work from the same template
263 * concurrently, while ensuring no one is busy dropping it in parallel
264 * (which would be Very Bad since we'd likely get an incomplete copy
265 * without knowing it). This also prevents any new connections from being
266 * made to the source until we finish copying it, so we can be sure it
267 * won't change underneath us.
269 if (!dbtemplate)
270 dbtemplate = "template1"; /* Default template database name */
272 if (!get_db_info(dbtemplate, ShareLock,
273 &src_dboid, &src_owner, &src_encoding,
274 &src_istemplate, &src_allowconn, &src_lastsysoid,
275 &src_frozenxid, &src_deftablespace,
276 &src_collate, &src_ctype))
277 ereport(ERROR,
278 (errcode(ERRCODE_UNDEFINED_DATABASE),
279 errmsg("template database \"%s\" does not exist",
280 dbtemplate)));
283 * Permission check: to copy a DB that's not marked datistemplate, you
284 * must be superuser or the owner thereof.
286 if (!src_istemplate)
288 if (!pg_database_ownercheck(src_dboid, GetUserId()))
289 ereport(ERROR,
290 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
291 errmsg("permission denied to copy database \"%s\"",
292 dbtemplate)));
295 /* If encoding or locales are defaulted, use source's setting */
296 if (encoding < 0)
297 encoding = src_encoding;
298 if (dbcollate == NULL)
299 dbcollate = src_collate;
300 if (dbctype == NULL)
301 dbctype = src_ctype;
303 /* Some encodings are client only */
304 if (!PG_VALID_BE_ENCODING(encoding))
305 ereport(ERROR,
306 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
307 errmsg("invalid server encoding %d", encoding)));
309 /* Check that the chosen locales are valid */
310 if (!check_locale(LC_COLLATE, dbcollate))
311 ereport(ERROR,
312 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
313 errmsg("invalid locale name %s", dbcollate)));
314 if (!check_locale(LC_CTYPE, dbctype))
315 ereport(ERROR,
316 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
317 errmsg("invalid locale name %s", dbctype)));
320 * Check whether encoding matches server locale settings. We allow
321 * mismatch in three cases:
323 * 1. ctype_encoding = SQL_ASCII, which means either that the locale is
324 * C/POSIX which works with any encoding, or that we couldn't determine
325 * the locale's encoding and have to trust the user to get it right.
327 * 2. selected encoding is SQL_ASCII, but only if you're a superuser. This
328 * is risky but we have historically allowed it --- notably, the
329 * regression tests require it.
331 * 3. selected encoding is UTF8 and platform is win32. This is because
332 * UTF8 is a pseudo codepage that is supported in all locales since it's
333 * converted to UTF16 before being used.
335 * Note: if you change this policy, fix initdb to match.
337 ctype_encoding = pg_get_encoding_from_locale(dbctype);
338 collate_encoding = pg_get_encoding_from_locale(dbcollate);
340 if (!(ctype_encoding == encoding ||
341 ctype_encoding == PG_SQL_ASCII ||
342 #ifdef WIN32
343 encoding == PG_UTF8 ||
344 #endif
345 (encoding == PG_SQL_ASCII && superuser())))
346 ereport(ERROR,
347 (errmsg("encoding %s does not match locale %s",
348 pg_encoding_to_char(encoding),
349 dbctype),
350 errdetail("The chosen CTYPE setting requires encoding %s.",
351 pg_encoding_to_char(ctype_encoding))));
353 if (!(collate_encoding == encoding ||
354 collate_encoding == PG_SQL_ASCII ||
355 #ifdef WIN32
356 encoding == PG_UTF8 ||
357 #endif
358 (encoding == PG_SQL_ASCII && superuser())))
359 ereport(ERROR,
360 (errmsg("encoding %s does not match locale %s",
361 pg_encoding_to_char(encoding),
362 dbcollate),
363 errdetail("The chosen COLLATE setting requires encoding %s.",
364 pg_encoding_to_char(collate_encoding))));
367 * Check that the new locale is compatible with the source database.
369 * We know that template0 doesn't contain any indexes that depend on
370 * collation or ctype, so template0 can be used as template for
371 * any locale.
373 if (strcmp(dbtemplate, "template0") != 0)
375 if (strcmp(dbcollate, src_collate))
376 ereport(ERROR,
377 (errmsg("new collation is incompatible with the collation of the template database (%s)", src_collate),
378 errhint("Use the same collation as in the template database, or use template0 as template")));
380 if (strcmp(dbctype, src_ctype))
381 ereport(ERROR,
382 (errmsg("new ctype is incompatible with the ctype of the template database (%s)", src_ctype),
383 errhint("Use the same ctype as in the template database, or use template0 as template")));
386 /* Resolve default tablespace for new database */
387 if (dtablespacename && dtablespacename->arg)
389 char *tablespacename;
390 AclResult aclresult;
392 tablespacename = strVal(dtablespacename->arg);
393 dst_deftablespace = get_tablespace_oid(tablespacename);
394 if (!OidIsValid(dst_deftablespace))
395 ereport(ERROR,
396 (errcode(ERRCODE_UNDEFINED_OBJECT),
397 errmsg("tablespace \"%s\" does not exist",
398 tablespacename)));
399 /* check permissions */
400 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
401 ACL_CREATE);
402 if (aclresult != ACLCHECK_OK)
403 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
404 tablespacename);
406 /* pg_global must never be the default tablespace */
407 if (dst_deftablespace == GLOBALTABLESPACE_OID)
408 ereport(ERROR,
409 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
410 errmsg("pg_global cannot be used as default tablespace")));
413 * If we are trying to change the default tablespace of the template,
414 * we require that the template not have any files in the new default
415 * tablespace. This is necessary because otherwise the copied
416 * database would contain pg_class rows that refer to its default
417 * tablespace both explicitly (by OID) and implicitly (as zero), which
418 * would cause problems. For example another CREATE DATABASE using
419 * the copied database as template, and trying to change its default
420 * tablespace again, would yield outright incorrect results (it would
421 * improperly move tables to the new default tablespace that should
422 * stay in the same tablespace).
424 if (dst_deftablespace != src_deftablespace)
426 char *srcpath;
427 struct stat st;
429 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
431 if (stat(srcpath, &st) == 0 &&
432 S_ISDIR(st.st_mode) &&
433 !directory_is_empty(srcpath))
434 ereport(ERROR,
435 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
436 errmsg("cannot assign new default tablespace \"%s\"",
437 tablespacename),
438 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
439 dbtemplate)));
440 pfree(srcpath);
443 else
445 /* Use template database's default tablespace */
446 dst_deftablespace = src_deftablespace;
447 /* Note there is no additional permission check in this path */
451 * Check for db name conflict. This is just to give a more friendly error
452 * message than "unique index violation". There's a race condition but
453 * we're willing to accept the less friendly message in that case.
455 if (OidIsValid(get_database_oid(dbname)))
456 ereport(ERROR,
457 (errcode(ERRCODE_DUPLICATE_DATABASE),
458 errmsg("database \"%s\" already exists", dbname)));
461 * The source DB can't have any active backends, except this one
462 * (exception is to allow CREATE DB while connected to template1).
463 * Otherwise we might copy inconsistent data.
465 * This should be last among the basic error checks, because it involves
466 * potential waiting; we may as well throw an error first if we're gonna
467 * throw one.
469 if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
470 ereport(ERROR,
471 (errcode(ERRCODE_OBJECT_IN_USE),
472 errmsg("source database \"%s\" is being accessed by other users",
473 dbtemplate),
474 errdetail_busy_db(notherbackends, npreparedxacts)));
477 * Select an OID for the new database, checking that it doesn't have a
478 * filename conflict with anything already existing in the tablespace
479 * directories.
481 pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
485 dboid = GetNewOid(pg_database_rel);
486 } while (check_db_file_conflict(dboid));
489 * Insert a new tuple into pg_database. This establishes our ownership of
490 * the new database name (anyone else trying to insert the same name will
491 * block on the unique index, and fail after we commit).
494 /* Form tuple */
495 MemSet(new_record, 0, sizeof(new_record));
496 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
498 new_record[Anum_pg_database_datname - 1] =
499 DirectFunctionCall1(namein, CStringGetDatum(dbname));
500 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
501 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
502 new_record[Anum_pg_database_datcollate - 1] =
503 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
504 new_record[Anum_pg_database_datctype - 1] =
505 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
506 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
507 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
508 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
509 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
510 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
511 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
514 * We deliberately set datconfig and datacl to defaults (NULL), rather
515 * than copying them from the template database. Copying datacl would be
516 * a bad idea when the owner is not the same as the template's owner. It's
517 * more debatable whether datconfig should be copied.
519 new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
520 new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
522 tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
523 new_record, new_record_nulls);
525 HeapTupleSetOid(tuple, dboid);
527 simple_heap_insert(pg_database_rel, tuple);
529 /* Update indexes */
530 CatalogUpdateIndexes(pg_database_rel, tuple);
533 * Now generate additional catalog entries associated with the new DB
536 /* Register owner dependency */
537 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
539 /* Create pg_shdepend entries for objects within database */
540 copyTemplateDependencies(src_dboid, dboid);
543 * Force dirty buffers out to disk, to ensure source database is
544 * up-to-date for the copy.
546 FlushDatabaseBuffers(src_dboid);
549 * Once we start copying subdirectories, we need to be able to clean 'em
550 * up if we fail. Use an ENSURE block to make sure this happens. (This
551 * is not a 100% solution, because of the possibility of failure during
552 * transaction commit after we leave this routine, but it should handle
553 * most scenarios.)
555 fparms.src_dboid = src_dboid;
556 fparms.dest_dboid = dboid;
557 PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
558 PointerGetDatum(&fparms));
561 * Iterate through all tablespaces of the template database, and copy
562 * each one to the new database.
564 rel = heap_open(TableSpaceRelationId, AccessShareLock);
565 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
566 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
568 Oid srctablespace = HeapTupleGetOid(tuple);
569 Oid dsttablespace;
570 char *srcpath;
571 char *dstpath;
572 struct stat st;
574 /* No need to copy global tablespace */
575 if (srctablespace == GLOBALTABLESPACE_OID)
576 continue;
578 srcpath = GetDatabasePath(src_dboid, srctablespace);
580 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
581 directory_is_empty(srcpath))
583 /* Assume we can ignore it */
584 pfree(srcpath);
585 continue;
588 if (srctablespace == src_deftablespace)
589 dsttablespace = dst_deftablespace;
590 else
591 dsttablespace = srctablespace;
593 dstpath = GetDatabasePath(dboid, dsttablespace);
596 * Copy this subdirectory to the new location
598 * We don't need to copy subdirectories
600 copydir(srcpath, dstpath, false);
602 /* Record the filesystem change in XLOG */
604 xl_dbase_create_rec xlrec;
605 XLogRecData rdata[1];
607 xlrec.db_id = dboid;
608 xlrec.tablespace_id = dsttablespace;
609 xlrec.src_db_id = src_dboid;
610 xlrec.src_tablespace_id = srctablespace;
612 rdata[0].data = (char *) &xlrec;
613 rdata[0].len = sizeof(xl_dbase_create_rec);
614 rdata[0].buffer = InvalidBuffer;
615 rdata[0].next = NULL;
617 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
620 heap_endscan(scan);
621 heap_close(rel, AccessShareLock);
624 * We force a checkpoint before committing. This effectively means
625 * that committed XLOG_DBASE_CREATE operations will never need to be
626 * replayed (at least not in ordinary crash recovery; we still have to
627 * make the XLOG entry for the benefit of PITR operations). This
628 * avoids two nasty scenarios:
630 * #1: When PITR is off, we don't XLOG the contents of newly created
631 * indexes; therefore the drop-and-recreate-whole-directory behavior
632 * of DBASE_CREATE replay would lose such indexes.
634 * #2: Since we have to recopy the source database during DBASE_CREATE
635 * replay, we run the risk of copying changes in it that were
636 * committed after the original CREATE DATABASE command but before the
637 * system crash that led to the replay. This is at least unexpected
638 * and at worst could lead to inconsistencies, eg duplicate table
639 * names.
641 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
643 * In PITR replay, the first of these isn't an issue, and the second
644 * is only a risk if the CREATE DATABASE and subsequent template
645 * database change both occur while a base backup is being taken.
646 * There doesn't seem to be much we can do about that except document
647 * it as a limitation.
649 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
650 * we can avoid this.
652 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
655 * Close pg_database, but keep lock till commit (this is important to
656 * prevent any risk of deadlock failure while updating flat file)
658 heap_close(pg_database_rel, NoLock);
661 * Set flag to update flat database file at commit. Note: this also
662 * forces synchronous commit, which minimizes the window between
663 * creation of the database files and commital of the transaction. If
664 * we crash before committing, we'll have a DB that's taking up disk
665 * space but is not in pg_database, which is not good.
667 database_file_update_needed();
669 PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
670 PointerGetDatum(&fparms));
673 /* Error cleanup callback for createdb */
674 static void
675 createdb_failure_callback(int code, Datum arg)
677 createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
680 * Release lock on source database before doing recursive remove.
681 * This is not essential but it seems desirable to release the lock
682 * as soon as possible.
684 UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
686 /* Throw away any successfully copied subdirectories */
687 remove_dbtablespaces(fparms->dest_dboid);
692 * DROP DATABASE
694 void
695 dropdb(const char *dbname, bool missing_ok)
697 Oid db_id;
698 bool db_istemplate;
699 Relation pgdbrel;
700 HeapTuple tup;
701 int notherbackends;
702 int npreparedxacts;
705 * Look up the target database's OID, and get exclusive lock on it. We
706 * need this to ensure that no new backend starts up in the target
707 * database while we are deleting it (see postinit.c), and that no one is
708 * using it as a CREATE DATABASE template or trying to delete it for
709 * themselves.
711 pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
713 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
714 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
716 if (!missing_ok)
718 ereport(ERROR,
719 (errcode(ERRCODE_UNDEFINED_DATABASE),
720 errmsg("database \"%s\" does not exist", dbname)));
722 else
724 /* Close pg_database, release the lock, since we changed nothing */
725 heap_close(pgdbrel, RowExclusiveLock);
726 ereport(NOTICE,
727 (errmsg("database \"%s\" does not exist, skipping",
728 dbname)));
729 return;
734 * Permission checks
736 if (!pg_database_ownercheck(db_id, GetUserId()))
737 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
738 dbname);
741 * Disallow dropping a DB that is marked istemplate. This is just to
742 * prevent people from accidentally dropping template0 or template1; they
743 * can do so if they're really determined ...
745 if (db_istemplate)
746 ereport(ERROR,
747 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
748 errmsg("cannot drop a template database")));
750 /* Obviously can't drop my own database */
751 if (db_id == MyDatabaseId)
752 ereport(ERROR,
753 (errcode(ERRCODE_OBJECT_IN_USE),
754 errmsg("cannot drop the currently open database")));
757 * Check for other backends in the target database. (Because we hold the
758 * database lock, no new ones can start after this.)
760 * As in CREATE DATABASE, check this after other error conditions.
762 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
763 ereport(ERROR,
764 (errcode(ERRCODE_OBJECT_IN_USE),
765 errmsg("database \"%s\" is being accessed by other users",
766 dbname),
767 errdetail_busy_db(notherbackends, npreparedxacts)));
770 * Remove the database's tuple from pg_database.
772 tup = SearchSysCache(DATABASEOID,
773 ObjectIdGetDatum(db_id),
774 0, 0, 0);
775 if (!HeapTupleIsValid(tup))
776 elog(ERROR, "cache lookup failed for database %u", db_id);
778 simple_heap_delete(pgdbrel, &tup->t_self);
780 ReleaseSysCache(tup);
783 * Delete any comments associated with the database.
785 DeleteSharedComments(db_id, DatabaseRelationId);
788 * Remove shared dependency references for the database.
790 dropDatabaseDependencies(db_id);
793 * Drop pages for this database that are in the shared buffer cache. This
794 * is important to ensure that no remaining backend tries to write out a
795 * dirty buffer to the dead database later...
797 DropDatabaseBuffers(db_id);
800 * Also, clean out any entries in the shared free space map.
802 FreeSpaceMapForgetDatabase(db_id);
805 * Tell the stats collector to forget it immediately, too.
807 pgstat_drop_database(db_id);
810 * Tell bgwriter to forget any pending fsync and unlink requests for files
811 * in the database; else the fsyncs will fail at next checkpoint, or worse,
812 * it will delete files that belong to a newly created database with the
813 * same OID.
815 ForgetDatabaseFsyncRequests(db_id);
818 * Force a checkpoint to make sure the bgwriter has received the message
819 * sent by ForgetDatabaseFsyncRequests. On Windows, this also ensures that
820 * the bgwriter doesn't hold any open files, which would cause rmdir() to
821 * fail.
823 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
826 * Remove all tablespace subdirs belonging to the database.
828 remove_dbtablespaces(db_id);
831 * Close pg_database, but keep lock till commit (this is important to
832 * prevent any risk of deadlock failure while updating flat file)
834 heap_close(pgdbrel, NoLock);
837 * Set flag to update flat database file at commit. Note: this also
838 * forces synchronous commit, which minimizes the window between removal
839 * of the database files and commital of the transaction. If we crash
840 * before committing, we'll have a DB that's gone on disk but still there
841 * according to pg_database, which is not good.
843 database_file_update_needed();
848 * Rename database
850 void
851 RenameDatabase(const char *oldname, const char *newname)
853 Oid db_id;
854 HeapTuple newtup;
855 Relation rel;
856 int notherbackends;
857 int npreparedxacts;
860 * Look up the target database's OID, and get exclusive lock on it. We
861 * need this for the same reasons as DROP DATABASE.
863 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
865 if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
866 NULL, NULL, NULL, NULL, NULL, NULL, NULL))
867 ereport(ERROR,
868 (errcode(ERRCODE_UNDEFINED_DATABASE),
869 errmsg("database \"%s\" does not exist", oldname)));
871 /* must be owner */
872 if (!pg_database_ownercheck(db_id, GetUserId()))
873 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
874 oldname);
876 /* must have createdb rights */
877 if (!have_createdb_privilege())
878 ereport(ERROR,
879 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
880 errmsg("permission denied to rename database")));
883 * Make sure the new name doesn't exist. See notes for same error in
884 * CREATE DATABASE.
886 if (OidIsValid(get_database_oid(newname)))
887 ereport(ERROR,
888 (errcode(ERRCODE_DUPLICATE_DATABASE),
889 errmsg("database \"%s\" already exists", newname)));
892 * XXX Client applications probably store the current database somewhere,
893 * so renaming it could cause confusion. On the other hand, there may not
894 * be an actual problem besides a little confusion, so think about this
895 * and decide.
897 if (db_id == MyDatabaseId)
898 ereport(ERROR,
899 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
900 errmsg("current database cannot be renamed")));
903 * Make sure the database does not have active sessions. This is the same
904 * concern as above, but applied to other sessions.
906 * As in CREATE DATABASE, check this after other error conditions.
908 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
909 ereport(ERROR,
910 (errcode(ERRCODE_OBJECT_IN_USE),
911 errmsg("database \"%s\" is being accessed by other users",
912 oldname),
913 errdetail_busy_db(notherbackends, npreparedxacts)));
915 /* rename */
916 newtup = SearchSysCacheCopy(DATABASEOID,
917 ObjectIdGetDatum(db_id),
918 0, 0, 0);
919 if (!HeapTupleIsValid(newtup))
920 elog(ERROR, "cache lookup failed for database %u", db_id);
921 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
922 simple_heap_update(rel, &newtup->t_self, newtup);
923 CatalogUpdateIndexes(rel, newtup);
926 * Close pg_database, but keep lock till commit (this is important to
927 * prevent any risk of deadlock failure while updating flat file)
929 heap_close(rel, NoLock);
932 * Set flag to update flat database file at commit.
934 database_file_update_needed();
939 * ALTER DATABASE name ...
941 void
942 AlterDatabase(AlterDatabaseStmt *stmt)
944 Relation rel;
945 HeapTuple tuple,
946 newtuple;
947 ScanKeyData scankey;
948 SysScanDesc scan;
949 ListCell *option;
950 int connlimit = -1;
951 DefElem *dconnlimit = NULL;
952 Datum new_record[Natts_pg_database];
953 char new_record_nulls[Natts_pg_database];
954 char new_record_repl[Natts_pg_database];
956 /* Extract options from the statement node tree */
957 foreach(option, stmt->options)
959 DefElem *defel = (DefElem *) lfirst(option);
961 if (strcmp(defel->defname, "connectionlimit") == 0)
963 if (dconnlimit)
964 ereport(ERROR,
965 (errcode(ERRCODE_SYNTAX_ERROR),
966 errmsg("conflicting or redundant options")));
967 dconnlimit = defel;
969 else
970 elog(ERROR, "option \"%s\" not recognized",
971 defel->defname);
974 if (dconnlimit)
975 connlimit = intVal(dconnlimit->arg);
978 * Get the old tuple. We don't need a lock on the database per se,
979 * because we're not going to do anything that would mess up incoming
980 * connections.
982 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
983 ScanKeyInit(&scankey,
984 Anum_pg_database_datname,
985 BTEqualStrategyNumber, F_NAMEEQ,
986 NameGetDatum(stmt->dbname));
987 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
988 SnapshotNow, 1, &scankey);
989 tuple = systable_getnext(scan);
990 if (!HeapTupleIsValid(tuple))
991 ereport(ERROR,
992 (errcode(ERRCODE_UNDEFINED_DATABASE),
993 errmsg("database \"%s\" does not exist", stmt->dbname)));
995 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
996 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
997 stmt->dbname);
1000 * Build an updated tuple, perusing the information just obtained
1002 MemSet(new_record, 0, sizeof(new_record));
1003 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
1004 MemSet(new_record_repl, ' ', sizeof(new_record_repl));
1006 if (dconnlimit)
1008 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1009 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
1012 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
1013 new_record_nulls, new_record_repl);
1014 simple_heap_update(rel, &tuple->t_self, newtuple);
1016 /* Update indexes */
1017 CatalogUpdateIndexes(rel, newtuple);
1019 systable_endscan(scan);
1021 /* Close pg_database, but keep lock till commit */
1022 heap_close(rel, NoLock);
1025 * We don't bother updating the flat file since the existing options for
1026 * ALTER DATABASE don't affect it.
1032 * ALTER DATABASE name SET ...
1034 void
1035 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1037 char *valuestr;
1038 HeapTuple tuple,
1039 newtuple;
1040 Relation rel;
1041 ScanKeyData scankey;
1042 SysScanDesc scan;
1043 Datum repl_val[Natts_pg_database];
1044 char repl_null[Natts_pg_database];
1045 char repl_repl[Natts_pg_database];
1047 valuestr = ExtractSetVariableArgs(stmt->setstmt);
1050 * Get the old tuple. We don't need a lock on the database per se,
1051 * because we're not going to do anything that would mess up incoming
1052 * connections.
1054 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1055 ScanKeyInit(&scankey,
1056 Anum_pg_database_datname,
1057 BTEqualStrategyNumber, F_NAMEEQ,
1058 NameGetDatum(stmt->dbname));
1059 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1060 SnapshotNow, 1, &scankey);
1061 tuple = systable_getnext(scan);
1062 if (!HeapTupleIsValid(tuple))
1063 ereport(ERROR,
1064 (errcode(ERRCODE_UNDEFINED_DATABASE),
1065 errmsg("database \"%s\" does not exist", stmt->dbname)));
1067 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1068 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1069 stmt->dbname);
1071 memset(repl_repl, ' ', sizeof(repl_repl));
1072 repl_repl[Anum_pg_database_datconfig - 1] = 'r';
1074 if (stmt->setstmt->kind == VAR_RESET_ALL)
1076 /* RESET ALL, so just set datconfig to null */
1077 repl_null[Anum_pg_database_datconfig - 1] = 'n';
1078 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
1080 else
1082 Datum datum;
1083 bool isnull;
1084 ArrayType *a;
1086 repl_null[Anum_pg_database_datconfig - 1] = ' ';
1088 /* Extract old value of datconfig */
1089 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
1090 RelationGetDescr(rel), &isnull);
1091 a = isnull ? NULL : DatumGetArrayTypeP(datum);
1093 /* Update (valuestr is NULL in RESET cases) */
1094 if (valuestr)
1095 a = GUCArrayAdd(a, stmt->setstmt->name, valuestr);
1096 else
1097 a = GUCArrayDelete(a, stmt->setstmt->name);
1099 if (a)
1100 repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
1101 else
1102 repl_null[Anum_pg_database_datconfig - 1] = 'n';
1105 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel),
1106 repl_val, repl_null, repl_repl);
1107 simple_heap_update(rel, &tuple->t_self, newtuple);
1109 /* Update indexes */
1110 CatalogUpdateIndexes(rel, newtuple);
1112 systable_endscan(scan);
1114 /* Close pg_database, but keep lock till commit */
1115 heap_close(rel, NoLock);
1118 * We don't bother updating the flat file since ALTER DATABASE SET doesn't
1119 * affect it.
1125 * ALTER DATABASE name OWNER TO newowner
1127 void
1128 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1130 HeapTuple tuple;
1131 Relation rel;
1132 ScanKeyData scankey;
1133 SysScanDesc scan;
1134 Form_pg_database datForm;
1137 * Get the old tuple. We don't need a lock on the database per se,
1138 * because we're not going to do anything that would mess up incoming
1139 * connections.
1141 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1142 ScanKeyInit(&scankey,
1143 Anum_pg_database_datname,
1144 BTEqualStrategyNumber, F_NAMEEQ,
1145 NameGetDatum(dbname));
1146 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1147 SnapshotNow, 1, &scankey);
1148 tuple = systable_getnext(scan);
1149 if (!HeapTupleIsValid(tuple))
1150 ereport(ERROR,
1151 (errcode(ERRCODE_UNDEFINED_DATABASE),
1152 errmsg("database \"%s\" does not exist", dbname)));
1154 datForm = (Form_pg_database) GETSTRUCT(tuple);
1157 * If the new owner is the same as the existing owner, consider the
1158 * command to have succeeded. This is to be consistent with other
1159 * objects.
1161 if (datForm->datdba != newOwnerId)
1163 Datum repl_val[Natts_pg_database];
1164 char repl_null[Natts_pg_database];
1165 char repl_repl[Natts_pg_database];
1166 Acl *newAcl;
1167 Datum aclDatum;
1168 bool isNull;
1169 HeapTuple newtuple;
1171 /* Otherwise, must be owner of the existing object */
1172 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1173 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1174 dbname);
1176 /* Must be able to become new owner */
1177 check_is_member_of_role(GetUserId(), newOwnerId);
1180 * must have createdb rights
1182 * NOTE: This is different from other alter-owner checks in that the
1183 * current user is checked for createdb privileges instead of the
1184 * destination owner. This is consistent with the CREATE case for
1185 * databases. Because superusers will always have this right, we need
1186 * no special case for them.
1188 if (!have_createdb_privilege())
1189 ereport(ERROR,
1190 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1191 errmsg("permission denied to change owner of database")));
1193 memset(repl_null, ' ', sizeof(repl_null));
1194 memset(repl_repl, ' ', sizeof(repl_repl));
1196 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1197 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1200 * Determine the modified ACL for the new owner. This is only
1201 * necessary when the ACL is non-null.
1203 aclDatum = heap_getattr(tuple,
1204 Anum_pg_database_datacl,
1205 RelationGetDescr(rel),
1206 &isNull);
1207 if (!isNull)
1209 newAcl = aclnewowner(DatumGetAclP(aclDatum),
1210 datForm->datdba, newOwnerId);
1211 repl_repl[Anum_pg_database_datacl - 1] = 'r';
1212 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1215 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1216 simple_heap_update(rel, &newtuple->t_self, newtuple);
1217 CatalogUpdateIndexes(rel, newtuple);
1219 heap_freetuple(newtuple);
1221 /* Update owner dependency reference */
1222 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1223 newOwnerId);
1226 systable_endscan(scan);
1228 /* Close pg_database, but keep lock till commit */
1229 heap_close(rel, NoLock);
1232 * We don't bother updating the flat file since ALTER DATABASE OWNER
1233 * doesn't affect it.
1239 * Helper functions
1243 * Look up info about the database named "name". If the database exists,
1244 * obtain the specified lock type on it, fill in any of the remaining
1245 * parameters that aren't NULL, and return TRUE. If no such database,
1246 * return FALSE.
1248 static bool
1249 get_db_info(const char *name, LOCKMODE lockmode,
1250 Oid *dbIdP, Oid *ownerIdP,
1251 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1252 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1253 Oid *dbTablespace, char **dbCollate, char **dbCtype)
1255 bool result = false;
1256 Relation relation;
1258 AssertArg(name);
1260 /* Caller may wish to grab a better lock on pg_database beforehand... */
1261 relation = heap_open(DatabaseRelationId, AccessShareLock);
1264 * Loop covers the rare case where the database is renamed before we can
1265 * lock it. We try again just in case we can find a new one of the same
1266 * name.
1268 for (;;)
1270 ScanKeyData scanKey;
1271 SysScanDesc scan;
1272 HeapTuple tuple;
1273 Oid dbOid;
1276 * there's no syscache for database-indexed-by-name, so must do it the
1277 * hard way
1279 ScanKeyInit(&scanKey,
1280 Anum_pg_database_datname,
1281 BTEqualStrategyNumber, F_NAMEEQ,
1282 NameGetDatum(name));
1284 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1285 SnapshotNow, 1, &scanKey);
1287 tuple = systable_getnext(scan);
1289 if (!HeapTupleIsValid(tuple))
1291 /* definitely no database of that name */
1292 systable_endscan(scan);
1293 break;
1296 dbOid = HeapTupleGetOid(tuple);
1298 systable_endscan(scan);
1301 * Now that we have a database OID, we can try to lock the DB.
1303 if (lockmode != NoLock)
1304 LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1307 * And now, re-fetch the tuple by OID. If it's still there and still
1308 * the same name, we win; else, drop the lock and loop back to try
1309 * again.
1311 tuple = SearchSysCache(DATABASEOID,
1312 ObjectIdGetDatum(dbOid),
1313 0, 0, 0);
1314 if (HeapTupleIsValid(tuple))
1316 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1318 if (strcmp(name, NameStr(dbform->datname)) == 0)
1320 /* oid of the database */
1321 if (dbIdP)
1322 *dbIdP = dbOid;
1323 /* oid of the owner */
1324 if (ownerIdP)
1325 *ownerIdP = dbform->datdba;
1326 /* character encoding */
1327 if (encodingP)
1328 *encodingP = dbform->encoding;
1329 /* allowed as template? */
1330 if (dbIsTemplateP)
1331 *dbIsTemplateP = dbform->datistemplate;
1332 /* allowing connections? */
1333 if (dbAllowConnP)
1334 *dbAllowConnP = dbform->datallowconn;
1335 /* last system OID used in database */
1336 if (dbLastSysOidP)
1337 *dbLastSysOidP = dbform->datlastsysoid;
1338 /* limit of frozen XIDs */
1339 if (dbFrozenXidP)
1340 *dbFrozenXidP = dbform->datfrozenxid;
1341 /* default tablespace for this database */
1342 if (dbTablespace)
1343 *dbTablespace = dbform->dattablespace;
1344 /* default locale settings for this database */
1345 if (dbCollate)
1346 *dbCollate = pstrdup(NameStr(dbform->datcollate));
1347 if (dbCtype)
1348 *dbCtype = pstrdup(NameStr(dbform->datctype));
1349 ReleaseSysCache(tuple);
1350 result = true;
1351 break;
1353 /* can only get here if it was just renamed */
1354 ReleaseSysCache(tuple);
1357 if (lockmode != NoLock)
1358 UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1361 heap_close(relation, AccessShareLock);
1363 return result;
1366 /* Check if current user has createdb privileges */
1367 static bool
1368 have_createdb_privilege(void)
1370 bool result = false;
1371 HeapTuple utup;
1373 /* Superusers can always do everything */
1374 if (superuser())
1375 return true;
1377 utup = SearchSysCache(AUTHOID,
1378 ObjectIdGetDatum(GetUserId()),
1379 0, 0, 0);
1380 if (HeapTupleIsValid(utup))
1382 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1383 ReleaseSysCache(utup);
1385 return result;
1389 * Remove tablespace directories
1391 * We don't know what tablespaces db_id is using, so iterate through all
1392 * tablespaces removing <tablespace>/db_id
1394 static void
1395 remove_dbtablespaces(Oid db_id)
1397 Relation rel;
1398 HeapScanDesc scan;
1399 HeapTuple tuple;
1401 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1402 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1403 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1405 Oid dsttablespace = HeapTupleGetOid(tuple);
1406 char *dstpath;
1407 struct stat st;
1409 /* Don't mess with the global tablespace */
1410 if (dsttablespace == GLOBALTABLESPACE_OID)
1411 continue;
1413 dstpath = GetDatabasePath(db_id, dsttablespace);
1415 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1417 /* Assume we can ignore it */
1418 pfree(dstpath);
1419 continue;
1422 if (!rmtree(dstpath, true))
1423 ereport(WARNING,
1424 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1425 dstpath)));
1427 /* Record the filesystem change in XLOG */
1429 xl_dbase_drop_rec xlrec;
1430 XLogRecData rdata[1];
1432 xlrec.db_id = db_id;
1433 xlrec.tablespace_id = dsttablespace;
1435 rdata[0].data = (char *) &xlrec;
1436 rdata[0].len = sizeof(xl_dbase_drop_rec);
1437 rdata[0].buffer = InvalidBuffer;
1438 rdata[0].next = NULL;
1440 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1443 pfree(dstpath);
1446 heap_endscan(scan);
1447 heap_close(rel, AccessShareLock);
1451 * Check for existing files that conflict with a proposed new DB OID;
1452 * return TRUE if there are any
1454 * If there were a subdirectory in any tablespace matching the proposed new
1455 * OID, we'd get a create failure due to the duplicate name ... and then we'd
1456 * try to remove that already-existing subdirectory during the cleanup in
1457 * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1458 * instead we make this extra check before settling on the OID of the new
1459 * database. This exactly parallels what GetNewRelFileNode() does for table
1460 * relfilenode values.
1462 static bool
1463 check_db_file_conflict(Oid db_id)
1465 bool result = false;
1466 Relation rel;
1467 HeapScanDesc scan;
1468 HeapTuple tuple;
1470 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1471 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1472 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1474 Oid dsttablespace = HeapTupleGetOid(tuple);
1475 char *dstpath;
1476 struct stat st;
1478 /* Don't mess with the global tablespace */
1479 if (dsttablespace == GLOBALTABLESPACE_OID)
1480 continue;
1482 dstpath = GetDatabasePath(db_id, dsttablespace);
1484 if (lstat(dstpath, &st) == 0)
1486 /* Found a conflicting file (or directory, whatever) */
1487 pfree(dstpath);
1488 result = true;
1489 break;
1492 pfree(dstpath);
1495 heap_endscan(scan);
1496 heap_close(rel, AccessShareLock);
1497 return result;
1501 * Issue a suitable errdetail message for a busy database
1503 static int
1504 errdetail_busy_db(int notherbackends, int npreparedxacts)
1507 * We don't worry about singular versus plural here, since the English
1508 * rules for that don't translate very well. But we can at least avoid
1509 * the case of zero items.
1511 if (notherbackends > 0 && npreparedxacts > 0)
1512 errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
1513 notherbackends, npreparedxacts);
1514 else if (notherbackends > 0)
1515 errdetail("There are %d other session(s) using the database.",
1516 notherbackends);
1517 else
1518 errdetail("There are %d prepared transaction(s) using the database.",
1519 npreparedxacts);
1520 return 0; /* just to keep ereport macro happy */
1524 * get_database_oid - given a database name, look up the OID
1526 * Returns InvalidOid if database name not found.
1529 get_database_oid(const char *dbname)
1531 Relation pg_database;
1532 ScanKeyData entry[1];
1533 SysScanDesc scan;
1534 HeapTuple dbtuple;
1535 Oid oid;
1538 * There's no syscache for pg_database indexed by name, so we must look
1539 * the hard way.
1541 pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1542 ScanKeyInit(&entry[0],
1543 Anum_pg_database_datname,
1544 BTEqualStrategyNumber, F_NAMEEQ,
1545 CStringGetDatum(dbname));
1546 scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1547 SnapshotNow, 1, entry);
1549 dbtuple = systable_getnext(scan);
1551 /* We assume that there can be at most one matching tuple */
1552 if (HeapTupleIsValid(dbtuple))
1553 oid = HeapTupleGetOid(dbtuple);
1554 else
1555 oid = InvalidOid;
1557 systable_endscan(scan);
1558 heap_close(pg_database, AccessShareLock);
1560 return oid;
1565 * get_database_name - given a database OID, look up the name
1567 * Returns a palloc'd string, or NULL if no such database.
1569 char *
1570 get_database_name(Oid dbid)
1572 HeapTuple dbtuple;
1573 char *result;
1575 dbtuple = SearchSysCache(DATABASEOID,
1576 ObjectIdGetDatum(dbid),
1577 0, 0, 0);
1578 if (HeapTupleIsValid(dbtuple))
1580 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1581 ReleaseSysCache(dbtuple);
1583 else
1584 result = NULL;
1586 return result;
1590 * DATABASE resource manager's routines
1592 void
1593 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1595 uint8 info = record->xl_info & ~XLR_INFO_MASK;
1597 if (info == XLOG_DBASE_CREATE)
1599 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1600 char *src_path;
1601 char *dst_path;
1602 struct stat st;
1604 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1605 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1608 * Our theory for replaying a CREATE is to forcibly drop the target
1609 * subdirectory if present, then re-copy the source data. This may be
1610 * more work than needed, but it is simple to implement.
1612 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1614 if (!rmtree(dst_path, true))
1615 ereport(WARNING,
1616 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1617 dst_path)));
1621 * Force dirty buffers out to disk, to ensure source database is
1622 * up-to-date for the copy.
1624 FlushDatabaseBuffers(xlrec->src_db_id);
1627 * Copy this subdirectory to the new location
1629 * We don't need to copy subdirectories
1631 copydir(src_path, dst_path, false);
1633 else if (info == XLOG_DBASE_DROP)
1635 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1636 char *dst_path;
1638 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1640 /* Drop pages for this database that are in the shared buffer cache */
1641 DropDatabaseBuffers(xlrec->db_id);
1643 /* Also, clean out any entries in the shared free space map */
1644 FreeSpaceMapForgetDatabase(xlrec->db_id);
1646 /* Also, clean out any fsync requests that might be pending in md.c */
1647 ForgetDatabaseFsyncRequests(xlrec->db_id);
1649 /* Clean out the xlog relcache too */
1650 XLogDropDatabase(xlrec->db_id);
1652 /* And remove the physical files */
1653 if (!rmtree(dst_path, true))
1654 ereport(WARNING,
1655 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1656 dst_path)));
1658 else
1659 elog(PANIC, "dbase_redo: unknown op code %u", info);
1662 void
1663 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1665 uint8 info = xl_info & ~XLR_INFO_MASK;
1667 if (info == XLOG_DBASE_CREATE)
1669 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1671 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1672 xlrec->src_db_id, xlrec->src_tablespace_id,
1673 xlrec->db_id, xlrec->tablespace_id);
1675 else if (info == XLOG_DBASE_DROP)
1677 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1679 appendStringInfo(buf, "drop db: dir %u/%u",
1680 xlrec->db_id, xlrec->tablespace_id);
1682 else
1683 appendStringInfo(buf, "UNKNOWN");