1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=4 sts=2 sw=2 et cin: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "nsIOService.h"
8 #include "nsInputStreamPump.h"
9 #include "nsIStreamTransportService.h"
10 #include "nsIThreadRetargetableStreamListener.h"
11 #include "nsThreadUtils.h"
13 #include "mozilla/Logging.h"
14 #include "mozilla/NonBlockingAsyncInputStream.h"
15 #include "mozilla/ProfilerLabels.h"
16 #include "mozilla/SlicedInputStream.h"
17 #include "mozilla/StaticPrefs_network.h"
18 #include "nsIStreamListener.h"
19 #include "nsILoadGroup.h"
21 #include "nsNetUtil.h"
22 #include "nsStreamUtils.h"
26 // MOZ_LOG=nsStreamPump:5
28 static mozilla::LazyLogModule
gStreamPumpLog("nsStreamPump");
30 #define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
32 //-----------------------------------------------------------------------------
33 // nsInputStreamPump methods
34 //-----------------------------------------------------------------------------
36 nsInputStreamPump::nsInputStreamPump() : mOffMainThread(!NS_IsMainThread()) {}
38 nsresult
nsInputStreamPump::Create(nsInputStreamPump
** result
,
39 nsIInputStream
* stream
, uint32_t segsize
,
40 uint32_t segcount
, bool closeWhenDone
,
41 nsISerialEventTarget
* mainThreadTarget
) {
42 nsresult rv
= NS_ERROR_OUT_OF_MEMORY
;
43 RefPtr
<nsInputStreamPump
> pump
= new nsInputStreamPump();
45 rv
= pump
->Init(stream
, segsize
, segcount
, closeWhenDone
, mainThreadTarget
);
46 if (NS_SUCCEEDED(rv
)) {
54 PeekData(nsInputStreamPump::PeekSegmentFun fun
, void* closure
)
55 : mFunc(fun
), mClosure(closure
) {}
57 nsInputStreamPump::PeekSegmentFun mFunc
;
61 static nsresult
CallPeekFunc(nsIInputStream
* aInStream
, void* aClosure
,
62 const char* aFromSegment
, uint32_t aToOffset
,
63 uint32_t aCount
, uint32_t* aWriteCount
) {
64 NS_ASSERTION(aToOffset
== 0, "Called more than once?");
65 NS_ASSERTION(aCount
> 0, "Called without data?");
67 PeekData
* data
= static_cast<PeekData
*>(aClosure
);
68 data
->mFunc(data
->mClosure
, reinterpret_cast<const uint8_t*>(aFromSegment
),
70 return NS_BINDING_ABORTED
;
73 nsresult
nsInputStreamPump::PeekStream(PeekSegmentFun callback
, void* closure
) {
74 RecursiveMutexAutoLock
lock(mMutex
);
76 MOZ_ASSERT(mAsyncStream
, "PeekStream called without stream");
78 nsresult rv
= CreateBufferedStreamIfNeeded();
79 NS_ENSURE_SUCCESS(rv
, rv
);
81 // See if the pipe is closed by checking the return of Available.
83 rv
= mAsyncStream
->Available(&dummy64
);
84 if (NS_FAILED(rv
)) return rv
;
85 uint32_t dummy
= (uint32_t)std::min(dummy64
, (uint64_t)UINT32_MAX
);
87 PeekData
data(callback
, closure
);
88 return mAsyncStream
->ReadSegments(
89 CallPeekFunc
, &data
, mozilla::net::nsIOService::gDefaultSegmentSize
,
93 nsresult
nsInputStreamPump::EnsureWaiting() {
94 mMutex
.AssertCurrentThreadIn();
96 // no need to worry about multiple threads... an input stream pump lives
97 // on only one thread at a time.
98 MOZ_ASSERT(mAsyncStream
);
99 if (!mWaitingForInputStreamReady
&& !mProcessingCallbacks
) {
100 // Ensure OnStateStop is called on the main thread only when this pump is
101 // created on main thread.
102 if (mState
== STATE_STOP
&& !mOffMainThread
) {
103 nsCOMPtr
<nsISerialEventTarget
> mainThread
=
104 mLabeledMainThreadTarget
105 ? mLabeledMainThreadTarget
106 : do_AddRef(mozilla::GetMainThreadSerialEventTarget());
107 if (mTargetThread
!= mainThread
) {
108 mTargetThread
= mainThread
;
111 MOZ_ASSERT(mTargetThread
);
112 nsresult rv
= mAsyncStream
->AsyncWait(this, 0, 0, mTargetThread
);
114 NS_ERROR("AsyncWait failed");
117 // Any retargeting during STATE_START or START_TRANSFER is complete
118 // after the call to AsyncWait; next callback will be on mTargetThread.
119 mRetargeting
= false;
120 mWaitingForInputStreamReady
= true;
125 //-----------------------------------------------------------------------------
126 // nsInputStreamPump::nsISupports
127 //-----------------------------------------------------------------------------
129 // although this class can only be accessed from one thread at a time, we do
130 // allow its ownership to move from thread to thread, assuming the consumer
131 // understands the limitations of this.
132 NS_IMPL_ADDREF(nsInputStreamPump
)
133 NS_IMPL_RELEASE(nsInputStreamPump
)
134 NS_INTERFACE_MAP_BEGIN(nsInputStreamPump
)
135 NS_INTERFACE_MAP_ENTRY(nsIRequest
)
136 NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableRequest
)
137 NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback
)
138 NS_INTERFACE_MAP_ENTRY(nsIInputStreamPump
)
139 NS_INTERFACE_MAP_ENTRY_CONCRETE(nsInputStreamPump
)
140 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIInputStreamPump
)
143 //-----------------------------------------------------------------------------
144 // nsInputStreamPump::nsIRequest
145 //-----------------------------------------------------------------------------
148 nsInputStreamPump::GetName(nsACString
& result
) {
149 RecursiveMutexAutoLock
lock(mMutex
);
156 nsInputStreamPump::IsPending(bool* result
) {
157 RecursiveMutexAutoLock
lock(mMutex
);
159 *result
= (mState
!= STATE_IDLE
&& mState
!= STATE_DEAD
);
164 nsInputStreamPump::GetStatus(nsresult
* status
) {
165 RecursiveMutexAutoLock
lock(mMutex
);
171 NS_IMETHODIMP
nsInputStreamPump::SetCanceledReason(const nsACString
& aReason
) {
172 return SetCanceledReasonImpl(aReason
);
175 NS_IMETHODIMP
nsInputStreamPump::GetCanceledReason(nsACString
& aReason
) {
176 return GetCanceledReasonImpl(aReason
);
179 NS_IMETHODIMP
nsInputStreamPump::CancelWithReason(nsresult aStatus
,
180 const nsACString
& aReason
) {
181 return CancelWithReasonImpl(aStatus
, aReason
);
185 nsInputStreamPump::Cancel(nsresult status
) {
186 RecursiveMutexAutoLock
lock(mMutex
);
190 LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32
"]\n", this,
191 static_cast<uint32_t>(status
)));
193 if (NS_FAILED(mStatus
)) {
194 LOG((" already canceled\n"));
198 NS_ASSERTION(NS_FAILED(status
), "cancel with non-failure status code");
201 // close input stream
203 // If mSuspendCount != 0, EnsureWaiting will be called by Resume().
204 // Note that while suspended, OnInputStreamReady will
205 // not do anything, and also note that calling asyncWait
206 // on a closed stream works and will dispatch an event immediately.
208 nsCOMPtr
<nsIEventTarget
> currentTarget
= NS_GetCurrentThread();
209 if (mTargetThread
&& currentTarget
!= mTargetThread
) {
210 nsresult rv
= mTargetThread
->Dispatch(NS_NewRunnableFunction(
211 "nsInputStreamPump::Cancel", [self
= RefPtr
{this}, status
] {
212 RecursiveMutexAutoLock
lock(self
->mMutex
);
213 if (!self
->mAsyncStream
) {
216 self
->mAsyncStream
->CloseWithStatus(status
);
217 if (self
->mSuspendCount
== 0) {
218 self
->EnsureWaiting();
221 NS_ENSURE_SUCCESS(rv
, rv
);
223 mAsyncStream
->CloseWithStatus(status
);
224 if (mSuspendCount
== 0) {
233 nsInputStreamPump::Suspend() {
234 RecursiveMutexAutoLock
lock(mMutex
);
236 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
237 NS_ENSURE_TRUE(mState
!= STATE_IDLE
&& mState
!= STATE_DEAD
,
238 NS_ERROR_UNEXPECTED
);
244 nsInputStreamPump::Resume() {
245 RecursiveMutexAutoLock
lock(mMutex
);
247 LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
248 NS_ENSURE_TRUE(mSuspendCount
> 0, NS_ERROR_UNEXPECTED
);
249 NS_ENSURE_TRUE(mState
!= STATE_IDLE
&& mState
!= STATE_DEAD
,
250 NS_ERROR_UNEXPECTED
);
252 // There is a brief in-between state when we null out mAsyncStream in
253 // OnStateStop() before calling OnStopRequest, and only afterwards set
254 // STATE_DEAD, which we need to handle gracefully.
255 if (--mSuspendCount
== 0 && mAsyncStream
) {
262 nsInputStreamPump::GetLoadFlags(nsLoadFlags
* aLoadFlags
) {
263 RecursiveMutexAutoLock
lock(mMutex
);
265 *aLoadFlags
= mLoadFlags
;
270 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags
) {
271 RecursiveMutexAutoLock
lock(mMutex
);
273 mLoadFlags
= aLoadFlags
;
278 nsInputStreamPump::GetTRRMode(nsIRequest::TRRMode
* aTRRMode
) {
279 return GetTRRModeImpl(aTRRMode
);
283 nsInputStreamPump::SetTRRMode(nsIRequest::TRRMode aTRRMode
) {
284 return SetTRRModeImpl(aTRRMode
);
288 nsInputStreamPump::GetLoadGroup(nsILoadGroup
** aLoadGroup
) {
289 RecursiveMutexAutoLock
lock(mMutex
);
291 *aLoadGroup
= do_AddRef(mLoadGroup
).take();
296 nsInputStreamPump::SetLoadGroup(nsILoadGroup
* aLoadGroup
) {
297 RecursiveMutexAutoLock
lock(mMutex
);
299 mLoadGroup
= aLoadGroup
;
303 //-----------------------------------------------------------------------------
304 // nsInputStreamPump::nsIInputStreamPump implementation
305 //-----------------------------------------------------------------------------
308 nsInputStreamPump::Init(nsIInputStream
* stream
, uint32_t segsize
,
309 uint32_t segcount
, bool closeWhenDone
,
310 nsISerialEventTarget
* mainThreadTarget
) {
311 // probably we can't be multithread-accessed yet
312 RecursiveMutexAutoLock
lock(mMutex
);
313 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
317 mSegCount
= segcount
;
318 mCloseWhenDone
= closeWhenDone
;
319 mLabeledMainThreadTarget
= mainThreadTarget
;
320 if (mOffMainThread
&& mLabeledMainThreadTarget
) {
323 "Init stream pump off main thread with a main thread event target.");
324 return NS_ERROR_FAILURE
;
331 nsInputStreamPump::AsyncRead(nsIStreamListener
* listener
) {
332 RecursiveMutexAutoLock
lock(mMutex
);
334 // This ensures only one thread can interact with a pump at a time
335 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
336 NS_ENSURE_ARG_POINTER(listener
);
337 MOZ_ASSERT(NS_IsMainThread() || mOffMainThread
,
338 "nsInputStreamPump should be read from the "
339 "main thread only.");
341 nsresult rv
= NS_MakeAsyncNonBlockingInputStream(
342 mStream
.forget(), getter_AddRefs(mAsyncStream
), mCloseWhenDone
, mSegSize
,
344 if (NS_WARN_IF(NS_FAILED(rv
))) {
348 MOZ_ASSERT(mAsyncStream
);
350 // mStreamOffset now holds the number of bytes currently read.
353 // grab event queue (we must do this here by contract, since all notifications
354 // must go to the thread which called AsyncRead)
355 if (NS_IsMainThread() && mLabeledMainThreadTarget
) {
356 mTargetThread
= mLabeledMainThreadTarget
;
358 mTargetThread
= mozilla::GetCurrentSerialEventTarget();
360 NS_ENSURE_STATE(mTargetThread
);
362 rv
= EnsureWaiting();
363 if (NS_FAILED(rv
)) return rv
;
365 if (mLoadGroup
) mLoadGroup
->AddRequest(this, nullptr);
367 mState
= STATE_START
;
368 mListener
= listener
;
372 //-----------------------------------------------------------------------------
373 // nsInputStreamPump::nsIInputStreamCallback implementation
374 //-----------------------------------------------------------------------------
377 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream
* stream
) {
378 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
380 AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK
);
382 // this function has been called from a PLEvent, so we can safely call
383 // any listener or progress sink methods directly from here.
386 // There should only be one iteration of this loop happening at a time.
387 // To prevent AsyncWait() (called during callbacks or on other threads)
388 // from creating a parallel OnInputStreamReady(), we use:
390 // -- a boolean mProcessingCallbacks to detect parallel loops
391 // when exiting the mutex for callbacks.
392 RecursiveMutexAutoLock
lock(mMutex
);
394 // Prevent parallel execution during callbacks, while out of mutex.
395 if (mProcessingCallbacks
) {
396 MOZ_ASSERT(!mProcessingCallbacks
);
399 mProcessingCallbacks
= true;
400 if (mSuspendCount
|| mState
== STATE_IDLE
|| mState
== STATE_DEAD
) {
401 mWaitingForInputStreamReady
= false;
402 mProcessingCallbacks
= false;
409 nextState
= OnStateStart();
412 nextState
= OnStateTransfer();
415 mRetargeting
= false;
416 nextState
= OnStateStop();
420 MOZ_ASSERT_UNREACHABLE("Unknown enum value.");
421 return NS_ERROR_UNEXPECTED
;
424 bool stillTransferring
=
425 (mState
== STATE_TRANSFER
&& nextState
== STATE_TRANSFER
);
426 if (stillTransferring
) {
427 NS_ASSERTION(NS_SUCCEEDED(mStatus
),
428 "Should not have failed status for ongoing transfer");
430 NS_ASSERTION(mState
!= nextState
,
431 "Only OnStateTransfer can be called more than once.");
434 NS_ASSERTION(mState
!= STATE_STOP
,
435 "Retargeting should not happen during OnStateStop.");
438 // Set mRetargeting so EnsureWaiting will be called. It ensures that
439 // OnStateStop is called on the main thread.
440 if (nextState
== STATE_STOP
&& !NS_IsMainThread() && !mOffMainThread
) {
444 // Unset mProcessingCallbacks here (while we have lock) so our own call to
445 // EnsureWaiting isn't blocked by it.
446 mProcessingCallbacks
= false;
448 // We must break the loop if suspended during one of the previous
452 mWaitingForInputStreamReady
= false;
456 // Wait asynchronously if there is still data to transfer, or we're
457 // switching event delivery to another thread.
458 if (stillTransferring
|| mRetargeting
) {
460 mWaitingForInputStreamReady
= false;
461 nsresult rv
= EnsureWaiting();
462 if (NS_SUCCEEDED(rv
)) break;
464 // Failure to start asynchronous wait: stop transfer.
465 // Do not set mStatus if it was previously set to report a failure.
466 if (NS_SUCCEEDED(mStatus
)) {
469 nextState
= STATE_STOP
;
477 uint32_t nsInputStreamPump::OnStateStart() MOZ_REQUIRES(mMutex
) {
478 mMutex
.AssertCurrentThreadIn();
480 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK
);
482 LOG((" OnStateStart [this=%p]\n", this));
486 // need to check the reason why the stream is ready. this is required
487 // so our listener can check our status from OnStartRequest.
488 // XXX async streams should have a GetStatus method!
489 if (NS_SUCCEEDED(mStatus
)) {
491 rv
= mAsyncStream
->Available(&avail
);
492 if (NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_CLOSED
) mStatus
= rv
;
496 nsCOMPtr
<nsIStreamListener
> listener
= mListener
;
497 // We're on the writing thread
500 // Note: Must exit mutex for call to OnStartRequest to avoid
501 // deadlocks when calls to RetargetDeliveryTo for multiple
502 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
503 RecursiveMutexAutoUnlock
unlock(mMutex
);
504 rv
= listener
->OnStartRequest(this);
507 // an error returned from OnStartRequest should cause us to abort; however,
508 // we must not stomp on mStatus if already canceled.
509 if (NS_FAILED(rv
) && NS_SUCCEEDED(mStatus
)) mStatus
= rv
;
511 return NS_SUCCEEDED(mStatus
) ? STATE_TRANSFER
: STATE_STOP
;
514 uint32_t nsInputStreamPump::OnStateTransfer() MOZ_REQUIRES(mMutex
) {
515 mMutex
.AssertCurrentThreadIn();
517 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK
);
519 LOG((" OnStateTransfer [this=%p]\n", this));
521 // if canceled, go directly to STATE_STOP...
522 if (NS_FAILED(mStatus
)) return STATE_STOP
;
524 nsresult rv
= CreateBufferedStreamIfNeeded();
525 if (NS_WARN_IF(NS_FAILED(rv
))) {
530 rv
= mAsyncStream
->Available(&avail
);
531 LOG((" Available returned [stream=%p rv=%" PRIx32
" avail=%" PRIu64
"]\n",
532 mAsyncStream
.get(), static_cast<uint32_t>(rv
), avail
));
534 if (rv
== NS_BASE_STREAM_CLOSED
) {
537 } else if (NS_SUCCEEDED(rv
) && avail
) {
538 // we used to limit avail to 16K - we were afraid some ODA handlers
539 // might assume they wouldn't get more than 16K at once
540 // we're removing that limit since it speeds up local file access.
541 // Now there's an implicit 64K limit of 4 16K segments
542 // NOTE: ok, so the story is as follows. OnDataAvailable impls
543 // are by contract supposed to consume exactly |avail| bytes.
544 // however, many do not... mailnews... stream converters...
545 // cough, cough. the input stream pump is fairly tolerant
546 // in this regard; however, if an ODA does not consume any
547 // data from the stream, then we could potentially end up in
548 // an infinite loop. we do our best here to try to catch
549 // such an error. (see bug 189672)
551 // in most cases this QI will succeed (mAsyncStream is almost always
552 // a nsPipeInputStream, which implements nsITellableStream::Tell).
553 int64_t offsetBefore
;
554 nsCOMPtr
<nsITellableStream
> tellable
= do_QueryInterface(mAsyncStream
);
555 if (tellable
&& NS_FAILED(tellable
->Tell(&offsetBefore
))) {
556 MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
560 uint32_t odaAvail
= avail
> UINT32_MAX
? UINT32_MAX
: uint32_t(avail
);
562 LOG((" calling OnDataAvailable [offset=%" PRIu64
" count=%" PRIu64
564 mStreamOffset
, avail
, odaAvail
));
567 // We may be called on non-MainThread even if mOffMainThread is
568 // false, due to RetargetDeliveryTo(), so don't use AssertOnThread()
570 MOZ_ASSERT(mTargetThread
->IsOnCurrentThread());
572 MOZ_ASSERT(NS_IsMainThread());
575 nsCOMPtr
<nsIStreamListener
> listener
= mListener
;
576 // Note: Must exit mutex for call to OnStartRequest to avoid
577 // deadlocks when calls to RetargetDeliveryTo for multiple
578 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
579 RecursiveMutexAutoUnlock
unlock(mMutex
);
580 // We're on the writing thread for mListener and mAsyncStream.
581 // mStreamOffset is only touched in OnStateTransfer, and AsyncRead
582 // shouldn't be called during OnDataAvailable()
584 MOZ_PUSH_IGNORE_THREAD_SAFETY
585 rv
= listener
->OnDataAvailable(this, mAsyncStream
, mStreamOffset
,
587 MOZ_POP_THREAD_SAFETY
590 // don't enter this code if ODA failed or called Cancel
591 if (NS_SUCCEEDED(rv
) && NS_SUCCEEDED(mStatus
)) {
592 // test to see if this ODA failed to consume data
594 // NOTE: if Tell fails, which can happen if the stream is
595 // now closed, then we assume that everything was read.
597 if (NS_FAILED(tellable
->Tell(&offsetAfter
))) {
598 offsetAfter
= offsetBefore
+ odaAvail
;
600 if (offsetAfter
> offsetBefore
) {
601 mStreamOffset
+= (offsetAfter
- offsetBefore
);
602 } else if (mSuspendCount
== 0) {
604 // possible infinite loop if we continue pumping data!
606 // NOTE: although not allowed by nsIStreamListener, we
607 // will allow the ODA impl to Suspend the pump. IMAP
610 NS_ERROR("OnDataAvailable implementation consumed no data");
611 mStatus
= NS_ERROR_UNEXPECTED
;
614 mStreamOffset
+= odaAvail
; // assume ODA behaved well
619 // an error returned from Available or OnDataAvailable should cause us to
620 // abort; however, we must not stop on mStatus if already canceled.
622 if (NS_SUCCEEDED(mStatus
)) {
626 // if stream is now closed, advance to STATE_STOP right away.
627 // Available may return 0 bytes available at the moment; that
628 // would not mean that we are done.
629 // XXX async streams should have a GetStatus method!
630 rv
= mAsyncStream
->Available(&avail
);
631 if (NS_SUCCEEDED(rv
)) return STATE_TRANSFER
;
632 if (rv
!= NS_BASE_STREAM_CLOSED
) mStatus
= rv
;
638 nsresult
nsInputStreamPump::CallOnStateStop() {
639 RecursiveMutexAutoLock
lock(mMutex
);
641 MOZ_ASSERT(NS_IsMainThread(),
642 "CallOnStateStop should only be called on the main thread.");
644 mState
= OnStateStop();
648 uint32_t nsInputStreamPump::OnStateStop() MOZ_REQUIRES(mMutex
) {
649 mMutex
.AssertCurrentThreadIn();
651 if (!NS_IsMainThread() && !mOffMainThread
) {
652 // This method can be called on a different thread if nsInputStreamPump
653 // is used off the main-thread.
654 if (NS_SUCCEEDED(mStatus
) && mListener
&&
655 mozilla::StaticPrefs::network_send_OnDataFinished_nsInputStreamPump()) {
656 nsCOMPtr
<nsIThreadRetargetableStreamListener
> retargetableListener
=
657 do_QueryInterface(mListener
);
658 if (retargetableListener
) {
659 retargetableListener
->OnDataFinished(mStatus
);
662 nsresult rv
= mLabeledMainThreadTarget
->Dispatch(
663 mozilla::NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this,
664 &nsInputStreamPump::CallOnStateStop
));
665 NS_ENSURE_SUCCESS(rv
, STATE_DEAD
);
669 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK
);
671 LOG((" OnStateStop [this=%p status=%" PRIx32
"]\n", this,
672 static_cast<uint32_t>(mStatus
)));
674 // if an error occurred, we must be sure to pass the error onto the async
675 // stream. in some cases, this is redundant, but since close is idempotent,
676 // this is OK. otherwise, be sure to honor the "close-when-done" option.
678 if (!mAsyncStream
|| !mListener
) {
679 MOZ_ASSERT(mAsyncStream
, "null mAsyncStream: OnStateStop called twice?");
680 MOZ_ASSERT(mListener
, "null mListener: OnStateStop called twice?");
684 if (NS_FAILED(mStatus
)) {
685 mAsyncStream
->CloseWithStatus(mStatus
);
686 } else if (mCloseWhenDone
) {
687 mAsyncStream
->Close();
690 mAsyncStream
= nullptr;
693 // We're on the writing thread.
694 // We believe that mStatus can't be changed on us here.
697 nsCOMPtr
<nsIStreamListener
> listener
= mListener
;
698 nsresult status
= mStatus
;
699 // Note: Must exit mutex for call to OnStartRequest to avoid
700 // deadlocks when calls to RetargetDeliveryTo for multiple
701 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
702 RecursiveMutexAutoUnlock
unlock(mMutex
);
704 listener
->OnStopRequest(this, status
);
706 mTargetThread
= nullptr;
709 if (mLoadGroup
) mLoadGroup
->RemoveRequest(this, nullptr, mStatus
);
714 nsresult
nsInputStreamPump::CreateBufferedStreamIfNeeded() {
715 if (mAsyncStreamIsBuffered
) {
719 // ReadSegments is not available for any nsIAsyncInputStream. In order to use
720 // it, we wrap a nsIBufferedInputStream around it, if needed.
722 if (NS_InputStreamIsBuffered(mAsyncStream
)) {
723 mAsyncStreamIsBuffered
= true;
727 nsCOMPtr
<nsIInputStream
> stream
;
728 nsresult rv
= NS_NewBufferedInputStream(getter_AddRefs(stream
),
729 mAsyncStream
.forget(), 4096);
730 NS_ENSURE_SUCCESS(rv
, rv
);
732 // A buffered inputStream must implement nsIAsyncInputStream.
733 mAsyncStream
= do_QueryInterface(stream
);
734 MOZ_DIAGNOSTIC_ASSERT(mAsyncStream
);
735 mAsyncStreamIsBuffered
= true;
740 //-----------------------------------------------------------------------------
741 // nsIThreadRetargetableRequest
742 //-----------------------------------------------------------------------------
745 nsInputStreamPump::RetargetDeliveryTo(nsISerialEventTarget
* aNewTarget
) {
746 RecursiveMutexAutoLock
lock(mMutex
);
748 NS_ENSURE_ARG(aNewTarget
);
749 NS_ENSURE_TRUE(mState
== STATE_START
|| mState
== STATE_TRANSFER
,
750 NS_ERROR_UNEXPECTED
);
752 // If canceled, do not retarget. Return with canceled status.
753 if (NS_FAILED(mStatus
)) {
757 if (aNewTarget
== mTargetThread
) {
758 NS_WARNING("Retargeting delivery to same thread");
762 if (mOffMainThread
) {
763 // Don't support retargeting if this pump is already used off the main
765 return NS_ERROR_FAILURE
;
768 // Ensure that |mListener| and any subsequent listeners can be retargeted
769 // to another thread.
771 nsCOMPtr
<nsIThreadRetargetableStreamListener
> retargetableListener
=
772 do_QueryInterface(mListener
, &rv
);
773 if (NS_SUCCEEDED(rv
) && retargetableListener
) {
774 rv
= retargetableListener
->CheckListenerChain();
775 if (NS_SUCCEEDED(rv
)) {
776 mTargetThread
= aNewTarget
;
781 ("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
782 "%s listener [%p] rv[%" PRIx32
"]",
783 this, aNewTarget
, (mTargetThread
== aNewTarget
? "success" : "failure"),
784 (nsIStreamListener
*)mListener
, static_cast<uint32_t>(rv
)));
789 nsInputStreamPump::GetDeliveryTarget(nsISerialEventTarget
** aNewTarget
) {
790 RecursiveMutexAutoLock
lock(mMutex
);
792 nsCOMPtr
<nsISerialEventTarget
> target
= mTargetThread
;
793 target
.forget(aNewTarget
);