// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/quic/quic_stream_sequencer.h"

#include <string.h>

#include <algorithm>
#include <limits>
#include <string>

#include "base/logging.h"
#include "net/quic/reliable_quic_stream.h"

using std::min;
using std::numeric_limits;
using std::string;
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset
,
22 : offset(offset
), segment(segment
) {}
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
25 : stream_(quic_stream
),
26 num_bytes_consumed_(0),
27 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
29 num_bytes_buffered_(0),
30 num_frames_received_(0),
31 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0) {
35 QuicStreamSequencer::~QuicStreamSequencer() {
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
39 ++num_frames_received_
;
40 FrameList::iterator insertion_point
= FindInsertionPoint(frame
);
41 if (IsDuplicate(frame
, insertion_point
)) {
42 ++num_duplicate_frames_received_
;
43 // Silently ignore duplicates.
47 if (FrameOverlapsBufferedData(frame
, insertion_point
)) {
48 stream_
->CloseConnectionWithDetails(
49 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
53 const QuicStreamOffset byte_offset
= frame
.offset
;
54 const size_t data_len
= frame
.data
.length();
55 if (data_len
== 0 && !frame
.fin
) {
56 // Stream frames must have data or a fin flag.
57 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
58 "Empty stream frame without FIN set.");
63 CloseStreamAtOffset(frame
.offset
+ data_len
);
69 if (byte_offset
> num_bytes_consumed_
) {
70 ++num_early_frames_received_
;
73 // If the frame has arrived in-order then process it immediately, only
74 // buffering if the stream is unable to process it.
75 size_t bytes_consumed
= 0;
76 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
77 DVLOG(1) << "Processing byte offset " << byte_offset
;
79 stream_
->ProcessRawData(frame
.data
.data(), frame
.data
.length());
80 num_bytes_consumed_
+= bytes_consumed
;
81 stream_
->AddBytesConsumed(bytes_consumed
);
83 if (MaybeCloseStream()) {
86 if (bytes_consumed
> data_len
) {
87 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
89 } else if (bytes_consumed
== data_len
) {
90 FlushBufferedFrames();
91 return; // it's safe to ack this frame.
95 // Buffer any remaining data to be consumed by the stream when ready.
96 if (bytes_consumed
< data_len
) {
97 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
98 const size_t remaining_length
= data_len
- bytes_consumed
;
99 // Inserting an empty string and then copying to avoid the extra copy.
100 insertion_point
= buffered_frames_
.insert(
101 insertion_point
, FrameData(byte_offset
+ bytes_consumed
, ""));
102 insertion_point
->segment
=
103 string(frame
.data
.data() + bytes_consumed
, remaining_length
);
104 num_bytes_buffered_
+= remaining_length
;
108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
109 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
111 // If there is a scheduled close, the new offset should match it.
112 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
113 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
117 close_offset_
= offset
;
122 bool QuicStreamSequencer::MaybeCloseStream() {
123 if (!blocked_
&& IsClosed()) {
124 DVLOG(1) << "Passing up termination, as we've processed "
125 << num_bytes_consumed_
<< " of " << close_offset_
127 // Technically it's an error if num_bytes_consumed isn't exactly
128 // equal, but error handling seems silly at this point.
129 stream_
->OnFinRead();
130 buffered_frames_
.clear();
131 num_bytes_buffered_
= 0;
137 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) const {
139 FrameList::const_iterator it
= buffered_frames_
.begin();
141 QuicStreamOffset offset
= num_bytes_consumed_
;
142 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
143 if (it
->offset
!= offset
) {
147 iov
[index
].iov_base
=
148 static_cast<void*>(const_cast<char*>(it
->segment
.data()));
149 iov
[index
].iov_len
= it
->segment
.size();
150 offset
+= it
->segment
.size();
158 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
160 FrameList::iterator it
= buffered_frames_
.begin();
161 size_t iov_index
= 0;
162 size_t iov_offset
= 0;
163 size_t frame_offset
= 0;
164 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
166 while (iov_index
< iov_len
&& it
!= buffered_frames_
.end() &&
167 it
->offset
== num_bytes_consumed_
) {
168 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
169 it
->segment
.size() - frame_offset
);
171 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
172 memcpy(iov_ptr
, it
->segment
.data() + frame_offset
, bytes_to_read
);
173 frame_offset
+= bytes_to_read
;
174 iov_offset
+= bytes_to_read
;
176 if (iov
[iov_index
].iov_len
== iov_offset
) {
177 // We've filled this buffer.
181 if (it
->segment
.size() == frame_offset
) {
182 // We've copied this whole frame
183 RecordBytesConsumed(it
->segment
.size());
184 buffered_frames_
.erase(it
);
185 it
= buffered_frames_
.begin();
189 // Done copying. If there is a partial frame, update it.
190 if (frame_offset
!= 0) {
191 buffered_frames_
.push_front(
192 FrameData(it
->offset
+ frame_offset
, it
->segment
.substr(frame_offset
)));
193 buffered_frames_
.erase(it
);
194 RecordBytesConsumed(frame_offset
);
196 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
199 bool QuicStreamSequencer::HasBytesToRead() const {
200 return !buffered_frames_
.empty() &&
201 buffered_frames_
.begin()->offset
== num_bytes_consumed_
;
204 bool QuicStreamSequencer::IsClosed() const {
205 return num_bytes_consumed_
>= close_offset_
;
208 QuicStreamSequencer::FrameList::iterator
209 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame
& frame
) {
210 if (buffered_frames_
.empty()) {
211 return buffered_frames_
.begin();
213 // If it's after all buffered_frames, return the end.
214 if (frame
.offset
>= (buffered_frames_
.rbegin()->offset
+
215 buffered_frames_
.rbegin()->segment
.length())) {
216 return buffered_frames_
.end();
218 FrameList::iterator iter
= buffered_frames_
.begin();
219 // Only advance the iterator if the data begins after the already received
220 // frame. If the new frame overlaps with an existing frame, the iterator will
221 // still point to the frame it overlaps with.
222 while (iter
!= buffered_frames_
.end() &&
223 frame
.offset
>= iter
->offset
+ iter
->segment
.length()) {
229 bool QuicStreamSequencer::FrameOverlapsBufferedData(
230 const QuicStreamFrame
& frame
,
231 FrameList::const_iterator insertion_point
) const {
232 if (buffered_frames_
.empty() || insertion_point
== buffered_frames_
.end()) {
235 // If there is a buffered frame with a higher starting offset, then check to
236 // see if the new frame overlaps the beginning of the higher frame.
237 if (frame
.offset
< insertion_point
->offset
&&
238 frame
.offset
+ frame
.data
.length() > insertion_point
->offset
) {
239 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
240 << frame
.data
.size() << " > " << insertion_point
->offset
;
243 // If there is a buffered frame with a lower starting offset, then check to
244 // see if the buffered frame runs into the new frame.
245 if (frame
.offset
>= insertion_point
->offset
&&
247 insertion_point
->offset
+ insertion_point
->segment
.length()) {
248 DVLOG(1) << "Preceeding frame overlaps new frame: "
249 << insertion_point
->offset
<< " + "
250 << insertion_point
->segment
.length() << " > " << frame
.offset
;
257 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed
) {
259 size_t end_offset
= num_bytes_consumed_
+ num_bytes_consumed
;
260 while (!buffered_frames_
.empty() && end_offset
!= num_bytes_consumed_
) {
261 FrameList::iterator it
= buffered_frames_
.begin();
262 if (it
->offset
!= num_bytes_consumed_
) {
263 LOG(DFATAL
) << "Invalid argument to MarkConsumed. "
264 << " num_bytes_consumed_: " << num_bytes_consumed_
265 << " end_offset: " << end_offset
<< " offset: " << it
->offset
266 << " length: " << it
->segment
.length();
267 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
271 if (it
->offset
+ it
->segment
.length() <= end_offset
) {
272 RecordBytesConsumed(it
->segment
.length());
273 // This chunk is entirely consumed.
274 buffered_frames_
.erase(it
);
278 // Partially consume this frame.
279 size_t delta
= end_offset
- it
->offset
;
280 RecordBytesConsumed(delta
);
281 string new_data
= it
->segment
.substr(delta
);
282 buffered_frames_
.erase(it
);
283 buffered_frames_
.push_front(FrameData(num_bytes_consumed_
, new_data
));
288 bool QuicStreamSequencer::IsDuplicate(
289 const QuicStreamFrame
& frame
,
290 FrameList::const_iterator insertion_point
) const {
291 // A frame is duplicate if the frame offset is smaller than the bytes consumed
292 // or identical to an already received frame.
293 return frame
.offset
< num_bytes_consumed_
||
294 (insertion_point
!= buffered_frames_
.end() &&
295 frame
.offset
== insertion_point
->offset
);
298 void QuicStreamSequencer::SetBlockedUntilFlush() {
302 void QuicStreamSequencer::FlushBufferedFrames() {
304 FrameList::iterator it
= buffered_frames_
.begin();
305 while (it
!= buffered_frames_
.end() && it
->offset
== num_bytes_consumed_
) {
306 DVLOG(1) << "Flushing buffered packet at offset " << it
->offset
;
307 const string
* data
= &it
->segment
;
308 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
310 RecordBytesConsumed(bytes_consumed
);
311 if (MaybeCloseStream()) {
314 if (bytes_consumed
> data
->size()) {
315 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
318 if (bytes_consumed
< data
->size()) {
319 string new_data
= data
->substr(bytes_consumed
);
320 buffered_frames_
.erase(it
);
321 buffered_frames_
.push_front(FrameData(num_bytes_consumed_
, new_data
));
324 buffered_frames_
.erase(it
++);
329 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
330 num_bytes_consumed_
+= bytes_consumed
;
331 num_bytes_buffered_
-= bytes_consumed
;
333 stream_
->AddBytesConsumed(bytes_consumed
);