Add explicit |forceOnlineSignin| to user pod status
[chromium-blink-merge.git] / net / quic / quic_stream_sequencer.cc
blob3e6e317cf9d915302cadc9183204ea363736fdd1
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
7 #include <algorithm>
8 #include <limits>
10 #include "base/logging.h"
11 #include "net/quic/reliable_quic_stream.h"
13 using std::min;
14 using std::numeric_limits;
16 namespace net {
18 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
19 : stream_(quic_stream),
20 num_bytes_consumed_(0),
21 max_frame_memory_(numeric_limits<size_t>::max()),
22 close_offset_(numeric_limits<QuicStreamOffset>::max()),
23 blocked_(false) {
26 QuicStreamSequencer::QuicStreamSequencer(size_t max_frame_memory,
27 ReliableQuicStream* quic_stream)
28 : stream_(quic_stream),
29 num_bytes_consumed_(0),
30 max_frame_memory_(max_frame_memory),
31 close_offset_(numeric_limits<QuicStreamOffset>::max()),
32 blocked_(false) {
33 if (max_frame_memory < kMaxPacketSize) {
34 LOG(DFATAL) << "Setting max frame memory to " << max_frame_memory
35 << ". Some frames will be impossible to handle.";
39 QuicStreamSequencer::~QuicStreamSequencer() {
42 bool QuicStreamSequencer::WillAcceptStreamFrame(
43 const QuicStreamFrame& frame) const {
44 size_t data_len = frame.data.TotalBufferSize();
45 DCHECK_LE(data_len, max_frame_memory_);
47 if (IsDuplicate(frame)) {
48 return true;
50 QuicStreamOffset byte_offset = frame.offset;
51 if (data_len > max_frame_memory_) {
52 // We're never going to buffer this frame and we can't pass it up.
53 // The stream might only consume part of it and we'd need a partial ack.
55 // Ideally this should never happen, as we check that
56 // max_frame_memory_ > kMaxPacketSize and lower levels should reject
57 // frames larger than that.
58 return false;
60 if (byte_offset + data_len - num_bytes_consumed_ > max_frame_memory_) {
61 // We can buffer this but not right now. Toss it.
62 // It might be worth trying an experiment where we try best-effort buffering
63 return false;
65 return true;
68 bool QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
69 if (!WillAcceptStreamFrame(frame)) {
70 // This should not happen, as WillAcceptFrame should be called before
71 // OnStreamFrame. Error handling should be done by the caller.
72 return false;
74 if (IsDuplicate(frame)) {
75 // Silently ignore duplicates.
76 return true;
79 QuicStreamOffset byte_offset = frame.offset;
80 size_t data_len = frame.data.TotalBufferSize();
81 if (data_len == 0 && !frame.fin) {
82 // Stream frames must have data or a fin flag.
83 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
84 "Empty stream frame without FIN set.");
85 return false;
88 if (frame.fin) {
89 CloseStreamAtOffset(frame.offset + data_len);
90 if (data_len == 0) {
91 return true;
95 IOVector data;
96 data.AppendIovec(frame.data.iovec(), frame.data.Size());
97 if (!blocked_ && byte_offset == num_bytes_consumed_) {
98 DVLOG(1) << "Processing byte offset " << byte_offset;
99 size_t bytes_consumed = 0;
100 for (size_t i = 0; i < data.Size(); ++i) {
101 bytes_consumed += stream_->ProcessRawData(
102 static_cast<char*>(data.iovec()[i].iov_base),
103 data.iovec()[i].iov_len);
105 num_bytes_consumed_ += bytes_consumed;
106 if (MaybeCloseStream()) {
107 return true;
109 if (bytes_consumed > data_len) {
110 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
111 return false;
112 } else if (bytes_consumed == data_len) {
113 FlushBufferedFrames();
114 return true; // it's safe to ack this frame.
115 } else {
116 // Set ourselves up to buffer what's left
117 data_len -= bytes_consumed;
118 data.Consume(bytes_consumed);
119 byte_offset += bytes_consumed;
122 for (size_t i = 0; i < data.Size(); ++i) {
123 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
124 frames_.insert(make_pair(byte_offset, string(
125 static_cast<char*>(data.iovec()[i].iov_base),
126 data.iovec()[i].iov_len)));
127 byte_offset += data.iovec()[i].iov_len;
129 return true;
132 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
133 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
135 // If we have a scheduled termination or close, any new offset should match
136 // it.
137 if (close_offset_ != kMaxOffset && offset != close_offset_) {
138 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
139 return;
142 close_offset_ = offset;
144 MaybeCloseStream();
147 bool QuicStreamSequencer::MaybeCloseStream() {
148 if (!blocked_ && IsClosed()) {
149 DVLOG(1) << "Passing up termination, as we've processed "
150 << num_bytes_consumed_ << " of " << close_offset_
151 << " bytes.";
152 // Technically it's an error if num_bytes_consumed isn't exactly
153 // equal, but error handling seems silly at this point.
154 stream_->OnFinRead();
155 frames_.clear();
156 return true;
158 return false;
161 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
162 DCHECK(!blocked_);
163 FrameMap::iterator it = frames_.begin();
164 size_t index = 0;
165 QuicStreamOffset offset = num_bytes_consumed_;
166 while (it != frames_.end() && index < iov_len) {
167 if (it->first != offset) return index;
169 iov[index].iov_base = static_cast<void*>(
170 const_cast<char*>(it->second.data()));
171 iov[index].iov_len = it->second.size();
172 offset += it->second.size();
174 ++index;
175 ++it;
177 return index;
180 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
181 DCHECK(!blocked_);
182 FrameMap::iterator it = frames_.begin();
183 size_t iov_index = 0;
184 size_t iov_offset = 0;
185 size_t frame_offset = 0;
186 size_t initial_bytes_consumed = num_bytes_consumed_;
188 while (iov_index < iov_len &&
189 it != frames_.end() &&
190 it->first == num_bytes_consumed_) {
191 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
192 it->second.size() - frame_offset);
194 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
195 memcpy(iov_ptr,
196 it->second.data() + frame_offset, bytes_to_read);
197 frame_offset += bytes_to_read;
198 iov_offset += bytes_to_read;
200 if (iov[iov_index].iov_len == iov_offset) {
201 // We've filled this buffer.
202 iov_offset = 0;
203 ++iov_index;
205 if (it->second.size() == frame_offset) {
206 // We've copied this whole frame
207 num_bytes_consumed_ += it->second.size();
208 frames_.erase(it);
209 it = frames_.begin();
210 frame_offset = 0;
213 // We've finished copying. If we have a partial frame, update it.
214 if (frame_offset != 0) {
215 frames_.insert(make_pair(it->first + frame_offset,
216 it->second.substr(frame_offset)));
217 frames_.erase(frames_.begin());
218 num_bytes_consumed_ += frame_offset;
220 return num_bytes_consumed_ - initial_bytes_consumed;
223 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
224 DCHECK(!blocked_);
225 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed;
226 while (!frames_.empty() && end_offset != num_bytes_consumed_) {
227 FrameMap::iterator it = frames_.begin();
228 if (it->first != num_bytes_consumed_) {
229 LOG(DFATAL) << "Invalid argument to MarkConsumed. "
230 << " num_bytes_consumed_: " << num_bytes_consumed_
231 << " end_offset: " << end_offset
232 << " offset: " << it->first
233 << " length: " << it->second.length();
234 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
235 return;
238 if (it->first + it->second.length() <= end_offset) {
239 num_bytes_consumed_ += it->second.length();
240 // This chunk is entirely consumed.
241 frames_.erase(it);
242 continue;
245 // Partially consume this frame.
246 size_t delta = end_offset - it->first;
247 num_bytes_consumed_ += delta;
248 frames_.insert(make_pair(end_offset, it->second.substr(delta)));
249 frames_.erase(it);
250 break;
254 bool QuicStreamSequencer::HasBytesToRead() const {
255 FrameMap::const_iterator it = frames_.begin();
257 return it != frames_.end() && it->first == num_bytes_consumed_;
260 bool QuicStreamSequencer::IsClosed() const {
261 return num_bytes_consumed_ >= close_offset_;
264 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
265 // A frame is duplicate if the frame offset is smaller than our bytes consumed
266 // or we have stored the frame in our map.
267 // TODO(pwestin): Is it possible that a new frame contain more data even if
268 // the offset is the same?
269 return frame.offset < num_bytes_consumed_ ||
270 frames_.find(frame.offset) != frames_.end();
273 void QuicStreamSequencer::SetBlockedUntilFlush() {
274 blocked_ = true;
277 void QuicStreamSequencer::FlushBufferedFrames() {
278 blocked_ = false;
279 FrameMap::iterator it = frames_.find(num_bytes_consumed_);
280 while (it != frames_.end()) {
281 DVLOG(1) << "Flushing buffered packet at offset " << it->first;
282 string* data = &it->second;
283 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
284 data->size());
285 num_bytes_consumed_ += bytes_consumed;
286 if (MaybeCloseStream()) {
287 return;
289 if (bytes_consumed > data->size()) {
290 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error
291 return;
292 } else if (bytes_consumed == data->size()) {
293 frames_.erase(it);
294 it = frames_.find(num_bytes_consumed_);
295 } else {
296 string new_data = it->second.substr(bytes_consumed);
297 frames_.erase(it);
298 frames_.insert(make_pair(num_bytes_consumed_, new_data));
299 return;
302 MaybeCloseStream();
305 } // namespace net