Complete SyncMessageFilter initialization after SyncChannel initialization
[chromium-blink-merge.git] / net / quic / quic_stream_sequencer.cc
blob80c011f0e267999f789be015ec5cbbf13c202ec1
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
7 #include <algorithm>
8 #include <limits>
9 #include <utility>
11 #include "base/logging.h"
12 #include "net/quic/reliable_quic_stream.h"
14 using std::min;
15 using std::numeric_limits;
16 using std::string;
18 namespace net {
20 QuicStreamSequencer::FrameData::FrameData(QuicStreamOffset offset,
21 string segment)
22 : offset(offset), segment(segment) {}
24 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
25 : stream_(quic_stream),
26 num_bytes_consumed_(0),
27 close_offset_(numeric_limits<QuicStreamOffset>::max()),
28 blocked_(false),
29 num_bytes_buffered_(0),
30 num_frames_received_(0),
31 num_duplicate_frames_received_(0),
32 num_early_frames_received_(0) {
35 QuicStreamSequencer::~QuicStreamSequencer() {
38 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
39 ++num_frames_received_;
40 FrameList::iterator insertion_point = FindInsertionPoint(frame);
41 if (IsDuplicate(frame, insertion_point)) {
42 ++num_duplicate_frames_received_;
43 // Silently ignore duplicates.
44 return;
47 if (FrameOverlapsBufferedData(frame, insertion_point)) {
48 stream_->CloseConnectionWithDetails(
49 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
50 return;
53 const QuicStreamOffset byte_offset = frame.offset;
54 const size_t data_len = frame.data.length();
55 if (data_len == 0 && !frame.fin) {
56 // Stream frames must have data or a fin flag.
57 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
58 "Empty stream frame without FIN set.");
59 return;
62 if (frame.fin) {
63 CloseStreamAtOffset(frame.offset + data_len);
64 if (data_len == 0) {
65 return;
69 if (byte_offset > num_bytes_consumed_) {
70 ++num_early_frames_received_;
73 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
74 // Inserting an empty string and then copying to avoid the extra copy.
75 insertion_point =
76 buffered_frames_.insert(insertion_point, FrameData(byte_offset, ""));
77 frame.data.CopyToString(&insertion_point->segment);
78 num_bytes_buffered_ += data_len;
80 if (blocked_) {
81 return;
84 if (byte_offset == num_bytes_consumed_) {
85 stream_->OnDataAvailable();
89 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
90 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
92 // If there is a scheduled close, the new offset should match it.
93 if (close_offset_ != kMaxOffset && offset != close_offset_) {
94 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
95 return;
98 close_offset_ = offset;
100 MaybeCloseStream();
103 bool QuicStreamSequencer::MaybeCloseStream() {
104 if (!blocked_ && IsClosed()) {
105 DVLOG(1) << "Passing up termination, as we've processed "
106 << num_bytes_consumed_ << " of " << close_offset_
107 << " bytes.";
108 // Technically it's an error if num_bytes_consumed isn't exactly
109 // equal, but error handling seems silly at this point.
110 stream_->OnFinRead();
111 buffered_frames_.clear();
112 num_bytes_buffered_ = 0;
113 return true;
115 return false;
118 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
119 DCHECK(!blocked_);
120 FrameList::const_iterator it = buffered_frames_.begin();
121 size_t index = 0;
122 QuicStreamOffset offset = num_bytes_consumed_;
123 while (it != buffered_frames_.end() && index < iov_len) {
124 if (it->offset != offset) {
125 return index;
128 iov[index].iov_base =
129 static_cast<void*>(const_cast<char*>(it->segment.data()));
130 iov[index].iov_len = it->segment.size();
131 offset += it->segment.size();
133 ++index;
134 ++it;
136 return index;
139 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
140 DCHECK(!blocked_);
141 FrameList::iterator it = buffered_frames_.begin();
142 size_t iov_index = 0;
143 size_t iov_offset = 0;
144 size_t frame_offset = 0;
145 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_;
147 while (iov_index < iov_len && it != buffered_frames_.end() &&
148 it->offset == num_bytes_consumed_) {
149 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
150 it->segment.size() - frame_offset);
152 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
153 memcpy(iov_ptr, it->segment.data() + frame_offset, bytes_to_read);
154 frame_offset += bytes_to_read;
155 iov_offset += bytes_to_read;
157 if (iov[iov_index].iov_len == iov_offset) {
158 // We've filled this buffer.
159 iov_offset = 0;
160 ++iov_index;
162 if (it->segment.size() == frame_offset) {
163 // We've copied this whole frame
164 RecordBytesConsumed(it->segment.size());
165 buffered_frames_.erase(it);
166 it = buffered_frames_.begin();
167 frame_offset = 0;
170 // Done copying. If there is a partial frame, update it.
171 if (frame_offset != 0) {
172 buffered_frames_.push_front(
173 FrameData(it->offset + frame_offset, it->segment.substr(frame_offset)));
174 buffered_frames_.erase(it);
175 RecordBytesConsumed(frame_offset);
177 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed);
180 bool QuicStreamSequencer::HasBytesToRead() const {
181 return !buffered_frames_.empty() &&
182 buffered_frames_.begin()->offset == num_bytes_consumed_;
185 bool QuicStreamSequencer::IsClosed() const {
186 return num_bytes_consumed_ >= close_offset_;
189 QuicStreamSequencer::FrameList::iterator
190 QuicStreamSequencer::FindInsertionPoint(const QuicStreamFrame& frame) {
191 if (buffered_frames_.empty()) {
192 return buffered_frames_.begin();
194 // If it's after all buffered_frames, return the end.
195 if (frame.offset >= (buffered_frames_.rbegin()->offset +
196 buffered_frames_.rbegin()->segment.length())) {
197 return buffered_frames_.end();
199 FrameList::iterator iter = buffered_frames_.begin();
200 // Only advance the iterator if the data begins after the already received
201 // frame. If the new frame overlaps with an existing frame, the iterator will
202 // still point to the frame it overlaps with.
203 while (iter != buffered_frames_.end() &&
204 frame.offset >= iter->offset + iter->segment.length()) {
205 ++iter;
207 return iter;
210 bool QuicStreamSequencer::FrameOverlapsBufferedData(
211 const QuicStreamFrame& frame,
212 FrameList::const_iterator insertion_point) const {
213 if (buffered_frames_.empty() || insertion_point == buffered_frames_.end()) {
214 return false;
216 // If there is a buffered frame with a higher starting offset, then check to
217 // see if the new frame overlaps the beginning of the higher frame.
218 if (frame.offset < insertion_point->offset &&
219 frame.offset + frame.data.length() > insertion_point->offset) {
220 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
221 << frame.data.size() << " > " << insertion_point->offset;
222 return true;
224 // If there is a buffered frame with a lower starting offset, then check to
225 // see if the buffered frame runs into the new frame.
226 if (frame.offset >= insertion_point->offset &&
227 frame.offset <
228 insertion_point->offset + insertion_point->segment.length()) {
229 DVLOG(1) << "Preceeding frame overlaps new frame: "
230 << insertion_point->offset << " + "
231 << insertion_point->segment.length() << " > " << frame.offset;
232 return true;
235 return false;
238 void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
239 DCHECK(!blocked_);
240 size_t end_offset = num_bytes_consumed_ + num_bytes_consumed;
241 while (!buffered_frames_.empty() && end_offset != num_bytes_consumed_) {
242 FrameList::iterator it = buffered_frames_.begin();
243 if (it->offset != num_bytes_consumed_) {
244 LOG(DFATAL) << "Invalid argument to MarkConsumed. "
245 << " num_bytes_consumed_: " << num_bytes_consumed_
246 << " end_offset: " << end_offset << " offset: " << it->offset
247 << " length: " << it->segment.length();
248 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
249 return;
252 if (it->offset + it->segment.length() <= end_offset) {
253 RecordBytesConsumed(it->segment.length());
254 // This chunk is entirely consumed.
255 buffered_frames_.erase(it);
256 continue;
259 // Partially consume this frame.
260 size_t delta = end_offset - it->offset;
261 RecordBytesConsumed(delta);
262 string new_data = it->segment.substr(delta);
263 buffered_frames_.erase(it);
264 buffered_frames_.push_front(FrameData(num_bytes_consumed_, new_data));
265 break;
269 bool QuicStreamSequencer::IsDuplicate(
270 const QuicStreamFrame& frame,
271 FrameList::const_iterator insertion_point) const {
272 // A frame is duplicate if the frame offset is smaller than the bytes consumed
273 // or identical to an already received frame.
274 return frame.offset < num_bytes_consumed_ ||
275 (insertion_point != buffered_frames_.end() &&
276 frame.offset == insertion_point->offset);
279 void QuicStreamSequencer::SetBlockedUntilFlush() {
280 blocked_ = true;
283 void QuicStreamSequencer::SetUnblocked() {
284 blocked_ = false;
285 if (IsClosed() || HasBytesToRead()) {
286 stream_->OnDataAvailable();
290 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
291 num_bytes_consumed_ += bytes_consumed;
292 num_bytes_buffered_ -= bytes_consumed;
294 stream_->AddBytesConsumed(bytes_consumed);
297 } // namespace net