// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-requestqueue.h"

#include <linux/atomic.h>
#include <linux/compiler.h>
#include <linux/wait.h>

#include "funnel-queue.h"
#include "memory-alloc.h"
#include "thread-utils.h"

/*
 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
 * immediately to each new request. The wait time between batches is dynamically adjusted up or
 * down to try to balance responsiveness against wasted thread run time.
 *
 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
 * wakeup of the worker thread.
 *
 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
 * kick the worker thread out of dormant mode and back into timer-based mode.
 *
 * Unbatched requests are used to communicate between different zone threads and will also cause
 * the queue to awaken immediately.
 */

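/*
 * A typical caller is expected to drive this queue roughly as follows (an illustrative sketch
 * only; execute_request() stands in for whatever uds_request_queue_processor_fn the caller
 * supplies, and the queue name is arbitrary):
 *
 *	struct uds_request_queue *queue;
 *	int result = uds_make_request_queue("reader", execute_request, &queue);
 *
 *	if (result != UDS_SUCCESS)
 *		return result;
 *
 *	uds_request_queue_enqueue(queue, request);
 *	...
 *	uds_request_queue_finish(queue);
 */
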
enum {
	NANOSECOND = 1,
	MICROSECOND = 1000 * NANOSECOND,
	MILLISECOND = 1000 * MICROSECOND,
	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	MAXIMUM_WAIT_TIME = MILLISECOND,

	/* The batch size bounds used to adjust the wait time */
	MINIMUM_BATCH = 32,
	MAXIMUM_BATCH = 64,
};

struct uds_request_queue {
	/* Wait queue for synchronizing producers and consumer */
	struct wait_queue_head wait_head;
	/* Function to process a request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry */
	struct funnel_queue *retry_queue;
	/* The thread id of the worker thread */
	struct thread *thread;
	/* True if the worker was started */
	bool started;
	/* When true, requests can be enqueued */
	bool running;
	/* A flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};

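/* Poll the retry queue before the main queue so that requeued requests are serviced first. */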
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
{
	struct funnel_queue_entry *entry;

	entry = vdo_funnel_queue_poll(queue->retry_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	entry = vdo_funnel_queue_poll(queue->main_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	return NULL;
}

static inline bool are_queues_idle(struct uds_request_queue *queue)
{
	return vdo_is_funnel_queue_idle(queue->retry_queue) &&
	       vdo_is_funnel_queue_idle(queue->main_queue);
}

/*
 * Determine if there is a next request to process, and return it if there is. Also return flags
 * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
 * the thread did sleep before returning a new request.
 */
static inline bool dequeue_request(struct uds_request_queue *queue,
				   struct uds_request **request_ptr, bool *waited_ptr)
{
	struct uds_request *request = poll_queues(queue);

	if (request != NULL) {
		*request_ptr = request;
		return true;
	}

	if (!READ_ONCE(queue->running)) {
		/* Wake the worker thread so it can exit. */
		*request_ptr = NULL;
		return true;
	}

	*request_ptr = NULL;
	*waited_ptr = true;
	return false;
}

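/*
 * Wait for the next request to process. In dormant mode, sleep without a timeout until a request
 * is dequeued or the queues become non-idle; otherwise, wait with a high-resolution timeout so
 * that a small batch of requests has a chance to accumulate.
 */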
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
			     unsigned long timeout, struct uds_request **request,
			     bool *waited)
{
	if (dormant) {
		wait_event_interruptible(queue->wait_head,
					 (dequeue_request(queue, request, waited) ||
					  !are_queues_idle(queue)));
		return;
	}

	wait_event_interruptible_hrtimeout(queue->wait_head,
					   dequeue_request(queue, request, waited),
					   ns_to_ktime(timeout));
}

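/*
 * The main loop of the worker thread: process requests as they are dequeued, adjust the batching
 * wait time based on how large recent batches were, and enter or leave dormant mode as the wait
 * time crosses its limits.
 */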
static void request_queue_worker(void *arg)
{
	struct uds_request_queue *queue = arg;
	struct uds_request *request = NULL;
	unsigned long time_batch = DEFAULT_WAIT_TIME;
	bool dormant = atomic_read(&queue->dormant);
	bool waited = false;
	long current_batch = 0;

	while (true) {
		wait_for_request(queue, dormant, time_batch, &request, &waited);
		if (likely(request != NULL)) {
			current_batch++;
			queue->processor(request);
		} else if (!READ_ONCE(queue->running)) {
			break;
		}

		if (dormant) {
			/*
			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
			 * stop broadcasting. No fence is needed for this transition.
			 */
			atomic_set(&queue->dormant, false);
			dormant = false;
			time_batch = DEFAULT_WAIT_TIME;
		} else if (waited) {
			/*
			 * We waited for this request to show up. Adjust the wait time to smooth
			 * out the batch size.
			 */
			if (current_batch < MINIMUM_BATCH) {
				/*
				 * If the last batch of requests was too small, increase the wait
				 * time.
				 */
				time_batch += time_batch / 4;
				if (time_batch >= MAXIMUM_WAIT_TIME) {
					atomic_set(&queue->dormant, true);
					dormant = true;
				}
			} else if (current_batch > MAXIMUM_BATCH) {
				/*
				 * If the last batch of requests was too large, decrease the wait
				 * time.
				 */
				time_batch -= time_batch / 4;
				if (time_batch < MINIMUM_WAIT_TIME)
					time_batch = MINIMUM_WAIT_TIME;
			}

			current_batch = 0;
		}
	}

	/*
	 * Ensure that we process any remaining requests that were enqueued before trying to shut
	 * down. The corresponding write barrier is in uds_request_queue_finish().
	 */
	smp_rmb();
	while ((request = poll_queues(queue)) != NULL)
		queue->processor(request);
}

int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	int result;
	struct uds_request_queue *queue;

	result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->processor = processor;
	queue->running = true;
	atomic_set(&queue->dormant, false);
	init_waitqueue_head(&queue->wait_head);

	result = vdo_make_funnel_queue(&queue->main_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_make_funnel_queue(&queue->retry_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_create_thread(request_queue_worker, queue, queue_name,
				   &queue->thread);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	queue->started = true;
	*queue_ptr = queue;
	return UDS_SUCCESS;
}

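/* Wake the worker thread only if it is actually sleeping on the wait queue. */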
static inline void wake_up_worker(struct uds_request_queue *queue)
{
	if (wq_has_sleeper(&queue->wait_head))
		wake_up(&queue->wait_head);
}

void uds_request_queue_enqueue(struct uds_request_queue *queue,
			       struct uds_request *request)
{
	struct funnel_queue *sub_queue;
	bool unbatched = request->unbatched;

	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
	vdo_funnel_queue_put(sub_queue, &request->queue_link);

	/*
	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
	 * we know the queue operation acts as one.
	 */
	if (atomic_read(&queue->dormant) || unbatched)
		wake_up_worker(queue);
}

void uds_request_queue_finish(struct uds_request_queue *queue)
{
	if (queue == NULL)
		return;

	/*
	 * This memory barrier ensures that any requests we queued will be seen. The point is that
	 * when dequeue_request() sees the following update to the running flag, it will also be
	 * able to see any change we made to a next field in the funnel queue entry. The
	 * corresponding read barrier is in request_queue_worker().
	 */
	smp_wmb();
	WRITE_ONCE(queue->running, false);

	if (queue->started) {
		wake_up_worker(queue);
		vdo_join_threads(queue->thread);
	}

	vdo_free_funnel_queue(queue->main_queue);
	vdo_free_funnel_queue(queue->retry_queue);
	vdo_free(queue);
}