1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
13 using base::StringPiece
;
21 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
// We want to make sure we delete any closed streams in a safe manner.
// To avoid deleting a stream in mid-operation, we have a simple shim between
// us and the stream, so we can delete any streams when we return from
// processing.
//
// We could just override the base methods, but this makes it easier to make
// sure we don't miss any.
30 class VisitorShim
: public QuicConnectionVisitorInterface
{
32 explicit VisitorShim(QuicSession
* session
) : session_(session
) {}
34 virtual bool OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) OVERRIDE
{
35 bool accepted
= session_
->OnStreamFrames(frames
);
36 session_
->PostProcessAfterData();
39 virtual void OnRstStream(const QuicRstStreamFrame
& frame
) OVERRIDE
{
40 session_
->OnRstStream(frame
);
41 session_
->PostProcessAfterData();
44 virtual void OnGoAway(const QuicGoAwayFrame
& frame
) OVERRIDE
{
45 session_
->OnGoAway(frame
);
46 session_
->PostProcessAfterData();
49 virtual void OnWindowUpdateFrames(const vector
<QuicWindowUpdateFrame
>& frames
)
51 session_
->OnWindowUpdateFrames(frames
);
52 session_
->PostProcessAfterData();
55 virtual void OnBlockedFrames(const vector
<QuicBlockedFrame
>& frames
)
57 session_
->OnBlockedFrames(frames
);
58 session_
->PostProcessAfterData();
61 virtual void OnCanWrite() OVERRIDE
{
62 session_
->OnCanWrite();
63 session_
->PostProcessAfterData();
66 virtual void OnSuccessfulVersionNegotiation(
67 const QuicVersion
& version
) OVERRIDE
{
68 session_
->OnSuccessfulVersionNegotiation(version
);
71 virtual void OnConnectionClosed(
72 QuicErrorCode error
, bool from_peer
) OVERRIDE
{
73 session_
->OnConnectionClosed(error
, from_peer
);
74 // The session will go away, so don't bother with cleanup.
77 virtual void OnWriteBlocked() OVERRIDE
{
78 session_
->OnWriteBlocked();
81 virtual bool HasPendingWrites() const OVERRIDE
{
82 return session_
->HasPendingWrites();
85 virtual bool HasPendingHandshake() const OVERRIDE
{
86 return session_
->HasPendingHandshake();
90 QuicSession
* session_
;
93 QuicSession::QuicSession(QuicConnection
* connection
,
94 const QuicConfig
& config
)
95 : connection_(connection
),
96 visitor_shim_(new VisitorShim(this)),
98 max_open_streams_(config_
.max_streams_per_connection()),
99 next_stream_id_(is_server() ? 2 : 3),
100 largest_peer_created_stream_id_(0),
101 error_(QUIC_NO_ERROR
),
102 goaway_received_(false),
104 has_pending_handshake_(false) {
106 connection_
->set_visitor(visitor_shim_
.get());
107 connection_
->SetFromConfig(config_
);
108 if (connection_
->connected()) {
109 connection_
->SetOverallConnectionTimeout(
110 config_
.max_time_before_crypto_handshake());
112 headers_stream_
.reset(new QuicHeadersStream(this));
114 // For version above QUIC v12, the headers stream is stream 3, so the
115 // next available local stream ID should be 5.
116 DCHECK_EQ(kHeadersStreamId
, next_stream_id_
);
117 next_stream_id_
+= 2;
121 QuicSession::~QuicSession() {
122 STLDeleteElements(&closed_streams_
);
123 STLDeleteValues(&stream_map_
);
126 bool QuicSession::OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) {
127 for (size_t i
= 0; i
< frames
.size(); ++i
) {
128 // TODO(rch) deal with the error case of stream id 0
129 if (IsClosedStream(frames
[i
].stream_id
)) {
133 ReliableQuicStream
* stream
= GetStream(frames
[i
].stream_id
);
134 if (stream
== NULL
) return false;
135 if (!stream
->WillAcceptStreamFrame(frames
[i
])) return false;
137 // TODO(alyssar) check against existing connection address: if changed, make
138 // sure we update the connection.
141 for (size_t i
= 0; i
< frames
.size(); ++i
) {
142 QuicStreamId stream_id
= frames
[i
].stream_id
;
143 ReliableQuicStream
* stream
= GetStream(stream_id
);
147 stream
->OnStreamFrame(frames
[i
]);
153 void QuicSession::OnStreamHeaders(QuicStreamId stream_id
,
154 StringPiece headers_data
) {
155 QuicDataStream
* stream
= GetDataStream(stream_id
);
157 // It's quite possible to receive headers after a stream has been reset.
160 stream
->OnStreamHeaders(headers_data
);
163 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id
,
164 QuicPriority priority
) {
165 QuicDataStream
* stream
= GetDataStream(stream_id
);
167 // It's quite possible to receive headers after a stream has been reset.
170 stream
->OnStreamHeadersPriority(priority
);
173 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id
,
176 QuicDataStream
* stream
= GetDataStream(stream_id
);
178 // It's quite possible to receive headers after a stream has been reset.
181 stream
->OnStreamHeadersComplete(fin
, frame_len
);
184 void QuicSession::OnRstStream(const QuicRstStreamFrame
& frame
) {
185 if (frame
.stream_id
== kCryptoStreamId
) {
186 connection()->SendConnectionCloseWithDetails(
187 QUIC_INVALID_STREAM_ID
,
188 "Attempt to reset the crypto stream");
191 if (frame
.stream_id
== kHeadersStreamId
) {
192 connection()->SendConnectionCloseWithDetails(
193 QUIC_INVALID_STREAM_ID
,
194 "Attempt to reset the headers stream");
197 QuicDataStream
* stream
= GetDataStream(frame
.stream_id
);
199 return; // Errors are handled by GetStream.
202 stream
->OnStreamReset(frame
);
205 void QuicSession::OnGoAway(const QuicGoAwayFrame
& frame
) {
206 DCHECK(frame
.last_good_stream_id
< next_stream_id_
);
207 goaway_received_
= true;
210 void QuicSession::OnConnectionClosed(QuicErrorCode error
, bool from_peer
) {
211 DCHECK(!connection_
->connected());
212 if (error_
== QUIC_NO_ERROR
) {
216 while (!stream_map_
.empty()) {
217 DataStreamMap::iterator it
= stream_map_
.begin();
218 QuicStreamId id
= it
->first
;
219 it
->second
->OnConnectionClosed(error
, from_peer
);
220 // The stream should call CloseStream as part of OnConnectionClosed.
221 if (stream_map_
.find(id
) != stream_map_
.end()) {
222 LOG(DFATAL
) << ENDPOINT
223 << "Stream failed to close under OnConnectionClosed";
229 void QuicSession::OnWindowUpdateFrames(
230 const vector
<QuicWindowUpdateFrame
>& frames
) {
231 for (size_t i
= 0; i
< frames
.size(); ++i
) {
232 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
233 // assume that it still exists.
234 QuicStreamId stream_id
= frames
[i
].stream_id
;
235 if (stream_id
== 0) {
236 // This is a window update that applies to the connection, rather than an
237 // individual stream.
238 // TODO(rjshade): Adjust connection level flow control window.
240 << "Received connection level flow control window update with "
241 "byte offset: " << frames
[i
].byte_offset
;
245 QuicDataStream
* stream
= GetDataStream(stream_id
);
247 stream
->OnWindowUpdateFrame(frames
[i
]);
252 void QuicSession::OnBlockedFrames(const vector
<QuicBlockedFrame
>& frames
) {
253 for (size_t i
= 0; i
< frames
.size(); ++i
) {
254 // TODO(rjshade): Compare our flow control receive windows for specified
255 // streams: if we have a large window then maybe something
256 // had gone wrong with the flow control accounting.
257 DVLOG(1) << ENDPOINT
<< "Received BLOCKED frame with stream id: "
258 << frames
[i
].stream_id
;
262 void QuicSession::OnCanWrite() {
263 // We limit the number of writes to the number of pending streams. If more
264 // streams become pending, HasPendingWrites will be true, which will cause
265 // the connection to request resumption before yielding to other connections.
266 size_t num_writes
= write_blocked_streams_
.NumBlockedStreams();
268 for (size_t i
= 0; i
< num_writes
; ++i
) {
269 if (!write_blocked_streams_
.HasWriteBlockedStreams()) {
270 // Writing one stream removed another?! Something's broken.
271 LOG(DFATAL
) << "WriteBlockedStream is missing";
272 connection_
->CloseConnection(QUIC_INTERNAL_ERROR
, false);
275 if (!connection_
->CanWriteStreamData()) {
278 QuicStreamId stream_id
= write_blocked_streams_
.PopFront();
279 if (stream_id
== kCryptoStreamId
) {
280 has_pending_handshake_
= false; // We just popped it.
282 ReliableQuicStream
* stream
= GetStream(stream_id
);
283 if (stream
!= NULL
) {
284 // If the stream can't write all bytes, it'll re-add itself to the blocked
286 stream
->OnCanWrite();
291 bool QuicSession::HasPendingWrites() const {
292 return write_blocked_streams_
.HasWriteBlockedStreams();
295 bool QuicSession::HasPendingHandshake() const {
296 return has_pending_handshake_
;
299 QuicConsumedData
QuicSession::WritevData(
301 const IOVector
& data
,
302 QuicStreamOffset offset
,
304 QuicAckNotifier::DelegateInterface
* ack_notifier_delegate
) {
305 return connection_
->SendStreamData(id
, data
, offset
, fin
,
306 ack_notifier_delegate
);
309 size_t QuicSession::WriteHeaders(
311 const SpdyHeaderBlock
& headers
,
313 QuicAckNotifier::DelegateInterface
* ack_notifier_delegate
) {
314 return headers_stream_
->WriteHeaders(id
, headers
, fin
, ack_notifier_delegate
);
317 void QuicSession::SendRstStream(QuicStreamId id
,
318 QuicRstStreamErrorCode error
,
319 QuicStreamOffset bytes_written
) {
320 connection_
->SendRstStream(id
, error
, bytes_written
);
321 CloseStreamInner(id
, true);
324 void QuicSession::SendGoAway(QuicErrorCode error_code
, const string
& reason
) {
329 connection_
->SendGoAway(error_code
, largest_peer_created_stream_id_
, reason
);
332 void QuicSession::CloseStream(QuicStreamId stream_id
) {
333 CloseStreamInner(stream_id
, false);
336 void QuicSession::CloseStreamInner(QuicStreamId stream_id
,
337 bool locally_reset
) {
338 DVLOG(1) << ENDPOINT
<< "Closing stream " << stream_id
;
340 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
341 if (it
== stream_map_
.end()) {
342 DVLOG(1) << ENDPOINT
<< "Stream is already closed: " << stream_id
;
345 QuicDataStream
* stream
= it
->second
;
347 // Tell the stream that a RST has been sent.
349 stream
->set_rst_sent(true);
352 closed_streams_
.push_back(it
->second
);
353 stream_map_
.erase(it
);
357 bool QuicSession::IsEncryptionEstablished() {
358 return GetCryptoStream()->encryption_established();
361 bool QuicSession::IsCryptoHandshakeConfirmed() {
362 return GetCryptoStream()->handshake_confirmed();
365 void QuicSession::OnConfigNegotiated() {
366 connection_
->SetFromConfig(config_
);
369 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event
) {
371 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
372 // to QuicSession since it is the glue.
373 case ENCRYPTION_FIRST_ESTABLISHED
:
376 case ENCRYPTION_REESTABLISHED
:
377 // Retransmit originally packets that were sent, since they can't be
378 // decrypted by the peer.
379 connection_
->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY
);
382 case HANDSHAKE_CONFIRMED
:
383 LOG_IF(DFATAL
, !config_
.negotiated()) << ENDPOINT
384 << "Handshake confirmed without parameter negotiation.";
385 connection_
->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
386 max_open_streams_
= config_
.max_streams_per_connection();
390 LOG(ERROR
) << ENDPOINT
<< "Got unknown handshake event: " << event
;
394 void QuicSession::OnCryptoHandshakeMessageSent(
395 const CryptoHandshakeMessage
& message
) {
398 void QuicSession::OnCryptoHandshakeMessageReceived(
399 const CryptoHandshakeMessage
& message
) {
402 QuicConfig
* QuicSession::config() {
406 void QuicSession::ActivateStream(QuicDataStream
* stream
) {
407 DVLOG(1) << ENDPOINT
<< "num_streams: " << stream_map_
.size()
408 << ". activating " << stream
->id();
409 DCHECK_EQ(stream_map_
.count(stream
->id()), 0u);
410 stream_map_
[stream
->id()] = stream
;
413 QuicStreamId
QuicSession::GetNextStreamId() {
414 QuicStreamId id
= next_stream_id_
;
415 next_stream_id_
+= 2;
419 ReliableQuicStream
* QuicSession::GetStream(const QuicStreamId stream_id
) {
420 if (stream_id
== kCryptoStreamId
) {
421 return GetCryptoStream();
423 if (stream_id
== kHeadersStreamId
) {
424 return headers_stream_
.get();
426 return GetDataStream(stream_id
);
429 QuicDataStream
* QuicSession::GetDataStream(const QuicStreamId stream_id
) {
430 if (stream_id
== kCryptoStreamId
) {
431 DLOG(FATAL
) << "Attempt to call GetDataStream with the crypto stream id";
434 if (stream_id
== kHeadersStreamId
) {
435 DLOG(FATAL
) << "Attempt to call GetDataStream with the headers stream id";
439 DataStreamMap::iterator it
= stream_map_
.find(stream_id
);
440 if (it
!= stream_map_
.end()) {
444 if (IsClosedStream(stream_id
)) {
448 if (stream_id
% 2 == next_stream_id_
% 2) {
449 // We've received a frame for a locally-created stream that is not
450 // currently active. This is an error.
451 if (connection()->connected()) {
452 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM
);
457 return GetIncomingDataStream(stream_id
);
460 QuicDataStream
* QuicSession::GetIncomingDataStream(QuicStreamId stream_id
) {
461 if (IsClosedStream(stream_id
)) {
465 implicitly_created_streams_
.erase(stream_id
);
466 if (stream_id
> largest_peer_created_stream_id_
) {
467 // TODO(rch) add unit test for this
468 if (stream_id
- largest_peer_created_stream_id_
> kMaxStreamIdDelta
) {
469 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID
);
472 if (largest_peer_created_stream_id_
== 0) {
474 largest_peer_created_stream_id_
= 3;
476 largest_peer_created_stream_id_
= 1;
479 for (QuicStreamId id
= largest_peer_created_stream_id_
+ 2;
482 implicitly_created_streams_
.insert(id
);
484 largest_peer_created_stream_id_
= stream_id
;
486 QuicDataStream
* stream
= CreateIncomingDataStream(stream_id
);
487 if (stream
== NULL
) {
490 ActivateStream(stream
);
494 bool QuicSession::IsClosedStream(QuicStreamId id
) {
496 if (id
== kCryptoStreamId
) {
499 if (id
== kHeadersStreamId
) {
502 if (ContainsKey(stream_map_
, id
)) {
506 if (id
% 2 == next_stream_id_
% 2) {
507 // Locally created streams are strictly in-order. If the id is in the
508 // range of created streams and it's not active, it must have been closed.
509 return id
< next_stream_id_
;
511 // For peer created streams, we also need to consider implicitly created
513 return id
<= largest_peer_created_stream_id_
&&
514 implicitly_created_streams_
.count(id
) == 0;
517 size_t QuicSession::GetNumOpenStreams() const {
518 return stream_map_
.size() + implicitly_created_streams_
.size();
521 void QuicSession::MarkWriteBlocked(QuicStreamId id
, QuicPriority priority
) {
523 ReliableQuicStream
* stream
= GetStream(id
);
524 if (stream
!= NULL
) {
525 LOG_IF(DFATAL
, priority
!= stream
->EffectivePriority())
526 << "Priorities do not match. Got: " << priority
527 << " Expected: " << stream
->EffectivePriority();
529 LOG(DFATAL
) << "Marking unknown stream " << id
<< " blocked.";
533 if (id
== kCryptoStreamId
) {
534 DCHECK(!has_pending_handshake_
);
535 has_pending_handshake_
= true;
536 // TODO(jar): Be sure to use the highest priority for the crypto stream,
537 // perhaps by adding a "special" priority for it that is higher than
539 priority
= kHighestPriority
;
541 write_blocked_streams_
.PushBack(id
, priority
);
544 bool QuicSession::HasDataToWrite() const {
545 return write_blocked_streams_
.HasWriteBlockedStreams() ||
546 connection_
->HasQueuedData();
549 bool QuicSession::GetSSLInfo(SSLInfo
* ssl_info
) const {
554 void QuicSession::PostProcessAfterData() {
555 STLDeleteElements(&closed_streams_
);
556 closed_streams_
.clear();