1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
11 #include "base/logging.h"
12 #include "net/quic/reliable_quic_stream.h"
15 using std::numeric_limits
;
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset
,
22 : offset(offset
), segment(segment
) {}
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
25 : stream_(quic_stream
),
26 num_bytes_consumed_(0),
27 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
29 num_bytes_buffered_(0),
30 num_frames_received_(0),
31 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0) {
35 QuicStreamSequencer::~QuicStreamSequencer() {
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
39 ++num_frames_received_
;
40 FrameList::iterator insertion_point
= FindInsertionPoint(frame
);
41 if (IsDuplicate(frame
, insertion_point
)) {
42 ++num_duplicate_frames_received_
;
43 // Silently ignore duplicates.
47 if (FrameOverlapsBufferedData(frame
, insertion_point
)) {
48 stream_
->CloseConnectionWithDetails(
49 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
53 const QuicStreamOffset byte_offset
= frame
.offset
;
54 const size_t data_len
= frame
.data
.length();
55 if (data_len
== 0 && !frame
.fin
) {
56 // Stream frames must have data or a fin flag.
57 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
58 "Empty stream frame without FIN set.");
63 CloseStreamAtOffset(frame
.offset
+ data_len
);
69 if (byte_offset
> num_bytes_consumed_
) {
70 ++num_early_frames_received_
;
73 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
74 // Inserting an empty string and then copying to avoid the extra copy.
76 buffered_frames_
.insert(insertion_point
, FrameData(byte_offset
, ""));
77 frame
.data
.CopyToString(&insertion_point
->segment
);
78 num_bytes_buffered_
+= data_len
;
84 if (byte_offset
== num_bytes_consumed_
) {
85 stream_
->OnDataAvailable();
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
90 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
92 // If there is a scheduled close, the new offset should match it.
93 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
94 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
98 close_offset_
= offset
;
103 bool QuicStreamSequencer::MaybeCloseStream() {
104 if (!blocked_
&& IsClosed()) {
105 DVLOG(1) << "Passing up termination, as we've processed "
106 << num_bytes_consumed_
<< " of " << close_offset_
108 // Technically it's an error if num_bytes_consumed isn't exactly
109 // equal, but error handling seems silly at this point.
110 stream_
->OnFinRead();
111 buffered_frames_
.clear();
112 num_bytes_buffered_
= 0;
118 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) const {
120 FrameList::const_iterator it
= buffered_frames_
.begin();
122 QuicStreamOffset offset
= num_bytes_consumed_
;
123 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
124 if (it
->offset
!= offset
) {
128 iov
[index
].iov_base
=
129 static_cast<void*>(const_cast<char*>(it
->segment
.data()));
130 iov
[index
].iov_len
= it
->segment
.size();
131 offset
+= it
->segment
.size();
139 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
141 FrameList::iterator it
= buffered_frames_
.begin();
142 size_t iov_index
= 0;
143 size_t iov_offset
= 0;
144 size_t frame_offset
= 0;
145 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
147 while (iov_index
< iov_len
&& it
!= buffered_frames_
.end() &&
148 it
->offset
== num_bytes_consumed_
) {
149 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
150 it
->segment
.size() - frame_offset
);
152 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
153 memcpy(iov_ptr
, it
->segment
.data() + frame_offset
, bytes_to_read
);
154 frame_offset
+= bytes_to_read
;
155 iov_offset
+= bytes_to_read
;
157 if (iov
[iov_index
].iov_len
== iov_offset
) {
158 // We've filled this buffer.
162 if (it
->segment
.size() == frame_offset
) {
163 // We've copied this whole frame
164 RecordBytesConsumed(it
->segment
.size());
165 buffered_frames_
.erase(it
);
166 it
= buffered_frames_
.begin();
170 // Done copying. If there is a partial frame, update it.
171 if (frame_offset
!= 0) {
172 buffered_frames_
.push_front(
173 FrameData(it
->offset
+ frame_offset
, it
->segment
.substr(frame_offset
)));
174 buffered_frames_
.erase(it
);
175 RecordBytesConsumed(frame_offset
);
177 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
180 bool QuicStreamSequencer::HasBytesToRead() const {
181 return !buffered_frames_
.empty() &&
182 buffered_frames_
.begin()->offset
== num_bytes_consumed_
;
185 bool QuicStreamSequencer::IsClosed() const {
186 return num_bytes_consumed_
>= close_offset_
;
189 QuicStreamSequencer::FrameList::iterator
190 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame
& frame
) {
191 if (buffered_frames_
.empty()) {
192 return buffered_frames_
.begin();
194 // If it's after all buffered_frames, return the end.
195 if (frame
.offset
>= (buffered_frames_
.rbegin()->offset
+
196 buffered_frames_
.rbegin()->segment
.length())) {
197 return buffered_frames_
.end();
199 FrameList::iterator iter
= buffered_frames_
.begin();
200 // Only advance the iterator if the data begins after the already received
201 // frame. If the new frame overlaps with an existing frame, the iterator will
202 // still point to the frame it overlaps with.
203 while (iter
!= buffered_frames_
.end() &&
204 frame
.offset
>= iter
->offset
+ iter
->segment
.length()) {
210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
211 const QuicStreamFrame
& frame
,
212 FrameList::const_iterator insertion_point
) const {
213 if (buffered_frames_
.empty() || insertion_point
== buffered_frames_
.end()) {
216 // If there is a buffered frame with a higher starting offset, then check to
217 // see if the new frame overlaps the beginning of the higher frame.
218 if (frame
.offset
< insertion_point
->offset
&&
219 frame
.offset
+ frame
.data
.length() > insertion_point
->offset
) {
220 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
221 << frame
.data
.size() << " > " << insertion_point
->offset
;
224 // If there is a buffered frame with a lower starting offset, then check to
225 // see if the buffered frame runs into the new frame.
226 if (frame
.offset
>= insertion_point
->offset
&&
228 insertion_point
->offset
+ insertion_point
->segment
.length()) {
229 DVLOG(1) << "Preceeding frame overlaps new frame: "
230 << insertion_point
->offset
<< " + "
231 << insertion_point
->segment
.length() << " > " << frame
.offset
;
238 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed
) {
240 size_t end_offset
= num_bytes_consumed_
+ num_bytes_consumed
;
241 while (!buffered_frames_
.empty() && end_offset
!= num_bytes_consumed_
) {
242 FrameList::iterator it
= buffered_frames_
.begin();
243 if (it
->offset
!= num_bytes_consumed_
) {
244 LOG(DFATAL
) << "Invalid argument to MarkConsumed. "
245 << " num_bytes_consumed_: " << num_bytes_consumed_
246 << " end_offset: " << end_offset
<< " offset: " << it
->offset
247 << " length: " << it
->segment
.length();
248 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
252 if (it
->offset
+ it
->segment
.length() <= end_offset
) {
253 RecordBytesConsumed(it
->segment
.length());
254 // This chunk is entirely consumed.
255 buffered_frames_
.erase(it
);
259 // Partially consume this frame.
260 size_t delta
= end_offset
- it
->offset
;
261 RecordBytesConsumed(delta
);
262 string new_data
= it
->segment
.substr(delta
);
263 buffered_frames_
.erase(it
);
264 buffered_frames_
.push_front(FrameData(num_bytes_consumed_
, new_data
));
269 bool QuicStreamSequencer::IsDuplicate(
270 const QuicStreamFrame
& frame
,
271 FrameList::const_iterator insertion_point
) const {
272 // A frame is duplicate if the frame offset is smaller than the bytes consumed
273 // or identical to an already received frame.
274 return frame
.offset
< num_bytes_consumed_
||
275 (insertion_point
!= buffered_frames_
.end() &&
276 frame
.offset
== insertion_point
->offset
);
279 void QuicStreamSequencer::SetBlockedUntilFlush() {
283 void QuicStreamSequencer::SetUnblocked() {
285 if (IsClosed() || HasBytesToRead()) {
286 stream_
->OnDataAvailable();
290 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
291 num_bytes_consumed_
+= bytes_consumed
;
292 num_bytes_buffered_
-= bytes_consumed
;
294 stream_
->AddBytesConsumed(bytes_consumed
);