Rename pubgencols_type to pubgencols in pg_publication.
[pgsql.git] / src / backend / access / transam / twophase.c
blob73a80559194e75f5ca2c8c69060b35b79199ff3f
1 /*-------------------------------------------------------------------------
3 * twophase.c
4 * Two-phase commit support functions.
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/access/transam/twophase.c
12 * NOTES
13 * Each global transaction is associated with a global transaction
14 * identifier (GID). The client assigns a GID to a postgres
15 * transaction with the PREPARE TRANSACTION command.
17 * We keep all active global transactions in a shared memory array.
18 * When the PREPARE TRANSACTION command is issued, the GID is
19 * reserved for the transaction in the array. This is done before
20 * a WAL entry is made, because the reservation checks for duplicate
21 * GIDs and aborts the transaction if there already is a global
22 * transaction in prepared state with the same GID.
24 * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 * the XID considered running by TransactionIdIsInProgress. It is also
26 * convenient as a PGPROC to hook the gxact's locks to.
28 * Information to recover prepared transactions in case of crash is
29 * now stored in WAL for the common case. In some cases there will be
30 * an extended period between preparing a GXACT and commit/abort, in
31 * which case we need to separately record prepared transaction data
32 * in permanent storage. This includes locking information, pending
33 * notifications etc. All that state information is written to the
34 * per-transaction state file in the pg_twophase directory.
35 * All prepared transactions will be written prior to shutdown.
37 * The life cycle of the state data is as follows:
39 * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 * stores pointer to the start of the WAL record in
41 * gxact->prepare_start_lsn.
42 * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 * using prepare_start_lsn.
44 * * On checkpoint, state data is copied to files in the pg_twophase
45 * directory and fsynced.
46 * * If COMMIT happens after checkpoint then backend reads state data from
47 * files
49 * During replay and replication, TwoPhaseState also holds information
50 * about active prepared transactions that haven't been moved to disk yet.
52 * Replay of twophase records happens by the following rules:
54 * * At the beginning of recovery, pg_twophase is scanned once, filling
55 * TwoPhaseState with entries marked with gxact->inredo and
56 * gxact->ondisk. Two-phase file data older than the XID horizon of
57 * the redo position are discarded.
58 * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 * gxact->inredo is set to true for such entries.
60 * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 * that have gxact->inredo set and are behind the redo_horizon. We
62 * save them to disk and then switch gxact->ondisk to true.
63 * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 * If gxact->ondisk is true, the corresponding entry from the disk
65 * is additionally deleted.
66 * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 * and PrescanPreparedTransactions() have been modified to go through
68 * gxact->inredo entries that have not made it to disk.
70 *-------------------------------------------------------------------------
72 #include "postgres.h"
74 #include <fcntl.h>
75 #include <sys/stat.h>
76 #include <time.h>
77 #include <unistd.h>
79 #include "access/commit_ts.h"
80 #include "access/htup_details.h"
81 #include "access/subtrans.h"
82 #include "access/transam.h"
83 #include "access/twophase.h"
84 #include "access/twophase_rmgr.h"
85 #include "access/xact.h"
86 #include "access/xlog.h"
87 #include "access/xloginsert.h"
88 #include "access/xlogreader.h"
89 #include "access/xlogrecovery.h"
90 #include "access/xlogutils.h"
91 #include "catalog/pg_type.h"
92 #include "catalog/storage.h"
93 #include "funcapi.h"
94 #include "miscadmin.h"
95 #include "pg_trace.h"
96 #include "pgstat.h"
97 #include "replication/origin.h"
98 #include "replication/syncrep.h"
99 #include "storage/fd.h"
100 #include "storage/ipc.h"
101 #include "storage/md.h"
102 #include "storage/predicate.h"
103 #include "storage/proc.h"
104 #include "storage/procarray.h"
105 #include "utils/builtins.h"
106 #include "utils/memutils.h"
107 #include "utils/timestamp.h"
/*
 * Location, relative to PGDATA, of the directory holding two-phase state
 * files.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC max_prepared_transactions: how many transactions may be in prepared
 * state at once.  It cannot change after startup; a value of zero makes
 * PREPARE TRANSACTION fail (see MarkAsPreparing).
 */
