1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 #include "XBufferedThreadedStream.hxx"
11 #include <com/sun/star/packages/zip/ZipIOException.hpp>
12 #include <cppuhelper/exc_hlp.hxx>
14 using namespace css::uno
;
15 using com::sun::star::packages::zip::ZipIOException
;
19 class UnzippingThread
: public salhelper::Thread
21 XBufferedThreadedStream
&mxStream
;
23 explicit UnzippingThread(XBufferedThreadedStream
&xStream
): Thread("Unzipping"), mxStream(xStream
) {}
25 virtual void execute() override
31 catch (const css::uno::Exception
&e
)
33 SAL_WARN("package", "Unexpected " << e
);
34 mxStream
.saveException(cppu::getCaughtException());
37 mxStream
.setTerminateThread();
43 XBufferedThreadedStream::XBufferedThreadedStream(
44 const Reference
<XInputStream
>& xSrcStream
,
45 sal_Int64 nStreamSize
)
46 : mxSrcStream( xSrcStream
)
48 , mnStreamSize( nStreamSize
)
50 , mxUnzippingThread( new UnzippingThread(*this) )
51 , mbTerminateThread( false )
53 mxUnzippingThread
->launch();
56 XBufferedThreadedStream::~XBufferedThreadedStream()
59 mxUnzippingThread
->join();
63 * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
64 * in maPendingBuffers queue for further use.
66 void XBufferedThreadedStream::produce()
68 Buffer pProducedBuffer
;
69 sal_Int64
nTotalBytesRead(0);
70 std::unique_lock
<std::mutex
> aGuard( maBufferProtector
);
73 if( !maUsedBuffers
.empty() )
75 pProducedBuffer
= maUsedBuffers
.front();
80 nTotalBytesRead
+= mxSrcStream
->readBytes( pProducedBuffer
, nBufferSize
);
83 maPendingBuffers
.push( pProducedBuffer
);
84 maBufferConsumeResume
.notify_one();
86 if (!mbTerminateThread
)
87 maBufferProduceResume
.wait( aGuard
, [&]{return canProduce(); } );
89 } while( !mbTerminateThread
&& nTotalBytesRead
< mnStreamSize
);
93 * Fetches next available block from maPendingBuffers for use in Reading thread.
95 const Buffer
& XBufferedThreadedStream::getNextBlock()
97 const sal_Int32 nBufSize
= maInUseBuffer
.getLength();
98 if( nBufSize
<= 0 || mnOffset
>= nBufSize
)
100 std::unique_lock
<std::mutex
> aGuard( maBufferProtector
);
101 if( mnOffset
>= nBufSize
)
102 maUsedBuffers
.push( maInUseBuffer
);
104 maBufferConsumeResume
.wait( aGuard
, [&]{return canConsume(); } );
106 if( maPendingBuffers
.empty() )
108 maInUseBuffer
= Buffer();
109 if (maSavedException
.hasValue())
110 cppu::throwException(maSavedException
);
114 maInUseBuffer
= maPendingBuffers
.front();
115 maPendingBuffers
.pop();
118 if( maPendingBuffers
.size() <= nBufferLowWater
)
119 maBufferProduceResume
.notify_one();
123 return maInUseBuffer
;
126 void XBufferedThreadedStream::setTerminateThread()
128 std::unique_lock
<std::mutex
> aGuard( maBufferProtector
);
129 mbTerminateThread
= true;
130 maBufferProduceResume
.notify_one();
131 maBufferConsumeResume
.notify_one();
134 sal_Int32 SAL_CALL
XBufferedThreadedStream::readBytes( Sequence
< sal_Int8
>& rData
, sal_Int32 nBytesToRead
)
139 const sal_Int32 nAvailableSize
= static_cast< sal_Int32
> ( std::min
< sal_Int64
>( nBytesToRead
, remainingSize() ) );
140 rData
.realloc( nAvailableSize
);
141 sal_Int32 i
= 0, nPendingBytes
= nAvailableSize
;
143 while( nPendingBytes
)
145 const Buffer
&pBuffer
= getNextBlock();
146 if( pBuffer
.getLength() <= 0 )
148 rData
.realloc( nAvailableSize
- nPendingBytes
);
149 return nAvailableSize
- nPendingBytes
;
151 const sal_Int32 limit
= std::min
<sal_Int32
>( nPendingBytes
, pBuffer
.getLength() - mnOffset
);
153 memcpy( &rData
[i
], &pBuffer
[mnOffset
], limit
);
155 nPendingBytes
-= limit
;
161 return nAvailableSize
;
164 sal_Int32 SAL_CALL
XBufferedThreadedStream::readSomeBytes( Sequence
< sal_Int8
>& aData
, sal_Int32 nMaxBytesToRead
)
166 return readBytes( aData
, nMaxBytesToRead
);
168 void SAL_CALL
XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip
)
172 Sequence
< sal_Int8
> aSequence( nBytesToSkip
);
173 readBytes( aSequence
, nBytesToSkip
);
177 sal_Int32 SAL_CALL
XBufferedThreadedStream::available()
182 return static_cast< sal_Int32
> ( std::min
< sal_Int64
>( SAL_MAX_INT32
, remainingSize() ) );
185 void SAL_CALL
XBufferedThreadedStream::closeInput()
187 setTerminateThread();
188 mxUnzippingThread
->join();
189 mxSrcStream
->closeInput();
192 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */