1 // Copyright 2011 Google Inc. All Rights Reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the COPYING file in the root of the source
5 // tree. An additional intellectual property rights grant can be found
6 // in the file PATENTS. All contributing project authors may
7 // be found in the AUTHORS file in the root of the source tree.
8 // -----------------------------------------------------------------------------
10 // Multi-threaded worker
12 // Author: Skal (pascal.massimino@gmail.com)
15 #include <string.h> // for memset()
19 #ifdef WEBP_USE_THREAD
// Win32 stand-ins for the pthread types used by the emulation layer below:
// a thread is represented by a HANDLE and a mutex by a CRITICAL_SECTION.
24 typedef HANDLE pthread_t
;
25 typedef CRITICAL_SECTION pthread_mutex_t
;
// Private synchronization state of a worker, reached through worker->impl_.
// mutex_ is held around every read/write of worker->status_ in ThreadLoop()
// and ChangeState(); condition_ is what those two functions wait on and
// signal to hand the status back and forth.
// NOTE(review): Reset() and End() also use impl_->thread_; confirm that member
// is declared in this struct.
38 struct WebPWorkerImpl
{
39 pthread_mutex_t mutex_
;
40 pthread_cond_t condition_
;
46 //------------------------------------------------------------------------------
47 // simplistic pthread emulation layer
// _beginthreadex requires its start routine to use the __stdcall convention
// and to return unsigned int; THREADFN is the return-type prefix of such a
// routine.
#define THREADFN unsigned int __stdcall
// Converts the value a thread routine wants to return into the unsigned int
// that THREADFN yields. The argument and the whole expansion are parenthesized
// so that an expression argument (e.g. 'p + 1') is converted as a whole
// instead of having the cast bind to its first operand only.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
// pthread_create() emulation built on _beginthreadex().
// Stores the handle of the new thread in '*thread' and returns 1 when no
// handle was obtained. 'attr' is not forwarded to _beginthreadex().
55 static int pthread_create(pthread_t
* const thread
, const void* attr
,
56 unsigned int (__stdcall
*start
)(void*), void* arg
) {
58 *thread
= (pthread_t
)_beginthreadex(NULL
, /* void *security */
59 0, /* unsigned stack_size */
// NOTE(review): _beginthreadex() expects the start routine and its argument
// between stack_size and initflag; confirm 'start' and 'arg' are passed here.
62 0, /* unsigned initflag */
63 NULL
); /* unsigned *thrdaddr */
64 if (*thread
== NULL
) return 1;
// The freshly created thread is given above-normal scheduling priority.
65 SetThreadPriority(*thread
, THREAD_PRIORITY_ABOVE_NORMAL
);
// pthread_join() emulation: blocks until 'thread' has terminated, then closes
// its handle. The returned expression is non-zero when either the wait or the
// CloseHandle() call fails. Nothing is stored through 'value_ptr' here.
69 static int pthread_join(pthread_t thread
, void** value_ptr
) {
71 return (WaitForSingleObject(thread
, INFINITE
) != WAIT_OBJECT_0
||
72 CloseHandle(thread
) == 0);
// pthread_mutex_init() emulation: initializes the critical section that backs
// 'mutex'. 'mutexattr' is not consulted by the initialization call.
76 static int pthread_mutex_init(pthread_mutex_t
* const mutex
, void* mutexattr
) {
78 InitializeCriticalSection(mutex
);
// pthread_mutex_lock() emulation: enters the critical section that backs
// 'mutex', blocking until it is available.
82 static int pthread_mutex_lock(pthread_mutex_t
* const mutex
) {
83 EnterCriticalSection(mutex
);
// pthread_mutex_unlock() emulation: leaves the critical section that backs
// 'mutex'.
87 static int pthread_mutex_unlock(pthread_mutex_t
* const mutex
) {
88 LeaveCriticalSection(mutex
);
// pthread_mutex_destroy() emulation: releases the critical section that backs
// 'mutex'.
92 static int pthread_mutex_destroy(pthread_mutex_t
* const mutex
) {
93 DeleteCriticalSection(mutex
);
// pthread_cond_destroy() emulation: closes the three kernel handles
// (waiting_sem_, received_sem_, signal_event_) that make up the emulated
// condition variable. 'ok' keeps track of whether every CloseHandle() call
// succeeded; all three handles are closed even if an earlier close fails.
98 static int pthread_cond_destroy(pthread_cond_t
* const condition
) {
100 ok
&= (CloseHandle(condition
->waiting_sem_
) != 0);
101 ok
&= (CloseHandle(condition
->received_sem_
) != 0);
102 ok
&= (CloseHandle(condition
->signal_event_
) != 0);
// pthread_cond_init() emulation: creates the three kernel objects that make up
// the emulated condition variable. 'cond_attr' is not consulted by the
// creation calls.
106 static int pthread_cond_init(pthread_cond_t
* const condition
, void* cond_attr
) {
// Both semaphores start at count 0 with a maximum count of 1.
108 condition
->waiting_sem_
= CreateSemaphore(NULL
, 0, 1, NULL
);
109 condition
->received_sem_
= CreateSemaphore(NULL
, 0, 1, NULL
);
// Auto-reset event (bManualReset == FALSE), initially non-signaled.
110 condition
->signal_event_
= CreateEvent(NULL
, FALSE
, FALSE
, NULL
);
111 if (condition
->waiting_sem_
== NULL
||
112 condition
->received_sem_
== NULL
||
113 condition
->signal_event_
== NULL
) {
// At least one object could not be created: release whatever was created.
// pthread_cond_destroy() is handed all three handles, including NULL ones.
114 pthread_cond_destroy(condition
);
// pthread_cond_signal() emulation. If a waiter has announced itself through
// waiting_sem_ (polled with a zero timeout, so this never blocks when nobody
// is waiting), signal_event_ is set to wake it and the signaler then blocks on
// received_sem_ until the waiter acknowledges. With no waiter, nothing is
// signaled.
120 static int pthread_cond_signal(pthread_cond_t
* const condition
) {
122 if (WaitForSingleObject(condition
->waiting_sem_
, 0) == WAIT_OBJECT_0
) {
123 // a thread is waiting in pthread_cond_wait: allow it to be notified
124 ok
= SetEvent(condition
->signal_event_
);
125 // wait until the event is consumed so the signaler cannot consume
126 // the event via its own pthread_cond_wait.
// NOTE(review): this comparison uses '!=' whereas pthread_cond_wait() tests
// its wait with '=='. If the right-hand operand is WAIT_OBJECT_0, 'ok' is
// cleared when the acknowledgement wait succeeds -- confirm the intent.
127 ok
&= (WaitForSingleObject(condition
->received_sem_
, INFINITE
) !=
// pthread_cond_wait() emulation. The caller announces itself on waiting_sem_,
// drops 'mutex', blocks on signal_event_, acknowledges the wake-up on
// received_sem_ (which is what pthread_cond_signal() blocks on), and finally
// re-acquires 'mutex'.
133 static int pthread_cond_wait(pthread_cond_t
* const condition
,
134 pthread_mutex_t
* const mutex
) {
136 // note that there is a consumer available so the signal isn't dropped in
137 // pthread_cond_signal
// NOTE(review): the statement controlled by this 'if' should be the failure
// exit, not the unlock that follows -- confirm.
138 if (!ReleaseSemaphore(condition
->waiting_sem_
, 1, NULL
))
140 // now unlock the mutex so pthread_cond_signal may be issued
141 pthread_mutex_unlock(mutex
);
142 ok
= (WaitForSingleObject(condition
->signal_event_
, INFINITE
) ==
144 ok
&= ReleaseSemaphore(condition
->received_sem_
, 1, NULL
);
// The mutex is taken again before returning, whatever the value of 'ok'.
145 pthread_mutex_lock(mutex
);
// Thread routine return type and return-value wrapper for the case where the
// routine returns void* (presumably the native pthread build -- the enclosing
// preprocessor condition should be confirmed). The value is passed through
// unchanged; it is parenthesized so that an expression argument stays a single
// operand wherever the macro is used.
# define THREADFN void*
# define THREAD_RETURN(val) (val)
154 //------------------------------------------------------------------------------
// Execute() is defined further down in this file; it is declared here so that
// the worker-thread code that precedes its definition can refer to it.
156 static void Execute(WebPWorker
* const worker
); // Forward declaration.
// Thread routine of a worker (passed to pthread_create() by Reset()).
// 'ptr' is the WebPWorker served by this thread. With the worker's mutex
// held, it idles on the condition variable for as long as status_ is OK and
// then reacts to the status installed by ChangeState():
//   WORK   -> status_ is put back to OK,
//   NOT_OK -> the worker finishes.
// In both cases the condition is signaled so a main thread blocked in
// ChangeState() can proceed, and the mutex is released.
158 static THREADFN
ThreadLoop(void* ptr
) {
159 WebPWorker
* const worker
= (WebPWorker
*)ptr
;
162 pthread_mutex_lock(&worker
->impl_
->mutex_
);
163 while (worker
->status_
== OK
) { // wait in idling mode
164 pthread_cond_wait(&worker
->impl_
->condition_
, &worker
->impl_
->mutex_
);
166 if (worker
->status_
== WORK
) {
// NOTE(review): presumably the job is run (Execute) before status_ goes back
// to OK -- confirm.
168 worker
->status_
= OK
;
169 } else if (worker
->status_
== NOT_OK
) { // finish the worker
172 // signal to the main thread that we're done (for Sync())
173 pthread_cond_signal(&worker
->impl_
->condition_
);
174 pthread_mutex_unlock(&worker
->impl_
->mutex_
);
176 return THREAD_RETURN(NULL
); // Thread is finished
// Moves 'worker' to 'new_status' on behalf of the main thread.
// Under the worker's mutex, and only if status_ is at least OK, it first
// waits until the worker thread is back to OK. Then, if 'new_status' is not
// OK, it stores the new status and signals the condition to wake the worker
// thread. Calling it with OK therefore only waits for the worker to become
// idle.
179 // main thread state control
180 static void ChangeState(WebPWorker
* const worker
,
181 WebPWorkerStatus new_status
) {
182 // No-op when attempting to change state on a thread that didn't come up.
183 // Checking status_ without acquiring the lock first would result in a data
// race, so only impl_ is tested before the mutex is taken.
185 if (worker
->impl_
== NULL
) return;
187 pthread_mutex_lock(&worker
->impl_
->mutex_
);
188 if (worker
->status_
>= OK
) {
189 // wait for the worker to finish
190 while (worker
->status_
!= OK
) {
191 pthread_cond_wait(&worker
->impl_
->condition_
, &worker
->impl_
->mutex_
);
193 // assign new status and release the working thread if needed
194 if (new_status
!= OK
) {
195 worker
->status_
= new_status
;
196 pthread_cond_signal(&worker
->impl_
->condition_
);
199 pthread_mutex_unlock(&worker
->impl_
->mutex_
);
202 #endif // WEBP_USE_THREAD
204 //------------------------------------------------------------------------------
// Puts 'worker' into its initial state: every byte of the structure is
// cleared, then status_ is set to NOT_OK (no thread has been started yet).
// Any previous content of '*worker' is overwritten without being released.
206 static void Init(WebPWorker
* const worker
) {
207 memset(worker
, 0, sizeof(*worker
));
208 worker
->status_
= NOT_OK
;
// Waits for the worker to be idle and reports whether it ran without error.
// Returns non-zero when had_error is clear, zero otherwise.
211 static int Sync(WebPWorker
* const worker
) {
212 #ifdef WEBP_USE_THREAD
// Requesting the OK state makes ChangeState() wait until the worker thread is
// back to OK without assigning a new status.
213 ChangeState(worker
, OK
);
215 assert(worker
->status_
<= OK
);
216 return !worker
->had_error
;
// Prepares 'worker' for a new job. had_error is always cleared. What happens
// next depends on the current status_:
//   below OK (not started): with WEBP_USE_THREAD, the impl_ structure is
//     allocated, its mutex and condition variable are initialized and the
//     worker thread is created; status_ becomes OK when creation succeeds.
//   above OK: handled by the trailing 'else if' branch.
// The final assert states the contract: when 'ok' is set, status_ is OK.
219 static int Reset(WebPWorker
* const worker
) {
221 worker
->had_error
= 0;
222 if (worker
->status_
< OK
) {
223 #ifdef WEBP_USE_THREAD
// Zero-initialized allocation of the private synchronization state.
224 worker
->impl_
= (WebPWorkerImpl
*)WebPSafeCalloc(1, sizeof(*worker
->impl_
));
225 if (worker
->impl_
== NULL
) {
228 if (pthread_mutex_init(&worker
->impl_
->mutex_
, NULL
)) {
231 if (pthread_cond_init(&worker
->impl_
->condition_
, NULL
)) {
// The mutex was already initialized at this point, so it is torn down again.
232 pthread_mutex_destroy(&worker
->impl_
->mutex_
);
// The mutex is held across thread creation so that the new thread, which
// locks the same mutex first thing in ThreadLoop(), cannot look at status_
// before it has been set to OK below.
235 pthread_mutex_lock(&worker
->impl_
->mutex_
);
236 ok
= !pthread_create(&worker
->impl_
->thread_
, NULL
, ThreadLoop
, worker
);
237 if (ok
) worker
->status_
= OK
;
238 pthread_mutex_unlock(&worker
->impl_
->mutex_
);
// Cleanup of the synchronization objects and of impl_ itself; impl_ is reset
// to NULL so ChangeState() and End() treat the worker as never started.
// NOTE(review): presumably reached only on failure -- confirm the guarding
// condition.
240 pthread_mutex_destroy(&worker
->impl_
->mutex_
);
241 pthread_cond_destroy(&worker
->impl_
->condition_
);
243 WebPSafeFree(worker
->impl_
);
244 worker
->impl_
= NULL
;
248 worker
->status_
= OK
;
250 } else if (worker
->status_
> OK
) {
253 assert(!ok
|| (worker
->status_
== OK
));
// Runs the worker's job in the calling thread: when a hook is installed it is
// called with (data1, data2), and a zero result from the hook sets had_error.
// had_error is only ever OR-ed here, so an earlier error is never cleared.
// Does nothing when no hook is set.
257 static void Execute(WebPWorker
* const worker
) {
258 if (worker
->hook
!= NULL
) {
259 worker
->had_error
|= !worker
->hook(worker
->data1
, worker
->data2
);
// Starts the worker's job. With WEBP_USE_THREAD the job is handed to the
// worker thread by moving the worker to the WORK state; ChangeState() first
// waits for the worker to be idle and is a no-op if the thread never came up.
263 static void Launch(WebPWorker
* const worker
) {
264 #ifdef WEBP_USE_THREAD
265 ChangeState(worker
, WORK
);
// Shuts the worker down and releases its resources. With WEBP_USE_THREAD and
// a live impl_, the worker thread is asked to finish (NOT_OK state), joined,
// and then the mutex, the condition variable and impl_ itself are released.
// On exit status_ is expected to be NOT_OK (checked by the final assert).
271 static void End(WebPWorker
* const worker
) {
272 #ifdef WEBP_USE_THREAD
273 if (worker
->impl_
!= NULL
) {
// NOT_OK makes ThreadLoop() take its "finish the worker" branch; the join
// below then waits for the thread to actually terminate before the
// synchronization objects it uses are destroyed.
274 ChangeState(worker
, NOT_OK
);
275 pthread_join(worker
->impl_
->thread_
, NULL
);
276 pthread_mutex_destroy(&worker
->impl_
->mutex_
);
277 pthread_cond_destroy(&worker
->impl_
->condition_
);
278 WebPSafeFree(worker
->impl_
);
279 worker
->impl_
= NULL
;
282 worker
->status_
= NOT_OK
;
283 assert(worker
->impl_
== NULL
);
285 assert(worker
->status_
== NOT_OK
);
288 //------------------------------------------------------------------------------
// The worker interface currently in effect. It starts out pointing at the
// implementations defined in this file, listed in the member order of
// WebPWorkerInterface, and can be replaced through WebPSetWorkerInterface().
290 static WebPWorkerInterface g_worker_interface
= {
291 Init
, Reset
, Sync
, Launch
, Execute
, End
// Installs a caller-supplied worker interface.
// 'winterface' itself and each of its six entry points (Init, Reset, Sync,
// Launch, Execute, End) must be non-NULL; such an interface is copied by
// value into g_worker_interface, so the caller's structure does not need to
// outlive the call. The assignment is a plain struct copy with no locking.
294 int WebPSetWorkerInterface(const WebPWorkerInterface
* const winterface
) {
295 if (winterface
== NULL
||
296 winterface
->Init
== NULL
|| winterface
->Reset
== NULL
||
297 winterface
->Sync
== NULL
|| winterface
->Launch
== NULL
||
298 winterface
->Execute
== NULL
|| winterface
->End
== NULL
) {
301 g_worker_interface
= *winterface
;
// Returns a read-only pointer to the worker interface currently in effect.
// The pointer refers to the file-level g_worker_interface object, so it stays
// valid and reflects later WebPSetWorkerInterface() calls.
305 const WebPWorkerInterface
* WebPGetWorkerInterface(void) {
306 return &g_worker_interface
;
309 //------------------------------------------------------------------------------