Move WeakPtrFactory member to the end of LocalExtensionCache
[chromium-blink-merge.git] / net / websockets / websocket_channel.cc
blob30abb2db90fe8eb8c67fa3dc898d26a5a6003b61
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/websockets/websocket_channel.h"
7 #include <limits.h> // for INT_MAX
9 #include <algorithm>
10 #include <deque>
12 #include "base/basictypes.h" // for size_t
13 #include "base/big_endian.h"
14 #include "base/bind.h"
15 #include "base/compiler_specific.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/weak_ptr.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/metrics/histogram.h"
20 #include "base/numerics/safe_conversions.h"
21 #include "base/stl_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/time/time.h"
24 #include "net/base/io_buffer.h"
25 #include "net/base/net_log.h"
26 #include "net/http/http_request_headers.h"
27 #include "net/http/http_response_headers.h"
28 #include "net/http/http_util.h"
29 #include "net/websockets/websocket_errors.h"
30 #include "net/websockets/websocket_event_interface.h"
31 #include "net/websockets/websocket_frame.h"
32 #include "net/websockets/websocket_handshake_request_info.h"
33 #include "net/websockets/websocket_handshake_response_info.h"
34 #include "net/websockets/websocket_mux.h"
35 #include "net/websockets/websocket_stream.h"
36 #include "url/origin.h"
38 namespace net {
40 namespace {
42 using base::StreamingUtf8Validator;
44 const int kDefaultSendQuotaLowWaterMark = 1 << 16;
45 const int kDefaultSendQuotaHighWaterMark = 1 << 17;
46 const size_t kWebSocketCloseCodeLength = 2;
47 // Timeout for waiting for the server to acknowledge a closing handshake.
48 const int kClosingHandshakeTimeoutSeconds = 60;
49 // We wait for the server to close the underlying connection as recommended in
50 // https://tools.ietf.org/html/rfc6455#section-7.1.1
51 // We don't use 2MSL since there're server implementations that don't follow
52 // the recommendation and wait for the client to close the underlying
53 // connection. It leads to unnecessarily long time before CloseEvent
54 // invocation. We want to avoid this rather than strictly following the spec
55 // recommendation.
56 const int kUnderlyingConnectionCloseTimeoutSeconds = 2;
58 typedef WebSocketEventInterface::ChannelState ChannelState;
59 const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
60 const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
62 // Maximum close reason length = max control frame payload -
63 // status code length
64 // = 125 - 2
65 const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
67 // Check a close status code for strict compliance with RFC6455. This is only
68 // used for close codes received from a renderer that we are intending to send
69 // out over the network. See ParseClose() for the restrictions on incoming close
70 // codes. The |code| parameter is type int for convenience of implementation;
71 // the real type is uint16. Code 1005 is treated specially; it cannot be set
72 // explicitly by Javascript but the renderer uses it to indicate we should send
73 // a Close frame with no payload.
74 bool IsStrictlyValidCloseStatusCode(int code) {
75 static const int kInvalidRanges[] = {
76 // [BAD, OK)
77 0, 1000, // 1000 is the first valid code
78 1006, 1007, // 1006 MUST NOT be set.
79 1014, 3000, // 1014 unassigned; 1015 up to 2999 are reserved.
80 5000, 65536, // Codes above 5000 are invalid.
82 const int* const kInvalidRangesEnd =
83 kInvalidRanges + arraysize(kInvalidRanges);
85 DCHECK_GE(code, 0);
86 DCHECK_LT(code, 65536);
87 const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
88 DCHECK_NE(kInvalidRangesEnd, upper);
89 DCHECK_GT(upper, kInvalidRanges);
90 DCHECK_GT(*upper, code);
91 DCHECK_LE(*(upper - 1), code);
92 return ((upper - kInvalidRanges) % 2) == 0;
95 // Sets |name| to the name of the frame type for the given |opcode|. Note that
96 // for all of Text, Binary and Continuation opcode, this method returns
97 // "Data frame".
98 void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
99 std::string* name) {
100 switch (opcode) {
101 case WebSocketFrameHeader::kOpCodeText: // fall-thru
102 case WebSocketFrameHeader::kOpCodeBinary: // fall-thru
103 case WebSocketFrameHeader::kOpCodeContinuation:
104 *name = "Data frame";
105 break;
107 case WebSocketFrameHeader::kOpCodePing:
108 *name = "Ping";
109 break;
111 case WebSocketFrameHeader::kOpCodePong:
112 *name = "Pong";
113 break;
115 case WebSocketFrameHeader::kOpCodeClose:
116 *name = "Close";
117 break;
119 default:
120 *name = "Unknown frame type";
121 break;
124 return;
127 } // namespace
129 // A class to encapsulate a set of frames and information about the size of
130 // those frames.
131 class WebSocketChannel::SendBuffer {
132 public:
133 SendBuffer() : total_bytes_(0) {}
135 // Add a WebSocketFrame to the buffer and increase total_bytes_.
136 void AddFrame(scoped_ptr<WebSocketFrame> chunk);
138 // Return a pointer to the frames_ for write purposes.
139 ScopedVector<WebSocketFrame>* frames() { return &frames_; }
141 private:
142 // The frames_ that will be sent in the next call to WriteFrames().
143 ScopedVector<WebSocketFrame> frames_;
145 // The total size of the payload data in |frames_|. This will be used to
146 // measure the throughput of the link.
147 // TODO(ricea): Measure the throughput of the link.
148 uint64 total_bytes_;
151 void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
152 total_bytes_ += frame->header.payload_length;
153 frames_.push_back(frame.release());
156 // Implementation of WebSocketStream::ConnectDelegate that simply forwards the
157 // calls on to the WebSocketChannel that created it.
158 class WebSocketChannel::ConnectDelegate
159 : public WebSocketStream::ConnectDelegate {
160 public:
161 explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
163 void OnSuccess(scoped_ptr<WebSocketStream> stream) override {
164 creator_->OnConnectSuccess(stream.Pass());
165 // |this| may have been deleted.
168 void OnFailure(const std::string& message) override {
169 creator_->OnConnectFailure(message);
170 // |this| has been deleted.
173 void OnStartOpeningHandshake(
174 scoped_ptr<WebSocketHandshakeRequestInfo> request) override {
175 creator_->OnStartOpeningHandshake(request.Pass());
178 void OnFinishOpeningHandshake(
179 scoped_ptr<WebSocketHandshakeResponseInfo> response) override {
180 creator_->OnFinishOpeningHandshake(response.Pass());
183 void OnSSLCertificateError(
184 scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks>
185 ssl_error_callbacks,
186 const SSLInfo& ssl_info,
187 bool fatal) override {
188 creator_->OnSSLCertificateError(
189 ssl_error_callbacks.Pass(), ssl_info, fatal);
192 private:
193 // A pointer to the WebSocketChannel that created this object. There is no
194 // danger of this pointer being stale, because deleting the WebSocketChannel
195 // cancels the connect process, deleting this object and preventing its
196 // callbacks from being called.
197 WebSocketChannel* const creator_;
199 DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
202 class WebSocketChannel::HandshakeNotificationSender
203 : public base::SupportsWeakPtr<HandshakeNotificationSender> {
204 public:
205 explicit HandshakeNotificationSender(WebSocketChannel* channel);
206 ~HandshakeNotificationSender();
208 static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
210 ChannelState SendImmediately(WebSocketEventInterface* event_interface);
212 const WebSocketHandshakeRequestInfo* handshake_request_info() const {
213 return handshake_request_info_.get();
216 void set_handshake_request_info(
217 scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
218 handshake_request_info_ = request_info.Pass();
221 const WebSocketHandshakeResponseInfo* handshake_response_info() const {
222 return handshake_response_info_.get();
225 void set_handshake_response_info(
226 scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
227 handshake_response_info_ = response_info.Pass();
230 private:
231 WebSocketChannel* owner_;
232 scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
233 scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
236 WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
237 WebSocketChannel* channel)
238 : owner_(channel) {}
240 WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
242 void WebSocketChannel::HandshakeNotificationSender::Send(
243 base::WeakPtr<HandshakeNotificationSender> sender) {
244 // Do nothing if |sender| is already destructed.
245 if (sender) {
246 WebSocketChannel* channel = sender->owner_;
247 sender->SendImmediately(channel->event_interface_.get());
251 ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
252 WebSocketEventInterface* event_interface) {
254 if (handshake_request_info_.get()) {
255 if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
256 handshake_request_info_.Pass()))
257 return CHANNEL_DELETED;
260 if (handshake_response_info_.get()) {
261 if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
262 handshake_response_info_.Pass()))
263 return CHANNEL_DELETED;
265 // TODO(yhirano): We can release |this| to save memory because
266 // there will be no more opening handshake notification.
269 return CHANNEL_ALIVE;
272 WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
273 bool final,
274 WebSocketFrameHeader::OpCode opcode,
275 const scoped_refptr<IOBuffer>& data,
276 uint64 offset,
277 uint64 size)
278 : final_(final),
279 opcode_(opcode),
280 data_(data),
281 offset_(offset),
282 size_(size) {}
284 WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
286 void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
287 DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
288 opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
291 void WebSocketChannel::PendingReceivedFrame::DidConsume(uint64 bytes) {
292 DCHECK_LE(offset_, size_);
293 DCHECK_LE(bytes, size_ - offset_);
294 offset_ += bytes;
297 WebSocketChannel::WebSocketChannel(
298 scoped_ptr<WebSocketEventInterface> event_interface,
299 URLRequestContext* url_request_context)
300 : event_interface_(event_interface.Pass()),
301 url_request_context_(url_request_context),
302 send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
303 send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
304 current_send_quota_(0),
305 current_receive_quota_(0),
306 closing_handshake_timeout_(base::TimeDelta::FromSeconds(
307 kClosingHandshakeTimeoutSeconds)),
308 underlying_connection_close_timeout_(base::TimeDelta::FromSeconds(
309 kUnderlyingConnectionCloseTimeoutSeconds)),
310 has_received_close_frame_(false),
311 received_close_code_(0),
312 state_(FRESHLY_CONSTRUCTED),
313 notification_sender_(new HandshakeNotificationSender(this)),
314 sending_text_message_(false),
315 receiving_text_message_(false),
316 expecting_to_handle_continuation_(false),
317 initial_frame_forwarded_(false) {}
319 WebSocketChannel::~WebSocketChannel() {
320 // The stream may hold a pointer to read_frames_, and so it needs to be
321 // destroyed first.
322 stream_.reset();
323 // The timer may have a callback pointing back to us, so stop it just in case
324 // someone decides to run the event loop from their destructor.
325 close_timer_.Stop();
328 void WebSocketChannel::SendAddChannelRequest(
329 const GURL& socket_url,
330 const std::vector<std::string>& requested_subprotocols,
331 const url::Origin& origin) {
332 // Delegate to the tested version.
333 SendAddChannelRequestWithSuppliedCreator(
334 socket_url,
335 requested_subprotocols,
336 origin,
337 base::Bind(&WebSocketStream::CreateAndConnectStream));
340 void WebSocketChannel::SetState(State new_state) {
341 DCHECK_NE(state_, new_state);
343 if (new_state == CONNECTED)
344 established_on_ = base::TimeTicks::Now();
345 if (state_ == CONNECTED && !established_on_.is_null()) {
346 UMA_HISTOGRAM_LONG_TIMES(
347 "Net.WebSocket.Duration", base::TimeTicks::Now() - established_on_);
350 state_ = new_state;
353 bool WebSocketChannel::InClosingState() const {
354 // The state RECV_CLOSED is not supported here, because it is only used in one
355 // code path and should not leak into the code in general.
356 DCHECK_NE(RECV_CLOSED, state_)
357 << "InClosingState called with state_ == RECV_CLOSED";
358 return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
361 void WebSocketChannel::SendFrame(bool fin,
362 WebSocketFrameHeader::OpCode op_code,
363 const std::vector<char>& data) {
364 if (data.size() > INT_MAX) {
365 NOTREACHED() << "Frame size sanity check failed";
366 return;
368 if (stream_ == NULL) {
369 LOG(DFATAL) << "Got SendFrame without a connection established; "
370 << "misbehaving renderer? fin=" << fin << " op_code=" << op_code
371 << " data.size()=" << data.size();
372 return;
374 if (InClosingState()) {
375 DVLOG(1) << "SendFrame called in state " << state_
376 << ". This may be a bug, or a harmless race.";
377 return;
379 if (state_ != CONNECTED) {
380 NOTREACHED() << "SendFrame() called in state " << state_;
381 return;
383 if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
384 // TODO(ricea): Kill renderer.
385 ignore_result(
386 FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
387 // |this| has been deleted.
388 return;
390 if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
391 LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
392 << "; misbehaving renderer? fin=" << fin
393 << " data.size()=" << data.size();
394 return;
396 if (op_code == WebSocketFrameHeader::kOpCodeText ||
397 (op_code == WebSocketFrameHeader::kOpCodeContinuation &&
398 sending_text_message_)) {
399 StreamingUtf8Validator::State state =
400 outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
401 if (state == StreamingUtf8Validator::INVALID ||
402 (state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
403 // TODO(ricea): Kill renderer.
404 ignore_result(
405 FailChannel("Browser sent a text frame containing invalid UTF-8",
406 kWebSocketErrorGoingAway,
407 ""));
408 // |this| has been deleted.
409 return;
411 sending_text_message_ = !fin;
412 DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
414 current_send_quota_ -= data.size();
415 // TODO(ricea): If current_send_quota_ has dropped below
416 // send_quota_low_water_mark_, it might be good to increase the "low
417 // water mark" and "high water mark", but only if the link to the WebSocket
418 // server is not saturated.
419 scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
420 std::copy(data.begin(), data.end(), buffer->data());
421 ignore_result(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
422 // |this| may have been deleted.
425 void WebSocketChannel::SendFlowControl(int64 quota) {
426 DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
427 state_ == CLOSE_WAIT);
428 // TODO(ricea): Kill the renderer if it tries to send us a negative quota
429 // value or > INT_MAX.
430 DCHECK_GE(quota, 0);
431 DCHECK_LE(quota, INT_MAX);
432 if (!pending_received_frames_.empty()) {
433 DCHECK_EQ(0u, current_receive_quota_);
435 while (!pending_received_frames_.empty() && quota > 0) {
436 PendingReceivedFrame& front = pending_received_frames_.front();
437 const uint64 data_size = front.size() - front.offset();
438 const uint64 bytes_to_send =
439 std::min(base::checked_cast<uint64>(quota), data_size);
440 const bool final = front.final() && data_size == bytes_to_send;
441 const char* data =
442 front.data().get() ? front.data()->data() + front.offset() : NULL;
443 DCHECK(!bytes_to_send || data) << "Non empty data should not be null.";
444 const std::vector<char> data_vector(data, data + bytes_to_send);
445 DVLOG(3) << "Sending frame previously split due to quota to the "
446 << "renderer: quota=" << quota << " data_size=" << data_size
447 << " bytes_to_send=" << bytes_to_send;
448 if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
449 CHANNEL_DELETED)
450 return;
451 if (bytes_to_send < data_size) {
452 front.DidConsume(bytes_to_send);
453 front.ResetOpcode();
454 return;
456 quota -= bytes_to_send;
458 pending_received_frames_.pop();
460 // If current_receive_quota_ == 0 then there is no pending ReadFrames()
461 // operation.
462 const bool start_read =
463 current_receive_quota_ == 0 && quota > 0 &&
464 (state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
465 current_receive_quota_ += quota;
466 if (start_read)
467 ignore_result(ReadFrames());
468 // |this| may have been deleted.
471 void WebSocketChannel::StartClosingHandshake(uint16 code,
472 const std::string& reason) {
473 if (InClosingState()) {
474 // When the associated renderer process is killed while the channel is in
475 // CLOSING state we reach here.
476 DVLOG(1) << "StartClosingHandshake called in state " << state_
477 << ". This may be a bug, or a harmless race.";
478 return;
480 if (state_ == CONNECTING) {
481 // Abort the in-progress handshake and drop the connection immediately.
482 stream_request_.reset();
483 SetState(CLOSED);
484 DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
485 return;
487 if (state_ != CONNECTED) {
488 NOTREACHED() << "StartClosingHandshake() called in state " << state_;
489 return;
492 DCHECK(!close_timer_.IsRunning());
493 // This use of base::Unretained() is safe because we stop the timer in the
494 // destructor.
495 close_timer_.Start(
496 FROM_HERE,
497 closing_handshake_timeout_,
498 base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
500 // Javascript actually only permits 1000 and 3000-4999, but the implementation
501 // itself may produce different codes. The length of |reason| is also checked
502 // by Javascript.
503 if (!IsStrictlyValidCloseStatusCode(code) ||
504 reason.size() > kMaximumCloseReasonLength) {
505 // "InternalServerError" is actually used for errors from any endpoint, per
506 // errata 3227 to RFC6455. If the renderer is sending us an invalid code or
507 // reason it must be malfunctioning in some way, and based on that we
508 // interpret this as an internal error.
509 if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED) {
510 DCHECK_EQ(CONNECTED, state_);
511 SetState(SEND_CLOSED);
513 return;
515 if (SendClose(
516 code,
517 StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
518 CHANNEL_DELETED)
519 return;
520 DCHECK_EQ(CONNECTED, state_);
521 SetState(SEND_CLOSED);
524 void WebSocketChannel::SendAddChannelRequestForTesting(
525 const GURL& socket_url,
526 const std::vector<std::string>& requested_subprotocols,
527 const url::Origin& origin,
528 const WebSocketStreamCreator& creator) {
529 SendAddChannelRequestWithSuppliedCreator(
530 socket_url, requested_subprotocols, origin, creator);
533 void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
534 base::TimeDelta delay) {
535 closing_handshake_timeout_ = delay;
538 void WebSocketChannel::SetUnderlyingConnectionCloseTimeoutForTesting(
539 base::TimeDelta delay) {
540 underlying_connection_close_timeout_ = delay;
543 void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
544 const GURL& socket_url,
545 const std::vector<std::string>& requested_subprotocols,
546 const url::Origin& origin,
547 const WebSocketStreamCreator& creator) {
548 DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
549 if (!socket_url.SchemeIsWSOrWSS()) {
550 // TODO(ricea): Kill the renderer (this error should have been caught by
551 // Javascript).
552 ignore_result(event_interface_->OnAddChannelResponse(true, "", ""));
553 // |this| is deleted here.
554 return;
556 socket_url_ = socket_url;
557 scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
558 new ConnectDelegate(this));
559 stream_request_ = creator.Run(socket_url_,
560 requested_subprotocols,
561 origin,
562 url_request_context_,
563 BoundNetLog(),
564 connect_delegate.Pass());
565 SetState(CONNECTING);
568 void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
569 DCHECK(stream);
570 DCHECK_EQ(CONNECTING, state_);
572 stream_ = stream.Pass();
574 SetState(CONNECTED);
576 if (event_interface_->OnAddChannelResponse(
577 false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
578 CHANNEL_DELETED)
579 return;
581 // TODO(ricea): Get flow control information from the WebSocketStream once we
582 // have a multiplexing WebSocketStream.
583 current_send_quota_ = send_quota_high_water_mark_;
584 if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
585 CHANNEL_DELETED)
586 return;
588 // |stream_request_| is not used once the connection has succeeded.
589 stream_request_.reset();
591 ignore_result(ReadFrames());
592 // |this| may have been deleted.
595 void WebSocketChannel::OnConnectFailure(const std::string& message) {
596 DCHECK_EQ(CONNECTING, state_);
598 // Copy the message before we delete its owner.
599 std::string message_copy = message;
601 SetState(CLOSED);
602 stream_request_.reset();
604 if (CHANNEL_DELETED ==
605 notification_sender_->SendImmediately(event_interface_.get())) {
606 // |this| has been deleted.
607 return;
609 ChannelState result = event_interface_->OnFailChannel(message_copy);
610 DCHECK_EQ(CHANNEL_DELETED, result);
611 // |this| has been deleted.
614 void WebSocketChannel::OnSSLCertificateError(
615 scoped_ptr<WebSocketEventInterface::SSLErrorCallbacks> ssl_error_callbacks,
616 const SSLInfo& ssl_info,
617 bool fatal) {
618 ignore_result(event_interface_->OnSSLCertificateError(
619 ssl_error_callbacks.Pass(), socket_url_, ssl_info, fatal));
622 void WebSocketChannel::OnStartOpeningHandshake(
623 scoped_ptr<WebSocketHandshakeRequestInfo> request) {
624 DCHECK(!notification_sender_->handshake_request_info());
626 // Because it is hard to handle an IPC error synchronously is difficult,
627 // we asynchronously notify the information.
628 notification_sender_->set_handshake_request_info(request.Pass());
629 ScheduleOpeningHandshakeNotification();
632 void WebSocketChannel::OnFinishOpeningHandshake(
633 scoped_ptr<WebSocketHandshakeResponseInfo> response) {
634 DCHECK(!notification_sender_->handshake_response_info());
636 // Because it is hard to handle an IPC error synchronously is difficult,
637 // we asynchronously notify the information.
638 notification_sender_->set_handshake_response_info(response.Pass());
639 ScheduleOpeningHandshakeNotification();
642 void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
643 base::MessageLoop::current()->PostTask(
644 FROM_HERE,
645 base::Bind(HandshakeNotificationSender::Send,
646 notification_sender_->AsWeakPtr()));
649 ChannelState WebSocketChannel::WriteFrames() {
650 int result = OK;
651 do {
652 // This use of base::Unretained is safe because this object owns the
653 // WebSocketStream and destroying it cancels all callbacks.
654 result = stream_->WriteFrames(
655 data_being_sent_->frames(),
656 base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
657 base::Unretained(this),
658 false));
659 if (result != ERR_IO_PENDING) {
660 if (OnWriteDone(true, result) == CHANNEL_DELETED)
661 return CHANNEL_DELETED;
662 // OnWriteDone() returns CHANNEL_DELETED on error. Here |state_| is
663 // guaranteed to be the same as before OnWriteDone() call.
665 } while (result == OK && data_being_sent_);
666 return CHANNEL_ALIVE;
669 ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
670 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
671 DCHECK_NE(CONNECTING, state_);
672 DCHECK_NE(ERR_IO_PENDING, result);
673 DCHECK(data_being_sent_);
674 switch (result) {
675 case OK:
676 if (data_to_send_next_) {
677 data_being_sent_ = data_to_send_next_.Pass();
678 if (!synchronous)
679 return WriteFrames();
680 } else {
681 data_being_sent_.reset();
682 if (current_send_quota_ < send_quota_low_water_mark_) {
683 // TODO(ricea): Increase low_water_mark and high_water_mark if
684 // throughput is high, reduce them if throughput is low. Low water
685 // mark needs to be >= the bandwidth delay product *of the IPC
686 // channel*. Because factors like context-switch time, thread wake-up
687 // time, and bus speed come into play it is complex and probably needs
688 // to be determined empirically.
689 DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
690 // TODO(ricea): Truncate quota by the quota specified by the remote
691 // server, if the protocol in use supports quota.
692 int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
693 current_send_quota_ += fresh_quota;
694 return event_interface_->OnFlowControl(fresh_quota);
697 return CHANNEL_ALIVE;
699 // If a recoverable error condition existed, it would go here.
701 default:
702 DCHECK_LT(result, 0)
703 << "WriteFrames() should only return OK or ERR_ codes";
705 stream_->Close();
706 SetState(CLOSED);
707 return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
711 ChannelState WebSocketChannel::ReadFrames() {
712 int result = OK;
713 while (result == OK && current_receive_quota_ > 0) {
714 // This use of base::Unretained is safe because this object owns the
715 // WebSocketStream, and any pending reads will be cancelled when it is
716 // destroyed.
717 result = stream_->ReadFrames(
718 &read_frames_,
719 base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
720 base::Unretained(this),
721 false));
722 if (result != ERR_IO_PENDING) {
723 if (OnReadDone(true, result) == CHANNEL_DELETED)
724 return CHANNEL_DELETED;
726 DCHECK_NE(CLOSED, state_);
728 return CHANNEL_ALIVE;
731 ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
732 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
733 DCHECK_NE(CONNECTING, state_);
734 DCHECK_NE(ERR_IO_PENDING, result);
735 switch (result) {
736 case OK:
737 // ReadFrames() must use ERR_CONNECTION_CLOSED for a closed connection
738 // with no data read, not an empty response.
739 DCHECK(!read_frames_.empty())
740 << "ReadFrames() returned OK, but nothing was read.";
741 for (size_t i = 0; i < read_frames_.size(); ++i) {
742 scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
743 read_frames_[i] = NULL;
744 if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
745 return CHANNEL_DELETED;
747 read_frames_.clear();
748 // There should always be a call to ReadFrames pending.
749 // TODO(ricea): Unless we are out of quota.
750 DCHECK_NE(CLOSED, state_);
751 if (!synchronous)
752 return ReadFrames();
753 return CHANNEL_ALIVE;
755 case ERR_WS_PROTOCOL_ERROR:
756 // This could be kWebSocketErrorProtocolError (specifically, non-minimal
757 // encoding of payload length) or kWebSocketErrorMessageTooBig, or an
758 // extension-specific error.
759 return FailChannel("Invalid frame header",
760 kWebSocketErrorProtocolError,
761 "WebSocket Protocol Error");
763 default:
764 DCHECK_LT(result, 0)
765 << "ReadFrames() should only return OK or ERR_ codes";
767 stream_->Close();
768 SetState(CLOSED);
770 uint16 code = kWebSocketErrorAbnormalClosure;
771 std::string reason = "";
772 bool was_clean = false;
773 if (has_received_close_frame_) {
774 code = received_close_code_;
775 reason = received_close_reason_;
776 was_clean = (result == ERR_CONNECTION_CLOSED);
779 return DoDropChannel(was_clean, code, reason);
783 ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
784 if (frame->header.masked) {
785 // RFC6455 Section 5.1 "A client MUST close a connection if it detects a
786 // masked frame."
787 return FailChannel(
788 "A server must not mask any frames that it sends to the "
789 "client.",
790 kWebSocketErrorProtocolError,
791 "Masked frame from server");
793 const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
794 DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
795 frame->header.final);
796 if (frame->header.reserved1 || frame->header.reserved2 ||
797 frame->header.reserved3) {
798 return FailChannel(base::StringPrintf(
799 "One or more reserved bits are on: reserved1 = %d, "
800 "reserved2 = %d, reserved3 = %d",
801 static_cast<int>(frame->header.reserved1),
802 static_cast<int>(frame->header.reserved2),
803 static_cast<int>(frame->header.reserved3)),
804 kWebSocketErrorProtocolError,
805 "Invalid reserved bit");
808 // Respond to the frame appropriately to its type.
809 return HandleFrameByState(
810 opcode, frame->header.final, frame->data, frame->header.payload_length);
813 ChannelState WebSocketChannel::HandleFrameByState(
814 const WebSocketFrameHeader::OpCode opcode,
815 bool final,
816 const scoped_refptr<IOBuffer>& data_buffer,
817 uint64 size) {
818 DCHECK_NE(RECV_CLOSED, state_)
819 << "HandleFrame() does not support being called re-entrantly from within "
820 "SendClose()";
821 DCHECK_NE(CLOSED, state_);
822 if (state_ == CLOSE_WAIT) {
823 std::string frame_name;
824 GetFrameTypeForOpcode(opcode, &frame_name);
826 // FailChannel() won't send another Close frame.
827 return FailChannel(
828 frame_name + " received after close", kWebSocketErrorProtocolError, "");
830 switch (opcode) {
831 case WebSocketFrameHeader::kOpCodeText: // fall-thru
832 case WebSocketFrameHeader::kOpCodeBinary:
833 case WebSocketFrameHeader::kOpCodeContinuation:
834 return HandleDataFrame(opcode, final, data_buffer, size);
836 case WebSocketFrameHeader::kOpCodePing:
837 DVLOG(1) << "Got Ping of size " << size;
838 if (state_ == CONNECTED)
839 return SendFrameFromIOBuffer(
840 true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
841 DVLOG(3) << "Ignored ping in state " << state_;
842 return CHANNEL_ALIVE;
844 case WebSocketFrameHeader::kOpCodePong:
845 DVLOG(1) << "Got Pong of size " << size;
846 // There is no need to do anything with pong messages.
847 return CHANNEL_ALIVE;
849 case WebSocketFrameHeader::kOpCodeClose: {
850 // TODO(ricea): If there is a message which is queued for transmission to
851 // the renderer, then the renderer should not receive an
852 // OnClosingHandshake or OnDropChannel IPC until the queued message has
853 // been completedly transmitted.
854 uint16 code = kWebSocketNormalClosure;
855 std::string reason;
856 std::string message;
857 if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
858 return FailChannel(message, code, reason);
860 // TODO(ricea): Find a way to safely log the message from the close
861 // message (escape control codes and so on).
862 DVLOG(1) << "Got Close with code " << code;
863 switch (state_) {
864 case CONNECTED:
865 SetState(RECV_CLOSED);
867 if (SendClose(code, reason) == CHANNEL_DELETED)
868 return CHANNEL_DELETED;
869 DCHECK_EQ(RECV_CLOSED, state_);
871 SetState(CLOSE_WAIT);
872 DCHECK(!close_timer_.IsRunning());
873 // This use of base::Unretained() is safe because we stop the timer
874 // in the destructor.
875 close_timer_.Start(
876 FROM_HERE,
877 underlying_connection_close_timeout_,
878 base::Bind(
879 &WebSocketChannel::CloseTimeout, base::Unretained(this)));
881 if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
882 return CHANNEL_DELETED;
883 has_received_close_frame_ = true;
884 received_close_code_ = code;
885 received_close_reason_ = reason;
886 break;
888 case SEND_CLOSED:
889 SetState(CLOSE_WAIT);
890 DCHECK(close_timer_.IsRunning());
891 close_timer_.Stop();
892 // This use of base::Unretained() is safe because we stop the timer
893 // in the destructor.
894 close_timer_.Start(
895 FROM_HERE,
896 underlying_connection_close_timeout_,
897 base::Bind(
898 &WebSocketChannel::CloseTimeout, base::Unretained(this)));
900 // From RFC6455 section 7.1.5: "Each endpoint
901 // will see the status code sent by the other end as _The WebSocket
902 // Connection Close Code_."
903 has_received_close_frame_ = true;
904 received_close_code_ = code;
905 received_close_reason_ = reason;
906 break;
908 default:
909 LOG(DFATAL) << "Got Close in unexpected state " << state_;
910 break;
912 return CHANNEL_ALIVE;
915 default:
916 return FailChannel(
917 base::StringPrintf("Unrecognized frame opcode: %d", opcode),
918 kWebSocketErrorProtocolError,
919 "Unknown opcode");
923 ChannelState WebSocketChannel::HandleDataFrame(
924 WebSocketFrameHeader::OpCode opcode,
925 bool final,
926 const scoped_refptr<IOBuffer>& data_buffer,
927 uint64 size) {
928 if (state_ != CONNECTED) {
929 DVLOG(3) << "Ignored data packet received in state " << state_;
930 return CHANNEL_ALIVE;
932 DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
933 opcode == WebSocketFrameHeader::kOpCodeText ||
934 opcode == WebSocketFrameHeader::kOpCodeBinary);
935 const bool got_continuation =
936 (opcode == WebSocketFrameHeader::kOpCodeContinuation);
937 if (got_continuation != expecting_to_handle_continuation_) {
938 const std::string console_log = got_continuation
939 ? "Received unexpected continuation frame."
940 : "Received start of new message but previous message is unfinished.";
941 const std::string reason = got_continuation
942 ? "Unexpected continuation"
943 : "Previous data frame unfinished";
944 return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
946 expecting_to_handle_continuation_ = !final;
947 WebSocketFrameHeader::OpCode opcode_to_send = opcode;
948 if (!initial_frame_forwarded_ &&
949 opcode == WebSocketFrameHeader::kOpCodeContinuation) {
950 opcode_to_send = receiving_text_message_
951 ? WebSocketFrameHeader::kOpCodeText
952 : WebSocketFrameHeader::kOpCodeBinary;
954 if (opcode == WebSocketFrameHeader::kOpCodeText ||
955 (opcode == WebSocketFrameHeader::kOpCodeContinuation &&
956 receiving_text_message_)) {
957 // This call is not redundant when size == 0 because it tells us what
958 // the current state is.
959 StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
960 size ? data_buffer->data() : NULL, static_cast<size_t>(size));
961 if (state == StreamingUtf8Validator::INVALID ||
962 (state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
963 return FailChannel("Could not decode a text frame as UTF-8.",
964 kWebSocketErrorProtocolError,
965 "Invalid UTF-8 in text frame");
967 receiving_text_message_ = !final;
968 DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
970 if (size == 0U && !final)
971 return CHANNEL_ALIVE;
973 initial_frame_forwarded_ = !final;
974 if (size > current_receive_quota_ || !pending_received_frames_.empty()) {
975 const bool no_quota = (current_receive_quota_ == 0);
976 DCHECK(no_quota || pending_received_frames_.empty());
977 DVLOG(3) << "Queueing frame to renderer due to quota. quota="
978 << current_receive_quota_ << " size=" << size;
979 WebSocketFrameHeader::OpCode opcode_to_queue =
980 no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
981 pending_received_frames_.push(PendingReceivedFrame(
982 final, opcode_to_queue, data_buffer, current_receive_quota_, size));
983 if (no_quota)
984 return CHANNEL_ALIVE;
985 size = current_receive_quota_;
986 final = false;
989 // TODO(ricea): Can this copy be eliminated?
990 const char* const data_begin = size ? data_buffer->data() : NULL;
991 const char* const data_end = data_begin + size;
992 const std::vector<char> data(data_begin, data_end);
993 current_receive_quota_ -= size;
995 // Sends the received frame to the renderer process.
996 return event_interface_->OnDataFrame(final, opcode_to_send, data);
999 ChannelState WebSocketChannel::SendFrameFromIOBuffer(
1000 bool fin,
1001 WebSocketFrameHeader::OpCode op_code,
1002 const scoped_refptr<IOBuffer>& buffer,
1003 uint64 size) {
1004 DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
1005 DCHECK(stream_);
1007 scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
1008 WebSocketFrameHeader& header = frame->header;
1009 header.final = fin;
1010 header.masked = true;
1011 header.payload_length = size;
1012 frame->data = buffer;
1014 if (data_being_sent_) {
1015 // Either the link to the WebSocket server is saturated, or several messages
1016 // are being sent in a batch.
1017 // TODO(ricea): Keep some statistics to work out the situation and adjust
1018 // quota appropriately.
1019 if (!data_to_send_next_)
1020 data_to_send_next_.reset(new SendBuffer);
1021 data_to_send_next_->AddFrame(frame.Pass());
1022 return CHANNEL_ALIVE;
1025 data_being_sent_.reset(new SendBuffer);
1026 data_being_sent_->AddFrame(frame.Pass());
1027 return WriteFrames();
1030 ChannelState WebSocketChannel::FailChannel(const std::string& message,
1031 uint16 code,
1032 const std::string& reason) {
1033 DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
1034 DCHECK_NE(CONNECTING, state_);
1035 DCHECK_NE(CLOSED, state_);
1037 // TODO(ricea): Logging.
1038 if (state_ == CONNECTED) {
1039 if (SendClose(code, reason) == CHANNEL_DELETED)
1040 return CHANNEL_DELETED;
1043 // Careful study of RFC6455 section 7.1.7 and 7.1.1 indicates the browser
1044 // should close the connection itself without waiting for the closing
1045 // handshake.
1046 stream_->Close();
1047 SetState(CLOSED);
1048 ChannelState result = event_interface_->OnFailChannel(message);
1049 DCHECK_EQ(CHANNEL_DELETED, result);
1050 return result;
1053 ChannelState WebSocketChannel::SendClose(uint16 code,
1054 const std::string& reason) {
1055 DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
1056 DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
1057 scoped_refptr<IOBuffer> body;
1058 uint64 size = 0;
1059 if (code == kWebSocketErrorNoStatusReceived) {
1060 // Special case: translate kWebSocketErrorNoStatusReceived into a Close
1061 // frame with no payload.
1062 DCHECK(reason.empty());
1063 body = new IOBuffer(0);
1064 } else {
1065 const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
1066 body = new IOBuffer(payload_length);
1067 size = payload_length;
1068 base::WriteBigEndian(body->data(), code);
1069 COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
1070 they_should_both_be_two);
1071 std::copy(
1072 reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
1074 if (SendFrameFromIOBuffer(
1075 true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
1076 CHANNEL_DELETED)
1077 return CHANNEL_DELETED;
1078 return CHANNEL_ALIVE;
1081 bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
1082 uint64 size,
1083 uint16* code,
1084 std::string* reason,
1085 std::string* message) {
1086 reason->clear();
1087 if (size < kWebSocketCloseCodeLength) {
1088 if (size == 0U) {
1089 *code = kWebSocketErrorNoStatusReceived;
1090 return true;
1093 DVLOG(1) << "Close frame with payload size " << size << " received "
1094 << "(the first byte is " << std::hex
1095 << static_cast<int>(buffer->data()[0]) << ")";
1096 *code = kWebSocketErrorProtocolError;
1097 *message =
1098 "Received a broken close frame containing an invalid size body.";
1099 return false;
1102 const char* data = buffer->data();
1103 uint16 unchecked_code = 0;
1104 base::ReadBigEndian(data, &unchecked_code);
1105 COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
1106 they_should_both_be_two_bytes);
1108 switch (unchecked_code) {
1109 case kWebSocketErrorNoStatusReceived:
1110 case kWebSocketErrorAbnormalClosure:
1111 case kWebSocketErrorTlsHandshake:
1112 *code = kWebSocketErrorProtocolError;
1113 *message =
1114 "Received a broken close frame containing a reserved status code.";
1115 return false;
1117 default:
1118 *code = unchecked_code;
1119 break;
1122 std::string text(data + kWebSocketCloseCodeLength, data + size);
1123 if (StreamingUtf8Validator::Validate(text)) {
1124 reason->swap(text);
1125 return true;
1128 *code = kWebSocketErrorProtocolError;
1129 *reason = "Invalid UTF-8 in Close frame";
1130 *message = "Received a broken close frame containing invalid UTF-8.";
1131 return false;
1134 ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
1135 uint16 code,
1136 const std::string& reason) {
1137 if (CHANNEL_DELETED ==
1138 notification_sender_->SendImmediately(event_interface_.get()))
1139 return CHANNEL_DELETED;
1140 ChannelState result =
1141 event_interface_->OnDropChannel(was_clean, code, reason);
1142 DCHECK_EQ(CHANNEL_DELETED, result);
1143 return result;
1146 void WebSocketChannel::CloseTimeout() {
1147 stream_->Close();
1148 SetState(CLOSED);
1149 DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
1150 // |this| has been deleted.
1153 } // namespace net