Update obsolete nbtree array preprocessing comments.
[pgsql.git] / src / backend / access / transam / commit_ts.c
blob77e1899d7ad218501fa993301993e78657e93871
/*-------------------------------------------------------------------------
 *
 * commit_ts.c
 *		PostgreSQL commit timestamp manager
 *
 * This module is a pg_xact-like system that stores the commit timestamp
 * for each transaction.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CommitTs page is initialized to zeroes.  Other writes of CommitTs come
 * from recording of transaction commit in xact.c, which generates its own
 * XLOG records for these events and will re-perform the status update on
 * redo; so we need not make any additional XLOG entry here.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/commit_ts.c
 *
 *-------------------------------------------------------------------------
 */
22 #include "postgres.h"
24 #include "access/commit_ts.h"
25 #include "access/htup_details.h"
26 #include "access/slru.h"
27 #include "access/transam.h"
28 #include "access/xloginsert.h"
29 #include "access/xlogutils.h"
30 #include "funcapi.h"
31 #include "miscadmin.h"
32 #include "storage/shmem.h"
33 #include "utils/fmgrprotos.h"
34 #include "utils/guc_hooks.h"
35 #include "utils/timestamp.h"
/*
 * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
 * everywhere else in Postgres.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CommitTs page numbering also wraps around at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need not
 * take explicit notice of that fact in this module, except when comparing
 * segment and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
 */
