1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
11 #include "base/logging.h"
12 #include "net/quic/reliable_quic_stream.h"
15 using std::numeric_limits
;
20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
21 : stream_(quic_stream
),
22 num_bytes_consumed_(0),
23 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
25 num_bytes_buffered_(0),
26 num_frames_received_(0),
27 num_duplicate_frames_received_(0),
28 num_early_frames_received_(0) {
31 QuicStreamSequencer::~QuicStreamSequencer() {
34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
35 ++num_frames_received_
;
36 if (IsDuplicate(frame
)) {
37 ++num_duplicate_frames_received_
;
38 // Silently ignore duplicates.
42 if (FrameOverlapsBufferedData(frame
)) {
43 stream_
->CloseConnectionWithDetails(
44 QUIC_INVALID_STREAM_FRAME
, "Stream frame overlaps with buffered data.");
48 const QuicStreamOffset byte_offset
= frame
.offset
;
49 const size_t data_len
= frame
.data
.length();
50 if (data_len
== 0 && !frame
.fin
) {
51 // Stream frames must have data or a fin flag.
52 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
53 "Empty stream frame without FIN set.");
58 CloseStreamAtOffset(frame
.offset
+ data_len
);
64 if (byte_offset
> num_bytes_consumed_
) {
65 ++num_early_frames_received_
;
68 // If the frame has arrived in-order then we can process it immediately, only
69 // buffering if the stream is unable to process it.
70 size_t bytes_consumed
= 0;
71 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
72 DVLOG(1) << "Processing byte offset " << byte_offset
;
74 stream_
->ProcessRawData(frame
.data
.data(), frame
.data
.length());
75 num_bytes_consumed_
+= bytes_consumed
;
76 stream_
->AddBytesConsumed(bytes_consumed
);
78 if (MaybeCloseStream()) {
81 if (bytes_consumed
> data_len
) {
82 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
84 } else if (bytes_consumed
== data_len
) {
85 FlushBufferedFrames();
86 return; // it's safe to ack this frame.
90 // Buffer any remaining data to be consumed by the stream when ready.
91 if (bytes_consumed
< data_len
) {
92 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
93 const size_t remaining_length
= data_len
- bytes_consumed
;
94 buffered_frames_
.insert(std::make_pair(
95 byte_offset
+ bytes_consumed
,
96 string(frame
.data
.data() + bytes_consumed
, remaining_length
)));
97 num_bytes_buffered_
+= remaining_length
;
101 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
102 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
104 // If we have a scheduled termination or close, any new offset should match
106 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
107 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
111 close_offset_
= offset
;
116 bool QuicStreamSequencer::MaybeCloseStream() {
117 if (!blocked_
&& IsClosed()) {
118 DVLOG(1) << "Passing up termination, as we've processed "
119 << num_bytes_consumed_
<< " of " << close_offset_
121 // Technically it's an error if num_bytes_consumed isn't exactly
122 // equal, but error handling seems silly at this point.
123 stream_
->OnFinRead();
124 buffered_frames_
.clear();
125 num_bytes_buffered_
= 0;
131 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
133 FrameMap::iterator it
= buffered_frames_
.begin();
135 QuicStreamOffset offset
= num_bytes_consumed_
;
136 while (it
!= buffered_frames_
.end() && index
< iov_len
) {
137 if (it
->first
!= offset
) return index
;
139 iov
[index
].iov_base
= static_cast<void*>(
140 const_cast<char*>(it
->second
.data()));
141 iov
[index
].iov_len
= it
->second
.size();
142 offset
+= it
->second
.size();
150 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
152 FrameMap::iterator it
= buffered_frames_
.begin();
153 size_t iov_index
= 0;
154 size_t iov_offset
= 0;
155 size_t frame_offset
= 0;
156 QuicStreamOffset initial_bytes_consumed
= num_bytes_consumed_
;
158 while (iov_index
< iov_len
&&
159 it
!= buffered_frames_
.end() &&
160 it
->first
== num_bytes_consumed_
) {
161 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
162 it
->second
.size() - frame_offset
);
164 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
166 it
->second
.data() + frame_offset
, bytes_to_read
);
167 frame_offset
+= bytes_to_read
;
168 iov_offset
+= bytes_to_read
;
170 if (iov
[iov_index
].iov_len
== iov_offset
) {
171 // We've filled this buffer.
175 if (it
->second
.size() == frame_offset
) {
176 // We've copied this whole frame
177 RecordBytesConsumed(it
->second
.size());
178 buffered_frames_
.erase(it
);
179 it
= buffered_frames_
.begin();
183 // We've finished copying. If we have a partial frame, update it.
184 if (frame_offset
!= 0) {
185 buffered_frames_
.insert(std::make_pair(it
->first
+ frame_offset
,
186 it
->second
.substr(frame_offset
)));
187 buffered_frames_
.erase(buffered_frames_
.begin());
188 RecordBytesConsumed(frame_offset
);
190 return static_cast<int>(num_bytes_consumed_
- initial_bytes_consumed
);
193 bool QuicStreamSequencer::HasBytesToRead() const {
194 FrameMap::const_iterator it
= buffered_frames_
.begin();
196 return it
!= buffered_frames_
.end() && it
->first
== num_bytes_consumed_
;
199 bool QuicStreamSequencer::IsClosed() const {
200 return num_bytes_consumed_
>= close_offset_
;
203 bool QuicStreamSequencer::FrameOverlapsBufferedData(
204 const QuicStreamFrame
& frame
) const {
205 if (buffered_frames_
.empty()) {
209 FrameMap::const_iterator next_frame
=
210 buffered_frames_
.lower_bound(frame
.offset
);
211 // Duplicate frames should have been dropped in IsDuplicate.
212 DCHECK(next_frame
== buffered_frames_
.end() ||
213 next_frame
->first
!= frame
.offset
);
215 // If there is a buffered frame with a higher starting offset, then we check
216 // to see if the new frame runs into the higher frame.
217 if (next_frame
!= buffered_frames_
.end() &&
218 (frame
.offset
+ frame
.data
.size()) > next_frame
->first
) {
219 DVLOG(1) << "New frame overlaps next frame: " << frame
.offset
<< " + "
220 << frame
.data
.size() << " > " << next_frame
->first
;
224 // If there is a buffered frame with a lower starting offset, then we check
225 // to see if the buffered frame runs into the new frame.
226 if (next_frame
!= buffered_frames_
.begin()) {
227 FrameMap::const_iterator preceeding_frame
= --next_frame
;
228 QuicStreamOffset offset
= preceeding_frame
->first
;
229 uint64 data_length
= preceeding_frame
->second
.length();
230 if ((offset
+ data_length
) > frame
.offset
) {
231 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset
<< " + "
232 << data_length
<< " > " << frame
.offset
;
239 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
240 // A frame is duplicate if the frame offset is smaller than our bytes consumed
241 // or we have stored the frame in our map.
242 // TODO(pwestin): Is it possible that a new frame contain more data even if
243 // the offset is the same?
244 return frame
.offset
< num_bytes_consumed_
||
245 buffered_frames_
.find(frame
.offset
) != buffered_frames_
.end();
248 void QuicStreamSequencer::SetBlockedUntilFlush() {
252 void QuicStreamSequencer::FlushBufferedFrames() {
254 FrameMap::iterator it
= buffered_frames_
.find(num_bytes_consumed_
);
255 while (it
!= buffered_frames_
.end()) {
256 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
257 string
* data
= &it
->second
;
258 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
260 RecordBytesConsumed(bytes_consumed
);
261 if (MaybeCloseStream()) {
264 if (bytes_consumed
> data
->size()) {
265 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
267 } else if (bytes_consumed
== data
->size()) {
268 buffered_frames_
.erase(it
);
269 it
= buffered_frames_
.find(num_bytes_consumed_
);
271 string new_data
= it
->second
.substr(bytes_consumed
);
272 buffered_frames_
.erase(it
);
273 buffered_frames_
.insert(std::make_pair(num_bytes_consumed_
, new_data
));
280 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
281 num_bytes_consumed_
+= bytes_consumed
;
282 num_bytes_buffered_
-= bytes_consumed
;
284 stream_
->AddBytesConsumed(bytes_consumed
);