Force a checkpoint in CREATE DATABASE before starting to copy the files,
[PostgreSQL.git] / src / backend / commands / dbcommands.c
blob600636e037f200e8b85fa1390c86b2e22b82bf71
/*-------------------------------------------------------------------------
 *
 * dbcommands.c
 *		Database management commands (create/drop database).
 *
 * Note: database creation/destruction commands use exclusive locks on
 * the database objects (as expressed by LockSharedObject()) to avoid
 * stepping on each other's toes.  Formerly we used table-level locks
 * on pg_database, but that's too coarse-grained.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
20 #include "postgres.h"
22 #include <fcntl.h>
23 #include <locale.h>
24 #include <unistd.h>
25 #include <sys/stat.h>
27 #include "access/genam.h"
28 #include "access/heapam.h"
29 #include "access/xact.h"
30 #include "access/xlogutils.h"
31 #include "catalog/catalog.h"
32 #include "catalog/dependency.h"
33 #include "catalog/indexing.h"
34 #include "catalog/pg_authid.h"
35 #include "catalog/pg_database.h"
36 #include "catalog/pg_tablespace.h"
37 #include "commands/comment.h"
38 #include "commands/dbcommands.h"
39 #include "commands/tablespace.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/bufmgr.h"
45 #include "storage/lmgr.h"
46 #include "storage/ipc.h"
47 #include "storage/procarray.h"
48 #include "storage/smgr.h"
49 #include "utils/acl.h"
50 #include "utils/builtins.h"
51 #include "utils/flatfiles.h"
52 #include "utils/fmgroids.h"
53 #include "utils/guc.h"
54 #include "utils/lsyscache.h"
55 #include "utils/pg_locale.h"
56 #include "utils/syscache.h"
57 #include "utils/tqual.h"
60 typedef struct
62 Oid src_dboid; /* source (template) DB */
63 Oid dest_dboid; /* DB we are trying to create */
64 } createdb_failure_params;
66 /* non-export function prototypes */
67 static void createdb_failure_callback(int code, Datum arg);
68 static bool get_db_info(const char *name, LOCKMODE lockmode,
69 Oid *dbIdP, Oid *ownerIdP,
70 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
71 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
72 Oid *dbTablespace, char **dbCollate, char **dbCtype);
73 static bool have_createdb_privilege(void);
74 static void remove_dbtablespaces(Oid db_id);
75 static bool check_db_file_conflict(Oid db_id);
76 static int errdetail_busy_db(int notherbackends, int npreparedxacts);
80 * CREATE DATABASE
82 void
83 createdb(const CreatedbStmt *stmt)
85 HeapScanDesc scan;
86 Relation rel;
87 Oid src_dboid;
88 Oid src_owner;
89 int src_encoding;
90 char *src_collate;
91 char *src_ctype;
92 bool src_istemplate;
93 bool src_allowconn;
94 Oid src_lastsysoid;
95 TransactionId src_frozenxid;
96 Oid src_deftablespace;
97 volatile Oid dst_deftablespace;
98 Relation pg_database_rel;
99 HeapTuple tuple;
100 Datum new_record[Natts_pg_database];
101 char new_record_nulls[Natts_pg_database];
102 Oid dboid;
103 Oid datdba;
104 ListCell *option;
105 DefElem *dtablespacename = NULL;
106 DefElem *downer = NULL;
107 DefElem *dtemplate = NULL;
108 DefElem *dencoding = NULL;
109 DefElem *dcollate = NULL;
110 DefElem *dctype = NULL;
111 DefElem *dconnlimit = NULL;
112 char *dbname = stmt->dbname;
113 char *dbowner = NULL;
114 const char *dbtemplate = NULL;
115 char *dbcollate = NULL;
116 char *dbctype = NULL;
117 int encoding = -1;
118 int dbconnlimit = -1;
119 int ctype_encoding;
120 int collate_encoding;
121 int notherbackends;
122 int npreparedxacts;
123 createdb_failure_params fparms;
125 /* Extract options from the statement node tree */
126 foreach(option, stmt->options)
128 DefElem *defel = (DefElem *) lfirst(option);
130 if (strcmp(defel->defname, "tablespace") == 0)
132 if (dtablespacename)
133 ereport(ERROR,
134 (errcode(ERRCODE_SYNTAX_ERROR),
135 errmsg("conflicting or redundant options")));
136 dtablespacename = defel;
138 else if (strcmp(defel->defname, "owner") == 0)
140 if (downer)
141 ereport(ERROR,
142 (errcode(ERRCODE_SYNTAX_ERROR),
143 errmsg("conflicting or redundant options")));
144 downer = defel;
146 else if (strcmp(defel->defname, "template") == 0)
148 if (dtemplate)
149 ereport(ERROR,
150 (errcode(ERRCODE_SYNTAX_ERROR),
151 errmsg("conflicting or redundant options")));
152 dtemplate = defel;
154 else if (strcmp(defel->defname, "encoding") == 0)
156 if (dencoding)
157 ereport(ERROR,
158 (errcode(ERRCODE_SYNTAX_ERROR),
159 errmsg("conflicting or redundant options")));
160 dencoding = defel;
162 else if (strcmp(defel->defname, "collate") == 0)
164 if (dcollate)
165 ereport(ERROR,
166 (errcode(ERRCODE_SYNTAX_ERROR),
167 errmsg("conflicting or redundant options")));
168 dcollate = defel;
170 else if (strcmp(defel->defname, "ctype") == 0)
172 if (dctype)
173 ereport(ERROR,
174 (errcode(ERRCODE_SYNTAX_ERROR),
175 errmsg("conflicting or redundant options")));
176 dctype = defel;
178 else if (strcmp(defel->defname, "connectionlimit") == 0)
180 if (dconnlimit)
181 ereport(ERROR,
182 (errcode(ERRCODE_SYNTAX_ERROR),
183 errmsg("conflicting or redundant options")));
184 dconnlimit = defel;
186 else if (strcmp(defel->defname, "location") == 0)
188 ereport(WARNING,
189 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
190 errmsg("LOCATION is not supported anymore"),
191 errhint("Consider using tablespaces instead.")));
193 else
194 elog(ERROR, "option \"%s\" not recognized",
195 defel->defname);
198 if (downer && downer->arg)
199 dbowner = strVal(downer->arg);
200 if (dtemplate && dtemplate->arg)
201 dbtemplate = strVal(dtemplate->arg);
202 if (dencoding && dencoding->arg)
204 const char *encoding_name;
206 if (IsA(dencoding->arg, Integer))
208 encoding = intVal(dencoding->arg);
209 encoding_name = pg_encoding_to_char(encoding);
210 if (strcmp(encoding_name, "") == 0 ||
211 pg_valid_server_encoding(encoding_name) < 0)
212 ereport(ERROR,
213 (errcode(ERRCODE_UNDEFINED_OBJECT),
214 errmsg("%d is not a valid encoding code",
215 encoding)));
217 else if (IsA(dencoding->arg, String))
219 encoding_name = strVal(dencoding->arg);
220 encoding = pg_valid_server_encoding(encoding_name);
221 if (encoding < 0)
222 ereport(ERROR,
223 (errcode(ERRCODE_UNDEFINED_OBJECT),
224 errmsg("%s is not a valid encoding name",
225 encoding_name)));
227 else
228 elog(ERROR, "unrecognized node type: %d",
229 nodeTag(dencoding->arg));
231 if (dcollate && dcollate->arg)
232 dbcollate = strVal(dcollate->arg);
233 if (dctype && dctype->arg)
234 dbctype = strVal(dctype->arg);
236 if (dconnlimit && dconnlimit->arg)
237 dbconnlimit = intVal(dconnlimit->arg);
239 /* obtain OID of proposed owner */
240 if (dbowner)
241 datdba = get_roleid_checked(dbowner);
242 else
243 datdba = GetUserId();
246 * To create a database, must have createdb privilege and must be able to
247 * become the target role (this does not imply that the target role itself
248 * must have createdb privilege). The latter provision guards against
249 * "giveaway" attacks. Note that a superuser will always have both of
250 * these privileges a fortiori.
252 if (!have_createdb_privilege())
253 ereport(ERROR,
254 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
255 errmsg("permission denied to create database")));
257 check_is_member_of_role(GetUserId(), datdba);
260 * Lookup database (template) to be cloned, and obtain share lock on it.
261 * ShareLock allows two CREATE DATABASEs to work from the same template
262 * concurrently, while ensuring no one is busy dropping it in parallel
263 * (which would be Very Bad since we'd likely get an incomplete copy
264 * without knowing it). This also prevents any new connections from being
265 * made to the source until we finish copying it, so we can be sure it
266 * won't change underneath us.
268 if (!dbtemplate)
269 dbtemplate = "template1"; /* Default template database name */
271 if (!get_db_info(dbtemplate, ShareLock,
272 &src_dboid, &src_owner, &src_encoding,
273 &src_istemplate, &src_allowconn, &src_lastsysoid,
274 &src_frozenxid, &src_deftablespace,
275 &src_collate, &src_ctype))
276 ereport(ERROR,
277 (errcode(ERRCODE_UNDEFINED_DATABASE),
278 errmsg("template database \"%s\" does not exist",
279 dbtemplate)));
282 * Permission check: to copy a DB that's not marked datistemplate, you
283 * must be superuser or the owner thereof.
285 if (!src_istemplate)
287 if (!pg_database_ownercheck(src_dboid, GetUserId()))
288 ereport(ERROR,
289 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
290 errmsg("permission denied to copy database \"%s\"",
291 dbtemplate)));
294 /* If encoding or locales are defaulted, use source's setting */
295 if (encoding < 0)
296 encoding = src_encoding;
297 if (dbcollate == NULL)
298 dbcollate = src_collate;
299 if (dbctype == NULL)
300 dbctype = src_ctype;
302 /* Some encodings are client only */
303 if (!PG_VALID_BE_ENCODING(encoding))
304 ereport(ERROR,
305 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
306 errmsg("invalid server encoding %d", encoding)));
308 /* Check that the chosen locales are valid */
309 if (!check_locale(LC_COLLATE, dbcollate))
310 ereport(ERROR,
311 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
312 errmsg("invalid locale name %s", dbcollate)));
313 if (!check_locale(LC_CTYPE, dbctype))
314 ereport(ERROR,
315 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
316 errmsg("invalid locale name %s", dbctype)));
319 * Check whether encoding matches server locale settings. We allow
320 * mismatch in three cases:
322 * 1. ctype_encoding = SQL_ASCII, which means either that the locale is
323 * C/POSIX which works with any encoding, or that we couldn't determine
324 * the locale's encoding and have to trust the user to get it right.
326 * 2. selected encoding is SQL_ASCII, but only if you're a superuser. This
327 * is risky but we have historically allowed it --- notably, the
328 * regression tests require it.
330 * 3. selected encoding is UTF8 and platform is win32. This is because
331 * UTF8 is a pseudo codepage that is supported in all locales since it's
332 * converted to UTF16 before being used.
334 * Note: if you change this policy, fix initdb to match.
336 ctype_encoding = pg_get_encoding_from_locale(dbctype);
337 collate_encoding = pg_get_encoding_from_locale(dbcollate);
339 if (!(ctype_encoding == encoding ||
340 ctype_encoding == PG_SQL_ASCII ||
341 #ifdef WIN32
342 encoding == PG_UTF8 ||
343 #endif
344 (encoding == PG_SQL_ASCII && superuser())))
345 ereport(ERROR,
346 (errmsg("encoding %s does not match locale %s",
347 pg_encoding_to_char(encoding),
348 dbctype),
349 errdetail("The chosen CTYPE setting requires encoding %s.",
350 pg_encoding_to_char(ctype_encoding))));
352 if (!(collate_encoding == encoding ||
353 collate_encoding == PG_SQL_ASCII ||
354 #ifdef WIN32
355 encoding == PG_UTF8 ||
356 #endif
357 (encoding == PG_SQL_ASCII && superuser())))
358 ereport(ERROR,
359 (errmsg("encoding %s does not match locale %s",
360 pg_encoding_to_char(encoding),
361 dbcollate),
362 errdetail("The chosen COLLATE setting requires encoding %s.",
363 pg_encoding_to_char(collate_encoding))));
366 * Check that the new locale is compatible with the source database.
368 * We know that template0 doesn't contain any indexes that depend on
369 * collation or ctype, so template0 can be used as template for
370 * any locale.
372 if (strcmp(dbtemplate, "template0") != 0)
374 if (strcmp(dbcollate, src_collate))
375 ereport(ERROR,
376 (errmsg("new collation is incompatible with the collation of the template database (%s)", src_collate),
377 errhint("Use the same collation as in the template database, or use template0 as template")));
379 if (strcmp(dbctype, src_ctype))
380 ereport(ERROR,
381 (errmsg("new ctype is incompatible with the ctype of the template database (%s)", src_ctype),
382 errhint("Use the same ctype as in the template database, or use template0 as template")));
385 /* Resolve default tablespace for new database */
386 if (dtablespacename && dtablespacename->arg)
388 char *tablespacename;
389 AclResult aclresult;
391 tablespacename = strVal(dtablespacename->arg);
392 dst_deftablespace = get_tablespace_oid(tablespacename);
393 if (!OidIsValid(dst_deftablespace))
394 ereport(ERROR,
395 (errcode(ERRCODE_UNDEFINED_OBJECT),
396 errmsg("tablespace \"%s\" does not exist",
397 tablespacename)));
398 /* check permissions */
399 aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
400 ACL_CREATE);
401 if (aclresult != ACLCHECK_OK)
402 aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
403 tablespacename);
405 /* pg_global must never be the default tablespace */
406 if (dst_deftablespace == GLOBALTABLESPACE_OID)
407 ereport(ERROR,
408 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
409 errmsg("pg_global cannot be used as default tablespace")));
412 * If we are trying to change the default tablespace of the template,
413 * we require that the template not have any files in the new default
414 * tablespace. This is necessary because otherwise the copied
415 * database would contain pg_class rows that refer to its default
416 * tablespace both explicitly (by OID) and implicitly (as zero), which
417 * would cause problems. For example another CREATE DATABASE using
418 * the copied database as template, and trying to change its default
419 * tablespace again, would yield outright incorrect results (it would
420 * improperly move tables to the new default tablespace that should
421 * stay in the same tablespace).
423 if (dst_deftablespace != src_deftablespace)
425 char *srcpath;
426 struct stat st;
428 srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
430 if (stat(srcpath, &st) == 0 &&
431 S_ISDIR(st.st_mode) &&
432 !directory_is_empty(srcpath))
433 ereport(ERROR,
434 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
435 errmsg("cannot assign new default tablespace \"%s\"",
436 tablespacename),
437 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
438 dbtemplate)));
439 pfree(srcpath);
442 else
444 /* Use template database's default tablespace */
445 dst_deftablespace = src_deftablespace;
446 /* Note there is no additional permission check in this path */
450 * Check for db name conflict. This is just to give a more friendly error
451 * message than "unique index violation". There's a race condition but
452 * we're willing to accept the less friendly message in that case.
454 if (OidIsValid(get_database_oid(dbname)))
455 ereport(ERROR,
456 (errcode(ERRCODE_DUPLICATE_DATABASE),
457 errmsg("database \"%s\" already exists", dbname)));
460 * The source DB can't have any active backends, except this one
461 * (exception is to allow CREATE DB while connected to template1).
462 * Otherwise we might copy inconsistent data.
464 * This should be last among the basic error checks, because it involves
465 * potential waiting; we may as well throw an error first if we're gonna
466 * throw one.
468 if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
469 ereport(ERROR,
470 (errcode(ERRCODE_OBJECT_IN_USE),
471 errmsg("source database \"%s\" is being accessed by other users",
472 dbtemplate),
473 errdetail_busy_db(notherbackends, npreparedxacts)));
476 * Select an OID for the new database, checking that it doesn't have a
477 * filename conflict with anything already existing in the tablespace
478 * directories.
480 pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
484 dboid = GetNewOid(pg_database_rel);
485 } while (check_db_file_conflict(dboid));
488 * Insert a new tuple into pg_database. This establishes our ownership of
489 * the new database name (anyone else trying to insert the same name will
490 * block on the unique index, and fail after we commit).
493 /* Form tuple */
494 MemSet(new_record, 0, sizeof(new_record));
495 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
497 new_record[Anum_pg_database_datname - 1] =
498 DirectFunctionCall1(namein, CStringGetDatum(dbname));
499 new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
500 new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
501 new_record[Anum_pg_database_datcollate - 1] =
502 DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
503 new_record[Anum_pg_database_datctype - 1] =
504 DirectFunctionCall1(namein, CStringGetDatum(dbctype));
505 new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false);
506 new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true);
507 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
508 new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
509 new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
510 new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
513 * We deliberately set datconfig and datacl to defaults (NULL), rather
514 * than copying them from the template database. Copying datacl would be
515 * a bad idea when the owner is not the same as the template's owner. It's
516 * more debatable whether datconfig should be copied.
518 new_record_nulls[Anum_pg_database_datconfig - 1] = 'n';
519 new_record_nulls[Anum_pg_database_datacl - 1] = 'n';
521 tuple = heap_formtuple(RelationGetDescr(pg_database_rel),
522 new_record, new_record_nulls);
524 HeapTupleSetOid(tuple, dboid);
526 simple_heap_insert(pg_database_rel, tuple);
528 /* Update indexes */
529 CatalogUpdateIndexes(pg_database_rel, tuple);
532 * Now generate additional catalog entries associated with the new DB
535 /* Register owner dependency */
536 recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
538 /* Create pg_shdepend entries for objects within database */
539 copyTemplateDependencies(src_dboid, dboid);
542 * Force a checkpoint before starting the copy. This will force dirty
543 * buffers out to disk, to ensure source database is up-to-date on disk
544 * for the copy. FlushDatabaseBuffers() would suffice for that, but we
545 * also want to process any pending unlink requests. Otherwise, if a
546 * checkpoint happened while we're copying files, a file might be deleted
547 * just when we're about to copy it, causing the lstat() call in copydir()
548 * to fail with ENOENT.
550 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
553 * Once we start copying subdirectories, we need to be able to clean 'em
554 * up if we fail. Use an ENSURE block to make sure this happens. (This
555 * is not a 100% solution, because of the possibility of failure during
556 * transaction commit after we leave this routine, but it should handle
557 * most scenarios.)
559 fparms.src_dboid = src_dboid;
560 fparms.dest_dboid = dboid;
561 PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
562 PointerGetDatum(&fparms));
565 * Iterate through all tablespaces of the template database, and copy
566 * each one to the new database.
568 rel = heap_open(TableSpaceRelationId, AccessShareLock);
569 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
570 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
572 Oid srctablespace = HeapTupleGetOid(tuple);
573 Oid dsttablespace;
574 char *srcpath;
575 char *dstpath;
576 struct stat st;
578 /* No need to copy global tablespace */
579 if (srctablespace == GLOBALTABLESPACE_OID)
580 continue;
582 srcpath = GetDatabasePath(src_dboid, srctablespace);
584 if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
585 directory_is_empty(srcpath))
587 /* Assume we can ignore it */
588 pfree(srcpath);
589 continue;
592 if (srctablespace == src_deftablespace)
593 dsttablespace = dst_deftablespace;
594 else
595 dsttablespace = srctablespace;
597 dstpath = GetDatabasePath(dboid, dsttablespace);
600 * Copy this subdirectory to the new location
602 * We don't need to copy subdirectories
604 copydir(srcpath, dstpath, false);
606 /* Record the filesystem change in XLOG */
608 xl_dbase_create_rec xlrec;
609 XLogRecData rdata[1];
611 xlrec.db_id = dboid;
612 xlrec.tablespace_id = dsttablespace;
613 xlrec.src_db_id = src_dboid;
614 xlrec.src_tablespace_id = srctablespace;
616 rdata[0].data = (char *) &xlrec;
617 rdata[0].len = sizeof(xl_dbase_create_rec);
618 rdata[0].buffer = InvalidBuffer;
619 rdata[0].next = NULL;
621 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE, rdata);
624 heap_endscan(scan);
625 heap_close(rel, AccessShareLock);
628 * We force a checkpoint before committing. This effectively means
629 * that committed XLOG_DBASE_CREATE operations will never need to be
630 * replayed (at least not in ordinary crash recovery; we still have to
631 * make the XLOG entry for the benefit of PITR operations). This
632 * avoids two nasty scenarios:
634 * #1: When PITR is off, we don't XLOG the contents of newly created
635 * indexes; therefore the drop-and-recreate-whole-directory behavior
636 * of DBASE_CREATE replay would lose such indexes.
638 * #2: Since we have to recopy the source database during DBASE_CREATE
639 * replay, we run the risk of copying changes in it that were
640 * committed after the original CREATE DATABASE command but before the
641 * system crash that led to the replay. This is at least unexpected
642 * and at worst could lead to inconsistencies, eg duplicate table
643 * names.
645 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
647 * In PITR replay, the first of these isn't an issue, and the second
648 * is only a risk if the CREATE DATABASE and subsequent template
649 * database change both occur while a base backup is being taken.
650 * There doesn't seem to be much we can do about that except document
651 * it as a limitation.
653 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
654 * we can avoid this.
656 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
659 * Close pg_database, but keep lock till commit (this is important to
660 * prevent any risk of deadlock failure while updating flat file)
662 heap_close(pg_database_rel, NoLock);
665 * Set flag to update flat database file at commit. Note: this also
666 * forces synchronous commit, which minimizes the window between
667 * creation of the database files and commital of the transaction. If
668 * we crash before committing, we'll have a DB that's taking up disk
669 * space but is not in pg_database, which is not good.
671 database_file_update_needed();
673 PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
674 PointerGetDatum(&fparms));
677 /* Error cleanup callback for createdb */
678 static void
679 createdb_failure_callback(int code, Datum arg)
681 createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
684 * Release lock on source database before doing recursive remove.
685 * This is not essential but it seems desirable to release the lock
686 * as soon as possible.
688 UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
690 /* Throw away any successfully copied subdirectories */
691 remove_dbtablespaces(fparms->dest_dboid);
696 * DROP DATABASE
698 void
699 dropdb(const char *dbname, bool missing_ok)
701 Oid db_id;
702 bool db_istemplate;
703 Relation pgdbrel;
704 HeapTuple tup;
705 int notherbackends;
706 int npreparedxacts;
709 * Look up the target database's OID, and get exclusive lock on it. We
710 * need this to ensure that no new backend starts up in the target
711 * database while we are deleting it (see postinit.c), and that no one is
712 * using it as a CREATE DATABASE template or trying to delete it for
713 * themselves.
715 pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
717 if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
718 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
720 if (!missing_ok)
722 ereport(ERROR,
723 (errcode(ERRCODE_UNDEFINED_DATABASE),
724 errmsg("database \"%s\" does not exist", dbname)));
726 else
728 /* Close pg_database, release the lock, since we changed nothing */
729 heap_close(pgdbrel, RowExclusiveLock);
730 ereport(NOTICE,
731 (errmsg("database \"%s\" does not exist, skipping",
732 dbname)));
733 return;
738 * Permission checks
740 if (!pg_database_ownercheck(db_id, GetUserId()))
741 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
742 dbname);
745 * Disallow dropping a DB that is marked istemplate. This is just to
746 * prevent people from accidentally dropping template0 or template1; they
747 * can do so if they're really determined ...
749 if (db_istemplate)
750 ereport(ERROR,
751 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
752 errmsg("cannot drop a template database")));
754 /* Obviously can't drop my own database */
755 if (db_id == MyDatabaseId)
756 ereport(ERROR,
757 (errcode(ERRCODE_OBJECT_IN_USE),
758 errmsg("cannot drop the currently open database")));
761 * Check for other backends in the target database. (Because we hold the
762 * database lock, no new ones can start after this.)
764 * As in CREATE DATABASE, check this after other error conditions.
766 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
767 ereport(ERROR,
768 (errcode(ERRCODE_OBJECT_IN_USE),
769 errmsg("database \"%s\" is being accessed by other users",
770 dbname),
771 errdetail_busy_db(notherbackends, npreparedxacts)));
774 * Remove the database's tuple from pg_database.
776 tup = SearchSysCache(DATABASEOID,
777 ObjectIdGetDatum(db_id),
778 0, 0, 0);
779 if (!HeapTupleIsValid(tup))
780 elog(ERROR, "cache lookup failed for database %u", db_id);
782 simple_heap_delete(pgdbrel, &tup->t_self);
784 ReleaseSysCache(tup);
787 * Delete any comments associated with the database.
789 DeleteSharedComments(db_id, DatabaseRelationId);
792 * Remove shared dependency references for the database.
794 dropDatabaseDependencies(db_id);
797 * Drop pages for this database that are in the shared buffer cache. This
798 * is important to ensure that no remaining backend tries to write out a
799 * dirty buffer to the dead database later...
801 DropDatabaseBuffers(db_id);
804 * Tell the stats collector to forget it immediately, too.
806 pgstat_drop_database(db_id);
809 * Tell bgwriter to forget any pending fsync and unlink requests for files
810 * in the database; else the fsyncs will fail at next checkpoint, or worse,
811 * it will delete files that belong to a newly created database with the
812 * same OID.
814 ForgetDatabaseFsyncRequests(db_id);
817 * Force a checkpoint to make sure the bgwriter has received the message
818 * sent by ForgetDatabaseFsyncRequests. On Windows, this also ensures that
819 * the bgwriter doesn't hold any open files, which would cause rmdir() to
820 * fail.
822 RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
825 * Remove all tablespace subdirs belonging to the database.
827 remove_dbtablespaces(db_id);
830 * Close pg_database, but keep lock till commit (this is important to
831 * prevent any risk of deadlock failure while updating flat file)
833 heap_close(pgdbrel, NoLock);
836 * Set flag to update flat database file at commit. Note: this also
837 * forces synchronous commit, which minimizes the window between removal
838 * of the database files and commital of the transaction. If we crash
839 * before committing, we'll have a DB that's gone on disk but still there
840 * according to pg_database, which is not good.
842 database_file_update_needed();
847 * Rename database
849 void
850 RenameDatabase(const char *oldname, const char *newname)
852 Oid db_id;
853 HeapTuple newtup;
854 Relation rel;
855 int notherbackends;
856 int npreparedxacts;
859 * Look up the target database's OID, and get exclusive lock on it. We
860 * need this for the same reasons as DROP DATABASE.
862 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
864 if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
865 NULL, NULL, NULL, NULL, NULL, NULL, NULL))
866 ereport(ERROR,
867 (errcode(ERRCODE_UNDEFINED_DATABASE),
868 errmsg("database \"%s\" does not exist", oldname)));
870 /* must be owner */
871 if (!pg_database_ownercheck(db_id, GetUserId()))
872 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
873 oldname);
875 /* must have createdb rights */
876 if (!have_createdb_privilege())
877 ereport(ERROR,
878 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
879 errmsg("permission denied to rename database")));
882 * Make sure the new name doesn't exist. See notes for same error in
883 * CREATE DATABASE.
885 if (OidIsValid(get_database_oid(newname)))
886 ereport(ERROR,
887 (errcode(ERRCODE_DUPLICATE_DATABASE),
888 errmsg("database \"%s\" already exists", newname)));
891 * XXX Client applications probably store the current database somewhere,
892 * so renaming it could cause confusion. On the other hand, there may not
893 * be an actual problem besides a little confusion, so think about this
894 * and decide.
896 if (db_id == MyDatabaseId)
897 ereport(ERROR,
898 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
899 errmsg("current database cannot be renamed")));
902 * Make sure the database does not have active sessions. This is the same
903 * concern as above, but applied to other sessions.
905 * As in CREATE DATABASE, check this after other error conditions.
907 if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
908 ereport(ERROR,
909 (errcode(ERRCODE_OBJECT_IN_USE),
910 errmsg("database \"%s\" is being accessed by other users",
911 oldname),
912 errdetail_busy_db(notherbackends, npreparedxacts)));
914 /* rename */
915 newtup = SearchSysCacheCopy(DATABASEOID,
916 ObjectIdGetDatum(db_id),
917 0, 0, 0);
918 if (!HeapTupleIsValid(newtup))
919 elog(ERROR, "cache lookup failed for database %u", db_id);
920 namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
921 simple_heap_update(rel, &newtup->t_self, newtup);
922 CatalogUpdateIndexes(rel, newtup);
925 * Close pg_database, but keep lock till commit (this is important to
926 * prevent any risk of deadlock failure while updating flat file)
928 heap_close(rel, NoLock);
931 * Set flag to update flat database file at commit.
933 database_file_update_needed();
938 * ALTER DATABASE name ...
940 void
941 AlterDatabase(AlterDatabaseStmt *stmt)
943 Relation rel;
944 HeapTuple tuple,
945 newtuple;
946 ScanKeyData scankey;
947 SysScanDesc scan;
948 ListCell *option;
949 int connlimit = -1;
950 DefElem *dconnlimit = NULL;
951 Datum new_record[Natts_pg_database];
952 char new_record_nulls[Natts_pg_database];
953 char new_record_repl[Natts_pg_database];
955 /* Extract options from the statement node tree */
956 foreach(option, stmt->options)
958 DefElem *defel = (DefElem *) lfirst(option);
960 if (strcmp(defel->defname, "connectionlimit") == 0)
962 if (dconnlimit)
963 ereport(ERROR,
964 (errcode(ERRCODE_SYNTAX_ERROR),
965 errmsg("conflicting or redundant options")));
966 dconnlimit = defel;
968 else
969 elog(ERROR, "option \"%s\" not recognized",
970 defel->defname);
973 if (dconnlimit)
974 connlimit = intVal(dconnlimit->arg);
977 * Get the old tuple. We don't need a lock on the database per se,
978 * because we're not going to do anything that would mess up incoming
979 * connections.
981 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
982 ScanKeyInit(&scankey,
983 Anum_pg_database_datname,
984 BTEqualStrategyNumber, F_NAMEEQ,
985 NameGetDatum(stmt->dbname));
986 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
987 SnapshotNow, 1, &scankey);
988 tuple = systable_getnext(scan);
989 if (!HeapTupleIsValid(tuple))
990 ereport(ERROR,
991 (errcode(ERRCODE_UNDEFINED_DATABASE),
992 errmsg("database \"%s\" does not exist", stmt->dbname)));
994 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
995 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
996 stmt->dbname);
999 * Build an updated tuple, perusing the information just obtained
1001 MemSet(new_record, 0, sizeof(new_record));
1002 MemSet(new_record_nulls, ' ', sizeof(new_record_nulls));
1003 MemSet(new_record_repl, ' ', sizeof(new_record_repl));
1005 if (dconnlimit)
1007 new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(connlimit);
1008 new_record_repl[Anum_pg_database_datconnlimit - 1] = 'r';
1011 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), new_record,
1012 new_record_nulls, new_record_repl);
1013 simple_heap_update(rel, &tuple->t_self, newtuple);
1015 /* Update indexes */
1016 CatalogUpdateIndexes(rel, newtuple);
1018 systable_endscan(scan);
1020 /* Close pg_database, but keep lock till commit */
1021 heap_close(rel, NoLock);
1024 * We don't bother updating the flat file since the existing options for
1025 * ALTER DATABASE don't affect it.
1031 * ALTER DATABASE name SET ...
1033 void
1034 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1036 char *valuestr;
1037 HeapTuple tuple,
1038 newtuple;
1039 Relation rel;
1040 ScanKeyData scankey;
1041 SysScanDesc scan;
1042 Datum repl_val[Natts_pg_database];
1043 char repl_null[Natts_pg_database];
1044 char repl_repl[Natts_pg_database];
1046 valuestr = ExtractSetVariableArgs(stmt->setstmt);
1049 * Get the old tuple. We don't need a lock on the database per se,
1050 * because we're not going to do anything that would mess up incoming
1051 * connections.
1053 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1054 ScanKeyInit(&scankey,
1055 Anum_pg_database_datname,
1056 BTEqualStrategyNumber, F_NAMEEQ,
1057 NameGetDatum(stmt->dbname));
1058 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1059 SnapshotNow, 1, &scankey);
1060 tuple = systable_getnext(scan);
1061 if (!HeapTupleIsValid(tuple))
1062 ereport(ERROR,
1063 (errcode(ERRCODE_UNDEFINED_DATABASE),
1064 errmsg("database \"%s\" does not exist", stmt->dbname)));
1066 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1067 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1068 stmt->dbname);
1070 memset(repl_repl, ' ', sizeof(repl_repl));
1071 repl_repl[Anum_pg_database_datconfig - 1] = 'r';
1073 if (stmt->setstmt->kind == VAR_RESET_ALL)
1075 /* RESET ALL, so just set datconfig to null */
1076 repl_null[Anum_pg_database_datconfig - 1] = 'n';
1077 repl_val[Anum_pg_database_datconfig - 1] = (Datum) 0;
1079 else
1081 Datum datum;
1082 bool isnull;
1083 ArrayType *a;
1085 repl_null[Anum_pg_database_datconfig - 1] = ' ';
1087 /* Extract old value of datconfig */
1088 datum = heap_getattr(tuple, Anum_pg_database_datconfig,
1089 RelationGetDescr(rel), &isnull);
1090 a = isnull ? NULL : DatumGetArrayTypeP(datum);
1092 /* Update (valuestr is NULL in RESET cases) */
1093 if (valuestr)
1094 a = GUCArrayAdd(a, stmt->setstmt->name, valuestr);
1095 else
1096 a = GUCArrayDelete(a, stmt->setstmt->name);
1098 if (a)
1099 repl_val[Anum_pg_database_datconfig - 1] = PointerGetDatum(a);
1100 else
1101 repl_null[Anum_pg_database_datconfig - 1] = 'n';
1104 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel),
1105 repl_val, repl_null, repl_repl);
1106 simple_heap_update(rel, &tuple->t_self, newtuple);
1108 /* Update indexes */
1109 CatalogUpdateIndexes(rel, newtuple);
1111 systable_endscan(scan);
1113 /* Close pg_database, but keep lock till commit */
1114 heap_close(rel, NoLock);
1117 * We don't bother updating the flat file since ALTER DATABASE SET doesn't
1118 * affect it.
1124 * ALTER DATABASE name OWNER TO newowner
1126 void
1127 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1129 HeapTuple tuple;
1130 Relation rel;
1131 ScanKeyData scankey;
1132 SysScanDesc scan;
1133 Form_pg_database datForm;
1136 * Get the old tuple. We don't need a lock on the database per se,
1137 * because we're not going to do anything that would mess up incoming
1138 * connections.
1140 rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1141 ScanKeyInit(&scankey,
1142 Anum_pg_database_datname,
1143 BTEqualStrategyNumber, F_NAMEEQ,
1144 NameGetDatum(dbname));
1145 scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1146 SnapshotNow, 1, &scankey);
1147 tuple = systable_getnext(scan);
1148 if (!HeapTupleIsValid(tuple))
1149 ereport(ERROR,
1150 (errcode(ERRCODE_UNDEFINED_DATABASE),
1151 errmsg("database \"%s\" does not exist", dbname)));
1153 datForm = (Form_pg_database) GETSTRUCT(tuple);
1156 * If the new owner is the same as the existing owner, consider the
1157 * command to have succeeded. This is to be consistent with other
1158 * objects.
1160 if (datForm->datdba != newOwnerId)
1162 Datum repl_val[Natts_pg_database];
1163 char repl_null[Natts_pg_database];
1164 char repl_repl[Natts_pg_database];
1165 Acl *newAcl;
1166 Datum aclDatum;
1167 bool isNull;
1168 HeapTuple newtuple;
1170 /* Otherwise, must be owner of the existing object */
1171 if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1172 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1173 dbname);
1175 /* Must be able to become new owner */
1176 check_is_member_of_role(GetUserId(), newOwnerId);
1179 * must have createdb rights
1181 * NOTE: This is different from other alter-owner checks in that the
1182 * current user is checked for createdb privileges instead of the
1183 * destination owner. This is consistent with the CREATE case for
1184 * databases. Because superusers will always have this right, we need
1185 * no special case for them.
1187 if (!have_createdb_privilege())
1188 ereport(ERROR,
1189 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1190 errmsg("permission denied to change owner of database")));
1192 memset(repl_null, ' ', sizeof(repl_null));
1193 memset(repl_repl, ' ', sizeof(repl_repl));
1195 repl_repl[Anum_pg_database_datdba - 1] = 'r';
1196 repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1199 * Determine the modified ACL for the new owner. This is only
1200 * necessary when the ACL is non-null.
1202 aclDatum = heap_getattr(tuple,
1203 Anum_pg_database_datacl,
1204 RelationGetDescr(rel),
1205 &isNull);
1206 if (!isNull)
1208 newAcl = aclnewowner(DatumGetAclP(aclDatum),
1209 datForm->datdba, newOwnerId);
1210 repl_repl[Anum_pg_database_datacl - 1] = 'r';
1211 repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1214 newtuple = heap_modifytuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1215 simple_heap_update(rel, &newtuple->t_self, newtuple);
1216 CatalogUpdateIndexes(rel, newtuple);
1218 heap_freetuple(newtuple);
1220 /* Update owner dependency reference */
1221 changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1222 newOwnerId);
1225 systable_endscan(scan);
1227 /* Close pg_database, but keep lock till commit */
1228 heap_close(rel, NoLock);
1231 * We don't bother updating the flat file since ALTER DATABASE OWNER
1232 * doesn't affect it.
1238 * Helper functions
1242 * Look up info about the database named "name". If the database exists,
1243 * obtain the specified lock type on it, fill in any of the remaining
1244 * parameters that aren't NULL, and return TRUE. If no such database,
1245 * return FALSE.
1247 static bool
1248 get_db_info(const char *name, LOCKMODE lockmode,
1249 Oid *dbIdP, Oid *ownerIdP,
1250 int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1251 Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1252 Oid *dbTablespace, char **dbCollate, char **dbCtype)
1254 bool result = false;
1255 Relation relation;
1257 AssertArg(name);
1259 /* Caller may wish to grab a better lock on pg_database beforehand... */
1260 relation = heap_open(DatabaseRelationId, AccessShareLock);
1263 * Loop covers the rare case where the database is renamed before we can
1264 * lock it. We try again just in case we can find a new one of the same
1265 * name.
1267 for (;;)
1269 ScanKeyData scanKey;
1270 SysScanDesc scan;
1271 HeapTuple tuple;
1272 Oid dbOid;
1275 * there's no syscache for database-indexed-by-name, so must do it the
1276 * hard way
1278 ScanKeyInit(&scanKey,
1279 Anum_pg_database_datname,
1280 BTEqualStrategyNumber, F_NAMEEQ,
1281 NameGetDatum(name));
1283 scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1284 SnapshotNow, 1, &scanKey);
1286 tuple = systable_getnext(scan);
1288 if (!HeapTupleIsValid(tuple))
1290 /* definitely no database of that name */
1291 systable_endscan(scan);
1292 break;
1295 dbOid = HeapTupleGetOid(tuple);
1297 systable_endscan(scan);
1300 * Now that we have a database OID, we can try to lock the DB.
1302 if (lockmode != NoLock)
1303 LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1306 * And now, re-fetch the tuple by OID. If it's still there and still
1307 * the same name, we win; else, drop the lock and loop back to try
1308 * again.
1310 tuple = SearchSysCache(DATABASEOID,
1311 ObjectIdGetDatum(dbOid),
1312 0, 0, 0);
1313 if (HeapTupleIsValid(tuple))
1315 Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1317 if (strcmp(name, NameStr(dbform->datname)) == 0)
1319 /* oid of the database */
1320 if (dbIdP)
1321 *dbIdP = dbOid;
1322 /* oid of the owner */
1323 if (ownerIdP)
1324 *ownerIdP = dbform->datdba;
1325 /* character encoding */
1326 if (encodingP)
1327 *encodingP = dbform->encoding;
1328 /* allowed as template? */
1329 if (dbIsTemplateP)
1330 *dbIsTemplateP = dbform->datistemplate;
1331 /* allowing connections? */
1332 if (dbAllowConnP)
1333 *dbAllowConnP = dbform->datallowconn;
1334 /* last system OID used in database */
1335 if (dbLastSysOidP)
1336 *dbLastSysOidP = dbform->datlastsysoid;
1337 /* limit of frozen XIDs */
1338 if (dbFrozenXidP)
1339 *dbFrozenXidP = dbform->datfrozenxid;
1340 /* default tablespace for this database */
1341 if (dbTablespace)
1342 *dbTablespace = dbform->dattablespace;
1343 /* default locale settings for this database */
1344 if (dbCollate)
1345 *dbCollate = pstrdup(NameStr(dbform->datcollate));
1346 if (dbCtype)
1347 *dbCtype = pstrdup(NameStr(dbform->datctype));
1348 ReleaseSysCache(tuple);
1349 result = true;
1350 break;
1352 /* can only get here if it was just renamed */
1353 ReleaseSysCache(tuple);
1356 if (lockmode != NoLock)
1357 UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1360 heap_close(relation, AccessShareLock);
1362 return result;
1365 /* Check if current user has createdb privileges */
1366 static bool
1367 have_createdb_privilege(void)
1369 bool result = false;
1370 HeapTuple utup;
1372 /* Superusers can always do everything */
1373 if (superuser())
1374 return true;
1376 utup = SearchSysCache(AUTHOID,
1377 ObjectIdGetDatum(GetUserId()),
1378 0, 0, 0);
1379 if (HeapTupleIsValid(utup))
1381 result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1382 ReleaseSysCache(utup);
1384 return result;
1388 * Remove tablespace directories
1390 * We don't know what tablespaces db_id is using, so iterate through all
1391 * tablespaces removing <tablespace>/db_id
1393 static void
1394 remove_dbtablespaces(Oid db_id)
1396 Relation rel;
1397 HeapScanDesc scan;
1398 HeapTuple tuple;
1400 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1401 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1402 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1404 Oid dsttablespace = HeapTupleGetOid(tuple);
1405 char *dstpath;
1406 struct stat st;
1408 /* Don't mess with the global tablespace */
1409 if (dsttablespace == GLOBALTABLESPACE_OID)
1410 continue;
1412 dstpath = GetDatabasePath(db_id, dsttablespace);
1414 if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1416 /* Assume we can ignore it */
1417 pfree(dstpath);
1418 continue;
1421 if (!rmtree(dstpath, true))
1422 ereport(WARNING,
1423 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1424 dstpath)));
1426 /* Record the filesystem change in XLOG */
1428 xl_dbase_drop_rec xlrec;
1429 XLogRecData rdata[1];
1431 xlrec.db_id = db_id;
1432 xlrec.tablespace_id = dsttablespace;
1434 rdata[0].data = (char *) &xlrec;
1435 rdata[0].len = sizeof(xl_dbase_drop_rec);
1436 rdata[0].buffer = InvalidBuffer;
1437 rdata[0].next = NULL;
1439 (void) XLogInsert(RM_DBASE_ID, XLOG_DBASE_DROP, rdata);
1442 pfree(dstpath);
1445 heap_endscan(scan);
1446 heap_close(rel, AccessShareLock);
1450 * Check for existing files that conflict with a proposed new DB OID;
1451 * return TRUE if there are any
1453 * If there were a subdirectory in any tablespace matching the proposed new
1454 * OID, we'd get a create failure due to the duplicate name ... and then we'd
1455 * try to remove that already-existing subdirectory during the cleanup in
1456 * remove_dbtablespaces. Nuking existing files seems like a bad idea, so
1457 * instead we make this extra check before settling on the OID of the new
1458 * database. This exactly parallels what GetNewRelFileNode() does for table
1459 * relfilenode values.
1461 static bool
1462 check_db_file_conflict(Oid db_id)
1464 bool result = false;
1465 Relation rel;
1466 HeapScanDesc scan;
1467 HeapTuple tuple;
1469 rel = heap_open(TableSpaceRelationId, AccessShareLock);
1470 scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
1471 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1473 Oid dsttablespace = HeapTupleGetOid(tuple);
1474 char *dstpath;
1475 struct stat st;
1477 /* Don't mess with the global tablespace */
1478 if (dsttablespace == GLOBALTABLESPACE_OID)
1479 continue;
1481 dstpath = GetDatabasePath(db_id, dsttablespace);
1483 if (lstat(dstpath, &st) == 0)
1485 /* Found a conflicting file (or directory, whatever) */
1486 pfree(dstpath);
1487 result = true;
1488 break;
1491 pfree(dstpath);
1494 heap_endscan(scan);
1495 heap_close(rel, AccessShareLock);
1496 return result;
/*
 * errdetail_busy_db
 *		Emit an errdetail describing what is keeping a database busy.
 *
 * notherbackends: number of other sessions using the database.
 * npreparedxacts: number of prepared transactions using the database.
 *
 * Always returns 0; the value exists only so the call can be used inside an
 * ereport() argument list.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	/*
	 * Singular versus plural is not distinguished, because the English rules
	 * for that translate poorly.  What can be done is to avoid mentioning a
	 * category whose count is zero.
	 */
	if (notherbackends <= 0)
		errdetail("There are %d prepared transaction(s) using the database.",
				  npreparedxacts);
	else if (npreparedxacts > 0)
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	else
		errdetail("There are %d other session(s) using the database.",
				  notherbackends);

	return 0;					/* just to keep ereport macro happy */
}
1523 * get_database_oid - given a database name, look up the OID
1525 * Returns InvalidOid if database name not found.
1528 get_database_oid(const char *dbname)
1530 Relation pg_database;
1531 ScanKeyData entry[1];
1532 SysScanDesc scan;
1533 HeapTuple dbtuple;
1534 Oid oid;
1537 * There's no syscache for pg_database indexed by name, so we must look
1538 * the hard way.
1540 pg_database = heap_open(DatabaseRelationId, AccessShareLock);
1541 ScanKeyInit(&entry[0],
1542 Anum_pg_database_datname,
1543 BTEqualStrategyNumber, F_NAMEEQ,
1544 CStringGetDatum(dbname));
1545 scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
1546 SnapshotNow, 1, entry);
1548 dbtuple = systable_getnext(scan);
1550 /* We assume that there can be at most one matching tuple */
1551 if (HeapTupleIsValid(dbtuple))
1552 oid = HeapTupleGetOid(dbtuple);
1553 else
1554 oid = InvalidOid;
1556 systable_endscan(scan);
1557 heap_close(pg_database, AccessShareLock);
1559 return oid;
1564 * get_database_name - given a database OID, look up the name
1566 * Returns a palloc'd string, or NULL if no such database.
1568 char *
1569 get_database_name(Oid dbid)
1571 HeapTuple dbtuple;
1572 char *result;
1574 dbtuple = SearchSysCache(DATABASEOID,
1575 ObjectIdGetDatum(dbid),
1576 0, 0, 0);
1577 if (HeapTupleIsValid(dbtuple))
1579 result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
1580 ReleaseSysCache(dbtuple);
1582 else
1583 result = NULL;
1585 return result;
1589 * DATABASE resource manager's routines
1591 void
1592 dbase_redo(XLogRecPtr lsn, XLogRecord *record)
1594 uint8 info = record->xl_info & ~XLR_INFO_MASK;
1596 if (info == XLOG_DBASE_CREATE)
1598 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
1599 char *src_path;
1600 char *dst_path;
1601 struct stat st;
1603 src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
1604 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1607 * Our theory for replaying a CREATE is to forcibly drop the target
1608 * subdirectory if present, then re-copy the source data. This may be
1609 * more work than needed, but it is simple to implement.
1611 if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
1613 if (!rmtree(dst_path, true))
1614 ereport(WARNING,
1615 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1616 dst_path)));
1620 * Force dirty buffers out to disk, to ensure source database is
1621 * up-to-date for the copy.
1623 FlushDatabaseBuffers(xlrec->src_db_id);
1626 * Copy this subdirectory to the new location
1628 * We don't need to copy subdirectories
1630 copydir(src_path, dst_path, false);
1632 else if (info == XLOG_DBASE_DROP)
1634 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
1635 char *dst_path;
1637 dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
1639 /* Drop pages for this database that are in the shared buffer cache */
1640 DropDatabaseBuffers(xlrec->db_id);
1642 /* Also, clean out any fsync requests that might be pending in md.c */
1643 ForgetDatabaseFsyncRequests(xlrec->db_id);
1645 /* Clean out the xlog relcache too */
1646 XLogDropDatabase(xlrec->db_id);
1648 /* And remove the physical files */
1649 if (!rmtree(dst_path, true))
1650 ereport(WARNING,
1651 (errmsg("some useless files may be left behind in old database directory \"%s\"",
1652 dst_path)));
1654 else
1655 elog(PANIC, "dbase_redo: unknown op code %u", info);
1658 void
1659 dbase_desc(StringInfo buf, uint8 xl_info, char *rec)
1661 uint8 info = xl_info & ~XLR_INFO_MASK;
1663 if (info == XLOG_DBASE_CREATE)
1665 xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) rec;
1667 appendStringInfo(buf, "create db: copy dir %u/%u to %u/%u",
1668 xlrec->src_db_id, xlrec->src_tablespace_id,
1669 xlrec->db_id, xlrec->tablespace_id);
1671 else if (info == XLOG_DBASE_DROP)
1673 xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) rec;
1675 appendStringInfo(buf, "drop db: dir %u/%u",
1676 xlrec->db_id, xlrec->tablespace_id);
1678 else
1679 appendStringInfo(buf, "UNKNOWN");