nbtree: fix read page recheck typo.
[pgsql.git] / src / backend / backup / basebackup_throttle.c
blob4477945e61334b1ad52dcec5156fc0c7f8c8e681
/*-------------------------------------------------------------------------
 *
 * basebackup_throttle.c
 *	  Basebackup sink implementing throttling. Data is forwarded to the
 *	  next base backup sink in the chain at a rate no greater than the
 *	  configured maximum.
 *
 * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/backup/basebackup_throttle.c
 *
 *-------------------------------------------------------------------------
 */
15 #include "postgres.h"
17 #include "backup/basebackup_sink.h"
18 #include "miscadmin.h"
19 #include "pgstat.h"
20 #include "storage/latch.h"
21 #include "utils/timestamp.h"
23 typedef struct bbsink_throttle
25 /* Common information for all types of sink. */
26 bbsink base;
28 /* The actual number of bytes, transfer of which may cause sleep. */
29 uint64 throttling_sample;
31 /* Amount of data already transferred but not yet throttled. */
32 int64 throttling_counter;
34 /* The minimum time required to transfer throttling_sample bytes. */
35 TimeOffset elapsed_min_unit;
37 /* The last check of the transfer rate. */
38 TimestampTz throttled_last;
39 } bbsink_throttle;
41 static void bbsink_throttle_begin_backup(bbsink *sink);
42 static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
43 static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
44 static void throttle(bbsink_throttle *sink, size_t increment);
46 static const bbsink_ops bbsink_throttle_ops = {
47 .begin_backup = bbsink_throttle_begin_backup,
48 .begin_archive = bbsink_forward_begin_archive,
49 .archive_contents = bbsink_throttle_archive_contents,
50 .end_archive = bbsink_forward_end_archive,
51 .begin_manifest = bbsink_forward_begin_manifest,
52 .manifest_contents = bbsink_throttle_manifest_contents,
53 .end_manifest = bbsink_forward_end_manifest,
54 .end_backup = bbsink_forward_end_backup,
55 .cleanup = bbsink_forward_cleanup
59 * How frequently to throttle, as a fraction of the specified rate-second.
61 #define THROTTLING_FREQUENCY 8
64 * Create a new basebackup sink that performs throttling and forwards data
65 * to a successor sink.
67 bbsink *
68 bbsink_throttle_new(bbsink *next, uint32 maxrate)
70 bbsink_throttle *sink;
72 Assert(next != NULL);
73 Assert(maxrate > 0);
75 sink = palloc0(sizeof(bbsink_throttle));
76 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
77 sink->base.bbs_next = next;
79 sink->throttling_sample =
80 (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
83 * The minimum amount of time for throttling_sample bytes to be
84 * transferred.
86 sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
88 return &sink->base;
92 * There's no real work to do here, but we need to record the current time so
93 * that it can be used for future calculations.
95 static void
96 bbsink_throttle_begin_backup(bbsink *sink)
98 bbsink_throttle *mysink = (bbsink_throttle *) sink;
100 bbsink_forward_begin_backup(sink);
102 /* The 'real data' starts now (header was ignored). */
103 mysink->throttled_last = GetCurrentTimestamp();
107 * First throttle, and then pass archive contents to next sink.
109 static void
110 bbsink_throttle_archive_contents(bbsink *sink, size_t len)
112 throttle((bbsink_throttle *) sink, len);
114 bbsink_forward_archive_contents(sink, len);
118 * First throttle, and then pass manifest contents to next sink.
120 static void
121 bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
123 throttle((bbsink_throttle *) sink, len);
125 bbsink_forward_manifest_contents(sink, len);
129 * Increment the network transfer counter by the given number of bytes,
130 * and sleep if necessary to comply with the requested network transfer
131 * rate.
133 static void
134 throttle(bbsink_throttle *sink, size_t increment)
136 TimeOffset elapsed_min;
138 Assert(sink->throttling_counter >= 0);
140 sink->throttling_counter += increment;
141 if (sink->throttling_counter < sink->throttling_sample)
142 return;
144 /* How much time should have elapsed at minimum? */
145 elapsed_min = sink->elapsed_min_unit *
146 (sink->throttling_counter / sink->throttling_sample);
149 * Since the latch could be set repeatedly because of concurrently WAL
150 * activity, sleep in a loop to ensure enough time has passed.
152 for (;;)
154 TimeOffset elapsed,
155 sleep;
156 int wait_result;
158 /* Time elapsed since the last measurement (and possible wake up). */
159 elapsed = GetCurrentTimestamp() - sink->throttled_last;
161 /* sleep if the transfer is faster than it should be */
162 sleep = elapsed_min - elapsed;
163 if (sleep <= 0)
164 break;
166 ResetLatch(MyLatch);
168 /* We're eating a potentially set latch, so check for interrupts */
169 CHECK_FOR_INTERRUPTS();
172 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173 * the maximum time to sleep. Thus the cast to long is safe.
175 wait_result = WaitLatch(MyLatch,
176 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
177 (long) (sleep / 1000),
178 WAIT_EVENT_BASE_BACKUP_THROTTLE);
180 if (wait_result & WL_LATCH_SET)
181 CHECK_FOR_INTERRUPTS();
183 /* Done waiting? */
184 if (wait_result & WL_TIMEOUT)
185 break;
189 * As we work with integers, only whole multiple of throttling_sample was
190 * processed. The rest will be done during the next call of this function.
192 sink->throttling_counter %= sink->throttling_sample;
195 * Time interval for the remaining amount and possible next increments
196 * starts now.
198 sink->throttled_last = GetCurrentTimestamp();