1 /*-------------------------------------------------------------------------
3 * basebackup_throttle.c
4 * Basebackup sink implementing throttling. Data is forwarded to the
5 * next base backup sink in the chain at a rate no greater than the
8 * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
11 * src/backend/backup/basebackup_throttle.c
13 *-------------------------------------------------------------------------
17 #include "backup/basebackup_sink.h"
18 #include "miscadmin.h"
20 #include "storage/latch.h"
21 #include "utils/timestamp.h"
23 typedef struct bbsink_throttle
25 /* Common information for all types of sink. */
28 /* The actual number of bytes, transfer of which may cause sleep. */
29 uint64 throttling_sample
;
31 /* Amount of data already transferred but not yet throttled. */
32 int64 throttling_counter
;
34 /* The minimum time required to transfer throttling_sample bytes. */
35 TimeOffset elapsed_min_unit
;
37 /* The last check of the transfer rate. */
38 TimestampTz throttled_last
;
41 static void bbsink_throttle_begin_backup(bbsink
*sink
);
42 static void bbsink_throttle_archive_contents(bbsink
*sink
, size_t len
);
43 static void bbsink_throttle_manifest_contents(bbsink
*sink
, size_t len
);
44 static void throttle(bbsink_throttle
*sink
, size_t increment
);
46 static const bbsink_ops bbsink_throttle_ops
= {
47 .begin_backup
= bbsink_throttle_begin_backup
,
48 .begin_archive
= bbsink_forward_begin_archive
,
49 .archive_contents
= bbsink_throttle_archive_contents
,
50 .end_archive
= bbsink_forward_end_archive
,
51 .begin_manifest
= bbsink_forward_begin_manifest
,
52 .manifest_contents
= bbsink_throttle_manifest_contents
,
53 .end_manifest
= bbsink_forward_end_manifest
,
54 .end_backup
= bbsink_forward_end_backup
,
55 .cleanup
= bbsink_forward_cleanup
59 * How frequently to throttle, as a fraction of the specified rate-second.
61 #define THROTTLING_FREQUENCY 8
64 * Create a new basebackup sink that performs throttling and forwards data
65 * to a successor sink.
68 bbsink_throttle_new(bbsink
*next
, uint32 maxrate
)
70 bbsink_throttle
*sink
;
75 sink
= palloc0(sizeof(bbsink_throttle
));
76 *((const bbsink_ops
**) &sink
->base
.bbs_ops
) = &bbsink_throttle_ops
;
77 sink
->base
.bbs_next
= next
;
79 sink
->throttling_sample
=
80 (int64
) maxrate
* (int64
) 1024 / THROTTLING_FREQUENCY
;
83 * The minimum amount of time for throttling_sample bytes to be
86 sink
->elapsed_min_unit
= USECS_PER_SEC
/ THROTTLING_FREQUENCY
;
92 * There's no real work to do here, but we need to record the current time so
93 * that it can be used for future calculations.
96 bbsink_throttle_begin_backup(bbsink
*sink
)
98 bbsink_throttle
*mysink
= (bbsink_throttle
*) sink
;
100 bbsink_forward_begin_backup(sink
);
102 /* The 'real data' starts now (header was ignored). */
103 mysink
->throttled_last
= GetCurrentTimestamp();
107 * First throttle, and then pass archive contents to next sink.
110 bbsink_throttle_archive_contents(bbsink
*sink
, size_t len
)
112 throttle((bbsink_throttle
*) sink
, len
);
114 bbsink_forward_archive_contents(sink
, len
);
118 * First throttle, and then pass manifest contents to next sink.
121 bbsink_throttle_manifest_contents(bbsink
*sink
, size_t len
)
123 throttle((bbsink_throttle
*) sink
, len
);
125 bbsink_forward_manifest_contents(sink
, len
);
129 * Increment the network transfer counter by the given number of bytes,
130 * and sleep if necessary to comply with the requested network transfer
134 throttle(bbsink_throttle
*sink
, size_t increment
)
136 TimeOffset elapsed_min
;
138 Assert(sink
->throttling_counter
>= 0);
140 sink
->throttling_counter
+= increment
;
141 if (sink
->throttling_counter
< sink
->throttling_sample
)
144 /* How much time should have elapsed at minimum? */
145 elapsed_min
= sink
->elapsed_min_unit
*
146 (sink
->throttling_counter
/ sink
->throttling_sample
);
149 * Since the latch could be set repeatedly because of concurrently WAL
150 * activity, sleep in a loop to ensure enough time has passed.
158 /* Time elapsed since the last measurement (and possible wake up). */
159 elapsed
= GetCurrentTimestamp() - sink
->throttled_last
;
161 /* sleep if the transfer is faster than it should be */
162 sleep
= elapsed_min
- elapsed
;
168 /* We're eating a potentially set latch, so check for interrupts */
169 CHECK_FOR_INTERRUPTS();
172 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173 * the maximum time to sleep. Thus the cast to long is safe.
175 wait_result
= WaitLatch(MyLatch
,
176 WL_LATCH_SET
| WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
177 (long) (sleep
/ 1000),
178 WAIT_EVENT_BASE_BACKUP_THROTTLE
);
180 if (wait_result
& WL_LATCH_SET
)
181 CHECK_FOR_INTERRUPTS();
184 if (wait_result
& WL_TIMEOUT
)
189 * As we work with integers, only whole multiple of throttling_sample was
190 * processed. The rest will be done during the next call of this function.
192 sink
->throttling_counter
%= sink
->throttling_sample
;
195 * Time interval for the remaining amount and possible next increments
198 sink
->throttled_last
= GetCurrentTimestamp();