/*-------------------------------------------------------------------------
 *
 * xlogprefetcher.c
 *		Prefetching support for recovery.
 *
 * Portions Copyright (c) 2022-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogprefetcher.c
 *
 * This module provides a drop-in replacement for an XLogReader that tries to
 * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
 * accessed in the near future are not already in the buffer pool, it initiates
 * I/Os that might complete before the caller eventually needs the data.  When
 * referenced blocks are found in the buffer pool already, the buffer is
 * recorded in the decoded record so that XLogReadBufferForRedo() can try to
 * avoid a second buffer mapping table lookup.
 *
 * Currently, only the main fork is considered for prefetching, and
 * prefetching is only effective on systems where PrefetchBuffer() does
 * something useful (mainly Linux).
 *
 *-------------------------------------------------------------------------
 */

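/*
 * Illustrative sketch of how a caller can drive the prefetcher; the real
 * recovery loop (see xlogrecovery.c) is more involved, but follows this
 * shape:
 *
 *		XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(xlogreader);
 *
 *		XLogPrefetcherBeginRead(prefetcher, start_lsn);
 *		for (;;)
 *		{
 *			char	   *errmsg;
 *			XLogRecord *record = XLogPrefetcherReadRecord(prefetcher, &errmsg);
 *
 *			if (record == NULL)
 *				break;			// error, or no more WAL available yet
 *			// ... redo the record ...
 *		}
 *		XLogPrefetcherFree(prefetcher);
 */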

#include "postgres.h"

#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"

/*
 * Every time we process this much WAL, we'll update the values in
 * pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

/*
 * To detect repeated access to the same block and skip useless extra system
 * calls, we remember a small window of recently prefetched blocks.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

/*
 * When maintenance_io_concurrency is not saturated, we're prepared to look
 * ahead up to N times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

/* Define to log internal debugging messages. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */

/* GUCs */
int			recovery_prefetch = RECOVERY_PREFETCH_TRY;

#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() \
		(recovery_prefetch != RECOVERY_PREFETCH_OFF && \
		 maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif

static int	XLogPrefetchReconfigureCount = 0;

/*
 * Enum used to report whether an IO should be started.
 */
typedef enum
{
	LRQ_NEXT_NO_IO,
	LRQ_NEXT_IO,
	LRQ_NEXT_AGAIN,
} LsnReadQueueNextStatus;

/*
 * Type of callback that can decide which block to prefetch next.  For now
 * there is only one.
 */
typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
													   XLogRecPtr *lsn);

/*
 * A simple circular queue of LSNs, used to control the number of
 * (potentially) inflight IOs.  This stands in for a later more general IO
 * control mechanism, which is why it has the apparently unnecessary
 * indirection through a function pointer.
 */
typedef struct LsnReadQueue
{
	LsnReadQueueNextFun next;
	uintptr_t	lrq_private;
	uint32		max_inflight;
	uint32		inflight;
	uint32		completed;
	uint32		head;
	uint32		tail;
	uint32		size;
	struct
	{
		bool		io;
		XLogRecPtr	lsn;
	}			queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;

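/*
 * Note on the ring arithmetic above: occupied entries live in the range
 * [tail, head), one array slot is always left unused so that head == tail
 * unambiguously means "empty", and inflight + completed equals the number of
 * occupied entries.
 */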

/*
 * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
 * blocks that will soon be referenced, to try to avoid IO stalls.
 */
struct XLogPrefetcher
{
	/* WAL reader and current reading state. */
	XLogReaderState *reader;
	DecodedXLogRecord *record;
	int			next_block_id;

	/* When to publish stats. */
	XLogRecPtr	next_stats_shm_lsn;

	/* Book-keeping to avoid accessing blocks that don't exist yet. */
	HTAB	   *filter_table;
	dlist_head	filter_queue;

	/* Book-keeping to avoid repeat prefetches. */
	RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
	int			recent_idx;

	/* Book-keeping to disable prefetching temporarily. */
	XLogRecPtr	no_readahead_until;

	/* IO depth manager. */
	LsnReadQueue *streaming_read;

	XLogRecPtr	begin_ptr;

	int			reconfigure_count;
};

/*
 * A temporary filter used to track block ranges that haven't been created
 * yet, whole relations that haven't been created yet, and whole relations
 * that (we assume) have already been dropped, or will be created by bulk WAL
 * operators.
 */
typedef struct XLogPrefetcherFilter
{
	RelFileLocator rlocator;
	XLogRecPtr	filter_until_replayed;
	BlockNumber filter_from_block;
	dlist_node	link;
} XLogPrefetcherFilter;

/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 */
typedef struct XLogPrefetchStats
{
	pg_atomic_uint64 reset_time;	/* Time of last reset. */
	pg_atomic_uint64 prefetch;	/* Prefetches initiated. */
	pg_atomic_uint64 hit;		/* Blocks already in cache. */
	pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
	pg_atomic_uint64 skip_new;	/* New/missing blocks filtered. */
	pg_atomic_uint64 skip_fpw;	/* FPWs skipped. */
	pg_atomic_uint64 skip_rep;	/* Repeat accesses skipped. */

	/* Dynamic values */
	int			wal_distance;	/* Number of WAL bytes ahead. */
	int			block_distance; /* Number of block references ahead. */
	int			io_depth;		/* Number of I/Os in progress. */
} XLogPrefetchStats;

static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
										   RelFileLocator rlocator,
										   BlockNumber blockno,
										   XLogRecPtr lsn);
static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
											RelFileLocator rlocator,
											BlockNumber blockno);
static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
												 XLogRecPtr replaying_lsn);
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
													  XLogRecPtr *lsn);

static XLogPrefetchStats *SharedStats;

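/*
 * Allocate a LsnReadQueue that can track up to 'max_distance' block
 * references, of which at most 'max_inflight' may have I/O in flight at any
 * time.  'next' is the callback used to examine the next block reference,
 * and 'lrq_private' is passed through to it.
 */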
static inline LsnReadQueue *
lrq_alloc(uint32 max_distance,
		  uint32 max_inflight,
		  uintptr_t lrq_private,
		  LsnReadQueueNextFun next)
{
	LsnReadQueue *lrq;
	uint32		size;

	Assert(max_distance >= max_inflight);

	size = max_distance + 1;	/* full ring buffer has a gap */
	lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
	lrq->lrq_private = lrq_private;
	lrq->max_inflight = max_inflight;
	lrq->size = size;
	lrq->next = next;
	lrq->head = 0;
	lrq->tail = 0;
	lrq->inflight = 0;
	lrq->completed = 0;

	return lrq;
}

static inline void
lrq_free(LsnReadQueue *lrq)
{
	pfree(lrq);
}

static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
	return lrq->inflight;
}

static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
	return lrq->completed;
}

static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
	/* Try to start as many IOs as we can within our limits. */
	while (lrq->inflight < lrq->max_inflight &&
		   lrq->inflight + lrq->completed < lrq->size - 1)
	{
		Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
		switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
		{
			case LRQ_NEXT_AGAIN:
				return;
			case LRQ_NEXT_IO:
				lrq->queue[lrq->head].io = true;
				lrq->inflight++;
				break;
			case LRQ_NEXT_NO_IO:
				lrq->queue[lrq->head].io = false;
				lrq->completed++;
				break;
		}
		lrq->head++;
		if (lrq->head == lrq->size)
			lrq->head = 0;
	}
}

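/*
 * Note that replay has progressed to 'lsn': retire queue entries for earlier
 * LSNs, and then try to look further ahead.
 */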
static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
	/*
	 * We know that LSNs before 'lsn' have been replayed, so we can now assume
	 * that any IOs that were started before then have finished.
	 */
	while (lrq->tail != lrq->head &&
		   lrq->queue[lrq->tail].lsn < lsn)
	{
		if (lrq->queue[lrq->tail].io)
			lrq->inflight--;
		else
			lrq->completed--;
		lrq->tail++;
		if (lrq->tail == lrq->size)
			lrq->tail = 0;
	}

	if (RecoveryPrefetchEnabled())
		lrq_prefetch(lrq);
}

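/*
 * Report the amount of shared memory needed for the stats counters.
 */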
size_t
XLogPrefetchShmemSize(void)
{
	return sizeof(XLogPrefetchStats);
}

/*
 * Reset all counters to zero.
 */
void
XLogPrefetchResetStats(void)
{
	pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
	pg_atomic_write_u64(&SharedStats->prefetch, 0);
	pg_atomic_write_u64(&SharedStats->hit, 0);
	pg_atomic_write_u64(&SharedStats->skip_init, 0);
	pg_atomic_write_u64(&SharedStats->skip_new, 0);
	pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
	pg_atomic_write_u64(&SharedStats->skip_rep, 0);
}

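/*
 * Allocate and initialize the shared memory stats, unless that has already
 * been done.
 */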
void
XLogPrefetchShmemInit(void)
{
	bool		found;

	SharedStats = (XLogPrefetchStats *)
		ShmemInitStruct("XLogPrefetchStats",
						sizeof(XLogPrefetchStats),
						&found);

	if (!found)
	{
		pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
		pg_atomic_init_u64(&SharedStats->prefetch, 0);
		pg_atomic_init_u64(&SharedStats->hit, 0);
		pg_atomic_init_u64(&SharedStats->skip_init, 0);
		pg_atomic_init_u64(&SharedStats->skip_new, 0);
		pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
		pg_atomic_init_u64(&SharedStats->skip_rep, 0);
	}
}

/*
 * Called when any GUC is changed that affects prefetching.
 */
void
XLogPrefetchReconfigure(void)
{
	XLogPrefetchReconfigureCount++;
}

/*
 * Increment a counter in shared memory.  This is equivalent to *counter++ on a
 * plain uint64 without any memory barrier or locking, except on platforms
 * where readers can't read uint64 without possibly observing a torn value.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
	Assert(AmStartupProcess() || !IsUnderPostmaster);
	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}

/*
 * Create a prefetcher that is ready to begin prefetching blocks referenced by
 * WAL records.
 */
XLogPrefetcher *
XLogPrefetcherAllocate(XLogReaderState *reader)
{
	XLogPrefetcher *prefetcher;
	HASHCTL		ctl;

	prefetcher = palloc0(sizeof(XLogPrefetcher));
	prefetcher->reader = reader;

	ctl.keysize = sizeof(RelFileLocator);
	ctl.entrysize = sizeof(XLogPrefetcherFilter);
	prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
										   &ctl, HASH_ELEM | HASH_BLOBS);
	dlist_init(&prefetcher->filter_queue);

	SharedStats->wal_distance = 0;
	SharedStats->block_distance = 0;
	SharedStats->io_depth = 0;

	/* First usage will cause streaming_read to be allocated. */
	prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;

	return prefetcher;
}

/*
 * Destroy a prefetcher and release all resources.
 */
void
XLogPrefetcherFree(XLogPrefetcher *prefetcher)
{
	lrq_free(prefetcher->streaming_read);
	hash_destroy(prefetcher->filter_table);
	pfree(prefetcher);
}

/*
 * Provide access to the reader.
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
	return prefetcher->reader;
}

/*
 * Update the statistics visible in the pg_stat_recovery_prefetch view.
 */
void
XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
{
	uint32		io_depth;
	uint32		completed;
	int64		wal_distance;

	/* How far ahead of replay are we now? */
	if (prefetcher->reader->decode_queue_tail)
	{
		wal_distance =
			prefetcher->reader->decode_queue_tail->lsn -
			prefetcher->reader->decode_queue_head->lsn;
	}
	else
	{
		wal_distance = 0;
	}

	/* How many IOs are currently in flight and completed? */
	io_depth = lrq_inflight(prefetcher->streaming_read);
	completed = lrq_completed(prefetcher->streaming_read);

	/* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
	SharedStats->io_depth = io_depth;
	SharedStats->block_distance = io_depth + completed;
	SharedStats->wal_distance = wal_distance;

	prefetcher->next_stats_shm_lsn =
		prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
}

/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster. An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
static LsnReadQueueNextStatus
XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
{
	XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
	XLogReaderState *reader = prefetcher->reader;
	XLogRecPtr	replaying_lsn = reader->ReadRecPtr;

	/*
	 * We keep track of the record and block we're up to between calls with
	 * prefetcher->record and prefetcher->next_block_id.
	 */
	for (;;)
	{
		DecodedXLogRecord *record;

		/* Try to read a new future record, if we don't already have one. */
		if (prefetcher->record == NULL)
		{
			bool		nonblocking;

			/*
			 * If there are already records or an error queued up that could
			 * be replayed, we don't want to block here.  Otherwise, it's OK
			 * to block waiting for more data: presumably the caller has
			 * nothing else to do.
			 */
			nonblocking = XLogReaderHasQueuedRecordOrError(reader);

			/* Readahead is disabled until we replay past a certain point. */
			if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
				return LRQ_NEXT_AGAIN;

			record = XLogReadAhead(prefetcher->reader, nonblocking);
			if (record == NULL)
			{
				/*
				 * We can't read any more, due to an error or lack of data in
				 * nonblocking mode.  Don't try to read ahead again until
				 * we've replayed everything already decoded.
				 */
				if (nonblocking && prefetcher->reader->decode_queue_tail)
					prefetcher->no_readahead_until =
						prefetcher->reader->decode_queue_tail->lsn;

				return LRQ_NEXT_AGAIN;
			}

			/*
			 * If prefetching is disabled, we don't need to analyze the record
			 * or issue any prefetches.  We just need to cause one record to
			 * be decoded.
			 */
			if (!RecoveryPrefetchEnabled())
			{
				*lsn = InvalidXLogRecPtr;
				return LRQ_NEXT_NO_IO;
			}

			/* We have a new record to process. */
			prefetcher->record = record;
			prefetcher->next_block_id = 0;
		}
		else
		{
			/* Continue to process from last call, or last loop. */
			record = prefetcher->record;
		}

		/*
		 * Check for operations that require us to filter out block ranges, or
		 * pause readahead completely.
		 */
		if (replaying_lsn < record->lsn)
		{
			uint8		rmid = record->header.xl_rmid;
			uint8		record_type = record->header.xl_info & ~XLR_INFO_MASK;

			if (rmid == RM_XLOG_ID)
			{
				if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
					record_type == XLOG_END_OF_RECOVERY)
				{
					/*
					 * These records might change the TLI.  Avoid potential
					 * bugs if we were to allow "read TLI" and "replay TLI" to
					 * differ without more analysis.
					 */
					prefetcher->no_readahead_until = record->lsn;

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing all readahead until %X/%X is replayed due to possible TLI change",
						 LSN_FORMAT_ARGS(record->lsn));
#endif

					/* Fall through so we move past this record. */
				}
			}
			else if (rmid == RM_DBASE_ID)
			{
				/*
				 * When databases are created with the file-copy strategy,
				 * there are no WAL records to tell us about the creation of
				 * individual relations.
				 */
				if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
				{
					xl_dbase_create_file_copy_rec *xlrec =
						(xl_dbase_create_file_copy_rec *) record->main_data;
					RelFileLocator rlocator =
						{InvalidOid, xlrec->db_id, InvalidRelFileNumber};

					/*
					 * Don't try to prefetch anything in this database until
					 * it has been created, or we might confuse the blocks of
					 * different generations, if a database OID or
					 * relfilenumber is reused.  It's also more efficient than
					 * discovering that relations don't exist on disk yet with
					 * ENOENT errors.
					 */
					XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
						 rlocator.dbOid,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
			else if (rmid == RM_SMGR_ID)
			{
				if (record_type == XLOG_SMGR_CREATE)
				{
					xl_smgr_create *xlrec = (xl_smgr_create *)
						record->main_data;

					if (xlrec->forkNum == MAIN_FORKNUM)
					{
						/*
						 * Don't prefetch anything for this whole relation
						 * until it has been created.  Otherwise we might
						 * confuse the blocks of different generations, if a
						 * relfilenumber is reused.  This also avoids the need
						 * to discover the problem via extra syscalls that
						 * report ENOENT.
						 */
						XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
												record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
						elog(XLOGPREFETCHER_DEBUG_LEVEL,
							 "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
							 xlrec->rlocator.spcOid,
							 xlrec->rlocator.dbOid,
							 xlrec->rlocator.relNumber,
							 LSN_FORMAT_ARGS(record->lsn));
#endif
					}
				}
				else if (record_type == XLOG_SMGR_TRUNCATE)
				{
					xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
						record->main_data;

					/*
					 * Don't consider prefetching anything in the truncated
					 * range until the truncation has been performed.
					 */
					XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
											xlrec->blkno,
											record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
					elog(XLOGPREFETCHER_DEBUG_LEVEL,
						 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
						 xlrec->rlocator.spcOid,
						 xlrec->rlocator.dbOid,
						 xlrec->rlocator.relNumber,
						 xlrec->blkno,
						 LSN_FORMAT_ARGS(record->lsn));
#endif
				}
			}
		}

		/* Scan the block references, starting where we left off last time. */
		while (prefetcher->next_block_id <= record->max_block_id)
		{
			int			block_id = prefetcher->next_block_id++;
			DecodedBkpBlock *block = &record->blocks[block_id];
			SMgrRelation reln;
			PrefetchBufferResult result;

			if (!block->in_use)
				continue;

			Assert(!BufferIsValid(block->prefetch_buffer));

			/*
			 * Record the LSN of this record.  When it's replayed,
			 * LsnReadQueue will consider any IOs submitted for earlier LSNs
			 * to be finished.
			 */
			*lsn = record->lsn;

			/* We don't try to prefetch anything but the main fork for now. */
			if (block->forknum != MAIN_FORKNUM)
			{
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If there is a full page image attached, we won't be reading the
			 * page, so don't bother trying to prefetch.
			 */
			if (block->has_image)
			{
				XLogPrefetchIncrement(&SharedStats->skip_fpw);
				return LRQ_NEXT_NO_IO;
			}

			/* There is no point in reading a page that will be zeroed. */
			if (block->flags & BKPBLOCK_WILL_INIT)
			{
				XLogPrefetchIncrement(&SharedStats->skip_init);
				return LRQ_NEXT_NO_IO;
			}

			/* Should we skip prefetching this block due to a filter? */
			if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
			{
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/* There is no point in repeatedly prefetching the same block. */
			for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
			{
				if (block->blkno == prefetcher->recent_block[i] &&
					RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
				{
					/*
					 * XXX If we also remembered where it was, we could set
					 * recent_buffer so that recovery could skip smgropen()
					 * and a buffer table lookup.
					 */
					XLogPrefetchIncrement(&SharedStats->skip_rep);
					return LRQ_NEXT_NO_IO;
				}
			}
			prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
			prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
			prefetcher->recent_idx =
				(prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;

			/*
			 * We could try to have a fast path for repeated references to the
			 * same relation (with some scheme to handle invalidations
			 * safely), but for now we'll call smgropen() every time.
			 */
			reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);

			/*
			 * If the relation file doesn't exist on disk, for example because
			 * we're replaying after a crash and the file will be created and
			 * then unlinked by WAL that hasn't been replayed yet, suppress
			 * further prefetching in the relation until this record is
			 * replayed.
			 */
			if (!smgrexists(reln, MAIN_FORKNUM))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/*
			 * If the relation isn't big enough to contain the referenced
			 * block yet, suppress prefetching of this block and higher until
			 * this record is replayed.
			 */
			if (block->blkno >= smgrnblocks(reln, block->forknum))
			{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
				elog(XLOGPREFETCHER_DEBUG_LEVEL,
					 "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno,
					 LSN_FORMAT_ARGS(record->lsn));
#endif
				XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
										record->lsn);
				XLogPrefetchIncrement(&SharedStats->skip_new);
				return LRQ_NEXT_NO_IO;
			}

			/* Try to initiate prefetching. */
			result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
			if (BufferIsValid(result.recent_buffer))
			{
				/* Cache hit, nothing to do. */
				XLogPrefetchIncrement(&SharedStats->hit);
				block->prefetch_buffer = result.recent_buffer;
				return LRQ_NEXT_NO_IO;
			}
			else if (result.initiated_io)
			{
				/* Cache miss, I/O (presumably) started. */
				XLogPrefetchIncrement(&SharedStats->prefetch);
				block->prefetch_buffer = InvalidBuffer;
				return LRQ_NEXT_IO;
			}
			else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
			{
				/*
				 * This shouldn't be possible, because we already determined
				 * that the relation exists on disk and is big enough.
				 * Something is wrong with the cache invalidation for
				 * smgrexists(), smgrnblocks(), or the file was unlinked or
				 * truncated beneath our feet?
				 */
				elog(ERROR,
					 "could not prefetch relation %u/%u/%u block %u",
					 reln->smgr_rlocator.locator.spcOid,
					 reln->smgr_rlocator.locator.dbOid,
					 reln->smgr_rlocator.locator.relNumber,
					 block->blkno);
			}
		}

		/*
		 * Several callsites need to be able to read exactly one record
		 * without any internal readahead.  Examples: xlog.c reading
		 * checkpoint records with emode set to PANIC, which might otherwise
		 * cause XLogPageRead() to panic on some future page, and xlog.c
		 * determining where to start writing WAL next, which depends on the
		 * contents of the reader's internal buffer after reading one record.
		 * Therefore, don't even think about prefetching until the first
		 * record after XLogPrefetcherBeginRead() has been consumed.
		 */
		if (prefetcher->reader->decode_queue_tail &&
			prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
			return LRQ_NEXT_AGAIN;

		/* Advance to the next record. */
		prefetcher->record = NULL;
	}
	pg_unreachable();
}

/*
 * Expose statistics about recovery prefetching.
 */
Datum
pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	Datum		values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
	bool		nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];

	InitMaterializedSRF(fcinfo, 0);

	for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
		nulls[i] = false;

	values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
	values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
	values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
	values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
	values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
	values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
	values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
	values[7] = Int32GetDatum(SharedStats->wal_distance);
	values[8] = Int32GetDatum(SharedStats->block_distance);
	values[9] = Int32GetDatum(SharedStats->io_depth);
	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

	return (Datum) 0;
}

/*
 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
 * has been replayed.
 */
static inline void
XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
						BlockNumber blockno, XLogRecPtr lsn)
{
	XLogPrefetcherFilter *filter;
	bool		found;

	filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
	if (!found)
	{
		/*
		 * Don't allow any prefetching of this block or higher until replayed.
		 */
		filter->filter_until_replayed = lsn;
		filter->filter_from_block = blockno;
		dlist_push_head(&prefetcher->filter_queue, &filter->link);
	}
	else
	{
		/*
		 * We were already filtering this rlocator.  Extend the filter's
		 * lifetime to cover this WAL record, but leave the lower of the block
		 * numbers there because we don't want to have to track individual
		 * blocks.
		 */
		filter->filter_until_replayed = lsn;
		dlist_delete(&filter->link);
		dlist_push_head(&prefetcher->filter_queue, &filter->link);
		filter->filter_from_block = Min(filter->filter_from_block, blockno);
	}
}

/*
 * Have we replayed any records that caused us to begin filtering a block
 * range?  That means that relations should have been created, extended or
 * dropped as required, so we can stop filtering out accesses to a given
 * relfilenumber.
 */
static inline void
XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
{
	while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
	{
		XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
														  link,
														  &prefetcher->filter_queue);

		if (filter->filter_until_replayed >= replaying_lsn)
			break;

		dlist_delete(&filter->link);
		hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
	}
}

/*
 * Check if a given block should be skipped due to a filter.
 */
static inline bool
XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
						 BlockNumber blockno)
{
	/*
	 * Test for empty queue first, because we expect it to be empty most of
	 * the time and we can avoid the hash table lookup in that case.
	 */
	if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
	{
		XLogPrefetcherFilter *filter;

		/* See if the block range is filtered. */
		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
		if (filter && filter->filter_from_block <= blockno)
		{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
			elog(XLOGPREFETCHER_DEBUG_LEVEL,
				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
				 LSN_FORMAT_ARGS(filter->filter_until_replayed),
				 filter->filter_from_block);
#endif
			return true;
		}

		/* See if the whole database is filtered. */
		rlocator.relNumber = InvalidRelFileNumber;
		rlocator.spcOid = InvalidOid;
		filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
		if (filter)
		{
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
			elog(XLOGPREFETCHER_DEBUG_LEVEL,
				 "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
				 rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
				 LSN_FORMAT_ARGS(filter->filter_until_replayed));
#endif
			return true;
		}
	}

	return false;
}

/*
 * A wrapper for XLogBeginRead() that also resets the prefetcher.
 */
void
XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
{
	/* This will forget about any in-flight IO. */
	prefetcher->reconfigure_count--;

	/* Book-keeping to avoid readahead on first read. */
	prefetcher->begin_ptr = recPtr;

	prefetcher->no_readahead_until = 0;

	/* This will forget about any queued up records in the decoder. */
	XLogBeginRead(prefetcher->reader, recPtr);
}

/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 */
XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
	DecodedXLogRecord *record;
	XLogRecPtr	replayed_up_to;

	/*
	 * See if it's time to reset the prefetching machinery, because a relevant
	 * GUC was changed.
	 */
	if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
	{
		uint32		max_distance;
		uint32		max_inflight;

		if (prefetcher->streaming_read)
			lrq_free(prefetcher->streaming_read);

		if (RecoveryPrefetchEnabled())
		{
			Assert(maintenance_io_concurrency > 0);
			max_inflight = maintenance_io_concurrency;
			max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
		}
		else
		{
			max_inflight = 1;
			max_distance = 1;
		}

		prefetcher->streaming_read = lrq_alloc(max_distance,
											   max_inflight,
											   (uintptr_t) prefetcher,
											   XLogPrefetcherNextBlock);

		prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
	}

	/*
	 * Release last returned record, if there is one, as it's now been
	 * replayed.
	 */
	replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

	/*
	 * Can we drop any filters yet?  If we were waiting for a relation to be
	 * created or extended, it is now OK to access blocks in the covered
	 * range.
	 */
	XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

	/*
	 * All IO initiated by earlier WAL is now completed.  This might trigger
	 * further prefetching.
	 */
	lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

	/*
	 * If there's nothing queued yet, then start prefetching to cause at least
	 * one record to be queued.
	 */
	if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
	{
		Assert(lrq_inflight(prefetcher->streaming_read) == 0);
		Assert(lrq_completed(prefetcher->streaming_read) == 0);
		lrq_prefetch(prefetcher->streaming_read);
	}

	/* Read the next record. */
	record = XLogNextRecord(prefetcher->reader, errmsg);
	if (!record)
		return NULL;

	/*
	 * The record we just got is the "current" one, for the benefit of the
	 * XLogRecXXX() macros.
	 */
	Assert(record == prefetcher->reader->record);

	/*
	 * If maintenance_io_concurrency is set very low, we might have started
	 * prefetching some but not all of the blocks referenced in the record
	 * we're about to return.  Forget about the rest of the blocks in this
	 * record by dropping the prefetcher's reference to it.
	 */
	if (record == prefetcher->record)
		prefetcher->record = NULL;

	/*
	 * See if it's time to compute some statistics, because enough WAL has
	 * been processed.
	 */
	if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
		XLogPrefetcherComputeStats(prefetcher);

	Assert(record == prefetcher->reader->record);

	return &record->header;
}

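/*
 * GUC check hook for recovery_prefetch.
 */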
bool
check_recovery_prefetch(int *new_value, void **extra, GucSource source)
{
#ifndef USE_PREFETCH
	if (*new_value == RECOVERY_PREFETCH_ON)
	{
		GUC_check_errdetail("\"recovery_prefetch\" is not supported on platforms that lack support for issuing read-ahead advice.");
		return false;
	}
#endif

	return true;
}

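/*
 * GUC assign hook for recovery_prefetch.
 */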
void
assign_recovery_prefetch(int new_value, void *extra)
{
	/* Reconfigure prefetching, because a setting it depends on changed. */
	recovery_prefetch = new_value;
	if (AmStartupProcess())
		XLogPrefetchReconfigure();
}