Update git submodules
[LibreOffice.git] / io / source / stm / opipe.cxx
blob22e6218f3ef980aefe604ddbee4cc026900093d5
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * This file incorporates work covered by the following license notice:
 *
 *   Licensed to the Apache Software Foundation (ASF) under one or more
 *   contributor license agreements. See the NOTICE file distributed
 *   with this work for additional information regarding copyright
 *   ownership. The ASF licenses this file to you under the Apache
 *   License, Version 2.0 (the "License"); you may not use this file
 *   except in compliance with the License. You may obtain a copy of
 *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
 */
20 #include <sal/config.h>
22 #include <com/sun/star/io/BufferSizeExceededException.hpp>
23 #include <com/sun/star/io/NotConnectedException.hpp>
24 #include <com/sun/star/io/XPipe.hpp>
25 #include <com/sun/star/io/XConnectable.hpp>
27 #include <com/sun/star/lang/XServiceInfo.hpp>
29 #include <cppuhelper/implbase.hxx>
30 #include <cppuhelper/supportsservice.hxx>
32 #include <osl/conditn.hxx>
33 #include <osl/mutex.hxx>
35 #include <limits>
36 #include <memory>
37 #include <optional>
38 #include <string.h>
40 using namespace ::osl;
41 using namespace ::cppu;
42 using namespace ::com::sun::star::uno;
43 using namespace ::com::sun::star::io;
44 using namespace ::com::sun::star::lang;
46 #include "streamhelper.hxx"
48 namespace com::sun::star::uno { class XComponentContext; }
50 namespace io_stm{
52 namespace {
54 class OPipeImpl :
55 public WeakImplHelper< XPipe , XConnectable , XServiceInfo >
57 public:
58 OPipeImpl( );
60 public: // XInputStream
61 virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) override;
62 virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) override;
63 virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) override;
64 virtual sal_Int32 SAL_CALL available() override;
65 virtual void SAL_CALL closeInput() override;
67 public: // XOutputStream
69 virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) override;
70 virtual void SAL_CALL flush() override;
71 virtual void SAL_CALL closeOutput() override;
73 public: // XConnectable
74 virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor) override;
75 virtual Reference< XConnectable > SAL_CALL getPredecessor() override;
76 virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) override;
77 virtual Reference < XConnectable > SAL_CALL getSuccessor() override ;
80 public: // XServiceInfo
81 OUString SAL_CALL getImplementationName() override;
82 Sequence< OUString > SAL_CALL getSupportedServiceNames() override;
83 sal_Bool SAL_CALL supportsService(const OUString& ServiceName) override;
85 private:
87 Reference < XConnectable > m_succ;
88 Reference < XConnectable > m_pred;
90 sal_Int32 m_nBytesToSkip;
92 bool m_bOutputStreamClosed;
93 bool m_bInputStreamClosed;
95 osl::Condition m_conditionBytesAvail;
96 Mutex m_mutexAccess;
97 std::optional<MemFIFO> m_oFIFO;
102 OPipeImpl::OPipeImpl()
103 : m_nBytesToSkip(0 )
104 , m_bOutputStreamClosed(false )
105 , m_bInputStreamClosed( false )
106 , m_oFIFO( std::in_place )
112 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
114 while( true )
116 { // start guarded section
117 MutexGuard guard( m_mutexAccess );
118 if( m_bInputStreamClosed )
120 throw NotConnectedException(
121 "Pipe::readBytes NotConnectedException",
122 *this );
124 sal_Int32 nOccupiedBufferLen = m_oFIFO->getSize();
126 if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
128 nBytesToRead = nOccupiedBufferLen;
131 if( nOccupiedBufferLen < nBytesToRead )
133 // wait outside guarded section
134 m_conditionBytesAvail.reset();
136 else {
137 // necessary bytes are available
138 m_oFIFO->read( aData , nBytesToRead );
139 return nBytesToRead;
141 } // end guarded section
143 // wait for new data outside guarded section!
144 m_conditionBytesAvail.wait();
149 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
151 while( true ) {
153 MutexGuard guard( m_mutexAccess );
154 if( m_bInputStreamClosed )
156 throw NotConnectedException(
157 "Pipe::readSomeBytes NotConnectedException",
158 *this );
160 if( m_oFIFO->getSize() )
162 sal_Int32 nSize = std::min( nMaxBytesToRead , m_oFIFO->getSize() );
163 aData.realloc( nSize );
164 m_oFIFO->read( aData , nSize );
165 return nSize;
168 if( m_bOutputStreamClosed )
170 // no bytes in buffer anymore
171 return 0;
175 m_conditionBytesAvail.wait();
180 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
182 MutexGuard guard( m_mutexAccess );
183 if( m_bInputStreamClosed )
185 throw NotConnectedException(
186 "Pipe::skipBytes NotConnectedException",
187 *this );
190 if( nBytesToSkip < 0
191 || (nBytesToSkip
192 > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
194 throw BufferSizeExceededException(
195 "Pipe::skipBytes BufferSizeExceededException",
196 *this );
198 m_nBytesToSkip += nBytesToSkip;
200 nBytesToSkip = std::min( m_oFIFO->getSize() , m_nBytesToSkip );
201 m_oFIFO->skip( nBytesToSkip );
202 m_nBytesToSkip -= nBytesToSkip;
206 sal_Int32 OPipeImpl::available()
208 MutexGuard guard( m_mutexAccess );
209 if( m_bInputStreamClosed )
211 throw NotConnectedException(
212 "Pipe::available NotConnectedException",
213 *this );
215 return m_oFIFO->getSize();
218 void OPipeImpl::closeInput()
220 MutexGuard guard( m_mutexAccess );
222 m_bInputStreamClosed = true;
224 m_oFIFO.reset();
226 // readBytes may throw an exception
227 m_conditionBytesAvail.set();
229 setSuccessor( Reference< XConnectable > () );
233 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
235 MutexGuard guard( m_mutexAccess );
237 if( m_bOutputStreamClosed )
239 throw NotConnectedException(
240 "Pipe::writeBytes NotConnectedException (outputstream)",
241 *this );
244 if( m_bInputStreamClosed )
246 throw NotConnectedException(
247 "Pipe::writeBytes NotConnectedException (inputstream)",
248 *this );
251 // check skipping
252 sal_Int32 nLen = aData.getLength();
253 if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) {
254 // all must be skipped - forget whole call
255 m_nBytesToSkip -= nLen;
256 return;
259 // adjust buffersize if necessary
260 if( m_nBytesToSkip )
262 Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
263 memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
264 m_oFIFO->write( seqCopy );
266 else
268 m_oFIFO->write( aData );
270 m_nBytesToSkip = 0;
272 // readBytes may check again if enough bytes are available
273 m_conditionBytesAvail.set();
277 void OPipeImpl::flush()
279 // nothing to do for a pipe
282 void OPipeImpl::closeOutput()
284 MutexGuard guard( m_mutexAccess );
286 m_bOutputStreamClosed = true;
287 m_conditionBytesAvail.set();
288 setPredecessor( Reference < XConnectable > () );
292 void OPipeImpl::setSuccessor( const Reference < XConnectable > &r )
294 /// if the references match, nothing needs to be done
295 if( m_succ != r ) {
296 /// store the reference for later use
297 m_succ = r;
299 if( m_succ.is() )
301 m_succ->setPredecessor(
302 Reference< XConnectable > ( static_cast< XConnectable * >(this) ) );
307 Reference < XConnectable > OPipeImpl::getSuccessor()
309 return m_succ;
313 // XDataSource
314 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
316 if( r != m_pred ) {
317 m_pred = r;
318 if( m_pred.is() ) {
319 m_pred->setSuccessor(
320 Reference < XConnectable > ( static_cast< XConnectable * >(this) ) );
325 Reference < XConnectable > OPipeImpl::getPredecessor()
327 return m_pred;
331 // XServiceInfo
332 OUString OPipeImpl::getImplementationName()
334 return "com.sun.star.comp.io.stm.Pipe";
337 // XServiceInfo
338 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName)
340 return cppu::supportsService(this, ServiceName);
343 // XServiceInfo
344 Sequence< OUString > OPipeImpl::getSupportedServiceNames()
346 return { "com.sun.star.io.Pipe" };
351 extern "C" SAL_DLLPUBLIC_EXPORT css::uno::XInterface*
352 io_OPipeImpl_get_implementation(
353 css::uno::XComponentContext* , css::uno::Sequence<css::uno::Any> const&)
355 return cppu::acquire(new io_stm::OPipeImpl());
358 /* vim:set shiftwidth=4 softtabstop=4 expandtab: */