Add default implementations for AppWindowRegistry::Observer notifications.
[chromium-blink-merge.git] / net / websockets / websocket_job.cc
blobcd8f09dd791a5b60dd6adfcfc2f948ec496456b1
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_job.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
24 #include "url/gurl.h"
// Value passed as |max_pending_send_allowed| to OnConnected() when the
// connection is switched to a SPDY stream (see TrySpdyStream()).
static const int kMaxPendingSendAllowed = 32 * 1024;  // 32 kilobytes.
28 namespace {
30 // lower-case header names.
31 const char* const kCookieHeaders[] = {
32 "cookie", "cookie2"
34 const char* const kSetCookieHeaders[] = {
35 "set-cookie", "set-cookie2"
38 net::SocketStreamJob* WebSocketJobFactory(
39 const GURL& url, net::SocketStream::Delegate* delegate,
40 net::URLRequestContext* context, net::CookieStore* cookie_store) {
41 net::WebSocketJob* job = new net::WebSocketJob(delegate);
42 job->InitSocketStream(new net::SocketStream(url, job, context, cookie_store));
43 return job;
46 class WebSocketJobInitSingleton {
47 private:
48 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
49 WebSocketJobInitSingleton() {
50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56 LAZY_INSTANCE_INITIALIZER;
58 } // anonymous namespace
60 namespace net {
62 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
64 // static
65 void WebSocketJob::EnsureInit() {
66 g_websocket_job_init.Get();
69 // static
70 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
71 websocket_over_spdy_enabled_ = enabled;
74 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
75 : delegate_(delegate),
76 state_(INITIALIZED),
77 waiting_(false),
78 handshake_request_(new WebSocketHandshakeRequestHandler),
79 handshake_response_(new WebSocketHandshakeResponseHandler),
80 started_to_send_handshake_request_(false),
81 handshake_request_sent_(0),
82 response_cookies_save_index_(0),
83 spdy_protocol_version_(0),
84 save_next_cookie_running_(false),
85 callback_pending_(false),
86 weak_ptr_factory_(this),
87 weak_ptr_factory_for_send_pending_(this) {
90 WebSocketJob::~WebSocketJob() {
91 DCHECK_EQ(CLOSED, state_);
92 DCHECK(!delegate_);
93 DCHECK(!socket_.get());
96 void WebSocketJob::Connect() {
97 DCHECK(socket_.get());
98 DCHECK_EQ(state_, INITIALIZED);
99 state_ = CONNECTING;
100 socket_->Connect();
103 bool WebSocketJob::SendData(const char* data, int len) {
104 switch (state_) {
105 case INITIALIZED:
106 return false;
108 case CONNECTING:
109 return SendHandshakeRequest(data, len);
111 case OPEN:
113 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
114 memcpy(buffer->data(), data, len);
115 if (current_send_buffer_.get() || !send_buffer_queue_.empty()) {
116 send_buffer_queue_.push_back(buffer);
117 return true;
119 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
120 return SendDataInternal(current_send_buffer_->data(),
121 current_send_buffer_->BytesRemaining());
124 case CLOSING:
125 case CLOSED:
126 return false;
128 return false;
131 void WebSocketJob::Close() {
132 if (state_ == CLOSED)
133 return;
135 state_ = CLOSING;
136 if (current_send_buffer_.get()) {
137 // Will close in SendPending.
138 return;
140 state_ = CLOSED;
141 CloseInternal();
144 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
145 state_ = CONNECTING;
146 socket_->RestartWithAuth(credentials);
149 void WebSocketJob::DetachDelegate() {
150 state_ = CLOSED;
151 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
153 scoped_refptr<WebSocketJob> protect(this);
154 weak_ptr_factory_.InvalidateWeakPtrs();
155 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
157 delegate_ = NULL;
158 if (socket_.get())
159 socket_->DetachDelegate();
160 socket_ = NULL;
161 if (!callback_.is_null()) {
162 waiting_ = false;
163 callback_.Reset();
164 Release(); // Balanced with OnStartOpenConnection().
168 int WebSocketJob::OnStartOpenConnection(
169 SocketStream* socket, const CompletionCallback& callback) {
170 DCHECK(callback_.is_null());
171 state_ = CONNECTING;
173 addresses_ = socket->address_list();
174 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
175 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE;
178 if (delegate_) {
179 int result = delegate_->OnStartOpenConnection(socket, callback);
180 DCHECK_EQ(OK, result);
182 if (waiting_) {
183 // PutInQueue() may set |waiting_| true for throttling. In this case,
184 // Wakeup() will be called later.
185 callback_ = callback;
186 AddRef(); // Balanced when callback_ is cleared.
187 return ERR_IO_PENDING;
189 return TrySpdyStream();
192 void WebSocketJob::OnConnected(
193 SocketStream* socket, int max_pending_send_allowed) {
194 if (state_ == CLOSED)
195 return;
196 DCHECK_EQ(CONNECTING, state_);
197 if (delegate_)
198 delegate_->OnConnected(socket, max_pending_send_allowed);
201 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
202 DCHECK_NE(INITIALIZED, state_);
203 DCHECK_GT(amount_sent, 0);
204 if (state_ == CLOSED)
205 return;
206 if (state_ == CONNECTING) {
207 OnSentHandshakeRequest(socket, amount_sent);
208 return;
210 if (delegate_) {
211 DCHECK(state_ == OPEN || state_ == CLOSING);
212 if (!current_send_buffer_.get()) {
213 VLOG(1)
214 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent;
215 return;
217 current_send_buffer_->DidConsume(amount_sent);
218 if (current_send_buffer_->BytesRemaining() > 0)
219 return;
221 // We need to report amount_sent of original buffer size, instead of
222 // amount sent to |socket|.
223 amount_sent = current_send_buffer_->size();
224 DCHECK_GT(amount_sent, 0);
225 current_send_buffer_ = NULL;
226 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
227 base::MessageLoopForIO::current()->PostTask(
228 FROM_HERE,
229 base::Bind(&WebSocketJob::SendPending,
230 weak_ptr_factory_for_send_pending_.GetWeakPtr()));
232 delegate_->OnSentData(socket, amount_sent);
236 void WebSocketJob::OnReceivedData(
237 SocketStream* socket, const char* data, int len) {
238 DCHECK_NE(INITIALIZED, state_);
239 if (state_ == CLOSED)
240 return;
241 if (state_ == CONNECTING) {
242 OnReceivedHandshakeResponse(socket, data, len);
243 return;
245 DCHECK(state_ == OPEN || state_ == CLOSING);
246 if (delegate_ && len > 0)
247 delegate_->OnReceivedData(socket, data, len);
250 void WebSocketJob::OnClose(SocketStream* socket) {
251 state_ = CLOSED;
252 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
254 scoped_refptr<WebSocketJob> protect(this);
255 weak_ptr_factory_.InvalidateWeakPtrs();
257 SocketStream::Delegate* delegate = delegate_;
258 delegate_ = NULL;
259 socket_ = NULL;
260 if (!callback_.is_null()) {
261 waiting_ = false;
262 callback_.Reset();
263 Release(); // Balanced with OnStartOpenConnection().
265 if (delegate)
266 delegate->OnClose(socket);
269 void WebSocketJob::OnAuthRequired(
270 SocketStream* socket, AuthChallengeInfo* auth_info) {
271 if (delegate_)
272 delegate_->OnAuthRequired(socket, auth_info);
275 void WebSocketJob::OnSSLCertificateError(
276 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
277 if (delegate_)
278 delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
281 void WebSocketJob::OnError(const SocketStream* socket, int error) {
282 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
283 delegate_->OnError(socket, error);
286 void WebSocketJob::OnCreatedSpdyStream(int result) {
287 DCHECK(spdy_websocket_stream_.get());
288 DCHECK(socket_.get());
289 DCHECK_NE(ERR_IO_PENDING, result);
291 if (state_ == CLOSED) {
292 result = ERR_ABORTED;
293 } else if (result == OK) {
294 state_ = CONNECTING;
295 result = ERR_PROTOCOL_SWITCHED;
296 } else {
297 spdy_websocket_stream_.reset();
300 CompleteIO(result);
303 void WebSocketJob::OnSentSpdyHeaders() {
304 DCHECK_NE(INITIALIZED, state_);
305 if (state_ != CONNECTING)
306 return;
307 size_t original_length = handshake_request_->original_length();
308 handshake_request_.reset();
309 if (delegate_)
310 delegate_->OnSentData(socket_.get(), original_length);
313 void WebSocketJob::OnSpdyResponseHeadersUpdated(
314 const SpdyHeaderBlock& response_headers) {
315 DCHECK_NE(INITIALIZED, state_);
316 if (state_ != CONNECTING)
317 return;
318 // TODO(toyoshim): Fallback to non-spdy connection?
319 handshake_response_->ParseResponseHeaderBlock(response_headers,
320 challenge_,
321 spdy_protocol_version_);
323 SaveCookiesAndNotifyHeadersComplete();
326 void WebSocketJob::OnSentSpdyData(size_t bytes_sent) {
327 DCHECK_NE(INITIALIZED, state_);
328 DCHECK_NE(CONNECTING, state_);
329 if (state_ == CLOSED)
330 return;
331 if (!spdy_websocket_stream_.get())
332 return;
333 OnSentData(socket_.get(), static_cast<int>(bytes_sent));
336 void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) {
337 DCHECK_NE(INITIALIZED, state_);
338 DCHECK_NE(CONNECTING, state_);
339 if (state_ == CLOSED)
340 return;
341 if (!spdy_websocket_stream_.get())
342 return;
343 if (buffer) {
344 OnReceivedData(
345 socket_.get(), buffer->GetRemainingData(), buffer->GetRemainingSize());
346 } else {
347 OnReceivedData(socket_.get(), NULL, 0);
351 void WebSocketJob::OnCloseSpdyStream() {
352 spdy_websocket_stream_.reset();
353 OnClose(socket_.get());
356 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
357 DCHECK_EQ(state_, CONNECTING);
358 if (started_to_send_handshake_request_)
359 return false;
360 if (!handshake_request_->ParseRequest(data, len))
361 return false;
363 AddCookieHeaderAndSend();
364 return true;
367 void WebSocketJob::AddCookieHeaderAndSend() {
368 bool allow = true;
369 if (delegate_ && !delegate_->CanGetCookies(socket_.get(), GetURLForCookies()))
370 allow = false;
372 if (socket_.get() && delegate_ && state_ == CONNECTING) {
373 handshake_request_->RemoveHeaders(kCookieHeaders,
374 arraysize(kCookieHeaders));
375 if (allow && socket_->cookie_store()) {
376 // Add cookies, including HttpOnly cookies.
377 CookieOptions cookie_options;
378 cookie_options.set_include_httponly();
379 socket_->cookie_store()->GetCookiesWithOptionsAsync(
380 GetURLForCookies(), cookie_options,
381 base::Bind(&WebSocketJob::LoadCookieCallback,
382 weak_ptr_factory_.GetWeakPtr()));
383 } else {
384 DoSendData();
389 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
390 if (!cookie.empty())
391 // TODO(tyoshino): Sending cookie means that connection doesn't need
392 // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
393 // wouldn't negatively affect privacy anyway. Need to restart connection
394 // or refactor to determine cookie status prior to connecting.
395 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
396 DoSendData();
399 void WebSocketJob::DoSendData() {
400 if (spdy_websocket_stream_.get()) {
401 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
402 handshake_request_->GetRequestHeaderBlock(
403 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
404 spdy_websocket_stream_->SendRequest(headers.Pass());
405 } else {
406 const std::string& handshake_request =
407 handshake_request_->GetRawRequest();
408 handshake_request_sent_ = 0;
409 socket_->net_log()->AddEvent(
410 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
411 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
412 socket_->SendData(handshake_request.data(),
413 handshake_request.size());
415 // Just buffered in |handshake_request_|.
416 started_to_send_handshake_request_ = true;
419 void WebSocketJob::OnSentHandshakeRequest(
420 SocketStream* socket, int amount_sent) {
421 DCHECK_EQ(state_, CONNECTING);
422 handshake_request_sent_ += amount_sent;
423 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
424 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
425 // handshake request has been sent.
426 // notify original size of handshake request to delegate.
427 // Reset the handshake_request_ first in case this object is deleted by the
428 // delegate.
429 size_t original_length = handshake_request_->original_length();
430 handshake_request_.reset();
431 if (delegate_)
432 delegate_->OnSentData(socket, original_length);
436 void WebSocketJob::OnReceivedHandshakeResponse(
437 SocketStream* socket, const char* data, int len) {
438 DCHECK_EQ(state_, CONNECTING);
439 if (handshake_response_->HasResponse()) {
440 // If we already has handshake response, received data should be frame
441 // data, not handshake message.
442 received_data_after_handshake_.insert(
443 received_data_after_handshake_.end(), data, data + len);
444 return;
447 size_t response_length = handshake_response_->ParseRawResponse(data, len);
448 if (!handshake_response_->HasResponse()) {
449 // not yet. we need more data.
450 return;
452 // handshake message is completed.
453 std::string raw_response = handshake_response_->GetRawResponse();
454 socket_->net_log()->AddEvent(
455 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
456 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
457 if (len - response_length > 0) {
458 // If we received extra data, it should be frame data.
459 DCHECK(received_data_after_handshake_.empty());
460 received_data_after_handshake_.assign(data + response_length, data + len);
462 SaveCookiesAndNotifyHeadersComplete();
465 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
466 // handshake message is completed.
467 DCHECK(handshake_response_->HasResponse());
469 // Extract cookies from the handshake response into a temporary vector.
470 response_cookies_.clear();
471 response_cookies_save_index_ = 0;
473 handshake_response_->GetHeaders(
474 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
476 // Now, loop over the response cookies, and attempt to persist each.
477 SaveNextCookie();
480 void WebSocketJob::NotifyHeadersComplete() {
481 // Remove cookie headers, with malformed headers preserved.
482 // Actual handshake should be done in Blink.
483 handshake_response_->RemoveHeaders(
484 kSetCookieHeaders, arraysize(kSetCookieHeaders));
485 std::string handshake_response = handshake_response_->GetResponse();
486 handshake_response_.reset();
487 std::vector<char> received_data(handshake_response.begin(),
488 handshake_response.end());
489 received_data.insert(received_data.end(),
490 received_data_after_handshake_.begin(),
491 received_data_after_handshake_.end());
492 received_data_after_handshake_.clear();
494 state_ = OPEN;
496 DCHECK(!received_data.empty());
497 if (delegate_)
498 delegate_->OnReceivedData(
499 socket_.get(), &received_data.front(), received_data.size());
501 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
504 void WebSocketJob::SaveNextCookie() {
505 if (!socket_.get() || !delegate_ || state_ != CONNECTING)
506 return;
508 callback_pending_ = false;
509 save_next_cookie_running_ = true;
511 if (socket_->cookie_store()) {
512 GURL url_for_cookies = GetURLForCookies();
514 CookieOptions options;
515 options.set_include_httponly();
517 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
518 // CookieMonster's asynchronous operation APIs queue the callback to run it
519 // on the thread where the API was called, there won't be race. I.e. unless
520 // the callback is run synchronously, it won't be run in parallel with this
521 // method.
522 while (!callback_pending_ &&
523 response_cookies_save_index_ < response_cookies_.size()) {
524 std::string cookie = response_cookies_[response_cookies_save_index_];
525 response_cookies_save_index_++;
527 if (!delegate_->CanSetCookie(
528 socket_.get(), url_for_cookies, cookie, &options))
529 continue;
531 callback_pending_ = true;
532 socket_->cookie_store()->SetCookieWithOptionsAsync(
533 url_for_cookies, cookie, options,
534 base::Bind(&WebSocketJob::OnCookieSaved,
535 weak_ptr_factory_.GetWeakPtr()));
539 save_next_cookie_running_ = false;
541 if (callback_pending_)
542 return;
544 response_cookies_.clear();
545 response_cookies_save_index_ = 0;
547 NotifyHeadersComplete();
550 void WebSocketJob::OnCookieSaved(bool cookie_status) {
551 // Tell the caller of SetCookieWithOptionsAsync() that this completion
552 // callback is invoked.
553 // - If the caller checks callback_pending earlier than this callback, the
554 // caller exits to let this method continue iteration.
555 // - Otherwise, the caller continues iteration.
556 callback_pending_ = false;
558 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
559 // the loop. Otherwise, return.
560 if (save_next_cookie_running_)
561 return;
563 SaveNextCookie();
566 GURL WebSocketJob::GetURLForCookies() const {
567 GURL url = socket_->url();
568 std::string scheme = socket_->is_secure() ? "https" : "http";
569 url::Replacements<char> replacements;
570 replacements.SetScheme(scheme.c_str(), url::Component(0, scheme.length()));
571 return url.ReplaceComponents(replacements);
574 const AddressList& WebSocketJob::address_list() const {
575 return addresses_;
578 int WebSocketJob::TrySpdyStream() {
579 if (!socket_.get())
580 return ERR_FAILED;
582 if (!websocket_over_spdy_enabled_)
583 return OK;
585 // Check if we have a SPDY session available.
586 HttpTransactionFactory* factory =
587 socket_->context()->http_transaction_factory();
588 if (!factory)
589 return OK;
590 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
591 if (!session.get())
592 return OK;
593 SpdySessionPool* spdy_pool = session->spdy_session_pool();
594 PrivacyMode privacy_mode = socket_->privacy_mode();
595 const SpdySessionKey key(HostPortPair::FromURL(socket_->url()),
596 socket_->proxy_server(), privacy_mode);
597 // Forbid wss downgrade to SPDY without SSL.
598 // TODO(toyoshim): Does it realize the same policy with HTTP?
599 base::WeakPtr<SpdySession> spdy_session =
600 spdy_pool->FindAvailableSession(key, *socket_->net_log());
601 if (!spdy_session)
602 return OK;
604 SSLInfo ssl_info;
605 bool was_npn_negotiated;
606 NextProto protocol_negotiated = kProtoUnknown;
607 bool use_ssl = spdy_session->GetSSLInfo(
608 &ssl_info, &was_npn_negotiated, &protocol_negotiated);
609 if (socket_->is_secure() && !use_ssl)
610 return OK;
612 // Create SpdyWebSocketStream.
613 spdy_protocol_version_ = spdy_session->GetProtocolVersion();
614 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
616 int result = spdy_websocket_stream_->InitializeStream(
617 socket_->url(), MEDIUM, *socket_->net_log());
618 if (result == OK) {
619 OnConnected(socket_.get(), kMaxPendingSendAllowed);
620 return ERR_PROTOCOL_SWITCHED;
622 if (result != ERR_IO_PENDING) {
623 spdy_websocket_stream_.reset();
624 return OK;
627 return ERR_IO_PENDING;
630 void WebSocketJob::SetWaiting() {
631 waiting_ = true;
634 bool WebSocketJob::IsWaiting() const {
635 return waiting_;
638 void WebSocketJob::Wakeup() {
639 if (!waiting_)
640 return;
641 waiting_ = false;
642 DCHECK(!callback_.is_null());
643 base::MessageLoopForIO::current()->PostTask(
644 FROM_HERE,
645 base::Bind(&WebSocketJob::RetryPendingIO,
646 weak_ptr_factory_.GetWeakPtr()));
649 void WebSocketJob::RetryPendingIO() {
650 int result = TrySpdyStream();
652 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
653 // OnCreatedSpdyStream().
654 if (result != ERR_IO_PENDING)
655 CompleteIO(result);
658 void WebSocketJob::CompleteIO(int result) {
659 // |callback_| may be null if OnClose() or DetachDelegate() was called.
660 if (!callback_.is_null()) {
661 CompletionCallback callback = callback_;
662 callback_.Reset();
663 callback.Run(result);
664 Release(); // Balanced with OnStartOpenConnection().
668 bool WebSocketJob::SendDataInternal(const char* data, int length) {
669 if (spdy_websocket_stream_.get())
670 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
671 if (socket_.get())
672 return socket_->SendData(data, length);
673 return false;
676 void WebSocketJob::CloseInternal() {
677 if (spdy_websocket_stream_.get())
678 spdy_websocket_stream_->Close();
679 if (socket_.get())
680 socket_->Close();
683 void WebSocketJob::SendPending() {
684 if (current_send_buffer_.get())
685 return;
687 // Current buffer has been sent. Try next if any.
688 if (send_buffer_queue_.empty()) {
689 // No more data to send.
690 if (state_ == CLOSING)
691 CloseInternal();
692 return;
695 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
696 send_buffer_queue_.pop_front();
697 current_send_buffer_ =
698 new DrainableIOBuffer(next_buffer.get(), next_buffer->size());
699 SendDataInternal(current_send_buffer_->data(),
700 current_send_buffer_->BytesRemaining());
703 } // namespace net