/* Copyright (c) 2013-2024, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t: it manages a set of worker
 * threads, a queue of pending work, and a reply queue. Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable. The workers inform the main thread of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers. This is useful for updating state that all the workers share.
 *
 * In Tor today, there is only one thread pool: it is managed in cpuworker.c
 * and handles a variety of types of work, from the original "onion skin"
 * circuit handshakes, to consensus diff computation, to client-side onion
 * service PoW generation.
 */

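/* A minimal usage sketch (not part of this module): the callback and state
 * functions below (square_fn, square_done, state_new, state_free) are
 * hypothetical, but the workqueue calls are the real API.
 *
 *   static workqueue_reply_t
 *   square_fn(void *state, void *arg)
 *   {
 *     (void) state;
 *     int *x = arg;
 *     *x = *x * *x;            // runs in a worker thread
 *     return WQ_RPL_REPLY;     // queue the reply for the main thread
 *   }
 *   static void
 *   square_done(void *arg)
 *   {
 *     int *x = arg;
 *     log_info(LD_GENERAL, "result: %d", *x);
 *     tor_free(x);             // the reply function owns the argument
 *   }
 *
 *   replyqueue_t *rq = replyqueue_new(0);
 *   threadpool_t *pool = threadpool_new(4, rq, state_new, state_free, NULL);
 *   threadpool_register_reply_event(pool, NULL);
 *   int *x = tor_malloc(sizeof(int));
 *   *x = 7;
 *   threadpool_queue_work(pool, square_fn, square_done, x);
 */
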
#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>

#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool. Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;

  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  /** Optional callback to run after each batch of replies is handled. */
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t. */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_t {
  /** Mutex to protect the answers field. */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1. */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread. */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);

static void workerthread_free_(workerthread_t *thread);
#define workerthread_free(thread) \
  FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
static void replyqueue_free_(replyqueue_t *queue);
#define replyqueue_free(queue) \
  FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * a queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  /* Poison the memory to help catch use-after-free bugs. */
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item. In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}

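/* A sketch of the cancellation pattern described above; "job" and its free
 * logic are hypothetical:
 *
 *   workqueue_entry_t *ent = threadpool_queue_work(pool, fn, reply_fn, job);
 *   ...
 *   void *job_arg = workqueue_entry_cancel(ent);
 *   if (job_arg)
 *     tor_free(job_arg);  // never ran; the caller must free the argument
 *   // else: too late to cancel; reply_fn remains responsible for it
 */
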
/** Return true iff <b>thread</b> has pending work or a pending update.
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }

      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }
}

/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  /* Only alert the main thread if the queue was empty: otherwise it has
   * already been woken up and still has replies to drain. */
  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    workerthread_free(thr);
    return NULL;
  }

  return thr;
}

/**
 * Free up the resources allocated by a worker thread.
 */
static void
workerthread_free_(workerthread_t *thread)
{
  tor_free(thread);
}

/**
 * Queue an item of work for a thread in a thread pool. The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object. It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}

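/* A sketch of prioritized queueing; compress_fn, compress_done, and "job"
 * are hypothetical. Low-priority work still runs, just less eagerly:
 *
 *   workqueue_entry_t *ent =
 *     threadpool_queue_work_priority(pool, WQ_PRI_LOW,
 *                                    compress_fn, compress_done, job);
 *   if (!ent)
 *     tor_free(job);  // defensive: queueing cannot currently fail
 */
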
/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}

/**
 * Queue a copy of a work item for every thread in a pool. This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once per thread to make a
 * copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
 * will be run. If a new update is scheduled before the old update finishes
 * running, then the new will replace the old in any threads that haven't run
 * it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  /* Free the arguments of any previous update that some threads never ran. */
  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}

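/* A sketch of an update broadcast; dup_keys, install_keys, keyset_free, and
 * "keys" are hypothetical. Each thread receives its own copy of the
 * argument, and the update function must be idempotent:
 *
 *   static void *dup_keys(void *arg) { return keyset_dup(arg); }
 *   static workqueue_reply_t
 *   install_keys(void *state, void *arg)
 *   {
 *     worker_state_set_keys(state, arg);  // takes ownership of arg
 *     return WQ_RPL_REPLY;
 *   }
 *
 *   threadpool_queue_update(pool, dup_keys, install_keys, keyset_free, keys);
 */
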
/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever. If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX

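/* To make the tradeoff concrete: in worker_thread_extract_next_work(), each
 * time a thread finds a non-empty queue, it keeps scanning for
 * lower-priority work with probability 1/lower_priority_chance. A
 * "permissive" thread therefore steps past available higher-priority work
 * about once in 37 dequeues (roughly 2.7% of the time); a "strict" thread
 * (probability 1/INT32_MAX) effectively never does. */
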
/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }
  tor_mutex_release(&pool->lock);

  return 0;
}

/**
 * Construct a new thread pool with <b>n_threads</b> worker threads,
 * configured to send their output to <b>replyqueue</b>. The threads' states
 * will be constructed with the <b>new_thread_state_fn</b> call, receiving
 * <b>arg</b> as its argument. When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    threadpool_free(pool);
    return NULL;
  }

  return pool;
}

/**
 * Free up the resources allocated by worker threads, worker thread pool, ...
 */
void
threadpool_free_(threadpool_t *pool)
{
  if (!pool)
    return;

  if (pool->threads) {
    for (int i = 0; i != pool->n_threads; ++i)
      workerthread_free(pool->threads[i]);

    tor_free(pool->threads);
  }

  if (pool->update_args)
    pool->free_update_arg_fn(pool->update_args);

  if (pool->reply_event) {
    tor_event_del(pool->reply_event);
    tor_event_free(pool->reply_event);
  }

  if (pool->reply_queue)
    replyqueue_free(pool->reply_queue);

  if (pool->new_thread_state_arg)
    pool->free_thread_state_fn(pool->new_thread_state_arg);

  tor_free(pool);
}

/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}

/** Allocate a new reply queue. Reply queues are used to pass results from
 * worker threads to the main thread. Since the main thread is running an
 * IO-centric event loop, it needs to get woken up with means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    tor_free(rq);
    return NULL;
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

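/* A usage sketch: passing 0 for <b>alertsocks_flags</b> lets
 * alert_sockets_create() pick its default wake-up mechanism; the available
 * flags (to force or forbid particular mechanisms) are an alertsock detail,
 * declared alongside alert_sockets_t.
 *
 *   replyqueue_t *rq = replyqueue_new(0);
 *   if (!rq)
 *     return -1;  // creating the alert sockets can fail
 */
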
/**
 * Free up the resources allocated by a reply queue.
 */
static void
replyqueue_free_(replyqueue_t *queue)
{
  if (!queue)
    return;

  workqueue_entry_t *work;

  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    workqueue_entry_free(work);
  }

  tor_free(queue);
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}

/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point. */
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    /* Run the reply function without holding the lock, so that it can
     * queue more work. */
    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}

/** Return the number of threads configured for the given pool. */
int
threadpool_get_n_threads(threadpool_t *tp)
{
  tor_assert(tp);
  return tp->n_threads;
}