Version 6.1.4.1, tag libreoffice-6.1.4.1
[LibreOffice.git] / package / source / zipapi / XBufferedThreadedStream.cxx
blob2c36b73ea4094fa97f3256450d011ad790d6bba0
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
10 #include "XBufferedThreadedStream.hxx"
11 #include <com/sun/star/packages/zip/ZipIOException.hpp>
12 #include <cppuhelper/exc_hlp.hxx>
14 using namespace css::uno;
15 using com::sun::star::packages::zip::ZipIOException;
17 namespace {
19 class UnzippingThread: public salhelper::Thread
21 XBufferedThreadedStream &mxStream;
22 public:
23 explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
24 private:
25 virtual void execute() override
27 try
29 mxStream.produce();
31 catch (const css::uno::Exception &e)
33 SAL_WARN("package", "Unexpected " << e );
34 mxStream.saveException(cppu::getCaughtException());
37 mxStream.setTerminateThread();
43 XBufferedThreadedStream::XBufferedThreadedStream(
44 const Reference<XInputStream>& xSrcStream,
45 sal_Int64 nStreamSize)
46 : mxSrcStream( xSrcStream )
47 , mnPos(0)
48 , mnStreamSize( nStreamSize )
49 , mnOffset( 0 )
50 , mxUnzippingThread( new UnzippingThread(*this) )
51 , mbTerminateThread( false )
53 mxUnzippingThread->launch();
56 XBufferedThreadedStream::~XBufferedThreadedStream()
58 setTerminateThread();
59 mxUnzippingThread->join();
62 /**
63 * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
64 * in maPendingBuffers queue for further use.
66 void XBufferedThreadedStream::produce()
68 Buffer pProducedBuffer;
69 sal_Int64 nTotalBytesRead(0);
70 std::unique_lock<std::mutex> aGuard( maBufferProtector );
73 if( !maUsedBuffers.empty() )
75 pProducedBuffer = maUsedBuffers.front();
76 maUsedBuffers.pop();
79 aGuard.unlock();
80 nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
82 aGuard.lock();
83 maPendingBuffers.push( pProducedBuffer );
84 maBufferConsumeResume.notify_one();
86 if (!mbTerminateThread)
87 maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
89 } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize );
92 /**
93 * Fetches next available block from maPendingBuffers for use in Reading thread.
95 const Buffer& XBufferedThreadedStream::getNextBlock()
97 const sal_Int32 nBufSize = maInUseBuffer.getLength();
98 if( nBufSize <= 0 || mnOffset >= nBufSize )
100 std::unique_lock<std::mutex> aGuard( maBufferProtector );
101 if( mnOffset >= nBufSize )
102 maUsedBuffers.push( maInUseBuffer );
104 maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
106 if( maPendingBuffers.empty() )
108 maInUseBuffer = Buffer();
109 if (maSavedException.hasValue())
110 cppu::throwException(maSavedException);
112 else
114 maInUseBuffer = maPendingBuffers.front();
115 maPendingBuffers.pop();
116 mnOffset = 0;
118 if( maPendingBuffers.size() <= nBufferLowWater )
119 maBufferProduceResume.notify_one();
123 return maInUseBuffer;
126 void XBufferedThreadedStream::setTerminateThread()
128 std::unique_lock<std::mutex> aGuard( maBufferProtector );
129 mbTerminateThread = true;
130 maBufferProduceResume.notify_one();
131 maBufferConsumeResume.notify_one();
134 sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
136 if( !hasBytes() )
137 return 0;
139 const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
140 rData.realloc( nAvailableSize );
141 sal_Int32 i = 0, nPendingBytes = nAvailableSize;
143 while( nPendingBytes )
145 const Buffer &pBuffer = getNextBlock();
146 if( pBuffer.getLength() <= 0 )
148 rData.realloc( nAvailableSize - nPendingBytes );
149 return nAvailableSize - nPendingBytes;
151 const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
153 memcpy( &rData[i], &pBuffer[mnOffset], limit );
155 nPendingBytes -= limit;
156 mnOffset += limit;
157 mnPos += limit;
158 i += limit;
161 return nAvailableSize;
164 sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
166 return readBytes( aData, nMaxBytesToRead );
168 void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
170 if( nBytesToSkip )
172 Sequence < sal_Int8 > aSequence( nBytesToSkip );
173 readBytes( aSequence, nBytesToSkip );
177 sal_Int32 SAL_CALL XBufferedThreadedStream::available()
179 if( !hasBytes() )
180 return 0;
182 return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) );
185 void SAL_CALL XBufferedThreadedStream::closeInput()
187 setTerminateThread();
188 mxUnzippingThread->join();
189 mxSrcStream->closeInput();
192 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */