1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
11 #include "base/logging.h"
12 #include "net/quic/reliable_quic_stream.h"
15 using std::numeric_limits
;
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset
,
22 : offset(offset
), segment(segment
) {}
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
25 : stream_(quic_stream
),
26 num_bytes_consumed_(0),
27 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
29 num_bytes_buffered_(0),
30 num_frames_received_(0),
31 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0) {
35 QuicStreamSequencer::~QuicStreamSequencer() {
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
39 ++num_frames_received_
;
40 FrameList::iterator insertion_point
= FindInsertionPoint(frame
);
41 if (IsDuplicate(frame
, insertion_point
)) {
42 ++num_duplicate_frames_received_
;
43 // Silently ignore duplicates.
47 if (FrameOverlapsBufferedData(frame
, insertion_point
)) {
48 stream_
->CloseConnectionWithDetails(
49 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
53 const QuicStreamOffset byte_offset
= frame
.offset
;
54 const size_t data_len
= frame
.data
.length();
55 if (data_len
== 0 && !frame
.fin
) {
56 // Stream frames must have data or a fin flag.
57 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
58 "Empty stream frame without FIN set.");
63 CloseStreamAtOffset(frame
.offset
+ data_len
);
69 if (byte_offset
> num_bytes_consumed_
) {
70 ++num_early_frames_received_
;
73 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
74 // Inserting an empty string and then copying to avoid the extra copy.
76 buffered_frames_
.insert(insertion_point
, FrameData(byte_offset
, ""));
77 frame
.data
.CopyToString(&insertion_point
->segment
);
78 num_bytes_buffered_
+= data_len
;
84 if (byte_offset
== num_bytes_consumed_
) {
85 stream_
->OnDataAvailable();
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
90 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
92 // If there is a scheduled close, the new offset should match it.
93 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
94 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
98 close_offset_
= offset
;
103 bool QuicStreamSequencer::MaybeCloseStream() {
104 if (!blocked_
&& IsClosed()) {
105 DVLOG(1) << "Passing up termination, as we've processed "
106 << num_bytes_consumed_
<< " of " << close_offset_
108 // This will cause the stream to consume the fin.
109 // Technically it's an error if num_bytes_consumed isn't exactly
110 // equal, but error handling seems silly at this point.
111 stream_
->OnDataAvailable();
112 buffered_frames_
.clear();
113 num_bytes_buffered_
= 0;
119 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) const {
121 FrameList::const_iterator it
= buffered_frames_
.begin();
123 QuicStreamOffset offset
= num_bytes_consumed_
;
124 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
125 if (it
->offset
!= offset
) {
129 iov
[index
].iov_base
=
130 static_cast<void*>(const_cast<char*>(it
->segment
.data()));
131 iov
[index
].iov_len
= it
->segment
.size();
132 offset
+= it
->segment
.size();
140 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
142 FrameList::iterator it
= buffered_frames_
.begin();
143 size_t iov_index
= 0;
144 size_t iov_offset
= 0;
145 size_t frame_offset
= 0;
146 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
148 while (iov_index
< iov_len
&& it
!= buffered_frames_
.end() &&
149 it
->offset
== num_bytes_consumed_
) {
150 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
151 it
->segment
.size() - frame_offset
);
153 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
154 memcpy(iov_ptr
, it
->segment
.data() + frame_offset
, bytes_to_read
);
155 frame_offset
+= bytes_to_read
;
156 iov_offset
+= bytes_to_read
;
158 if (iov
[iov_index
].iov_len
== iov_offset
) {
159 // We've filled this buffer.
163 if (it
->segment
.size() == frame_offset
) {
164 // We've copied this whole frame
165 RecordBytesConsumed(it
->segment
.size());
166 buffered_frames_
.erase(it
);
167 it
= buffered_frames_
.begin();
171 // Done copying. If there is a partial frame, update it.
172 if (frame_offset
!= 0) {
173 buffered_frames_
.push_front(
174 FrameData(it
->offset
+ frame_offset
, it
->segment
.substr(frame_offset
)));
175 buffered_frames_
.erase(it
);
176 RecordBytesConsumed(frame_offset
);
178 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
181 bool QuicStreamSequencer::HasBytesToRead() const {
182 return !buffered_frames_
.empty() &&
183 buffered_frames_
.begin()->offset
== num_bytes_consumed_
;
186 bool QuicStreamSequencer::IsClosed() const {
187 return num_bytes_consumed_
>= close_offset_
;
190 QuicStreamSequencer::FrameList::iterator
191 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame
& frame
) {
192 if (buffered_frames_
.empty()) {
193 return buffered_frames_
.begin();
195 // If it's after all buffered_frames, return the end.
196 if (frame
.offset
>= (buffered_frames_
.rbegin()->offset
+
197 buffered_frames_
.rbegin()->segment
.length())) {
198 return buffered_frames_
.end();
200 FrameList::iterator iter
= buffered_frames_
.begin();
201 // Only advance the iterator if the data begins after the already received
202 // frame. If the new frame overlaps with an existing frame, the iterator will
203 // still point to the frame it overlaps with.
204 while (iter
!= buffered_frames_
.end() &&
205 frame
.offset
>= iter
->offset
+ iter
->segment
.length()) {
211 bool QuicStreamSequencer::FrameOverlapsBufferedData(
212 const QuicStreamFrame
& frame
,
213 FrameList::const_iterator insertion_point
) const {
214 if (buffered_frames_
.empty() || insertion_point
== buffered_frames_
.end()) {
217 // If there is a buffered frame with a higher starting offset, then check to
218 // see if the new frame overlaps the beginning of the higher frame.
219 if (frame
.offset
< insertion_point
->offset
&&
220 frame
.offset
+ frame
.data
.length() > insertion_point
->offset
) {
221 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
222 << frame
.data
.size() << " > " << insertion_point
->offset
;
225 // If there is a buffered frame with a lower starting offset, then check to
226 // see if the buffered frame runs into the new frame.
227 if (frame
.offset
>= insertion_point
->offset
&&
229 insertion_point
->offset
+ insertion_point
->segment
.length()) {
230 DVLOG(1) << "Preceeding frame overlaps new frame: "
231 << insertion_point
->offset
<< " + "
232 << insertion_point
->segment
.length() << " > " << frame
.offset
;
239 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed
) {
241 size_t end_offset
= num_bytes_consumed_
+ num_bytes_consumed
;
242 while (!buffered_frames_
.empty() && end_offset
!= num_bytes_consumed_
) {
243 FrameList::iterator it
= buffered_frames_
.begin();
244 if (it
->offset
!= num_bytes_consumed_
) {
245 LOG(DFATAL
) << "Invalid argument to MarkConsumed. "
246 << " num_bytes_consumed_: " << num_bytes_consumed_
247 << " end_offset: " << end_offset
<< " offset: " << it
->offset
248 << " length: " << it
->segment
.length();
249 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
253 if (it
->offset
+ it
->segment
.length() <= end_offset
) {
254 RecordBytesConsumed(it
->segment
.length());
255 // This chunk is entirely consumed.
256 buffered_frames_
.erase(it
);
260 // Partially consume this frame.
261 size_t delta
= end_offset
- it
->offset
;
262 RecordBytesConsumed(delta
);
263 string new_data
= it
->segment
.substr(delta
);
264 buffered_frames_
.erase(it
);
265 buffered_frames_
.push_front(FrameData(num_bytes_consumed_
, new_data
));
270 bool QuicStreamSequencer::IsDuplicate(
271 const QuicStreamFrame
& frame
,
272 FrameList::const_iterator insertion_point
) const {
273 // A frame is duplicate if the frame offset is smaller than the bytes consumed
274 // or identical to an already received frame.
275 return frame
.offset
< num_bytes_consumed_
||
276 (insertion_point
!= buffered_frames_
.end() &&
277 frame
.offset
== insertion_point
->offset
);
280 void QuicStreamSequencer::SetBlockedUntilFlush() {
284 void QuicStreamSequencer::SetUnblocked() {
286 if (IsClosed() || HasBytesToRead()) {
287 stream_
->OnDataAvailable();
291 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
292 num_bytes_consumed_
+= bytes_consumed
;
293 num_bytes_buffered_
-= bytes_consumed
;
295 stream_
->AddBytesConsumed(bytes_consumed
);