/*-------------------------------------------------------------------------
 *
 * commit_ts.c
 *		PostgreSQL commit timestamp manager
 *
 * This module is a pg_xact-like system that stores the commit timestamp
 * for each transaction.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CommitTs page is initialized to zeroes.  Other writes of CommitTS come
 * from recording of transaction commit in xact.c, which generates its own
 * XLOG records for these events and will re-perform the status update on
 * redo; so we need make no additional XLOG entry here.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/commit_ts.c
 *
 *-------------------------------------------------------------------------
 */
24 #include "access/commit_ts.h"
25 #include "access/htup_details.h"
26 #include "access/slru.h"
27 #include "access/transam.h"
28 #include "access/xloginsert.h"
29 #include "access/xlogutils.h"
31 #include "miscadmin.h"
32 #include "storage/shmem.h"
33 #include "utils/fmgrprotos.h"
34 #include "utils/guc_hooks.h"
35 #include "utils/timestamp.h"
/*
 * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
 * everywhere else in Postgres.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CommitTs page numbering also wraps around at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
 * explicit notice of that fact in this module, except when comparing segment
 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
 */
50 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 * the largest possible file name is more than 5 chars long; see
54 typedef struct CommitTimestampEntry
58 } CommitTimestampEntry
;
/*
 * On-disk size of one entry: everything up to and including nodeid, without
 * any trailing struct padding.
 */
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
									sizeof(RepOriginId))

