rAc - revert invalid suggestions to edit mode
[chromium-blink-merge.git] / net / quic / quic_stream_sequencer.cc
blob145a3778de5f433c5d6088a94bb0387fdcfb30a3
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
7 #include <algorithm>
8 #include <limits>
10 #include "base/logging.h"
11 #include "net/quic/reliable_quic_stream.h"
13 using std::min;
14 using std::numeric_limits;
16 namespace net {
18 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
19 : stream_(quic_stream),
20 num_bytes_consumed_(0),
21 max_frame_memory_(numeric_limits<size_t>::max()),
22 close_offset_(numeric_limits<QuicStreamOffset>::max()),
23 blocked_(false),
24 num_bytes_buffered_(0) {
27 QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory,
28 ReliableQuicStream* quic_stream)
29 : stream_(quic_stream),
30 num_bytes_consumed_(0),
31 max_frame_memory_(max_frame_memory),
32 close_offset_(numeric_limits<QuicStreamOffset>::max()),
33 blocked_(false),
34 num_bytes_buffered_(0) {
35 if (max_frame_memory < kMaxPacketSize) {
36 LOG(DFATAL) << "Setting max frame memory to " << max_frame_memory
37 << ". Some frames will be impossible to handle.";
41 QuicStreamSequencer::~QuicStreamSequencer() {
44 bool QuicStreamSequencer::WillAcceptStreamFrame(
45 const QuicStreamFrame& frame) const {
46 size_t data_len = frame.data.TotalBufferSize();
47 if (data_len > max_frame_memory_) {
48 LOG(DFATAL) << "data_len: " << data_len << " > max_frame_memory_: "
49 << max_frame_memory_;
50 return false;
53 if (IsDuplicate(frame)) {
54 return true;
56 QuicStreamOffset byte_offset = frame.offset;
57 if (data_len > max_frame_memory_) {
58 // We're never going to buffer this frame and we can't pass it up.
59 // The stream might only consume part of it and we'd need a partial ack.
61 // Ideally this should never happen, as we check that
62 // max_frame_memory_ > kMaxPacketSize and lower levels should reject
63 // frames larger than that.
64 return false;
66 if (byte_offset + data_len - num_bytes_consumed_ > max_frame_memory_) {
67 // We can buffer this but not right now. Toss it.
68 // It might be worth trying an experiment where we try best-effort buffering
69 return false;
71 return true;
74 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
75 if (!WillAcceptStreamFrame(frame)) {
76 // This should not happen, as WillAcceptFrame should be called before
77 // OnStreamFrame. Error handling should be done by the caller.
78 return false;
80 if (IsDuplicate(frame)) {
81 // Silently ignore duplicates.
82 return true;
85 QuicStreamOffset byte_offset = frame.offset;
86 size_t data_len = frame.data.TotalBufferSize();
87 if (data_len == 0 && !frame.fin) {
88 // Stream frames must have data or a fin flag.
89 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
90 "Empty stream frame without FIN set.");
91 return false;
94 if (frame.fin) {
95 CloseStreamAtOffset(frame.offset + data_len);
96 if (data_len == 0) {
97 return true;
101 IOVector data;
102 data.AppendIovec(frame.data.iovec(), frame.data.Size());
104 // If the frame has arrived in-order then we can process it immediately, only
105 // buffering if the stream is unable to process it.
106 if (!blocked_ && byte_offset == num_bytes_consumed_) {
107 DVLOG(1) << "Processing byte offset " << byte_offset;
108 size_t bytes_consumed = 0;
109 for (size_t i = 0; i < data.Size(); ++i) {
110 bytes_consumed += stream_->ProcessRawData(
111 static_cast<char*>(data.iovec()[i].iov_base),
112 data.iovec()[i].iov_len);
114 num_bytes_consumed_ += bytes_consumed;
115 if (MaybeCloseStream()) {
116 return true;
118 if (bytes_consumed > data_len) {
119 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
120 return false;
121 } else if (bytes_consumed == data_len) {
122 FlushBufferedFrames();
123 return true; // it's safe to ack this frame.
124 } else {
125 // Set ourselves up to buffer what's left.
126 data_len -= bytes_consumed;
127 data.Consume(bytes_consumed);
128 byte_offset += bytes_consumed;
132 // Buffer any remaining data to be consumed by the stream when ready.
133 for (size_t i = 0; i < data.Size(); ++i) {
134 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
135 const iovec& iov = data.iovec()[i];
136 frames_.insert(make_pair(
137 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
138 byte_offset += iov.iov_len;
139 num_bytes_buffered_ += iov.iov_len;
141 return true;
144 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
145 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
147 // If we have a scheduled termination or close, any new offset should match
148 // it.
149 if (close_offset_ != kMaxOffset && offset != close_offset_) {
150 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
151 return;
154 close_offset_ = offset;
156 MaybeCloseStream();
159 bool QuicStreamSequencer::MaybeCloseStream() {
160 if (!blocked_ && IsClosed()) {
161 DVLOG(1) << "Passing up termination, as we've processed "
162 << num_bytes_consumed_ << " of " << close_offset_
163 << " bytes.";
164 // Technically it's an error if num_bytes_consumed isn't exactly
165 // equal, but error handling seems silly at this point.
166 stream_->OnFinRead();
167 frames_.clear();
168 num_bytes_buffered_ = 0;
169 return true;
171 return false;
174 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
175 DCHECK(!blocked_);
176 FrameMap::iterator it = frames_.begin();
177 size_t index = 0;
178 QuicStreamOffset offset = num_bytes_consumed_;
179 while (it != frames_.end() && index < iov_len) {
180 if (it->first != offset) return index;
182 iov[index].iov_base = static_cast<void*>(
183 const_cast<char*>(it->second.data()));
184 iov[index].iov_len = it->second.size();
185 offset += it->second.size();
187 ++index;
188 ++it;
190 return index;
193 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
194 DCHECK(!blocked_);
195 FrameMap::iterator it = frames_.begin();
196 size_t iov_index = 0;
197 size_t iov_offset = 0;
198 size_t frame_offset = 0;
199 size_t initial_bytes_consumed = num_bytes_consumed_;
201 while (iov_index < iov_len &&
202 it != frames_.end() &&
203 it->first == num_bytes_consumed_) {
204 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
205 it->second.size() - frame_offset);
207 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
208 memcpy(iov_ptr,
209 it->second.data() + frame_offset, bytes_to_read);
210 frame_offset += bytes_to_read;
211 iov_offset += bytes_to_read;
213 if (iov[iov_index].iov_len == iov_offset) {
214 // We've filled this buffer.
215 iov_offset = 0;
216 ++iov_index;
218 if (it->second.size() == frame_offset) {
219 // We've copied this whole frame
220 RecordBytesConsumed(it->second.size());
221 frames_.erase(it);
222 it = frames_.begin();
223 frame_offset = 0;
226 // We've finished copying. If we have a partial frame, update it.
227 if (frame_offset != 0) {
228 frames_.insert(make_pair(it->first + frame_offset,
229 it->second.substr(frame_offset)));
230 frames_.erase(frames_.begin());
231 RecordBytesConsumed(frame_offset);
233 return num_bytes_consumed_ - initial_bytes_consumed;
236 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
237 DCHECK(!blocked_);
238 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed;
239 while (!frames_.empty() && end_offset != num_bytes_consumed_) {
240 FrameMap::iterator it = frames_.begin();
241 if (it->first != num_bytes_consumed_) {
242 LOG(DFATAL) << "Invalid argument to MarkConsumed. "
243 << " num_bytes_consumed_: " << num_bytes_consumed_
244 << " end_offset: " << end_offset
245 << " offset: " << it->first
246 << " length: " << it->second.length();
247 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
248 return;
251 if (it->first + it->second.length() <= end_offset) {
252 num_bytes_consumed_ += it->second.length();
253 num_bytes_buffered_ -= it->second.length();
254 // This chunk is entirely consumed.
255 frames_.erase(it);
256 continue;
259 // Partially consume this frame.
260 size_t delta = end_offset - it->first;
261 RecordBytesConsumed(delta);
262 frames_.insert(make_pair(end_offset, it->second.substr(delta)));
263 frames_.erase(it);
264 break;
268 bool QuicStreamSequencer::HasBytesToRead() const {
269 FrameMap::const_iterator it = frames_.begin();
271 return it != frames_.end() && it->first == num_bytes_consumed_;
274 bool QuicStreamSequencer::IsClosed() const {
275 return num_bytes_consumed_ >= close_offset_;
278 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
279 // A frame is duplicate if the frame offset is smaller than our bytes consumed
280 // or we have stored the frame in our map.
281 // TODO(pwestin): Is it possible that a new frame contain more data even if
282 // the offset is the same?
283 return frame.offset < num_bytes_consumed_ ||
284 frames_.find(frame.offset) != frames_.end();
287 void QuicStreamSequencer::SetBlockedUntilFlush() {
288 blocked_ = true;
291 void QuicStreamSequencer::FlushBufferedFrames() {
292 blocked_ = false;
293 FrameMap::iterator it = frames_.find(num_bytes_consumed_);
294 while (it != frames_.end()) {
295 DVLOG(1) << "Flushing buffered packet at offset " << it->first;
296 string* data = &it->second;
297 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
298 data->size());
299 RecordBytesConsumed(bytes_consumed);
300 if (MaybeCloseStream()) {
301 return;
303 if (bytes_consumed > data->size()) {
304 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error
305 return;
306 } else if (bytes_consumed == data->size()) {
307 frames_.erase(it);
308 it = frames_.find(num_bytes_consumed_);
309 } else {
310 string new_data = it->second.substr(bytes_consumed);
311 frames_.erase(it);
312 frames_.insert(make_pair(num_bytes_consumed_, new_data));
313 return;
316 MaybeCloseStream();
319 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
320 num_bytes_consumed_ += bytes_consumed;
321 num_bytes_buffered_ -= bytes_consumed;
324 } // namespace net