Fix: Use FREE_AND_NULL() for releasing workqueue resources
[tor.git] / src / lib / evloop / workqueue.c
blob17ab44e3ab08cbe00ed98be2ae5f4032ddbb9468
2 /* copyright (c) 2013-2024, The Tor Project, Inc. */
3 /* See LICENSE for licensing information */
5 /**
6 * \file workqueue.c
8 * \brief Implements worker threads, queues of work for them, and mechanisms
9 * for them to send answers back to the main thread.
11 * The main structure here is a threadpool_t : it manages a set of worker
12 * threads, a queue of pending work, and a reply queue. Every piece of work
13 * is a workqueue_entry_t, containing data to process and a function to
14 * process it with.
16 * The main thread informs the worker threads of pending work by using a
17 * condition variable. The workers inform the main process of completed work
18 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
20 * The main thread can also queue an "update" that will be handled by all the
21 * workers. This is useful for updating state that all the workers share.
23 * In Tor today, there is currently only one thread pool, managed
24 * in cpuworker.c and handling a variety of types of work, from the original
25 * "onion skin" circuit handshakes, to consensus diff computation, to
26 * client-side onion service PoW generation.
29 #include "orconfig.h"
30 #include "lib/evloop/compat_libevent.h"
31 #include "lib/evloop/workqueue.h"
33 #include "lib/crypt_ops/crypto_rand.h"
34 #include "lib/intmath/weakrng.h"
35 #include "lib/log/ratelim.h"
36 #include "lib/log/log.h"
37 #include "lib/log/util_bug.h"
38 #include "lib/net/alertsock.h"
39 #include "lib/net/socket.h"
40 #include "lib/thread/threads.h"
42 #include "ext/tor_queue.h"
43 #include <event2/event.h>
44 #include <string.h>
/* Range of priorities that the pool iterates over, from the first queue that
 * is examined to the last one, and the number of queues that implies. */
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST) + 1)

