1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #ifndef ReadableStreamImpl_h
6 #define ReadableStreamImpl_h
8 #include "bindings/core/v8/ExceptionState.h"
9 #include "bindings/core/v8/ScriptPromise.h"
10 #include "bindings/core/v8/ScriptPromiseResolver.h"
11 #include "bindings/core/v8/ScriptState.h"
12 #include "bindings/core/v8/ScriptValue.h"
13 #include "bindings/core/v8/V8ArrayBuffer.h"
14 #include "bindings/core/v8/V8Binding.h"
15 #include "bindings/core/v8/V8IteratorResultValue.h"
16 #include "core/dom/DOMArrayBuffer.h"
17 #include "core/dom/DOMArrayBufferView.h"
18 #include "core/dom/DOMException.h"
19 #include "core/streams/ReadableStream.h"
20 #include "wtf/Deque.h"
21 #include "wtf/RefPtr.h"
22 #include "wtf/text/WTFString.h"
27 // We define the default ChunkTypeTraits for frequently used types.
28 template<typename ChunkType
>
29 class ReadableStreamChunkTypeTraits
{
30 STATIC_ONLY(ReadableStreamChunkTypeTraits
);
34 class ReadableStreamChunkTypeTraits
<String
> {
36 typedef String HoldType
;
37 typedef const String
& PassType
;
39 static size_t size(const String
& chunk
) { return chunk
.length(); }
40 static ScriptValue
toScriptValue(ScriptState
* scriptState
, const HoldType
& value
)
42 return ScriptValue(scriptState
, v8String(scriptState
->isolate(), value
));
47 class ReadableStreamChunkTypeTraits
<DOMArrayBuffer
> {
49 typedef RefPtr
<DOMArrayBuffer
> HoldType
;
50 typedef PassRefPtr
<DOMArrayBuffer
> PassType
;
52 static size_t size(const PassType
& chunk
) { return chunk
->byteLength(); }
53 static ScriptValue
toScriptValue(ScriptState
* scriptState
, const HoldType
& value
)
55 return ScriptValue(scriptState
, toV8(value
.get(), scriptState
->context()->Global(), scriptState
->isolate()));
60 class ReadableStreamChunkTypeTraits
<DOMArrayBufferView
> {
62 typedef RefPtr
<DOMArrayBufferView
> HoldType
;
63 typedef PassRefPtr
<DOMArrayBufferView
> PassType
;
65 static size_t size(const PassType
& chunk
) { return chunk
->byteLength(); }
66 static ScriptValue
toScriptValue(ScriptState
* scriptState
, const HoldType
& value
)
68 return ScriptValue(scriptState
, toV8(value
.get(), scriptState
->context()->Global(), scriptState
->isolate()));
72 // ReadableStreamImpl<ChunkTypeTraits> is a ReadableStream subtype. It has a
73 // queue whose type depends on ChunkTypeTraits and it implements queue-related
74 // ReadableStream pure virtual methods.
75 template <typename ChunkTypeTraits
>
76 class ReadableStreamImpl
: public ReadableStream
{
78 class Strategy
: public GarbageCollectedFinalized
<Strategy
> {
80 virtual ~Strategy() { }
82 // These functions call ReadableStream::error on error.
83 virtual size_t size(const typename
ChunkTypeTraits::PassType
& chunk
, ReadableStream
*) { return ChunkTypeTraits::size(chunk
); }
84 virtual bool shouldApplyBackpressure(size_t totalQueueSize
, ReadableStream
*) = 0;
86 DEFINE_INLINE_VIRTUAL_TRACE() { }
89 class DefaultStrategy
: public Strategy
{
91 size_t size(const typename
ChunkTypeTraits::PassType
& chunk
, ReadableStream
*) override
{ return 1; }
92 bool shouldApplyBackpressure(size_t totalQueueSize
, ReadableStream
*) override
{ return totalQueueSize
> 1; }
95 class StrictStrategy
: public Strategy
{
97 size_t size(const typename
ChunkTypeTraits::PassType
& chunk
, ReadableStream
*) override
{ return 1; }
98 bool shouldApplyBackpressure(size_t totalQueueSize
, ReadableStream
*) override
{ return true; }
101 explicit ReadableStreamImpl(UnderlyingSource
* source
)
102 : ReadableStreamImpl(source
, new DefaultStrategy
) { }
103 ReadableStreamImpl(UnderlyingSource
* source
, Strategy
* strategy
)
104 : ReadableStream(source
)
105 , m_strategy(strategy
)
106 , m_totalQueueSize(0) { }
107 ~ReadableStreamImpl() override
{ }
109 // ReadableStream methods
110 ScriptPromise
read(ScriptState
*) override
;
112 bool enqueue(typename
ChunkTypeTraits::PassType
);
114 // This function is intended to be used by internal code to withdraw
115 // queued data. This pulls all data from this stream's queue, but
116 // ReadableStream public APIs can work with the behavior (i.e. it behaves
117 // as if multiple read-one-buffer calls were made).
118 void readInternal(Deque
<std::pair
<typename
ChunkTypeTraits::HoldType
, size_t>>& queue
);
120 DEFINE_INLINE_VIRTUAL_TRACE()
122 visitor
->trace(m_strategy
);
123 visitor
->trace(m_pendingReads
);
124 ReadableStream::trace(visitor
);
128 using PendingReads
= HeapDeque
<Member
<ScriptPromiseResolver
>>;
130 // ReadableStream methods
131 bool isQueueEmpty() const override
{ return m_queue
.isEmpty(); }
132 void clearQueue() override
135 m_totalQueueSize
= 0;
138 void resolveAllPendingReadsAsDone() override
140 for (auto& resolver
: m_pendingReads
) {
141 ScriptState
* scriptState
= resolver
->scriptState();
142 if (!scriptState
->contextIsValid())
144 ScriptState::Scope
scope(scriptState
);
145 resolver
->resolve(v8IteratorResultDone(scriptState
));
147 m_pendingReads
.clear();
150 void rejectAllPendingReads(DOMException
* reason
) override
152 for (auto& resolver
: m_pendingReads
)
153 resolver
->reject(reason
);
154 m_pendingReads
.clear();
157 bool shouldApplyBackpressure() override
159 return m_strategy
->shouldApplyBackpressure(m_totalQueueSize
, this);
161 bool hasPendingReads() const override
{ return !m_pendingReads
.isEmpty(); }
163 Member
<Strategy
> m_strategy
;
164 Deque
<std::pair
<typename
ChunkTypeTraits::HoldType
, size_t>> m_queue
;
165 PendingReads m_pendingReads
;
166 size_t m_totalQueueSize
;
169 template <typename ChunkTypeTraits
>
170 bool ReadableStreamImpl
<ChunkTypeTraits
>::enqueue(typename
ChunkTypeTraits::PassType chunk
)
172 size_t size
= m_strategy
->size(chunk
, this);
173 if (!enqueuePreliminaryCheck())
176 if (m_pendingReads
.isEmpty()) {
177 m_queue
.append(std::make_pair(chunk
, size
));
178 m_totalQueueSize
+= size
;
179 return enqueuePostAction();
182 ScriptPromiseResolver
* resolver
= m_pendingReads
.takeFirst();
183 ScriptState
* scriptState
= resolver
->scriptState();
184 if (!scriptState
->contextIsValid())
186 ScriptState::Scope
scope(scriptState
);
187 resolver
->resolve(v8IteratorResult(scriptState
, chunk
));
188 return enqueuePostAction();
191 template <typename ChunkTypeTraits
>
192 ScriptPromise ReadableStreamImpl
<ChunkTypeTraits
>::read(ScriptState
* scriptState
)
194 ASSERT(stateInternal() == Readable
);
195 if (m_queue
.isEmpty()) {
196 m_pendingReads
.append(ScriptPromiseResolver::create(scriptState
));
197 ScriptPromise promise
= m_pendingReads
.last()->promise();
198 readInternalPostAction();
202 auto pair
= m_queue
.takeFirst();
203 typename
ChunkTypeTraits::HoldType chunk
= pair
.first
;
204 size_t size
= pair
.second
;
205 ASSERT(m_totalQueueSize
>= size
);
206 m_totalQueueSize
-= size
;
207 readInternalPostAction();
209 return ScriptPromise::cast(scriptState
, v8IteratorResult(scriptState
, chunk
));
212 template <typename ChunkTypeTraits
>
213 void ReadableStreamImpl
<ChunkTypeTraits
>::readInternal(Deque
<std::pair
<typename
ChunkTypeTraits::HoldType
, size_t>>& queue
)
215 // We omit the preliminary check. Check it by yourself.
216 ASSERT(stateInternal() == Readable
);
217 ASSERT(m_pendingReads
.isEmpty());
218 ASSERT(queue
.isEmpty());
221 m_totalQueueSize
= 0;
222 readInternalPostAction();
227 #endif // ReadableStreamImpl_h