1 ///////////////////////////////////////////////////////////////////////////////
3 /// \file stream_encoder_mt.c
4 /// \brief Multithreaded .xz Stream encoder
6 // Author: Lasse Collin
8 // This file has been put into the public domain.
9 // You can do whatever you want with this file.
11 ///////////////////////////////////////////////////////////////////////////////
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
21 /// Maximum supported block size. This makes it simpler to prevent integer
22 /// overflows if we are given unusually large block size.
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
30 /// Encoding is in progress.
33 /// Encoding is in progress but no more input data will be read.
37 /// The main thread wants the thread to stop whatever it was doing but not exit.
41 /// The main thread wants the thread to exit. We could use
42 /// cancellation but since a stop request (THR_STOP) exists anyway, this is lazier.
47 typedef struct lzma_stream_coder_s lzma_stream_coder
;
49 typedef struct worker_thread_s worker_thread
;
/// Per-worker-thread state. One structure exists for each worker thread;
/// the structures live in the lzma_stream_coder's "threads" array and are
/// linked into its "threads_free" stack while idle.
50 struct worker_thread_s
{
53 /// Input buffer of coder->block_size bytes. The main thread will
54 /// put new input into this and update in_size accordingly. Once
55 /// no more input is coming, state will be set to THR_FINISH.
58 /// Amount of data available in the input buffer. This is modified
59 /// only by the main thread.
62 /// Output buffer for this thread. This is set by the main
63 /// thread every time a new Block is started with this thread
/// structure (see get_thread()).
67 /// Pointer to the main structure is needed when putting this
68 /// thread back to the stack of free threads.
69 lzma_stream_coder
*coder
;
71 /// The allocator is set by the main thread. Since a copy of the
72 /// pointer is kept here, the application must not change the
73 /// allocator before calling lzma_end().
74 const lzma_allocator
*allocator
;
76 /// Amount of uncompressed data that has already been compressed.
79 /// Amount of compressed data that is ready.
80 uint64_t progress_out
;
/// Block encoder used by this thread. It is (re)initialized with
/// lzma_block_encoder_init() in worker_encode().
83 lzma_next_coder block_encoder
;
85 /// Compression options for this Block
86 lzma_block block_options
;
88 /// Filter chain for this thread. By copying the filters array
89 /// to each thread it is possible to change the filter chain
90 /// between Blocks using lzma_filters_update().
91 lzma_filter filters
[LZMA_FILTERS_MAX
+ 1];
93 /// Next structure in the stack of free worker threads.
99 /// The ID of this thread is used to join the thread
100 /// when it's not needed anymore.
/// State of the multithreaded Stream encoder that is owned by the main
/// thread. Worker threads reach it through worker_thread.coder.
105 struct lzma_stream_coder_s
{
113 /// Start a new Block every block_size bytes of input unless
114 /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
117 /// The filter chain to use for the next Block.
118 /// This can be updated using lzma_filters_update()
119 /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
120 lzma_filter filters
[LZMA_FILTERS_MAX
+ 1];
122 /// A copy of filters[] will be put here when attempting to get
123 /// a new worker thread. This will be copied to a worker thread
124 /// when a thread becomes free and then this cache is marked as
125 /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
126 /// the filter options from filters[] would get uselessly copied
127 /// multiple times (allocated and freed) when waiting for a new free
/// worker thread.
///
130 /// This is freed if filters[] is updated via lzma_filters_update().
131 lzma_filter filters_cache
[LZMA_FILTERS_MAX
+ 1];
134 /// Index to hold sizes of the Blocks
/// Encoder for the Index field; initialized in stream_encode_mt() once
/// all Blocks have been encoded.
138 lzma_next_coder index_encoder
;
141 /// Stream Flags for encoding the Stream Header and Stream Footer.
142 lzma_stream_flags stream_flags
;
144 /// Buffer to hold Stream Header and Stream Footer.
145 uint8_t header
[LZMA_STREAM_HEADER_SIZE
];
147 /// Read position in header[]
151 /// Output buffer queue for compressed data
154 /// How much memory to allocate for each lzma_outbuf.buf
155 size_t outbuf_alloc_size
;
158 /// Maximum wait time if cannot use all the input and cannot
159 /// fill the output buffer. This is in milliseconds.
163 /// Error code from a worker thread
164 lzma_ret thread_error
;
166 /// Array of allocated thread-specific structures
167 worker_thread
*threads
;
169 /// Number of structures in "threads" above. This is also the
170 /// number of threads that will be created at maximum.
171 uint32_t threads_max
;
173 /// Number of thread structures that have been initialized, and
174 /// thus the number of worker threads actually created so far.
175 uint32_t threads_initialized
;
177 /// Stack of free threads. When a thread finishes, it puts itself
178 /// back into this stack. This starts as empty because threads
179 /// are created only when actually needed.
180 worker_thread
*threads_free
;
182 /// The most recent worker thread to which the main thread writes
183 /// the new input from the application.
187 /// Amount of uncompressed data in Blocks that have already
/// been finished.
189 uint64_t progress_in
;
191 /// Amount of compressed data in Stream Header + Blocks that
192 /// have already been finished.
193 uint64_t progress_out
;
/// Locked by both the main thread and the worker threads around accesses
/// to thread_error, threads_free, progress_in and progress_out.
196 mythread_mutex mutex
;
201 /// Tell the main thread that something has gone wrong.
///
/// While holding coder->mutex, stores ret into coder->thread_error unless
/// an earlier error is already recorded there (only the first error is
/// kept), and signals coder->cond.
///
/// \param      thr     Worker thread that hit the error
/// \param      ret     Error code; must not be LZMA_OK or LZMA_STREAM_END
203 worker_error(worker_thread
*thr
, lzma_ret ret
)
205 assert(ret
!= LZMA_OK
);
206 assert(ret
!= LZMA_STREAM_END
);
208 mythread_sync(thr
->coder
->mutex
) {
// Keep the first error; later errors from other threads are dropped.
209 if (thr
->coder
->thread_error
== LZMA_OK
)
210 thr
->coder
->thread_error
= ret
;
212 mythread_cond_signal(&thr
->coder
->cond
);
/// Encode one Block with this worker's Block encoder.
///
/// Input is read from thr->in as the main thread makes it available
/// (thr->in_size) and output is written to thr->outbuf->buf. Space for
/// the Block Header is reserved at the start of the output buffer and
/// the header is encoded after the data. If the Block encoder does not
/// reach LZMA_STREAM_END, the Block is encoded with
/// lzma_block_uncomp_encode() instead. Failures are reported with
/// worker_error().
///
/// \param      thr     Worker thread structure
/// \param      out_pos Write position in thr->outbuf->buf; set here to the
///                     reserved Block Header size before encoding starts
/// \param      state   Thread state as last seen by the caller
///
/// The result is assigned to the thread state variable in worker_start().
220 worker_encode(worker_thread
*thr
, size_t *out_pos
, worker_state state
)
222 assert(thr
->progress_in
== 0);
223 assert(thr
->progress_out
== 0);
225 // Set the Block options.
226 thr
->block_options
= (lzma_block
){
228 .check
= thr
->coder
->stream_flags
.check
,
229 .compressed_size
= thr
->outbuf
->allocated
,
230 .uncompressed_size
= thr
->coder
->block_size
,
231 .filters
= thr
->filters
,
234 // Calculate maximum size of the Block Header. This amount is
235 // reserved in the beginning of the buffer so that Block Header
236 // along with Compressed Size and Uncompressed Size can be
// written there.
238 lzma_ret ret
= lzma_block_header_size(&thr
->block_options
);
239 if (ret
!= LZMA_OK
) {
240 worker_error(thr
, ret
);
244 // Initialize the Block encoder.
245 ret
= lzma_block_encoder_init(&thr
->block_encoder
,
246 thr
->allocator
, &thr
->block_options
);
247 if (ret
!= LZMA_OK
) {
248 worker_error(thr
, ret
);
// Compressed data starts right after the space reserved for the
// Block Header.
255 *out_pos
= thr
->block_options
.header_size
;
256 const size_t out_size
= thr
->outbuf
->allocated
;
259 mythread_sync(thr
->mutex
) {
260 // Store in_pos and *out_pos into *thr so that
261 // an application may read them via
262 // lzma_get_progress() to get progress information.
264 // NOTE: These aren't updated when the encoding
265 // finishes. Instead, the final values are taken
266 // later from thr->outbuf.
267 thr
->progress_in
= in_pos
;
268 thr
->progress_out
= *out_pos
;
// Sleep until the main thread provides more input or changes
// the thread state.
270 while (in_size
== thr
->in_size
271 && thr
->state
== THR_RUN
)
272 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
275 in_size
= thr
->in_size
;
278 // Return if we were asked to stop or exit.
279 if (state
>= THR_STOP
)
282 lzma_action action
= state
== THR_FINISH
283 ? LZMA_FINISH
: LZMA_RUN
;
285 // Limit the amount of input given to the Block encoder
286 // at once. This way this thread can react fairly quickly
287 // if the main thread wants us to stop or exit.
288 static const size_t in_chunk_max
= 16384;
289 size_t in_limit
= in_size
;
290 if (in_size
- in_pos
> in_chunk_max
) {
291 in_limit
= in_pos
+ in_chunk_max
;
295 ret
= thr
->block_encoder
.code(
296 thr
->block_encoder
.coder
, thr
->allocator
,
297 thr
->in
, &in_pos
, in_limit
, thr
->outbuf
->buf
,
298 out_pos
, out_size
, action
);
299 } while (ret
== LZMA_OK
&& *out_pos
< out_size
);
302 case LZMA_STREAM_END
:
303 assert(state
== THR_FINISH
);
305 // Encode the Block Header. By doing it after
306 // the compression, we can store the Compressed Size
307 // and Uncompressed Size fields.
308 ret
= lzma_block_header_encode(&thr
->block_options
,
310 if (ret
!= LZMA_OK
) {
311 worker_error(thr
, ret
);
318 // The data was incompressible. Encode it using the uncompressed
// Block encoder lzma_block_uncomp_encode() instead.
321 // First wait that we have gotten all the input.
322 mythread_sync(thr
->mutex
) {
323 while (thr
->state
== THR_RUN
)
324 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
327 in_size
= thr
->in_size
;
330 if (state
>= THR_STOP
)
333 // Do the encoding. This takes care of the Block Header too.
335 ret
= lzma_block_uncomp_encode(&thr
->block_options
,
336 thr
->in
, in_size
, thr
->outbuf
->buf
,
339 // It shouldn't fail.
340 if (ret
!= LZMA_OK
) {
341 worker_error(thr
, LZMA_PROG_ERROR
);
348 worker_error(thr
, ret
);
352 // Set the size information that will be read by the main thread
353 // to write the Index field.
354 thr
->outbuf
->unpadded_size
355 = lzma_block_unpadded_size(&thr
->block_options
);
356 assert(thr
->outbuf
->unpadded_size
!= 0);
357 thr
->outbuf
->uncompressed_size
= thr
->block_options
.uncompressed_size
;
/// Entry point of a worker thread; passed to mythread_create() in
/// initialize_new_thread().
///
/// The thread waits on thr->cond until the main thread gives it work,
/// encodes a Block with worker_encode(), publishes the result to the
/// output buffer and the main progress counters under coder->mutex, and
/// returns itself to coder->threads_free. When told to exit it frees the
/// thread-specific resources.
///
/// \param      thr_ptr Pointer to this thread's worker_thread structure
363 static MYTHREAD_RET_TYPE
364 worker_start(void *thr_ptr
)
366 worker_thread
*thr
= thr_ptr
;
367 worker_state state
= THR_IDLE
; // Init to silence a warning
371 mythread_sync(thr
->mutex
) {
373 // The thread is already idle so if we are
374 // requested to stop, just set the state.
375 if (thr
->state
== THR_STOP
) {
376 thr
->state
= THR_IDLE
;
377 mythread_cond_signal(&thr
->cond
);
381 if (state
!= THR_IDLE
)
384 mythread_cond_wait(&thr
->cond
, &thr
->mutex
);
390 assert(state
!= THR_IDLE
);
391 assert(state
!= THR_STOP
);
// THR_RUN and THR_FINISH both mean there is a Block to encode.
393 if (state
<= THR_FINISH
)
394 state
= worker_encode(thr
, &out_pos
, state
);
396 if (state
== THR_EXIT
)
399 // Mark the thread as idle unless the main thread has
400 // told us to exit. Signal is needed for the case
401 // where the main thread is waiting for the threads to stop.
402 mythread_sync(thr
->mutex
) {
403 if (thr
->state
!= THR_EXIT
) {
404 thr
->state
= THR_IDLE
;
405 mythread_cond_signal(&thr
->cond
);
409 mythread_sync(thr
->coder
->mutex
) {
410 // If no errors occurred, make the encoded data
411 // available to be copied out.
412 if (state
== THR_FINISH
) {
413 thr
->outbuf
->pos
= out_pos
;
414 thr
->outbuf
->finished
= true;
417 // Update the main progress info.
418 thr
->coder
->progress_in
419 += thr
->outbuf
->uncompressed_size
;
420 thr
->coder
->progress_out
+= out_pos
;
// The per-thread counters were folded into the main counters above,
// so reset them.
421 thr
->progress_in
= 0;
422 thr
->progress_out
= 0;
424 // Return this thread to the stack of free threads.
425 thr
->next
= thr
->coder
->threads_free
;
426 thr
->coder
->threads_free
= thr
;
428 mythread_cond_signal(&thr
->coder
->cond
);
432 // Exiting, free the resources.
433 lzma_filters_free(thr
->filters
, thr
->allocator
);
435 mythread_mutex_destroy(&thr
->mutex
);
436 mythread_cond_destroy(&thr
->cond
);
438 lzma_next_end(&thr
->block_encoder
, thr
->allocator
);
439 lzma_free(thr
->in
, thr
->allocator
);
440 return MYTHREAD_RET_VALUE
;
444 /// Make the threads stop but not exit. Optionally wait for them to stop.
///
/// Sets the state of every initialized worker thread to THR_STOP and
/// signals its condition variable.
///
/// \param      coder             Stream coder owning the threads
/// \param      wait_for_threads  If true, additionally wait on each
///                               thread's condition variable until its
///                               state has become THR_IDLE
446 threads_stop(lzma_stream_coder
*coder
, bool wait_for_threads
)
448 // Tell the threads to stop.
449 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
450 mythread_sync(coder
->threads
[i
].mutex
) {
451 coder
->threads
[i
].state
= THR_STOP
;
452 mythread_cond_signal(&coder
->threads
[i
].cond
);
456 if (!wait_for_threads
)
459 // Wait for the threads to settle in the idle state.
460 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
461 mythread_sync(coder
->threads
[i
].mutex
) {
462 while (coder
->threads
[i
].state
!= THR_IDLE
)
463 mythread_cond_wait(&coder
->threads
[i
].cond
,
464 &coder
->threads
[i
].mutex
);
472 /// Stop the threads and free the resources associated with them.
473 /// Wait until the threads have exited.
///
/// Every initialized thread is first told to exit (THR_EXIT) and then
/// joined. Finally the coder->threads array is freed. The pointer itself
/// is not reset here; stream_encoder_mt_init() sets it to NULL after
/// calling this function.
///
/// \param      coder      Stream coder owning the threads
/// \param      allocator  Allocator used to free coder->threads
475 threads_end(lzma_stream_coder
*coder
, const lzma_allocator
*allocator
)
477 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
478 mythread_sync(coder
->threads
[i
].mutex
) {
479 coder
->threads
[i
].state
= THR_EXIT
;
480 mythread_cond_signal(&coder
->threads
[i
].cond
);
// All threads have been notified; now wait for each one to finish.
484 for (uint32_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
485 int ret
= mythread_join(coder
->threads
[i
].thread_id
);
490 lzma_free(coder
->threads
, allocator
);
495 /// Initialize a new worker_thread structure and create a new thread.
///
/// The next unused slot, coder->threads[coder->threads_initialized], is
/// set up: an input buffer of coder->block_size bytes is allocated, the
/// mutex and condition variable are initialized, and the thread is
/// started in worker_start(). coder->threads_initialized is incremented
/// after the thread has been created.
///
/// The cleanup code at the end destroys the condition variable and the
/// mutex, frees the input buffer, and returns LZMA_MEM_ERROR.
///
/// \param      coder      Stream coder that gets the new thread
/// \param      allocator  Allocator for the input buffer; a copy of the
///                        pointer is stored in the thread structure
497 initialize_new_thread(lzma_stream_coder
*coder
,
498 const lzma_allocator
*allocator
)
500 worker_thread
*thr
= &coder
->threads
[coder
->threads_initialized
];
502 thr
->in
= lzma_alloc(coder
->block_size
, allocator
);
504 return LZMA_MEM_ERROR
;
506 if (mythread_mutex_init(&thr
->mutex
))
509 if (mythread_cond_init(&thr
->cond
))
512 thr
->state
= THR_IDLE
;
513 thr
->allocator
= allocator
;
515 thr
->progress_in
= 0;
516 thr
->progress_out
= 0;
517 thr
->block_encoder
= LZMA_NEXT_CODER_INIT
;
// Mark the thread's filter chain as empty so that it is safe to pass
// to lzma_filters_free() later.
518 thr
->filters
[0].id
= LZMA_VLI_UNKNOWN
;
520 if (mythread_create(&thr
->thread_id
, &worker_start
, thr
))
523 ++coder
->threads_initialized
;
// Error handling: undo the initializations done above.
529 mythread_cond_destroy(&thr
->cond
);
532 mythread_mutex_destroy(&thr
->mutex
);
535 lzma_free(thr
->in
, allocator
);
536 return LZMA_MEM_ERROR
;
/// Try to get a worker thread and an output buffer for a new Block.
///
/// On success coder->thr points to a thread whose state has been set to
/// THR_RUN, whose in_size is zero, and which has a fresh output buffer
/// and its own copy of the filter chain. A thread is taken from
/// coder->threads_free if possible; otherwise a new one is created as
/// long as fewer than coder->threads_max threads exist. stream_encode_in()
/// checks coder->thr for NULL after calling this to see whether a thread
/// was obtained.
///
/// \param      coder      Stream coder
/// \param      allocator  Allocator for the output buffer, the filter
///                        chain copy, and a possible new thread
541 get_thread(lzma_stream_coder
*coder
, const lzma_allocator
*allocator
)
543 // If there are no free output subqueues, there is no
544 // point to try getting a thread.
545 if (!lzma_outq_has_buf(&coder
->outq
))
548 // That's also true if we cannot allocate memory for the output
549 // buffer in the output queue.
550 return_if_error(lzma_outq_prealloc_buf(&coder
->outq
, allocator
,
551 coder
->outbuf_alloc_size
));
553 // Make a thread-specific copy of the filter chain. Put it in
554 // the cache array first so that if we cannot get a new thread yet,
555 // the allocation is ready when we try again.
556 if (coder
->filters_cache
[0].id
== LZMA_VLI_UNKNOWN
)
557 return_if_error(lzma_filters_copy(
558 coder
->filters
, coder
->filters_cache
, allocator
));
560 // If there is a free structure on the stack, use it.
561 mythread_sync(coder
->mutex
) {
562 if (coder
->threads_free
!= NULL
) {
563 coder
->thr
= coder
->threads_free
;
564 coder
->threads_free
= coder
->threads_free
->next
;
568 if (coder
->thr
== NULL
) {
569 // If there are no uninitialized structures left, return.
570 if (coder
->threads_initialized
== coder
->threads_max
)
573 // Initialize a new thread.
574 return_if_error(initialize_new_thread(coder
, allocator
));
577 // Reset the parts of the thread state that have to be done
578 // in the main thread.
579 mythread_sync(coder
->thr
->mutex
) {
580 coder
->thr
->state
= THR_RUN
;
581 coder
->thr
->in_size
= 0;
582 coder
->thr
->outbuf
= lzma_outq_get_buf(&coder
->outq
, NULL
);
584 // Free the old thread-specific filter options and replace
585 // them with the already-allocated new options from
586 // coder->filters_cache[]. Then mark the cache as empty.
587 lzma_filters_free(coder
->thr
->filters
, allocator
);
588 memcpy(coder
->thr
->filters
, coder
->filters_cache
,
589 sizeof(coder
->filters_cache
));
590 coder
->filters_cache
[0].id
= LZMA_VLI_UNKNOWN
;
592 mythread_cond_signal(&coder
->thr
->cond
);
/// Copy input from the application to worker threads.
///
/// Loops while there is unused input, or while a Block is in progress
/// (coder->thr != NULL) and action is something other than LZMA_RUN.
/// Input is copied into the current thread's buffer, and the thread is
/// told to finish the Block when the buffer holds block_size bytes or
/// when all input has been used with an action other than LZMA_RUN.
/// coder->thread_error is read under coder->mutex to detect worker
/// failures.
///
/// \param      coder      Stream coder
/// \param      allocator  Allocator passed on to get_thread()
/// \param      in         Input buffer from the application
/// \param      in_pos     Read position in in[]; advanced as data is copied
/// \param      in_size    Size of in[]
/// \param      action     Action requested by the application
600 stream_encode_in(lzma_stream_coder
*coder
, const lzma_allocator
*allocator
,
601 const uint8_t *restrict in
, size_t *restrict in_pos
,
602 size_t in_size
, lzma_action action
)
604 while (*in_pos
< in_size
605 || (coder
->thr
!= NULL
&& action
!= LZMA_RUN
)) {
606 if (coder
->thr
== NULL
) {
608 const lzma_ret ret
= get_thread(coder
, allocator
);
609 if (coder
->thr
== NULL
)
613 // Copy the input data to thread's buffer.
614 size_t thr_in_size
= coder
->thr
->in_size
;
615 lzma_bufcpy(in
, in_pos
, in_size
, coder
->thr
->in
,
616 &thr_in_size
, coder
->block_size
);
618 // Tell the Block encoder to finish if
619 // - it has got block_size bytes of input; or
620 // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
621 // or LZMA_FULL_BARRIER was used.
623 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
624 const bool finish
= thr_in_size
== coder
->block_size
625 || (*in_pos
== in_size
&& action
!= LZMA_RUN
);
627 bool block_error
= false;
629 mythread_sync(coder
->thr
->mutex
) {
630 if (coder
->thr
->state
== THR_IDLE
) {
631 // Something has gone wrong with the Block
632 // encoder. It has set coder->thread_error
633 // which we will read a few lines later.
636 // Tell the Block encoder its new amount
637 // of input and update the state if needed.
638 coder
->thr
->in_size
= thr_in_size
;
641 coder
->thr
->state
= THR_FINISH
;
643 mythread_cond_signal(&coder
->thr
->cond
);
648 lzma_ret ret
= LZMA_OK
; // Init to silence a warning.
650 mythread_sync(coder
->mutex
) {
651 ret
= coder
->thread_error
;
665 /// Wait until more input can be consumed, more output can be read, or
666 /// an optional timeout is reached.
///
/// \param      coder        Stream coder
/// \param      wait_abs     Absolute time limit for the wait. It is
///                          calculated here the first time this function
///                          blocks during one stream_encode_mt() call.
/// \param      has_blocked  Tells if *wait_abs has already been set during
///                          this stream_encode_mt() call
/// \param      has_input    True if there is input left, in which case a
///                          free worker thread plus a free output buffer
///                          also counts as work
///
/// stream_encode_mt() returns LZMA_TIMED_OUT when this function returns
/// true.
668 wait_for_work(lzma_stream_coder
*coder
, mythread_condtime
*wait_abs
,
669 bool *has_blocked
, bool has_input
)
671 if (coder
->timeout
!= 0 && !*has_blocked
) {
672 // Every time when stream_encode_mt() is called via
673 // lzma_code(), *has_blocked starts as false. We set it
674 // to true here and calculate the absolute time when
675 // we must return if there's nothing to do.
677 // This way if we block multiple times for short moments
678 // less than "timeout" milliseconds, we will return once
679 // "timeout" amount of time has passed since the *first*
680 // blocking occurred. If the absolute time was calculated
681 // again every time we block, "timeout" would effectively
682 // be meaningless if we never consecutively block longer
683 // than "timeout" ms.
685 mythread_condtime_set(wait_abs
, &coder
->cond
, coder
->timeout
);
688 bool timed_out
= false;
690 mythread_sync(coder
->mutex
) {
691 // There are four things that we wait. If one of them
692 // becomes possible, we return.
693 // - If there is input left, we need to get a free
694 // worker thread and an output buffer for it.
695 // - Data ready to be read from the output queue.
696 // - A worker thread indicates an error.
697 // - Time out occurs.
698 while ((!has_input
|| coder
->threads_free
== NULL
699 || !lzma_outq_has_buf(&coder
->outq
))
700 && !lzma_outq_is_readable(&coder
->outq
)
701 && coder
->thread_error
== LZMA_OK
// A timeout of zero means waiting without a time limit.
703 if (coder
->timeout
!= 0)
704 timed_out
= mythread_cond_timedwait(
705 &coder
->cond
, &coder
->mutex
,
708 mythread_cond_wait(&coder
->cond
,
/// The coding function of the multithreaded Stream encoder; installed as
/// next->code in stream_encoder_mt_init().
///
/// Works as a state machine on coder->sequence:
///   - SEQ_STREAM_HEADER: copy the pre-encoded Stream Header to out[].
///   - SEQ_BLOCK: move finished compressed data from the output queue to
///     out[], add finished Blocks to the Index, feed new input to worker
///     threads, and wait for work when nothing else can be done.
///   - SEQ_INDEX: run the Index encoder and then encode the Stream Footer.
///   - SEQ_STREAM_FOOTER: copy the Stream Footer to out[].
///
/// \param      coder_ptr  Pointer to lzma_stream_coder
/// \param      allocator  Allocator for internal allocations
/// \param      in         Input buffer
/// \param      in_pos     Read position in in[]
/// \param      in_size    Size of in[]
/// \param      out        Output buffer
/// \param      out_pos    Write position in out[]
/// \param      out_size   Size of out[]
/// \param      action     Action requested by the application
///
/// Visible return values include LZMA_OK, LZMA_STREAM_END, LZMA_TIMED_OUT
/// and LZMA_PROG_ERROR; error codes from worker threads and helper
/// functions are handled with threads_stop(coder, false).
718 stream_encode_mt(void *coder_ptr
, const lzma_allocator
*allocator
,
719 const uint8_t *restrict in
, size_t *restrict in_pos
,
720 size_t in_size
, uint8_t *restrict out
,
721 size_t *restrict out_pos
, size_t out_size
, lzma_action action
)
723 lzma_stream_coder
*coder
= coder_ptr
;
725 switch (coder
->sequence
) {
726 case SEQ_STREAM_HEADER
:
727 lzma_bufcpy(coder
->header
, &coder
->header_pos
,
728 sizeof(coder
->header
),
729 out
, out_pos
, out_size
);
730 if (coder
->header_pos
< sizeof(coder
->header
))
// The whole header was copied. header[] is reused for the Stream
// Footer later, so reset the read position.
733 coder
->header_pos
= 0;
734 coder
->sequence
= SEQ_BLOCK
;
739 // Initialized to silence warnings.
740 lzma_vli unpadded_size
= 0;
741 lzma_vli uncompressed_size
= 0;
742 lzma_ret ret
= LZMA_OK
;
744 // These are for wait_for_work().
745 bool has_blocked
= false;
746 mythread_condtime wait_abs
= { 0 };
749 mythread_sync(coder
->mutex
) {
750 // Check for Block encoder errors.
751 ret
= coder
->thread_error
;
752 if (ret
!= LZMA_OK
) {
753 assert(ret
!= LZMA_STREAM_END
);
754 break; // Break out of mythread_sync.
757 // Try to read compressed data to out[].
758 ret
= lzma_outq_read(&coder
->outq
, allocator
,
759 out
, out_pos
, out_size
,
764 if (ret
== LZMA_STREAM_END
) {
765 // End of Block. Add it to the Index.
766 ret
= lzma_index_append(coder
->index
,
767 allocator
, unpadded_size
,
769 if (ret
!= LZMA_OK
) {
770 threads_stop(coder
, false);
774 // If we didn't fill the output buffer yet,
775 // try to read more data. Maybe the next
776 // outbuf has been finished already too.
777 if (*out_pos
< out_size
)
781 if (ret
!= LZMA_OK
) {
782 // coder->thread_error was set.
783 threads_stop(coder
, false);
787 // Try to give uncompressed data to a worker thread.
788 ret
= stream_encode_in(coder
, allocator
,
789 in
, in_pos
, in_size
, action
);
790 if (ret
!= LZMA_OK
) {
791 threads_stop(coder
, false);
795 // See if we should wait or return.
797 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
798 if (*in_pos
== in_size
) {
799 // LZMA_RUN: More data is probably coming
800 // so return to let the caller fill the
// input buffer.
802 if (action
== LZMA_RUN
)
805 // LZMA_FULL_BARRIER: The same as with
806 // LZMA_RUN but tell the caller that the
807 // barrier was completed.
808 if (action
== LZMA_FULL_BARRIER
)
809 return LZMA_STREAM_END
;
811 // Finishing or flushing isn't completed until
812 // all input data has been encoded and copied
813 // to the output buffer.
814 if (lzma_outq_is_empty(&coder
->outq
)) {
815 // LZMA_FINISH: Continue to encode
// the Index field.
817 if (action
== LZMA_FINISH
)
820 // LZMA_FULL_FLUSH: Return to tell
821 // the caller that flushing was
// completed.
823 if (action
== LZMA_FULL_FLUSH
)
824 return LZMA_STREAM_END
;
828 // Return if there is no output space left.
829 // This check must be done after testing the input
830 // buffer, because we might want to use a different
// return code.
832 if (*out_pos
== out_size
)
835 // Neither in nor out has been used completely.
836 // Wait until there's something we can do.
837 if (wait_for_work(coder
, &wait_abs
, &has_blocked
,
839 return LZMA_TIMED_OUT
;
842 // All Blocks have been encoded and the threads have stopped.
843 // Prepare to encode the Index field.
844 return_if_error(lzma_index_encoder_init(
845 &coder
->index_encoder
, allocator
,
847 coder
->sequence
= SEQ_INDEX
;
849 // Update the progress info to take the Index and
850 // Stream Footer into account. Those are very fast to encode
851 // so in terms of progress information they can be thought
852 // to be ready to be copied out.
853 coder
->progress_out
+= lzma_index_size(coder
->index
)
854 + LZMA_STREAM_HEADER_SIZE
;
860 // Call the Index encoder. It doesn't take any input, so
861 // those pointers can be NULL.
862 const lzma_ret ret
= coder
->index_encoder
.code(
863 coder
->index_encoder
.coder
, allocator
,
865 out
, out_pos
, out_size
, LZMA_RUN
);
866 if (ret
!= LZMA_STREAM_END
)
869 // Encode the Stream Footer into coder->header.
870 coder
->stream_flags
.backward_size
871 = lzma_index_size(coder
->index
);
872 if (lzma_stream_footer_encode(&coder
->stream_flags
,
873 coder
->header
) != LZMA_OK
)
874 return LZMA_PROG_ERROR
;
876 coder
->sequence
= SEQ_STREAM_FOOTER
;
881 case SEQ_STREAM_FOOTER
:
882 lzma_bufcpy(coder
->header
, &coder
->header_pos
,
883 sizeof(coder
->header
),
884 out
, out_pos
, out_size
);
// The Stream is finished once the whole footer has been copied out.
885 return coder
->header_pos
< sizeof(coder
->header
)
886 ? LZMA_OK
: LZMA_STREAM_END
;
890 return LZMA_PROG_ERROR
;
/// Free everything owned by the multithreaded Stream encoder; installed
/// as next->end in stream_encoder_mt_init().
///
/// Ends the worker threads, frees the output queue, both filter chains,
/// the Index encoder and the Index, destroys the condition variable and
/// the mutex, and finally frees the coder structure itself.
///
/// \param      coder_ptr  Pointer to lzma_stream_coder
/// \param      allocator  Allocator used for all the frees
895 stream_encoder_mt_end(void *coder_ptr
, const lzma_allocator
*allocator
)
897 lzma_stream_coder
*coder
= coder_ptr
;
899 // Threads must be killed before the output queue can be freed.
900 threads_end(coder
, allocator
);
901 lzma_outq_end(&coder
->outq
, allocator
);
903 lzma_filters_free(coder
->filters
, allocator
);
904 lzma_filters_free(coder
->filters_cache
, allocator
);
906 lzma_next_end(&coder
->index_encoder
, allocator
);
907 lzma_index_end(coder
->index
, allocator
);
909 mythread_cond_destroy(&coder
->cond
);
910 mythread_mutex_destroy(&coder
->mutex
);
912 lzma_free(coder
, allocator
);
/// Replace the filter chain used for the next Blocks; installed as
/// next->update in stream_encoder_mt_init().
///
/// \param      coder_ptr         Pointer to lzma_stream_coder
/// \param      allocator         Allocator for copying and freeing filter
///                               options
/// \param      filters           New filter chain
/// \param      reversed_filters  Unused
///
/// Returns LZMA_PROG_ERROR if the Index or Stream Footer is already being
/// encoded or if a Block is in progress (coder->thr != NULL), and
/// LZMA_OPTIONS_ERROR if lzma_raw_encoder_memusage() rejects the chain.
/// An error from lzma_filters_copy() is returned as is, in which case the
/// old chain is left untouched.
918 stream_encoder_mt_update(void *coder_ptr
, const lzma_allocator
*allocator
,
919 const lzma_filter
*filters
,
920 const lzma_filter
*reversed_filters
921 lzma_attribute((__unused__
)))
923 lzma_stream_coder
*coder
= coder_ptr
;
925 // Applications shouldn't attempt to change the options when
926 // we are already encoding the Index or Stream Footer.
927 if (coder
->sequence
> SEQ_BLOCK
)
928 return LZMA_PROG_ERROR
;
930 // For now the threaded encoder doesn't support changing
931 // the options in the middle of a Block.
932 if (coder
->thr
!= NULL
)
933 return LZMA_PROG_ERROR
;
935 // Check if the filter chain seems mostly valid. See the comment
936 // in stream_encoder_mt_init().
937 if (lzma_raw_encoder_memusage(filters
) == UINT64_MAX
)
938 return LZMA_OPTIONS_ERROR
;
940 // Make a copy to a temporary buffer first. This way the encoder
941 // state stays unchanged if an error occurs in lzma_filters_copy().
942 lzma_filter temp
[LZMA_FILTERS_MAX
+ 1];
943 return_if_error(lzma_filters_copy(filters
, temp
, allocator
));
945 // Free the options of the old chain as well as the cache.
946 lzma_filters_free(coder
->filters
, allocator
);
947 lzma_filters_free(coder
->filters_cache
, allocator
);
949 // Copy the new filter chain in place.
950 memcpy(coder
->filters
, temp
, sizeof(temp
));
956 /// Options handling for lzma_stream_encoder_mt_init() and
957 /// lzma_stream_encoder_mt_memusage()
///
/// \param      options          Options from the application
/// \param      opt_easy         Storage for the preset's filter chain; used
///                              only when options->filters is NULL
/// \param      filters          Set to options->filters, or to
///                              opt_easy->filters when a preset is used
/// \param      block_size       Set to options->block_size if it is
///                              non-zero, otherwise to the value from
///                              lzma_mt_block_size()
/// \param      outbuf_size_max  Set to lzma_block_buffer_bound64() of the
///                              chosen block size
///
/// Returns LZMA_OPTIONS_ERROR for unsupported flags, thread counts,
/// presets or block sizes, and LZMA_MEM_ERROR if the output buffer bound
/// comes out as zero.
959 get_options(const lzma_mt
*options
, lzma_options_easy
*opt_easy
,
960 const lzma_filter
**filters
, uint64_t *block_size
,
961 uint64_t *outbuf_size_max
)
963 // Validate some of the options.
965 return LZMA_PROG_ERROR
;
967 if (options
->flags
!= 0 || options
->threads
== 0
968 || options
->threads
> LZMA_THREADS_MAX
)
969 return LZMA_OPTIONS_ERROR
;
971 if (options
->filters
!= NULL
) {
972 // Filter chain was given, use it as is.
973 *filters
= options
->filters
;
// No filter chain was given, so use the preset.
976 if (lzma_easy_preset(opt_easy
, options
->preset
))
977 return LZMA_OPTIONS_ERROR
;
979 *filters
= opt_easy
->filters
;
983 if (options
->block_size
> 0) {
984 if (options
->block_size
> BLOCK_SIZE_MAX
)
985 return LZMA_OPTIONS_ERROR
;
987 *block_size
= options
->block_size
;
989 // Determine the Block size from the filter chain.
990 *block_size
= lzma_mt_block_size(*filters
);
991 if (*block_size
== 0)
992 return LZMA_OPTIONS_ERROR
;
994 assert(*block_size
<= BLOCK_SIZE_MAX
);
997 // Calculate the maximum amount output that a single output buffer
998 // may need to hold. This is the same as the maximum total size of
// a Block.
1000 *outbuf_size_max
= lzma_block_buffer_bound64(*block_size
);
1001 if (*outbuf_size_max
== 0)
1002 return LZMA_MEM_ERROR
;
/// Report how much data has been processed so far; installed as
/// next->get_progress in stream_encoder_mt_init().
///
/// The totals of finished Blocks are taken from the main structure and
/// the progress of the Blocks still being encoded is added from each
/// initialized worker thread. Each thread's mutex is locked while its
/// counters are read.
///
/// \param      coder_ptr     Pointer to lzma_stream_coder
/// \param      progress_in   Set to the amount of uncompressed input
///                           processed
/// \param      progress_out  Set to the amount of compressed output
///                           produced
1009 get_progress(void *coder_ptr
, uint64_t *progress_in
, uint64_t *progress_out
)
1011 lzma_stream_coder
*coder
= coder_ptr
;
1013 // Lock coder->mutex to prevent finishing threads from moving their
1014 // progress info from the worker_thread structure to lzma_stream_coder.
1015 mythread_sync(coder
->mutex
) {
1016 *progress_in
= coder
->progress_in
;
1017 *progress_out
= coder
->progress_out
;
1019 for (size_t i
= 0; i
< coder
->threads_initialized
; ++i
) {
1020 mythread_sync(coder
->threads
[i
].mutex
) {
1021 *progress_in
+= coder
->threads
[i
].progress_in
;
1022 *progress_out
+= coder
->threads
[i
]
/// Initialize (or reinitialize) the multithreaded Stream encoder.
///
/// Validates the options, allocates the main structure on first use,
/// allocates or reuses the worker thread structures, initializes the
/// output queue, copies the filter chain, creates a new Index, and
/// encodes the Stream Header into coder->header.
///
/// When the coder already exists and the requested thread count matches
/// coder->threads_max, the existing threads are stopped and reused;
/// otherwise they are ended and the thread array is reallocated.
///
/// \param      next       Coder slot to initialize
/// \param      allocator  Allocator for all allocations
/// \param      options    Multithreading options from the application
///
/// Visible error returns are LZMA_MEM_ERROR, LZMA_OPTIONS_ERROR,
/// LZMA_PROG_ERROR and LZMA_UNSUPPORTED_CHECK, plus whatever the helper
/// functions wrapped in return_if_error() return.
1033 stream_encoder_mt_init(lzma_next_coder
*next
, const lzma_allocator
*allocator
,
1034 const lzma_mt
*options
)
1036 lzma_next_coder_init(&stream_encoder_mt_init
, next
, allocator
);
1038 // Get the filter chain.
1039 lzma_options_easy easy
;
1040 const lzma_filter
*filters
;
1041 uint64_t block_size
;
1042 uint64_t outbuf_size_max
;
1043 return_if_error(get_options(options
, &easy
, &filters
,
1044 &block_size
, &outbuf_size_max
));
// The sizes are stored as size_t below, so they must fit in size_t.
1046 #if SIZE_MAX < UINT64_MAX
1047 if (block_size
> SIZE_MAX
|| outbuf_size_max
> SIZE_MAX
)
1048 return LZMA_MEM_ERROR
;
1051 // Validate the filter chain so that we can give an error in this
1052 // function instead of delaying it to the first call to lzma_code().
1053 // The memory usage calculation verifies the filter chain as
1054 // a side effect so we take advantage of that. It's not a perfect
1055 // check though as raw encoder allows LZMA1 too but such problems
1056 // will be caught eventually with Block Header encoder.
1057 if (lzma_raw_encoder_memusage(filters
) == UINT64_MAX
)
1058 return LZMA_OPTIONS_ERROR
;
1060 // Validate the Check ID.
1061 if ((unsigned int)(options
->check
) > LZMA_CHECK_ID_MAX
)
1062 return LZMA_PROG_ERROR
;
1064 if (!lzma_check_is_supported(options
->check
))
1065 return LZMA_UNSUPPORTED_CHECK
;
1067 // Allocate and initialize the base structure if needed.
1068 lzma_stream_coder
*coder
= next
->coder
;
1069 if (coder
== NULL
) {
1070 coder
= lzma_alloc(sizeof(lzma_stream_coder
), allocator
);
1072 return LZMA_MEM_ERROR
;
1074 next
->coder
= coder
;
1076 // For the mutex and condition variable initializations
1077 // the error handling has to be done here because
1078 // stream_encoder_mt_end() doesn't know if they have
1079 // already been initialized or not.
1080 if (mythread_mutex_init(&coder
->mutex
)) {
1081 lzma_free(coder
, allocator
);
1083 return LZMA_MEM_ERROR
;
1086 if (mythread_cond_init(&coder
->cond
)) {
1087 mythread_mutex_destroy(&coder
->mutex
);
1088 lzma_free(coder
, allocator
);
1090 return LZMA_MEM_ERROR
;
1093 next
->code
= &stream_encode_mt
;
1094 next
->end
= &stream_encoder_mt_end
;
1095 next
->get_progress
= &get_progress
;
1096 next
->update
= &stream_encoder_mt_update
;
// Put every member that stream_encoder_mt_end() frees into a known
// empty state.
1098 coder
->filters
[0].id
= LZMA_VLI_UNKNOWN
;
1099 coder
->filters_cache
[0].id
= LZMA_VLI_UNKNOWN
;
1100 coder
->index_encoder
= LZMA_NEXT_CODER_INIT
;
1101 coder
->index
= NULL
;
1102 memzero(&coder
->outq
, sizeof(coder
->outq
));
1103 coder
->threads
= NULL
;
1104 coder
->threads_max
= 0;
1105 coder
->threads_initialized
= 0;
1108 // Basic initializations
1109 coder
->sequence
= SEQ_STREAM_HEADER
;
1110 coder
->block_size
= (size_t)(block_size
);
1111 coder
->outbuf_alloc_size
= (size_t)(outbuf_size_max
);
1112 coder
->thread_error
= LZMA_OK
;
1115 // Allocate the thread-specific base structures.
1116 assert(options
->threads
> 0);
1117 if (coder
->threads_max
!= options
->threads
) {
1118 threads_end(coder
, allocator
);
// threads_end() freed the array; clear the stale pointer and counts
// before trying to allocate a new array.
1120 coder
->threads
= NULL
;
1121 coder
->threads_max
= 0;
1123 coder
->threads_initialized
= 0;
1124 coder
->threads_free
= NULL
;
1126 coder
->threads
= lzma_alloc(
1127 options
->threads
* sizeof(worker_thread
),
1129 if (coder
->threads
== NULL
)
1130 return LZMA_MEM_ERROR
;
1132 coder
->threads_max
= options
->threads
;
1134 // Reuse the old structures and threads. Tell the running
1135 // threads to stop and wait until they have stopped.
1136 threads_stop(coder
, true);
1140 return_if_error(lzma_outq_init(&coder
->outq
, allocator
,
1144 coder
->timeout
= options
->timeout
;
1146 // Free the old filter chain and the cache.
1147 lzma_filters_free(coder
->filters
, allocator
);
1148 lzma_filters_free(coder
->filters_cache
, allocator
);
1150 // Copy the new filter chain.
1151 return_if_error(lzma_filters_copy(
1152 filters
, coder
->filters
, allocator
));
// Replace a possible old Index with a new empty one.
1155 lzma_index_end(coder
->index
, allocator
);
1156 coder
->index
= lzma_index_init(allocator
);
1157 if (coder
->index
== NULL
)
1158 return LZMA_MEM_ERROR
;
// Encode the Stream Header into coder->header so that
// stream_encode_mt() can copy it out in SEQ_STREAM_HEADER.
1161 coder
->stream_flags
.version
= 0;
1162 coder
->stream_flags
.check
= options
->check
;
1163 return_if_error(lzma_stream_header_encode(
1164 &coder
->stream_flags
, coder
->header
));
1166 coder
->header_pos
= 0;
// The Stream Header is counted as output progress from the start.
1169 coder
->progress_in
= 0;
1170 coder
->progress_out
= LZMA_STREAM_HEADER_SIZE
;
1176 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1177 // These are for compatibility with binaries linked against liblzma that
1178 // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1179 // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1180 // but it has been added here anyway since someone might misread the
1181 // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1182 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1183 lzma_ret
, lzma_stream_encoder_mt_512a
)(
1184 lzma_stream
*strm
, const lzma_mt
*options
)
1185 lzma_nothrow lzma_attr_warn_unused_result
1186 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1188 LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1189 lzma_ret
, lzma_stream_encoder_mt_522
)(
1190 lzma_stream
*strm
, const lzma_mt
*options
)
1191 lzma_nothrow lzma_attr_warn_unused_result
1192 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
// The default symbol version. The two declarations above are aliases
// of this function.
1194 LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1195 lzma_ret
, lzma_stream_encoder_mt_52
)(
1196 lzma_stream
*strm
, const lzma_mt
*options
)
1197 lzma_nothrow lzma_attr_warn_unused_result
;
// With symbol versioning the function defined below gets the name
// lzma_stream_encoder_mt_52.
1199 #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
/// Public API: initialize strm as a multithreaded .xz Stream encoder.
///
/// The initialization is done by stream_encoder_mt_init() via
/// lzma_next_strm_init(). After that the actions LZMA_RUN,
/// LZMA_FULL_FLUSH, LZMA_FULL_BARRIER and LZMA_FINISH are marked as
/// supported; LZMA_SYNC_FLUSH is left disabled.
///
/// \param      strm     Stream to initialize
/// \param      options  Multithreading options
1201 extern LZMA_API(lzma_ret
)
1202 lzma_stream_encoder_mt(lzma_stream
*strm
, const lzma_mt
*options
)
1204 lzma_next_strm_init(stream_encoder_mt_init
, strm
, options
);
1206 strm
->internal
->supported_actions
[LZMA_RUN
] = true;
1207 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1208 strm
->internal
->supported_actions
[LZMA_FULL_FLUSH
] = true;
1209 strm
->internal
->supported_actions
[LZMA_FULL_BARRIER
] = true;
1210 strm
->internal
->supported_actions
[LZMA_FINISH
] = true;
// Symbol-versioned declarations of lzma_stream_encoder_mt_memusage().
// The first two are aliases of the _52 variant.
1216 #ifdef HAVE_SYMBOL_VERSIONS_LINUX
1217 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1218 uint64_t, lzma_stream_encoder_mt_memusage_512a
)(
1219 const lzma_mt
*options
) lzma_nothrow lzma_attr_pure
1220 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1222 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1223 uint64_t, lzma_stream_encoder_mt_memusage_522
)(
1224 const lzma_mt
*options
) lzma_nothrow lzma_attr_pure
1225 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1227 LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1228 uint64_t, lzma_stream_encoder_mt_memusage_52
)(
1229 const lzma_mt
*options
) lzma_nothrow lzma_attr_pure
;
1231 #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1233 // This function name is a monster but it's consistent with the older
1234 // monster names. :-( 31 chars is the max that C99 requires so in that
1235 // sense it's not too long. ;-)
//
// Public API: calculate the memory usage of the multithreaded Stream
// encoder for the given options. The result is the sum of
// LZMA_MEMUSAGE_BASE, the size of the main structure, the per-thread
// structures, the input buffers, the filter encoders of all threads,
// and the output queue. Each addition is checked against UINT64_MAX
// overflow.
1236 extern LZMA_API(uint64_t)
1237 lzma_stream_encoder_mt_memusage(const lzma_mt
*options
)
1239 lzma_options_easy easy
;
1240 const lzma_filter
*filters
;
1241 uint64_t block_size
;
1242 uint64_t outbuf_size_max
;
1244 if (get_options(options
, &easy
, &filters
, &block_size
,
1245 &outbuf_size_max
) != LZMA_OK
)
1248 // Memory usage of the input buffers
1249 const uint64_t inbuf_memusage
= options
->threads
* block_size
;
1251 // Memory usage of the filter encoders
1252 uint64_t filters_memusage
= lzma_raw_encoder_memusage(filters
);
1253 if (filters_memusage
== UINT64_MAX
)
// Every thread has its own filter encoders.
1256 filters_memusage
*= options
->threads
;
1258 // Memory usage of the output queue
1259 const uint64_t outq_memusage
= lzma_outq_memusage(
1260 outbuf_size_max
, options
->threads
);
1261 if (outq_memusage
== UINT64_MAX
)
1264 // Sum them with overflow checking.
1265 uint64_t total_memusage
= LZMA_MEMUSAGE_BASE
1266 + sizeof(lzma_stream_coder
)
1267 + options
->threads
* sizeof(worker_thread
);
1269 if (UINT64_MAX
- total_memusage
< inbuf_memusage
)
1272 total_memusage
+= inbuf_memusage
;
1274 if (UINT64_MAX
- total_memusage
< filters_memusage
)
1277 total_memusage
+= filters_memusage
;
1279 if (UINT64_MAX
- total_memusage
< outq_memusage
)
1282 return total_memusage
+ outq_memusage
;