1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "StreamUtils.h"
8 #include "mozilla/dom/ReadableStream.h"
9 #include "mozilla/dom/ReadableStreamDefaultController.h"
10 #include "mozilla/dom/ReadableByteStreamController.h"
11 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
12 #include "mozilla/dom/UnderlyingSourceBinding.h"
13 #include "mozilla/dom/WorkerCommon.h"
14 #include "mozilla/dom/WorkerPrivate.h"
15 #include "mozilla/dom/WorkerRunnable.h"
16 #include "js/experimental/TypedData.h"
17 #include "nsStreamUtils.h"
19 namespace mozilla::dom
{
21 using namespace streams_abstract
;
23 // UnderlyingSourceAlgorithmsBase
24 NS_IMPL_CYCLE_COLLECTION(UnderlyingSourceAlgorithmsBase
)
25 NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSourceAlgorithmsBase
)
26 NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSourceAlgorithmsBase
)
27 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceAlgorithmsBase
)
28 NS_INTERFACE_MAP_ENTRY(nsISupports
)
31 NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS(
32 UnderlyingSourceAlgorithms
, UnderlyingSourceAlgorithmsBase
,
33 (mGlobal
, mStartCallback
, mPullCallback
, mCancelCallback
),
35 NS_IMPL_ADDREF_INHERITED(UnderlyingSourceAlgorithms
,
36 UnderlyingSourceAlgorithmsBase
)
37 NS_IMPL_RELEASE_INHERITED(UnderlyingSourceAlgorithms
,
38 UnderlyingSourceAlgorithmsBase
)
39 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceAlgorithms
)
40 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsBase
)
42 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
43 void UnderlyingSourceAlgorithms::StartCallback(
44 JSContext
* aCx
, ReadableStreamController
& aController
,
45 JS::MutableHandle
<JS::Value
> aRetVal
, ErrorResult
& aRv
) {
46 if (!mStartCallback
) {
47 // Step 2: Let startAlgorithm be an algorithm that returns undefined.
48 aRetVal
.setUndefined();
52 // Step 5: If underlyingSourceDict["start"] exists, then set startAlgorithm to
53 // an algorithm which returns the result of invoking
54 // underlyingSourceDict["start"] with argument list « controller » and
55 // callback this value underlyingSource.
56 JS::Rooted
<JSObject
*> thisObj(aCx
, mUnderlyingSource
);
57 ReadableStreamDefaultControllerOrReadableByteStreamController controller
;
58 if (aController
.IsDefault()) {
59 controller
.SetAsReadableStreamDefaultController() = aController
.AsDefault();
61 controller
.SetAsReadableByteStreamController() = aController
.AsByte();
64 return mStartCallback
->Call(thisObj
, controller
, aRetVal
, aRv
,
65 "UnderlyingSource.start",
66 CallbackFunction::eRethrowExceptions
);
69 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
70 already_AddRefed
<Promise
> UnderlyingSourceAlgorithms::PullCallback(
71 JSContext
* aCx
, ReadableStreamController
& aController
, ErrorResult
& aRv
) {
72 JS::Rooted
<JSObject
*> thisObj(aCx
, mUnderlyingSource
);
74 // Step 3: Let pullAlgorithm be an algorithm that returns a promise resolved
76 return Promise::CreateResolvedWithUndefined(mGlobal
, aRv
);
79 // Step 6: If underlyingSourceDict["pull"] exists, then set pullAlgorithm to
80 // an algorithm which returns the result of invoking
81 // underlyingSourceDict["pull"] with argument list « controller » and callback
82 // this value underlyingSource.
83 ReadableStreamDefaultControllerOrReadableByteStreamController controller
;
84 if (aController
.IsDefault()) {
85 controller
.SetAsReadableStreamDefaultController() = aController
.AsDefault();
87 controller
.SetAsReadableByteStreamController() = aController
.AsByte();
90 RefPtr
<Promise
> promise
=
91 mPullCallback
->Call(thisObj
, controller
, aRv
, "UnderlyingSource.pull",
92 CallbackFunction::eRethrowExceptions
);
94 return promise
.forget();
97 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
98 already_AddRefed
<Promise
> UnderlyingSourceAlgorithms::CancelCallback(
99 JSContext
* aCx
, const Optional
<JS::Handle
<JS::Value
>>& aReason
,
101 if (!mCancelCallback
) {
102 // Step 4: Let cancelAlgorithm be an algorithm that returns a promise
103 // resolved with undefined.
104 return Promise::CreateResolvedWithUndefined(mGlobal
, aRv
);
107 // Step 7: If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm
108 // to an algorithm which takes an argument reason and returns the result of
109 // invoking underlyingSourceDict["cancel"] with argument list « reason » and
110 // callback this value underlyingSource.
111 JS::Rooted
<JSObject
*> thisObj(aCx
, mUnderlyingSource
);
112 RefPtr
<Promise
> promise
=
113 mCancelCallback
->Call(thisObj
, aReason
, aRv
, "UnderlyingSource.cancel",
114 CallbackFunction::eRethrowExceptions
);
116 return promise
.forget();
120 // https://streams.spec.whatwg.org/#readablestream-set-up
121 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
122 // Step 1: Let startAlgorithm be an algorithm that returns undefined.
123 void UnderlyingSourceAlgorithmsWrapper::StartCallback(
124 JSContext
*, ReadableStreamController
&, JS::MutableHandle
<JS::Value
> aRetVal
,
126 aRetVal
.setUndefined();
130 // https://streams.spec.whatwg.org/#readablestream-set-up
131 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
132 // Step 2: Let pullAlgorithmWrapper be an algorithm that runs these steps:
133 already_AddRefed
<Promise
> UnderlyingSourceAlgorithmsWrapper::PullCallback(
134 JSContext
* aCx
, ReadableStreamController
& aController
, ErrorResult
& aRv
) {
135 nsCOMPtr
<nsIGlobalObject
> global
= aController
.GetParentObject();
136 return PromisifyAlgorithm(
138 [&](ErrorResult
& aRv
) MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION
{
139 return PullCallbackImpl(aCx
, aController
, aRv
);
145 // https://streams.spec.whatwg.org/#readablestream-set-up
146 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
147 // Step 3: Let cancelAlgorithmWrapper be an algorithm that runs these steps:
148 already_AddRefed
<Promise
> UnderlyingSourceAlgorithmsWrapper::CancelCallback(
149 JSContext
* aCx
, const Optional
<JS::Handle
<JS::Value
>>& aReason
,
151 nsCOMPtr
<nsIGlobalObject
> global
= xpc::CurrentNativeGlobal(aCx
);
152 return PromisifyAlgorithm(
154 [&](ErrorResult
& aRv
) MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION
{
155 return CancelCallbackImpl(aCx
, aReason
, aRv
);
160 NS_IMPL_ISUPPORTS(InputStreamHolder
, nsIInputStreamCallback
)
162 InputStreamHolder::InputStreamHolder(nsIGlobalObject
* aGlobal
,
163 InputToReadableStreamAlgorithms
* aCallback
,
164 nsIAsyncInputStream
* aInput
)
165 : GlobalTeardownObserver(aGlobal
), mCallback(aCallback
), mInput(aInput
) {}
167 void InputStreamHolder::Init(JSContext
* aCx
) {
168 if (!NS_IsMainThread()) {
170 WorkerPrivate
* workerPrivate
= GetWorkerPrivateFromContext(aCx
);
171 MOZ_ASSERT(workerPrivate
);
173 workerPrivate
->AssertIsOnWorkerThread();
175 // Note, this will create a ref-cycle between the holder and the stream.
176 // The cycle is broken when the stream is closed or the worker begins
178 mWorkerRef
= StrongWorkerRef::Create(workerPrivate
, "InputStreamHolder",
179 [self
= RefPtr
{this}]() {});
180 if (NS_WARN_IF(!mWorkerRef
)) {
186 InputStreamHolder::~InputStreamHolder() = default;
188 void InputStreamHolder::DisconnectFromOwner() {
190 GlobalTeardownObserver::DisconnectFromOwner();
193 void InputStreamHolder::Shutdown() {
197 // NOTE(krosylight): Dropping mAsyncWaitAlgorithms here means letting cycle
198 // collection happen on the underlying source, which can cause a dangling
199 // read promise that never resolves. Doing so shouldn't be a problem at
201 // Note that this is currently primarily for Fetch which does not explicitly
202 // close its streams at shutdown. (i.e. to prevent memory leak for cases e.g
203 // WPT /fetch/api/basic/stream-response.any.html)
204 mAsyncWaitAlgorithms
= nullptr;
205 // If we have an AsyncWait running, we'll get a callback and clear
206 // the mAsyncWaitWorkerRef
207 mWorkerRef
= nullptr;
210 nsresult
InputStreamHolder::AsyncWait(uint32_t aFlags
, uint32_t aRequestedCount
,
211 nsIEventTarget
* aEventTarget
) {
212 nsresult rv
= mInput
->AsyncWait(this, aFlags
, aRequestedCount
, aEventTarget
);
213 if (NS_SUCCEEDED(rv
)) {
214 mAsyncWaitWorkerRef
= mWorkerRef
;
215 mAsyncWaitAlgorithms
= mCallback
;
220 NS_IMETHODIMP
InputStreamHolder::OnInputStreamReady(
221 nsIAsyncInputStream
* aStream
) {
222 mAsyncWaitWorkerRef
= nullptr;
223 mAsyncWaitAlgorithms
= nullptr;
224 // We may get called back after ::Shutdown()
226 return mCallback
->OnInputStreamReady(aStream
);
228 return NS_ERROR_FAILURE
;
231 NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(InputToReadableStreamAlgorithms
,
232 UnderlyingSourceAlgorithmsWrapper
,
233 nsIInputStreamCallback
)
234 NS_IMPL_CYCLE_COLLECTION_WEAK_PTR_INHERITED(InputToReadableStreamAlgorithms
,
235 UnderlyingSourceAlgorithmsWrapper
,
236 mPullPromise
, mStream
)
238 InputToReadableStreamAlgorithms::InputToReadableStreamAlgorithms(
239 JSContext
* aCx
, nsIAsyncInputStream
* aInput
, ReadableStream
* aStream
)
240 : mOwningEventTarget(GetCurrentSerialEventTarget()),
241 mInput(new InputStreamHolder(aStream
->GetParentObject(), this, aInput
)),
246 already_AddRefed
<Promise
> InputToReadableStreamAlgorithms::PullCallbackImpl(
247 JSContext
* aCx
, ReadableStreamController
& aController
, ErrorResult
& aRv
) {
248 MOZ_ASSERT(aController
.IsByte());
249 ReadableStream
* stream
= aController
.Stream();
252 MOZ_DIAGNOSTIC_ASSERT(stream
->Disturbed());
254 MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
255 MOZ_ASSERT(!mPullPromise
);
256 mPullPromise
= Promise::CreateInfallible(aController
.GetParentObject());
258 MOZ_DIAGNOSTIC_ASSERT(mInput
);
260 nsresult rv
= mInput
->AsyncWait(0, 0, mOwningEventTarget
);
261 if (NS_WARN_IF(NS_FAILED(rv
))) {
262 ErrorPropagation(aCx
, stream
, rv
);
267 return do_AddRef(mPullPromise
);
270 // _BOUNDARY because OnInputStreamReady doesn't have [can-run-script]
271 MOZ_CAN_RUN_SCRIPT_BOUNDARY NS_IMETHODIMP
272 InputToReadableStreamAlgorithms::OnInputStreamReady(
273 nsIAsyncInputStream
* aStream
) {
274 MOZ_DIAGNOSTIC_ASSERT(aStream
);
276 // Already closed. We have nothing else to do here.
281 AutoEntryScript
aes(mStream
->GetParentObject(),
282 "InputToReadableStream data available");
284 MOZ_DIAGNOSTIC_ASSERT(mInput
);
286 JSContext
* cx
= aes
.cx();
289 nsresult rv
= mInput
->Available(&size
);
290 MOZ_ASSERT_IF(NS_SUCCEEDED(rv
), size
> 0);
292 // No warning for stream closed.
293 if (rv
== NS_BASE_STREAM_CLOSED
|| NS_WARN_IF(NS_FAILED(rv
))) {
294 ErrorPropagation(cx
, mStream
, rv
);
298 // Not having a promise means we are pinged by stream closure
299 // (WAIT_CLOSURE_ONLY below), but here we still have more data to read. Let's
300 // wait for the next read request in that case.
305 MOZ_DIAGNOSTIC_ASSERT(mPullPromise
->State() ==
306 Promise::PromiseState::Pending
);
308 ErrorResult errorResult
;
309 PullFromInputStream(cx
, size
, errorResult
);
310 errorResult
.WouldReportJSException();
311 if (errorResult
.Failed()) {
312 ErrorPropagation(cx
, mStream
, errorResult
.StealNSResult());
316 // PullFromInputStream can fulfill read request, which can trigger read
317 // request chunk steps, which again may execute JS. But it should be still
318 // safe from cycle collection as the caller nsIAsyncInputStream should hold
319 // the reference of `this`.
321 // That said, it's generally good to be cautious as there's no guarantee that
322 // the interface is implemented in the safest way.
323 MOZ_DIAGNOSTIC_ASSERT(mPullPromise
);
325 mPullPromise
->MaybeResolveWithUndefined();
326 mPullPromise
= nullptr;
329 MOZ_DIAGNOSTIC_ASSERT(mInput
);
331 // Subscribe WAIT_CLOSURE_ONLY so that OnInputStreamReady can be called when
333 rv
= mInput
->AsyncWait(nsIAsyncInputStream::WAIT_CLOSURE_ONLY
, 0,
335 if (NS_WARN_IF(NS_FAILED(rv
))) {
336 ErrorPropagation(cx
, mStream
, errorResult
.StealNSResult());
344 void InputToReadableStreamAlgorithms::WriteIntoReadRequestBuffer(
345 JSContext
* aCx
, ReadableStream
* aStream
, JS::Handle
<JSObject
*> aBuffer
,
346 uint32_t aLength
, uint32_t* aByteWritten
, ErrorResult
& aRv
) {
347 MOZ_DIAGNOSTIC_ASSERT(aBuffer
);
348 MOZ_DIAGNOSTIC_ASSERT(aByteWritten
);
349 MOZ_DIAGNOSTIC_ASSERT(mInput
);
350 MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
351 MOZ_DIAGNOSTIC_ASSERT(mPullPromise
->State() ==
352 Promise::PromiseState::Pending
);
358 // Bug 1754513: Hazard suppression.
360 // Because mInput->Read is detected as possibly GCing by the
361 // current state of our static hazard analysis, we need to do the
362 // suppression here. This can be removed with future improvements
363 // to the static analysis.
364 JS::AutoSuppressGCAnalysis suppress
;
365 JS::AutoCheckCannotGC noGC
;
368 buffer
= JS_GetArrayBufferViewData(aBuffer
, &isSharedMemory
, noGC
);
369 MOZ_ASSERT(!isSharedMemory
);
371 rv
= mInput
->Read(static_cast<char*>(buffer
), aLength
, &written
);
372 if (NS_WARN_IF(NS_FAILED(rv
))) {
378 *aByteWritten
= written
;
381 // If bytesWritten is zero, then the stream has been closed; return rather
382 // than enqueueing a chunk filled with zeros.
383 aRv
.Throw(NS_BASE_STREAM_CLOSED
);
390 // https://streams.spec.whatwg.org/#readablestream-pull-from-bytes
391 // This is a ReadableStream algorithm but will probably be used solely in
392 // InputToReadableStreamAlgorithms.
393 void InputToReadableStreamAlgorithms::PullFromInputStream(JSContext
* aCx
,
396 // Step 1. Assert: stream.[[controller]] implements
397 // ReadableByteStreamController.
398 MOZ_ASSERT(mStream
->Controller()->IsByte());
400 // Step 2. Let available be bytes’s length. (aAvailable)
401 // Step 3. Let desiredSize be available.
402 uint64_t desiredSize
= aAvailable
;
404 // Step 4. If stream’s current BYOB request view is non-null, then set
405 // desiredSize to stream’s current BYOB request view's byte length.
406 JS::Rooted
<JSObject
*> byobView(aCx
);
407 mStream
->GetCurrentBYOBRequestView(aCx
, &byobView
, aRv
);
412 desiredSize
= JS_GetArrayBufferViewByteLength(byobView
);
415 // Step 5. Let pullSize be the smaller value of available and desiredSize.
417 // To avoid OOMing up on huge amounts of available data on a 32 bit system,
418 // as well as potentially overflowing nsIInputStream's Read method's
419 // parameter, let's limit our maximum chunk size to 256MB.
421 // (Note that nsIInputStream uses uint64_t for Available and uint32_t for
423 uint64_t pullSize
= std::min(static_cast<uint64_t>(256 * 1024 * 1024),
424 std::min(aAvailable
, desiredSize
));
426 // Step 6. Let pulled be the first pullSize bytes of bytes.
427 // Step 7. Remove the first pullSize bytes from bytes.
429 // We do this in step 8 and 9, as we don't have a direct access to the data
430 // but need to let nsIInputStream to write into the view.
432 // Step 8. If stream’s current BYOB request view is non-null, then:
434 // Step 8.1. Write pulled into stream’s current BYOB request view.
435 uint32_t bytesWritten
= 0;
436 WriteIntoReadRequestBuffer(aCx
, mStream
, byobView
, pullSize
, &bytesWritten
,
442 // Step 8.2. Perform ?
443 // ReadableByteStreamControllerRespond(stream.[[controller]], pullSize).
445 // But we do not use pullSize but use byteWritten here, since nsIInputStream
446 // does not guarantee to read as much as it told in Available().
447 MOZ_DIAGNOSTIC_ASSERT(pullSize
== bytesWritten
);
448 ReadableByteStreamControllerRespond(
449 aCx
, MOZ_KnownLive(mStream
->Controller()->AsByte()), bytesWritten
, aRv
);
451 // Step 9. Otherwise,
453 // Step 9.1. Set view to the result of creating a Uint8Array from pulled in
454 // stream’s relevant Realm.
455 UniquePtr
<uint8_t[], JS::FreePolicy
> buffer(
456 static_cast<uint8_t*>(JS_malloc(aCx
, pullSize
)));
458 aRv
.ThrowTypeError("Out of memory");
462 uint32_t bytesWritten
= 0;
463 nsresult rv
= mInput
->Read((char*)buffer
.get(), pullSize
, &bytesWritten
);
465 rv
= NS_BASE_STREAM_CLOSED
;
472 MOZ_DIAGNOSTIC_ASSERT(pullSize
== bytesWritten
);
473 JS::Rooted
<JSObject
*> view(aCx
, nsJSUtils::MoveBufferAsUint8Array(
474 aCx
, bytesWritten
, std::move(buffer
)));
476 JS_ClearPendingException(aCx
);
477 aRv
.ThrowTypeError("Out of memory");
481 // Step 9.2. Perform ?
482 // ReadableByteStreamControllerEnqueue(stream.[[controller]], view).
483 ReadableByteStreamControllerEnqueue(
484 aCx
, MOZ_KnownLive(mStream
->Controller()->AsByte()), view
, aRv
);
488 void InputToReadableStreamAlgorithms::CloseAndReleaseObjects(
489 JSContext
* aCx
, ReadableStream
* aStream
) {
490 MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
494 if (aStream
->State() == ReadableStream::ReaderState::Readable
) {
495 IgnoredErrorResult rv
;
496 aStream
->CloseNative(aCx
, rv
);
497 NS_WARNING_ASSERTION(!rv
.Failed(), "Failed to Close Stream");
501 void InputToReadableStreamAlgorithms::ReleaseObjects() {
503 mInput
->CloseWithStatus(NS_BASE_STREAM_CLOSED
);
508 // It's okay to leave a potentially unsettled promise as-is as this is only
509 // used to prevent reentrant to PullCallback. CloseNative() or ErrorNative()
510 // will settle the read requests for us.
511 mPullPromise
= nullptr;
514 nsIInputStream
* InputToReadableStreamAlgorithms::MaybeGetInputStreamIfUnread() {
515 MOZ_ASSERT(!mStream
->Disturbed(),
516 "Should be only called on non-disturbed streams");
517 return mInput
->GetInputStream();
520 void InputToReadableStreamAlgorithms::ErrorPropagation(JSContext
* aCx
,
521 ReadableStream
* aStream
,
528 // Let's close the stream.
529 if (aError
== NS_BASE_STREAM_CLOSED
) {
530 CloseAndReleaseObjects(aCx
, aStream
);
534 // Let's use a generic error.
536 // XXXbz can we come up with a better error message here to tell the
537 // consumer what went wrong?
538 rv
.ThrowTypeError("Error in input stream");
540 JS::Rooted
<JS::Value
> errorValue(aCx
);
541 bool ok
= ToJSValue(aCx
, std::move(rv
), &errorValue
);
542 MOZ_RELEASE_ASSERT(ok
, "ToJSValue never fails for ErrorResult");
545 // This will be ignored if it's already errored.
546 IgnoredErrorResult rv
;
547 aStream
->ErrorNative(aCx
, errorValue
, rv
);
548 NS_WARNING_ASSERTION(!rv
.Failed(), "Failed to error InputToReadableStream");
551 MOZ_ASSERT(IsClosed());
554 NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED_0(
555 NonAsyncInputToReadableStreamAlgorithms
, UnderlyingSourceAlgorithmsWrapper
)
556 NS_IMPL_CYCLE_COLLECTION_INHERITED(NonAsyncInputToReadableStreamAlgorithms
,
557 UnderlyingSourceAlgorithmsWrapper
,
560 already_AddRefed
<Promise
>
561 NonAsyncInputToReadableStreamAlgorithms::PullCallbackImpl(
562 JSContext
* aCx
, ReadableStreamController
& aController
, ErrorResult
& aRv
) {
563 if (!mAsyncAlgorithms
) {
564 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
;
566 // NS_MakeAsyncNonBlockingInputStream may immediately start a stream read
567 // via nsInputStreamTransport::OpenInputStream, which is why this should be
568 // called on a pull callback instead of in the constructor.
569 nsresult rv
= NS_MakeAsyncNonBlockingInputStream(
570 mInput
.forget(), getter_AddRefs(asyncStream
));
571 if (NS_WARN_IF(NS_FAILED(rv
))) {
576 mAsyncAlgorithms
= MakeRefPtr
<InputToReadableStreamAlgorithms
>(
577 aCx
, asyncStream
, aController
.Stream());
581 return mAsyncAlgorithms
->PullCallbackImpl(aCx
, aController
, aRv
);
584 } // namespace mozilla::dom