1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 * This file is part of the LibreOffice project.
5 * This Source Code Form is subject to the terms of the Mozilla Public
6 * License, v. 2.0. If a copy of the MPL was not distributed with this
7 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 #include "XBufferedThreadedStream.hxx"
12 using namespace css::uno
;
16 class UnzippingThread
: public salhelper::Thread
18 XBufferedThreadedStream
&mxStream
;
20 explicit UnzippingThread(XBufferedThreadedStream
&xStream
): Thread("Unzipping"), mxStream(xStream
) {}
22 virtual void execute() override
30 mxStream
.saveException(std::current_exception());
33 mxStream
.setTerminateThread();
39 XBufferedThreadedStream::XBufferedThreadedStream(
40 const Reference
<XInputStream
>& xSrcStream
,
41 sal_Int64 nStreamSize
)
42 : mxSrcStream( xSrcStream
)
44 , mnStreamSize( nStreamSize
)
46 , mxUnzippingThread( new UnzippingThread(*this) )
47 , mbTerminateThread( false )
49 mxUnzippingThread
->launch();
52 XBufferedThreadedStream::~XBufferedThreadedStream()
55 mxUnzippingThread
->join();
59 * Reads from UnbufferedStream in a separate thread and stores the buffer blocks
60 * in maPendingBuffers queue for further use.
62 void XBufferedThreadedStream::produce()
64 Buffer pProducedBuffer
;
65 sal_Int64
nTotalBytesRead(0);
66 std::unique_lock
<std::mutex
> aGuard( maBufferProtector
);
69 if( !maUsedBuffers
.empty() )
71 pProducedBuffer
= maUsedBuffers
.front();
76 nTotalBytesRead
+= mxSrcStream
->readBytes( pProducedBuffer
, nBufferSize
);
79 maPendingBuffers
.push( pProducedBuffer
);
80 maBufferConsumeResume
.notify_one();
82 if (!mbTerminateThread
)
83 maBufferProduceResume
.wait( aGuard
, [&]{return canProduce(); } );
85 } while( !mbTerminateThread
&& nTotalBytesRead
< mnStreamSize
);
89 * Fetches next available block from maPendingBuffers for use in Reading thread.
91 const Buffer
& XBufferedThreadedStream::getNextBlock()
93 std::unique_lock
<std::mutex
> aGuard( maBufferProtector
);
94 const sal_Int32 nBufSize
= maInUseBuffer
.getLength();
95 if( nBufSize
<= 0 || mnOffset
>= nBufSize
)
97 if( mnOffset
>= nBufSize
)
98 maUsedBuffers
.push( maInUseBuffer
);
100 maBufferConsumeResume
.wait( aGuard
, [&]{return canConsume(); } );
102 if( maPendingBuffers
.empty() )
104 maInUseBuffer
= Buffer();
105 if (maSavedException
)
106 std::rethrow_exception(maSavedException
);
110 maInUseBuffer
= maPendingBuffers
.front();
111 maPendingBuffers
.pop();
114 if( maPendingBuffers
.size() <= nBufferLowWater
)
115 maBufferProduceResume
.notify_one();
119 return maInUseBuffer
;
122 void XBufferedThreadedStream::setTerminateThread()
124 std::scoped_lock
<std::mutex
> aGuard( maBufferProtector
);
125 mbTerminateThread
= true;
126 maBufferProduceResume
.notify_one();
127 maBufferConsumeResume
.notify_one();
130 sal_Int32 SAL_CALL
XBufferedThreadedStream::readBytes( Sequence
< sal_Int8
>& rData
, sal_Int32 nBytesToRead
)
135 const sal_Int32 nAvailableSize
= static_cast< sal_Int32
> ( std::min
< sal_Int64
>( nBytesToRead
, remainingSize() ) );
136 rData
.realloc( nAvailableSize
);
137 auto pData
= rData
.getArray();
138 sal_Int32 i
= 0, nPendingBytes
= nAvailableSize
;
140 while( nPendingBytes
)
142 const Buffer
&pBuffer
= getNextBlock();
143 if( !pBuffer
.hasElements() )
145 rData
.realloc( nAvailableSize
- nPendingBytes
);
146 return nAvailableSize
- nPendingBytes
;
148 const sal_Int32 limit
= std::min
<sal_Int32
>( nPendingBytes
, pBuffer
.getLength() - mnOffset
);
150 memcpy( &pData
[i
], &pBuffer
[mnOffset
], limit
);
152 nPendingBytes
-= limit
;
158 return nAvailableSize
;
161 sal_Int32 SAL_CALL
XBufferedThreadedStream::readSomeBytes( Sequence
< sal_Int8
>& aData
, sal_Int32 nMaxBytesToRead
)
163 return readBytes( aData
, nMaxBytesToRead
);
165 void SAL_CALL
XBufferedThreadedStream::skipBytes( sal_Int32 nBytesToSkip
)
169 Sequence
< sal_Int8
> aSequence( nBytesToSkip
);
170 readBytes( aSequence
, nBytesToSkip
);
174 sal_Int32 SAL_CALL
XBufferedThreadedStream::available()
179 return static_cast< sal_Int32
> ( std::min
< sal_Int64
>( SAL_MAX_INT32
, remainingSize() ) );
182 void SAL_CALL
XBufferedThreadedStream::closeInput()
184 setTerminateThread();
185 mxUnzippingThread
->join();
186 mxSrcStream
->closeInput();
189 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */