// SPDX-License-Identifier: 0BSD

///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given an unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
typedef enum {
        /// Waiting for work.
        THR_IDLE,

        /// Encoding is in progress.
        THR_RUN,

        /// Encoding is in progress but no more input data will
        /// be read.
        THR_FINISH,

        /// The main thread wants the thread to stop whatever it was doing
        /// but not exit.
        THR_STOP,

        /// The main thread wants the thread to exit. We could use
        /// cancellation but since there's THR_STOP anyway, this is lazier.
        THR_EXIT,

} worker_state;
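// A rough summary of how these states are used below: get_thread() moves a
// free thread from THR_IDLE to THR_RUN, and stream_encode_in() moves it to
// THR_FINISH once the Block has received all of its input. threads_stop()
// requests THR_STOP and the worker acknowledges by returning to THR_IDLE;
// threads_end() requests THR_EXIT, which makes worker_start() free the
// thread's resources and return.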
typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
        /// Current state of this thread (protected by "mutex" below)
        worker_state state;

        /// Input buffer of coder->block_size bytes. The main thread will
        /// put new input into this and update in_size accordingly. Once
        /// no more input is coming, state will be set to THR_FINISH.
        uint8_t *in;

        /// Amount of data available in the input buffer. This is modified
        /// only by the main thread.
        size_t in_size;

        /// Output buffer for this thread. This is set by the main
        /// thread every time a new Block is started with this thread
        /// structure.
        lzma_outbuf *outbuf;

        /// Pointer to the main structure is needed when putting this
        /// thread back to the stack of free threads.
        lzma_stream_coder *coder;

        /// The allocator is set by the main thread. Since a copy of the
        /// pointer is kept here, the application must not change the
        /// allocator before calling lzma_end().
        const lzma_allocator *allocator;

        /// Amount of uncompressed data that has already been compressed.
        uint64_t progress_in;

        /// Amount of compressed data that is ready.
        uint64_t progress_out;

        /// Block encoder
        lzma_next_coder block_encoder;

        /// Compression options for this Block
        lzma_block block_options;

        /// Filter chain for this thread. By copying the filters array
        /// to each thread it is possible to change the filter chain
        /// between Blocks using lzma_filters_update().
        lzma_filter filters[LZMA_FILTERS_MAX + 1];

        /// Next structure in the stack of free worker threads.
        worker_thread *next;

        mythread_mutex mutex;
        mythread_cond cond;

        /// The ID of this thread is used to join the thread
        /// when it's not needed anymore.
        mythread thread_id;
};
struct lzma_stream_coder_s {
        enum {
                SEQ_STREAM_HEADER,
                SEQ_BLOCK,
                SEQ_INDEX,
                SEQ_STREAM_FOOTER,
        } sequence;

        /// Start a new Block every block_size bytes of input unless
        /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
        size_t block_size;

        /// The filter chain to use for the next Block.
        /// This can be updated using lzma_filters_update()
        /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
        lzma_filter filters[LZMA_FILTERS_MAX + 1];

        /// A copy of filters[] will be put here when attempting to get
        /// a new worker thread. This will be copied to a worker thread
        /// when a thread becomes free and then this cache is marked as
        /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
        /// the filter options from filters[] would get uselessly copied
        /// multiple times (allocated and freed) when waiting for a new free
        /// worker thread.
        ///
        /// This is freed if filters[] is updated via lzma_filters_update().
        lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];

        /// Index to hold sizes of the Blocks
        lzma_index *index;

        /// Index encoder
        lzma_next_coder index_encoder;

        /// Stream Flags for encoding the Stream Header and Stream Footer.
        lzma_stream_flags stream_flags;

        /// Buffer to hold Stream Header and Stream Footer.
        uint8_t header[LZMA_STREAM_HEADER_SIZE];

        /// Read position in header[]
        size_t header_pos;

        /// Output buffer queue for compressed data
        lzma_outq outq;

        /// How much memory to allocate for each lzma_outbuf.buf
        size_t outbuf_alloc_size;

        /// Maximum wait time if we cannot use all the input and cannot
        /// fill the output buffer. This is in milliseconds.
        uint32_t timeout;

        /// Error code from a worker thread
        lzma_ret thread_error;

        /// Array of allocated thread-specific structures
        worker_thread *threads;

        /// Number of structures in "threads" above. This is also the
        /// number of threads that will be created at maximum.
        uint32_t threads_max;

        /// Number of thread structures that have been initialized, and
        /// thus the number of worker threads actually created so far.
        uint32_t threads_initialized;

        /// Stack of free threads. When a thread finishes, it puts itself
        /// back into this stack. This starts as empty because threads
        /// are created only when actually needed.
        worker_thread *threads_free;

        /// The most recent worker thread to which the main thread writes
        /// the new input from the application.
        worker_thread *thr;

        /// Amount of uncompressed data in Blocks that have already
        /// been finished.
        uint64_t progress_in;

        /// Amount of compressed data in Stream Header + Blocks that
        /// have already been finished.
        uint64_t progress_out;

        mythread_mutex mutex;
        mythread_cond cond;
};
/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
        assert(ret != LZMA_OK);
        assert(ret != LZMA_STREAM_END);

        mythread_sync(thr->coder->mutex) {
                if (thr->coder->thread_error == LZMA_OK)
                        thr->coder->thread_error = ret;

                mythread_cond_signal(&thr->coder->cond);
        }
}
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
        assert(thr->progress_in == 0);
        assert(thr->progress_out == 0);

        // Set the Block options.
        thr->block_options = (lzma_block){
                .version = 0,
                .check = thr->coder->stream_flags.check,
                .compressed_size = thr->outbuf->allocated,
                .uncompressed_size = thr->coder->block_size,
                .filters = thr->filters,
        };

        // Calculate maximum size of the Block Header. This amount is
        // reserved in the beginning of the buffer so that Block Header
        // along with Compressed Size and Uncompressed Size can be
        // written there.
        lzma_ret ret = lzma_block_header_size(&thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Initialize the Block encoder.
        ret = lzma_block_encoder_init(&thr->block_encoder,
                        thr->allocator, &thr->block_options);
        if (ret != LZMA_OK) {
                worker_error(thr, ret);
                return THR_STOP;
        }

        size_t in_pos = 0;
        size_t in_size = 0;

        *out_pos = thr->block_options.header_size;
        const size_t out_size = thr->outbuf->allocated;

        do {
                mythread_sync(thr->mutex) {
                        // Store in_pos and *out_pos into *thr so that
                        // an application may read them via
                        // lzma_get_progress() to get progress information.
                        //
                        // NOTE: These aren't updated when the encoding
                        // finishes. Instead, the final values are taken
                        // later from thr->outbuf.
                        thr->progress_in = in_pos;
                        thr->progress_out = *out_pos;

                        while (in_size == thr->in_size
                                        && thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                // Return if we were asked to stop or exit.
                if (state >= THR_STOP)
                        return state;

                lzma_action action = state == THR_FINISH
                                ? LZMA_FINISH : LZMA_RUN;

                // Limit the amount of input given to the Block encoder
                // at once. This way this thread can react fairly quickly
                // if the main thread wants us to stop or exit.
                static const size_t in_chunk_max = 16384;
                size_t in_limit = in_size;
                if (in_size - in_pos > in_chunk_max) {
                        in_limit = in_pos + in_chunk_max;
                        action = LZMA_RUN;
                }

                ret = thr->block_encoder.code(
                                thr->block_encoder.coder, thr->allocator,
                                thr->in, &in_pos, in_limit, thr->outbuf->buf,
                                out_pos, out_size, action);
        } while (ret == LZMA_OK && *out_pos < out_size);

        switch (ret) {
        case LZMA_STREAM_END:
                assert(state == THR_FINISH);

                // Encode the Block Header. By doing it after
                // the compression, we can store the Compressed Size
                // and Uncompressed Size fields.
                ret = lzma_block_header_encode(&thr->block_options,
                                thr->outbuf->buf);
                if (ret != LZMA_OK) {
                        worker_error(thr, ret);
                        return THR_STOP;
                }

                break;

        case LZMA_OK:
                // The data was incompressible. Encode it using uncompressed
                // LZMA2 chunks.
                //
                // First wait until we have gotten all the input.
                mythread_sync(thr->mutex) {
                        while (thr->state == THR_RUN)
                                mythread_cond_wait(&thr->cond, &thr->mutex);

                        state = thr->state;
                        in_size = thr->in_size;
                }

                if (state >= THR_STOP)
                        return state;

                // Do the encoding. This takes care of the Block Header too.
                *out_pos = 0;
                ret = lzma_block_uncomp_encode(&thr->block_options,
                                thr->in, in_size, thr->outbuf->buf,
                                out_pos, thr->outbuf->allocated);

                // It shouldn't fail.
                if (ret != LZMA_OK) {
                        worker_error(thr, LZMA_PROG_ERROR);
                        return THR_STOP;
                }

                break;

        default:
                worker_error(thr, ret);
                return THR_STOP;
        }

        // Set the size information that will be read by the main thread
        // to write the Index field.
        thr->outbuf->unpadded_size
                        = lzma_block_unpadded_size(&thr->block_options);
        assert(thr->outbuf->unpadded_size != 0);
        thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

        return THR_FINISH;
}
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
        worker_thread *thr = thr_ptr;
        worker_state state = THR_IDLE; // Init to silence a warning

        while (true) {
                mythread_sync(thr->mutex) {
                        while (true) {
                                // The thread is already idle so if we are
                                // requested to stop, just set the state.
                                if (thr->state == THR_STOP) {
                                        thr->state = THR_IDLE;
                                        mythread_cond_signal(&thr->cond);
                                }

                                state = thr->state;
                                if (state != THR_IDLE)
                                        break;

                                mythread_cond_wait(&thr->cond, &thr->mutex);
                        }
                }

                size_t out_pos = 0;

                assert(state != THR_IDLE);
                assert(state != THR_STOP);

                if (state <= THR_FINISH)
                        state = worker_encode(thr, &out_pos, state);

                if (state == THR_EXIT)
                        break;

                // Mark the thread as idle unless the main thread has
                // told us to exit. Signal is needed for the case
                // where the main thread is waiting for the threads to stop.
                mythread_sync(thr->mutex) {
                        if (thr->state != THR_EXIT) {
                                thr->state = THR_IDLE;
                                mythread_cond_signal(&thr->cond);
                        }
                }

                mythread_sync(thr->coder->mutex) {
                        // If no errors occurred, make the encoded data
                        // available to be copied out.
                        if (state == THR_FINISH) {
                                thr->outbuf->pos = out_pos;
                                thr->outbuf->finished = true;
                        }

                        // Update the main progress info.
                        thr->coder->progress_in
                                        += thr->outbuf->uncompressed_size;
                        thr->coder->progress_out += out_pos;
                        thr->progress_in = 0;
                        thr->progress_out = 0;

                        // Return this thread to the stack of free threads.
                        thr->next = thr->coder->threads_free;
                        thr->coder->threads_free = thr;

                        mythread_cond_signal(&thr->coder->cond);
                }
        }

        // Exiting, free the resources.
        lzma_filters_free(thr->filters, thr->allocator);

        mythread_mutex_destroy(&thr->mutex);
        mythread_cond_destroy(&thr->cond);

        lzma_next_end(&thr->block_encoder, thr->allocator);
        lzma_free(thr->in, thr->allocator);
        return MYTHREAD_RET_VALUE;
}
/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
        // Tell the threads to stop.
        for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
                mythread_sync(coder->threads[i].mutex) {
                        coder->threads[i].state = THR_STOP;
                        mythread_cond_signal(&coder->threads[i].cond);
                }
        }

        if (!wait_for_threads)
                return;

        // Wait for the threads to settle in the idle state.
        for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
                mythread_sync(coder->threads[i].mutex) {
                        while (coder->threads[i].state != THR_IDLE)
                                mythread_cond_wait(&coder->threads[i].cond,
                                                &coder->threads[i].mutex);
                }
        }
}
/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
        for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
                mythread_sync(coder->threads[i].mutex) {
                        coder->threads[i].state = THR_EXIT;
                        mythread_cond_signal(&coder->threads[i].cond);
                }
        }

        for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
                int ret = mythread_join(coder->threads[i].thread_id);
                assert(ret == 0);
                (void)ret;
        }

        lzma_free(coder->threads, allocator);
}
/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
                const lzma_allocator *allocator)
{
        worker_thread *thr = &coder->threads[coder->threads_initialized];

        thr->in = lzma_alloc(coder->block_size, allocator);
        if (thr->in == NULL)
                return LZMA_MEM_ERROR;

        if (mythread_mutex_init(&thr->mutex))
                goto error_mutex;

        if (mythread_cond_init(&thr->cond))
                goto error_cond;

        thr->state = THR_IDLE;
        thr->allocator = allocator;
        thr->coder = coder;
        thr->progress_in = 0;
        thr->progress_out = 0;
        thr->block_encoder = LZMA_NEXT_CODER_INIT;
        thr->filters[0].id = LZMA_VLI_UNKNOWN;

        if (mythread_create(&thr->thread_id, &worker_start, thr))
                goto error_thread;

        ++coder->threads_initialized;
        coder->thr = thr;

        return LZMA_OK;

error_thread:
        mythread_cond_destroy(&thr->cond);

error_cond:
        mythread_mutex_destroy(&thr->mutex);

error_mutex:
        lzma_free(thr->in, allocator);
        return LZMA_MEM_ERROR;
}
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
        // If there are no free output subqueues, there is no
        // point in trying to get a thread.
        if (!lzma_outq_has_buf(&coder->outq))
                return LZMA_OK;

        // That's also true if we cannot allocate memory for the output
        // buffer in the output queue.
        return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
                        coder->outbuf_alloc_size));

        // Make a thread-specific copy of the filter chain. Put it in
        // the cache array first so that if we cannot get a new thread yet,
        // the allocation is ready when we try again.
        if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
                return_if_error(lzma_filters_copy(
                        coder->filters, coder->filters_cache, allocator));

        // If there is a free structure on the stack, use it.
        mythread_sync(coder->mutex) {
                if (coder->threads_free != NULL) {
                        coder->thr = coder->threads_free;
                        coder->threads_free = coder->threads_free->next;
                }
        }

        if (coder->thr == NULL) {
                // If there are no uninitialized structures left, return.
                if (coder->threads_initialized == coder->threads_max)
                        return LZMA_OK;

                // Initialize a new thread.
                return_if_error(initialize_new_thread(coder, allocator));
        }

        // Reset the parts of the thread state that have to be done
        // in the main thread.
        mythread_sync(coder->thr->mutex) {
                coder->thr->state = THR_RUN;
                coder->thr->in_size = 0;
                coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

                // Free the old thread-specific filter options and replace
                // them with the already-allocated new options from
                // coder->filters_cache[]. Then mark the cache as empty.
                lzma_filters_free(coder->thr->filters, allocator);
                memcpy(coder->thr->filters, coder->filters_cache,
                                sizeof(coder->filters_cache));
                coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

                mythread_cond_signal(&coder->thr->cond);
        }

        return LZMA_OK;
}
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
                const uint8_t *restrict in, size_t *restrict in_pos,
                size_t in_size, lzma_action action)
{
        while (*in_pos < in_size
                        || (coder->thr != NULL && action != LZMA_RUN)) {
                if (coder->thr == NULL) {
                        const lzma_ret ret = get_thread(coder, allocator);
                        if (coder->thr == NULL)
                                return ret;
                }

                // Copy the input data to thread's buffer.
                size_t thr_in_size = coder->thr->in_size;
                lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
                                &thr_in_size, coder->block_size);

                // Tell the Block encoder to finish if
                //  - it has got block_size bytes of input; or
                //  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
                //    or LZMA_FULL_BARRIER was used.
                //
                // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
                const bool finish = thr_in_size == coder->block_size
                                || (*in_pos == in_size && action != LZMA_RUN);

                bool block_error = false;

                mythread_sync(coder->thr->mutex) {
                        if (coder->thr->state == THR_IDLE) {
                                // Something has gone wrong with the Block
                                // encoder. It has set coder->thread_error
                                // which we will read a few lines later.
                                block_error = true;
                        } else {
                                // Tell the Block encoder its new amount
                                // of input and update the state if needed.
                                coder->thr->in_size = thr_in_size;

                                if (finish)
                                        coder->thr->state = THR_FINISH;

                                mythread_cond_signal(&coder->thr->cond);
                        }
                }

                if (block_error) {
                        lzma_ret ret = LZMA_OK; // Init to silence a warning.

                        mythread_sync(coder->mutex) {
                                ret = coder->thread_error;
                        }

                        return ret;
                }

                if (finish)
                        coder->thr = NULL;
        }

        return LZMA_OK;
}
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
                bool *has_blocked, bool has_input)
{
        if (coder->timeout != 0 && !*has_blocked) {
                // Every time when stream_encode_mt() is called via
                // lzma_code(), *has_blocked starts as false. We set it
                // to true here and calculate the absolute time when
                // we must return if there's nothing to do.
                //
                // This way if we block multiple times for short moments
                // less than "timeout" milliseconds, we will return once
                // "timeout" amount of time has passed since the *first*
                // blocking occurred. If the absolute time was calculated
                // again every time we block, "timeout" would effectively
                // be meaningless if we never consecutively block longer
                // than "timeout" ms.
                *has_blocked = true;
                mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
        }

        bool timed_out = false;

        mythread_sync(coder->mutex) {
                // There are four things that we wait for. If one of them
                // becomes possible, we return.
                //  - If there is input left, we need to get a free
                //    worker thread and an output buffer for it.
                //  - Data ready to be read from the output queue.
                //  - A worker thread indicates an error.
                //  - Time out occurs.
                while ((!has_input || coder->threads_free == NULL
                                        || !lzma_outq_has_buf(&coder->outq))
                                && !lzma_outq_is_readable(&coder->outq)
                                && coder->thread_error == LZMA_OK
                                && !timed_out) {
                        if (coder->timeout != 0)
                                timed_out = mythread_cond_timedwait(
                                                &coder->cond, &coder->mutex,
                                                wait_abs) != 0;
                        else
                                mythread_cond_wait(&coder->cond,
                                                &coder->mutex);
                }
        }

        return timed_out;
}
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
                const uint8_t *restrict in, size_t *restrict in_pos,
                size_t in_size, uint8_t *restrict out,
                size_t *restrict out_pos, size_t out_size, lzma_action action)
{
        lzma_stream_coder *coder = coder_ptr;

        switch (coder->sequence) {
        case SEQ_STREAM_HEADER:
                lzma_bufcpy(coder->header, &coder->header_pos,
                                sizeof(coder->header),
                                out, out_pos, out_size);
                if (coder->header_pos < sizeof(coder->header))
                        return LZMA_OK;

                coder->header_pos = 0;
                coder->sequence = SEQ_BLOCK;

        // Fall through

        case SEQ_BLOCK: {
                // Initialized to silence warnings.
                lzma_vli unpadded_size = 0;
                lzma_vli uncompressed_size = 0;
                lzma_ret ret = LZMA_OK;

                // These are for wait_for_work().
                bool has_blocked = false;
                mythread_condtime wait_abs = { 0 };

                while (true) {
                        mythread_sync(coder->mutex) {
                                // Check for Block encoder errors.
                                ret = coder->thread_error;
                                if (ret != LZMA_OK) {
                                        assert(ret != LZMA_STREAM_END);
                                        break; // Break out of mythread_sync.
                                }

                                // Try to read compressed data to out[].
                                ret = lzma_outq_read(&coder->outq, allocator,
                                                out, out_pos, out_size,
                                                &unpadded_size,
                                                &uncompressed_size);
                        }

                        if (ret == LZMA_STREAM_END) {
                                // End of Block. Add it to the Index.
                                ret = lzma_index_append(coder->index,
                                                allocator, unpadded_size,
                                                uncompressed_size);
                                if (ret != LZMA_OK) {
                                        threads_stop(coder, false);
                                        return ret;
                                }

                                // If we didn't fill the output buffer yet,
                                // try to read more data. Maybe the next
                                // outbuf has been finished already too.
                                if (*out_pos < out_size)
                                        continue;
                        }

                        if (ret != LZMA_OK) {
                                // coder->thread_error was set.
                                threads_stop(coder, false);
                                return ret;
                        }

                        // Try to give uncompressed data to a worker thread.
                        ret = stream_encode_in(coder, allocator,
                                        in, in_pos, in_size, action);
                        if (ret != LZMA_OK) {
                                threads_stop(coder, false);
                                return ret;
                        }

                        // See if we should wait or return.
                        //
                        // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
                        if (*in_pos == in_size) {
                                // LZMA_RUN: More data is probably coming
                                // so return to let the caller fill the
                                // input buffer.
                                if (action == LZMA_RUN)
                                        return LZMA_OK;

                                // LZMA_FULL_BARRIER: The same as with
                                // LZMA_RUN but tell the caller that the
                                // barrier was completed.
                                if (action == LZMA_FULL_BARRIER)
                                        return LZMA_STREAM_END;

                                // Finishing or flushing isn't completed until
                                // all input data has been encoded and copied
                                // to the output buffer.
                                if (lzma_outq_is_empty(&coder->outq)) {
                                        // LZMA_FINISH: Continue to encode
                                        // the Index field.
                                        if (action == LZMA_FINISH)
                                                break;

                                        // LZMA_FULL_FLUSH: Return to tell
                                        // the caller that flushing was
                                        // completed.
                                        if (action == LZMA_FULL_FLUSH)
                                                return LZMA_STREAM_END;
                                }
                        }

                        // Return if there is no output space left.
                        // This check must be done after testing the input
                        // buffer, because we might want to use a different
                        // return code.
                        if (*out_pos == out_size)
                                return LZMA_OK;

                        // Neither in nor out has been used completely.
                        // Wait until there's something we can do.
                        if (wait_for_work(coder, &wait_abs, &has_blocked,
                                        *in_pos < in_size))
                                return LZMA_TIMED_OUT;
                }

                // All Blocks have been encoded and the threads have stopped.
                // Prepare to encode the Index field.
                return_if_error(lzma_index_encoder_init(
                                &coder->index_encoder, allocator,
                                coder->index));
                coder->sequence = SEQ_INDEX;

                // Update the progress info to take the Index and
                // Stream Footer into account. Those are very fast to encode
                // so in terms of progress information they can be thought
                // to be ready to be copied out.
                coder->progress_out += lzma_index_size(coder->index)
                                + LZMA_STREAM_HEADER_SIZE;
        }

        // Fall through

        case SEQ_INDEX: {
                // Call the Index encoder. It doesn't take any input, so
                // those pointers can be NULL.
                const lzma_ret ret = coder->index_encoder.code(
                                coder->index_encoder.coder, allocator,
                                NULL, NULL, 0,
                                out, out_pos, out_size, LZMA_RUN);
                if (ret != LZMA_STREAM_END)
                        return ret;

                // Encode the Stream Footer into coder->buffer.
                coder->stream_flags.backward_size
                                = lzma_index_size(coder->index);
                if (lzma_stream_footer_encode(&coder->stream_flags,
                                coder->header) != LZMA_OK)
                        return LZMA_PROG_ERROR;

                coder->sequence = SEQ_STREAM_FOOTER;
        }

        // Fall through

        case SEQ_STREAM_FOOTER:
                lzma_bufcpy(coder->header, &coder->header_pos,
                                sizeof(coder->header),
                                out, out_pos, out_size);
                return coder->header_pos < sizeof(coder->header)
                                ? LZMA_OK : LZMA_STREAM_END;
        }

        return LZMA_PROG_ERROR;
}
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
        lzma_stream_coder *coder = coder_ptr;

        // Threads must be killed before the output queue can be freed.
        threads_end(coder, allocator);
        lzma_outq_end(&coder->outq, allocator);

        lzma_filters_free(coder->filters, allocator);
        lzma_filters_free(coder->filters_cache, allocator);

        lzma_next_end(&coder->index_encoder, allocator);
        lzma_index_end(coder->index, allocator);

        mythread_cond_destroy(&coder->cond);
        mythread_mutex_destroy(&coder->mutex);

        lzma_free(coder, allocator);
}
static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
                const lzma_filter *filters,
                const lzma_filter *reversed_filters
                        lzma_attribute((__unused__)))
{
        lzma_stream_coder *coder = coder_ptr;

        // Applications shouldn't attempt to change the options when
        // we are already encoding the Index or Stream Footer.
        if (coder->sequence > SEQ_BLOCK)
                return LZMA_PROG_ERROR;

        // For now the threaded encoder doesn't support changing
        // the options in the middle of a Block.
        if (coder->thr != NULL)
                return LZMA_PROG_ERROR;

        // Check if the filter chain seems mostly valid. See the comment
        // in stream_encoder_mt_init().
        if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
                return LZMA_OPTIONS_ERROR;

        // Make a copy to a temporary buffer first. This way the encoder
        // state stays unchanged if an error occurs in lzma_filters_copy().
        lzma_filter temp[LZMA_FILTERS_MAX + 1];
        return_if_error(lzma_filters_copy(filters, temp, allocator));

        // Free the options of the old chain as well as the cache.
        lzma_filters_free(coder->filters, allocator);
        lzma_filters_free(coder->filters_cache, allocator);

        // Copy the new filter chain in place.
        memcpy(coder->filters, temp, sizeof(temp));

        return LZMA_OK;
}
/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
                const lzma_filter **filters, uint64_t *block_size,
                uint64_t *outbuf_size_max)
{
        // Validate some of the options.
        if (options == NULL)
                return LZMA_PROG_ERROR;

        if (options->flags != 0 || options->threads == 0
                        || options->threads > LZMA_THREADS_MAX)
                return LZMA_OPTIONS_ERROR;

        if (options->filters != NULL) {
                // Filter chain was given, use it as is.
                *filters = options->filters;
        } else {
                // Use a preset.
                if (lzma_easy_preset(opt_easy, options->preset))
                        return LZMA_OPTIONS_ERROR;

                *filters = opt_easy->filters;
        }

        // If the Block size is not set, determine it from the filter chain.
        if (options->block_size > 0)
                *block_size = options->block_size;
        else
                *block_size = lzma_mt_block_size(*filters);

        // UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
        // should be optimized out by any reasonable compiler.
        // The second condition should be there in the unlikely event that
        // the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
        if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
                return LZMA_OPTIONS_ERROR;

        // Calculate the maximum amount of output that a single output buffer
        // may need to hold. This is the same as the maximum total size of
        // a Block.
        *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
        if (*outbuf_size_max == 0)
                return LZMA_MEM_ERROR;

        return LZMA_OK;
}
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
        lzma_stream_coder *coder = coder_ptr;

        // Lock coder->mutex to prevent finishing threads from moving their
        // progress info from the worker_thread structure to lzma_stream_coder.
        mythread_sync(coder->mutex) {
                *progress_in = coder->progress_in;
                *progress_out = coder->progress_out;

                for (size_t i = 0; i < coder->threads_initialized; ++i) {
                        mythread_sync(coder->threads[i].mutex) {
                                *progress_in += coder->threads[i].progress_in;
                                *progress_out += coder->threads[i]
                                                .progress_out;
                        }
                }
        }
}
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
                const lzma_mt *options)
{
        lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

        // Get the filter chain.
        lzma_options_easy easy;
        const lzma_filter *filters;
        uint64_t block_size;
        uint64_t outbuf_size_max;
        return_if_error(get_options(options, &easy, &filters,
                        &block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
        if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
                return LZMA_MEM_ERROR;
#endif

        // Validate the filter chain so that we can give an error in this
        // function instead of delaying it to the first call to lzma_code().
        // The memory usage calculation verifies the filter chain as
        // a side effect so we take advantage of that. It's not a perfect
        // check though as raw encoder allows LZMA1 too but such problems
        // will be caught eventually with Block Header encoder.
        if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
                return LZMA_OPTIONS_ERROR;

        // Validate the Check ID.
        if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
                return LZMA_PROG_ERROR;

        if (!lzma_check_is_supported(options->check))
                return LZMA_UNSUPPORTED_CHECK;

        // Allocate and initialize the base structure if needed.
        lzma_stream_coder *coder = next->coder;
        if (coder == NULL) {
                coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
                if (coder == NULL)
                        return LZMA_MEM_ERROR;

                next->coder = coder;

                // For the mutex and condition variable initializations
                // the error handling has to be done here because
                // stream_encoder_mt_end() doesn't know if they have
                // already been initialized or not.
                if (mythread_mutex_init(&coder->mutex)) {
                        lzma_free(coder, allocator);
                        next->coder = NULL;
                        return LZMA_MEM_ERROR;
                }

                if (mythread_cond_init(&coder->cond)) {
                        mythread_mutex_destroy(&coder->mutex);
                        lzma_free(coder, allocator);
                        next->coder = NULL;
                        return LZMA_MEM_ERROR;
                }

                next->code = &stream_encode_mt;
                next->end = &stream_encoder_mt_end;
                next->get_progress = &get_progress;
                next->update = &stream_encoder_mt_update;

                coder->filters[0].id = LZMA_VLI_UNKNOWN;
                coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
                coder->index_encoder = LZMA_NEXT_CODER_INIT;
                coder->index = NULL;
                memzero(&coder->outq, sizeof(coder->outq));
                coder->threads = NULL;
                coder->threads_max = 0;
                coder->threads_initialized = 0;
        }

        // Basic initializations
        coder->sequence = SEQ_STREAM_HEADER;
        coder->block_size = (size_t)(block_size);
        coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
        coder->thread_error = LZMA_OK;
        coder->thr = NULL;

        // Allocate the thread-specific base structures.
        assert(options->threads > 0);
        if (coder->threads_max != options->threads) {
                threads_end(coder, allocator);

                coder->threads = NULL;
                coder->threads_max = 0;

                coder->threads_initialized = 0;
                coder->threads_free = NULL;

                coder->threads = lzma_alloc(
                                options->threads * sizeof(worker_thread),
                                allocator);
                if (coder->threads == NULL)
                        return LZMA_MEM_ERROR;

                coder->threads_max = options->threads;
        } else {
                // Reuse the old structures and threads. Tell the running
                // threads to stop and wait until they have stopped.
                threads_stop(coder, true);
        }

        // Output queue
        return_if_error(lzma_outq_init(&coder->outq, allocator,
                        options->threads));

        // Timeout
        coder->timeout = options->timeout;

        // Free the old filter chain and the cache.
        lzma_filters_free(coder->filters, allocator);
        lzma_filters_free(coder->filters_cache, allocator);

        // Copy the new filter chain.
        return_if_error(lzma_filters_copy(
                        filters, coder->filters, allocator));

        // Index
        lzma_index_end(coder->index, allocator);
        coder->index = lzma_index_init(allocator);
        if (coder->index == NULL)
                return LZMA_MEM_ERROR;

        // Stream Header
        coder->stream_flags.version = 0;
        coder->stream_flags.check = options->check;
        return_if_error(lzma_stream_header_encode(
                        &coder->stream_flags, coder->header));

        coder->header_pos = 0;

        // Progress info
        coder->progress_in = 0;
        coder->progress_out = LZMA_STREAM_HEADER_SIZE;

        return LZMA_OK;
}
#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
        lzma_ret, lzma_stream_encoder_mt_512a)(
                lzma_stream *strm, const lzma_mt *options)
                lzma_nothrow lzma_attr_warn_unused_result
                __attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
        lzma_ret, lzma_stream_encoder_mt_522)(
                lzma_stream *strm, const lzma_mt *options)
                lzma_nothrow lzma_attr_warn_unused_result
                __attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
        lzma_ret, lzma_stream_encoder_mt_52)(
                lzma_stream *strm, const lzma_mt *options)
                lzma_nothrow lzma_attr_warn_unused_result;

#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
        lzma_next_strm_init(stream_encoder_mt_init, strm, options);

        strm->internal->supported_actions[LZMA_RUN] = true;
//      strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
        strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
        strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
        strm->internal->supported_actions[LZMA_FINISH] = true;

        return LZMA_OK;
}
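/*
 * Illustrative usage sketch (not part of liblzma itself): this is roughly
 * how an application would drive the threaded encoder defined above, in the
 * style of the examples shipped in doc/examples/. It assumes <lzma.h> and
 * <stdio.h> are included; the thread count, timeout, and buffer sizes are
 * arbitrary example values.
 *
 *      lzma_stream strm = LZMA_STREAM_INIT;
 *      lzma_mt mt = {
 *              .flags = 0,
 *              .threads = 4,
 *              .block_size = 0,        // 0 = derive from the filter chain
 *              .timeout = 0,           // 0 = never return LZMA_TIMED_OUT
 *              .preset = LZMA_PRESET_DEFAULT,
 *              .filters = NULL,        // NULL = use .preset
 *              .check = LZMA_CHECK_CRC64,
 *      };
 *      if (lzma_stream_encoder_mt(&strm, &mt) != LZMA_OK)
 *              return 1;
 *
 *      uint8_t inbuf[BUFSIZ], outbuf[BUFSIZ];
 *      lzma_action action = LZMA_RUN;
 *      strm.next_out = outbuf;
 *      strm.avail_out = sizeof(outbuf);
 *
 *      while (1) {
 *              if (strm.avail_in == 0 && !feof(stdin)) {
 *                      strm.next_in = inbuf;
 *                      strm.avail_in = fread(inbuf, 1, sizeof(inbuf), stdin);
 *                      if (feof(stdin))
 *                              action = LZMA_FINISH;
 *              }
 *
 *              const lzma_ret ret = lzma_code(&strm, action);
 *
 *              if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
 *                      fwrite(outbuf, 1, sizeof(outbuf) - strm.avail_out,
 *                                      stdout);
 *                      strm.next_out = outbuf;
 *                      strm.avail_out = sizeof(outbuf);
 *              }
 *
 *              if (ret == LZMA_STREAM_END)
 *                      break;
 *              if (ret != LZMA_OK)
 *                      return 1;
 *      }
 *
 *      lzma_end(&strm);
 */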
#ifdef HAVE_SYMBOL_VERSIONS_LINUX
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
        uint64_t, lzma_stream_encoder_mt_memusage_512a)(
                const lzma_mt *options) lzma_nothrow lzma_attr_pure
                __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
        uint64_t, lzma_stream_encoder_mt_memusage_522)(
                const lzma_mt *options) lzma_nothrow lzma_attr_pure
                __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
        uint64_t, lzma_stream_encoder_mt_memusage_52)(
                const lzma_mt *options) lzma_nothrow lzma_attr_pure;

#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
#endif

// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
        lzma_options_easy easy;
        const lzma_filter *filters;
        uint64_t block_size;
        uint64_t outbuf_size_max;

        if (get_options(options, &easy, &filters, &block_size,
                        &outbuf_size_max) != LZMA_OK)
                return UINT64_MAX;

        // Memory usage of the input buffers
        const uint64_t inbuf_memusage = options->threads * block_size;

        // Memory usage of the filter encoders
        uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
        if (filters_memusage == UINT64_MAX)
                return UINT64_MAX;

        filters_memusage *= options->threads;

        // Memory usage of the output queue
        const uint64_t outq_memusage = lzma_outq_memusage(
                        outbuf_size_max, options->threads);
        if (outq_memusage == UINT64_MAX)
                return UINT64_MAX;

        // Sum them with overflow checking.
        uint64_t total_memusage = LZMA_MEMUSAGE_BASE
                        + sizeof(lzma_stream_coder)
                        + options->threads * sizeof(worker_thread);

        if (UINT64_MAX - total_memusage < inbuf_memusage)
                return UINT64_MAX;

        total_memusage += inbuf_memusage;

        if (UINT64_MAX - total_memusage < filters_memusage)
                return UINT64_MAX;

        total_memusage += filters_memusage;

        if (UINT64_MAX - total_memusage < outq_memusage)
                return UINT64_MAX;

        return total_memusage + outq_memusage;
}