/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * This file incorporates work covered by the following license notice:
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed
 * with this work for additional information regarding copyright
 * ownership. The ASF licenses this file to you under the Apache
 * License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.apache.org/licenses/LICENSE-2.0 .
 */
#include <ThreadedDeflater.hxx>
#include <zlib.h>
#include <com/sun/star/packages/zip/ZipConstants.hpp>
#include <sal/log.hxx>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <functional>
#include <iterator>
#include <memory>
#include <utility>

using namespace com::sun::star::packages::zip::ZipConstants;
using namespace com::sun::star;
30 const sal_Int64 MaxBlockSize
= 128 * 1024;
// Parallel ZLIB compression using threads. The class internally splits the data into
// blocks and spawns ThreadPool tasks to process them independently. This is achieved
// in a similar way to how pigz works; see the comments from Mark Adler at
// https://stackoverflow.com/questions/30294766/how-to-use-multiple-threads-for-zlib-compression
// and
// https://stackoverflow.com/questions/30794053/how-to-use-multiple-threads-for-zlib-compression-same-input-source

// Everything here should be either read-only, or writing to distinct data, or atomic.
41 class ThreadedDeflater::Task
: public comphelper::ThreadTask
44 ThreadedDeflater
* deflater
;
51 Task(ThreadedDeflater
* deflater_
, int sequence_
, int blockSize_
, bool firstTask_
,
53 : comphelper::ThreadTask(deflater_
->threadTaskTag
)
57 , blockSize(blockSize_
)
58 , firstTask(firstTask_
)
64 virtual void doWork() override
;
67 ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel
)
68 : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
71 , zlibLevel(nSetLevel
)
75 ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
{ clear(); }
77 void ThreadedDeflater::deflateWrite(
78 const css::uno::Reference
<css::io::XInputStream
>& xInStream
,
79 std::function
<void(const css::uno::Sequence
<sal_Int8
>&, sal_Int32
)> aProcessInputFunc
,
80 std::function
<void(const css::uno::Sequence
<sal_Int8
>&, sal_Int32
)> aProcessOutputFunc
)
82 sal_Int64 nThreadCount
= comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
83 sal_Int64 batchSize
= MaxBlockSize
* nThreadCount
;
84 inBuffer
.realloc(batchSize
);
85 prevDataBlock
.realloc(MaxBlockSize
);
86 outBuffers
.resize(nThreadCount
);
87 maProcessOutputFunc
= aProcessOutputFunc
;
88 bool firstTask
= true;
90 while (xInStream
->available() > 0)
92 sal_Int64 inputBytes
= xInStream
->readBytes(inBuffer
, batchSize
);
93 aProcessInputFunc(inBuffer
, inputBytes
);
94 totalIn
+= inputBytes
;
96 bool lastBatch
= xInStream
->available() <= 0;
97 sal_Int64 bytesPending
= inputBytes
;
98 while (bytesPending
> 0)
100 sal_Int64 taskSize
= std::min(MaxBlockSize
, bytesPending
);
101 bytesPending
-= taskSize
;
102 bool lastTask
= lastBatch
&& !bytesPending
;
103 comphelper::ThreadPool::getSharedOptimalPool().pushTask(
104 std::make_unique
<Task
>(this, sequence
++, taskSize
, firstTask
, lastTask
));
110 assert(bytesPending
== 0);
112 comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag
);
116 assert(inputBytes
== batchSize
);
117 std::copy_n(std::cbegin(inBuffer
) + (batchSize
- MaxBlockSize
), MaxBlockSize
,
118 prevDataBlock
.getArray());
121 processDeflatedBuffers();
125 void ThreadedDeflater::processDeflatedBuffers()
127 sal_Int64 batchOutputSize
= 0;
128 for (const auto& buffer
: outBuffers
)
129 batchOutputSize
+= buffer
.size();
131 css::uno::Sequence
<sal_Int8
> outBuffer(batchOutputSize
);
133 auto pos
= outBuffer
.getArray();
134 for (auto& buffer
: outBuffers
)
136 pos
= std::copy(buffer
.begin(), buffer
.end(), pos
);
140 maProcessOutputFunc(outBuffer
, batchOutputSize
);
141 totalOut
+= batchOutputSize
;
144 void ThreadedDeflater::clear()
146 inBuffer
= uno::Sequence
<sal_Int8
>();
// Map the zlib entry points used below to their "z_"-prefixed names. Those names only
// exist when zlib is configured with Z_PREFIX; with a stock zlib (where deflateInit2
// is itself a macro) the plain API must be used unchanged.
// NOTE(review): confirm this matches how the bundled zlib is configured.
#if defined Z_PREFIX
#define deflateInit2 z_deflateInit2
#define deflateBound z_deflateBound
#define deflateSetDictionary z_deflateSetDictionary
#define deflate z_deflate
#define deflateEnd z_deflateEnd
#endif
158 void ThreadedDeflater::Task::doWork()
160 stream
.zalloc
= nullptr;
161 stream
.zfree
= nullptr;
162 stream
.opaque
= nullptr;
163 // -MAX_WBITS means 32k window size and raw stream
164 if (deflateInit2(&stream
, deflater
->zlibLevel
, Z_DEFLATED
, -MAX_WBITS
, DEF_MEM_LEVEL
,
168 SAL_WARN("package.threadeddeflate", "deflateInit2() failed");
171 // Find out size for our output buffer to be large enough for deflate() needing to be called just once.
172 sal_Int64 outputMaxSize
= deflateBound(&stream
, blockSize
);
173 // add extra size for Z_SYNC_FLUSH
175 deflater
->outBuffers
[sequence
].resize(outputMaxSize
);
176 sal_Int64 myInBufferStart
= sequence
* MaxBlockSize
;
177 // zlib doesn't handle const properly
178 unsigned char* inBufferPtr
= reinterpret_cast<unsigned char*>(
179 const_cast<signed char*>(deflater
->inBuffer
.getConstArray()));
182 // the window size is 32k, so set last 32k of previous data as the dictionary
183 assert(MAX_WBITS
== 15);
184 assert(MaxBlockSize
>= 32768);
187 deflateSetDictionary(&stream
, inBufferPtr
+ myInBufferStart
- 32768, 32768);
191 unsigned char* prevBufferPtr
= reinterpret_cast<unsigned char*>(
192 const_cast<signed char*>(deflater
->prevDataBlock
.getConstArray()));
193 deflateSetDictionary(&stream
, prevBufferPtr
+ MaxBlockSize
- 32768, 32768);
196 stream
.next_in
= inBufferPtr
+ myInBufferStart
;
197 stream
.avail_in
= blockSize
;
198 stream
.next_out
= reinterpret_cast<unsigned char*>(deflater
->outBuffers
[sequence
].data());
199 stream
.avail_out
= outputMaxSize
;
201 // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
202 // and since we use a raw stream, the data blocks then can be simply concatenated.
203 int res
= deflate(&stream
, lastTask
? Z_FINISH
: Z_SYNC_FLUSH
);
204 assert(stream
.avail_in
== 0); // Check that everything has been deflated.
205 if (lastTask
? res
== Z_STREAM_END
: res
== Z_OK
)
207 sal_Int64 outSize
= outputMaxSize
- stream
.avail_out
;
208 deflater
->outBuffers
[sequence
].resize(outSize
);
212 SAL_WARN("package.threadeddeflate", "deflate() failed");
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */