1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "SlicedInputStream.h"
8 #include "mozilla/ipc/InputStreamUtils.h"
9 #include "mozilla/CheckedInt.h"
10 #include "mozilla/ScopeExit.h"
11 #include "nsISeekableStream.h"
12 #include "nsStreamUtils.h"
// AddRef/Release implementations for SlicedInputStream, generated by the
// NS_IMPL_ADDREF / NS_IMPL_RELEASE macros.
18 NS_IMPL_ADDREF(SlicedInputStream
);
19 NS_IMPL_RELEASE(SlicedInputStream
);
// Interface map for SlicedInputStream.
//
// nsIInputStream is always exposed. Every other interface is conditional: it
// is advertised only when the matching mWeak* pointer is set, or when no
// source stream has been attached yet (!mInputStream).
//
// The two callback interfaces reuse the condition of the async interface they
// serve: nsIInputStreamCallback follows mWeakAsyncInputStream and
// nsIInputStreamLengthCallback follows mWeakAsyncInputStreamLength.
21 NS_INTERFACE_MAP_BEGIN(SlicedInputStream
)
22 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
23 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream
,
24 mWeakCloneableInputStream
|| !mInputStream
)
25 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
26 nsIIPCSerializableInputStream
,
27 mWeakIPCSerializableInputStream
|| !mInputStream
)
28 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream
,
29 mWeakSeekableInputStream
|| !mInputStream
)
30 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream
,
31 mWeakTellableInputStream
|| !mInputStream
)
32 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
,
33 mWeakAsyncInputStream
|| !mInputStream
)
34 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
,
35 mWeakAsyncInputStream
|| !mInputStream
)
36 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength
,
37 mWeakInputStreamLength
|| !mInputStream
)
38 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
39 nsIAsyncInputStreamLength
, mWeakAsyncInputStreamLength
|| !mInputStream
)
40 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
41 nsIInputStreamLengthCallback
,
42 mWeakAsyncInputStreamLength
|| !mInputStream
)
// nsISupports is reachable through more than one base, so it is resolved via
// nsIInputStream.
43 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIInputStream
)
// Builds a slice on top of aInputStream.
//
// Every mWeak* interface pointer starts out null and is filled in by
// SetSourceStream(), which receives ownership of the stream passed in here.
// The pending async-wait byte count starts at zero.
46 SlicedInputStream::SlicedInputStream(
47 already_AddRefed
<nsIInputStream
> aInputStream
, uint64_t aStart
,
49 : mWeakCloneableInputStream(nullptr),
50 mWeakIPCSerializableInputStream(nullptr),
51 mWeakSeekableInputStream(nullptr),
52 mWeakTellableInputStream(nullptr),
53 mWeakAsyncInputStream(nullptr),
54 mWeakInputStreamLength(nullptr),
55 mWeakAsyncInputStreamLength(nullptr),
61 mAsyncWaitRequestedCount(0),
62 mMutex("SlicedInputStream::mMutex") {
// Take the reference out of the already_AddRefed wrapper, then hand it on.
63 nsCOMPtr
<nsIInputStream
> inputStream
= std::move(aInputStream
);
64 SetSourceStream(inputStream
.forget());
// Builds an instance without a source stream: all mWeak* interface pointers
// are null and the pending async-wait byte count is zero. A stream can be
// attached afterwards with SetSourceStream(), as Deserialize() does.
67 SlicedInputStream::SlicedInputStream()
68 : mWeakCloneableInputStream(nullptr),
69 mWeakIPCSerializableInputStream(nullptr),
70 mWeakSeekableInputStream(nullptr),
71 mWeakTellableInputStream(nullptr),
72 mWeakAsyncInputStream(nullptr),
73 mWeakInputStreamLength(nullptr),
74 mWeakAsyncInputStreamLength(nullptr),
80 mAsyncWaitRequestedCount(0),
81 mMutex("SlicedInputStream::mMutex") {}
// No custom teardown: the compiler-generated destructor is used.
83 SlicedInputStream::~SlicedInputStream() = default;
// Attaches the source stream and records which optional interfaces it offers.
//
// May only be called while no source stream is attached (asserted). For each
// optional interface the stream is queried, and the matching mWeak* member is
// set only if the query succeeds AND the result has the same COM identity as
// mInputStream. The interface map uses those members to decide which
// interfaces this object advertises.
85 void SlicedInputStream::SetSourceStream(
86 already_AddRefed
<nsIInputStream
> aInputStream
) {
87 MOZ_ASSERT(!mInputStream
);
89 mInputStream
= std::move(aInputStream
);
// nsICloneableInputStream
91 nsCOMPtr
<nsICloneableInputStream
> cloneableStream
=
92 do_QueryInterface(mInputStream
);
93 if (cloneableStream
&& SameCOMIdentity(mInputStream
, cloneableStream
)) {
94 mWeakCloneableInputStream
= cloneableStream
;
// nsIIPCSerializableInputStream
97 nsCOMPtr
<nsIIPCSerializableInputStream
> serializableStream
=
98 do_QueryInterface(mInputStream
);
99 if (serializableStream
&& SameCOMIdentity(mInputStream
, serializableStream
)) {
100 mWeakIPCSerializableInputStream
= serializableStream
;
// nsISeekableStream
103 nsCOMPtr
<nsISeekableStream
> seekableStream
= do_QueryInterface(mInputStream
);
104 if (seekableStream
&& SameCOMIdentity(mInputStream
, seekableStream
)) {
105 mWeakSeekableInputStream
= seekableStream
;
// nsITellableStream
108 nsCOMPtr
<nsITellableStream
> tellableStream
= do_QueryInterface(mInputStream
);
109 if (tellableStream
&& SameCOMIdentity(mInputStream
, tellableStream
)) {
110 mWeakTellableInputStream
= tellableStream
;
// nsIAsyncInputStream
113 nsCOMPtr
<nsIAsyncInputStream
> asyncInputStream
=
114 do_QueryInterface(mInputStream
);
115 if (asyncInputStream
&& SameCOMIdentity(mInputStream
, asyncInputStream
)) {
116 mWeakAsyncInputStream
= asyncInputStream
;
// nsIInputStreamLength
119 nsCOMPtr
<nsIInputStreamLength
> streamLength
= do_QueryInterface(mInputStream
);
120 if (streamLength
&& SameCOMIdentity(mInputStream
, streamLength
)) {
121 mWeakInputStreamLength
= streamLength
;
// nsIAsyncInputStreamLength
124 nsCOMPtr
<nsIAsyncInputStreamLength
> asyncStreamLength
=
125 do_QueryInterface(mInputStream
);
126 if (asyncStreamLength
&& SameCOMIdentity(mInputStream
, asyncStreamLength
)) {
127 mWeakAsyncInputStreamLength
= asyncStreamLength
;
// Trims a byte count obtained from the source stream so that it fits the
// slice: anything reaching past mStart + mLength is cut off, and so is the
// part that still lies before mStart when reading has not reached it yet.
//
// Both subtractions are capped with XPCOM_MIN at aRange itself, so the
// unsigned value cannot wrap below zero.
//
// NOTE(review): mStart + mLength is a plain addition here, unlike `range`,
// which is overflow-checked. Confirm that the values are guaranteed not to
// overflow uint64_t.
131 uint64_t SlicedInputStream::AdjustRange(uint64_t aRange
) {
132 CheckedUint64
range(aRange
);
135 // Let's remove extra length from the end.
136 if (range
.isValid() && range
.value() > mStart
+ mLength
) {
137 aRange
-= XPCOM_MIN((uint64_t)aRange
, range
.value() - (mStart
+ mLength
));
140 // Let's remove extra length from the beginning.
141 if (mCurPos
< mStart
) {
142 aRange
-= XPCOM_MIN((uint64_t)aRange
, mStart
- mCurPos
);
148 // nsIInputStream interface
// Closes the source stream and returns its result. Bails out early through
// NS_ENSURE_STATE when no source stream is attached.
151 SlicedInputStream::Close() {
152 NS_ENSURE_STATE(mInputStream
);
155 return mInputStream
->Close();
// Reports how many bytes of the slice can be read.
//
// The count comes from the source stream's Available() and is then trimmed to
// the slice by AdjustRange(). NS_BASE_STREAM_CLOSED from the source is checked
// separately from other failures, which are reported with NS_WARN_IF.
//
// aLength: out-parameter receiving the adjusted byte count.
159 SlicedInputStream::Available(uint64_t* aLength
) {
160 NS_ENSURE_STATE(mInputStream
);
163 return NS_BASE_STREAM_CLOSED
;
166 nsresult rv
= mInputStream
->Available(aLength
);
167 if (rv
== NS_BASE_STREAM_CLOSED
) {
172 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Restrict the source's answer to what belongs to this slice.
176 *aLength
= AdjustRange(*aLength
);
// Queries the status of the source stream. Requires an attached source
// stream; a NS_BASE_STREAM_CLOSED answer from the source is checked for
// explicitly.
181 SlicedInputStream::StreamStatus() {
182 NS_ENSURE_STATE(mInputStream
);
185 return NS_BASE_STREAM_CLOSED
;
188 nsresult rv
= mInputStream
->StreamStatus();
189 if (rv
== NS_BASE_STREAM_CLOSED
) {
// Reads up to aCount bytes of the slice into aBuffer.
//
// While mCurPos is still before mStart, the source is first brought to the
// start of the slice: by Seek(NS_SEEK_SET, mStart) if the source can be
// queried for nsISeekableStream, and by reading into a scratch buffer in
// chunks of at most sizeof(buf) while mCurPos < mStart.
//
// aCount is then capped so the read cannot run past mStart + mLength, and
// mCurPos advances by the number of bytes actually read.
//
// aBuffer:    destination for the data.
// aCount:     maximum number of bytes requested.
// aReadCount: out-parameter receiving the number of bytes read.
196 SlicedInputStream::Read(char* aBuffer
, uint32_t aCount
, uint32_t* aReadCount
) {
203 if (mCurPos
< mStart
) {
204 nsCOMPtr
<nsISeekableStream
> seekableStream
=
205 do_QueryInterface(mInputStream
);
206 if (seekableStream
) {
208 seekableStream
->Seek(nsISeekableStream::NS_SEEK_SET
, mStart
);
209 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Consume the bytes that precede the slice.
216 while (mCurPos
< mStart
) {
218 uint64_t bufCount
= XPCOM_MIN(mStart
- mCurPos
, (uint64_t)sizeof(buf
));
219 nsresult rv
= mInputStream
->Read(buf
, bufCount
, &bytesRead
);
// A successful read of zero bytes is handled separately from failures.
220 if (NS_SUCCEEDED(rv
) && bytesRead
== 0) {
225 if (NS_WARN_IF(NS_FAILED(rv
))) {
229 mCurPos
+= bytesRead
;
234 // Let's reduce aCount in case it's too big.
235 if (mCurPos
+ aCount
> mStart
+ mLength
) {
236 aCount
= mStart
+ mLength
- mCurPos
;
239 // Nothing else to read.
244 nsresult rv
= mInputStream
->Read(aBuffer
, aCount
, aReadCount
);
245 if (NS_SUCCEEDED(rv
) && *aReadCount
== 0) {
250 if (NS_WARN_IF(NS_FAILED(rv
))) {
254 mCurPos
+= *aReadCount
;
// Segment-based reading is not supported: always returns
// NS_ERROR_NOT_IMPLEMENTED and ignores every argument.
259 SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
260 uint32_t aCount
, uint32_t* aResult
) {
261 return NS_ERROR_NOT_IMPLEMENTED
;
// Reports whether the source stream is non-blocking by forwarding the query
// to it. Requires an attached source stream.
//
// aNonBlocking: out-parameter filled in by the source stream.
265 SlicedInputStream::IsNonBlocking(bool* aNonBlocking
) {
266 NS_ENSURE_STATE(mInputStream
);
267 return mInputStream
->IsNonBlocking(aNonBlocking
);
270 // nsICloneableInputStream interface
// Cloneable query. Only valid when a source stream is attached and that
// stream was recorded as cloneable (mWeakCloneableInputStream is set).
273 SlicedInputStream::GetCloneable(bool* aCloneable
) {
274 NS_ENSURE_STATE(mInputStream
);
275 NS_ENSURE_STATE(mWeakCloneableInputStream
);
// Produces an independent SlicedInputStream over a clone of the source.
//
// The source stream is cloned through mWeakCloneableInputStream, and the
// clone is wrapped in a new SlicedInputStream that uses the same mStart and
// mLength as this one. Requires an attached, cloneable source stream.
//
// aResult: out-parameter for the new stream.
282 SlicedInputStream::Clone(nsIInputStream
** aResult
) {
283 NS_ENSURE_STATE(mInputStream
);
284 NS_ENSURE_STATE(mWeakCloneableInputStream
);
286 nsCOMPtr
<nsIInputStream
> clonedStream
;
287 nsresult rv
= mWeakCloneableInputStream
->Clone(getter_AddRefs(clonedStream
));
288 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Wrap the cloned source with the same window as this slice.
292 nsCOMPtr
<nsIInputStream
> sis
=
293 new SlicedInputStream(clonedStream
.forget(), mStart
, mLength
);
299 // nsIAsyncInputStream interface
// Closes the source stream with the given status by forwarding to its
// nsIAsyncInputStream implementation. Requires an attached source stream that
// was recorded as async (mWeakAsyncInputStream is set).
302 SlicedInputStream::CloseWithStatus(nsresult aStatus
) {
303 NS_ENSURE_STATE(mInputStream
);
304 NS_ENSURE_STATE(mWeakAsyncInputStream
);
307 return mWeakAsyncInputStream
->CloseWithStatus(aStatus
);
// Asks to be notified when the slice has data to read.
//
// This object puts itself between the caller and the source stream: the
// caller's callback is stored in mAsyncWaitCallback and the source stream is
// given `this` as its callback (or nullptr when aCallback is null). The
// stored callback is later invoked from OnInputStreamReady().
//
// A second wait with a different non-null callback while one is still pending
// is rejected with NS_ERROR_FAILURE.
//
// If reading has not reached mStart yet, the source is sought to mStart when
// it is seekable. If mCurPos is still before mStart afterwards, the wait
// issued to the source asks for the gap (mStart - mCurPos) instead of the
// caller's count. The caller's flags, count and event target are kept in the
// mAsyncWait* members.
//
// NOTE(review): mStart - mCurPos looks like a 64-bit quantity while
// requestedCount is uint32_t, so the assignment would truncate for a gap of
// 4 GiB or more. Confirm whether that matters for callers.
311 SlicedInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
, uint32_t aFlags
,
312 uint32_t aRequestedCount
,
313 nsIEventTarget
* aEventTarget
) {
314 NS_ENSURE_STATE(mInputStream
);
315 NS_ENSURE_STATE(mWeakAsyncInputStream
);
// The source stream always calls back into this object, never the caller.
317 nsCOMPtr
<nsIInputStreamCallback
> callback
= aCallback
? this : nullptr;
319 uint32_t flags
= aFlags
;
320 uint32_t requestedCount
= aRequestedCount
;
323 MutexAutoLock
lock(mMutex
);
325 if (NS_WARN_IF(mAsyncWaitCallback
&& aCallback
&&
326 mAsyncWaitCallback
!= aCallback
)) {
327 return NS_ERROR_FAILURE
;
330 mAsyncWaitCallback
= aCallback
;
332 // If we haven't started retrieving data, let's see if we can seek.
333 // If we cannot seek, we will do consecutive reads.
334 if (mCurPos
< mStart
&& mWeakSeekableInputStream
) {
335 nsresult rv
= mWeakSeekableInputStream
->Seek(
336 nsISeekableStream::NS_SEEK_SET
, mStart
);
337 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Remember the caller's request so OnInputStreamReady() can use it.
344 mAsyncWaitFlags
= aFlags
;
345 mAsyncWaitRequestedCount
= aRequestedCount
;
346 mAsyncWaitEventTarget
= aEventTarget
;
348 // If we are not at the right position, let's do an asyncWait just internal.
349 if (mCurPos
< mStart
) {
351 requestedCount
= mStart
- mCurPos
;
355 return mWeakAsyncInputStream
->AsyncWait(callback
, flags
, requestedCount
,
359 // nsIInputStreamCallback
// Callback from the source stream after an AsyncWait() issued by this object.
//
// Under mMutex the pending request (callback, flags, requested count, event
// target) is copied into locals. A scope-exit handler clears
// mAsyncWaitCallback and mAsyncWaitEventTarget when it runs.
//
// If the source is still positioned before mStart, data is read into a
// scratch buffer in chunks of at most sizeof(buf) to advance mCurPos.
// NS_BASE_STREAM_WOULD_BLOCK during that phase leads to another AsyncWait()
// for the remaining gap.
//
// At the end, a non-null `callback` is notified with `this` as the stream;
// otherwise another AsyncWait() is issued on the source stream using the
// copied parameters.
//
// aStream: the source stream; asserted to be mWeakAsyncInputStream.
362 SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
363 MOZ_ASSERT(mInputStream
);
364 MOZ_ASSERT(mWeakAsyncInputStream
);
365 MOZ_ASSERT(mWeakAsyncInputStream
== aStream
);
367 nsCOMPtr
<nsIInputStreamCallback
> callback
;
368 uint32_t asyncWaitFlags
= 0;
369 uint32_t asyncWaitRequestedCount
= 0;
370 nsCOMPtr
<nsIEventTarget
> asyncWaitEventTarget
;
373 MutexAutoLock
lock(mMutex
);
375 // We have been canceled in the meanwhile.
376 if (!mAsyncWaitCallback
) {
// Clears the stored request when it runs; the lambda asserts that mMutex is
// held by the current thread at that point.
380 auto raii
= MakeScopeExit([&] {
381 mMutex
.AssertCurrentThreadOwns();
382 mAsyncWaitCallback
= nullptr;
383 mAsyncWaitEventTarget
= nullptr;
386 asyncWaitFlags
= mAsyncWaitFlags
;
387 asyncWaitRequestedCount
= mAsyncWaitRequestedCount
;
388 asyncWaitEventTarget
= mAsyncWaitEventTarget
;
390 // If at the end of this locked block, the callback is not null, it will be
391 // executed, otherwise, we are going to exec another AsyncWait().
392 callback
= mAsyncWaitCallback
;
394 if (mCurPos
< mStart
) {
// Advance to the beginning of the slice by reading the preceding bytes.
397 while (mCurPos
< mStart
) {
399 uint64_t bufCount
= XPCOM_MIN(mStart
- mCurPos
, (uint64_t)sizeof(buf
));
400 rv
= mInputStream
->Read(buf
, bufCount
, &bytesRead
);
401 if (NS_SUCCEEDED(rv
) && bytesRead
== 0) {
406 if (rv
== NS_BASE_STREAM_WOULD_BLOCK
) {
408 asyncWaitRequestedCount
= mStart
- mCurPos
;
409 // Here we want to exec another AsyncWait().
414 if (NS_WARN_IF(NS_FAILED(rv
))) {
418 mCurPos
+= bytesRead
;
421 // Now we are ready to do the 'real' asyncWait.
422 if (mCurPos
>= mStart
) {
423 // We don't want to nullify the callback now, because it will be needed
424 // at the next ::OnInputStreamReady.
432 return callback
->OnInputStreamReady(this);
435 return mWeakAsyncInputStream
->AsyncWait(
436 this, asyncWaitFlags
, asyncWaitRequestedCount
, asyncWaitEventTarget
);
439 // nsIIPCSerializableInputStream
// Estimates the cost of serializing this stream over IPC.
//
// The estimate is first delegated to InputStreamHelper::SerializedComplexity
// for the source stream. The result is then checked for the "pipes but no
// transferables" case, which gets the special handling described below.
441 void SlicedInputStream::SerializedComplexity(uint32_t aMaxSize
,
444 uint32_t* aTransferables
) {
445 InputStreamHelper::SerializedComplexity(mInputStream
, aMaxSize
, aSizeUsed
,
446 aPipes
, aTransferables
);
448 // If we're going to be serializing a pipe to transfer the sliced data, and we
449 // are getting no efficiency improvements from transferables, stream this
450 // sliced input stream directly as a pipe to avoid streaming data which will
451 // be sliced off anyway.
452 if (*aPipes
> 0 && *aTransferables
== 0) {
// Serializes this stream for IPC.
//
// SerializedComplexity() is consulted first. When it reports pipes but no
// transferables, this sliced stream itself is serialized as a pipe via
// InputStreamHelper::SerializeInputStreamAsPipe. The structured path
// serializes the source stream into a SlicedInputStreamParams and records
// mStart, mLength, mCurPos and mClosed alongside it.
//
// Asserts that a source stream is attached and that it was recorded as
// IPC-serializable.
459 void SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams
& aParams
,
460 uint32_t aMaxSize
, uint32_t* aSizeUsed
) {
461 MOZ_ASSERT(mInputStream
);
462 MOZ_ASSERT(mWeakIPCSerializableInputStream
);
464 uint32_t sizeUsed
= 0, pipes
= 0, transferables
= 0;
465 SerializedComplexity(aMaxSize
, &sizeUsed
, &pipes
, &transferables
);
466 if (pipes
> 0 && transferables
== 0) {
467 InputStreamHelper::SerializeInputStreamAsPipe(this, aParams
);
471 SlicedInputStreamParams params
;
472 InputStreamHelper::SerializeInputStream(mInputStream
, params
.stream(),
473 aMaxSize
, aSizeUsed
);
// State that Deserialize() reads back on the receiving side.
474 params
.start() = mStart
;
475 params
.length() = mLength
;
476 params
.curPos() = mCurPos
;
477 params
.closed() = mClosed
;
// Rebuilds this stream from IPC parameters.
//
// Must be called on an instance without a source stream (asserted). The
// parameters have to be of type TSlicedInputStreamParams. The contained
// stream is deserialized and attached with SetSourceStream(), then mStart,
// mLength, mCurPos and mClosed are restored from the parameters.
//
// NOTE(review): start/length/curPos arrive from another process and the
// visible code copies them without range checks. Confirm whether validation
// (e.g. that start + length cannot overflow) is needed here.
482 bool SlicedInputStream::Deserialize(
483 const mozilla::ipc::InputStreamParams
& aParams
) {
484 MOZ_ASSERT(!mInputStream
);
485 MOZ_ASSERT(!mWeakIPCSerializableInputStream
);
487 if (aParams
.type() != InputStreamParams::TSlicedInputStreamParams
) {
488 NS_ERROR("Received unknown parameters from the other process!");
492 const SlicedInputStreamParams
& params
= aParams
.get_SlicedInputStreamParams();
494 nsCOMPtr
<nsIInputStream
> stream
=
495 InputStreamHelper::DeserializeInputStream(params
.stream());
497 NS_WARNING("Deserialize failed!");
501 SetSourceStream(stream
.forget());
503 mStart
= params
.start();
504 mLength
= params
.length();
505 mCurPos
= params
.curPos();
506 mClosed
= params
.closed();
// Seeks within the slice by translating the request into an absolute offset
// on the source stream.
//
// Three base positions are computed in the visible code, each with aOffset
// added:
//   - mStart
//   - max(mStart, mCurPos)
//   - min(mStart + mLength, bytes reported by the source's Available())
// NOTE(review): presumably these correspond to NS_SEEK_SET, NS_SEEK_CUR and
// NS_SEEK_END, with NS_ERROR_ILLEGAL_VALUE for any other aWhence. Confirm
// against the case labels.
//
// The resulting offset must lie within [mStart, mStart + mLength]; otherwise
// NS_ERROR_INVALID_ARG is returned. The source is then sought with
// NS_SEEK_SET to that offset.
//
// Requires an attached source stream that was recorded as seekable.
514 SlicedInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
515 NS_ENSURE_STATE(mInputStream
);
516 NS_ENSURE_STATE(mWeakSeekableInputStream
);
523 offset
= mStart
+ aOffset
;
526 // mCurPos could be lower than mStart if the reading has not started yet.
527 offset
= XPCOM_MAX(mStart
, mCurPos
) + aOffset
;
531 rv
= mInputStream
->Available(&available
);
532 if (rv
== NS_BASE_STREAM_CLOSED
) {
537 if (NS_WARN_IF(NS_FAILED(rv
))) {
541 offset
= XPCOM_MIN(mStart
+ mLength
, available
) + aOffset
;
545 return NS_ERROR_ILLEGAL_VALUE
;
// Reject targets that fall outside the slice.
548 if (offset
< (int64_t)mStart
|| offset
> (int64_t)(mStart
+ mLength
)) {
549 return NS_ERROR_INVALID_ARG
;
552 rv
= mWeakSeekableInputStream
->Seek(NS_SEEK_SET
, offset
);
553 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Forwards SetEOF() to the source stream's nsISeekableStream implementation.
// Requires an attached source stream that was recorded as seekable.
562 SlicedInputStream::SetEOF() {
563 NS_ENSURE_STATE(mInputStream
);
564 NS_ENSURE_STATE(mWeakSeekableInputStream
);
567 return mWeakSeekableInputStream
->SetEOF();
// Reports the current position relative to the start of the slice.
//
// The absolute position is obtained from the source stream's Tell() and
// mStart is subtracted from it. Positions before mStart and results larger
// than mLength are checked for separately.
//
// Requires an attached source stream that was recorded as tellable.
//
// aResult: out-parameter receiving the slice-relative position.
573 SlicedInputStream::Tell(int64_t* aResult
) {
574 NS_ENSURE_STATE(mInputStream
);
575 NS_ENSURE_STATE(mWeakTellableInputStream
);
579 nsresult rv
= mWeakTellableInputStream
->Tell(&tell
);
580 if (NS_WARN_IF(NS_FAILED(rv
))) {
// The source has not reached the slice yet.
584 if (tell
< (int64_t)mStart
) {
589 *aResult
= tell
- mStart
;
590 if (*aResult
> (int64_t)mLength
) {
597 // nsIInputStreamLength
// Reports the length of the slice.
//
// The length is requested from the source stream's nsIInputStreamLength
// implementation. NS_BASE_STREAM_CLOSED and other failures are checked first,
// as is a reported length of -1. A real length is trimmed to the slice with
// AdjustRange().
//
// Requires an attached source stream that was recorded as implementing
// nsIInputStreamLength.
//
// aLength: out-parameter receiving the length.
600 SlicedInputStream::Length(int64_t* aLength
) {
601 NS_ENSURE_STATE(mInputStream
);
602 NS_ENSURE_STATE(mWeakInputStreamLength
);
604 nsresult rv
= mWeakInputStreamLength
->Length(aLength
);
605 if (rv
== NS_BASE_STREAM_CLOSED
) {
610 if (NS_WARN_IF(NS_FAILED(rv
))) {
614 if (*aLength
== -1) {
618 *aLength
= (int64_t)AdjustRange((uint64_t)*aLength
);
622 // nsIAsyncInputStreamLength
// Asks to be notified when the length of the stream is known.
//
// As in AsyncWait(), this object stands in as the callback on the source
// stream: the caller's callback is stored in mAsyncWaitLengthCallback under
// mMutex, and the source receives `this` (or nullptr when aCallback is null).
// OnInputStreamLengthReady() later notifies the stored callback.
//
// Requires an attached source stream that was recorded as implementing
// nsIAsyncInputStreamLength.
625 SlicedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback
* aCallback
,
626 nsIEventTarget
* aEventTarget
) {
627 NS_ENSURE_STATE(mInputStream
);
628 NS_ENSURE_STATE(mWeakAsyncInputStreamLength
);
630 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
= aCallback
? this : nullptr;
632 MutexAutoLock
lock(mMutex
);
633 mAsyncWaitLengthCallback
= aCallback
;
636 return mWeakAsyncInputStreamLength
->AsyncLengthWait(callback
, aEventTarget
);
639 // nsIInputStreamLengthCallback
// Callback from the source stream after an AsyncLengthWait() issued by this
// object.
//
// Under mMutex the stored callback is moved out of mAsyncWaitLengthCallback
// with swap(), which leaves the member empty; a missing callback means the
// wait was canceled. The length is trimmed to the slice with AdjustRange(),
// and the callback is notified with `this` as the stream.
//
// aStream: the source stream; asserted to be mWeakAsyncInputStreamLength.
642 SlicedInputStream::OnInputStreamLengthReady(nsIAsyncInputStreamLength
* aStream
,
644 MOZ_ASSERT(mInputStream
);
645 MOZ_ASSERT(mWeakAsyncInputStreamLength
);
646 MOZ_ASSERT(mWeakAsyncInputStreamLength
== aStream
);
648 nsCOMPtr
<nsIInputStreamLengthCallback
> callback
;
650 MutexAutoLock
lock(mMutex
);
652 // We have been canceled in the meanwhile.
653 if (!mAsyncWaitLengthCallback
) {
657 callback
.swap(mAsyncWaitLengthCallback
);
661 aLength
= (int64_t)AdjustRange((uint64_t)aLength
);
664 MOZ_ASSERT(callback
);
665 return callback
->OnInputStreamLengthReady(this, aLength
);
668 } // namespace mozilla