1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_job.h"
10 #include "base/lazy_instance.h"
11 #include "base/string_tokenizer.h"
12 #include "googleurl/src/gurl.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_log.h"
15 #include "net/cookies/cookie_store.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_network_session.h"
18 #include "net/http/http_transaction_factory.h"
19 #include "net/http/http_util.h"
20 #include "net/spdy/spdy_session.h"
21 #include "net/spdy/spdy_session_pool.h"
22 #include "net/url_request/url_request_context.h"
23 #include "net/websockets/websocket_handshake_handler.h"
24 #include "net/websockets/websocket_net_log_params.h"
25 #include "net/websockets/websocket_throttle.h"
// Value passed as |max_pending_send_allowed| to OnConnected() when
// TrySpdyStream() switches the job over to a SPDY stream.
27 static const int kMaxPendingSendAllowed
= 32768; // 32 kilobytes.
// Header names removed from the handshake request in
// AddCookieHeaderAndSend().
31 // lower-case header names.
32 const char* const kCookieHeaders
[] = {
// Header names collected from the handshake response in
// SaveCookiesAndNotifyHeaderComplete() and removed from it in
// SaveNextCookie().
35 const char* const kSetCookieHeaders
[] = {
36 "set-cookie", "set-cookie2"
// Protocol factory registered for "ws" and "wss" by
// WebSocketJobInitSingleton. Allocates a WebSocketJob for |delegate| and
// initializes it with a new SocketStream built from |url| and the job.
39 net::SocketStreamJob
* WebSocketJobFactory(
40 const GURL
& url
, net::SocketStream::Delegate
* delegate
) {
41 net::WebSocketJob
* job
= new net::WebSocketJob(delegate
);
// The job is handed to the SocketStream as its second constructor argument.
42 job
->InitSocketStream(new net::SocketStream(url
, job
));
// Helper whose constructor registers WebSocketJobFactory as the
// SocketStreamJob protocol factory for the "ws" and "wss" schemes.
46 class WebSocketJobInitSingleton
{
// Friend declaration for the LazyInstance traits type; the class is
// instantiated through base::LazyInstance (|g_websocket_job_init|).
48 friend struct base::DefaultLazyInstanceTraits
<WebSocketJobInitSingleton
>;
49 WebSocketJobInitSingleton() {
50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory
);
51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory
);
// LazyInstance holding the registration helper; accessed through Get() in
// WebSocketJob::EnsureInit().
55 static base::LazyInstance
<WebSocketJobInitSingleton
> g_websocket_job_init
=
56 LAZY_INSTANCE_INITIALIZER
;
58 } // anonymous namespace
// Class-wide switch tested at the start of TrySpdyStream(); initialized to
// false and changed through set_websocket_over_spdy_enabled().
62 bool WebSocketJob::websocket_over_spdy_enabled_
= false;
// Calls Get() on |g_websocket_job_init|.
// NOTE(review): presumably this is what constructs
// WebSocketJobInitSingleton and thereby registers the ws/wss factories --
// confirm against base::LazyInstance semantics.
65 void WebSocketJob::EnsureInit() {
66 g_websocket_job_init
.Get();
// Sets the static |websocket_over_spdy_enabled_| flag to |enabled|.
70 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled
) {
71 websocket_over_spdy_enabled_
= enabled
;
// Constructs a job reporting to |delegate|. The visible initializers
// allocate the handshake request/response handlers, clear the
// handshake-progress flag and counters, set the SPDY protocol version to 0,
// and bind both weak pointer factories to |this|.
74 WebSocketJob::WebSocketJob(SocketStream::Delegate
* delegate
)
75 : delegate_(delegate
),
78 handshake_request_(new WebSocketHandshakeRequestHandler
),
79 handshake_response_(new WebSocketHandshakeResponseHandler
),
80 started_to_send_handshake_request_(false),
81 handshake_request_sent_(0),
82 response_cookies_save_index_(0),
83 spdy_protocol_version_(0),
// Two separate factories: |weak_ptr_factory_for_send_pending_| is used only
// for the SendPending() task posted from OnSentData().
84 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_(this)),
85 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_for_send_pending_(this)) {
// Debug-checks that the job reached CLOSED and that |socket_| has already
// been released by the time it is destroyed.
88 WebSocketJob::~WebSocketJob() {
89 DCHECK_EQ(CLOSED
, state_
);
91 DCHECK(!socket_
.get());
// Starts the connection. Preconditions (debug-checked): |socket_| is set
// and the job is still in the INITIALIZED state.
94 void WebSocketJob::Connect() {
95 DCHECK(socket_
.get());
96 DCHECK_EQ(state_
, INITIALIZED
);
// Sends |len| bytes starting at |data|. Two paths are visible: forwarding to
// SendHandshakeRequest(), and copying the payload into a buffer that is
// either queued or sent immediately through SendDataInternal().
// NOTE(review): the state dispatch choosing between these paths is not
// visible here -- confirm against the full function.
101 bool WebSocketJob::SendData(const char* data
, int len
) {
107 return SendHandshakeRequest(data
, len
);
// Copy the caller's bytes into a ref-counted buffer owned by the job.
111 scoped_refptr
<IOBufferWithSize
> buffer
= new IOBufferWithSize(len
);
112 memcpy(buffer
->data(), data
, len
);
// A buffer is already in flight or waiting: append this one to the queue.
113 if (current_send_buffer_
|| !send_buffer_queue_
.empty()) {
114 send_buffer_queue_
.push_back(buffer
);
// Make this the in-flight buffer and hand all of its remaining bytes to the
// transport (presumably reached only when nothing was pending).
117 current_send_buffer_
= new DrainableIOBuffer(buffer
.get(), len
);
118 return SendDataInternal(current_send_buffer_
->data(),
119 current_send_buffer_
->BytesRemaining());
// Requests that the job be closed. The visible checks are the CLOSED state
// and a still-pending |current_send_buffer_|; in the latter case the close
// is left to SendPending(), as the inline comment states.
129 void WebSocketJob::Close() {
130 if (state_
== CLOSED
)
134 if (current_send_buffer_
) {
135 // Will close in SendPending.
// Forwards |credentials| to |socket_|'s RestartWithAuth().
142 void WebSocketJob::RestartWithAuth(const AuthCredentials
& credentials
) {
144 socket_
->RestartWithAuth(credentials
);
// Detaches the job from its delegate: leaves the WebSocketThrottle queue,
// invalidates all outstanding weak pointers, detaches |socket_| from its
// delegate, and, when a completion callback is still pending, drops the
// reference taken in OnStartOpenConnection().
147 void WebSocketJob::DetachDelegate() {
149 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
150 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
// Hold a reference for the rest of the function; presumably needed because
// Release() further down could otherwise drop the last one.
152 scoped_refptr
<WebSocketJob
> protect(this);
153 weak_ptr_factory_
.InvalidateWeakPtrs();
154 weak_ptr_factory_for_send_pending_
.InvalidateWeakPtrs();
158 socket_
->DetachDelegate();
160 if (!callback_
.is_null()) {
163 Release(); // Balanced with OnStartOpenConnection().
// Records |socket|'s address list, enqueues the job in WebSocketThrottle and
// forwards the notification to |delegate_|. Two return paths are visible:
// ERR_IO_PENDING after |callback| has been saved in |callback_| (the
// throttled case described by the inline comment), and otherwise the result
// of TrySpdyStream().
// A completion callback must not already be pending (debug-checked).
167 int WebSocketJob::OnStartOpenConnection(
168 SocketStream
* socket
, const CompletionCallback
& callback
) {
169 DCHECK(callback_
.is_null());
171 addresses_
= socket
->address_list();
172 WebSocketThrottle::GetInstance()->PutInQueue(this);
// The delegate's result is expected to be OK (debug-checked).
174 int result
= delegate_
->OnStartOpenConnection(socket
, callback
);
175 DCHECK_EQ(OK
, result
);
178 // PutInQueue() may set |waiting_| true for throttling. In this case,
179 // Wakeup() will be called later.
180 callback_
= callback
;
181 AddRef(); // Balanced when callback_ is cleared.
182 return ERR_IO_PENDING
;
184 return TrySpdyStream();
// Connection-established notification. Checks for the CLOSED state first,
// otherwise expects CONNECTING (debug-checked) and forwards |socket| and
// |max_pending_send_allowed| to |delegate_|.
187 void WebSocketJob::OnConnected(
188 SocketStream
* socket
, int max_pending_send_allowed
) {
189 if (state_
== CLOSED
)
191 DCHECK_EQ(CONNECTING
, state_
);
193 delegate_
->OnConnected(socket
, max_pending_send_allowed
);
// Write-completion notification for |amount_sent| (> 0) bytes.
// While CONNECTING the bytes are accounted to the handshake request via
// OnSentHandshakeRequest(). In OPEN/CLOSING they are consumed from
// |current_send_buffer_|; once that buffer has no bytes remaining, the
// buffer's full size is reported to |delegate_| and SendPending() is posted
// to continue with the queue.
196 void WebSocketJob::OnSentData(SocketStream
* socket
, int amount_sent
) {
197 DCHECK_NE(INITIALIZED
, state_
);
198 DCHECK_GT(amount_sent
, 0);
199 if (state_
== CLOSED
)
201 if (state_
== CONNECTING
) {
202 OnSentHandshakeRequest(socket
, amount_sent
);
206 DCHECK(state_
== OPEN
|| state_
== CLOSING
);
// No buffer is in flight: only a verbose log entry is visible on this path.
207 if (!current_send_buffer_
) {
208 VLOG(1) << "OnSentData current_send_buffer=NULL amount_sent="
212 current_send_buffer_
->DidConsume(amount_sent
);
// Partial write: part of the current buffer is still unsent.
213 if (current_send_buffer_
->BytesRemaining() > 0)
216 // We need to report amount_sent of original buffer size, instead of
217 // amount sent to |socket|.
218 amount_sent
= current_send_buffer_
->size();
219 DCHECK_GT(amount_sent
, 0);
220 current_send_buffer_
= NULL
;
// Post SendPending() only if this factory has no weak pointer outstanding,
// i.e. no SendPending() task is already scheduled.
221 if (!weak_ptr_factory_for_send_pending_
.HasWeakPtrs()) {
222 MessageLoopForIO::current()->PostTask(
224 base::Bind(&WebSocketJob::SendPending
,
225 weak_ptr_factory_for_send_pending_
.GetWeakPtr()));
227 delegate_
->OnSentData(socket
, amount_sent
);
// Incoming-data notification. While CONNECTING the bytes are passed to
// OnReceivedHandshakeResponse(); in OPEN/CLOSING a non-empty payload is
// forwarded to |delegate_| if one is still attached.
231 void WebSocketJob::OnReceivedData(
232 SocketStream
* socket
, const char* data
, int len
) {
233 DCHECK_NE(INITIALIZED
, state_
);
234 if (state_
== CLOSED
)
236 if (state_
== CONNECTING
) {
237 OnReceivedHandshakeResponse(socket
, data
, len
);
240 DCHECK(state_
== OPEN
|| state_
== CLOSING
);
// Skip the delegate call when detached or when there is nothing to deliver.
241 if (delegate_
&& len
> 0)
242 delegate_
->OnReceivedData(socket
, data
, len
);
// Socket-closed notification: leaves the WebSocketThrottle queue,
// invalidates |weak_ptr_factory_|'s weak pointers, drops the reference taken
// in OnStartOpenConnection() if a completion callback is still pending, and
// notifies the delegate through a local copy of |delegate_|.
245 void WebSocketJob::OnClose(SocketStream
* socket
) {
247 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
248 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
// Hold a reference for the rest of the function; presumably needed because
// Release() further down could otherwise drop the last one.
250 scoped_refptr
<WebSocketJob
> protect(this);
251 weak_ptr_factory_
.InvalidateWeakPtrs();
// Copy the delegate pointer; the OnClose() call at the end uses this copy.
253 SocketStream::Delegate
* delegate
= delegate_
;
256 if (!callback_
.is_null()) {
259 Release(); // Balanced with OnStartOpenConnection().
262 delegate
->OnClose(socket
);
// Forwards the authentication challenge |auth_info| for |socket| to
// |delegate_|.
265 void WebSocketJob::OnAuthRequired(
266 SocketStream
* socket
, AuthChallengeInfo
* auth_info
) {
268 delegate_
->OnAuthRequired(socket
, auth_info
);
// Forwards the certificate error (|ssl_info|, |fatal|) for |socket| to
// |delegate_|.
271 void WebSocketJob::OnSSLCertificateError(
272 SocketStream
* socket
, const SSLInfo
& ssl_info
, bool fatal
) {
274 delegate_
->OnSSLCertificateError(socket
, ssl_info
, fatal
);
// Forwards |error| to |delegate_| when a delegate is attached, except for
// ERR_PROTOCOL_SWITCHED, which is not reported. (TrySpdyStream() and
// OnCreatedSpdyStream() produce that code when the job moves to SPDY.)
277 void WebSocketJob::OnError(const SocketStream
* socket
, int error
) {
278 if (delegate_
&& error
!= ERR_PROTOCOL_SWITCHED
)
279 delegate_
->OnError(socket
, error
);
// Called with the final |result| of SPDY stream creation (never
// ERR_IO_PENDING; debug-checked). Requires both the SPDY stream and
// |socket_| to exist. The visible mapping of |result| is: ERR_ABORTED if the
// job is already CLOSED, ERR_PROTOCOL_SWITCHED if creation succeeded.
// NOTE(review): per the comment in RetryPendingIO(), this function is
// expected to end by calling CompleteIO(); that call is not visible here.
282 void WebSocketJob::OnCreatedSpdyStream(int result
) {
283 DCHECK(spdy_websocket_stream_
.get());
284 DCHECK(socket_
.get());
285 DCHECK_NE(ERR_IO_PENDING
, result
);
287 if (state_
== CLOSED
) {
288 result
= ERR_ABORTED
;
289 } else if (result
== OK
) {
291 result
= ERR_PROTOCOL_SWITCHED
;
293 spdy_websocket_stream_
.reset();
// SPDY notification that the request headers were sent. Guards on the
// CONNECTING state, then reports the original handshake request length to
// |delegate_| through OnSentData() and releases the request handler.
// |result| is not used in the visible lines.
299 void WebSocketJob::OnSentSpdyHeaders(int result
) {
300 DCHECK_NE(INITIALIZED
, state_
);
301 if (state_
!= CONNECTING
)
304 delegate_
->OnSentData(socket_
, handshake_request_
->original_length());
305 handshake_request_
.reset();
// SPDY notification that response headers arrived. Guards on the CONNECTING
// state, parses |headers| into |handshake_response_| using
// |spdy_protocol_version_|, then continues with
// SaveCookiesAndNotifyHeaderComplete().
// NOTE(review): how |status| is used and what is returned are not visible
// here.
308 int WebSocketJob::OnReceivedSpdyResponseHeader(
309 const SpdyHeaderBlock
& headers
, int status
) {
310 DCHECK_NE(INITIALIZED
, state_
);
311 if (state_
!= CONNECTING
)
315 // TODO(toyoshim): Fallback to non-spdy connection?
316 handshake_response_
->ParseResponseHeaderBlock(headers
,
318 spdy_protocol_version_
);
320 SaveCookiesAndNotifyHeaderComplete();
// SPDY write-completion notification; routed into the regular OnSentData()
// path with |socket_|. Guards are visible for the CLOSED state and for a
// missing SPDY stream. Must not be called while INITIALIZED or CONNECTING
// (debug-checked).
324 void WebSocketJob::OnSentSpdyData(int amount_sent
) {
325 DCHECK_NE(INITIALIZED
, state_
);
326 DCHECK_NE(CONNECTING
, state_
);
327 if (state_
== CLOSED
)
329 if (!spdy_websocket_stream_
.get())
331 OnSentData(socket_
, amount_sent
);
// SPDY incoming-data notification; routed into the regular OnReceivedData()
// path with |socket_|. Guards are visible for the CLOSED state and for a
// missing SPDY stream. Must not be called while INITIALIZED or CONNECTING
// (debug-checked).
334 void WebSocketJob::OnReceivedSpdyData(const char* data
, int length
) {
335 DCHECK_NE(INITIALIZED
, state_
);
336 DCHECK_NE(CONNECTING
, state_
);
337 if (state_
== CLOSED
)
339 if (!spdy_websocket_stream_
.get())
341 OnReceivedData(socket_
, data
, length
);
// SPDY stream-closed notification: releases |spdy_websocket_stream_|.
344 void WebSocketJob::OnCloseSpdyStream() {
345 spdy_websocket_stream_
.reset();
// Feeds |len| bytes of handshake request text into |handshake_request_|
// (CONNECTING state only; debug-checked). Once the request parses as
// complete, the request's protocol version is copied to the response handler
// and sending continues in AddCookieHeaderAndSend().
349 bool WebSocketJob::SendHandshakeRequest(const char* data
, int len
) {
350 DCHECK_EQ(state_
, CONNECTING
);
// Guard on the handshake already having started to go out.
351 if (started_to_send_handshake_request_
)
// ParseRequest() fails the test while the request is still incomplete.
353 if (!handshake_request_
->ParseRequest(data
, len
))
356 // handshake message is completed.
357 handshake_response_
->set_protocol_version(
358 handshake_request_
->protocol_version());
359 AddCookieHeaderAndSend();
// Removes the cookie headers listed in |kCookieHeaders| from the handshake
// request and, when |allow| is set and the context has a cookie store,
// asynchronously fetches cookies for GetURLForCookies();
// LoadCookieCallback() receives them.
// NOTE(review): the declaration of |allow| and its update from the
// CanGetCookies() check are not visible here -- presumably it is cleared
// when the delegate refuses; confirm.
363 void WebSocketJob::AddCookieHeaderAndSend() {
365 if (delegate_
&& !delegate_
->CanGetCookies(socket_
, GetURLForCookies()))
// Proceed only while still attached, connected to a socket, and CONNECTING.
368 if (socket_
&& delegate_
&& state_
== CONNECTING
) {
369 handshake_request_
->RemoveHeaders(
370 kCookieHeaders
, arraysize(kCookieHeaders
));
371 if (allow
&& socket_
->context()->cookie_store()) {
372 // Add cookies, including HttpOnly cookies.
373 CookieOptions cookie_options
;
374 cookie_options
.set_include_httponly();
// The callback is bound to a weak pointer from |weak_ptr_factory_|.
375 socket_
->context()->cookie_store()->GetCookiesWithOptionsAsync(
376 GetURLForCookies(), cookie_options
,
377 base::Bind(&WebSocketJob::LoadCookieCallback
,
378 weak_ptr_factory_
.GetWeakPtr()));
// Completion callback for the cookie fetch started in
// AddCookieHeaderAndSend(): passes |cookie| to the request handler's
// AppendHeaderIfMissing() under the header name "Cookie".
385 void WebSocketJob::LoadCookieCallback(const std::string
& cookie
) {
387 handshake_request_
->AppendHeaderIfMissing("Cookie", cookie
);
// Transmits the handshake request. With a SPDY stream it is converted to a
// SpdyHeaderBlock and sent with SendRequest(); the remaining visible lines
// (presumably the non-SPDY branch) log the raw request to the socket's net
// log and write it to |socket_|. Finally marks the handshake as started.
391 void WebSocketJob::DoSendData() {
392 if (spdy_websocket_stream_
.get()) {
393 scoped_ptr
<SpdyHeaderBlock
> headers(new SpdyHeaderBlock
);
// Fills |headers| and |challenge_| from the request for this URL and SPDY
// protocol version.
394 handshake_request_
->GetRequestHeaderBlock(
395 socket_
->url(), headers
.get(), &challenge_
, spdy_protocol_version_
);
// Pass() transfers ownership of the header block to the stream.
396 spdy_websocket_stream_
->SendRequest(headers
.Pass());
398 const std::string
& handshake_request
=
399 handshake_request_
->GetRawRequest();
// Reset the byte counter that OnSentHandshakeRequest() accumulates into.
400 handshake_request_sent_
= 0;
401 socket_
->net_log()->AddEvent(
402 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS
,
403 base::Bind(&NetLogWebSocketHandshakeCallback
, &handshake_request
));
404 socket_
->SendData(handshake_request
.data(),
405 handshake_request
.size());
407 // Just buffered in |handshake_request_|.
408 started_to_send_handshake_request_
= true;
// Accounts |amount_sent| bytes towards the raw handshake request
// (CONNECTING state only; debug-checked). When the whole raw request has
// been written, reports the request's original length to |delegate_| and
// releases the request handler.
411 void WebSocketJob::OnSentHandshakeRequest(
412 SocketStream
* socket
, int amount_sent
) {
413 DCHECK_EQ(state_
, CONNECTING
);
414 handshake_request_sent_
+= amount_sent
;
// The running total must never exceed the raw request size.
415 DCHECK_LE(handshake_request_sent_
, handshake_request_
->raw_length());
416 if (handshake_request_sent_
>= handshake_request_
->raw_length()) {
417 // handshake request has been sent.
418 // notify original size of handshake request to delegate.
420 delegate_
->OnSentData(
422 handshake_request_
->original_length());
423 handshake_request_
.reset();
// Consumes bytes received while CONNECTING (debug-checked). If the response
// handler already holds a complete response, the bytes are appended to
// |received_data_after_handshake_|. Otherwise they are parsed as handshake
// response text; once the response is complete it is logged to the socket's
// net log, any trailing bytes are kept as frame data, and processing
// continues in SaveCookiesAndNotifyHeaderComplete().
427 void WebSocketJob::OnReceivedHandshakeResponse(
428 SocketStream
* socket
, const char* data
, int len
) {
429 DCHECK_EQ(state_
, CONNECTING
);
430 if (handshake_response_
->HasResponse()) {
431 // If we already have a handshake response, received data should be frame
432 // data, not handshake message.
433 received_data_after_handshake_
.insert(
434 received_data_after_handshake_
.end(), data
, data
+ len
);
// |response_length| is the ParseRawResponse() return value; it is used
// below as the offset at which frame data starts within |data|.
438 size_t response_length
= handshake_response_
->ParseRawResponse(data
, len
);
439 if (!handshake_response_
->HasResponse()) {
440 // not yet. we need more data.
443 // handshake message is completed.
444 std::string raw_response
= handshake_response_
->GetRawResponse();
445 socket_
->net_log()->AddEvent(
446 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS
,
447 base::Bind(&NetLogWebSocketHandshakeCallback
, &raw_response
));
// NOTE(review): |len| is int and |response_length| is size_t, so on
// platforms where size_t is at least as wide as int this subtraction is
// unsigned and the test is effectively `len != response_length`.
448 if (len
- response_length
> 0) {
449 // If we received extra data, it should be frame data.
450 DCHECK(received_data_after_handshake_
.empty());
451 received_data_after_handshake_
.assign(data
+ response_length
, data
+ len
);
453 SaveCookiesAndNotifyHeaderComplete();
// Starts cookie persistence for a completed handshake response: resets
// |response_cookies_| and its save index, then collects the values of the
// |kSetCookieHeaders| headers into |response_cookies_|.
// NOTE(review): the call that starts the per-cookie loop (presumably
// SaveNextCookie()) is not visible here.
456 void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
457 // handshake message is completed.
458 DCHECK(handshake_response_
->HasResponse());
460 response_cookies_
.clear();
461 response_cookies_save_index_
= 0;
463 handshake_response_
->GetHeaders(
464 kSetCookieHeaders
, arraysize(kSetCookieHeaders
), &response_cookies_
);
466 // Now, loop over the response cookies, and attempt to persist each.
// Handles the entry of |response_cookies_| at
// |response_cookies_save_index_|.
// When the index has reached the end: the Set-Cookie headers are removed
// from the response, the response text plus any bytes buffered after the
// handshake are delivered to |delegate_| in a single OnReceivedData() call,
// the response handler is released and the job leaves the throttle queue.
// Otherwise the delegate is consulted via CanSetCookie() and the cookie is
// stored asynchronously, with SaveCookieCallback() as completion callback.
// NOTE(review): the declaration of |allow| and its update from the
// CanSetCookie() check are not visible here -- confirm.
470 void WebSocketJob::SaveNextCookie() {
471 if (response_cookies_save_index_
== response_cookies_
.size()) {
472 response_cookies_
.clear();
473 response_cookies_save_index_
= 0;
475 // Remove cookie headers, with malformed headers preserved.
476 // Actual handshake should be done in WebKit.
477 handshake_response_
->RemoveHeaders(
478 kSetCookieHeaders
, arraysize(kSetCookieHeaders
));
479 std::string handshake_response
= handshake_response_
->GetResponse();
// Build one contiguous buffer: response text followed by any frame bytes
// that arrived while the handshake was being processed.
480 std::vector
<char> received_data(handshake_response
.begin(),
481 handshake_response
.end());
482 received_data
.insert(received_data
.end(),
483 received_data_after_handshake_
.begin(),
484 received_data_after_handshake_
.end());
485 received_data_after_handshake_
.clear();
// front() is used below, so the buffer must not be empty.
489 DCHECK(!received_data
.empty());
491 delegate_
->OnReceivedData(
492 socket_
, &received_data
.front(), received_data
.size());
494 handshake_response_
.reset();
496 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
497 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
// Per-cookie path: |options| is passed to the delegate by pointer.
502 CookieOptions options
;
503 GURL url
= GetURLForCookies();
504 std::string cookie
= response_cookies_
[response_cookies_save_index_
];
505 if (delegate_
&& !delegate_
->CanSetCookie(socket_
, url
, cookie
, &options
))
// Proceed only while still attached, connected to a socket, and CONNECTING.
508 if (socket_
&& delegate_
&& state_
== CONNECTING
) {
// Advance the index before the asynchronous store is started.
509 response_cookies_save_index_
++;
510 if (allow
&& socket_
->context()->cookie_store()) {
511 options
.set_include_httponly();
// The callback is bound to a weak pointer from |weak_ptr_factory_|.
512 socket_
->context()->cookie_store()->SetCookieWithOptionsAsync(
513 url
, cookie
, options
,
514 base::Bind(&WebSocketJob::SaveCookieCallback
,
515 weak_ptr_factory_
.GetWeakPtr()));
// Completion callback bound in SaveNextCookie() for
// SetCookieWithOptionsAsync(); |cookie_status| is the value passed to it.
// NOTE(review): the body is not visible here -- presumably it continues
// with the next cookie; confirm against the full file.
522 void WebSocketJob::SaveCookieCallback(bool cookie_status
) {
// Returns the socket's URL with its scheme replaced by "https" when the
// socket is secure and "http" otherwise. This is the URL used for the
// cookie-store and delegate cookie calls in this file.
526 GURL
WebSocketJob::GetURLForCookies() const {
527 GURL url
= socket_
->url();
528 std::string scheme
= socket_
->is_secure() ? "https" : "http";
529 url_canon::Replacements
<char> replacements
;
// The component covers the whole of |scheme| (offset 0, full length).
530 replacements
.SetScheme(scheme
.c_str(),
531 url_parse::Component(0, scheme
.length()));
532 return url
.ReplaceComponents(replacements
);
// Accessor returning a const reference to an AddressList.
// NOTE(review): the body is not visible here -- presumably it returns
// |addresses_|, which OnStartOpenConnection() fills in; confirm.
535 const AddressList
& WebSocketJob::address_list() const {
// Tries to carry this WebSocket over an existing SPDY session for the
// socket's host/port and proxy. Visible checks, in order: the class-wide
// SPDY switch, availability of a session for the host/proxy pair, and the
// rule that a secure socket must not use a SPDY session without SSL.
// Visible return values: ERR_PROTOCOL_SWITCHED after OnConnected() has been
// signalled, and ERR_IO_PENDING at the end of the function.
// NOTE(review): the return statements of the early-out checks and the
// condition guarding the OnConnected() path are not visible here.
539 int WebSocketJob::TrySpdyStream() {
543 if (!websocket_over_spdy_enabled_
)
546 // Check if we have a SPDY session available.
547 HttpTransactionFactory
* factory
=
548 socket_
->context()->http_transaction_factory();
551 scoped_refptr
<HttpNetworkSession
> session
= factory
->GetSession();
554 SpdySessionPool
* spdy_pool
= session
->spdy_session_pool();
// Session lookup key: destination host/port plus the proxy in use.
555 const HostPortProxyPair
pair(HostPortPair::FromURL(socket_
->url()),
556 socket_
->proxy_server());
557 if (!spdy_pool
->HasSession(pair
))
560 // Forbid wss downgrade to SPDY without SSL.
561 // TODO(toyoshim): Does it realize the same policy with HTTP?
562 scoped_refptr
<SpdySession
> spdy_session
=
563 spdy_pool
->Get(pair
, *socket_
->net_log());
565 bool was_npn_negotiated
;
566 NextProto protocol_negotiated
= kProtoUnknown
;
// Only the boolean result |use_ssl| is consulted in the visible lines.
567 bool use_ssl
= spdy_session
->GetSSLInfo(
568 &ssl_info
, &was_npn_negotiated
, &protocol_negotiated
);
569 if (socket_
->is_secure() && !use_ssl
)
572 // Create SpdyWebSocketStream.
573 spdy_protocol_version_
= spdy_session
->GetProtocolVersion();
574 spdy_websocket_stream_
.reset(new SpdyWebSocketStream(spdy_session
, this));
576 int result
= spdy_websocket_stream_
->InitializeStream(
577 socket_
->url(), MEDIUM
, *socket_
->net_log());
579 OnConnected(socket_
, kMaxPendingSendAllowed
);
580 return ERR_PROTOCOL_SWITCHED
;
// Any result other than ERR_IO_PENDING here discards the stream.
582 if (result
!= ERR_IO_PENDING
) {
583 spdy_websocket_stream_
.reset();
587 return ERR_IO_PENDING
;
// NOTE(review): the body is not visible here -- presumably sets the
// |waiting_| flag mentioned in OnStartOpenConnection(); confirm.
590 void WebSocketJob::SetWaiting() {
// NOTE(review): the body is not visible here -- presumably reports the
// |waiting_| flag mentioned in OnStartOpenConnection(); confirm.
594 bool WebSocketJob::IsWaiting() const {
// Resumes a throttled job: posts RetryPendingIO() to the current IO message
// loop, bound to a weak pointer from |weak_ptr_factory_|. A completion
// callback must be pending (debug-checked).
598 void WebSocketJob::Wakeup() {
602 DCHECK(!callback_
.is_null());
603 MessageLoopForIO::current()->PostTask(
605 base::Bind(&WebSocketJob::RetryPendingIO
,
606 weak_ptr_factory_
.GetWeakPtr()));
// Task posted by Wakeup(): runs TrySpdyStream() and acts on any result other
// than ERR_IO_PENDING.
// NOTE(review): the statement guarded by the final `if` is not visible
// here -- presumably CompleteIO(result); confirm.
609 void WebSocketJob::RetryPendingIO() {
610 int result
= TrySpdyStream();
612 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
613 // OnCreatedSpdyStream().
614 if (result
!= ERR_IO_PENDING
)
// Completes the pending open-connection request: if a callback is still
// stored, runs a local copy of it with |result| and drops the reference
// taken in OnStartOpenConnection().
618 void WebSocketJob::CompleteIO(int result
) {
619 // |callback_| may be null if OnClose() or DetachDelegate() was called.
620 if (!callback_
.is_null()) {
// Work on a copy of the stored callback rather than the member itself.
621 CompletionCallback callback
= callback_
;
623 callback
.Run(result
);
624 Release(); // Balanced with OnStartOpenConnection().
// Writes |length| bytes from |data| to the active transport. With a SPDY
// stream the call counts as successful exactly when the stream's SendData()
// returns ERR_IO_PENDING; otherwise the result of |socket_|'s SendData() is
// returned.
628 bool WebSocketJob::SendDataInternal(const char* data
, int length
) {
629 if (spdy_websocket_stream_
.get())
630 return ERR_IO_PENDING
== spdy_websocket_stream_
->SendData(data
, length
);
632 return socket_
->SendData(data
, length
);
// Closes the active transport. The visible lines close the SPDY stream when
// one exists.
// NOTE(review): handling of |socket_| is not visible here.
636 void WebSocketJob::CloseInternal() {
637 if (spdy_websocket_stream_
.get())
638 spdy_websocket_stream_
->Close();
// Task posted from OnSentData() once a buffer has been fully sent. Checks
// whether a buffer is in flight again; with an empty queue it checks for the
// CLOSING state (Close() defers to this function while a send is pending);
// otherwise it dequeues the next buffer, makes it the current one and sends
// it through SendDataInternal().
643 void WebSocketJob::SendPending() {
644 if (current_send_buffer_
)
647 // Current buffer has been sent. Try next if any.
648 if (send_buffer_queue_
.empty()) {
649 // No more data to send.
650 if (state_
== CLOSING
)
// Take the oldest queued buffer and make it the in-flight one.
655 scoped_refptr
<IOBufferWithSize
> next_buffer
= send_buffer_queue_
.front();
656 send_buffer_queue_
.pop_front();
657 current_send_buffer_
= new DrainableIOBuffer(next_buffer
,
658 next_buffer
->size());
// The boolean result of SendDataInternal() is not checked here.
659 SendDataInternal(current_send_buffer_
->data(),
660 current_send_buffer_
->BytesRemaining());