Backed out changeset b71c8c052463 (bug 1943846) for causing mass failures. CLOSED...
[gecko.git] / netwerk / base / nsInputStreamPump.cpp
blob53c81051db9b549e3188425367e1e84b09df7b19
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=4 sts=2 sw=2 et cin: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "nsIOService.h"
8 #include "nsInputStreamPump.h"
9 #include "nsIStreamTransportService.h"
10 #include "nsIThreadRetargetableStreamListener.h"
11 #include "nsThreadUtils.h"
12 #include "nsCOMPtr.h"
13 #include "mozilla/Logging.h"
14 #include "mozilla/NonBlockingAsyncInputStream.h"
15 #include "mozilla/ProfilerLabels.h"
16 #include "mozilla/SlicedInputStream.h"
17 #include "mozilla/StaticPrefs_network.h"
18 #include "nsIStreamListener.h"
19 #include "nsILoadGroup.h"
20 #include "nsNetCID.h"
21 #include "nsNetUtil.h"
22 #include "nsStreamUtils.h"
23 #include <algorithm>
26 // MOZ_LOG=nsStreamPump:5
28 static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
29 #undef LOG
30 #define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
32 //-----------------------------------------------------------------------------
33 // nsInputStreamPump methods
34 //-----------------------------------------------------------------------------
36 nsInputStreamPump::nsInputStreamPump() : mOffMainThread(!NS_IsMainThread()) {}
38 nsresult nsInputStreamPump::Create(nsInputStreamPump** result,
39 nsIInputStream* stream, uint32_t segsize,
40 uint32_t segcount, bool closeWhenDone,
41 nsISerialEventTarget* mainThreadTarget) {
42 nsresult rv = NS_ERROR_OUT_OF_MEMORY;
43 RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
44 if (pump) {
45 rv = pump->Init(stream, segsize, segcount, closeWhenDone, mainThreadTarget);
46 if (NS_SUCCEEDED(rv)) {
47 pump.forget(result);
50 return rv;
53 struct PeekData {
54 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
55 : mFunc(fun), mClosure(closure) {}
57 nsInputStreamPump::PeekSegmentFun mFunc;
58 void* mClosure;
61 static nsresult CallPeekFunc(nsIInputStream* aInStream, void* aClosure,
62 const char* aFromSegment, uint32_t aToOffset,
63 uint32_t aCount, uint32_t* aWriteCount) {
64 NS_ASSERTION(aToOffset == 0, "Called more than once?");
65 NS_ASSERTION(aCount > 0, "Called without data?");
67 PeekData* data = static_cast<PeekData*>(aClosure);
68 data->mFunc(data->mClosure, reinterpret_cast<const uint8_t*>(aFromSegment),
69 aCount);
70 return NS_BINDING_ABORTED;
73 nsresult nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) {
74 RecursiveMutexAutoLock lock(mMutex);
76 MOZ_ASSERT(mAsyncStream, "PeekStream called without stream");
78 nsresult rv = CreateBufferedStreamIfNeeded();
79 NS_ENSURE_SUCCESS(rv, rv);
81 // See if the pipe is closed by checking the return of Available.
82 uint64_t dummy64;
83 rv = mAsyncStream->Available(&dummy64);
84 if (NS_FAILED(rv)) return rv;
85 uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
87 PeekData data(callback, closure);
88 return mAsyncStream->ReadSegments(
89 CallPeekFunc, &data, mozilla::net::nsIOService::gDefaultSegmentSize,
90 &dummy);
93 nsresult nsInputStreamPump::EnsureWaiting() {
94 mMutex.AssertCurrentThreadIn();
96 // no need to worry about multiple threads... an input stream pump lives
97 // on only one thread at a time.
98 MOZ_ASSERT(mAsyncStream);
99 if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
100 // Ensure OnStateStop is called on the main thread only when this pump is
101 // created on main thread.
102 if (mState == STATE_STOP && !mOffMainThread) {
103 nsCOMPtr<nsISerialEventTarget> mainThread =
104 mLabeledMainThreadTarget
105 ? mLabeledMainThreadTarget
106 : do_AddRef(mozilla::GetMainThreadSerialEventTarget());
107 if (mTargetThread != mainThread) {
108 mTargetThread = mainThread;
111 MOZ_ASSERT(mTargetThread);
112 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
113 if (NS_FAILED(rv)) {
114 NS_ERROR("AsyncWait failed");
115 return rv;
117 // Any retargeting during STATE_START or START_TRANSFER is complete
118 // after the call to AsyncWait; next callback will be on mTargetThread.
119 mRetargeting = false;
120 mWaitingForInputStreamReady = true;
122 return NS_OK;
125 //-----------------------------------------------------------------------------
126 // nsInputStreamPump::nsISupports
127 //-----------------------------------------------------------------------------
129 // although this class can only be accessed from one thread at a time, we do
130 // allow its ownership to move from thread to thread, assuming the consumer
131 // understands the limitations of this.
132 NS_IMPL_ADDREF(nsInputStreamPump)
133 NS_IMPL_RELEASE(nsInputStreamPump)
134 NS_INTERFACE_MAP_BEGIN(nsInputStreamPump)
135 NS_INTERFACE_MAP_ENTRY(nsIRequest)
136 NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableRequest)
137 NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
138 NS_INTERFACE_MAP_ENTRY(nsIInputStreamPump)
139 NS_INTERFACE_MAP_ENTRY_CONCRETE(nsInputStreamPump)
140 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStreamPump)
141 NS_INTERFACE_MAP_END
143 //-----------------------------------------------------------------------------
144 // nsInputStreamPump::nsIRequest
145 //-----------------------------------------------------------------------------
147 NS_IMETHODIMP
148 nsInputStreamPump::GetName(nsACString& result) {
149 RecursiveMutexAutoLock lock(mMutex);
151 result.Truncate();
152 return NS_OK;
155 NS_IMETHODIMP
156 nsInputStreamPump::IsPending(bool* result) {
157 RecursiveMutexAutoLock lock(mMutex);
159 *result = (mState != STATE_IDLE && mState != STATE_DEAD);
160 return NS_OK;
163 NS_IMETHODIMP
164 nsInputStreamPump::GetStatus(nsresult* status) {
165 RecursiveMutexAutoLock lock(mMutex);
167 *status = mStatus;
168 return NS_OK;
171 NS_IMETHODIMP nsInputStreamPump::SetCanceledReason(const nsACString& aReason) {
172 return SetCanceledReasonImpl(aReason);
175 NS_IMETHODIMP nsInputStreamPump::GetCanceledReason(nsACString& aReason) {
176 return GetCanceledReasonImpl(aReason);
179 NS_IMETHODIMP nsInputStreamPump::CancelWithReason(nsresult aStatus,
180 const nsACString& aReason) {
181 return CancelWithReasonImpl(aStatus, aReason);
184 NS_IMETHODIMP
185 nsInputStreamPump::Cancel(nsresult status) {
186 RecursiveMutexAutoLock lock(mMutex);
188 AssertOnThread();
190 LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n", this,
191 static_cast<uint32_t>(status)));
193 if (NS_FAILED(mStatus)) {
194 LOG((" already canceled\n"));
195 return NS_OK;
198 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
199 mStatus = status;
201 // close input stream
202 if (mAsyncStream) {
203 // If mSuspendCount != 0, EnsureWaiting will be called by Resume().
204 // Note that while suspended, OnInputStreamReady will
205 // not do anything, and also note that calling asyncWait
206 // on a closed stream works and will dispatch an event immediately.
208 nsCOMPtr<nsIEventTarget> currentTarget = NS_GetCurrentThread();
209 if (mTargetThread && currentTarget != mTargetThread) {
210 nsresult rv = mTargetThread->Dispatch(NS_NewRunnableFunction(
211 "nsInputStreamPump::Cancel", [self = RefPtr{this}, status] {
212 RecursiveMutexAutoLock lock(self->mMutex);
213 if (!self->mAsyncStream) {
214 return;
216 self->mAsyncStream->CloseWithStatus(status);
217 if (self->mSuspendCount == 0) {
218 self->EnsureWaiting();
220 }));
221 NS_ENSURE_SUCCESS(rv, rv);
222 } else {
223 mAsyncStream->CloseWithStatus(status);
224 if (mSuspendCount == 0) {
225 EnsureWaiting();
229 return NS_OK;
232 NS_IMETHODIMP
233 nsInputStreamPump::Suspend() {
234 RecursiveMutexAutoLock lock(mMutex);
236 LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
237 NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD,
238 NS_ERROR_UNEXPECTED);
239 ++mSuspendCount;
240 return NS_OK;
243 NS_IMETHODIMP
244 nsInputStreamPump::Resume() {
245 RecursiveMutexAutoLock lock(mMutex);
247 LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
248 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
249 NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD,
250 NS_ERROR_UNEXPECTED);
252 // There is a brief in-between state when we null out mAsyncStream in
253 // OnStateStop() before calling OnStopRequest, and only afterwards set
254 // STATE_DEAD, which we need to handle gracefully.
255 if (--mSuspendCount == 0 && mAsyncStream) {
256 EnsureWaiting();
258 return NS_OK;
261 NS_IMETHODIMP
262 nsInputStreamPump::GetLoadFlags(nsLoadFlags* aLoadFlags) {
263 RecursiveMutexAutoLock lock(mMutex);
265 *aLoadFlags = mLoadFlags;
266 return NS_OK;
269 NS_IMETHODIMP
270 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) {
271 RecursiveMutexAutoLock lock(mMutex);
273 mLoadFlags = aLoadFlags;
274 return NS_OK;
277 NS_IMETHODIMP
278 nsInputStreamPump::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
279 return GetTRRModeImpl(aTRRMode);
282 NS_IMETHODIMP
283 nsInputStreamPump::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
284 return SetTRRModeImpl(aTRRMode);
287 NS_IMETHODIMP
288 nsInputStreamPump::GetLoadGroup(nsILoadGroup** aLoadGroup) {
289 RecursiveMutexAutoLock lock(mMutex);
291 *aLoadGroup = do_AddRef(mLoadGroup).take();
292 return NS_OK;
295 NS_IMETHODIMP
296 nsInputStreamPump::SetLoadGroup(nsILoadGroup* aLoadGroup) {
297 RecursiveMutexAutoLock lock(mMutex);
299 mLoadGroup = aLoadGroup;
300 return NS_OK;
303 //-----------------------------------------------------------------------------
304 // nsInputStreamPump::nsIInputStreamPump implementation
305 //-----------------------------------------------------------------------------
307 NS_IMETHODIMP
308 nsInputStreamPump::Init(nsIInputStream* stream, uint32_t segsize,
309 uint32_t segcount, bool closeWhenDone,
310 nsISerialEventTarget* mainThreadTarget) {
311 // probably we can't be multithread-accessed yet
312 RecursiveMutexAutoLock lock(mMutex);
313 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
315 mStream = stream;
316 mSegSize = segsize;
317 mSegCount = segcount;
318 mCloseWhenDone = closeWhenDone;
319 mLabeledMainThreadTarget = mainThreadTarget;
320 if (mOffMainThread && mLabeledMainThreadTarget) {
321 MOZ_ASSERT(
322 false,
323 "Init stream pump off main thread with a main thread event target.");
324 return NS_ERROR_FAILURE;
327 return NS_OK;
330 NS_IMETHODIMP
331 nsInputStreamPump::AsyncRead(nsIStreamListener* listener) {
332 RecursiveMutexAutoLock lock(mMutex);
334 // This ensures only one thread can interact with a pump at a time
335 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
336 NS_ENSURE_ARG_POINTER(listener);
337 MOZ_ASSERT(NS_IsMainThread() || mOffMainThread,
338 "nsInputStreamPump should be read from the "
339 "main thread only.");
341 nsresult rv = NS_MakeAsyncNonBlockingInputStream(
342 mStream.forget(), getter_AddRefs(mAsyncStream), mCloseWhenDone, mSegSize,
343 mSegCount);
344 if (NS_WARN_IF(NS_FAILED(rv))) {
345 return rv;
348 MOZ_ASSERT(mAsyncStream);
350 // mStreamOffset now holds the number of bytes currently read.
351 mStreamOffset = 0;
353 // grab event queue (we must do this here by contract, since all notifications
354 // must go to the thread which called AsyncRead)
355 if (NS_IsMainThread() && mLabeledMainThreadTarget) {
356 mTargetThread = mLabeledMainThreadTarget;
357 } else {
358 mTargetThread = mozilla::GetCurrentSerialEventTarget();
360 NS_ENSURE_STATE(mTargetThread);
362 rv = EnsureWaiting();
363 if (NS_FAILED(rv)) return rv;
365 if (mLoadGroup) mLoadGroup->AddRequest(this, nullptr);
367 mState = STATE_START;
368 mListener = listener;
369 return NS_OK;
372 //-----------------------------------------------------------------------------
373 // nsInputStreamPump::nsIInputStreamCallback implementation
374 //-----------------------------------------------------------------------------
376 NS_IMETHODIMP
377 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream* stream) {
378 LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
380 AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK);
382 // this function has been called from a PLEvent, so we can safely call
383 // any listener or progress sink methods directly from here.
385 for (;;) {
386 // There should only be one iteration of this loop happening at a time.
387 // To prevent AsyncWait() (called during callbacks or on other threads)
388 // from creating a parallel OnInputStreamReady(), we use:
389 // -- a mutex; and
390 // -- a boolean mProcessingCallbacks to detect parallel loops
391 // when exiting the mutex for callbacks.
392 RecursiveMutexAutoLock lock(mMutex);
394 // Prevent parallel execution during callbacks, while out of mutex.
395 if (mProcessingCallbacks) {
396 MOZ_ASSERT(!mProcessingCallbacks);
397 break;
399 mProcessingCallbacks = true;
400 if (mSuspendCount || mState == STATE_IDLE || mState == STATE_DEAD) {
401 mWaitingForInputStreamReady = false;
402 mProcessingCallbacks = false;
403 break;
406 uint32_t nextState;
407 switch (mState) {
408 case STATE_START:
409 nextState = OnStateStart();
410 break;
411 case STATE_TRANSFER:
412 nextState = OnStateTransfer();
413 break;
414 case STATE_STOP:
415 mRetargeting = false;
416 nextState = OnStateStop();
417 break;
418 default:
419 nextState = 0;
420 MOZ_ASSERT_UNREACHABLE("Unknown enum value.");
421 return NS_ERROR_UNEXPECTED;
424 bool stillTransferring =
425 (mState == STATE_TRANSFER && nextState == STATE_TRANSFER);
426 if (stillTransferring) {
427 NS_ASSERTION(NS_SUCCEEDED(mStatus),
428 "Should not have failed status for ongoing transfer");
429 } else {
430 NS_ASSERTION(mState != nextState,
431 "Only OnStateTransfer can be called more than once.");
433 if (mRetargeting) {
434 NS_ASSERTION(mState != STATE_STOP,
435 "Retargeting should not happen during OnStateStop.");
438 // Set mRetargeting so EnsureWaiting will be called. It ensures that
439 // OnStateStop is called on the main thread.
440 if (nextState == STATE_STOP && !NS_IsMainThread() && !mOffMainThread) {
441 mRetargeting = true;
444 // Unset mProcessingCallbacks here (while we have lock) so our own call to
445 // EnsureWaiting isn't blocked by it.
446 mProcessingCallbacks = false;
448 // We must break the loop if suspended during one of the previous
449 // operation.
450 if (mSuspendCount) {
451 mState = nextState;
452 mWaitingForInputStreamReady = false;
453 break;
456 // Wait asynchronously if there is still data to transfer, or we're
457 // switching event delivery to another thread.
458 if (stillTransferring || mRetargeting) {
459 mState = nextState;
460 mWaitingForInputStreamReady = false;
461 nsresult rv = EnsureWaiting();
462 if (NS_SUCCEEDED(rv)) break;
464 // Failure to start asynchronous wait: stop transfer.
465 // Do not set mStatus if it was previously set to report a failure.
466 if (NS_SUCCEEDED(mStatus)) {
467 mStatus = rv;
469 nextState = STATE_STOP;
472 mState = nextState;
474 return NS_OK;
477 uint32_t nsInputStreamPump::OnStateStart() MOZ_REQUIRES(mMutex) {
478 mMutex.AssertCurrentThreadIn();
480 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK);
482 LOG((" OnStateStart [this=%p]\n", this));
484 nsresult rv;
486 // need to check the reason why the stream is ready. this is required
487 // so our listener can check our status from OnStartRequest.
488 // XXX async streams should have a GetStatus method!
489 if (NS_SUCCEEDED(mStatus)) {
490 uint64_t avail;
491 rv = mAsyncStream->Available(&avail);
492 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
496 nsCOMPtr<nsIStreamListener> listener = mListener;
497 // We're on the writing thread
498 AssertOnThread();
500 // Note: Must exit mutex for call to OnStartRequest to avoid
501 // deadlocks when calls to RetargetDeliveryTo for multiple
502 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
503 RecursiveMutexAutoUnlock unlock(mMutex);
504 rv = listener->OnStartRequest(this);
507 // an error returned from OnStartRequest should cause us to abort; however,
508 // we must not stomp on mStatus if already canceled.
509 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) mStatus = rv;
511 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
514 uint32_t nsInputStreamPump::OnStateTransfer() MOZ_REQUIRES(mMutex) {
515 mMutex.AssertCurrentThreadIn();
517 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK);
519 LOG((" OnStateTransfer [this=%p]\n", this));
521 // if canceled, go directly to STATE_STOP...
522 if (NS_FAILED(mStatus)) return STATE_STOP;
524 nsresult rv = CreateBufferedStreamIfNeeded();
525 if (NS_WARN_IF(NS_FAILED(rv))) {
526 return STATE_STOP;
529 uint64_t avail;
530 rv = mAsyncStream->Available(&avail);
531 LOG((" Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n",
532 mAsyncStream.get(), static_cast<uint32_t>(rv), avail));
534 if (rv == NS_BASE_STREAM_CLOSED) {
535 rv = NS_OK;
536 avail = 0;
537 } else if (NS_SUCCEEDED(rv) && avail) {
538 // we used to limit avail to 16K - we were afraid some ODA handlers
539 // might assume they wouldn't get more than 16K at once
540 // we're removing that limit since it speeds up local file access.
541 // Now there's an implicit 64K limit of 4 16K segments
542 // NOTE: ok, so the story is as follows. OnDataAvailable impls
543 // are by contract supposed to consume exactly |avail| bytes.
544 // however, many do not... mailnews... stream converters...
545 // cough, cough. the input stream pump is fairly tolerant
546 // in this regard; however, if an ODA does not consume any
547 // data from the stream, then we could potentially end up in
548 // an infinite loop. we do our best here to try to catch
549 // such an error. (see bug 189672)
551 // in most cases this QI will succeed (mAsyncStream is almost always
552 // a nsPipeInputStream, which implements nsITellableStream::Tell).
553 int64_t offsetBefore;
554 nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mAsyncStream);
555 if (tellable && NS_FAILED(tellable->Tell(&offsetBefore))) {
556 MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
557 offsetBefore = 0;
560 uint32_t odaAvail = avail > UINT32_MAX ? UINT32_MAX : uint32_t(avail);
562 LOG((" calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64
563 "(%u)]\n",
564 mStreamOffset, avail, odaAvail));
567 // We may be called on non-MainThread even if mOffMainThread is
568 // false, due to RetargetDeliveryTo(), so don't use AssertOnThread()
569 if (mTargetThread) {
570 MOZ_ASSERT(mTargetThread->IsOnCurrentThread());
571 } else {
572 MOZ_ASSERT(NS_IsMainThread());
575 nsCOMPtr<nsIStreamListener> listener = mListener;
576 // Note: Must exit mutex for call to OnStartRequest to avoid
577 // deadlocks when calls to RetargetDeliveryTo for multiple
578 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
579 RecursiveMutexAutoUnlock unlock(mMutex);
580 // We're on the writing thread for mListener and mAsyncStream.
581 // mStreamOffset is only touched in OnStateTransfer, and AsyncRead
582 // shouldn't be called during OnDataAvailable()
584 MOZ_PUSH_IGNORE_THREAD_SAFETY
585 rv = listener->OnDataAvailable(this, mAsyncStream, mStreamOffset,
586 odaAvail);
587 MOZ_POP_THREAD_SAFETY
590 // don't enter this code if ODA failed or called Cancel
591 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
592 // test to see if this ODA failed to consume data
593 if (tellable) {
594 // NOTE: if Tell fails, which can happen if the stream is
595 // now closed, then we assume that everything was read.
596 int64_t offsetAfter;
597 if (NS_FAILED(tellable->Tell(&offsetAfter))) {
598 offsetAfter = offsetBefore + odaAvail;
600 if (offsetAfter > offsetBefore) {
601 mStreamOffset += (offsetAfter - offsetBefore);
602 } else if (mSuspendCount == 0) {
604 // possible infinite loop if we continue pumping data!
606 // NOTE: although not allowed by nsIStreamListener, we
607 // will allow the ODA impl to Suspend the pump. IMAP
608 // does this :-(
610 NS_ERROR("OnDataAvailable implementation consumed no data");
611 mStatus = NS_ERROR_UNEXPECTED;
613 } else {
614 mStreamOffset += odaAvail; // assume ODA behaved well
619 // an error returned from Available or OnDataAvailable should cause us to
620 // abort; however, we must not stop on mStatus if already canceled.
622 if (NS_SUCCEEDED(mStatus)) {
623 if (NS_FAILED(rv)) {
624 mStatus = rv;
625 } else if (avail) {
626 // if stream is now closed, advance to STATE_STOP right away.
627 // Available may return 0 bytes available at the moment; that
628 // would not mean that we are done.
629 // XXX async streams should have a GetStatus method!
630 rv = mAsyncStream->Available(&avail);
631 if (NS_SUCCEEDED(rv)) return STATE_TRANSFER;
632 if (rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
635 return STATE_STOP;
638 nsresult nsInputStreamPump::CallOnStateStop() {
639 RecursiveMutexAutoLock lock(mMutex);
641 MOZ_ASSERT(NS_IsMainThread(),
642 "CallOnStateStop should only be called on the main thread.");
644 mState = OnStateStop();
645 return NS_OK;
648 uint32_t nsInputStreamPump::OnStateStop() MOZ_REQUIRES(mMutex) {
649 mMutex.AssertCurrentThreadIn();
651 if (!NS_IsMainThread() && !mOffMainThread) {
652 // This method can be called on a different thread if nsInputStreamPump
653 // is used off the main-thread.
654 if (NS_SUCCEEDED(mStatus) && mListener &&
655 mozilla::StaticPrefs::network_send_OnDataFinished_nsInputStreamPump()) {
656 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
657 do_QueryInterface(mListener);
658 if (retargetableListener) {
659 retargetableListener->OnDataFinished(mStatus);
662 nsresult rv = mLabeledMainThreadTarget->Dispatch(
663 mozilla::NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this,
664 &nsInputStreamPump::CallOnStateStop));
665 NS_ENSURE_SUCCESS(rv, STATE_DEAD);
666 return STATE_DEAD;
669 AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK);
671 LOG((" OnStateStop [this=%p status=%" PRIx32 "]\n", this,
672 static_cast<uint32_t>(mStatus)));
674 // if an error occurred, we must be sure to pass the error onto the async
675 // stream. in some cases, this is redundant, but since close is idempotent,
676 // this is OK. otherwise, be sure to honor the "close-when-done" option.
678 if (!mAsyncStream || !mListener) {
679 MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
680 MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
681 return STATE_DEAD;
684 if (NS_FAILED(mStatus)) {
685 mAsyncStream->CloseWithStatus(mStatus);
686 } else if (mCloseWhenDone) {
687 mAsyncStream->Close();
690 mAsyncStream = nullptr;
691 mIsPending = false;
693 // We're on the writing thread.
694 // We believe that mStatus can't be changed on us here.
695 AssertOnThread();
697 nsCOMPtr<nsIStreamListener> listener = mListener;
698 nsresult status = mStatus;
699 // Note: Must exit mutex for call to OnStartRequest to avoid
700 // deadlocks when calls to RetargetDeliveryTo for multiple
701 // nsInputStreamPumps are needed (e.g. nsHttpChannel).
702 RecursiveMutexAutoUnlock unlock(mMutex);
704 listener->OnStopRequest(this, status);
706 mTargetThread = nullptr;
707 mListener = nullptr;
709 if (mLoadGroup) mLoadGroup->RemoveRequest(this, nullptr, mStatus);
711 return STATE_DEAD;
714 nsresult nsInputStreamPump::CreateBufferedStreamIfNeeded() {
715 if (mAsyncStreamIsBuffered) {
716 return NS_OK;
719 // ReadSegments is not available for any nsIAsyncInputStream. In order to use
720 // it, we wrap a nsIBufferedInputStream around it, if needed.
722 if (NS_InputStreamIsBuffered(mAsyncStream)) {
723 mAsyncStreamIsBuffered = true;
724 return NS_OK;
727 nsCOMPtr<nsIInputStream> stream;
728 nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(stream),
729 mAsyncStream.forget(), 4096);
730 NS_ENSURE_SUCCESS(rv, rv);
732 // A buffered inputStream must implement nsIAsyncInputStream.
733 mAsyncStream = do_QueryInterface(stream);
734 MOZ_DIAGNOSTIC_ASSERT(mAsyncStream);
735 mAsyncStreamIsBuffered = true;
737 return NS_OK;
740 //-----------------------------------------------------------------------------
741 // nsIThreadRetargetableRequest
742 //-----------------------------------------------------------------------------
744 NS_IMETHODIMP
745 nsInputStreamPump::RetargetDeliveryTo(nsISerialEventTarget* aNewTarget) {
746 RecursiveMutexAutoLock lock(mMutex);
748 NS_ENSURE_ARG(aNewTarget);
749 NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
750 NS_ERROR_UNEXPECTED);
752 // If canceled, do not retarget. Return with canceled status.
753 if (NS_FAILED(mStatus)) {
754 return mStatus;
757 if (aNewTarget == mTargetThread) {
758 NS_WARNING("Retargeting delivery to same thread");
759 return NS_OK;
762 if (mOffMainThread) {
763 // Don't support retargeting if this pump is already used off the main
764 // thread.
765 return NS_ERROR_FAILURE;
768 // Ensure that |mListener| and any subsequent listeners can be retargeted
769 // to another thread.
770 nsresult rv = NS_OK;
771 nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
772 do_QueryInterface(mListener, &rv);
773 if (NS_SUCCEEDED(rv) && retargetableListener) {
774 rv = retargetableListener->CheckListenerChain();
775 if (NS_SUCCEEDED(rv)) {
776 mTargetThread = aNewTarget;
777 mRetargeting = true;
780 LOG(
781 ("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
782 "%s listener [%p] rv[%" PRIx32 "]",
783 this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
784 (nsIStreamListener*)mListener, static_cast<uint32_t>(rv)));
785 return rv;
788 NS_IMETHODIMP
789 nsInputStreamPump::GetDeliveryTarget(nsISerialEventTarget** aNewTarget) {
790 RecursiveMutexAutoLock lock(mMutex);
792 nsCOMPtr<nsISerialEventTarget> target = mTargetThread;
793 target.forget(aNewTarget);
794 return NS_OK;