1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
#include "sdk_util/thread_pool.h"

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

#include "sdk_util/auto_lock.h"
16 // Initializes mutex, semaphores and a pool of threads. If 0 is passed for
17 // num_threads, all work will be performed on the dispatch thread.
18 ThreadPool::ThreadPool(int num_threads
)
19 : threads_(NULL
), counter_(0), num_threads_(num_threads
), exiting_(false),
20 user_data_(NULL
), user_work_function_(NULL
) {
21 if (num_threads_
> 0) {
23 status
= sem_init(&work_sem_
, 0, 0);
25 fprintf(stderr
, "Failed to initialize semaphore!\n");
28 status
= sem_init(&done_sem_
, 0, 0);
30 fprintf(stderr
, "Failed to initialize semaphore!\n");
33 threads_
= new pthread_t
[num_threads_
];
34 for (int i
= 0; i
< num_threads_
; i
++) {
35 status
= pthread_create(&threads_
[i
], NULL
, WorkerThreadEntry
, this);
37 fprintf(stderr
, "Failed to create thread!\n");
44 // Post exit request, wait for all threads to join, and cleanup.
45 ThreadPool::~ThreadPool() {
46 if (num_threads_
> 0) {
49 sem_destroy(&done_sem_
);
50 sem_destroy(&work_sem_
);
54 // Setup work parameters. This function is called from the dispatch thread,
55 // when all worker threads are sleeping.
56 void ThreadPool::Setup(int counter
, WorkFunction work
, void *data
) {
58 user_work_function_
= work
;
62 // Return decremented task counter. This function
63 // can be called from multiple threads at any given time.
64 int ThreadPool::DecCounter() {
65 return AtomicAddFetch(&counter_
, -1);
68 // Set exit flag, post and join all the threads in the pool. This function is
69 // called only from the dispatch thread, and only when all worker threads are
71 void ThreadPool::PostExitAndJoinAll() {
73 // Wake up all the sleeping worker threads.
74 for (int i
= 0; i
< num_threads_
; ++i
)
77 for (int i
= 0; i
< num_threads_
; ++i
)
78 pthread_join(threads_
[i
], &retval
);
81 // Main work loop - one for each worker thread.
82 void ThreadPool::WorkLoop() {
84 // Wait for work. If no work is availble, this thread will sleep here.
88 // Grab a task index to work on from the counter.
89 int task_index
= DecCounter();
92 user_work_function_(task_index
, user_data_
);
94 // Post to dispatch thread work is done.
99 // pthread entry point for a worker thread.
100 void* ThreadPool::WorkerThreadEntry(void* thiz
) {
101 static_cast<ThreadPool
*>(thiz
)->WorkLoop();
105 // DispatchMany() will dispatch a set of tasks across worker threads.
106 // Note: This function will block until all work has completed.
107 void ThreadPool::DispatchMany(int num_tasks
, WorkFunction work
, void* data
) {
108 // On entry, all worker threads are sleeping.
109 Setup(num_tasks
, work
, data
);
111 // Wake up the worker threads & have them process tasks.
112 for (int i
= 0; i
< num_threads_
; i
++)
113 sem_post(&work_sem_
);
115 // Worker threads are now awake and busy.
117 // This dispatch thread will now sleep-wait for the worker threads to finish.
118 for (int i
= 0; i
< num_threads_
; i
++)
119 sem_wait(&done_sem_
);
120 // On exit, all tasks are done and all worker threads are sleeping again.
123 // DispatchHere will dispatch all tasks on this thread.
124 void ThreadPool::DispatchHere(int num_tasks
, WorkFunction work
, void* data
) {
125 for (int i
= 0; i
< num_tasks
; i
++)
129 // Dispatch() will invoke the user supplied work function across
130 // one or more threads for each task.
131 // Note: This function will block until all work has completed.
132 void ThreadPool::Dispatch(int num_tasks
, WorkFunction work
, void* data
) {
133 if (num_threads_
> 0)
134 DispatchMany(num_tasks
, work
, data
);
136 DispatchHere(num_tasks
, work
, data
);
139 } // namespace sdk_util