1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
10 #include "base/logging.h"
11 #include "net/quic/reliable_quic_stream.h"
14 using std::numeric_limits
;
// Constructs a sequencer bound to |quic_stream|. The consumed-byte counter
// starts at zero, and both |max_frame_memory_| and |close_offset_| start at
// the largest value their types can hold.
// NOTE(review): the embedded line numbers skip after this initializer, so the
// rest of the initializer list and the constructor body are not visible here.
18 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
19 : stream_(quic_stream
),
20 num_bytes_consumed_(0),
21 max_frame_memory_(numeric_limits
<size_t>::max()),
22 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
// Constructs a sequencer bound to |quic_stream| with an explicit
// |max_frame_memory| limit. The consumed-byte counter starts at zero and
// |close_offset_| starts at the largest QuicStreamOffset value.
// NOTE(review): the end of the initializer list is not visible in this view.
26 QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory
,
27 ReliableQuicStream
* quic_stream
)
28 : stream_(quic_stream
),
29 num_bytes_consumed_(0),
30 max_frame_memory_(max_frame_memory
),
31 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
// A limit below kMaxPacketSize is reported via LOG(DFATAL); as the message
// says, some frames could then not be handled.
33 if (max_frame_memory
< kMaxPacketSize
) {
34 LOG(DFATAL
) << "Setting max frame memory to " << max_frame_memory
35 << ". Some frames will be impossible to handle.";
// Destructor.
// NOTE(review): the body is not visible in this view; presumably empty --
// confirm against the full file.
39 QuicStreamSequencer::~QuicStreamSequencer() {
// Reports whether |frame| can be handled given the |max_frame_memory_| limit.
// Three conditions are examined in order: the frame is a duplicate, the frame
// alone is larger than |max_frame_memory_|, and the span from the first
// unconsumed byte to the end of the frame is larger than |max_frame_memory_|.
// NOTE(review): the return statement of each branch is not visible in this
// view (the embedded line numbers skip); confirm them against the full file.
42 bool QuicStreamSequencer::WillAcceptStreamFrame(
43 const QuicStreamFrame
& frame
) const {
44 size_t data_len
= frame
.data
.TotalBufferSize();
45 DCHECK_LE(data_len
, max_frame_memory_
);
// Duplicates are handled in their own branch, ahead of the size checks.
47 if (IsDuplicate(frame
)) {
50 QuicStreamOffset byte_offset
= frame
.offset
;
51 if (data_len
> max_frame_memory_
) {
52 // We're never going to buffer this frame and we can't pass it up.
53 // The stream might only consume part of it and we'd need a partial ack.
55 // Ideally this should never happen, as we check that
56 // max_frame_memory_ > kMaxPacketSize and lower levels should reject
57 // frames larger than that.
// Distance from the first unconsumed byte to the end of this frame, compared
// against the memory limit.
60 if (byte_offset
+ data_len
- num_bytes_consumed_
> max_frame_memory_
) {
61 // We can buffer this but not right now. Toss it.
62 // It might be worth trying an experiment where we try best-effort buffering
// Handles an incoming stream frame. Visible steps, in order:
//   - frames refused by WillAcceptStreamFrame() and duplicate frames are
//     filtered out first;
//   - a frame with no data and no FIN closes the connection with
//     QUIC_INVALID_STREAM_FRAME;
//   - the close offset is recorded via CloseStreamAtOffset();
//   - when not blocked and the frame starts at |num_bytes_consumed_|, its
//     buffers are passed to the stream through ProcessRawData();
//   - whatever the stream did not consume is stored in |frames_|, keyed by
//     byte offset.
// The one visible return is `true` ("safe to ack") once the stream has
// consumed the whole frame.
// NOTE(review): several statements (other returns, closing braces, the
// declaration of |data|, the condition guarding CloseStreamAtOffset) are not
// visible in this view; confirm against the full file.
68 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
69 if (!WillAcceptStreamFrame(frame
)) {
70 // This should not happen, as WillAcceptFrame should be called before
71 // OnStreamFrame. Error handling should be done by the caller.
74 if (IsDuplicate(frame
)) {
75 // Silently ignore duplicates.
79 QuicStreamOffset byte_offset
= frame
.offset
;
80 size_t data_len
= frame
.data
.TotalBufferSize();
81 if (data_len
== 0 && !frame
.fin
) {
82 // Stream frames must have data or a fin flag.
83 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
84 "Empty stream frame without FIN set.");
// The stream ends right after this frame's last byte.
89 CloseStreamAtOffset(frame
.offset
+ data_len
);
96 data
.AppendIovec(frame
.data
.iovec(), frame
.data
.Size());
// In-order data while not blocked: hand every buffer straight to the stream
// and total up how much it reports as consumed.
97 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
98 DVLOG(1) << "Processing byte offset " << byte_offset
;
99 size_t bytes_consumed
= 0;
100 for (size_t i
= 0; i
< data
.Size(); ++i
) {
101 bytes_consumed
+= stream_
->ProcessRawData(
102 static_cast<char*>(data
.iovec()[i
].iov_base
),
103 data
.iovec()[i
].iov_len
);
105 num_bytes_consumed_
+= bytes_consumed
;
106 if (MaybeCloseStream()) {
// The stream reporting more bytes consumed than it was given is treated as a
// processing error and resets the stream.
109 if (bytes_consumed
> data_len
) {
110 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
112 } else if (bytes_consumed
== data_len
) {
113 FlushBufferedFrames();
114 return true; // it's safe to ack this frame.
116 // Set ourselves up to buffer what's left
117 data_len
-= bytes_consumed
;
118 data
.Consume(bytes_consumed
);
119 byte_offset
+= bytes_consumed
;
// Each remaining buffer is copied into its own string and stored under the
// stream offset at which it starts.
122 for (size_t i
= 0; i
< data
.Size(); ++i
) {
123 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
124 frames_
.insert(make_pair(byte_offset
, string(
125 static_cast<char*>(data
.iovec()[i
].iov_base
),
126 data
.iovec()[i
].iov_len
)));
127 byte_offset
+= data
.iovec()[i
].iov_len
;
// Records |offset| as the byte offset at which the stream closes.
// |close_offset_| equal to the maximum QuicStreamOffset value is treated as
// "no close offset recorded yet". If one was already recorded and |offset|
// differs from it, the stream is reset with
// QUIC_MULTIPLE_TERMINATION_OFFSETS.
// NOTE(review): statements between the Reset() call and the assignment are
// not visible in this view (e.g. whether the function returns after Reset()).
132 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
133 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
135 // If we have a scheduled termination or close, any new offset should match
137 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
138 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
142 close_offset_
= offset
;
// If the sequencer is not blocked and IsClosed() holds, logs the termination
// and notifies the stream through OnFinRead().
// NOTE(review): the return statements are not visible in this view. Callers
// in this file test the result in an `if`, so it presumably reports whether
// the stream was closed -- confirm against the full file.
147 bool QuicStreamSequencer::MaybeCloseStream() {
148 if (!blocked_
&& IsClosed()) {
149 DVLOG(1) << "Passing up termination, as we've processed "
150 << num_bytes_consumed_
<< " of " << close_offset_
152 // Technically it's an error if num_bytes_consumed isn't exactly
153 // equal, but error handling seems silly at this point.
154 stream_
->OnFinRead();
// Fills |iov| (at most |iov_len| entries) with pointers to, and lengths of,
// the buffered frames that are contiguous starting at |num_bytes_consumed_|.
// Entries point directly at the strings stored in |frames_|; nothing is
// copied. Iteration stops at the first frame whose offset does not continue
// the run, in which case the current |index| is returned.
// NOTE(review): the declaration and increment of |index|, the iterator
// advance and the final return are not visible in this view.
161 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
163 FrameMap::iterator it
= frames_
.begin();
165 QuicStreamOffset offset
= num_bytes_consumed_
;
166 while (it
!= frames_
.end() && index
< iov_len
) {
// A gap between the expected offset and the next stored frame ends the
// readable run.
167 if (it
->first
!= offset
) return index
;
169 iov
[index
].iov_base
= static_cast<void*>(
170 const_cast<char*>(it
->second
.data()));
171 iov
[index
].iov_len
= it
->second
.size();
172 offset
+= it
->second
.size();
// Reads buffered, in-order frame data into the |iov_len| buffers of |iov|.
// The loop runs while there is buffer space left and the first stored frame
// begins exactly at |num_bytes_consumed_|. Returns how far
// |num_bytes_consumed_| advanced during the call.
// NOTE(review): several statements are not visible in this view (the start
// of the copy call, advancing |iov_index|, erasing fully-read frames);
// confirm against the full file.
180 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
182 FrameMap::iterator it
= frames_
.begin();
183 size_t iov_index
= 0;
184 size_t iov_offset
= 0;
185 size_t frame_offset
= 0;
186 size_t initial_bytes_consumed
= num_bytes_consumed_
;
188 while (iov_index
< iov_len
&&
189 it
!= frames_
.end() &&
190 it
->first
== num_bytes_consumed_
) {
// Limited by both the space left in the current buffer and the data left in
// the current frame.
191 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
192 it
->second
.size() - frame_offset
);
194 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
196 it
->second
.data() + frame_offset
, bytes_to_read
);
197 frame_offset
+= bytes_to_read
;
198 iov_offset
+= bytes_to_read
;
200 if (iov
[iov_index
].iov_len
== iov_offset
) {
201 // We've filled this buffer.
205 if (it
->second
.size() == frame_offset
) {
206 // We've copied this whole frame
207 num_bytes_consumed_
+= it
->second
.size();
209 it
= frames_
.begin();
213 // We've finished copying. If we have a partial frame, update it.
// The unread tail is re-inserted under its new starting offset before the
// original entry is erased.
214 if (frame_offset
!= 0) {
215 frames_
.insert(make_pair(it
->first
+ frame_offset
,
216 it
->second
.substr(frame_offset
)));
217 frames_
.erase(frames_
.begin());
218 num_bytes_consumed_
+= frame_offset
;
220 return num_bytes_consumed_
- initial_bytes_consumed
;
// Advances |num_bytes_consumed_| by |num_bytes_consumed|, walking the stored
// frames from the front. A frame lying entirely before the target offset
// advances the counter by its full length. A frame straddling the target is
// consumed partially and its remainder is re-inserted at the target offset.
// If the first stored frame does not start at |num_bytes_consumed_|, the
// call is reported via LOG(DFATAL) and the stream is reset.
// NOTE(review): the erase/continue/return statements are not visible in this
// view; confirm against the full file.
223 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed
) {
225 size_t end_offset
= num_bytes_consumed_
+ num_bytes_consumed
;
226 while (!frames_
.empty() && end_offset
!= num_bytes_consumed_
) {
227 FrameMap::iterator it
= frames_
.begin();
228 if (it
->first
!= num_bytes_consumed_
) {
229 LOG(DFATAL
) << "Invalid argument to MarkConsumed. "
230 << " num_bytes_consumed_: " << num_bytes_consumed_
231 << " end_offset: " << end_offset
232 << " offset: " << it
->first
233 << " length: " << it
->second
.length();
234 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
238 if (it
->first
+ it
->second
.length() <= end_offset
) {
239 num_bytes_consumed_
+= it
->second
.length();
240 // This chunk is entirely consumed.
245 // Partially consume this frame.
246 size_t delta
= end_offset
- it
->first
;
247 num_bytes_consumed_
+= delta
;
248 frames_
.insert(make_pair(end_offset
, it
->second
.substr(delta
)));
// Returns true when |frames_| is non-empty and its first entry starts exactly
// at |num_bytes_consumed_|, i.e. the next unread byte is already buffered.
254 bool QuicStreamSequencer::HasBytesToRead() const {
255 FrameMap::const_iterator it
= frames_
.begin();
257 return it
!= frames_
.end() && it
->first
== num_bytes_consumed_
;
260 bool QuicStreamSequencer::IsClosed() const {
261 return num_bytes_consumed_
>= close_offset_
;
264 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
265 // A frame is duplicate if the frame offset is smaller than our bytes consumed
266 // or we have stored the frame in our map.
267 // TODO(pwestin): Is it possible that a new frame contain more data even if
268 // the offset is the same?
269 return frame
.offset
< num_bytes_consumed_
||
270 frames_
.find(frame
.offset
) != frames_
.end();
// NOTE(review): only the signature is visible in this view. Other methods in
// this file skip immediate processing while |blocked_| is set, so this
// presumably sets that flag -- confirm against the full file.
273 void QuicStreamSequencer::SetBlockedUntilFlush() {
277 void QuicStreamSequencer::FlushBufferedFrames() {
279 FrameMap::iterator it
= frames_
.find(num_bytes_consumed_
);
280 while (it
!= frames_
.end()) {
281 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
282 string
* data
= &it
->second
;
283 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
285 num_bytes_consumed_
+= bytes_consumed
;
286 if (MaybeCloseStream()) {
289 if (bytes_consumed
> data
->size()) {
290 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
292 } else if (bytes_consumed
== data
->size()) {
294 it
= frames_
.find(num_bytes_consumed_
);
296 string new_data
= it
->second
.substr(bytes_consumed
);
298 frames_
.insert(make_pair(num_bytes_consumed_
, new_data
));