// SPDX-License-Identifier: 0BSD

///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given an unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since the stop mechanism exists anyway,
	/// reusing it for exiting is lazier.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_stream_coder_s {
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if we cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// maximum number of threads that will be created.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait until we have received all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
		const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;
	thr->filters[0].id = LZMA_VLI_UNKNOWN;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point in trying to get a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret = LZMA_OK; // Init to silence a warning.

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - A timeout occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header[].
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
	lzma_stream_coder *coder = coder_ptr;

	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
		const lzma_filter *filters,
		const lzma_filter *reversed_filters
			lzma_attribute((__unused__)))
{
	lzma_stream_coder *coder = coder_ptr;

	// Applications shouldn't attempt to change the options when
	// we are already encoding the Index or Stream Footer.
	if (coder->sequence > SEQ_BLOCK)
		return LZMA_PROG_ERROR;

	// For now the threaded encoder doesn't support changing
	// the options in the middle of a Block.
	if (coder->thr != NULL)
		return LZMA_PROG_ERROR;

	// Check if the filter chain seems mostly valid. See the comment
	// in stream_encoder_mt_init().
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Make a copy to a temporary buffer first. This way the encoder
	// state stays unchanged if an error occurs in lzma_filters_copy().
	lzma_filter temp[LZMA_FILTERS_MAX + 1];
	return_if_error(lzma_filters_copy(filters, temp, allocator));

	// Free the options of the old chain as well as the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain in place.
	memcpy(coder->filters, temp, sizeof(temp));

	return LZMA_OK;
}


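// Usage sketch (illustrative only, not part of liblzma itself): an
// application that wants a different filter chain for the following Blocks
// would first complete LZMA_FULL_FLUSH or LZMA_FULL_BARRIER and then call
// lzma_filters_update(). "new_filters" below is a hypothetical
// LZMA_VLI_UNKNOWN-terminated array owned by the application; error and
// LZMA_TIMED_OUT handling is omitted for brevity:
//
//	lzma_ret r;
//	do {
//		// Keep providing output space in strm.next_out/avail_out
//		// just like in any other lzma_code() loop.
//		r = lzma_code(&strm, LZMA_FULL_BARRIER);
//	} while (r == LZMA_OK);
//
//	if (r == LZMA_STREAM_END) // The barrier completed.
//		r = lzma_filters_update(&strm, new_filters);

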
/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// If the Block size is not set, determine it from the filter chain.
	if (options->block_size > 0)
		*block_size = options->block_size;
	else
		*block_size = lzma_mt_block_size(*filters);

	// UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
	// should be optimized out by any reasonable compiler.
	// The second condition should be there in the unlikely event that
	// the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
	if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
	lzma_stream_coder *coder = coder_ptr;

	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_stream_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that. It's not a perfect
	// check though as raw encoder allows LZMA1 too but such problems
	// will be caught eventually with Block Header encoder.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	lzma_stream_coder *coder = next->coder;
	if (coder == NULL) {
		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
		if (coder == NULL)
			return LZMA_MEM_ERROR;

		next->coder = coder;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&coder->mutex)) {
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&coder->cond)) {
			mythread_mutex_destroy(&coder->mutex);
			lzma_free(coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
		next->update = &stream_encoder_mt_update;

		coder->filters[0].id = LZMA_VLI_UNKNOWN;
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
		coder->index_encoder = LZMA_NEXT_CODER_INIT;
		coder->index = NULL;
		memzero(&coder->outq, sizeof(coder->outq));
		coder->threads = NULL;
		coder->threads_max = 0;
		coder->threads_initialized = 0;
	}

	// Basic initializations
	coder->sequence = SEQ_STREAM_HEADER;
	coder->block_size = (size_t)(block_size);
	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
	coder->thread_error = LZMA_OK;
	coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (coder->threads_max != options->threads) {
		threads_end(coder, allocator);

		coder->threads = NULL;
		coder->threads_max = 0;

		coder->threads_initialized = 0;
		coder->threads_free = NULL;

		coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (coder->threads == NULL)
			return LZMA_MEM_ERROR;

		coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&coder->outq, allocator,
			options->threads));

	// Timeout
	coder->timeout = options->timeout;

	// Free the old filter chain and the cache.
	lzma_filters_free(coder->filters, allocator);
	lzma_filters_free(coder->filters_cache, allocator);

	// Copy the new filter chain.
	return_if_error(lzma_filters_copy(
			filters, coder->filters, allocator));

	// Index
	lzma_index_end(coder->index, allocator);
	coder->index = lzma_index_init(allocator);
	if (coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	coder->stream_flags.version = 0;
	coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&coder->stream_flags, coder->header));

	coder->header_pos = 0;

	// Progress info
	coder->progress_in = 0;
	coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
	lzma_ret, lzma_stream_encoder_mt_512a)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
	lzma_ret, lzma_stream_encoder_mt_522)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result
		__attribute__((__alias__("lzma_stream_encoder_mt_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
	lzma_ret, lzma_stream_encoder_mt_52)(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result;

#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
//	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}


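// Usage sketch (illustrative only; the option values are arbitrary
// assumptions, not recommendations): a minimal way for an application to
// set up the threaded encoder.
//
//	lzma_mt mt = {
//		.flags = 0,                   // Must be zero; see get_options().
//		.threads = lzma_cputhreads(), // Returns 0 if unknown.
//		.block_size = 0,              // 0 = derive from the filter chain.
//		.timeout = 300,               // Milliseconds; 0 = may block freely.
//		.preset = LZMA_PRESET_DEFAULT,
//		.filters = NULL,              // NULL = use .preset instead.
//		.check = LZMA_CHECK_CRC64,
//	};
//	if (mt.threads == 0)
//		mt.threads = 1;
//
//	lzma_stream strm = LZMA_STREAM_INIT;
//	lzma_ret r = lzma_stream_encoder_mt(&strm, &mt);
//	// ... feed data with lzma_code() and finally call lzma_end(&strm).

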
#ifdef HAVE_SYMBOL_VERSIONS_LINUX
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure
		__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
	uint64_t, lzma_stream_encoder_mt_memusage_522)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure
		__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));

LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
	uint64_t, lzma_stream_encoder_mt_memusage_52)(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure;

#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
#endif
// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
			+ sizeof(lzma_stream_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}

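// Usage sketch (illustrative only): an application could combine this with
// lzma_mt options like those in the sketch after lzma_stream_encoder_mt()
// above to shrink the thread count until the estimate fits a self-imposed
// limit; "memlimit" is a hypothetical application variable. Note that
// UINT64_MAX, returned on error, also fails this check:
//
//	while (mt.threads > 1
//			&& lzma_stream_encoder_mt_memusage(&mt) > memlimit)
//		--mt.threads;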