1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_flags.h"
11 #include "net/quic/quic_headers_stream.h"
12 #include "net/ssl/ssl_info.h"
14 using base::StringPiece
;
22 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
24 // We want to make sure we delete any closed streams in a safe manner.
25 // To avoid deleting a stream in mid-operation, we have a simple shim between
26 // us and the stream, so we can delete any streams when we return from
29 // We could just override the base methods, but this makes it easier to make
30 // sure we don't miss any.
31 class VisitorShim
: public QuicConnectionVisitorInterface
{
33 explicit VisitorShim(QuicSession
* session
) : session_(session
) {}
35 virtual void OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) OVERRIDE
{
36 session_
->OnStreamFrames(frames
);
37 session_
->PostProcessAfterData();
39 virtual void OnRstStream(const QuicRstStreamFrame
& frame
) OVERRIDE
{
40 session_
->OnRstStream(frame
);
41 session_
->PostProcessAfterData();
44 virtual void OnGoAway(const QuicGoAwayFrame
& frame
) OVERRIDE
{
45 session_
->OnGoAway(frame
);
46 session_
->PostProcessAfterData();
49 virtual void OnWindowUpdateFrames(const vector
<QuicWindowUpdateFrame
>& frames
)
51 session_
->OnWindowUpdateFrames(frames
);
52 session_
->PostProcessAfterData();
55 virtual void OnBlockedFrames(const vector
<QuicBlockedFrame
>& frames
)
57 session_
->OnBlockedFrames(frames
);
58 session_
->PostProcessAfterData();
61 virtual void OnCanWrite() OVERRIDE
{
62 session_
->OnCanWrite();
63 session_
->PostProcessAfterData();
66 virtual void OnSuccessfulVersionNegotiation(
67 const QuicVersion
& version
) OVERRIDE
{
68 session_
->OnSuccessfulVersionNegotiation(version
);
71 virtual void OnConnectionClosed(
72 QuicErrorCode error
, bool from_peer
) OVERRIDE
{
73 session_
->OnConnectionClosed(error
, from_peer
);
74 // The session will go away, so don't bother with cleanup.
77 virtual void OnWriteBlocked() OVERRIDE
{
78 session_
->OnWriteBlocked();
81 virtual bool HasPendingWrites() const OVERRIDE
{
82 return session_
->HasPendingWrites();
85 virtual bool HasPendingHandshake() const OVERRIDE
{
86 return session_
->HasPendingHandshake();
89 virtual bool HasOpenDataStreams() const OVERRIDE
{
90 return session_
->HasOpenDataStreams();
94 QuicSession
* session_
;
97 QuicSession::QuicSession(QuicConnection
* connection
,
98 const QuicConfig
& config
)
99 : connection_(connection
),
100 visitor_shim_(new VisitorShim(this)),
102 max_open_streams_(config_
.max_streams_per_connection()),
103 next_stream_id_(is_server() ? 2 : 3),
104 largest_peer_created_stream_id_(0),
105 error_(QUIC_NO_ERROR
),
106 goaway_received_(false),
108 has_pending_handshake_(false) {
110 connection_
->set_visitor(visitor_shim_
.get());
111 connection_
->SetFromConfig(config_
);
112 if (connection_
->connected()) {
113 connection_
->SetOverallConnectionTimeout(
114 config_
.max_time_before_crypto_handshake());
116 headers_stream_
.reset(new QuicHeadersStream(this));
118 // For version above QUIC v12, the headers stream is stream 3, so the
119 // next available local stream ID should be 5.
120 DCHECK_EQ(kHeadersStreamId
, next_stream_id_
);
121 next_stream_id_
+= 2;
125 QuicSession::~QuicSession() {
126 STLDeleteElements(&closed_streams_
);
127 STLDeleteValues(&stream_map_
);
130 void QuicSession::OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) {
131 for (size_t i
= 0; i
< frames
.size(); ++i
) {
132 // TODO(rch) deal with the error case of stream id 0.
133 QuicStreamId stream_id
= frames
[i
].stream_id
;
134 ReliableQuicStream
* stream
= GetStream(stream_id
);
138 stream
->OnStreamFrame(frames
[i
]);
142 void QuicSession::OnStreamHeaders(QuicStreamId stream_id
,
143 StringPiece headers_data
) {
144 QuicDataStream
* stream
= GetDataStream(stream_id
);
146 // It's quite possible to receive headers after a stream has been reset.
149 stream
->OnStreamHeaders(headers_data
);
152 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id
,
153 QuicPriority priority
) {
154 QuicDataStream
* stream
= GetDataStream(stream_id
);
156 // It's quite possible to receive headers after a stream has been reset.
159 stream
->OnStreamHeadersPriority(priority
);
162 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id
,
165 QuicDataStream
* stream
= GetDataStream(stream_id
);
167 // It's quite possible to receive headers after a stream has been reset.
170 stream
->OnStreamHeadersComplete(fin
, frame_len
);
173 void QuicSession::OnRstStream(const QuicRstStreamFrame
& frame
) {
174 if (frame
.stream_id
== kCryptoStreamId
) {
175 connection()->SendConnectionCloseWithDetails(
176 QUIC_INVALID_STREAM_ID
,
177 "Attempt to reset the crypto stream");
180 if (frame
.stream_id
== kHeadersStreamId
) {
181 connection()->SendConnectionCloseWithDetails(
182 QUIC_INVALID_STREAM_ID
,
183 "Attempt to reset the headers stream");
186 QuicDataStream
* stream
= GetDataStream(frame
.stream_id
);
188 return; // Errors are handled by GetStream.
191 stream
->OnStreamReset(frame
);
194 void QuicSession::OnGoAway(const QuicGoAwayFrame
& frame
) {
195 DCHECK(frame
.last_good_stream_id
< next_stream_id_
);
196 goaway_received_
= true;
199 void QuicSession::OnConnectionClosed(QuicErrorCode error
, bool from_peer
) {
200 DCHECK(!connection_
->connected());
201 if (error_
== QUIC_NO_ERROR
) {
205 while (!stream_map_
.empty()) {
206 DataStreamMap::iterator it
= stream_map_
.begin();
207 QuicStreamId id
= it
->first
;
208 it
->second
->OnConnectionClosed(error
, from_peer
);
209 // The stream should call CloseStream as part of OnConnectionClosed.
210 if (stream_map_
.find(id
) != stream_map_
.end()) {
211 LOG(DFATAL
) << ENDPOINT
212 << "Stream failed to close under OnConnectionClosed";
218 void QuicSession::OnWindowUpdateFrames(
219 const vector
<QuicWindowUpdateFrame
>& frames
) {
220 bool connection_window_updated
= false;
221 for (size_t i
= 0; i
< frames
.size(); ++i
) {
222 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
223 // assume that it still exists.
224 QuicStreamId stream_id
= frames
[i
].stream_id
;
225 if (stream_id
== 0) {
226 // This is a window update that applies to the connection, rather than an
227 // individual stream.
229 << "Received connection level flow control window update with "
230 "byte offset: " << frames
[i
].byte_offset
;
231 if (FLAGS_enable_quic_connection_flow_control
&&
232 connection()->flow_controller()->UpdateSendWindowOffset(
233 frames
[i
].byte_offset
)) {
234 connection_window_updated
= true;
239 QuicDataStream
* stream
= GetDataStream(stream_id
);
241 stream
->OnWindowUpdateFrame(frames
[i
]);
245 // Connection level flow control window has increased, so blocked streams can
247 if (connection_window_updated
) {
252 void QuicSession::OnBlockedFrames(const vector
<QuicBlockedFrame
>& frames
) {
253 for (size_t i
= 0; i
< frames
.size(); ++i
) {
254 // TODO(rjshade): Compare our flow control receive windows for specified
255 // streams: if we have a large window then maybe something
256 // had gone wrong with the flow control accounting.
257 DVLOG(1) << ENDPOINT
<< "Received BLOCKED frame with stream id: "
258 << frames
[i
].stream_id
;
262 void QuicSession::OnCanWrite() {
263 // We limit the number of writes to the number of pending streams. If more
264 // streams become pending, HasPendingWrites will be true, which will cause
265 // the connection to request resumption before yielding to other connections.
266 size_t num_writes
= write_blocked_streams_
.NumBlockedStreams();
267 if (num_writes
== 0) {
271 QuicConnection::ScopedPacketBundler
ack_bundler(
272 connection_
.get(), QuicConnection::NO_ACK
);
273 for (size_t i
= 0; i
< num_writes
; ++i
) {
274 if (!write_blocked_streams_
.HasWriteBlockedStreams()) {
275 // Writing one stream removed another!? Something's broken.
276 LOG(DFATAL
) << "WriteBlockedStream is missing";
277 connection_
->CloseConnection(QUIC_INTERNAL_ERROR
, false);
280 if (!connection_
->CanWriteStreamData()) {
283 QuicStreamId stream_id
= write_blocked_streams_
.PopFront();
284 if (stream_id
== kCryptoStreamId
) {
285 has_pending_handshake_
= false; // We just popped it.
287 ReliableQuicStream
* stream
= GetStream(stream_id
);
288 if (stream
!= NULL
&& !stream
->flow_controller()->IsBlocked()) {
289 // If the stream can't write all bytes, it'll re-add itself to the blocked
291 stream
->OnCanWrite();
296 bool QuicSession::HasPendingWrites() const {
297 return write_blocked_streams_
.HasWriteBlockedStreams();
300 bool QuicSession::HasPendingHandshake() const {
301 return has_pending_handshake_
;
304 bool QuicSession::HasOpenDataStreams() const {
305 return GetNumOpenStreams() > 0;
308 QuicConsumedData
QuicSession::WritevData(
310 const IOVector
& data
,
311 QuicStreamOffset offset
,
313 QuicAckNotifier::DelegateInterface
* ack_notifier_delegate
) {
314 return connection_
->SendStreamData(id
, data
, offset
, fin
,
315 ack_notifier_delegate
);
318 size_t QuicSession::WriteHeaders(
320 const SpdyHeaderBlock
& headers
,
322 QuicAckNotifier::DelegateInterface
* ack_notifier_delegate
) {
323 return headers_stream_
->WriteHeaders(id
, headers
, fin
, ack_notifier_delegate
);
326 void QuicSession::SendRstStream(QuicStreamId id
,
327 QuicRstStreamErrorCode error
,
328 QuicStreamOffset bytes_written
) {
329 if (connection()->connected()) {
330 // Only send a RST_STREAM frame if still connected.
331 connection_
->SendRstStream(id
, error
, bytes_written
);
333 CloseStreamInner(id
, true);
336 void QuicSession::SendGoAway(QuicErrorCode error_code
, const string
& reason
) {
341 connection_
->SendGoAway(error_code
, largest_peer_created_stream_id_
, reason
);
344 void QuicSession::CloseStream(QuicStreamId stream_id
) {
345 CloseStreamInner(stream_id
, false);
348 void QuicSession::CloseStreamInner(QuicStreamId stream_id
,
349 bool locally_reset
) {
350 DVLOG(1) << ENDPOINT
<< "Closing stream " << stream_id
;
352 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
353 if (it
== stream_map_
.end()) {
354 DVLOG(1) << ENDPOINT
<< "Stream is already closed: " << stream_id
;
357 QuicDataStream
* stream
= it
->second
;
359 // Tell the stream that a RST has been sent.
361 stream
->set_rst_sent(true);
364 closed_streams_
.push_back(it
->second
);
365 stream_map_
.erase(it
);
369 bool QuicSession::IsEncryptionEstablished() {
370 return GetCryptoStream()->encryption_established();
373 bool QuicSession::IsCryptoHandshakeConfirmed() {
374 return GetCryptoStream()->handshake_confirmed();
377 void QuicSession::OnConfigNegotiated() {
378 connection_
->SetFromConfig(config_
);
379 // Tell all streams about the newly received peer receive window.
380 if (connection()->version() >= QUIC_VERSION_17
&&
381 config_
.HasReceivedInitialFlowControlWindowBytes()) {
382 // Streams which were created before the SHLO was received (0RTT requests)
383 // are now informed of the peer's initial flow control window.
384 uint32 new_flow_control_send_window
=
385 config_
.ReceivedInitialFlowControlWindowBytes();
386 if (new_flow_control_send_window
< kDefaultFlowControlSendWindow
) {
388 << "Peer sent us an invalid flow control send window: "
389 << new_flow_control_send_window
390 << ", below default: " << kDefaultFlowControlSendWindow
;
391 connection_
->SendConnectionClose(QUIC_FLOW_CONTROL_ERROR
);
394 DataStreamMap::iterator it
= stream_map_
.begin();
395 while (it
!= stream_map_
.end()) {
396 it
->second
->flow_controller()->UpdateSendWindowOffset(
397 new_flow_control_send_window
);
401 // Update connection level window.
402 connection()->flow_controller()->UpdateSendWindowOffset(
403 new_flow_control_send_window
);
407 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event
) {
409 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
410 // to QuicSession since it is the glue.
411 case ENCRYPTION_FIRST_ESTABLISHED
:
414 case ENCRYPTION_REESTABLISHED
:
415 // Retransmit originally packets that were sent, since they can't be
416 // decrypted by the peer.
417 connection_
->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY
);
420 case HANDSHAKE_CONFIRMED
:
421 LOG_IF(DFATAL
, !config_
.negotiated()) << ENDPOINT
422 << "Handshake confirmed without parameter negotiation.";
423 // Discard originally encrypted packets, since they can't be decrypted by
425 connection_
->NeuterUnencryptedPackets();
426 connection_
->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
427 max_open_streams_
= config_
.max_streams_per_connection();
431 LOG(ERROR
) << ENDPOINT
<< "Got unknown handshake event: " << event
;
435 void QuicSession::OnCryptoHandshakeMessageSent(
436 const CryptoHandshakeMessage
& message
) {
439 void QuicSession::OnCryptoHandshakeMessageReceived(
440 const CryptoHandshakeMessage
& message
) {
443 QuicConfig
* QuicSession::config() {
447 void QuicSession::ActivateStream(QuicDataStream
* stream
) {
448 DVLOG(1) << ENDPOINT
<< "num_streams: " << stream_map_
.size()
449 << ". activating " << stream
->id();
450 DCHECK_EQ(stream_map_
.count(stream
->id()), 0u);
451 stream_map_
[stream
->id()] = stream
;
454 QuicStreamId
QuicSession::GetNextStreamId() {
455 QuicStreamId id
= next_stream_id_
;
456 next_stream_id_
+= 2;
460 ReliableQuicStream
* QuicSession::GetStream(const QuicStreamId stream_id
) {
461 if (stream_id
== kCryptoStreamId
) {
462 return GetCryptoStream();
464 if (stream_id
== kHeadersStreamId
) {
465 return headers_stream_
.get();
467 return GetDataStream(stream_id
);
470 QuicDataStream
* QuicSession::GetDataStream(const QuicStreamId stream_id
) {
471 if (stream_id
== kCryptoStreamId
) {
472 DLOG(FATAL
) << "Attempt to call GetDataStream with the crypto stream id";
475 if (stream_id
== kHeadersStreamId
) {
476 DLOG(FATAL
) << "Attempt to call GetDataStream with the headers stream id";
480 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
481 if (it
!= stream_map_
.end()) {
485 if (IsClosedStream(stream_id
)) {
489 if (stream_id
% 2 == next_stream_id_
% 2) {
490 // We've received a frame for a locally-created stream that is not
491 // currently active. This is an error.
492 if (connection()->connected()) {
493 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM
);
498 return GetIncomingDataStream(stream_id
);
501 QuicDataStream
* QuicSession::GetIncomingDataStream(QuicStreamId stream_id
) {
502 if (IsClosedStream(stream_id
)) {
506 implicitly_created_streams_
.erase(stream_id
);
507 if (stream_id
> largest_peer_created_stream_id_
) {
508 if (stream_id
- largest_peer_created_stream_id_
> kMaxStreamIdDelta
) {
509 // We may already have sent a connection close due to multiple reset
510 // streams in the same packet.
511 if (connection()->connected()) {
512 LOG(ERROR
) << "Trying to get stream: " << stream_id
513 << ", largest peer created stream: "
514 << largest_peer_created_stream_id_
515 << ", max delta: " << kMaxStreamIdDelta
;
516 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID
);
520 if (largest_peer_created_stream_id_
== 0) {
522 largest_peer_created_stream_id_
= 3;
524 largest_peer_created_stream_id_
= 1;
527 for (QuicStreamId id
= largest_peer_created_stream_id_
+ 2;
530 implicitly_created_streams_
.insert(id
);
532 largest_peer_created_stream_id_
= stream_id
;
534 QuicDataStream
* stream
= CreateIncomingDataStream(stream_id
);
535 if (stream
== NULL
) {
538 ActivateStream(stream
);
542 bool QuicSession::IsClosedStream(QuicStreamId id
) {
544 if (id
== kCryptoStreamId
) {
547 if (id
== kHeadersStreamId
) {
550 if (ContainsKey(stream_map_
, id
)) {
554 if (id
% 2 == next_stream_id_
% 2) {
555 // Locally created streams are strictly in-order. If the id is in the
556 // range of created streams and it's not active, it must have been closed.
557 return id
< next_stream_id_
;
559 // For peer created streams, we also need to consider implicitly created
561 return id
<= largest_peer_created_stream_id_
&&
562 implicitly_created_streams_
.count(id
) == 0;
565 size_t QuicSession::GetNumOpenStreams() const {
566 return stream_map_
.size() + implicitly_created_streams_
.size();
569 void QuicSession::MarkWriteBlocked(QuicStreamId id
, QuicPriority priority
) {
571 ReliableQuicStream
* stream
= GetStream(id
);
572 if (stream
!= NULL
) {
573 LOG_IF(DFATAL
, priority
!= stream
->EffectivePriority())
574 << ENDPOINT
<< "Stream " << id
575 << "Priorities do not match. Got: " << priority
576 << " Expected: " << stream
->EffectivePriority();
578 LOG(DFATAL
) << "Marking unknown stream " << id
<< " blocked.";
582 if (id
== kCryptoStreamId
) {
583 DCHECK(!has_pending_handshake_
);
584 has_pending_handshake_
= true;
585 // TODO(jar): Be sure to use the highest priority for the crypto stream,
586 // perhaps by adding a "special" priority for it that is higher than
588 priority
= kHighestPriority
;
590 write_blocked_streams_
.PushBack(id
, priority
);
593 bool QuicSession::HasDataToWrite() const {
594 return write_blocked_streams_
.HasWriteBlockedStreams() ||
595 connection_
->HasQueuedData();
598 bool QuicSession::GetSSLInfo(SSLInfo
* ssl_info
) const {
603 void QuicSession::PostProcessAfterData() {
604 STLDeleteElements(&closed_streams_
);
605 closed_streams_
.clear();