1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
10 #include "base/logging.h"
11 #include "net/quic/reliable_quic_stream.h"
14 using std::numeric_limits
;
18 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream
* quic_stream
)
19 : stream_(quic_stream
),
20 num_bytes_consumed_(0),
21 max_frame_memory_(numeric_limits
<size_t>::max()),
22 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
24 num_bytes_buffered_(0) {
27 QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory
,
28 ReliableQuicStream
* quic_stream
)
29 : stream_(quic_stream
),
30 num_bytes_consumed_(0),
31 max_frame_memory_(max_frame_memory
),
32 close_offset_(numeric_limits
<QuicStreamOffset
>::max()),
34 num_bytes_buffered_(0) {
35 if (max_frame_memory
< kMaxPacketSize
) {
36 LOG(DFATAL
) << "Setting max frame memory to " << max_frame_memory
37 << ". Some frames will be impossible to handle.";
41 QuicStreamSequencer::~QuicStreamSequencer() {
44 bool QuicStreamSequencer::WillAcceptStreamFrame(
45 const QuicStreamFrame
& frame
) const {
46 size_t data_len
= frame
.data
.TotalBufferSize();
47 if (data_len
> max_frame_memory_
) {
48 LOG(DFATAL
) << "data_len: " << data_len
<< " > max_frame_memory_: "
53 if (IsDuplicate(frame
)) {
56 QuicStreamOffset byte_offset
= frame
.offset
;
57 if (data_len
> max_frame_memory_
) {
58 // We're never going to buffer this frame and we can't pass it up.
59 // The stream might only consume part of it and we'd need a partial ack.
61 // Ideally this should never happen, as we check that
62 // max_frame_memory_ > kMaxPacketSize and lower levels should reject
63 // frames larger than that.
66 if (byte_offset
+ data_len
- num_bytes_consumed_
> max_frame_memory_
) {
67 // We can buffer this but not right now. Toss it.
68 // It might be worth trying an experiment where we try best-effort buffering
74 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame
& frame
) {
75 if (!WillAcceptStreamFrame(frame
)) {
76 // This should not happen, as WillAcceptFrame should be called before
77 // OnStreamFrame. Error handling should be done by the caller.
80 if (IsDuplicate(frame
)) {
81 // Silently ignore duplicates.
85 QuicStreamOffset byte_offset
= frame
.offset
;
86 size_t data_len
= frame
.data
.TotalBufferSize();
87 if (data_len
== 0 && !frame
.fin
) {
88 // Stream frames must have data or a fin flag.
89 stream_
->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME
,
90 "Empty stream frame without FIN set.");
95 CloseStreamAtOffset(frame
.offset
+ data_len
);
102 data
.AppendIovec(frame
.data
.iovec(), frame
.data
.Size());
104 // If the frame has arrived in-order then we can process it immediately, only
105 // buffering if the stream is unable to process it.
106 if (!blocked_
&& byte_offset
== num_bytes_consumed_
) {
107 DVLOG(1) << "Processing byte offset " << byte_offset
;
108 size_t bytes_consumed
= 0;
109 for (size_t i
= 0; i
< data
.Size(); ++i
) {
110 bytes_consumed
+= stream_
->ProcessRawData(
111 static_cast<char*>(data
.iovec()[i
].iov_base
),
112 data
.iovec()[i
].iov_len
);
114 num_bytes_consumed_
+= bytes_consumed
;
115 if (MaybeCloseStream()) {
118 if (bytes_consumed
> data_len
) {
119 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
);
121 } else if (bytes_consumed
== data_len
) {
122 FlushBufferedFrames();
123 return true; // it's safe to ack this frame.
125 // Set ourselves up to buffer what's left.
126 data_len
-= bytes_consumed
;
127 data
.Consume(bytes_consumed
);
128 byte_offset
+= bytes_consumed
;
132 // Buffer any remaining data to be consumed by the stream when ready.
133 for (size_t i
= 0; i
< data
.Size(); ++i
) {
134 DVLOG(1) << "Buffering stream data at offset " << byte_offset
;
135 const iovec
& iov
= data
.iovec()[i
];
136 frames_
.insert(make_pair(
137 byte_offset
, string(static_cast<char*>(iov
.iov_base
), iov
.iov_len
)));
138 byte_offset
+= iov
.iov_len
;
139 num_bytes_buffered_
+= iov
.iov_len
;
144 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset
) {
145 const QuicStreamOffset kMaxOffset
= numeric_limits
<QuicStreamOffset
>::max();
147 // If we have a scheduled termination or close, any new offset should match
149 if (close_offset_
!= kMaxOffset
&& offset
!= close_offset_
) {
150 stream_
->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS
);
154 close_offset_
= offset
;
159 bool QuicStreamSequencer::MaybeCloseStream() {
160 if (!blocked_
&& IsClosed()) {
161 DVLOG(1) << "Passing up termination, as we've processed "
162 << num_bytes_consumed_
<< " of " << close_offset_
164 // Technically it's an error if num_bytes_consumed isn't exactly
165 // equal, but error handling seems silly at this point.
166 stream_
->OnFinRead();
168 num_bytes_buffered_
= 0;
174 int QuicStreamSequencer::GetReadableRegions(iovec
* iov
, size_t iov_len
) {
176 FrameMap::iterator it
= frames_
.begin();
178 QuicStreamOffset offset
= num_bytes_consumed_
;
179 while (it
!= frames_
.end() && index
< iov_len
) {
180 if (it
->first
!= offset
) return index
;
182 iov
[index
].iov_base
= static_cast<void*>(
183 const_cast<char*>(it
->second
.data()));
184 iov
[index
].iov_len
= it
->second
.size();
185 offset
+= it
->second
.size();
193 int QuicStreamSequencer::Readv(const struct iovec
* iov
, size_t iov_len
) {
195 FrameMap::iterator it
= frames_
.begin();
196 size_t iov_index
= 0;
197 size_t iov_offset
= 0;
198 size_t frame_offset
= 0;
199 size_t initial_bytes_consumed
= num_bytes_consumed_
;
201 while (iov_index
< iov_len
&&
202 it
!= frames_
.end() &&
203 it
->first
== num_bytes_consumed_
) {
204 int bytes_to_read
= min(iov
[iov_index
].iov_len
- iov_offset
,
205 it
->second
.size() - frame_offset
);
207 char* iov_ptr
= static_cast<char*>(iov
[iov_index
].iov_base
) + iov_offset
;
209 it
->second
.data() + frame_offset
, bytes_to_read
);
210 frame_offset
+= bytes_to_read
;
211 iov_offset
+= bytes_to_read
;
213 if (iov
[iov_index
].iov_len
== iov_offset
) {
214 // We've filled this buffer.
218 if (it
->second
.size() == frame_offset
) {
219 // We've copied this whole frame
220 RecordBytesConsumed(it
->second
.size());
222 it
= frames_
.begin();
226 // We've finished copying. If we have a partial frame, update it.
227 if (frame_offset
!= 0) {
228 frames_
.insert(make_pair(it
->first
+ frame_offset
,
229 it
->second
.substr(frame_offset
)));
230 frames_
.erase(frames_
.begin());
231 RecordBytesConsumed(frame_offset
);
233 return num_bytes_consumed_
- initial_bytes_consumed
;
236 bool QuicStreamSequencer::HasBytesToRead() const {
237 FrameMap::const_iterator it
= frames_
.begin();
239 return it
!= frames_
.end() && it
->first
== num_bytes_consumed_
;
242 bool QuicStreamSequencer::IsClosed() const {
243 return num_bytes_consumed_
>= close_offset_
;
246 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame
& frame
) const {
247 // A frame is duplicate if the frame offset is smaller than our bytes consumed
248 // or we have stored the frame in our map.
249 // TODO(pwestin): Is it possible that a new frame contain more data even if
250 // the offset is the same?
251 return frame
.offset
< num_bytes_consumed_
||
252 frames_
.find(frame
.offset
) != frames_
.end();
255 void QuicStreamSequencer::SetBlockedUntilFlush() {
259 void QuicStreamSequencer::FlushBufferedFrames() {
261 FrameMap::iterator it
= frames_
.find(num_bytes_consumed_
);
262 while (it
!= frames_
.end()) {
263 DVLOG(1) << "Flushing buffered packet at offset " << it
->first
;
264 string
* data
= &it
->second
;
265 size_t bytes_consumed
= stream_
->ProcessRawData(data
->c_str(),
267 RecordBytesConsumed(bytes_consumed
);
268 if (MaybeCloseStream()) {
271 if (bytes_consumed
> data
->size()) {
272 stream_
->Reset(QUIC_ERROR_PROCESSING_STREAM
); // Programming error
274 } else if (bytes_consumed
== data
->size()) {
276 it
= frames_
.find(num_bytes_consumed_
);
278 string new_data
= it
->second
.substr(bytes_consumed
);
280 frames_
.insert(make_pair(num_bytes_consumed_
, new_data
));
287 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed
) {
288 num_bytes_consumed_
+= bytes_consumed
;
289 num_bytes_buffered_
-= bytes_consumed
;