Extract SIGPIPE ignoring code to a common place.
[chromium-blink-merge.git] / net / websockets / websocket_job.cc
blobff60ebeaa1ad45386df31f12462418afa64acd99
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_job.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/string_tokenizer.h"
12 #include "googleurl/src/gurl.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_log.h"
15 #include "net/cookies/cookie_store.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_network_session.h"
18 #include "net/http/http_transaction_factory.h"
19 #include "net/http/http_util.h"
20 #include "net/spdy/spdy_session.h"
21 #include "net/spdy/spdy_session_pool.h"
22 #include "net/url_request/url_request_context.h"
23 #include "net/websockets/websocket_handshake_handler.h"
24 #include "net/websockets/websocket_net_log_params.h"
25 #include "net/websockets/websocket_throttle.h"
// Value passed to OnConnected() as |max_pending_send_allowed| when the
// connection is switched onto a SPDY stream (see TrySpdyStream()).
static const int kMaxPendingSendAllowed = 32 * 1024;  // 32 kilobytes.
29 namespace {
31 // lower-case header names.
32 const char* const kCookieHeaders[] = {
33 "cookie", "cookie2"
35 const char* const kSetCookieHeaders[] = {
36 "set-cookie", "set-cookie2"
39 net::SocketStreamJob* WebSocketJobFactory(
40 const GURL& url, net::SocketStream::Delegate* delegate) {
41 net::WebSocketJob* job = new net::WebSocketJob(delegate);
42 job->InitSocketStream(new net::SocketStream(url, job));
43 return job;
46 class WebSocketJobInitSingleton {
47 private:
48 friend struct base::DefaultLazyInstanceTraits<WebSocketJobInitSingleton>;
49 WebSocketJobInitSingleton() {
50 net::SocketStreamJob::RegisterProtocolFactory("ws", WebSocketJobFactory);
51 net::SocketStreamJob::RegisterProtocolFactory("wss", WebSocketJobFactory);
55 static base::LazyInstance<WebSocketJobInitSingleton> g_websocket_job_init =
56 LAZY_INSTANCE_INITIALIZER;
58 } // anonymous namespace
60 namespace net {
62 bool WebSocketJob::websocket_over_spdy_enabled_ = false;
64 // static
65 void WebSocketJob::EnsureInit() {
66 g_websocket_job_init.Get();
69 // static
70 void WebSocketJob::set_websocket_over_spdy_enabled(bool enabled) {
71 websocket_over_spdy_enabled_ = enabled;
74 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
75 : delegate_(delegate),
76 state_(INITIALIZED),
77 waiting_(false),
78 handshake_request_(new WebSocketHandshakeRequestHandler),
79 handshake_response_(new WebSocketHandshakeResponseHandler),
80 started_to_send_handshake_request_(false),
81 handshake_request_sent_(0),
82 response_cookies_save_index_(0),
83 spdy_protocol_version_(0),
84 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_(this)),
85 ALLOW_THIS_IN_INITIALIZER_LIST(weak_ptr_factory_for_send_pending_(this)) {
88 WebSocketJob::~WebSocketJob() {
89 DCHECK_EQ(CLOSED, state_);
90 DCHECK(!delegate_);
91 DCHECK(!socket_.get());
94 void WebSocketJob::Connect() {
95 DCHECK(socket_.get());
96 DCHECK_EQ(state_, INITIALIZED);
97 state_ = CONNECTING;
98 socket_->Connect();
101 bool WebSocketJob::SendData(const char* data, int len) {
102 switch (state_) {
103 case INITIALIZED:
104 return false;
106 case CONNECTING:
107 return SendHandshakeRequest(data, len);
109 case OPEN:
111 scoped_refptr<IOBufferWithSize> buffer = new IOBufferWithSize(len);
112 memcpy(buffer->data(), data, len);
113 if (current_send_buffer_ || !send_buffer_queue_.empty()) {
114 send_buffer_queue_.push_back(buffer);
115 return true;
117 current_send_buffer_ = new DrainableIOBuffer(buffer.get(), len);
118 return SendDataInternal(current_send_buffer_->data(),
119 current_send_buffer_->BytesRemaining());
122 case CLOSING:
123 case CLOSED:
124 return false;
126 return false;
129 void WebSocketJob::Close() {
130 if (state_ == CLOSED)
131 return;
133 state_ = CLOSING;
134 if (current_send_buffer_) {
135 // Will close in SendPending.
136 return;
138 state_ = CLOSED;
139 CloseInternal();
142 void WebSocketJob::RestartWithAuth(const AuthCredentials& credentials) {
143 state_ = CONNECTING;
144 socket_->RestartWithAuth(credentials);
147 void WebSocketJob::DetachDelegate() {
148 state_ = CLOSED;
149 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
150 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
152 scoped_refptr<WebSocketJob> protect(this);
153 weak_ptr_factory_.InvalidateWeakPtrs();
154 weak_ptr_factory_for_send_pending_.InvalidateWeakPtrs();
156 delegate_ = NULL;
157 if (socket_)
158 socket_->DetachDelegate();
159 socket_ = NULL;
160 if (!callback_.is_null()) {
161 waiting_ = false;
162 callback_.Reset();
163 Release(); // Balanced with OnStartOpenConnection().
167 int WebSocketJob::OnStartOpenConnection(
168 SocketStream* socket, const CompletionCallback& callback) {
169 DCHECK(callback_.is_null());
170 state_ = CONNECTING;
171 addresses_ = socket->address_list();
172 WebSocketThrottle::GetInstance()->PutInQueue(this);
173 if (delegate_) {
174 int result = delegate_->OnStartOpenConnection(socket, callback);
175 DCHECK_EQ(OK, result);
177 if (waiting_) {
178 // PutInQueue() may set |waiting_| true for throttling. In this case,
179 // Wakeup() will be called later.
180 callback_ = callback;
181 AddRef(); // Balanced when callback_ is cleared.
182 return ERR_IO_PENDING;
184 return TrySpdyStream();
187 void WebSocketJob::OnConnected(
188 SocketStream* socket, int max_pending_send_allowed) {
189 if (state_ == CLOSED)
190 return;
191 DCHECK_EQ(CONNECTING, state_);
192 if (delegate_)
193 delegate_->OnConnected(socket, max_pending_send_allowed);
196 void WebSocketJob::OnSentData(SocketStream* socket, int amount_sent) {
197 DCHECK_NE(INITIALIZED, state_);
198 DCHECK_GT(amount_sent, 0);
199 if (state_ == CLOSED)
200 return;
201 if (state_ == CONNECTING) {
202 OnSentHandshakeRequest(socket, amount_sent);
203 return;
205 if (delegate_) {
206 DCHECK(state_ == OPEN || state_ == CLOSING);
207 if (!current_send_buffer_) {
208 VLOG(1) << "OnSentData current_send_buffer=NULL amount_sent="
209 << amount_sent;
210 return;
212 current_send_buffer_->DidConsume(amount_sent);
213 if (current_send_buffer_->BytesRemaining() > 0)
214 return;
216 // We need to report amount_sent of original buffer size, instead of
217 // amount sent to |socket|.
218 amount_sent = current_send_buffer_->size();
219 DCHECK_GT(amount_sent, 0);
220 current_send_buffer_ = NULL;
221 if (!weak_ptr_factory_for_send_pending_.HasWeakPtrs()) {
222 MessageLoopForIO::current()->PostTask(
223 FROM_HERE,
224 base::Bind(&WebSocketJob::SendPending,
225 weak_ptr_factory_for_send_pending_.GetWeakPtr()));
227 delegate_->OnSentData(socket, amount_sent);
231 void WebSocketJob::OnReceivedData(
232 SocketStream* socket, const char* data, int len) {
233 DCHECK_NE(INITIALIZED, state_);
234 if (state_ == CLOSED)
235 return;
236 if (state_ == CONNECTING) {
237 OnReceivedHandshakeResponse(socket, data, len);
238 return;
240 DCHECK(state_ == OPEN || state_ == CLOSING);
241 if (delegate_ && len > 0)
242 delegate_->OnReceivedData(socket, data, len);
245 void WebSocketJob::OnClose(SocketStream* socket) {
246 state_ = CLOSED;
247 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
248 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
250 scoped_refptr<WebSocketJob> protect(this);
251 weak_ptr_factory_.InvalidateWeakPtrs();
253 SocketStream::Delegate* delegate = delegate_;
254 delegate_ = NULL;
255 socket_ = NULL;
256 if (!callback_.is_null()) {
257 waiting_ = false;
258 callback_.Reset();
259 Release(); // Balanced with OnStartOpenConnection().
261 if (delegate)
262 delegate->OnClose(socket);
265 void WebSocketJob::OnAuthRequired(
266 SocketStream* socket, AuthChallengeInfo* auth_info) {
267 if (delegate_)
268 delegate_->OnAuthRequired(socket, auth_info);
271 void WebSocketJob::OnSSLCertificateError(
272 SocketStream* socket, const SSLInfo& ssl_info, bool fatal) {
273 if (delegate_)
274 delegate_->OnSSLCertificateError(socket, ssl_info, fatal);
277 void WebSocketJob::OnError(const SocketStream* socket, int error) {
278 if (delegate_ && error != ERR_PROTOCOL_SWITCHED)
279 delegate_->OnError(socket, error);
282 void WebSocketJob::OnCreatedSpdyStream(int result) {
283 DCHECK(spdy_websocket_stream_.get());
284 DCHECK(socket_.get());
285 DCHECK_NE(ERR_IO_PENDING, result);
287 if (state_ == CLOSED) {
288 result = ERR_ABORTED;
289 } else if (result == OK) {
290 state_ = CONNECTING;
291 result = ERR_PROTOCOL_SWITCHED;
292 } else {
293 spdy_websocket_stream_.reset();
296 CompleteIO(result);
299 void WebSocketJob::OnSentSpdyHeaders(int result) {
300 DCHECK_NE(INITIALIZED, state_);
301 if (state_ != CONNECTING)
302 return;
303 if (delegate_)
304 delegate_->OnSentData(socket_, handshake_request_->original_length());
305 handshake_request_.reset();
308 int WebSocketJob::OnReceivedSpdyResponseHeader(
309 const SpdyHeaderBlock& headers, int status) {
310 DCHECK_NE(INITIALIZED, state_);
311 if (state_ != CONNECTING)
312 return status;
313 if (status != OK)
314 return status;
315 // TODO(toyoshim): Fallback to non-spdy connection?
316 handshake_response_->ParseResponseHeaderBlock(headers,
317 challenge_,
318 spdy_protocol_version_);
320 SaveCookiesAndNotifyHeaderComplete();
321 return OK;
324 void WebSocketJob::OnSentSpdyData(int amount_sent) {
325 DCHECK_NE(INITIALIZED, state_);
326 DCHECK_NE(CONNECTING, state_);
327 if (state_ == CLOSED)
328 return;
329 if (!spdy_websocket_stream_.get())
330 return;
331 OnSentData(socket_, amount_sent);
334 void WebSocketJob::OnReceivedSpdyData(const char* data, int length) {
335 DCHECK_NE(INITIALIZED, state_);
336 DCHECK_NE(CONNECTING, state_);
337 if (state_ == CLOSED)
338 return;
339 if (!spdy_websocket_stream_.get())
340 return;
341 OnReceivedData(socket_, data, length);
344 void WebSocketJob::OnCloseSpdyStream() {
345 spdy_websocket_stream_.reset();
346 OnClose(socket_);
349 bool WebSocketJob::SendHandshakeRequest(const char* data, int len) {
350 DCHECK_EQ(state_, CONNECTING);
351 if (started_to_send_handshake_request_)
352 return false;
353 if (!handshake_request_->ParseRequest(data, len))
354 return false;
356 // handshake message is completed.
357 handshake_response_->set_protocol_version(
358 handshake_request_->protocol_version());
359 AddCookieHeaderAndSend();
360 return true;
363 void WebSocketJob::AddCookieHeaderAndSend() {
364 bool allow = true;
365 if (delegate_ && !delegate_->CanGetCookies(socket_, GetURLForCookies()))
366 allow = false;
368 if (socket_ && delegate_ && state_ == CONNECTING) {
369 handshake_request_->RemoveHeaders(
370 kCookieHeaders, arraysize(kCookieHeaders));
371 if (allow && socket_->context()->cookie_store()) {
372 // Add cookies, including HttpOnly cookies.
373 CookieOptions cookie_options;
374 cookie_options.set_include_httponly();
375 socket_->context()->cookie_store()->GetCookiesWithOptionsAsync(
376 GetURLForCookies(), cookie_options,
377 base::Bind(&WebSocketJob::LoadCookieCallback,
378 weak_ptr_factory_.GetWeakPtr()));
379 } else {
380 DoSendData();
385 void WebSocketJob::LoadCookieCallback(const std::string& cookie) {
386 if (!cookie.empty())
387 handshake_request_->AppendHeaderIfMissing("Cookie", cookie);
388 DoSendData();
391 void WebSocketJob::DoSendData() {
392 if (spdy_websocket_stream_.get()) {
393 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
394 handshake_request_->GetRequestHeaderBlock(
395 socket_->url(), headers.get(), &challenge_, spdy_protocol_version_);
396 spdy_websocket_stream_->SendRequest(headers.Pass());
397 } else {
398 const std::string& handshake_request =
399 handshake_request_->GetRawRequest();
400 handshake_request_sent_ = 0;
401 socket_->net_log()->AddEvent(
402 NetLog::TYPE_WEB_SOCKET_SEND_REQUEST_HEADERS,
403 base::Bind(&NetLogWebSocketHandshakeCallback, &handshake_request));
404 socket_->SendData(handshake_request.data(),
405 handshake_request.size());
407 // Just buffered in |handshake_request_|.
408 started_to_send_handshake_request_ = true;
411 void WebSocketJob::OnSentHandshakeRequest(
412 SocketStream* socket, int amount_sent) {
413 DCHECK_EQ(state_, CONNECTING);
414 handshake_request_sent_ += amount_sent;
415 DCHECK_LE(handshake_request_sent_, handshake_request_->raw_length());
416 if (handshake_request_sent_ >= handshake_request_->raw_length()) {
417 // handshake request has been sent.
418 // notify original size of handshake request to delegate.
419 if (delegate_)
420 delegate_->OnSentData(
421 socket,
422 handshake_request_->original_length());
423 handshake_request_.reset();
427 void WebSocketJob::OnReceivedHandshakeResponse(
428 SocketStream* socket, const char* data, int len) {
429 DCHECK_EQ(state_, CONNECTING);
430 if (handshake_response_->HasResponse()) {
431 // If we already has handshake response, received data should be frame
432 // data, not handshake message.
433 received_data_after_handshake_.insert(
434 received_data_after_handshake_.end(), data, data + len);
435 return;
438 size_t response_length = handshake_response_->ParseRawResponse(data, len);
439 if (!handshake_response_->HasResponse()) {
440 // not yet. we need more data.
441 return;
443 // handshake message is completed.
444 std::string raw_response = handshake_response_->GetRawResponse();
445 socket_->net_log()->AddEvent(
446 NetLog::TYPE_WEB_SOCKET_READ_RESPONSE_HEADERS,
447 base::Bind(&NetLogWebSocketHandshakeCallback, &raw_response));
448 if (len - response_length > 0) {
449 // If we received extra data, it should be frame data.
450 DCHECK(received_data_after_handshake_.empty());
451 received_data_after_handshake_.assign(data + response_length, data + len);
453 SaveCookiesAndNotifyHeaderComplete();
456 void WebSocketJob::SaveCookiesAndNotifyHeaderComplete() {
457 // handshake message is completed.
458 DCHECK(handshake_response_->HasResponse());
460 response_cookies_.clear();
461 response_cookies_save_index_ = 0;
463 handshake_response_->GetHeaders(
464 kSetCookieHeaders, arraysize(kSetCookieHeaders), &response_cookies_);
466 // Now, loop over the response cookies, and attempt to persist each.
467 SaveNextCookie();
470 void WebSocketJob::SaveNextCookie() {
471 if (response_cookies_save_index_ == response_cookies_.size()) {
472 response_cookies_.clear();
473 response_cookies_save_index_ = 0;
475 // Remove cookie headers, with malformed headers preserved.
476 // Actual handshake should be done in WebKit.
477 handshake_response_->RemoveHeaders(
478 kSetCookieHeaders, arraysize(kSetCookieHeaders));
479 std::string handshake_response = handshake_response_->GetResponse();
480 std::vector<char> received_data(handshake_response.begin(),
481 handshake_response.end());
482 received_data.insert(received_data.end(),
483 received_data_after_handshake_.begin(),
484 received_data_after_handshake_.end());
485 received_data_after_handshake_.clear();
487 state_ = OPEN;
489 DCHECK(!received_data.empty());
490 if (delegate_)
491 delegate_->OnReceivedData(
492 socket_, &received_data.front(), received_data.size());
494 handshake_response_.reset();
496 WebSocketThrottle::GetInstance()->RemoveFromQueue(this);
497 WebSocketThrottle::GetInstance()->WakeupSocketIfNecessary();
498 return;
501 bool allow = true;
502 CookieOptions options;
503 GURL url = GetURLForCookies();
504 std::string cookie = response_cookies_[response_cookies_save_index_];
505 if (delegate_ && !delegate_->CanSetCookie(socket_, url, cookie, &options))
506 allow = false;
508 if (socket_ && delegate_ && state_ == CONNECTING) {
509 response_cookies_save_index_++;
510 if (allow && socket_->context()->cookie_store()) {
511 options.set_include_httponly();
512 socket_->context()->cookie_store()->SetCookieWithOptionsAsync(
513 url, cookie, options,
514 base::Bind(&WebSocketJob::SaveCookieCallback,
515 weak_ptr_factory_.GetWeakPtr()));
516 } else {
517 SaveNextCookie();
522 void WebSocketJob::SaveCookieCallback(bool cookie_status) {
523 SaveNextCookie();
526 GURL WebSocketJob::GetURLForCookies() const {
527 GURL url = socket_->url();
528 std::string scheme = socket_->is_secure() ? "https" : "http";
529 url_canon::Replacements<char> replacements;
530 replacements.SetScheme(scheme.c_str(),
531 url_parse::Component(0, scheme.length()));
532 return url.ReplaceComponents(replacements);
535 const AddressList& WebSocketJob::address_list() const {
536 return addresses_;
539 int WebSocketJob::TrySpdyStream() {
540 if (!socket_.get())
541 return ERR_FAILED;
543 if (!websocket_over_spdy_enabled_)
544 return OK;
546 // Check if we have a SPDY session available.
547 HttpTransactionFactory* factory =
548 socket_->context()->http_transaction_factory();
549 if (!factory)
550 return OK;
551 scoped_refptr<HttpNetworkSession> session = factory->GetSession();
552 if (!session.get())
553 return OK;
554 SpdySessionPool* spdy_pool = session->spdy_session_pool();
555 const HostPortProxyPair pair(HostPortPair::FromURL(socket_->url()),
556 socket_->proxy_server());
557 if (!spdy_pool->HasSession(pair))
558 return OK;
560 // Forbid wss downgrade to SPDY without SSL.
561 // TODO(toyoshim): Does it realize the same policy with HTTP?
562 scoped_refptr<SpdySession> spdy_session =
563 spdy_pool->Get(pair, *socket_->net_log());
564 SSLInfo ssl_info;
565 bool was_npn_negotiated;
566 NextProto protocol_negotiated = kProtoUnknown;
567 bool use_ssl = spdy_session->GetSSLInfo(
568 &ssl_info, &was_npn_negotiated, &protocol_negotiated);
569 if (socket_->is_secure() && !use_ssl)
570 return OK;
572 // Create SpdyWebSocketStream.
573 spdy_protocol_version_ = spdy_session->GetProtocolVersion();
574 spdy_websocket_stream_.reset(new SpdyWebSocketStream(spdy_session, this));
576 int result = spdy_websocket_stream_->InitializeStream(
577 socket_->url(), MEDIUM, *socket_->net_log());
578 if (result == OK) {
579 OnConnected(socket_, kMaxPendingSendAllowed);
580 return ERR_PROTOCOL_SWITCHED;
582 if (result != ERR_IO_PENDING) {
583 spdy_websocket_stream_.reset();
584 return OK;
587 return ERR_IO_PENDING;
590 void WebSocketJob::SetWaiting() {
591 waiting_ = true;
594 bool WebSocketJob::IsWaiting() const {
595 return waiting_;
598 void WebSocketJob::Wakeup() {
599 if (!waiting_)
600 return;
601 waiting_ = false;
602 DCHECK(!callback_.is_null());
603 MessageLoopForIO::current()->PostTask(
604 FROM_HERE,
605 base::Bind(&WebSocketJob::RetryPendingIO,
606 weak_ptr_factory_.GetWeakPtr()));
609 void WebSocketJob::RetryPendingIO() {
610 int result = TrySpdyStream();
612 // In the case of ERR_IO_PENDING, CompleteIO() will be called from
613 // OnCreatedSpdyStream().
614 if (result != ERR_IO_PENDING)
615 CompleteIO(result);
618 void WebSocketJob::CompleteIO(int result) {
619 // |callback_| may be null if OnClose() or DetachDelegate() was called.
620 if (!callback_.is_null()) {
621 CompletionCallback callback = callback_;
622 callback_.Reset();
623 callback.Run(result);
624 Release(); // Balanced with OnStartOpenConnection().
628 bool WebSocketJob::SendDataInternal(const char* data, int length) {
629 if (spdy_websocket_stream_.get())
630 return ERR_IO_PENDING == spdy_websocket_stream_->SendData(data, length);
631 if (socket_.get())
632 return socket_->SendData(data, length);
633 return false;
636 void WebSocketJob::CloseInternal() {
637 if (spdy_websocket_stream_.get())
638 spdy_websocket_stream_->Close();
639 if (socket_.get())
640 socket_->Close();
643 void WebSocketJob::SendPending() {
644 if (current_send_buffer_)
645 return;
647 // Current buffer has been sent. Try next if any.
648 if (send_buffer_queue_.empty()) {
649 // No more data to send.
650 if (state_ == CLOSING)
651 CloseInternal();
652 return;
655 scoped_refptr<IOBufferWithSize> next_buffer = send_buffer_queue_.front();
656 send_buffer_queue_.pop_front();
657 current_send_buffer_ = new DrainableIOBuffer(next_buffer,
658 next_buffer->size());
659 SendDataInternal(current_send_buffer_->data(),
660 current_send_buffer_->BytesRemaining());
663 } // namespace net