1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
13 using base::StringPiece
;
21 const size_t kMaxPrematurelyClosedStreamsTracked
= 20;
22 const size_t kMaxZombieStreams
= 20;
24 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
26 // We want to make sure we delete any closed streams in a safe manner.
27 // To avoid deleting a stream in mid-operation, we have a simple shim between
28 // us and the stream, so we can delete any streams when we return from
31 // We could just override the base methods, but this makes it easier to make
32 // sure we don't miss any.
33 class VisitorShim
: public QuicConnectionVisitorInterface
{
35 explicit VisitorShim(QuicSession
* session
) : session_(session
) {}
37 virtual bool OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) OVERRIDE
{
38 bool accepted
= session_
->OnStreamFrames(frames
);
39 session_
->PostProcessAfterData();
42 virtual void OnRstStream(const QuicRstStreamFrame
& frame
) OVERRIDE
{
43 session_
->OnRstStream(frame
);
44 session_
->PostProcessAfterData();
47 virtual void OnGoAway(const QuicGoAwayFrame
& frame
) OVERRIDE
{
48 session_
->OnGoAway(frame
);
49 session_
->PostProcessAfterData();
52 virtual bool OnCanWrite() OVERRIDE
{
53 bool rc
= session_
->OnCanWrite();
54 session_
->PostProcessAfterData();
58 virtual void OnSuccessfulVersionNegotiation(
59 const QuicVersion
& version
) OVERRIDE
{
60 session_
->OnSuccessfulVersionNegotiation(version
);
63 virtual void OnConfigNegotiated() OVERRIDE
{
64 session_
->OnConfigNegotiated();
67 virtual void OnConnectionClosed(
68 QuicErrorCode error
, bool from_peer
) OVERRIDE
{
69 session_
->OnConnectionClosed(error
, from_peer
);
70 // The session will go away, so don't bother with cleanup.
73 virtual void OnWriteBlocked() OVERRIDE
{
74 session_
->OnWriteBlocked();
77 virtual bool HasPendingHandshake() const OVERRIDE
{
78 return session_
->HasPendingHandshake();
82 QuicSession
* session_
;
85 QuicSession::QuicSession(QuicConnection
* connection
,
86 const QuicConfig
& config
)
87 : connection_(connection
),
88 visitor_shim_(new VisitorShim(this)),
90 max_open_streams_(config_
.max_streams_per_connection()),
91 next_stream_id_(is_server() ? 2 : 3),
92 largest_peer_created_stream_id_(0),
93 error_(QUIC_NO_ERROR
),
94 goaway_received_(false),
96 has_pending_handshake_(false) {
98 connection_
->set_visitor(visitor_shim_
.get());
99 connection_
->SetFromConfig(config_
);
100 if (connection_
->connected()) {
101 connection_
->SetOverallConnectionTimeout(
102 config_
.max_time_before_crypto_handshake());
104 if (connection_
->version() > QUIC_VERSION_12
) {
105 headers_stream_
.reset(new QuicHeadersStream(this));
107 // For version above QUIC v12, the headers stream is stream 3, so the
108 // next available local stream ID should be 5.
109 DCHECK_EQ(kHeadersStreamId
, next_stream_id_
);
110 next_stream_id_
+= 2;
115 QuicSession::~QuicSession() {
116 STLDeleteElements(&closed_streams_
);
117 STLDeleteValues(&stream_map_
);
120 bool QuicSession::OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) {
121 for (size_t i
= 0; i
< frames
.size(); ++i
) {
122 // TODO(rch) deal with the error case of stream id 0
123 if (IsClosedStream(frames
[i
].stream_id
)) {
124 // If we get additional frames for a stream where we didn't process
125 // headers, it's highly likely our compression context will end up
126 // permanently out of sync with the peer's, so we give up and close the
128 if (ContainsKey(prematurely_closed_streams_
, frames
[i
].stream_id
)) {
129 connection()->SendConnectionClose(
130 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED
);
136 ReliableQuicStream
* stream
= GetStream(frames
[i
].stream_id
);
137 if (stream
== NULL
) return false;
138 if (!stream
->WillAcceptStreamFrame(frames
[i
])) return false;
140 // TODO(alyssar) check against existing connection address: if changed, make
141 // sure we update the connection.
144 for (size_t i
= 0; i
< frames
.size(); ++i
) {
145 QuicStreamId stream_id
= frames
[i
].stream_id
;
146 ReliableQuicStream
* stream
= GetStream(stream_id
);
150 stream
->OnStreamFrame(frames
[i
]);
152 // If the stream is a data stream had been prematurely closed, and the
153 // headers are now decompressed, then we are finally finished
155 if (ContainsKey(zombie_streams_
, stream_id
) &&
156 static_cast<QuicDataStream
*>(stream
)->headers_decompressed()) {
157 CloseZombieStream(stream_id
);
161 while (!decompression_blocked_streams_
.empty()) {
162 QuicHeaderId header_id
= decompression_blocked_streams_
.begin()->first
;
163 if (header_id
!= decompressor_
.current_header_id()) {
166 QuicStreamId stream_id
= decompression_blocked_streams_
.begin()->second
;
167 decompression_blocked_streams_
.erase(header_id
);
168 QuicDataStream
* stream
= GetDataStream(stream_id
);
170 connection()->SendConnectionClose(
171 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED
);
174 stream
->OnDecompressorAvailable();
179 void QuicSession::OnStreamHeaders(QuicStreamId stream_id
,
180 StringPiece headers_data
) {
181 QuicDataStream
* stream
= GetDataStream(stream_id
);
183 // It's quite possible to receive headers after a stream has been reset.
186 stream
->OnStreamHeaders(headers_data
);
189 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id
,
190 QuicPriority priority
) {
191 QuicDataStream
* stream
= GetDataStream(stream_id
);
193 // It's quite possible to receive headers after a stream has been reset.
196 stream
->OnStreamHeadersPriority(priority
);
199 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id
,
202 QuicDataStream
* stream
= GetDataStream(stream_id
);
204 // It's quite possible to receive headers after a stream has been reset.
207 stream
->OnStreamHeadersComplete(fin
, frame_len
);
210 void QuicSession::OnRstStream(const QuicRstStreamFrame
& frame
) {
211 if (frame
.stream_id
== kCryptoStreamId
) {
212 connection()->SendConnectionCloseWithDetails(
213 QUIC_INVALID_STREAM_ID
,
214 "Attempt to reset the crypto stream");
217 if (frame
.stream_id
== kHeadersStreamId
&&
218 connection()->version() > QUIC_VERSION_12
) {
219 connection()->SendConnectionCloseWithDetails(
220 QUIC_INVALID_STREAM_ID
,
221 "Attempt to reset the headers stream");
224 QuicDataStream
* stream
= GetDataStream(frame
.stream_id
);
226 return; // Errors are handled by GetStream.
228 if (ContainsKey(zombie_streams_
, stream
->id())) {
229 // If this was a zombie stream then we close it out now.
230 CloseZombieStream(stream
->id());
231 // However, since the headers still have not been decompressed, we want to
232 // mark it a prematurely closed so that if we ever receive frames
233 // for this stream we can close the connection.
234 DCHECK(!stream
->headers_decompressed());
235 AddPrematurelyClosedStream(frame
.stream_id
);
238 if (connection()->version() <= QUIC_VERSION_12
) {
239 if (stream
->stream_bytes_read() > 0 && !stream
->headers_decompressed()) {
240 connection()->SendConnectionClose(
241 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED
);
244 stream
->OnStreamReset(frame
.error_code
);
247 void QuicSession::OnGoAway(const QuicGoAwayFrame
& frame
) {
248 DCHECK(frame
.last_good_stream_id
< next_stream_id_
);
249 goaway_received_
= true;
252 void QuicSession::OnConnectionClosed(QuicErrorCode error
, bool from_peer
) {
253 DCHECK(!connection_
->connected());
254 if (error_
== QUIC_NO_ERROR
) {
258 while (!stream_map_
.empty()) {
259 DataStreamMap::iterator it
= stream_map_
.begin();
260 QuicStreamId id
= it
->first
;
261 it
->second
->OnConnectionClosed(error
, from_peer
);
262 // The stream should call CloseStream as part of OnConnectionClosed.
263 if (stream_map_
.find(id
) != stream_map_
.end()) {
264 LOG(DFATAL
) << ENDPOINT
265 << "Stream failed to close under OnConnectionClosed";
271 bool QuicSession::OnCanWrite() {
272 // We latch this here rather than doing a traditional loop, because streams
273 // may be modifying the list as we loop.
274 int remaining_writes
= write_blocked_streams_
.NumBlockedStreams();
276 while (remaining_writes
> 0 && connection_
->CanWriteStreamData()) {
277 DCHECK(write_blocked_streams_
.HasWriteBlockedStreams());
278 if (!write_blocked_streams_
.HasWriteBlockedStreams()) {
279 LOG(DFATAL
) << "WriteBlockedStream is missing";
280 connection_
->CloseConnection(QUIC_INTERNAL_ERROR
, false);
281 return true; // We have no write blocked streams.
283 QuicStreamId stream_id
= write_blocked_streams_
.PopFront();
284 if (stream_id
== kCryptoStreamId
) {
285 has_pending_handshake_
= false; // We just popped it.
287 ReliableQuicStream
* stream
= GetStream(stream_id
);
288 if (stream
!= NULL
) {
289 // If the stream can't write all bytes, it'll re-add itself to the blocked
291 stream
->OnCanWrite();
296 return !write_blocked_streams_
.HasWriteBlockedStreams();
299 bool QuicSession::HasPendingHandshake() const {
300 return has_pending_handshake_
;
303 QuicConsumedData
QuicSession::WritevData(
305 const struct iovec
* iov
,
307 QuicStreamOffset offset
,
309 QuicAckNotifier::DelegateInterface
* ack_notifier_delegate
) {
311 data
.AppendIovec(iov
, iov_count
);
312 return connection_
->SendStreamData(id
, data
, offset
, fin
,
313 ack_notifier_delegate
);
316 size_t QuicSession::WriteHeaders(QuicStreamId id
,
317 const SpdyHeaderBlock
& headers
,
319 DCHECK_LT(QUIC_VERSION_12
, connection()->version());
320 if (connection()->version() <= QUIC_VERSION_12
) {
323 return headers_stream_
->WriteHeaders(id
, headers
, fin
);
326 void QuicSession::SendRstStream(QuicStreamId id
,
327 QuicRstStreamErrorCode error
) {
328 connection_
->SendRstStream(id
, error
);
329 CloseStreamInner(id
, true);
332 void QuicSession::SendGoAway(QuicErrorCode error_code
, const string
& reason
) {
334 connection_
->SendGoAway(error_code
, largest_peer_created_stream_id_
, reason
);
337 void QuicSession::CloseStream(QuicStreamId stream_id
) {
338 CloseStreamInner(stream_id
, false);
341 void QuicSession::CloseStreamInner(QuicStreamId stream_id
,
342 bool locally_reset
) {
343 DVLOG(1) << ENDPOINT
<< "Closing stream " << stream_id
;
345 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
346 if (it
== stream_map_
.end()) {
347 DVLOG(1) << ENDPOINT
<< "Stream is already closed: " << stream_id
;
350 QuicDataStream
* stream
= it
->second
;
351 if (connection_
->version() <= QUIC_VERSION_12
&&
352 connection_
->connected() && !stream
->headers_decompressed()) {
353 // If the stream is being closed locally (for example a client cancelling
354 // a request before receiving the response) then we need to make sure that
355 // we keep the stream alive long enough to process any response or
356 // RST_STREAM frames.
357 if (locally_reset
&& !is_server()) {
358 AddZombieStream(stream_id
);
362 // This stream has been closed before the headers were decompressed.
363 // This might cause problems with head of line blocking of headers.
364 // If the peer sent headers which were lost but we now close the stream
365 // we will never be able to decompress headers for other streams.
366 // To deal with this, we keep track of streams which have been closed
367 // prematurely. If we ever receive data frames for this steam, then we
368 // know there actually has been a problem and we close the connection.
369 AddPrematurelyClosedStream(stream
->id());
371 closed_streams_
.push_back(it
->second
);
372 if (ContainsKey(zombie_streams_
, stream
->id())) {
373 zombie_streams_
.erase(stream
->id());
375 stream_map_
.erase(it
);
379 void QuicSession::AddZombieStream(QuicStreamId stream_id
) {
380 if (zombie_streams_
.size() == kMaxZombieStreams
) {
381 QuicStreamId oldest_zombie_stream_id
= zombie_streams_
.begin()->first
;
382 CloseZombieStream(oldest_zombie_stream_id
);
383 // However, since the headers still have not been decompressed, we want to
384 // mark it a prematurely closed so that if we ever receive frames
385 // for this stream we can close the connection.
386 AddPrematurelyClosedStream(oldest_zombie_stream_id
);
388 zombie_streams_
.insert(make_pair(stream_id
, true));
391 void QuicSession::CloseZombieStream(QuicStreamId stream_id
) {
392 DCHECK(ContainsKey(zombie_streams_
, stream_id
));
393 zombie_streams_
.erase(stream_id
);
394 QuicDataStream
* stream
= GetDataStream(stream_id
);
398 stream_map_
.erase(stream_id
);
400 closed_streams_
.push_back(stream
);
403 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id
) {
404 if (connection()->version() > QUIC_VERSION_12
) {
407 if (prematurely_closed_streams_
.size() ==
408 kMaxPrematurelyClosedStreamsTracked
) {
409 prematurely_closed_streams_
.erase(prematurely_closed_streams_
.begin());
411 prematurely_closed_streams_
.insert(make_pair(stream_id
, true));
414 bool QuicSession::IsEncryptionEstablished() {
415 return GetCryptoStream()->encryption_established();
418 bool QuicSession::IsCryptoHandshakeConfirmed() {
419 return GetCryptoStream()->handshake_confirmed();
422 void QuicSession::OnConfigNegotiated() {
423 connection_
->SetFromConfig(config_
);
426 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event
) {
428 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
429 // to QuicSession since it is the glue.
430 case ENCRYPTION_FIRST_ESTABLISHED
:
433 case ENCRYPTION_REESTABLISHED
:
434 // Retransmit originally packets that were sent, since they can't be
435 // decrypted by the peer.
436 connection_
->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY
);
439 case HANDSHAKE_CONFIRMED
:
440 LOG_IF(DFATAL
, !config_
.negotiated()) << ENDPOINT
441 << "Handshake confirmed without parameter negotiation.";
442 connection_
->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
443 max_open_streams_
= config_
.max_streams_per_connection();
447 LOG(ERROR
) << ENDPOINT
<< "Got unknown handshake event: " << event
;
451 void QuicSession::OnCryptoHandshakeMessageSent(
452 const CryptoHandshakeMessage
& message
) {
455 void QuicSession::OnCryptoHandshakeMessageReceived(
456 const CryptoHandshakeMessage
& message
) {
459 QuicConfig
* QuicSession::config() {
463 void QuicSession::ActivateStream(QuicDataStream
* stream
) {
464 DVLOG(1) << ENDPOINT
<< "num_streams: " << stream_map_
.size()
465 << ". activating " << stream
->id();
466 DCHECK_EQ(stream_map_
.count(stream
->id()), 0u);
467 stream_map_
[stream
->id()] = stream
;
470 QuicStreamId
QuicSession::GetNextStreamId() {
471 QuicStreamId id
= next_stream_id_
;
472 next_stream_id_
+= 2;
476 ReliableQuicStream
* QuicSession::GetStream(const QuicStreamId stream_id
) {
477 if (stream_id
== kCryptoStreamId
) {
478 return GetCryptoStream();
480 if (stream_id
== kHeadersStreamId
&&
481 connection_
->version() > QUIC_VERSION_12
) {
482 return headers_stream_
.get();
484 return GetDataStream(stream_id
);
487 QuicDataStream
* QuicSession::GetDataStream(const QuicStreamId stream_id
) {
488 if (stream_id
== kCryptoStreamId
) {
489 DLOG(FATAL
) << "Attempt to call GetDataStream with the crypto stream id";
492 if (stream_id
== kHeadersStreamId
&&
493 connection_
->version() > QUIC_VERSION_12
) {
494 DLOG(FATAL
) << "Attempt to call GetDataStream with the headers stream id";
498 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
499 if (it
!= stream_map_
.end()) {
503 if (IsClosedStream(stream_id
)) {
507 if (stream_id
% 2 == next_stream_id_
% 2) {
508 // We've received a frame for a locally-created stream that is not
509 // currently active. This is an error.
510 if (connection()->connected()) {
511 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM
);
516 return GetIncomingDataStream(stream_id
);
519 QuicDataStream
* QuicSession::GetIncomingDataStream(QuicStreamId stream_id
) {
520 if (IsClosedStream(stream_id
)) {
525 // We've already sent a GoAway
526 SendRstStream(stream_id
, QUIC_STREAM_PEER_GOING_AWAY
);
530 implicitly_created_streams_
.erase(stream_id
);
531 if (stream_id
> largest_peer_created_stream_id_
) {
532 // TODO(rch) add unit test for this
533 if (stream_id
- largest_peer_created_stream_id_
> kMaxStreamIdDelta
) {
534 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID
);
537 if (largest_peer_created_stream_id_
== 0) {
538 if (is_server() && connection()->version() > QUIC_VERSION_12
) {
539 largest_peer_created_stream_id_
= 3;
541 largest_peer_created_stream_id_
= 1;
544 for (QuicStreamId id
= largest_peer_created_stream_id_
+ 2;
547 implicitly_created_streams_
.insert(id
);
549 largest_peer_created_stream_id_
= stream_id
;
551 QuicDataStream
* stream
= CreateIncomingDataStream(stream_id
);
552 if (stream
== NULL
) {
555 ActivateStream(stream
);
559 bool QuicSession::IsClosedStream(QuicStreamId id
) {
561 if (id
== kCryptoStreamId
) {
564 if (connection()->version() > QUIC_VERSION_12
) {
565 if (id
== kHeadersStreamId
) {
569 if (ContainsKey(zombie_streams_
, id
)) {
572 if (ContainsKey(stream_map_
, id
)) {
576 if (id
% 2 == next_stream_id_
% 2) {
577 // Locally created streams are strictly in-order. If the id is in the
578 // range of created streams and it's not active, it must have been closed.
579 return id
< next_stream_id_
;
581 // For peer created streams, we also need to consider implicitly created
583 return id
<= largest_peer_created_stream_id_
&&
584 implicitly_created_streams_
.count(id
) == 0;
587 size_t QuicSession::GetNumOpenStreams() const {
588 return stream_map_
.size() + implicitly_created_streams_
.size() -
589 zombie_streams_
.size();
592 void QuicSession::MarkWriteBlocked(QuicStreamId id
, QuicPriority priority
) {
594 ReliableQuicStream
* stream
= GetStream(id
);
595 if (stream
!= NULL
) {
596 LOG_IF(DFATAL
, priority
!= stream
->EffectivePriority())
597 << "Priorities do not match. Got: " << priority
598 << " Expected: " << stream
->EffectivePriority();
600 LOG(DFATAL
) << "Marking unknown stream " << id
<< " blocked.";
604 if (id
== kCryptoStreamId
) {
605 DCHECK(!has_pending_handshake_
);
606 has_pending_handshake_
= true;
607 // TODO(jar): Be sure to use the highest priority for the crypto stream,
608 // perhaps by adding a "special" priority for it that is higher than
610 priority
= kHighestPriority
;
612 write_blocked_streams_
.PushBack(id
, priority
, connection()->version());
615 bool QuicSession::HasDataToWrite() const {
616 return write_blocked_streams_
.HasWriteBlockedStreams() ||
617 connection_
->HasQueuedData();
620 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id
,
621 QuicStreamId stream_id
) {
622 DCHECK_GE(QUIC_VERSION_12
, connection()->version());
623 decompression_blocked_streams_
[header_id
] = stream_id
;
626 bool QuicSession::GetSSLInfo(SSLInfo
* ssl_info
) {
631 void QuicSession::PostProcessAfterData() {
632 STLDeleteElements(&closed_streams_
);
633 closed_streams_
.clear();