bump product version to 7.6.3.2-android
[LibreOffice.git] / package / source / zipapi / XBufferedThreadedStream.cxx
blobd3bf995d907329dea8714fac34d5d4de89c3bca6
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /*
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8 */
10 #include "XBufferedThreadedStream.hxx"
12 using namespace css::uno;
14 namespace {
16 class UnzippingThread: public salhelper::Thread
18 XBufferedThreadedStream &mxStream;
19 public:
20 explicit UnzippingThread(XBufferedThreadedStream &xStream): Thread("Unzipping"), mxStream(xStream) {}
21 private:
22 virtual void execute() override
24 try
26 mxStream.produce();
28 catch (...)
30 mxStream.saveException(std::current_exception());
33 mxStream.setTerminateThread();
39 XBufferedThreadedStream::XBufferedThreadedStream(
40 const Reference<XInputStream>& xSrcStream,
41 sal_Int64 nStreamSize)
42 : mxSrcStream( xSrcStream )
43 , mnPos(0)
44 , mnStreamSize( nStreamSize )
45 , mnOffset( 0 )
46 , mxUnzippingThread( new UnzippingThread(*this) )
47 , mbTerminateThread( false )
49 mxUnzippingThread->launch();
52 XBufferedThreadedStream::~XBufferedThreadedStream()
54 setTerminateThread();
55 mxUnzippingThread->join();
58 /**
59 * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
60 * in maPendingBuffers queue for further use.
62 void XBufferedThreadedStream::produce()
64 Buffer pProducedBuffer;
65 sal_Int64 nTotalBytesRead(0);
66 std::unique_lock<std::mutex> aGuard( maBufferProtector );
69 if( !maUsedBuffers.empty() )
71 pProducedBuffer = maUsedBuffers.front();
72 maUsedBuffers.pop();
75 aGuard.unlock();
76 nTotalBytesRead += mxSrcStream->readBytes( pProducedBuffer, nBufferSize );
78 aGuard.lock();
79 maPendingBuffers.push( pProducedBuffer );
80 maBufferConsumeResume.notify_one();
82 if (!mbTerminateThread)
83 maBufferProduceResume.wait( aGuard, [&]{return canProduce(); } );
85 } while( !mbTerminateThread && nTotalBytesRead < mnStreamSize );
88 /**
89 * Fetches next available block from maPendingBuffers for use in Reading thread.
91 const Buffer& XBufferedThreadedStream::getNextBlock()
93 std::unique_lock<std::mutex> aGuard( maBufferProtector );
94 const sal_Int32 nBufSize = maInUseBuffer.getLength();
95 if( nBufSize <= 0 || mnOffset >= nBufSize )
97 if( mnOffset >= nBufSize )
98 maUsedBuffers.push( maInUseBuffer );
100 maBufferConsumeResume.wait( aGuard, [&]{return canConsume(); } );
102 if( maPendingBuffers.empty() )
104 maInUseBuffer = Buffer();
105 if (maSavedException)
106 std::rethrow_exception(maSavedException);
108 else
110 maInUseBuffer = maPendingBuffers.front();
111 maPendingBuffers.pop();
112 mnOffset = 0;
114 if( maPendingBuffers.size() <= nBufferLowWater )
115 maBufferProduceResume.notify_one();
119 return maInUseBuffer;
122 void XBufferedThreadedStream::setTerminateThread()
124 std::scoped_lock<std::mutex> aGuard( maBufferProtector );
125 mbTerminateThread = true;
126 maBufferProduceResume.notify_one();
127 maBufferConsumeResume.notify_one();
130 sal_Int32 SAL_CALL XBufferedThreadedStream::readBytes( Sequence< sal_Int8 >& rData, sal_Int32 nBytesToRead )
132 if( !hasBytes() )
133 return 0;
135 const sal_Int32 nAvailableSize = static_cast< sal_Int32 > ( std::min< sal_Int64 >( nBytesToRead, remainingSize() ) );
136 rData.realloc( nAvailableSize );
137 auto pData = rData.getArray();
138 sal_Int32 i = 0, nPendingBytes = nAvailableSize;
140 while( nPendingBytes )
142 const Buffer &pBuffer = getNextBlock();
143 if( !pBuffer.hasElements() )
145 rData.realloc( nAvailableSize - nPendingBytes );
146 return nAvailableSize - nPendingBytes;
148 const sal_Int32 limit = std::min<sal_Int32>( nPendingBytes, pBuffer.getLength() - mnOffset );
150 memcpy( &pData[i], &pBuffer[mnOffset], limit );
152 nPendingBytes -= limit;
153 mnOffset += limit;
154 mnPos += limit;
155 i += limit;
158 return nAvailableSize;
161 sal_Int32 SAL_CALL XBufferedThreadedStream::readSomeBytes( Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead )
163 return readBytes( aData, nMaxBytesToRead );
165 void SAL_CALL XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip )
167 if( nBytesToSkip )
169 Sequence < sal_Int8 > aSequence( nBytesToSkip );
170 readBytes( aSequence, nBytesToSkip );
174 sal_Int32 SAL_CALL XBufferedThreadedStream::available()
176 if( !hasBytes() )
177 return 0;
179 return static_cast< sal_Int32 > ( std::min< sal_Int64 >( SAL_MAX_INT32, remainingSize() ) );
182 void SAL_CALL XBufferedThreadedStream::closeInput()
184 setTerminateThread();
185 mxUnzippingThread->join();
186 mxSrcStream->closeInput();
189 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */