Add more checks to investigate SupervisedUserPrefStore crash at startup.
[chromium-blink-merge.git] / extensions / browser / api / cast_channel / cast_transport.cc
blob1b35d6be91bb9bb3d2fa5269f3ce32170282c000
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
7 #include <string>
9 #include "base/bind.h"
10 #include "base/format_macros.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "extensions/browser/api/cast_channel/cast_framer.h"
14 #include "extensions/browser/api/cast_channel/cast_message_util.h"
15 #include "extensions/browser/api/cast_channel/logger.h"
16 #include "extensions/browser/api/cast_channel/logger_util.h"
17 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
18 #include "net/base/net_errors.h"
19 #include "net/socket/socket.h"
// Streams a VLOG entry at verbosity |level|, prefixed with the connection's
// IP endpoint and channel auth value. The expansion reads |ip_endpoint_| and
// |channel_auth_|, so it can only be used where those members are in scope.
#define VLOG_WITH_CONNECTION(level)                 \
  VLOG(level) << "[" << ip_endpoint_.ToString()     \
              << ", auth=" << channel_auth_ << "] "
25 namespace extensions {
26 namespace core_api {
27 namespace cast_channel {
29 CastTransportImpl::CastTransportImpl(net::Socket* socket,
30 int channel_id,
31 const net::IPEndPoint& ip_endpoint,
32 ChannelAuthType channel_auth,
33 scoped_refptr<Logger> logger)
34 : started_(false),
35 socket_(socket),
36 write_state_(WRITE_STATE_NONE),
37 read_state_(READ_STATE_NONE),
38 error_state_(CHANNEL_ERROR_NONE),
39 channel_id_(channel_id),
40 ip_endpoint_(ip_endpoint),
41 channel_auth_(channel_auth),
42 logger_(logger) {
43 DCHECK(socket);
45 // Buffer is reused across messages to minimize unnecessary buffer
46 // [re]allocations.
47 read_buffer_ = new net::GrowableIOBuffer();
48 read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
49 framer_.reset(new MessageFramer(read_buffer_));
52 CastTransportImpl::~CastTransportImpl() {
53 DCHECK(CalledOnValidThread());
54 FlushWriteQueue();
57 // static
58 proto::ReadState CastTransportImpl::ReadStateToProto(
59 CastTransportImpl::ReadState state) {
60 switch (state) {
61 case CastTransportImpl::READ_STATE_NONE:
62 return proto::READ_STATE_NONE;
63 case CastTransportImpl::READ_STATE_READ:
64 return proto::READ_STATE_READ;
65 case CastTransportImpl::READ_STATE_READ_COMPLETE:
66 return proto::READ_STATE_READ_COMPLETE;
67 case CastTransportImpl::READ_STATE_DO_CALLBACK:
68 return proto::READ_STATE_DO_CALLBACK;
69 case CastTransportImpl::READ_STATE_ERROR:
70 return proto::READ_STATE_ERROR;
71 default:
72 NOTREACHED();
73 return proto::READ_STATE_NONE;
77 // static
78 proto::WriteState CastTransportImpl::WriteStateToProto(
79 CastTransportImpl::WriteState state) {
80 switch (state) {
81 case CastTransportImpl::WRITE_STATE_NONE:
82 return proto::WRITE_STATE_NONE;
83 case CastTransportImpl::WRITE_STATE_WRITE:
84 return proto::WRITE_STATE_WRITE;
85 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
86 return proto::WRITE_STATE_WRITE_COMPLETE;
87 case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
88 return proto::WRITE_STATE_DO_CALLBACK;
89 case CastTransportImpl::WRITE_STATE_ERROR:
90 return proto::WRITE_STATE_ERROR;
91 default:
92 NOTREACHED();
93 return proto::WRITE_STATE_NONE;
97 // static
98 proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
99 switch (state) {
100 case CHANNEL_ERROR_NONE:
101 return proto::CHANNEL_ERROR_NONE;
102 case CHANNEL_ERROR_CHANNEL_NOT_OPEN:
103 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
104 case CHANNEL_ERROR_AUTHENTICATION_ERROR:
105 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
106 case CHANNEL_ERROR_CONNECT_ERROR:
107 return proto::CHANNEL_ERROR_CONNECT_ERROR;
108 case CHANNEL_ERROR_SOCKET_ERROR:
109 return proto::CHANNEL_ERROR_SOCKET_ERROR;
110 case CHANNEL_ERROR_TRANSPORT_ERROR:
111 return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
112 case CHANNEL_ERROR_INVALID_MESSAGE:
113 return proto::CHANNEL_ERROR_INVALID_MESSAGE;
114 case CHANNEL_ERROR_INVALID_CHANNEL_ID:
115 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
116 case CHANNEL_ERROR_CONNECT_TIMEOUT:
117 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
118 case CHANNEL_ERROR_UNKNOWN:
119 return proto::CHANNEL_ERROR_UNKNOWN;
120 default:
121 NOTREACHED();
122 return proto::CHANNEL_ERROR_NONE;
126 void CastTransportImpl::SetReadDelegate(scoped_ptr<Delegate> read_delegate) {
127 DCHECK(CalledOnValidThread());
128 DCHECK(read_delegate);
129 read_delegate_ = read_delegate.Pass();
130 if (started_) {
131 read_delegate_->Start();
135 void CastTransportImpl::FlushWriteQueue() {
136 for (; !write_queue_.empty(); write_queue_.pop()) {
137 net::CompletionCallback& callback = write_queue_.front().callback;
138 callback.Run(net::ERR_FAILED);
139 callback.Reset();
143 void CastTransportImpl::SendMessage(const CastMessage& message,
144 const net::CompletionCallback& callback) {
145 DCHECK(CalledOnValidThread());
146 std::string serialized_message;
147 if (!MessageFramer::Serialize(message, &serialized_message)) {
148 logger_->LogSocketEventForMessage(channel_id_, proto::SEND_MESSAGE_FAILED,
149 message.namespace_(),
150 "Error when serializing message.");
151 callback.Run(net::ERR_FAILED);
152 return;
154 WriteRequest write_request(
155 message.namespace_(), serialized_message, callback);
157 write_queue_.push(write_request);
158 logger_->LogSocketEventForMessage(
159 channel_id_, proto::MESSAGE_ENQUEUED, message.namespace_(),
160 base::StringPrintf("Queue size: %" PRIuS, write_queue_.size()));
161 if (write_state_ == WRITE_STATE_NONE) {
162 SetWriteState(WRITE_STATE_WRITE);
163 OnWriteResult(net::OK);
167 CastTransportImpl::WriteRequest::WriteRequest(
168 const std::string& namespace_,
169 const std::string& payload,
170 const net::CompletionCallback& callback)
171 : message_namespace(namespace_), callback(callback) {
172 VLOG(2) << "WriteRequest size: " << payload.size();
173 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
174 payload.size());
177 CastTransportImpl::WriteRequest::~WriteRequest() {
180 void CastTransportImpl::SetReadState(ReadState read_state) {
181 if (read_state_ != read_state) {
182 read_state_ = read_state;
183 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
187 void CastTransportImpl::SetWriteState(WriteState write_state) {
188 if (write_state_ != write_state) {
189 write_state_ = write_state;
190 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
194 void CastTransportImpl::SetErrorState(ChannelError error_state) {
195 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state;
196 error_state_ = error_state;
199 void CastTransportImpl::OnWriteResult(int result) {
200 DCHECK(CalledOnValidThread());
201 if (write_queue_.empty()) {
202 SetWriteState(WRITE_STATE_NONE);
203 return;
206 // Network operations can either finish synchronously or asynchronously.
207 // This method executes the state machine transitions in a loop so that
208 // write state transitions happen even when network operations finish
209 // synchronously.
210 int rv = result;
211 do {
212 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
213 << "result=" << rv << ", "
214 << "queue size=" << write_queue_.size() << ")";
216 WriteState state = write_state_;
217 write_state_ = WRITE_STATE_NONE;
218 switch (state) {
219 case WRITE_STATE_WRITE:
220 rv = DoWrite();
221 break;
222 case WRITE_STATE_WRITE_COMPLETE:
223 rv = DoWriteComplete(rv);
224 break;
225 case WRITE_STATE_DO_CALLBACK:
226 rv = DoWriteCallback();
227 break;
228 case WRITE_STATE_ERROR:
229 rv = DoWriteError(rv);
230 break;
231 default:
232 NOTREACHED() << "BUG in write flow. Unknown state: " << state;
233 break;
235 } while (!write_queue_.empty() && rv != net::ERR_IO_PENDING &&
236 write_state_ != WRITE_STATE_NONE);
238 // No state change occurred in the switch case above. This means the
239 // write operation has completed.
240 if (write_state_ == WRITE_STATE_NONE) {
241 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
244 // If write loop is done because the queue is empty then set write
245 // state to NONE
246 if (write_queue_.empty()) {
247 SetWriteState(WRITE_STATE_NONE);
250 // Write loop is done - flush the remaining write operations if an error
251 // was encountered.
252 if (rv == net::ERR_FAILED) {
253 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
254 FlushWriteQueue();
258 int CastTransportImpl::DoWrite() {
259 DCHECK(!write_queue_.empty());
260 WriteRequest& request = write_queue_.front();
262 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
263 << request.io_buffer->size() << " bytes_written "
264 << request.io_buffer->BytesConsumed();
266 SetWriteState(WRITE_STATE_WRITE_COMPLETE);
268 int rv = socket_->Write(
269 request.io_buffer.get(), request.io_buffer->BytesRemaining(),
270 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
271 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, rv);
272 return rv;
275 int CastTransportImpl::DoWriteComplete(int result) {
276 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
277 DCHECK(!write_queue_.empty());
278 if (result <= 0) { // NOTE that 0 also indicates an error
279 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
280 SetWriteState(WRITE_STATE_ERROR);
281 return result == 0 ? net::ERR_FAILED : result;
284 // Some bytes were successfully written
285 WriteRequest& request = write_queue_.front();
286 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
287 io_buffer->DidConsume(result);
288 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
289 SetWriteState(WRITE_STATE_DO_CALLBACK);
290 } else {
291 SetWriteState(WRITE_STATE_WRITE);
294 return net::OK;
297 int CastTransportImpl::DoWriteCallback() {
298 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
299 DCHECK(!write_queue_.empty());
301 SetWriteState(WRITE_STATE_WRITE);
303 WriteRequest& request = write_queue_.front();
304 int bytes_consumed = request.io_buffer->BytesConsumed();
305 logger_->LogSocketEventForMessage(
306 channel_id_, proto::MESSAGE_WRITTEN, request.message_namespace,
307 base::StringPrintf("Bytes: %d", bytes_consumed));
308 request.callback.Run(net::OK);
309 write_queue_.pop();
310 return net::OK;
313 int CastTransportImpl::DoWriteError(int result) {
314 VLOG_WITH_CONNECTION(2) << "DoWriteError result=" << result;
315 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
316 DCHECK_LT(result, 0);
317 return net::ERR_FAILED;
320 void CastTransportImpl::Start() {
321 DCHECK(CalledOnValidThread());
322 DCHECK(!started_);
323 DCHECK(read_delegate_)
324 << "Read delegate must be set prior to calling Start()";
325 read_delegate_->Start();
326 if (read_state_ == READ_STATE_NONE) {
327 // Initialize and run the read state machine.
328 SetReadState(READ_STATE_READ);
329 OnReadResult(net::OK);
331 started_ = true;
334 void CastTransportImpl::OnReadResult(int result) {
335 DCHECK(CalledOnValidThread());
336 // Network operations can either finish synchronously or asynchronously.
337 // This method executes the state machine transitions in a loop so that
338 // write state transitions happen even when network operations finish
339 // synchronously.
340 int rv = result;
341 do {
342 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
343 << ", result=" << rv << ")";
344 ReadState state = read_state_;
345 read_state_ = READ_STATE_NONE;
347 switch (state) {
348 case READ_STATE_READ:
349 rv = DoRead();
350 break;
351 case READ_STATE_READ_COMPLETE:
352 rv = DoReadComplete(rv);
353 break;
354 case READ_STATE_DO_CALLBACK:
355 rv = DoReadCallback();
356 break;
357 case READ_STATE_ERROR:
358 rv = DoReadError(rv);
359 DCHECK_EQ(read_state_, READ_STATE_NONE);
360 break;
361 default:
362 NOTREACHED() << "BUG in read flow. Unknown state: " << state;
363 break;
365 } while (rv != net::ERR_IO_PENDING && read_state_ != READ_STATE_NONE);
367 // No state change occurred in do-while loop above. This means state has
368 // transitioned to NONE.
369 if (read_state_ == READ_STATE_NONE) {
370 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
373 if (rv == net::ERR_FAILED) {
374 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
375 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
376 read_delegate_->OnError(error_state_, logger_->GetLastErrors(channel_id_));
380 int CastTransportImpl::DoRead() {
381 VLOG_WITH_CONNECTION(2) << "DoRead";
382 SetReadState(READ_STATE_READ_COMPLETE);
384 // Determine how many bytes need to be read.
385 size_t num_bytes_to_read = framer_->BytesRequested();
386 DCHECK_GT(num_bytes_to_read, 0u);
388 // Read up to num_bytes_to_read into |current_read_buffer_|.
389 return socket_->Read(
390 read_buffer_.get(), base::checked_cast<uint32>(num_bytes_to_read),
391 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
394 int CastTransportImpl::DoReadComplete(int result) {
395 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
397 if (result <= 0) {
398 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
399 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
400 SetReadState(READ_STATE_ERROR);
401 return result == 0 ? net::ERR_FAILED : result;
404 size_t message_size;
405 DCHECK(!current_message_);
406 ChannelError framing_error;
407 current_message_ = framer_->Ingest(result, &message_size, &framing_error);
408 if (current_message_.get() && (framing_error == CHANNEL_ERROR_NONE)) {
409 DCHECK_GT(message_size, static_cast<size_t>(0));
410 logger_->LogSocketEventForMessage(
411 channel_id_, proto::MESSAGE_READ, current_message_->namespace_(),
412 base::StringPrintf("Message size: %u",
413 static_cast<uint32>(message_size)));
414 SetReadState(READ_STATE_DO_CALLBACK);
415 } else if (framing_error != CHANNEL_ERROR_NONE) {
416 DCHECK(!current_message_);
417 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
418 SetReadState(READ_STATE_ERROR);
419 } else {
420 DCHECK(!current_message_);
421 SetReadState(READ_STATE_READ);
423 return net::OK;
426 int CastTransportImpl::DoReadCallback() {
427 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
428 if (!IsCastMessageValid(*current_message_)) {
429 SetReadState(READ_STATE_ERROR);
430 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
431 return net::ERR_INVALID_RESPONSE;
433 SetReadState(READ_STATE_READ);
434 logger_->LogSocketEventForMessage(channel_id_, proto::NOTIFY_ON_MESSAGE,
435 current_message_->namespace_(),
436 std::string());
438 read_delegate_->OnMessage(*current_message_);
439 current_message_.reset();
440 return net::OK;
443 int CastTransportImpl::DoReadError(int result) {
444 VLOG_WITH_CONNECTION(2) << "DoReadError";
445 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
446 DCHECK_LE(result, 0);
447 return net::ERR_FAILED;
450 } // namespace cast_channel
451 } // namespace core_api
452 } // namespace extensions