Re-submission of https://codereview.chromium.org/1041213003/
[chromium-blink-merge.git] / extensions / browser / api / cast_channel / cast_transport.cc
blob777b16856a7066b767e0d962d7d20eb90a0786fb
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
7 #include <string>
9 #include "base/bind.h"
10 #include "base/format_macros.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "extensions/browser/api/cast_channel/cast_framer.h"
14 #include "extensions/browser/api/cast_channel/cast_message_util.h"
15 #include "extensions/browser/api/cast_channel/logger.h"
16 #include "extensions/browser/api/cast_channel/logger_util.h"
17 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
18 #include "net/base/net_errors.h"
19 #include "net/socket/socket.h"
// Emits a VLOG statement at |level| whose output is prefixed with the
// connection's endpoint and auth type. The expansion names |ip_endpoint_| and
// |channel_auth_| directly, so it can only be used where both are in scope.
#define VLOG_WITH_CONNECTION(level)             \
  VLOG(level) << "[" << ip_endpoint_.ToString() \
              << ", auth=" << channel_auth_ << "] "
25 namespace extensions {
26 namespace core_api {
27 namespace cast_channel {
29 CastTransportImpl::CastTransportImpl(net::Socket* socket,
30 int channel_id,
31 const net::IPEndPoint& ip_endpoint,
32 ChannelAuthType channel_auth,
33 scoped_refptr<Logger> logger)
34 : started_(false),
35 socket_(socket),
36 write_state_(WRITE_STATE_NONE),
37 read_state_(READ_STATE_NONE),
38 error_state_(CHANNEL_ERROR_NONE),
39 channel_id_(channel_id),
40 ip_endpoint_(ip_endpoint),
41 channel_auth_(channel_auth),
42 logger_(logger) {
43 DCHECK(socket);
45 // Buffer is reused across messages to minimize unnecessary buffer
46 // [re]allocations.
47 read_buffer_ = new net::GrowableIOBuffer();
48 read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
49 framer_.reset(new MessageFramer(read_buffer_));
52 CastTransportImpl::~CastTransportImpl() {
53 DCHECK(CalledOnValidThread());
54 FlushWriteQueue();
57 // static
58 proto::ReadState CastTransportImpl::ReadStateToProto(
59 CastTransportImpl::ReadState state) {
60 switch (state) {
61 case CastTransportImpl::READ_STATE_NONE:
62 return proto::READ_STATE_NONE;
63 case CastTransportImpl::READ_STATE_READ:
64 return proto::READ_STATE_READ;
65 case CastTransportImpl::READ_STATE_READ_COMPLETE:
66 return proto::READ_STATE_READ_COMPLETE;
67 case CastTransportImpl::READ_STATE_DO_CALLBACK:
68 return proto::READ_STATE_DO_CALLBACK;
69 case CastTransportImpl::READ_STATE_ERROR:
70 return proto::READ_STATE_ERROR;
71 default:
72 NOTREACHED();
73 return proto::READ_STATE_NONE;
77 // static
78 proto::WriteState CastTransportImpl::WriteStateToProto(
79 CastTransportImpl::WriteState state) {
80 switch (state) {
81 case CastTransportImpl::WRITE_STATE_NONE:
82 return proto::WRITE_STATE_NONE;
83 case CastTransportImpl::WRITE_STATE_WRITE:
84 return proto::WRITE_STATE_WRITE;
85 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
86 return proto::WRITE_STATE_WRITE_COMPLETE;
87 case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
88 return proto::WRITE_STATE_DO_CALLBACK;
89 case CastTransportImpl::WRITE_STATE_ERROR:
90 return proto::WRITE_STATE_ERROR;
91 default:
92 NOTREACHED();
93 return proto::WRITE_STATE_NONE;
97 // static
98 proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
99 switch (state) {
100 case CHANNEL_ERROR_NONE:
101 return proto::CHANNEL_ERROR_NONE;
102 case CHANNEL_ERROR_CHANNEL_NOT_OPEN:
103 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
104 case CHANNEL_ERROR_AUTHENTICATION_ERROR:
105 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
106 case CHANNEL_ERROR_CONNECT_ERROR:
107 return proto::CHANNEL_ERROR_CONNECT_ERROR;
108 case CHANNEL_ERROR_SOCKET_ERROR:
109 return proto::CHANNEL_ERROR_SOCKET_ERROR;
110 case CHANNEL_ERROR_TRANSPORT_ERROR:
111 return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
112 case CHANNEL_ERROR_INVALID_MESSAGE:
113 return proto::CHANNEL_ERROR_INVALID_MESSAGE;
114 case CHANNEL_ERROR_INVALID_CHANNEL_ID:
115 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
116 case CHANNEL_ERROR_CONNECT_TIMEOUT:
117 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
118 case CHANNEL_ERROR_UNKNOWN:
119 return proto::CHANNEL_ERROR_UNKNOWN;
120 default:
121 NOTREACHED();
122 return proto::CHANNEL_ERROR_NONE;
126 void CastTransportImpl::SetReadDelegate(scoped_ptr<Delegate> delegate) {
127 DCHECK(CalledOnValidThread());
128 DCHECK(delegate);
129 delegate_ = delegate.Pass();
130 if (started_) {
131 delegate_->Start();
135 void CastTransportImpl::FlushWriteQueue() {
136 for (; !write_queue_.empty(); write_queue_.pop()) {
137 net::CompletionCallback& callback = write_queue_.front().callback;
138 callback.Run(net::ERR_FAILED);
139 callback.Reset();
143 void CastTransportImpl::SendMessage(const CastMessage& message,
144 const net::CompletionCallback& callback) {
145 DCHECK(CalledOnValidThread());
146 std::string serialized_message;
147 if (!MessageFramer::Serialize(message, &serialized_message)) {
148 logger_->LogSocketEventForMessage(channel_id_, proto::SEND_MESSAGE_FAILED,
149 message.namespace_(),
150 "Error when serializing message.");
151 callback.Run(net::ERR_FAILED);
152 return;
154 WriteRequest write_request(
155 message.namespace_(), serialized_message, callback);
157 write_queue_.push(write_request);
158 logger_->LogSocketEventForMessage(
159 channel_id_, proto::MESSAGE_ENQUEUED, message.namespace_(),
160 base::StringPrintf("Queue size: %" PRIuS, write_queue_.size()));
161 if (write_state_ == WRITE_STATE_NONE) {
162 SetWriteState(WRITE_STATE_WRITE);
163 OnWriteResult(net::OK);
167 CastTransportImpl::WriteRequest::WriteRequest(
168 const std::string& namespace_,
169 const std::string& payload,
170 const net::CompletionCallback& callback)
171 : message_namespace(namespace_), callback(callback) {
172 VLOG(2) << "WriteRequest size: " << payload.size();
173 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
174 payload.size());
177 CastTransportImpl::WriteRequest::~WriteRequest() {
180 void CastTransportImpl::SetReadState(ReadState read_state) {
181 if (read_state_ != read_state) {
182 read_state_ = read_state;
183 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
187 void CastTransportImpl::SetWriteState(WriteState write_state) {
188 if (write_state_ != write_state) {
189 write_state_ = write_state;
190 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
194 void CastTransportImpl::SetErrorState(ChannelError error_state) {
195 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state;
196 error_state_ = error_state;
199 void CastTransportImpl::OnWriteResult(int result) {
200 DCHECK(CalledOnValidThread());
201 if (write_queue_.empty()) {
202 SetWriteState(WRITE_STATE_NONE);
203 return;
206 // Network operations can either finish synchronously or asynchronously.
207 // This method executes the state machine transitions in a loop so that
208 // write state transitions happen even when network operations finish
209 // synchronously.
210 int rv = result;
211 do {
212 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
213 << "result=" << rv << ", "
214 << "queue size=" << write_queue_.size() << ")";
216 WriteState state = write_state_;
217 write_state_ = WRITE_STATE_NONE;
218 switch (state) {
219 case WRITE_STATE_WRITE:
220 rv = DoWrite();
221 break;
222 case WRITE_STATE_WRITE_COMPLETE:
223 rv = DoWriteComplete(rv);
224 break;
225 case WRITE_STATE_DO_CALLBACK:
226 rv = DoWriteCallback();
227 break;
228 case WRITE_STATE_ERROR:
229 rv = DoWriteError(rv);
230 break;
231 default:
232 NOTREACHED() << "BUG in write flow. Unknown state: " << state;
233 break;
235 } while (!write_queue_.empty() && rv != net::ERR_IO_PENDING &&
236 write_state_ != WRITE_STATE_NONE);
238 // No state change occurred in the switch case above. This means the
239 // write operation has completed.
240 if (write_state_ == WRITE_STATE_NONE) {
241 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
244 if (rv == net::ERR_FAILED) {
245 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
246 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
247 delegate_->OnError(error_state_);
250 // If write loop is done because the queue is empty then set write
251 // state to NONE
252 if (write_queue_.empty()) {
253 SetWriteState(WRITE_STATE_NONE);
256 // Write loop is done - flush the remaining write operations if an error
257 // was encountered.
258 if (rv == net::ERR_FAILED) {
259 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
260 FlushWriteQueue();
264 int CastTransportImpl::DoWrite() {
265 DCHECK(!write_queue_.empty());
266 WriteRequest& request = write_queue_.front();
268 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
269 << request.io_buffer->size() << " bytes_written "
270 << request.io_buffer->BytesConsumed();
272 SetWriteState(WRITE_STATE_WRITE_COMPLETE);
274 int rv = socket_->Write(
275 request.io_buffer.get(), request.io_buffer->BytesRemaining(),
276 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
277 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, rv);
278 return rv;
281 int CastTransportImpl::DoWriteComplete(int result) {
282 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
283 DCHECK(!write_queue_.empty());
284 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result);
285 if (result <= 0) { // NOTE that 0 also indicates an error
286 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
287 SetWriteState(WRITE_STATE_ERROR);
288 return result == 0 ? net::ERR_FAILED : result;
291 // Some bytes were successfully written
292 WriteRequest& request = write_queue_.front();
293 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
294 io_buffer->DidConsume(result);
295 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
296 SetWriteState(WRITE_STATE_DO_CALLBACK);
297 } else {
298 SetWriteState(WRITE_STATE_WRITE);
301 return net::OK;
304 int CastTransportImpl::DoWriteCallback() {
305 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
306 DCHECK(!write_queue_.empty());
308 SetWriteState(WRITE_STATE_WRITE);
310 WriteRequest& request = write_queue_.front();
311 int bytes_consumed = request.io_buffer->BytesConsumed();
312 logger_->LogSocketEventForMessage(
313 channel_id_, proto::MESSAGE_WRITTEN, request.message_namespace,
314 base::StringPrintf("Bytes: %d", bytes_consumed));
315 request.callback.Run(net::OK);
316 write_queue_.pop();
317 return net::OK;
320 int CastTransportImpl::DoWriteError(int result) {
321 VLOG_WITH_CONNECTION(2) << "DoWriteError result=" << result;
322 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
323 DCHECK_LT(result, 0);
324 return net::ERR_FAILED;
327 void CastTransportImpl::Start() {
328 DCHECK(CalledOnValidThread());
329 DCHECK(!started_);
330 DCHECK(delegate_) << "Read delegate must be set prior to calling Start()";
331 delegate_->Start();
332 if (read_state_ == READ_STATE_NONE) {
333 // Initialize and run the read state machine.
334 SetReadState(READ_STATE_READ);
335 OnReadResult(net::OK);
337 started_ = true;
340 void CastTransportImpl::OnReadResult(int result) {
341 DCHECK(CalledOnValidThread());
342 // Network operations can either finish synchronously or asynchronously.
343 // This method executes the state machine transitions in a loop so that
344 // write state transitions happen even when network operations finish
345 // synchronously.
346 int rv = result;
347 do {
348 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
349 << ", result=" << rv << ")";
350 ReadState state = read_state_;
351 read_state_ = READ_STATE_NONE;
353 switch (state) {
354 case READ_STATE_READ:
355 rv = DoRead();
356 break;
357 case READ_STATE_READ_COMPLETE:
358 rv = DoReadComplete(rv);
359 break;
360 case READ_STATE_DO_CALLBACK:
361 rv = DoReadCallback();
362 break;
363 case READ_STATE_ERROR:
364 rv = DoReadError(rv);
365 DCHECK_EQ(read_state_, READ_STATE_NONE);
366 break;
367 default:
368 NOTREACHED() << "BUG in read flow. Unknown state: " << state;
369 break;
371 } while (rv != net::ERR_IO_PENDING && read_state_ != READ_STATE_NONE);
373 // No state change occurred in do-while loop above. This means state has
374 // transitioned to NONE.
375 if (read_state_ == READ_STATE_NONE) {
376 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
379 if (rv == net::ERR_FAILED) {
380 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
381 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
382 delegate_->OnError(error_state_);
386 int CastTransportImpl::DoRead() {
387 VLOG_WITH_CONNECTION(2) << "DoRead";
388 SetReadState(READ_STATE_READ_COMPLETE);
390 // Determine how many bytes need to be read.
391 size_t num_bytes_to_read = framer_->BytesRequested();
392 DCHECK_GT(num_bytes_to_read, 0u);
394 // Read up to num_bytes_to_read into |current_read_buffer_|.
395 return socket_->Read(
396 read_buffer_.get(), base::checked_cast<uint32>(num_bytes_to_read),
397 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
400 int CastTransportImpl::DoReadComplete(int result) {
401 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
402 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result);
403 if (result <= 0) {
404 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
405 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
406 SetReadState(READ_STATE_ERROR);
407 return result == 0 ? net::ERR_FAILED : result;
410 size_t message_size;
411 DCHECK(!current_message_);
412 ChannelError framing_error;
413 current_message_ = framer_->Ingest(result, &message_size, &framing_error);
414 if (current_message_.get() && (framing_error == CHANNEL_ERROR_NONE)) {
415 DCHECK_GT(message_size, static_cast<size_t>(0));
416 logger_->LogSocketEventForMessage(
417 channel_id_, proto::MESSAGE_READ, current_message_->namespace_(),
418 base::StringPrintf("Message size: %u",
419 static_cast<uint32>(message_size)));
420 SetReadState(READ_STATE_DO_CALLBACK);
421 } else if (framing_error != CHANNEL_ERROR_NONE) {
422 DCHECK(!current_message_);
423 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
424 SetReadState(READ_STATE_ERROR);
425 } else {
426 DCHECK(!current_message_);
427 SetReadState(READ_STATE_READ);
429 return net::OK;
432 int CastTransportImpl::DoReadCallback() {
433 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
434 if (!IsCastMessageValid(*current_message_)) {
435 SetReadState(READ_STATE_ERROR);
436 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
437 return net::ERR_INVALID_RESPONSE;
439 SetReadState(READ_STATE_READ);
440 delegate_->OnMessage(*current_message_);
441 current_message_.reset();
442 return net::OK;
445 int CastTransportImpl::DoReadError(int result) {
446 VLOG_WITH_CONNECTION(2) << "DoReadError";
447 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
448 DCHECK_LE(result, 0);
449 return net::ERR_FAILED;
452 } // namespace cast_channel
453 } // namespace core_api
454 } // namespace extensions