Backed out changeset b462e7b742d8 (bug 1908261) for causing multiple reftest failures...
[gecko.git] / dom / cache / ReadStream.cpp
blob366dd5ab138507a49658aa8fcf0f9e3e88a069b7
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/cache/ReadStream.h"
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/cache/CacheStreamControlChild.h"
11 #include "mozilla/dom/cache/CacheStreamControlParent.h"
12 #include "mozilla/dom/cache/CacheTypes.h"
13 #include "mozilla/ipc/IPCStreamUtils.h"
14 #include "mozilla/SnappyUncompressInputStream.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIThread.h"
17 #include "nsStringStream.h"
18 #include "nsTArray.h"
20 namespace mozilla::dom::cache {
22 using mozilla::Unused;
24 // ----------------------------------------------------------------------------
26 // The inner stream class. This is where all of the real work is done. As
27 // an invariant Inner::Close() must be called before ~Inner(). This is
28 // guaranteed by our outer ReadStream class.
29 class ReadStream::Inner final : public ReadStream::Controllable {
30 public:
31 Inner(StreamControl* aControl, const nsID& aId, nsIInputStream* aStream);
33 void Serialize(Maybe<CacheReadStream>* aReadStreamOut, ErrorResult& aRv);
35 void Serialize(CacheReadStream* aReadStreamOut, ErrorResult& aRv);
37 // ReadStream::Controllable methods
38 virtual void CloseStream() override;
40 virtual void CloseStreamWithoutReporting() override;
42 virtual bool HasEverBeenRead() const override;
44 // Simulate nsIInputStream methods, but we don't actually inherit from it
45 nsresult Close();
47 nsresult Available(uint64_t* aNumAvailableOut);
49 nsresult StreamStatus();
51 nsresult Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut);
53 nsresult ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
54 uint32_t aCount, uint32_t* aNumReadOut);
56 nsresult IsNonBlocking(bool* aNonBlockingOut);
58 NS_DECL_OWNINGTHREAD;
60 ~Inner();
62 private:
63 class NoteClosedRunnable;
64 class ForgetRunnable;
66 void NoteClosed();
68 void Forget();
70 void NoteClosedOnOwningThread();
72 void ForgetOnOwningThread();
74 nsIInputStream* EnsureStream();
76 void AsyncOpenStreamOnOwningThread();
78 void MaybeAbortAsyncOpenStream();
80 void OpenStreamFailed();
82 inline SafeRefPtr<Inner> SafeRefPtrFromThis() {
83 return Controllable::SafeRefPtrFromThis().downcast<Inner>();
86 // Weak ref to the stream control actor. The actor will always call either
87 // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
88 // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
89 // ForgetOnOwningThread() method call.
90 StreamControl* mControl;
92 const nsID mId;
93 nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
95 enum State { Open, Closed, NumStates };
96 Atomic<State> mState;
97 Atomic<bool> mHasEverBeenRead;
98 bool mAsyncOpenStarted;
100 // The wrapped stream objects may not be threadsafe. We need to be able
101 // to close a stream on our owning thread while an IO thread is simultaneously
102 // reading the same stream. Therefore, protect all access to these stream
103 // objects with a mutex.
104 Mutex mMutex MOZ_UNANNOTATED;
105 CondVar mCondVar;
106 nsCOMPtr<nsIInputStream> mStream;
107 nsCOMPtr<nsIInputStream> mSnappyStream;
110 // ----------------------------------------------------------------------------
112 // Runnable to notify actors that the ReadStream has closed. This must
113 // be done on the thread associated with the PBackground actor. Must be
114 // cancelable to execute on Worker threads (which can occur when the
115 // ReadStream is constructed on a child process Worker thread).
116 class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable {
117 public:
118 explicit NoteClosedRunnable(SafeRefPtr<ReadStream::Inner> aStream)
119 : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable"),
120 mStream(std::move(aStream)) {}
122 NS_IMETHOD Run() override {
123 mStream->NoteClosedOnOwningThread();
124 return NS_OK;
127 // Note, we must proceed with the Run() method since our actor will not
128 // clean itself up until we note that the stream is closed.
129 nsresult Cancel() override {
130 Run();
131 return NS_OK;
134 private:
135 ~NoteClosedRunnable() = default;
137 const SafeRefPtr<ReadStream::Inner> mStream;
140 // ----------------------------------------------------------------------------
142 // Runnable to clear actors without reporting that the ReadStream has
143 // closed. Since this can trigger actor destruction, we need to do
144 // it on the thread associated with the PBackground actor. Must be
145 // cancelable to execute on Worker threads (which can occur when the
146 // ReadStream is constructed on a child process Worker thread).
147 class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable {
148 public:
149 explicit ForgetRunnable(SafeRefPtr<ReadStream::Inner> aStream)
150 : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable"),
151 mStream(std::move(aStream)) {}
153 NS_IMETHOD Run() override {
154 mStream->ForgetOnOwningThread();
155 return NS_OK;
158 // Note, we must proceed with the Run() method so that we properly
159 // call RemoveListener on the actor.
160 nsresult Cancel() override {
161 Run();
162 return NS_OK;
165 private:
166 ~ForgetRunnable() = default;
168 const SafeRefPtr<ReadStream::Inner> mStream;
171 // ----------------------------------------------------------------------------
173 ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
174 nsIInputStream* aStream)
175 : mControl(aControl),
176 mId(aId),
177 mOwningEventTarget(GetCurrentSerialEventTarget()),
178 mState(Open),
179 mHasEverBeenRead(false),
180 mAsyncOpenStarted(false),
181 mMutex("dom::cache::ReadStream"),
182 mCondVar(mMutex, "dom::cache::ReadStream"),
183 mStream(aStream),
184 mSnappyStream(aStream ? new SnappyUncompressInputStream(aStream)
185 : nullptr) {
186 MOZ_DIAGNOSTIC_ASSERT(mControl);
187 mControl->AddReadStream(SafeRefPtrFromThis());
190 void ReadStream::Inner::Serialize(Maybe<CacheReadStream>* aReadStreamOut,
191 ErrorResult& aRv) {
192 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
193 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
194 aReadStreamOut->emplace(CacheReadStream());
195 Serialize(&aReadStreamOut->ref(), aRv);
198 void ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut,
199 ErrorResult& aRv) {
200 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
201 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
203 if (mState != Open) {
204 aRv.ThrowTypeError(
205 "Response body is a cache file stream that has already been closed.");
206 return;
209 MOZ_DIAGNOSTIC_ASSERT(mControl);
211 aReadStreamOut->id() = mId;
212 mControl->SerializeControl(aReadStreamOut);
215 MutexAutoLock lock(mMutex);
216 mControl->SerializeStream(aReadStreamOut, mStream);
219 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().isNothing() ||
220 aReadStreamOut->stream().ref().stream().type() !=
221 mozilla::ipc::InputStreamParams::T__None);
223 // We're passing ownership across the IPC barrier with the control, so
224 // do not signal that the stream is closed here.
225 Forget();
228 void ReadStream::Inner::CloseStream() {
229 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
230 MOZ_ALWAYS_SUCCEEDS(Close());
233 void ReadStream::Inner::CloseStreamWithoutReporting() {
234 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
235 Forget();
238 bool ReadStream::Inner::HasEverBeenRead() const {
239 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
240 return mHasEverBeenRead;
243 nsresult ReadStream::Inner::Close() {
244 // stream ops can happen on any thread
245 nsresult rv = NS_OK;
247 MutexAutoLock lock(mMutex);
248 if (mSnappyStream) {
249 rv = mSnappyStream->Close();
252 NoteClosed();
253 return rv;
256 nsresult ReadStream::Inner::Available(uint64_t* aNumAvailableOut) {
257 // stream ops can happen on any thread
258 nsresult rv = NS_OK;
260 MutexAutoLock lock(mMutex);
261 rv = EnsureStream()->Available(aNumAvailableOut);
264 if (NS_FAILED(rv)) {
265 Close();
268 return rv;
271 nsresult ReadStream::Inner::StreamStatus() {
272 // stream ops can happen on any thread
273 nsresult rv = NS_OK;
275 MutexAutoLock lock(mMutex);
276 rv = EnsureStream()->StreamStatus();
279 if (NS_FAILED(rv)) {
280 Close();
283 return rv;
286 nsresult ReadStream::Inner::Read(char* aBuf, uint32_t aCount,
287 uint32_t* aNumReadOut) {
288 // stream ops can happen on any thread
289 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
291 nsresult rv = NS_OK;
293 MutexAutoLock lock(mMutex);
294 rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut);
297 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
298 *aNumReadOut == 0) {
299 Close();
302 mHasEverBeenRead = true;
304 return rv;
307 nsresult ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter,
308 void* aClosure, uint32_t aCount,
309 uint32_t* aNumReadOut) {
310 // stream ops can happen on any thread
311 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
313 if (aCount) {
314 mHasEverBeenRead = true;
317 nsresult rv = NS_OK;
319 MutexAutoLock lock(mMutex);
320 rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
323 if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
324 rv != NS_ERROR_NOT_IMPLEMENTED) ||
325 *aNumReadOut == 0) {
326 Close();
329 // Verify bytes were actually read before marking as being ever read. For
330 // example, code can test if the stream supports ReadSegments() by calling
331 // this method with a dummy callback which doesn't read anything. We don't
332 // want to trigger on that.
333 if (*aNumReadOut) {
334 mHasEverBeenRead = true;
337 return rv;
340 nsresult ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) {
341 // stream ops can happen on any thread
342 MutexAutoLock lock(mMutex);
343 if (mSnappyStream) {
344 return mSnappyStream->IsNonBlocking(aNonBlockingOut);
346 *aNonBlockingOut = false;
347 return NS_OK;
350 ReadStream::Inner::~Inner() {
351 // Any thread
352 MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
353 MOZ_DIAGNOSTIC_ASSERT(!mControl);
356 void ReadStream::Inner::NoteClosed() {
357 // Any thread
358 if (mState == Closed) {
359 return;
362 if (mOwningEventTarget->IsOnCurrentThread()) {
363 NoteClosedOnOwningThread();
364 return;
367 nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(SafeRefPtrFromThis());
368 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(),
369 nsIThread::DISPATCH_NORMAL));
372 void ReadStream::Inner::Forget() {
373 // Any thread
374 if (mState == Closed) {
375 return;
378 if (mOwningEventTarget->IsOnCurrentThread()) {
379 ForgetOnOwningThread();
380 return;
383 nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(SafeRefPtrFromThis());
384 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget->Dispatch(runnable.forget(),
385 nsIThread::DISPATCH_NORMAL));
388 void ReadStream::Inner::NoteClosedOnOwningThread() {
389 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
391 // Mark closed and do nothing if we were already closed
392 if (!mState.compareExchange(Open, Closed)) {
393 return;
396 MaybeAbortAsyncOpenStream();
398 MOZ_DIAGNOSTIC_ASSERT(mControl);
399 mControl->NoteClosed(SafeRefPtrFromThis(), mId);
400 mControl = nullptr;
403 void ReadStream::Inner::ForgetOnOwningThread() {
404 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
406 // Mark closed and do nothing if we were already closed
407 if (!mState.compareExchange(Open, Closed)) {
408 return;
411 MaybeAbortAsyncOpenStream();
413 MOZ_DIAGNOSTIC_ASSERT(mControl);
414 mControl->ForgetReadStream(SafeRefPtrFromThis());
415 mControl = nullptr;
418 nsIInputStream* ReadStream::Inner::EnsureStream() {
419 mMutex.AssertCurrentThreadOwns();
421 // We need to block the current thread while we open the stream. We
422 // cannot do this safely from the main owning thread since it would
423 // trigger deadlock. This should be ok, though, since a blocking
424 // stream like this should never be read on the owning thread anyway.
425 if (mOwningEventTarget->IsOnCurrentThread()) {
426 MOZ_CRASH("Blocking read on the js/ipc owning thread!");
429 if (mSnappyStream) {
430 return mSnappyStream;
433 nsCOMPtr<nsIRunnable> r = NewCancelableRunnableMethod(
434 "ReadStream::Inner::AsyncOpenStreamOnOwningThread", this,
435 &ReadStream::Inner::AsyncOpenStreamOnOwningThread);
436 nsresult rv =
437 mOwningEventTarget->Dispatch(r.forget(), nsIThread::DISPATCH_NORMAL);
438 if (NS_WARN_IF(NS_FAILED(rv))) {
439 OpenStreamFailed();
440 return mSnappyStream;
443 mCondVar.Wait();
444 MOZ_DIAGNOSTIC_ASSERT(mSnappyStream);
446 return mSnappyStream;
449 void ReadStream::Inner::AsyncOpenStreamOnOwningThread() {
450 MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
452 if (mSnappyStream) {
453 // Different threads might request opening the stream at the same time. If
454 // the earlier request succeeded, then use the result.
455 mCondVar.NotifyAll();
456 return;
459 if (!mControl || mState == Closed) {
460 MutexAutoLock lock(mMutex);
461 OpenStreamFailed();
462 mCondVar.NotifyAll();
463 return;
466 if (mAsyncOpenStarted) {
467 return;
469 mAsyncOpenStarted = true;
471 RefPtr<ReadStream::Inner> self = this;
472 mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) {
473 MutexAutoLock lock(self->mMutex);
474 self->mAsyncOpenStarted = false;
475 if (!self->mStream) {
476 if (!aStream) {
477 self->OpenStreamFailed();
478 } else {
479 self->mStream = std::move(aStream);
480 self->mSnappyStream = new SnappyUncompressInputStream(self->mStream);
483 self->mCondVar.NotifyAll();
487 void ReadStream::Inner::MaybeAbortAsyncOpenStream() {
488 if (!mAsyncOpenStarted) {
489 return;
492 MutexAutoLock lock(mMutex);
493 OpenStreamFailed();
494 mCondVar.NotifyAll();
497 void ReadStream::Inner::OpenStreamFailed() {
498 MOZ_DIAGNOSTIC_ASSERT(!mStream);
499 MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream);
500 mMutex.AssertCurrentThreadOwns();
501 Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), ""_ns);
502 mSnappyStream = mStream;
503 mStream->Close();
504 NoteClosed();
507 // ----------------------------------------------------------------------------
509 NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);
511 // static
512 already_AddRefed<ReadStream> ReadStream::Create(
513 const Maybe<CacheReadStream>& aMaybeReadStream) {
514 if (aMaybeReadStream.isNothing()) {
515 return nullptr;
518 return Create(aMaybeReadStream.ref());
521 // static
522 already_AddRefed<ReadStream> ReadStream::Create(
523 const CacheReadStream& aReadStream) {
524 // The parameter may or may not be for a Cache created stream. The way we
525 // tell is by looking at the stream control actor. If the actor exists,
526 // then we know the Cache created it.
527 if (!aReadStream.control()) {
528 return nullptr;
531 MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().isNothing() ||
532 aReadStream.stream().ref().stream().type() !=
533 mozilla::ipc::InputStreamParams::T__None);
535 // Control is guaranteed to survive this method as ActorDestroy() cannot
536 // run on this thread until we complete.
537 StreamControl* control;
538 if (aReadStream.control().IsChild()) {
539 auto actor =
540 static_cast<CacheStreamControlChild*>(aReadStream.control().AsChild());
541 control = actor;
542 } else {
543 auto actor = static_cast<CacheStreamControlParent*>(
544 aReadStream.control().AsParent());
545 control = actor;
547 MOZ_DIAGNOSTIC_ASSERT(control);
549 nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());
551 // Currently we expect all cache read streams to be blocking file streams.
552 #if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED)
553 if (stream) {
554 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
555 MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
557 #endif
559 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>(
560 std::move(control), aReadStream.id(), stream));
563 // static
564 already_AddRefed<ReadStream> ReadStream::Create(
565 PCacheStreamControlParent* aControl, const nsID& aId,
566 nsIInputStream* aStream) {
567 MOZ_DIAGNOSTIC_ASSERT(aControl);
569 return MakeAndAddRef<ReadStream>(MakeSafeRefPtr<ReadStream::Inner>(
570 static_cast<CacheStreamControlParent*>(aControl), aId, aStream));
573 void ReadStream::Serialize(Maybe<CacheReadStream>* aReadStreamOut,
574 ErrorResult& aRv) {
575 mInner->Serialize(aReadStreamOut, aRv);
578 void ReadStream::Serialize(CacheReadStream* aReadStreamOut, ErrorResult& aRv) {
579 mInner->Serialize(aReadStreamOut, aRv);
582 ReadStream::ReadStream(SafeRefPtr<ReadStream::Inner> aInner)
583 : mInner(std::move(aInner)) {
584 MOZ_DIAGNOSTIC_ASSERT(mInner);
587 ReadStream::~ReadStream() {
588 // Explicitly close the inner stream so that it does not have to
589 // deal with implicitly closing at destruction time.
590 mInner->Close();
593 NS_IMETHODIMP
594 ReadStream::Close() { return mInner->Close(); }
596 NS_IMETHODIMP
597 ReadStream::Available(uint64_t* aNumAvailableOut) {
598 return mInner->Available(aNumAvailableOut);
601 NS_IMETHODIMP
602 ReadStream::StreamStatus() { return mInner->StreamStatus(); }
604 NS_IMETHODIMP
605 ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) {
606 return mInner->Read(aBuf, aCount, aNumReadOut);
609 NS_IMETHODIMP
610 ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
611 uint32_t aCount, uint32_t* aNumReadOut) {
612 return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
615 NS_IMETHODIMP
616 ReadStream::IsNonBlocking(bool* aNonBlockingOut) {
617 return mInner->IsNonBlocking(aNonBlockingOut);
620 } // namespace mozilla::dom::cache