1 /* thread_pool.c -- A pool of generic worker threads
3 Copyright (c) 2013-2017 Genome Research Ltd.
5 Author: James Bonfield <jkb@sanger.ac.uk>
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE. */
41 #include "thread_pool_internal.h"
46 static int worker_id(hts_tpool
*p
) {
48 pthread_t s
= pthread_self();
49 for (i
= 0; i
< p
->tsize
; i
++) {
50 if (pthread_equal(s
, p
->t
[i
].tid
))
#ifdef DEBUG
/*
 * Debug logging helper: printf-style output to 'fp'.
 * Returns the vfprintf result (number of chars written, or negative on error).
 */
int DBG_OUT(FILE *fp, char *fmt, ...) {
    va_list args;
    va_start(args, fmt);        // was missing in the mangled source: using
                                // 'args' without va_start is undefined behaviour
    int ret = vfprintf(fp, fmt, args);
    va_end(args);
    return ret;
}
#else
// Release builds: compile debug output away to nothing.
#define DBG_OUT(...) do{}while(0)
#endif
65 /* ----------------------------------------------------------------------------
66 * A process-queue to hold results from the thread pool.
68 * Each thread pool may have jobs of multiple types being queued up and
69 * interleaved, so we attach several job process-queues to a single pool.
71 * The jobs themselves are expected to push their results onto their
72 * appropriate results queue.
76 * Adds a result to the end of the process result queue.
78 * Returns 0 on success;
81 static int hts_tpool_add_result(hts_tpool_job
*j
, void *data
) {
82 hts_tpool_process
*q
= j
->q
;
85 pthread_mutex_lock(&q
->p
->pool_m
);
87 DBG_OUT(stderr
, "%d: Adding result to queue %p, serial %"PRId64
", %d of %d\n",
88 worker_id(j
->p
), q
, j
->serial
, q
->n_output
+1, q
->qsize
);
90 if (--q
->n_processing
== 0)
91 pthread_cond_signal(&q
->none_processing_c
);
93 /* No results queue is fine if we don't want any results back */
95 pthread_mutex_unlock(&q
->p
->pool_m
);
99 if (!(r
= malloc(sizeof(*r
))))
104 r
->serial
= j
->serial
;
107 if (q
->output_tail
) {
108 q
->output_tail
->next
= r
;
111 q
->output_head
= q
->output_tail
= r
;
114 DBG_OUT(stderr
, "%d: Broadcasting result_avail (id %"PRId64
")\n",
115 worker_id(j
->p
), r
->serial
);
116 pthread_cond_broadcast(&q
->output_avail_c
);
117 DBG_OUT(stderr
, "%d: Broadcast complete\n", worker_id(j
->p
));
119 pthread_mutex_unlock(&q
->p
->pool_m
);
124 static void wake_next_worker(hts_tpool_process
*q
, int locked
);
126 /* Core of hts_tpool_next_result() */
127 static hts_tpool_result
*hts_tpool_next_result_locked(hts_tpool_process
*q
) {
128 hts_tpool_result
*r
, *last
;
133 for (last
= NULL
, r
= q
->output_head
; r
; last
= r
, r
= r
->next
) {
134 if (r
->serial
== q
->next_serial
)
139 // Remove r from out linked list
140 if (q
->output_head
== r
)
141 q
->output_head
= r
->next
;
143 last
->next
= r
->next
;
145 if (q
->output_tail
== r
)
146 q
->output_tail
= last
;
149 q
->output_tail
= NULL
;
154 if (q
->qsize
&& q
->n_output
< q
->qsize
) {
155 // Not technically input full, but can guarantee there is
156 // room for the input to go somewhere so we still signal.
157 // The waiting code will then check the condition again.
158 pthread_cond_signal(&q
->input_not_full_c
);
160 wake_next_worker(q
, 1);
168 * Pulls the next item off the process result queue. The caller should free
169 * it (and any internals as appropriate) after use. This doesn't wait for a
170 * result to be present.
172 * Results will be returned in strict order.
174 * Returns hts_tpool_result pointer if a result is ready.
177 hts_tpool_result
*hts_tpool_next_result(hts_tpool_process
*q
) {
180 DBG_OUT(stderr
, "Requesting next result on queue %p\n", q
);
182 pthread_mutex_lock(&q
->p
->pool_m
);
183 r
= hts_tpool_next_result_locked(q
);
184 pthread_mutex_unlock(&q
->p
->pool_m
);
186 DBG_OUT(stderr
, "(q=%p) Found %p\n", q
, r
);
192 * Pulls the next item off the process result queue. The caller should free
193 * it (and any internals as appropriate) after use. This will wait for
194 * a result to be present if none are currently available.
196 * Results will be returned in strict order.
198 * Returns hts_tpool_result pointer if a result is ready.
199 * NULL on error or during shutdown.
201 hts_tpool_result
*hts_tpool_next_result_wait(hts_tpool_process
*q
) {
204 pthread_mutex_lock(&q
->p
->pool_m
);
205 while (!(r
= hts_tpool_next_result_locked(q
))) {
206 /* Possible race here now avoided via _locked() call, but incase... */
208 struct timespec timeout
;
210 gettimeofday(&now
, NULL
);
211 timeout
.tv_sec
= now
.tv_sec
+ 10;
212 timeout
.tv_nsec
= now
.tv_usec
* 1000;
216 int rc
= --q
->ref_count
;
217 pthread_mutex_unlock(&q
->p
->pool_m
);
219 hts_tpool_process_destroy(q
);
222 pthread_cond_timedwait(&q
->output_avail_c
, &q
->p
->pool_m
, &timeout
);
226 pthread_mutex_unlock(&q
->p
->pool_m
);
232 * Returns true if there are no items in the process results queue and
233 * also none still pending.
235 int hts_tpool_process_empty(hts_tpool_process
*q
) {
238 pthread_mutex_lock(&q
->p
->pool_m
);
239 empty
= q
->n_input
== 0 && q
->n_processing
== 0 && q
->n_output
== 0;
240 pthread_mutex_unlock(&q
->p
->pool_m
);
245 void hts_tpool_process_ref_incr(hts_tpool_process
*q
) {
246 pthread_mutex_lock(&q
->p
->pool_m
);
248 pthread_mutex_unlock(&q
->p
->pool_m
);
251 void hts_tpool_process_ref_decr(hts_tpool_process
*q
) {
252 pthread_mutex_lock(&q
->p
->pool_m
);
253 if (--q
->ref_count
<= 0) {
254 pthread_mutex_unlock(&q
->p
->pool_m
);
255 hts_tpool_process_destroy(q
);
259 // maybe also call destroy here if needed?
260 pthread_mutex_unlock(&q
->p
->pool_m
);
264 * Returns the number of completed jobs in the process results queue.
266 int hts_tpool_process_len(hts_tpool_process
*q
) {
269 pthread_mutex_lock(&q
->p
->pool_m
);
271 pthread_mutex_unlock(&q
->p
->pool_m
);
277 * Returns the number of completed jobs in the process results queue plus the
278 * number running and queued up to run.
280 int hts_tpool_process_sz(hts_tpool_process
*q
) {
283 pthread_mutex_lock(&q
->p
->pool_m
);
284 len
= q
->n_output
+ q
->n_input
+ q
->n_processing
;
285 pthread_mutex_unlock(&q
->p
->pool_m
);
291 * Shutdown a process.
293 * This sets the shutdown flag and wakes any threads waiting on process
294 * condition variables.
296 void hts_tpool_process_shutdown(hts_tpool_process
*q
) {
297 pthread_mutex_lock(&q
->p
->pool_m
);
299 pthread_cond_broadcast(&q
->output_avail_c
);
300 pthread_cond_broadcast(&q
->input_not_full_c
);
301 pthread_cond_broadcast(&q
->input_empty_c
);
302 pthread_cond_broadcast(&q
->none_processing_c
);
303 pthread_mutex_unlock(&q
->p
->pool_m
);
307 * Frees a result 'r' and if free_data is true also frees
308 * the internal r->data result too.
310 void hts_tpool_delete_result(hts_tpool_result
*r
, int free_data
) {
314 if (free_data
&& r
->data
)
321 * Returns the data portion of a hts_tpool_result, corresponding
322 * to the actual "result" itself.
324 void *hts_tpool_result_data(hts_tpool_result
*r
) {
329 * Initialises a thread process-queue.
331 * In_only, if true, indicates that the process generates does not need to
332 * hold any output. Otherwise an output queue is used to store the results
333 * of processing each input job.
335 * Results hts_tpool_process pointer on success;
338 hts_tpool_process
*hts_tpool_process_init(hts_tpool
*p
, int qsize
, int in_only
) {
339 hts_tpool_process
*q
= malloc(sizeof(*q
));
341 pthread_cond_init(&q
->output_avail_c
, NULL
);
342 pthread_cond_init(&q
->input_not_full_c
, NULL
);
343 pthread_cond_init(&q
->input_empty_c
, NULL
);
344 pthread_cond_init(&q
->none_processing_c
,NULL
);
347 q
->input_head
= NULL
;
348 q
->input_tail
= NULL
;
349 q
->output_head
= NULL
;
350 q
->output_tail
= NULL
;
357 q
->in_only
= in_only
;
359 q
->wake_dispatch
= 0;
365 hts_tpool_process_attach(p
, q
);
370 /* Deallocates memory for a thread process-queue.
371 * Must be called before the thread pool is destroyed.
373 void hts_tpool_process_destroy(hts_tpool_process
*q
) {
374 DBG_OUT(stderr
, "Destroying results queue %p\n", q
);
379 // Ensure it's fully drained before destroying the queue
380 hts_tpool_process_reset(q
, 0);
381 pthread_mutex_lock(&q
->p
->pool_m
);
382 hts_tpool_process_detach(q
->p
, q
);
383 hts_tpool_process_shutdown(q
);
385 // Maybe a worker is scanning this queue, so delay destruction
386 if (--q
->ref_count
> 0) {
387 pthread_mutex_unlock(&q
->p
->pool_m
);
391 pthread_cond_destroy(&q
->output_avail_c
);
392 pthread_cond_destroy(&q
->input_not_full_c
);
393 pthread_cond_destroy(&q
->input_empty_c
);
394 pthread_cond_destroy(&q
->none_processing_c
);
395 pthread_mutex_unlock(&q
->p
->pool_m
);
399 DBG_OUT(stderr
, "Destroyed results queue %p\n", q
);
404 * Attach and detach a thread process-queue with / from the thread pool
407 * We need to do attach after making a thread process, but may also wish
408 * to temporarily detach if we wish to stop running jobs on a specific
409 * process while permitting other process to continue.
411 void hts_tpool_process_attach(hts_tpool
*p
, hts_tpool_process
*q
) {
412 pthread_mutex_lock(&p
->pool_m
);
415 q
->prev
= p
->q_head
->prev
;
416 p
->q_head
->prev
->next
= q
;
423 assert(p
->q_head
&& p
->q_head
->prev
&& p
->q_head
->next
);
424 pthread_mutex_unlock(&p
->pool_m
);
427 void hts_tpool_process_detach(hts_tpool
*p
, hts_tpool_process
*q
) {
428 pthread_mutex_lock(&p
->pool_m
);
429 if (!p
->q_head
|| !q
->prev
|| !q
->next
)
432 hts_tpool_process
*curr
= p
->q_head
, *first
= curr
;
435 q
->next
->prev
= q
->prev
;
436 q
->prev
->next
= q
->next
;
438 q
->next
= q
->prev
= NULL
;
447 } while (curr
!= first
);
450 pthread_mutex_unlock(&p
->pool_m
);
454 /* ----------------------------------------------------------------------------
458 #define TDIFF(t2,t1) ((t2.tv_sec-t1.tv_sec)*1000000 + t2.tv_usec-t1.tv_usec)
463 * Once woken, each thread checks each process-queue in the pool in turn,
464 * looking for input jobs that also have room for the output (if it requires
465 * storing). If found, we execute it and repeat.
467 * If we checked all input queues and find no such job, then we wait until we
468 * are signalled to check again.
470 static void *tpool_worker(void *arg
) {
471 hts_tpool_worker
*w
= (hts_tpool_worker
*)arg
;
476 // Pop an item off the pool queue
477 pthread_mutex_lock(&p
->pool_m
);
479 assert(p
->q_head
== 0 || (p
->q_head
->prev
&& p
->q_head
->next
));
482 hts_tpool_process
*first
= p
->q_head
, *q
= first
;
487 // Iterate over queues, finding one with jobs and also
488 // room to put the result.
489 //if (q && q->input_head && !hts_tpool_process_output_full(q)) {
490 if (q
&& q
->input_head
&& q
->qsize
- q
->n_output
> p
->tsize
- p
->nwaiting
) {
497 } while (q
&& q
!= first
);
502 fprintf(stderr
, "%d: Shutting down\n", worker_id(p
));
504 pthread_mutex_unlock(&p
->pool_m
);
509 // We scanned all queues and cannot process any, so we wait.
512 // Push this thread to the top of the waiting stack
513 if (p
->t_stack_top
== -1 || p
->t_stack_top
> w
->idx
)
514 p
->t_stack_top
= w
->idx
;
516 p
->t_stack
[w
->idx
] = 1;
517 // printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n",
518 // w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
519 // hts_tpool_process_output_full(p->q_head));
520 pthread_cond_wait(&w
->pending_c
, &p
->pool_m
);
521 p
->t_stack
[w
->idx
] = 0;
523 /* Find new t_stack_top */
526 for (i
= 0; i
< p
->tsize
; i
++) {
534 pthread_mutex_unlock(&p
->pool_m
);
535 continue; // To outer for(;;) loop.
538 // Otherwise work_to_do, so process as many items in this queue as
539 // possible before switching to another queue. This means threads
540 // often end up being dedicated to one type of work.
542 while (q
->input_head
&& q
->qsize
- q
->n_output
> q
->n_processing
) {
549 if (!(q
->input_head
= j
->next
))
550 q
->input_tail
= NULL
;
552 // Transitioning from full queue to not-full means we can wake up
553 // any blocked dispatch threads. We broadcast this as it's only
554 // happening once (on the transition) rather than every time we
556 // (I wish I could remember why io_lib rev 3660 changed this from
557 // == to >=, but keeping it just incase!)
559 if (q
->n_input
-- >= q
->qsize
)
560 pthread_cond_broadcast(&q
->input_not_full_c
);
563 pthread_cond_signal(&q
->input_empty_c
);
565 p
->njobs
--; // Total number of jobs; used to adjust to CPU scaling
567 pthread_mutex_unlock(&p
->pool_m
);
569 DBG_OUT(stderr
, "%d: Processing queue %p, serial %"PRId64
"\n",
570 worker_id(j
->p
), q
, j
->serial
);
572 hts_tpool_add_result(j
, j
->func(j
->arg
));
573 //memset(j, 0xbb, sizeof(*j));
576 pthread_mutex_lock(&p
->pool_m
);
578 if (--q
->ref_count
== 0) // we were the last user
579 hts_tpool_process_destroy(q
);
581 // Out of jobs on this queue, so restart search from next one.
582 // This is equivalent to "work-stealing".
585 pthread_mutex_unlock(&p
->pool_m
);
591 static void wake_next_worker(hts_tpool_process
*q
, int locked
) {
594 pthread_mutex_lock(&p
->pool_m
);
596 // Update the q_head to be this queue so we'll start processing
597 // the queue we know to have results.
598 assert(q
->prev
&& q
->next
); // attached
601 // Wake up if we have more jobs waiting than CPUs. This partially combats
602 // CPU frequency scaling effects. Starting too many threads and then
603 // running out of jobs can cause each thread to have lots of start/stop
604 // cycles, which then translates often to CPU frequency scaling
605 // adjustments. Instead it is better to only start as many threads as we
606 // need to keep the throughput up, meaning some threads run flat out and
609 // This isn't perfect as we need to know how many can actually start,
610 // rather than how many are waiting. A limit on output queue size makes
611 // these two figures different.
612 assert(p
->njobs
>= q
->n_input
);
614 int running
= p
->tsize
- p
->nwaiting
;
615 int sig
= p
->t_stack_top
>= 0 && p
->njobs
> p
->tsize
- p
->nwaiting
616 && (!q
|| q
->n_processing
< q
->qsize
- q
->n_output
);
620 // Track average number of running threads and try to keep close.
621 // We permit this to change, but slowly. This avoids "boom and bust" cycles
622 // where we read a lot of data, start a lot of jobs, then become idle again.
623 // This way some threads run steadily and others dormant, which is better
626 // It's 50:50 if this is a good thing. It helps some tasks quite significantly
627 // while slightly hindering other (perhaps more usual) jobs.
629 if (++p
->n_count
== 256) {
633 p
->n_running
+= running
;
634 // Built in lag to avoid see-sawing. Is this safe in all cases?
635 if (sig
&& p
->n_count
>= 128 && running
*p
->n_count
> p
->n_running
+1) sig
=0;
639 printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p
->njobs
,
640 running
, q
->n_output
, q
->qsize
- q
->n_output
,
641 p
->n_running
/p
->n_count
, sig
);
643 for (i
= 0; i
< p
->tsize
; i
++)
644 putchar("x "[p
->t_stack
[i
]]);
649 pthread_cond_signal(&p
->t
[p
->t_stack_top
].pending_c
);
652 pthread_mutex_unlock(&p
->pool_m
);
656 * Creates a worker pool with n worker threads.
658 * Returns pool pointer on success;
661 hts_tpool
*hts_tpool_init(int n
) {
663 hts_tpool
*p
= malloc(sizeof(*p
));
672 p
->t
= malloc(n
* sizeof(p
->t
[0]));
674 pthread_mutexattr_t attr
;
675 pthread_mutexattr_init(&attr
);
676 pthread_mutexattr_settype(&attr
, PTHREAD_MUTEX_RECURSIVE
);
677 pthread_mutex_init(&p
->pool_m
, &attr
);
678 pthread_mutexattr_destroy(&attr
);
680 if (!(p
->t_stack
= malloc(n
* sizeof(*p
->t_stack
))))
684 pthread_mutex_lock(&p
->pool_m
);
686 for (i
= 0; i
< n
; i
++) {
687 hts_tpool_worker
*w
= &p
->t
[i
];
691 pthread_cond_init(&w
->pending_c
, NULL
);
692 if (0 != pthread_create(&w
->tid
, NULL
, tpool_worker
, w
)) {
693 pthread_mutex_unlock(&p
->pool_m
);
698 pthread_mutex_unlock(&p
->pool_m
);
704 * Returns the number of requested threads for a pool.
706 int hts_tpool_size(hts_tpool
*p
) {
711 * Adds an item to the work pool.
713 * Returns 0 on success
716 int hts_tpool_dispatch(hts_tpool
*p
, hts_tpool_process
*q
,
717 void *(*func
)(void *arg
), void *arg
) {
718 return hts_tpool_dispatch2(p
, q
, func
, arg
, 0);
722 * As above but optional non-block flag.
724 * nonblock 0 => block if input queue is full
725 * nonblock +1 => don't block if input queue is full, but do not add task
726 * nonblock -1 => add task regardless of whether queue is full (over-size)
728 int hts_tpool_dispatch2(hts_tpool
*p
, hts_tpool_process
*q
,
729 void *(*func
)(void *arg
), void *arg
, int nonblock
) {
732 pthread_mutex_lock(&p
->pool_m
);
734 DBG_OUT(stderr
, "Dispatching job for queue %p, serial %"PRId64
"\n",
737 if (q
->n_input
>= q
->qsize
&& nonblock
== 1) {
738 pthread_mutex_unlock(&p
->pool_m
);
743 if (!(j
= malloc(sizeof(*j
)))) {
744 pthread_mutex_unlock(&p
->pool_m
);
752 j
->serial
= q
->curr_serial
++;
755 while (q
->n_input
>= q
->qsize
&& !q
->shutdown
&& !q
->wake_dispatch
)
756 pthread_cond_wait(&q
->input_not_full_c
, &q
->p
->pool_m
);
759 pthread_mutex_unlock(&p
->pool_m
);
762 if (q
->wake_dispatch
) {
763 //fprintf(stderr, "Wake => non-block for this operation\n");
764 q
->wake_dispatch
= 0;
768 p
->njobs
++; // total across all queues
769 q
->n_input
++; // queue specific
772 q
->input_tail
->next
= j
;
775 q
->input_head
= q
->input_tail
= j
;
778 DBG_OUT(stderr
, "Dispatched (serial %"PRId64
")\n", j
->serial
);
780 // Let a worker know we have data.
781 // Keep incoming queue at 1 per running thread, so there is always
782 // something waiting when they end their current task. If we go above
783 // this signal to start more threads (if available). This has the effect
784 // of concentrating jobs to fewer cores when we are I/O bound, which in
785 // turn benefits systems with auto CPU frequency scaling.
787 wake_next_worker(q
, 1);
789 pthread_mutex_unlock(&p
->pool_m
);
795 * Wakes up a single thread stuck in dispatch and make it return with
798 void hts_tpool_wake_dispatch(hts_tpool_process
*q
) {
799 pthread_mutex_lock(&q
->p
->pool_m
);
800 q
->wake_dispatch
= 1;
801 pthread_cond_signal(&q
->input_not_full_c
);
802 pthread_mutex_unlock(&q
->p
->pool_m
);
806 * Flushes the process-queue, but doesn't exit. This simply drains the queue
807 * and ensures all worker threads have finished their current tasks
808 * associated with this process.
810 * NOT: This does not mean the worker threads are not executing jobs in
811 * another process-queue.
813 * Returns 0 on success;
816 int hts_tpool_process_flush(hts_tpool_process
*q
) {
820 DBG_OUT(stderr
, "Flushing pool %p\n", p
);
823 pthread_mutex_lock(&p
->pool_m
);
825 // Wake up everything for the final sprint!
826 for (i
= 0; i
< p
->tsize
; i
++)
828 pthread_cond_signal(&p
->t
[i
].pending_c
);
830 // Ensure there is room for the final sprint.
831 // Shouldn't be possible to get here, but just incase.
832 if (q
->qsize
< q
->n_output
+ q
->n_input
+ q
->n_processing
)
833 q
->qsize
= q
->n_output
+ q
->n_input
+ q
->n_processing
;
835 // Wait for n_input and n_processing to hit zero.
836 while (q
->n_input
|| q
->n_processing
) {
838 pthread_cond_wait(&q
->input_empty_c
, &p
->pool_m
);
839 if (q
->shutdown
) break;
840 while (q
->n_processing
)
841 pthread_cond_wait(&q
->none_processing_c
, &p
->pool_m
);
842 if (q
->shutdown
) break;
845 pthread_mutex_unlock(&p
->pool_m
);
847 DBG_OUT(stderr
, "Flushed complete for pool %p, queue %p\n", p
, q
);
853 * Resets a process to the intial state.
855 * This removes any queued up input jobs, disables any notification of
856 * new results/output, flushes what is left and then discards any
857 * queued output. Anything consumer stuck in a wait on results to
858 * appear should stay stuck and will only wake up when new data is
859 * pushed through the queue.
861 * Returns 0 on success;
864 int hts_tpool_process_reset(hts_tpool_process
*q
, int free_results
) {
865 pthread_mutex_lock(&q
->p
->pool_m
);
866 // prevent next_result from returning data during our flush
867 q
->next_serial
= INT_MAX
;
869 // Purge any queued input not yet being acted upon
870 hts_tpool_job
*j
, *jn
;
871 for (j
= q
->input_head
; j
; j
= jn
) {
872 //fprintf(stderr, "Discard input %d\n", j->serial);
876 q
->input_head
= q
->input_tail
= NULL
;
879 // Purge any queued output, thus ensuring we have room to flush.
880 hts_tpool_result
*r
, *rn
;
881 for (r
= q
->output_head
; r
; r
= rn
) {
882 //fprintf(stderr, "Discard output %d\n", r->serial);
884 hts_tpool_delete_result(r
, free_results
);
886 q
->output_head
= q
->output_tail
= NULL
;
888 pthread_mutex_unlock(&q
->p
->pool_m
);
890 // Wait for any jobs being processed to complete.
891 // (TODO: consider how to cancel any currently processing jobs.
892 // Probably this is too hard.)
893 if (hts_tpool_process_flush(q
) != 0)
896 // Discard any new output.
897 pthread_mutex_lock(&q
->p
->pool_m
);
898 for (r
= q
->output_head
; r
; r
= rn
) {
899 //fprintf(stderr, "Discard output %d\n", r->serial);
901 hts_tpool_delete_result(r
, free_results
);
903 q
->output_head
= q
->output_tail
= NULL
;
906 // Finally reset the serial back to the starting point.
907 q
->next_serial
= q
->curr_serial
= 0;
908 pthread_cond_signal(&q
->input_not_full_c
);
909 pthread_mutex_unlock(&q
->p
->pool_m
);
914 /* Returns the process queue size */
915 int hts_tpool_process_qsize(hts_tpool_process
*q
) {
920 * Destroys a thread pool. The threads are joined into the main
921 * thread so they will finish their current work load.
923 void hts_tpool_destroy(hts_tpool
*p
) {
926 DBG_OUT(stderr
, "Destroying pool %p\n", p
);
928 /* Send shutdown message to worker threads */
929 pthread_mutex_lock(&p
->pool_m
);
932 DBG_OUT(stderr
, "Sending shutdown request\n");
934 for (i
= 0; i
< p
->tsize
; i
++)
935 pthread_cond_signal(&p
->t
[i
].pending_c
);
937 pthread_mutex_unlock(&p
->pool_m
);
939 DBG_OUT(stderr
, "Shutdown complete\n");
941 for (i
= 0; i
< p
->tsize
; i
++)
942 pthread_join(p
->t
[i
].tid
, NULL
);
944 pthread_mutex_destroy(&p
->pool_m
);
945 for (i
= 0; i
< p
->tsize
; i
++)
946 pthread_cond_destroy(&p
->t
[i
].pending_c
);
954 DBG_OUT(stderr
, "Destroyed pool %p\n", p
);
959 * Destroys a thread pool without waiting on jobs to complete.
960 * Use hts_tpool_kill(p) to quickly exit after a fatal error.
962 void hts_tpool_kill(hts_tpool
*p
) {
965 DBG_OUT(stderr
, "Destroying pool %p, kill=%d\n", p
, kill
);
967 for (i
= 0; i
< p
->tsize
; i
++)
968 pthread_kill(p
->t
[i
].tid
, SIGINT
);
970 pthread_mutex_destroy(&p
->pool_m
);
971 for (i
= 0; i
< p
->tsize
; i
++)
972 pthread_cond_destroy(&p
->t
[i
].pending_c
);
980 DBG_OUT(stderr
, "Destroyed pool %p\n", p
);
984 /*=============================================================================
987 * This can be considered both as a basic test and as a worked example for
988 * various usage patterns.
989 *=============================================================================
997 #define TASK_SIZE 1000
1000 /*-----------------------------------------------------------------------------
1001 * Unordered x -> x*x test.
1002 * Results arrive in order of completion.
/*
 * Unordered test job: squares the int pointed to by 'arg' and prints it.
 * Frees 'arg' (ownership transfers to the job); returns NULL as the queue
 * is input-only.
 */
void *doit_square_u(void *arg) {
    int job = *(int *)arg;

    usleep(random() % 100000); // to coerce job completion out of order

    printf("RESULT: %d\n", job*job);

    free(arg);
    return NULL;
}
1015 int test_square_u(int n
) {
1016 hts_tpool
*p
= hts_tpool_init(n
);
1017 hts_tpool_process
*q
= hts_tpool_process_init(p
, n
*2, 1);
1021 for (i
= 0; i
< TASK_SIZE
; i
++) {
1022 int *ip
= malloc(sizeof(*ip
));
1024 hts_tpool_dispatch(p
, q
, doit_square_u
, ip
);
1027 hts_tpool_process_flush(q
);
1028 hts_tpool_process_destroy(q
);
1029 hts_tpool_destroy(p
);
1035 /*-----------------------------------------------------------------------------
1036 * Ordered x -> x*x test.
1037 * Results arrive in numerical order.
1039 * This implementation uses a non-blocking dispatch to avoid dead-locks
1040 * where one job takes too long to complete.
/*
 * Ordered test job: returns a malloc'd int holding job*job (sign-preserving
 * for negative inputs, used as a sentinel).  Frees 'arg'; caller owns the
 * returned pointer.
 */
void *doit_square(void *arg) {
    int job = *(int *)arg;
    int *res;

    // One excessively slow, to stress test output queue filling and
    // excessive out of order scenarios.
    usleep(500000 * ((job&31)==31) + random() % 10000);

    res = malloc(sizeof(*res));
    *res = (job<0) ? -job*job : job*job;

    free(arg);
    return res;
}
1057 int test_square(int n
) {
1058 hts_tpool
*p
= hts_tpool_init(n
);
1059 hts_tpool_process
*q
= hts_tpool_process_init(p
, n
*2, 0);
1061 hts_tpool_result
*r
;
1064 for (i
= 0; i
< TASK_SIZE
; i
++) {
1065 int *ip
= malloc(sizeof(*ip
));
1070 // In the situation where some jobs take much longer than
1071 // others, we could end up blocking here as we haven't got
1072 // any room in the output queue to place it. (We don't launch a
1073 // job if the output queue is full.)
1075 // This happens when the next serial number to fetch is, eg, 50
1076 // but jobs 51-100 have all executed really fast and appeared in
1077 // the output queue before 50. A dispatch & check-results
1078 // alternating loop can fail to find job 50 many times over until
1079 // eventually the dispatch blocks before it arrives.
1081 // Our solution is to dispatch in non-blocking mode so we are
1082 // always to either dispatch or consume a result.
1083 blk
= hts_tpool_dispatch2(p
, q
, doit_square
, ip
, 1);
1085 // Check for results.
1086 if ((r
= hts_tpool_next_result(q
))) {
1087 printf("RESULT: %d\n", *(int *)r
->data
);
1088 hts_tpool_delete_result(r
, 1);
1091 // The alternative is a separate thread for dispatching and/or
1092 // consumption of results. See test_squareB.
1093 putchar('.'); fflush(stdout
);
1096 } while (blk
== -1);
1099 // Wait for any input-queued up jobs or in-progress jobs to complete.
1100 hts_tpool_process_flush(q
);
1102 while ((r
= hts_tpool_next_result(q
))) {
1103 printf("RESULT: %d\n", *(int *)r
->data
);
1104 hts_tpool_delete_result(r
, 1);
1107 hts_tpool_process_destroy(q
);
1108 hts_tpool_destroy(p
);
1113 /*-----------------------------------------------------------------------------
1114 * Ordered x -> x*x test.
1115 * Results arrive in numerical order.
1117 * This implementation uses separate dispatching threads and job consumption
1118 * threads (main thread). This means it can use a blocking calls for
1119 * simplicity elsewhere.
1121 struct squareB_opt
{
1123 hts_tpool_process
*q
;
1126 static void *test_squareB_dispatcher(void *arg
) {
1127 struct squareB_opt
*o
= (struct squareB_opt
*)arg
;
1130 for (i
= 0; i
< o
->n
; i
++) {
1131 ip
= malloc(sizeof(*ip
));
1134 hts_tpool_dispatch(o
->p
, o
->q
, doit_square
, ip
);
1137 // Dispatch an sentinel job to mark the end
1138 *(ip
= malloc(sizeof(*ip
))) = -1;
1139 hts_tpool_dispatch(o
->p
, o
->q
, doit_square
, ip
);
1143 int test_squareB(int n
) {
1144 hts_tpool
*p
= hts_tpool_init(n
);
1145 hts_tpool_process
*q
= hts_tpool_process_init(p
, n
*2, 0);
1146 struct squareB_opt o
= {p
, q
, TASK_SIZE
};
1149 // Launch our job creation thread.
1150 pthread_create(&tid
, NULL
, test_squareB_dispatcher
, &o
);
1152 // Consume all results until we find the end-of-job marker.
1154 hts_tpool_result
*r
= hts_tpool_next_result_wait(q
);
1155 int x
= *(int *)r
->data
;
1156 hts_tpool_delete_result(r
, 1);
1159 printf("RESULT: %d\n", x
);
1162 // Wait for any input-queued up jobs or in-progress jobs to complete.
1163 // This should do nothing as we've been executing until the termination
1165 hts_tpool_process_flush(q
);
1166 assert(hts_tpool_next_result(q
) == NULL
);
1168 hts_tpool_process_destroy(q
);
1169 hts_tpool_destroy(p
);
1170 pthread_join(tid
, NULL
);
1176 /*-----------------------------------------------------------------------------
1177 * A simple pipeline test.
1178 * We use a dediocated input thread that does the initial generation of job
1179 * and dispatch, several execution steps running in a shared pool, and a
1180 * dedicated output thread that prints up the final result. It's key that our
1181 * pipeline execution stages can run independently and don't themselves have
1182 * any waits. To achieve this we therefore also use some dedicated threads
1183 * that take the output from one queue and resubmits the job as the input to
1186 * More generally this could perhaps be a single pipeline thread that
1187 * marshalls multiple queues and their interactions, but this is simply a
1188 * demonstration of a single pipeline.
1190 * Our process fills out the bottom byte of a 32-bit int and then shifts it
1191 * left one byte at a time. Only the final stage needs to be ordered. Each
1192 * stage uses its own queue.
1194 * Possible improvement: we only need the last stage to be ordered. By
1195 * allocating our own serial numbers for the first job and manually setting
1196 * these serials in the last job, perhaps we can permit out of order execution
1197 * of all the inbetween stages. (I doubt it'll affect speed much though.)
1200 static void *pipe_input_thread(void *arg
);
1201 static void *pipe_stage1(void *arg
);
1202 static void *pipe_stage2(void *arg
);
1203 static void *pipe_stage3(void *arg
);
1204 static void *pipe_output_thread(void *arg
);
1208 hts_tpool_process
*q1
;
1209 hts_tpool_process
*q2
;
1210 hts_tpool_process
*q3
;
1217 int eof
; // set with last job.
1220 static void *pipe_input_thread(void *arg
) {
1221 pipe_opt
*o
= (pipe_opt
*)arg
;
1224 for (i
= 1; i
<= o
->n
; i
++) {
1225 pipe_job
*j
= malloc(sizeof(*j
));
1228 j
->eof
= (i
== o
->n
);
1230 printf("I %08x\n", j
->x
);
1232 if (hts_tpool_dispatch(o
->p
, o
->q1
, pipe_stage1
, j
) != 0) {
1234 pthread_exit((void *)1);
1241 static void *pipe_stage1(void *arg
) {
1242 pipe_job
*j
= (pipe_job
*)arg
;
1245 usleep(random() % 10000); // fast job
1246 printf("1 %08x\n", j
->x
);
1251 static void *pipe_stage1to2(void *arg
) {
1252 pipe_opt
*o
= (pipe_opt
*)arg
;
1253 hts_tpool_result
*r
;
1255 while ((r
= hts_tpool_next_result_wait(o
->q1
))) {
1256 pipe_job
*j
= (pipe_job
*)r
->data
;
1257 hts_tpool_delete_result(r
, 0);
1258 if (hts_tpool_dispatch(j
->o
->p
, j
->o
->q2
, pipe_stage2
, j
) != 0)
1259 pthread_exit((void *)1);
1267 static void *pipe_stage2(void *arg
) {
1268 pipe_job
*j
= (pipe_job
*)arg
;
1271 usleep(random() % 100000); // slow job
1272 printf("2 %08x\n", j
->x
);
1277 static void *pipe_stage2to3(void *arg
) {
1278 pipe_opt
*o
= (pipe_opt
*)arg
;
1279 hts_tpool_result
*r
;
1281 while ((r
= hts_tpool_next_result_wait(o
->q2
))) {
1282 pipe_job
*j
= (pipe_job
*)r
->data
;
1283 hts_tpool_delete_result(r
, 0);
1284 if (hts_tpool_dispatch(j
->o
->p
, j
->o
->q3
, pipe_stage3
, j
) != 0)
1285 pthread_exit((void *)1);
1293 static void *pipe_stage3(void *arg
) {
1294 pipe_job
*j
= (pipe_job
*)arg
;
1296 usleep(random() % 10000); // fast job
1301 static void *pipe_output_thread(void *arg
) {
1302 pipe_opt
*o
= (pipe_opt
*)arg
;
1303 hts_tpool_result
*r
;
1305 while ((r
= hts_tpool_next_result_wait(o
->q3
))) {
1306 pipe_job
*j
= (pipe_job
*)r
->data
;
1308 printf("O %08x\n", j
->x
);
1309 hts_tpool_delete_result(r
, 1);
1317 int test_pipe(int n
) {
1318 hts_tpool
*p
= hts_tpool_init(n
);
1319 hts_tpool_process
*q1
= hts_tpool_process_init(p
, n
*2, 0);
1320 hts_tpool_process
*q2
= hts_tpool_process_init(p
, n
*2, 0);
1321 hts_tpool_process
*q3
= hts_tpool_process_init(p
, n
*2, 0);
1322 pipe_opt o
= {p
, q1
, q2
, q3
, TASK_SIZE
};
1323 pthread_t tidIto1
, tid1to2
, tid2to3
, tid3toO
;
1327 // Launch our data source and sink threads.
1328 pthread_create(&tidIto1
, NULL
, pipe_input_thread
, &o
);
1329 pthread_create(&tid1to2
, NULL
, pipe_stage1to2
, &o
);
1330 pthread_create(&tid2to3
, NULL
, pipe_stage2to3
, &o
);
1331 pthread_create(&tid3toO
, NULL
, pipe_output_thread
, &o
);
1333 // Wait for tasks to finish.
1335 pthread_join(tidIto1
, &retv
); ret
|= (retv
!= NULL
);
1336 pthread_join(tid1to2
, &retv
); ret
|= (retv
!= NULL
);
1337 pthread_join(tid2to3
, &retv
); ret
|= (retv
!= NULL
);
1338 pthread_join(tid3toO
, &retv
); ret
|= (retv
!= NULL
);
1339 printf("Return value %d\n", ret
);
1341 hts_tpool_process_destroy(q1
);
1342 hts_tpool_process_destroy(q2
);
1343 hts_tpool_process_destroy(q3
);
1344 hts_tpool_destroy(p
);
1349 /*-----------------------------------------------------------------------------*/
/*
 * Test harness entry point: selects one of the demo/test modes by name and
 * runs it with the requested thread count.
 */
int main(int argc, char **argv) {
    int n;

    if (argc < 3) {
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
        fprintf(stderr, "Where commands are:\n\n");
        fprintf(stderr, "unordered # Unordered output\n");
        fprintf(stderr, "ordered1 # Main thread with non-block API\n");
        fprintf(stderr, "ordered2 # Dispatch thread, blocking API\n");
        fprintf(stderr, "pipe # Multi-stage pipeline, several queues\n");
        return 1;
    }

    n = atoi(argv[2]);

    if (strcmp(argv[1], "unordered") == 0) return test_square_u(n);
    if (strcmp(argv[1], "ordered1") == 0) return test_square(n);
    if (strcmp(argv[1], "ordered2") == 0) return test_squareB(n);
    if (strcmp(argv[1], "pipe") == 0) return test_pipe(n);

    fprintf(stderr, "Unknown sub-command\n");
    return 1;
}