1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "mozilla/InputStreamLengthWrapper.h"
10 #include "nsIInputStreamLength.h"
11 #include "nsStreamUtils.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIEventTarget.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsISafeOutputStream.h"
18 #include "nsIAsyncInputStream.h"
19 #include "nsIAsyncOutputStream.h"
20 #include "nsIBufferedStreams.h"
23 #include "nsServiceManagerUtils.h"
24 #include "nsThreadUtils.h"
25 #include "nsITransport.h"
26 #include "nsIStreamTransportService.h"
27 #include "NonBlockingAsyncInputStream.h"
29 using namespace mozilla
;
31 static NS_DEFINE_CID(kStreamTransportServiceCID
, NS_STREAMTRANSPORTSERVICE_CID
);
33 //-----------------------------------------------------------------------------
35 // This is a nsICancelableRunnable because we can dispatch it to Workers and
36 // those can be shut down at any time, and in these cases, Cancel() is called
38 class nsInputStreamReadyEvent final
: public CancelableRunnable
,
39 public nsIInputStreamCallback
,
40 public nsIRunnablePriority
{
42 NS_DECL_ISUPPORTS_INHERITED
44 nsInputStreamReadyEvent(const char* aName
, nsIInputStreamCallback
* aCallback
,
45 nsIEventTarget
* aTarget
, uint32_t aPriority
)
46 : CancelableRunnable(aName
),
49 mPriority(aPriority
) {}
52 ~nsInputStreamReadyEvent() {
57 // whoa!! looks like we never posted this event. take care to
58 // release mCallback on the correct thread. if mTarget lives on the
59 // calling thread, then we are ok. otherwise, we have to try to
60 // proxy the Release over the right thread. if that thread is dead,
61 // then there's nothing we can do... better to leak than crash.
64 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
65 if (NS_FAILED(rv
) || !val
) {
66 nsCOMPtr
<nsIInputStreamCallback
> event
= NS_NewInputStreamReadyEvent(
67 "~nsInputStreamReadyEvent", mCallback
, mTarget
, mPriority
);
70 rv
= event
->OnInputStreamReady(nullptr);
72 MOZ_ASSERT_UNREACHABLE("leaking stream event");
73 nsISupports
* sup
= event
;
81 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aStream
) override
{
84 nsresult rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
86 NS_WARNING("Dispatch failed");
87 return NS_ERROR_FAILURE
;
93 NS_IMETHOD
Run() override
{
96 mCallback
->OnInputStreamReady(mStream
);
103 nsresult
Cancel() override
{
108 NS_IMETHOD
GetPriority(uint32_t* aPriority
) override
{
109 *aPriority
= mPriority
;
114 nsCOMPtr
<nsIAsyncInputStream
> mStream
;
115 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
116 nsCOMPtr
<nsIEventTarget
> mTarget
;
120 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent
, CancelableRunnable
,
121 nsIInputStreamCallback
, nsIRunnablePriority
)
123 //-----------------------------------------------------------------------------
125 // This is a nsICancelableRunnable because we can dispatch it to Workers and
126 // those can be shut down at any time, and in these cases, Cancel() is called
128 class nsOutputStreamReadyEvent final
: public CancelableRunnable
,
129 public nsIOutputStreamCallback
{
131 NS_DECL_ISUPPORTS_INHERITED
133 nsOutputStreamReadyEvent(nsIOutputStreamCallback
* aCallback
,
134 nsIEventTarget
* aTarget
)
135 : CancelableRunnable("nsOutputStreamReadyEvent"),
136 mCallback(aCallback
),
140 ~nsOutputStreamReadyEvent() {
145 // whoa!! looks like we never posted this event. take care to
146 // release mCallback on the correct thread. if mTarget lives on the
147 // calling thread, then we are ok. otherwise, we have to try to
148 // proxy the Release over the right thread. if that thread is dead,
149 // then there's nothing we can do... better to leak than crash.
152 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
153 if (NS_FAILED(rv
) || !val
) {
154 nsCOMPtr
<nsIOutputStreamCallback
> event
=
155 NS_NewOutputStreamReadyEvent(mCallback
, mTarget
);
158 rv
= event
->OnOutputStreamReady(nullptr);
160 MOZ_ASSERT_UNREACHABLE("leaking stream event");
161 nsISupports
* sup
= event
;
169 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aStream
) override
{
172 nsresult rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
174 NS_WARNING("PostEvent failed");
175 return NS_ERROR_FAILURE
;
181 NS_IMETHOD
Run() override
{
184 mCallback
->OnOutputStreamReady(mStream
);
191 nsresult
Cancel() override
{
197 nsCOMPtr
<nsIAsyncOutputStream
> mStream
;
198 nsCOMPtr
<nsIOutputStreamCallback
> mCallback
;
199 nsCOMPtr
<nsIEventTarget
> mTarget
;
202 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent
, CancelableRunnable
,
203 nsIOutputStreamCallback
)
205 //-----------------------------------------------------------------------------
207 already_AddRefed
<nsIInputStreamCallback
> NS_NewInputStreamReadyEvent(
208 const char* aName
, nsIInputStreamCallback
* aCallback
,
209 nsIEventTarget
* aTarget
, uint32_t aPriority
) {
210 NS_ASSERTION(aCallback
, "null callback");
211 NS_ASSERTION(aTarget
, "null target");
212 RefPtr
<nsInputStreamReadyEvent
> ev
=
213 new nsInputStreamReadyEvent(aName
, aCallback
, aTarget
, aPriority
);
217 already_AddRefed
<nsIOutputStreamCallback
> NS_NewOutputStreamReadyEvent(
218 nsIOutputStreamCallback
* aCallback
, nsIEventTarget
* aTarget
) {
219 NS_ASSERTION(aCallback
, "null callback");
220 NS_ASSERTION(aTarget
, "null target");
221 RefPtr
<nsOutputStreamReadyEvent
> ev
=
222 new nsOutputStreamReadyEvent(aCallback
, aTarget
);
226 //-----------------------------------------------------------------------------
227 // NS_AsyncCopy implementation
229 // abstract stream copier...
230 class nsAStreamCopier
: public nsIInputStreamCallback
,
231 public nsIOutputStreamCallback
,
232 public CancelableRunnable
{
234 NS_DECL_ISUPPORTS_INHERITED
237 : CancelableRunnable("nsAStreamCopier"),
238 mLock("nsAStreamCopier.mLock"),
240 mProgressCallback(nullptr),
243 mEventInProcess(false),
244 mEventIsPending(false),
248 mCancelStatus(NS_OK
) {}
250 // kick off the async copy...
251 nsresult
Start(nsIInputStream
* aSource
, nsIOutputStream
* aSink
,
252 nsIEventTarget
* aTarget
, nsAsyncCopyCallbackFun aCallback
,
253 void* aClosure
, uint32_t aChunksize
, bool aCloseSource
,
254 bool aCloseSink
, nsAsyncCopyProgressFun aProgressCallback
) {
258 mCallback
= aCallback
;
260 mChunkSize
= aChunksize
;
261 mCloseSource
= aCloseSource
;
262 mCloseSink
= aCloseSink
;
263 mProgressCallback
= aProgressCallback
;
265 mAsyncSource
= do_QueryInterface(mSource
);
266 mAsyncSink
= do_QueryInterface(mSink
);
268 return PostContinuationEvent();
271 // implemented by subclasses, returns number of bytes copied and
272 // sets source and sink condition before returning.
273 virtual uint32_t DoCopy(nsresult
* aSourceCondition
,
274 nsresult
* aSinkCondition
) = 0;
277 if (!mSource
|| !mSink
) {
281 nsresult cancelStatus
;
284 MutexAutoLock
lock(mLock
);
285 canceled
= mCanceled
;
286 cancelStatus
= mCancelStatus
;
289 // If the copy was canceled before Process() was even called, then
290 // sourceCondition and sinkCondition should be set to error results to
291 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
292 MOZ_ASSERT(NS_FAILED(cancelStatus
) == canceled
, "cancel needs an error");
293 nsresult sourceCondition
= cancelStatus
;
294 nsresult sinkCondition
= cancelStatus
;
296 // Copy data from the source to the sink until we hit failure or have
297 // copied all the data.
299 // Note: copyFailed will be true if the source or the sink have
300 // reported an error, or if we failed to write any bytes
301 // because we have consumed all of our data.
302 bool copyFailed
= false;
304 uint32_t n
= DoCopy(&sourceCondition
, &sinkCondition
);
305 if (n
> 0 && mProgressCallback
) {
306 mProgressCallback(mClosure
, n
);
309 NS_FAILED(sourceCondition
) || NS_FAILED(sinkCondition
) || n
== 0;
311 MutexAutoLock
lock(mLock
);
312 canceled
= mCanceled
;
313 cancelStatus
= mCancelStatus
;
315 if (copyFailed
&& !canceled
) {
316 if (sourceCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSource
) {
317 // need to wait for more data from source. while waiting for
318 // more source data, be sure to observe failures on output end.
319 mAsyncSource
->AsyncWait(this, 0, 0, nullptr);
322 mAsyncSink
->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
,
327 if (sinkCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSink
) {
328 // need to wait for more room in the sink. while waiting for
329 // more room in the sink, be sure to observer failures on the
331 mAsyncSink
->AsyncWait(this, 0, 0, nullptr);
334 mAsyncSource
->AsyncWait(
335 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY
, 0, nullptr);
340 if (copyFailed
|| canceled
) {
342 // cancel any previously-registered AsyncWait callbacks to avoid leaks
343 mAsyncSource
->AsyncWait(nullptr, 0, 0, nullptr);
348 mAsyncSource
->CloseWithStatus(canceled
? cancelStatus
354 mAsyncSource
= nullptr;
358 // cancel any previously-registered AsyncWait callbacks to avoid leaks
359 mAsyncSink
->AsyncWait(nullptr, 0, 0, nullptr);
364 mAsyncSink
->CloseWithStatus(canceled
? cancelStatus
367 // If we have an nsISafeOutputStream, and our
368 // sourceCondition and sinkCondition are not set to a
369 // failure state, finish writing.
370 nsCOMPtr
<nsISafeOutputStream
> sostream
= do_QueryInterface(mSink
);
371 if (sostream
&& NS_SUCCEEDED(sourceCondition
) &&
372 NS_SUCCEEDED(sinkCondition
)) {
379 mAsyncSink
= nullptr;
382 // notify state complete...
386 status
= sourceCondition
;
387 if (NS_SUCCEEDED(status
)) {
388 status
= sinkCondition
;
390 if (status
== NS_BASE_STREAM_CLOSED
) {
394 status
= cancelStatus
;
396 mCallback(mClosure
, status
);
403 nsresult
Cancel(nsresult aReason
) {
404 MutexAutoLock
lock(mLock
);
406 return NS_ERROR_FAILURE
;
409 if (NS_SUCCEEDED(aReason
)) {
410 NS_WARNING("cancel with non-failure status code");
411 aReason
= NS_BASE_STREAM_CLOSED
;
415 mCancelStatus
= aReason
;
419 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
* aSource
) override
{
420 PostContinuationEvent();
424 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
* aSink
) override
{
425 PostContinuationEvent();
429 // continuation event handler
430 NS_IMETHOD
Run() override
{
433 // clear "in process" flag and post any pending continuation event
434 MutexAutoLock
lock(mLock
);
435 mEventInProcess
= false;
436 if (mEventIsPending
) {
437 mEventIsPending
= false;
438 PostContinuationEvent_Locked();
444 nsresult
Cancel() MOZ_MUST_OVERRIDE override
= 0;
446 nsresult
PostContinuationEvent() {
447 // we cannot post a continuation event if there is currently
448 // an event in process. doing so could result in Process being
449 // run simultaneously on multiple threads, so we mark the event
450 // as pending, and if an event is already in process then we
451 // just let that existing event take care of posting the real
452 // continuation event.
454 MutexAutoLock
lock(mLock
);
455 return PostContinuationEvent_Locked();
458 nsresult
PostContinuationEvent_Locked() MOZ_REQUIRES(mLock
) {
460 if (mEventInProcess
) {
461 mEventIsPending
= true;
463 rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
464 if (NS_SUCCEEDED(rv
)) {
465 mEventInProcess
= true;
467 NS_WARNING("unable to post continuation event");
474 nsCOMPtr
<nsIInputStream
> mSource
;
475 nsCOMPtr
<nsIOutputStream
> mSink
;
476 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
477 nsCOMPtr
<nsIAsyncOutputStream
> mAsyncSink
;
478 nsCOMPtr
<nsIEventTarget
> mTarget
;
480 nsAsyncCopyCallbackFun mCallback
;
481 nsAsyncCopyProgressFun mProgressCallback
;
484 bool mEventInProcess
MOZ_GUARDED_BY(mLock
);
485 bool mEventIsPending
MOZ_GUARDED_BY(mLock
);
488 bool mCanceled
MOZ_GUARDED_BY(mLock
);
489 nsresult mCancelStatus
MOZ_GUARDED_BY(mLock
);
491 // virtual since subclasses call superclass Release()
492 virtual ~nsAStreamCopier() = default;
495 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier
, CancelableRunnable
,
496 nsIInputStreamCallback
, nsIOutputStreamCallback
)
498 class nsStreamCopierIB final
: public nsAStreamCopier
{
500 nsStreamCopierIB() = default;
501 virtual ~nsStreamCopierIB() = default;
503 struct MOZ_STACK_CLASS ReadSegmentsState
{
504 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
505 nsIOutputStream
* MOZ_NON_OWNING_REF mSink
;
506 nsresult mSinkCondition
;
509 static nsresult
ConsumeInputBuffer(nsIInputStream
* aInStr
, void* aClosure
,
510 const char* aBuffer
, uint32_t aOffset
,
511 uint32_t aCount
, uint32_t* aCountWritten
) {
512 ReadSegmentsState
* state
= (ReadSegmentsState
*)aClosure
;
514 nsresult rv
= state
->mSink
->Write(aBuffer
, aCount
, aCountWritten
);
516 state
->mSinkCondition
= rv
;
517 } else if (*aCountWritten
== 0) {
518 state
->mSinkCondition
= NS_BASE_STREAM_CLOSED
;
521 return state
->mSinkCondition
;
524 uint32_t DoCopy(nsresult
* aSourceCondition
,
525 nsresult
* aSinkCondition
) override
{
526 ReadSegmentsState state
;
528 state
.mSinkCondition
= NS_OK
;
532 mSource
->ReadSegments(ConsumeInputBuffer
, &state
, mChunkSize
, &n
);
533 *aSinkCondition
= NS_SUCCEEDED(state
.mSinkCondition
) && n
== 0
534 ? mSink
->StreamStatus()
535 : state
.mSinkCondition
;
539 nsresult
Cancel() override
{ return NS_OK
; }
542 class nsStreamCopierOB final
: public nsAStreamCopier
{
544 nsStreamCopierOB() = default;
545 virtual ~nsStreamCopierOB() = default;
547 struct MOZ_STACK_CLASS WriteSegmentsState
{
548 // the nsIInputStream will outlive the WriteSegmentsState on the stack
549 nsIInputStream
* MOZ_NON_OWNING_REF mSource
;
550 nsresult mSourceCondition
;
553 static nsresult
FillOutputBuffer(nsIOutputStream
* aOutStr
, void* aClosure
,
554 char* aBuffer
, uint32_t aOffset
,
555 uint32_t aCount
, uint32_t* aCountRead
) {
556 WriteSegmentsState
* state
= (WriteSegmentsState
*)aClosure
;
558 nsresult rv
= state
->mSource
->Read(aBuffer
, aCount
, aCountRead
);
560 state
->mSourceCondition
= rv
;
561 } else if (*aCountRead
== 0) {
562 state
->mSourceCondition
= NS_BASE_STREAM_CLOSED
;
565 return state
->mSourceCondition
;
568 uint32_t DoCopy(nsresult
* aSourceCondition
,
569 nsresult
* aSinkCondition
) override
{
570 WriteSegmentsState state
;
571 state
.mSource
= mSource
;
572 state
.mSourceCondition
= NS_OK
;
576 mSink
->WriteSegments(FillOutputBuffer
, &state
, mChunkSize
, &n
);
577 *aSourceCondition
= NS_SUCCEEDED(state
.mSourceCondition
) && n
== 0
578 ? mSource
->StreamStatus()
579 : state
.mSourceCondition
;
583 nsresult
Cancel() override
{ return NS_OK
; }
586 //-----------------------------------------------------------------------------
588 nsresult
NS_AsyncCopy(nsIInputStream
* aSource
, nsIOutputStream
* aSink
,
589 nsIEventTarget
* aTarget
, nsAsyncCopyMode aMode
,
590 uint32_t aChunkSize
, nsAsyncCopyCallbackFun aCallback
,
591 void* aClosure
, bool aCloseSource
, bool aCloseSink
,
592 nsISupports
** aCopierCtx
,
593 nsAsyncCopyProgressFun aProgressCallback
) {
594 NS_ASSERTION(aTarget
, "non-null target required");
597 nsAStreamCopier
* copier
;
599 if (aMode
== NS_ASYNCCOPY_VIA_READSEGMENTS
) {
600 copier
= new nsStreamCopierIB();
602 copier
= new nsStreamCopierOB();
605 // Start() takes an owning ref to the copier...
607 rv
= copier
->Start(aSource
, aSink
, aTarget
, aCallback
, aClosure
, aChunkSize
,
608 aCloseSource
, aCloseSink
, aProgressCallback
);
611 *aCopierCtx
= static_cast<nsISupports
*>(static_cast<nsIRunnable
*>(copier
));
612 NS_ADDREF(*aCopierCtx
);
619 //-----------------------------------------------------------------------------
621 nsresult
NS_CancelAsyncCopy(nsISupports
* aCopierCtx
, nsresult aReason
) {
622 nsAStreamCopier
* copier
=
623 static_cast<nsAStreamCopier
*>(static_cast<nsIRunnable
*>(aCopierCtx
));
624 return copier
->Cancel(aReason
);
627 //-----------------------------------------------------------------------------
630 template <typename T
>
631 struct ResultTraits
{};
634 struct ResultTraits
<nsACString
> {
635 static void Clear(nsACString
& aString
) { aString
.Truncate(); }
637 static char* GetStorage(nsACString
& aString
) {
638 return aString
.BeginWriting();
643 struct ResultTraits
<nsTArray
<uint8_t>> {
644 static void Clear(nsTArray
<uint8_t>& aArray
) { aArray
.Clear(); }
646 static char* GetStorage(nsTArray
<uint8_t>& aArray
) {
647 return reinterpret_cast<char*>(aArray
.Elements());
652 template <typename T
>
653 nsresult
DoConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
656 ResultTraits
<T
>::Clear(aResult
);
660 rv
= aStream
->Available(&avail64
);
662 if (rv
== NS_BASE_STREAM_CLOSED
) {
671 uint32_t avail
= (uint32_t)XPCOM_MIN
<uint64_t>(avail64
, aMaxCount
);
673 // resize aResult buffer
674 uint32_t length
= aResult
.Length();
675 CheckedInt
<uint32_t> newLength
= CheckedInt
<uint32_t>(length
) + avail
;
676 if (!newLength
.isValid()) {
677 return NS_ERROR_FILE_TOO_BIG
;
680 if (!aResult
.SetLength(newLength
.value(), fallible
)) {
681 return NS_ERROR_OUT_OF_MEMORY
;
683 char* buf
= ResultTraits
<T
>::GetStorage(aResult
) + length
;
686 rv
= aStream
->Read(buf
, avail
, &n
);
691 MOZ_ASSERT(n
< avail
, "What happened there???");
692 aResult
.SetLength(length
+ n
);
703 nsresult
NS_ConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
704 nsACString
& aResult
) {
705 return DoConsumeStream(aStream
, aMaxCount
, aResult
);
708 nsresult
NS_ConsumeStream(nsIInputStream
* aStream
, uint32_t aMaxCount
,
709 nsTArray
<uint8_t>& aResult
) {
710 return DoConsumeStream(aStream
, aMaxCount
, aResult
);
713 //-----------------------------------------------------------------------------
715 static nsresult
TestInputStream(nsIInputStream
* aInStr
, void* aClosure
,
716 const char* aBuffer
, uint32_t aOffset
,
717 uint32_t aCount
, uint32_t* aCountWritten
) {
718 bool* result
= static_cast<bool*>(aClosure
);
721 return NS_ERROR_ABORT
; // don't call me anymore
724 bool NS_InputStreamIsBuffered(nsIInputStream
* aStream
) {
725 nsCOMPtr
<nsIBufferedInputStream
> bufferedIn
= do_QueryInterface(aStream
);
732 nsresult rv
= aStream
->ReadSegments(TestInputStream
, &result
, 1, &n
);
733 return result
|| rv
!= NS_ERROR_NOT_IMPLEMENTED
;
736 static nsresult
TestOutputStream(nsIOutputStream
* aOutStr
, void* aClosure
,
737 char* aBuffer
, uint32_t aOffset
,
738 uint32_t aCount
, uint32_t* aCountRead
) {
739 bool* result
= static_cast<bool*>(aClosure
);
742 return NS_ERROR_ABORT
; // don't call me anymore
745 bool NS_OutputStreamIsBuffered(nsIOutputStream
* aStream
) {
746 nsCOMPtr
<nsIBufferedOutputStream
> bufferedOut
= do_QueryInterface(aStream
);
753 aStream
->WriteSegments(TestOutputStream
, &result
, 1, &n
);
757 //-----------------------------------------------------------------------------
759 nsresult
NS_CopySegmentToStream(nsIInputStream
* aInStr
, void* aClosure
,
760 const char* aBuffer
, uint32_t aOffset
,
761 uint32_t aCount
, uint32_t* aCountWritten
) {
762 nsIOutputStream
* outStr
= static_cast<nsIOutputStream
*>(aClosure
);
766 nsresult rv
= outStr
->Write(aBuffer
, aCount
, &n
);
777 nsresult
NS_CopySegmentToBuffer(nsIInputStream
* aInStr
, void* aClosure
,
778 const char* aBuffer
, uint32_t aOffset
,
779 uint32_t aCount
, uint32_t* aCountWritten
) {
780 char* toBuf
= static_cast<char*>(aClosure
);
781 memcpy(&toBuf
[aOffset
], aBuffer
, aCount
);
782 *aCountWritten
= aCount
;
786 nsresult
NS_CopyBufferToSegment(nsIOutputStream
* aOutStr
, void* aClosure
,
787 char* aBuffer
, uint32_t aOffset
,
788 uint32_t aCount
, uint32_t* aCountRead
) {
789 const char* fromBuf
= static_cast<const char*>(aClosure
);
790 memcpy(aBuffer
, &fromBuf
[aOffset
], aCount
);
791 *aCountRead
= aCount
;
795 nsresult
NS_CopyStreamToSegment(nsIOutputStream
* aOutputStream
, void* aClosure
,
796 char* aToSegment
, uint32_t aFromOffset
,
797 uint32_t aCount
, uint32_t* aReadCount
) {
798 nsIInputStream
* fromStream
= static_cast<nsIInputStream
*>(aClosure
);
799 return fromStream
->Read(aToSegment
, aCount
, aReadCount
);
802 nsresult
NS_DiscardSegment(nsIInputStream
* aInStr
, void* aClosure
,
803 const char* aBuffer
, uint32_t aOffset
,
804 uint32_t aCount
, uint32_t* aCountWritten
) {
805 *aCountWritten
= aCount
;
809 //-----------------------------------------------------------------------------
811 nsresult
NS_WriteSegmentThunk(nsIInputStream
* aInStr
, void* aClosure
,
812 const char* aBuffer
, uint32_t aOffset
,
813 uint32_t aCount
, uint32_t* aCountWritten
) {
814 nsWriteSegmentThunk
* thunk
= static_cast<nsWriteSegmentThunk
*>(aClosure
);
815 return thunk
->mFun(thunk
->mStream
, thunk
->mClosure
, aBuffer
, aOffset
, aCount
,
819 nsresult
NS_FillArray(FallibleTArray
<char>& aDest
, nsIInputStream
* aInput
,
820 uint32_t aKeep
, uint32_t* aNewBytes
) {
821 MOZ_ASSERT(aInput
, "null stream");
822 MOZ_ASSERT(aKeep
<= aDest
.Length(), "illegal keep count");
824 char* aBuffer
= aDest
.Elements();
825 int64_t keepOffset
= int64_t(aDest
.Length()) - aKeep
;
826 if (aKeep
!= 0 && keepOffset
> 0) {
827 memmove(aBuffer
, aBuffer
+ keepOffset
, aKeep
);
831 aInput
->Read(aBuffer
+ aKeep
, aDest
.Capacity() - aKeep
, aNewBytes
);
835 // NOTE: we rely on the fact that the new slots are NOT initialized by
836 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
838 aDest
.SetLengthAndRetainStorage(aKeep
+ *aNewBytes
);
840 MOZ_ASSERT(aDest
.Length() <= aDest
.Capacity(), "buffer overflow");
844 bool NS_InputStreamIsCloneable(nsIInputStream
* aSource
) {
849 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(aSource
);
850 return cloneable
&& cloneable
->GetCloneable();
853 nsresult
NS_CloneInputStream(nsIInputStream
* aSource
,
854 nsIInputStream
** aCloneOut
,
855 nsIInputStream
** aReplacementOut
) {
856 if (NS_WARN_IF(!aSource
)) {
857 return NS_ERROR_FAILURE
;
860 // Attempt to perform the clone directly on the source stream
861 nsCOMPtr
<nsICloneableInputStream
> cloneable
= do_QueryInterface(aSource
);
862 if (cloneable
&& cloneable
->GetCloneable()) {
863 if (aReplacementOut
) {
864 *aReplacementOut
= nullptr;
866 return cloneable
->Clone(aCloneOut
);
869 // If we failed the clone and the caller does not want to replace their
870 // original stream, then we are done. Return error.
871 if (!aReplacementOut
) {
872 return NS_ERROR_FAILURE
;
875 // The caller has opted-in to the fallback clone support that replaces
876 // the original stream. Copy the data to a pipe and return two cloned
879 nsCOMPtr
<nsIInputStream
> reader
;
880 nsCOMPtr
<nsIInputStream
> readerClone
;
881 nsCOMPtr
<nsIOutputStream
> writer
;
883 NS_NewPipe(getter_AddRefs(reader
), getter_AddRefs(writer
), 0,
884 0, // default segment size and max size
885 true, true); // non-blocking
887 // Propagate length information provided by nsIInputStreamLength. We don't use
888 // InputStreamLengthHelper::GetSyncLength to avoid the risk of blocking when
889 // called off-main-thread.
891 if (nsCOMPtr
<nsIInputStreamLength
> streamLength
= do_QueryInterface(aSource
);
892 streamLength
&& NS_SUCCEEDED(streamLength
->Length(&length
)) &&
894 reader
= new mozilla::InputStreamLengthWrapper(reader
.forget(), length
);
897 cloneable
= do_QueryInterface(reader
);
898 MOZ_ASSERT(cloneable
&& cloneable
->GetCloneable());
900 nsresult rv
= cloneable
->Clone(getter_AddRefs(readerClone
));
901 if (NS_WARN_IF(NS_FAILED(rv
))) {
905 nsCOMPtr
<nsIEventTarget
> target
=
906 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
907 if (NS_WARN_IF(NS_FAILED(rv
))) {
911 rv
= NS_AsyncCopy(aSource
, writer
, target
, NS_ASYNCCOPY_VIA_WRITESEGMENTS
);
912 if (NS_WARN_IF(NS_FAILED(rv
))) {
916 readerClone
.forget(aCloneOut
);
917 reader
.forget(aReplacementOut
);
922 nsresult
NS_MakeAsyncNonBlockingInputStream(
923 already_AddRefed
<nsIInputStream
> aSource
,
924 nsIAsyncInputStream
** aAsyncInputStream
, bool aCloseWhenDone
,
925 uint32_t aFlags
, uint32_t aSegmentSize
, uint32_t aSegmentCount
) {
926 nsCOMPtr
<nsIInputStream
> source
= std::move(aSource
);
927 if (NS_WARN_IF(!aAsyncInputStream
)) {
928 return NS_ERROR_FAILURE
;
931 bool nonBlocking
= false;
932 nsresult rv
= source
->IsNonBlocking(&nonBlocking
);
933 if (NS_WARN_IF(NS_FAILED(rv
))) {
937 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
= do_QueryInterface(source
);
939 if (nonBlocking
&& asyncStream
) {
940 // This stream is perfect!
941 asyncStream
.forget(aAsyncInputStream
);
946 // If the stream is non-blocking but not async, we wrap it.
947 return NonBlockingAsyncInputStream::Create(source
.forget(),
951 nsCOMPtr
<nsIStreamTransportService
> sts
=
952 do_GetService(kStreamTransportServiceCID
, &rv
);
953 if (NS_WARN_IF(NS_FAILED(rv
))) {
957 nsCOMPtr
<nsITransport
> transport
;
958 rv
= sts
->CreateInputTransport(source
, aCloseWhenDone
,
959 getter_AddRefs(transport
));
960 if (NS_WARN_IF(NS_FAILED(rv
))) {
964 nsCOMPtr
<nsIInputStream
> wrapper
;
965 rv
= transport
->OpenInputStream(aFlags
, aSegmentSize
, aSegmentCount
,
966 getter_AddRefs(wrapper
));
967 if (NS_WARN_IF(NS_FAILED(rv
))) {
971 asyncStream
= do_QueryInterface(wrapper
);
972 MOZ_ASSERT(asyncStream
);
974 asyncStream
.forget(aAsyncInputStream
);