Make nbtree split REDO locking match original execution.
[pgsql.git] / src / backend / catalog / pg_enum.c
blob27e4100a6f4d8d0c598f9adc37bb15808f041965
1 /*-------------------------------------------------------------------------
3 * pg_enum.c
4 * routines to support manipulation of the pg_enum relation
6 * Copyright (c) 2006-2020, PostgreSQL Global Development Group
9 * IDENTIFICATION
10 * src/backend/catalog/pg_enum.c
12 *-------------------------------------------------------------------------
14 #include "postgres.h"
16 #include "access/genam.h"
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/binary_upgrade.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_enum.h"
24 #include "catalog/pg_type.h"
25 #include "miscadmin.h"
26 #include "nodes/value.h"
27 #include "storage/lmgr.h"
28 #include "utils/builtins.h"
29 #include "utils/catcache.h"
30 #include "utils/fmgroids.h"
31 #include "utils/hsearch.h"
32 #include "utils/memutils.h"
33 #include "utils/syscache.h"
35 /* Potentially set by pg_upgrade_support functions */
36 Oid binary_upgrade_next_pg_enum_oid = InvalidOid;
39 * Hash table of enum value OIDs created during the current transaction by
40 * AddEnumLabel. We disallow using these values until the transaction is
41 * committed; otherwise, they might get into indexes where we can't clean
42 * them up, and then if the transaction rolls back we have a broken index.
43 * (See comments for check_safe_enum_use() in enum.c.) Values created by
44 * EnumValuesCreate are *not* blacklisted; we assume those are created during
45 * CREATE TYPE, so they can't go away unless the enum type itself does.
47 static HTAB *enum_blacklist = NULL;
49 static void RenumberEnumType(Relation pg_enum, HeapTuple *existing, int nelems);
50 static int sort_order_cmp(const void *p1, const void *p2);
54 * EnumValuesCreate
55 * Create an entry in pg_enum for each of the supplied enum values.
57 * vals is a list of Value strings.
59 void
60 EnumValuesCreate(Oid enumTypeOid, List *vals)
62 Relation pg_enum;
63 NameData enumlabel;
64 Oid *oids;
65 int elemno,
66 num_elems;
67 Datum values[Natts_pg_enum];
68 bool nulls[Natts_pg_enum];
69 ListCell *lc;
70 HeapTuple tup;
72 num_elems = list_length(vals);
75 * We do not bother to check the list of values for duplicates --- if you
76 * have any, you'll get a less-than-friendly unique-index violation. It is
77 * probably not worth trying harder.
80 pg_enum = table_open(EnumRelationId, RowExclusiveLock);
83 * Allocate OIDs for the enum's members.
85 * While this method does not absolutely guarantee that we generate no
86 * duplicate OIDs (since we haven't entered each oid into the table before
87 * allocating the next), trouble could only occur if the OID counter wraps
88 * all the way around before we finish. Which seems unlikely.
90 oids = (Oid *) palloc(num_elems * sizeof(Oid));
92 for (elemno = 0; elemno < num_elems; elemno++)
95 * We assign even-numbered OIDs to all the new enum labels. This
96 * tells the comparison functions the OIDs are in the correct sort
97 * order and can be compared directly.
99 Oid new_oid;
103 new_oid = GetNewOidWithIndex(pg_enum, EnumOidIndexId,
104 Anum_pg_enum_oid);
105 } while (new_oid & 1);
106 oids[elemno] = new_oid;
109 /* sort them, just in case OID counter wrapped from high to low */
110 qsort(oids, num_elems, sizeof(Oid), oid_cmp);
112 /* and make the entries */
113 memset(nulls, false, sizeof(nulls));
115 elemno = 0;
116 foreach(lc, vals)
118 char *lab = strVal(lfirst(lc));
121 * labels are stored in a name field, for easier syscache lookup, so
122 * check the length to make sure it's within range.
124 if (strlen(lab) > (NAMEDATALEN - 1))
125 ereport(ERROR,
126 (errcode(ERRCODE_INVALID_NAME),
127 errmsg("invalid enum label \"%s\"", lab),
128 errdetail("Labels must be %d characters or less.",
129 NAMEDATALEN - 1)));
131 values[Anum_pg_enum_oid - 1] = ObjectIdGetDatum(oids[elemno]);
132 values[Anum_pg_enum_enumtypid - 1] = ObjectIdGetDatum(enumTypeOid);
133 values[Anum_pg_enum_enumsortorder - 1] = Float4GetDatum(elemno + 1);
134 namestrcpy(&enumlabel, lab);
135 values[Anum_pg_enum_enumlabel - 1] = NameGetDatum(&enumlabel);
137 tup = heap_form_tuple(RelationGetDescr(pg_enum), values, nulls);
139 CatalogTupleInsert(pg_enum, tup);
140 heap_freetuple(tup);
142 elemno++;
145 /* clean up */
146 pfree(oids);
147 table_close(pg_enum, RowExclusiveLock);
152 * EnumValuesDelete
153 * Remove all the pg_enum entries for the specified enum type.
155 void
156 EnumValuesDelete(Oid enumTypeOid)
158 Relation pg_enum;
159 ScanKeyData key[1];
160 SysScanDesc scan;
161 HeapTuple tup;
163 pg_enum = table_open(EnumRelationId, RowExclusiveLock);
165 ScanKeyInit(&key[0],
166 Anum_pg_enum_enumtypid,
167 BTEqualStrategyNumber, F_OIDEQ,
168 ObjectIdGetDatum(enumTypeOid));
170 scan = systable_beginscan(pg_enum, EnumTypIdLabelIndexId, true,
171 NULL, 1, key);
173 while (HeapTupleIsValid(tup = systable_getnext(scan)))
175 CatalogTupleDelete(pg_enum, &tup->t_self);
178 systable_endscan(scan);
180 table_close(pg_enum, RowExclusiveLock);
184 * Initialize the enum blacklist for this transaction.
186 static void
187 init_enum_blacklist(void)
189 HASHCTL hash_ctl;
191 memset(&hash_ctl, 0, sizeof(hash_ctl));
192 hash_ctl.keysize = sizeof(Oid);
193 hash_ctl.entrysize = sizeof(Oid);
194 hash_ctl.hcxt = TopTransactionContext;
195 enum_blacklist = hash_create("Enum value blacklist",
197 &hash_ctl,
198 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
202 * AddEnumLabel
203 * Add a new label to the enum set. By default it goes at
204 * the end, but the user can choose to place it before or
205 * after any existing set member.
207 void
208 AddEnumLabel(Oid enumTypeOid,
209 const char *newVal,
210 const char *neighbor,
211 bool newValIsAfter,
212 bool skipIfExists)
214 Relation pg_enum;
215 Oid newOid;
216 Datum values[Natts_pg_enum];
217 bool nulls[Natts_pg_enum];
218 NameData enumlabel;
219 HeapTuple enum_tup;
220 float4 newelemorder;
221 HeapTuple *existing;
222 CatCList *list;
223 int nelems;
224 int i;
226 /* check length of new label is ok */
227 if (strlen(newVal) > (NAMEDATALEN - 1))
228 ereport(ERROR,
229 (errcode(ERRCODE_INVALID_NAME),
230 errmsg("invalid enum label \"%s\"", newVal),
231 errdetail("Labels must be %d characters or less.",
232 NAMEDATALEN - 1)));
235 * Acquire a lock on the enum type, which we won't release until commit.
236 * This ensures that two backends aren't concurrently modifying the same
237 * enum type. Without that, we couldn't be sure to get a consistent view
238 * of the enum members via the syscache. Note that this does not block
239 * other backends from inspecting the type; see comments for
240 * RenumberEnumType.
242 LockDatabaseObject(TypeRelationId, enumTypeOid, 0, ExclusiveLock);
245 * Check if label is already in use. The unique index on pg_enum would
246 * catch this anyway, but we prefer a friendlier error message, and
247 * besides we need a check to support IF NOT EXISTS.
249 enum_tup = SearchSysCache2(ENUMTYPOIDNAME,
250 ObjectIdGetDatum(enumTypeOid),
251 CStringGetDatum(newVal));
252 if (HeapTupleIsValid(enum_tup))
254 ReleaseSysCache(enum_tup);
255 if (skipIfExists)
257 ereport(NOTICE,
258 (errcode(ERRCODE_DUPLICATE_OBJECT),
259 errmsg("enum label \"%s\" already exists, skipping",
260 newVal)));
261 return;
263 else
264 ereport(ERROR,
265 (errcode(ERRCODE_DUPLICATE_OBJECT),
266 errmsg("enum label \"%s\" already exists",
267 newVal)));
270 pg_enum = table_open(EnumRelationId, RowExclusiveLock);
272 /* If we have to renumber the existing members, we restart from here */
273 restart:
275 /* Get the list of existing members of the enum */
276 list = SearchSysCacheList1(ENUMTYPOIDNAME,
277 ObjectIdGetDatum(enumTypeOid));
278 nelems = list->n_members;
280 /* Sort the existing members by enumsortorder */
281 existing = (HeapTuple *) palloc(nelems * sizeof(HeapTuple));
282 for (i = 0; i < nelems; i++)
283 existing[i] = &(list->members[i]->tuple);
285 qsort(existing, nelems, sizeof(HeapTuple), sort_order_cmp);
287 if (neighbor == NULL)
290 * Put the new label at the end of the list. No change to existing
291 * tuples is required.
293 if (nelems > 0)
295 Form_pg_enum en = (Form_pg_enum) GETSTRUCT(existing[nelems - 1]);
297 newelemorder = en->enumsortorder + 1;
299 else
300 newelemorder = 1;
302 else
304 /* BEFORE or AFTER was specified */
305 int nbr_index;
306 int other_nbr_index;
307 Form_pg_enum nbr_en;
308 Form_pg_enum other_nbr_en;
310 /* Locate the neighbor element */
311 for (nbr_index = 0; nbr_index < nelems; nbr_index++)
313 Form_pg_enum en = (Form_pg_enum) GETSTRUCT(existing[nbr_index]);
315 if (strcmp(NameStr(en->enumlabel), neighbor) == 0)
316 break;
318 if (nbr_index >= nelems)
319 ereport(ERROR,
320 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321 errmsg("\"%s\" is not an existing enum label",
322 neighbor)));
323 nbr_en = (Form_pg_enum) GETSTRUCT(existing[nbr_index]);
326 * Attempt to assign an appropriate enumsortorder value: one less than
327 * the smallest member, one more than the largest member, or halfway
328 * between two existing members.
330 * In the "halfway" case, because of the finite precision of float4,
331 * we might compute a value that's actually equal to one or the other
332 * of its neighbors. In that case we renumber the existing members
333 * and try again.
335 if (newValIsAfter)
336 other_nbr_index = nbr_index + 1;
337 else
338 other_nbr_index = nbr_index - 1;
340 if (other_nbr_index < 0)
341 newelemorder = nbr_en->enumsortorder - 1;
342 else if (other_nbr_index >= nelems)
343 newelemorder = nbr_en->enumsortorder + 1;
344 else
347 * The midpoint value computed here has to be rounded to float4
348 * precision, else our equality comparisons against the adjacent
349 * values are meaningless. The most portable way of forcing that
350 * to happen with non-C-standard-compliant compilers is to store
351 * it into a volatile variable.
353 volatile float4 midpoint;
355 other_nbr_en = (Form_pg_enum) GETSTRUCT(existing[other_nbr_index]);
356 midpoint = (nbr_en->enumsortorder +
357 other_nbr_en->enumsortorder) / 2;
359 if (midpoint == nbr_en->enumsortorder ||
360 midpoint == other_nbr_en->enumsortorder)
362 RenumberEnumType(pg_enum, existing, nelems);
363 /* Clean up and start over */
364 pfree(existing);
365 ReleaseCatCacheList(list);
366 goto restart;
369 newelemorder = midpoint;
373 /* Get a new OID for the new label */
374 if (IsBinaryUpgrade)
376 if (!OidIsValid(binary_upgrade_next_pg_enum_oid))
377 ereport(ERROR,
378 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
379 errmsg("pg_enum OID value not set when in binary upgrade mode")));
382 * Use binary-upgrade override for pg_enum.oid, if supplied. During
383 * binary upgrade, all pg_enum.oid's are set this way so they are
384 * guaranteed to be consistent.
386 if (neighbor != NULL)
387 ereport(ERROR,
388 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
389 errmsg("ALTER TYPE ADD BEFORE/AFTER is incompatible with binary upgrade")));
391 newOid = binary_upgrade_next_pg_enum_oid;
392 binary_upgrade_next_pg_enum_oid = InvalidOid;
394 else
397 * Normal case: we need to allocate a new Oid for the value.
399 * We want to give the new element an even-numbered Oid if it's safe,
400 * which is to say it compares correctly to all pre-existing even
401 * numbered Oids in the enum. Otherwise, we must give it an odd Oid.
403 for (;;)
405 bool sorts_ok;
407 /* Get a new OID (different from all existing pg_enum tuples) */
408 newOid = GetNewOidWithIndex(pg_enum, EnumOidIndexId,
409 Anum_pg_enum_oid);
412 * Detect whether it sorts correctly relative to existing
413 * even-numbered labels of the enum. We can ignore existing
414 * labels with odd Oids, since a comparison involving one of those
415 * will not take the fast path anyway.
417 sorts_ok = true;
418 for (i = 0; i < nelems; i++)
420 HeapTuple exists_tup = existing[i];
421 Form_pg_enum exists_en = (Form_pg_enum) GETSTRUCT(exists_tup);
422 Oid exists_oid = exists_en->oid;
424 if (exists_oid & 1)
425 continue; /* ignore odd Oids */
427 if (exists_en->enumsortorder < newelemorder)
429 /* should sort before */
430 if (exists_oid >= newOid)
432 sorts_ok = false;
433 break;
436 else
438 /* should sort after */
439 if (exists_oid <= newOid)
441 sorts_ok = false;
442 break;
447 if (sorts_ok)
449 /* If it's even and sorts OK, we're done. */
450 if ((newOid & 1) == 0)
451 break;
454 * If it's odd, and sorts OK, loop back to get another OID and
455 * try again. Probably, the next available even OID will sort
456 * correctly too, so it's worth trying.
459 else
462 * If it's odd, and does not sort correctly, we're done.
463 * (Probably, the next available even OID would sort
464 * incorrectly too, so no point in trying again.)
466 if (newOid & 1)
467 break;
470 * If it's even, and does not sort correctly, loop back to get
471 * another OID and try again. (We *must* reject this case.)
477 /* Done with info about existing members */
478 pfree(existing);
479 ReleaseCatCacheList(list);
481 /* Create the new pg_enum entry */
482 memset(nulls, false, sizeof(nulls));
483 values[Anum_pg_enum_oid - 1] = ObjectIdGetDatum(newOid);
484 values[Anum_pg_enum_enumtypid - 1] = ObjectIdGetDatum(enumTypeOid);
485 values[Anum_pg_enum_enumsortorder - 1] = Float4GetDatum(newelemorder);
486 namestrcpy(&enumlabel, newVal);
487 values[Anum_pg_enum_enumlabel - 1] = NameGetDatum(&enumlabel);
488 enum_tup = heap_form_tuple(RelationGetDescr(pg_enum), values, nulls);
489 CatalogTupleInsert(pg_enum, enum_tup);
490 heap_freetuple(enum_tup);
492 table_close(pg_enum, RowExclusiveLock);
494 /* Set up the blacklist hash if not already done in this transaction */
495 if (enum_blacklist == NULL)
496 init_enum_blacklist();
498 /* Add the new value to the blacklist */
499 (void) hash_search(enum_blacklist, &newOid, HASH_ENTER, NULL);
504 * RenameEnumLabel
505 * Rename a label in an enum set.
507 void
508 RenameEnumLabel(Oid enumTypeOid,
509 const char *oldVal,
510 const char *newVal)
512 Relation pg_enum;
513 HeapTuple enum_tup;
514 Form_pg_enum en;
515 CatCList *list;
516 int nelems;
517 HeapTuple old_tup;
518 bool found_new;
519 int i;
521 /* check length of new label is ok */
522 if (strlen(newVal) > (NAMEDATALEN - 1))
523 ereport(ERROR,
524 (errcode(ERRCODE_INVALID_NAME),
525 errmsg("invalid enum label \"%s\"", newVal),
526 errdetail("Labels must be %d characters or less.",
527 NAMEDATALEN - 1)));
530 * Acquire a lock on the enum type, which we won't release until commit.
531 * This ensures that two backends aren't concurrently modifying the same
532 * enum type. Since we are not changing the type's sort order, this is
533 * probably not really necessary, but there seems no reason not to take
534 * the lock to be sure.
536 LockDatabaseObject(TypeRelationId, enumTypeOid, 0, ExclusiveLock);
538 pg_enum = table_open(EnumRelationId, RowExclusiveLock);
540 /* Get the list of existing members of the enum */
541 list = SearchSysCacheList1(ENUMTYPOIDNAME,
542 ObjectIdGetDatum(enumTypeOid));
543 nelems = list->n_members;
546 * Locate the element to rename and check if the new label is already in
547 * use. (The unique index on pg_enum would catch that anyway, but we
548 * prefer a friendlier error message.)
550 old_tup = NULL;
551 found_new = false;
552 for (i = 0; i < nelems; i++)
554 enum_tup = &(list->members[i]->tuple);
555 en = (Form_pg_enum) GETSTRUCT(enum_tup);
556 if (strcmp(NameStr(en->enumlabel), oldVal) == 0)
557 old_tup = enum_tup;
558 if (strcmp(NameStr(en->enumlabel), newVal) == 0)
559 found_new = true;
561 if (!old_tup)
562 ereport(ERROR,
563 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
564 errmsg("\"%s\" is not an existing enum label",
565 oldVal)));
566 if (found_new)
567 ereport(ERROR,
568 (errcode(ERRCODE_DUPLICATE_OBJECT),
569 errmsg("enum label \"%s\" already exists",
570 newVal)));
572 /* OK, make a writable copy of old tuple */
573 enum_tup = heap_copytuple(old_tup);
574 en = (Form_pg_enum) GETSTRUCT(enum_tup);
576 ReleaseCatCacheList(list);
578 /* Update the pg_enum entry */
579 namestrcpy(&en->enumlabel, newVal);
580 CatalogTupleUpdate(pg_enum, &enum_tup->t_self, enum_tup);
581 heap_freetuple(enum_tup);
583 table_close(pg_enum, RowExclusiveLock);
588 * Test if the given enum value is on the blacklist
590 bool
591 EnumBlacklisted(Oid enum_id)
593 bool found;
595 /* If we've made no blacklist table, all values are safe */
596 if (enum_blacklist == NULL)
597 return false;
599 /* Else, is it in the table? */
600 (void) hash_search(enum_blacklist, &enum_id, HASH_FIND, &found);
601 return found;
606 * Clean up enum stuff after end of top-level transaction.
608 void
609 AtEOXact_Enum(void)
612 * Reset the blacklist table, as all our enum values are now committed.
613 * The memory will go away automatically when TopTransactionContext is
614 * freed; it's sufficient to clear our pointer.
616 enum_blacklist = NULL;
621 * RenumberEnumType
622 * Renumber existing enum elements to have sort positions 1..n.
624 * We avoid doing this unless absolutely necessary; in most installations
625 * it will never happen. The reason is that updating existing pg_enum
626 * entries creates hazards for other backends that are concurrently reading
627 * pg_enum. Although system catalog scans now use MVCC semantics, the
628 * syscache machinery might read different pg_enum entries under different
629 * snapshots, so some other backend might get confused about the proper
630 * ordering if a concurrent renumbering occurs.
632 * We therefore make the following choices:
634 * 1. Any code that is interested in the enumsortorder values MUST read
635 * all the relevant pg_enum entries with a single MVCC snapshot, or else
636 * acquire lock on the enum type to prevent concurrent execution of
637 * AddEnumLabel().
639 * 2. Code that is not examining enumsortorder can use a syscache
640 * (for example, enum_in and enum_out do so).
642 static void
643 RenumberEnumType(Relation pg_enum, HeapTuple *existing, int nelems)
645 int i;
648 * We should only need to increase existing elements' enumsortorders,
649 * never decrease them. Therefore, work from the end backwards, to avoid
650 * unwanted uniqueness violations.
652 for (i = nelems - 1; i >= 0; i--)
654 HeapTuple newtup;
655 Form_pg_enum en;
656 float4 newsortorder;
658 newtup = heap_copytuple(existing[i]);
659 en = (Form_pg_enum) GETSTRUCT(newtup);
661 newsortorder = i + 1;
662 if (en->enumsortorder != newsortorder)
664 en->enumsortorder = newsortorder;
666 CatalogTupleUpdate(pg_enum, &newtup->t_self, newtup);
669 heap_freetuple(newtup);
672 /* Make the updates visible */
673 CommandCounterIncrement();
677 /* qsort comparison function for tuples by sort order */
678 static int
679 sort_order_cmp(const void *p1, const void *p2)
681 HeapTuple v1 = *((const HeapTuple *) p1);
682 HeapTuple v2 = *((const HeapTuple *) p2);
683 Form_pg_enum en1 = (Form_pg_enum) GETSTRUCT(v1);
684 Form_pg_enum en2 = (Form_pg_enum) GETSTRUCT(v2);
686 if (en1->enumsortorder < en2->enumsortorder)
687 return -1;
688 else if (en1->enumsortorder > en2->enumsortorder)
689 return 1;
690 else
691 return 0;
694 Size
695 EstimateEnumBlacklistSpace(void)
697 size_t entries;
699 if (enum_blacklist)
700 entries = hash_get_num_entries(enum_blacklist);
701 else
702 entries = 0;
704 /* Add one for the terminator. */
705 return sizeof(Oid) * (entries + 1);
708 void
709 SerializeEnumBlacklist(void *space, Size size)
711 Oid *serialized = (Oid *) space;
714 * Make sure the hash table hasn't changed in size since the caller
715 * reserved the space.
717 Assert(size == EstimateEnumBlacklistSpace());
719 /* Write out all the values from the hash table, if there is one. */
720 if (enum_blacklist)
722 HASH_SEQ_STATUS status;
723 Oid *value;
725 hash_seq_init(&status, enum_blacklist);
726 while ((value = (Oid *) hash_seq_search(&status)))
727 *serialized++ = *value;
730 /* Write out the terminator. */
731 *serialized = InvalidOid;
734 * Make sure the amount of space we actually used matches what was
735 * estimated.
737 Assert((char *) (serialized + 1) == ((char *) space) + size);
740 void
741 RestoreEnumBlacklist(void *space)
743 Oid *serialized = (Oid *) space;
745 Assert(!enum_blacklist);
748 * As a special case, if the list is empty then don't even bother to
749 * create the hash table. This is the usual case, since enum alteration
750 * is expected to be rare.
752 if (!OidIsValid(*serialized))
753 return;
755 /* Read all the values into a new hash table. */
756 init_enum_blacklist();
759 hash_search(enum_blacklist, serialized++, HASH_ENTER, NULL);
760 } while (OidIsValid(*serialized));