1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
16 using std::numeric_limits
;
20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
21 : stream_(quic_stream
),
22 num_bytes_consumed_(0),
23 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
25 num_bytes_buffered_(0),
26 num_frames_received_(0),
27 num_duplicate_frames_received_(0) {
30 QuicStreamSequencer::~QuicStreamSequencer() {
33 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
34 ++num_frames_received_
;
35 if (IsDuplicate(frame
)) {
36 ++num_duplicate_frames_received_
;
37 // Silently ignore duplicates.
41 if (FrameOverlapsBufferedData(frame
)) {
42 stream_
->CloseConnectionWithDetails(
43 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
47 QuicStreamOffset byte_offset
= frame
.offset
;
48 size_t data_len
= frame
.data
.TotalBufferSize();
49 if (data_len
== 0 && !frame
.fin
) {
50 // Stream frames must have data or a fin flag.
51 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
52 "Empty stream frame without FIN set.");
57 CloseStreamAtOffset(frame
.offset
+ data_len
);
64 data
.AppendIovec(frame
.data
.iovec(), frame
.data
.Size());
66 // If the frame has arrived in-order then we can process it immediately, only
67 // buffering if the stream is unable to process it.
68 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
69 DVLOG(1) << "Processing byte offset " << byte_offset
;
70 size_t bytes_consumed
= 0;
71 for (size_t i
= 0; i
< data
.Size(); ++i
) {
72 bytes_consumed
+= stream_
->ProcessRawData(
73 static_cast<char*>(data
.iovec()[i
].iov_base
),
74 data
.iovec()[i
].iov_len
);
76 num_bytes_consumed_
+= bytes_consumed
;
77 stream_
->AddBytesConsumed(bytes_consumed
);
79 if (MaybeCloseStream()) {
82 if (bytes_consumed
> data_len
) {
83 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
85 } else if (bytes_consumed
== data_len
) {
86 FlushBufferedFrames();
87 return true; // it's safe to ack this frame.
89 // Set ourselves up to buffer what's left.
90 data_len
-= bytes_consumed
;
91 data
.Consume(bytes_consumed
);
92 byte_offset
+= bytes_consumed
;
96 // Buffer any remaining data to be consumed by the stream when ready.
97 for (size_t i
= 0; i
< data
.Size(); ++i
) {
98 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
99 const iovec
& iov
= data
.iovec()[i
];
100 buffered_frames_
.insert(make_pair(
101 byte_offset
, string(static_cast<char*>(iov
.iov_base
), iov
.iov_len
)));
102 byte_offset
+= iov
.iov_len
;
103 num_bytes_buffered_
+= iov
.iov_len
;
108 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
109 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
111 // If we have a scheduled termination or close, any new offset should match
113 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
114 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
118 close_offset_
= offset
;
123 bool QuicStreamSequencer::MaybeCloseStream() {
124 if (!blocked_
&& IsClosed()) {
125 DVLOG(1) << "Passing up termination, as we've processed "
126 << num_bytes_consumed_
<< " of " << close_offset_
128 // Technically it's an error if num_bytes_consumed isn't exactly
129 // equal, but error handling seems silly at this point.
130 stream_
->OnFinRead();
131 buffered_frames_
.clear();
132 num_bytes_buffered_
= 0;
138 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
140 FrameMap::iterator it
= buffered_frames_
.begin();
142 QuicStreamOffset offset
= num_bytes_consumed_
;
143 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
144 if (it
->first
!= offset
) return index
;
146 iov
[index
].iov_base
= static_cast<void*>(
147 const_cast<char*>(it
->second
.data()));
148 iov
[index
].iov_len
= it
->second
.size();
149 offset
+= it
->second
.size();
157 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
159 FrameMap::iterator it
= buffered_frames_
.begin();
160 size_t iov_index
= 0;
161 size_t iov_offset
= 0;
162 size_t frame_offset
= 0;
163 size_t initial_bytes_consumed
= num_bytes_consumed_
;
165 while (iov_index
< iov_len
&&
166 it
!= buffered_frames_
.end() &&
167 it
->first
== num_bytes_consumed_
) {
168 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
169 it
->second
.size() - frame_offset
);
171 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
173 it
->second
.data() + frame_offset
, bytes_to_read
);
174 frame_offset
+= bytes_to_read
;
175 iov_offset
+= bytes_to_read
;
177 if (iov
[iov_index
].iov_len
== iov_offset
) {
178 // We've filled this buffer.
182 if (it
->second
.size() == frame_offset
) {
183 // We've copied this whole frame
184 RecordBytesConsumed(it
->second
.size());
185 buffered_frames_
.erase(it
);
186 it
= buffered_frames_
.begin();
190 // We've finished copying. If we have a partial frame, update it.
191 if (frame_offset
!= 0) {
192 buffered_frames_
.insert(
193 make_pair(it
->first
+ frame_offset
, it
->second
.substr(frame_offset
)));
194 buffered_frames_
.erase(buffered_frames_
.begin());
195 RecordBytesConsumed(frame_offset
);
197 return num_bytes_consumed_
- initial_bytes_consumed
;
200 bool QuicStreamSequencer::HasBytesToRead() const {
201 FrameMap::const_iterator it
= buffered_frames_
.begin();
203 return it
!= buffered_frames_
.end() && it
->first
== num_bytes_consumed_
;
206 bool QuicStreamSequencer::IsClosed() const {
207 return num_bytes_consumed_
>= close_offset_
;
210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
211 const QuicStreamFrame
& frame
) const {
212 if (buffered_frames_
.empty()) {
216 FrameMap::const_iterator next_frame
=
217 buffered_frames_
.lower_bound(frame
.offset
);
218 // Duplicate frames should have been dropped in IsDuplicate.
219 DCHECK(next_frame
== buffered_frames_
.end() ||
220 next_frame
->first
!= frame
.offset
);
222 // If there is a buffered frame with a higher starting offset, then we check
223 // to see if the new frame runs into the higher frame.
224 if (next_frame
!= buffered_frames_
.end() &&
225 (frame
.offset
+ frame
.data
.TotalBufferSize()) > next_frame
->first
) {
226 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
227 << frame
.data
.TotalBufferSize() << " > " << next_frame
->first
;
231 // If there is a buffered frame with a lower starting offset, then we check
232 // to see if the buffered frame runs into the new frame.
233 if (next_frame
!= buffered_frames_
.begin()) {
234 FrameMap::const_iterator preceeding_frame
= --next_frame
;
235 QuicStreamOffset offset
= preceeding_frame
->first
;
236 uint64 data_length
= preceeding_frame
->second
.length();
237 if ((offset
+ data_length
) > frame
.offset
) {
238 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset
<< " + "
239 << data_length
<< " > " << frame
.offset
;
246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
247 // A frame is duplicate if the frame offset is smaller than our bytes consumed
248 // or we have stored the frame in our map.
249 // TODO(pwestin): Is it possible that a new frame contain more data even if
250 // the offset is the same?
251 return frame
.offset
< num_bytes_consumed_
||
252 buffered_frames_
.find(frame
.offset
) != buffered_frames_
.end();
255 void QuicStreamSequencer::SetBlockedUntilFlush() {
259 void QuicStreamSequencer::FlushBufferedFrames() {
261 FrameMap::iterator it
= buffered_frames_
.find(num_bytes_consumed_
);
262 while (it
!= buffered_frames_
.end()) {
263 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
264 string
* data
= &it
->second
;
265 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
267 RecordBytesConsumed(bytes_consumed
);
268 if (MaybeCloseStream()) {
271 if (bytes_consumed
> data
->size()) {
272 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
274 } else if (bytes_consumed
== data
->size()) {
275 buffered_frames_
.erase(it
);
276 it
= buffered_frames_
.find(num_bytes_consumed_
);
278 string new_data
= it
->second
.substr(bytes_consumed
);
279 buffered_frames_
.erase(it
);
280 buffered_frames_
.insert(make_pair(num_bytes_consumed_
, new_data
));
287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
288 num_bytes_consumed_
+= bytes_consumed
;
289 num_bytes_buffered_
-= bytes_consumed
;
291 stream_
->AddBytesConsumed(bytes_consumed
);