/* Number of entries that fit on one CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)
68 * Although we return an int64 the actual value can't currently exceed
69 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
72 TransactionIdToCTsPage(TransactionId xid
)
74 return xid
/ (int64
) COMMIT_TS_XACTS_PER_PAGE
;
/* Index of a transaction's entry within its CommitTs page. */
#define TransactionIdToCTsEntry(xid)	\
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
81 * Link to shared-memory data structures for CommitTs control
83 static SlruCtlData CommitTsCtlData
;
85 #define CommitTsCtl (&CommitTsCtlData)
88 * We keep a cache of the last value set in shared memory.
90 * This is also good place to keep the activation status. We keep this
91 * separate from the GUC so that the standby can activate the module if the
92 * primary has it active independently of the value of the GUC.
94 * This is protected by CommitTsLock. In some places, we use commitTsActive
95 * without acquiring the lock; where this happens, a comment explains the
98 typedef struct CommitTimestampShared
100 TransactionId xidLastCommit
;
101 CommitTimestampEntry dataLastCommit
;
103 } CommitTimestampShared
;
105 static CommitTimestampShared
*commitTsShared
;
/* GUC variable */
bool		track_commit_timestamp;
111 static void SetXidCommitTsInPage(TransactionId xid
, int nsubxids
,
112 TransactionId
*subxids
, TimestampTz ts
,
113 RepOriginId nodeid
, int64 pageno
);
114 static void TransactionIdSetCommitTs(TransactionId xid
, TimestampTz ts
,
115 RepOriginId nodeid
, int slotno
);
116 static void error_commit_ts_disabled(void);
117 static int ZeroCommitTsPage(int64 pageno
, bool writeXlog
);
118 static bool CommitTsPagePrecedes(int64 page1
, int64 page2
);
119 static void ActivateCommitTs(void);
120 static void DeactivateCommitTs(void);
121 static void WriteZeroPageXlogRec(int64 pageno
);
122 static void WriteTruncateXlogRec(int64 pageno
, TransactionId oldestXid
);
125 * TransactionTreeSetCommitTsData
127 * Record the final commit timestamp of transaction entries in the commit log
128 * for a transaction and its subtransaction tree, as efficiently as possible.
130 * xid is the top level transaction id.
132 * subxids is an array of xids of length nsubxids, representing subtransactions
133 * in the tree of xid. In various cases nsubxids may be zero.
134 * The reason why tracking just the parent xid commit timestamp is not enough
135 * is that the subtrans SLRU does not stay valid across crashes (it's not
136 * permanent) so we need to keep the information about them here. If the
137 * subtrans implementation changes in the future, we might want to revisit the
138 * decision of storing timestamp info for each subxid.
141 TransactionTreeSetCommitTsData(TransactionId xid
, int nsubxids
,
142 TransactionId
*subxids
, TimestampTz timestamp
,
146 TransactionId headxid
;
147 TransactionId newestXact
;
150 * No-op if the module is not active.
152 * An unlocked read here is fine, because in a standby (the only place
153 * where the flag can change in flight) this routine is only called by the
154 * recovery process, which is also the only process which can change the
157 if (!commitTsShared
->commitTsActive
)
161 * Figure out the latest Xid in this batch: either the last subxid if
162 * there's any, otherwise the parent xid.
165 newestXact
= subxids
[nsubxids
- 1];
170 * We split the xids to set the timestamp to in groups belonging to the
171 * same SLRU page; the first element in each such set is its head. The
172 * first group has the main XID as the head; subsequent sets use the first
173 * subxid not on the previous page as head. This way, we only have to
174 * lock/modify each SLRU page once.
180 int64 pageno
= TransactionIdToCTsPage(headxid
);
183 for (j
= i
; j
< nsubxids
; j
++)
185 if (TransactionIdToCTsPage(subxids
[j
]) != pageno
)
188 /* subxids[i..j] are on the same page as the head */
190 SetXidCommitTsInPage(headxid
, j
- i
, subxids
+ i
, timestamp
, nodeid
,
193 /* if we wrote out all subxids, we're done. */
198 * Set the new head and skip over it, as well as over the subxids we
201 headxid
= subxids
[j
];
205 /* update the cached value in shared memory */
206 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
207 commitTsShared
->xidLastCommit
= xid
;
208 commitTsShared
->dataLastCommit
.time
= timestamp
;
209 commitTsShared
->dataLastCommit
.nodeid
= nodeid
;
211 /* and move forwards our endpoint, if needed */
212 if (TransactionIdPrecedes(TransamVariables
->newestCommitTsXid
, newestXact
))
213 TransamVariables
->newestCommitTsXid
= newestXact
;
214 LWLockRelease(CommitTsLock
);
218 * Record the commit timestamp of transaction entries in the commit log for all
219 * entries on a single page. Atomic only on this page.
222 SetXidCommitTsInPage(TransactionId xid
, int nsubxids
,
223 TransactionId
*subxids
, TimestampTz ts
,
224 RepOriginId nodeid
, int64 pageno
)
226 LWLock
*lock
= SimpleLruGetBankLock(CommitTsCtl
, pageno
);
230 LWLockAcquire(lock
, LW_EXCLUSIVE
);
232 slotno
= SimpleLruReadPage(CommitTsCtl
, pageno
, true, xid
);
234 TransactionIdSetCommitTs(xid
, ts
, nodeid
, slotno
);
235 for (i
= 0; i
< nsubxids
; i
++)
236 TransactionIdSetCommitTs(subxids
[i
], ts
, nodeid
, slotno
);
238 CommitTsCtl
->shared
->page_dirty
[slotno
] = true;
244 * Sets the commit timestamp of a single transaction.
246 * Caller must hold the correct SLRU bank lock, will be held at exit
249 TransactionIdSetCommitTs(TransactionId xid
, TimestampTz ts
,
250 RepOriginId nodeid
, int slotno
)
252 int entryno
= TransactionIdToCTsEntry(xid
);
253 CommitTimestampEntry entry
;
255 Assert(TransactionIdIsNormal(xid
));
258 entry
.nodeid
= nodeid
;
260 memcpy(CommitTsCtl
->shared
->page_buffer
[slotno
] +
261 SizeOfCommitTimestampEntry
* entryno
,
262 &entry
, SizeOfCommitTimestampEntry
);
266 * Interrogate the commit timestamp of a transaction.
268 * The return value indicates whether a commit timestamp record was found for
269 * the given xid. The timestamp value is returned in *ts (which may not be
270 * null), and the origin node for the Xid is returned in *nodeid, if it's not
274 TransactionIdGetCommitTsData(TransactionId xid
, TimestampTz
*ts
,
277 int64 pageno
= TransactionIdToCTsPage(xid
);
278 int entryno
= TransactionIdToCTsEntry(xid
);
280 CommitTimestampEntry entry
;
281 TransactionId oldestCommitTsXid
;
282 TransactionId newestCommitTsXid
;
284 if (!TransactionIdIsValid(xid
))
286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
287 errmsg("cannot retrieve commit timestamp for transaction %u", xid
)));
288 else if (!TransactionIdIsNormal(xid
))
290 /* frozen and bootstrap xids are always committed far in the past */
297 LWLockAcquire(CommitTsLock
, LW_SHARED
);
299 /* Error if module not enabled */
300 if (!commitTsShared
->commitTsActive
)
301 error_commit_ts_disabled();
304 * If we're asked for the cached value, return that. Otherwise, fall
305 * through to read from SLRU.
307 if (commitTsShared
->xidLastCommit
== xid
)
309 *ts
= commitTsShared
->dataLastCommit
.time
;
311 *nodeid
= commitTsShared
->dataLastCommit
.nodeid
;
313 LWLockRelease(CommitTsLock
);
317 oldestCommitTsXid
= TransamVariables
->oldestCommitTsXid
;
318 newestCommitTsXid
= TransamVariables
->newestCommitTsXid
;
319 /* neither is invalid, or both are */
320 Assert(TransactionIdIsValid(oldestCommitTsXid
) == TransactionIdIsValid(newestCommitTsXid
));
321 LWLockRelease(CommitTsLock
);
324 * Return empty if the requested value is outside our valid range.
326 if (!TransactionIdIsValid(oldestCommitTsXid
) ||
327 TransactionIdPrecedes(xid
, oldestCommitTsXid
) ||
328 TransactionIdPrecedes(newestCommitTsXid
, xid
))
332 *nodeid
= InvalidRepOriginId
;
336 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 slotno
= SimpleLruReadPage_ReadOnly(CommitTsCtl
, pageno
, xid
);
339 CommitTsCtl
->shared
->page_buffer
[slotno
] +
340 SizeOfCommitTimestampEntry
* entryno
,
341 SizeOfCommitTimestampEntry
);
345 *nodeid
= entry
.nodeid
;
347 LWLockRelease(SimpleLruGetBankLock(CommitTsCtl
, pageno
));
352 * Return the Xid of the latest committed transaction. (As far as this module
353 * is concerned, anyway; it's up to the caller to ensure the value is useful
356 * ts and nodeid are filled with the corresponding data; they can be passed
357 * as NULL if not wanted.
360 GetLatestCommitTsData(TimestampTz
*ts
, RepOriginId
*nodeid
)
364 LWLockAcquire(CommitTsLock
, LW_SHARED
);
366 /* Error if module not enabled */
367 if (!commitTsShared
->commitTsActive
)
368 error_commit_ts_disabled();
370 xid
= commitTsShared
->xidLastCommit
;
372 *ts
= commitTsShared
->dataLastCommit
.time
;
374 *nodeid
= commitTsShared
->dataLastCommit
.nodeid
;
375 LWLockRelease(CommitTsLock
);
381 error_commit_ts_disabled(void)
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
385 errmsg("could not get commit timestamp data"),
386 RecoveryInProgress() ?
387 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388 "track_commit_timestamp") :
389 errhint("Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
394 * SQL-callable wrapper to obtain commit time of a transaction
397 pg_xact_commit_timestamp(PG_FUNCTION_ARGS
)
399 TransactionId xid
= PG_GETARG_TRANSACTIONID(0);
403 found
= TransactionIdGetCommitTsData(xid
, &ts
, NULL
);
408 PG_RETURN_TIMESTAMPTZ(ts
);
413 * pg_last_committed_xact
415 * SQL-callable wrapper to obtain some information about the latest
416 * committed transaction: transaction ID, timestamp and replication
420 pg_last_committed_xact(PG_FUNCTION_ARGS
)
430 /* and construct a tuple with our data */
431 xid
= GetLatestCommitTsData(&ts
, &nodeid
);
433 if (get_call_result_type(fcinfo
, NULL
, &tupdesc
) != TYPEFUNC_COMPOSITE
)
434 elog(ERROR
, "return type must be a row type");
436 if (!TransactionIdIsNormal(xid
))
438 memset(nulls
, true, sizeof(nulls
));
442 values
[0] = TransactionIdGetDatum(xid
);
445 values
[1] = TimestampTzGetDatum(ts
);
448 values
[2] = ObjectIdGetDatum((Oid
) nodeid
);
452 htup
= heap_form_tuple(tupdesc
, values
, nulls
);
454 PG_RETURN_DATUM(HeapTupleGetDatum(htup
));
458 * pg_xact_commit_timestamp_origin
460 * SQL-callable wrapper to obtain commit timestamp and replication origin
461 * of a given transaction.
464 pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS
)
466 TransactionId xid
= PG_GETARG_TRANSACTIONID(0);
475 found
= TransactionIdGetCommitTsData(xid
, &ts
, &nodeid
);
477 if (get_call_result_type(fcinfo
, NULL
, &tupdesc
) != TYPEFUNC_COMPOSITE
)
478 elog(ERROR
, "return type must be a row type");
482 memset(nulls
, true, sizeof(nulls
));
486 values
[0] = TimestampTzGetDatum(ts
);
489 values
[1] = ObjectIdGetDatum((Oid
) nodeid
);
493 htup
= heap_form_tuple(tupdesc
, values
, nulls
);
495 PG_RETURN_DATUM(HeapTupleGetDatum(htup
));
499 * Number of shared CommitTS buffers.
501 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502 * Otherwise just cap the configured amount to be between 16 and the maximum
506 CommitTsShmemBuffers(void)
508 /* auto-tune based on shared buffers */
509 if (commit_timestamp_buffers
== 0)
510 return SimpleLruAutotuneBuffers(512, 1024);
512 return Min(Max(16, commit_timestamp_buffers
), SLRU_MAX_ALLOWED_BUFFERS
);
516 * Shared memory sizing for CommitTs
519 CommitTsShmemSize(void)
521 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
522 sizeof(CommitTimestampShared
);
526 * Initialize CommitTs at system startup (postmaster start or standalone
530 CommitTsShmemInit(void)
534 /* If auto-tuning is requested, now is the time to do it */
535 if (commit_timestamp_buffers
== 0)
539 snprintf(buf
, sizeof(buf
), "%d", CommitTsShmemBuffers());
540 SetConfigOption("commit_timestamp_buffers", buf
, PGC_POSTMASTER
,
541 PGC_S_DYNAMIC_DEFAULT
);
544 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547 * that and we must force the matter with PGC_S_OVERRIDE.
549 if (commit_timestamp_buffers
== 0) /* failed to apply it? */
550 SetConfigOption("commit_timestamp_buffers", buf
, PGC_POSTMASTER
,
553 Assert(commit_timestamp_buffers
!= 0);
555 CommitTsCtl
->PagePrecedes
= CommitTsPagePrecedes
;
556 SimpleLruInit(CommitTsCtl
, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER
,
558 LWTRANCHE_COMMITTS_SLRU
,
559 SYNC_HANDLER_COMMIT_TS
,
561 SlruPagePrecedesUnitTests(CommitTsCtl
, COMMIT_TS_XACTS_PER_PAGE
);
563 commitTsShared
= ShmemInitStruct("CommitTs shared",
564 sizeof(CommitTimestampShared
),
567 if (!IsUnderPostmaster
)
571 commitTsShared
->xidLastCommit
= InvalidTransactionId
;
572 TIMESTAMP_NOBEGIN(commitTsShared
->dataLastCommit
.time
);
573 commitTsShared
->dataLastCommit
.nodeid
= InvalidRepOriginId
;
574 commitTsShared
->commitTsActive
= false;
581 * GUC check_hook for commit_timestamp_buffers
584 check_commit_ts_buffers(int *newval
, void **extra
, GucSource source
)
586 return check_slru_buffers("commit_timestamp_buffers", newval
);
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Nothing to do here at present, unlike most other SLRU modules; segments
	 * are created when the server is started with this module enabled.  See
	 * ActivateCommitTs.
	 */
}
606 * Initialize (or reinitialize) a page of CommitTs to zeroes.
607 * If writeXlog is true, also emit an XLOG record saying we did this.
609 * The page is not actually written, just set up in shared memory.
610 * The slot number of the new page is returned.
612 * Control lock must be held at entry, and will be held at exit.
615 ZeroCommitTsPage(int64 pageno
, bool writeXlog
)
619 slotno
= SimpleLruZeroPage(CommitTsCtl
, pageno
);
622 WriteZeroPageXlogRec(pageno
);
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized TransamVariables->nextXid.
 */
void
StartupCommitTs(void)
{
	/* NOTE(review): body restored from a damaged extraction; confirm upstream */
	ActivateCommitTs();
}
638 * This must be called ONCE during postmaster or standalone-backend startup,
639 * after recovery has finished.
642 CompleteCommitTsInitialization(void)
645 * If the feature is not enabled, turn it off for good. This also removes
648 * Conversely, we activate the module if the feature is enabled. This is
649 * necessary for primary and standby as the activation depends on the
650 * control file contents at the beginning of recovery or when a
651 * XLOG_PARAMETER_CHANGE is replayed.
653 if (!track_commit_timestamp
)
654 DeactivateCommitTs();
660 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
661 * XLog record during recovery.
664 CommitTsParameterChange(bool newvalue
, bool oldvalue
)
667 * If the commit_ts module is disabled in this server and we get word from
668 * the primary server that it is enabled there, activate it so that we can
669 * replay future WAL records involving it; also mark it as active on
670 * pg_control. If the old value was already set, we already did this, so
673 * If the module is disabled in the primary, disable it here too, unless
674 * the module is enabled locally.
676 * Note this only runs in the recovery process, so an unlocked read is
681 if (!commitTsShared
->commitTsActive
)
684 else if (commitTsShared
->commitTsActive
)
685 DeactivateCommitTs();
689 * Activate this module whenever necessary.
690 * This must happen during postmaster or standalone-backend startup,
691 * or during WAL replay anytime the track_commit_timestamp setting is
692 * changed in the primary.
694 * The reason why this SLRU needs separate activation/deactivation functions is
695 * that it can be enabled/disabled during start and the activation/deactivation
696 * on the primary is propagated to the standby via replay. Other SLRUs don't
697 * have this property and they can be just initialized during normal startup.
699 * This is in charge of creating the currently active segment, if it's not
700 * already there. The reason for this is that the server might have been
701 * running with this module disabled for a while and thus might have skipped
702 * the normal creation point.
705 ActivateCommitTs(void)
710 /* If we've done this already, there's nothing to do */
711 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
712 if (commitTsShared
->commitTsActive
)
714 LWLockRelease(CommitTsLock
);
717 LWLockRelease(CommitTsLock
);
719 xid
= XidFromFullTransactionId(TransamVariables
->nextXid
);
720 pageno
= TransactionIdToCTsPage(xid
);
723 * Re-Initialize our idea of the latest page number.
725 pg_atomic_write_u64(&CommitTsCtl
->shared
->latest_page_number
, pageno
);
728 * If CommitTs is enabled, but it wasn't in the previous server run, we
729 * need to set the oldest and newest values to the next Xid; that way, we
730 * will not try to read data that might not have been set.
732 * XXX does this have a problem if a server is started with commitTs
733 * enabled, then started with commitTs disabled, then restarted with it
734 * enabled again? It doesn't look like it does, because there should be a
735 * checkpoint that sets the value to InvalidTransactionId at end of
736 * recovery; and so any chance of injecting new transactions without
737 * CommitTs values would occur after the oldestCommitTsXid has been set to
738 * Invalid temporarily.
740 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
741 if (TransamVariables
->oldestCommitTsXid
== InvalidTransactionId
)
743 TransamVariables
->oldestCommitTsXid
=
744 TransamVariables
->newestCommitTsXid
= ReadNextTransactionId();
746 LWLockRelease(CommitTsLock
);
748 /* Create the current segment file, if necessary */
749 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl
, pageno
))
751 LWLock
*lock
= SimpleLruGetBankLock(CommitTsCtl
, pageno
);
754 LWLockAcquire(lock
, LW_EXCLUSIVE
);
755 slotno
= ZeroCommitTsPage(pageno
, false);
756 SimpleLruWritePage(CommitTsCtl
, slotno
);
757 Assert(!CommitTsCtl
->shared
->page_dirty
[slotno
]);
761 /* Change the activation status in shared memory. */
762 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
763 commitTsShared
->commitTsActive
= true;
764 LWLockRelease(CommitTsLock
);
768 * Deactivate this module.
770 * This must be called when the track_commit_timestamp parameter is turned off.
771 * This happens during postmaster or standalone-backend startup, or during WAL
774 * Resets CommitTs into invalid state to make sure we don't hand back
775 * possibly-invalid data; also removes segments of old data.
778 DeactivateCommitTs(void)
781 * Cleanup the status in the shared memory.
783 * We reset everything in the commitTsShared record to prevent user from
784 * getting confusing data about last committed transaction on the standby
785 * when the module was activated repeatedly on the primary.
787 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
789 commitTsShared
->commitTsActive
= false;
790 commitTsShared
->xidLastCommit
= InvalidTransactionId
;
791 TIMESTAMP_NOBEGIN(commitTsShared
->dataLastCommit
.time
);
792 commitTsShared
->dataLastCommit
.nodeid
= InvalidRepOriginId
;
794 TransamVariables
->oldestCommitTsXid
= InvalidTransactionId
;
795 TransamVariables
->newestCommitTsXid
= InvalidTransactionId
;
798 * Remove *all* files. This is necessary so that there are no leftover
799 * files; in the case where this feature is later enabled after running
800 * with it disabled for some time there may be a gap in the file sequence.
801 * (We can probably tolerate out-of-sequence files, as they are going to
802 * be overwritten anyway when we wrap around, but it seems better to be
805 * Note that we do this with CommitTsLock acquired in exclusive mode. This
806 * is very heavy-handed, but since this routine can only be called in the
807 * replica and should happen very rarely, we don't worry too much about
808 * it. Note also that no process should be consulting this SLRU if we
809 * have just deactivated it.
811 (void) SlruScanDirectory(CommitTsCtl
, SlruScanDirCbDeleteAll
, NULL
);
813 LWLockRelease(CommitTsLock
);
817 * Perform a checkpoint --- either during shutdown, or on-the-fly
820 CheckPointCommitTs(void)
823 * Write dirty CommitTs pages to disk. This may result in sync requests
824 * queued for later handling by ProcessSyncRequests(), as part of the
827 SimpleLruWriteAll(CommitTsCtl
, true);
831 * Make sure that CommitTs has room for a newly-allocated XID.
833 * NB: this is called while holding XidGenLock. We want it to be very fast
834 * most of the time; even when it's not so fast, no actual I/O need happen
835 * unless we're forced to write out a dirty CommitTs or xlog page to make room
838 * NB: the current implementation relies on track_commit_timestamp being
842 ExtendCommitTs(TransactionId newestXact
)
848 * Nothing to do if module not enabled. Note we do an unlocked read of
849 * the flag here, which is okay because this routine is only called from
850 * GetNewTransactionId, which is never called in a standby.
853 if (!commitTsShared
->commitTsActive
)
857 * No work except at first XID of a page. But beware: just after
858 * wraparound, the first XID of page zero is FirstNormalTransactionId.
860 if (TransactionIdToCTsEntry(newestXact
) != 0 &&
861 !TransactionIdEquals(newestXact
, FirstNormalTransactionId
))
864 pageno
= TransactionIdToCTsPage(newestXact
);
866 lock
= SimpleLruGetBankLock(CommitTsCtl
, pageno
);
868 LWLockAcquire(lock
, LW_EXCLUSIVE
);
870 /* Zero the page and make an XLOG entry about it */
871 ZeroCommitTsPage(pageno
, !InRecovery
);
877 * Remove all CommitTs segments before the one holding the passed
880 * Note that we don't need to flush XLOG here.
883 TruncateCommitTs(TransactionId oldestXact
)
888 * The cutoff point is the start of the segment containing oldestXact. We
889 * pass the *page* containing oldestXact to SimpleLruTruncate.
891 cutoffPage
= TransactionIdToCTsPage(oldestXact
);
893 /* Check to see if there's any files that could be removed */
894 if (!SlruScanDirectory(CommitTsCtl
, SlruScanDirCbReportPresence
,
896 return; /* nothing to remove */
898 /* Write XLOG record */
899 WriteTruncateXlogRec(cutoffPage
, oldestXact
);
901 /* Now we can remove the old CommitTs segment(s) */
902 SimpleLruTruncate(CommitTsCtl
, cutoffPage
);
906 * Set the limit values between which commit TS can be consulted.
909 SetCommitTsLimit(TransactionId oldestXact
, TransactionId newestXact
)
912 * Be careful not to overwrite values that are either further into the
913 * "future" or signal a disabled committs.
915 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
916 if (TransamVariables
->oldestCommitTsXid
!= InvalidTransactionId
)
918 if (TransactionIdPrecedes(TransamVariables
->oldestCommitTsXid
, oldestXact
))
919 TransamVariables
->oldestCommitTsXid
= oldestXact
;
920 if (TransactionIdPrecedes(newestXact
, TransamVariables
->newestCommitTsXid
))
921 TransamVariables
->newestCommitTsXid
= newestXact
;
925 Assert(TransamVariables
->newestCommitTsXid
== InvalidTransactionId
);
926 TransamVariables
->oldestCommitTsXid
= oldestXact
;
927 TransamVariables
->newestCommitTsXid
= newestXact
;
929 LWLockRelease(CommitTsLock
);
933 * Move forwards the oldest commitTS value that can be consulted
936 AdvanceOldestCommitTsXid(TransactionId oldestXact
)
938 LWLockAcquire(CommitTsLock
, LW_EXCLUSIVE
);
939 if (TransamVariables
->oldestCommitTsXid
!= InvalidTransactionId
&&
940 TransactionIdPrecedes(TransamVariables
->oldestCommitTsXid
, oldestXact
))
941 TransamVariables
->oldestCommitTsXid
= oldestXact
;
942 LWLockRelease(CommitTsLock
);
947 * Decide whether a commitTS page number is "older" for truncation purposes.
948 * Analogous to CLOGPagePrecedes().
950 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
951 * introduces differences compared to CLOG and the other SLRUs having (1 <<
952 * 31) % per_page == 0. This function never tests exactly
953 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
954 * there are two possible counts of page boundaries between oldestXact and the
955 * latest XID assigned, depending on whether oldestXact is within the first
956 * 128 entries of its page. Since this function doesn't know the location of
957 * oldestXact within page2, it returns false for one page that actually is
958 * expendable. This is a wider (yet still negligible) version of the
959 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
961 * For the sake of a worked example, number entries with decimal values such
962 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
963 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
964 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
965 * because entry=2.85 is the border that toggles whether entries precede the
966 * last entry of the oldestXact page. While page 2 is expendable at
967 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
970 CommitTsPagePrecedes(int64 page1
, int64 page2
)
975 xid1
= ((TransactionId
) page1
) * COMMIT_TS_XACTS_PER_PAGE
;
976 xid1
+= FirstNormalTransactionId
+ 1;
977 xid2
= ((TransactionId
) page2
) * COMMIT_TS_XACTS_PER_PAGE
;
978 xid2
+= FirstNormalTransactionId
+ 1;
980 return (TransactionIdPrecedes(xid1
, xid2
) &&
981 TransactionIdPrecedes(xid1
, xid2
+ COMMIT_TS_XACTS_PER_PAGE
- 1));
986 * Write a ZEROPAGE xlog record
989 WriteZeroPageXlogRec(int64 pageno
)
992 XLogRegisterData((char *) (&pageno
), sizeof(pageno
));
993 (void) XLogInsert(RM_COMMIT_TS_ID
, COMMIT_TS_ZEROPAGE
);
997 * Write a TRUNCATE xlog record
1000 WriteTruncateXlogRec(int64 pageno
, TransactionId oldestXid
)
1002 xl_commit_ts_truncate xlrec
;
1004 xlrec
.pageno
= pageno
;
1005 xlrec
.oldestXid
= oldestXid
;
1008 XLogRegisterData((char *) (&xlrec
), SizeOfCommitTsTruncate
);
1009 (void) XLogInsert(RM_COMMIT_TS_ID
, COMMIT_TS_TRUNCATE
);
1013 * CommitTS resource manager's routines
1016 commit_ts_redo(XLogReaderState
*record
)
1018 uint8 info
= XLogRecGetInfo(record
) & ~XLR_INFO_MASK
;
1020 /* Backup blocks are not used in commit_ts records */
1021 Assert(!XLogRecHasAnyBlockRefs(record
));
1023 if (info
== COMMIT_TS_ZEROPAGE
)
1029 memcpy(&pageno
, XLogRecGetData(record
), sizeof(pageno
));
1031 lock
= SimpleLruGetBankLock(CommitTsCtl
, pageno
);
1032 LWLockAcquire(lock
, LW_EXCLUSIVE
);
1034 slotno
= ZeroCommitTsPage(pageno
, false);
1035 SimpleLruWritePage(CommitTsCtl
, slotno
);
1036 Assert(!CommitTsCtl
->shared
->page_dirty
[slotno
]);
1038 LWLockRelease(lock
);
1040 else if (info
== COMMIT_TS_TRUNCATE
)
1042 xl_commit_ts_truncate
*trunc
= (xl_commit_ts_truncate
*) XLogRecGetData(record
);
1044 AdvanceOldestCommitTsXid(trunc
->oldestXid
);
1047 * During XLOG replay, latest_page_number isn't set up yet; insert a
1048 * suitable value to bypass the sanity test in SimpleLruTruncate.
1050 pg_atomic_write_u64(&CommitTsCtl
->shared
->latest_page_number
,
1053 SimpleLruTruncate(CommitTsCtl
, trunc
->pageno
);
1056 elog(PANIC
, "commit_ts_redo: unknown op code %u", info
);
1060 * Entrypoint for sync.c to sync commit_ts files.
1063 committssyncfiletag(const FileTag
*ftag
, char *path
)
1065 return SlruSyncFileTag(CommitTsCtl
, ftag
, path
);