50 * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 * the largest possible file name is more than 5 chars long; see
52 * SlruScanDirectory.
54 typedef struct CommitTimestampEntry
56 TimestampTz time;
57 RepOriginId nodeid;
58 } CommitTimestampEntry;
60 #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61 sizeof(RepOriginId))
63 #define COMMIT_TS_XACTS_PER_PAGE \
64 (BLCKSZ / SizeOfCommitTimestampEntry)
68 * Although we return an int64 the actual value can't currently exceed
69 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
71 static inline int64
72 TransactionIdToCTsPage(TransactionId xid)
74 return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
/* Index of xid's entry within its CommitTs page. */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
81 * Link to shared-memory data structures for CommitTs control
83 static SlruCtlData CommitTsCtlData;
85 #define CommitTsCtl (&CommitTsCtlData)
88 * We keep a cache of the last value set in shared memory.
90 * This is also good place to keep the activation status. We keep this
91 * separate from the GUC so that the standby can activate the module if the
92 * primary has it active independently of the value of the GUC.
94 * This is protected by CommitTsLock. In some places, we use commitTsActive
95 * without acquiring the lock; where this happens, a comment explains the
96 * rationale for it.
98 typedef struct CommitTimestampShared
100 TransactionId xidLastCommit;
101 CommitTimestampEntry dataLastCommit;
102 bool commitTsActive;
103 } CommitTimestampShared;
105 static CommitTimestampShared *commitTsShared;
108 /* GUC variable */
109 bool track_commit_timestamp;
111 static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112 TransactionId *subxids, TimestampTz ts,
113 RepOriginId nodeid, int64 pageno);
114 static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
115 RepOriginId nodeid, int slotno);
116 static void error_commit_ts_disabled(void);
117 static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
118 static bool CommitTsPagePrecedes(int64 page1, int64 page2);
119 static void ActivateCommitTs(void);
120 static void DeactivateCommitTs(void);
121 static void WriteZeroPageXlogRec(int64 pageno);
122 static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
125 * TransactionTreeSetCommitTsData
127 * Record the final commit timestamp of transaction entries in the commit log
128 * for a transaction and its subtransaction tree, as efficiently as possible.
130 * xid is the top level transaction id.
132 * subxids is an array of xids of length nsubxids, representing subtransactions
133 * in the tree of xid. In various cases nsubxids may be zero.
134 * The reason why tracking just the parent xid commit timestamp is not enough
135 * is that the subtrans SLRU does not stay valid across crashes (it's not
136 * permanent) so we need to keep the information about them here. If the
137 * subtrans implementation changes in the future, we might want to revisit the
138 * decision of storing timestamp info for each subxid.
140 void
141 TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
142 TransactionId *subxids, TimestampTz timestamp,
143 RepOriginId nodeid)
145 int i;
146 TransactionId headxid;
147 TransactionId newestXact;
150 * No-op if the module is not active.
152 * An unlocked read here is fine, because in a standby (the only place
153 * where the flag can change in flight) this routine is only called by the
154 * recovery process, which is also the only process which can change the
155 * flag.
157 if (!commitTsShared->commitTsActive)
158 return;
161 * Figure out the latest Xid in this batch: either the last subxid if
162 * there's any, otherwise the parent xid.
164 if (nsubxids > 0)
165 newestXact = subxids[nsubxids - 1];
166 else
167 newestXact = xid;
170 * We split the xids to set the timestamp to in groups belonging to the
171 * same SLRU page; the first element in each such set is its head. The
172 * first group has the main XID as the head; subsequent sets use the first
173 * subxid not on the previous page as head. This way, we only have to
174 * lock/modify each SLRU page once.
176 headxid = xid;
177 i = 0;
178 for (;;)
180 int64 pageno = TransactionIdToCTsPage(headxid);
181 int j;
183 for (j = i; j < nsubxids; j++)
185 if (TransactionIdToCTsPage(subxids[j]) != pageno)
186 break;
188 /* subxids[i..j] are on the same page as the head */
190 SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191 pageno);
193 /* if we wrote out all subxids, we're done. */
194 if (j >= nsubxids)
195 break;
198 * Set the new head and skip over it, as well as over the subxids we
199 * just wrote.
201 headxid = subxids[j];
202 i = j + 1;
205 /* update the cached value in shared memory */
206 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
207 commitTsShared->xidLastCommit = xid;
208 commitTsShared->dataLastCommit.time = timestamp;
209 commitTsShared->dataLastCommit.nodeid = nodeid;
211 /* and move forwards our endpoint, if needed */
212 if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
213 TransamVariables->newestCommitTsXid = newestXact;
214 LWLockRelease(CommitTsLock);
218 * Record the commit timestamp of transaction entries in the commit log for all
219 * entries on a single page. Atomic only on this page.
221 static void
222 SetXidCommitTsInPage(TransactionId xid, int nsubxids,
223 TransactionId *subxids, TimestampTz ts,
224 RepOriginId nodeid, int64 pageno)
226 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227 int slotno;
228 int i;
230 LWLockAcquire(lock, LW_EXCLUSIVE);
232 slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
234 TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235 for (i = 0; i < nsubxids; i++)
236 TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
238 CommitTsCtl->shared->page_dirty[slotno] = true;
240 LWLockRelease(lock);
244 * Sets the commit timestamp of a single transaction.
246 * Caller must hold the correct SLRU bank lock, will be held at exit
248 static void
249 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
250 RepOriginId nodeid, int slotno)
252 int entryno = TransactionIdToCTsEntry(xid);
253 CommitTimestampEntry entry;
255 Assert(TransactionIdIsNormal(xid));
257 entry.time = ts;
258 entry.nodeid = nodeid;
260 memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261 SizeOfCommitTimestampEntry * entryno,
262 &entry, SizeOfCommitTimestampEntry);
266 * Interrogate the commit timestamp of a transaction.
268 * The return value indicates whether a commit timestamp record was found for
269 * the given xid. The timestamp value is returned in *ts (which may not be
270 * null), and the origin node for the Xid is returned in *nodeid, if it's not
271 * null.
273 bool
274 TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
275 RepOriginId *nodeid)
277 int64 pageno = TransactionIdToCTsPage(xid);
278 int entryno = TransactionIdToCTsEntry(xid);
279 int slotno;
280 CommitTimestampEntry entry;
281 TransactionId oldestCommitTsXid;
282 TransactionId newestCommitTsXid;
284 if (!TransactionIdIsValid(xid))
285 ereport(ERROR,
286 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287 errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288 else if (!TransactionIdIsNormal(xid))
290 /* frozen and bootstrap xids are always committed far in the past */
291 *ts = 0;
292 if (nodeid)
293 *nodeid = 0;
294 return false;
297 LWLockAcquire(CommitTsLock, LW_SHARED);
299 /* Error if module not enabled */
300 if (!commitTsShared->commitTsActive)
301 error_commit_ts_disabled();
304 * If we're asked for the cached value, return that. Otherwise, fall
305 * through to read from SLRU.
307 if (commitTsShared->xidLastCommit == xid)
309 *ts = commitTsShared->dataLastCommit.time;
310 if (nodeid)
311 *nodeid = commitTsShared->dataLastCommit.nodeid;
313 LWLockRelease(CommitTsLock);
314 return *ts != 0;
317 oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318 newestCommitTsXid = TransamVariables->newestCommitTsXid;
319 /* neither is invalid, or both are */
320 Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321 LWLockRelease(CommitTsLock);
324 * Return empty if the requested value is outside our valid range.
326 if (!TransactionIdIsValid(oldestCommitTsXid) ||
327 TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328 TransactionIdPrecedes(newestCommitTsXid, xid))
330 *ts = 0;
331 if (nodeid)
332 *nodeid = InvalidRepOriginId;
333 return false;
336 /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338 memcpy(&entry,
339 CommitTsCtl->shared->page_buffer[slotno] +
340 SizeOfCommitTimestampEntry * entryno,
341 SizeOfCommitTimestampEntry);
343 *ts = entry.time;
344 if (nodeid)
345 *nodeid = entry.nodeid;
347 LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
348 return *ts != 0;
352 * Return the Xid of the latest committed transaction. (As far as this module
353 * is concerned, anyway; it's up to the caller to ensure the value is useful
354 * for its purposes.)
356 * ts and nodeid are filled with the corresponding data; they can be passed
357 * as NULL if not wanted.
359 TransactionId
360 GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
362 TransactionId xid;
364 LWLockAcquire(CommitTsLock, LW_SHARED);
366 /* Error if module not enabled */
367 if (!commitTsShared->commitTsActive)
368 error_commit_ts_disabled();
370 xid = commitTsShared->xidLastCommit;
371 if (ts)
372 *ts = commitTsShared->dataLastCommit.time;
373 if (nodeid)
374 *nodeid = commitTsShared->dataLastCommit.nodeid;
375 LWLockRelease(CommitTsLock);
377 return xid;
380 static void
381 error_commit_ts_disabled(void)
383 ereport(ERROR,
384 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 errmsg("could not get commit timestamp data"),
386 RecoveryInProgress() ?
387 errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388 "track_commit_timestamp") :
389 errhint("Make sure the configuration parameter \"%s\" is set.",
390 "track_commit_timestamp")));
394 * SQL-callable wrapper to obtain commit time of a transaction
396 Datum
397 pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
399 TransactionId xid = PG_GETARG_TRANSACTIONID(0);
400 TimestampTz ts;
401 bool found;
403 found = TransactionIdGetCommitTsData(xid, &ts, NULL);
405 if (!found)
406 PG_RETURN_NULL();
408 PG_RETURN_TIMESTAMPTZ(ts);
413 * pg_last_committed_xact
415 * SQL-callable wrapper to obtain some information about the latest
416 * committed transaction: transaction ID, timestamp and replication
417 * origin.
419 Datum
420 pg_last_committed_xact(PG_FUNCTION_ARGS)
422 TransactionId xid;
423 RepOriginId nodeid;
424 TimestampTz ts;
425 Datum values[3];
426 bool nulls[3];
427 TupleDesc tupdesc;
428 HeapTuple htup;
430 /* and construct a tuple with our data */
431 xid = GetLatestCommitTsData(&ts, &nodeid);
433 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434 elog(ERROR, "return type must be a row type");
436 if (!TransactionIdIsNormal(xid))
438 memset(nulls, true, sizeof(nulls));
440 else
442 values[0] = TransactionIdGetDatum(xid);
443 nulls[0] = false;
445 values[1] = TimestampTzGetDatum(ts);
446 nulls[1] = false;
448 values[2] = ObjectIdGetDatum((Oid) nodeid);
449 nulls[2] = false;
452 htup = heap_form_tuple(tupdesc, values, nulls);
454 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
458 * pg_xact_commit_timestamp_origin
460 * SQL-callable wrapper to obtain commit timestamp and replication origin
461 * of a given transaction.
463 Datum
464 pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
466 TransactionId xid = PG_GETARG_TRANSACTIONID(0);
467 RepOriginId nodeid;
468 TimestampTz ts;
469 Datum values[2];
470 bool nulls[2];
471 TupleDesc tupdesc;
472 HeapTuple htup;
473 bool found;
475 found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
477 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478 elog(ERROR, "return type must be a row type");
480 if (!found)
482 memset(nulls, true, sizeof(nulls));
484 else
486 values[0] = TimestampTzGetDatum(ts);
487 nulls[0] = false;
489 values[1] = ObjectIdGetDatum((Oid) nodeid);
490 nulls[1] = false;
493 htup = heap_form_tuple(tupdesc, values, nulls);
495 PG_RETURN_DATUM(HeapTupleGetDatum(htup));
499 * Number of shared CommitTS buffers.
501 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502 * Otherwise just cap the configured amount to be between 16 and the maximum
503 * allowed.
505 static int
506 CommitTsShmemBuffers(void)
508 /* auto-tune based on shared buffers */
509 if (commit_timestamp_buffers == 0)
510 return SimpleLruAutotuneBuffers(512, 1024);
512 return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
516 * Shared memory sizing for CommitTs
518 Size
519 CommitTsShmemSize(void)
521 return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
522 sizeof(CommitTimestampShared);
526 * Initialize CommitTs at system startup (postmaster start or standalone
527 * backend)
529 void
530 CommitTsShmemInit(void)
532 bool found;
534 /* If auto-tuning is requested, now is the time to do it */
535 if (commit_timestamp_buffers == 0)
537 char buf[32];
539 snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
541 PGC_S_DYNAMIC_DEFAULT);
544 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545 * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546 * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547 * that and we must force the matter with PGC_S_OVERRIDE.
549 if (commit_timestamp_buffers == 0) /* failed to apply it? */
550 SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
551 PGC_S_OVERRIDE);
553 Assert(commit_timestamp_buffers != 0);
555 CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556 SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
558 LWTRANCHE_COMMITTS_SLRU,
559 SYNC_HANDLER_COMMIT_TS,
560 false);
561 SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
563 commitTsShared = ShmemInitStruct("CommitTs shared",
564 sizeof(CommitTimestampShared),
565 &found);
567 if (!IsUnderPostmaster)
569 Assert(!found);
571 commitTsShared->xidLastCommit = InvalidTransactionId;
572 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
573 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
574 commitTsShared->commitTsActive = false;
576 else
577 Assert(found);
581 * GUC check_hook for commit_timestamp_buffers
583 bool
584 check_commit_ts_buffers(int *newval, void **extra, GucSource source)
586 return check_slru_buffers("commit_timestamp_buffers", newval);
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Nothing to do here at present, unlike most other SLRU modules; segments
	 * are created when the server is started with this module enabled.  See
	 * ActivateCommitTs.
	 */
}
606 * Initialize (or reinitialize) a page of CommitTs to zeroes.
607 * If writeXlog is true, also emit an XLOG record saying we did this.
609 * The page is not actually written, just set up in shared memory.
610 * The slot number of the new page is returned.
612 * Control lock must be held at entry, and will be held at exit.
614 static int
615 ZeroCommitTsPage(int64 pageno, bool writeXlog)
617 int slotno;
619 slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
621 if (writeXlog)
622 WriteZeroPageXlogRec(pageno);
624 return slotno;
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized TransamVariables->nextXid.
 *
 * All the work is done by ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