int			max_prepared_xacts = 0;
118 * This struct describes one global transaction that is in prepared state
119 * or attempting to become prepared.
121 * The lifecycle of a global transaction is:
123 * 1. After checking that the requested GID is not in use, set up an entry in
124 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
125 * and mark it as locked by my backend.
127 * 2. After successfully completing prepare, set valid = true and enter the
128 * referenced PGPROC into the global ProcArray.
130 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
131 * valid and not locked, then mark the entry as locked by storing my current
132 * proc number into locking_backend. This prevents concurrent attempts to
133 * commit or rollback the same prepared xact.
135 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
136 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
137 * the freelist.
139 * Note that if the preparing transaction fails between steps 1 and 2, the
140 * entry must be removed so that the GID and the GlobalTransaction struct
141 * can be reused. See AtAbort_Twophase().
143 * typedef struct GlobalTransactionData *GlobalTransaction appears in
144 * twophase.h
147 typedef struct GlobalTransactionData
149 GlobalTransaction next; /* list link for free list */
150 int pgprocno; /* ID of associated dummy PGPROC */
151 TimestampTz prepared_at; /* time of preparation */
154 * Note that we need to keep track of two LSNs for each GXACT. We keep
155 * track of the start LSN because this is the address we must use to read
156 * state data back from WAL when committing a prepared GXACT. We keep
157 * track of the end LSN because that is the LSN we need to wait for prior
158 * to commit.
160 XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
161 XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
162 TransactionId xid; /* The GXACT id */
164 Oid owner; /* ID of user that executed the xact */
165 ProcNumber locking_backend; /* backend currently working on the xact */
166 bool valid; /* true if PGPROC entry is in proc array */
167 bool ondisk; /* true if prepare state file is on disk */
168 bool inredo; /* true if entry was added via xlog_redo */
169 char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
170 } GlobalTransactionData;
173 * Two Phase Commit shared state. Access to this struct is protected
174 * by TwoPhaseStateLock.
176 typedef struct TwoPhaseStateData
178 /* Head of linked list of free GlobalTransactionData structs */
179 GlobalTransaction freeGXacts;
181 /* Number of valid prepXacts entries. */
182 int numPrepXacts;
184 /* There are max_prepared_xacts items in this array */
185 GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
186 } TwoPhaseStateData;
188 static TwoPhaseStateData *TwoPhaseState;
191 * Global transaction entry currently locked by us, if any. Note that any
192 * access to the entry pointed to by this variable must be protected by
193 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
194 * (since it's just local memory).
196 static GlobalTransaction MyLockedGxact = NULL;
198 static bool twophaseExitRegistered = false;
200 static void RecordTransactionCommitPrepared(TransactionId xid,
201 int nchildren,
202 TransactionId *children,
203 int nrels,
204 RelFileLocator *rels,
205 int nstats,
206 xl_xact_stats_item *stats,
207 int ninvalmsgs,
208 SharedInvalidationMessage *invalmsgs,
209 bool initfileinval,
210 const char *gid);
211 static void RecordTransactionAbortPrepared(TransactionId xid,
212 int nchildren,
213 TransactionId *children,
214 int nrels,
215 RelFileLocator *rels,
216 int nstats,
217 xl_xact_stats_item *stats,
218 const char *gid);
219 static void ProcessRecords(char *bufptr, TransactionId xid,
220 const TwoPhaseCallback callbacks[]);
221 static void RemoveGXact(GlobalTransaction gxact);
223 static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
224 static char *ProcessTwoPhaseBuffer(TransactionId xid,
225 XLogRecPtr prepare_start_lsn,
226 bool fromdisk, bool setParent, bool setNextXid);
227 static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
228 const char *gid, TimestampTz prepared_at, Oid owner,
229 Oid databaseid);
230 static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
231 static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
234 * Initialization of shared memory
236 Size
237 TwoPhaseShmemSize(void)
239 Size size;
241 /* Need the fixed struct, the array of pointers, and the GTD structs */
242 size = offsetof(TwoPhaseStateData, prepXacts);
243 size = add_size(size, mul_size(max_prepared_xacts,
244 sizeof(GlobalTransaction)));
245 size = MAXALIGN(size);
246 size = add_size(size, mul_size(max_prepared_xacts,
247 sizeof(GlobalTransactionData)));
249 return size;
252 void
253 TwoPhaseShmemInit(void)
255 bool found;
257 TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
258 TwoPhaseShmemSize(),
259 &found);
260 if (!IsUnderPostmaster)
262 GlobalTransaction gxacts;
263 int i;
265 Assert(!found);
266 TwoPhaseState->freeGXacts = NULL;
267 TwoPhaseState->numPrepXacts = 0;
270 * Initialize the linked list of free GlobalTransactionData structs
272 gxacts = (GlobalTransaction)
273 ((char *) TwoPhaseState +
274 MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
275 sizeof(GlobalTransaction) * max_prepared_xacts));
276 for (i = 0; i < max_prepared_xacts; i++)
278 /* insert into linked list */
279 gxacts[i].next = TwoPhaseState->freeGXacts;
280 TwoPhaseState->freeGXacts = &gxacts[i];
282 /* associate it with a PGPROC assigned by InitProcGlobal */
283 gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
286 else
287 Assert(found);
291 * Exit hook to unlock the global transaction entry we're working on.
293 static void
294 AtProcExit_Twophase(int code, Datum arg)
296 /* same logic as abort */
297 AtAbort_Twophase();
301 * Abort hook to unlock the global transaction entry we're working on.
303 void
304 AtAbort_Twophase(void)
306 if (MyLockedGxact == NULL)
307 return;
310 * What to do with the locked global transaction entry? If we were in the
311 * process of preparing the transaction, but haven't written the WAL
312 * record and state file yet, the transaction must not be considered as
313 * prepared. Likewise, if we are in the process of finishing an
314 * already-prepared transaction, and fail after having already written the
315 * 2nd phase commit or rollback record to the WAL, the transaction should
316 * not be considered as prepared anymore. In those cases, just remove the
317 * entry from shared memory.
319 * Otherwise, the entry must be left in place so that the transaction can
320 * be finished later, so just unlock it.
322 * If we abort during prepare, after having written the WAL record, we
323 * might not have transferred all locks and other state to the prepared
324 * transaction yet. Likewise, if we abort during commit or rollback,
325 * after having written the WAL record, we might not have released all the
326 * resources held by the transaction yet. In those cases, the in-memory
327 * state can be wrong, but it's too late to back out.
329 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
330 if (!MyLockedGxact->valid)
331 RemoveGXact(MyLockedGxact);
332 else
333 MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
334 LWLockRelease(TwoPhaseStateLock);
336 MyLockedGxact = NULL;
340 * This is called after we have finished transferring state to the prepared
341 * PGPROC entry.
343 void
344 PostPrepare_Twophase(void)
346 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
347 MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
348 LWLockRelease(TwoPhaseStateLock);
350 MyLockedGxact = NULL;
355 * MarkAsPreparing
356 * Reserve the GID for the given transaction.
358 GlobalTransaction
359 MarkAsPreparing(TransactionId xid, const char *gid,
360 TimestampTz prepared_at, Oid owner, Oid databaseid)
362 GlobalTransaction gxact;
363 int i;
365 if (strlen(gid) >= GIDSIZE)
366 ereport(ERROR,
367 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368 errmsg("transaction identifier \"%s\" is too long",
369 gid)));
371 /* fail immediately if feature is disabled */
372 if (max_prepared_xacts == 0)
373 ereport(ERROR,
374 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
375 errmsg("prepared transactions are disabled"),
376 errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
378 /* on first call, register the exit hook */
379 if (!twophaseExitRegistered)
381 before_shmem_exit(AtProcExit_Twophase, 0);
382 twophaseExitRegistered = true;
385 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
387 /* Check for conflicting GID */
388 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
390 gxact = TwoPhaseState->prepXacts[i];
391 if (strcmp(gxact->gid, gid) == 0)
393 ereport(ERROR,
394 (errcode(ERRCODE_DUPLICATE_OBJECT),
395 errmsg("transaction identifier \"%s\" is already in use",
396 gid)));
400 /* Get a free gxact from the freelist */
401 if (TwoPhaseState->freeGXacts == NULL)
402 ereport(ERROR,
403 (errcode(ERRCODE_OUT_OF_MEMORY),
404 errmsg("maximum number of prepared transactions reached"),
405 errhint("Increase \"max_prepared_transactions\" (currently %d).",
406 max_prepared_xacts)));
407 gxact = TwoPhaseState->freeGXacts;
408 TwoPhaseState->freeGXacts = gxact->next;
410 MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
412 gxact->ondisk = false;
414 /* And insert it into the active array */
415 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
416 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
418 LWLockRelease(TwoPhaseStateLock);
420 return gxact;
424 * MarkAsPreparingGuts
426 * This uses a gxact struct and puts it into the active array.
427 * NOTE: this is also used when reloading a gxact after a crash; so avoid
428 * assuming that we can use very much backend context.
430 * Note: This function should be called with appropriate locks held.
432 static void
433 MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
434 TimestampTz prepared_at, Oid owner, Oid databaseid)
436 PGPROC *proc;
437 int i;
439 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
441 Assert(gxact != NULL);
442 proc = GetPGProcByNumber(gxact->pgprocno);
444 /* Initialize the PGPROC entry */
445 MemSet(proc, 0, sizeof(PGPROC));
446 dlist_node_init(&proc->links);
447 proc->waitStatus = PROC_WAIT_STATUS_OK;
448 if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
450 /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
451 proc->vxid.lxid = MyProc->vxid.lxid;
452 proc->vxid.procNumber = MyProcNumber;
454 else
456 Assert(AmStartupProcess() || !IsPostmasterEnvironment);
457 /* GetLockConflicts() uses this to specify a wait on the XID */
458 proc->vxid.lxid = xid;
459 proc->vxid.procNumber = INVALID_PROC_NUMBER;
461 proc->xid = xid;
462 Assert(proc->xmin == InvalidTransactionId);
463 proc->delayChkptFlags = 0;
464 proc->statusFlags = 0;
465 proc->pid = 0;
466 proc->databaseId = databaseid;
467 proc->roleId = owner;
468 proc->tempNamespaceId = InvalidOid;
469 proc->isRegularBackend = false;
470 proc->lwWaiting = LW_WS_NOT_WAITING;
471 proc->lwWaitMode = 0;
472 proc->waitLock = NULL;
473 proc->waitProcLock = NULL;
474 pg_atomic_init_u64(&proc->waitStart, 0);
475 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
476 dlist_init(&proc->myProcLocks[i]);
477 /* subxid data must be filled later by GXactLoadSubxactData */
478 proc->subxidStatus.overflowed = false;
479 proc->subxidStatus.count = 0;
481 gxact->prepared_at = prepared_at;
482 gxact->xid = xid;
483 gxact->owner = owner;
484 gxact->locking_backend = MyProcNumber;
485 gxact->valid = false;
486 gxact->inredo = false;
487 strcpy(gxact->gid, gid);
490 * Remember that we have this GlobalTransaction entry locked for us. If we
491 * abort after this, we must release it.
493 MyLockedGxact = gxact;
497 * GXactLoadSubxactData
499 * If the transaction being persisted had any subtransactions, this must
500 * be called before MarkAsPrepared() to load information into the dummy
501 * PGPROC.
503 static void
504 GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
505 TransactionId *children)
507 PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
509 /* We need no extra lock since the GXACT isn't valid yet */
510 if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
512 proc->subxidStatus.overflowed = true;
513 nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
515 if (nsubxacts > 0)
517 memcpy(proc->subxids.xids, children,
518 nsubxacts * sizeof(TransactionId));
519 proc->subxidStatus.count = nsubxacts;
524 * MarkAsPrepared
525 * Mark the GXACT as fully valid, and enter it into the global ProcArray.
527 * lock_held indicates whether caller already holds TwoPhaseStateLock.
529 static void
530 MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
532 /* Lock here may be overkill, but I'm not convinced of that ... */
533 if (!lock_held)
534 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
535 Assert(!gxact->valid);
536 gxact->valid = true;
537 if (!lock_held)
538 LWLockRelease(TwoPhaseStateLock);
541 * Put it into the global ProcArray so TransactionIdIsInProgress considers
542 * the XID as still running.
544 ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
548 * LockGXact
549 * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
551 static GlobalTransaction
552 LockGXact(const char *gid, Oid user)
554 int i;
556 /* on first call, register the exit hook */
557 if (!twophaseExitRegistered)
559 before_shmem_exit(AtProcExit_Twophase, 0);
560 twophaseExitRegistered = true;
563 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
565 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
567 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
568 PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
570 /* Ignore not-yet-valid GIDs */
571 if (!gxact->valid)
572 continue;
573 if (strcmp(gxact->gid, gid) != 0)
574 continue;
576 /* Found it, but has someone else got it locked? */
577 if (gxact->locking_backend != INVALID_PROC_NUMBER)
578 ereport(ERROR,
579 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
580 errmsg("prepared transaction with identifier \"%s\" is busy",
581 gid)));
583 if (user != gxact->owner && !superuser_arg(user))
584 ereport(ERROR,
585 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 errmsg("permission denied to finish prepared transaction"),
587 errhint("Must be superuser or the user that prepared the transaction.")));
590 * Note: it probably would be possible to allow committing from
591 * another database; but at the moment NOTIFY is known not to work and
592 * there may be some other issues as well. Hence disallow until
593 * someone gets motivated to make it work.
595 if (MyDatabaseId != proc->databaseId)
596 ereport(ERROR,
597 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
598 errmsg("prepared transaction belongs to another database"),
599 errhint("Connect to the database where the transaction was prepared to finish it.")));
601 /* OK for me to lock it */
602 gxact->locking_backend = MyProcNumber;
603 MyLockedGxact = gxact;
605 LWLockRelease(TwoPhaseStateLock);
607 return gxact;
610 LWLockRelease(TwoPhaseStateLock);
612 ereport(ERROR,
613 (errcode(ERRCODE_UNDEFINED_OBJECT),
614 errmsg("prepared transaction with identifier \"%s\" does not exist",
615 gid)));
617 /* NOTREACHED */
618 return NULL;
622 * RemoveGXact
623 * Remove the prepared transaction from the shared memory array.
625 * NB: caller should have already removed it from ProcArray
627 static void
628 RemoveGXact(GlobalTransaction gxact)
630 int i;
632 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
634 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
636 if (gxact == TwoPhaseState->prepXacts[i])
638 /* remove from the active array */
639 TwoPhaseState->numPrepXacts--;
640 TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
642 /* and put it back in the freelist */
643 gxact->next = TwoPhaseState->freeGXacts;
644 TwoPhaseState->freeGXacts = gxact;
646 return;
650 elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
654 * Returns an array of all prepared transactions for the user-level
655 * function pg_prepared_xact.
657 * The returned array and all its elements are copies of internal data
658 * structures, to minimize the time we need to hold the TwoPhaseStateLock.
660 * WARNING -- we return even those transactions that are not fully prepared
661 * yet. The caller should filter them out if he doesn't want them.
663 * The returned array is palloc'd.
665 static int
666 GetPreparedTransactionList(GlobalTransaction *gxacts)
668 GlobalTransaction array;
669 int num;
670 int i;
672 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
674 if (TwoPhaseState->numPrepXacts == 0)
676 LWLockRelease(TwoPhaseStateLock);
678 *gxacts = NULL;
679 return 0;
682 num = TwoPhaseState->numPrepXacts;
683 array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
684 *gxacts = array;
685 for (i = 0; i < num; i++)
686 memcpy(array + i, TwoPhaseState->prepXacts[i],
687 sizeof(GlobalTransactionData));
689 LWLockRelease(TwoPhaseStateLock);
691 return num;
695 /* Working status for pg_prepared_xact */
696 typedef struct
698 GlobalTransaction array;
699 int ngxacts;
700 int currIdx;
701 } Working_State;
704 * pg_prepared_xact
705 * Produce a view with one row per prepared transaction.
707 * This function is here so we don't have to export the
708 * GlobalTransactionData struct definition.
710 Datum
711 pg_prepared_xact(PG_FUNCTION_ARGS)
713 FuncCallContext *funcctx;
714 Working_State *status;
716 if (SRF_IS_FIRSTCALL())
718 TupleDesc tupdesc;
719 MemoryContext oldcontext;
721 /* create a function context for cross-call persistence */
722 funcctx = SRF_FIRSTCALL_INIT();
725 * Switch to memory context appropriate for multiple function calls
727 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
729 /* build tupdesc for result tuples */
730 /* this had better match pg_prepared_xacts view in system_views.sql */
731 tupdesc = CreateTemplateTupleDesc(5);
732 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
733 XIDOID, -1, 0);
734 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
735 TEXTOID, -1, 0);
736 TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
737 TIMESTAMPTZOID, -1, 0);
738 TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
739 OIDOID, -1, 0);
740 TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
741 OIDOID, -1, 0);
743 funcctx->tuple_desc = BlessTupleDesc(tupdesc);
746 * Collect all the 2PC status information that we will format and send
747 * out as a result set.
749 status = (Working_State *) palloc(sizeof(Working_State));
750 funcctx->user_fctx = status;
752 status->ngxacts = GetPreparedTransactionList(&status->array);
753 status->currIdx = 0;
755 MemoryContextSwitchTo(oldcontext);
758 funcctx = SRF_PERCALL_SETUP();
759 status = (Working_State *) funcctx->user_fctx;
761 while (status->array != NULL && status->currIdx < status->ngxacts)
763 GlobalTransaction gxact = &status->array[status->currIdx++];
764 PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
765 Datum values[5] = {0};
766 bool nulls[5] = {0};
767 HeapTuple tuple;
768 Datum result;
770 if (!gxact->valid)
771 continue;
774 * Form tuple with appropriate data.
777 values[0] = TransactionIdGetDatum(proc->xid);
778 values[1] = CStringGetTextDatum(gxact->gid);
779 values[2] = TimestampTzGetDatum(gxact->prepared_at);
780 values[3] = ObjectIdGetDatum(gxact->owner);
781 values[4] = ObjectIdGetDatum(proc->databaseId);
783 tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
784 result = HeapTupleGetDatum(tuple);
785 SRF_RETURN_NEXT(funcctx, result);
788 SRF_RETURN_DONE(funcctx);
792 * TwoPhaseGetGXact
793 * Get the GlobalTransaction struct for a prepared transaction
794 * specified by XID
796 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
797 * caller had better hold it.
799 static GlobalTransaction
800 TwoPhaseGetGXact(TransactionId xid, bool lock_held)
802 GlobalTransaction result = NULL;
803 int i;
805 static TransactionId cached_xid = InvalidTransactionId;
806 static GlobalTransaction cached_gxact = NULL;
808 Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
811 * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
812 * repeatedly for the same XID. We can save work with a simple cache.
814 if (xid == cached_xid)
815 return cached_gxact;
817 if (!lock_held)
818 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
820 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
822 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
824 if (gxact->xid == xid)
826 result = gxact;
827 break;
831 if (!lock_held)
832 LWLockRelease(TwoPhaseStateLock);
834 if (result == NULL) /* should not happen */
835 elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
837 cached_xid = xid;
838 cached_gxact = result;
840 return result;
844 * TwoPhaseGetXidByVirtualXID
845 * Lookup VXID among xacts prepared since last startup.
847 * (This won't find recovered xacts.) If more than one matches, return any
848 * and set "have_more" to true. To witness multiple matches, a single
849 * proc number must consume 2^32 LXIDs, with no intervening database restart.
851 TransactionId
852 TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
853 bool *have_more)
855 int i;
856 TransactionId result = InvalidTransactionId;
858 Assert(VirtualTransactionIdIsValid(vxid));
859 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
861 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
863 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
864 PGPROC *proc;
865 VirtualTransactionId proc_vxid;
867 if (!gxact->valid)
868 continue;
869 proc = GetPGProcByNumber(gxact->pgprocno);
870 GET_VXID_FROM_PGPROC(proc_vxid, *proc);
871 if (VirtualTransactionIdEquals(vxid, proc_vxid))
874 * Startup process sets proc->vxid.procNumber to
875 * INVALID_PROC_NUMBER.
877 Assert(!gxact->inredo);
879 if (result != InvalidTransactionId)
881 *have_more = true;
882 break;
884 result = gxact->xid;
888 LWLockRelease(TwoPhaseStateLock);
890 return result;
894 * TwoPhaseGetDummyProcNumber
895 * Get the dummy proc number for prepared transaction specified by XID
897 * Dummy proc numbers are similar to proc numbers of real backends. They
898 * start at MaxBackends, and are unique across all currently active real
899 * backends and prepared transactions. If lock_held is set to true,
900 * TwoPhaseStateLock will not be taken, so the caller had better hold it.
902 ProcNumber
903 TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
905 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
907 return gxact->pgprocno;
911 * TwoPhaseGetDummyProc
912 * Get the PGPROC that represents a prepared transaction specified by XID
914 * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
915 * caller had better hold it.
917 PGPROC *
918 TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
920 GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
922 return GetPGProcByNumber(gxact->pgprocno);
925 /************************************************************************/
926 /* State file support */
927 /************************************************************************/
930 * Compute the FullTransactionId for the given TransactionId.
932 * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
933 * PREPARED. After those commands, concurrent vac_truncate_clog() may make
934 * the xid cease to qualify as allowable. XXX Not all callers limit their
935 * calls accordingly.
937 static inline FullTransactionId
938 AdjustToFullTransactionId(TransactionId xid)
940 Assert(TransactionIdIsValid(xid));
941 return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
944 static inline int
945 TwoPhaseFilePath(char *path, TransactionId xid)
947 FullTransactionId fxid = AdjustToFullTransactionId(xid);
949 return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
950 EpochFromFullTransactionId(fxid),
951 XidFromFullTransactionId(fxid));
955 * 2PC state file format:
957 * 1. TwoPhaseFileHeader
958 * 2. TransactionId[] (subtransactions)
959 * 3. RelFileLocator[] (files to be deleted at commit)
960 * 4. RelFileLocator[] (files to be deleted at abort)
961 * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
962 * 6. TwoPhaseRecordOnDisk
963 * 7. ...
964 * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
965 * 9. checksum (CRC-32C)
967 * Each segment except the final checksum is MAXALIGN'd.
971 * Header for a 2PC state file
973 #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
975 typedef xl_xact_prepare TwoPhaseFileHeader;
978 * Header for each record in a state file
980 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
981 * The rmgr data will be stored starting on a MAXALIGN boundary.
983 typedef struct TwoPhaseRecordOnDisk
985 uint32 len; /* length of rmgr data */
986 TwoPhaseRmgrId rmid; /* resource manager for this record */
987 uint16 info; /* flag bits for use by rmgr */
988 } TwoPhaseRecordOnDisk;
991 * During prepare, the state file is assembled in memory before writing it
992 * to WAL and the actual state file. We use a chain of StateFileChunk blocks
993 * for that.
995 typedef struct StateFileChunk
997 char *data;
998 uint32 len;
999 struct StateFileChunk *next;
1000 } StateFileChunk;
1002 static struct xllist
1004 StateFileChunk *head; /* first data block in the chain */
1005 StateFileChunk *tail; /* last block in chain */
1006 uint32 num_chunks;
1007 uint32 bytes_free; /* free bytes left in tail block */
1008 uint32 total_len; /* total data bytes in chain */
1009 } records;
1013 * Append a block of data to records data structure.
1015 * NB: each block is padded to a MAXALIGN multiple. This must be
1016 * accounted for when the file is later read!
1018 * The data is copied, so the caller is free to modify it afterwards.
1020 static void
1021 save_state_data(const void *data, uint32 len)
1023 uint32 padlen = MAXALIGN(len);
1025 if (padlen > records.bytes_free)
1027 records.tail->next = palloc0(sizeof(StateFileChunk));
1028 records.tail = records.tail->next;
1029 records.tail->len = 0;
1030 records.tail->next = NULL;
1031 records.num_chunks++;
1033 records.bytes_free = Max(padlen, 512);
1034 records.tail->data = palloc(records.bytes_free);
1037 memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1038 records.tail->len += padlen;
1039 records.bytes_free -= padlen;
1040 records.total_len += padlen;
1044 * Start preparing a state file.
1046 * Initializes data structure and inserts the 2PC file header record.
1048 void
1049 StartPrepare(GlobalTransaction gxact)
1051 PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1052 TransactionId xid = gxact->xid;
1053 TwoPhaseFileHeader hdr;
1054 TransactionId *children;
1055 RelFileLocator *commitrels;
1056 RelFileLocator *abortrels;
1057 xl_xact_stats_item *abortstats = NULL;
1058 xl_xact_stats_item *commitstats = NULL;
1059 SharedInvalidationMessage *invalmsgs;
1061 /* Initialize linked list */
1062 records.head = palloc0(sizeof(StateFileChunk));
1063 records.head->len = 0;
1064 records.head->next = NULL;
1066 records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1067 records.head->data = palloc(records.bytes_free);
1069 records.tail = records.head;
1070 records.num_chunks = 1;
1072 records.total_len = 0;
1074 /* Create header */
1075 hdr.magic = TWOPHASE_MAGIC;
1076 hdr.total_len = 0; /* EndPrepare will fill this in */
1077 hdr.xid = xid;
1078 hdr.database = proc->databaseId;
1079 hdr.prepared_at = gxact->prepared_at;
1080 hdr.owner = gxact->owner;
1081 hdr.nsubxacts = xactGetCommittedChildren(&children);
1082 hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1083 hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1084 hdr.ncommitstats =
1085 pgstat_get_transactional_drops(true, &commitstats);
1086 hdr.nabortstats =
1087 pgstat_get_transactional_drops(false, &abortstats);
1088 hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1089 &hdr.initfileinval);
1090 hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1091 /* EndPrepare will fill the origin data, if necessary */
1092 hdr.origin_lsn = InvalidXLogRecPtr;
1093 hdr.origin_timestamp = 0;
1095 save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1096 save_state_data(gxact->gid, hdr.gidlen);
1099 * Add the additional info about subxacts, deletable files and cache
1100 * invalidation messages.
1102 if (hdr.nsubxacts > 0)
1104 save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1105 /* While we have the child-xact data, stuff it in the gxact too */
1106 GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1108 if (hdr.ncommitrels > 0)
1110 save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
1111 pfree(commitrels);
1113 if (hdr.nabortrels > 0)
1115 save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
1116 pfree(abortrels);
1118 if (hdr.ncommitstats > 0)
1120 save_state_data(commitstats,
1121 hdr.ncommitstats * sizeof(xl_xact_stats_item));
1122 pfree(commitstats);
1124 if (hdr.nabortstats > 0)
1126 save_state_data(abortstats,
1127 hdr.nabortstats * sizeof(xl_xact_stats_item));
1128 pfree(abortstats);
1130 if (hdr.ninvalmsgs > 0)
1132 save_state_data(invalmsgs,
1133 hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1134 pfree(invalmsgs);
1139 * Finish preparing state data and writing it to WAL.
1141 void
1142 EndPrepare(GlobalTransaction gxact)
1144 TwoPhaseFileHeader *hdr;
1145 StateFileChunk *record;
1146 bool replorigin;
1148 /* Add the end sentinel to the list of 2PC records */
1149 RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1150 NULL, 0);
1152 /* Go back and fill in total_len in the file header record */
1153 hdr = (TwoPhaseFileHeader *) records.head->data;
1154 Assert(hdr->magic == TWOPHASE_MAGIC);
1155 hdr->total_len = records.total_len + sizeof(pg_crc32c);
1157 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1158 replorigin_session_origin != DoNotReplicateId);
1160 if (replorigin)
1162 hdr->origin_lsn = replorigin_session_origin_lsn;
1163 hdr->origin_timestamp = replorigin_session_origin_timestamp;
1167 * If the data size exceeds MaxAllocSize, we won't be able to read it in
1168 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1169 * where we write data to file and then re-read at commit time.
1171 if (hdr->total_len > MaxAllocSize)
1172 ereport(ERROR,
1173 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1174 errmsg("two-phase state file maximum length exceeded")));
1177 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1178 * cover us, so no need to calculate a separate CRC.
1180 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1181 * starting immediately after the WAL record is inserted could complete
1182 * without fsync'ing our state file. (This is essentially the same kind
1183 * of race condition as the COMMIT-to-clog-write case that
1184 * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
1186 * We save the PREPARE record's location in the gxact for later use by
1187 * CheckPointTwoPhase.
1189 XLogEnsureRecordSpace(0, records.num_chunks);
1191 START_CRIT_SECTION();
1193 Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
1194 MyProc->delayChkptFlags |= DELAY_CHKPT_START;
1196 XLogBeginInsert();
1197 for (record = records.head; record != NULL; record = record->next)
1198 XLogRegisterData(record->data, record->len);
1200 XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1202 gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1204 if (replorigin)
1206 /* Move LSNs forward for this replication origin */
1207 replorigin_session_advance(replorigin_session_origin_lsn,
1208 gxact->prepare_end_lsn);
1211 XLogFlush(gxact->prepare_end_lsn);
1213 /* If we crash now, we have prepared: WAL replay will fix things */
1215 /* Store record's start location to read that later on Commit */
1216 gxact->prepare_start_lsn = ProcLastRecPtr;
1219 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1220 * as not running our XID (which it will do immediately after this
1221 * function returns), others can commit/rollback the xact.
1223 * NB: a side effect of this is to make a dummy ProcArray entry for the
1224 * prepared XID. This must happen before we clear the XID from MyProc /
1225 * ProcGlobal->xids[], else there is a window where the XID is not running
1226 * according to TransactionIdIsInProgress, and onlookers would be entitled
1227 * to assume the xact crashed. Instead we have a window where the same
1228 * XID appears twice in ProcArray, which is OK.
1230 MarkAsPrepared(gxact, false);
1233 * Now we can mark ourselves as out of the commit critical section: a
1234 * checkpoint starting after this will certainly see the gxact as a
1235 * candidate for fsyncing.
1237 MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
1240 * Remember that we have this GlobalTransaction entry locked for us. If
1241 * we crash after this point, it's too late to abort, but we must unlock
1242 * it so that the prepared transaction can be committed or rolled back.
1244 MyLockedGxact = gxact;
1246 END_CRIT_SECTION();
1249 * Wait for synchronous replication, if required.
1251 * Note that at this stage we have marked the prepare, but still show as
1252 * running in the procarray (twice!) and continue to hold locks.
1254 SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1256 records.tail = records.head = NULL;
1257 records.num_chunks = 0;
1261 * Register a 2PC record to be written to state file.
1263 void
1264 RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1265 const void *data, uint32 len)
1267 TwoPhaseRecordOnDisk record;
1269 record.rmid = rmid;
1270 record.info = info;
1271 record.len = len;
1272 save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1273 if (len > 0)
1274 save_state_data(data, len);
1279 * Read and validate the state file for xid.
1281 * If it looks OK (has a valid magic number and CRC), return the palloc'd
1282 * contents of the file, issuing an error when finding corrupted data. If
1283 * missing_ok is true, which indicates that missing files can be safely
1284 * ignored, then return NULL. This state can be reached when doing recovery.
1286 static char *
1287 ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1289 char path[MAXPGPATH];
1290 char *buf;
1291 TwoPhaseFileHeader *hdr;
1292 int fd;
1293 struct stat stat;
1294 uint32 crc_offset;
1295 pg_crc32c calc_crc,
1296 file_crc;
1297 int r;
1299 TwoPhaseFilePath(path, xid);
1301 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1302 if (fd < 0)
1304 if (missing_ok && errno == ENOENT)
1305 return NULL;
1307 ereport(ERROR,
1308 (errcode_for_file_access(),
1309 errmsg("could not open file \"%s\": %m", path)));
1313 * Check file length. We can determine a lower bound pretty easily. We
1314 * set an upper bound to avoid palloc() failure on a corrupt file, though
1315 * we can't guarantee that we won't get an out of memory error anyway,
1316 * even on a valid file.
1318 if (fstat(fd, &stat))
1319 ereport(ERROR,
1320 (errcode_for_file_access(),
1321 errmsg("could not stat file \"%s\": %m", path)));
1323 if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1324 MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1325 sizeof(pg_crc32c)) ||
1326 stat.st_size > MaxAllocSize)
1327 ereport(ERROR,
1328 (errcode(ERRCODE_DATA_CORRUPTED),
1329 errmsg_plural("incorrect size of file \"%s\": %lld byte",
1330 "incorrect size of file \"%s\": %lld bytes",
1331 (long long int) stat.st_size, path,
1332 (long long int) stat.st_size)));
1334 crc_offset = stat.st_size - sizeof(pg_crc32c);
1335 if (crc_offset != MAXALIGN(crc_offset))
1336 ereport(ERROR,
1337 (errcode(ERRCODE_DATA_CORRUPTED),
1338 errmsg("incorrect alignment of CRC offset for file \"%s\"",
1339 path)));
1342 * OK, slurp in the file.
1344 buf = (char *) palloc(stat.st_size);
1346 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1347 r = read(fd, buf, stat.st_size);
1348 if (r != stat.st_size)
1350 if (r < 0)
1351 ereport(ERROR,
1352 (errcode_for_file_access(),
1353 errmsg("could not read file \"%s\": %m", path)));
1354 else
1355 ereport(ERROR,
1356 (errmsg("could not read file \"%s\": read %d of %lld",
1357 path, r, (long long int) stat.st_size)));
1360 pgstat_report_wait_end();
1362 if (CloseTransientFile(fd) != 0)
1363 ereport(ERROR,
1364 (errcode_for_file_access(),
1365 errmsg("could not close file \"%s\": %m", path)));
1367 hdr = (TwoPhaseFileHeader *) buf;
1368 if (hdr->magic != TWOPHASE_MAGIC)
1369 ereport(ERROR,
1370 (errcode(ERRCODE_DATA_CORRUPTED),
1371 errmsg("invalid magic number stored in file \"%s\"",
1372 path)));
1374 if (hdr->total_len != stat.st_size)
1375 ereport(ERROR,
1376 (errcode(ERRCODE_DATA_CORRUPTED),
1377 errmsg("invalid size stored in file \"%s\"",
1378 path)));
1380 INIT_CRC32C(calc_crc);
1381 COMP_CRC32C(calc_crc, buf, crc_offset);
1382 FIN_CRC32C(calc_crc);
1384 file_crc = *((pg_crc32c *) (buf + crc_offset));
1386 if (!EQ_CRC32C(calc_crc, file_crc))
1387 ereport(ERROR,
1388 (errcode(ERRCODE_DATA_CORRUPTED),
1389 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1390 path)));
1392 return buf;
1397 * Reads 2PC data from xlog. During checkpoint this data will be moved to
1398 * twophase files and ReadTwoPhaseFile should be used instead.
1400 * Note clearly that this function can access WAL during normal operation,
1401 * similarly to the way WALSender or Logical Decoding would do.
1403 static void
1404 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1406 XLogRecord *record;
1407 XLogReaderState *xlogreader;
1408 char *errormsg;
1410 xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1411 XL_ROUTINE(.page_read = &read_local_xlog_page,
1412 .segment_open = &wal_segment_open,
1413 .segment_close = &wal_segment_close),
1414 NULL);
1415 if (!xlogreader)
1416 ereport(ERROR,
1417 (errcode(ERRCODE_OUT_OF_MEMORY),
1418 errmsg("out of memory"),
1419 errdetail("Failed while allocating a WAL reading processor.")));
1421 XLogBeginRead(xlogreader, lsn);
1422 record = XLogReadRecord(xlogreader, &errormsg);
1424 if (record == NULL)
1426 if (errormsg)
1427 ereport(ERROR,
1428 (errcode_for_file_access(),
1429 errmsg("could not read two-phase state from WAL at %X/%X: %s",
1430 LSN_FORMAT_ARGS(lsn), errormsg)));
1431 else
1432 ereport(ERROR,
1433 (errcode_for_file_access(),
1434 errmsg("could not read two-phase state from WAL at %X/%X",
1435 LSN_FORMAT_ARGS(lsn))));
1438 if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1439 (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1440 ereport(ERROR,
1441 (errcode_for_file_access(),
1442 errmsg("expected two-phase state data is not present in WAL at %X/%X",
1443 LSN_FORMAT_ARGS(lsn))));
1445 if (len != NULL)
1446 *len = XLogRecGetDataLen(xlogreader);
1448 *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1449 memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1451 XLogReaderFree(xlogreader);
1456 * Confirms an xid is prepared, during recovery
1458 bool
1459 StandbyTransactionIdIsPrepared(TransactionId xid)
1461 char *buf;
1462 TwoPhaseFileHeader *hdr;
1463 bool result;
1465 Assert(TransactionIdIsValid(xid));
1467 if (max_prepared_xacts <= 0)
1468 return false; /* nothing to do */
1470 /* Read and validate file */
1471 buf = ReadTwoPhaseFile(xid, true);
1472 if (buf == NULL)
1473 return false;
1475 /* Check header also */
1476 hdr = (TwoPhaseFileHeader *) buf;
1477 result = TransactionIdEquals(hdr->xid, xid);
1478 pfree(buf);
1480 return result;
1484 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1486 void
1487 FinishPreparedTransaction(const char *gid, bool isCommit)
1489 GlobalTransaction gxact;
1490 PGPROC *proc;
1491 TransactionId xid;
1492 bool ondisk;
1493 char *buf;
1494 char *bufptr;
1495 TwoPhaseFileHeader *hdr;
1496 TransactionId latestXid;
1497 TransactionId *children;
1498 RelFileLocator *commitrels;
1499 RelFileLocator *abortrels;
1500 RelFileLocator *delrels;
1501 int ndelrels;
1502 xl_xact_stats_item *commitstats;
1503 xl_xact_stats_item *abortstats;
1504 SharedInvalidationMessage *invalmsgs;
1507 * Validate the GID, and lock the GXACT to ensure that two backends do not
1508 * try to commit the same GID at once.
1510 gxact = LockGXact(gid, GetUserId());
1511 proc = GetPGProcByNumber(gxact->pgprocno);
1512 xid = gxact->xid;
1515 * Read and validate 2PC state data. State data will typically be stored
1516 * in WAL files if the LSN is after the last checkpoint record, or moved
1517 * to disk if for some reason they have lived for a long time.
1519 if (gxact->ondisk)
1520 buf = ReadTwoPhaseFile(xid, false);
1521 else
1522 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1526 * Disassemble the header area
1528 hdr = (TwoPhaseFileHeader *) buf;
1529 Assert(TransactionIdEquals(hdr->xid, xid));
1530 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
1531 bufptr += MAXALIGN(hdr->gidlen);
1532 children = (TransactionId *) bufptr;
1533 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
1534 commitrels = (RelFileLocator *) bufptr;
1535 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
1536 abortrels = (RelFileLocator *) bufptr;
1537 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
1538 commitstats = (xl_xact_stats_item *) bufptr;
1539 bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
1540 abortstats = (xl_xact_stats_item *) bufptr;
1541 bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
1542 invalmsgs = (SharedInvalidationMessage *) bufptr;
1543 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1545 /* compute latestXid among all children */
1546 latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1548 /* Prevent cancel/die interrupt while cleaning up */
1549 HOLD_INTERRUPTS();
1552 * The order of operations here is critical: make the XLOG entry for
1553 * commit or abort, then mark the transaction committed or aborted in
1554 * pg_xact, then remove its PGPROC from the global ProcArray (which means
1555 * TransactionIdIsInProgress will stop saying the prepared xact is in
1556 * progress), then run the post-commit or post-abort callbacks. The
1557 * callbacks will release the locks the transaction held.
1559 if (isCommit)
1560 RecordTransactionCommitPrepared(xid,
1561 hdr->nsubxacts, children,
1562 hdr->ncommitrels, commitrels,
1563 hdr->ncommitstats,
1564 commitstats,
1565 hdr->ninvalmsgs, invalmsgs,
1566 hdr->initfileinval, gid);
1567 else
1568 RecordTransactionAbortPrepared(xid,
1569 hdr->nsubxacts, children,
1570 hdr->nabortrels, abortrels,
1571 hdr->nabortstats,
1572 abortstats,
1573 gid);
1575 ProcArrayRemove(proc, latestXid);
1578 * In case we fail while running the callbacks, mark the gxact invalid so
1579 * no one else will try to commit/rollback, and so it will be recycled if
1580 * we fail after this point. It is still locked by our backend so it
1581 * won't go away yet.
1583 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1585 gxact->valid = false;
1588 * We have to remove any files that were supposed to be dropped. For
1589 * consistency with the regular xact.c code paths, must do this before
1590 * releasing locks, so do it before running the callbacks.
1592 * NB: this code knows that we couldn't be dropping any temp rels ...
1594 if (isCommit)
1596 delrels = commitrels;
1597 ndelrels = hdr->ncommitrels;
1599 else
1601 delrels = abortrels;
1602 ndelrels = hdr->nabortrels;
1605 /* Make sure files supposed to be dropped are dropped */
1606 DropRelationFiles(delrels, ndelrels, false);
1608 if (isCommit)
1609 pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
1610 else
1611 pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
1614 * Handle cache invalidation messages.
1616 * Relcache init file invalidation requires processing both before and
1617 * after we send the SI messages, only when committing. See
1618 * AtEOXact_Inval().
1620 if (isCommit)
1622 if (hdr->initfileinval)
1623 RelationCacheInitFilePreInvalidate();
1624 SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1625 if (hdr->initfileinval)
1626 RelationCacheInitFilePostInvalidate();
1630 * Acquire the two-phase lock. We want to work on the two-phase callbacks
1631 * while holding it to avoid potential conflicts with other transactions
1632 * attempting to use the same GID, so the lock is released once the shared
1633 * memory state is cleared.
1635 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1637 /* And now do the callbacks */
1638 if (isCommit)
1639 ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1640 else
1641 ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1643 PredicateLockTwoPhaseFinish(xid, isCommit);
1646 * Read this value while holding the two-phase lock, as the on-disk 2PC
1647 * file is physically removed after the lock is released.
1649 ondisk = gxact->ondisk;
1651 /* Clear shared memory state */
1652 RemoveGXact(gxact);
1655 * Release the lock as all callbacks are called and shared memory cleanup
1656 * is done.
1658 LWLockRelease(TwoPhaseStateLock);
1660 /* Count the prepared xact as committed or aborted */
1661 AtEOXact_PgStat(isCommit, false);
1664 * And now we can clean up any files we may have left.
1666 if (ondisk)
1667 RemoveTwoPhaseFile(xid, true);
1669 MyLockedGxact = NULL;
1671 RESUME_INTERRUPTS();
1673 pfree(buf);
1677 * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1679 static void
1680 ProcessRecords(char *bufptr, TransactionId xid,
1681 const TwoPhaseCallback callbacks[])
1683 for (;;)
1685 TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1687 Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1688 if (record->rmid == TWOPHASE_RM_END_ID)
1689 break;
1691 bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1693 if (callbacks[record->rmid] != NULL)
1694 callbacks[record->rmid] (xid, record->info, bufptr, record->len);
1696 bufptr += MAXALIGN(record->len);
1701 * Remove the 2PC file for the specified XID.
1703 * If giveWarning is false, do not complain about file-not-present;
1704 * this is an expected case during WAL replay.
1706 static void
1707 RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1709 char path[MAXPGPATH];
1711 TwoPhaseFilePath(path, xid);
1712 if (unlink(path))
1713 if (errno != ENOENT || giveWarning)
1714 ereport(WARNING,
1715 (errcode_for_file_access(),
1716 errmsg("could not remove file \"%s\": %m", path)));
1720 * Recreates a state file. This is used in WAL replay and during
1721 * checkpoint creation.
1723 * Note: content and len don't include CRC.
1725 static void
1726 RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1728 char path[MAXPGPATH];
1729 pg_crc32c statefile_crc;
1730 int fd;
1732 /* Recompute CRC */
1733 INIT_CRC32C(statefile_crc);
1734 COMP_CRC32C(statefile_crc, content, len);
1735 FIN_CRC32C(statefile_crc);
1737 TwoPhaseFilePath(path, xid);
1739 fd = OpenTransientFile(path,
1740 O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1741 if (fd < 0)
1742 ereport(ERROR,
1743 (errcode_for_file_access(),
1744 errmsg("could not recreate file \"%s\": %m", path)));
1746 /* Write content and CRC */
1747 errno = 0;
1748 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1749 if (write(fd, content, len) != len)
1751 /* if write didn't set errno, assume problem is no disk space */
1752 if (errno == 0)
1753 errno = ENOSPC;
1754 ereport(ERROR,
1755 (errcode_for_file_access(),
1756 errmsg("could not write file \"%s\": %m", path)));
1758 if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1760 /* if write didn't set errno, assume problem is no disk space */
1761 if (errno == 0)
1762 errno = ENOSPC;
1763 ereport(ERROR,
1764 (errcode_for_file_access(),
1765 errmsg("could not write file \"%s\": %m", path)));
1767 pgstat_report_wait_end();
1770 * We must fsync the file because the end-of-replay checkpoint will not do
1771 * so, there being no GXACT in shared memory yet to tell it to.
1773 pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1774 if (pg_fsync(fd) != 0)
1775 ereport(ERROR,
1776 (errcode_for_file_access(),
1777 errmsg("could not fsync file \"%s\": %m", path)));
1778 pgstat_report_wait_end();
1780 if (CloseTransientFile(fd) != 0)
1781 ereport(ERROR,
1782 (errcode_for_file_access(),
1783 errmsg("could not close file \"%s\": %m", path)));
1787 * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1789 * We must fsync the state file of any GXACT that is valid or has been
1790 * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1791 * horizon. (If the gxact isn't valid yet, has not been generated in
1792 * redo, or has a later LSN, this checkpoint is not responsible for
1793 * fsyncing it.)
1795 * This is deliberately run as late as possible in the checkpoint sequence,
1796 * because GXACTs ordinarily have short lifespans, and so it is quite
1797 * possible that GXACTs that were valid at checkpoint start will no longer
1798 * exist if we wait a little bit. With typical checkpoint settings this
1799 * will be about 3 minutes for an online checkpoint, so as a result we
1800 * expect that there will be no GXACTs that need to be copied to disk.
1802 * If a GXACT remains valid across multiple checkpoints, it will already
1803 * be on disk so we don't bother to repeat that write.
1805 void
1806 CheckPointTwoPhase(XLogRecPtr redo_horizon)
1808 int i;
1809 int serialized_xacts = 0;
1811 if (max_prepared_xacts <= 0)
1812 return; /* nothing to do */
1814 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1817 * We are expecting there to be zero GXACTs that need to be copied to
1818 * disk, so we perform all I/O while holding TwoPhaseStateLock for
1819 * simplicity. This prevents any new xacts from preparing while this
1820 * occurs, which shouldn't be a problem since the presence of long-lived
1821 * prepared xacts indicates the transaction manager isn't active.
1823 * It's also possible to move I/O out of the lock, but on every error we
1824 * should check whether somebody committed our transaction in different
1825 * backend. Let's leave this optimization for future, if somebody will
1826 * spot that this place cause bottleneck.
1828 * Note that it isn't possible for there to be a GXACT with a
1829 * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1830 * because of the efforts with delayChkptFlags.
1832 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1833 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1836 * Note that we are using gxact not PGPROC so this works in recovery
1837 * also
1839 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1841 if ((gxact->valid || gxact->inredo) &&
1842 !gxact->ondisk &&
1843 gxact->prepare_end_lsn <= redo_horizon)
1845 char *buf;
1846 int len;
1848 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1849 RecreateTwoPhaseFile(gxact->xid, buf, len);
1850 gxact->ondisk = true;
1851 gxact->prepare_start_lsn = InvalidXLogRecPtr;
1852 gxact->prepare_end_lsn = InvalidXLogRecPtr;
1853 pfree(buf);
1854 serialized_xacts++;
1857 LWLockRelease(TwoPhaseStateLock);
1860 * Flush unconditionally the parent directory to make any information
1861 * durable on disk. Two-phase files could have been removed and those
1862 * removals need to be made persistent as well as any files newly created
1863 * previously since the last checkpoint.
1865 fsync_fname(TWOPHASE_DIR, true);
1867 TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1869 if (log_checkpoints && serialized_xacts > 0)
1870 ereport(LOG,
1871 (errmsg_plural("%u two-phase state file was written "
1872 "for a long-running prepared transaction",
1873 "%u two-phase state files were written "
1874 "for long-running prepared transactions",
1875 serialized_xacts,
1876 serialized_xacts)));
1880 * restoreTwoPhaseData
1882 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1883 * This is called once at the beginning of recovery, saving any extra
1884 * lookups in the future. Two-phase files that are newer than the
1885 * minimum XID horizon are discarded on the way.
1887 void
1888 restoreTwoPhaseData(void)
1890 DIR *cldir;
1891 struct dirent *clde;
1893 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1894 cldir = AllocateDir(TWOPHASE_DIR);
1895 while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1897 if (strlen(clde->d_name) == 16 &&
1898 strspn(clde->d_name, "0123456789ABCDEF") == 16)
1900 TransactionId xid;
1901 FullTransactionId fxid;
1902 char *buf;
1904 fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1905 xid = XidFromFullTransactionId(fxid);
1907 buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1908 true, false, false);
1909 if (buf == NULL)
1910 continue;
1912 PrepareRedoAdd(buf, InvalidXLogRecPtr,
1913 InvalidXLogRecPtr, InvalidRepOriginId);
1916 LWLockRelease(TwoPhaseStateLock);
1917 FreeDir(cldir);
1921 * PrescanPreparedTransactions
1923 * Scan the shared memory entries of TwoPhaseState and determine the range
1924 * of valid XIDs present. This is run during database startup, after we
1925 * have completed reading WAL. TransamVariables->nextXid has been set to
1926 * one more than the highest XID for which evidence exists in WAL.
1928 * We throw away any prepared xacts with main XID beyond nextXid --- if any
1929 * are present, it suggests that the DBA has done a PITR recovery to an
1930 * earlier point in time without cleaning out pg_twophase. We dare not
1931 * try to recover such prepared xacts since they likely depend on database
1932 * state that doesn't exist now.
1934 * However, we will advance nextXid beyond any subxact XIDs belonging to
1935 * valid prepared xacts. We need to do this since subxact commit doesn't
1936 * write a WAL entry, and so there might be no evidence in WAL of those
1937 * subxact XIDs.
1939 * On corrupted two-phase files, fail immediately. Keeping around broken
1940 * entries and let replay continue causes harm on the system, and a new
1941 * backup should be rolled in.
1943 * Our other responsibility is to determine and return the oldest valid XID
1944 * among the prepared xacts (if none, return TransamVariables->nextXid).
1945 * This is needed to synchronize pg_subtrans startup properly.
1947 * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1948 * top-level xids is stored in *xids_p. The number of entries in the array
1949 * is returned in *nxids_p.
1951 TransactionId
1952 PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1954 FullTransactionId nextXid = TransamVariables->nextXid;
1955 TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1956 TransactionId result = origNextXid;
1957 TransactionId *xids = NULL;
1958 int nxids = 0;
1959 int allocsize = 0;
1960 int i;
1962 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1963 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1965 TransactionId xid;
1966 char *buf;
1967 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1969 Assert(gxact->inredo);
1971 xid = gxact->xid;
1973 buf = ProcessTwoPhaseBuffer(xid,
1974 gxact->prepare_start_lsn,
1975 gxact->ondisk, false, true);
1977 if (buf == NULL)
1978 continue;
1981 * OK, we think this file is valid. Incorporate xid into the
1982 * running-minimum result.
1984 if (TransactionIdPrecedes(xid, result))
1985 result = xid;
1987 if (xids_p)
1989 if (nxids == allocsize)
1991 if (nxids == 0)
1993 allocsize = 10;
1994 xids = palloc(allocsize * sizeof(TransactionId));
1996 else
1998 allocsize = allocsize * 2;
1999 xids = repalloc(xids, allocsize * sizeof(TransactionId));
2002 xids[nxids++] = xid;
2005 pfree(buf);
2007 LWLockRelease(TwoPhaseStateLock);
2009 if (xids_p)
2011 *xids_p = xids;
2012 *nxids_p = nxids;
2015 return result;
2019 * StandbyRecoverPreparedTransactions
2021 * Scan the shared memory entries of TwoPhaseState and setup all the required
2022 * information to allow standby queries to treat prepared transactions as still
2023 * active.
2025 * This is never called at the end of recovery - we use
2026 * RecoverPreparedTransactions() at that point.
2028 * This updates pg_subtrans, so that any subtransactions will be correctly
2029 * seen as in-progress in snapshots taken during recovery.
2031 void
2032 StandbyRecoverPreparedTransactions(void)
2034 int i;
2036 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2037 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2039 TransactionId xid;
2040 char *buf;
2041 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2043 Assert(gxact->inredo);
2045 xid = gxact->xid;
2047 buf = ProcessTwoPhaseBuffer(xid,
2048 gxact->prepare_start_lsn,
2049 gxact->ondisk, true, false);
2050 if (buf != NULL)
2051 pfree(buf);
2053 LWLockRelease(TwoPhaseStateLock);
2057 * RecoverPreparedTransactions
2059 * Scan the shared memory entries of TwoPhaseState and reload the state for
2060 * each prepared transaction (reacquire locks, etc).
2062 * This is run at the end of recovery, but before we allow backends to write
2063 * WAL.
2065 * At the end of recovery the way we take snapshots will change. We now need
2066 * to mark all running transactions with their full SubTransSetParent() info
2067 * to allow normal snapshots to work correctly if snapshots overflow.
2068 * We do this here because by definition prepared transactions are the only
2069 * type of write transaction still running, so this is necessary and
2070 * complete.
2072 void
2073 RecoverPreparedTransactions(void)
2075 int i;
2077 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2078 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2080 TransactionId xid;
2081 char *buf;
2082 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2083 char *bufptr;
2084 TwoPhaseFileHeader *hdr;
2085 TransactionId *subxids;
2086 const char *gid;
2088 xid = gxact->xid;
2091 * Reconstruct subtrans state for the transaction --- needed because
2092 * pg_subtrans is not preserved over a restart. Note that we are
2093 * linking all the subtransactions directly to the top-level XID;
2094 * there may originally have been a more complex hierarchy, but
2095 * there's no need to restore that exactly. It's possible that
2096 * SubTransSetParent has been set before, if the prepared transaction
2097 * generated xid assignment records.
2099 buf = ProcessTwoPhaseBuffer(xid,
2100 gxact->prepare_start_lsn,
2101 gxact->ondisk, true, false);
2102 if (buf == NULL)
2103 continue;
2105 ereport(LOG,
2106 (errmsg("recovering prepared transaction %u from shared memory", xid)));
2108 hdr = (TwoPhaseFileHeader *) buf;
2109 Assert(TransactionIdEquals(hdr->xid, xid));
2110 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2111 gid = (const char *) bufptr;
2112 bufptr += MAXALIGN(hdr->gidlen);
2113 subxids = (TransactionId *) bufptr;
2114 bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2115 bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2116 bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
2117 bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2118 bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2119 bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2122 * Recreate its GXACT and dummy PGPROC. But, check whether it was
2123 * added in redo and already has a shmem entry for it.
2125 MarkAsPreparingGuts(gxact, xid, gid,
2126 hdr->prepared_at,
2127 hdr->owner, hdr->database);
2129 /* recovered, so reset the flag for entries generated by redo */
2130 gxact->inredo = false;
2132 GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2133 MarkAsPrepared(gxact, true);
2135 LWLockRelease(TwoPhaseStateLock);
2138 * Recover other state (notably locks) using resource managers.
2140 ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2143 * Release locks held by the standby process after we process each
2144 * prepared transaction. As a result, we don't need too many
2145 * additional locks at any one time.
2147 if (InHotStandby)
2148 StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2151 * We're done with recovering this transaction. Clear MyLockedGxact,
2152 * like we do in PrepareTransaction() during normal operation.
2154 PostPrepare_Twophase();
2156 pfree(buf);
2158 LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2161 LWLockRelease(TwoPhaseStateLock);
2165 * ProcessTwoPhaseBuffer
2167 * Given a transaction id, read it either from disk or read it directly
2168 * via shmem xlog record pointer using the provided "prepare_start_lsn".
2170 * If setParent is true, set up subtransaction parent linkages.
2172 * If setNextXid is true, set TransamVariables->nextXid to the newest
2173 * value scanned.
2175 static char *
2176 ProcessTwoPhaseBuffer(TransactionId xid,
2177 XLogRecPtr prepare_start_lsn,
2178 bool fromdisk,
2179 bool setParent, bool setNextXid)
2181 FullTransactionId nextXid = TransamVariables->nextXid;
2182 TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2183 TransactionId *subxids;
2184 char *buf;
2185 TwoPhaseFileHeader *hdr;
2186 int i;
2188 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2190 if (!fromdisk)
2191 Assert(prepare_start_lsn != InvalidXLogRecPtr);
2193 /* Already processed? */
2194 if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2196 if (fromdisk)
2198 ereport(WARNING,
2199 (errmsg("removing stale two-phase state file for transaction %u",
2200 xid)));
2201 RemoveTwoPhaseFile(xid, true);
2203 else
2205 ereport(WARNING,
2206 (errmsg("removing stale two-phase state from memory for transaction %u",
2207 xid)));
2208 PrepareRedoRemove(xid, true);
2210 return NULL;
2213 /* Reject XID if too new */
2214 if (TransactionIdFollowsOrEquals(xid, origNextXid))
2216 if (fromdisk)
2218 ereport(WARNING,
2219 (errmsg("removing future two-phase state file for transaction %u",
2220 xid)));
2221 RemoveTwoPhaseFile(xid, true);
2223 else
2225 ereport(WARNING,
2226 (errmsg("removing future two-phase state from memory for transaction %u",
2227 xid)));
2228 PrepareRedoRemove(xid, true);
2230 return NULL;
2233 if (fromdisk)
2235 /* Read and validate file */
2236 buf = ReadTwoPhaseFile(xid, false);
2238 else
2240 /* Read xlog data */
2241 XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2244 /* Deconstruct header */
2245 hdr = (TwoPhaseFileHeader *) buf;
2246 if (!TransactionIdEquals(hdr->xid, xid))
2248 if (fromdisk)
2249 ereport(ERROR,
2250 (errcode(ERRCODE_DATA_CORRUPTED),
2251 errmsg("corrupted two-phase state file for transaction %u",
2252 xid)));
2253 else
2254 ereport(ERROR,
2255 (errcode(ERRCODE_DATA_CORRUPTED),
2256 errmsg("corrupted two-phase state in memory for transaction %u",
2257 xid)));
2261 * Examine subtransaction XIDs ... they should all follow main XID, and
2262 * they may force us to advance nextXid.
2264 subxids = (TransactionId *) (buf +
2265 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2266 MAXALIGN(hdr->gidlen));
2267 for (i = 0; i < hdr->nsubxacts; i++)
2269 TransactionId subxid = subxids[i];
2271 Assert(TransactionIdFollows(subxid, xid));
2273 /* update nextXid if needed */
2274 if (setNextXid)
2275 AdvanceNextFullTransactionIdPastXid(subxid);
2277 if (setParent)
2278 SubTransSetParent(subxid, xid);
2281 return buf;
2286 * RecordTransactionCommitPrepared
2288 * This is basically the same as RecordTransactionCommit (q.v. if you change
2289 * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2290 * race condition.
2292 * We know the transaction made at least one XLOG entry (its PREPARE),
2293 * so it is never possible to optimize out the commit record.
2295 static void
2296 RecordTransactionCommitPrepared(TransactionId xid,
2297 int nchildren,
2298 TransactionId *children,
2299 int nrels,
2300 RelFileLocator *rels,
2301 int nstats,
2302 xl_xact_stats_item *stats,
2303 int ninvalmsgs,
2304 SharedInvalidationMessage *invalmsgs,
2305 bool initfileinval,
2306 const char *gid)
2308 XLogRecPtr recptr;
2309 TimestampTz committs = GetCurrentTimestamp();
2310 bool replorigin;
2313 * Are we using the replication origins feature? Or, in other words, are
2314 * we replaying remote actions?
2316 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2317 replorigin_session_origin != DoNotReplicateId);
2319 START_CRIT_SECTION();
2321 /* See notes in RecordTransactionCommit */
2322 Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
2323 MyProc->delayChkptFlags |= DELAY_CHKPT_START;
2326 * Emit the XLOG commit record. Note that we mark 2PC commits as
2327 * potentially having AccessExclusiveLocks since we don't know whether or
2328 * not they do.
2330 recptr = XactLogCommitRecord(committs,
2331 nchildren, children, nrels, rels,
2332 nstats, stats,
2333 ninvalmsgs, invalmsgs,
2334 initfileinval,
2335 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2336 xid, gid);
2339 if (replorigin)
2340 /* Move LSNs forward for this replication origin */
2341 replorigin_session_advance(replorigin_session_origin_lsn,
2342 XactLastRecEnd);
2345 * Record commit timestamp. The value comes from plain commit timestamp
2346 * if replorigin is not enabled, or replorigin already set a value for us
2347 * in replorigin_session_origin_timestamp otherwise.
2349 * We don't need to WAL-log anything here, as the commit record written
2350 * above already contains the data.
2352 if (!replorigin || replorigin_session_origin_timestamp == 0)
2353 replorigin_session_origin_timestamp = committs;
2355 TransactionTreeSetCommitTsData(xid, nchildren, children,
2356 replorigin_session_origin_timestamp,
2357 replorigin_session_origin);
2360 * We don't currently try to sleep before flush here ... nor is there any
2361 * support for async commit of a prepared xact (the very idea is probably
2362 * a contradiction)
2365 /* Flush XLOG to disk */
2366 XLogFlush(recptr);
2368 /* Mark the transaction committed in pg_xact */
2369 TransactionIdCommitTree(xid, nchildren, children);
2371 /* Checkpoint can proceed now */
2372 MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
2374 END_CRIT_SECTION();
2377 * Wait for synchronous replication, if required.
2379 * Note that at this stage we have marked clog, but still show as running
2380 * in the procarray and continue to hold locks.
2382 SyncRepWaitForLSN(recptr, true);
2386 * RecordTransactionAbortPrepared
2388 * This is basically the same as RecordTransactionAbort.
2390 * We know the transaction made at least one XLOG entry (its PREPARE),
2391 * so it is never possible to optimize out the abort record.
2393 static void
2394 RecordTransactionAbortPrepared(TransactionId xid,
2395 int nchildren,
2396 TransactionId *children,
2397 int nrels,
2398 RelFileLocator *rels,
2399 int nstats,
2400 xl_xact_stats_item *stats,
2401 const char *gid)
2403 XLogRecPtr recptr;
2404 bool replorigin;
2407 * Are we using the replication origins feature? Or, in other words, are
2408 * we replaying remote actions?
2410 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2411 replorigin_session_origin != DoNotReplicateId);
2414 * Catch the scenario where we aborted partway through
2415 * RecordTransactionCommitPrepared ...
2417 if (TransactionIdDidCommit(xid))
2418 elog(PANIC, "cannot abort transaction %u, it was already committed",
2419 xid);
2421 START_CRIT_SECTION();
2424 * Emit the XLOG commit record. Note that we mark 2PC aborts as
2425 * potentially having AccessExclusiveLocks since we don't know whether or
2426 * not they do.
2428 recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2429 nchildren, children,
2430 nrels, rels,
2431 nstats, stats,
2432 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2433 xid, gid);
2435 if (replorigin)
2436 /* Move LSNs forward for this replication origin */
2437 replorigin_session_advance(replorigin_session_origin_lsn,
2438 XactLastRecEnd);
2440 /* Always flush, since we're about to remove the 2PC state file */
2441 XLogFlush(recptr);
2444 * Mark the transaction aborted in clog. This is not absolutely necessary
2445 * but we may as well do it while we are here.
2447 TransactionIdAbortTree(xid, nchildren, children);
2449 END_CRIT_SECTION();
2452 * Wait for synchronous replication, if required.
2454 * Note that at this stage we have marked clog, but still show as running
2455 * in the procarray and continue to hold locks.
2457 SyncRepWaitForLSN(recptr, false);
2461 * PrepareRedoAdd
2463 * Store pointers to the start/end of the WAL record along with the xid in
2464 * a gxact entry in shared memory TwoPhaseState structure. If caller
2465 * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2466 * data, the entry is marked as located on disk.
2468 void
2469 PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2470 XLogRecPtr end_lsn, RepOriginId origin_id)
2472 TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2473 char *bufptr;
2474 const char *gid;
2475 GlobalTransaction gxact;
2477 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2478 Assert(RecoveryInProgress());
2480 bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2481 gid = (const char *) bufptr;
2484 * Reserve the GID for the given transaction in the redo code path.
2486 * This creates a gxact struct and puts it into the active array.
2488 * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2489 * shared memory. Hence, we only fill up the bare minimum contents here.
2490 * The gxact also gets marked with gxact->inredo set to true to indicate
2491 * that it got added in the redo phase
2495 * In the event of a crash while a checkpoint was running, it may be
2496 * possible that some two-phase data found its way to disk while its
2497 * corresponding record needs to be replayed in the follow-up recovery. As
2498 * the 2PC data was on disk, it has already been restored at the beginning
2499 * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2500 * duplicates in TwoPhaseState. If a consistent state has been reached,
2501 * the record is added to TwoPhaseState and it should have no
2502 * corresponding file in pg_twophase.
2504 if (!XLogRecPtrIsInvalid(start_lsn))
2506 char path[MAXPGPATH];
2508 TwoPhaseFilePath(path, hdr->xid);
2510 if (access(path, F_OK) == 0)
2512 ereport(reachedConsistency ? ERROR : WARNING,
2513 (errmsg("could not recover two-phase state file for transaction %u",
2514 hdr->xid),
2515 errdetail("Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2516 LSN_FORMAT_ARGS(start_lsn))));
2517 return;
2520 if (errno != ENOENT)
2521 ereport(ERROR,
2522 (errcode_for_file_access(),
2523 errmsg("could not access file \"%s\": %m", path)));
2526 /* Get a free gxact from the freelist */
2527 if (TwoPhaseState->freeGXacts == NULL)
2528 ereport(ERROR,
2529 (errcode(ERRCODE_OUT_OF_MEMORY),
2530 errmsg("maximum number of prepared transactions reached"),
2531 errhint("Increase \"max_prepared_transactions\" (currently %d).",
2532 max_prepared_xacts)));
2533 gxact = TwoPhaseState->freeGXacts;
2534 TwoPhaseState->freeGXacts = gxact->next;
2536 gxact->prepared_at = hdr->prepared_at;
2537 gxact->prepare_start_lsn = start_lsn;
2538 gxact->prepare_end_lsn = end_lsn;
2539 gxact->xid = hdr->xid;
2540 gxact->owner = hdr->owner;
2541 gxact->locking_backend = INVALID_PROC_NUMBER;
2542 gxact->valid = false;
2543 gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2544 gxact->inredo = true; /* yes, added in redo */
2545 strcpy(gxact->gid, gid);
2547 /* And insert it into the active array */
2548 Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2549 TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2551 if (origin_id != InvalidRepOriginId)
2553 /* recover apply progress */
2554 replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2555 false /* backward */ , false /* WAL */ );
2558 elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2562 * PrepareRedoRemove
2564 * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2565 * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2567 * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2568 * is updated.
2570 void
2571 PrepareRedoRemove(TransactionId xid, bool giveWarning)
2573 GlobalTransaction gxact = NULL;
2574 int i;
2575 bool found = false;
2577 Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2578 Assert(RecoveryInProgress());
2580 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2582 gxact = TwoPhaseState->prepXacts[i];
2584 if (gxact->xid == xid)
2586 Assert(gxact->inredo);
2587 found = true;
2588 break;
2593 * Just leave if there is nothing, this is expected during WAL replay.
2595 if (!found)
2596 return;
2599 * And now we can clean up any files we may have left.
2601 elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2602 if (gxact->ondisk)
2603 RemoveTwoPhaseFile(xid, giveWarning);
2604 RemoveGXact(gxact);
2608 * LookupGXact
2609 * Check if the prepared transaction with the given GID, lsn and timestamp
2610 * exists.
2612 * Note that we always compare with the LSN where prepare ends because that is
2613 * what is stored as origin_lsn in the 2PC file.
2615 * This function is primarily used to check if the prepared transaction
2616 * received from the upstream (remote node) already exists. Checking only GID
2617 * is not sufficient because a different prepared xact with the same GID can
2618 * exist on the same node. So, we are ensuring to match origin_lsn and
2619 * origin_timestamp of prepared xact to avoid the possibility of a match of
2620 * prepared xact from two different nodes.
2622 bool
2623 LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2624 TimestampTz origin_prepare_timestamp)
2626 int i;
2627 bool found = false;
2629 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2630 for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2632 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2634 /* Ignore not-yet-valid GIDs. */
2635 if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2637 char *buf;
2638 TwoPhaseFileHeader *hdr;
2641 * We are not expecting collisions of GXACTs (same gid) between
2642 * publisher and subscribers, so we perform all I/O while holding
2643 * TwoPhaseStateLock for simplicity.
2645 * To move the I/O out of the lock, we need to ensure that no
2646 * other backend commits the prepared xact in the meantime. We can
2647 * do this optimization if we encounter many collisions in GID
2648 * between publisher and subscriber.
2650 if (gxact->ondisk)
2651 buf = ReadTwoPhaseFile(gxact->xid, false);
2652 else
2654 Assert(gxact->prepare_start_lsn);
2655 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2658 hdr = (TwoPhaseFileHeader *) buf;
2660 if (hdr->origin_lsn == prepare_end_lsn &&
2661 hdr->origin_timestamp == origin_prepare_timestamp)
2663 found = true;
2664 pfree(buf);
2665 break;
2668 pfree(buf);
2671 LWLockRelease(TwoPhaseStateLock);
2672 return found;
2676 * TwoPhaseTransactionGid
2677 * Form the prepared transaction GID for two_phase transactions.
2679 * Return the GID in the supplied buffer.
2681 void
2682 TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2684 Assert(OidIsValid(subid));
2686 if (!TransactionIdIsValid(xid))
2687 ereport(ERROR,
2688 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2689 errmsg_internal("invalid two-phase transaction ID")));
2691 snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2695 * IsTwoPhaseTransactionGidForSubid
2696 * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2697 * for the specified 'subid'.
2699 static bool
2700 IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2702 int ret;
2703 Oid subid_from_gid;
2704 TransactionId xid_from_gid;
2705 char gid_tmp[GIDSIZE];
2707 /* Extract the subid and xid from the given GID */
2708 ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2711 * Check that the given GID has expected format, and at least the subid
2712 * matches.
2714 if (ret != 2 || subid != subid_from_gid)
2715 return false;
2718 * Reconstruct a temporary GID based on the subid and xid extracted from
2719 * the given GID and check whether the temporary GID and the given GID
2720 * match.
2722 TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2724 return strcmp(gid, gid_tmp) == 0;
2728 * LookupGXactBySubid
2729 * Check if the prepared transaction done by apply worker exists.
2731 bool
2732 LookupGXactBySubid(Oid subid)
2734 bool found = false;
2736 LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2737 for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2739 GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2741 /* Ignore not-yet-valid GIDs. */
2742 if (gxact->valid &&
2743 IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2745 found = true;
2746 break;
2749 LWLockRelease(TwoPhaseStateLock);
2751 return found;