///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////
#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"
/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
/// State of a worker thread. The main thread moves a thread from THR_IDLE
/// into THR_RUN/THR_FINISH; the worker moves itself back to THR_IDLE.
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since there's stopped anyway, this is lazier.
	THR_EXIT,

} worker_state;

typedef struct worker_thread_s worker_thread;
49 struct worker_thread_s
{
52 /// Input buffer of coder->block_size bytes. The main thread will
53 /// put new input into this and update in_size accordingly. Once
54 /// no more input is coming, state will be set to THR_FINISH.
57 /// Amount of data available in the input buffer. This is modified
58 /// only by the main thread.
61 /// Output buffer for this thread. This is set by the main
62 /// thread every time a new Block is started with this thread
66 /// Pointer to the main structure is needed when putting this
67 /// thread back to the stack of free threads.
70 /// The allocator is set by the main thread. Since a copy of the
71 /// pointer is kept here, the application must not change the
72 /// allocator before calling lzma_end().
73 const lzma_allocator
*allocator
;
75 /// Amount of uncompressed data that has already been compressed.
78 /// Amount of compressed data that is ready.
79 uint64_t progress_out
;
82 lzma_next_coder block_encoder
;
84 /// Compression options for this Block
85 lzma_block block_options
;
87 /// Next structure in the stack of free worker threads.
93 /// The ID of this thread is used to join the thread
94 /// when it's not needed anymore.
107 /// Start a new Block every block_size bytes of input unless
108 /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
111 /// The filter chain currently in use
112 lzma_filter filters
[LZMA_FILTERS_MAX
+ 1];
115 /// Index to hold sizes of the Blocks
119 lzma_next_coder index_encoder
;
122 /// Stream Flags for encoding the Stream Header and Stream Footer.
123 lzma_stream_flags stream_flags
;
125 /// Buffer to hold Stream Header and Stream Footer.
126 uint8_t header
[LZMA_STREAM_HEADER_SIZE
];
128 /// Read position in header[]
132 /// Output buffer queue for compressed data
136 /// Maximum wait time if cannot use all the input and cannot
137 /// fill the output buffer. This is in milliseconds.
141 /// Error code from a worker thread
142 lzma_ret thread_error
;
144 /// Array of allocated thread-specific structures
145 worker_thread
*threads
;
147 /// Number of structures in "threads" above. This is also the
148 /// number of threads that will be created at maximum.
149 uint32_t threads_max
;
151 /// Number of thread structures that have been initialized, and
152 /// thus the number of worker threads actually created so far.
153 uint32_t threads_initialized
;
155 /// Stack of free threads. When a thread finishes, it puts itself
156 /// back into this stack. This starts as empty because threads
157 /// are created only when actually needed.
158 worker_thread
*threads_free
;
160 /// The most recent worker thread to which the main thread writes
161 /// the new input from the application.
165 /// Amount of uncompressed data in Blocks that have already
167 uint64_t progress_in
;
169 /// Amount of compressed data in Stream Header + Blocks that
170 /// have already been finished.
171 uint64_t progress_out
;
174 mythread_mutex mutex
;
179 /// Tell the main thread that something has gone wrong.
181 worker_error(worker_thread
*thr
, lzma_ret ret
)
183 assert(ret
!= LZMA_OK
);
184 assert(ret
!= LZMA_STREAM_END
);
186 mythread_sync(thr
->coder
->mutex
) {
187 if (thr
->coder
->thread_error
== LZMA_OK
)
188 thr
->coder
->thread_error
= ret
;
190 mythread_cond_signal(&thr
->coder
->cond
);
198 worker_encode(worker_thread
*thr
, worker_state state
)
200 assert(thr
->progress_in
== 0);
201 assert(thr
->progress_out
== 0);
203 // Set the Block options.
204 thr
->block_options
= (lzma_block
){
206 .check
= thr
->coder
->stream_flags
.check
,
207 .compressed_size
= thr
->coder
->outq
.buf_size_max
,
208 .uncompressed_size
= thr
->coder
->block_size
,
210 // TODO: To allow changing the filter chain, the filters
211 // array must be copied to each worker_thread.
212 .filters
= thr
->coder
->filters
,
215 // Calculate maximum size of the Block Header. This amount is
216 // reserved in the beginning of the buffer so that Block Header
217 // along with Compressed Size and Uncompressed Size can be
219 lzma_ret ret
= lzma_block_header_size(&thr
->block_options
);
220 if (ret
!= LZMA_OK
) {
221 worker_error(thr
, ret
);
225 // Initialize the Block encoder.
226 ret
= lzma_block_encoder_init(&thr
->block_encoder
,
227 thr
->allocator
, &thr
->block_options
);
228 if (ret
!= LZMA_OK
) {
229 worker_error(thr
, ret
);
236 thr
->outbuf
->size
= thr
->block_options
.header_size
;
237 const size_t out_size
= thr
->coder
->outq
.buf_size_max
;
240 mythread_sync(thr
->mutex
) {
241 // Store in_pos and out_pos into *thr so that
242 // an application may read them via
243 // lzma_get_progress() to get progress information.
245 // NOTE: These aren't updated when the encoding
246 // finishes. Instead, the final values are taken
247 // later from thr->outbuf.
248 thr
->progress_in
= in_pos
;
249 thr
->progress_out
= thr
->outbuf
->size
;
251 while (in_size
== thr
->in_size
252 && thr
->state
== THR_RUN
)
253 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
256 in_size
= thr
->in_size
;
259 // Return if we were asked to stop or exit.
260 if (state
>= THR_STOP
)
263 lzma_action action
= state
== THR_FINISH
264 ? LZMA_FINISH
: LZMA_RUN
;
266 // Limit the amount of input given to the Block encoder
267 // at once. This way this thread can react fairly quickly
268 // if the main thread wants us to stop or exit.
269 static const size_t in_chunk_max
= 16384;
270 size_t in_limit
= in_size
;
271 if (in_size
- in_pos
> in_chunk_max
) {
272 in_limit
= in_pos
+ in_chunk_max
;
276 ret
= thr
->block_encoder
.code(
277 thr
->block_encoder
.coder
, thr
->allocator
,
278 thr
->in
, &in_pos
, in_limit
, thr
->outbuf
->buf
,
279 &thr
->outbuf
->size
, out_size
, action
);
280 } while (ret
== LZMA_OK
&& thr
->outbuf
->size
< out_size
);
283 case LZMA_STREAM_END
:
284 assert(state
== THR_FINISH
);
286 // Encode the Block Header. By doing it after
287 // the compression, we can store the Compressed Size
288 // and Uncompressed Size fields.
289 ret
= lzma_block_header_encode(&thr
->block_options
,
291 if (ret
!= LZMA_OK
) {
292 worker_error(thr
, ret
);
299 // The data was incompressible. Encode it using uncompressed
302 // First wait that we have gotten all the input.
303 mythread_sync(thr
->mutex
) {
304 while (thr
->state
== THR_RUN
)
305 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
308 in_size
= thr
->in_size
;
311 if (state
>= THR_STOP
)
314 // Do the encoding. This takes care of the Block Header too.
315 thr
->outbuf
->size
= 0;
316 ret
= lzma_block_uncomp_encode(&thr
->block_options
,
317 thr
->in
, in_size
, thr
->outbuf
->buf
,
318 &thr
->outbuf
->size
, out_size
);
320 // It shouldn't fail.
321 if (ret
!= LZMA_OK
) {
322 worker_error(thr
, LZMA_PROG_ERROR
);
329 worker_error(thr
, ret
);
333 // Set the size information that will be read by the main thread
334 // to write the Index field.
335 thr
->outbuf
->unpadded_size
336 = lzma_block_unpadded_size(&thr
->block_options
);
337 assert(thr
->outbuf
->unpadded_size
!= 0);
338 thr
->outbuf
->uncompressed_size
= thr
->block_options
.uncompressed_size
;
344 static MYTHREAD_RET_TYPE
345 worker_start(void *thr_ptr
)
347 worker_thread
*thr
= thr_ptr
;
348 worker_state state
= THR_IDLE
; // Init to silence a warning
352 mythread_sync(thr
->mutex
) {
354 // The thread is already idle so if we are
355 // requested to stop, just set the state.
356 if (thr
->state
== THR_STOP
) {
357 thr
->state
= THR_IDLE
;
358 mythread_cond_signal(&thr
->cond
);
362 if (state
!= THR_IDLE
)
365 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
369 assert(state
!= THR_IDLE
);
370 assert(state
!= THR_STOP
);
372 if (state
<= THR_FINISH
)
373 state
= worker_encode(thr
, state
);
375 if (state
== THR_EXIT
)
378 // Mark the thread as idle unless the main thread has
379 // told us to exit. Signal is needed for the case
380 // where the main thread is waiting for the threads to stop.
381 mythread_sync(thr
->mutex
) {
382 if (thr
->state
!= THR_EXIT
) {
383 thr
->state
= THR_IDLE
;
384 mythread_cond_signal(&thr
->cond
);
388 mythread_sync(thr
->coder
->mutex
) {
389 // Mark the output buffer as finished if
390 // no errors occurred.
391 thr
->outbuf
->finished
= state
== THR_FINISH
;
393 // Update the main progress info.
394 thr
->coder
->progress_in
395 += thr
->outbuf
->uncompressed_size
;
396 thr
->coder
->progress_out
+= thr
->outbuf
->size
;
397 thr
->progress_in
= 0;
398 thr
->progress_out
= 0;
400 // Return this thread to the stack of free threads.
401 thr
->next
= thr
->coder
->threads_free
;
402 thr
->coder
->threads_free
= thr
;
404 mythread_cond_signal(&thr
->coder
->cond
);
408 // Exiting, free the resources.
409 mythread_mutex_destroy(&thr
->mutex
);
410 mythread_cond_destroy(&thr
->cond
);
412 lzma_next_end(&thr
->block_encoder
, thr
->allocator
);
413 lzma_free(thr
->in
, thr
->allocator
);
414 return MYTHREAD_RET_VALUE
;
418 /// Make the threads stop but not exit. Optionally wait for them to stop.
420 threads_stop(lzma_coder
*coder
, bool wait_for_threads
)
422 // Tell the threads to stop.
423 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
424 mythread_sync(coder
->threads
[i
].mutex
) {
425 coder
->threads
[i
].state
= THR_STOP
;
426 mythread_cond_signal(&coder
->threads
[i
].cond
);
430 if (!wait_for_threads
)
433 // Wait for the threads to settle in the idle state.
434 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
435 mythread_sync(coder
->threads
[i
].mutex
) {
436 while (coder
->threads
[i
].state
!= THR_IDLE
)
437 mythread_cond_wait(&coder
->threads
[i
].cond
,
438 &coder
->threads
[i
].mutex
);
446 /// Stop the threads and free the resources associated with them.
447 /// Wait until the threads have exited.
449 threads_end(lzma_coder
*coder
, const lzma_allocator
*allocator
)
451 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
452 mythread_sync(coder
->threads
[i
].mutex
) {
453 coder
->threads
[i
].state
= THR_EXIT
;
454 mythread_cond_signal(&coder
->threads
[i
].cond
);
458 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
459 int ret
= mythread_join(coder
->threads
[i
].thread_id
);
464 lzma_free(coder
->threads
, allocator
);
469 /// Initialize a new worker_thread structure and create a new thread.
471 initialize_new_thread(lzma_coder
*coder
, const lzma_allocator
*allocator
)
473 worker_thread
*thr
= &coder
->threads
[coder
->threads_initialized
];
475 thr
->in
= lzma_alloc(coder
->block_size
, allocator
);
477 return LZMA_MEM_ERROR
;
479 if (mythread_mutex_init(&thr
->mutex
))
482 if (mythread_cond_init(&thr
->cond
))
485 thr
->state
= THR_IDLE
;
486 thr
->allocator
= allocator
;
488 thr
->progress_in
= 0;
489 thr
->progress_out
= 0;
490 thr
->block_encoder
= LZMA_NEXT_CODER_INIT
;
492 if (mythread_create(&thr
->thread_id
, &worker_start
, thr
))
495 ++coder
->threads_initialized
;
501 mythread_cond_destroy(&thr
->cond
);
504 mythread_mutex_destroy(&thr
->mutex
);
507 lzma_free(thr
->in
, allocator
);
508 return LZMA_MEM_ERROR
;
513 get_thread(lzma_coder
*coder
, const lzma_allocator
*allocator
)
515 // If there are no free output subqueues, there is no
516 // point to try getting a thread.
517 if (!lzma_outq_has_buf(&coder
->outq
))
520 // If there is a free structure on the stack, use it.
521 mythread_sync(coder
->mutex
) {
522 if (coder
->threads_free
!= NULL
) {
523 coder
->thr
= coder
->threads_free
;
524 coder
->threads_free
= coder
->threads_free
->next
;
528 if (coder
->thr
== NULL
) {
529 // If there are no uninitialized structures left, return.
530 if (coder
->threads_initialized
== coder
->threads_max
)
533 // Initialize a new thread.
534 return_if_error(initialize_new_thread(coder
, allocator
));
537 // Reset the parts of the thread state that have to be done
538 // in the main thread.
539 mythread_sync(coder
->thr
->mutex
) {
540 coder
->thr
->state
= THR_RUN
;
541 coder
->thr
->in_size
= 0;
542 coder
->thr
->outbuf
= lzma_outq_get_buf(&coder
->outq
);
543 mythread_cond_signal(&coder
->thr
->cond
);
551 stream_encode_in(lzma_coder
*coder
, const lzma_allocator
*allocator
,
552 const uint8_t *restrict in
, size_t *restrict in_pos
,
553 size_t in_size
, lzma_action action
)
555 while (*in_pos
< in_size
556 || (coder
->thr
!= NULL
&& action
!= LZMA_RUN
)) {
557 if (coder
->thr
== NULL
) {
559 const lzma_ret ret
= get_thread(coder
, allocator
);
560 if (coder
->thr
== NULL
)
564 // Copy the input data to thread's buffer.
565 size_t thr_in_size
= coder
->thr
->in_size
;
566 lzma_bufcpy(in
, in_pos
, in_size
, coder
->thr
->in
,
567 &thr_in_size
, coder
->block_size
);
569 // Tell the Block encoder to finish if
570 // - it has got block_size bytes of input; or
571 // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
572 // or LZMA_FULL_BARRIER was used.
574 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
575 const bool finish
= thr_in_size
== coder
->block_size
576 || (*in_pos
== in_size
&& action
!= LZMA_RUN
);
578 bool block_error
= false;
580 mythread_sync(coder
->thr
->mutex
) {
581 if (coder
->thr
->state
== THR_IDLE
) {
582 // Something has gone wrong with the Block
583 // encoder. It has set coder->thread_error
584 // which we will read a few lines later.
587 // Tell the Block encoder its new amount
588 // of input and update the state if needed.
589 coder
->thr
->in_size
= thr_in_size
;
592 coder
->thr
->state
= THR_FINISH
;
594 mythread_cond_signal(&coder
->thr
->cond
);
601 mythread_sync(coder
->mutex
) {
602 ret
= coder
->thread_error
;
616 /// Wait until more input can be consumed, more output can be read, or
617 /// an optional timeout is reached.
619 wait_for_work(lzma_coder
*coder
, mythread_condtime
*wait_abs
,
620 bool *has_blocked
, bool has_input
)
622 if (coder
->timeout
!= 0 && !*has_blocked
) {
623 // Every time when stream_encode_mt() is called via
624 // lzma_code(), *has_blocked starts as false. We set it
625 // to true here and calculate the absolute time when
626 // we must return if there's nothing to do.
628 // The idea of *has_blocked is to avoid unneeded calls
629 // to mythread_condtime_set(), which may do a syscall
630 // depending on the operating system.
632 mythread_condtime_set(wait_abs
, &coder
->cond
, coder
->timeout
);
635 bool timed_out
= false;
637 mythread_sync(coder
->mutex
) {
638 // There are four things that we wait. If one of them
639 // becomes possible, we return.
640 // - If there is input left, we need to get a free
641 // worker thread and an output buffer for it.
642 // - Data ready to be read from the output queue.
643 // - A worker thread indicates an error.
644 // - Time out occurs.
645 while ((!has_input
|| coder
->threads_free
== NULL
646 || !lzma_outq_has_buf(&coder
->outq
))
647 && !lzma_outq_is_readable(&coder
->outq
)
648 && coder
->thread_error
== LZMA_OK
650 if (coder
->timeout
!= 0)
651 timed_out
= mythread_cond_timedwait(
652 &coder
->cond
, &coder
->mutex
,
655 mythread_cond_wait(&coder
->cond
,
665 stream_encode_mt(lzma_coder
*coder
, const lzma_allocator
*allocator
,
666 const uint8_t *restrict in
, size_t *restrict in_pos
,
667 size_t in_size
, uint8_t *restrict out
,
668 size_t *restrict out_pos
, size_t out_size
, lzma_action action
)
670 switch (coder
->sequence
) {
671 case SEQ_STREAM_HEADER
:
672 lzma_bufcpy(coder
->header
, &coder
->header_pos
,
673 sizeof(coder
->header
),
674 out
, out_pos
, out_size
);
675 if (coder
->header_pos
< sizeof(coder
->header
))
678 coder
->header_pos
= 0;
679 coder
->sequence
= SEQ_BLOCK
;
684 // Initialized to silence warnings.
685 lzma_vli unpadded_size
= 0;
686 lzma_vli uncompressed_size
= 0;
687 lzma_ret ret
= LZMA_OK
;
689 // These are for wait_for_work().
690 bool has_blocked
= false;
691 mythread_condtime wait_abs
;
694 mythread_sync(coder
->mutex
) {
695 // Check for Block encoder errors.
696 ret
= coder
->thread_error
;
697 if (ret
!= LZMA_OK
) {
698 assert(ret
!= LZMA_STREAM_END
);
702 // Try to read compressed data to out[].
703 ret
= lzma_outq_read(&coder
->outq
,
704 out
, out_pos
, out_size
,
709 if (ret
== LZMA_STREAM_END
) {
710 // End of Block. Add it to the Index.
711 ret
= lzma_index_append(coder
->index
,
712 allocator
, unpadded_size
,
715 // If we didn't fill the output buffer yet,
716 // try to read more data. Maybe the next
717 // outbuf has been finished already too.
718 if (*out_pos
< out_size
)
722 if (ret
!= LZMA_OK
) {
723 // coder->thread_error was set or
724 // lzma_index_append() failed.
725 threads_stop(coder
, false);
729 // Try to give uncompressed data to a worker thread.
730 ret
= stream_encode_in(coder
, allocator
,
731 in
, in_pos
, in_size
, action
);
732 if (ret
!= LZMA_OK
) {
733 threads_stop(coder
, false);
737 // See if we should wait or return.
739 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
740 if (*in_pos
== in_size
) {
741 // LZMA_RUN: More data is probably coming
742 // so return to let the caller fill the
744 if (action
== LZMA_RUN
)
747 // LZMA_FULL_BARRIER: The same as with
748 // LZMA_RUN but tell the caller that the
749 // barrier was completed.
750 if (action
== LZMA_FULL_BARRIER
)
751 return LZMA_STREAM_END
;
753 // Finishing or flushing isn't completed until
754 // all input data has been encoded and copied
755 // to the output buffer.
756 if (lzma_outq_is_empty(&coder
->outq
)) {
757 // LZMA_FINISH: Continue to encode
759 if (action
== LZMA_FINISH
)
762 // LZMA_FULL_FLUSH: Return to tell
763 // the caller that flushing was
765 if (action
== LZMA_FULL_FLUSH
)
766 return LZMA_STREAM_END
;
770 // Return if there is no output space left.
771 // This check must be done after testing the input
772 // buffer, because we might want to use a different
774 if (*out_pos
== out_size
)
777 // Neither in nor out has been used completely.
778 // Wait until there's something we can do.
779 if (wait_for_work(coder
, &wait_abs
, &has_blocked
,
781 return LZMA_TIMED_OUT
;
784 // All Blocks have been encoded and the threads have stopped.
785 // Prepare to encode the Index field.
786 return_if_error(lzma_index_encoder_init(
787 &coder
->index_encoder
, allocator
,
789 coder
->sequence
= SEQ_INDEX
;
791 // Update the progress info to take the Index and
792 // Stream Footer into account. Those are very fast to encode
793 // so in terms of progress information they can be thought
794 // to be ready to be copied out.
795 coder
->progress_out
+= lzma_index_size(coder
->index
)
796 + LZMA_STREAM_HEADER_SIZE
;
802 // Call the Index encoder. It doesn't take any input, so
803 // those pointers can be NULL.
804 const lzma_ret ret
= coder
->index_encoder
.code(
805 coder
->index_encoder
.coder
, allocator
,
807 out
, out_pos
, out_size
, LZMA_RUN
);
808 if (ret
!= LZMA_STREAM_END
)
811 // Encode the Stream Footer into coder->buffer.
812 coder
->stream_flags
.backward_size
813 = lzma_index_size(coder
->index
);
814 if (lzma_stream_footer_encode(&coder
->stream_flags
,
815 coder
->header
) != LZMA_OK
)
816 return LZMA_PROG_ERROR
;
818 coder
->sequence
= SEQ_STREAM_FOOTER
;
823 case SEQ_STREAM_FOOTER
:
824 lzma_bufcpy(coder
->header
, &coder
->header_pos
,
825 sizeof(coder
->header
),
826 out
, out_pos
, out_size
);
827 return coder
->header_pos
< sizeof(coder
->header
)
828 ? LZMA_OK
: LZMA_STREAM_END
;
832 return LZMA_PROG_ERROR
;
837 stream_encoder_mt_end(lzma_coder
*coder
, const lzma_allocator
*allocator
)
839 // Threads must be killed before the output queue can be freed.
840 threads_end(coder
, allocator
);
841 lzma_outq_end(&coder
->outq
, allocator
);
843 for (size_t i
= 0; coder
->filters
[i
].id
!= LZMA_VLI_UNKNOWN
; ++i
)
844 lzma_free(coder
->filters
[i
].options
, allocator
);
846 lzma_next_end(&coder
->index_encoder
, allocator
);
847 lzma_index_end(coder
->index
, allocator
);
849 mythread_cond_destroy(&coder
->cond
);
850 mythread_mutex_destroy(&coder
->mutex
);
852 lzma_free(coder
, allocator
);
857 /// Options handling for lzma_stream_encoder_mt_init() and
858 /// lzma_stream_encoder_mt_memusage()
860 get_options(const lzma_mt
*options
, lzma_options_easy
*opt_easy
,
861 const lzma_filter
**filters
, uint64_t *block_size
,
862 uint64_t *outbuf_size_max
)
864 // Validate some of the options.
866 return LZMA_PROG_ERROR
;
868 if (options
->flags
!= 0 || options
->threads
== 0
869 || options
->threads
> LZMA_THREADS_MAX
)
870 return LZMA_OPTIONS_ERROR
;
872 if (options
->filters
!= NULL
) {
873 // Filter chain was given, use it as is.
874 *filters
= options
->filters
;
877 if (lzma_easy_preset(opt_easy
, options
->preset
))
878 return LZMA_OPTIONS_ERROR
;
880 *filters
= opt_easy
->filters
;
884 if (options
->block_size
> 0) {
885 if (options
->block_size
> BLOCK_SIZE_MAX
)
886 return LZMA_OPTIONS_ERROR
;
888 *block_size
= options
->block_size
;
890 // Determine the Block size from the filter chain.
891 *block_size
= lzma_mt_block_size(*filters
);
892 if (*block_size
== 0)
893 return LZMA_OPTIONS_ERROR
;
895 assert(*block_size
<= BLOCK_SIZE_MAX
);
898 // Calculate the maximum amount output that a single output buffer
899 // may need to hold. This is the same as the maximum total size of
901 *outbuf_size_max
= lzma_block_buffer_bound64(*block_size
);
902 if (*outbuf_size_max
== 0)
903 return LZMA_MEM_ERROR
;
910 get_progress(lzma_coder
*coder
, uint64_t *progress_in
, uint64_t *progress_out
)
912 // Lock coder->mutex to prevent finishing threads from moving their
913 // progress info from the worker_thread structure to lzma_coder.
914 mythread_sync(coder
->mutex
) {
915 *progress_in
= coder
->progress_in
;
916 *progress_out
= coder
->progress_out
;
918 for (size_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
919 mythread_sync(coder
->threads
[i
].mutex
) {
920 *progress_in
+= coder
->threads
[i
].progress_in
;
921 *progress_out
+= coder
->threads
[i
]
932 stream_encoder_mt_init(lzma_next_coder
*next
, const lzma_allocator
*allocator
,
933 const lzma_mt
*options
)
935 lzma_next_coder_init(&stream_encoder_mt_init
, next
, allocator
);
937 // Get the filter chain.
938 lzma_options_easy easy
;
939 const lzma_filter
*filters
;
941 uint64_t outbuf_size_max
;
942 return_if_error(get_options(options
, &easy
, &filters
,
943 &block_size
, &outbuf_size_max
));
945 #if SIZE_MAX < UINT64_MAX
946 if (block_size
> SIZE_MAX
)
947 return LZMA_MEM_ERROR
;
950 // Validate the filter chain so that we can give an error in this
951 // function instead of delaying it to the first call to lzma_code().
952 // The memory usage calculation verifies the filter chain as
953 // a side effect so we take advatange of that.
954 if (lzma_raw_encoder_memusage(filters
) == UINT64_MAX
)
955 return LZMA_OPTIONS_ERROR
;
957 // Validate the Check ID.
958 if ((unsigned int)(options
->check
) > LZMA_CHECK_ID_MAX
)
959 return LZMA_PROG_ERROR
;
961 if (!lzma_check_is_supported(options
->check
))
962 return LZMA_UNSUPPORTED_CHECK
;
964 // Allocate and initialize the base structure if needed.
965 if (next
->coder
== NULL
) {
966 next
->coder
= lzma_alloc(sizeof(lzma_coder
), allocator
);
967 if (next
->coder
== NULL
)
968 return LZMA_MEM_ERROR
;
970 // For the mutex and condition variable initializations
971 // the error handling has to be done here because
972 // stream_encoder_mt_end() doesn't know if they have
973 // already been initialized or not.
974 if (mythread_mutex_init(&next
->coder
->mutex
)) {
975 lzma_free(next
->coder
, allocator
);
977 return LZMA_MEM_ERROR
;
980 if (mythread_cond_init(&next
->coder
->cond
)) {
981 mythread_mutex_destroy(&next
->coder
->mutex
);
982 lzma_free(next
->coder
, allocator
);
984 return LZMA_MEM_ERROR
;
987 next
->code
= &stream_encode_mt
;
988 next
->end
= &stream_encoder_mt_end
;
989 next
->get_progress
= &get_progress
;
990 // next->update = &stream_encoder_mt_update;
992 next
->coder
->filters
[0].id
= LZMA_VLI_UNKNOWN
;
993 next
->coder
->index_encoder
= LZMA_NEXT_CODER_INIT
;
994 next
->coder
->index
= NULL
;
995 memzero(&next
->coder
->outq
, sizeof(next
->coder
->outq
));
996 next
->coder
->threads
= NULL
;
997 next
->coder
->threads_max
= 0;
998 next
->coder
->threads_initialized
= 0;
1001 // Basic initializations
1002 next
->coder
->sequence
= SEQ_STREAM_HEADER
;
1003 next
->coder
->block_size
= (size_t)(block_size
);
1004 next
->coder
->thread_error
= LZMA_OK
;
1005 next
->coder
->thr
= NULL
;
1007 // Allocate the thread-specific base structures.
1008 assert(options
->threads
> 0);
1009 if (next
->coder
->threads_max
!= options
->threads
) {
1010 threads_end(next
->coder
, allocator
);
1012 next
->coder
->threads
= NULL
;
1013 next
->coder
->threads_max
= 0;
1015 next
->coder
->threads_initialized
= 0;
1016 next
->coder
->threads_free
= NULL
;
1018 next
->coder
->threads
= lzma_alloc(
1019 options
->threads
* sizeof(worker_thread
),
1021 if (next
->coder
->threads
== NULL
)
1022 return LZMA_MEM_ERROR
;
1024 next
->coder
->threads_max
= options
->threads
;
1026 // Reuse the old structures and threads. Tell the running
1027 // threads to stop and wait until they have stopped.
1028 threads_stop(next
->coder
, true);
1032 return_if_error(lzma_outq_init(&next
->coder
->outq
, allocator
,
1033 outbuf_size_max
, options
->threads
));
1036 next
->coder
->timeout
= options
->timeout
;
1038 // Free the old filter chain and copy the new one.
1039 for (size_t i
= 0; next
->coder
->filters
[i
].id
!= LZMA_VLI_UNKNOWN
; ++i
)
1040 lzma_free(next
->coder
->filters
[i
].options
, allocator
);
1042 return_if_error(lzma_filters_copy(
1043 filters
, next
->coder
->filters
, allocator
));
1046 lzma_index_end(next
->coder
->index
, allocator
);
1047 next
->coder
->index
= lzma_index_init(allocator
);
1048 if (next
->coder
->index
== NULL
)
1049 return LZMA_MEM_ERROR
;
1052 next
->coder
->stream_flags
.version
= 0;
1053 next
->coder
->stream_flags
.check
= options
->check
;
1054 return_if_error(lzma_stream_header_encode(
1055 &next
->coder
->stream_flags
, next
->coder
->header
));
1057 next
->coder
->header_pos
= 0;
1060 next
->coder
->progress_in
= 0;
1061 next
->coder
->progress_out
= LZMA_STREAM_HEADER_SIZE
;
1067 extern LZMA_API(lzma_ret
)
1068 lzma_stream_encoder_mt(lzma_stream
*strm
, const lzma_mt
*options
)
1070 lzma_next_strm_init(stream_encoder_mt_init
, strm
, options
);
1072 strm
->internal
->supported_actions
[LZMA_RUN
] = true;
1073 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1074 strm
->internal
->supported_actions
[LZMA_FULL_FLUSH
] = true;
1075 strm
->internal
->supported_actions
[LZMA_FULL_BARRIER
] = true;
1076 strm
->internal
->supported_actions
[LZMA_FINISH
] = true;
1082 // This function name is a monster but it's consistent with the older
1083 // monster names. :-( 31 chars is the max that C99 requires so in that
1084 // sense it's not too long. ;-)
1085 extern LZMA_API(uint64_t)
1086 lzma_stream_encoder_mt_memusage(const lzma_mt
*options
)
1088 lzma_options_easy easy
;
1089 const lzma_filter
*filters
;
1090 uint64_t block_size
;
1091 uint64_t outbuf_size_max
;
1093 if (get_options(options
, &easy
, &filters
, &block_size
,
1094 &outbuf_size_max
) != LZMA_OK
)
1097 // Memory usage of the input buffers
1098 const uint64_t inbuf_memusage
= options
->threads
* block_size
;
1100 // Memory usage of the filter encoders
1101 uint64_t filters_memusage
= lzma_raw_encoder_memusage(filters
);
1102 if (filters_memusage
== UINT64_MAX
)
1105 filters_memusage
*= options
->threads
;
1107 // Memory usage of the output queue
1108 const uint64_t outq_memusage
= lzma_outq_memusage(
1109 outbuf_size_max
, options
->threads
);
1110 if (outq_memusage
== UINT64_MAX
)
1113 // Sum them with overflow checking.
1114 uint64_t total_memusage
= LZMA_MEMUSAGE_BASE
+ sizeof(lzma_coder
)
1115 + options
->threads
* sizeof(worker_thread
);
1117 if (UINT64_MAX
- total_memusage
< inbuf_memusage
)
1120 total_memusage
+= inbuf_memusage
;
1122 if (UINT64_MAX
- total_memusage
< filters_memusage
)
1125 total_memusage
+= filters_memusage
;
1127 if (UINT64_MAX
- total_memusage
< outq_memusage
)
1130 return total_memusage
+ outq_memusage
;