638 * This must be called ONCE during postmaster or standalone-backend startup,
639 * after recovery has finished.
641 void
642 CompleteCommitTsInitialization(void)
645 * If the feature is not enabled, turn it off for good. This also removes
646 * any leftover data.
648 * Conversely, we activate the module if the feature is enabled. This is
649 * necessary for primary and standby as the activation depends on the
650 * control file contents at the beginning of recovery or when a
651 * XLOG_PARAMETER_CHANGE is replayed.
653 if (!track_commit_timestamp)
654 DeactivateCommitTs();
655 else
656 ActivateCommitTs();
660 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
661 * XLog record during recovery.
663 void
664 CommitTsParameterChange(bool newvalue, bool oldvalue)
667 * If the commit_ts module is disabled in this server and we get word from
668 * the primary server that it is enabled there, activate it so that we can
669 * replay future WAL records involving it; also mark it as active on
670 * pg_control. If the old value was already set, we already did this, so
671 * don't do anything.
673 * If the module is disabled in the primary, disable it here too, unless
674 * the module is enabled locally.
676 * Note this only runs in the recovery process, so an unlocked read is
677 * fine.
679 if (newvalue)
681 if (!commitTsShared->commitTsActive)
682 ActivateCommitTs();
684 else if (commitTsShared->commitTsActive)
685 DeactivateCommitTs();
689 * Activate this module whenever necessary.
690 * This must happen during postmaster or standalone-backend startup,
691 * or during WAL replay anytime the track_commit_timestamp setting is
692 * changed in the primary.
694 * The reason why this SLRU needs separate activation/deactivation functions is
695 * that it can be enabled/disabled during start and the activation/deactivation
696 * on the primary is propagated to the standby via replay. Other SLRUs don't
697 * have this property and they can be just initialized during normal startup.
699 * This is in charge of creating the currently active segment, if it's not
700 * already there. The reason for this is that the server might have been
701 * running with this module disabled for a while and thus might have skipped
702 * the normal creation point.
704 static void
705 ActivateCommitTs(void)
707 TransactionId xid;
708 int64 pageno;
710 /* If we've done this already, there's nothing to do */
711 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
712 if (commitTsShared->commitTsActive)
714 LWLockRelease(CommitTsLock);
715 return;
717 LWLockRelease(CommitTsLock);
719 xid = XidFromFullTransactionId(TransamVariables->nextXid);
720 pageno = TransactionIdToCTsPage(xid);
723 * Re-Initialize our idea of the latest page number.
725 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
728 * If CommitTs is enabled, but it wasn't in the previous server run, we
729 * need to set the oldest and newest values to the next Xid; that way, we
730 * will not try to read data that might not have been set.
732 * XXX does this have a problem if a server is started with commitTs
733 * enabled, then started with commitTs disabled, then restarted with it
734 * enabled again? It doesn't look like it does, because there should be a
735 * checkpoint that sets the value to InvalidTransactionId at end of
736 * recovery; and so any chance of injecting new transactions without
737 * CommitTs values would occur after the oldestCommitTsXid has been set to
738 * Invalid temporarily.
740 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
741 if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
743 TransamVariables->oldestCommitTsXid =
744 TransamVariables->newestCommitTsXid = ReadNextTransactionId();
746 LWLockRelease(CommitTsLock);
748 /* Create the current segment file, if necessary */
749 if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
751 LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752 int slotno;
754 LWLockAcquire(lock, LW_EXCLUSIVE);
755 slotno = ZeroCommitTsPage(pageno, false);
756 SimpleLruWritePage(CommitTsCtl, slotno);
757 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758 LWLockRelease(lock);
761 /* Change the activation status in shared memory. */
762 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
763 commitTsShared->commitTsActive = true;
764 LWLockRelease(CommitTsLock);
768 * Deactivate this module.
770 * This must be called when the track_commit_timestamp parameter is turned off.
771 * This happens during postmaster or standalone-backend startup, or during WAL
772 * replay.
774 * Resets CommitTs into invalid state to make sure we don't hand back
775 * possibly-invalid data; also removes segments of old data.
777 static void
778 DeactivateCommitTs(void)
781 * Cleanup the status in the shared memory.
783 * We reset everything in the commitTsShared record to prevent user from
784 * getting confusing data about last committed transaction on the standby
785 * when the module was activated repeatedly on the primary.
787 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
789 commitTsShared->commitTsActive = false;
790 commitTsShared->xidLastCommit = InvalidTransactionId;
791 TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
792 commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
794 TransamVariables->oldestCommitTsXid = InvalidTransactionId;
795 TransamVariables->newestCommitTsXid = InvalidTransactionId;
798 * Remove *all* files. This is necessary so that there are no leftover
799 * files; in the case where this feature is later enabled after running
800 * with it disabled for some time there may be a gap in the file sequence.
801 * (We can probably tolerate out-of-sequence files, as they are going to
802 * be overwritten anyway when we wrap around, but it seems better to be
803 * tidy.)
805 * Note that we do this with CommitTsLock acquired in exclusive mode. This
806 * is very heavy-handed, but since this routine can only be called in the
807 * replica and should happen very rarely, we don't worry too much about
808 * it. Note also that no process should be consulting this SLRU if we
809 * have just deactivated it.
811 (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
813 LWLockRelease(CommitTsLock);
817 * Perform a checkpoint --- either during shutdown, or on-the-fly
819 void
820 CheckPointCommitTs(void)
823 * Write dirty CommitTs pages to disk. This may result in sync requests
824 * queued for later handling by ProcessSyncRequests(), as part of the
825 * checkpoint.
827 SimpleLruWriteAll(CommitTsCtl, true);
831 * Make sure that CommitTs has room for a newly-allocated XID.
833 * NB: this is called while holding XidGenLock. We want it to be very fast
834 * most of the time; even when it's not so fast, no actual I/O need happen
835 * unless we're forced to write out a dirty CommitTs or xlog page to make room
836 * in shared memory.
838 * NB: the current implementation relies on track_commit_timestamp being
839 * PGC_POSTMASTER.
841 void
842 ExtendCommitTs(TransactionId newestXact)
844 int64 pageno;
845 LWLock *lock;
848 * Nothing to do if module not enabled. Note we do an unlocked read of
849 * the flag here, which is okay because this routine is only called from
850 * GetNewTransactionId, which is never called in a standby.
852 Assert(!InRecovery);
853 if (!commitTsShared->commitTsActive)
854 return;
857 * No work except at first XID of a page. But beware: just after
858 * wraparound, the first XID of page zero is FirstNormalTransactionId.
860 if (TransactionIdToCTsEntry(newestXact) != 0 &&
861 !TransactionIdEquals(newestXact, FirstNormalTransactionId))
862 return;
864 pageno = TransactionIdToCTsPage(newestXact);
866 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
868 LWLockAcquire(lock, LW_EXCLUSIVE);
870 /* Zero the page and make an XLOG entry about it */
871 ZeroCommitTsPage(pageno, !InRecovery);
873 LWLockRelease(lock);
877 * Remove all CommitTs segments before the one holding the passed
878 * transaction ID.
880 * Note that we don't need to flush XLOG here.
882 void
883 TruncateCommitTs(TransactionId oldestXact)
885 int64 cutoffPage;
888 * The cutoff point is the start of the segment containing oldestXact. We
889 * pass the *page* containing oldestXact to SimpleLruTruncate.
891 cutoffPage = TransactionIdToCTsPage(oldestXact);
893 /* Check to see if there's any files that could be removed */
894 if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
895 &cutoffPage))
896 return; /* nothing to remove */
898 /* Write XLOG record */
899 WriteTruncateXlogRec(cutoffPage, oldestXact);
901 /* Now we can remove the old CommitTs segment(s) */
902 SimpleLruTruncate(CommitTsCtl, cutoffPage);
906 * Set the limit values between which commit TS can be consulted.
908 void
909 SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
912 * Be careful not to overwrite values that are either further into the
913 * "future" or signal a disabled committs.
915 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
916 if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
918 if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
919 TransamVariables->oldestCommitTsXid = oldestXact;
920 if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
921 TransamVariables->newestCommitTsXid = newestXact;
923 else
925 Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
926 TransamVariables->oldestCommitTsXid = oldestXact;
927 TransamVariables->newestCommitTsXid = newestXact;
929 LWLockRelease(CommitTsLock);
933 * Move forwards the oldest commitTS value that can be consulted
935 void
936 AdvanceOldestCommitTsXid(TransactionId oldestXact)
938 LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
939 if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
940 TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
941 TransamVariables->oldestCommitTsXid = oldestXact;
942 LWLockRelease(CommitTsLock);
947 * Decide whether a commitTS page number is "older" for truncation purposes.
948 * Analogous to CLOGPagePrecedes().
950 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
951 * introduces differences compared to CLOG and the other SLRUs having (1 <<
952 * 31) % per_page == 0. This function never tests exactly
953 * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
954 * there are two possible counts of page boundaries between oldestXact and the
955 * latest XID assigned, depending on whether oldestXact is within the first
956 * 128 entries of its page. Since this function doesn't know the location of
957 * oldestXact within page2, it returns false for one page that actually is
958 * expendable. This is a wider (yet still negligible) version of the
959 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
961 * For the sake of a worked example, number entries with decimal values such
962 * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
963 * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
964 * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
965 * because entry=2.85 is the border that toggles whether entries precede the
966 * last entry of the oldestXact page. While page 2 is expendable at
967 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
969 static bool
970 CommitTsPagePrecedes(int64 page1, int64 page2)
972 TransactionId xid1;
973 TransactionId xid2;
975 xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976 xid1 += FirstNormalTransactionId + 1;
977 xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978 xid2 += FirstNormalTransactionId + 1;
980 return (TransactionIdPrecedes(xid1, xid2) &&
981 TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
986 * Write a ZEROPAGE xlog record
988 static void
989 WriteZeroPageXlogRec(int64 pageno)
991 XLogBeginInsert();
992 XLogRegisterData((char *) (&pageno), sizeof(pageno));
993 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
997 * Write a TRUNCATE xlog record
999 static void
1000 WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
1002 xl_commit_ts_truncate xlrec;
1004 xlrec.pageno = pageno;
1005 xlrec.oldestXid = oldestXid;
1007 XLogBeginInsert();
1008 XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
1009 (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1013 * CommitTS resource manager's routines
1015 void
1016 commit_ts_redo(XLogReaderState *record)
1018 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1020 /* Backup blocks are not used in commit_ts records */
1021 Assert(!XLogRecHasAnyBlockRefs(record));
1023 if (info == COMMIT_TS_ZEROPAGE)
1025 int64 pageno;
1026 int slotno;
1027 LWLock *lock;
1029 memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1031 lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1032 LWLockAcquire(lock, LW_EXCLUSIVE);
1034 slotno = ZeroCommitTsPage(pageno, false);
1035 SimpleLruWritePage(CommitTsCtl, slotno);
1036 Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1038 LWLockRelease(lock);
1040 else if (info == COMMIT_TS_TRUNCATE)
1042 xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1044 AdvanceOldestCommitTsXid(trunc->oldestXid);
1047 * During XLOG replay, latest_page_number isn't set up yet; insert a
1048 * suitable value to bypass the sanity test in SimpleLruTruncate.
1050 pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051 trunc->pageno);
1053 SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1055 else
1056 elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1060 * Entrypoint for sync.c to sync commit_ts files.
1063 committssyncfiletag(const FileTag *ftag, char *path)
1065 return SlruSyncFileTag(CommitTsCtl, ftag, path);