1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
11 #include "base/logging.h"
12 #include "net/quic/reliable_quic_stream.h"
15 using std::numeric_limits
;
// Builds a sequencer that delivers data to |quic_stream|. The byte and frame
// counters listed here start at zero, and close_offset_ starts at the maximum
// QuicStreamOffset value, which CloseStreamAtOffset() treats as "no close
// offset recorded yet".
// NOTE(review): blocked_ is read elsewhere in this file but no initializer for
// it is visible in this excerpt -- confirm it is initialized.
20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
21 : stream_(quic_stream
),
22 num_bytes_consumed_(0),
23 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
25 num_bytes_buffered_(0),
26 num_frames_received_(0),
27 num_duplicate_frames_received_(0),
28 num_early_frames_received_(0) {
// Destructor. No cleanup statements are visible in this excerpt.
31 QuicStreamSequencer::~QuicStreamSequencer() {
// Handles one incoming stream frame.
//
// Every call bumps num_frames_received_. The frame is then checked in order:
//   - IsDuplicate(): counted in num_duplicate_frames_received_ and ignored.
//   - FrameOverlapsBufferedData(): the connection is closed with
//     QUIC_INVALID_STREAM_FRAME.
//   - No data and no FIN: the connection is closed with
//     QUIC_INVALID_STREAM_FRAME.
// A frame whose offset equals num_bytes_consumed_ is handed to the stream
// right away (unless blocked_ is set); data that is not consumed that way is
// stored in buffered_frames_ keyed by byte offset.
// NOTE(review): several statements (returns, closing braces, the declaration
// of |data|, any guard around CloseStreamAtOffset) are not visible in this
// excerpt; the comments below describe only the visible lines.
34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
35 ++num_frames_received_
;
36 if (IsDuplicate(frame
)) {
37 ++num_duplicate_frames_received_
;
38 // Silently ignore duplicates.
42 if (FrameOverlapsBufferedData(frame
)) {
43 stream_
->CloseConnectionWithDetails(
44 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
48 QuicStreamOffset byte_offset
= frame
.offset
;
49 size_t data_len
= frame
.data
.TotalBufferSize();
50 if (data_len
== 0 && !frame
.fin
) {
51 // Stream frames must have data or a fin flag.
52 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
53 "Empty stream frame without FIN set.");
// Record where the stream ends: the offset just past this frame's data.
// NOTE(review): presumably only done when frame.fin is set -- the guarding
// condition is not visible here; confirm.
58 CloseStreamAtOffset(frame
.offset
+ data_len
);
// Gather the frame's iovecs into |data| so that an already-consumed prefix
// can be dropped with Consume() further down.
65 data
.AppendIovec(frame
.data
.iovec(), frame
.data
.Size());
// A frame starting beyond the consumed point arrived ahead of earlier data.
67 if (byte_offset
> num_bytes_consumed_
) {
68 ++num_early_frames_received_
;
71 // If the frame has arrived in-order then we can process it immediately, only
72 // buffering if the stream is unable to process it.
73 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
74 DVLOG(1) << "Processing byte offset " << byte_offset
;
// Sum of what ProcessRawData() reports across all iovecs of this frame.
75 size_t bytes_consumed
= 0;
76 for (size_t i
= 0; i
< data
.Size(); ++i
) {
77 bytes_consumed
+= stream_
->ProcessRawData(
78 static_cast<char*>(data
.iovec()[i
].iov_base
),
79 data
.iovec()[i
].iov_len
);
// Account for the accepted bytes both locally and on the stream.
81 num_bytes_consumed_
+= bytes_consumed
;
82 stream_
->AddBytesConsumed(bytes_consumed
);
84 if (MaybeCloseStream()) {
// The stream reporting more bytes than it was given is treated as an error
// and the stream is reset.
87 if (bytes_consumed
> data_len
) {
88 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
// Everything was consumed: previously buffered frames may now be in order.
90 } else if (bytes_consumed
== data_len
) {
91 FlushBufferedFrames();
92 return; // it's safe to ack this frame.
94 // Set ourselves up to buffer what's left.
95 data_len
-= bytes_consumed
;
96 data
.Consume(bytes_consumed
);
97 byte_offset
+= bytes_consumed
;
101 // Buffer any remaining data to be consumed by the stream when ready.
102 for (size_t i
= 0; i
< data
.Size(); ++i
) {
103 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
104 const iovec
& iov
= data
.iovec()[i
];
// Each iovec becomes its own map entry holding a string copy of its bytes.
105 buffered_frames_
.insert(std::make_pair(
106 byte_offset
, string(static_cast<char*>(iov
.iov_base
), iov
.iov_len
)));
// Advance so the next iovec is keyed directly after this one.
107 byte_offset
+= iov
.iov_len
;
108 num_bytes_buffered_
+= iov
.iov_len
;
// Records |offset| as the offset at which the stream ends. If a close offset
// was already recorded and |offset| differs from it, the stream is reset with
// QUIC_MULTIPLE_TERMINATION_OFFSETS.
// NOTE(review): the statements between the Reset() call and the assignment,
// and anything after the assignment, are not visible in this excerpt, so it
// cannot be told from here whether the assignment is skipped after a reset.
113 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
// The maximum offset value doubles as the "not set" marker for close_offset_.
114 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
116 // If we have a scheduled termination or close, any new offset should match
118 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
119 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
123 close_offset_
= offset
;
// If the sequencer is not blocked and IsClosed() is true, notifies the stream
// through OnFinRead(), drops every buffered frame and resets
// num_bytes_buffered_ to zero.
// Returns a bool that callers in this file branch on.
// NOTE(review): the return statements are not visible in this excerpt.
128 bool QuicStreamSequencer::MaybeCloseStream() {
129 if (!blocked_
&& IsClosed()) {
130 DVLOG(1) << "Passing up termination, as we've processed "
131 << num_bytes_consumed_
<< " of " << close_offset_
133 // Technically it's an error if num_bytes_consumed isn't exactly
134 // equal, but error handling seems silly at this point.
135 stream_
->OnFinRead();
// Buffered data is discarded here; keep the buffered-byte count in step.
136 buffered_frames_
.clear();
137 num_bytes_buffered_
= 0;
// Describes the buffered data that is contiguous with num_bytes_consumed_ by
// filling entries of |iov| (capacity |iov_len|) with pointers into the
// buffered frames and their sizes. The visible code only stores pointers and
// lengths; it neither copies nor removes any data.
// Walking stops at the first frame whose offset does not continue the
// sequence, in which case |index| is returned.
// NOTE(review): the declaration and increment of |index|, the iterator
// advance and the final return are not visible in this excerpt.
143 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
145 FrameMap::iterator it
= buffered_frames_
.begin();
// |offset| tracks the stream offset the next contiguous frame must start at.
147 QuicStreamOffset offset
= num_bytes_consumed_
;
148 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
// A gap in the buffered data ends the readable region.
149 if (it
->first
!= offset
) return index
;
// string::data() is const, so constness is cast away to store it in iov_base.
151 iov
[index
].iov_base
= static_cast<void*>(
152 const_cast<char*>(it
->second
.data()));
153 iov
[index
].iov_len
= it
->second
.size();
154 offset
+= it
->second
.size();
// Reads buffered, in-order data into the caller's |iov| array (|iov_len|
// entries) and consumes it. The loop runs while there is an unfilled iovec
// and the first buffered frame starts exactly at num_bytes_consumed_.
// Frames that were read completely are erased; a frame that was read only in
// part is re-inserted under its new starting offset without the read prefix.
// Returns the number of bytes consumed by this call, cast to int.
// NOTE(review): the copy call itself and the places where iov_index,
// iov_offset and frame_offset are advanced/reset are not visible in this
// excerpt.
162 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
164 FrameMap::iterator it
= buffered_frames_
.begin();
// iov_index/iov_offset locate the write position in |iov|; frame_offset is the
// read position inside the current buffered frame.
165 size_t iov_index
= 0;
166 size_t iov_offset
= 0;
167 size_t frame_offset
= 0;
// Remembered so the return value can be computed as a difference at the end.
168 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
170 while (iov_index
< iov_len
&&
171 it
!= buffered_frames_
.end() &&
172 it
->first
== num_bytes_consumed_
) {
// Limited by both the space left in the current iovec and the bytes left in
// the current frame.
// NOTE(review): both operands are size_t expressions but the result is stored
// in an int.
173 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
174 it
->second
.size() - frame_offset
);
176 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
178 it
->second
.data() + frame_offset
, bytes_to_read
);
179 frame_offset
+= bytes_to_read
;
180 iov_offset
+= bytes_to_read
;
182 if (iov
[iov_index
].iov_len
== iov_offset
) {
183 // We've filled this buffer.
187 if (it
->second
.size() == frame_offset
) {
188 // We've copied this whole frame
189 RecordBytesConsumed(it
->second
.size());
// Erasing invalidates |it|, so restart from the new first frame.
190 buffered_frames_
.erase(it
);
191 it
= buffered_frames_
.begin();
195 // We've finished copying. If we have a partial frame, update it.
196 if (frame_offset
!= 0) {
// Insert the unread tail before erasing: |it| still refers to the old entry,
// and since the new key is larger the old entry is still begin() when erased.
197 buffered_frames_
.insert(std::make_pair(it
->first
+ frame_offset
,
198 it
->second
.substr(frame_offset
)));
199 buffered_frames_
.erase(buffered_frames_
.begin());
200 RecordBytesConsumed(frame_offset
);
202 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
// Returns true when buffered_frames_ is non-empty and its first frame starts
// exactly at num_bytes_consumed_, i.e. in-order data is available.
205 bool QuicStreamSequencer::HasBytesToRead() const {
206 FrameMap::const_iterator it
= buffered_frames_
.begin();
208 return it
!= buffered_frames_
.end() && it
->first
== num_bytes_consumed_
;
// Returns true once num_bytes_consumed_ has reached or passed close_offset_.
211 bool QuicStreamSequencer::IsClosed() const {
212 return num_bytes_consumed_
>= close_offset_
;
// Checks whether the byte range of |frame|, which starts at frame.offset and
// spans frame.data.TotalBufferSize() bytes, runs into a frame already held in
// buffered_frames_. Two neighbours are examined: the buffered frame returned
// by lower_bound(frame.offset), and the buffered frame just before it.
// NOTE(review): the return statements are not visible in this excerpt.
215 bool QuicStreamSequencer::FrameOverlapsBufferedData(
216 const QuicStreamFrame
& frame
) const {
217 if (buffered_frames_
.empty()) {
// NOTE(review): assuming FrameMap is an ordered map, this is the first
// buffered frame whose offset is >= frame.offset -- confirm the typedef.
221 FrameMap::const_iterator next_frame
=
222 buffered_frames_
.lower_bound(frame
.offset
);
223 // Duplicate frames should have been dropped in IsDuplicate.
224 DCHECK(next_frame
== buffered_frames_
.end() ||
225 next_frame
->first
!= frame
.offset
);
227 // If there is a buffered frame with a higher starting offset, then we check
228 // to see if the new frame runs into the higher frame.
229 if (next_frame
!= buffered_frames_
.end() &&
230 (frame
.offset
+ frame
.data
.TotalBufferSize()) > next_frame
->first
) {
231 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
232 << frame
.data
.TotalBufferSize() << " > " << next_frame
->first
;
236 // If there is a buffered frame with a lower starting offset, then we check
237 // to see if the buffered frame runs into the new frame.
238 if (next_frame
!= buffered_frames_
.begin()) {
// The pre-decrement also moves next_frame itself; it is not used again in the
// visible code.
239 FrameMap::const_iterator preceeding_frame
= --next_frame
;
240 QuicStreamOffset offset
= preceeding_frame
->first
;
241 uint64 data_length
= preceeding_frame
->second
.length();
242 if ((offset
+ data_length
) > frame
.offset
) {
243 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset
<< " + "
244 << data_length
<< " > " << frame
.offset
;
// Returns true when |frame| counts as already seen: its offset lies before
// num_bytes_consumed_, or a buffered frame with the same offset exists. Only
// the starting offset is compared, not the frame length or contents.
251 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
252 // A frame is duplicate if the frame offset is smaller than our bytes consumed
253 // or we have stored the frame in our map.
254 // TODO(pwestin): Is it possible that a new frame contain more data even if
255 // the offset is the same?
256 return frame
.offset
< num_bytes_consumed_
||
257 buffered_frames_
.find(frame
.offset
) != buffered_frames_
.end();
// NOTE(review): the body is not visible in this excerpt. Presumably this sets
// blocked_, which OnStreamFrame() and MaybeCloseStream() consult -- confirm.
260 void QuicStreamSequencer::SetBlockedUntilFlush() {
// Hands buffered frames to the stream for as long as a frame keyed exactly at
// num_bytes_consumed_ exists. Each frame is passed to ProcessRawData() and the
// reported byte count is recorded through RecordBytesConsumed(). A fully
// consumed frame is erased and the lookup is repeated at the new offset; a
// partly consumed frame is replaced by its unconsumed tail, keyed at the new
// num_bytes_consumed_.
// NOTE(review): the statements at the start of the function, the second
// argument of ProcessRawData() and the returns are not visible in this
// excerpt.
264 void QuicStreamSequencer::FlushBufferedFrames() {
// Only a frame starting exactly at the consumed point is in order.
266 FrameMap::iterator it
= buffered_frames_
.find(num_bytes_consumed_
);
267 while (it
!= buffered_frames_
.end()) {
268 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
269 string
* data
= &it
->second
;
270 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
// num_bytes_consumed_ moves forward here, before the frame is erased or
// re-keyed below.
272 RecordBytesConsumed(bytes_consumed
);
273 if (MaybeCloseStream()) {
276 if (bytes_consumed
> data
->size()) {
277 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
279 } else if (bytes_consumed
== data
->size()) {
// Frame fully consumed: drop it and look for the next in-order frame.
280 buffered_frames_
.erase(it
);
281 it
= buffered_frames_
.find(num_bytes_consumed_
);
// Partial consumption: copy the unconsumed tail out before erasing the entry,
// then store it under the updated num_bytes_consumed_.
283 string new_data
= it
->second
.substr(bytes_consumed
);
284 buffered_frames_
.erase(it
);
285 buffered_frames_
.insert(std::make_pair(num_bytes_consumed_
, new_data
));
// Moves |bytes_consumed| bytes from the buffered count to the consumed count
// and reports the same amount to the stream through AddBytesConsumed().
292 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
293 num_bytes_consumed_
+= bytes_consumed
;
294 num_bytes_buffered_
-= bytes_consumed
;
296 stream_
->AddBytesConsumed(bytes_consumed
);