/*-------------------------------------------------------------------------
 *
 * twophase.c
 *		Two-phase commit support functions.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/access/transam/twophase.c
 *
 * NOTES
 *		Each global transaction is associated with a global transaction
 *		identifier (GID). The client assigns a GID to a postgres
 *		transaction with the PREPARE TRANSACTION command.
 *
 *		We keep all active global transactions in a shared memory array.
 *		When the PREPARE TRANSACTION command is issued, the GID is
 *		reserved for the transaction in the array. This is done before
 *		a WAL entry is made, because the reservation checks for duplicate
 *		GIDs and aborts the transaction if there already is a global
 *		transaction in prepared state with the same GID.
 *
 *		A global transaction (gxact) also has a dummy PGPROC; this is what
 *		keeps the XID considered running by TransactionIdIsInProgress.  It is
 *		also convenient as a PGPROC to hook the gxact's locks to.
 *
 *		Information to recover prepared transactions in case of crash is
 *		now stored in WAL for the common case. In some cases there will be
 *		an extended period between preparing a GXACT and commit/abort, in
 *		which case we need to separately record prepared transaction data
 *		in permanent storage. This includes locking information, pending
 *		notifications etc. All that state information is written to the
 *		per-transaction state file in the pg_twophase directory.
 *		All prepared transactions will be written prior to shutdown.
 *
 *		The life cycle of the state data is as follows:
 *
 *		* On PREPARE TRANSACTION, the backend writes state data only to the
 *		  WAL and stores a pointer to the start of the WAL record in
 *		  gxact->prepare_start_lsn.
 *		* If COMMIT occurs before a checkpoint, the backend reads the data
 *		  from WAL using prepare_start_lsn.
 *		* On checkpoint, state data is copied to files in the pg_twophase
 *		  directory and fsynced.
 *		* If COMMIT happens after a checkpoint, the backend reads the state
 *		  data from those files.
 *
 *		During replay and replication, TwoPhaseState also holds information
 *		about active prepared transactions that haven't been moved to disk yet.
 *
 *		Replay of twophase records follows these rules:
 *
 *		* At the beginning of recovery, pg_twophase is scanned once, filling
 *		  TwoPhaseState with entries marked with gxact->inredo and
 *		  gxact->ondisk.  Two-phase file data older than the XID horizon of
 *		  the redo position are discarded.
 *		* On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
 *		  gxact->inredo is set to true for such entries.
 *		* On Checkpoint we iterate through TwoPhaseState->prepXacts entries
 *		  that have gxact->inredo set and are behind the redo_horizon. We
 *		  save them to disk and then switch gxact->ondisk to true.
 *		* On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
 *		  If gxact->ondisk is true, the corresponding entry from the disk
 *		  is additionally deleted.
 *		* RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
 *		  and PrescanPreparedTransactions() have been modified to go through
 *		  gxact->inredo entries that have not made it to disk.
 *
 *-------------------------------------------------------------------------
 */
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogrecovery.h"
90 #include "access/xlogutils.h"
91 #include "catalog/pg_type.h"
92 #include "catalog/storage.h"
94 #include "miscadmin.h"
97 #include "replication/origin.h"
98 #include "replication/syncrep.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "utils/builtins.h"
106 #include "utils/memutils.h"
107 #include "utils/timestamp.h"
/*
 * Directory where Two-phase commit files reside within PGDATA
 */
#define TWOPHASE_DIR "pg_twophase"

