nbtree: fix read page recheck typo.
[pgsql.git] / src / backend / storage / lmgr / lmgr.c
blob094522acb414d86376360424f7121c6b7f9dc4b6
/*-------------------------------------------------------------------------
 *
 * lmgr.c
 *	  POSTGRES lock manager code
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lmgr.c
 *
 *-------------------------------------------------------------------------
 */
16 #include "postgres.h"
18 #include "access/subtrans.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "commands/progress.h"
22 #include "miscadmin.h"
23 #include "pgstat.h"
24 #include "storage/lmgr.h"
25 #include "storage/proc.h"
26 #include "storage/procarray.h"
27 #include "storage/sinvaladt.h"
28 #include "utils/inval.h"
32 * Per-backend counter for generating speculative insertion tokens.
34 * This may wrap around, but that's OK as it's only used for the short
35 * duration between inserting a tuple and checking that there are no (unique)
36 * constraint violations. It's theoretically possible that a backend sees a
37 * tuple that was speculatively inserted by another backend, but before it has
38 * started waiting on the token, the other backend completes its insertion,
39 * and then performs 2^32 unrelated insertions. And after all that, the
40 * first backend finally calls SpeculativeInsertionLockAcquire(), with the
41 * intention of waiting for the first insertion to complete, but ends up
42 * waiting for the latest unrelated insertion instead. Even then, nothing
43 * particularly bad happens: in the worst case they deadlock, causing one of
44 * the transactions to abort.
46 static uint32 speculativeInsertionToken = 0;
50 * Struct to hold context info for transaction lock waits.
52 * 'oper' is the operation that needs to wait for the other transaction; 'rel'
53 * and 'ctid' specify the address of the tuple being waited for.
55 typedef struct XactLockTableWaitInfo
57 XLTW_Oper oper;
58 Relation rel;
59 ItemPointer ctid;
60 } XactLockTableWaitInfo;
/* Forward declaration: error context callback installed by XactLockTableWait */
static void XactLockTableWaitErrorCb(void *arg);
65 * RelationInitLockInfo
66 * Initializes the lock information in a relation descriptor.
68 * relcache.c must call this during creation of any reldesc.
70 void
71 RelationInitLockInfo(Relation relation)
73 Assert(RelationIsValid(relation));
74 Assert(OidIsValid(RelationGetRelid(relation)));
76 relation->rd_lockInfo.lockRelId.relId = RelationGetRelid(relation);
78 if (relation->rd_rel->relisshared)
79 relation->rd_lockInfo.lockRelId.dbId = InvalidOid;
80 else
81 relation->rd_lockInfo.lockRelId.dbId = MyDatabaseId;
85 * SetLocktagRelationOid
86 * Set up a locktag for a relation, given only relation OID
88 static inline void
89 SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
91 Oid dbid;
93 if (IsSharedRelation(relid))
94 dbid = InvalidOid;
95 else
96 dbid = MyDatabaseId;
98 SET_LOCKTAG_RELATION(*tag, dbid, relid);
102 * LockRelationOid
104 * Lock a relation given only its OID. This should generally be used
105 * before attempting to open the relation's relcache entry.
107 void
108 LockRelationOid(Oid relid, LOCKMODE lockmode)
110 LOCKTAG tag;
111 LOCALLOCK *locallock;
112 LockAcquireResult res;
114 SetLocktagRelationOid(&tag, relid);
116 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
119 * Now that we have the lock, check for invalidation messages, so that we
120 * will update or flush any stale relcache entry before we try to use it.
121 * RangeVarGetRelid() specifically relies on us for this. We can skip
122 * this in the not-uncommon case that we already had the same type of lock
123 * being requested, since then no one else could have modified the
124 * relcache entry in an undesirable way. (In the case where our own xact
125 * modifies the rel, the relcache update happens via
126 * CommandCounterIncrement, not here.)
128 * However, in corner cases where code acts on tables (usually catalogs)
129 * recursively, we might get here while still processing invalidation
130 * messages in some outer execution of this function or a sibling. The
131 * "cleared" status of the lock tells us whether we really are done
132 * absorbing relevant inval messages.
134 if (res != LOCKACQUIRE_ALREADY_CLEAR)
136 AcceptInvalidationMessages();
137 MarkLockClear(locallock);
142 * ConditionalLockRelationOid
144 * As above, but only lock if we can get the lock without blocking.
145 * Returns true iff the lock was acquired.
147 * NOTE: we do not currently need conditional versions of all the
148 * LockXXX routines in this file, but they could easily be added if needed.
150 bool
151 ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
153 LOCKTAG tag;
154 LOCALLOCK *locallock;
155 LockAcquireResult res;
157 SetLocktagRelationOid(&tag, relid);
159 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
161 if (res == LOCKACQUIRE_NOT_AVAIL)
162 return false;
165 * Now that we have the lock, check for invalidation messages; see notes
166 * in LockRelationOid.
168 if (res != LOCKACQUIRE_ALREADY_CLEAR)
170 AcceptInvalidationMessages();
171 MarkLockClear(locallock);
174 return true;
178 * LockRelationId
180 * Lock, given a LockRelId. Same as LockRelationOid but take LockRelId as an
181 * input.
183 void
184 LockRelationId(LockRelId *relid, LOCKMODE lockmode)
186 LOCKTAG tag;
187 LOCALLOCK *locallock;
188 LockAcquireResult res;
190 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
192 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
195 * Now that we have the lock, check for invalidation messages; see notes
196 * in LockRelationOid.
198 if (res != LOCKACQUIRE_ALREADY_CLEAR)
200 AcceptInvalidationMessages();
201 MarkLockClear(locallock);
206 * UnlockRelationId
208 * Unlock, given a LockRelId. This is preferred over UnlockRelationOid
209 * for speed reasons.
211 void
212 UnlockRelationId(LockRelId *relid, LOCKMODE lockmode)
214 LOCKTAG tag;
216 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
218 LockRelease(&tag, lockmode, false);
222 * UnlockRelationOid
224 * Unlock, given only a relation Oid. Use UnlockRelationId if you can.
226 void
227 UnlockRelationOid(Oid relid, LOCKMODE lockmode)
229 LOCKTAG tag;
231 SetLocktagRelationOid(&tag, relid);
233 LockRelease(&tag, lockmode, false);
237 * LockRelation
239 * This is a convenience routine for acquiring an additional lock on an
240 * already-open relation. Never try to do "relation_open(foo, NoLock)"
241 * and then lock with this.
243 void
244 LockRelation(Relation relation, LOCKMODE lockmode)
246 LOCKTAG tag;
247 LOCALLOCK *locallock;
248 LockAcquireResult res;
250 SET_LOCKTAG_RELATION(tag,
251 relation->rd_lockInfo.lockRelId.dbId,
252 relation->rd_lockInfo.lockRelId.relId);
254 res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
257 * Now that we have the lock, check for invalidation messages; see notes
258 * in LockRelationOid.
260 if (res != LOCKACQUIRE_ALREADY_CLEAR)
262 AcceptInvalidationMessages();
263 MarkLockClear(locallock);
268 * ConditionalLockRelation
270 * This is a convenience routine for acquiring an additional lock on an
271 * already-open relation. Never try to do "relation_open(foo, NoLock)"
272 * and then lock with this.
274 bool
275 ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
277 LOCKTAG tag;
278 LOCALLOCK *locallock;
279 LockAcquireResult res;
281 SET_LOCKTAG_RELATION(tag,
282 relation->rd_lockInfo.lockRelId.dbId,
283 relation->rd_lockInfo.lockRelId.relId);
285 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
287 if (res == LOCKACQUIRE_NOT_AVAIL)
288 return false;
291 * Now that we have the lock, check for invalidation messages; see notes
292 * in LockRelationOid.
294 if (res != LOCKACQUIRE_ALREADY_CLEAR)
296 AcceptInvalidationMessages();
297 MarkLockClear(locallock);
300 return true;
304 * UnlockRelation
306 * This is a convenience routine for unlocking a relation without also
307 * closing it.
309 void
310 UnlockRelation(Relation relation, LOCKMODE lockmode)
312 LOCKTAG tag;
314 SET_LOCKTAG_RELATION(tag,
315 relation->rd_lockInfo.lockRelId.dbId,
316 relation->rd_lockInfo.lockRelId.relId);
318 LockRelease(&tag, lockmode, false);
322 * CheckRelationLockedByMe
324 * Returns true if current transaction holds a lock on 'relation' of mode
325 * 'lockmode'. If 'orstronger' is true, a stronger lockmode is also OK.
326 * ("Stronger" is defined as "numerically higher", which is a bit
327 * semantically dubious but is OK for the purposes we use this for.)
329 bool
330 CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode, bool orstronger)
332 LOCKTAG tag;
334 SET_LOCKTAG_RELATION(tag,
335 relation->rd_lockInfo.lockRelId.dbId,
336 relation->rd_lockInfo.lockRelId.relId);
338 return LockHeldByMe(&tag, lockmode, orstronger);
342 * CheckRelationOidLockedByMe
344 * Like the above, but takes an OID as argument.
346 bool
347 CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
349 LOCKTAG tag;
351 SetLocktagRelationOid(&tag, relid);
353 return LockHeldByMe(&tag, lockmode, orstronger);
357 * LockHasWaitersRelation
359 * This is a function to check whether someone else is waiting for a
360 * lock which we are currently holding.
362 bool
363 LockHasWaitersRelation(Relation relation, LOCKMODE lockmode)
365 LOCKTAG tag;
367 SET_LOCKTAG_RELATION(tag,
368 relation->rd_lockInfo.lockRelId.dbId,
369 relation->rd_lockInfo.lockRelId.relId);
371 return LockHasWaiters(&tag, lockmode, false);
375 * LockRelationIdForSession
377 * This routine grabs a session-level lock on the target relation. The
378 * session lock persists across transaction boundaries. It will be removed
379 * when UnlockRelationIdForSession() is called, or if an ereport(ERROR) occurs,
380 * or if the backend exits.
382 * Note that one should also grab a transaction-level lock on the rel
383 * in any transaction that actually uses the rel, to ensure that the
384 * relcache entry is up to date.
386 void
387 LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
389 LOCKTAG tag;
391 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
393 (void) LockAcquire(&tag, lockmode, true, false);
397 * UnlockRelationIdForSession
399 void
400 UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode)
402 LOCKTAG tag;
404 SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
406 LockRelease(&tag, lockmode, true);
410 * LockRelationForExtension
412 * This lock tag is used to interlock addition of pages to relations.
413 * We need such locking because bufmgr/smgr definition of P_NEW is not
414 * race-condition-proof.
416 * We assume the caller is already holding some type of regular lock on
417 * the relation, so no AcceptInvalidationMessages call is needed here.
419 void
420 LockRelationForExtension(Relation relation, LOCKMODE lockmode)
422 LOCKTAG tag;
424 SET_LOCKTAG_RELATION_EXTEND(tag,
425 relation->rd_lockInfo.lockRelId.dbId,
426 relation->rd_lockInfo.lockRelId.relId);
428 (void) LockAcquire(&tag, lockmode, false, false);
432 * ConditionalLockRelationForExtension
434 * As above, but only lock if we can get the lock without blocking.
435 * Returns true iff the lock was acquired.
437 bool
438 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
440 LOCKTAG tag;
442 SET_LOCKTAG_RELATION_EXTEND(tag,
443 relation->rd_lockInfo.lockRelId.dbId,
444 relation->rd_lockInfo.lockRelId.relId);
446 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
450 * RelationExtensionLockWaiterCount
452 * Count the number of processes waiting for the given relation extension lock.
455 RelationExtensionLockWaiterCount(Relation relation)
457 LOCKTAG tag;
459 SET_LOCKTAG_RELATION_EXTEND(tag,
460 relation->rd_lockInfo.lockRelId.dbId,
461 relation->rd_lockInfo.lockRelId.relId);
463 return LockWaiterCount(&tag);
467 * UnlockRelationForExtension
469 void
470 UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
472 LOCKTAG tag;
474 SET_LOCKTAG_RELATION_EXTEND(tag,
475 relation->rd_lockInfo.lockRelId.dbId,
476 relation->rd_lockInfo.lockRelId.relId);
478 LockRelease(&tag, lockmode, false);
482 * LockDatabaseFrozenIds
484 * This allows one backend per database to execute vac_update_datfrozenxid().
486 void
487 LockDatabaseFrozenIds(LOCKMODE lockmode)
489 LOCKTAG tag;
491 SET_LOCKTAG_DATABASE_FROZEN_IDS(tag, MyDatabaseId);
493 (void) LockAcquire(&tag, lockmode, false, false);
497 * LockPage
499 * Obtain a page-level lock. This is currently used by some index access
500 * methods to lock individual index pages.
502 void
503 LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
505 LOCKTAG tag;
507 SET_LOCKTAG_PAGE(tag,
508 relation->rd_lockInfo.lockRelId.dbId,
509 relation->rd_lockInfo.lockRelId.relId,
510 blkno);
512 (void) LockAcquire(&tag, lockmode, false, false);
516 * ConditionalLockPage
518 * As above, but only lock if we can get the lock without blocking.
519 * Returns true iff the lock was acquired.
521 bool
522 ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
524 LOCKTAG tag;
526 SET_LOCKTAG_PAGE(tag,
527 relation->rd_lockInfo.lockRelId.dbId,
528 relation->rd_lockInfo.lockRelId.relId,
529 blkno);
531 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
535 * UnlockPage
537 void
538 UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
540 LOCKTAG tag;
542 SET_LOCKTAG_PAGE(tag,
543 relation->rd_lockInfo.lockRelId.dbId,
544 relation->rd_lockInfo.lockRelId.relId,
545 blkno);
547 LockRelease(&tag, lockmode, false);
551 * LockTuple
553 * Obtain a tuple-level lock. This is used in a less-than-intuitive fashion
554 * because we can't afford to keep a separate lock in shared memory for every
555 * tuple. See heap_lock_tuple before using this!
557 void
558 LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
560 LOCKTAG tag;
562 SET_LOCKTAG_TUPLE(tag,
563 relation->rd_lockInfo.lockRelId.dbId,
564 relation->rd_lockInfo.lockRelId.relId,
565 ItemPointerGetBlockNumber(tid),
566 ItemPointerGetOffsetNumber(tid));
568 (void) LockAcquire(&tag, lockmode, false, false);
572 * ConditionalLockTuple
574 * As above, but only lock if we can get the lock without blocking.
575 * Returns true iff the lock was acquired.
577 bool
578 ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
580 LOCKTAG tag;
582 SET_LOCKTAG_TUPLE(tag,
583 relation->rd_lockInfo.lockRelId.dbId,
584 relation->rd_lockInfo.lockRelId.relId,
585 ItemPointerGetBlockNumber(tid),
586 ItemPointerGetOffsetNumber(tid));
588 return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
592 * UnlockTuple
594 void
595 UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
597 LOCKTAG tag;
599 SET_LOCKTAG_TUPLE(tag,
600 relation->rd_lockInfo.lockRelId.dbId,
601 relation->rd_lockInfo.lockRelId.relId,
602 ItemPointerGetBlockNumber(tid),
603 ItemPointerGetOffsetNumber(tid));
605 LockRelease(&tag, lockmode, false);
609 * XactLockTableInsert
611 * Insert a lock showing that the given transaction ID is running ---
612 * this is done when an XID is acquired by a transaction or subtransaction.
613 * The lock can then be used to wait for the transaction to finish.
615 void
616 XactLockTableInsert(TransactionId xid)
618 LOCKTAG tag;
620 SET_LOCKTAG_TRANSACTION(tag, xid);
622 (void) LockAcquire(&tag, ExclusiveLock, false, false);
626 * XactLockTableDelete
628 * Delete the lock showing that the given transaction ID is running.
629 * (This is never used for main transaction IDs; those locks are only
630 * released implicitly at transaction end. But we do use it for subtrans IDs.)
632 void
633 XactLockTableDelete(TransactionId xid)
635 LOCKTAG tag;
637 SET_LOCKTAG_TRANSACTION(tag, xid);
639 LockRelease(&tag, ExclusiveLock, false);
643 * XactLockTableWait
645 * Wait for the specified transaction to commit or abort. If an operation
646 * is specified, an error context callback is set up. If 'oper' is passed as
647 * None, no error context callback is set up.
649 * Note that this does the right thing for subtransactions: if we wait on a
650 * subtransaction, we will exit as soon as it aborts or its top parent commits.
651 * It takes some extra work to ensure this, because to save on shared memory
652 * the XID lock of a subtransaction is released when it ends, whether
653 * successfully or unsuccessfully. So we have to check if it's "still running"
654 * and if so wait for its parent.
656 void
657 XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
658 XLTW_Oper oper)
660 LOCKTAG tag;
661 XactLockTableWaitInfo info;
662 ErrorContextCallback callback;
663 bool first = true;
666 * If an operation is specified, set up our verbose error context
667 * callback.
669 if (oper != XLTW_None)
671 Assert(RelationIsValid(rel));
672 Assert(ItemPointerIsValid(ctid));
674 info.rel = rel;
675 info.ctid = ctid;
676 info.oper = oper;
678 callback.callback = XactLockTableWaitErrorCb;
679 callback.arg = &info;
680 callback.previous = error_context_stack;
681 error_context_stack = &callback;
684 for (;;)
686 Assert(TransactionIdIsValid(xid));
687 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
689 SET_LOCKTAG_TRANSACTION(tag, xid);
691 (void) LockAcquire(&tag, ShareLock, false, false);
693 LockRelease(&tag, ShareLock, false);
695 if (!TransactionIdIsInProgress(xid))
696 break;
699 * If the Xid belonged to a subtransaction, then the lock would have
700 * gone away as soon as it was finished; for correct tuple visibility,
701 * the right action is to wait on its parent transaction to go away.
702 * But instead of going levels up one by one, we can just wait for the
703 * topmost transaction to finish with the same end result, which also
704 * incurs less locktable traffic.
706 * Some uses of this function don't involve tuple visibility -- such
707 * as when building snapshots for logical decoding. It is possible to
708 * see a transaction in ProcArray before it registers itself in the
709 * locktable. The topmost transaction in that case is the same xid,
710 * so we try again after a short sleep. (Don't sleep the first time
711 * through, to avoid slowing down the normal case.)
713 if (!first)
714 pg_usleep(1000L);
715 first = false;
716 xid = SubTransGetTopmostTransaction(xid);
719 if (oper != XLTW_None)
720 error_context_stack = callback.previous;
724 * ConditionalXactLockTableWait
726 * As above, but only lock if we can get the lock without blocking.
727 * Returns true if the lock was acquired.
729 bool
730 ConditionalXactLockTableWait(TransactionId xid)
732 LOCKTAG tag;
733 bool first = true;
735 for (;;)
737 Assert(TransactionIdIsValid(xid));
738 Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
740 SET_LOCKTAG_TRANSACTION(tag, xid);
742 if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
743 return false;
745 LockRelease(&tag, ShareLock, false);
747 if (!TransactionIdIsInProgress(xid))
748 break;
750 /* See XactLockTableWait about this case */
751 if (!first)
752 pg_usleep(1000L);
753 first = false;
754 xid = SubTransGetTopmostTransaction(xid);
757 return true;
761 * SpeculativeInsertionLockAcquire
763 * Insert a lock showing that the given transaction ID is inserting a tuple,
764 * but hasn't yet decided whether it's going to keep it. The lock can then be
765 * used to wait for the decision to go ahead with the insertion, or aborting
766 * it.
768 * The token is used to distinguish multiple insertions by the same
769 * transaction. It is returned to caller.
771 uint32
772 SpeculativeInsertionLockAcquire(TransactionId xid)
774 LOCKTAG tag;
776 speculativeInsertionToken++;
779 * Check for wrap-around. Zero means no token is held, so don't use that.
781 if (speculativeInsertionToken == 0)
782 speculativeInsertionToken = 1;
784 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
786 (void) LockAcquire(&tag, ExclusiveLock, false, false);
788 return speculativeInsertionToken;
792 * SpeculativeInsertionLockRelease
794 * Delete the lock showing that the given transaction is speculatively
795 * inserting a tuple.
797 void
798 SpeculativeInsertionLockRelease(TransactionId xid)
800 LOCKTAG tag;
802 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, speculativeInsertionToken);
804 LockRelease(&tag, ExclusiveLock, false);
808 * SpeculativeInsertionWait
810 * Wait for the specified transaction to finish or abort the insertion of a
811 * tuple.
813 void
814 SpeculativeInsertionWait(TransactionId xid, uint32 token)
816 LOCKTAG tag;
818 SET_LOCKTAG_SPECULATIVE_INSERTION(tag, xid, token);
820 Assert(TransactionIdIsValid(xid));
821 Assert(token != 0);
823 (void) LockAcquire(&tag, ShareLock, false, false);
824 LockRelease(&tag, ShareLock, false);
828 * XactLockTableWaitErrorCb
829 * Error context callback for transaction lock waits.
831 static void
832 XactLockTableWaitErrorCb(void *arg)
834 XactLockTableWaitInfo *info = (XactLockTableWaitInfo *) arg;
837 * We would like to print schema name too, but that would require a
838 * syscache lookup.
840 if (info->oper != XLTW_None &&
841 ItemPointerIsValid(info->ctid) && RelationIsValid(info->rel))
843 const char *cxt;
845 switch (info->oper)
847 case XLTW_Update:
848 cxt = gettext_noop("while updating tuple (%u,%u) in relation \"%s\"");
849 break;
850 case XLTW_Delete:
851 cxt = gettext_noop("while deleting tuple (%u,%u) in relation \"%s\"");
852 break;
853 case XLTW_Lock:
854 cxt = gettext_noop("while locking tuple (%u,%u) in relation \"%s\"");
855 break;
856 case XLTW_LockUpdated:
857 cxt = gettext_noop("while locking updated version (%u,%u) of tuple in relation \"%s\"");
858 break;
859 case XLTW_InsertIndex:
860 cxt = gettext_noop("while inserting index tuple (%u,%u) in relation \"%s\"");
861 break;
862 case XLTW_InsertIndexUnique:
863 cxt = gettext_noop("while checking uniqueness of tuple (%u,%u) in relation \"%s\"");
864 break;
865 case XLTW_FetchUpdated:
866 cxt = gettext_noop("while rechecking updated tuple (%u,%u) in relation \"%s\"");
867 break;
868 case XLTW_RecheckExclusionConstr:
869 cxt = gettext_noop("while checking exclusion constraint on tuple (%u,%u) in relation \"%s\"");
870 break;
872 default:
873 return;
876 errcontext(cxt,
877 ItemPointerGetBlockNumber(info->ctid),
878 ItemPointerGetOffsetNumber(info->ctid),
879 RelationGetRelationName(info->rel));
884 * WaitForLockersMultiple
885 * Wait until no transaction holds locks that conflict with the given
886 * locktags at the given lockmode.
888 * To do this, obtain the current list of lockers, and wait on their VXIDs
889 * until they are finished.
891 * Note we don't try to acquire the locks on the given locktags, only the
892 * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
893 * on the objects after we obtained our initial list of lockers, we will not
894 * wait for them.
896 void
897 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
899 List *holders = NIL;
900 ListCell *lc;
901 int total = 0;
902 int done = 0;
904 /* Done if no locks to wait for */
905 if (locktags == NIL)
906 return;
908 /* Collect the transactions we need to wait on */
909 foreach(lc, locktags)
911 LOCKTAG *locktag = lfirst(lc);
912 int count;
914 holders = lappend(holders,
915 GetLockConflicts(locktag, lockmode,
916 progress ? &count : NULL));
917 if (progress)
918 total += count;
921 if (progress)
922 pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, total);
925 * Note: GetLockConflicts() never reports our own xid, hence we need not
926 * check for that. Also, prepared xacts are reported and awaited.
929 /* Finally wait for each such transaction to complete */
930 foreach(lc, holders)
932 VirtualTransactionId *lockholders = lfirst(lc);
934 while (VirtualTransactionIdIsValid(*lockholders))
936 /* If requested, publish who we're going to wait for. */
937 if (progress)
939 PGPROC *holder = ProcNumberGetProc(lockholders->procNumber);
941 if (holder)
942 pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
943 holder->pid);
945 VirtualXactLock(*lockholders, true);
946 lockholders++;
948 if (progress)
949 pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, ++done);
952 if (progress)
954 const int index[] = {
955 PROGRESS_WAITFOR_TOTAL,
956 PROGRESS_WAITFOR_DONE,
957 PROGRESS_WAITFOR_CURRENT_PID
959 const int64 values[] = {
960 0, 0, 0
963 pgstat_progress_update_multi_param(3, index, values);
966 list_free_deep(holders);
970 * WaitForLockers
972 * Same as WaitForLockersMultiple, for a single lock tag.
974 void
975 WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress)
977 List *l;
979 l = list_make1(&heaplocktag);
980 WaitForLockersMultiple(l, lockmode, progress);
981 list_free(l);
986 * LockDatabaseObject
988 * Obtain a lock on a general object of the current database. Don't use
989 * this for shared objects (such as tablespaces). It's unwise to apply it
990 * to relations, also, since a lock taken this way will NOT conflict with
991 * locks taken via LockRelation and friends.
993 void
994 LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
995 LOCKMODE lockmode)
997 LOCKTAG tag;
999 SET_LOCKTAG_OBJECT(tag,
1000 MyDatabaseId,
1001 classid,
1002 objid,
1003 objsubid);
1005 (void) LockAcquire(&tag, lockmode, false, false);
1007 /* Make sure syscaches are up-to-date with any changes we waited for */
1008 AcceptInvalidationMessages();
1012 * ConditionalLockDatabaseObject
1014 * As above, but only lock if we can get the lock without blocking.
1015 * Returns true iff the lock was acquired.
1017 bool
1018 ConditionalLockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
1019 LOCKMODE lockmode)
1021 LOCKTAG tag;
1022 LOCALLOCK *locallock;
1023 LockAcquireResult res;
1025 SET_LOCKTAG_OBJECT(tag,
1026 MyDatabaseId,
1027 classid,
1028 objid,
1029 objsubid);
1031 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
1033 if (res == LOCKACQUIRE_NOT_AVAIL)
1034 return false;
1037 * Now that we have the lock, check for invalidation messages; see notes
1038 * in LockRelationOid.
1040 if (res != LOCKACQUIRE_ALREADY_CLEAR)
1042 AcceptInvalidationMessages();
1043 MarkLockClear(locallock);
1046 return true;
1050 * UnlockDatabaseObject
1052 void
1053 UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
1054 LOCKMODE lockmode)
1056 LOCKTAG tag;
1058 SET_LOCKTAG_OBJECT(tag,
1059 MyDatabaseId,
1060 classid,
1061 objid,
1062 objsubid);
1064 LockRelease(&tag, lockmode, false);
1068 * LockSharedObject
1070 * Obtain a lock on a shared-across-databases object.
1072 void
1073 LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1074 LOCKMODE lockmode)
1076 LOCKTAG tag;
1078 SET_LOCKTAG_OBJECT(tag,
1079 InvalidOid,
1080 classid,
1081 objid,
1082 objsubid);
1084 (void) LockAcquire(&tag, lockmode, false, false);
1086 /* Make sure syscaches are up-to-date with any changes we waited for */
1087 AcceptInvalidationMessages();
1091 * ConditionalLockSharedObject
1093 * As above, but only lock if we can get the lock without blocking.
1094 * Returns true iff the lock was acquired.
1096 bool
1097 ConditionalLockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1098 LOCKMODE lockmode)
1100 LOCKTAG tag;
1101 LOCALLOCK *locallock;
1102 LockAcquireResult res;
1104 SET_LOCKTAG_OBJECT(tag,
1105 InvalidOid,
1106 classid,
1107 objid,
1108 objsubid);
1110 res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
1112 if (res == LOCKACQUIRE_NOT_AVAIL)
1113 return false;
1116 * Now that we have the lock, check for invalidation messages; see notes
1117 * in LockRelationOid.
1119 if (res != LOCKACQUIRE_ALREADY_CLEAR)
1121 AcceptInvalidationMessages();
1122 MarkLockClear(locallock);
1125 return true;
1129 * UnlockSharedObject
1131 void
1132 UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
1133 LOCKMODE lockmode)
1135 LOCKTAG tag;
1137 SET_LOCKTAG_OBJECT(tag,
1138 InvalidOid,
1139 classid,
1140 objid,
1141 objsubid);
1143 LockRelease(&tag, lockmode, false);
1147 * LockSharedObjectForSession
1149 * Obtain a session-level lock on a shared-across-databases object.
1150 * See LockRelationIdForSession for notes about session-level locks.
1152 void
1153 LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1154 LOCKMODE lockmode)
1156 LOCKTAG tag;
1158 SET_LOCKTAG_OBJECT(tag,
1159 InvalidOid,
1160 classid,
1161 objid,
1162 objsubid);
1164 (void) LockAcquire(&tag, lockmode, true, false);
1168 * UnlockSharedObjectForSession
1170 void
1171 UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
1172 LOCKMODE lockmode)
1174 LOCKTAG tag;
1176 SET_LOCKTAG_OBJECT(tag,
1177 InvalidOid,
1178 classid,
1179 objid,
1180 objsubid);
1182 LockRelease(&tag, lockmode, true);
1186 * LockApplyTransactionForSession
1188 * Obtain a session-level lock on a transaction being applied on a logical
1189 * replication subscriber. See LockRelationIdForSession for notes about
1190 * session-level locks.
1192 void
1193 LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
1194 LOCKMODE lockmode)
1196 LOCKTAG tag;
1198 SET_LOCKTAG_APPLY_TRANSACTION(tag,
1199 MyDatabaseId,
1200 suboid,
1201 xid,
1202 objid);
1204 (void) LockAcquire(&tag, lockmode, true, false);
1208 * UnlockApplyTransactionForSession
1210 void
1211 UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
1212 LOCKMODE lockmode)
1214 LOCKTAG tag;
1216 SET_LOCKTAG_APPLY_TRANSACTION(tag,
1217 MyDatabaseId,
1218 suboid,
1219 xid,
1220 objid);
1222 LockRelease(&tag, lockmode, true);
1226 * Append a description of a lockable object to buf.
1228 * Ideally we would print names for the numeric values, but that requires
1229 * getting locks on system tables, which might cause problems since this is
1230 * typically used to report deadlock situations.
1232 void
1233 DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
1235 switch ((LockTagType) tag->locktag_type)
1237 case LOCKTAG_RELATION:
1238 appendStringInfo(buf,
1239 _("relation %u of database %u"),
1240 tag->locktag_field2,
1241 tag->locktag_field1);
1242 break;
1243 case LOCKTAG_RELATION_EXTEND:
1244 appendStringInfo(buf,
1245 _("extension of relation %u of database %u"),
1246 tag->locktag_field2,
1247 tag->locktag_field1);
1248 break;
1249 case LOCKTAG_DATABASE_FROZEN_IDS:
1250 appendStringInfo(buf,
1251 _("pg_database.datfrozenxid of database %u"),
1252 tag->locktag_field1);
1253 break;
1254 case LOCKTAG_PAGE:
1255 appendStringInfo(buf,
1256 _("page %u of relation %u of database %u"),
1257 tag->locktag_field3,
1258 tag->locktag_field2,
1259 tag->locktag_field1);
1260 break;
1261 case LOCKTAG_TUPLE:
1262 appendStringInfo(buf,
1263 _("tuple (%u,%u) of relation %u of database %u"),
1264 tag->locktag_field3,
1265 tag->locktag_field4,
1266 tag->locktag_field2,
1267 tag->locktag_field1);
1268 break;
1269 case LOCKTAG_TRANSACTION:
1270 appendStringInfo(buf,
1271 _("transaction %u"),
1272 tag->locktag_field1);
1273 break;
1274 case LOCKTAG_VIRTUALTRANSACTION:
1275 appendStringInfo(buf,
1276 _("virtual transaction %d/%u"),
1277 tag->locktag_field1,
1278 tag->locktag_field2);
1279 break;
1280 case LOCKTAG_SPECULATIVE_TOKEN:
1281 appendStringInfo(buf,
1282 _("speculative token %u of transaction %u"),
1283 tag->locktag_field2,
1284 tag->locktag_field1);
1285 break;
1286 case LOCKTAG_OBJECT:
1287 appendStringInfo(buf,
1288 _("object %u of class %u of database %u"),
1289 tag->locktag_field3,
1290 tag->locktag_field2,
1291 tag->locktag_field1);
1292 break;
1293 case LOCKTAG_USERLOCK:
1294 /* reserved for old contrib code, now on pgfoundry */
1295 appendStringInfo(buf,
1296 _("user lock [%u,%u,%u]"),
1297 tag->locktag_field1,
1298 tag->locktag_field2,
1299 tag->locktag_field3);
1300 break;
1301 case LOCKTAG_ADVISORY:
1302 appendStringInfo(buf,
1303 _("advisory lock [%u,%u,%u,%u]"),
1304 tag->locktag_field1,
1305 tag->locktag_field2,
1306 tag->locktag_field3,
1307 tag->locktag_field4);
1308 break;
1309 case LOCKTAG_APPLY_TRANSACTION:
1310 appendStringInfo(buf,
1311 _("remote transaction %u of subscription %u of database %u"),
1312 tag->locktag_field3,
1313 tag->locktag_field2,
1314 tag->locktag_field1);
1315 break;
1316 default:
1317 appendStringInfo(buf,
1318 _("unrecognized locktag type %d"),
1319 (int) tag->locktag_type);
1320 break;
1325 * GetLockNameFromTagType
1327 * Given locktag type, return the corresponding lock name.
1329 const char *
1330 GetLockNameFromTagType(uint16 locktag_type)
1332 if (locktag_type > LOCKTAG_LAST_TYPE)
1333 return "???";
1334 return LockTagTypeNames[locktag_type];