/*-------------------------------------------------------------------------
 *
 * xlogprefetcher.c
 *		Prefetching support for recovery.
 *
 * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogprefetcher.c
 *
 * This module provides a drop-in replacement for an XLogReader that tries to
 * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
 * accessed in the near future are not already in the buffer pool, it initiates
 * I/Os that might complete before the caller eventually needs the data.  When
 * referenced blocks are found in the buffer pool already, the buffer is
 * recorded in the decoded record so that XLogReadBufferForRedo() can try to
 * avoid a second buffer mapping table lookup.
 *
 * Currently, only the main fork is considered for prefetching, and
 * prefetching is only effective on systems where PrefetchBuffer() does
 * something useful (mainly Linux).
 *
 *-------------------------------------------------------------------------
 */

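/*
 * A minimal sketch of how a caller drives this module (an illustration only,
 * not copied from the real replay loop; "xlogreader" and "start_lsn" stand
 * for the caller's XLogReaderState and starting LSN, and error handling is
 * omitted):
 *
 *		XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(xlogreader);
 *
 *		XLogPrefetcherBeginRead(prefetcher, start_lsn);
 *		for (;;)
 *		{
 *			char	   *errormsg;
 *			XLogRecord *record;
 *
 *			record = XLogPrefetcherReadRecord(prefetcher, &errormsg);
 *			if (record == NULL)
 *				break;
 *			... redo the record; XLogReadBufferForRedo() can reuse any
 *			prefetch_buffer recorded in the decoded record ...
 *		}
 *		XLogPrefetcherFree(prefetcher);
 */
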
#include "postgres.h"

#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"

/*
 * Every time we process this much WAL, we'll update the values in
 * pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

/*
 * To detect repeated access to the same block and skip useless extra system
 * calls, we remember a small window of recently prefetched blocks.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

/*
 * When maintenance_io_concurrency is not saturated, we're prepared to look
 * ahead up to N times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

/* Define to log internal debugging messages. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */

/* GUCs */
int			recovery_prefetch = RECOVERY_PREFETCH_TRY;

#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() \
		(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
		 maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif

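/*
 * Illustrative note (not from the original comments): recovery_prefetch = try
 * (the default) combined with a nonzero maintenance_io_concurrency enables
 * readahead on builds that define USE_PREFETCH, and is silently a no-op
 * elsewhere; setting recovery_prefetch = off or maintenance_io_concurrency = 0
 * disables readahead everywhere.
 */
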
static int	XLogPrefetchReconfigureCount = 0;

/*
 * Enum used to report whether an IO should be started.
 */
typedef enum
{
	LRQ_NEXT_NO_IO,
	LRQ_NEXT_IO,
	LRQ_NEXT_AGAIN,
} LsnReadQueueNextStatus;

/*
 * Type of callback that can decide which block to prefetch next.  For now
 * there is only one, XLogPrefetcherNextBlock().
 */
typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
													   XLogRecPtr *lsn);

/*
 * A simple circular queue of LSNs, used to control the number of
 * (potentially) inflight IOs.  This stands in for a later more general IO
 * control mechanism, which is why it has the apparently unnecessary
 * indirection through a function pointer.
 */
typedef struct LsnReadQueue
{
	LsnReadQueueNextFun next;
	uintptr_t	lrq_private;
	uint32		max_inflight;
	uint32		inflight;
	uint32		completed;
	uint32		head;
	uint32		tail;
	uint32		size;
	struct
	{
		bool		io;
		XLogRecPtr	lsn;
	}			queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;

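/*
 * Worked example of the ring geometry (illustrative, not from the original
 * comments): lrq_alloc(max_distance = 8, ...) sets size = 9, so queue[] can
 * hold at most 8 live entries (inflight + completed) while one slot always
 * stays unused; that spare slot is what lets head == tail unambiguously mean
 * "empty" rather than "full".
 */
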
/*
 * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
 * blocks that will soon be referenced, to try to avoid IO stalls.
 */
struct XLogPrefetcher
{
	/* WAL reader and current reading state. */
	XLogReaderState *reader;
	DecodedXLogRecord *record;
	int			next_block_id;

	/* When to publish stats. */
	XLogRecPtr	next_stats_shm_lsn;

	/* Book-keeping to avoid accessing blocks that don't exist yet. */
	HTAB	   *filter_table;
	dlist_head	filter_queue;

	/* Book-keeping to avoid repeat prefetches. */
	RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	int			recent_idx;

	/* Book-keeping to disable prefetching temporarily. */
	XLogRecPtr	no_readahead_until;

	/* IO depth manager. */
	LsnReadQueue *streaming_read;

	XLogRecPtr	begin_ptr;

	int			reconfigure_count;
};

/*
 * A temporary filter used to track block ranges that haven't been created
 * yet, whole relations that haven't been created yet, and whole relations
 * that (we assume) have already been dropped, or will be created by bulk WAL
 * operations.
 */
typedef struct XLogPrefetcherFilter
{
	RelFileLocator rlocator;
	XLogRecPtr	filter_until_replayed;
	BlockNumber filter_from_block;
	dlist_node	link;
} XLogPrefetcherFilter;

/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 */
typedef struct XLogPrefetchStats
{
	pg_atomic_uint64 reset_time;	/* Time of last reset. */
	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
	pg_atomic_uint64 hit;		/* Blocks already in cache. */
	pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
	pg_atomic_uint64 skip_rep;	/* Repeat accesses skipped. */

	int			wal_distance;	/* Number of WAL bytes ahead. */
	int			block_distance; /* Number of block references ahead. */
	int			io_depth;		/* Number of I/Os in progress. */
} XLogPrefetchStats;

static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
										   RelFileLocator rlocator,
										   BlockNumber blockno,
										   XLogRecPtr lsn);
static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
											RelFileLocator rlocator,
											BlockNumber blockno);
static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
												 XLogRecPtr replaying_lsn);
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
													  XLogRecPtr *lsn);

static XLogPrefetchStats *SharedStats;

static inline LsnReadQueue *
lrq_alloc(uint32 max_distance,
		  uint32 max_inflight,
		  uintptr_t lrq_private,
		  LsnReadQueueNextFun next)
{
	LsnReadQueue *lrq;
	uint32		size;

	Assert(max_distance >= max_inflight);

	size = max_distance + 1;	/* full ring buffer has a gap */
	lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
	lrq->lrq_private = lrq_private;
	lrq->max_inflight = max_inflight;
	lrq->size = size;
	lrq->next = next;
	lrq->head = 0;
	lrq->tail = 0;
	lrq->inflight = 0;
	lrq->completed = 0;

	return lrq;
}

static inline void
lrq_free(LsnReadQueue *lrq)
{
	pfree(lrq);
}

static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
	return lrq->inflight;
}

static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
	return lrq->completed;
}

static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
	/* Try to start as many IOs as we can within our limits. */
	while (lrq->inflight < lrq->max_inflight &&
		   lrq->inflight + lrq->completed < lrq->size - 1)
	{
		Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
		switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
		{
			case LRQ_NEXT_AGAIN:
				return;
			case LRQ_NEXT_IO:
				lrq->queue[lrq->head].io = true;
				lrq->inflight++;
				break;
			case LRQ_NEXT_NO_IO:
				lrq->queue[lrq->head].io = false;
				lrq->completed++;
				break;
		}
		lrq->head++;
		if (lrq->head == lrq->size)
			lrq->head = 0;
	}
}

static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
	/*
	 * We know that LSNs before 'lsn' have been replayed, so we can now assume
	 * that any IOs that were started before then have finished.
	 */
	while (lrq->tail != lrq->head &&
		   lrq->queue[lrq->tail].lsn < lsn)
	{
		if (lrq->queue[lrq->tail].io)
			lrq->inflight--;
		else
			lrq->completed--;
		lrq->tail++;
		if (lrq->tail == lrq->size)
			lrq->tail = 0;
	}

	if (RecoveryPrefetchEnabled())
		lrq_prefetch(lrq);
}

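/*
 * A sketch of the intended LsnReadQueue lifecycle (illustrative only; lrq and
 * replayed_lsn are placeholder names):
 *
 *		lrq = lrq_alloc(max_distance, max_inflight, private_data, callback);
 *		...
 *		lrq_complete_lsn(lrq, replayed_lsn);	retires entries whose LSN is
 *												below replayed_lsn, then tops
 *												the queue back up
 *		...
 *		lrq_free(lrq);
 *
 * lrq_prefetch() is also called directly when nothing is queued yet; see
 * XLogPrefetcherReadRecord().
 */
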
size_t
XLogPrefetchShmemSize(void)
{
	return sizeof(XLogPrefetchStats);
}

/*
 * Reset all counters to zero.
 */
void
XLogPrefetchResetStats(void)
{
	pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
	pg_atomic_write_u64(&SharedStats->prefetch, 0);
	pg_atomic_write_u64(&SharedStats->hit, 0);
	pg_atomic_write_u64(&SharedStats->skip_init, 0);
	pg_atomic_write_u64(&SharedStats->skip_new, 0);
	pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
	pg_atomic_write_u64(&SharedStats->skip_rep, 0);
}

void
XLogPrefetchShmemInit(void)
{
	bool		found;

	SharedStats = (XLogPrefetchStats *)
		ShmemInitStruct("XLogPrefetchStats",
						sizeof(XLogPrefetchStats),
						&found);

	if (!found)
	{
		pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
		pg_atomic_init_u64(&SharedStats->prefetch, 0);
		pg_atomic_init_u64(&SharedStats->hit, 0);
		pg_atomic_init_u64(&SharedStats->skip_init, 0);
		pg_atomic_init_u64(&SharedStats->skip_new, 0);
		pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
		pg_atomic_init_u64(&SharedStats->skip_rep, 0);
	}
}

/*
 * Called when any GUC is changed that affects prefetching.
 */
void
XLogPrefetchReconfigure(void)
{
	XLogPrefetchReconfigureCount++;
}

/*
 * Increment a counter in shared memory.  This is equivalent to (*counter)++
 * on a plain uint64 without any memory barrier or locking, except on
 * platforms where readers can't read uint64 without possibly observing a
 * torn value.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
	Assert(AmStartupProcess() || !IsUnderPostmaster);
	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}

/*
 * Create a prefetcher that is ready to begin prefetching blocks referenced by
 * WAL records.
 */
XLogPrefetcher *
XLogPrefetcherAllocate(XLogReaderState *reader)
{
	XLogPrefetcher *prefetcher;
	HASHCTL		ctl;

	prefetcher = palloc0(sizeof(XLogPrefetcher));
	prefetcher->reader = reader;

	ctl.keysize = sizeof(RelFileLocator);
	ctl.entrysize = sizeof(XLogPrefetcherFilter);
	prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
										   &ctl, HASH_ELEM | HASH_BLOBS);
	dlist_init(&prefetcher->filter_queue);

	SharedStats->wal_distance = 0;
	SharedStats->block_distance = 0;
	SharedStats->io_depth = 0;

	/* First usage will cause streaming_read to be allocated. */
	prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;

	return prefetcher;
}

/*
 * Destroy a prefetcher and release all resources.
 */
void
XLogPrefetcherFree(XLogPrefetcher *prefetcher)
{
	lrq_free(prefetcher->streaming_read);
	hash_destroy(prefetcher->filter_table);
	pfree(prefetcher);
}

/*
 * Provide access to the reader.
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
	return prefetcher->reader;
}

/*
 * Update the statistics visible in the pg_stat_recovery_prefetch view.
 */
void
XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
{
	uint32		io_depth;
	uint32		completed;
	int64		wal_distance;

	/* How far ahead of replay are we now? */
	if (prefetcher->reader->decode_queue_tail)
	{
		wal_distance =
			prefetcher->reader->decode_queue_tail->lsn -
			prefetcher->reader->decode_queue_head->lsn;
	}
	else
	{
		wal_distance = 0;
	}

	/* How many IOs are currently in flight and completed? */
	io_depth = lrq_inflight(prefetcher->streaming_read);
	completed = lrq_completed(prefetcher->streaming_read);

	/* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
	SharedStats->io_depth = io_depth;
	SharedStats->block_distance = io_depth + completed;
	SharedStats->wal_distance = wal_distance;

	prefetcher->next_stats_shm_lsn =
		prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
}

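/*
 * Worked example (illustrative, not from the original comments): if three
 * prefetch I/Os have been started for records not yet replayed, and five more
 * block references were examined without needing I/O, then io_depth = 3 and
 * block_distance = 3 + 5 = 8, while wal_distance is the byte span between the
 * newest and oldest records currently sitting in the decode queue.
 */
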
/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster.  An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
static LsnReadQueueNextStatus
XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
{
	XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
	XLogReaderState *reader = prefetcher->reader;
	XLogRecPtr	replaying_lsn = reader->ReadRecPtr;

	/*
	 * We keep track of the record and block we're up to between calls with
	 * prefetcher->record and prefetcher->next_block_id.
	 */
	for (;;)
	{
		DecodedXLogRecord *record;

		/* Try to read a new future record, if we don't already have one. */
		if (prefetcher->record == NULL)
		{
			bool		nonblocking;

			/*
			 * If there are already records or an error queued up that could
			 * be replayed, we don't want to block here.  Otherwise, it's OK
			 * to block waiting for more data: presumably the caller has
			 * nothing else to do.
			 */
			nonblocking = XLogReaderHasQueuedRecordOrError(reader);

			/* Readahead is disabled until we replay past a certain point. */
			if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
				return LRQ_NEXT_AGAIN;

			record = XLogReadAhead(prefetcher->reader, nonblocking);
			if (record == NULL)
			{
				/*
				 * We can't read any more, due to an error or lack of data in
				 * nonblocking mode.  Don't try to read ahead again until
				 * we've replayed everything already decoded.
				 */
				if (nonblocking && prefetcher->reader->decode_queue_tail)
					prefetcher->no_readahead_until =
						prefetcher->reader->decode_queue_tail->lsn;

				return LRQ_NEXT_AGAIN;
			}

			/*
			 * If prefetching is disabled, we don't need to analyze the record
			 * or issue any prefetches.  We just need to cause one record to
			 * be decoded.
			 */
			if (!RecoveryPrefetchEnabled())
			{
				*lsn = InvalidXLogRecPtr;
				return LRQ_NEXT_NO_IO;
			}

			/* We have a new record to process. */
			prefetcher->record = record;
			prefetcher->next_block_id = 0;
		}
		else
		{
			/* Continue to process from last call, or last loop. */
			record = prefetcher->record;
		}

		/*
		 * Check for operations that require us to filter out block ranges, or
		 * pause readahead completely.
		 */
		if (replaying_lsn < record->lsn)
		{
			uint8		rmid = record->header.xl_rmid;
			uint8		record_type = record->header.xl_info & ~XLR_INFO_MASK;

			if (rmid == RM_XLOG_ID)
			{
				if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
					record_type == XLOG_END_OF_RECOVERY)
				{
					/*
					 * These records might change the TLI.  Avoid potential
					 * bugs if we were to allow "read TLI" and "replay TLI" to
					 * differ without more analysis.
					 */
					prefetcher->no_readahead_until = record->lsn;

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing all readahead until %X/%X is replayed due to possible TLI change",
						 LSN_FORMAT_ARGS(record->lsn));
#endif

					/* Fall through so we move past this record. */
				}
			}
			else if (rmid == RM_DBASE_ID)
			{
				/*
				 * When databases are created with the file-copy strategy,
				 * there are no WAL records to tell us about the creation of
				 * individual relations.
				 */
				if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
				{
					xl_dbase_create_file_copy_rec *xlrec =
						(xl_dbase_create_file_copy_rec *) record->main_data;
					RelFileLocator rlocator =
						{InvalidOid, xlrec->db_id, InvalidRelFileNumber};

					/*
					 * Don't try to prefetch anything in this database until
					 * it has been created, or we might confuse the blocks of
					 * different generations, if a database OID or
					 * relfilenumber is reused.  It's also more efficient than
					 * discovering that relations don't exist on disk yet with
					 * ENOENT errors.
					 */
					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
						 rlocator.dbOid,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
			else if (rmid == RM_SMGR_ID)
			{
				if (record_type == XLOG_SMGR_CREATE)
				{
					xl_smgr_create *xlrec = (xl_smgr_create *)
						record->main_data;

					if (xlrec->forkNum == MAIN_FORKNUM)
					{
						/*
						 * Don't prefetch anything for this whole relation
						 * until it has been created.  Otherwise we might
						 * confuse the blocks of different generations, if a
						 * relfilenumber is reused.  This also avoids the need
						 * to discover the problem via extra syscalls that
						 * report ENOENT.
						 */
						XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
												record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
						elog(XLOGPREFETCHER_DEBUG_LEVEL,
							 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
							 xlrec->rlocator.spcOid,
							 xlrec->rlocator.dbOid,
							 xlrec->rlocator.relNumber,
							 LSN_FORMAT_ARGS(record->lsn));
#endif
					}
				}
				else if (record_type == XLOG_SMGR_TRUNCATE)
				{
					xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
						record->main_data;

					/*
					 * Don't consider prefetching anything in the truncated
					 * range until the truncation has been performed.
					 */
					XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
											xlrec->blkno,
											record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
						 xlrec->rlocator.spcOid,
						 xlrec->rlocator.dbOid,
						 xlrec->rlocator.relNumber,
						 xlrec->blkno,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
		}

		/* Scan the block references, starting where we left off last time. */
		while (prefetcher->next_block_id <= record->max_block_id)
		{
			int			block_id = prefetcher->next_block_id++;
			DecodedBkpBlock *block = &record->blocks[block_id];
			SMgrRelation reln;
			PrefetchBufferResult result;

			if (!block->in_use)
				continue;

			Assert(!BufferIsValid(block->prefetch_buffer));

			/*
			 * Record the LSN of this record.  When it's replayed,
			 * LsnReadQueue will consider any IOs submitted for earlier LSNs
			 * to be finished.
			 */
			*lsn = record->lsn;

			/* We don't try to prefetch anything but the main fork for now. */
			if (block->forknum != MAIN_FORKNUM)
			{
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If there is a full page image attached, we won't be reading the
			 * page, so don't bother trying to prefetch.
			 */
			if (block->has_image)
			{
				XLogPrefetchIncrement(&SharedStats->skip_fpw);
				return LRQ_NEXT_NO_IO;
			}

			/* There is no point in reading a page that will be zeroed. */
			if (block->flags & BKPBLOCK_WILL_INIT)
			{
				XLogPrefetchIncrement(&SharedStats->skip_init);
				return LRQ_NEXT_NO_IO;
			}

			/* Should we skip prefetching this block due to a filter? */
			if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
			{
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/* There is no point in repeatedly prefetching the same block. */
			for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
			{
				if (block->blkno == prefetcher->recent_block[i] &&
					RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
				{
					/*
					 * XXX If we also remembered where it was, we could set
					 * recent_buffer so that recovery could skip smgropen()
					 * and a buffer table lookup.
					 */
					XLogPrefetchIncrement(&SharedStats->skip_rep);
					return LRQ_NEXT_NO_IO;
				}
			}
			prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
			prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
			prefetcher->recent_idx =
				(prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;

			/*
			 * We could try to have a fast path for repeated references to the
			 * same relation (with some scheme to handle invalidations
			 * safely), but for now we'll call smgropen() every time.
			 */
			reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);

			/*
			 * If the relation file doesn't exist on disk, for example because
			 * we're replaying after a crash and the file will be created and
			 * then unlinked by WAL that hasn't been replayed yet, suppress
			 * further prefetching in the relation until this record is
			 * replayed.
			 */
			if (!smgrexists(reln, MAIN_FORKNUM))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If the relation isn't big enough to contain the referenced
			 * block yet, suppress prefetching of this block and higher until
			 * this record is replayed.
			 */
			if (block->blkno >= smgrnblocks(reln, block->forknum))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/* Try to initiate prefetching. */
			result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
			if (BufferIsValid(result.recent_buffer))
			{
				/* Cache hit, nothing to do. */
				XLogPrefetchIncrement(&SharedStats->hit);
				block->prefetch_buffer = result.recent_buffer;
				return LRQ_NEXT_NO_IO;
			}
			else if (result.initiated_io)
			{
				/* Cache miss, I/O (presumably) started. */
				XLogPrefetchIncrement(&SharedStats->prefetch);
				block->prefetch_buffer = InvalidBuffer;
				return LRQ_NEXT_IO;
			}
			else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
			{
				/*
				 * This shouldn't be possible, because we already determined
				 * that the relation exists on disk and is big enough.
				 * Something is wrong with the cache invalidation for
				 * smgrexists(), smgrnblocks(), or the file was unlinked or
				 * truncated beneath our feet?
				 */
				elog(ERROR,
					 "could not prefetch relation %u/%u/%u block %u",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno);
			}
		}

		/*
		 * Several callsites need to be able to read exactly one record
		 * without any internal readahead.  Examples: xlog.c reading
		 * checkpoint records with emode set to PANIC, which might otherwise
		 * cause XLogPageRead() to panic on some future page, and xlog.c
		 * determining where to start writing WAL next, which depends on the
		 * contents of the reader's internal buffer after reading one record.
		 * Therefore, don't even think about prefetching until the first
		 * record after XLogPrefetcherBeginRead() has been consumed.
		 */
		if (prefetcher->reader->decode_queue_tail &&
			prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
			return LRQ_NEXT_AGAIN;

		/* Advance to the next record. */
		prefetcher->record = NULL;
	}

	pg_unreachable();
}

/*
 * Expose statistics about recovery prefetching.
 */
Datum
pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	Datum		values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
	bool		nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];

	InitMaterializedSRF(fcinfo, 0);

	for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
		nulls[i] = false;

	values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
	values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
	values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
	values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
	values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
	values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
	values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
	values[7] = Int32GetDatum(SharedStats->wal_distance);
	values[8] = Int32GetDatum(SharedStats->block_distance);
	values[9] = Int32GetDatum(SharedStats->io_depth);
	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

	return (Datum) 0;
}

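/*
 * These counters back the pg_stat_recovery_prefetch view, so the usual way to
 * read them is something like (illustrative query):
 *
 *		SELECT * FROM pg_stat_recovery_prefetch;
 */
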
/*
 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
 * has been replayed.
 */
static inline void
XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
						BlockNumber blockno, XLogRecPtr lsn)
{
	XLogPrefetcherFilter *filter;
	bool		found;

	filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
	if (!found)
	{
		/*
		 * Don't allow any prefetching of this block or higher until replayed.
		 */
		filter->filter_until_replayed = lsn;
		filter->filter_from_block = blockno;
		dlist_push_head(&prefetcher->filter_queue, &filter->link);
	}
	else
	{
		/*
		 * We were already filtering this rlocator.  Extend the filter's
		 * lifetime to cover this WAL record, but leave the lower of the block
		 * numbers there because we don't want to have to track individual
		 * blocks.
		 */
		filter->filter_until_replayed = lsn;
		dlist_delete(&filter->link);
		dlist_push_head(&prefetcher->filter_queue, &filter->link);
		filter->filter_from_block = Min(filter->filter_from_block, blockno);
	}
}

/*
 * Have we replayed any records that caused us to begin filtering a block
 * range?  That means that relations should have been created, extended or
 * dropped as required, so we can stop filtering out accesses to a given
 * relfilenumber.
 */
static inline void
XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
{
	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
	{
		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
														  link,
														  &prefetcher->filter_queue);

		if (filter->filter_until_replayed >= replaying_lsn)
			break;

		dlist_delete(&filter->link);
		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
	}
}

/*
 * Check if a given block should be skipped due to a filter.
 */
static inline bool
XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
						 BlockNumber blockno)
{
	/*
	 * Test for empty queue first, because we expect it to be empty most of
	 * the time and we can avoid the hash table lookup in that case.
	 */
	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
	{
		XLogPrefetcherFilter *filter;

		/* See if the block range is filtered. */
		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
		if (filter && filter->filter_from_block <= blockno)
		{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
			elog(XLOGPREFETCHER_DEBUG_LEVEL,
				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
				 LSN_FORMAT_ARGS(filter->filter_until_replayed),
				 filter->filter_from_block);
#endif
			return true;
		}

		/* See if the whole database is filtered. */
		rlocator.relNumber = InvalidRelFileNumber;
		rlocator.spcOid = InvalidOid;
		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
		if (filter)
		{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
			elog(XLOGPREFETCHER_DEBUG_LEVEL,
				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
				 LSN_FORMAT_ARGS(filter->filter_until_replayed));
#endif
			return true;
		}
	}

	return false;
}

/*
 * A wrapper for XLogBeginRead() that also resets the prefetcher.
 */
void
XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
{
	/* This will forget about any in-flight IO. */
	prefetcher->reconfigure_count--;

	/* Book-keeping to avoid readahead on first read. */
	prefetcher->begin_ptr = recPtr;

	prefetcher->no_readahead_until = 0;

	/* This will forget about any queued up records in the decoder. */
	XLogBeginRead(prefetcher->reader, recPtr);
}

/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 */
XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
	DecodedXLogRecord *record;
	XLogRecPtr	replayed_up_to;

	/*
	 * See if it's time to reset the prefetching machinery, because a relevant
	 * GUC was changed.
	 */
	if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
	{
		uint32		max_distance;
		uint32		max_inflight;

		if (prefetcher->streaming_read)
			lrq_free(prefetcher->streaming_read);

		if (RecoveryPrefetchEnabled())
		{
			Assert(maintenance_io_concurrency > 0);
			max_inflight = maintenance_io_concurrency;
			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
		}
		else
		{
			max_inflight = 1;
			max_distance = 1;
		}

		prefetcher->streaming_read = lrq_alloc(max_distance,
											   max_inflight,
											   (uintptr_t) prefetcher,
											   XLogPrefetcherNextBlock);

		prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
	}

	/*
	 * Release last returned record, if there is one, as it's now been
	 * replayed.
	 */
	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

	/*
	 * Can we drop any filters yet?  If we were waiting for a relation to be
	 * created or extended, it is now OK to access blocks in the covered
	 * range.
	 */
	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

	/*
	 * All IO initiated by earlier WAL is now completed.  This might trigger
	 * further prefetching.
	 */
	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

	/*
	 * If there's nothing queued yet, then start prefetching to cause at least
	 * one record to be queued.
	 */
	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
	{
		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
		Assert(lrq_completed(prefetcher->streaming_read) == 0);
		lrq_prefetch(prefetcher->streaming_read);
	}

	/* Read the next record. */
	record = XLogNextRecord(prefetcher->reader, errmsg);
	if (record == NULL)
		return NULL;

	/*
	 * The record we just got is the "current" one, for the benefit of the
	 * XLogRecXXX() macros.
	 */
	Assert(record == prefetcher->reader->record);

	/*
	 * If maintenance_io_concurrency is set very low, we might have started
	 * prefetching some but not all of the blocks referenced in the record
	 * we're about to return.  Forget about the rest of the blocks in this
	 * record by dropping the prefetcher's reference to it.
	 */
	if (record == prefetcher->record)
		prefetcher->record = NULL;

	/*
	 * See if it's time to compute some statistics, because enough WAL has
	 * been processed.
	 */
	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
		XLogPrefetcherComputeStats(prefetcher);

	Assert(record == prefetcher->reader->record);

	return &record->header;
}

bool
check_recovery_prefetch(int *new_value, void **extra, GucSource source)
{
#ifndef USE_PREFETCH
	if (*new_value == RECOVERY_PREFETCH_ON)
	{
		GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
		return false;
	}
#endif

	return true;
}

void
assign_recovery_prefetch(int new_value, void *extra)
{
	/* Reconfigure prefetching, because a setting it depends on changed. */
	recovery_prefetch = new_value;
	if (AmStartupProcess())
		XLogPrefetchReconfigure();
}