Add details (where missing) for histograms and remove a few that are not worth provid...
[chromium-blink-merge.git] / net / quic / reliable_quic_stream.cc
blob218bb09619d8831fe11d0981cc4fff0066ccef8b
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/reliable_quic_stream.h"
7 #include "base/logging.h"
8 #include "net/quic/iovector.h"
9 #include "net/quic/quic_flow_controller.h"
10 #include "net/quic/quic_session.h"
11 #include "net/quic/quic_write_blocked_list.h"
13 using base::StringPiece;
14 using std::min;
16 namespace net {
18 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
20 namespace {
22 struct iovec MakeIovec(StringPiece data) {
23 struct iovec iov = {const_cast<char*>(data.data()),
24 static_cast<size_t>(data.size())};
25 return iov;
28 } // namespace
30 // Wrapper that aggregates OnAckNotifications for packets sent using
31 // WriteOrBufferData and delivers them to the original
32 // QuicAckNotifier::DelegateInterface after all bytes written using
33 // WriteOrBufferData are acked. This level of indirection is
34 // necessary because the delegate interface provides no mechanism that
35 // WriteOrBufferData can use to inform it that the write required
36 // multiple WritevData calls or that only part of the data has been
37 // sent out by the time ACKs start arriving.
38 class ReliableQuicStream::ProxyAckNotifierDelegate
39 : public QuicAckNotifier::DelegateInterface {
40 public:
41 explicit ProxyAckNotifierDelegate(DelegateInterface* delegate)
42 : delegate_(delegate),
43 pending_acks_(0),
44 wrote_last_data_(false),
45 num_original_packets_(0),
46 num_original_bytes_(0),
47 num_retransmitted_packets_(0),
48 num_retransmitted_bytes_(0) {
51 virtual void OnAckNotification(int num_original_packets,
52 int num_original_bytes,
53 int num_retransmitted_packets,
54 int num_retransmitted_bytes,
55 QuicTime::Delta delta_largest_observed)
56 OVERRIDE {
57 DCHECK_LT(0, pending_acks_);
58 --pending_acks_;
59 num_original_packets_ += num_original_packets;
60 num_original_bytes_ += num_original_bytes;
61 num_retransmitted_packets_ += num_retransmitted_packets;
62 num_retransmitted_bytes_ += num_retransmitted_bytes;
64 if (wrote_last_data_ && pending_acks_ == 0) {
65 delegate_->OnAckNotification(num_original_packets_,
66 num_original_bytes_,
67 num_retransmitted_packets_,
68 num_retransmitted_bytes_,
69 delta_largest_observed);
73 void WroteData(bool last_data) {
74 DCHECK(!wrote_last_data_);
75 ++pending_acks_;
76 wrote_last_data_ = last_data;
79 protected:
80 // Delegates are ref counted.
81 virtual ~ProxyAckNotifierDelegate() {
84 private:
85 // Original delegate. delegate_->OnAckNotification will be called when:
86 // wrote_last_data_ == true and pending_acks_ == 0
87 scoped_refptr<DelegateInterface> delegate_;
89 // Number of outstanding acks.
90 int pending_acks_;
92 // True if no pending writes remain.
93 bool wrote_last_data_;
95 // Accumulators.
96 int num_original_packets_;
97 int num_original_bytes_;
98 int num_retransmitted_packets_;
99 int num_retransmitted_bytes_;
101 DISALLOW_COPY_AND_ASSIGN(ProxyAckNotifierDelegate);
104 ReliableQuicStream::PendingData::PendingData(
105 string data_in, scoped_refptr<ProxyAckNotifierDelegate> delegate_in)
106 : data(data_in), delegate(delegate_in) {
109 ReliableQuicStream::PendingData::~PendingData() {
112 ReliableQuicStream::ReliableQuicStream(QuicStreamId id, QuicSession* session)
113 : sequencer_(this),
114 id_(id),
115 session_(session),
116 stream_bytes_read_(0),
117 stream_bytes_written_(0),
118 stream_error_(QUIC_STREAM_NO_ERROR),
119 connection_error_(QUIC_NO_ERROR),
120 read_side_closed_(false),
121 write_side_closed_(false),
122 fin_buffered_(false),
123 fin_sent_(false),
124 rst_sent_(false),
125 is_server_(session_->is_server()),
126 flow_controller_(
127 session_->connection()->version(),
128 id_,
129 is_server_,
130 session_->config()->HasReceivedInitialFlowControlWindowBytes() ?
131 session_->config()->ReceivedInitialFlowControlWindowBytes() :
132 kDefaultFlowControlSendWindow,
133 session_->connection()->max_flow_control_receive_window_bytes(),
134 session_->connection()->max_flow_control_receive_window_bytes()),
135 connection_flow_controller_(session_->connection()->flow_controller()) {
138 ReliableQuicStream::~ReliableQuicStream() {
141 bool ReliableQuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
142 if (read_side_closed_) {
143 DVLOG(1) << ENDPOINT << "Ignoring frame " << frame.stream_id;
144 // We don't want to be reading: blackhole the data.
145 return true;
148 if (frame.stream_id != id_) {
149 LOG(ERROR) << "Error!";
150 return false;
153 // This count include duplicate data received.
154 stream_bytes_read_ += frame.data.TotalBufferSize();
156 bool accepted = sequencer_.OnStreamFrame(frame);
158 if (flow_controller_.FlowControlViolation() ||
159 connection_flow_controller_->FlowControlViolation()) {
160 session_->connection()->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR);
161 return false;
163 MaybeSendWindowUpdate();
165 return accepted;
168 void ReliableQuicStream::MaybeSendWindowUpdate() {
169 flow_controller_.MaybeSendWindowUpdate(session()->connection());
170 connection_flow_controller_->MaybeSendWindowUpdate(session()->connection());
173 int ReliableQuicStream::num_frames_received() const {
174 return sequencer_.num_frames_received();
177 int ReliableQuicStream::num_duplicate_frames_received() const {
178 return sequencer_.num_duplicate_frames_received();
181 void ReliableQuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
182 stream_error_ = frame.error_code;
183 CloseWriteSide();
184 CloseReadSide();
187 void ReliableQuicStream::OnConnectionClosed(QuicErrorCode error,
188 bool from_peer) {
189 if (read_side_closed_ && write_side_closed_) {
190 return;
192 if (error != QUIC_NO_ERROR) {
193 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
194 connection_error_ = error;
197 CloseWriteSide();
198 CloseReadSide();
201 void ReliableQuicStream::OnFinRead() {
202 DCHECK(sequencer_.IsClosed());
203 CloseReadSide();
206 void ReliableQuicStream::Reset(QuicRstStreamErrorCode error) {
207 DCHECK_NE(QUIC_STREAM_NO_ERROR, error);
208 stream_error_ = error;
209 // Sending a RstStream results in calling CloseStream.
210 session()->SendRstStream(id(), error, stream_bytes_written_);
211 rst_sent_ = true;
214 void ReliableQuicStream::CloseConnection(QuicErrorCode error) {
215 session()->connection()->SendConnectionClose(error);
218 void ReliableQuicStream::CloseConnectionWithDetails(QuicErrorCode error,
219 const string& details) {
220 session()->connection()->SendConnectionCloseWithDetails(error, details);
223 QuicVersion ReliableQuicStream::version() const {
224 return session()->connection()->version();
227 void ReliableQuicStream::WriteOrBufferData(
228 StringPiece data,
229 bool fin,
230 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
231 if (data.empty() && !fin) {
232 LOG(DFATAL) << "data.empty() && !fin";
233 return;
236 if (fin_buffered_) {
237 LOG(DFATAL) << "Fin already buffered";
238 return;
241 scoped_refptr<ProxyAckNotifierDelegate> proxy_delegate;
242 if (ack_notifier_delegate != NULL) {
243 proxy_delegate = new ProxyAckNotifierDelegate(ack_notifier_delegate);
246 QuicConsumedData consumed_data(0, false);
247 fin_buffered_ = fin;
249 if (queued_data_.empty()) {
250 struct iovec iov(MakeIovec(data));
251 consumed_data = WritevData(&iov, 1, fin, proxy_delegate.get());
252 DCHECK_LE(consumed_data.bytes_consumed, data.length());
255 bool write_completed;
256 // If there's unconsumed data or an unconsumed fin, queue it.
257 if (consumed_data.bytes_consumed < data.length() ||
258 (fin && !consumed_data.fin_consumed)) {
259 StringPiece remainder(data.substr(consumed_data.bytes_consumed));
260 queued_data_.push_back(PendingData(remainder.as_string(), proxy_delegate));
261 write_completed = false;
262 } else {
263 write_completed = true;
266 if ((proxy_delegate.get() != NULL) &&
267 (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed)) {
268 proxy_delegate->WroteData(write_completed);
272 void ReliableQuicStream::OnCanWrite() {
273 bool fin = false;
274 while (!queued_data_.empty()) {
275 PendingData* pending_data = &queued_data_.front();
276 ProxyAckNotifierDelegate* delegate = pending_data->delegate.get();
277 if (queued_data_.size() == 1 && fin_buffered_) {
278 fin = true;
280 struct iovec iov(MakeIovec(pending_data->data));
281 QuicConsumedData consumed_data = WritevData(&iov, 1, fin, delegate);
282 if (consumed_data.bytes_consumed == pending_data->data.size() &&
283 fin == consumed_data.fin_consumed) {
284 queued_data_.pop_front();
285 if (delegate != NULL) {
286 delegate->WroteData(true);
288 } else {
289 if (consumed_data.bytes_consumed > 0) {
290 pending_data->data.erase(0, consumed_data.bytes_consumed);
291 if (delegate != NULL) {
292 delegate->WroteData(false);
295 break;
300 void ReliableQuicStream::MaybeSendBlocked() {
301 flow_controller_.MaybeSendBlocked(session()->connection());
302 connection_flow_controller_->MaybeSendBlocked(session()->connection());
303 // If we are connection level flow control blocked, then add the stream
304 // to the write blocked list. It will be given a chance to write when a
305 // connection level WINDOW_UPDATE arrives.
306 if (connection_flow_controller_->IsBlocked() &&
307 !flow_controller_.IsBlocked()) {
308 session_->MarkWriteBlocked(id(), EffectivePriority());
312 QuicConsumedData ReliableQuicStream::WritevData(
313 const struct iovec* iov,
314 int iov_count,
315 bool fin,
316 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
317 if (write_side_closed_) {
318 DLOG(ERROR) << ENDPOINT << "Attempt to write when the write side is closed";
319 return QuicConsumedData(0, false);
322 // How much data we want to write.
323 size_t write_length = TotalIovecLength(iov, iov_count);
325 // A FIN with zero data payload should not be flow control blocked.
326 bool fin_with_zero_data = (fin && write_length == 0);
328 if (flow_controller_.IsEnabled()) {
329 // How much data we are allowed to write from flow control.
330 uint64 send_window = flow_controller_.SendWindowSize();
331 if (connection_flow_controller_->IsEnabled()) {
332 send_window =
333 min(send_window, connection_flow_controller_->SendWindowSize());
336 if (send_window == 0 && !fin_with_zero_data) {
337 // Quick return if we can't send anything.
338 MaybeSendBlocked();
339 return QuicConsumedData(0, false);
342 if (write_length > send_window) {
343 // Don't send the FIN if we aren't going to send all the data.
344 fin = false;
346 // Writing more data would be a violation of flow control.
347 write_length = send_window;
351 // Fill an IOVector with bytes from the iovec.
352 IOVector data;
353 data.AppendIovecAtMostBytes(iov, iov_count, write_length);
355 QuicConsumedData consumed_data = session()->WritevData(
356 id(), data, stream_bytes_written_, fin, ack_notifier_delegate);
357 stream_bytes_written_ += consumed_data.bytes_consumed;
359 AddBytesSent(consumed_data.bytes_consumed);
361 if (consumed_data.bytes_consumed == write_length) {
362 if (!fin_with_zero_data) {
363 MaybeSendBlocked();
365 if (fin && consumed_data.fin_consumed) {
366 fin_sent_ = true;
367 CloseWriteSide();
368 } else if (fin && !consumed_data.fin_consumed) {
369 session_->MarkWriteBlocked(id(), EffectivePriority());
371 } else {
372 session_->MarkWriteBlocked(id(), EffectivePriority());
374 return consumed_data;
377 void ReliableQuicStream::CloseReadSide() {
378 if (read_side_closed_) {
379 return;
381 DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
383 read_side_closed_ = true;
384 if (write_side_closed_) {
385 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
386 session_->CloseStream(id());
390 void ReliableQuicStream::CloseWriteSide() {
391 if (write_side_closed_) {
392 return;
394 DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
396 write_side_closed_ = true;
397 if (read_side_closed_) {
398 DVLOG(1) << ENDPOINT << "Closing stream: " << id();
399 session_->CloseStream(id());
403 bool ReliableQuicStream::HasBufferedData() const {
404 return !queued_data_.empty();
407 void ReliableQuicStream::OnClose() {
408 CloseReadSide();
409 CloseWriteSide();
411 if (!fin_sent_ && !rst_sent_) {
412 // For flow control accounting, we must tell the peer how many bytes we have
413 // written on this stream before termination. Done here if needed, using a
414 // RST frame.
415 DVLOG(1) << ENDPOINT << "Sending RST in OnClose: " << id();
416 session_->SendRstStream(id(), QUIC_RST_FLOW_CONTROL_ACCOUNTING,
417 stream_bytes_written_);
418 rst_sent_ = true;
422 void ReliableQuicStream::OnWindowUpdateFrame(
423 const QuicWindowUpdateFrame& frame) {
424 if (!flow_controller_.IsEnabled()) {
425 DLOG(DFATAL) << "Flow control not enabled! " << version();
426 return;
429 if (flow_controller_.UpdateSendWindowOffset(frame.byte_offset)) {
430 // We can write again!
431 // TODO(rjshade): This does not respect priorities (e.g. multiple
432 // outstanding POSTs are unblocked on arrival of
433 // SHLO with initial window).
434 // As long as the connection is not flow control blocked, we can write!
435 OnCanWrite();
439 void ReliableQuicStream::AddBytesBuffered(uint64 bytes) {
440 if (flow_controller_.IsEnabled()) {
441 flow_controller_.AddBytesBuffered(bytes);
442 connection_flow_controller_->AddBytesBuffered(bytes);
446 void ReliableQuicStream::RemoveBytesBuffered(uint64 bytes) {
447 if (flow_controller_.IsEnabled()) {
448 flow_controller_.RemoveBytesBuffered(bytes);
449 connection_flow_controller_->RemoveBytesBuffered(bytes);
453 void ReliableQuicStream::AddBytesSent(uint64 bytes) {
454 if (flow_controller_.IsEnabled()) {
455 flow_controller_.AddBytesSent(bytes);
456 connection_flow_controller_->AddBytesSent(bytes);
460 void ReliableQuicStream::AddBytesConsumed(uint64 bytes) {
461 if (flow_controller_.IsEnabled()) {
462 flow_controller_.AddBytesConsumed(bytes);
463 connection_flow_controller_->AddBytesConsumed(bytes);
467 bool ReliableQuicStream::IsFlowControlBlocked() {
468 return flow_controller_.IsBlocked() ||
469 connection_flow_controller_->IsBlocked();
472 } // namespace net