Add explicit |forceOnlineSignin| to user pod status
[chromium-blink-merge.git] / net / quic / quic_session.cc
blob9be3a3bdd478c7bd8775575a6f46a0dcd5343501
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
13 using base::StringPiece;
14 using base::hash_map;
15 using base::hash_set;
16 using std::make_pair;
17 using std::vector;
19 namespace net {
21 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
22 const size_t kMaxZombieStreams = 20;
24 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
26 // We want to make sure we delete any closed streams in a safe manner.
27 // To avoid deleting a stream in mid-operation, we have a simple shim between
28 // us and the stream, so we can delete any streams when we return from
29 // processing.
31 // We could just override the base methods, but this makes it easier to make
32 // sure we don't miss any.
33 class VisitorShim : public QuicConnectionVisitorInterface {
34 public:
35 explicit VisitorShim(QuicSession* session) : session_(session) {}
37 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
38 bool accepted = session_->OnStreamFrames(frames);
39 session_->PostProcessAfterData();
40 return accepted;
42 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
43 session_->OnRstStream(frame);
44 session_->PostProcessAfterData();
47 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
48 session_->OnGoAway(frame);
49 session_->PostProcessAfterData();
52 virtual bool OnCanWrite() OVERRIDE {
53 bool rc = session_->OnCanWrite();
54 session_->PostProcessAfterData();
55 return rc;
58 virtual void OnSuccessfulVersionNegotiation(
59 const QuicVersion& version) OVERRIDE {
60 session_->OnSuccessfulVersionNegotiation(version);
63 virtual void OnConfigNegotiated() OVERRIDE {
64 session_->OnConfigNegotiated();
67 virtual void OnConnectionClosed(
68 QuicErrorCode error, bool from_peer) OVERRIDE {
69 session_->OnConnectionClosed(error, from_peer);
70 // The session will go away, so don't bother with cleanup.
73 virtual void OnWriteBlocked() OVERRIDE {
74 session_->OnWriteBlocked();
77 virtual bool HasPendingHandshake() const OVERRIDE {
78 return session_->HasPendingHandshake();
81 private:
82 QuicSession* session_;
85 QuicSession::QuicSession(QuicConnection* connection,
86 const QuicConfig& config)
87 : connection_(connection),
88 visitor_shim_(new VisitorShim(this)),
89 config_(config),
90 max_open_streams_(config_.max_streams_per_connection()),
91 next_stream_id_(is_server() ? 2 : 3),
92 largest_peer_created_stream_id_(0),
93 error_(QUIC_NO_ERROR),
94 goaway_received_(false),
95 goaway_sent_(false),
96 has_pending_handshake_(false) {
98 connection_->set_visitor(visitor_shim_.get());
99 connection_->SetFromConfig(config_);
100 if (connection_->connected()) {
101 connection_->SetOverallConnectionTimeout(
102 config_.max_time_before_crypto_handshake());
104 if (connection_->version() > QUIC_VERSION_12) {
105 headers_stream_.reset(new QuicHeadersStream(this));
106 if (!is_server()) {
107 // For version above QUIC v12, the headers stream is stream 3, so the
108 // next available local stream ID should be 5.
109 DCHECK_EQ(kHeadersStreamId, next_stream_id_);
110 next_stream_id_ += 2;
115 QuicSession::~QuicSession() {
116 STLDeleteElements(&closed_streams_);
117 STLDeleteValues(&stream_map_);
120 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
121 for (size_t i = 0; i < frames.size(); ++i) {
122 // TODO(rch) deal with the error case of stream id 0
123 if (IsClosedStream(frames[i].stream_id)) {
124 // If we get additional frames for a stream where we didn't process
125 // headers, it's highly likely our compression context will end up
126 // permanently out of sync with the peer's, so we give up and close the
127 // connection.
128 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
129 connection()->SendConnectionClose(
130 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
131 return false;
133 continue;
136 ReliableQuicStream* stream = GetStream(frames[i].stream_id);
137 if (stream == NULL) return false;
138 if (!stream->WillAcceptStreamFrame(frames[i])) return false;
140 // TODO(alyssar) check against existing connection address: if changed, make
141 // sure we update the connection.
144 for (size_t i = 0; i < frames.size(); ++i) {
145 QuicStreamId stream_id = frames[i].stream_id;
146 ReliableQuicStream* stream = GetStream(stream_id);
147 if (!stream) {
148 continue;
150 stream->OnStreamFrame(frames[i]);
152 // If the stream is a data stream had been prematurely closed, and the
153 // headers are now decompressed, then we are finally finished
154 // with this stream.
155 if (ContainsKey(zombie_streams_, stream_id) &&
156 static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
157 CloseZombieStream(stream_id);
161 while (!decompression_blocked_streams_.empty()) {
162 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
163 if (header_id != decompressor_.current_header_id()) {
164 break;
166 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
167 decompression_blocked_streams_.erase(header_id);
168 QuicDataStream* stream = GetDataStream(stream_id);
169 if (!stream) {
170 connection()->SendConnectionClose(
171 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
172 return false;
174 stream->OnDecompressorAvailable();
176 return true;
179 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
180 StringPiece headers_data) {
181 QuicDataStream* stream = GetDataStream(stream_id);
182 if (!stream) {
183 // It's quite possible to receive headers after a stream has been reset.
184 return;
186 stream->OnStreamHeaders(headers_data);
189 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
190 QuicPriority priority) {
191 QuicDataStream* stream = GetDataStream(stream_id);
192 if (!stream) {
193 // It's quite possible to receive headers after a stream has been reset.
194 return;
196 stream->OnStreamHeadersPriority(priority);
199 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
200 bool fin,
201 size_t frame_len) {
202 QuicDataStream* stream = GetDataStream(stream_id);
203 if (!stream) {
204 // It's quite possible to receive headers after a stream has been reset.
205 return;
207 stream->OnStreamHeadersComplete(fin, frame_len);
210 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
211 if (frame.stream_id == kCryptoStreamId) {
212 connection()->SendConnectionCloseWithDetails(
213 QUIC_INVALID_STREAM_ID,
214 "Attempt to reset the crypto stream");
215 return;
217 if (frame.stream_id == kHeadersStreamId &&
218 connection()->version() > QUIC_VERSION_12) {
219 connection()->SendConnectionCloseWithDetails(
220 QUIC_INVALID_STREAM_ID,
221 "Attempt to reset the headers stream");
222 return;
224 QuicDataStream* stream = GetDataStream(frame.stream_id);
225 if (!stream) {
226 return; // Errors are handled by GetStream.
228 if (ContainsKey(zombie_streams_, stream->id())) {
229 // If this was a zombie stream then we close it out now.
230 CloseZombieStream(stream->id());
231 // However, since the headers still have not been decompressed, we want to
232 // mark it a prematurely closed so that if we ever receive frames
233 // for this stream we can close the connection.
234 DCHECK(!stream->headers_decompressed());
235 AddPrematurelyClosedStream(frame.stream_id);
236 return;
238 if (connection()->version() <= QUIC_VERSION_12) {
239 if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
240 connection()->SendConnectionClose(
241 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
244 stream->OnStreamReset(frame.error_code);
247 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
248 DCHECK(frame.last_good_stream_id < next_stream_id_);
249 goaway_received_ = true;
252 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
253 DCHECK(!connection_->connected());
254 if (error_ == QUIC_NO_ERROR) {
255 error_ = error;
258 while (!stream_map_.empty()) {
259 DataStreamMap::iterator it = stream_map_.begin();
260 QuicStreamId id = it->first;
261 it->second->OnConnectionClosed(error, from_peer);
262 // The stream should call CloseStream as part of OnConnectionClosed.
263 if (stream_map_.find(id) != stream_map_.end()) {
264 LOG(DFATAL) << ENDPOINT
265 << "Stream failed to close under OnConnectionClosed";
266 CloseStream(id);
271 bool QuicSession::OnCanWrite() {
272 // We latch this here rather than doing a traditional loop, because streams
273 // may be modifying the list as we loop.
274 int remaining_writes = write_blocked_streams_.NumBlockedStreams();
276 while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
277 DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
278 if (!write_blocked_streams_.HasWriteBlockedStreams()) {
279 LOG(DFATAL) << "WriteBlockedStream is missing";
280 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
281 return true; // We have no write blocked streams.
283 QuicStreamId stream_id = write_blocked_streams_.PopFront();
284 if (stream_id == kCryptoStreamId) {
285 has_pending_handshake_ = false; // We just popped it.
287 ReliableQuicStream* stream = GetStream(stream_id);
288 if (stream != NULL) {
289 // If the stream can't write all bytes, it'll re-add itself to the blocked
290 // list.
291 stream->OnCanWrite();
293 --remaining_writes;
296 return !write_blocked_streams_.HasWriteBlockedStreams();
299 bool QuicSession::HasPendingHandshake() const {
300 return has_pending_handshake_;
303 QuicConsumedData QuicSession::WritevData(
304 QuicStreamId id,
305 const struct iovec* iov,
306 int iov_count,
307 QuicStreamOffset offset,
308 bool fin,
309 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
310 IOVector data;
311 data.AppendIovec(iov, iov_count);
312 return connection_->SendStreamData(id, data, offset, fin,
313 ack_notifier_delegate);
316 size_t QuicSession::WriteHeaders(QuicStreamId id,
317 const SpdyHeaderBlock& headers,
318 bool fin) {
319 DCHECK_LT(QUIC_VERSION_12, connection()->version());
320 if (connection()->version() <= QUIC_VERSION_12) {
321 return 0;
323 return headers_stream_->WriteHeaders(id, headers, fin);
326 void QuicSession::SendRstStream(QuicStreamId id,
327 QuicRstStreamErrorCode error) {
328 connection_->SendRstStream(id, error);
329 CloseStreamInner(id, true);
332 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
333 goaway_sent_ = true;
334 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
337 void QuicSession::CloseStream(QuicStreamId stream_id) {
338 CloseStreamInner(stream_id, false);
341 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
342 bool locally_reset) {
343 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
345 DataStreamMap::iterator it = stream_map_.find(stream_id);
346 if (it == stream_map_.end()) {
347 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
348 return;
350 QuicDataStream* stream = it->second;
351 if (connection_->version() <= QUIC_VERSION_12 &&
352 connection_->connected() && !stream->headers_decompressed()) {
353 // If the stream is being closed locally (for example a client cancelling
354 // a request before receiving the response) then we need to make sure that
355 // we keep the stream alive long enough to process any response or
356 // RST_STREAM frames.
357 if (locally_reset && !is_server()) {
358 AddZombieStream(stream_id);
359 return;
362 // This stream has been closed before the headers were decompressed.
363 // This might cause problems with head of line blocking of headers.
364 // If the peer sent headers which were lost but we now close the stream
365 // we will never be able to decompress headers for other streams.
366 // To deal with this, we keep track of streams which have been closed
367 // prematurely. If we ever receive data frames for this steam, then we
368 // know there actually has been a problem and we close the connection.
369 AddPrematurelyClosedStream(stream->id());
371 closed_streams_.push_back(it->second);
372 if (ContainsKey(zombie_streams_, stream->id())) {
373 zombie_streams_.erase(stream->id());
375 stream_map_.erase(it);
376 stream->OnClose();
379 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
380 if (zombie_streams_.size() == kMaxZombieStreams) {
381 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
382 CloseZombieStream(oldest_zombie_stream_id);
383 // However, since the headers still have not been decompressed, we want to
384 // mark it a prematurely closed so that if we ever receive frames
385 // for this stream we can close the connection.
386 AddPrematurelyClosedStream(oldest_zombie_stream_id);
388 zombie_streams_.insert(make_pair(stream_id, true));
391 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
392 DCHECK(ContainsKey(zombie_streams_, stream_id));
393 zombie_streams_.erase(stream_id);
394 QuicDataStream* stream = GetDataStream(stream_id);
395 if (!stream) {
396 return;
398 stream_map_.erase(stream_id);
399 stream->OnClose();
400 closed_streams_.push_back(stream);
403 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
404 if (connection()->version() > QUIC_VERSION_12) {
405 return;
407 if (prematurely_closed_streams_.size() ==
408 kMaxPrematurelyClosedStreamsTracked) {
409 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
411 prematurely_closed_streams_.insert(make_pair(stream_id, true));
414 bool QuicSession::IsEncryptionEstablished() {
415 return GetCryptoStream()->encryption_established();
418 bool QuicSession::IsCryptoHandshakeConfirmed() {
419 return GetCryptoStream()->handshake_confirmed();
422 void QuicSession::OnConfigNegotiated() {
423 connection_->SetFromConfig(config_);
426 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
427 switch (event) {
428 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
429 // to QuicSession since it is the glue.
430 case ENCRYPTION_FIRST_ESTABLISHED:
431 break;
433 case ENCRYPTION_REESTABLISHED:
434 // Retransmit originally packets that were sent, since they can't be
435 // decrypted by the peer.
436 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
437 break;
439 case HANDSHAKE_CONFIRMED:
440 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
441 << "Handshake confirmed without parameter negotiation.";
442 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
443 max_open_streams_ = config_.max_streams_per_connection();
444 break;
446 default:
447 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
451 void QuicSession::OnCryptoHandshakeMessageSent(
452 const CryptoHandshakeMessage& message) {
455 void QuicSession::OnCryptoHandshakeMessageReceived(
456 const CryptoHandshakeMessage& message) {
459 QuicConfig* QuicSession::config() {
460 return &config_;
463 void QuicSession::ActivateStream(QuicDataStream* stream) {
464 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
465 << ". activating " << stream->id();
466 DCHECK_EQ(stream_map_.count(stream->id()), 0u);
467 stream_map_[stream->id()] = stream;
470 QuicStreamId QuicSession::GetNextStreamId() {
471 QuicStreamId id = next_stream_id_;
472 next_stream_id_ += 2;
473 return id;
476 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
477 if (stream_id == kCryptoStreamId) {
478 return GetCryptoStream();
480 if (stream_id == kHeadersStreamId &&
481 connection_->version() > QUIC_VERSION_12) {
482 return headers_stream_.get();
484 return GetDataStream(stream_id);
487 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
488 if (stream_id == kCryptoStreamId) {
489 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
490 return NULL;
492 if (stream_id == kHeadersStreamId &&
493 connection_->version() > QUIC_VERSION_12) {
494 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
495 return NULL;
498 DataStreamMap::iterator it = stream_map_.find(stream_id);
499 if (it != stream_map_.end()) {
500 return it->second;
503 if (IsClosedStream(stream_id)) {
504 return NULL;
507 if (stream_id % 2 == next_stream_id_ % 2) {
508 // We've received a frame for a locally-created stream that is not
509 // currently active. This is an error.
510 if (connection()->connected()) {
511 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
513 return NULL;
516 return GetIncomingDataStream(stream_id);
519 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
520 if (IsClosedStream(stream_id)) {
521 return NULL;
524 if (goaway_sent_) {
525 // We've already sent a GoAway
526 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
527 return NULL;
530 implicitly_created_streams_.erase(stream_id);
531 if (stream_id > largest_peer_created_stream_id_) {
532 // TODO(rch) add unit test for this
533 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
534 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
535 return NULL;
537 if (largest_peer_created_stream_id_ == 0) {
538 if (is_server() && connection()->version() > QUIC_VERSION_12) {
539 largest_peer_created_stream_id_= 3;
540 } else {
541 largest_peer_created_stream_id_= 1;
544 for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
545 id < stream_id;
546 id += 2) {
547 implicitly_created_streams_.insert(id);
549 largest_peer_created_stream_id_ = stream_id;
551 QuicDataStream* stream = CreateIncomingDataStream(stream_id);
552 if (stream == NULL) {
553 return NULL;
555 ActivateStream(stream);
556 return stream;
559 bool QuicSession::IsClosedStream(QuicStreamId id) {
560 DCHECK_NE(0u, id);
561 if (id == kCryptoStreamId) {
562 return false;
564 if (connection()->version() > QUIC_VERSION_12) {
565 if (id == kHeadersStreamId) {
566 return false;
569 if (ContainsKey(zombie_streams_, id)) {
570 return true;
572 if (ContainsKey(stream_map_, id)) {
573 // Stream is active
574 return false;
576 if (id % 2 == next_stream_id_ % 2) {
577 // Locally created streams are strictly in-order. If the id is in the
578 // range of created streams and it's not active, it must have been closed.
579 return id < next_stream_id_;
581 // For peer created streams, we also need to consider implicitly created
582 // streams.
583 return id <= largest_peer_created_stream_id_ &&
584 implicitly_created_streams_.count(id) == 0;
587 size_t QuicSession::GetNumOpenStreams() const {
588 return stream_map_.size() + implicitly_created_streams_.size() -
589 zombie_streams_.size();
592 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
593 #ifndef NDEBUG
594 ReliableQuicStream* stream = GetStream(id);
595 if (stream != NULL) {
596 LOG_IF(DFATAL, priority != stream->EffectivePriority())
597 << "Priorities do not match. Got: " << priority
598 << " Expected: " << stream->EffectivePriority();
599 } else {
600 LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
602 #endif
604 if (id == kCryptoStreamId) {
605 DCHECK(!has_pending_handshake_);
606 has_pending_handshake_ = true;
607 // TODO(jar): Be sure to use the highest priority for the crypto stream,
608 // perhaps by adding a "special" priority for it that is higher than
609 // kHighestPriority.
610 priority = kHighestPriority;
612 write_blocked_streams_.PushBack(id, priority, connection()->version());
615 bool QuicSession::HasDataToWrite() const {
616 return write_blocked_streams_.HasWriteBlockedStreams() ||
617 connection_->HasQueuedData();
620 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
621 QuicStreamId stream_id) {
622 DCHECK_GE(QUIC_VERSION_12, connection()->version());
623 decompression_blocked_streams_[header_id] = stream_id;
626 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
627 NOTIMPLEMENTED();
628 return false;
631 void QuicSession::PostProcessAfterData() {
632 STLDeleteElements(&closed_streams_);
633 closed_streams_.clear();
636 } // namespace net