/* GUC variable, can't be changed after startup */
int			max_prepared_xacts = 0;
118 * This struct describes one global transaction that is in prepared state
119 * or attempting to become prepared.
121 * The lifecycle of a global transaction is:
123 * 1. After checking that the requested GID is not in use, set up an entry in
124 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
125 * and mark it as locked by my backend.
127 * 2. After successfully completing prepare, set valid = true and enter the
128 * referenced PGPROC into the global ProcArray.
130 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
131 * valid and not locked, then mark the entry as locked by storing my current
132 * proc number into locking_backend. This prevents concurrent attempts to
133 * commit or rollback the same prepared xact.
135 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
136 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139 * Note that if the preparing transaction fails between steps 1 and 2, the
140 * entry must be removed so that the GID and the GlobalTransaction struct
141 * can be reused. See AtAbort_Twophase().
143 * typedef struct GlobalTransactionData *GlobalTransaction appears in
147 typedef struct GlobalTransactionData
149 GlobalTransaction next
; /* list link for free list */
150 int pgprocno
; /* ID of associated dummy PGPROC */
151 TimestampTz prepared_at
; /* time of preparation */
154 * Note that we need to keep track of two LSNs for each GXACT. We keep
155 * track of the start LSN because this is the address we must use to read
156 * state data back from WAL when committing a prepared GXACT. We keep
157 * track of the end LSN because that is the LSN we need to wait for prior
160 XLogRecPtr prepare_start_lsn
; /* XLOG offset of prepare record start */
161 XLogRecPtr prepare_end_lsn
; /* XLOG offset of prepare record end */
162 TransactionId xid
; /* The GXACT id */
164 Oid owner
; /* ID of user that executed the xact */
165 ProcNumber locking_backend
; /* backend currently working on the xact */
166 bool valid
; /* true if PGPROC entry is in proc array */
167 bool ondisk
; /* true if prepare state file is on disk */
168 bool inredo
; /* true if entry was added via xlog_redo */
169 char gid
[GIDSIZE
]; /* The GID assigned to the prepared xact */
170 } GlobalTransactionData
;
173 * Two Phase Commit shared state. Access to this struct is protected
174 * by TwoPhaseStateLock.
176 typedef struct TwoPhaseStateData
178 /* Head of linked list of free GlobalTransactionData structs */
179 GlobalTransaction freeGXacts
;
181 /* Number of valid prepXacts entries. */
184 /* There are max_prepared_xacts items in this array */
185 GlobalTransaction prepXacts
[FLEXIBLE_ARRAY_MEMBER
];
188 static TwoPhaseStateData
*TwoPhaseState
;
191 * Global transaction entry currently locked by us, if any. Note that any
192 * access to the entry pointed to by this variable must be protected by
193 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
194 * (since it's just local memory).
196 static GlobalTransaction MyLockedGxact
= NULL
;
198 static bool twophaseExitRegistered
= false;
200 static void RecordTransactionCommitPrepared(TransactionId xid
,
202 TransactionId
*children
,
204 RelFileLocator
*rels
,
206 xl_xact_stats_item
*stats
,
208 SharedInvalidationMessage
*invalmsgs
,
211 static void RecordTransactionAbortPrepared(TransactionId xid
,
213 TransactionId
*children
,
215 RelFileLocator
*rels
,
217 xl_xact_stats_item
*stats
,
219 static void ProcessRecords(char *bufptr
, TransactionId xid
,
220 const TwoPhaseCallback callbacks
[]);
221 static void RemoveGXact(GlobalTransaction gxact
);
223 static void XlogReadTwoPhaseData(XLogRecPtr lsn
, char **buf
, int *len
);
224 static char *ProcessTwoPhaseBuffer(TransactionId xid
,
225 XLogRecPtr prepare_start_lsn
,
226 bool fromdisk
, bool setParent
, bool setNextXid
);
227 static void MarkAsPreparingGuts(GlobalTransaction gxact
, TransactionId xid
,
228 const char *gid
, TimestampTz prepared_at
, Oid owner
,
230 static void RemoveTwoPhaseFile(TransactionId xid
, bool giveWarning
);
231 static void RecreateTwoPhaseFile(TransactionId xid
, void *content
, int len
);
234 * Initialization of shared memory
237 TwoPhaseShmemSize(void)
241 /* Need the fixed struct, the array of pointers, and the GTD structs */
242 size
= offsetof(TwoPhaseStateData
, prepXacts
);
243 size
= add_size(size
, mul_size(max_prepared_xacts
,
244 sizeof(GlobalTransaction
)));
245 size
= MAXALIGN(size
);
246 size
= add_size(size
, mul_size(max_prepared_xacts
,
247 sizeof(GlobalTransactionData
)));
253 TwoPhaseShmemInit(void)
257 TwoPhaseState
= ShmemInitStruct("Prepared Transaction Table",
260 if (!IsUnderPostmaster
)
262 GlobalTransaction gxacts
;
266 TwoPhaseState
->freeGXacts
= NULL
;
267 TwoPhaseState
->numPrepXacts
= 0;
270 * Initialize the linked list of free GlobalTransactionData structs
272 gxacts
= (GlobalTransaction
)
273 ((char *) TwoPhaseState
+
274 MAXALIGN(offsetof(TwoPhaseStateData
, prepXacts
) +
275 sizeof(GlobalTransaction
) * max_prepared_xacts
));
276 for (i
= 0; i
< max_prepared_xacts
; i
++)
278 /* insert into linked list */
279 gxacts
[i
].next
= TwoPhaseState
->freeGXacts
;
280 TwoPhaseState
->freeGXacts
= &gxacts
[i
];
282 /* associate it with a PGPROC assigned by InitProcGlobal */
283 gxacts
[i
].pgprocno
= GetNumberFromPGProc(&PreparedXactProcs
[i
]);
291 * Exit hook to unlock the global transaction entry we're working on.
294 AtProcExit_Twophase(int code
, Datum arg
)
296 /* same logic as abort */
301 * Abort hook to unlock the global transaction entry we're working on.
304 AtAbort_Twophase(void)
306 if (MyLockedGxact
== NULL
)
310 * What to do with the locked global transaction entry? If we were in the
311 * process of preparing the transaction, but haven't written the WAL
312 * record and state file yet, the transaction must not be considered as
313 * prepared. Likewise, if we are in the process of finishing an
314 * already-prepared transaction, and fail after having already written the
315 * 2nd phase commit or rollback record to the WAL, the transaction should
316 * not be considered as prepared anymore. In those cases, just remove the
317 * entry from shared memory.
319 * Otherwise, the entry must be left in place so that the transaction can
320 * be finished later, so just unlock it.
322 * If we abort during prepare, after having written the WAL record, we
323 * might not have transferred all locks and other state to the prepared
324 * transaction yet. Likewise, if we abort during commit or rollback,
325 * after having written the WAL record, we might not have released all the
326 * resources held by the transaction yet. In those cases, the in-memory
327 * state can be wrong, but it's too late to back out.
329 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
330 if (!MyLockedGxact
->valid
)
331 RemoveGXact(MyLockedGxact
);
333 MyLockedGxact
->locking_backend
= INVALID_PROC_NUMBER
;
334 LWLockRelease(TwoPhaseStateLock
);
336 MyLockedGxact
= NULL
;
340 * This is called after we have finished transferring state to the prepared
344 PostPrepare_Twophase(void)
346 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
347 MyLockedGxact
->locking_backend
= INVALID_PROC_NUMBER
;
348 LWLockRelease(TwoPhaseStateLock
);
350 MyLockedGxact
= NULL
;
356 * Reserve the GID for the given transaction.
359 MarkAsPreparing(TransactionId xid
, const char *gid
,
360 TimestampTz prepared_at
, Oid owner
, Oid databaseid
)
362 GlobalTransaction gxact
;
365 if (strlen(gid
) >= GIDSIZE
)
367 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
368 errmsg("transaction identifier \"%s\" is too long",
371 /* fail immediately if feature is disabled */
372 if (max_prepared_xacts
== 0)
374 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
375 errmsg("prepared transactions are disabled"),
376 errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
378 /* on first call, register the exit hook */
379 if (!twophaseExitRegistered
)
381 before_shmem_exit(AtProcExit_Twophase
, 0);
382 twophaseExitRegistered
= true;
385 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
387 /* Check for conflicting GID */
388 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
390 gxact
= TwoPhaseState
->prepXacts
[i
];
391 if (strcmp(gxact
->gid
, gid
) == 0)
394 (errcode(ERRCODE_DUPLICATE_OBJECT
),
395 errmsg("transaction identifier \"%s\" is already in use",
400 /* Get a free gxact from the freelist */
401 if (TwoPhaseState
->freeGXacts
== NULL
)
403 (errcode(ERRCODE_OUT_OF_MEMORY
),
404 errmsg("maximum number of prepared transactions reached"),
405 errhint("Increase \"max_prepared_transactions\" (currently %d).",
406 max_prepared_xacts
)));
407 gxact
= TwoPhaseState
->freeGXacts
;
408 TwoPhaseState
->freeGXacts
= gxact
->next
;
410 MarkAsPreparingGuts(gxact
, xid
, gid
, prepared_at
, owner
, databaseid
);
412 gxact
->ondisk
= false;
414 /* And insert it into the active array */
415 Assert(TwoPhaseState
->numPrepXacts
< max_prepared_xacts
);
416 TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
++] = gxact
;
418 LWLockRelease(TwoPhaseStateLock
);
424 * MarkAsPreparingGuts
426 * This uses a gxact struct and puts it into the active array.
427 * NOTE: this is also used when reloading a gxact after a crash; so avoid
428 * assuming that we can use very much backend context.
430 * Note: This function should be called with appropriate locks held.
433 MarkAsPreparingGuts(GlobalTransaction gxact
, TransactionId xid
, const char *gid
,
434 TimestampTz prepared_at
, Oid owner
, Oid databaseid
)
439 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
441 Assert(gxact
!= NULL
);
442 proc
= GetPGProcByNumber(gxact
->pgprocno
);
444 /* Initialize the PGPROC entry */
445 MemSet(proc
, 0, sizeof(PGPROC
));
446 dlist_node_init(&proc
->links
);
447 proc
->waitStatus
= PROC_WAIT_STATUS_OK
;
448 if (LocalTransactionIdIsValid(MyProc
->vxid
.lxid
))
450 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
451 proc
->vxid
.lxid
= MyProc
->vxid
.lxid
;
452 proc
->vxid
.procNumber
= MyProcNumber
;
456 Assert(AmStartupProcess() || !IsPostmasterEnvironment
);
457 /* GetLockConflicts() uses this to specify a wait on the XID */
458 proc
->vxid
.lxid
= xid
;
459 proc
->vxid
.procNumber
= INVALID_PROC_NUMBER
;
462 Assert(proc
->xmin
== InvalidTransactionId
);
463 proc
->delayChkptFlags
= 0;
464 proc
->statusFlags
= 0;
466 proc
->databaseId
= databaseid
;
467 proc
->roleId
= owner
;
468 proc
->tempNamespaceId
= InvalidOid
;
469 proc
->isRegularBackend
= false;
470 proc
->lwWaiting
= LW_WS_NOT_WAITING
;
471 proc
->lwWaitMode
= 0;
472 proc
->waitLock
= NULL
;
473 proc
->waitProcLock
= NULL
;
474 pg_atomic_init_u64(&proc
->waitStart
, 0);
475 for (i
= 0; i
< NUM_LOCK_PARTITIONS
; i
++)
476 dlist_init(&proc
->myProcLocks
[i
]);
477 /* subxid data must be filled later by GXactLoadSubxactData */
478 proc
->subxidStatus
.overflowed
= false;
479 proc
->subxidStatus
.count
= 0;
481 gxact
->prepared_at
= prepared_at
;
483 gxact
->owner
= owner
;
484 gxact
->locking_backend
= MyProcNumber
;
485 gxact
->valid
= false;
486 gxact
->inredo
= false;
487 strcpy(gxact
->gid
, gid
);
490 * Remember that we have this GlobalTransaction entry locked for us. If we
491 * abort after this, we must release it.
493 MyLockedGxact
= gxact
;
497 * GXactLoadSubxactData
499 * If the transaction being persisted had any subtransactions, this must
500 * be called before MarkAsPrepared() to load information into the dummy
504 GXactLoadSubxactData(GlobalTransaction gxact
, int nsubxacts
,
505 TransactionId
*children
)
507 PGPROC
*proc
= GetPGProcByNumber(gxact
->pgprocno
);
509 /* We need no extra lock since the GXACT isn't valid yet */
510 if (nsubxacts
> PGPROC_MAX_CACHED_SUBXIDS
)
512 proc
->subxidStatus
.overflowed
= true;
513 nsubxacts
= PGPROC_MAX_CACHED_SUBXIDS
;
517 memcpy(proc
->subxids
.xids
, children
,
518 nsubxacts
* sizeof(TransactionId
));
519 proc
->subxidStatus
.count
= nsubxacts
;
525 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
527 * lock_held indicates whether caller already holds TwoPhaseStateLock.
530 MarkAsPrepared(GlobalTransaction gxact
, bool lock_held
)
532 /* Lock here may be overkill, but I'm not convinced of that ... */
534 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
535 Assert(!gxact
->valid
);
538 LWLockRelease(TwoPhaseStateLock
);
541 * Put it into the global ProcArray so TransactionIdIsInProgress considers
542 * the XID as still running.
544 ProcArrayAdd(GetPGProcByNumber(gxact
->pgprocno
));
549 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
551 static GlobalTransaction
552 LockGXact(const char *gid
, Oid user
)
556 /* on first call, register the exit hook */
557 if (!twophaseExitRegistered
)
559 before_shmem_exit(AtProcExit_Twophase
, 0);
560 twophaseExitRegistered
= true;
563 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
565 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
567 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
568 PGPROC
*proc
= GetPGProcByNumber(gxact
->pgprocno
);
570 /* Ignore not-yet-valid GIDs */
573 if (strcmp(gxact
->gid
, gid
) != 0)
576 /* Found it, but has someone else got it locked? */
577 if (gxact
->locking_backend
!= INVALID_PROC_NUMBER
)
579 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
580 errmsg("prepared transaction with identifier \"%s\" is busy",
583 if (user
!= gxact
->owner
&& !superuser_arg(user
))
585 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
586 errmsg("permission denied to finish prepared transaction"),
587 errhint("Must be superuser or the user that prepared the transaction.")));
590 * Note: it probably would be possible to allow committing from
591 * another database; but at the moment NOTIFY is known not to work and
592 * there may be some other issues as well. Hence disallow until
593 * someone gets motivated to make it work.
595 if (MyDatabaseId
!= proc
->databaseId
)
597 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
598 errmsg("prepared transaction belongs to another database"),
599 errhint("Connect to the database where the transaction was prepared to finish it.")));
601 /* OK for me to lock it */
602 gxact
->locking_backend
= MyProcNumber
;
603 MyLockedGxact
= gxact
;
605 LWLockRelease(TwoPhaseStateLock
);
610 LWLockRelease(TwoPhaseStateLock
);
613 (errcode(ERRCODE_UNDEFINED_OBJECT
),
614 errmsg("prepared transaction with identifier \"%s\" does not exist",
623 * Remove the prepared transaction from the shared memory array.
625 * NB: caller should have already removed it from ProcArray
628 RemoveGXact(GlobalTransaction gxact
)
632 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
634 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
636 if (gxact
== TwoPhaseState
->prepXacts
[i
])
638 /* remove from the active array */
639 TwoPhaseState
->numPrepXacts
--;
640 TwoPhaseState
->prepXacts
[i
] = TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
];
642 /* and put it back in the freelist */
643 gxact
->next
= TwoPhaseState
->freeGXacts
;
644 TwoPhaseState
->freeGXacts
= gxact
;
650 elog(ERROR
, "failed to find %p in GlobalTransaction array", gxact
);
654 * Returns an array of all prepared transactions for the user-level
655 * function pg_prepared_xact.
657 * The returned array and all its elements are copies of internal data
658 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
660 * WARNING -- we return even those transactions that are not fully prepared
661 * yet. The caller should filter them out if he doesn't want them.
663 * The returned array is palloc'd.
666 GetPreparedTransactionList(GlobalTransaction
*gxacts
)
668 GlobalTransaction array
;
672 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
674 if (TwoPhaseState
->numPrepXacts
== 0)
676 LWLockRelease(TwoPhaseStateLock
);
682 num
= TwoPhaseState
->numPrepXacts
;
683 array
= (GlobalTransaction
) palloc(sizeof(GlobalTransactionData
) * num
);
685 for (i
= 0; i
< num
; i
++)
686 memcpy(array
+ i
, TwoPhaseState
->prepXacts
[i
],
687 sizeof(GlobalTransactionData
));
689 LWLockRelease(TwoPhaseStateLock
);
695 /* Working status for pg_prepared_xact */
698 GlobalTransaction array
;
705 * Produce a view with one row per prepared transaction.
707 * This function is here so we don't have to export the
708 * GlobalTransactionData struct definition.
711 pg_prepared_xact(PG_FUNCTION_ARGS
)
713 FuncCallContext
*funcctx
;
714 Working_State
*status
;
716 if (SRF_IS_FIRSTCALL())
719 MemoryContext oldcontext
;
721 /* create a function context for cross-call persistence */
722 funcctx
= SRF_FIRSTCALL_INIT();
725 * Switch to memory context appropriate for multiple function calls
727 oldcontext
= MemoryContextSwitchTo(funcctx
->multi_call_memory_ctx
);
729 /* build tupdesc for result tuples */
730 /* this had better match pg_prepared_xacts view in system_views.sql */
731 tupdesc
= CreateTemplateTupleDesc(5);
732 TupleDescInitEntry(tupdesc
, (AttrNumber
) 1, "transaction",
734 TupleDescInitEntry(tupdesc
, (AttrNumber
) 2, "gid",
736 TupleDescInitEntry(tupdesc
, (AttrNumber
) 3, "prepared",
737 TIMESTAMPTZOID
, -1, 0);
738 TupleDescInitEntry(tupdesc
, (AttrNumber
) 4, "ownerid",
740 TupleDescInitEntry(tupdesc
, (AttrNumber
) 5, "dbid",
743 funcctx
->tuple_desc
= BlessTupleDesc(tupdesc
);
746 * Collect all the 2PC status information that we will format and send
747 * out as a result set.
749 status
= (Working_State
*) palloc(sizeof(Working_State
));
750 funcctx
->user_fctx
= status
;
752 status
->ngxacts
= GetPreparedTransactionList(&status
->array
);
755 MemoryContextSwitchTo(oldcontext
);
758 funcctx
= SRF_PERCALL_SETUP();
759 status
= (Working_State
*) funcctx
->user_fctx
;
761 while (status
->array
!= NULL
&& status
->currIdx
< status
->ngxacts
)
763 GlobalTransaction gxact
= &status
->array
[status
->currIdx
++];
764 PGPROC
*proc
= GetPGProcByNumber(gxact
->pgprocno
);
765 Datum values
[5] = {0};
774 * Form tuple with appropriate data.
777 values
[0] = TransactionIdGetDatum(proc
->xid
);
778 values
[1] = CStringGetTextDatum(gxact
->gid
);
779 values
[2] = TimestampTzGetDatum(gxact
->prepared_at
);
780 values
[3] = ObjectIdGetDatum(gxact
->owner
);
781 values
[4] = ObjectIdGetDatum(proc
->databaseId
);
783 tuple
= heap_form_tuple(funcctx
->tuple_desc
, values
, nulls
);
784 result
= HeapTupleGetDatum(tuple
);
785 SRF_RETURN_NEXT(funcctx
, result
);
788 SRF_RETURN_DONE(funcctx
);
793 * Get the GlobalTransaction struct for a prepared transaction
796 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
797 * caller had better hold it.
799 static GlobalTransaction
800 TwoPhaseGetGXact(TransactionId xid
, bool lock_held
)
802 GlobalTransaction result
= NULL
;
805 static TransactionId cached_xid
= InvalidTransactionId
;
806 static GlobalTransaction cached_gxact
= NULL
;
808 Assert(!lock_held
|| LWLockHeldByMe(TwoPhaseStateLock
));
811 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
812 * repeatedly for the same XID. We can save work with a simple cache.
814 if (xid
== cached_xid
)
818 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
820 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
822 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
824 if (gxact
->xid
== xid
)
832 LWLockRelease(TwoPhaseStateLock
);
834 if (result
== NULL
) /* should not happen */
835 elog(ERROR
, "failed to find GlobalTransaction for xid %u", xid
);
838 cached_gxact
= result
;
844 * TwoPhaseGetXidByVirtualXID
845 * Lookup VXID among xacts prepared since last startup.
847 * (This won't find recovered xacts.) If more than one matches, return any
848 * and set "have_more" to true. To witness multiple matches, a single
849 * proc number must consume 2^32 LXIDs, with no intervening database restart.
852 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid
,
856 TransactionId result
= InvalidTransactionId
;
858 Assert(VirtualTransactionIdIsValid(vxid
));
859 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
861 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
863 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
865 VirtualTransactionId proc_vxid
;
869 proc
= GetPGProcByNumber(gxact
->pgprocno
);
870 GET_VXID_FROM_PGPROC(proc_vxid
, *proc
);
871 if (VirtualTransactionIdEquals(vxid
, proc_vxid
))
874 * Startup process sets proc->vxid.procNumber to
875 * INVALID_PROC_NUMBER.
877 Assert(!gxact
->inredo
);
879 if (result
!= InvalidTransactionId
)
888 LWLockRelease(TwoPhaseStateLock
);
894 * TwoPhaseGetDummyProcNumber
895 * Get the dummy proc number for prepared transaction specified by XID
897 * Dummy proc numbers are similar to proc numbers of real backends. They
898 * start at MaxBackends, and are unique across all currently active real
899 * backends and prepared transactions. If lock_held is set to true,
900 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
903 TwoPhaseGetDummyProcNumber(TransactionId xid
, bool lock_held
)
905 GlobalTransaction gxact
= TwoPhaseGetGXact(xid
, lock_held
);
907 return gxact
->pgprocno
;
911 * TwoPhaseGetDummyProc
912 * Get the PGPROC that represents a prepared transaction specified by XID
914 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
915 * caller had better hold it.
918 TwoPhaseGetDummyProc(TransactionId xid
, bool lock_held
)
920 GlobalTransaction gxact
= TwoPhaseGetGXact(xid
, lock_held
);
922 return GetPGProcByNumber(gxact
->pgprocno
);
925 /************************************************************************/
926 /* State file support */
927 /************************************************************************/
930 * Compute the FullTransactionId for the given TransactionId.
932 * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
933 * PREPARED. After those commands, concurrent vac_truncate_clog() may make
934 * the xid cease to qualify as allowable. XXX Not all callers limit their
937 static inline FullTransactionId
938 AdjustToFullTransactionId(TransactionId xid
)
940 Assert(TransactionIdIsValid(xid
));
941 return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid
);
945 TwoPhaseFilePath(char *path
, TransactionId xid
)
947 FullTransactionId fxid
= AdjustToFullTransactionId(xid
);
949 return snprintf(path
, MAXPGPATH
, TWOPHASE_DIR
"/%08X%08X",
950 EpochFromFullTransactionId(fxid
),
951 XidFromFullTransactionId(fxid
));
955 * 2PC state file format:
957 * 1. TwoPhaseFileHeader
958 * 2. TransactionId[] (subtransactions)
959 * 3. RelFileLocator[] (files to be deleted at commit)
960 * 4. RelFileLocator[] (files to be deleted at abort)
961 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
962 * 6. TwoPhaseRecordOnDisk
964 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
965 * 9. checksum (CRC-32C)
967 * Each segment except the final checksum is MAXALIGN'd.
971 * Header for a 2PC state file
973 #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
975 typedef xl_xact_prepare TwoPhaseFileHeader
;
978 * Header for each record in a state file
980 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
981 * The rmgr data will be stored starting on a MAXALIGN boundary.
983 typedef struct TwoPhaseRecordOnDisk
985 uint32 len
; /* length of rmgr data */
986 TwoPhaseRmgrId rmid
; /* resource manager for this record */
987 uint16 info
; /* flag bits for use by rmgr */
988 } TwoPhaseRecordOnDisk
;
991 * During prepare, the state file is assembled in memory before writing it
992 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
995 typedef struct StateFileChunk
999 struct StateFileChunk
*next
;
1002 static struct xllist
1004 StateFileChunk
*head
; /* first data block in the chain */
1005 StateFileChunk
*tail
; /* last block in chain */
1007 uint32 bytes_free
; /* free bytes left in tail block */
1008 uint32 total_len
; /* total data bytes in chain */
1013 * Append a block of data to records data structure.
1015 * NB: each block is padded to a MAXALIGN multiple. This must be
1016 * accounted for when the file is later read!
1018 * The data is copied, so the caller is free to modify it afterwards.
1021 save_state_data(const void *data
, uint32 len
)
1023 uint32 padlen
= MAXALIGN(len
);
1025 if (padlen
> records
.bytes_free
)
1027 records
.tail
->next
= palloc0(sizeof(StateFileChunk
));
1028 records
.tail
= records
.tail
->next
;
1029 records
.tail
->len
= 0;
1030 records
.tail
->next
= NULL
;
1031 records
.num_chunks
++;
1033 records
.bytes_free
= Max(padlen
, 512);
1034 records
.tail
->data
= palloc(records
.bytes_free
);
1037 memcpy(((char *) records
.tail
->data
) + records
.tail
->len
, data
, len
);
1038 records
.tail
->len
+= padlen
;
1039 records
.bytes_free
-= padlen
;
1040 records
.total_len
+= padlen
;
1044 * Start preparing a state file.
1046 * Initializes data structure and inserts the 2PC file header record.
1049 StartPrepare(GlobalTransaction gxact
)
1051 PGPROC
*proc
= GetPGProcByNumber(gxact
->pgprocno
);
1052 TransactionId xid
= gxact
->xid
;
1053 TwoPhaseFileHeader hdr
;
1054 TransactionId
*children
;
1055 RelFileLocator
*commitrels
;
1056 RelFileLocator
*abortrels
;
1057 xl_xact_stats_item
*abortstats
= NULL
;
1058 xl_xact_stats_item
*commitstats
= NULL
;
1059 SharedInvalidationMessage
*invalmsgs
;
1061 /* Initialize linked list */
1062 records
.head
= palloc0(sizeof(StateFileChunk
));
1063 records
.head
->len
= 0;
1064 records
.head
->next
= NULL
;
1066 records
.bytes_free
= Max(sizeof(TwoPhaseFileHeader
), 512);
1067 records
.head
->data
= palloc(records
.bytes_free
);
1069 records
.tail
= records
.head
;
1070 records
.num_chunks
= 1;
1072 records
.total_len
= 0;
1075 hdr
.magic
= TWOPHASE_MAGIC
;
1076 hdr
.total_len
= 0; /* EndPrepare will fill this in */
1078 hdr
.database
= proc
->databaseId
;
1079 hdr
.prepared_at
= gxact
->prepared_at
;
1080 hdr
.owner
= gxact
->owner
;
1081 hdr
.nsubxacts
= xactGetCommittedChildren(&children
);
1082 hdr
.ncommitrels
= smgrGetPendingDeletes(true, &commitrels
);
1083 hdr
.nabortrels
= smgrGetPendingDeletes(false, &abortrels
);
1085 pgstat_get_transactional_drops(true, &commitstats
);
1087 pgstat_get_transactional_drops(false, &abortstats
);
1088 hdr
.ninvalmsgs
= xactGetCommittedInvalidationMessages(&invalmsgs
,
1089 &hdr
.initfileinval
);
1090 hdr
.gidlen
= strlen(gxact
->gid
) + 1; /* Include '\0' */
1091 /* EndPrepare will fill the origin data, if necessary */
1092 hdr
.origin_lsn
= InvalidXLogRecPtr
;
1093 hdr
.origin_timestamp
= 0;
1095 save_state_data(&hdr
, sizeof(TwoPhaseFileHeader
));
1096 save_state_data(gxact
->gid
, hdr
.gidlen
);
1099 * Add the additional info about subxacts, deletable files and cache
1100 * invalidation messages.
1102 if (hdr
.nsubxacts
> 0)
1104 save_state_data(children
, hdr
.nsubxacts
* sizeof(TransactionId
));
1105 /* While we have the child-xact data, stuff it in the gxact too */
1106 GXactLoadSubxactData(gxact
, hdr
.nsubxacts
, children
);
1108 if (hdr
.ncommitrels
> 0)
1110 save_state_data(commitrels
, hdr
.ncommitrels
* sizeof(RelFileLocator
));
1113 if (hdr
.nabortrels
> 0)
1115 save_state_data(abortrels
, hdr
.nabortrels
* sizeof(RelFileLocator
));
1118 if (hdr
.ncommitstats
> 0)
1120 save_state_data(commitstats
,
1121 hdr
.ncommitstats
* sizeof(xl_xact_stats_item
));
1124 if (hdr
.nabortstats
> 0)
1126 save_state_data(abortstats
,
1127 hdr
.nabortstats
* sizeof(xl_xact_stats_item
));
1130 if (hdr
.ninvalmsgs
> 0)
1132 save_state_data(invalmsgs
,
1133 hdr
.ninvalmsgs
* sizeof(SharedInvalidationMessage
));
1139 * Finish preparing state data and writing it to WAL.
1142 EndPrepare(GlobalTransaction gxact
)
1144 TwoPhaseFileHeader
*hdr
;
1145 StateFileChunk
*record
;
1148 /* Add the end sentinel to the list of 2PC records */
1149 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID
, 0,
1152 /* Go back and fill in total_len in the file header record */
1153 hdr
= (TwoPhaseFileHeader
*) records
.head
->data
;
1154 Assert(hdr
->magic
== TWOPHASE_MAGIC
);
1155 hdr
->total_len
= records
.total_len
+ sizeof(pg_crc32c
);
1157 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
1158 replorigin_session_origin
!= DoNotReplicateId
);
1162 hdr
->origin_lsn
= replorigin_session_origin_lsn
;
1163 hdr
->origin_timestamp
= replorigin_session_origin_timestamp
;
1167 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1168 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1169 * where we write data to file and then re-read at commit time.
1171 if (hdr
->total_len
> MaxAllocSize
)
1173 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
1174 errmsg("two-phase state file maximum length exceeded")));
1177 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1178 * cover us, so no need to calculate a separate CRC.
1180 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1181 * starting immediately after the WAL record is inserted could complete
1182 * without fsync'ing our state file. (This is essentially the same kind
1183 * of race condition as the COMMIT-to-clog-write case that
1184 * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
1186 * We save the PREPARE record's location in the gxact for later use by
1187 * CheckPointTwoPhase.
1189 XLogEnsureRecordSpace(0, records
.num_chunks
);
1191 START_CRIT_SECTION();
1193 Assert((MyProc
->delayChkptFlags
& DELAY_CHKPT_START
) == 0);
1194 MyProc
->delayChkptFlags
|= DELAY_CHKPT_START
;
1197 for (record
= records
.head
; record
!= NULL
; record
= record
->next
)
1198 XLogRegisterData(record
->data
, record
->len
);
1200 XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN
);
1202 gxact
->prepare_end_lsn
= XLogInsert(RM_XACT_ID
, XLOG_XACT_PREPARE
);
1206 /* Move LSNs forward for this replication origin */
1207 replorigin_session_advance(replorigin_session_origin_lsn
,
1208 gxact
->prepare_end_lsn
);
1211 XLogFlush(gxact
->prepare_end_lsn
);
1213 /* If we crash now, we have prepared: WAL replay will fix things */
1215 /* Store record's start location to read that later on Commit */
1216 gxact
->prepare_start_lsn
= ProcLastRecPtr
;
1219 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1220 * as not running our XID (which it will do immediately after this
1221 * function returns), others can commit/rollback the xact.
1223 * NB: a side effect of this is to make a dummy ProcArray entry for the
1224 * prepared XID. This must happen before we clear the XID from MyProc /
1225 * ProcGlobal->xids[], else there is a window where the XID is not running
1226 * according to TransactionIdIsInProgress, and onlookers would be entitled
1227 * to assume the xact crashed. Instead we have a window where the same
1228 * XID appears twice in ProcArray, which is OK.
1230 MarkAsPrepared(gxact
, false);
1233 * Now we can mark ourselves as out of the commit critical section: a
1234 * checkpoint starting after this will certainly see the gxact as a
1235 * candidate for fsyncing.
1237 MyProc
->delayChkptFlags
&= ~DELAY_CHKPT_START
;
1240 * Remember that we have this GlobalTransaction entry locked for us. If
1241 * we crash after this point, it's too late to abort, but we must unlock
1242 * it so that the prepared transaction can be committed or rolled back.
1244 MyLockedGxact
= gxact
;
1249 * Wait for synchronous replication, if required.
1251 * Note that at this stage we have marked the prepare, but still show as
1252 * running in the procarray (twice!) and continue to hold locks.
1254 SyncRepWaitForLSN(gxact
->prepare_end_lsn
, false);
1256 records
.tail
= records
.head
= NULL
;
1257 records
.num_chunks
= 0;
1261 * Register a 2PC record to be written to state file.
1264 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid
, uint16 info
,
1265 const void *data
, uint32 len
)
1267 TwoPhaseRecordOnDisk record
;
1272 save_state_data(&record
, sizeof(TwoPhaseRecordOnDisk
));
1274 save_state_data(data
, len
);
1279 * Read and validate the state file for xid.
1281 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1282 * contents of the file, issuing an error when finding corrupted data. If
1283 * missing_ok is true, which indicates that missing files can be safely
1284 * ignored, then return NULL. This state can be reached when doing recovery.
1287 ReadTwoPhaseFile(TransactionId xid
, bool missing_ok
)
1289 char path
[MAXPGPATH
];
1291 TwoPhaseFileHeader
*hdr
;
1299 TwoPhaseFilePath(path
, xid
);
1301 fd
= OpenTransientFile(path
, O_RDONLY
| PG_BINARY
);
1304 if (missing_ok
&& errno
== ENOENT
)
1308 (errcode_for_file_access(),
1309 errmsg("could not open file \"%s\": %m", path
)));
1313 * Check file length. We can determine a lower bound pretty easily. We
1314 * set an upper bound to avoid palloc() failure on a corrupt file, though
1315 * we can't guarantee that we won't get an out of memory error anyway,
1316 * even on a valid file.
1318 if (fstat(fd
, &stat
))
1320 (errcode_for_file_access(),
1321 errmsg("could not stat file \"%s\": %m", path
)));
1323 if (stat
.st_size
< (MAXALIGN(sizeof(TwoPhaseFileHeader
)) +
1324 MAXALIGN(sizeof(TwoPhaseRecordOnDisk
)) +
1325 sizeof(pg_crc32c
)) ||
1326 stat
.st_size
> MaxAllocSize
)
1328 (errcode(ERRCODE_DATA_CORRUPTED
),
1329 errmsg_plural("incorrect size of file \"%s\": %lld byte",
1330 "incorrect size of file \"%s\": %lld bytes",
1331 (long long int) stat
.st_size
, path
,
1332 (long long int) stat
.st_size
)));
1334 crc_offset
= stat
.st_size
- sizeof(pg_crc32c
);
1335 if (crc_offset
!= MAXALIGN(crc_offset
))
1337 (errcode(ERRCODE_DATA_CORRUPTED
),
1338 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1342 * OK, slurp in the file.
1344 buf
= (char *) palloc(stat
.st_size
);
1346 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ
);
1347 r
= read(fd
, buf
, stat
.st_size
);
1348 if (r
!= stat
.st_size
)
1352 (errcode_for_file_access(),
1353 errmsg("could not read file \"%s\": %m", path
)));
1356 (errmsg("could not read file \"%s\": read %d of %lld",
1357 path
, r
, (long long int) stat
.st_size
)));
1360 pgstat_report_wait_end();
1362 if (CloseTransientFile(fd
) != 0)
1364 (errcode_for_file_access(),
1365 errmsg("could not close file \"%s\": %m", path
)));
1367 hdr
= (TwoPhaseFileHeader
*) buf
;
1368 if (hdr
->magic
!= TWOPHASE_MAGIC
)
1370 (errcode(ERRCODE_DATA_CORRUPTED
),
1371 errmsg("invalid magic number stored in file \"%s\"",
1374 if (hdr
->total_len
!= stat
.st_size
)
1376 (errcode(ERRCODE_DATA_CORRUPTED
),
1377 errmsg("invalid size stored in file \"%s\"",
1380 INIT_CRC32C(calc_crc
);
1381 COMP_CRC32C(calc_crc
, buf
, crc_offset
);
1382 FIN_CRC32C(calc_crc
);
1384 file_crc
= *((pg_crc32c
*) (buf
+ crc_offset
));
1386 if (!EQ_CRC32C(calc_crc
, file_crc
))
1388 (errcode(ERRCODE_DATA_CORRUPTED
),
1389 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1397 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1398 * twophase files and ReadTwoPhaseFile should be used instead.
1400 * Note clearly that this function can access WAL during normal operation,
1401 * similarly to the way WALSender or Logical Decoding would do.
1404 XlogReadTwoPhaseData(XLogRecPtr lsn
, char **buf
, int *len
)
1407 XLogReaderState
*xlogreader
;
1410 xlogreader
= XLogReaderAllocate(wal_segment_size
, NULL
,
1411 XL_ROUTINE(.page_read
= &read_local_xlog_page
,
1412 .segment_open
= &wal_segment_open
,
1413 .segment_close
= &wal_segment_close
),
1417 (errcode(ERRCODE_OUT_OF_MEMORY
),
1418 errmsg("out of memory"),
1419 errdetail("Failed while allocating a WAL reading processor.")));
1421 XLogBeginRead(xlogreader
, lsn
);
1422 record
= XLogReadRecord(xlogreader
, &errormsg
);
1428 (errcode_for_file_access(),
1429 errmsg("could not read two-phase state from WAL at %X/%X: %s",
1430 LSN_FORMAT_ARGS(lsn
), errormsg
)));
1433 (errcode_for_file_access(),
1434 errmsg("could not read two-phase state from WAL at %X/%X",
1435 LSN_FORMAT_ARGS(lsn
))));
1438 if (XLogRecGetRmid(xlogreader
) != RM_XACT_ID
||
1439 (XLogRecGetInfo(xlogreader
) & XLOG_XACT_OPMASK
) != XLOG_XACT_PREPARE
)
1441 (errcode_for_file_access(),
1442 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1443 LSN_FORMAT_ARGS(lsn
))));
1446 *len
= XLogRecGetDataLen(xlogreader
);
1448 *buf
= palloc(sizeof(char) * XLogRecGetDataLen(xlogreader
));
1449 memcpy(*buf
, XLogRecGetData(xlogreader
), sizeof(char) * XLogRecGetDataLen(xlogreader
));
1451 XLogReaderFree(xlogreader
);
1456 * Confirms an xid is prepared, during recovery
1459 StandbyTransactionIdIsPrepared(TransactionId xid
)
1462 TwoPhaseFileHeader
*hdr
;
1465 Assert(TransactionIdIsValid(xid
));
1467 if (max_prepared_xacts
<= 0)
1468 return false; /* nothing to do */
1470 /* Read and validate file */
1471 buf
= ReadTwoPhaseFile(xid
, true);
1475 /* Check header also */
1476 hdr
= (TwoPhaseFileHeader
*) buf
;
1477 result
= TransactionIdEquals(hdr
->xid
, xid
);
1484 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1487 FinishPreparedTransaction(const char *gid
, bool isCommit
)
1489 GlobalTransaction gxact
;
1495 TwoPhaseFileHeader
*hdr
;
1496 TransactionId latestXid
;
1497 TransactionId
*children
;
1498 RelFileLocator
*commitrels
;
1499 RelFileLocator
*abortrels
;
1500 RelFileLocator
*delrels
;
1502 xl_xact_stats_item
*commitstats
;
1503 xl_xact_stats_item
*abortstats
;
1504 SharedInvalidationMessage
*invalmsgs
;
1507 * Validate the GID, and lock the GXACT to ensure that two backends do not
1508 * try to commit the same GID at once.
1510 gxact
= LockGXact(gid
, GetUserId());
1511 proc
= GetPGProcByNumber(gxact
->pgprocno
);
1515 * Read and validate 2PC state data. State data will typically be stored
1516 * in WAL files if the LSN is after the last checkpoint record, or moved
1517 * to disk if for some reason they have lived for a long time.
1520 buf
= ReadTwoPhaseFile(xid
, false);
1522 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, NULL
);
1526 * Disassemble the header area
1528 hdr
= (TwoPhaseFileHeader
*) buf
;
1529 Assert(TransactionIdEquals(hdr
->xid
, xid
));
1530 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
1531 bufptr
+= MAXALIGN(hdr
->gidlen
);
1532 children
= (TransactionId
*) bufptr
;
1533 bufptr
+= MAXALIGN(hdr
->nsubxacts
* sizeof(TransactionId
));
1534 commitrels
= (RelFileLocator
*) bufptr
;
1535 bufptr
+= MAXALIGN(hdr
->ncommitrels
* sizeof(RelFileLocator
));
1536 abortrels
= (RelFileLocator
*) bufptr
;
1537 bufptr
+= MAXALIGN(hdr
->nabortrels
* sizeof(RelFileLocator
));
1538 commitstats
= (xl_xact_stats_item
*) bufptr
;
1539 bufptr
+= MAXALIGN(hdr
->ncommitstats
* sizeof(xl_xact_stats_item
));
1540 abortstats
= (xl_xact_stats_item
*) bufptr
;
1541 bufptr
+= MAXALIGN(hdr
->nabortstats
* sizeof(xl_xact_stats_item
));
1542 invalmsgs
= (SharedInvalidationMessage
*) bufptr
;
1543 bufptr
+= MAXALIGN(hdr
->ninvalmsgs
* sizeof(SharedInvalidationMessage
));
1545 /* compute latestXid among all children */
1546 latestXid
= TransactionIdLatest(xid
, hdr
->nsubxacts
, children
);
1548 /* Prevent cancel/die interrupt while cleaning up */
1552 * The order of operations here is critical: make the XLOG entry for
1553 * commit or abort, then mark the transaction committed or aborted in
1554 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1555 * TransactionIdIsInProgress will stop saying the prepared xact is in
1556 * progress), then run the post-commit or post-abort callbacks. The
1557 * callbacks will release the locks the transaction held.
1560 RecordTransactionCommitPrepared(xid
,
1561 hdr
->nsubxacts
, children
,
1562 hdr
->ncommitrels
, commitrels
,
1565 hdr
->ninvalmsgs
, invalmsgs
,
1566 hdr
->initfileinval
, gid
);
1568 RecordTransactionAbortPrepared(xid
,
1569 hdr
->nsubxacts
, children
,
1570 hdr
->nabortrels
, abortrels
,
1575 ProcArrayRemove(proc
, latestXid
);
1578 * In case we fail while running the callbacks, mark the gxact invalid so
1579 * no one else will try to commit/rollback, and so it will be recycled if
1580 * we fail after this point. It is still locked by our backend so it
1581 * won't go away yet.
1583 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1585 gxact
->valid
= false;
1588 * We have to remove any files that were supposed to be dropped. For
1589 * consistency with the regular xact.c code paths, must do this before
1590 * releasing locks, so do it before running the callbacks.
1592 * NB: this code knows that we couldn't be dropping any temp rels ...
1596 delrels
= commitrels
;
1597 ndelrels
= hdr
->ncommitrels
;
1601 delrels
= abortrels
;
1602 ndelrels
= hdr
->nabortrels
;
1605 /* Make sure files supposed to be dropped are dropped */
1606 DropRelationFiles(delrels
, ndelrels
, false);
1609 pgstat_execute_transactional_drops(hdr
->ncommitstats
, commitstats
, false);
1611 pgstat_execute_transactional_drops(hdr
->nabortstats
, abortstats
, false);
1614 * Handle cache invalidation messages.
1616 * Relcache init file invalidation requires processing both before and
1617 * after we send the SI messages, only when committing. See
1622 if (hdr
->initfileinval
)
1623 RelationCacheInitFilePreInvalidate();
1624 SendSharedInvalidMessages(invalmsgs
, hdr
->ninvalmsgs
);
1625 if (hdr
->initfileinval
)
1626 RelationCacheInitFilePostInvalidate();
1630 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1631 * while holding it to avoid potential conflicts with other transactions
1632 * attempting to use the same GID, so the lock is released once the shared
1633 * memory state is cleared.
1635 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1637 /* And now do the callbacks */
1639 ProcessRecords(bufptr
, xid
, twophase_postcommit_callbacks
);
1641 ProcessRecords(bufptr
, xid
, twophase_postabort_callbacks
);
1643 PredicateLockTwoPhaseFinish(xid
, isCommit
);
1646 * Read this value while holding the two-phase lock, as the on-disk 2PC
1647 * file is physically removed after the lock is released.
1649 ondisk
= gxact
->ondisk
;
1651 /* Clear shared memory state */
1655 * Release the lock as all callbacks are called and shared memory cleanup
1658 LWLockRelease(TwoPhaseStateLock
);
1660 /* Count the prepared xact as committed or aborted */
1661 AtEOXact_PgStat(isCommit
, false);
1664 * And now we can clean up any files we may have left.
1667 RemoveTwoPhaseFile(xid
, true);
1669 MyLockedGxact
= NULL
;
1671 RESUME_INTERRUPTS();
1677 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1680 ProcessRecords(char *bufptr
, TransactionId xid
,
1681 const TwoPhaseCallback callbacks
[])
1685 TwoPhaseRecordOnDisk
*record
= (TwoPhaseRecordOnDisk
*) bufptr
;
1687 Assert(record
->rmid
<= TWOPHASE_RM_MAX_ID
);
1688 if (record
->rmid
== TWOPHASE_RM_END_ID
)
1691 bufptr
+= MAXALIGN(sizeof(TwoPhaseRecordOnDisk
));
1693 if (callbacks
[record
->rmid
] != NULL
)
1694 callbacks
[record
->rmid
] (xid
, record
->info
, bufptr
, record
->len
);
1696 bufptr
+= MAXALIGN(record
->len
);
1701 * Remove the 2PC file for the specified XID.
1703 * If giveWarning is false, do not complain about file-not-present;
1704 * this is an expected case during WAL replay.
1707 RemoveTwoPhaseFile(TransactionId xid
, bool giveWarning
)
1709 char path
[MAXPGPATH
];
1711 TwoPhaseFilePath(path
, xid
);
1713 if (errno
!= ENOENT
|| giveWarning
)
1715 (errcode_for_file_access(),
1716 errmsg("could not remove file \"%s\": %m", path
)));
1720 * Recreates a state file. This is used in WAL replay and during
1721 * checkpoint creation.
1723 * Note: content and len don't include CRC.
1726 RecreateTwoPhaseFile(TransactionId xid
, void *content
, int len
)
1728 char path
[MAXPGPATH
];
1729 pg_crc32c statefile_crc
;
1733 INIT_CRC32C(statefile_crc
);
1734 COMP_CRC32C(statefile_crc
, content
, len
);
1735 FIN_CRC32C(statefile_crc
);
1737 TwoPhaseFilePath(path
, xid
);
1739 fd
= OpenTransientFile(path
,
1740 O_CREAT
| O_TRUNC
| O_WRONLY
| PG_BINARY
);
1743 (errcode_for_file_access(),
1744 errmsg("could not recreate file \"%s\": %m", path
)));
1746 /* Write content and CRC */
1748 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE
);
1749 if (write(fd
, content
, len
) != len
)
1751 /* if write didn't set errno, assume problem is no disk space */
1755 (errcode_for_file_access(),
1756 errmsg("could not write file \"%s\": %m", path
)));
1758 if (write(fd
, &statefile_crc
, sizeof(pg_crc32c
)) != sizeof(pg_crc32c
))
1760 /* if write didn't set errno, assume problem is no disk space */
1764 (errcode_for_file_access(),
1765 errmsg("could not write file \"%s\": %m", path
)));
1767 pgstat_report_wait_end();
1770 * We must fsync the file because the end-of-replay checkpoint will not do
1771 * so, there being no GXACT in shared memory yet to tell it to.
1773 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC
);
1774 if (pg_fsync(fd
) != 0)
1776 (errcode_for_file_access(),
1777 errmsg("could not fsync file \"%s\": %m", path
)));
1778 pgstat_report_wait_end();
1780 if (CloseTransientFile(fd
) != 0)
1782 (errcode_for_file_access(),
1783 errmsg("could not close file \"%s\": %m", path
)));
1787 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1789 * We must fsync the state file of any GXACT that is valid or has been
1790 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1791 * horizon. (If the gxact isn't valid yet, has not been generated in
1792 * redo, or has a later LSN, this checkpoint is not responsible for
1795 * This is deliberately run as late as possible in the checkpoint sequence,
1796 * because GXACTs ordinarily have short lifespans, and so it is quite
1797 * possible that GXACTs that were valid at checkpoint start will no longer
1798 * exist if we wait a little bit. With typical checkpoint settings this
1799 * will be about 3 minutes for an online checkpoint, so as a result we
1800 * expect that there will be no GXACTs that need to be copied to disk.
1802 * If a GXACT remains valid across multiple checkpoints, it will already
1803 * be on disk so we don't bother to repeat that write.
1806 CheckPointTwoPhase(XLogRecPtr redo_horizon
)
1809 int serialized_xacts
= 0;
1811 if (max_prepared_xacts
<= 0)
1812 return; /* nothing to do */
1814 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1817 * We are expecting there to be zero GXACTs that need to be copied to
1818 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1819 * simplicity. This prevents any new xacts from preparing while this
1820 * occurs, which shouldn't be a problem since the presence of long-lived
1821 * prepared xacts indicates the transaction manager isn't active.
1823 * It's also possible to move I/O out of the lock, but on every error we
1824 * should check whether somebody committed our transaction in different
1825 * backend. Let's leave this optimization for future, if somebody will
1826 * spot that this place cause bottleneck.
1828 * Note that it isn't possible for there to be a GXACT with a
1829 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1830 * because of the efforts with delayChkptFlags.
1832 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
1833 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
1836 * Note that we are using gxact not PGPROC so this works in recovery
1839 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
1841 if ((gxact
->valid
|| gxact
->inredo
) &&
1843 gxact
->prepare_end_lsn
<= redo_horizon
)
1848 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, &len
);
1849 RecreateTwoPhaseFile(gxact
->xid
, buf
, len
);
1850 gxact
->ondisk
= true;
1851 gxact
->prepare_start_lsn
= InvalidXLogRecPtr
;
1852 gxact
->prepare_end_lsn
= InvalidXLogRecPtr
;
1857 LWLockRelease(TwoPhaseStateLock
);
1860 * Flush unconditionally the parent directory to make any information
1861 * durable on disk. Two-phase files could have been removed and those
1862 * removals need to be made persistent as well as any files newly created
1863 * previously since the last checkpoint.
1865 fsync_fname(TWOPHASE_DIR
, true);
1867 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1869 if (log_checkpoints
&& serialized_xacts
> 0)
1871 (errmsg_plural("%u two-phase state file was written "
1872 "for a long-running prepared transaction",
1873 "%u two-phase state files were written "
1874 "for long-running prepared transactions",
1876 serialized_xacts
)));
1880 * restoreTwoPhaseData
1882 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1883 * This is called once at the beginning of recovery, saving any extra
1884 * lookups in the future. Two-phase files that are newer than the
1885 * minimum XID horizon are discarded on the way.
1888 restoreTwoPhaseData(void)
1891 struct dirent
*clde
;
1893 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1894 cldir
= AllocateDir(TWOPHASE_DIR
);
1895 while ((clde
= ReadDir(cldir
, TWOPHASE_DIR
)) != NULL
)
1897 if (strlen(clde
->d_name
) == 16 &&
1898 strspn(clde
->d_name
, "0123456789ABCDEF") == 16)
1901 FullTransactionId fxid
;
1904 fxid
= FullTransactionIdFromU64(strtou64(clde
->d_name
, NULL
, 16));
1905 xid
= XidFromFullTransactionId(fxid
);
1907 buf
= ProcessTwoPhaseBuffer(xid
, InvalidXLogRecPtr
,
1908 true, false, false);
1912 PrepareRedoAdd(buf
, InvalidXLogRecPtr
,
1913 InvalidXLogRecPtr
, InvalidRepOriginId
);
1916 LWLockRelease(TwoPhaseStateLock
);
1921 * PrescanPreparedTransactions
1923 * Scan the shared memory entries of TwoPhaseState and determine the range
1924 * of valid XIDs present. This is run during database startup, after we
1925 * have completed reading WAL. TransamVariables->nextXid has been set to
1926 * one more than the highest XID for which evidence exists in WAL.
1928 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1929 * are present, it suggests that the DBA has done a PITR recovery to an
1930 * earlier point in time without cleaning out pg_twophase. We dare not
1931 * try to recover such prepared xacts since they likely depend on database
1932 * state that doesn't exist now.
1934 * However, we will advance nextXid beyond any subxact XIDs belonging to
1935 * valid prepared xacts. We need to do this since subxact commit doesn't
1936 * write a WAL entry, and so there might be no evidence in WAL of those
1939 * On corrupted two-phase files, fail immediately. Keeping around broken
1940 * entries and let replay continue causes harm on the system, and a new
1941 * backup should be rolled in.
1943 * Our other responsibility is to determine and return the oldest valid XID
1944 * among the prepared xacts (if none, return TransamVariables->nextXid).
1945 * This is needed to synchronize pg_subtrans startup properly.
1947 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1948 * top-level xids is stored in *xids_p. The number of entries in the array
1949 * is returned in *nxids_p.
1952 PrescanPreparedTransactions(TransactionId
**xids_p
, int *nxids_p
)
1954 FullTransactionId nextXid
= TransamVariables
->nextXid
;
1955 TransactionId origNextXid
= XidFromFullTransactionId(nextXid
);
1956 TransactionId result
= origNextXid
;
1957 TransactionId
*xids
= NULL
;
1962 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
1963 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
1967 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
1969 Assert(gxact
->inredo
);
1973 buf
= ProcessTwoPhaseBuffer(xid
,
1974 gxact
->prepare_start_lsn
,
1975 gxact
->ondisk
, false, true);
1981 * OK, we think this file is valid. Incorporate xid into the
1982 * running-minimum result.
1984 if (TransactionIdPrecedes(xid
, result
))
1989 if (nxids
== allocsize
)
1994 xids
= palloc(allocsize
* sizeof(TransactionId
));
1998 allocsize
= allocsize
* 2;
1999 xids
= repalloc(xids
, allocsize
* sizeof(TransactionId
));
2002 xids
[nxids
++] = xid
;
2007 LWLockRelease(TwoPhaseStateLock
);
2019 * StandbyRecoverPreparedTransactions
2021 * Scan the shared memory entries of TwoPhaseState and setup all the required
2022 * information to allow standby queries to treat prepared transactions as still
2025 * This is never called at the end of recovery - we use
2026 * RecoverPreparedTransactions() at that point.
2028 * This updates pg_subtrans, so that any subtransactions will be correctly
2029 * seen as in-progress in snapshots taken during recovery.
2032 StandbyRecoverPreparedTransactions(void)
2036 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2037 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2041 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2043 Assert(gxact
->inredo
);
2047 buf
= ProcessTwoPhaseBuffer(xid
,
2048 gxact
->prepare_start_lsn
,
2049 gxact
->ondisk
, true, false);
2053 LWLockRelease(TwoPhaseStateLock
);
2057 * RecoverPreparedTransactions
2059 * Scan the shared memory entries of TwoPhaseState and reload the state for
2060 * each prepared transaction (reacquire locks, etc).
2062 * This is run at the end of recovery, but before we allow backends to write
2065 * At the end of recovery the way we take snapshots will change. We now need
2066 * to mark all running transactions with their full SubTransSetParent() info
2067 * to allow normal snapshots to work correctly if snapshots overflow.
2068 * We do this here because by definition prepared transactions are the only
2069 * type of write transaction still running, so this is necessary and
2073 RecoverPreparedTransactions(void)
2077 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2078 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2082 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2084 TwoPhaseFileHeader
*hdr
;
2085 TransactionId
*subxids
;
2091 * Reconstruct subtrans state for the transaction --- needed because
2092 * pg_subtrans is not preserved over a restart. Note that we are
2093 * linking all the subtransactions directly to the top-level XID;
2094 * there may originally have been a more complex hierarchy, but
2095 * there's no need to restore that exactly. It's possible that
2096 * SubTransSetParent has been set before, if the prepared transaction
2097 * generated xid assignment records.
2099 buf
= ProcessTwoPhaseBuffer(xid
,
2100 gxact
->prepare_start_lsn
,
2101 gxact
->ondisk
, true, false);
2106 (errmsg("recovering prepared transaction %u from shared memory", xid
)));
2108 hdr
= (TwoPhaseFileHeader
*) buf
;
2109 Assert(TransactionIdEquals(hdr
->xid
, xid
));
2110 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
2111 gid
= (const char *) bufptr
;
2112 bufptr
+= MAXALIGN(hdr
->gidlen
);
2113 subxids
= (TransactionId
*) bufptr
;
2114 bufptr
+= MAXALIGN(hdr
->nsubxacts
* sizeof(TransactionId
));
2115 bufptr
+= MAXALIGN(hdr
->ncommitrels
* sizeof(RelFileLocator
));
2116 bufptr
+= MAXALIGN(hdr
->nabortrels
* sizeof(RelFileLocator
));
2117 bufptr
+= MAXALIGN(hdr
->ncommitstats
* sizeof(xl_xact_stats_item
));
2118 bufptr
+= MAXALIGN(hdr
->nabortstats
* sizeof(xl_xact_stats_item
));
2119 bufptr
+= MAXALIGN(hdr
->ninvalmsgs
* sizeof(SharedInvalidationMessage
));
2122 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2123 * added in redo and already has a shmem entry for it.
2125 MarkAsPreparingGuts(gxact
, xid
, gid
,
2127 hdr
->owner
, hdr
->database
);
2129 /* recovered, so reset the flag for entries generated by redo */
2130 gxact
->inredo
= false;
2132 GXactLoadSubxactData(gxact
, hdr
->nsubxacts
, subxids
);
2133 MarkAsPrepared(gxact
, true);
2135 LWLockRelease(TwoPhaseStateLock
);
2138 * Recover other state (notably locks) using resource managers.
2140 ProcessRecords(bufptr
, xid
, twophase_recover_callbacks
);
2143 * Release locks held by the standby process after we process each
2144 * prepared transaction. As a result, we don't need too many
2145 * additional locks at any one time.
2148 StandbyReleaseLockTree(xid
, hdr
->nsubxacts
, subxids
);
2151 * We're done with recovering this transaction. Clear MyLockedGxact,
2152 * like we do in PrepareTransaction() during normal operation.
2154 PostPrepare_Twophase();
2158 LWLockAcquire(TwoPhaseStateLock
, LW_EXCLUSIVE
);
2161 LWLockRelease(TwoPhaseStateLock
);
2165 * ProcessTwoPhaseBuffer
2167 * Given a transaction id, read it either from disk or read it directly
2168 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2170 * If setParent is true, set up subtransaction parent linkages.
2172 * If setNextXid is true, set TransamVariables->nextXid to the newest
2176 ProcessTwoPhaseBuffer(TransactionId xid
,
2177 XLogRecPtr prepare_start_lsn
,
2179 bool setParent
, bool setNextXid
)
2181 FullTransactionId nextXid
= TransamVariables
->nextXid
;
2182 TransactionId origNextXid
= XidFromFullTransactionId(nextXid
);
2183 TransactionId
*subxids
;
2185 TwoPhaseFileHeader
*hdr
;
2188 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2191 Assert(prepare_start_lsn
!= InvalidXLogRecPtr
);
2193 /* Already processed? */
2194 if (TransactionIdDidCommit(xid
) || TransactionIdDidAbort(xid
))
2199 (errmsg("removing stale two-phase state file for transaction %u",
2201 RemoveTwoPhaseFile(xid
, true);
2206 (errmsg("removing stale two-phase state from memory for transaction %u",
2208 PrepareRedoRemove(xid
, true);
2213 /* Reject XID if too new */
2214 if (TransactionIdFollowsOrEquals(xid
, origNextXid
))
2219 (errmsg("removing future two-phase state file for transaction %u",
2221 RemoveTwoPhaseFile(xid
, true);
2226 (errmsg("removing future two-phase state from memory for transaction %u",
2228 PrepareRedoRemove(xid
, true);
2235 /* Read and validate file */
2236 buf
= ReadTwoPhaseFile(xid
, false);
2240 /* Read xlog data */
2241 XlogReadTwoPhaseData(prepare_start_lsn
, &buf
, NULL
);
2244 /* Deconstruct header */
2245 hdr
= (TwoPhaseFileHeader
*) buf
;
2246 if (!TransactionIdEquals(hdr
->xid
, xid
))
2250 (errcode(ERRCODE_DATA_CORRUPTED
),
2251 errmsg("corrupted two-phase state file for transaction %u",
2255 (errcode(ERRCODE_DATA_CORRUPTED
),
2256 errmsg("corrupted two-phase state in memory for transaction %u",
2261 * Examine subtransaction XIDs ... they should all follow main XID, and
2262 * they may force us to advance nextXid.
2264 subxids
= (TransactionId
*) (buf
+
2265 MAXALIGN(sizeof(TwoPhaseFileHeader
)) +
2266 MAXALIGN(hdr
->gidlen
));
2267 for (i
= 0; i
< hdr
->nsubxacts
; i
++)
2269 TransactionId subxid
= subxids
[i
];
2271 Assert(TransactionIdFollows(subxid
, xid
));
2273 /* update nextXid if needed */
2275 AdvanceNextFullTransactionIdPastXid(subxid
);
2278 SubTransSetParent(subxid
, xid
);
2286 * RecordTransactionCommitPrepared
2288 * This is basically the same as RecordTransactionCommit (q.v. if you change
2289 * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2292 * We know the transaction made at least one XLOG entry (its PREPARE),
2293 * so it is never possible to optimize out the commit record.
2296 RecordTransactionCommitPrepared(TransactionId xid
,
2298 TransactionId
*children
,
2300 RelFileLocator
*rels
,
2302 xl_xact_stats_item
*stats
,
2304 SharedInvalidationMessage
*invalmsgs
,
2309 TimestampTz committs
= GetCurrentTimestamp();
2313 * Are we using the replication origins feature? Or, in other words, are
2314 * we replaying remote actions?
2316 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
2317 replorigin_session_origin
!= DoNotReplicateId
);
2319 START_CRIT_SECTION();
2321 /* See notes in RecordTransactionCommit */
2322 Assert((MyProc
->delayChkptFlags
& DELAY_CHKPT_START
) == 0);
2323 MyProc
->delayChkptFlags
|= DELAY_CHKPT_START
;
2326 * Emit the XLOG commit record. Note that we mark 2PC commits as
2327 * potentially having AccessExclusiveLocks since we don't know whether or
2330 recptr
= XactLogCommitRecord(committs
,
2331 nchildren
, children
, nrels
, rels
,
2333 ninvalmsgs
, invalmsgs
,
2335 MyXactFlags
| XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
,
2340 /* Move LSNs forward for this replication origin */
2341 replorigin_session_advance(replorigin_session_origin_lsn
,
2345 * Record commit timestamp. The value comes from plain commit timestamp
2346 * if replorigin is not enabled, or replorigin already set a value for us
2347 * in replorigin_session_origin_timestamp otherwise.
2349 * We don't need to WAL-log anything here, as the commit record written
2350 * above already contains the data.
2352 if (!replorigin
|| replorigin_session_origin_timestamp
== 0)
2353 replorigin_session_origin_timestamp
= committs
;
2355 TransactionTreeSetCommitTsData(xid
, nchildren
, children
,
2356 replorigin_session_origin_timestamp
,
2357 replorigin_session_origin
);
2360 * We don't currently try to sleep before flush here ... nor is there any
2361 * support for async commit of a prepared xact (the very idea is probably
2365 /* Flush XLOG to disk */
2368 /* Mark the transaction committed in pg_xact */
2369 TransactionIdCommitTree(xid
, nchildren
, children
);
2371 /* Checkpoint can proceed now */
2372 MyProc
->delayChkptFlags
&= ~DELAY_CHKPT_START
;
2377 * Wait for synchronous replication, if required.
2379 * Note that at this stage we have marked clog, but still show as running
2380 * in the procarray and continue to hold locks.
2382 SyncRepWaitForLSN(recptr
, true);
2386 * RecordTransactionAbortPrepared
2388 * This is basically the same as RecordTransactionAbort.
2390 * We know the transaction made at least one XLOG entry (its PREPARE),
2391 * so it is never possible to optimize out the abort record.
2394 RecordTransactionAbortPrepared(TransactionId xid
,
2396 TransactionId
*children
,
2398 RelFileLocator
*rels
,
2400 xl_xact_stats_item
*stats
,
2407 * Are we using the replication origins feature? Or, in other words, are
2408 * we replaying remote actions?
2410 replorigin
= (replorigin_session_origin
!= InvalidRepOriginId
&&
2411 replorigin_session_origin
!= DoNotReplicateId
);
2414 * Catch the scenario where we aborted partway through
2415 * RecordTransactionCommitPrepared ...
2417 if (TransactionIdDidCommit(xid
))
2418 elog(PANIC
, "cannot abort transaction %u, it was already committed",
2421 START_CRIT_SECTION();
2424 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2425 * potentially having AccessExclusiveLocks since we don't know whether or
2428 recptr
= XactLogAbortRecord(GetCurrentTimestamp(),
2429 nchildren
, children
,
2432 MyXactFlags
| XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK
,
2436 /* Move LSNs forward for this replication origin */
2437 replorigin_session_advance(replorigin_session_origin_lsn
,
2440 /* Always flush, since we're about to remove the 2PC state file */
2444 * Mark the transaction aborted in clog. This is not absolutely necessary
2445 * but we may as well do it while we are here.
2447 TransactionIdAbortTree(xid
, nchildren
, children
);
2452 * Wait for synchronous replication, if required.
2454 * Note that at this stage we have marked clog, but still show as running
2455 * in the procarray and continue to hold locks.
2457 SyncRepWaitForLSN(recptr
, false);
2463 * Store pointers to the start/end of the WAL record along with the xid in
2464 * a gxact entry in shared memory TwoPhaseState structure. If caller
2465 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2466 * data, the entry is marked as located on disk.
2469 PrepareRedoAdd(char *buf
, XLogRecPtr start_lsn
,
2470 XLogRecPtr end_lsn
, RepOriginId origin_id
)
2472 TwoPhaseFileHeader
*hdr
= (TwoPhaseFileHeader
*) buf
;
2475 GlobalTransaction gxact
;
2477 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2478 Assert(RecoveryInProgress());
2480 bufptr
= buf
+ MAXALIGN(sizeof(TwoPhaseFileHeader
));
2481 gid
= (const char *) bufptr
;
2484 * Reserve the GID for the given transaction in the redo code path.
2486 * This creates a gxact struct and puts it into the active array.
2488 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2489 * shared memory. Hence, we only fill up the bare minimum contents here.
2490 * The gxact also gets marked with gxact->inredo set to true to indicate
2491 * that it got added in the redo phase
2495 * In the event of a crash while a checkpoint was running, it may be
2496 * possible that some two-phase data found its way to disk while its
2497 * corresponding record needs to be replayed in the follow-up recovery. As
2498 * the 2PC data was on disk, it has already been restored at the beginning
2499 * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2500 * duplicates in TwoPhaseState. If a consistent state has been reached,
2501 * the record is added to TwoPhaseState and it should have no
2502 * corresponding file in pg_twophase.
2504 if (!XLogRecPtrIsInvalid(start_lsn
))
2506 char path
[MAXPGPATH
];
2508 TwoPhaseFilePath(path
, hdr
->xid
);
2510 if (access(path
, F_OK
) == 0)
2512 ereport(reachedConsistency
? ERROR
: WARNING
,
2513 (errmsg("could not recover two-phase state file for transaction %u",
2515 errdetail("Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2516 LSN_FORMAT_ARGS(start_lsn
))));
2520 if (errno
!= ENOENT
)
2522 (errcode_for_file_access(),
2523 errmsg("could not access file \"%s\": %m", path
)));
2526 /* Get a free gxact from the freelist */
2527 if (TwoPhaseState
->freeGXacts
== NULL
)
2529 (errcode(ERRCODE_OUT_OF_MEMORY
),
2530 errmsg("maximum number of prepared transactions reached"),
2531 errhint("Increase \"max_prepared_transactions\" (currently %d).",
2532 max_prepared_xacts
)));
2533 gxact
= TwoPhaseState
->freeGXacts
;
2534 TwoPhaseState
->freeGXacts
= gxact
->next
;
2536 gxact
->prepared_at
= hdr
->prepared_at
;
2537 gxact
->prepare_start_lsn
= start_lsn
;
2538 gxact
->prepare_end_lsn
= end_lsn
;
2539 gxact
->xid
= hdr
->xid
;
2540 gxact
->owner
= hdr
->owner
;
2541 gxact
->locking_backend
= INVALID_PROC_NUMBER
;
2542 gxact
->valid
= false;
2543 gxact
->ondisk
= XLogRecPtrIsInvalid(start_lsn
);
2544 gxact
->inredo
= true; /* yes, added in redo */
2545 strcpy(gxact
->gid
, gid
);
2547 /* And insert it into the active array */
2548 Assert(TwoPhaseState
->numPrepXacts
< max_prepared_xacts
);
2549 TwoPhaseState
->prepXacts
[TwoPhaseState
->numPrepXacts
++] = gxact
;
2551 if (origin_id
!= InvalidRepOriginId
)
2553 /* recover apply progress */
2554 replorigin_advance(origin_id
, hdr
->origin_lsn
, end_lsn
,
2555 false /* backward */ , false /* WAL */ );
2558 elog(DEBUG2
, "added 2PC data in shared memory for transaction %u", gxact
->xid
);
2564 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2565 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2567 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2571 PrepareRedoRemove(TransactionId xid
, bool giveWarning
)
2573 GlobalTransaction gxact
= NULL
;
2577 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock
, LW_EXCLUSIVE
));
2578 Assert(RecoveryInProgress());
2580 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2582 gxact
= TwoPhaseState
->prepXacts
[i
];
2584 if (gxact
->xid
== xid
)
2586 Assert(gxact
->inredo
);
2593 * Just leave if there is nothing, this is expected during WAL replay.
2599 * And now we can clean up any files we may have left.
2601 elog(DEBUG2
, "removing 2PC data for transaction %u", xid
);
2603 RemoveTwoPhaseFile(xid
, giveWarning
);
2609 * Check if the prepared transaction with the given GID, lsn and timestamp
2612 * Note that we always compare with the LSN where prepare ends because that is
2613 * what is stored as origin_lsn in the 2PC file.
2615 * This function is primarily used to check if the prepared transaction
2616 * received from the upstream (remote node) already exists. Checking only GID
2617 * is not sufficient because a different prepared xact with the same GID can
2618 * exist on the same node. So, we are ensuring to match origin_lsn and
2619 * origin_timestamp of prepared xact to avoid the possibility of a match of
2620 * prepared xact from two different nodes.
2623 LookupGXact(const char *gid
, XLogRecPtr prepare_end_lsn
,
2624 TimestampTz origin_prepare_timestamp
)
2629 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
2630 for (i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2632 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2634 /* Ignore not-yet-valid GIDs. */
2635 if (gxact
->valid
&& strcmp(gxact
->gid
, gid
) == 0)
2638 TwoPhaseFileHeader
*hdr
;
2641 * We are not expecting collisions of GXACTs (same gid) between
2642 * publisher and subscribers, so we perform all I/O while holding
2643 * TwoPhaseStateLock for simplicity.
2645 * To move the I/O out of the lock, we need to ensure that no
2646 * other backend commits the prepared xact in the meantime. We can
2647 * do this optimization if we encounter many collisions in GID
2648 * between publisher and subscriber.
2651 buf
= ReadTwoPhaseFile(gxact
->xid
, false);
2654 Assert(gxact
->prepare_start_lsn
);
2655 XlogReadTwoPhaseData(gxact
->prepare_start_lsn
, &buf
, NULL
);
2658 hdr
= (TwoPhaseFileHeader
*) buf
;
2660 if (hdr
->origin_lsn
== prepare_end_lsn
&&
2661 hdr
->origin_timestamp
== origin_prepare_timestamp
)
2671 LWLockRelease(TwoPhaseStateLock
);
2676 * TwoPhaseTransactionGid
2677 * Form the prepared transaction GID for two_phase transactions.
2679 * Return the GID in the supplied buffer.
2682 TwoPhaseTransactionGid(Oid subid
, TransactionId xid
, char *gid_res
, int szgid
)
2684 Assert(OidIsValid(subid
));
2686 if (!TransactionIdIsValid(xid
))
2688 (errcode(ERRCODE_PROTOCOL_VIOLATION
),
2689 errmsg_internal("invalid two-phase transaction ID")));
2691 snprintf(gid_res
, szgid
, "pg_gid_%u_%u", subid
, xid
);
2695 * IsTwoPhaseTransactionGidForSubid
2696 * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2697 * for the specified 'subid'.
2700 IsTwoPhaseTransactionGidForSubid(Oid subid
, char *gid
)
2704 TransactionId xid_from_gid
;
2705 char gid_tmp
[GIDSIZE
];
2707 /* Extract the subid and xid from the given GID */
2708 ret
= sscanf(gid
, "pg_gid_%u_%u", &subid_from_gid
, &xid_from_gid
);
2711 * Check that the given GID has expected format, and at least the subid
2714 if (ret
!= 2 || subid
!= subid_from_gid
)
2718 * Reconstruct a temporary GID based on the subid and xid extracted from
2719 * the given GID and check whether the temporary GID and the given GID
2722 TwoPhaseTransactionGid(subid
, xid_from_gid
, gid_tmp
, sizeof(gid_tmp
));
2724 return strcmp(gid
, gid_tmp
) == 0;
2728 * LookupGXactBySubid
2729 * Check if the prepared transaction done by apply worker exists.
2732 LookupGXactBySubid(Oid subid
)
2736 LWLockAcquire(TwoPhaseStateLock
, LW_SHARED
);
2737 for (int i
= 0; i
< TwoPhaseState
->numPrepXacts
; i
++)
2739 GlobalTransaction gxact
= TwoPhaseState
->prepXacts
[i
];
2741 /* Ignore not-yet-valid GIDs. */
2743 IsTwoPhaseTransactionGidForSubid(subid
, gxact
->gid
))
2749 LWLockRelease(TwoPhaseStateLock
);