/*
 * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

/* ======   Dependencies   ======= */
#include <stddef.h>         /* size_t */
#include "debug.h"          /* assert */
#include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
#include "pool.h"

/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif

#ifdef ZSTD_MULTITHREAD

#include "threading.h"   /* pthread adaptation */

/* A job is a function and an opaque argument */
typedef struct POOL_job_s {
    POOL_function function;
    void *opaque;
} POOL_job;

struct POOL_ctx_s {
    ZSTD_customMem customMem;
    /* Keep track of the threads */
    ZSTD_pthread_t* threads;
    size_t threadCapacity;
    size_t threadLimit;

    /* The queue is a circular buffer */
    POOL_job* queue;
    size_t queueHead;
    size_t queueTail;
    size_t queueSize;

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variables for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
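
/* Note : the push/pop paths below (POOL_thread, POOL_add, POOL_tryAdd, POOL_resize)
 * only touch the queue fields, numThreadsBusy and shutdown while holding queueMutex.
 * queuePushCond is signalled when a slot frees up for producers ;
 * queuePopCond is signalled when a job (or a shutdown/resize event) becomes
 * available for workers. */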

/* POOL_thread() :
 * Work thread for the thread pool.
 * Waits for jobs and executes them.
 * @returns : NULL on failure else non-null.
 */
static void* POOL_thread(void* opaque) {
    POOL_ctx* const ctx = (POOL_ctx*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        ZSTD_pthread_mutex_lock(&ctx->queueMutex);

        while ( ctx->queueEmpty
            || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
            if (ctx->shutdown) {
                /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
                 * a few threads will be shutdown while !queueEmpty,
                 * but enough threads will remain active to finish the queue */
                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                return opaque;
            }
            ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Pop a job off the queue */
        {   POOL_job const job = ctx->queue[ctx->queueHead];
            ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
            ctx->numThreadsBusy++;
            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
            /* Unlock the mutex, signal a pusher, and run the job */
            ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);

            job.function(job.opaque);

            /* If the intended queue size was 0, signal after finishing job */
            ZSTD_pthread_mutex_lock(&ctx->queueMutex);
            ctx->numThreadsBusy--;
            if (ctx->queueSize == 1) {
                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            }
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        }
    }  /* for (;;) */
    assert(0);  /* Unreachable */
}

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
                               ZSTD_customMem customMem) {
    POOL_ctx* ctx;
    /* Check parameters */
    if (!numThreads) { return NULL; }
    /* Allocate the context and zero initialize */
    ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
    if (!ctx) { return NULL; }
    /* Initialize the job queue.
     * It needs one extra space since one space is wasted to differentiate
     * empty and full queues.
     */
    ctx->queueSize = queueSize + 1;
    ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
    ctx->queueHead = 0;
    ctx->queueTail = 0;
    ctx->numThreadsBusy = 0;
    ctx->queueEmpty = 1;
    {   int error = 0;
        error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
        error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
        error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
        if (error) { POOL_free(ctx); return NULL; }
    }
    ctx->shutdown = 0;
    /* Allocate space for the thread handles */
    ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
    ctx->threadCapacity = 0;
    ctx->customMem = customMem;
    /* Check for errors */
    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
    /* Initialize the threads */
    {   size_t i;
        for (i = 0; i < numThreads; ++i) {
            if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
                ctx->threadCapacity = i;
                POOL_free(ctx);
                return NULL;
        }   }
        ctx->threadCapacity = numThreads;
        ctx->threadLimit = numThreads;
    }
    return ctx;
}
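
/* Usage sketch (illustration only, not part of the library) :
 * assumes POOL_function is the void (*)(void*) callback type declared in pool.h.
 *
 *     static void myTask(void* opaque) { int* const n = (int*)opaque; *n *= 2; }
 *
 *     POOL_ctx* const pool = POOL_create(4, 8);   // 4 workers, room for 8 queued jobs
 *     int value = 21;
 *     if (pool != NULL) {
 *         POOL_add(pool, &myTask, &value);        // blocks only if the queue is full
 *         POOL_free(pool);                        // joins the workers, so the job has completed
 *     }
 */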

/*! POOL_join() :
    Shutdown the queue, wake any sleeping threads, and join all of the threads.
*/
static void POOL_join(POOL_ctx* ctx) {
    /* Shut down the queue */
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    ctx->shutdown = 1;
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    /* Wake up sleeping threads */
    ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
    ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
    /* Join all of the threads */
    {   size_t i;
        for (i = 0; i < ctx->threadCapacity; ++i) {
            ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
    }   }
}

void POOL_free(POOL_ctx *ctx) {
    if (!ctx) { return; }
    POOL_join(ctx);
    ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
    ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
    ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
    ZSTD_free(ctx->queue, ctx->customMem);
    ZSTD_free(ctx->threads, ctx->customMem);
    ZSTD_free(ctx, ctx->customMem);
}

size_t POOL_sizeof(POOL_ctx *ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    return sizeof(*ctx)
        + ctx->queueSize * sizeof(POOL_job)
        + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
}

/* @return : 0 on success, 1 on error */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
    if (numThreads <= ctx->threadCapacity) {
        if (!numThreads) return 1;
        ctx->threadLimit = numThreads;
        return 0;
    }
    /* numThreads > threadCapacity */
    {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
        if (!threadPool) return 1;
        /* replace existing thread pool */
        memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
        ZSTD_free(ctx->threads, ctx->customMem);
        ctx->threads = threadPool;
        /* Initialize additional threads */
        {   size_t threadId;
            for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
                if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
                    ctx->threadCapacity = threadId;
                    return 1;
            }   }
    }   }
    /* successfully expanded */
    ctx->threadCapacity = numThreads;
    ctx->threadLimit = numThreads;
    return 0;
}

/* @return : 0 on success, 1 on error */
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
    int result;
    if (ctx==NULL) return 1;
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    result = POOL_resize_internal(ctx, numThreads);
    ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return result;
}
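
/* Note : growing the pool allocates a larger thread array and spawns the extra
 * threads immediately, while shrinking only lowers threadLimit ; surplus workers
 * then park in POOL_thread()'s wait loop (numThreadsBusy >= threadLimit) and are
 * only joined when POOL_free() shuts the pool down. The queuePopCond broadcast
 * above wakes parked workers once the limit grows again. */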

/**
 * Returns 1 if the queue is full and 0 otherwise.
 *
 * When queueSize is 1 (pool was created with an intended queueSize of 0),
 * then a queue is empty if there is a thread free _and_ no job is waiting.
 */
static int isQueueFull(POOL_ctx const* ctx) {
    if (ctx->queueSize > 1) {
        return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
    } else {
        return (ctx->numThreadsBusy == ctx->threadLimit) ||
               !ctx->queueEmpty;
    }
}
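
/* Worked example : with an intended queueSize of 3, ctx->queueSize is 4.
 * Starting from queueHead == queueTail == 0, pushing three jobs leaves
 * queueTail == 3, and (3 + 1) % 4 == 0 == queueHead, so the queue reports full
 * while one slot stays deliberately unused ; that spare slot is what lets
 * queueHead == queueTail mean "empty" rather than "full". In the queueSize == 1
 * case (intended size 0), "full" instead means every worker up to threadLimit
 * is busy or one job is already waiting to be picked up. */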

static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    POOL_job const job = {function, opaque};
    assert(ctx != NULL);
    if (ctx->shutdown) return;

    ctx->queueEmpty = 0;
    ctx->queue[ctx->queueTail] = job;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    while (isQueueFull(ctx) && (!ctx->shutdown)) {
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}

int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    if (isQueueFull(ctx)) {
        ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        return 0;
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return 1;
}
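
/* Note : POOL_add() blocks on queuePushCond until a slot frees up, whereas
 * POOL_tryAdd() returns 0 immediately when isQueueFull(). A caller that must not
 * block could, for example, fall back to running the job itself (hypothetical
 * pattern, reusing the names from the usage sketch above) :
 *
 *     if (!POOL_tryAdd(pool, &myTask, &value))
 *         myTask(&value);   // executes synchronously on the calling thread
 */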

#else  /* ZSTD_MULTITHREAD  not defined */

/* ========================== */
/* No multi-threading support */
/* ========================== */


/* We don't need any data, but if it is empty, malloc() might return NULL. */
struct POOL_ctx_s {
    int dummy;
};
static POOL_ctx g_ctx;

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
    (void)numThreads;
    (void)queueSize;
    (void)customMem;
    return &g_ctx;
}
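
/* Note : without ZSTD_MULTITHREAD the "pool" is a single shared static context ;
 * POOL_add() and POOL_tryAdd() below simply run the job synchronously on the
 * calling thread. */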

void POOL_free(POOL_ctx* ctx) {
    assert(!ctx || ctx == &g_ctx);
    (void)ctx;
}

int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)ctx; (void)numThreads;
    return 0;
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
}

int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
    return 1;
}

size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    assert(ctx == &g_ctx);
    return sizeof(*ctx);
}

#endif  /* ZSTD_MULTITHREAD */