/** Tail queue of workqueue_entry_t objects; one is kept per priority. */
TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;
53 struct threadpool_t {
54 /** An array of pointers to workerthread_t: one for each running worker
55 * thread. */
56 struct workerthread_t **threads;
58 /** Condition variable that we wait on when we have no work, and which
59 * gets signaled when our queue becomes nonempty. */
60 tor_cond_t condition;
61 /** Queues of pending work that we have to do. The queue with priority
62 * <b>p</b> is work[p]. */
63 work_tailq_t work[WORKQUEUE_N_PRIORITIES];
65 /** The current 'update generation' of the threadpool. Any thread that is
66 * at an earlier generation needs to run the update function. */
67 unsigned generation;
69 /** Function that should be run for updates on each thread. */
70 workqueue_reply_t (*update_fn)(void *, void *);
71 /** Function to free update arguments if they can't be run. */
72 void (*free_update_arg_fn)(void *);
73 /** Array of n_threads update arguments. */
74 void **update_args;
75 /** Event to notice when another thread has sent a reply. */
76 struct event *reply_event;
77 void (*reply_cb)(threadpool_t *);
79 /** Number of elements in threads. */
80 int n_threads;
81 /** Mutex to protect all the above fields. */
82 tor_mutex_t lock;
84 /** A reply queue to use when constructing new threads. */
85 replyqueue_t *reply_queue;
87 /** Functions used to allocate and free thread state. */
88 void *(*new_thread_state_fn)(void*);
89 void (*free_thread_state_fn)(void*);
90 void *new_thread_state_arg;
/** Bitfield-friendly type for storing a workqueue_priority_t value. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Width, in bits, of a bitfield able to hold every legal
 * workqueue_priority_t value. */
#define WORKQUEUE_PRIORITY_BITS 2
98 struct workqueue_entry_t {
99 /** The next workqueue_entry_t that's pending on the same thread or
100 * reply queue. */
101 TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
102 /** The threadpool to which this workqueue_entry_t was assigned. This field
103 * is set when the workqueue_entry_t is created, and won't be cleared until
104 * after it's handled in the main thread. */
105 struct threadpool_t *on_pool;
106 /** True iff this entry is waiting for a worker to start processing it. */
107 uint8_t pending;
108 /** Priority of this entry. */
109 workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
110 /** Function to run in the worker thread. */
111 workqueue_reply_t (*fn)(void *state, void *arg);
112 /** Function to run while processing the reply queue. */
113 void (*reply_fn)(void *arg);
114 /** Argument for the above functions. */
115 void *arg;
118 struct replyqueue_t {
119 /** Mutex to protect the answers field */
120 tor_mutex_t lock;
121 /** Doubly-linked list of answers that the reply queue needs to handle. */
122 TOR_TAILQ_HEAD(, workqueue_entry_t) answers;
124 /** Mechanism to wake up the main thread when it is receiving answers. */
125 alert_sockets_t alert;
128 /** A worker thread represents a single thread in a thread pool. */
129 typedef struct workerthread_t {
130 /** Which thread it this? In range 0..in_pool->n_threads-1 */
131 int index;
132 /** The pool this thread is a part of. */
133 struct threadpool_t *in_pool;
134 /** User-supplied state field that we pass to the worker functions of each
135 * work item. */
136 void *state;
137 /** Reply queue to which we pass our results. */
138 replyqueue_t *reply_queue;
139 /** The current update generation of this thread */
140 unsigned generation;
141 /** One over the probability of taking work from a lower-priority queue. */
142 int32_t lower_priority_chance;
143 } workerthread_t;
145 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
146 static void workerthread_free_(workerthread_t *thread);
147 #define workerthread_free(thread) \
148 FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
149 static void replyqueue_free_(replyqueue_t *queue);
150 #define replyqueue_free(queue) \
151 FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))
153 /** Allocate and return a new workqueue_entry_t, set up to run the function
154 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
155 * thread. See threadpool_queue_work() for full documentation. */
156 static workqueue_entry_t *
157 workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
158 void (*reply_fn)(void*),
159 void *arg)
161 workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
162 ent->fn = fn;
163 ent->reply_fn = reply_fn;
164 ent->arg = arg;
165 ent->priority = WQ_PRI_HIGH;
166 return ent;
169 #define workqueue_entry_free(ent) \
170 FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
173 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
174 * any queue.
176 static void
177 workqueue_entry_free_(workqueue_entry_t *ent)
179 if (!ent)
180 return;
181 memset(ent, 0xf0, sizeof(*ent));
182 tor_free(ent);
186 * Cancel a workqueue_entry_t that has been returned from
187 * threadpool_queue_work.
189 * You must not call this function on any work whose reply function has been
190 * executed in the main thread; that will cause undefined behavior (probably,
191 * a crash).
193 * If the work is cancelled, this function return the argument passed to the
194 * work function. It is the caller's responsibility to free this storage.
196 * This function will have no effect if the worker thread has already executed
197 * or begun to execute the work item. In that case, it will return NULL.
199 void *
200 workqueue_entry_cancel(workqueue_entry_t *ent)
202 int cancelled = 0;
203 void *result = NULL;
204 tor_mutex_acquire(&ent->on_pool->lock);
205 workqueue_priority_t prio = ent->priority;
206 if (ent->pending) {
207 TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
208 cancelled = 1;
209 result = ent->arg;
211 tor_mutex_release(&ent->on_pool->lock);
213 if (cancelled) {
214 workqueue_entry_free(ent);
216 return result;
219 /**DOCDOC
221 must hold lock */
222 static int
223 worker_thread_has_work(workerthread_t *thread)
225 unsigned i;
226 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
227 if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
228 return 1;
230 return thread->generation != thread->in_pool->generation;
233 /** Extract the next workqueue_entry_t from the the thread's pool, removing
234 * it from the relevant queues and marking it as non-pending.
236 * The caller must hold the lock. */
237 static workqueue_entry_t *
238 worker_thread_extract_next_work(workerthread_t *thread)
240 threadpool_t *pool = thread->in_pool;
241 work_tailq_t *queue = NULL, *this_queue;
242 unsigned i;
243 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
244 this_queue = &pool->work[i];
245 if (!TOR_TAILQ_EMPTY(this_queue)) {
246 queue = this_queue;
247 if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
248 thread->lower_priority_chance)) {
249 /* Usually we'll just break now, so that we can get out of the loop
250 * and use the queue where we found work. But with a small
251 * probability, we'll keep looking for lower priority work, so that
252 * we don't ignore our low-priority queues entirely. */
253 break;
258 if (queue == NULL)
259 return NULL;
261 workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
262 TOR_TAILQ_REMOVE(queue, work, next_work);
263 work->pending = 0;
264 return work;
268 * Main function for the worker thread.
270 static void
271 worker_thread_main(void *thread_)
273 workerthread_t *thread = thread_;
274 threadpool_t *pool = thread->in_pool;
275 workqueue_entry_t *work;
276 workqueue_reply_t result;
278 tor_mutex_acquire(&pool->lock);
279 while (1) {
280 /* lock must be held at this point. */
281 while (worker_thread_has_work(thread)) {
282 /* lock must be held at this point. */
283 if (thread->in_pool->generation != thread->generation) {
284 void *arg = thread->in_pool->update_args[thread->index];
285 thread->in_pool->update_args[thread->index] = NULL;
286 workqueue_reply_t (*update_fn)(void*,void*) =
287 thread->in_pool->update_fn;
288 thread->generation = thread->in_pool->generation;
289 tor_mutex_release(&pool->lock);
291 workqueue_reply_t r = update_fn(thread->state, arg);
293 if (r != WQ_RPL_REPLY) {
294 return;
297 tor_mutex_acquire(&pool->lock);
298 continue;
300 work = worker_thread_extract_next_work(thread);
301 if (BUG(work == NULL))
302 break;
303 tor_mutex_release(&pool->lock);
305 /* We run the work function without holding the thread lock. This
306 * is the main thread's first opportunity to give us more work. */
307 result = work->fn(thread->state, work->arg);
309 /* Queue the reply for the main thread. */
310 queue_reply(thread->reply_queue, work);
312 /* We may need to exit the thread. */
313 if (result != WQ_RPL_REPLY) {
314 return;
316 tor_mutex_acquire(&pool->lock);
318 /* At this point the lock is held, and there is no work in this thread's
319 * queue. */
321 /* TODO: support an idle-function */
323 /* Okay. Now, wait till somebody has work for us. */
324 if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
325 log_warn(LD_GENERAL, "Fail tor_cond_wait.");
330 /** Put a reply on the reply queue. The reply must not currently be on
331 * any thread's work queue. */
332 static void
333 queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
335 int was_empty;
336 tor_mutex_acquire(&queue->lock);
337 was_empty = TOR_TAILQ_EMPTY(&queue->answers);
338 TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
339 tor_mutex_release(&queue->lock);
341 if (was_empty) {
342 if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
343 /* XXXX complain! */
348 /** Allocate and start a new worker thread to use state object <b>state</b>,
349 * and send responses to <b>replyqueue</b>. */
350 static workerthread_t *
351 workerthread_new(int32_t lower_priority_chance,
352 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
354 workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
355 thr->state = state;
356 thr->reply_queue = replyqueue;
357 thr->in_pool = pool;
358 thr->lower_priority_chance = lower_priority_chance;
360 if (spawn_func(worker_thread_main, thr) < 0) {
361 //LCOV_EXCL_START
362 tor_assert_nonfatal_unreached();
363 log_err(LD_GENERAL, "Can't launch worker thread.");
364 workerthread_free(thr);
365 return NULL;
366 //LCOV_EXCL_STOP
369 return thr;
373 * Free up the resources allocated by a worker thread.
375 static void
376 workerthread_free_(workerthread_t *thread)
378 tor_free(thread);
382 * Queue an item of work for a thread in a thread pool. The function
383 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
384 * thread's state object, and the provided object <b>arg</b>. It must return
385 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
387 * Regardless of its return value, the function <b>reply_fn</b> will later be
388 * run in the main thread when it invokes replyqueue_process(), and will
389 * receive as its argument the same <b>arg</b> object. It's the reply
390 * function's responsibility to free the work object.
392 * On success, return a workqueue_entry_t object that can be passed to
393 * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
394 * currently possible, but callers should check anyway.)
396 * Items are executed in a loose priority order -- each thread will usually
397 * take from the queued work with the highest prioirity, but will occasionally
398 * visit lower-priority queues to keep them from starving completely.
400 * Note that because of priorities and thread behavior, work items may not
401 * be executed strictly in order.
403 workqueue_entry_t *
404 threadpool_queue_work_priority(threadpool_t *pool,
405 workqueue_priority_t prio,
406 workqueue_reply_t (*fn)(void *, void *),
407 void (*reply_fn)(void *),
408 void *arg)
410 tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
411 ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
413 workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
414 ent->on_pool = pool;
415 ent->pending = 1;
416 ent->priority = prio;
418 tor_mutex_acquire(&pool->lock);
420 TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
422 tor_cond_signal_one(&pool->condition);
424 tor_mutex_release(&pool->lock);
426 return ent;
429 /** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
430 workqueue_entry_t *
431 threadpool_queue_work(threadpool_t *pool,
432 workqueue_reply_t (*fn)(void *, void *),
433 void (*reply_fn)(void *),
434 void *arg)
436 return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
440 * Queue a copy of a work item for every thread in a pool. This can be used,
441 * for example, to tell the threads to update some parameter in their states.
443 * Arguments are as for <b>threadpool_queue_work</b>, except that the
444 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
445 * make a copy of it.
447 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
448 * will be run. If a new update is scheduled before the old update finishes
449 * running, then the new will replace the old in any threads that haven't run
450 * it yet.
452 * Return 0 on success, -1 on failure.
455 threadpool_queue_update(threadpool_t *pool,
456 void *(*dup_fn)(void *),
457 workqueue_reply_t (*fn)(void *, void *),
458 void (*free_fn)(void *),
459 void *arg)
461 int i, n_threads;
462 void (*old_args_free_fn)(void *arg);
463 void **old_args;
464 void **new_args;
466 tor_mutex_acquire(&pool->lock);
467 n_threads = pool->n_threads;
468 old_args = pool->update_args;
469 old_args_free_fn = pool->free_update_arg_fn;
471 new_args = tor_calloc(n_threads, sizeof(void*));
472 for (i = 0; i < n_threads; ++i) {
473 if (dup_fn)
474 new_args[i] = dup_fn(arg);
475 else
476 new_args[i] = arg;
479 pool->update_args = new_args;
480 pool->free_update_arg_fn = free_fn;
481 pool->update_fn = fn;
482 ++pool->generation;
484 tor_cond_signal_all(&pool->condition);
486 tor_mutex_release(&pool->lock);
488 if (old_args) {
489 for (i = 0; i < n_threads; ++i) {
490 if (old_args[i] && old_args_free_fn)
491 old_args_free_fn(old_args[i]);
493 tor_free(old_args);
496 return 0;
/** Upper bound on the number of threads in a single pool. */
#define MAX_THREADS 1024

/** Values of N such that a thread looks past a non-empty queue toward
 * lower-priority queues with probability 1/N; half of the threads use each
 * value.  Both are chosen somewhat arbitrarily.  If CHANCE_PERMISSIVE is too
 * low, we risk low-priority tasks stalling forever.  If it is too high, we
 * risk low-priority tasks grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX
510 /** Launch threads until we have <b>n</b>. */
511 static int
512 threadpool_start_threads(threadpool_t *pool, int n)
514 if (BUG(n < 0))
515 return -1; // LCOV_EXCL_LINE
516 if (n > MAX_THREADS)
517 n = MAX_THREADS;
519 tor_mutex_acquire(&pool->lock);
521 if (pool->n_threads < n)
522 pool->threads = tor_reallocarray(pool->threads,
523 sizeof(workerthread_t*), n);
525 while (pool->n_threads < n) {
526 /* For half of our threads, we'll choose lower priorities permissively;
527 * for the other half, we'll stick more strictly to higher priorities.
528 * This keeps slow low-priority tasks from taking over completely. */
529 int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;
531 void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
532 workerthread_t *thr = workerthread_new(chance,
533 state, pool, pool->reply_queue);
535 if (!thr) {
536 //LCOV_EXCL_START
537 tor_assert_nonfatal_unreached();
538 pool->free_thread_state_fn(state);
539 tor_mutex_release(&pool->lock);
540 return -1;
541 //LCOV_EXCL_STOP
543 thr->index = pool->n_threads;
544 pool->threads[pool->n_threads++] = thr;
546 tor_mutex_release(&pool->lock);
548 return 0;
552 * Construct a new thread pool with <b>n</b> worker threads, configured to
553 * send their output to <b>replyqueue</b>. The threads' states will be
554 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
555 * as its argument. When the threads close, they will call
556 * <b>free_thread_state_fn</b> on their states.
558 threadpool_t *
559 threadpool_new(int n_threads,
560 replyqueue_t *replyqueue,
561 void *(*new_thread_state_fn)(void*),
562 void (*free_thread_state_fn)(void*),
563 void *arg)
565 threadpool_t *pool;
566 pool = tor_malloc_zero(sizeof(threadpool_t));
567 tor_mutex_init_nonrecursive(&pool->lock);
568 tor_cond_init(&pool->condition);
569 unsigned i;
570 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
571 TOR_TAILQ_INIT(&pool->work[i]);
574 pool->new_thread_state_fn = new_thread_state_fn;
575 pool->new_thread_state_arg = arg;
576 pool->free_thread_state_fn = free_thread_state_fn;
577 pool->reply_queue = replyqueue;
579 if (threadpool_start_threads(pool, n_threads) < 0) {
580 //LCOV_EXCL_START
581 tor_assert_nonfatal_unreached();
582 tor_cond_uninit(&pool->condition);
583 tor_mutex_uninit(&pool->lock);
584 threadpool_free(pool);
585 return NULL;
586 //LCOV_EXCL_STOP
589 return pool;
593 * Free up the resources allocated by worker threads, worker thread pool, ...
595 void
596 threadpool_free_(threadpool_t *pool)
598 if (!pool)
599 return;
601 if (pool->threads) {
602 for (int i = 0; i != pool->n_threads; ++i)
603 workerthread_free(pool->threads[i]);
605 tor_free(pool->threads);
608 if (pool->update_args)
609 pool->free_update_arg_fn(pool->update_args);
611 if (pool->reply_event) {
612 tor_event_del(pool->reply_event);
613 tor_event_free(pool->reply_event);
616 if (pool->reply_queue)
617 replyqueue_free(pool->reply_queue);
619 if (pool->new_thread_state_arg)
620 pool->free_thread_state_fn(pool->new_thread_state_arg);
622 tor_free(pool);
625 /** Return the reply queue associated with a given thread pool. */
626 replyqueue_t *
627 threadpool_get_replyqueue(threadpool_t *tp)
629 return tp->reply_queue;
632 /** Allocate a new reply queue. Reply queues are used to pass results from
633 * worker threads to the main thread. Since the main thread is running an
634 * IO-centric event loop, it needs to get woken up with means other than a
635 * condition variable. */
636 replyqueue_t *
637 replyqueue_new(uint32_t alertsocks_flags)
639 replyqueue_t *rq;
641 rq = tor_malloc_zero(sizeof(replyqueue_t));
642 if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
643 //LCOV_EXCL_START
644 replyqueue_free(rq);
645 return NULL;
646 //LCOV_EXCL_STOP
649 tor_mutex_init(&rq->lock);
650 TOR_TAILQ_INIT(&rq->answers);
652 return rq;
656 * Free up the resources allocated by a reply queue.
658 static void
659 replyqueue_free_(replyqueue_t *queue)
661 if (!queue)
662 return;
664 workqueue_entry_t *work;
666 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
667 work = TOR_TAILQ_FIRST(&queue->answers);
668 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
669 workqueue_entry_free(work);
672 tor_free(queue);
675 /** Internal: Run from the libevent mainloop when there is work to handle in
676 * the reply queue handler. */
677 static void
678 reply_event_cb(evutil_socket_t sock, short events, void *arg)
680 threadpool_t *tp = arg;
681 (void) sock;
682 (void) events;
683 replyqueue_process(tp->reply_queue);
684 if (tp->reply_cb)
685 tp->reply_cb(tp);
688 /** Register the threadpool <b>tp</b>'s reply queue with Tor's global
689 * libevent mainloop. If <b>cb</b> is provided, it is run after
690 * each time there is work to process from the reply queue. Return 0 on
691 * success, -1 on failure.
694 threadpool_register_reply_event(threadpool_t *tp,
695 void (*cb)(threadpool_t *tp))
697 struct event_base *base = tor_libevent_get_base();
699 if (tp->reply_event) {
700 tor_event_free(tp->reply_event);
702 tp->reply_event = tor_event_new(base,
703 tp->reply_queue->alert.read_fd,
704 EV_READ|EV_PERSIST,
705 reply_event_cb,
706 tp);
707 tor_assert(tp->reply_event);
708 tp->reply_cb = cb;
709 return event_add(tp->reply_event, NULL);
713 * Process all pending replies on a reply queue. The main thread should call
714 * this function every time the socket returned by replyqueue_get_socket() is
715 * readable.
717 void
718 replyqueue_process(replyqueue_t *queue)
720 int r = queue->alert.drain_fn(queue->alert.read_fd);
721 if (r < 0) {
722 //LCOV_EXCL_START
723 static ratelim_t warn_limit = RATELIM_INIT(7200);
724 log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
725 "Failure from drain_fd: %s",
726 tor_socket_strerror(-r));
727 //LCOV_EXCL_STOP
730 tor_mutex_acquire(&queue->lock);
731 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
732 /* lock must be held at this point.*/
733 workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
734 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
735 tor_mutex_release(&queue->lock);
736 work->on_pool = NULL;
738 work->reply_fn(work->arg);
739 workqueue_entry_free(work);
741 tor_mutex_acquire(&queue->lock);
744 tor_mutex_release(&queue->lock);
747 /** Return the number of threads configured for the given pool. */
748 unsigned int
749 threadpool_get_n_threads(threadpool_t *tp)
751 tor_assert(tp);
752 return tp->n_threads;