modified: diffout.py
[GalaxyCodeBases.git] / c_cpp / lib / htslib / thread_pool.c
blob4dd4aff104967c458b3c9d1051469c62b1372d51
1 /* thread_pool.c -- A pool of generic worker threads
3 Copyright (c) 2013-2017 Genome Research Ltd.
5 Author: James Bonfield <jkb@sanger.ac.uk>
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE. */
25 #ifndef TEST_MAIN
26 #include <config.h>
27 #endif
29 #include <stdlib.h>
30 #include <inttypes.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include <stdio.h>
34 #include <string.h>
35 #include <sys/time.h>
36 #include <assert.h>
37 #include <stdarg.h>
38 #include <unistd.h>
39 #include <limits.h>
41 #include "thread_pool_internal.h"
43 //#define DEBUG
45 #ifdef DEBUG
46 static int worker_id(hts_tpool *p) {
47 int i;
48 pthread_t s = pthread_self();
49 for (i = 0; i < p->tsize; i++) {
50 if (pthread_equal(s, p->t[i].tid))
51 return i;
53 return -1;
/*
 * Debug-only printf-style logger (compiled only when DEBUG is defined;
 * otherwise DBG_OUT is a no-op macro).
 *
 * Returns the value of vfprintf(): the number of characters written,
 *         or a negative value on output error.
 */
int DBG_OUT(FILE *fp, char *fmt, ...) {
    va_list args;
    int ret;

    va_start(args, fmt);
    ret = vfprintf(fp, fmt, args);
    va_end(args); // every va_start must be matched by va_end before returning

    return ret;
}
61 #else
62 #define DBG_OUT(...) do{}while(0)
63 #endif
/* ----------------------------------------------------------------------------
 * A process-queue to hold results from the thread pool.
 *
 * Each thread pool may have jobs of multiple types being queued up and
 * interleaved, so we attach several job process-queues to a single pool.
 *
 * The jobs themselves are expected to push their results onto their
 * appropriate results queue.
 */
76 * Adds a result to the end of the process result queue.
78 * Returns 0 on success;
79 * -1 on failure
81 static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
82 hts_tpool_process *q = j->q;
83 hts_tpool_result *r;
85 pthread_mutex_lock(&q->p->pool_m);
87 DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
88 worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);
90 if (--q->n_processing == 0)
91 pthread_cond_signal(&q->none_processing_c);
93 /* No results queue is fine if we don't want any results back */
94 if (q->in_only) {
95 pthread_mutex_unlock(&q->p->pool_m);
96 return 0;
99 if (!(r = malloc(sizeof(*r))))
100 return -1;
102 r->next = NULL;
103 r->data = data;
104 r->serial = j->serial;
106 q->n_output++;
107 if (q->output_tail) {
108 q->output_tail->next = r;
109 q->output_tail = r;
110 } else {
111 q->output_head = q->output_tail = r;
114 DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
115 worker_id(j->p), r->serial);
116 pthread_cond_broadcast(&q->output_avail_c);
117 DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
119 pthread_mutex_unlock(&q->p->pool_m);
121 return 0;
124 static void wake_next_worker(hts_tpool_process *q, int locked);
126 /* Core of hts_tpool_next_result() */
127 static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
128 hts_tpool_result *r, *last;
130 if (q->shutdown)
131 return NULL;
133 for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
134 if (r->serial == q->next_serial)
135 break;
138 if (r) {
139 // Remove r from out linked list
140 if (q->output_head == r)
141 q->output_head = r->next;
142 else
143 last->next = r->next;
145 if (q->output_tail == r)
146 q->output_tail = last;
148 if (!q->output_head)
149 q->output_tail = NULL;
151 q->next_serial++;
152 q->n_output--;
154 if (q->qsize && q->n_output < q->qsize) {
155 // Not technically input full, but can guarantee there is
156 // room for the input to go somewhere so we still signal.
157 // The waiting code will then check the condition again.
158 pthread_cond_signal(&q->input_not_full_c);
159 if (!q->shutdown)
160 wake_next_worker(q, 1);
164 return r;
168 * Pulls the next item off the process result queue. The caller should free
169 * it (and any internals as appropriate) after use. This doesn't wait for a
170 * result to be present.
172 * Results will be returned in strict order.
174 * Returns hts_tpool_result pointer if a result is ready.
175 * NULL if not.
177 hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
178 hts_tpool_result *r;
180 DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
182 pthread_mutex_lock(&q->p->pool_m);
183 r = hts_tpool_next_result_locked(q);
184 pthread_mutex_unlock(&q->p->pool_m);
186 DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
188 return r;
192 * Pulls the next item off the process result queue. The caller should free
193 * it (and any internals as appropriate) after use. This will wait for
194 * a result to be present if none are currently available.
196 * Results will be returned in strict order.
198 * Returns hts_tpool_result pointer if a result is ready.
199 * NULL on error or during shutdown.
201 hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
202 hts_tpool_result *r;
204 pthread_mutex_lock(&q->p->pool_m);
205 while (!(r = hts_tpool_next_result_locked(q))) {
206 /* Possible race here now avoided via _locked() call, but incase... */
207 struct timeval now;
208 struct timespec timeout;
210 gettimeofday(&now, NULL);
211 timeout.tv_sec = now.tv_sec + 10;
212 timeout.tv_nsec = now.tv_usec * 1000;
214 q->ref_count++;
215 if (q->shutdown) {
216 int rc = --q->ref_count;
217 pthread_mutex_unlock(&q->p->pool_m);
218 if (rc == 0)
219 hts_tpool_process_destroy(q);
220 return NULL;
222 pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);
224 q->ref_count--;
226 pthread_mutex_unlock(&q->p->pool_m);
228 return r;
232 * Returns true if there are no items in the process results queue and
233 * also none still pending.
235 int hts_tpool_process_empty(hts_tpool_process *q) {
236 int empty;
238 pthread_mutex_lock(&q->p->pool_m);
239 empty = q->n_input == 0 && q->n_processing == 0 && q->n_output == 0;
240 pthread_mutex_unlock(&q->p->pool_m);
242 return empty;
245 void hts_tpool_process_ref_incr(hts_tpool_process *q) {
246 pthread_mutex_lock(&q->p->pool_m);
247 q->ref_count++;
248 pthread_mutex_unlock(&q->p->pool_m);
251 void hts_tpool_process_ref_decr(hts_tpool_process *q) {
252 pthread_mutex_lock(&q->p->pool_m);
253 if (--q->ref_count <= 0) {
254 pthread_mutex_unlock(&q->p->pool_m);
255 hts_tpool_process_destroy(q);
256 return;
259 // maybe also call destroy here if needed?
260 pthread_mutex_unlock(&q->p->pool_m);
264 * Returns the number of completed jobs in the process results queue.
266 int hts_tpool_process_len(hts_tpool_process *q) {
267 int len;
269 pthread_mutex_lock(&q->p->pool_m);
270 len = q->n_output;
271 pthread_mutex_unlock(&q->p->pool_m);
273 return len;
277 * Returns the number of completed jobs in the process results queue plus the
278 * number running and queued up to run.
280 int hts_tpool_process_sz(hts_tpool_process *q) {
281 int len;
283 pthread_mutex_lock(&q->p->pool_m);
284 len = q->n_output + q->n_input + q->n_processing;
285 pthread_mutex_unlock(&q->p->pool_m);
287 return len;
291 * Shutdown a process.
293 * This sets the shutdown flag and wakes any threads waiting on process
294 * condition variables.
296 void hts_tpool_process_shutdown(hts_tpool_process *q) {
297 pthread_mutex_lock(&q->p->pool_m);
298 q->shutdown = 1;
299 pthread_cond_broadcast(&q->output_avail_c);
300 pthread_cond_broadcast(&q->input_not_full_c);
301 pthread_cond_broadcast(&q->input_empty_c);
302 pthread_cond_broadcast(&q->none_processing_c);
303 pthread_mutex_unlock(&q->p->pool_m);
307 * Frees a result 'r' and if free_data is true also frees
308 * the internal r->data result too.
310 void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
311 if (!r)
312 return;
314 if (free_data && r->data)
315 free(r->data);
317 free(r);
321 * Returns the data portion of a hts_tpool_result, corresponding
322 * to the actual "result" itself.
324 void *hts_tpool_result_data(hts_tpool_result *r) {
325 return r->data;
329 * Initialises a thread process-queue.
331 * In_only, if true, indicates that the process generates does not need to
332 * hold any output. Otherwise an output queue is used to store the results
333 * of processing each input job.
335 * Results hts_tpool_process pointer on success;
336 * NULL on failure
338 hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
339 hts_tpool_process *q = malloc(sizeof(*q));
341 pthread_cond_init(&q->output_avail_c, NULL);
342 pthread_cond_init(&q->input_not_full_c, NULL);
343 pthread_cond_init(&q->input_empty_c, NULL);
344 pthread_cond_init(&q->none_processing_c,NULL);
346 q->p = p;
347 q->input_head = NULL;
348 q->input_tail = NULL;
349 q->output_head = NULL;
350 q->output_tail = NULL;
351 q->next_serial = 0;
352 q->curr_serial = 0;
353 q->n_input = 0;
354 q->n_output = 0;
355 q->n_processing= 0;
356 q->qsize = qsize;
357 q->in_only = in_only;
358 q->shutdown = 0;
359 q->wake_dispatch = 0;
360 q->ref_count = 1;
362 q->next = NULL;
363 q->prev = NULL;
365 hts_tpool_process_attach(p, q);
367 return q;
370 /* Deallocates memory for a thread process-queue.
371 * Must be called before the thread pool is destroyed.
373 void hts_tpool_process_destroy(hts_tpool_process *q) {
374 DBG_OUT(stderr, "Destroying results queue %p\n", q);
376 if (!q)
377 return;
379 // Ensure it's fully drained before destroying the queue
380 hts_tpool_process_reset(q, 0);
381 pthread_mutex_lock(&q->p->pool_m);
382 hts_tpool_process_detach(q->p, q);
383 hts_tpool_process_shutdown(q);
385 // Maybe a worker is scanning this queue, so delay destruction
386 if (--q->ref_count > 0) {
387 pthread_mutex_unlock(&q->p->pool_m);
388 return;
391 pthread_cond_destroy(&q->output_avail_c);
392 pthread_cond_destroy(&q->input_not_full_c);
393 pthread_cond_destroy(&q->input_empty_c);
394 pthread_cond_destroy(&q->none_processing_c);
395 pthread_mutex_unlock(&q->p->pool_m);
397 free(q);
399 DBG_OUT(stderr, "Destroyed results queue %p\n", q);
404 * Attach and detach a thread process-queue with / from the thread pool
405 * scheduler.
407 * We need to do attach after making a thread process, but may also wish
408 * to temporarily detach if we wish to stop running jobs on a specific
409 * process while permitting other process to continue.
411 void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
412 pthread_mutex_lock(&p->pool_m);
413 if (p->q_head) {
414 q->next = p->q_head;
415 q->prev = p->q_head->prev;
416 p->q_head->prev->next = q;
417 p->q_head->prev = q;
418 } else {
419 q->next = q;
420 q->prev = q;
422 p->q_head = q;
423 assert(p->q_head && p->q_head->prev && p->q_head->next);
424 pthread_mutex_unlock(&p->pool_m);
427 void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
428 pthread_mutex_lock(&p->pool_m);
429 if (!p->q_head || !q->prev || !q->next)
430 goto done;
432 hts_tpool_process *curr = p->q_head, *first = curr;
433 do {
434 if (curr == q) {
435 q->next->prev = q->prev;
436 q->prev->next = q->next;
437 p->q_head = q->next;
438 q->next = q->prev = NULL;
440 // Last one
441 if (p->q_head == q)
442 p->q_head = NULL;
443 break;
446 curr = curr->next;
447 } while (curr != first);
449 done:
450 pthread_mutex_unlock(&p->pool_m);
/* ----------------------------------------------------------------------------
 * The thread pool.
 */
/* Difference t2 - t1 between two struct timeval values, in microseconds.
 * Arguments are parenthesised so expressions such as *ptr may be passed. */
#define TDIFF(t2,t1) (((t2).tv_sec-(t1).tv_sec)*1000000 + (t2).tv_usec-(t1).tv_usec)
461 * A worker thread.
463 * Once woken, each thread checks each process-queue in the pool in turn,
464 * looking for input jobs that also have room for the output (if it requires
465 * storing). If found, we execute it and repeat.
467 * If we checked all input queues and find no such job, then we wait until we
468 * are signalled to check again.
470 static void *tpool_worker(void *arg) {
471 hts_tpool_worker *w = (hts_tpool_worker *)arg;
472 hts_tpool *p = w->p;
473 hts_tpool_job *j;
475 for (;;) {
476 // Pop an item off the pool queue
477 pthread_mutex_lock(&p->pool_m);
479 assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));
481 int work_to_do = 0;
482 hts_tpool_process *first = p->q_head, *q = first;
483 do {
484 if (p->shutdown)
485 break;
487 // Iterate over queues, finding one with jobs and also
488 // room to put the result.
489 //if (q && q->input_head && !hts_tpool_process_output_full(q)) {
490 if (q && q->input_head && q->qsize - q->n_output > p->tsize - p->nwaiting) {
491 //printf("Work\n");
492 work_to_do = 1;
493 break;
496 if (q) q = q->next;
497 } while (q && q != first);
499 if (p->shutdown) {
500 shutdown:
501 #ifdef DEBUG
502 fprintf(stderr, "%d: Shutting down\n", worker_id(p));
503 #endif
504 pthread_mutex_unlock(&p->pool_m);
505 pthread_exit(NULL);
508 if (!work_to_do) {
509 // We scanned all queues and cannot process any, so we wait.
510 p->nwaiting++;
512 // Push this thread to the top of the waiting stack
513 if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
514 p->t_stack_top = w->idx;
516 p->t_stack[w->idx] = 1;
517 // printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n",
518 // w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
519 // hts_tpool_process_output_full(p->q_head));
520 pthread_cond_wait(&w->pending_c, &p->pool_m);
521 p->t_stack[w->idx] = 0;
523 /* Find new t_stack_top */
524 int i;
525 p->t_stack_top = -1;
526 for (i = 0; i < p->tsize; i++) {
527 if (p->t_stack[i]) {
528 p->t_stack_top = i;
529 break;
533 p->nwaiting--;
534 pthread_mutex_unlock(&p->pool_m);
535 continue; // To outer for(;;) loop.
538 // Otherwise work_to_do, so process as many items in this queue as
539 // possible before switching to another queue. This means threads
540 // often end up being dedicated to one type of work.
541 q->ref_count++;
542 while (q->input_head && q->qsize - q->n_output > q->n_processing) {
543 if (p->shutdown)
544 goto shutdown;
546 j = q->input_head;
547 assert(j->p == p);
549 if (!(q->input_head = j->next))
550 q->input_tail = NULL;
552 // Transitioning from full queue to not-full means we can wake up
553 // any blocked dispatch threads. We broadcast this as it's only
554 // happening once (on the transition) rather than every time we
555 // are below qsize.
556 // (I wish I could remember why io_lib rev 3660 changed this from
557 // == to >=, but keeping it just incase!)
558 q->n_processing++;
559 if (q->n_input-- >= q->qsize)
560 pthread_cond_broadcast(&q->input_not_full_c);
562 if (q->n_input == 0)
563 pthread_cond_signal(&q->input_empty_c);
565 p->njobs--; // Total number of jobs; used to adjust to CPU scaling
567 pthread_mutex_unlock(&p->pool_m);
569 DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
570 worker_id(j->p), q, j->serial);
572 hts_tpool_add_result(j, j->func(j->arg));
573 //memset(j, 0xbb, sizeof(*j));
574 free(j);
576 pthread_mutex_lock(&p->pool_m);
578 if (--q->ref_count == 0) // we were the last user
579 hts_tpool_process_destroy(q);
580 else
581 // Out of jobs on this queue, so restart search from next one.
582 // This is equivalent to "work-stealing".
583 p->q_head = q->next;
585 pthread_mutex_unlock(&p->pool_m);
588 return NULL;
591 static void wake_next_worker(hts_tpool_process *q, int locked) {
592 hts_tpool *p = q->p;
593 if (!locked)
594 pthread_mutex_lock(&p->pool_m);
596 // Update the q_head to be this queue so we'll start processing
597 // the queue we know to have results.
598 assert(q->prev && q->next); // attached
599 p->q_head = q;
601 // Wake up if we have more jobs waiting than CPUs. This partially combats
602 // CPU frequency scaling effects. Starting too many threads and then
603 // running out of jobs can cause each thread to have lots of start/stop
604 // cycles, which then translates often to CPU frequency scaling
605 // adjustments. Instead it is better to only start as many threads as we
606 // need to keep the throughput up, meaning some threads run flat out and
607 // others are idle.
609 // This isn't perfect as we need to know how many can actually start,
610 // rather than how many are waiting. A limit on output queue size makes
611 // these two figures different.
612 assert(p->njobs >= q->n_input);
614 int running = p->tsize - p->nwaiting;
615 int sig = p->t_stack_top >= 0 && p->njobs > p->tsize - p->nwaiting
616 && (!q || q->n_processing < q->qsize - q->n_output);
618 //#define AVG_USAGE
619 #ifdef AVG_USAGE
620 // Track average number of running threads and try to keep close.
621 // We permit this to change, but slowly. This avoids "boom and bust" cycles
622 // where we read a lot of data, start a lot of jobs, then become idle again.
623 // This way some threads run steadily and others dormant, which is better
624 // for throughput.
626 // It's 50:50 if this is a good thing. It helps some tasks quite significantly
627 // while slightly hindering other (perhaps more usual) jobs.
629 if (++p->n_count == 256) {
630 p->n_count >>= 1;
631 p->n_running >>= 1;
633 p->n_running += running;
634 // Built in lag to avoid see-sawing. Is this safe in all cases?
635 if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
636 #endif
638 if (0) {
639 printf("%d waiting, %d running, %d output, %d, arun %d => %d\t", p->njobs,
640 running, q->n_output, q->qsize - q->n_output,
641 p->n_running/p->n_count, sig);
642 int i;
643 for (i = 0; i < p->tsize; i++)
644 putchar("x "[p->t_stack[i]]);
645 putchar('\n');
648 if (sig)
649 pthread_cond_signal(&p->t[p->t_stack_top].pending_c);
651 if (!locked)
652 pthread_mutex_unlock(&p->pool_m);
656 * Creates a worker pool with n worker threads.
658 * Returns pool pointer on success;
659 * NULL on failure
661 hts_tpool *hts_tpool_init(int n) {
662 int i;
663 hts_tpool *p = malloc(sizeof(*p));
664 p->tsize = n;
665 p->njobs = 0;
666 p->nwaiting = 0;
667 p->shutdown = 0;
668 p->q_head = NULL;
669 p->t_stack = NULL;
670 p->n_count = 0;
671 p->n_running = 0;
672 p->t = malloc(n * sizeof(p->t[0]));
674 pthread_mutexattr_t attr;
675 pthread_mutexattr_init(&attr);
676 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
677 pthread_mutex_init(&p->pool_m, &attr);
678 pthread_mutexattr_destroy(&attr);
680 if (!(p->t_stack = malloc(n * sizeof(*p->t_stack))))
681 return NULL;
682 p->t_stack_top = -1;
684 pthread_mutex_lock(&p->pool_m);
686 for (i = 0; i < n; i++) {
687 hts_tpool_worker *w = &p->t[i];
688 p->t_stack[i] = 0;
689 w->p = p;
690 w->idx = i;
691 pthread_cond_init(&w->pending_c, NULL);
692 if (0 != pthread_create(&w->tid, NULL, tpool_worker, w)) {
693 pthread_mutex_unlock(&p->pool_m);
694 return NULL;
698 pthread_mutex_unlock(&p->pool_m);
700 return p;
704 * Returns the number of requested threads for a pool.
706 int hts_tpool_size(hts_tpool *p) {
707 return p->tsize;
711 * Adds an item to the work pool.
713 * Returns 0 on success
714 * -1 on failure
716 int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
717 void *(*func)(void *arg), void *arg) {
718 return hts_tpool_dispatch2(p, q, func, arg, 0);
722 * As above but optional non-block flag.
724 * nonblock 0 => block if input queue is full
725 * nonblock +1 => don't block if input queue is full, but do not add task
726 * nonblock -1 => add task regardless of whether queue is full (over-size)
728 int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
729 void *(*func)(void *arg), void *arg, int nonblock) {
730 hts_tpool_job *j;
732 pthread_mutex_lock(&p->pool_m);
734 DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
735 q, q->curr_serial);
737 if (q->n_input >= q->qsize && nonblock == 1) {
738 pthread_mutex_unlock(&p->pool_m);
739 errno = EAGAIN;
740 return -1;
743 if (!(j = malloc(sizeof(*j)))) {
744 pthread_mutex_unlock(&p->pool_m);
745 return -1;
747 j->func = func;
748 j->arg = arg;
749 j->next = NULL;
750 j->p = p;
751 j->q = q;
752 j->serial = q->curr_serial++;
754 if (nonblock == 0) {
755 while (q->n_input >= q->qsize && !q->shutdown && !q->wake_dispatch)
756 pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
757 if (q->shutdown) {
758 free(j);
759 pthread_mutex_unlock(&p->pool_m);
760 return -1;
762 if (q->wake_dispatch) {
763 //fprintf(stderr, "Wake => non-block for this operation\n");
764 q->wake_dispatch = 0;
768 p->njobs++; // total across all queues
769 q->n_input++; // queue specific
771 if (q->input_tail) {
772 q->input_tail->next = j;
773 q->input_tail = j;
774 } else {
775 q->input_head = q->input_tail = j;
778 DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", j->serial);
780 // Let a worker know we have data.
781 // Keep incoming queue at 1 per running thread, so there is always
782 // something waiting when they end their current task. If we go above
783 // this signal to start more threads (if available). This has the effect
784 // of concentrating jobs to fewer cores when we are I/O bound, which in
785 // turn benefits systems with auto CPU frequency scaling.
786 if (!q->shutdown)
787 wake_next_worker(q, 1);
789 pthread_mutex_unlock(&p->pool_m);
791 return 0;
795 * Wakes up a single thread stuck in dispatch and make it return with
796 * errno EAGAIN.
798 void hts_tpool_wake_dispatch(hts_tpool_process *q) {
799 pthread_mutex_lock(&q->p->pool_m);
800 q->wake_dispatch = 1;
801 pthread_cond_signal(&q->input_not_full_c);
802 pthread_mutex_unlock(&q->p->pool_m);
806 * Flushes the process-queue, but doesn't exit. This simply drains the queue
807 * and ensures all worker threads have finished their current tasks
808 * associated with this process.
810 * NOT: This does not mean the worker threads are not executing jobs in
811 * another process-queue.
813 * Returns 0 on success;
814 * -1 on failure
816 int hts_tpool_process_flush(hts_tpool_process *q) {
817 int i;
818 hts_tpool *p = q->p;
820 DBG_OUT(stderr, "Flushing pool %p\n", p);
822 // Drains the queue
823 pthread_mutex_lock(&p->pool_m);
825 // Wake up everything for the final sprint!
826 for (i = 0; i < p->tsize; i++)
827 if (p->t_stack[i])
828 pthread_cond_signal(&p->t[i].pending_c);
830 // Ensure there is room for the final sprint.
831 // Shouldn't be possible to get here, but just incase.
832 if (q->qsize < q->n_output + q->n_input + q->n_processing)
833 q->qsize = q->n_output + q->n_input + q->n_processing;
835 // Wait for n_input and n_processing to hit zero.
836 while (q->n_input || q->n_processing) {
837 while (q->n_input)
838 pthread_cond_wait(&q->input_empty_c, &p->pool_m);
839 if (q->shutdown) break;
840 while (q->n_processing)
841 pthread_cond_wait(&q->none_processing_c, &p->pool_m);
842 if (q->shutdown) break;
845 pthread_mutex_unlock(&p->pool_m);
847 DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", p, q);
849 return 0;
853 * Resets a process to the intial state.
855 * This removes any queued up input jobs, disables any notification of
856 * new results/output, flushes what is left and then discards any
857 * queued output. Anything consumer stuck in a wait on results to
858 * appear should stay stuck and will only wake up when new data is
859 * pushed through the queue.
861 * Returns 0 on success;
862 * -1 on failure
864 int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
865 pthread_mutex_lock(&q->p->pool_m);
866 // prevent next_result from returning data during our flush
867 q->next_serial = INT_MAX;
869 // Purge any queued input not yet being acted upon
870 hts_tpool_job *j, *jn;
871 for (j = q->input_head; j; j = jn) {
872 //fprintf(stderr, "Discard input %d\n", j->serial);
873 jn = j->next;
874 free(j);
876 q->input_head = q->input_tail = NULL;
877 q->n_input = 0;
879 // Purge any queued output, thus ensuring we have room to flush.
880 hts_tpool_result *r, *rn;
881 for (r = q->output_head; r; r = rn) {
882 //fprintf(stderr, "Discard output %d\n", r->serial);
883 rn = r->next;
884 hts_tpool_delete_result(r, free_results);
886 q->output_head = q->output_tail = NULL;
887 q->n_output = 0;
888 pthread_mutex_unlock(&q->p->pool_m);
890 // Wait for any jobs being processed to complete.
891 // (TODO: consider how to cancel any currently processing jobs.
892 // Probably this is too hard.)
893 if (hts_tpool_process_flush(q) != 0)
894 return -1;
896 // Discard any new output.
897 pthread_mutex_lock(&q->p->pool_m);
898 for (r = q->output_head; r; r = rn) {
899 //fprintf(stderr, "Discard output %d\n", r->serial);
900 rn = r->next;
901 hts_tpool_delete_result(r, free_results);
903 q->output_head = q->output_tail = NULL;
904 q->n_output = 0;
906 // Finally reset the serial back to the starting point.
907 q->next_serial = q->curr_serial = 0;
908 pthread_cond_signal(&q->input_not_full_c);
909 pthread_mutex_unlock(&q->p->pool_m);
911 return 0;
914 /* Returns the process queue size */
915 int hts_tpool_process_qsize(hts_tpool_process *q) {
916 return q->qsize;
920 * Destroys a thread pool. The threads are joined into the main
921 * thread so they will finish their current work load.
923 void hts_tpool_destroy(hts_tpool *p) {
924 int i;
926 DBG_OUT(stderr, "Destroying pool %p\n", p);
928 /* Send shutdown message to worker threads */
929 pthread_mutex_lock(&p->pool_m);
930 p->shutdown = 1;
932 DBG_OUT(stderr, "Sending shutdown request\n");
934 for (i = 0; i < p->tsize; i++)
935 pthread_cond_signal(&p->t[i].pending_c);
937 pthread_mutex_unlock(&p->pool_m);
939 DBG_OUT(stderr, "Shutdown complete\n");
941 for (i = 0; i < p->tsize; i++)
942 pthread_join(p->t[i].tid, NULL);
944 pthread_mutex_destroy(&p->pool_m);
945 for (i = 0; i < p->tsize; i++)
946 pthread_cond_destroy(&p->t[i].pending_c);
948 if (p->t_stack)
949 free(p->t_stack);
951 free(p->t);
952 free(p);
954 DBG_OUT(stderr, "Destroyed pool %p\n", p);
959 * Destroys a thread pool without waiting on jobs to complete.
960 * Use hts_tpool_kill(p) to quickly exit after a fatal error.
962 void hts_tpool_kill(hts_tpool *p) {
963 int i;
965 DBG_OUT(stderr, "Destroying pool %p, kill=%d\n", p, kill);
967 for (i = 0; i < p->tsize; i++)
968 pthread_kill(p->t[i].tid, SIGINT);
970 pthread_mutex_destroy(&p->pool_m);
971 for (i = 0; i < p->tsize; i++)
972 pthread_cond_destroy(&p->t[i].pending_c);
974 if (p->t_stack)
975 free(p->t_stack);
977 free(p->t);
978 free(p);
980 DBG_OUT(stderr, "Destroyed pool %p\n", p);
/*=============================================================================
 * Test app.
 *
 * This can be considered both as a basic test and as a worked example for
 * various usage patterns.
 *=============================================================================
 */
992 #ifdef TEST_MAIN
994 #include <stdio.h>
996 #ifndef TASK_SIZE
997 #define TASK_SIZE 1000
998 #endif
/*-----------------------------------------------------------------------------
 * Unordered x -> x*x test.
 * Results arrive in order of completion.
 *
 * Job function: 'arg' points to a malloc'd int, which is squared, printed
 * and then freed.  Always returns NULL as no result is queued.
 */
void *doit_square_u(void *arg) {
    int *input = (int *)arg;
    int job = *input;

    usleep(random() % 100000); // to coerce job completion out of order

    printf("RESULT: %d\n", job*job);

    free(input);
    return NULL;
}
1015 int test_square_u(int n) {
1016 hts_tpool *p = hts_tpool_init(n);
1017 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
1018 int i;
1020 // Dispatch jobs
1021 for (i = 0; i < TASK_SIZE; i++) {
1022 int *ip = malloc(sizeof(*ip));
1023 *ip = i;
1024 hts_tpool_dispatch(p, q, doit_square_u, ip);
1027 hts_tpool_process_flush(q);
1028 hts_tpool_process_destroy(q);
1029 hts_tpool_destroy(p);
1031 return 0;
/*-----------------------------------------------------------------------------
 * Ordered x -> x*x test.
 * Results arrive in numerical order.
 *
 * This implementation uses a non-blocking dispatch to avoid dead-locks
 * where one job takes too long to complete.
 *
 * Job function: 'arg' points to a malloc'd int which is consumed (freed).
 * Returns a newly malloc'd int holding job*job, negated when job is
 * negative.
 */
void *doit_square(void *arg) {
    int job = *(int *)arg;
    int squared = job*job;
    int *res;

    // One excessively slow, to stress test output queue filling and
    // excessive out of order scenarios.
    usleep(500000 * ((job&31)==31) + random() % 10000);

    res = malloc(sizeof(*res));
    if (job < 0)
        *res = -squared;
    else
        *res = squared;

    free(arg);
    return res;
}
1057 int test_square(int n) {
1058 hts_tpool *p = hts_tpool_init(n);
1059 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1060 int i;
1061 hts_tpool_result *r;
1063 // Dispatch jobs
1064 for (i = 0; i < TASK_SIZE; i++) {
1065 int *ip = malloc(sizeof(*ip));
1066 *ip = i;
1067 int blk;
1069 do {
1070 // In the situation where some jobs take much longer than
1071 // others, we could end up blocking here as we haven't got
1072 // any room in the output queue to place it. (We don't launch a
1073 // job if the output queue is full.)
1075 // This happens when the next serial number to fetch is, eg, 50
1076 // but jobs 51-100 have all executed really fast and appeared in
1077 // the output queue before 50. A dispatch & check-results
1078 // alternating loop can fail to find job 50 many times over until
1079 // eventually the dispatch blocks before it arrives.
1081 // Our solution is to dispatch in non-blocking mode so we are
1082 // always to either dispatch or consume a result.
1083 blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
1085 // Check for results.
1086 if ((r = hts_tpool_next_result(q))) {
1087 printf("RESULT: %d\n", *(int *)r->data);
1088 hts_tpool_delete_result(r, 1);
1090 if (blk == -1) {
1091 // The alternative is a separate thread for dispatching and/or
1092 // consumption of results. See test_squareB.
1093 putchar('.'); fflush(stdout);
1094 usleep(10000);
1096 } while (blk == -1);
1099 // Wait for any input-queued up jobs or in-progress jobs to complete.
1100 hts_tpool_process_flush(q);
1102 while ((r = hts_tpool_next_result(q))) {
1103 printf("RESULT: %d\n", *(int *)r->data);
1104 hts_tpool_delete_result(r, 1);
1107 hts_tpool_process_destroy(q);
1108 hts_tpool_destroy(p);
1110 return 0;
1113 /*-----------------------------------------------------------------------------
1114 * Ordered x -> x*x test.
1115 * Results arrive in numerical order.
1117 * This implementation uses separate dispatching threads and job consumption
1118 * threads (main thread). This means it can use a blocking calls for
1119 * simplicity elsewhere.
1121 struct squareB_opt {
1122 hts_tpool *p;
1123 hts_tpool_process *q;
1124 int n;
1126 static void *test_squareB_dispatcher(void *arg) {
1127 struct squareB_opt *o = (struct squareB_opt *)arg;
1128 int i, *ip;
1130 for (i = 0; i < o->n; i++) {
1131 ip = malloc(sizeof(*ip));
1132 *ip = i;
1134 hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1137 // Dispatch an sentinel job to mark the end
1138 *(ip = malloc(sizeof(*ip))) = -1;
1139 hts_tpool_dispatch(o->p, o->q, doit_square, ip);
1140 pthread_exit(NULL);
1143 int test_squareB(int n) {
1144 hts_tpool *p = hts_tpool_init(n);
1145 hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
1146 struct squareB_opt o = {p, q, TASK_SIZE};
1147 pthread_t tid;
1149 // Launch our job creation thread.
1150 pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
1152 // Consume all results until we find the end-of-job marker.
1153 for(;;) {
1154 hts_tpool_result *r = hts_tpool_next_result_wait(q);
1155 int x = *(int *)r->data;
1156 hts_tpool_delete_result(r, 1);
1157 if (x == -1)
1158 break;
1159 printf("RESULT: %d\n", x);
1162 // Wait for any input-queued up jobs or in-progress jobs to complete.
1163 // This should do nothing as we've been executing until the termination
1164 // marker of -1.
1165 hts_tpool_process_flush(q);
1166 assert(hts_tpool_next_result(q) == NULL);
1168 hts_tpool_process_destroy(q);
1169 hts_tpool_destroy(p);
1170 pthread_join(tid, NULL);
1172 return 0;
/*-----------------------------------------------------------------------------
 * A simple pipeline test.
 * We use a dedicated input thread that does the initial generation of job
 * and dispatch, several execution steps running in a shared pool, and a
 * dedicated output thread that prints the final result.  It's key that our
 * pipeline execution stages can run independently and don't themselves have
 * any waits.  To achieve this we therefore also use some dedicated threads
 * that take the output from one queue and resubmit the job as the input to
 * the next queue.
 *
 * More generally this could perhaps be a single pipeline thread that
 * marshalls multiple queues and their interactions, but this is simply a
 * demonstration of a single pipeline.
 *
 * Our process fills out the bottom byte of a 32-bit int and then shifts it
 * left one byte at a time.  Only the final stage needs to be ordered.  Each
 * stage uses its own queue.
 *
 * Possible improvement: we only need the last stage to be ordered.  By
 * allocating our own serial numbers for the first job and manually setting
 * these serials in the last job, perhaps we can permit out of order execution
 * of all the in-between stages.  (I doubt it'll affect speed much though.)
 */
// Forward declarations: the two pipeline end-point threads, and the three
// stage jobs that are handed to hts_tpool_dispatch.
static void *pipe_input_thread(void *arg);
static void *pipe_stage1(void *arg);
static void *pipe_stage2(void *arg);
static void *pipe_stage3(void *arg);
static void *pipe_output_thread(void *arg);
1206 typedef struct {
1207 hts_tpool *p;
1208 hts_tpool_process *q1;
1209 hts_tpool_process *q2;
1210 hts_tpool_process *q3;
1211 int n;
1212 } pipe_opt;
1214 typedef struct {
1215 pipe_opt *o;
1216 unsigned int x;
1217 int eof; // set with last job.
1218 } pipe_job;
1220 static void *pipe_input_thread(void *arg) {
1221 pipe_opt *o = (pipe_opt *)arg;
1223 int i;
1224 for (i = 1; i <= o->n; i++) {
1225 pipe_job *j = malloc(sizeof(*j));
1226 j->o = o;
1227 j->x = i;
1228 j->eof = (i == o->n);
1230 printf("I %08x\n", j->x);
1232 if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
1233 free(j);
1234 pthread_exit((void *)1);
1238 pthread_exit(NULL);
1241 static void *pipe_stage1(void *arg) {
1242 pipe_job *j = (pipe_job *)arg;
1244 j->x <<= 8;
1245 usleep(random() % 10000); // fast job
1246 printf("1 %08x\n", j->x);
1248 return j;
1251 static void *pipe_stage1to2(void *arg) {
1252 pipe_opt *o = (pipe_opt *)arg;
1253 hts_tpool_result *r;
1255 while ((r = hts_tpool_next_result_wait(o->q1))) {
1256 pipe_job *j = (pipe_job *)r->data;
1257 hts_tpool_delete_result(r, 0);
1258 if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
1259 pthread_exit((void *)1);
1260 if (j->eof)
1261 break;
1264 pthread_exit(NULL);
1267 static void *pipe_stage2(void *arg) {
1268 pipe_job *j = (pipe_job *)arg;
1270 j->x <<= 8;
1271 usleep(random() % 100000); // slow job
1272 printf("2 %08x\n", j->x);
1274 return j;
1277 static void *pipe_stage2to3(void *arg) {
1278 pipe_opt *o = (pipe_opt *)arg;
1279 hts_tpool_result *r;
1281 while ((r = hts_tpool_next_result_wait(o->q2))) {
1282 pipe_job *j = (pipe_job *)r->data;
1283 hts_tpool_delete_result(r, 0);
1284 if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
1285 pthread_exit((void *)1);
1286 if (j->eof)
1287 break;
1290 pthread_exit(NULL);
1293 static void *pipe_stage3(void *arg) {
1294 pipe_job *j = (pipe_job *)arg;
1296 usleep(random() % 10000); // fast job
1297 j->x <<= 8;
1298 return j;
1301 static void *pipe_output_thread(void *arg) {
1302 pipe_opt *o = (pipe_opt *)arg;
1303 hts_tpool_result *r;
1305 while ((r = hts_tpool_next_result_wait(o->q3))) {
1306 pipe_job *j = (pipe_job *)r->data;
1307 int eof = j->eof;
1308 printf("O %08x\n", j->x);
1309 hts_tpool_delete_result(r, 1);
1310 if (eof)
1311 break;
1314 pthread_exit(NULL);
1317 int test_pipe(int n) {
1318 hts_tpool *p = hts_tpool_init(n);
1319 hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
1320 hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
1321 hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
1322 pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
1323 pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
1324 void *retv;
1325 int ret;
1327 // Launch our data source and sink threads.
1328 pthread_create(&tidIto1, NULL, pipe_input_thread, &o);
1329 pthread_create(&tid1to2, NULL, pipe_stage1to2, &o);
1330 pthread_create(&tid2to3, NULL, pipe_stage2to3, &o);
1331 pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
1333 // Wait for tasks to finish.
1334 ret = 0;
1335 pthread_join(tidIto1, &retv); ret |= (retv != NULL);
1336 pthread_join(tid1to2, &retv); ret |= (retv != NULL);
1337 pthread_join(tid2to3, &retv); ret |= (retv != NULL);
1338 pthread_join(tid3toO, &retv); ret |= (retv != NULL);
1339 printf("Return value %d\n", ret);
1341 hts_tpool_process_destroy(q1);
1342 hts_tpool_process_destroy(q2);
1343 hts_tpool_process_destroy(q3);
1344 hts_tpool_destroy(p);
1346 return 0;
/*-----------------------------------------------------------------------------*/
/*
 * Test driver.
 *
 * Usage: <prog> command n_threads
 *
 * Seeds random() with 0, validates n_threads and runs the test selected by
 * command.  The exit status is that test's return value, or 1 for a usage
 * error, an invalid thread count or an unknown command.
 */
int main(int argc, char **argv) {
    long n;
    char *end;

    srandom(0);

    if (argc < 3) {
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
        fprintf(stderr, "Where commands are:\n\n");
        fprintf(stderr, "unordered # Unordered output\n");
        fprintf(stderr, "ordered1 # Main thread with non-block API\n");
        fprintf(stderr, "ordered2 # Dispatch thread, blocking API\n");
        fprintf(stderr, "pipe # Multi-stage pipeline, several queues\n");
        exit(1);
    }

    // atoi() cannot report bad input and would turn "abc" into 0 threads.
    // The upper bound keeps the n*2 queue size used by the tests in range.
    errno = 0;
    n = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' ||
        n < 1 || n > INT_MAX / 2) {
        fprintf(stderr, "Invalid n_threads: %s\n", argv[2]);
        exit(1);
    }

    if (strcmp(argv[1], "unordered") == 0) return test_square_u((int)n);
    if (strcmp(argv[1], "ordered1") == 0)  return test_square((int)n);
    if (strcmp(argv[1], "ordered2") == 0)  return test_squareB((int)n);
    if (strcmp(argv[1], "pipe") == 0)      return test_pipe((int)n);

    fprintf(stderr, "Unknown sub-command\n");
    exit(1);
}
1373 #endif