1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/dom/cache/ReadStream.h"
9 #include "mozilla/Unused.h"
10 #include "mozilla/dom/cache/CacheStreamControlChild.h"
11 #include "mozilla/dom/cache/CacheStreamControlParent.h"
12 #include "mozilla/dom/cache/CacheTypes.h"
13 #include "mozilla/ipc/IPCStreamUtils.h"
14 #include "mozilla/SnappyUncompressInputStream.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsIThread.h"
17 #include "nsStringStream.h"
20 namespace mozilla::dom::cache
{
22 using mozilla::Unused
;
24 // ----------------------------------------------------------------------------
26 // The inner stream class. This is where all of the real work is done. As
27 // an invariant Inner::Close() must be called before ~Inner(). This is
28 // guaranteed by our outer ReadStream class.
29 class ReadStream::Inner final
: public ReadStream::Controllable
{
31 Inner(StreamControl
* aControl
, const nsID
& aId
, nsIInputStream
* aStream
);
33 void Serialize(Maybe
<CacheReadStream
>* aReadStreamOut
, ErrorResult
& aRv
);
35 void Serialize(CacheReadStream
* aReadStreamOut
, ErrorResult
& aRv
);
37 // ReadStream::Controllable methods
38 virtual void CloseStream() override
;
40 virtual void CloseStreamWithoutReporting() override
;
42 virtual bool HasEverBeenRead() const override
;
44 // Simulate nsIInputStream methods, but we don't actually inherit from it
47 nsresult
Available(uint64_t* aNumAvailableOut
);
49 nsresult
StreamStatus();
51 nsresult
Read(char* aBuf
, uint32_t aCount
, uint32_t* aNumReadOut
);
53 nsresult
ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
54 uint32_t aCount
, uint32_t* aNumReadOut
);
56 nsresult
IsNonBlocking(bool* aNonBlockingOut
);
63 class NoteClosedRunnable
;
70 void NoteClosedOnOwningThread();
72 void ForgetOnOwningThread();
74 nsIInputStream
* EnsureStream();
76 void AsyncOpenStreamOnOwningThread();
78 void MaybeAbortAsyncOpenStream();
80 void OpenStreamFailed();
82 inline SafeRefPtr
<Inner
> SafeRefPtrFromThis() {
83 return Controllable::SafeRefPtrFromThis().downcast
<Inner
>();
86 // Weak ref to the stream control actor. The actor will always call either
87 // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
88 // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
89 // ForgetOnOwningThread() method call.
90 StreamControl
* mControl
;
93 nsCOMPtr
<nsISerialEventTarget
> mOwningEventTarget
;
95 enum State
{ Open
, Closed
, NumStates
};
97 Atomic
<bool> mHasEverBeenRead
;
98 bool mAsyncOpenStarted
;
100 // The wrapped stream objects may not be threadsafe. We need to be able
101 // to close a stream on our owning thread while an IO thread is simultaneously
102 // reading the same stream. Therefore, protect all access to these stream
103 // objects with a mutex.
104 Mutex mMutex MOZ_UNANNOTATED
;
106 nsCOMPtr
<nsIInputStream
> mStream
;
107 nsCOMPtr
<nsIInputStream
> mSnappyStream
;
110 // ----------------------------------------------------------------------------
112 // Runnable to notify actors that the ReadStream has closed. This must
113 // be done on the thread associated with the PBackground actor. Must be
114 // cancelable to execute on Worker threads (which can occur when the
115 // ReadStream is constructed on a child process Worker thread).
116 class ReadStream::Inner::NoteClosedRunnable final
: public CancelableRunnable
{
118 explicit NoteClosedRunnable(SafeRefPtr
<ReadStream::Inner
> aStream
)
119 : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable"),
120 mStream(std::move(aStream
)) {}
122 NS_IMETHOD
Run() override
{
123 mStream
->NoteClosedOnOwningThread();
127 // Note, we must proceed with the Run() method since our actor will not
128 // clean itself up until we note that the stream is closed.
129 nsresult
Cancel() override
{
135 ~NoteClosedRunnable() = default;
137 const SafeRefPtr
<ReadStream::Inner
> mStream
;
140 // ----------------------------------------------------------------------------
142 // Runnable to clear actors without reporting that the ReadStream has
143 // closed. Since this can trigger actor destruction, we need to do
144 // it on the thread associated with the PBackground actor. Must be
145 // cancelable to execute on Worker threads (which can occur when the
146 // ReadStream is constructed on a child process Worker thread).
147 class ReadStream::Inner::ForgetRunnable final
: public CancelableRunnable
{
149 explicit ForgetRunnable(SafeRefPtr
<ReadStream::Inner
> aStream
)
150 : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable"),
151 mStream(std::move(aStream
)) {}
153 NS_IMETHOD
Run() override
{
154 mStream
->ForgetOnOwningThread();
158 // Note, we must proceed with the Run() method so that we properly
159 // call RemoveListener on the actor.
160 nsresult
Cancel() override
{
166 ~ForgetRunnable() = default;
168 const SafeRefPtr
<ReadStream::Inner
> mStream
;
171 // ----------------------------------------------------------------------------
173 ReadStream::Inner::Inner(StreamControl
* aControl
, const nsID
& aId
,
174 nsIInputStream
* aStream
)
175 : mControl(aControl
),
177 mOwningEventTarget(GetCurrentSerialEventTarget()),
179 mHasEverBeenRead(false),
180 mAsyncOpenStarted(false),
181 mMutex("dom::cache::ReadStream"),
182 mCondVar(mMutex
, "dom::cache::ReadStream"),
184 mSnappyStream(aStream
? new SnappyUncompressInputStream(aStream
)
186 MOZ_DIAGNOSTIC_ASSERT(mControl
);
187 mControl
->AddReadStream(SafeRefPtrFromThis());
190 void ReadStream::Inner::Serialize(Maybe
<CacheReadStream
>* aReadStreamOut
,
192 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
193 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut
);
194 aReadStreamOut
->emplace(CacheReadStream());
195 Serialize(&aReadStreamOut
->ref(), aRv
);
198 void ReadStream::Inner::Serialize(CacheReadStream
* aReadStreamOut
,
200 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
201 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut
);
203 if (mState
!= Open
) {
205 "Response body is a cache file stream that has already been closed.");
209 MOZ_DIAGNOSTIC_ASSERT(mControl
);
211 aReadStreamOut
->id() = mId
;
212 mControl
->SerializeControl(aReadStreamOut
);
215 MutexAutoLock
lock(mMutex
);
216 mControl
->SerializeStream(aReadStreamOut
, mStream
);
219 MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut
->stream().isNothing() ||
220 aReadStreamOut
->stream().ref().stream().type() !=
221 mozilla::ipc::InputStreamParams::T__None
);
223 // We're passing ownership across the IPC barrier with the control, so
224 // do not signal that the stream is closed here.
228 void ReadStream::Inner::CloseStream() {
229 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
230 MOZ_ALWAYS_SUCCEEDS(Close());
233 void ReadStream::Inner::CloseStreamWithoutReporting() {
234 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
238 bool ReadStream::Inner::HasEverBeenRead() const {
239 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
240 return mHasEverBeenRead
;
243 nsresult
ReadStream::Inner::Close() {
244 // stream ops can happen on any thread
247 MutexAutoLock
lock(mMutex
);
249 rv
= mSnappyStream
->Close();
256 nsresult
ReadStream::Inner::Available(uint64_t* aNumAvailableOut
) {
257 // stream ops can happen on any thread
260 MutexAutoLock
lock(mMutex
);
261 rv
= EnsureStream()->Available(aNumAvailableOut
);
271 nsresult
ReadStream::Inner::StreamStatus() {
272 // stream ops can happen on any thread
275 MutexAutoLock
lock(mMutex
);
276 rv
= EnsureStream()->StreamStatus();
286 nsresult
ReadStream::Inner::Read(char* aBuf
, uint32_t aCount
,
287 uint32_t* aNumReadOut
) {
288 // stream ops can happen on any thread
289 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut
);
293 MutexAutoLock
lock(mMutex
);
294 rv
= EnsureStream()->Read(aBuf
, aCount
, aNumReadOut
);
297 if ((NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_WOULD_BLOCK
) ||
302 mHasEverBeenRead
= true;
307 nsresult
ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter
,
308 void* aClosure
, uint32_t aCount
,
309 uint32_t* aNumReadOut
) {
310 // stream ops can happen on any thread
311 MOZ_DIAGNOSTIC_ASSERT(aNumReadOut
);
314 mHasEverBeenRead
= true;
319 MutexAutoLock
lock(mMutex
);
320 rv
= EnsureStream()->ReadSegments(aWriter
, aClosure
, aCount
, aNumReadOut
);
323 if ((NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_WOULD_BLOCK
&&
324 rv
!= NS_ERROR_NOT_IMPLEMENTED
) ||
329 // Verify bytes were actually read before marking as being ever read. For
330 // example, code can test if the stream supports ReadSegments() by calling
331 // this method with a dummy callback which doesn't read anything. We don't
332 // want to trigger on that.
334 mHasEverBeenRead
= true;
340 nsresult
ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut
) {
341 // stream ops can happen on any thread
342 MutexAutoLock
lock(mMutex
);
344 return mSnappyStream
->IsNonBlocking(aNonBlockingOut
);
346 *aNonBlockingOut
= false;
350 ReadStream::Inner::~Inner() {
352 MOZ_DIAGNOSTIC_ASSERT(mState
== Closed
);
353 MOZ_DIAGNOSTIC_ASSERT(!mControl
);
356 void ReadStream::Inner::NoteClosed() {
358 if (mState
== Closed
) {
362 if (mOwningEventTarget
->IsOnCurrentThread()) {
363 NoteClosedOnOwningThread();
367 nsCOMPtr
<nsIRunnable
> runnable
= new NoteClosedRunnable(SafeRefPtrFromThis());
368 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget
->Dispatch(runnable
.forget(),
369 nsIThread::DISPATCH_NORMAL
));
372 void ReadStream::Inner::Forget() {
374 if (mState
== Closed
) {
378 if (mOwningEventTarget
->IsOnCurrentThread()) {
379 ForgetOnOwningThread();
383 nsCOMPtr
<nsIRunnable
> runnable
= new ForgetRunnable(SafeRefPtrFromThis());
384 MOZ_ALWAYS_SUCCEEDS(mOwningEventTarget
->Dispatch(runnable
.forget(),
385 nsIThread::DISPATCH_NORMAL
));
388 void ReadStream::Inner::NoteClosedOnOwningThread() {
389 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
391 // Mark closed and do nothing if we were already closed
392 if (!mState
.compareExchange(Open
, Closed
)) {
396 MaybeAbortAsyncOpenStream();
398 MOZ_DIAGNOSTIC_ASSERT(mControl
);
399 mControl
->NoteClosed(SafeRefPtrFromThis(), mId
);
403 void ReadStream::Inner::ForgetOnOwningThread() {
404 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
406 // Mark closed and do nothing if we were already closed
407 if (!mState
.compareExchange(Open
, Closed
)) {
411 MaybeAbortAsyncOpenStream();
413 MOZ_DIAGNOSTIC_ASSERT(mControl
);
414 mControl
->ForgetReadStream(SafeRefPtrFromThis());
418 nsIInputStream
* ReadStream::Inner::EnsureStream() {
419 mMutex
.AssertCurrentThreadOwns();
421 // We need to block the current thread while we open the stream. We
422 // cannot do this safely from the main owning thread since it would
423 // trigger deadlock. This should be ok, though, since a blocking
424 // stream like this should never be read on the owning thread anyway.
425 if (mOwningEventTarget
->IsOnCurrentThread()) {
426 MOZ_CRASH("Blocking read on the js/ipc owning thread!");
430 return mSnappyStream
;
433 nsCOMPtr
<nsIRunnable
> r
= NewCancelableRunnableMethod(
434 "ReadStream::Inner::AsyncOpenStreamOnOwningThread", this,
435 &ReadStream::Inner::AsyncOpenStreamOnOwningThread
);
437 mOwningEventTarget
->Dispatch(r
.forget(), nsIThread::DISPATCH_NORMAL
);
438 if (NS_WARN_IF(NS_FAILED(rv
))) {
440 return mSnappyStream
;
444 MOZ_DIAGNOSTIC_ASSERT(mSnappyStream
);
446 return mSnappyStream
;
449 void ReadStream::Inner::AsyncOpenStreamOnOwningThread() {
450 MOZ_ASSERT(mOwningEventTarget
->IsOnCurrentThread());
453 // Different threads might request opening the stream at the same time. If
454 // the earlier request succeeded, then use the result.
455 mCondVar
.NotifyAll();
459 if (!mControl
|| mState
== Closed
) {
460 MutexAutoLock
lock(mMutex
);
462 mCondVar
.NotifyAll();
466 if (mAsyncOpenStarted
) {
469 mAsyncOpenStarted
= true;
471 RefPtr
<ReadStream::Inner
> self
= this;
472 mControl
->OpenStream(mId
, [self
](nsCOMPtr
<nsIInputStream
>&& aStream
) {
473 MutexAutoLock
lock(self
->mMutex
);
474 self
->mAsyncOpenStarted
= false;
475 if (!self
->mStream
) {
477 self
->OpenStreamFailed();
479 self
->mStream
= std::move(aStream
);
480 self
->mSnappyStream
= new SnappyUncompressInputStream(self
->mStream
);
483 self
->mCondVar
.NotifyAll();
487 void ReadStream::Inner::MaybeAbortAsyncOpenStream() {
488 if (!mAsyncOpenStarted
) {
492 MutexAutoLock
lock(mMutex
);
494 mCondVar
.NotifyAll();
497 void ReadStream::Inner::OpenStreamFailed() {
498 MOZ_DIAGNOSTIC_ASSERT(!mStream
);
499 MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream
);
500 mMutex
.AssertCurrentThreadOwns();
501 Unused
<< NS_NewCStringInputStream(getter_AddRefs(mStream
), ""_ns
);
502 mSnappyStream
= mStream
;
507 // ----------------------------------------------------------------------------
509 NS_IMPL_ISUPPORTS(cache::ReadStream
, nsIInputStream
, ReadStream
);
512 already_AddRefed
<ReadStream
> ReadStream::Create(
513 const Maybe
<CacheReadStream
>& aMaybeReadStream
) {
514 if (aMaybeReadStream
.isNothing()) {
518 return Create(aMaybeReadStream
.ref());
522 already_AddRefed
<ReadStream
> ReadStream::Create(
523 const CacheReadStream
& aReadStream
) {
524 // The parameter may or may not be for a Cache created stream. The way we
525 // tell is by looking at the stream control actor. If the actor exists,
526 // then we know the Cache created it.
527 if (!aReadStream
.control()) {
531 MOZ_DIAGNOSTIC_ASSERT(aReadStream
.stream().isNothing() ||
532 aReadStream
.stream().ref().stream().type() !=
533 mozilla::ipc::InputStreamParams::T__None
);
535 // Control is guaranteed to survive this method as ActorDestroy() cannot
536 // run on this thread until we complete.
537 StreamControl
* control
;
538 if (aReadStream
.control().IsChild()) {
540 static_cast<CacheStreamControlChild
*>(aReadStream
.control().AsChild());
543 auto actor
= static_cast<CacheStreamControlParent
*>(
544 aReadStream
.control().AsParent());
547 MOZ_DIAGNOSTIC_ASSERT(control
);
549 nsCOMPtr
<nsIInputStream
> stream
= DeserializeIPCStream(aReadStream
.stream());
551 // Currently we expect all cache read streams to be blocking file streams.
552 #if defined(MOZ_DIAGNOSTIC_ASSERT_ENABLED)
554 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
= do_QueryInterface(stream
);
555 MOZ_DIAGNOSTIC_ASSERT(!asyncStream
);
559 return MakeAndAddRef
<ReadStream
>(MakeSafeRefPtr
<ReadStream::Inner
>(
560 std::move(control
), aReadStream
.id(), stream
));
564 already_AddRefed
<ReadStream
> ReadStream::Create(
565 PCacheStreamControlParent
* aControl
, const nsID
& aId
,
566 nsIInputStream
* aStream
) {
567 MOZ_DIAGNOSTIC_ASSERT(aControl
);
569 return MakeAndAddRef
<ReadStream
>(MakeSafeRefPtr
<ReadStream::Inner
>(
570 static_cast<CacheStreamControlParent
*>(aControl
), aId
, aStream
));
573 void ReadStream::Serialize(Maybe
<CacheReadStream
>* aReadStreamOut
,
575 mInner
->Serialize(aReadStreamOut
, aRv
);
578 void ReadStream::Serialize(CacheReadStream
* aReadStreamOut
, ErrorResult
& aRv
) {
579 mInner
->Serialize(aReadStreamOut
, aRv
);
582 ReadStream::ReadStream(SafeRefPtr
<ReadStream::Inner
> aInner
)
583 : mInner(std::move(aInner
)) {
584 MOZ_DIAGNOSTIC_ASSERT(mInner
);
587 ReadStream::~ReadStream() {
588 // Explicitly close the inner stream so that it does not have to
589 // deal with implicitly closing at destruction time.
594 ReadStream::Close() { return mInner
->Close(); }
597 ReadStream::Available(uint64_t* aNumAvailableOut
) {
598 return mInner
->Available(aNumAvailableOut
);
602 ReadStream::StreamStatus() { return mInner
->StreamStatus(); }
605 ReadStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aNumReadOut
) {
606 return mInner
->Read(aBuf
, aCount
, aNumReadOut
);
610 ReadStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
611 uint32_t aCount
, uint32_t* aNumReadOut
) {
612 return mInner
->ReadSegments(aWriter
, aClosure
, aCount
, aNumReadOut
);
616 ReadStream::IsNonBlocking(bool* aNonBlockingOut
) {
617 return mInner
->IsNonBlocking(aNonBlockingOut
);
620 } // namespace mozilla::dom::cache