1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_job.h"
10 #include "base/lazy_instance.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/base/net_log.h"
14 #include "net/cookies/cookie_store.h"
15 #include "net/http/http_network_session.h"
16 #include "net/http/http_transaction_factory.h"
17 #include "net/http/http_util.h"
18 #include "net/spdy/spdy_session.h"
19 #include "net/spdy/spdy_session_pool.h"
20 #include "net/url_request/url_request_context.h"
21 #include "net/websockets/websocket_handshake_handler.h"
22 #include "net/websockets/websocket_net_log_params.h"
23 #include "net/websockets/websocket_throttle.h"
// Value that TrySpdyStream() passes as |max_pending_send_allowed| to
// OnConnected() when the job is switched onto a SPDY stream.
26 static const int kMaxPendingSendAllowed
= 32768; // 32 KiB (32 * 1024 bytes).
30 // Lower-case header names.
// kCookieHeaders lists the request headers that AddCookieHeaderAndSend()
// strips from the handshake request through RemoveHeaders().
// NOTE(review): the initializer entries are not visible in this listing.
31 const char* const kCookieHeaders
[] = {
// Lower-case names of the response headers that carry cookies. They are
// collected by SaveCookiesAndNotifyHeadersComplete() (GetHeaders) and removed
// from the response by NotifyHeadersComplete() (RemoveHeaders).
34 const char* const kSetCookieHeaders
[] = {
35 "set-cookie", "set-cookie2"
// Protocol factory that WebSocketJobInitSingleton registers for "ws" and
// "wss". Allocates a WebSocketJob for |delegate| and gives it a new
// SocketStream constructed from |url|, the job itself, |context| and
// |cookie_store| via InitSocketStream().
// NOTE(review): the return statement is not visible in this listing;
// presumably |job| is returned -- confirm against the full file.
38 net::SocketStreamJob
* WebSocketJobFactory(
39 const GURL
& url
, net::SocketStream::Delegate
* delegate
,
40 net::URLRequestContext
* context
, net::CookieStore
* cookie_store
) {
41 net::WebSocketJob
* job
= new net::WebSocketJob(delegate
);
// The job is handed to the stream as its second constructor argument
// (presumably as the stream's delegate -- confirm against SocketStream).
42 job
->InitSocketStream(new net::SocketStream(url
, job
, context
, cookie_store
));
// Helper whose constructor registers WebSocketJobFactory as the
// SocketStreamJob protocol factory for both the "ws" and "wss" schemes.
// It is instantiated through the g_websocket_job_init lazy instance.
46 class WebSocketJobInitSingleton
{
// The lazy-instance traits are befriended, presumably so they can reach a
// non-public constructor (access specifier lines are not visible here).
48 friend struct base::DefaultLazyInstanceTraits
<WebSocketJobInitSingleton
>;
49 WebSocketJobInitSingleton() {
50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory
);
51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory
);
// Lazy instance of WebSocketJobInitSingleton. WebSocketJob::EnsureInit()
// calls Get() on it so that the singleton's constructor (and therefore the
// "ws"/"wss" factory registration) has run.
55 static base::LazyInstance
<WebSocketJobInitSingleton
> g_websocket_job_init
=
56 LAZY_INSTANCE_INITIALIZER
;
58 } // anonymous namespace
// Definition of the static flag consulted by TrySpdyStream(). It starts out
// false and is changed through set_websocket_over_spdy_enabled().
62 bool WebSocketJob::websocket_over_spdy_enabled_
= false;
// Touches the g_websocket_job_init lazy instance so that the "ws"/"wss"
// protocol factories registered by WebSocketJobInitSingleton are in place.
65 void WebSocketJob::EnsureInit() {
66 g_websocket_job_init
.Get();
// Stores |enabled| in the static websocket_over_spdy_enabled_ flag, which
// TrySpdyStream() checks before looking for a SPDY session.
70 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled
) {
71 websocket_over_spdy_enabled_
= enabled
;
// Creates a job that reports to |delegate|. Fresh handshake request and
// response handlers are allocated, the handshake/cookie counters and flags
// are zeroed, and both weak-pointer factories are bound to this object.
// NOTE(review): the initializer lines between delegate_ and
// handshake_request_ (embedded lines 76-77) are not visible in this listing.
74 WebSocketJob::WebSocketJob(SocketStream::Delegate
* delegate
)
75 : delegate_(delegate
),
78 handshake_request_(new WebSocketHandshakeRequestHandler
),
79 handshake_response_(new WebSocketHandshakeResponseHandler
),
80 started_to_send_handshake_request_(false),
81 handshake_request_sent_(0),
82 response_cookies_save_index_(0),
83 spdy_protocol_version_(0),
84 save_next_cookie_running_(false),
85 callback_pending_(false),
86 weak_ptr_factory_(this),
87 weak_ptr_factory_for_send_pending_(this) {
// Destructor. Asserts (DCHECK) that the job has reached the CLOSED state and
// no longer holds a socket_ by the time it is destroyed.
90 WebSocketJob::~WebSocketJob() {
91 DCHECK_EQ(CLOSED
, state_
);
93 DCHECK(!socket_
.get());
// Starts the connection. Preconditions checked here: socket_ is set and the
// job is still in the INITIALIZED state.
// NOTE(review): the statements after the checks are not visible in this
// listing; presumably they advance state_ and connect socket_ -- confirm.
96 void WebSocketJob::Connect() {
97 DCHECK(socket_
.get());
98 DCHECK_EQ(state_
, INITIALIZED
);
// Accepts |len| bytes at |data| for sending. Two paths are visible:
//  - handshake bytes are forwarded to SendHandshakeRequest();
//  - frame bytes are copied into a new IOBufferWithSize, which is queued on
//    send_buffer_queue_ if a send is already in flight or other buffers are
//    waiting, and otherwise becomes current_send_buffer_ and is sent
//    immediately through SendDataInternal().
// NOTE(review): the dispatch on state_ that selects between these paths, and
// the remaining return statements, are not visible in this listing.
103 bool WebSocketJob::SendData(const char* data
, int len
) {
109 return SendHandshakeRequest(data
, len
);
// Copy the caller's bytes; the visible code does not retain |data| itself.
113 scoped_refptr
<IOBufferWithSize
> buffer
= new IOBufferWithSize(len
);
114 memcpy(buffer
->data(), data
, len
);
// Preserve ordering: anything already pending goes out first.
115 if (current_send_buffer_
.get() || !send_buffer_queue_
.empty()) {
116 send_buffer_queue_
.push_back(buffer
);
119 current_send_buffer_
= new DrainableIOBuffer(buffer
.get(), len
);
120 return SendDataInternal(current_send_buffer_
->data(),
121 current_send_buffer_
->BytesRemaining());
// Requests that the job be closed. Checks for the already-CLOSED state and
// for a send still in flight; per the comment below, in the latter case the
// actual close is left to SendPending().
// NOTE(review): the bodies of both branches and the rest of the function are
// not visible in this listing.
131 void WebSocketJob::Close() {
132 if (state_
== CLOSED
)
136 if (current_send_buffer_
.get()) {
137 // Will close in SendPending.
// Forwards |credentials| to socket_->RestartWithAuth().
// NOTE(review): embedded line 145 is not visible in this listing.
144 void WebSocketJob::RestartWithAuth(const AuthCredentials
& credentials
) {
146 socket_
->RestartWithAuth(credentials
);
// Detaches the delegate: removes this job from the WebSocketThrottle queue,
// cancels outstanding weak-pointer callbacks from both factories, detaches
// the delegate from socket_, and, if an open-connection callback is still
// stored, releases the reference taken in OnStartOpenConnection().
// NOTE(review): several statements (embedded lines 150, 152, 156-158, 160,
// 162-163) are not visible in this listing.
149 void WebSocketJob::DetachDelegate() {
151 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
// Keep this object referenced until the function returns, since Release()
// below gives up a reference.
153 scoped_refptr
<WebSocketJob
> protect(this);
154 weak_ptr_factory_
.InvalidateWeakPtrs();
155 weak_ptr_factory_for_send_pending_
.InvalidateWeakPtrs();
159 socket_
->DetachDelegate();
161 if (!callback_
.is_null()) {
164 Release(); // Balanced with OnStartOpenConnection().
// SocketStream::Delegate callback invoked before the connection is opened.
// Records the socket's address list, enqueues the job in WebSocketThrottle
// (failing with ERR_WS_THROTTLE_QUEUE_TOO_LARGE if that is refused) and asks
// the delegate. On the throttled path |callback| is stored, a reference is
// taken with AddRef(), and ERR_IO_PENDING is returned; otherwise the result
// of TrySpdyStream() is returned.
// NOTE(review): the conditions guarding the delegate call and the throttled
// path are not visible in this listing.
168 int WebSocketJob::OnStartOpenConnection(
169 SocketStream
* socket
, const CompletionCallback
& callback
) {
170 DCHECK(callback_
.is_null());
173 addresses_
= socket
->address_list();
174 if (!WebSocketThrottle::GetInstance()->PutInQueue(this)) {
175 return ERR_WS_THROTTLE_QUEUE_TOO_LARGE
;
179 int result
= delegate_
->OnStartOpenConnection(socket
, callback
);
180 DCHECK_EQ(OK
, result
);
183 // PutInQueue() may set |waiting_| true for throttling. In this case,
184 // Wakeup() will be called later.
185 callback_
= callback
;
186 AddRef(); // Balanced when callback_ is cleared.
187 return ERR_IO_PENDING
;
189 return TrySpdyStream();
// SocketStream::Delegate callback: the underlying connection is up. Checks
// for the CLOSED state first, expects CONNECTING otherwise, and passes
// |socket| and |max_pending_send_allowed| on to the delegate.
// NOTE(review): the statement under the CLOSED check and any guard around
// the delegate call are not visible in this listing.
192 void WebSocketJob::OnConnected(
193 SocketStream
* socket
, int max_pending_send_allowed
) {
194 if (state_
== CLOSED
)
196 DCHECK_EQ(CONNECTING
, state_
);
198 delegate_
->OnConnected(socket
, max_pending_send_allowed
);
// SocketStream::Delegate callback: |amount_sent| bytes were written to
// |socket|. While CONNECTING the bytes belong to the handshake and are
// handed to OnSentHandshakeRequest(). In OPEN/CLOSING they are consumed from
// current_send_buffer_; once that buffer has no bytes remaining, the
// delegate is told the size of the original buffer and SendPending() is
// posted (if not already pending) to move on to the next queued buffer.
// NOTE(review): the statements under several of the `if`s (presumably early
// returns) and the start of the logging statement are not visible in this
// listing.
201 void WebSocketJob::OnSentData(SocketStream
* socket
, int amount_sent
) {
202 DCHECK_NE(INITIALIZED
, state_
);
203 DCHECK_GT(amount_sent
, 0);
204 if (state_
== CLOSED
)
206 if (state_
== CONNECTING
) {
207 OnSentHandshakeRequest(socket
, amount_sent
);
211 DCHECK(state_
== OPEN
|| state_
== CLOSING
);
// A send completion without a current buffer is unexpected; it is logged.
212 if (!current_send_buffer_
.get()) {
214 << "OnSentData current_send_buffer=NULL amount_sent=" << amount_sent
;
217 current_send_buffer_
->DidConsume(amount_sent
);
// Partial write: more of the current buffer is still outstanding.
218 if (current_send_buffer_
->BytesRemaining() > 0)
221 // We need to report amount_sent of original buffer size, instead of
222 // amount sent to |socket|.
223 amount_sent
= current_send_buffer_
->size();
224 DCHECK_GT(amount_sent
, 0);
225 current_send_buffer_
= NULL
;
// Only post SendPending() if no such task is already outstanding.
226 if (!weak_ptr_factory_for_send_pending_
.HasWeakPtrs()) {
227 base::MessageLoopForIO::current()->PostTask(
229 base::Bind(&WebSocketJob::SendPending
,
230 weak_ptr_factory_for_send_pending_
.GetWeakPtr()));
232 delegate_
->OnSentData(socket
, amount_sent
);
// SocketStream::Delegate callback: |len| bytes at |data| arrived on
// |socket|. While CONNECTING the bytes are treated as handshake response
// data and passed to OnReceivedHandshakeResponse(). In OPEN/CLOSING they are
// forwarded to the delegate, but only if a delegate is still attached and
// |len| is positive.
// NOTE(review): the statements under the CLOSED check and after the
// handshake call (presumably early returns) are not visible in this listing.
236 void WebSocketJob::OnReceivedData(
237 SocketStream
* socket
, const char* data
, int len
) {
238 DCHECK_NE(INITIALIZED
, state_
);
239 if (state_
== CLOSED
)
241 if (state_
== CONNECTING
) {
242 OnReceivedHandshakeResponse(socket
, data
, len
);
245 DCHECK(state_
== OPEN
|| state_
== CLOSING
);
246 if (delegate_
&& len
> 0)
247 delegate_
->OnReceivedData(socket
, data
, len
);
// SocketStream::Delegate callback: |socket| was closed. Removes the job from
// the WebSocketThrottle queue, invalidates weak pointers, releases the
// reference taken in OnStartOpenConnection() if a callback is still stored,
// and finally notifies the delegate that was attached on entry.
// NOTE(review): several statements (embedded lines 251, 253, 256, 258-259,
// 261-262, 264-265) are not visible in this listing, including any guard
// around the final delegate call.
250 void WebSocketJob::OnClose(SocketStream
* socket
) {
252 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
// Keep this object referenced until the function returns, since Release()
// below gives up a reference.
254 scoped_refptr
<WebSocketJob
> protect(this);
255 weak_ptr_factory_
.InvalidateWeakPtrs();
// Take a local copy of the delegate pointer; the notification at the end
// goes through this copy rather than through delegate_.
257 SocketStream::Delegate
* delegate
= delegate_
;
260 if (!callback_
.is_null()) {
263 Release(); // Balanced with OnStartOpenConnection().
266 delegate
->OnClose(socket
);
// SocketStream::Delegate callback: forwards the authentication challenge
// |auth_info| for |socket| to the delegate.
// NOTE(review): embedded line 271 (possibly a delegate_ guard) is not
// visible in this listing.
269 void WebSocketJob::OnAuthRequired(
270 SocketStream
* socket
, AuthChallengeInfo
* auth_info
) {
272 delegate_
->OnAuthRequired(socket
, auth_info
);
// SocketStream::Delegate callback: forwards the certificate error described
// by |ssl_info| and |fatal| for |socket| to the delegate.
// NOTE(review): embedded line 277 (possibly a delegate_ guard) is not
// visible in this listing.
275 void WebSocketJob::OnSSLCertificateError(
276 SocketStream
* socket
, const SSLInfo
& ssl_info
, bool fatal
) {
278 delegate_
->OnSSLCertificateError(socket
, ssl_info
, fatal
);
// SocketStream::Delegate callback: forwards |error| to the delegate, if one
// is attached. ERR_PROTOCOL_SWITCHED is deliberately not forwarded; in this
// file that code is produced when the job moves onto a SPDY stream (see
// TrySpdyStream() and OnCreatedSpdyStream()).
281 void WebSocketJob::OnError(const SocketStream
* socket
, int error
) {
282 if (delegate_
&& error
!= ERR_PROTOCOL_SWITCHED
)
283 delegate_
->OnError(socket
, error
);
// Completion of an asynchronous SPDY stream creation. |result| is never
// ERR_IO_PENDING here. It is rewritten to ERR_ABORTED if the job has been
// closed in the meantime, or to ERR_PROTOCOL_SWITCHED on success; the SPDY
// stream is reset on one of the remaining paths.
// NOTE(review): the body of the success branch, the branch that owns the
// reset(), and the final statements (per the comment in RetryPendingIO(),
// a CompleteIO() call) are not visible in this listing.
286 void WebSocketJob::OnCreatedSpdyStream(int result
) {
287 DCHECK(spdy_websocket_stream_
.get());
288 DCHECK(socket_
.get());
289 DCHECK_NE(ERR_IO_PENDING
, result
);
291 if (state_
== CLOSED
) {
292 result
= ERR_ABORTED
;
293 } else if (result
== OK
) {
295 result
= ERR_PROTOCOL_SWITCHED
;
297 spdy_websocket_stream_
.reset();
// SPDY stream callback: the handshake request headers have been sent. Only
// acted on while CONNECTING. The original request length is read before
// handshake_request_ is reset, and then reported to the delegate as the
// amount of data sent.
// NOTE(review): the statement under the state check (presumably a return)
// and any guard around the delegate call are not visible in this listing.
303 void WebSocketJob::OnSentSpdyHeaders() {
304 DCHECK_NE(INITIALIZED
, state_
);
305 if (state_
!= CONNECTING
)
307 size_t original_length
= handshake_request_
->original_length();
308 handshake_request_
.reset();
310 delegate_
->OnSentData(socket_
.get(), original_length
);
// SPDY stream callback: response headers arrived. Only acted on while
// CONNECTING. The header block is parsed into handshake_response_ and cookie
// processing is started via SaveCookiesAndNotifyHeadersComplete().
// NOTE(review): the statement under the state check and one argument line of
// ParseResponseHeaderBlock() (embedded line 320) are not visible in this
// listing.
313 void WebSocketJob::OnSpdyResponseHeadersUpdated(
314 const SpdyHeaderBlock
& response_headers
) {
315 DCHECK_NE(INITIALIZED
, state_
);
316 if (state_
!= CONNECTING
)
318 // TODO(toyoshim): Fallback to non-spdy connection?
319 handshake_response_
->ParseResponseHeaderBlock(response_headers
,
321 spdy_protocol_version_
);
323 SaveCookiesAndNotifyHeadersComplete();
// SPDY stream callback: |bytes_sent| bytes of frame data were sent. Must not
// be called in INITIALIZED or CONNECTING. The event is routed through the
// regular OnSentData() path with socket_ as the socket.
// NOTE(review): the statements under the two `if`s (presumably early
// returns) are not visible in this listing. |bytes_sent| is narrowed from
// size_t to int by the cast below.
326 void WebSocketJob::OnSentSpdyData(size_t bytes_sent
) {
327 DCHECK_NE(INITIALIZED
, state_
);
328 DCHECK_NE(CONNECTING
, state_
);
329 if (state_
== CLOSED
)
331 if (!spdy_websocket_stream_
.get())
333 OnSentData(socket_
.get(), static_cast<int>(bytes_sent
));
// SPDY stream callback: frame data arrived in |buffer|. Must not be called
// in INITIALIZED or CONNECTING. Two calls are visible: one passing socket_
// with the buffer's remaining data and size, and one calling
// OnReceivedData() with a NULL pointer and length 0.
// NOTE(review): the condition choosing between those two calls, the callee
// name of the first call, and the statements under the two early `if`s are
// not visible in this listing.
336 void WebSocketJob::OnReceivedSpdyData(scoped_ptr
<SpdyBuffer
> buffer
) {
337 DCHECK_NE(INITIALIZED
, state_
);
338 DCHECK_NE(CONNECTING
, state_
);
339 if (state_
== CLOSED
)
341 if (!spdy_websocket_stream_
.get())
345 socket_
.get(), buffer
->GetRemainingData(), buffer
->GetRemainingSize());
347 OnReceivedData(socket_
.get(), NULL
, 0);
// SPDY stream callback: the stream was closed. Drops the SPDY stream object
// and then runs the normal OnClose() path with socket_ as the socket.
351 void WebSocketJob::OnCloseSpdyStream() {
352 spdy_websocket_stream_
.reset();
353 OnClose(socket_
.get());
// Handles handshake bytes passed to SendData() while CONNECTING. Checks
// whether sending has already started, parses |data|/|len| into
// handshake_request_, and then continues with AddCookieHeaderAndSend().
// NOTE(review): the return statements (under both `if`s and at the end) are
// not visible in this listing.
356 bool WebSocketJob::SendHandshakeRequest(const char* data
, int len
) {
357 DCHECK_EQ(state_
, CONNECTING
);
358 if (started_to_send_handshake_request_
)
360 if (!handshake_request_
->ParseRequest(data
, len
))
363 AddCookieHeaderAndSend();
// Prepares the cookie part of the handshake request. The delegate is asked
// whether cookies may be read for GetURLForCookies(). If the job is still
// CONNECTING with a socket and delegate, any cookie headers listed in
// kCookieHeaders are stripped from the request; when cookies are allowed and
// a cookie store exists, the cookies (HttpOnly included) are fetched
// asynchronously and delivered to LoadCookieCallback() through a weak
// pointer.
// NOTE(review): the declaration and assignment of |allow|, and the `else`
// path taken when no cookies are loaded, are not visible in this listing.
367 void WebSocketJob::AddCookieHeaderAndSend() {
369 if (delegate_
&& !delegate_
->CanGetCookies(socket_
.get(), GetURLForCookies()))
372 if (socket_
.get() && delegate_
&& state_
== CONNECTING
) {
373 handshake_request_
->RemoveHeaders(kCookieHeaders
,
374 arraysize(kCookieHeaders
));
375 if (allow
&& socket_
->cookie_store()) {
376 // Add cookies, including HttpOnly cookies.
377 CookieOptions cookie_options
;
378 cookie_options
.set_include_httponly();
379 socket_
->cookie_store()->GetCookiesWithOptionsAsync(
380 GetURLForCookies(), cookie_options
,
381 base::Bind(&WebSocketJob::LoadCookieCallback
,
382 weak_ptr_factory_
.GetWeakPtr()));
// Completion callback for the cookie lookup started in
// AddCookieHeaderAndSend(). Adds |cookie| as the "Cookie" header of the
// handshake request unless such a header is already present.
// NOTE(review): the statements around the header call (embedded lines 390,
// 396-397) are not visible in this listing.
389 void WebSocketJob::LoadCookieCallback(const std::string
& cookie
) {
391 // TODO(tyoshino): Sending cookie means that connection doesn't need
392 // PRIVACY_MODE_ENABLED as cookies may be server-bound and channel id
393 // wouldn't negatively affect privacy anyway. Need to restart connection
394 // or refactor to determine cookie status prior to connecting.
395 handshake_request_
->AppendHeaderIfMissing("Cookie", cookie
);
// Sends the prepared handshake request. With a SPDY stream, the request is
// converted to a SpdyHeaderBlock (which also fills in challenge_) and sent
// as SPDY request headers. Otherwise the raw request text is logged to the
// socket's NetLog and written to socket_, with the sent-byte counter reset
// to zero first. started_to_send_handshake_request_ is set at the end.
// NOTE(review): the `else` line separating the two paths (embedded line 405)
// is not visible in this listing.
399 void WebSocketJob::DoSendData() {
400 if (spdy_websocket_stream_
.get()) {
401 scoped_ptr
<SpdyHeaderBlock
> headers(new SpdyHeaderBlock
);
402 handshake_request_
->GetRequestHeaderBlock(
403 socket_
->url(), headers
.get(), &challenge_
, spdy_protocol_version_
);
404 spdy_websocket_stream_
->SendRequest(headers
.Pass());
406 const std::string
& handshake_request
=
407 handshake_request_
->GetRawRequest();
408 handshake_request_sent_
= 0;
409 socket_
->net_log()->AddEvent(
410 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS
,
411 base::Bind(&NetLogWebSocketHandshakeCallback
, &handshake_request
));
412 socket_
->SendData(handshake_request
.data(),
413 handshake_request
.size());
415 // Just buffered in |handshake_request_|.
416 started_to_send_handshake_request_
= true;
// Accounts for |amount_sent| handshake bytes written to |socket|. Once the
// running total reaches the raw request length, the delegate is told that
// the original (pre-rewrite) request length was sent.
// NOTE(review): any guard around the delegate call (embedded line 431) is
// not visible in this listing.
419 void WebSocketJob::OnSentHandshakeRequest(
420 SocketStream
* socket
, int amount_sent
) {
421 DCHECK_EQ(state_
, CONNECTING
);
422 handshake_request_sent_
+= amount_sent
;
423 DCHECK_LE(handshake_request_sent_
, handshake_request_
->raw_length());
424 if (handshake_request_sent_
>= handshake_request_
->raw_length()) {
425 // The handshake request has been sent completely.
426 // Notify the delegate of the original size of the handshake request.
427 // Reset handshake_request_ first in case this object is deleted by the
// call that follows. (The end of this sentence is missing from this
// listing; presumably it refers to the delegate callback -- confirm.)
429 size_t original_length
= handshake_request_
->original_length();
430 handshake_request_
.reset();
432 delegate_
->OnSentData(socket
, original_length
);
// Feeds |len| bytes at |data| into the handshake response parser while
// CONNECTING. If a complete response is already held, the bytes are appended
// to received_data_after_handshake_ instead. Once the response completes, it
// is logged to the socket's NetLog, any bytes past the response are stashed
// as frame data, and SaveCookiesAndNotifyHeadersComplete() is called.
// NOTE(review): the statements that end the two early paths (presumably
// returns) are not visible in this listing.
436 void WebSocketJob::OnReceivedHandshakeResponse(
437 SocketStream
* socket
, const char* data
, int len
) {
438 DCHECK_EQ(state_
, CONNECTING
);
439 if (handshake_response_
->HasResponse()) {
440 // If we already have a handshake response, the received data should be
441 // frame data, not a handshake message.
442 received_data_after_handshake_
.insert(
443 received_data_after_handshake_
.end(), data
, data
+ len
);
447 size_t response_length
= handshake_response_
->ParseRawResponse(data
, len
);
448 if (!handshake_response_
->HasResponse()) {
449 // Not yet complete; we need more data.
452 // The handshake message is complete.
453 std::string raw_response
= handshake_response_
->GetRawResponse();
454 socket_
->net_log()->AddEvent(
455 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS
,
456 base::Bind(&NetLogWebSocketHandshakeCallback
, &raw_response
));
// NOTE(review): |len| is int and |response_length| is size_t, so this
// subtraction is evaluated as unsigned; the test is effectively
// "len != response_length". Confirm ParseRawResponse() never returns more
// than |len|.
457 if (len
- response_length
> 0) {
458 // If we received extra data, it should be frame data.
459 DCHECK(received_data_after_handshake_
.empty());
460 received_data_after_handshake_
.assign(data
+ response_length
, data
+ len
);
462 SaveCookiesAndNotifyHeadersComplete();
// Starts cookie processing for a completed handshake response: resets the
// cookie list and its save index, then fills response_cookies_ with the
// values of the headers named in kSetCookieHeaders.
// NOTE(review): the call that starts persisting the cookies (presumably
// SaveNextCookie(), per the last comment) is not visible in this listing.
465 void WebSocketJob::SaveCookiesAndNotifyHeadersComplete() {
466 // The handshake message is complete.
467 DCHECK(handshake_response_
->HasResponse());
469 // Extract cookies from the handshake response into a temporary vector.
470 response_cookies_
.clear();
471 response_cookies_save_index_
= 0;
473 handshake_response_
->GetHeaders(
474 kSetCookieHeaders
, arraysize(kSetCookieHeaders
), &response_cookies_
);
476 // Now, loop over the response cookies, and attempt to persist each.
// Delivers the handshake response to the delegate. The cookie headers named
// in kSetCookieHeaders are stripped, the remaining response text is
// concatenated with any frame bytes that arrived after the handshake, and
// the combined bytes are passed to the delegate in a single OnReceivedData()
// call. Finally the job is removed from the WebSocketThrottle queue.
// NOTE(review): statements at embedded lines 493-495, 497 and 500 (such as a
// state_ change or a delegate_ guard) are not visible in this listing.
480 void WebSocketJob::NotifyHeadersComplete() {
481 // Remove cookie headers, with malformed headers preserved.
482 // Actual handshake should be done in Blink.
483 handshake_response_
->RemoveHeaders(
484 kSetCookieHeaders
, arraysize(kSetCookieHeaders
));
// Copy the response text out before the handler is destroyed.
485 std::string handshake_response
= handshake_response_
->GetResponse();
486 handshake_response_
.reset();
487 std::vector
<char> received_data(handshake_response
.begin(),
488 handshake_response
.end());
489 received_data
.insert(received_data
.end(),
490 received_data_after_handshake_
.begin(),
491 received_data_after_handshake_
.end());
492 received_data_after_handshake_
.clear();
// &received_data.front() below requires a non-empty vector.
496 DCHECK(!received_data
.empty());
498 delegate_
->OnReceivedData(
499 socket_
.get(), &received_data
.front(), received_data
.size());
501 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
// Persists the cookies collected in response_cookies_, starting at
// response_cookies_save_index_. Each cookie the delegate permits is stored
// with SetCookieWithOptionsAsync(), whose completion runs OnCookieSaved().
// The loop keeps going only while those completions happen synchronously
// (callback_pending_ is false again); otherwise this function stops and
// relies on OnCookieSaved() to resume it. When every cookie has been
// handled, the list is cleared and NotifyHeadersComplete() is called.
// save_next_cookie_running_ is true for the duration of the loop so that
// OnCookieSaved() can tell a synchronous completion from a later one.
// NOTE(review): the statements under the first `if`, the CanSetCookie()
// check and the final callback_pending_ check (presumably return/continue)
// are not visible in this listing.
504 void WebSocketJob::SaveNextCookie() {
505 if (!socket_
.get() || !delegate_
|| state_
!= CONNECTING
)
508 callback_pending_
= false;
509 save_next_cookie_running_
= true;
511 if (socket_
->cookie_store()) {
512 GURL url_for_cookies
= GetURLForCookies();
514 CookieOptions options
;
515 options
.set_include_httponly();
517 // Loop as long as SetCookieWithOptionsAsync completes synchronously. Since
518 // CookieMonster's asynchronous operation APIs queue the callback to run it
519 // on the thread where the API was called, there won't be race. I.e. unless
520 // the callback is run synchronously, it won't be run in parallel with this
// function. (The end of this sentence is missing from this listing.)
522 while (!callback_pending_
&&
523 response_cookies_save_index_
< response_cookies_
.size()) {
524 std::string cookie
= response_cookies_
[response_cookies_save_index_
];
525 response_cookies_save_index_
++;
527 if (!delegate_
->CanSetCookie(
528 socket_
.get(), url_for_cookies
, cookie
, &options
))
// Set before the async call so that a synchronous OnCookieSaved() can
// clear it again and let the loop continue.
531 callback_pending_
= true;
532 socket_
->cookie_store()->SetCookieWithOptionsAsync(
533 url_for_cookies
, cookie
, options
,
534 base::Bind(&WebSocketJob::OnCookieSaved
,
535 weak_ptr_factory_
.GetWeakPtr()));
539 save_next_cookie_running_
= false;
541 if (callback_pending_
)
544 response_cookies_
.clear();
545 response_cookies_save_index_
= 0;
547 NotifyHeadersComplete();
// Completion callback for SetCookieWithOptionsAsync() issued by
// SaveNextCookie(). Clears callback_pending_ and then checks whether
// SaveNextCookie() is still on the stack (save_next_cookie_running_).
// |cookie_status| is not used in the visible code.
// NOTE(review): the statement under the final `if` and whatever follows it
// (per the comments, returning versus resuming SaveNextCookie()) are not
// visible in this listing.
550 void WebSocketJob::OnCookieSaved(bool cookie_status
) {
551 // Tell the caller of SetCookieWithOptionsAsync() that this completion
552 // callback is invoked.
553 // - If the caller checks callback_pending earlier than this callback, the
554 // caller exits to let this method continue iteration.
555 // - Otherwise, the caller continues iteration.
556 callback_pending_
= false;
558 // Resume SaveNextCookie if the caller of SetCookieWithOptionsAsync() exited
559 // the loop. Otherwise, return.
560 if (save_next_cookie_running_
)
// Returns the socket's URL with its scheme replaced by "https" when the
// socket is secure and by "http" otherwise; all other URL components are
// left as they are. The result is the URL used for cookie access checks and
// cookie store operations in this file.
566 GURL
WebSocketJob::GetURLForCookies() const {
567 GURL url
= socket_
->url();
568 std::string scheme
= socket_
->is_secure() ? "https" : "http";
569 url::Replacements
<char> replacements
;
// The component spans the whole |scheme| string (offset 0, full length).
570 replacements
.SetScheme(scheme
.c_str(), url::Component(0, scheme
.length()));
571 return url
.ReplaceComponents(replacements
);
// Accessor returning an AddressList by const reference.
// NOTE(review): the body is not visible in this listing; presumably it
// returns addresses_, which OnStartOpenConnection() fills in -- confirm.
574 const AddressList
& WebSocketJob::address_list() const {
// Tries to run this WebSocket over an existing SPDY session. Steps visible
// here: check the static enable flag; obtain the HttpNetworkSession and its
// SpdySessionPool from the socket's URL request context; look up an
// available session for the socket's host/port, proxy and privacy mode;
// refuse to carry a secure socket over a session without SSL; then create a
// SpdyWebSocketStream and initialize it. One path calls OnConnected() with
// kMaxPendingSendAllowed and returns ERR_PROTOCOL_SWITCHED; when
// initialization is asynchronous, ERR_IO_PENDING is returned.
// NOTE(review): the return statements on the bail-out paths, the null
// checks on |factory|/|session|/|spdy_session|, the declaration of
// |ssl_info|, and the condition selecting the success path are not visible
// in this listing.
578 int WebSocketJob::TrySpdyStream() {
582 if (!websocket_over_spdy_enabled_
)
585 // Check if we have a SPDY session available.
586 HttpTransactionFactory
* factory
=
587 socket_
->context()->http_transaction_factory();
590 scoped_refptr
<HttpNetworkSession
> session
= factory
->GetSession();
593 SpdySessionPool
* spdy_pool
= session
->spdy_session_pool();
594 PrivacyMode privacy_mode
= socket_
->privacy_mode();
595 const SpdySessionKey
key(HostPortPair::FromURL(socket_
->url()),
596 socket_
->proxy_server(), privacy_mode
);
597 // Forbid wss downgrade to SPDY without SSL.
598 // TODO(toyoshim): Does it realize the same policy with HTTP?
599 base::WeakPtr
<SpdySession
> spdy_session
=
600 spdy_pool
->FindAvailableSession(key
, *socket_
->net_log());
// |was_npn_negotiated| is left uninitialized here and passed to
// GetSSLInfo() by address as an output argument.
605 bool was_npn_negotiated
;
606 NextProto protocol_negotiated
= kProtoUnknown
;
607 bool use_ssl
= spdy_session
->GetSSLInfo(
608 &ssl_info
, &was_npn_negotiated
, &protocol_negotiated
);
609 if (socket_
->is_secure() && !use_ssl
)
612 // Create SpdyWebSocketStream.
613 spdy_protocol_version_
= spdy_session
->GetProtocolVersion();
614 spdy_websocket_stream_
.reset(new SpdyWebSocketStream(spdy_session
, this));
616 int result
= spdy_websocket_stream_
->InitializeStream(
617 socket_
->url(), MEDIUM
, *socket_
->net_log());
619 OnConnected(socket_
.get(), kMaxPendingSendAllowed
);
620 return ERR_PROTOCOL_SWITCHED
;
// Any result other than ERR_IO_PENDING at this point discards the stream.
622 if (result
!= ERR_IO_PENDING
) {
623 spdy_websocket_stream_
.reset();
627 return ERR_IO_PENDING
;
// NOTE(review): the body is not visible in this listing. Presumably this
// marks the job as waiting in the throttle queue (the |waiting_| flag
// mentioned in OnStartOpenConnection()) -- confirm against the full file.
630 void WebSocketJob::SetWaiting() {
// NOTE(review): the body is not visible in this listing. Presumably this
// reports whether the job is currently waiting in the throttle queue (the
// |waiting_| flag mentioned in OnStartOpenConnection()) -- confirm.
634 bool WebSocketJob::IsWaiting() const {
// Resumes a throttled job: expects a stored open-connection callback and
// posts RetryPendingIO() to the current IO message loop, bound through a
// weak pointer so the task is dropped if weak_ptr_factory_ is invalidated
// first.
// NOTE(review): the statements before the DCHECK (embedded lines 639-641)
// and the first PostTask argument (line 644) are not visible in this
// listing.
638 void WebSocketJob::Wakeup() {
642 DCHECK(!callback_
.is_null());
643 base::MessageLoopForIO::current()->PostTask(
645 base::Bind(&WebSocketJob::RetryPendingIO
,
646 weak_ptr_factory_
.GetWeakPtr()));
// Task posted by Wakeup(): retries the SPDY attempt and checks whether it
// completed synchronously (any result other than ERR_IO_PENDING).
// NOTE(review): the statement under the `if` is not visible in this
// listing; per the comment below it is presumably CompleteIO(result).
649 void WebSocketJob::RetryPendingIO() {
650 int result
= TrySpdyStream();
652 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
653 // OnCreatedSpdyStream().
654 if (result
!= ERR_IO_PENDING
)
// Completes the open-connection request that was deferred in
// OnStartOpenConnection(): if a callback is still stored, it is copied to a
// local, run with |result|, and the reference taken by AddRef() in
// OnStartOpenConnection() is released.
// NOTE(review): embedded line 662 (presumably clearing callback_ before the
// callback runs) is not visible in this listing.
658 void WebSocketJob::CompleteIO(int result
) {
659 // |callback_| may be null if OnClose() or DetachDelegate() was called.
660 if (!callback_
.is_null()) {
// Run a local copy rather than the member itself.
661 CompletionCallback callback
= callback_
;
663 callback
.Run(result
);
664 Release(); // Balanced with OnStartOpenConnection().
// Writes |length| bytes at |data| to whichever transport is active. With a
// SPDY stream, the result is true exactly when the stream's SendData()
// returns ERR_IO_PENDING; otherwise the result of socket_->SendData() is
// returned.
668 bool WebSocketJob::SendDataInternal(const char* data
, int length
) {
669 if (spdy_websocket_stream_
.get())
670 return ERR_IO_PENDING
== spdy_websocket_stream_
->SendData(data
, length
);
672 return socket_
->SendData(data
, length
);
// Closes the active transport. When a SPDY stream exists, it is closed.
// NOTE(review): the remaining statements (presumably closing socket_) are
// not visible in this listing.
676 void WebSocketJob::CloseInternal() {
677 if (spdy_websocket_stream_
.get())
678 spdy_websocket_stream_
->Close();
// Task posted by OnSentData() after a buffer has been fully sent. Checks for
// a send still in flight. If the queue is empty there is nothing more to
// send, and the CLOSING state is checked (Close() defers closing to this
// function). Otherwise the front buffer is dequeued, wrapped in a
// DrainableIOBuffer as the new current_send_buffer_, and sent.
// NOTE(review): the statements under the first `if` and under the CLOSING
// check, and the return that ends the empty-queue branch, are not visible in
// this listing. The result of SendDataInternal() is not used here.
683 void WebSocketJob::SendPending() {
684 if (current_send_buffer_
.get())
687 // Current buffer has been sent. Try next if any.
688 if (send_buffer_queue_
.empty()) {
689 // No more data to send.
690 if (state_
== CLOSING
)
695 scoped_refptr
<IOBufferWithSize
> next_buffer
= send_buffer_queue_
.front();
696 send_buffer_queue_
.pop_front();
697 current_send_buffer_
=
698 new DrainableIOBuffer(next_buffer
.get(), next_buffer
->size());
699 SendDataInternal(current_send_buffer_
->data(),
700 current_send_buffer_
->BytesRemaining());