Cast: Fix rtcp event dedup logic in rtcp_receiver.
[chromium-blink-merge.git] / net / spdy / spdy_session.cc
blob4c4900f5d8fd937612fa522491050cdc4459a867
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/spdy/spdy_session.h"
7 #include <algorithm>
8 #include <map>
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/http/http_log_util.h"
33 #include "net/http/http_network_session.h"
34 #include "net/http/http_server_properties.h"
35 #include "net/http/http_util.h"
36 #include "net/spdy/spdy_buffer_producer.h"
37 #include "net/spdy/spdy_frame_builder.h"
38 #include "net/spdy/spdy_http_utils.h"
39 #include "net/spdy/spdy_protocol.h"
40 #include "net/spdy/spdy_session_pool.h"
41 #include "net/spdy/spdy_stream.h"
42 #include "net/ssl/server_bound_cert_service.h"
44 namespace net {
46 namespace {
48 const int kReadBufferSize = 8 * 1024;
49 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
50 const int kHungIntervalSeconds = 10;
52 // Always start at 1 for the first stream id.
53 const SpdyStreamId kFirstStreamId = 1;
55 // Minimum seconds that unclaimed pushed streams will be kept in memory.
56 const int kMinPushedStreamLifetimeSeconds = 300;
58 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
59 const SpdyHeaderBlock& headers,
60 net::NetLog::LogLevel log_level) {
61 scoped_ptr<base::ListValue> headers_list(new base::ListValue());
62 for (SpdyHeaderBlock::const_iterator it = headers.begin();
63 it != headers.end(); ++it) {
64 headers_list->AppendString(
65 it->first + ": " +
66 ElideHeaderValueForNetLog(log_level, it->first, it->second));
68 return headers_list.Pass();
71 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
72 bool fin,
73 bool unidirectional,
74 SpdyPriority spdy_priority,
75 SpdyStreamId stream_id,
76 NetLog::LogLevel log_level) {
77 base::DictionaryValue* dict = new base::DictionaryValue();
78 dict->Set("headers",
79 SpdyHeaderBlockToListValue(*headers, log_level).release());
80 dict->SetBoolean("fin", fin);
81 dict->SetBoolean("unidirectional", unidirectional);
82 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
83 dict->SetInteger("stream_id", stream_id);
84 return dict;
87 base::Value* NetLogSpdySynStreamReceivedCallback(
88 const SpdyHeaderBlock* headers,
89 bool fin,
90 bool unidirectional,
91 SpdyPriority spdy_priority,
92 SpdyStreamId stream_id,
93 SpdyStreamId associated_stream,
94 NetLog::LogLevel log_level) {
95 base::DictionaryValue* dict = new base::DictionaryValue();
96 dict->Set("headers",
97 SpdyHeaderBlockToListValue(*headers, log_level).release());
98 dict->SetBoolean("fin", fin);
99 dict->SetBoolean("unidirectional", unidirectional);
100 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
101 dict->SetInteger("stream_id", stream_id);
102 dict->SetInteger("associated_stream", associated_stream);
103 return dict;
106 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
107 const SpdyHeaderBlock* headers,
108 bool fin,
109 SpdyStreamId stream_id,
110 NetLog::LogLevel log_level) {
111 base::DictionaryValue* dict = new base::DictionaryValue();
112 dict->Set("headers",
113 SpdyHeaderBlockToListValue(*headers, log_level).release());
114 dict->SetBoolean("fin", fin);
115 dict->SetInteger("stream_id", stream_id);
116 return dict;
119 base::Value* NetLogSpdySessionCloseCallback(int net_error,
120 const std::string* description,
121 NetLog::LogLevel /* log_level */) {
122 base::DictionaryValue* dict = new base::DictionaryValue();
123 dict->SetInteger("net_error", net_error);
124 dict->SetString("description", *description);
125 return dict;
128 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
129 NetLog::LogLevel /* log_level */) {
130 base::DictionaryValue* dict = new base::DictionaryValue();
131 dict->SetString("host", host_pair->first.ToString());
132 dict->SetString("proxy", host_pair->second.ToPacString());
133 return dict;
136 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
137 bool clear_persisted,
138 NetLog::LogLevel /* log_level */) {
139 base::DictionaryValue* dict = new base::DictionaryValue();
140 dict->SetString("host", host_port_pair.ToString());
141 dict->SetBoolean("clear_persisted", clear_persisted);
142 return dict;
145 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
146 SpdySettingsFlags flags,
147 uint32 value,
148 NetLog::LogLevel /* log_level */) {
149 base::DictionaryValue* dict = new base::DictionaryValue();
150 dict->SetInteger("id", id);
151 dict->SetInteger("flags", flags);
152 dict->SetInteger("value", value);
153 return dict;
156 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
157 NetLog::LogLevel /* log_level */) {
158 base::DictionaryValue* dict = new base::DictionaryValue();
159 base::ListValue* settings_list = new base::ListValue();
160 for (SettingsMap::const_iterator it = settings->begin();
161 it != settings->end(); ++it) {
162 const SpdySettingsIds id = it->first;
163 const SpdySettingsFlags flags = it->second.first;
164 const uint32 value = it->second.second;
165 settings_list->Append(new base::StringValue(
166 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
168 dict->Set("settings", settings_list);
169 return dict;
172 base::Value* NetLogSpdyWindowUpdateFrameCallback(
173 SpdyStreamId stream_id,
174 uint32 delta,
175 NetLog::LogLevel /* log_level */) {
176 base::DictionaryValue* dict = new base::DictionaryValue();
177 dict->SetInteger("stream_id", static_cast<int>(stream_id));
178 dict->SetInteger("delta", delta);
179 return dict;
182 base::Value* NetLogSpdySessionWindowUpdateCallback(
183 int32 delta,
184 int32 window_size,
185 NetLog::LogLevel /* log_level */) {
186 base::DictionaryValue* dict = new base::DictionaryValue();
187 dict->SetInteger("delta", delta);
188 dict->SetInteger("window_size", window_size);
189 return dict;
192 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
193 int size,
194 bool fin,
195 NetLog::LogLevel /* log_level */) {
196 base::DictionaryValue* dict = new base::DictionaryValue();
197 dict->SetInteger("stream_id", static_cast<int>(stream_id));
198 dict->SetInteger("size", size);
199 dict->SetBoolean("fin", fin);
200 return dict;
203 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
204 int status,
205 const std::string* description,
206 NetLog::LogLevel /* log_level */) {
207 base::DictionaryValue* dict = new base::DictionaryValue();
208 dict->SetInteger("stream_id", static_cast<int>(stream_id));
209 dict->SetInteger("status", status);
210 dict->SetString("description", *description);
211 return dict;
214 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
215 bool is_ack,
216 const char* type,
217 NetLog::LogLevel /* log_level */) {
218 base::DictionaryValue* dict = new base::DictionaryValue();
219 dict->SetInteger("unique_id", unique_id);
220 dict->SetString("type", type);
221 dict->SetBoolean("is_ack", is_ack);
222 return dict;
225 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
226 int active_streams,
227 int unclaimed_streams,
228 SpdyGoAwayStatus status,
229 NetLog::LogLevel /* log_level */) {
230 base::DictionaryValue* dict = new base::DictionaryValue();
231 dict->SetInteger("last_accepted_stream_id",
232 static_cast<int>(last_stream_id));
233 dict->SetInteger("active_streams", active_streams);
234 dict->SetInteger("unclaimed_streams", unclaimed_streams);
235 dict->SetInteger("status", static_cast<int>(status));
236 return dict;
// Helper function to return the total size of an array of objects
// with .size() member functions. |arr| must be a true array (its length N is
// deduced at compile time); the result is the sum of arr[i].size() over all
// N elements.
template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
  size_t total_size = 0;
  for (size_t i = 0; i < N; ++i) {
    total_size += arr[i].size();
  }
  return total_size;
}
249 // Helper class for std:find_if on STL container containing
250 // SpdyStreamRequest weak pointers.
251 class RequestEquals {
252 public:
253 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
254 : request_(request) {}
256 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
257 return request_.get() == request.get();
260 private:
261 const base::WeakPtr<SpdyStreamRequest> request_;
// The maximum number of concurrent streams we will ever create.  Even if
// the server permits more, we will never exceed this limit.
const size_t kMaxConcurrentStreamLimit = 256;
268 } // namespace
270 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
271 SpdyFramer::SpdyError err) {
272 switch(err) {
273 case SpdyFramer::SPDY_NO_ERROR:
274 return SPDY_ERROR_NO_ERROR;
275 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
276 return SPDY_ERROR_INVALID_CONTROL_FRAME;
277 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
278 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
279 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
280 return SPDY_ERROR_ZLIB_INIT_FAILURE;
281 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
282 return SPDY_ERROR_UNSUPPORTED_VERSION;
283 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
284 return SPDY_ERROR_DECOMPRESS_FAILURE;
285 case SpdyFramer::SPDY_COMPRESS_FAILURE:
286 return SPDY_ERROR_COMPRESS_FAILURE;
287 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
288 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
289 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
290 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
291 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
292 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
293 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
294 return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
295 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
296 return SPDY_ERROR_UNEXPECTED_FRAME;
297 default:
298 NOTREACHED();
299 return static_cast<SpdyProtocolErrorDetails>(-1);
303 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
304 SpdyRstStreamStatus status) {
305 switch(status) {
306 case RST_STREAM_PROTOCOL_ERROR:
307 return STATUS_CODE_PROTOCOL_ERROR;
308 case RST_STREAM_INVALID_STREAM:
309 return STATUS_CODE_INVALID_STREAM;
310 case RST_STREAM_REFUSED_STREAM:
311 return STATUS_CODE_REFUSED_STREAM;
312 case RST_STREAM_UNSUPPORTED_VERSION:
313 return STATUS_CODE_UNSUPPORTED_VERSION;
314 case RST_STREAM_CANCEL:
315 return STATUS_CODE_CANCEL;
316 case RST_STREAM_INTERNAL_ERROR:
317 return STATUS_CODE_INTERNAL_ERROR;
318 case RST_STREAM_FLOW_CONTROL_ERROR:
319 return STATUS_CODE_FLOW_CONTROL_ERROR;
320 case RST_STREAM_STREAM_IN_USE:
321 return STATUS_CODE_STREAM_IN_USE;
322 case RST_STREAM_STREAM_ALREADY_CLOSED:
323 return STATUS_CODE_STREAM_ALREADY_CLOSED;
324 case RST_STREAM_INVALID_CREDENTIALS:
325 return STATUS_CODE_INVALID_CREDENTIALS;
326 case RST_STREAM_FRAME_SIZE_ERROR:
327 return STATUS_CODE_FRAME_SIZE_ERROR;
328 case RST_STREAM_SETTINGS_TIMEOUT:
329 return STATUS_CODE_SETTINGS_TIMEOUT;
330 case RST_STREAM_CONNECT_ERROR:
331 return STATUS_CODE_CONNECT_ERROR;
332 case RST_STREAM_ENHANCE_YOUR_CALM:
333 return STATUS_CODE_ENHANCE_YOUR_CALM;
334 default:
335 NOTREACHED();
336 return static_cast<SpdyProtocolErrorDetails>(-1);
340 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
341 Reset();
344 SpdyStreamRequest::~SpdyStreamRequest() {
345 CancelRequest();
348 int SpdyStreamRequest::StartRequest(
349 SpdyStreamType type,
350 const base::WeakPtr<SpdySession>& session,
351 const GURL& url,
352 RequestPriority priority,
353 const BoundNetLog& net_log,
354 const CompletionCallback& callback) {
355 DCHECK(session);
356 DCHECK(!session_);
357 DCHECK(!stream_);
358 DCHECK(callback_.is_null());
360 type_ = type;
361 session_ = session;
362 url_ = url;
363 priority_ = priority;
364 net_log_ = net_log;
365 callback_ = callback;
367 base::WeakPtr<SpdyStream> stream;
368 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
369 if (rv == OK) {
370 Reset();
371 stream_ = stream;
373 return rv;
376 void SpdyStreamRequest::CancelRequest() {
377 if (session_)
378 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
379 Reset();
380 // Do this to cancel any pending CompleteStreamRequest() tasks.
381 weak_ptr_factory_.InvalidateWeakPtrs();
384 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
385 DCHECK(!session_);
386 base::WeakPtr<SpdyStream> stream = stream_;
387 DCHECK(stream);
388 Reset();
389 return stream;
392 void SpdyStreamRequest::OnRequestCompleteSuccess(
393 const base::WeakPtr<SpdyStream>& stream) {
394 DCHECK(session_);
395 DCHECK(!stream_);
396 DCHECK(!callback_.is_null());
397 CompletionCallback callback = callback_;
398 Reset();
399 DCHECK(stream);
400 stream_ = stream;
401 callback.Run(OK);
404 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
405 DCHECK(session_);
406 DCHECK(!stream_);
407 DCHECK(!callback_.is_null());
408 CompletionCallback callback = callback_;
409 Reset();
410 DCHECK_NE(rv, OK);
411 callback.Run(rv);
414 void SpdyStreamRequest::Reset() {
415 type_ = SPDY_BIDIRECTIONAL_STREAM;
416 session_.reset();
417 stream_.reset();
418 url_ = GURL();
419 priority_ = MINIMUM_PRIORITY;
420 net_log_ = BoundNetLog();
421 callback_.Reset();
424 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
425 : stream(NULL),
426 waiting_for_syn_reply(false) {}
428 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
429 : stream(stream),
430 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {}
432 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
434 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
436 SpdySession::PushedStreamInfo::PushedStreamInfo(
437 SpdyStreamId stream_id,
438 base::TimeTicks creation_time)
439 : stream_id(stream_id),
440 creation_time(creation_time) {}
442 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
444 SpdySession::SpdySession(
445 const SpdySessionKey& spdy_session_key,
446 const base::WeakPtr<HttpServerProperties>& http_server_properties,
447 bool verify_domain_authentication,
448 bool enable_sending_initial_data,
449 bool enable_compression,
450 bool enable_ping_based_connection_checking,
451 NextProto default_protocol,
452 size_t stream_initial_recv_window_size,
453 size_t initial_max_concurrent_streams,
454 size_t max_concurrent_streams_limit,
455 TimeFunc time_func,
456 const HostPortPair& trusted_spdy_proxy,
457 NetLog* net_log)
458 : weak_factory_(this),
459 in_io_loop_(false),
460 spdy_session_key_(spdy_session_key),
461 pool_(NULL),
462 http_server_properties_(http_server_properties),
463 read_buffer_(new IOBuffer(kReadBufferSize)),
464 stream_hi_water_mark_(kFirstStreamId),
465 in_flight_write_frame_type_(DATA),
466 in_flight_write_frame_size_(0),
467 is_secure_(false),
468 certificate_error_code_(OK),
469 availability_state_(STATE_AVAILABLE),
470 read_state_(READ_STATE_DO_READ),
471 write_state_(WRITE_STATE_IDLE),
472 error_on_close_(OK),
473 max_concurrent_streams_(initial_max_concurrent_streams == 0 ?
474 kInitialMaxConcurrentStreams :
475 initial_max_concurrent_streams),
476 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0 ?
477 kMaxConcurrentStreamLimit :
478 max_concurrent_streams_limit),
479 streams_initiated_count_(0),
480 streams_pushed_count_(0),
481 streams_pushed_and_claimed_count_(0),
482 streams_abandoned_count_(0),
483 total_bytes_received_(0),
484 sent_settings_(false),
485 received_settings_(false),
486 stalled_streams_(0),
487 pings_in_flight_(0),
488 next_ping_id_(1),
489 last_activity_time_(time_func()),
490 last_compressed_frame_len_(0),
491 check_ping_status_pending_(false),
492 send_connection_header_prefix_(false),
493 flow_control_state_(FLOW_CONTROL_NONE),
494 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
495 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0 ?
496 kDefaultInitialRecvWindowSize :
497 stream_initial_recv_window_size),
498 session_send_window_size_(0),
499 session_recv_window_size_(0),
500 session_unacked_recv_window_bytes_(0),
501 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
502 verify_domain_authentication_(verify_domain_authentication),
503 enable_sending_initial_data_(enable_sending_initial_data),
504 enable_compression_(enable_compression),
505 enable_ping_based_connection_checking_(
506 enable_ping_based_connection_checking),
507 protocol_(default_protocol),
508 connection_at_risk_of_loss_time_(
509 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
510 hung_interval_(
511 base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
512 trusted_spdy_proxy_(trusted_spdy_proxy),
513 time_func_(time_func) {
514 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
515 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
516 DCHECK(HttpStreamFactory::spdy_enabled());
517 net_log_.BeginEvent(
518 NetLog::TYPE_SPDY_SESSION,
519 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
520 next_unclaimed_push_stream_sweep_time_ = time_func_() +
521 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
522 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
525 SpdySession::~SpdySession() {
526 CHECK(!in_io_loop_);
527 DCHECK(!pool_);
528 DcheckClosed();
530 // TODO(akalin): Check connection->is_initialized() instead. This
531 // requires re-working CreateFakeSpdySession(), though.
532 DCHECK(connection_->socket());
533 // With SPDY we can't recycle sockets.
534 connection_->socket()->Disconnect();
536 RecordHistograms();
538 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
541 void SpdySession::InitializeWithSocket(
542 scoped_ptr<ClientSocketHandle> connection,
543 SpdySessionPool* pool,
544 bool is_secure,
545 int certificate_error_code) {
546 CHECK(!in_io_loop_);
547 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
548 DCHECK_EQ(read_state_, READ_STATE_DO_READ);
549 DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
550 DCHECK(!connection_);
552 DCHECK(certificate_error_code == OK ||
553 certificate_error_code < ERR_IO_PENDING);
554 // TODO(akalin): Check connection->is_initialized() instead. This
555 // requires re-working CreateFakeSpdySession(), though.
556 DCHECK(connection->socket());
558 base::StatsCounter spdy_sessions("spdy.sessions");
559 spdy_sessions.Increment();
561 connection_ = connection.Pass();
562 is_secure_ = is_secure;
563 certificate_error_code_ = certificate_error_code;
565 NextProto protocol_negotiated =
566 connection_->socket()->GetNegotiatedProtocol();
567 if (protocol_negotiated != kProtoUnknown) {
568 protocol_ = protocol_negotiated;
570 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
571 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
573 if (protocol_ == kProtoSPDY4)
574 send_connection_header_prefix_ = true;
576 if (protocol_ >= kProtoSPDY31) {
577 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
578 session_send_window_size_ = kSpdySessionInitialWindowSize;
579 session_recv_window_size_ = kSpdySessionInitialWindowSize;
580 } else if (protocol_ >= kProtoSPDY3) {
581 flow_control_state_ = FLOW_CONTROL_STREAM;
582 } else {
583 flow_control_state_ = FLOW_CONTROL_NONE;
586 buffered_spdy_framer_.reset(
587 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
588 enable_compression_));
589 buffered_spdy_framer_->set_visitor(this);
590 buffered_spdy_framer_->set_debug_visitor(this);
591 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
592 #if defined(SPDY_PROXY_AUTH_ORIGIN)
593 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
594 host_port_pair().Equals(HostPortPair::FromURL(
595 GURL(SPDY_PROXY_AUTH_ORIGIN))));
596 #endif
598 net_log_.AddEvent(
599 NetLog::TYPE_SPDY_SESSION_INITIALIZED,
600 connection_->socket()->NetLog().source().ToEventParametersCallback());
602 DCHECK_NE(availability_state_, STATE_CLOSED);
603 connection_->AddHigherLayeredPool(this);
604 if (enable_sending_initial_data_)
605 SendInitialData();
606 pool_ = pool;
608 // Bootstrap the read loop.
609 base::MessageLoop::current()->PostTask(
610 FROM_HERE,
611 base::Bind(&SpdySession::PumpReadLoop,
612 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
615 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
616 if (!verify_domain_authentication_)
617 return true;
619 if (availability_state_ == STATE_CLOSED)
620 return false;
622 SSLInfo ssl_info;
623 bool was_npn_negotiated;
624 NextProto protocol_negotiated = kProtoUnknown;
625 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
626 return true; // This is not a secure session, so all domains are okay.
628 bool unused = false;
629 return
630 !ssl_info.client_cert_sent &&
631 (!ssl_info.channel_id_sent ||
632 (ServerBoundCertService::GetDomainForHost(domain) ==
633 ServerBoundCertService::GetDomainForHost(host_port_pair().host()))) &&
634 ssl_info.cert->VerifyNameMatch(domain, &unused);
637 int SpdySession::GetPushStream(
638 const GURL& url,
639 base::WeakPtr<SpdyStream>* stream,
640 const BoundNetLog& stream_net_log) {
641 CHECK(!in_io_loop_);
643 stream->reset();
645 // TODO(akalin): Add unit test exercising this code path.
646 if (availability_state_ == STATE_CLOSED)
647 return ERR_CONNECTION_CLOSED;
649 Error err = TryAccessStream(url);
650 if (err != OK)
651 return err;
653 *stream = GetActivePushStream(url);
654 if (*stream) {
655 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
656 streams_pushed_and_claimed_count_++;
658 return OK;
661 // {,Try}CreateStream() and TryAccessStream() can be called with
662 // |in_io_loop_| set if a stream is being created in response to
663 // another being closed due to received data.
665 Error SpdySession::TryAccessStream(const GURL& url) {
666 DCHECK_NE(availability_state_, STATE_CLOSED);
668 if (is_secure_ && certificate_error_code_ != OK &&
669 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
670 RecordProtocolErrorHistogram(
671 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
672 CloseSessionResult result = DoCloseSession(
673 static_cast<Error>(certificate_error_code_),
674 "Tried to get SPDY stream for secure content over an unauthenticated "
675 "session.");
676 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
677 return ERR_SPDY_PROTOCOL_ERROR;
679 return OK;
682 int SpdySession::TryCreateStream(
683 const base::WeakPtr<SpdyStreamRequest>& request,
684 base::WeakPtr<SpdyStream>* stream) {
685 DCHECK(request);
687 if (availability_state_ == STATE_GOING_AWAY)
688 return ERR_FAILED;
690 // TODO(akalin): Add unit test exercising this code path.
691 if (availability_state_ == STATE_CLOSED)
692 return ERR_CONNECTION_CLOSED;
694 Error err = TryAccessStream(request->url());
695 if (err != OK)
696 return err;
698 if (!max_concurrent_streams_ ||
699 (active_streams_.size() + created_streams_.size() <
700 max_concurrent_streams_)) {
701 return CreateStream(*request, stream);
704 stalled_streams_++;
705 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
706 RequestPriority priority = request->priority();
707 CHECK_GE(priority, MINIMUM_PRIORITY);
708 CHECK_LE(priority, MAXIMUM_PRIORITY);
709 pending_create_stream_queues_[priority].push_back(request);
710 return ERR_IO_PENDING;
713 int SpdySession::CreateStream(const SpdyStreamRequest& request,
714 base::WeakPtr<SpdyStream>* stream) {
715 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
716 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
718 if (availability_state_ == STATE_GOING_AWAY)
719 return ERR_FAILED;
721 // TODO(akalin): Add unit test exercising this code path.
722 if (availability_state_ == STATE_CLOSED)
723 return ERR_CONNECTION_CLOSED;
725 Error err = TryAccessStream(request.url());
726 if (err != OK) {
727 // This should have been caught in TryCreateStream().
728 NOTREACHED();
729 return err;
732 DCHECK(connection_->socket());
733 DCHECK(connection_->socket()->IsConnected());
734 if (connection_->socket()) {
735 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
736 connection_->socket()->IsConnected());
737 if (!connection_->socket()->IsConnected()) {
738 CloseSessionResult result = DoCloseSession(
739 ERR_CONNECTION_CLOSED,
740 "Tried to create SPDY stream for a closed socket connection.");
741 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
742 return ERR_CONNECTION_CLOSED;
746 scoped_ptr<SpdyStream> new_stream(
747 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
748 request.priority(),
749 stream_initial_send_window_size_,
750 stream_initial_recv_window_size_,
751 request.net_log()));
752 *stream = new_stream->GetWeakPtr();
753 InsertCreatedStream(new_stream.Pass());
755 UMA_HISTOGRAM_CUSTOM_COUNTS(
756 "Net.SpdyPriorityCount",
757 static_cast<int>(request.priority()), 0, 10, 11);
759 return OK;
762 void SpdySession::CancelStreamRequest(
763 const base::WeakPtr<SpdyStreamRequest>& request) {
764 DCHECK(request);
765 RequestPriority priority = request->priority();
766 CHECK_GE(priority, MINIMUM_PRIORITY);
767 CHECK_LE(priority, MAXIMUM_PRIORITY);
769 #if DCHECK_IS_ON
770 // |request| should not be in a queue not matching its priority.
771 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
772 if (priority == i)
773 continue;
774 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
775 DCHECK(std::find_if(queue->begin(),
776 queue->end(),
777 RequestEquals(request)) == queue->end());
779 #endif
781 PendingStreamRequestQueue* queue =
782 &pending_create_stream_queues_[priority];
783 // Remove |request| from |queue| while preserving the order of the
784 // other elements.
785 PendingStreamRequestQueue::iterator it =
786 std::find_if(queue->begin(), queue->end(), RequestEquals(request));
787 // The request may already be removed if there's a
788 // CompleteStreamRequest() in flight.
789 if (it != queue->end()) {
790 it = queue->erase(it);
791 // |request| should be in the queue at most once, and if it is
792 // present, should not be pending completion.
793 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
794 queue->end());
798 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
799 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
800 if (pending_create_stream_queues_[j].empty())
801 continue;
803 base::WeakPtr<SpdyStreamRequest> pending_request =
804 pending_create_stream_queues_[j].front();
805 DCHECK(pending_request);
806 pending_create_stream_queues_[j].pop_front();
807 return pending_request;
809 return base::WeakPtr<SpdyStreamRequest>();
812 void SpdySession::ProcessPendingStreamRequests() {
813 // Like |max_concurrent_streams_|, 0 means infinite for
814 // |max_requests_to_process|.
815 size_t max_requests_to_process = 0;
816 if (max_concurrent_streams_ != 0) {
817 max_requests_to_process =
818 max_concurrent_streams_ -
819 (active_streams_.size() + created_streams_.size());
821 for (size_t i = 0;
822 max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
823 base::WeakPtr<SpdyStreamRequest> pending_request =
824 GetNextPendingStreamRequest();
825 if (!pending_request)
826 break;
828 base::MessageLoop::current()->PostTask(
829 FROM_HERE,
830 base::Bind(&SpdySession::CompleteStreamRequest,
831 weak_factory_.GetWeakPtr(),
832 pending_request));
836 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
837 pooled_aliases_.insert(alias_key);
840 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
841 DCHECK(buffered_spdy_framer_.get());
842 return buffered_spdy_framer_->protocol_version();
845 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
846 return weak_factory_.GetWeakPtr();
849 bool SpdySession::CloseOneIdleConnection() {
850 CHECK(!in_io_loop_);
851 DCHECK_NE(availability_state_, STATE_CLOSED);
852 DCHECK(pool_);
853 if (!active_streams_.empty())
854 return false;
855 CloseSessionResult result =
856 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing one idle connection.");
857 if (result != SESSION_CLOSED_AND_REMOVED) {
858 NOTREACHED();
859 return false;
861 return true;
864 void SpdySession::EnqueueStreamWrite(
865 const base::WeakPtr<SpdyStream>& stream,
866 SpdyFrameType frame_type,
867 scoped_ptr<SpdyBufferProducer> producer) {
868 DCHECK(frame_type == HEADERS ||
869 frame_type == DATA ||
870 frame_type == CREDENTIAL ||
871 frame_type == SYN_STREAM);
872 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
875 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
876 SpdyStreamId stream_id,
877 RequestPriority priority,
878 SpdyControlFlags flags,
879 const SpdyHeaderBlock& headers) {
880 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
881 CHECK(it != active_streams_.end());
882 CHECK_EQ(it->second.stream->stream_id(), stream_id);
884 SendPrefacePingIfNoneInFlight();
886 DCHECK(buffered_spdy_framer_.get());
887 SpdyPriority spdy_priority =
888 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
889 scoped_ptr<SpdyFrame> syn_frame(
890 buffered_spdy_framer_->CreateSynStream(stream_id, 0, spdy_priority, flags,
891 &headers));
893 base::StatsCounter spdy_requests("spdy.requests");
894 spdy_requests.Increment();
895 streams_initiated_count_++;
897 if (net_log().IsLogging()) {
898 net_log().AddEvent(
899 NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
900 base::Bind(&NetLogSpdySynStreamSentCallback, &headers,
901 (flags & CONTROL_FLAG_FIN) != 0,
902 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
903 spdy_priority,
904 stream_id));
907 return syn_frame.Pass();
910 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
911 IOBuffer* data,
912 int len,
913 SpdyDataFlags flags) {
914 if (availability_state_ == STATE_CLOSED) {
915 NOTREACHED();
916 return scoped_ptr<SpdyBuffer>();
919 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
920 CHECK(it != active_streams_.end());
921 SpdyStream* stream = it->second.stream;
922 CHECK_EQ(stream->stream_id(), stream_id);
924 if (len < 0) {
925 NOTREACHED();
926 return scoped_ptr<SpdyBuffer>();
929 int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
931 bool send_stalled_by_stream =
932 (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
933 (stream->send_window_size() <= 0);
934 bool send_stalled_by_session = IsSendStalled();
936 // NOTE: There's an enum of the same name in histograms.xml.
937 enum SpdyFrameFlowControlState {
938 SEND_NOT_STALLED,
939 SEND_STALLED_BY_STREAM,
940 SEND_STALLED_BY_SESSION,
941 SEND_STALLED_BY_STREAM_AND_SESSION,
944 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
945 if (send_stalled_by_stream) {
946 if (send_stalled_by_session) {
947 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
948 } else {
949 frame_flow_control_state = SEND_STALLED_BY_STREAM;
951 } else if (send_stalled_by_session) {
952 frame_flow_control_state = SEND_STALLED_BY_SESSION;
955 if (flow_control_state_ == FLOW_CONTROL_STREAM) {
956 UMA_HISTOGRAM_ENUMERATION(
957 "Net.SpdyFrameStreamFlowControlState",
958 frame_flow_control_state,
959 SEND_STALLED_BY_STREAM + 1);
960 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
961 UMA_HISTOGRAM_ENUMERATION(
962 "Net.SpdyFrameStreamAndSessionFlowControlState",
963 frame_flow_control_state,
964 SEND_STALLED_BY_STREAM_AND_SESSION + 1);
967 // Obey send window size of the stream if stream flow control is
968 // enabled.
969 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
970 if (send_stalled_by_stream) {
971 stream->set_send_stalled_by_flow_control(true);
972 // Even though we're currently stalled only by the stream, we
973 // might end up being stalled by the session also.
974 QueueSendStalledStream(*stream);
975 net_log().AddEvent(
976 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
977 NetLog::IntegerCallback("stream_id", stream_id));
978 return scoped_ptr<SpdyBuffer>();
981 effective_len = std::min(effective_len, stream->send_window_size());
984 // Obey send window size of the session if session flow control is
985 // enabled.
986 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
987 if (send_stalled_by_session) {
988 stream->set_send_stalled_by_flow_control(true);
989 QueueSendStalledStream(*stream);
990 net_log().AddEvent(
991 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
992 NetLog::IntegerCallback("stream_id", stream_id));
993 return scoped_ptr<SpdyBuffer>();
996 effective_len = std::min(effective_len, session_send_window_size_);
999 DCHECK_GE(effective_len, 0);
1001 // Clear FIN flag if only some of the data will be in the data
1002 // frame.
1003 if (effective_len < len)
1004 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1006 if (net_log().IsLogging()) {
1007 net_log().AddEvent(
1008 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1009 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1010 (flags & DATA_FLAG_FIN) != 0));
1013 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1014 if (effective_len > 0)
1015 SendPrefacePingIfNoneInFlight();
1017 // TODO(mbelshe): reduce memory copies here.
1018 DCHECK(buffered_spdy_framer_.get());
1019 scoped_ptr<SpdyFrame> frame(
1020 buffered_spdy_framer_->CreateDataFrame(
1021 stream_id, data->data(),
1022 static_cast<uint32>(effective_len), flags));
1024 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1026 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1027 DecreaseSendWindowSize(static_cast<int32>(effective_len));
1028 data_buffer->AddConsumeCallback(
1029 base::Bind(&SpdySession::OnWriteBufferConsumed,
1030 weak_factory_.GetWeakPtr(),
1031 static_cast<size_t>(effective_len)));
1034 return data_buffer.Pass();
1037 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1038 DCHECK_NE(stream_id, 0u);
1040 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1041 if (it == active_streams_.end()) {
1042 NOTREACHED();
1043 return;
1046 CloseActiveStreamIterator(it, status);
1049 void SpdySession::CloseCreatedStream(
1050 const base::WeakPtr<SpdyStream>& stream, int status) {
1051 DCHECK_EQ(stream->stream_id(), 0u);
1053 CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1054 if (it == created_streams_.end()) {
1055 NOTREACHED();
1056 return;
1059 CloseCreatedStreamIterator(it, status);
1062 void SpdySession::ResetStream(SpdyStreamId stream_id,
1063 SpdyRstStreamStatus status,
1064 const std::string& description) {
1065 DCHECK_NE(stream_id, 0u);
1067 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1068 if (it == active_streams_.end()) {
1069 NOTREACHED();
1070 return;
1073 ResetStreamIterator(it, status, description);
1076 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1077 return ContainsKey(active_streams_, stream_id);
1080 LoadState SpdySession::GetLoadState() const {
1081 // Just report that we're idle since the session could be doing
1082 // many things concurrently.
1083 return LOAD_STATE_IDLE;
1086 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1087 int status) {
1088 // TODO(mbelshe): We should send a RST_STREAM control frame here
1089 // so that the server can cancel a large send.
1091 scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1092 active_streams_.erase(it);
1094 // TODO(akalin): When SpdyStream was ref-counted (and
1095 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1096 // was only done when status was not OK. This meant that pushed
1097 // streams can still be claimed after they're closed. This is
1098 // probably something that we still want to support, although server
1099 // push is hardly used. Write tests for this and fix this. (See
1100 // http://crbug.com/261712 .)
1101 if (owned_stream->type() == SPDY_PUSH_STREAM)
1102 unclaimed_pushed_streams_.erase(owned_stream->url());
1104 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1106 DeleteStream(owned_stream.Pass(), status);
1108 if (!weak_this)
1109 return;
1111 if (availability_state_ == STATE_CLOSED)
1112 return;
1114 // If there are no active streams and the socket pool is stalled, close the
1115 // session to free up a socket slot.
1116 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1117 CloseSessionResult result =
1118 DoCloseSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1119 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1123 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1124 int status) {
1125 scoped_ptr<SpdyStream> owned_stream(*it);
1126 created_streams_.erase(it);
1127 DeleteStream(owned_stream.Pass(), status);
1130 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1131 SpdyRstStreamStatus status,
1132 const std::string& description) {
1133 // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1134 // may close us.
1135 SpdyStreamId stream_id = it->first;
1136 RequestPriority priority = it->second.stream->priority();
1137 EnqueueResetStreamFrame(stream_id, priority, status, description);
1139 // Removes any pending writes for the stream except for possibly an
1140 // in-flight one.
1141 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1144 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1145 RequestPriority priority,
1146 SpdyRstStreamStatus status,
1147 const std::string& description) {
1148 DCHECK_NE(stream_id, 0u);
1150 net_log().AddEvent(
1151 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1152 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1154 DCHECK(buffered_spdy_framer_.get());
1155 scoped_ptr<SpdyFrame> rst_frame(
1156 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1158 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1159 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1162 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1163 CHECK(!in_io_loop_);
1164 CHECK_NE(availability_state_, STATE_CLOSED);
1165 CHECK_EQ(read_state_, expected_read_state);
1167 result = DoReadLoop(expected_read_state, result);
1169 if (availability_state_ == STATE_CLOSED) {
1170 CHECK_EQ(result, error_on_close_);
1171 CHECK_LT(error_on_close_, ERR_IO_PENDING);
1172 RemoveFromPool();
1173 return;
1176 CHECK(result == OK || result == ERR_IO_PENDING);
1179 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1180 CHECK(!in_io_loop_);
1181 CHECK_NE(availability_state_, STATE_CLOSED);
1182 CHECK_EQ(read_state_, expected_read_state);
1184 in_io_loop_ = true;
1186 int bytes_read_without_yielding = 0;
1188 // Loop until the session is closed, the read becomes blocked, or
1189 // the read limit is exceeded.
1190 while (true) {
1191 switch (read_state_) {
1192 case READ_STATE_DO_READ:
1193 CHECK_EQ(result, OK);
1194 result = DoRead();
1195 break;
1196 case READ_STATE_DO_READ_COMPLETE:
1197 if (result > 0)
1198 bytes_read_without_yielding += result;
1199 result = DoReadComplete(result);
1200 break;
1201 default:
1202 NOTREACHED() << "read_state_: " << read_state_;
1203 break;
1206 if (availability_state_ == STATE_CLOSED) {
1207 CHECK_EQ(result, error_on_close_);
1208 CHECK_LT(result, ERR_IO_PENDING);
1209 break;
1212 if (result == ERR_IO_PENDING)
1213 break;
1215 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1216 read_state_ = READ_STATE_DO_READ;
1217 base::MessageLoop::current()->PostTask(
1218 FROM_HERE,
1219 base::Bind(&SpdySession::PumpReadLoop,
1220 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1221 result = ERR_IO_PENDING;
1222 break;
1226 CHECK(in_io_loop_);
1227 in_io_loop_ = false;
1229 return result;
1232 int SpdySession::DoRead() {
1233 CHECK(in_io_loop_);
1234 CHECK_NE(availability_state_, STATE_CLOSED);
1236 CHECK(connection_);
1237 CHECK(connection_->socket());
1238 read_state_ = READ_STATE_DO_READ_COMPLETE;
1239 return connection_->socket()->Read(
1240 read_buffer_.get(),
1241 kReadBufferSize,
1242 base::Bind(&SpdySession::PumpReadLoop,
1243 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1246 int SpdySession::DoReadComplete(int result) {
1247 CHECK(in_io_loop_);
1248 DCHECK_NE(availability_state_, STATE_CLOSED);
1250 // Parse a frame. For now this code requires that the frame fit into our
1251 // buffer (kReadBufferSize).
1252 // TODO(mbelshe): support arbitrarily large frames!
1254 if (result == 0) {
1255 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1256 total_bytes_received_, 1, 100000000, 50);
1257 CloseSessionResult close_session_result =
1258 DoCloseSession(ERR_CONNECTION_CLOSED, "Connection closed");
1259 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1260 DCHECK_EQ(availability_state_, STATE_CLOSED);
1261 DCHECK_EQ(error_on_close_, ERR_CONNECTION_CLOSED);
1262 return ERR_CONNECTION_CLOSED;
1265 if (result < 0) {
1266 CloseSessionResult close_session_result =
1267 DoCloseSession(static_cast<Error>(result), "result is < 0.");
1268 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1269 DCHECK_EQ(availability_state_, STATE_CLOSED);
1270 DCHECK_EQ(error_on_close_, result);
1271 return result;
1273 CHECK_LE(result, kReadBufferSize);
1274 total_bytes_received_ += result;
1276 last_activity_time_ = time_func_();
1278 DCHECK(buffered_spdy_framer_.get());
1279 char* data = read_buffer_->data();
1280 while (result > 0) {
1281 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1282 result -= bytes_processed;
1283 data += bytes_processed;
1285 if (availability_state_ == STATE_CLOSED) {
1286 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1287 return error_on_close_;
1290 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1293 read_state_ = READ_STATE_DO_READ;
1294 return OK;
1297 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1298 CHECK(!in_io_loop_);
1299 DCHECK_NE(availability_state_, STATE_CLOSED);
1300 DCHECK_EQ(write_state_, expected_write_state);
1302 result = DoWriteLoop(expected_write_state, result);
1304 if (availability_state_ == STATE_CLOSED) {
1305 DCHECK_EQ(result, error_on_close_);
1306 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1307 RemoveFromPool();
1308 return;
1311 DCHECK(result == OK || result == ERR_IO_PENDING);
1314 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1315 CHECK(!in_io_loop_);
1316 DCHECK_NE(availability_state_, STATE_CLOSED);
1317 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1318 DCHECK_EQ(write_state_, expected_write_state);
1320 in_io_loop_ = true;
1322 // Loop until the session is closed or the write becomes blocked.
1323 while (true) {
1324 switch (write_state_) {
1325 case WRITE_STATE_DO_WRITE:
1326 DCHECK_EQ(result, OK);
1327 result = DoWrite();
1328 break;
1329 case WRITE_STATE_DO_WRITE_COMPLETE:
1330 result = DoWriteComplete(result);
1331 break;
1332 case WRITE_STATE_IDLE:
1333 default:
1334 NOTREACHED() << "write_state_: " << write_state_;
1335 break;
1338 if (availability_state_ == STATE_CLOSED) {
1339 DCHECK_EQ(result, error_on_close_);
1340 DCHECK_LT(result, ERR_IO_PENDING);
1341 break;
1344 if (write_state_ == WRITE_STATE_IDLE) {
1345 DCHECK_EQ(result, ERR_IO_PENDING);
1346 break;
1349 if (result == ERR_IO_PENDING)
1350 break;
1353 CHECK(in_io_loop_);
1354 in_io_loop_ = false;
1356 return result;
1359 int SpdySession::DoWrite() {
1360 CHECK(in_io_loop_);
1361 DCHECK_NE(availability_state_, STATE_CLOSED);
1363 DCHECK(buffered_spdy_framer_);
1364 if (in_flight_write_) {
1365 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1366 } else {
1367 // Grab the next frame to send.
1368 SpdyFrameType frame_type = DATA;
1369 scoped_ptr<SpdyBufferProducer> producer;
1370 base::WeakPtr<SpdyStream> stream;
1371 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1372 write_state_ = WRITE_STATE_IDLE;
1373 return ERR_IO_PENDING;
1376 if (stream.get())
1377 CHECK(!stream->IsClosed());
1379 // Activate the stream only when sending the SYN_STREAM frame to
1380 // guarantee monotonically-increasing stream IDs.
1381 if (frame_type == SYN_STREAM) {
1382 CHECK(stream.get());
1383 CHECK_EQ(stream->stream_id(), 0u);
1384 scoped_ptr<SpdyStream> owned_stream =
1385 ActivateCreatedStream(stream.get());
1386 InsertActivatedStream(owned_stream.Pass());
1389 in_flight_write_ = producer->ProduceBuffer();
1390 if (!in_flight_write_) {
1391 NOTREACHED();
1392 return ERR_UNEXPECTED;
1394 in_flight_write_frame_type_ = frame_type;
1395 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1396 DCHECK_GE(in_flight_write_frame_size_,
1397 buffered_spdy_framer_->GetFrameMinimumSize());
1398 in_flight_write_stream_ = stream;
1401 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1403 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1404 // with Socket implementations that don't store their IOBuffer
1405 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1406 scoped_refptr<IOBuffer> write_io_buffer =
1407 in_flight_write_->GetIOBufferForRemainingData();
1408 return connection_->socket()->Write(
1409 write_io_buffer.get(),
1410 in_flight_write_->GetRemainingSize(),
1411 base::Bind(&SpdySession::PumpWriteLoop,
1412 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1415 int SpdySession::DoWriteComplete(int result) {
1416 CHECK(in_io_loop_);
1417 DCHECK_NE(availability_state_, STATE_CLOSED);
1418 DCHECK_NE(result, ERR_IO_PENDING);
1419 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1421 last_activity_time_ = time_func_();
1423 if (result < 0) {
1424 DCHECK_NE(result, ERR_IO_PENDING);
1425 in_flight_write_.reset();
1426 in_flight_write_frame_type_ = DATA;
1427 in_flight_write_frame_size_ = 0;
1428 in_flight_write_stream_.reset();
1429 CloseSessionResult close_session_result =
1430 DoCloseSession(static_cast<Error>(result), "Write error");
1431 DCHECK_EQ(close_session_result, SESSION_CLOSED_BUT_NOT_REMOVED);
1432 DCHECK_EQ(availability_state_, STATE_CLOSED);
1433 DCHECK_EQ(error_on_close_, result);
1434 return result;
1437 // It should not be possible to have written more bytes than our
1438 // in_flight_write_.
1439 DCHECK_LE(static_cast<size_t>(result),
1440 in_flight_write_->GetRemainingSize());
1442 if (result > 0) {
1443 in_flight_write_->Consume(static_cast<size_t>(result));
1445 // We only notify the stream when we've fully written the pending frame.
1446 if (in_flight_write_->GetRemainingSize() == 0) {
1447 // It is possible that the stream was cancelled while we were
1448 // writing to the socket.
1449 if (in_flight_write_stream_.get()) {
1450 DCHECK_GT(in_flight_write_frame_size_, 0u);
1451 in_flight_write_stream_->OnFrameWriteComplete(
1452 in_flight_write_frame_type_,
1453 in_flight_write_frame_size_);
1456 // Cleanup the write which just completed.
1457 in_flight_write_.reset();
1458 in_flight_write_frame_type_ = DATA;
1459 in_flight_write_frame_size_ = 0;
1460 in_flight_write_stream_.reset();
1464 write_state_ = WRITE_STATE_DO_WRITE;
1465 return OK;
1468 void SpdySession::DcheckGoingAway() const {
1469 #if DCHECK_IS_ON
1470 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1471 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1472 DCHECK(pending_create_stream_queues_[i].empty());
1474 DCHECK(created_streams_.empty());
1475 #endif
1478 void SpdySession::DcheckClosed() const {
1479 DcheckGoingAway();
1480 DCHECK_EQ(availability_state_, STATE_CLOSED);
1481 DCHECK_LT(error_on_close_, ERR_IO_PENDING);
1482 DCHECK(active_streams_.empty());
1483 DCHECK(unclaimed_pushed_streams_.empty());
1484 DCHECK(write_queue_.IsEmpty());
1487 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1488 Error status) {
1489 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1491 // The loops below are carefully written to avoid reentrancy problems.
1493 while (true) {
1494 size_t old_size = GetTotalSize(pending_create_stream_queues_);
1495 base::WeakPtr<SpdyStreamRequest> pending_request =
1496 GetNextPendingStreamRequest();
1497 if (!pending_request)
1498 break;
1499 // No new stream requests should be added while the session is
1500 // going away.
1501 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1502 pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1505 while (true) {
1506 size_t old_size = active_streams_.size();
1507 ActiveStreamMap::iterator it =
1508 active_streams_.lower_bound(last_good_stream_id + 1);
1509 if (it == active_streams_.end())
1510 break;
1511 LogAbandonedActiveStream(it, status);
1512 CloseActiveStreamIterator(it, status);
1513 // No new streams should be activated while the session is going
1514 // away.
1515 DCHECK_GT(old_size, active_streams_.size());
1518 while (!created_streams_.empty()) {
1519 size_t old_size = created_streams_.size();
1520 CreatedStreamSet::iterator it = created_streams_.begin();
1521 LogAbandonedStream(*it, status);
1522 CloseCreatedStreamIterator(it, status);
1523 // No new streams should be created while the session is going
1524 // away.
1525 DCHECK_GT(old_size, created_streams_.size());
1528 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1530 DcheckGoingAway();
1533 void SpdySession::MaybeFinishGoingAway() {
1534 DcheckGoingAway();
1535 if (active_streams_.empty() && availability_state_ != STATE_CLOSED) {
1536 CloseSessionResult result =
1537 DoCloseSession(ERR_CONNECTION_CLOSED, "Finished going away");
1538 CHECK_NE(result, SESSION_ALREADY_CLOSED);
1542 SpdySession::CloseSessionResult SpdySession::DoCloseSession(
1543 Error err,
1544 const std::string& description) {
1545 CHECK_LT(err, ERR_IO_PENDING);
1547 if (availability_state_ == STATE_CLOSED)
1548 return SESSION_ALREADY_CLOSED;
1550 net_log_.AddEvent(
1551 NetLog::TYPE_SPDY_SESSION_CLOSE,
1552 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1554 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1555 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1556 total_bytes_received_, 1, 100000000, 50);
1558 CHECK(pool_);
1559 if (availability_state_ != STATE_GOING_AWAY)
1560 pool_->MakeSessionUnavailable(GetWeakPtr());
1562 availability_state_ = STATE_CLOSED;
1563 error_on_close_ = err;
1565 StartGoingAway(0, err);
1566 write_queue_.Clear();
1568 DcheckClosed();
1570 if (in_io_loop_)
1571 return SESSION_CLOSED_BUT_NOT_REMOVED;
1573 RemoveFromPool();
1574 return SESSION_CLOSED_AND_REMOVED;
1577 void SpdySession::RemoveFromPool() {
1578 DcheckClosed();
1579 CHECK(pool_);
1581 SpdySessionPool* pool = pool_;
1582 pool_ = NULL;
1583 pool->RemoveUnavailableSession(GetWeakPtr());
1586 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1587 DCHECK(stream);
1588 std::string description = base::StringPrintf(
1589 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1590 stream->url().spec();
1591 stream->LogStreamError(status, description);
1592 // We don't increment the streams abandoned counter here. If the
1593 // stream isn't active (i.e., it hasn't written anything to the wire
1594 // yet) then it's as if it never existed. If it is active, then
1595 // LogAbandonedActiveStream() will increment the counters.
1598 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1599 Error status) {
1600 DCHECK_GT(it->first, 0u);
1601 LogAbandonedStream(it->second.stream, status);
1602 ++streams_abandoned_count_;
1603 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1604 abandoned_streams.Increment();
1605 if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1606 unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1607 unclaimed_pushed_streams_.end()) {
1608 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1609 abandoned_push_streams.Increment();
1613 int SpdySession::GetNewStreamId() {
1614 int id = stream_hi_water_mark_;
1615 stream_hi_water_mark_ += 2;
1616 if (stream_hi_water_mark_ > 0x7fff)
1617 stream_hi_water_mark_ = 1;
1618 return id;
1621 void SpdySession::CloseSessionOnError(Error err,
1622 const std::string& description) {
1623 // We may be called from anywhere, so we can't expect a particular
1624 // return value.
1625 ignore_result(DoCloseSession(err, description));
1628 void SpdySession::MakeUnavailable() {
1629 if (availability_state_ < STATE_GOING_AWAY) {
1630 availability_state_ = STATE_GOING_AWAY;
1631 DCHECK(pool_);
1632 pool_->MakeSessionUnavailable(GetWeakPtr());
1636 base::Value* SpdySession::GetInfoAsValue() const {
1637 base::DictionaryValue* dict = new base::DictionaryValue();
1639 dict->SetInteger("source_id", net_log_.source().id);
1641 dict->SetString("host_port_pair", host_port_pair().ToString());
1642 if (!pooled_aliases_.empty()) {
1643 base::ListValue* alias_list = new base::ListValue();
1644 for (std::set<SpdySessionKey>::const_iterator it =
1645 pooled_aliases_.begin();
1646 it != pooled_aliases_.end(); it++) {
1647 alias_list->Append(new base::StringValue(
1648 it->host_port_pair().ToString()));
1650 dict->Set("aliases", alias_list);
1652 dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1654 dict->SetInteger("active_streams", active_streams_.size());
1656 dict->SetInteger("unclaimed_pushed_streams",
1657 unclaimed_pushed_streams_.size());
1659 dict->SetBoolean("is_secure", is_secure_);
1661 dict->SetString("protocol_negotiated",
1662 SSLClientSocket::NextProtoToString(
1663 connection_->socket()->GetNegotiatedProtocol()));
1665 dict->SetInteger("error", error_on_close_);
1666 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1668 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1669 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1670 dict->SetInteger("streams_pushed_and_claimed_count",
1671 streams_pushed_and_claimed_count_);
1672 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1673 DCHECK(buffered_spdy_framer_.get());
1674 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1676 dict->SetBoolean("sent_settings", sent_settings_);
1677 dict->SetBoolean("received_settings", received_settings_);
1679 dict->SetInteger("send_window_size", session_send_window_size_);
1680 dict->SetInteger("recv_window_size", session_recv_window_size_);
1681 dict->SetInteger("unacked_recv_window_bytes",
1682 session_unacked_recv_window_bytes_);
1683 return dict;
1686 bool SpdySession::IsReused() const {
1687 return buffered_spdy_framer_->frames_received() > 0 ||
1688 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1691 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1692 LoadTimingInfo* load_timing_info) const {
1693 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1694 load_timing_info);
1697 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1698 int rv = ERR_SOCKET_NOT_CONNECTED;
1699 if (connection_->socket()) {
1700 rv = connection_->socket()->GetPeerAddress(address);
1703 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1704 rv == ERR_SOCKET_NOT_CONNECTED);
1706 return rv;
1709 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1710 int rv = ERR_SOCKET_NOT_CONNECTED;
1711 if (connection_->socket()) {
1712 rv = connection_->socket()->GetLocalAddress(address);
1715 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1716 rv == ERR_SOCKET_NOT_CONNECTED);
1718 return rv;
1721 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1722 SpdyFrameType frame_type,
1723 scoped_ptr<SpdyFrame> frame) {
1724 DCHECK(frame_type == RST_STREAM ||
1725 frame_type == SETTINGS ||
1726 frame_type == WINDOW_UPDATE ||
1727 frame_type == PING);
1728 EnqueueWrite(
1729 priority, frame_type,
1730 scoped_ptr<SpdyBufferProducer>(
1731 new SimpleBufferProducer(
1732 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1733 base::WeakPtr<SpdyStream>());
1736 void SpdySession::EnqueueWrite(RequestPriority priority,
1737 SpdyFrameType frame_type,
1738 scoped_ptr<SpdyBufferProducer> producer,
1739 const base::WeakPtr<SpdyStream>& stream) {
1740 if (availability_state_ == STATE_CLOSED)
1741 return;
1743 bool was_idle = write_queue_.IsEmpty();
1744 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1745 if (write_state_ == WRITE_STATE_IDLE) {
1746 DCHECK(was_idle);
1747 DCHECK(!in_flight_write_);
1748 write_state_ = WRITE_STATE_DO_WRITE;
1749 base::MessageLoop::current()->PostTask(
1750 FROM_HERE,
1751 base::Bind(&SpdySession::PumpWriteLoop,
1752 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1756 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1757 CHECK_EQ(stream->stream_id(), 0u);
1758 CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1759 created_streams_.insert(stream.release());
1762 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1763 CHECK_EQ(stream->stream_id(), 0u);
1764 CHECK(created_streams_.find(stream) != created_streams_.end());
1765 stream->set_stream_id(GetNewStreamId());
1766 scoped_ptr<SpdyStream> owned_stream(stream);
1767 created_streams_.erase(stream);
1768 return owned_stream.Pass();
1771 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1772 SpdyStreamId stream_id = stream->stream_id();
1773 CHECK_NE(stream_id, 0u);
1774 std::pair<ActiveStreamMap::iterator, bool> result =
1775 active_streams_.insert(
1776 std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1777 CHECK(result.second);
1778 ignore_result(stream.release());
1781 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1782 if (in_flight_write_stream_.get() == stream.get()) {
1783 // If we're deleting the stream for the in-flight write, we still
1784 // need to let the write complete, so we clear
1785 // |in_flight_write_stream_| and let the write finish on its own
1786 // without notifying |in_flight_write_stream_|.
1787 in_flight_write_stream_.reset();
1790 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1792 // |stream->OnClose()| may end up closing |this|, so detect that.
1793 base::WeakPtr<SpdySession> weak_this = GetWeakPtr();
1795 stream->OnClose(status);
1797 if (!weak_this)
1798 return;
1800 switch (availability_state_) {
1801 case STATE_AVAILABLE:
1802 ProcessPendingStreamRequests();
1803 break;
1804 case STATE_GOING_AWAY:
1805 DcheckGoingAway();
1806 MaybeFinishGoingAway();
1807 break;
1808 case STATE_CLOSED:
1809 // Do nothing.
1810 break;
1814 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1815 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1817 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1818 if (unclaimed_it == unclaimed_pushed_streams_.end())
1819 return base::WeakPtr<SpdyStream>();
1821 SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1822 unclaimed_pushed_streams_.erase(unclaimed_it);
1824 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1825 if (active_it == active_streams_.end()) {
1826 NOTREACHED();
1827 return base::WeakPtr<SpdyStream>();
1830 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1831 used_push_streams.Increment();
1832 return active_it->second.stream->GetWeakPtr();
1835 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1836 bool* was_npn_negotiated,
1837 NextProto* protocol_negotiated) {
1838 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1839 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1840 return connection_->socket()->GetSSLInfo(ssl_info);
1843 bool SpdySession::GetSSLCertRequestInfo(
1844 SSLCertRequestInfo* cert_request_info) {
1845 if (!is_secure_)
1846 return false;
1847 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1848 return true;
1851 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1852 CHECK(in_io_loop_);
1854 if (availability_state_ == STATE_CLOSED)
1855 return;
1857 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1858 std::string description = base::StringPrintf(
1859 "SPDY_ERROR error_code: %d.", error_code);
1860 CloseSessionResult result =
1861 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, description);
1862 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
1865 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1866 const std::string& description) {
1867 CHECK(in_io_loop_);
1869 if (availability_state_ == STATE_CLOSED)
1870 return;
1872 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1873 if (it == active_streams_.end()) {
1874 // We still want to send a frame to reset the stream even if we
1875 // don't know anything about it.
1876 EnqueueResetStreamFrame(
1877 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1878 return;
1881 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1884 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1885 size_t length,
1886 bool fin) {
1887 CHECK(in_io_loop_);
1889 if (availability_state_ == STATE_CLOSED)
1890 return;
1892 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1894 // By the time data comes in, the stream may already be inactive.
1895 if (it == active_streams_.end())
1896 return;
1898 SpdyStream* stream = it->second.stream;
1899 CHECK_EQ(stream->stream_id(), stream_id);
1901 DCHECK(buffered_spdy_framer_);
1902 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1903 stream->IncrementRawReceivedBytes(header_len);
1906 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
1907 const char* data,
1908 size_t len,
1909 bool fin) {
1910 CHECK(in_io_loop_);
1912 if (availability_state_ == STATE_CLOSED)
1913 return;
1915 if (data == NULL && len != 0) {
1916 // This is notification of consumed data padding.
1917 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
1918 // See crbug.com/353012.
1919 return;
1922 DCHECK_LT(len, 1u << 24);
1923 if (net_log().IsLogging()) {
1924 net_log().AddEvent(
1925 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
1926 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
1929 // Build the buffer as early as possible so that we go through the
1930 // session flow control checks and update
1931 // |unacked_recv_window_bytes_| properly even when the stream is
1932 // inactive (since the other side has still reduced its session send
1933 // window).
1934 scoped_ptr<SpdyBuffer> buffer;
1935 if (data) {
1936 DCHECK_GT(len, 0u);
1937 CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
1938 buffer.reset(new SpdyBuffer(data, len));
1940 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1941 DecreaseRecvWindowSize(static_cast<int32>(len));
1942 buffer->AddConsumeCallback(
1943 base::Bind(&SpdySession::OnReadBufferConsumed,
1944 weak_factory_.GetWeakPtr()));
1946 } else {
1947 DCHECK_EQ(len, 0u);
1950 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1952 // By the time data comes in, the stream may already be inactive.
1953 if (it == active_streams_.end())
1954 return;
1956 SpdyStream* stream = it->second.stream;
1957 CHECK_EQ(stream->stream_id(), stream_id);
1959 stream->IncrementRawReceivedBytes(len);
1961 if (it->second.waiting_for_syn_reply) {
1962 const std::string& error = "Data received before SYN_REPLY.";
1963 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
1964 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
1965 return;
1968 stream->OnDataReceived(buffer.Pass());
1971 void SpdySession::OnSettings(bool clear_persisted) {
1972 CHECK(in_io_loop_);
1974 if (availability_state_ == STATE_CLOSED)
1975 return;
1977 if (clear_persisted)
1978 http_server_properties_->ClearSpdySettings(host_port_pair());
1980 if (net_log_.IsLogging()) {
1981 net_log_.AddEvent(
1982 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
1983 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
1984 clear_persisted));
1988 void SpdySession::OnSetting(SpdySettingsIds id,
1989 uint8 flags,
1990 uint32 value) {
1991 CHECK(in_io_loop_);
1993 if (availability_state_ == STATE_CLOSED)
1994 return;
1996 HandleSetting(id, value);
1997 http_server_properties_->SetSpdySetting(
1998 host_port_pair(),
2000 static_cast<SpdySettingsFlags>(flags),
2001 value);
2002 received_settings_ = true;
2004 // Log the setting.
2005 net_log_.AddEvent(
2006 NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2007 base::Bind(&NetLogSpdySettingCallback,
2008 id, static_cast<SpdySettingsFlags>(flags), value));
2011 void SpdySession::OnSendCompressedFrame(
2012 SpdyStreamId stream_id,
2013 SpdyFrameType type,
2014 size_t payload_len,
2015 size_t frame_len) {
2016 if (type != SYN_STREAM)
2017 return;
2019 DCHECK(buffered_spdy_framer_.get());
2020 size_t compressed_len =
2021 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2023 if (payload_len) {
2024 // Make sure we avoid early decimal truncation.
2025 int compression_pct = 100 - (100 * compressed_len) / payload_len;
2026 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2027 compression_pct);
2031 void SpdySession::OnReceiveCompressedFrame(
2032 SpdyStreamId stream_id,
2033 SpdyFrameType type,
2034 size_t frame_len) {
2035 last_compressed_frame_len_ = frame_len;
2038 int SpdySession::OnInitialResponseHeadersReceived(
2039 const SpdyHeaderBlock& response_headers,
2040 base::Time response_time,
2041 base::TimeTicks recv_first_byte_time,
2042 SpdyStream* stream) {
2043 CHECK(in_io_loop_);
2044 SpdyStreamId stream_id = stream->stream_id();
2045 // May invalidate |stream|.
2046 int rv = stream->OnInitialResponseHeadersReceived(
2047 response_headers, response_time, recv_first_byte_time);
2048 if (rv < 0) {
2049 DCHECK_NE(rv, ERR_IO_PENDING);
2050 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2052 return rv;
2055 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2056 SpdyStreamId associated_stream_id,
2057 SpdyPriority priority,
2058 bool fin,
2059 bool unidirectional,
2060 const SpdyHeaderBlock& headers) {
2061 CHECK(in_io_loop_);
2063 if (availability_state_ == STATE_CLOSED)
2064 return;
2066 base::Time response_time = base::Time::Now();
2067 base::TimeTicks recv_first_byte_time = time_func_();
2069 if (net_log_.IsLogging()) {
2070 net_log_.AddEvent(
2071 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2072 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2073 &headers, fin, unidirectional, priority,
2074 stream_id, associated_stream_id));
2077 // Server-initiated streams should have even sequence numbers.
2078 if ((stream_id & 0x1) != 0) {
2079 LOG(WARNING) << "Received invalid OnSyn stream id " << stream_id;
2080 return;
2083 if (IsStreamActive(stream_id)) {
2084 LOG(WARNING) << "Received OnSyn for active stream " << stream_id;
2085 return;
2088 RequestPriority request_priority =
2089 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2091 if (availability_state_ == STATE_GOING_AWAY) {
2092 // TODO(akalin): This behavior isn't in the SPDY spec, although it
2093 // probably should be.
2094 EnqueueResetStreamFrame(stream_id, request_priority,
2095 RST_STREAM_REFUSED_STREAM,
2096 "OnSyn received when going away");
2097 return;
2100 // TODO(jgraettinger): SpdyFramer simulates OnSynStream() from HEADERS
2101 // frames, which don't convey associated stream ID. Disable this check
2102 // for now, and re-enable when PUSH_PROMISE is implemented properly.
2103 if (associated_stream_id == 0 && GetProtocolVersion() < SPDY4) {
2104 std::string description = base::StringPrintf(
2105 "Received invalid OnSyn associated stream id %d for stream %d",
2106 associated_stream_id, stream_id);
2107 EnqueueResetStreamFrame(stream_id, request_priority,
2108 RST_STREAM_REFUSED_STREAM, description);
2109 return;
2112 streams_pushed_count_++;
2114 // TODO(mbelshe): DCHECK that this is a GET method?
2116 // Verify that the response had a URL for us.
2117 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2118 if (!gurl.is_valid()) {
2119 EnqueueResetStreamFrame(
2120 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
2121 "Pushed stream url was invalid: " + gurl.spec());
2122 return;
2125 // Verify we have a valid stream association.
2126 ActiveStreamMap::iterator associated_it =
2127 active_streams_.find(associated_stream_id);
2128 // TODO(jgraettinger): (See PUSH_PROMISE comment above).
2129 if (GetProtocolVersion() < SPDY4 && associated_it == active_streams_.end()) {
2130 EnqueueResetStreamFrame(
2131 stream_id, request_priority, RST_STREAM_INVALID_STREAM,
2132 base::StringPrintf(
2133 "Received OnSyn with inactive associated stream %d",
2134 associated_stream_id));
2135 return;
2138 // Check that the SYN advertises the same origin as its associated stream.
2139 // Bypass this check if and only if this session is with a SPDY proxy that
2140 // is trusted explicitly via the --trusted-spdy-proxy switch.
2141 if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2142 // Disallow pushing of HTTPS content.
2143 if (gurl.SchemeIs("https")) {
2144 EnqueueResetStreamFrame(
2145 stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
2146 base::StringPrintf(
2147 "Rejected push of Cross Origin HTTPS content %d",
2148 associated_stream_id));
2150 } else if (GetProtocolVersion() < SPDY4) {
2151 // TODO(jgraettinger): (See PUSH_PROMISE comment above).
2152 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2153 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2154 EnqueueResetStreamFrame(
2155 stream_id, request_priority, RST_STREAM_REFUSED_STREAM,
2156 base::StringPrintf(
2157 "Rejected Cross Origin Push Stream %d",
2158 associated_stream_id));
2159 return;
2163 // There should not be an existing pushed stream with the same path.
2164 PushedStreamMap::iterator pushed_it =
2165 unclaimed_pushed_streams_.lower_bound(gurl);
2166 if (pushed_it != unclaimed_pushed_streams_.end() &&
2167 pushed_it->first == gurl) {
2168 EnqueueResetStreamFrame(
2169 stream_id, request_priority, RST_STREAM_PROTOCOL_ERROR,
2170 "Received duplicate pushed stream with url: " +
2171 gurl.spec());
2172 return;
2175 scoped_ptr<SpdyStream> stream(
2176 new SpdyStream(SPDY_PUSH_STREAM, GetWeakPtr(), gurl,
2177 request_priority,
2178 stream_initial_send_window_size_,
2179 stream_initial_recv_window_size_,
2180 net_log_));
2181 stream->set_stream_id(stream_id);
2182 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2183 last_compressed_frame_len_ = 0;
2185 DeleteExpiredPushedStreams();
2186 PushedStreamMap::iterator inserted_pushed_it =
2187 unclaimed_pushed_streams_.insert(
2188 pushed_it,
2189 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2190 DCHECK(inserted_pushed_it != pushed_it);
2192 InsertActivatedStream(stream.Pass());
2194 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2195 if (active_it == active_streams_.end()) {
2196 NOTREACHED();
2197 return;
2200 // Parse the headers.
2201 if (OnInitialResponseHeadersReceived(
2202 headers, response_time,
2203 recv_first_byte_time, active_it->second.stream) != OK)
2204 return;
2206 base::StatsCounter push_requests("spdy.pushed_streams");
2207 push_requests.Increment();
2210 void SpdySession::DeleteExpiredPushedStreams() {
2211 if (unclaimed_pushed_streams_.empty())
2212 return;
2214 // Check that adequate time has elapsed since the last sweep.
2215 if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2216 return;
2218 // Gather old streams to delete.
2219 base::TimeTicks minimum_freshness = time_func_() -
2220 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2221 std::vector<SpdyStreamId> streams_to_close;
2222 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2223 it != unclaimed_pushed_streams_.end(); ++it) {
2224 if (minimum_freshness > it->second.creation_time)
2225 streams_to_close.push_back(it->second.stream_id);
2228 for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2229 streams_to_close.begin();
2230 to_close_it != streams_to_close.end(); ++to_close_it) {
2231 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2232 if (active_it == active_streams_.end())
2233 continue;
2235 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2236 // CloseActiveStreamIterator() will remove the stream from
2237 // |unclaimed_pushed_streams_|.
2238 ResetStreamIterator(
2239 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2242 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2243 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2246 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2247 bool fin,
2248 const SpdyHeaderBlock& headers) {
2249 CHECK(in_io_loop_);
2251 if (availability_state_ == STATE_CLOSED)
2252 return;
2254 base::Time response_time = base::Time::Now();
2255 base::TimeTicks recv_first_byte_time = time_func_();
2257 if (net_log().IsLogging()) {
2258 net_log().AddEvent(
2259 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2260 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2261 &headers, fin, stream_id));
2264 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2265 if (it == active_streams_.end()) {
2266 // NOTE: it may just be that the stream was cancelled.
2267 return;
2270 SpdyStream* stream = it->second.stream;
2271 CHECK_EQ(stream->stream_id(), stream_id);
2273 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2274 last_compressed_frame_len_ = 0;
2276 if (GetProtocolVersion() >= SPDY4) {
2277 const std::string& error =
2278 "SPDY4 wasn't expecting SYN_REPLY.";
2279 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2280 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2281 return;
2283 if (!it->second.waiting_for_syn_reply) {
2284 const std::string& error =
2285 "Received duplicate SYN_REPLY for stream.";
2286 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2287 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2288 return;
2290 it->second.waiting_for_syn_reply = false;
2292 ignore_result(OnInitialResponseHeadersReceived(
2293 headers, response_time, recv_first_byte_time, stream));
2296 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2297 bool fin,
2298 const SpdyHeaderBlock& headers) {
2299 CHECK(in_io_loop_);
2301 if (availability_state_ == STATE_CLOSED)
2302 return;
2304 if (net_log().IsLogging()) {
2305 net_log().AddEvent(
2306 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2307 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2308 &headers, fin, stream_id));
2311 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2312 if (it == active_streams_.end()) {
2313 // NOTE: it may just be that the stream was cancelled.
2314 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2315 return;
2318 SpdyStream* stream = it->second.stream;
2319 CHECK_EQ(stream->stream_id(), stream_id);
2321 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2322 last_compressed_frame_len_ = 0;
2324 if (it->second.waiting_for_syn_reply) {
2325 if (GetProtocolVersion() < SPDY4) {
2326 const std::string& error =
2327 "Was expecting SYN_REPLY, not HEADERS.";
2328 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2329 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2330 return;
2332 base::Time response_time = base::Time::Now();
2333 base::TimeTicks recv_first_byte_time = time_func_();
2335 it->second.waiting_for_syn_reply = false;
2336 ignore_result(OnInitialResponseHeadersReceived(
2337 headers, response_time, recv_first_byte_time, stream));
2338 } else {
2339 int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2340 if (rv < 0) {
2341 DCHECK_NE(rv, ERR_IO_PENDING);
2342 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2347 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2348 SpdyRstStreamStatus status) {
2349 CHECK(in_io_loop_);
2351 if (availability_state_ == STATE_CLOSED)
2352 return;
2354 std::string description;
2355 net_log().AddEvent(
2356 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2357 base::Bind(&NetLogSpdyRstCallback,
2358 stream_id, status, &description));
2360 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2361 if (it == active_streams_.end()) {
2362 // NOTE: it may just be that the stream was cancelled.
2363 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2364 return;
2367 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2369 if (status == 0) {
2370 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2371 } else if (status == RST_STREAM_REFUSED_STREAM) {
2372 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2373 } else {
2374 RecordProtocolErrorHistogram(
2375 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2376 it->second.stream->LogStreamError(
2377 ERR_SPDY_PROTOCOL_ERROR,
2378 base::StringPrintf("SPDY stream closed with status: %d", status));
2379 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2380 // For now, it doesn't matter much - it is a protocol error.
2381 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2385 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2386 SpdyGoAwayStatus status) {
2387 CHECK(in_io_loop_);
2389 if (availability_state_ == STATE_CLOSED)
2390 return;
2392 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2393 base::Bind(&NetLogSpdyGoAwayCallback,
2394 last_accepted_stream_id,
2395 active_streams_.size(),
2396 unclaimed_pushed_streams_.size(),
2397 status));
2398 MakeUnavailable();
2399 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2400 // This is to handle the case when we already don't have any active
2401 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2402 // active streams and so the last one being closed will finish the
2403 // going away process (see DeleteStream()).
2404 MaybeFinishGoingAway();
2407 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2408 CHECK(in_io_loop_);
2410 if (availability_state_ == STATE_CLOSED)
2411 return;
2413 net_log_.AddEvent(
2414 NetLog::TYPE_SPDY_SESSION_PING,
2415 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2417 // Send response to a PING from server.
2418 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2419 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2420 WritePingFrame(unique_id, true);
2421 return;
2424 --pings_in_flight_;
2425 if (pings_in_flight_ < 0) {
2426 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2427 CloseSessionResult result =
2428 DoCloseSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2429 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2430 pings_in_flight_ = 0;
2431 return;
2434 if (pings_in_flight_ > 0)
2435 return;
2437 // We will record RTT in histogram when there are no more client sent
2438 // pings_in_flight_.
2439 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2442 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2443 uint32 delta_window_size) {
2444 CHECK(in_io_loop_);
2446 if (availability_state_ == STATE_CLOSED)
2447 return;
2449 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2450 net_log_.AddEvent(
2451 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2452 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2453 stream_id, delta_window_size));
2455 if (stream_id == kSessionFlowControlStreamId) {
2456 // WINDOW_UPDATE for the session.
2457 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2458 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2459 << "session flow control is not turned on";
2460 // TODO(akalin): Record an error and close the session.
2461 return;
2464 if (delta_window_size < 1u) {
2465 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2466 CloseSessionResult result = DoCloseSession(
2467 ERR_SPDY_PROTOCOL_ERROR,
2468 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2469 base::UintToString(delta_window_size));
2470 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2471 return;
2474 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2475 } else {
2476 // WINDOW_UPDATE for a stream.
2477 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2478 // TODO(akalin): Record an error and close the session.
2479 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2480 << " when flow control is not turned on";
2481 return;
2484 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2486 if (it == active_streams_.end()) {
2487 // NOTE: it may just be that the stream was cancelled.
2488 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2489 return;
2492 SpdyStream* stream = it->second.stream;
2493 CHECK_EQ(stream->stream_id(), stream_id);
2495 if (delta_window_size < 1u) {
2496 ResetStreamIterator(it,
2497 RST_STREAM_FLOW_CONTROL_ERROR,
2498 base::StringPrintf(
2499 "Received WINDOW_UPDATE with an invalid "
2500 "delta_window_size %ud", delta_window_size));
2501 return;
2504 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2505 it->second.stream->IncreaseSendWindowSize(
2506 static_cast<int32>(delta_window_size));
2510 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2511 SpdyStreamId promised_stream_id) {
2512 // TODO(akalin): Handle PUSH_PROMISE frames.
2515 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2516 uint32 delta_window_size) {
2517 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2518 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2519 CHECK(it != active_streams_.end());
2520 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2521 SendWindowUpdateFrame(
2522 stream_id, delta_window_size, it->second.stream->priority());
2525 void SpdySession::SendInitialData() {
2526 DCHECK(enable_sending_initial_data_);
2527 DCHECK_NE(availability_state_, STATE_CLOSED);
2529 if (send_connection_header_prefix_) {
2530 DCHECK_EQ(protocol_, kProtoSPDY4);
2531 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2532 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2533 kHttp2ConnectionHeaderPrefixSize,
2534 false /* take_ownership */));
2535 // Count the prefix as part of the subsequent SETTINGS frame.
2536 EnqueueSessionWrite(HIGHEST, SETTINGS,
2537 connection_header_prefix_frame.Pass());
2540 // First, notify the server about the settings they should use when
2541 // communicating with us.
2542 SettingsMap settings_map;
2543 // Create a new settings frame notifying the server of our
2544 // max concurrent streams and initial window size.
2545 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2546 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2547 if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2548 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2549 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2550 SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2551 stream_initial_recv_window_size_);
2553 SendSettings(settings_map);
2555 // Next, notify the server about our initial recv window size.
2556 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2557 // Bump up the receive window size to the real initial value. This
2558 // has to go here since the WINDOW_UPDATE frame sent by
2559 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2560 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2561 // This condition implies that |kDefaultInitialRecvWindowSize| -
2562 // |session_recv_window_size_| doesn't overflow.
2563 DCHECK_GT(session_recv_window_size_, 0);
2564 IncreaseRecvWindowSize(
2565 kDefaultInitialRecvWindowSize - session_recv_window_size_);
2568 // Finally, notify the server about the settings they have
2569 // previously told us to use when communicating with them (after
2570 // applying them).
2571 const SettingsMap& server_settings_map =
2572 http_server_properties_->GetSpdySettings(host_port_pair());
2573 if (server_settings_map.empty())
2574 return;
2576 SettingsMap::const_iterator it =
2577 server_settings_map.find(SETTINGS_CURRENT_CWND);
2578 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2579 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2581 for (SettingsMap::const_iterator it = server_settings_map.begin();
2582 it != server_settings_map.end(); ++it) {
2583 const SpdySettingsIds new_id = it->first;
2584 const uint32 new_val = it->second.second;
2585 HandleSetting(new_id, new_val);
2588 SendSettings(server_settings_map);
2592 void SpdySession::SendSettings(const SettingsMap& settings) {
2593 DCHECK_NE(availability_state_, STATE_CLOSED);
2595 net_log_.AddEvent(
2596 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2597 base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2599 // Create the SETTINGS frame and send it.
2600 DCHECK(buffered_spdy_framer_.get());
2601 scoped_ptr<SpdyFrame> settings_frame(
2602 buffered_spdy_framer_->CreateSettings(settings));
2603 sent_settings_ = true;
2604 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2607 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2608 switch (id) {
2609 case SETTINGS_MAX_CONCURRENT_STREAMS:
2610 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2611 kMaxConcurrentStreamLimit);
2612 ProcessPendingStreamRequests();
2613 break;
2614 case SETTINGS_INITIAL_WINDOW_SIZE: {
2615 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2616 net_log().AddEvent(
2617 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2618 return;
2621 if (value > static_cast<uint32>(kint32max)) {
2622 net_log().AddEvent(
2623 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2624 NetLog::IntegerCallback("initial_window_size", value));
2625 return;
2628 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2629 int32 delta_window_size =
2630 static_cast<int32>(value) - stream_initial_send_window_size_;
2631 stream_initial_send_window_size_ = static_cast<int32>(value);
2632 UpdateStreamsSendWindowSize(delta_window_size);
2633 net_log().AddEvent(
2634 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2635 NetLog::IntegerCallback("delta_window_size", delta_window_size));
2636 break;
2641 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2642 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2643 for (ActiveStreamMap::iterator it = active_streams_.begin();
2644 it != active_streams_.end(); ++it) {
2645 it->second.stream->AdjustSendWindowSize(delta_window_size);
2648 for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2649 it != created_streams_.end(); it++) {
2650 (*it)->AdjustSendWindowSize(delta_window_size);
2654 void SpdySession::SendPrefacePingIfNoneInFlight() {
2655 if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2656 return;
2658 base::TimeTicks now = time_func_();
2659 // If there is no activity in the session, then send a preface-PING.
2660 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2661 SendPrefacePing();
2664 void SpdySession::SendPrefacePing() {
2665 WritePingFrame(next_ping_id_, false);
2668 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2669 uint32 delta_window_size,
2670 RequestPriority priority) {
2671 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2672 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2673 if (it != active_streams_.end()) {
2674 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2675 } else {
2676 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2677 CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2680 net_log_.AddEvent(
2681 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2682 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2683 stream_id, delta_window_size));
2685 DCHECK(buffered_spdy_framer_.get());
2686 scoped_ptr<SpdyFrame> window_update_frame(
2687 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2688 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2691 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2692 DCHECK(buffered_spdy_framer_.get());
2693 scoped_ptr<SpdyFrame> ping_frame(
2694 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2695 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2697 if (net_log().IsLogging()) {
2698 net_log().AddEvent(
2699 NetLog::TYPE_SPDY_SESSION_PING,
2700 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2702 if (!is_ack) {
2703 next_ping_id_ += 2;
2704 ++pings_in_flight_;
2705 PlanToCheckPingStatus();
2706 last_ping_sent_time_ = time_func_();
2710 void SpdySession::PlanToCheckPingStatus() {
2711 if (check_ping_status_pending_)
2712 return;
2714 check_ping_status_pending_ = true;
2715 base::MessageLoop::current()->PostDelayedTask(
2716 FROM_HERE,
2717 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2718 time_func_()), hung_interval_);
2721 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2722 CHECK(!in_io_loop_);
2723 DCHECK_NE(availability_state_, STATE_CLOSED);
2725 // Check if we got a response back for all PINGs we had sent.
2726 if (pings_in_flight_ == 0) {
2727 check_ping_status_pending_ = false;
2728 return;
2731 DCHECK(check_ping_status_pending_);
2733 base::TimeTicks now = time_func_();
2734 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2736 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2737 // Track all failed PING messages in a separate bucket.
2738 RecordPingRTTHistogram(base::TimeDelta::Max());
2739 CloseSessionResult result =
2740 DoCloseSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2741 DCHECK_EQ(result, SESSION_CLOSED_AND_REMOVED);
2742 return;
2745 // Check the status of connection after a delay.
2746 base::MessageLoop::current()->PostDelayedTask(
2747 FROM_HERE,
2748 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2749 now),
2750 delay);
2753 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2754 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2757 void SpdySession::RecordProtocolErrorHistogram(
2758 SpdyProtocolErrorDetails details) {
2759 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2760 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2761 if (EndsWith(host_port_pair().host(), "google.com", false)) {
2762 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2763 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2767 void SpdySession::RecordHistograms() {
2768 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2769 streams_initiated_count_,
2770 0, 300, 50);
2771 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2772 streams_pushed_count_,
2773 0, 300, 50);
2774 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2775 streams_pushed_and_claimed_count_,
2776 0, 300, 50);
2777 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2778 streams_abandoned_count_,
2779 0, 300, 50);
2780 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2781 sent_settings_ ? 1 : 0, 2);
2782 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2783 received_settings_ ? 1 : 0, 2);
2784 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2785 stalled_streams_,
2786 0, 300, 50);
2787 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2788 stalled_streams_ > 0 ? 1 : 0, 2);
2790 if (received_settings_) {
2791 // Enumerate the saved settings, and set histograms for it.
2792 const SettingsMap& settings_map =
2793 http_server_properties_->GetSpdySettings(host_port_pair());
2795 SettingsMap::const_iterator it;
2796 for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2797 const SpdySettingsIds id = it->first;
2798 const uint32 val = it->second.second;
2799 switch (id) {
2800 case SETTINGS_CURRENT_CWND:
2801 // Record several different histograms to see if cwnd converges
2802 // for larger volumes of data being sent.
2803 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
2804 val, 1, 200, 100);
2805 if (total_bytes_received_ > 10 * 1024) {
2806 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2807 val, 1, 200, 100);
2808 if (total_bytes_received_ > 25 * 1024) {
2809 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2810 val, 1, 200, 100);
2811 if (total_bytes_received_ > 50 * 1024) {
2812 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2813 val, 1, 200, 100);
2814 if (total_bytes_received_ > 100 * 1024) {
2815 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2816 val, 1, 200, 100);
2821 break;
2822 case SETTINGS_ROUND_TRIP_TIME:
2823 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2824 val, 1, 1200, 100);
2825 break;
2826 case SETTINGS_DOWNLOAD_RETRANS_RATE:
2827 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
2828 val, 1, 100, 50);
2829 break;
2830 default:
2831 break;
2837 void SpdySession::CompleteStreamRequest(
2838 const base::WeakPtr<SpdyStreamRequest>& pending_request) {
2839 // Abort if the request has already been cancelled.
2840 if (!pending_request)
2841 return;
2843 base::WeakPtr<SpdyStream> stream;
2844 int rv = CreateStream(*pending_request, &stream);
2846 if (rv == OK) {
2847 DCHECK(stream);
2848 pending_request->OnRequestCompleteSuccess(stream);
2849 } else {
2850 DCHECK(!stream);
2851 pending_request->OnRequestCompleteFailure(rv);
2855 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
2856 if (!is_secure_)
2857 return NULL;
2858 SSLClientSocket* ssl_socket =
2859 reinterpret_cast<SSLClientSocket*>(connection_->socket());
2860 DCHECK(ssl_socket);
2861 return ssl_socket;
2864 void SpdySession::OnWriteBufferConsumed(
2865 size_t frame_payload_size,
2866 size_t consume_size,
2867 SpdyBuffer::ConsumeSource consume_source) {
2868 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
2869 // deleted (e.g., a stream is closed due to incoming data).
2871 if (availability_state_ == STATE_CLOSED)
2872 return;
2874 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2876 if (consume_source == SpdyBuffer::DISCARD) {
2877 // If we're discarding a frame or part of it, increase the send
2878 // window by the number of discarded bytes. (Although if we're
2879 // discarding part of a frame, it's probably because of a write
2880 // error and we'll be tearing down the session soon.)
2881 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
2882 DCHECK_GT(remaining_payload_bytes, 0u);
2883 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
2885 // For consumed bytes, the send window is increased when we receive
2886 // a WINDOW_UPDATE frame.
2889 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
2890 // We can be called with |in_io_loop_| set if a SpdyBuffer is
2891 // deleted (e.g., a stream is closed due to incoming data).
2893 DCHECK_NE(availability_state_, STATE_CLOSED);
2894 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2895 DCHECK_GE(delta_window_size, 1);
2897 // Check for overflow.
2898 int32 max_delta_window_size = kint32max - session_send_window_size_;
2899 if (delta_window_size > max_delta_window_size) {
2900 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2901 CloseSessionResult result = DoCloseSession(
2902 ERR_SPDY_PROTOCOL_ERROR,
2903 "Received WINDOW_UPDATE [delta: " +
2904 base::IntToString(delta_window_size) +
2905 "] for session overflows session_send_window_size_ [current: " +
2906 base::IntToString(session_send_window_size_) + "]");
2907 DCHECK_NE(result, SESSION_ALREADY_CLOSED);
2908 return;
2911 session_send_window_size_ += delta_window_size;
2913 net_log_.AddEvent(
2914 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2915 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2916 delta_window_size, session_send_window_size_));
2918 DCHECK(!IsSendStalled());
2919 ResumeSendStalledStreams();
2922 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
2923 DCHECK_NE(availability_state_, STATE_CLOSED);
2924 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2926 // We only call this method when sending a frame. Therefore,
2927 // |delta_window_size| should be within the valid frame size range.
2928 DCHECK_GE(delta_window_size, 1);
2929 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
2931 // |send_window_size_| should have been at least |delta_window_size| for
2932 // this call to happen.
2933 DCHECK_GE(session_send_window_size_, delta_window_size);
2935 session_send_window_size_ -= delta_window_size;
2937 net_log_.AddEvent(
2938 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
2939 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2940 -delta_window_size, session_send_window_size_));
2943 void SpdySession::OnReadBufferConsumed(
2944 size_t consume_size,
2945 SpdyBuffer::ConsumeSource consume_source) {
2946 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
2947 // deleted (e.g., discarded by a SpdyReadQueue).
2949 if (availability_state_ == STATE_CLOSED)
2950 return;
2952 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2953 DCHECK_GE(consume_size, 1u);
2954 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
2956 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
2959 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
2960 DCHECK_NE(availability_state_, STATE_CLOSED);
2961 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2962 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
2963 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
2964 DCHECK_GE(delta_window_size, 1);
2965 // Check for overflow.
2966 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
2968 session_recv_window_size_ += delta_window_size;
2969 net_log_.AddEvent(
2970 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
2971 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
2972 delta_window_size, session_recv_window_size_));
2974 session_unacked_recv_window_bytes_ += delta_window_size;
2975 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
2976 SendWindowUpdateFrame(kSessionFlowControlStreamId,
2977 session_unacked_recv_window_bytes_,
2978 HIGHEST);
2979 session_unacked_recv_window_bytes_ = 0;
2983 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
2984 CHECK(in_io_loop_);
2985 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2986 DCHECK_GE(delta_window_size, 1);
2988 // Since we never decrease the initial receive window size,
2989 // |delta_window_size| should never cause |recv_window_size_| to go
2990 // negative. If we do, the receive window isn't being respected.
2991 if (delta_window_size > session_recv_window_size_) {
2992 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
2993 CloseSessionResult result = DoCloseSession(
2994 ERR_SPDY_PROTOCOL_ERROR,
2995 "delta_window_size is " + base::IntToString(delta_window_size) +
2996 " in DecreaseRecvWindowSize, which is larger than the receive " +
2997 "window size of " + base::IntToString(session_recv_window_size_));
2998 DCHECK_EQ(result, SESSION_CLOSED_BUT_NOT_REMOVED);
2999 return;
3002 session_recv_window_size_ -= delta_window_size;
3003 net_log_.AddEvent(
3004 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3005 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3006 -delta_window_size, session_recv_window_size_));
3009 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3010 DCHECK(stream.send_stalled_by_flow_control());
3011 RequestPriority priority = stream.priority();
3012 CHECK_GE(priority, MINIMUM_PRIORITY);
3013 CHECK_LE(priority, MAXIMUM_PRIORITY);
3014 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3017 void SpdySession::ResumeSendStalledStreams() {
3018 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3020 // We don't have to worry about new streams being queued, since
3021 // doing so would cause IsSendStalled() to return true. But we do
3022 // have to worry about streams being closed, as well as ourselves
3023 // being closed.
3025 while (availability_state_ != STATE_CLOSED && !IsSendStalled()) {
3026 size_t old_size = 0;
3027 #if DCHECK_IS_ON
3028 old_size = GetTotalSize(stream_send_unstall_queue_);
3029 #endif
3031 SpdyStreamId stream_id = PopStreamToPossiblyResume();
3032 if (stream_id == 0)
3033 break;
3034 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3035 // The stream may actually still be send-stalled after this (due
3036 // to its own send window) but that's okay -- it'll then be
3037 // resumed once its send window increases.
3038 if (it != active_streams_.end())
3039 it->second.stream->PossiblyResumeIfSendStalled();
3041 // The size should decrease unless we got send-stalled again.
3042 if (!IsSendStalled())
3043 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3047 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3048 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3049 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3050 if (!queue->empty()) {
3051 SpdyStreamId stream_id = queue->front();
3052 queue->pop_front();
3053 return stream_id;
3056 return 0;
3059 } // namespace net