// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-workqueue.h"

#include <linux/atomic.h>
#include <linux/cache.h>
#include <linux/completion.h>
#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/percpu.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"

#include "completion.h"
#include "status-codes.h"

static DEFINE_PER_CPU(unsigned int, service_queue_rotor);

/**
 * DOC: Work queue definition.
 *
 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
 * Externally, both are represented via the same common sub-structure, though there's actually not
 * a great deal of overlap between the two types internally.
 */
struct vdo_work_queue {
	/* Name of just the work queue (e.g., "cpuQ12") */
	char *name;
	bool round_robin_mode;
	struct vdo_thread *owner;
	/* Life cycle functions, etc */
	const struct vdo_work_queue_type *type;
};

struct simple_work_queue {
	struct vdo_work_queue common;
	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
	void *private;

	/*
	 * The fields above are unchanged after setup but often read, and are good candidates for
	 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
	 * The fields below are often modified as we sleep and wake, so we want a separate cache
	 * line for performance.
	 */

	/* Any (0 or 1) worker threads waiting for new work to do */
	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
	/* Hack to reduce wakeup calls if the worker thread is running */
	atomic_t idle;

	/* These are infrequently used so in terms of performance we don't care where they land. */
	struct task_struct *thread;
	/* Notify creator once worker has initialized */
	struct completion *started;
};

struct round_robin_work_queue {
	struct vdo_work_queue common;
	struct simple_work_queue **service_queues;
	unsigned int num_service_queues;
};

static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		NULL : container_of(queue, struct simple_work_queue, common));
}

static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
{
	return ((queue == NULL) ?
		NULL :
		container_of(queue, struct round_robin_work_queue, common));
}

/* Processing normal completions. */

/*
 * Dequeue and return the next waiting completion, if any.
 *
 * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
 * enforcement of priorities becomes necessary, this function will need fixing.
 */
static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
{
	int i;

	for (i = queue->common.type->max_priority; i >= 0; i--) {
		struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);

		if (link != NULL)
			return container_of(link, struct vdo_completion, work_queue_entry_link);
	}

	return NULL;
}

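/*
 * Add a completion to one of a simple queue's priority lists and, if the worker thread may be
 * idle, wake it.
 */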
static void enqueue_work_queue_completion(struct simple_work_queue *queue,
					  struct vdo_completion *completion)
{
	VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
			    "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
			    completion, completion->callback, queue, completion->my_queue);
	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
		completion->priority = queue->common.type->default_priority;

	if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
		       "priority is in range for queue") != VDO_SUCCESS)
		completion->priority = 0;

	completion->my_queue = &queue->common;

	/* Funnel queue handles the synchronization for the put. */
	vdo_funnel_queue_put(queue->priority_lists[completion->priority],
			     &completion->work_queue_entry_link);

	/*
	 * Due to how funnel queue synchronization is handled (just atomic operations), the
	 * simplest safe implementation here would be to wake up any waiting threads after
	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
	 * have the same view of the queue as a producer thread.
	 *
	 * However, the above is wasteful, so instead we attempt to minimize the number of thread
	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
	 * able to determine when the worker thread might be asleep or going to sleep. We use
	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
	 * waking the worker thread, so multiple wakeups aren't tried at once.
	 *
	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
	 * first is any better or worse for other platforms, even other x86 configurations.
	 */
	smp_mb();
	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
		return;

	/* There's a maximum of one thread in this list. */
	wake_up(&queue->waiting_worker_threads);
}

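/* Run the queue type's start hook, if any, on the worker thread before processing begins. */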
static void run_start_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->start != NULL)
		queue->common.type->start(queue->private);
}

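/* Run the queue type's finish hook, if any, on the worker thread before it exits. */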
static void run_finish_hook(struct simple_work_queue *queue)
{
	if (queue->common.type->finish != NULL)
		queue->common.type->finish(queue->private);
}

/*
 * Wait for the next completion to process, or until kthread_should_stop() indicates that it's
 * time for us to shut down.
 *
 * If kthread_should_stop() says it's time to stop but we have pending completions, return a
 * completion.
 *
 * Also update statistics relating to scheduler interactions.
 */
static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
{
	struct vdo_completion *completion;
	DEFINE_WAIT(wait);

	while (true) {
		prepare_to_wait(&queue->waiting_worker_threads, &wait,
				TASK_INTERRUPTIBLE);
		/*
		 * Don't set the idle flag until a wakeup will not be lost.
		 *
		 * Force synchronization between setting the idle flag and checking the funnel
		 * queue; the producer side will do them in the reverse order. (There's still a
		 * race condition we've chosen to allow, because we've got a timeout below that
		 * unwedges us if we hit it, but this may narrow the window a little.)
		 */
		atomic_set(&queue->idle, 1);
		smp_mb(); /* store-load barrier between "idle" and funnel queue */

		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;

		/*
		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
		 * wakeup from the kthread_stop() call in vdo_finish_work_queue().
		 */
		if (kthread_should_stop())
			break;

		schedule();

		/*
		 * Most of the time when we wake, it should be because there's work to do. If it
		 * was a spurious wakeup, continue looping.
		 */
		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;
	}

	finish_wait(&queue->waiting_worker_threads, &wait);
	atomic_set(&queue->idle, 0);

	return completion;
}

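/* Run a single completion, clearing its queue association first. */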
static void process_completion(struct simple_work_queue *queue,
			       struct vdo_completion *completion)
{
	if (VDO_ASSERT(completion->my_queue == &queue->common,
		       "completion %px from queue %px marked as being in this queue (%px)",
		       completion, queue, completion->my_queue) == VDO_SUCCESS)
		completion->my_queue = NULL;

	vdo_run_completion(completion);
}

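/*
 * The worker thread's main loop: run completions as they arrive, sleeping when there is no work,
 * until kthread_should_stop() has been triggered and no completions remain.
 */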
static void service_work_queue(struct simple_work_queue *queue)
{
	run_start_hook(queue);

	while (true) {
		struct vdo_completion *completion = poll_for_completion(queue);

		if (completion == NULL)
			completion = wait_for_next_completion(queue);

		if (completion == NULL) {
			/* No completions but kthread_should_stop() was triggered. */
			break;
		}

		process_completion(queue, completion);

		/*
		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
		 * This speeds up some performance tests; that "other work" might include other VDO
		 * threads.
		 */
		if (need_resched())
			cond_resched();
	}

	run_finish_hook(queue);
}

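/* The kthread entry point for a simple work queue's worker thread. */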
static int work_queue_runner(void *ptr)
{
	struct simple_work_queue *queue = ptr;

	complete(queue->started);
	service_work_queue(queue);
	return 0;
}

/* Creation & teardown */

static void free_simple_work_queue(struct simple_work_queue *queue)
{
	unsigned int i;

	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
		vdo_free_funnel_queue(queue->priority_lists[i]);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	queue->service_queues = NULL;

	for (i = 0; i < count; i++)
		free_simple_work_queue(queue_table[i]);
	vdo_free(queue_table);
	vdo_free(queue->common.name);
	vdo_free(queue);
}

void vdo_free_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	vdo_finish_work_queue(queue);

	if (queue->round_robin_mode)
		free_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		free_simple_work_queue(as_simple_work_queue(queue));
}

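/* Create a simple (single worker thread) work queue and start its worker thread. */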
static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
				  struct vdo_thread *owner, void *private,
				  const struct vdo_work_queue_type *type,
				  struct simple_work_queue **queue_ptr)
{
	DECLARE_COMPLETION_ONSTACK(started);
	struct simple_work_queue *queue;
	int i;
	struct task_struct *thread = NULL;
	int result;

	VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
			    "queue priority count %u within limit %u", type->max_priority,
			    VDO_WORK_Q_MAX_PRIORITY);

	result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->private = private;
	queue->started = &started;
	queue->common.type = type;
	queue->common.owner = owner;
	init_waitqueue_head(&queue->waiting_worker_threads);

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return result;
	}

	for (i = 0; i <= type->max_priority; i++) {
		result = vdo_make_funnel_queue(&queue->priority_lists[i]);
		if (result != VDO_SUCCESS) {
			free_simple_work_queue(queue);
			return result;
		}
	}

	thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
			     queue->common.name);
	if (IS_ERR(thread)) {
		free_simple_work_queue(queue);
		return (int) PTR_ERR(thread);
	}

	queue->thread = thread;

	/*
	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
	 * cleanup code.
	 *
	 * Eventually we should just make that path safe too, and then we won't need this
	 * synchronization.
	 */
	wait_for_completion(&started);

	*queue_ptr = queue;
	return VDO_SUCCESS;
}

/**
 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
 *                         be distributed to them in round-robin fashion.
 *
 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
 * of the actual number of queues and threads allocated here, code outside of the queue
 * implementation will treat this as a single zone.
 */
int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
			unsigned int thread_count, void *thread_privates[],
			struct vdo_work_queue **queue_ptr)
{
	struct round_robin_work_queue *queue;
	int result;
	char thread_name[TASK_COMM_LEN];
	unsigned int i;

	if (thread_count == 1) {
		struct simple_work_queue *simple_queue;
		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);

		result = make_simple_work_queue(thread_name_prefix, name, owner, context,
						type, &simple_queue);
		if (result == VDO_SUCCESS)
			*queue_ptr = &simple_queue->common;
		return result;
	}

	result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
			      &queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(thread_count, struct simple_work_queue *,
			      "subordinate work queues", &queue->service_queues);
	if (result != VDO_SUCCESS) {
		vdo_free(queue);
		return result;
	}

	queue->num_service_queues = thread_count;
	queue->common.round_robin_mode = true;
	queue->common.owner = owner;

	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
	if (result != VDO_SUCCESS) {
		vdo_free(queue->service_queues);
		vdo_free(queue);
		return result;
	}

	*queue_ptr = &queue->common;

	for (i = 0; i < thread_count; i++) {
		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);

		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
		result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
						context, type, &queue->service_queues[i]);
		if (result != VDO_SUCCESS) {
			queue->num_service_queues = i;
			/* Destroy previously created subordinates. */
			vdo_free_work_queue(vdo_forget(*queue_ptr));
			return result;
		}
	}

	return VDO_SUCCESS;
}

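/*
 * Illustrative sketch only (not part of the driver): one plausible way a caller could use this
 * API, assuming a hypothetical queue type. The hook values, names, and priorities below are
 * examples, not definitions taken from any real caller.
 *
 *	static const struct vdo_work_queue_type example_queue_type = {
 *		.start = NULL,
 *		.finish = NULL,
 *		.max_priority = VDO_WORK_Q_MAX_PRIORITY,
 *		.default_priority = 0,
 *	};
 *
 *	struct vdo_work_queue *queue;
 *	int result = vdo_make_work_queue("vdo-example", "exampleQ", owner,
 *					 &example_queue_type, 1, NULL, &queue);
 *
 *	if (result == VDO_SUCCESS) {
 *		vdo_enqueue_work_queue(queue, completion);
 *		...
 *		vdo_finish_work_queue(queue);
 *		vdo_free_work_queue(queue);
 *	}
 */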
static void finish_simple_work_queue(struct simple_work_queue *queue)
{
	if (queue->thread == NULL)
		return;

	/* Tells the worker thread to shut down and waits for it to exit. */
	kthread_stop(queue->thread);
	queue->thread = NULL;
}

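/* Shut down the worker threads of all of a round-robin queue's subordinate queues. */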
static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
{
	struct simple_work_queue **queue_table = queue->service_queues;
	unsigned int count = queue->num_service_queues;
	unsigned int i;

	for (i = 0; i < count; i++)
		finish_simple_work_queue(queue_table[i]);
}

/* No enqueueing of completions should be done once this function is called. */
void vdo_finish_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	if (queue->round_robin_mode)
		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
	else
		finish_simple_work_queue(as_simple_work_queue(queue));
}

/* Debugging dumps */

static void dump_simple_work_queue(struct simple_work_queue *queue)
{
	const char *thread_status = "no threads";
	char task_state_report = '-';

	if (queue->thread != NULL) {
		task_state_report = task_state_to_char(queue->thread);
		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
	}

	vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
		     thread_status, task_state_report);

	/* ->waiting_worker_threads wait queue status? anyone waiting? */
}

/*
 * Write to the buffer some info about the completion, for logging. Since the common use case is
 * dumping info about a lot of completions to syslog all at once, the format favors brevity over
 * readability.
 */
void vdo_dump_work_queue(struct vdo_work_queue *queue)
{
	if (queue->round_robin_mode) {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
		unsigned int i;

		for (i = 0; i < round_robin->num_service_queues; i++)
			dump_simple_work_queue(round_robin->service_queues[i]);
	} else {
		dump_simple_work_queue(as_simple_work_queue(queue));
	}
}

static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
{
	if (pointer == NULL) {
		/*
		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
		 * sometimes use this when logging lots of data; don't be so verbose.
		 */
		strscpy(buffer, "-", buffer_length);
	} else {
		/*
		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
		 * "%ps" actually does support a precision spec in Linux kernel code.
		 */
		char *space;

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat"
		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
#pragma GCC diagnostic pop

		space = strchr(buffer, ' ');
		if (space != NULL)
			*space = '\0';
	}
}

void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
				   size_t length)
{
	size_t current_length =
		scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));

	if (current_length < length - 1) {
		get_function_name((void *) completion->callback, buffer + current_length,
				  length - current_length);
	}
}

/* Completion submission */

/*
 * If the completion has a timeout that has already passed, the timeout handler function may be
 * invoked by this function.
 */
void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
			    struct vdo_completion *completion)
{
	/*
	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
	 * on.
	 */
	struct simple_work_queue *simple_queue = NULL;

	if (!queue->round_robin_mode) {
		simple_queue = as_simple_work_queue(queue);
	} else {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);

		/*
		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
		 * Any patterns that might develop are likely to be disrupted by random ordering of
		 * multiple completions and migration between cores, unless the load is so light as
		 * to be regular in ordering of tasks and the threads are confined to individual
		 * cores; with a load that light we won't care.
		 */
		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
		unsigned int index = rotor % round_robin->num_service_queues;

		simple_queue = round_robin->service_queues[index];
	}

	enqueue_work_queue_completion(simple_queue, completion);
}

/*
 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
 * initialized on the stack of the current thread, if any.
 */
static struct simple_work_queue *get_current_thread_work_queue(void)
{
	/*
	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
	 * the queue for the thread which was interrupted. However, the interrupted thread may have
	 * been processing a completion, in which case starting to process another would violate
	 * our concurrency assumptions.
	 */
	if (in_interrupt())
		return NULL;

	if (kthread_func(current) != work_queue_runner)
		/* Not a VDO work queue thread. */
		return NULL;

	return kthread_data(current);
}

struct vdo_work_queue *vdo_get_current_work_queue(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue == NULL) ? NULL : &queue->common;
}

struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
{
	return queue->owner;
}

/**
 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
 *                                     queue, or NULL if none or if the current thread is not a
 *                                     work queue thread.
 */
void *vdo_get_work_queue_private_data(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();

	return (queue != NULL) ? queue->private : NULL;
}

bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
			    const struct vdo_work_queue_type *type)
{
	return (queue->type == type);
}