1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
16 using std::numeric_limits
;
21 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
22 : stream_(quic_stream
),
23 num_bytes_consumed_(0),
24 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
26 num_bytes_buffered_(0),
27 num_frames_received_(0),
28 num_duplicate_frames_received_(0),
29 num_early_frames_received_(0) {
32 QuicStreamSequencer::~QuicStreamSequencer() {
35 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
36 ++num_frames_received_
;
37 if (IsDuplicate(frame
)) {
38 ++num_duplicate_frames_received_
;
39 // Silently ignore duplicates.
43 if (FrameOverlapsBufferedData(frame
)) {
44 stream_
->CloseConnectionWithDetails(
45 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
49 QuicStreamOffset byte_offset
= frame
.offset
;
50 size_t data_len
= frame
.data
.TotalBufferSize();
51 if (data_len
== 0 && !frame
.fin
) {
52 // Stream frames must have data or a fin flag.
53 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
54 "Empty stream frame without FIN set.");
59 CloseStreamAtOffset(frame
.offset
+ data_len
);
66 data
.AppendIovec(frame
.data
.iovec(), frame
.data
.Size());
68 if (byte_offset
> num_bytes_consumed_
) {
69 ++num_early_frames_received_
;
72 // If the frame has arrived in-order then we can process it immediately, only
73 // buffering if the stream is unable to process it.
74 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
75 DVLOG(1) << "Processing byte offset " << byte_offset
;
76 size_t bytes_consumed
= 0;
77 for (size_t i
= 0; i
< data
.Size(); ++i
) {
78 bytes_consumed
+= stream_
->ProcessRawData(
79 static_cast<char*>(data
.iovec()[i
].iov_base
),
80 data
.iovec()[i
].iov_len
);
82 num_bytes_consumed_
+= bytes_consumed
;
83 stream_
->AddBytesConsumed(bytes_consumed
);
85 if (MaybeCloseStream()) {
88 if (bytes_consumed
> data_len
) {
89 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
91 } else if (bytes_consumed
== data_len
) {
92 FlushBufferedFrames();
93 return; // it's safe to ack this frame.
95 // Set ourselves up to buffer what's left.
96 data_len
-= bytes_consumed
;
97 data
.Consume(bytes_consumed
);
98 byte_offset
+= bytes_consumed
;
102 // Buffer any remaining data to be consumed by the stream when ready.
103 for (size_t i
= 0; i
< data
.Size(); ++i
) {
104 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
105 const iovec
& iov
= data
.iovec()[i
];
106 buffered_frames_
.insert(make_pair(
107 byte_offset
, string(static_cast<char*>(iov
.iov_base
), iov
.iov_len
)));
108 byte_offset
+= iov
.iov_len
;
109 num_bytes_buffered_
+= iov
.iov_len
;
114 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
115 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
117 // If we have a scheduled termination or close, any new offset should match
119 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
120 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
124 close_offset_
= offset
;
129 bool QuicStreamSequencer::MaybeCloseStream() {
130 if (!blocked_
&& IsClosed()) {
131 DVLOG(1) << "Passing up termination, as we've processed "
132 << num_bytes_consumed_
<< " of " << close_offset_
134 // Technically it's an error if num_bytes_consumed isn't exactly
135 // equal, but error handling seems silly at this point.
136 stream_
->OnFinRead();
137 buffered_frames_
.clear();
138 num_bytes_buffered_
= 0;
144 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
146 FrameMap::iterator it
= buffered_frames_
.begin();
148 QuicStreamOffset offset
= num_bytes_consumed_
;
149 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
150 if (it
->first
!= offset
) return index
;
152 iov
[index
].iov_base
= static_cast<void*>(
153 const_cast<char*>(it
->second
.data()));
154 iov
[index
].iov_len
= it
->second
.size();
155 offset
+= it
->second
.size();
163 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
165 FrameMap::iterator it
= buffered_frames_
.begin();
166 size_t iov_index
= 0;
167 size_t iov_offset
= 0;
168 size_t frame_offset
= 0;
169 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
171 while (iov_index
< iov_len
&&
172 it
!= buffered_frames_
.end() &&
173 it
->first
== num_bytes_consumed_
) {
174 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
175 it
->second
.size() - frame_offset
);
177 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
179 it
->second
.data() + frame_offset
, bytes_to_read
);
180 frame_offset
+= bytes_to_read
;
181 iov_offset
+= bytes_to_read
;
183 if (iov
[iov_index
].iov_len
== iov_offset
) {
184 // We've filled this buffer.
188 if (it
->second
.size() == frame_offset
) {
189 // We've copied this whole frame
190 RecordBytesConsumed(it
->second
.size());
191 buffered_frames_
.erase(it
);
192 it
= buffered_frames_
.begin();
196 // We've finished copying. If we have a partial frame, update it.
197 if (frame_offset
!= 0) {
198 buffered_frames_
.insert(
199 make_pair(it
->first
+ frame_offset
, it
->second
.substr(frame_offset
)));
200 buffered_frames_
.erase(buffered_frames_
.begin());
201 RecordBytesConsumed(frame_offset
);
203 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
206 bool QuicStreamSequencer::HasBytesToRead() const {
207 FrameMap::const_iterator it
= buffered_frames_
.begin();
209 return it
!= buffered_frames_
.end() && it
->first
== num_bytes_consumed_
;
212 bool QuicStreamSequencer::IsClosed() const {
213 return num_bytes_consumed_
>= close_offset_
;
216 bool QuicStreamSequencer::FrameOverlapsBufferedData(
217 const QuicStreamFrame
& frame
) const {
218 if (buffered_frames_
.empty()) {
222 FrameMap::const_iterator next_frame
=
223 buffered_frames_
.lower_bound(frame
.offset
);
224 // Duplicate frames should have been dropped in IsDuplicate.
225 DCHECK(next_frame
== buffered_frames_
.end() ||
226 next_frame
->first
!= frame
.offset
);
228 // If there is a buffered frame with a higher starting offset, then we check
229 // to see if the new frame runs into the higher frame.
230 if (next_frame
!= buffered_frames_
.end() &&
231 (frame
.offset
+ frame
.data
.TotalBufferSize()) > next_frame
->first
) {
232 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
233 << frame
.data
.TotalBufferSize() << " > " << next_frame
->first
;
237 // If there is a buffered frame with a lower starting offset, then we check
238 // to see if the buffered frame runs into the new frame.
239 if (next_frame
!= buffered_frames_
.begin()) {
240 FrameMap::const_iterator preceeding_frame
= --next_frame
;
241 QuicStreamOffset offset
= preceeding_frame
->first
;
242 uint64 data_length
= preceeding_frame
->second
.length();
243 if ((offset
+ data_length
) > frame
.offset
) {
244 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset
<< " + "
245 << data_length
<< " > " << frame
.offset
;
252 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
253 // A frame is duplicate if the frame offset is smaller than our bytes consumed
254 // or we have stored the frame in our map.
255 // TODO(pwestin): Is it possible that a new frame contain more data even if
256 // the offset is the same?
257 return frame
.offset
< num_bytes_consumed_
||
258 buffered_frames_
.find(frame
.offset
) != buffered_frames_
.end();
261 void QuicStreamSequencer::SetBlockedUntilFlush() {
265 void QuicStreamSequencer::FlushBufferedFrames() {
267 FrameMap::iterator it
= buffered_frames_
.find(num_bytes_consumed_
);
268 while (it
!= buffered_frames_
.end()) {
269 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
270 string
* data
= &it
->second
;
271 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
273 RecordBytesConsumed(bytes_consumed
);
274 if (MaybeCloseStream()) {
277 if (bytes_consumed
> data
->size()) {
278 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
280 } else if (bytes_consumed
== data
->size()) {
281 buffered_frames_
.erase(it
);
282 it
= buffered_frames_
.find(num_bytes_consumed_
);
284 string new_data
= it
->second
.substr(bytes_consumed
);
285 buffered_frames_
.erase(it
);
286 buffered_frames_
.insert(make_pair(num_bytes_consumed_
, new_data
));
293 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
294 num_bytes_consumed_
+= bytes_consumed
;
295 num_bytes_buffered_
-= bytes_consumed
;
297 stream_
->AddBytesConsumed(bytes_consumed
);