1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 * The multiplex stream concatenates a list of input streams into a single
12 #include "mozilla/Attributes.h"
13 #include "mozilla/MathAlgorithms.h"
15 #include "base/basictypes.h"
17 #include "nsMultiplexInputStream.h"
18 #include "nsIMultiplexInputStream.h"
19 #include "nsISeekableStream.h"
21 #include "nsCOMArray.h"
22 #include "nsIClassInfoImpl.h"
23 #include "nsIIPCSerializableInputStream.h"
24 #include "mozilla/ipc/InputStreamUtils.h"
26 using namespace mozilla::ipc
;
28 using mozilla::DeprecatedAbs
;
30 class nsMultiplexInputStream final
31 : public nsIMultiplexInputStream
32 , public nsISeekableStream
33 , public nsIIPCSerializableInputStream
36 nsMultiplexInputStream();
38 NS_DECL_THREADSAFE_ISUPPORTS
39 NS_DECL_NSIINPUTSTREAM
40 NS_DECL_NSIMULTIPLEXINPUTSTREAM
41 NS_DECL_NSISEEKABLESTREAM
42 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
45 ~nsMultiplexInputStream()
49 struct MOZ_STACK_CLASS ReadSegmentsState
51 nsCOMPtr
<nsIInputStream
> mThisStream
;
53 nsWriteSegmentFun mWriter
;
58 static NS_METHOD
ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
59 const char* aFromRawSegment
, uint32_t aToOffset
,
60 uint32_t aCount
, uint32_t* aWriteCount
);
62 nsTArray
<nsCOMPtr
<nsIInputStream
>> mStreams
;
63 uint32_t mCurrentStream
;
64 bool mStartedReadingCurrent
;
68 NS_IMPL_ADDREF(nsMultiplexInputStream
)
69 NS_IMPL_RELEASE(nsMultiplexInputStream
)
71 NS_IMPL_CLASSINFO(nsMultiplexInputStream
, nullptr, nsIClassInfo::THREADSAFE
,
72 NS_MULTIPLEXINPUTSTREAM_CID
)
74 NS_IMPL_QUERY_INTERFACE_CI(nsMultiplexInputStream
,
75 nsIMultiplexInputStream
,
78 nsIIPCSerializableInputStream
)
79 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream
,
80 nsIMultiplexInputStream
,
85 AvailableMaybeSeek(nsIInputStream
* aStream
, uint64_t* aResult
)
87 nsresult rv
= aStream
->Available(aResult
);
88 if (rv
== NS_BASE_STREAM_CLOSED
) {
89 // Blindly seek to the current position if Available() returns
90 // NS_BASE_STREAM_CLOSED.
91 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
92 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
93 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(aStream
);
95 nsresult rv
= seekable
->Seek(nsISeekableStream::NS_SEEK_CUR
, 0);
96 if (NS_SUCCEEDED(rv
)) {
97 rv
= aStream
->Available(aResult
);
105 TellMaybeSeek(nsISeekableStream
* aSeekable
, int64_t* aResult
)
107 nsresult rv
= aSeekable
->Tell(aResult
);
108 if (rv
== NS_BASE_STREAM_CLOSED
) {
109 // Blindly seek to the current position if Tell() returns
110 // NS_BASE_STREAM_CLOSED.
111 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
112 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
113 nsresult rv
= aSeekable
->Seek(nsISeekableStream::NS_SEEK_CUR
, 0);
114 if (NS_SUCCEEDED(rv
)) {
115 rv
= aSeekable
->Tell(aResult
);
121 nsMultiplexInputStream::nsMultiplexInputStream()
123 mStartedReadingCurrent(false),
128 /* readonly attribute unsigned long count; */
130 nsMultiplexInputStream::GetCount(uint32_t* aCount
)
132 *aCount
= mStreams
.Length();
138 SeekableStreamAtBeginning(nsIInputStream
* aStream
)
141 nsCOMPtr
<nsISeekableStream
> stream
= do_QueryInterface(aStream
);
142 if (stream
&& NS_SUCCEEDED(stream
->Tell(&streamPos
)) && streamPos
!= 0) {
149 /* void appendStream (in nsIInputStream stream); */
151 nsMultiplexInputStream::AppendStream(nsIInputStream
* aStream
)
153 NS_ASSERTION(SeekableStreamAtBeginning(aStream
),
154 "Appended stream not at beginning.");
155 return mStreams
.AppendElement(aStream
) ? NS_OK
: NS_ERROR_OUT_OF_MEMORY
;
158 /* void insertStream (in nsIInputStream stream, in unsigned long index); */
160 nsMultiplexInputStream::InsertStream(nsIInputStream
* aStream
, uint32_t aIndex
)
162 NS_ASSERTION(SeekableStreamAtBeginning(aStream
),
163 "Inserted stream not at beginning.");
164 mStreams
.InsertElementAt(aIndex
, aStream
);
165 if (mCurrentStream
> aIndex
||
166 (mCurrentStream
== aIndex
&& mStartedReadingCurrent
)) {
172 /* void removeStream (in unsigned long index); */
174 nsMultiplexInputStream::RemoveStream(uint32_t aIndex
)
176 mStreams
.RemoveElementAt(aIndex
);
177 if (mCurrentStream
> aIndex
) {
179 } else if (mCurrentStream
== aIndex
) {
180 mStartedReadingCurrent
= false;
186 /* nsIInputStream getStream (in unsigned long index); */
188 nsMultiplexInputStream::GetStream(uint32_t aIndex
, nsIInputStream
** aResult
)
190 *aResult
= mStreams
.SafeElementAt(aIndex
, nullptr);
191 if (NS_WARN_IF(!*aResult
)) {
192 return NS_ERROR_NOT_AVAILABLE
;
201 nsMultiplexInputStream::Close()
203 mStatus
= NS_BASE_STREAM_CLOSED
;
207 uint32_t len
= mStreams
.Length();
208 for (uint32_t i
= 0; i
< len
; ++i
) {
209 nsresult rv2
= mStreams
[i
]->Close();
210 // We still want to close all streams, but we should return an error
211 if (NS_FAILED(rv2
)) {
218 /* unsigned long long available (); */
220 nsMultiplexInputStream::Available(uint64_t* aResult
)
222 if (NS_FAILED(mStatus
)) {
229 uint32_t len
= mStreams
.Length();
230 for (uint32_t i
= mCurrentStream
; i
< len
; i
++) {
231 uint64_t streamAvail
;
232 rv
= AvailableMaybeSeek(mStreams
[i
], &streamAvail
);
233 if (NS_WARN_IF(NS_FAILED(rv
))) {
236 avail
+= streamAvail
;
242 /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
244 nsMultiplexInputStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aResult
)
246 // It is tempting to implement this method in terms of ReadSegments, but
247 // that would prevent this class from being used with streams that only
248 // implement Read (e.g., file streams).
252 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
255 if (NS_FAILED(mStatus
)) {
261 uint32_t len
= mStreams
.Length();
262 while (mCurrentStream
< len
&& aCount
) {
264 rv
= mStreams
[mCurrentStream
]->Read(aBuf
, aCount
, &read
);
266 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
267 // (This is a bug in those stream implementations)
268 if (rv
== NS_BASE_STREAM_CLOSED
) {
269 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
272 } else if (NS_FAILED(rv
)) {
278 mStartedReadingCurrent
= false;
280 NS_ASSERTION(aCount
>= read
, "Read more than requested");
284 mStartedReadingCurrent
= true;
287 return *aResult
? NS_OK
: rv
;
290 /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
291 * in voidPtr closure,
292 * in unsigned long count); */
294 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
295 uint32_t aCount
, uint32_t* aResult
)
297 if (mStatus
== NS_BASE_STREAM_CLOSED
) {
301 if (NS_FAILED(mStatus
)) {
305 NS_ASSERTION(aWriter
, "missing aWriter");
308 ReadSegmentsState state
;
309 state
.mThisStream
= this;
311 state
.mWriter
= aWriter
;
312 state
.mClosure
= aClosure
;
315 uint32_t len
= mStreams
.Length();
316 while (mCurrentStream
< len
&& aCount
) {
318 rv
= mStreams
[mCurrentStream
]->ReadSegments(ReadSegCb
, &state
, aCount
, &read
);
320 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
321 // (This is a bug in those stream implementations)
322 if (rv
== NS_BASE_STREAM_CLOSED
) {
323 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
328 // if |aWriter| decided to stop reading segments...
329 if (state
.mDone
|| NS_FAILED(rv
)) {
333 // if stream is empty, then advance to the next stream.
336 mStartedReadingCurrent
= false;
338 NS_ASSERTION(aCount
>= read
, "Read more than requested");
339 state
.mOffset
+= read
;
341 mStartedReadingCurrent
= true;
345 // if we successfully read some data, then this call succeeded.
346 *aResult
= state
.mOffset
;
347 return state
.mOffset
? NS_OK
: rv
;
351 nsMultiplexInputStream::ReadSegCb(nsIInputStream
* aIn
, void* aClosure
,
352 const char* aFromRawSegment
,
353 uint32_t aToOffset
, uint32_t aCount
,
354 uint32_t* aWriteCount
)
357 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
358 rv
= (state
->mWriter
)(state
->mThisStream
,
361 aToOffset
+ state
->mOffset
,
370 /* readonly attribute boolean nonBlocking; */
372 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking
)
374 uint32_t len
= mStreams
.Length();
376 // Claim to be non-blocking, since we won't block the caller.
377 // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
378 // so maybe we should claim to be blocking? It probably doesn't
379 // matter in practice.
380 *aNonBlocking
= true;
383 for (uint32_t i
= 0; i
< len
; ++i
) {
384 nsresult rv
= mStreams
[i
]->IsNonBlocking(aNonBlocking
);
385 if (NS_WARN_IF(NS_FAILED(rv
))) {
388 // If one is non-blocking the entire stream becomes non-blocking
389 // (except that we don't implement nsIAsyncInputStream, so there's
390 // not much for the caller to do if Read returns "would block")
398 /* void seek (in int32_t whence, in int32_t offset); */
400 nsMultiplexInputStream::Seek(int32_t aWhence
, int64_t aOffset
)
402 if (NS_FAILED(mStatus
)) {
408 uint32_t oldCurrentStream
= mCurrentStream
;
409 bool oldStartedReadingCurrent
= mStartedReadingCurrent
;
411 if (aWhence
== NS_SEEK_SET
) {
412 int64_t remaining
= aOffset
;
416 for (uint32_t i
= 0; i
< mStreams
.Length(); ++i
) {
417 nsCOMPtr
<nsISeekableStream
> stream
=
418 do_QueryInterface(mStreams
[i
]);
420 return NS_ERROR_FAILURE
;
423 // See if all remaining streams should be rewound
424 if (remaining
== 0) {
425 if (i
< oldCurrentStream
||
426 (i
== oldCurrentStream
&& oldStartedReadingCurrent
)) {
427 rv
= stream
->Seek(NS_SEEK_SET
, 0);
428 if (NS_WARN_IF(NS_FAILED(rv
))) {
437 // Get position in current stream
439 if (i
> oldCurrentStream
||
440 (i
== oldCurrentStream
&& !oldStartedReadingCurrent
)) {
443 rv
= TellMaybeSeek(stream
, &streamPos
);
444 if (NS_WARN_IF(NS_FAILED(rv
))) {
449 // See if we need to seek current stream forward or backward
450 if (remaining
< streamPos
) {
451 rv
= stream
->Seek(NS_SEEK_SET
, remaining
);
452 if (NS_WARN_IF(NS_FAILED(rv
))) {
457 mStartedReadingCurrent
= remaining
!= 0;
460 } else if (remaining
> streamPos
) {
461 if (i
< oldCurrentStream
) {
462 // We're already at end so no need to seek this stream
463 remaining
-= streamPos
;
464 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
467 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
468 if (NS_WARN_IF(NS_FAILED(rv
))) {
472 int64_t newPos
= XPCOM_MIN(remaining
, streamPos
+ (int64_t)avail
);
474 rv
= stream
->Seek(NS_SEEK_SET
, newPos
);
475 if (NS_WARN_IF(NS_FAILED(rv
))) {
480 mStartedReadingCurrent
= true;
483 NS_ASSERTION(remaining
>= 0, "Remaining invalid");
486 NS_ASSERTION(remaining
== streamPos
, "Huh?");
494 if (aWhence
== NS_SEEK_CUR
&& aOffset
> 0) {
495 int64_t remaining
= aOffset
;
496 for (uint32_t i
= mCurrentStream
; remaining
&& i
< mStreams
.Length(); ++i
) {
497 nsCOMPtr
<nsISeekableStream
> stream
=
498 do_QueryInterface(mStreams
[i
]);
501 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
502 if (NS_WARN_IF(NS_FAILED(rv
))) {
506 int64_t seek
= XPCOM_MIN((int64_t)avail
, remaining
);
508 rv
= stream
->Seek(NS_SEEK_CUR
, seek
);
509 if (NS_WARN_IF(NS_FAILED(rv
))) {
514 mStartedReadingCurrent
= true;
522 if (aWhence
== NS_SEEK_CUR
&& aOffset
< 0) {
523 int64_t remaining
= -aOffset
;
524 for (uint32_t i
= mCurrentStream
; remaining
&& i
!= (uint32_t)-1; --i
) {
525 nsCOMPtr
<nsISeekableStream
> stream
=
526 do_QueryInterface(mStreams
[i
]);
529 rv
= TellMaybeSeek(stream
, &pos
);
530 if (NS_WARN_IF(NS_FAILED(rv
))) {
534 int64_t seek
= XPCOM_MIN(pos
, remaining
);
536 rv
= stream
->Seek(NS_SEEK_CUR
, -seek
);
537 if (NS_WARN_IF(NS_FAILED(rv
))) {
542 mStartedReadingCurrent
= seek
!= -pos
;
550 if (aWhence
== NS_SEEK_CUR
) {
551 NS_ASSERTION(aOffset
== 0, "Should have handled all non-zero values");
556 if (aWhence
== NS_SEEK_END
) {
558 return NS_ERROR_INVALID_ARG
;
560 int64_t remaining
= aOffset
;
561 for (uint32_t i
= mStreams
.Length() - 1; i
!= (uint32_t)-1; --i
) {
562 nsCOMPtr
<nsISeekableStream
> stream
=
563 do_QueryInterface(mStreams
[i
]);
565 // See if all remaining streams should be seeked to end
566 if (remaining
== 0) {
567 if (i
>= oldCurrentStream
) {
568 rv
= stream
->Seek(NS_SEEK_END
, 0);
569 if (NS_WARN_IF(NS_FAILED(rv
))) {
577 // Get position in current stream
579 if (i
< oldCurrentStream
) {
583 rv
= AvailableMaybeSeek(mStreams
[i
], &avail
);
584 if (NS_WARN_IF(NS_FAILED(rv
))) {
591 // See if we have enough data in the current stream.
592 if (DeprecatedAbs(remaining
) < streamPos
) {
593 rv
= stream
->Seek(NS_SEEK_END
, remaining
);
594 if (NS_WARN_IF(NS_FAILED(rv
))) {
599 mStartedReadingCurrent
= true;
602 } else if (DeprecatedAbs(remaining
) > streamPos
) {
603 if (i
> oldCurrentStream
||
604 (i
== oldCurrentStream
&& !oldStartedReadingCurrent
)) {
605 // We're already at start so no need to seek this stream
606 remaining
+= streamPos
;
609 rv
= TellMaybeSeek(stream
, &avail
);
610 if (NS_WARN_IF(NS_FAILED(rv
))) {
614 int64_t newPos
= streamPos
+ XPCOM_MIN(avail
, DeprecatedAbs(remaining
));
616 rv
= stream
->Seek(NS_SEEK_END
, -newPos
);
617 if (NS_WARN_IF(NS_FAILED(rv
))) {
622 mStartedReadingCurrent
= true;
627 NS_ASSERTION(remaining
== streamPos
, "Huh?");
635 // other Seeks not implemented yet
636 return NS_ERROR_NOT_IMPLEMENTED
;
639 /* uint32_t tell (); */
641 nsMultiplexInputStream::Tell(int64_t* aResult
)
643 if (NS_FAILED(mStatus
)) {
650 last
= mStartedReadingCurrent
? mCurrentStream
+ 1 : mCurrentStream
;
651 for (i
= 0; i
< last
; ++i
) {
652 nsCOMPtr
<nsISeekableStream
> stream
= do_QueryInterface(mStreams
[i
]);
653 if (NS_WARN_IF(!stream
)) {
654 return NS_ERROR_NO_INTERFACE
;
658 rv
= TellMaybeSeek(stream
, &pos
);
659 if (NS_WARN_IF(NS_FAILED(rv
))) {
669 /* void setEOF (); */
671 nsMultiplexInputStream::SetEOF()
673 return NS_ERROR_NOT_IMPLEMENTED
;
677 nsMultiplexInputStreamConstructor(nsISupports
* aOuter
,
684 return NS_ERROR_NO_AGGREGATION
;
687 nsMultiplexInputStream
* inst
= new nsMultiplexInputStream();
689 return NS_ERROR_OUT_OF_MEMORY
;
693 nsresult rv
= inst
->QueryInterface(aIID
, aResult
);
700 nsMultiplexInputStream::Serialize(InputStreamParams
& aParams
,
701 FileDescriptorArray
& aFileDescriptors
)
703 MultiplexInputStreamParams params
;
705 uint32_t streamCount
= mStreams
.Length();
708 InfallibleTArray
<InputStreamParams
>& streams
= params
.streams();
710 streams
.SetCapacity(streamCount
);
711 for (uint32_t index
= 0; index
< streamCount
; index
++) {
712 InputStreamParams childStreamParams
;
713 SerializeInputStream(mStreams
[index
], childStreamParams
,
716 streams
.AppendElement(childStreamParams
);
720 params
.currentStream() = mCurrentStream
;
721 params
.status() = mStatus
;
722 params
.startedReadingCurrent() = mStartedReadingCurrent
;
728 nsMultiplexInputStream::Deserialize(const InputStreamParams
& aParams
,
729 const FileDescriptorArray
& aFileDescriptors
)
731 if (aParams
.type() !=
732 InputStreamParams::TMultiplexInputStreamParams
) {
733 NS_ERROR("Received unknown parameters from the other process!");
737 const MultiplexInputStreamParams
& params
=
738 aParams
.get_MultiplexInputStreamParams();
740 const InfallibleTArray
<InputStreamParams
>& streams
= params
.streams();
742 uint32_t streamCount
= streams
.Length();
743 for (uint32_t index
= 0; index
< streamCount
; index
++) {
744 nsCOMPtr
<nsIInputStream
> stream
=
745 DeserializeInputStream(streams
[index
], aFileDescriptors
);
747 NS_WARNING("Deserialize failed!");
751 if (NS_FAILED(AppendStream(stream
))) {
752 NS_WARNING("AppendStream failed!");
757 mCurrentStream
= params
.currentStream();
758 mStatus
= params
.status();
759 mStartedReadingCurrent
= params
.startedReadingCurrent();