///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since the threads are stopped anyway,
	/// this is lazier.
	THR_EXIT,

} worker_state;


typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_coder_s {
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait until we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


static lzma_ret
get_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point in trying to get a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


static lzma_ret
stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// The idea of *has_blocked is to avoid unneeded calls
		// to mythread_condtime_set(), which may do a syscall
		// depending on the operating system.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


static lzma_ret
stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break;
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set or
				// lzma_index_append() failed.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header[].
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


static void
stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


static void
get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
{
	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	if (next->coder == NULL) {
		next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
		if (next->coder == NULL)
			return LZMA_MEM_ERROR;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&next->coder->mutex)) {
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&next->coder->cond)) {
			mythread_mutex_destroy(&next->coder->mutex);
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
//		next->update = &stream_encoder_mt_update;

		next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
		next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
		next->coder->index = NULL;
		memzero(&next->coder->outq, sizeof(next->coder->outq));
		next->coder->threads = NULL;
		next->coder->threads_max = 0;
		next->coder->threads_initialized = 0;
	}

	// Basic initializations
	next->coder->sequence = SEQ_STREAM_HEADER;
	next->coder->block_size = (size_t)(block_size);
	next->coder->thread_error = LZMA_OK;
	next->coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (next->coder->threads_max != options->threads) {
		threads_end(next->coder, allocator);

		next->coder->threads = NULL;
		next->coder->threads_max = 0;

		next->coder->threads_initialized = 0;
		next->coder->threads_free = NULL;

		next->coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (next->coder->threads == NULL)
			return LZMA_MEM_ERROR;

		next->coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(next->coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&next->coder->outq, allocator,
			outbuf_size_max, options->threads));

	// Timeout
	next->coder->timeout = options->timeout;

	// Free the old filter chain and copy the new one.
	for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(next->coder->filters[i].options, allocator);

	return_if_error(lzma_filters_copy(
			filters, next->coder->filters, allocator));

	// Index
	lzma_index_end(next->coder->index, allocator);
	next->coder->index = lzma_index_init(allocator);
	if (next->coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	next->coder->stream_flags.version = 0;
	next->coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&next->coder->stream_flags, next->coder->header));

	next->coder->header_pos = 0;

	// Progress info
	next->coder->progress_in = 0;
	next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
//	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}
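

For context, a minimal sketch of how an application might drive this encoder through the public <lzma.h> API. The thread count, check type, buffer sizes, and the stdin/stdout plumbing below are illustrative choices, not anything this file prescribes:

#include <stdio.h>
#include <stdlib.h>
#include <lzma.h>

// Compress stdin to stdout with the multithreaded .xz encoder.
int
main(void)
{
	lzma_mt mt = {
		.flags = 0,             // No flags are currently supported.
		.threads = 4,           // Worker thread count (illustrative).
		.block_size = 0,        // 0 = derive from the filter chain.
		.timeout = 0,           // 0 = never return LZMA_TIMED_OUT.
		.preset = LZMA_PRESET_DEFAULT,
		.filters = NULL,        // NULL = use .preset instead.
		.check = LZMA_CHECK_CRC64,
	};

	lzma_stream strm = LZMA_STREAM_INIT;
	if (lzma_stream_encoder_mt(&strm, &mt) != LZMA_OK)
		return EXIT_FAILURE;

	uint8_t inbuf[BUFSIZ];
	uint8_t outbuf[BUFSIZ];
	lzma_action action = LZMA_RUN;

	strm.next_out = outbuf;
	strm.avail_out = sizeof(outbuf);

	for (;;) {
		// Refill the input buffer once it has been fully consumed.
		if (strm.avail_in == 0 && action == LZMA_RUN) {
			strm.next_in = inbuf;
			strm.avail_in = fread(inbuf, 1, sizeof(inbuf), stdin);
			if (feof(stdin))
				action = LZMA_FINISH;
		}

		const lzma_ret ret = lzma_code(&strm, action);

		// Flush the output buffer when it is full or at the end.
		if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
			fwrite(outbuf, 1, sizeof(outbuf) - strm.avail_out,
					stdout);
			strm.next_out = outbuf;
			strm.avail_out = sizeof(outbuf);
		}

		if (ret == LZMA_STREAM_END)
			break;

		if (ret != LZMA_OK) {
			lzma_end(&strm);
			return EXIT_FAILURE;
		}
	}

	lzma_end(&strm);
	return EXIT_SUCCESS;
}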


// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}
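

Because lzma_stream_encoder_mt_memusage() signals any failure with UINT64_MAX, a caller can also use it to cap the thread count before initializing the encoder. A small sketch follows; the pick_thread_count() helper and the memory limit are hypothetical, not part of liblzma:

#include <lzma.h>

// Lower mt->threads until the estimated encoder memory usage fits under
// memory_limit, or only one thread is left. Returns the chosen count.
static uint32_t
pick_thread_count(lzma_mt *mt, uint64_t memory_limit)
{
	while (mt->threads > 1) {
		const uint64_t usage = lzma_stream_encoder_mt_memusage(mt);
		if (usage != UINT64_MAX && usage <= memory_limit)
			break;

		--mt->threads;
	}

	return mt->threads;
}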