1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
10 #include "base/format_macros.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "extensions/browser/api/cast_channel/cast_framer.h"
14 #include "extensions/browser/api/cast_channel/cast_message_util.h"
15 #include "extensions/browser/api/cast_channel/logger.h"
16 #include "extensions/browser/api/cast_channel/logger_util.h"
17 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
18 #include "net/base/net_errors.h"
19 #include "net/socket/socket.h"
// Streams a VLOG entry at verbosity |level| that begins with this
// connection's IP endpoint and channel auth type, so that log lines can be
// attributed to a specific connection. Relies on |ip_endpoint_| and
// |channel_auth_| being in scope at the point of use.
21 #define VLOG_WITH_CONNECTION(level) \
22 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" << channel_auth_ \
25 namespace extensions
{
27 namespace cast_channel
{
// Constructor. Starts both the read and the write state machine in their
// NONE state with no channel error recorded, stores the channel id, remote
// endpoint and auth type, and allocates the single read buffer that is shared
// with the message framer.
29 CastTransportImpl::CastTransportImpl(net::Socket
* socket
,
31 const net::IPEndPoint
& ip_endpoint
,
32 ChannelAuthType channel_auth
,
33 scoped_refptr
<Logger
> logger
)
36 write_state_(WRITE_STATE_NONE
),
37 read_state_(READ_STATE_NONE
),
38 error_state_(CHANNEL_ERROR_NONE
),
39 channel_id_(channel_id
),
40 ip_endpoint_(ip_endpoint
),
41 channel_auth_(channel_auth
),
45 // Buffer is reused across messages to minimize unnecessary buffer reallocations.
47 read_buffer_
= new net::GrowableIOBuffer();
// Size the buffer once, up front, to the largest message the framer accepts.
48 read_buffer_
->SetCapacity(MessageFramer::MessageHeader::max_message_size());
// The framer parses messages directly out of |read_buffer_|.
49 framer_
.reset(new MessageFramer(read_buffer_
));
// Destructor. Debug-asserts that destruction happens on a valid thread.
52 CastTransportImpl::~CastTransportImpl() {
53 DCHECK(CalledOnValidThread());
// Translates an internal ReadState value into the equivalent proto::ReadState
// enumerator. The result is what gets passed to the logger when the read
// state is recorded.
58 proto::ReadState
CastTransportImpl::ReadStateToProto(
59 CastTransportImpl::ReadState state
) {
61 case CastTransportImpl::READ_STATE_NONE
:
62 return proto::READ_STATE_NONE
;
63 case CastTransportImpl::READ_STATE_READ
:
64 return proto::READ_STATE_READ
;
65 case CastTransportImpl::READ_STATE_READ_COMPLETE
:
66 return proto::READ_STATE_READ_COMPLETE
;
67 case CastTransportImpl::READ_STATE_DO_CALLBACK
:
68 return proto::READ_STATE_DO_CALLBACK
;
69 case CastTransportImpl::READ_STATE_ERROR
:
70 return proto::READ_STATE_ERROR
;
// Fallback for a value that matched none of the cases above.
73 return proto::READ_STATE_NONE
;
// Translates an internal WriteState value into the equivalent
// proto::WriteState enumerator. The result is what gets passed to the logger
// when the write state is recorded.
78 proto::WriteState
CastTransportImpl::WriteStateToProto(
79 CastTransportImpl::WriteState state
) {
81 case CastTransportImpl::WRITE_STATE_NONE
:
82 return proto::WRITE_STATE_NONE
;
83 case CastTransportImpl::WRITE_STATE_WRITE
:
84 return proto::WRITE_STATE_WRITE
;
85 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE
:
86 return proto::WRITE_STATE_WRITE_COMPLETE
;
87 case CastTransportImpl::WRITE_STATE_DO_CALLBACK
:
88 return proto::WRITE_STATE_DO_CALLBACK
;
89 case CastTransportImpl::WRITE_STATE_ERROR
:
90 return proto::WRITE_STATE_ERROR
;
// Fallback for a value that matched none of the cases above.
93 return proto::WRITE_STATE_NONE
;
// Translates a ChannelError value into the proto::ErrorState enumerator of
// the same name.
98 proto::ErrorState
CastTransportImpl::ErrorStateToProto(ChannelError state
) {
100 case CHANNEL_ERROR_NONE
:
101 return proto::CHANNEL_ERROR_NONE
;
102 case CHANNEL_ERROR_CHANNEL_NOT_OPEN
:
103 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN
;
104 case CHANNEL_ERROR_AUTHENTICATION_ERROR
:
105 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR
;
106 case CHANNEL_ERROR_CONNECT_ERROR
:
107 return proto::CHANNEL_ERROR_CONNECT_ERROR
;
108 case CHANNEL_ERROR_SOCKET_ERROR
:
109 return proto::CHANNEL_ERROR_SOCKET_ERROR
;
110 case CHANNEL_ERROR_TRANSPORT_ERROR
:
111 return proto::CHANNEL_ERROR_TRANSPORT_ERROR
;
112 case CHANNEL_ERROR_INVALID_MESSAGE
:
113 return proto::CHANNEL_ERROR_INVALID_MESSAGE
;
114 case CHANNEL_ERROR_INVALID_CHANNEL_ID
:
115 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID
;
116 case CHANNEL_ERROR_CONNECT_TIMEOUT
:
117 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT
;
118 case CHANNEL_ERROR_UNKNOWN
:
119 return proto::CHANNEL_ERROR_UNKNOWN
;
// Fallback for a value that matched none of the cases above.
122 return proto::CHANNEL_ERROR_NONE
;
// Installs |delegate| as the object notified by the read and write loops
// (OnMessage / OnError). Ownership of |delegate| is transferred to this
// transport. Debug-asserts that the call is made on a valid thread.
126 void CastTransportImpl::SetReadDelegate(scoped_ptr
<Delegate
> delegate
) {
127 DCHECK(CalledOnValidThread());
// Pass() moves ownership out of the scoped_ptr argument into |delegate_|.
129 delegate_
= delegate
.Pass();
// Empties the write queue, failing every pending request: each queued
// request's completion callback is run with net::ERR_FAILED before the
// request is popped.
135 void CastTransportImpl::FlushWriteQueue() {
136 for (; !write_queue_
.empty(); write_queue_
.pop()) {
137 net::CompletionCallback
& callback
= write_queue_
.front().callback
;
138 callback
.Run(net::ERR_FAILED
);
// Serializes |message| and queues it for transmission.
//
// |message|:  the CastMessage to send.
// |callback|: completion callback for this message; it is run with
//             net::ERR_FAILED if serialization fails.
//
// After enqueueing, the write state machine is started if it is currently
// idle (WRITE_STATE_NONE). Debug-asserts that the call is made on a valid
// thread.
143 void CastTransportImpl::SendMessage(const CastMessage
& message
,
144 const net::CompletionCallback
& callback
) {
145 DCHECK(CalledOnValidThread());
146 std::string serialized_message
;
147 if (!MessageFramer::Serialize(message
, &serialized_message
)) {
// Serialization failed: record the failure and notify the caller.
148 logger_
->LogSocketEventForMessage(channel_id_
, proto::SEND_MESSAGE_FAILED
,
149 message
.namespace_(),
150 "Error when serializing message.");
151 callback
.Run(net::ERR_FAILED
);
// Bundle the serialized bytes with the namespace (kept for logging) and the
// caller's callback.
154 WriteRequest
write_request(
155 message
.namespace_(), serialized_message
, callback
);
157 write_queue_
.push(write_request
);
158 logger_
->LogSocketEventForMessage(
159 channel_id_
, proto::MESSAGE_ENQUEUED
, message
.namespace_(),
160 base::StringPrintf("Queue size: %" PRIuS
, write_queue_
.size()));
// Only kick the state machine when no write is already in flight.
161 if (write_state_
== WRITE_STATE_NONE
) {
162 SetWriteState(WRITE_STATE_WRITE
);
163 OnWriteResult(net::OK
);
// Builds a queued write request.
//
// |namespace_|: message namespace, stored in |message_namespace|.
// |payload|:    serialized message bytes; copied into a StringIOBuffer that
//               is wrapped by the DrainableIOBuffer stored in |io_buffer|.
// |callback|:   completion callback stored with the request.
167 CastTransportImpl::WriteRequest::WriteRequest(
168 const std::string
& namespace_
,
169 const std::string
& payload
,
170 const net::CompletionCallback
& callback
)
171 : message_namespace(namespace_
), callback(callback
) {
172 VLOG(2) << "WriteRequest size: " << payload
.size();
// The write path reads BytesRemaining()/BytesConsumed() and calls
// DidConsume() on this buffer to track progress across partial writes.
173 io_buffer
= new net::DrainableIOBuffer(new net::StringIOBuffer(payload
),
// Destroys the write request.
177 CastTransportImpl::WriteRequest::~WriteRequest() {
// Moves the read state machine to |read_state| and logs the new state.
// Does nothing (and logs nothing) when |read_state| equals the current state.
180 void CastTransportImpl::SetReadState(ReadState read_state
) {
181 if (read_state_
!= read_state
) {
182 read_state_
= read_state
;
183 logger_
->LogSocketReadState(channel_id_
, ReadStateToProto(read_state_
));
// Moves the write state machine to |write_state| and logs the new state.
// Does nothing (and logs nothing) when |write_state| equals the current
// state.
187 void CastTransportImpl::SetWriteState(WriteState write_state
) {
188 if (write_state_
!= write_state
) {
189 write_state_
= write_state
;
190 logger_
->LogSocketWriteState(channel_id_
, WriteStateToProto(write_state_
));
// Records |error_state| as the channel's current error and emits a verbose
// log line. The assignment is unconditional: no check is made for whether the
// value actually changed.
194 void CastTransportImpl::SetErrorState(ChannelError error_state
) {
195 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state
;
196 error_state_
= error_state
;
// Drives the write state machine.
//
// Called directly by SendMessage() (with net::OK) to start writing, and bound
// in DoWrite() as the socket's write-completion callback, in which case
// |result| is the outcome of that write.
//
// Each pass snapshots |write_state_|, resets it to WRITE_STATE_NONE, and
// dispatches to the handler for the snapshotted state; the handler is
// expected to select the next state. The loop ends when the queue is empty,
// an operation returns net::ERR_IO_PENDING, or no next state was selected.
// On net::ERR_FAILED the delegate is notified through OnError().
199 void CastTransportImpl::OnWriteResult(int result
) {
200 DCHECK(CalledOnValidThread());
// Nothing queued: make sure the state machine is idle.
201 if (write_queue_
.empty()) {
202 SetWriteState(WRITE_STATE_NONE
);
206 // Network operations can either finish synchronously or asynchronously.
207 // This method executes the state machine transitions in a loop so that
208 // write state transitions happen even when network operations finish synchronously.
212 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_
<< ", "
213 << "result=" << rv
<< ", "
214 << "queue size=" << write_queue_
.size() << ")";
// Snapshot and clear the state so that a handler which does not pick a next
// state leaves the machine at NONE, which terminates the loop.
216 WriteState state
= write_state_
;
217 write_state_
= WRITE_STATE_NONE
;
219 case WRITE_STATE_WRITE
:
222 case WRITE_STATE_WRITE_COMPLETE
:
223 rv
= DoWriteComplete(rv
);
225 case WRITE_STATE_DO_CALLBACK
:
226 rv
= DoWriteCallback();
228 case WRITE_STATE_ERROR
:
229 rv
= DoWriteError(rv
);
232 NOTREACHED() << "BUG in write flow. Unknown state: " << state
;
235 } while (!write_queue_
.empty() && rv
!= net::ERR_IO_PENDING
&&
236 write_state_
!= WRITE_STATE_NONE
);
238 // No state change occurred in the switch case above. This means the
239 // write operation has completed.
240 if (write_state_
== WRITE_STATE_NONE
) {
// Logged directly because |write_state_| was assigned without going through
// SetWriteState() inside the loop.
241 logger_
->LogSocketWriteState(channel_id_
, WriteStateToProto(write_state_
));
244 if (rv
== net::ERR_FAILED
) {
245 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
246 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
247 delegate_
->OnError(error_state_
);
250 // If write loop is done because the queue is empty then set write state to NONE.
252 if (write_queue_
.empty()) {
253 SetWriteState(WRITE_STATE_NONE
);
256 // Write loop is done - flush the remaining write operations if an error was encountered.
258 if (rv
== net::ERR_FAILED
) {
259 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
// WRITE state handler: issues a socket write for the not-yet-sent remainder
// of the request at the head of the queue.
//
// The next state is set to WRITE_STATE_WRITE_COMPLETE before the write is
// issued. OnWriteResult() is bound as the socket's completion callback. The
// socket's return value is logged as a SOCKET_WRITE event.
// Precondition (debug-asserted): the write queue is not empty.
264 int CastTransportImpl::DoWrite() {
265 DCHECK(!write_queue_
.empty());
266 WriteRequest
& request
= write_queue_
.front();
268 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
269 << request
.io_buffer
->size() << " bytes_written "
270 << request
.io_buffer
->BytesConsumed();
272 SetWriteState(WRITE_STATE_WRITE_COMPLETE
);
// NOTE(review): base::Unretained(this) means the bound callback does not keep
// this object alive; presumably the socket never outlives the transport --
// confirm against the owner of |socket_|.
274 int rv
= socket_
->Write(
275 request
.io_buffer
.get(), request
.io_buffer
->BytesRemaining(),
276 base::Bind(&CastTransportImpl::OnWriteResult
, base::Unretained(this)));
277 logger_
->LogSocketEventWithRv(channel_id_
, proto::SOCKET_WRITE
, rv
);
// WRITE_COMPLETE state handler: processes the outcome of a socket write.
//
// |result|: number of bytes written when positive; zero or negative is
//           treated as a failure.
//
// On failure the channel error becomes CHANNEL_ERROR_SOCKET_ERROR, the state
// becomes WRITE_STATE_ERROR, and a zero |result| is reported as
// net::ERR_FAILED (a negative |result| is returned unchanged). On success
// |result| bytes are marked as consumed; the state moves to
// WRITE_STATE_DO_CALLBACK once nothing remains to be sent, and
// WRITE_STATE_WRITE is used to continue sending.
// Precondition (debug-asserted): the write queue is not empty.
281 int CastTransportImpl::DoWriteComplete(int result
) {
282 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result
;
283 DCHECK(!write_queue_
.empty());
284 logger_
->LogSocketEventWithRv(channel_id_
, proto::SOCKET_WRITE
, result
);
285 if (result
<= 0) { // NOTE that 0 also indicates an error
286 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR
);
287 SetWriteState(WRITE_STATE_ERROR
);
// Normalize 0 to an error code so that a failure value is always returned.
288 return result
== 0 ? net::ERR_FAILED
: result
;
291 // Some bytes were successfully written
292 WriteRequest
& request
= write_queue_
.front();
293 scoped_refptr
<net::DrainableIOBuffer
> io_buffer
= request
.io_buffer
;
// Advance the buffer past the bytes that were just sent.
294 io_buffer
->DidConsume(result
);
295 if (io_buffer
->BytesRemaining() == 0) { // Message fully sent
296 SetWriteState(WRITE_STATE_DO_CALLBACK
);
298 SetWriteState(WRITE_STATE_WRITE
);
// DO_CALLBACK state handler: reports successful delivery of the request at
// the head of the queue.
//
// Selects WRITE_STATE_WRITE as the next state, logs a MESSAGE_WRITTEN event
// carrying the number of bytes consumed from the request's buffer, and runs
// the request's callback with net::OK.
// Precondition (debug-asserted): the write queue is not empty.
304 int CastTransportImpl::DoWriteCallback() {
305 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
306 DCHECK(!write_queue_
.empty());
// The next state is chosen before the callback runs.
308 SetWriteState(WRITE_STATE_WRITE
);
310 WriteRequest
& request
= write_queue_
.front();
311 int bytes_consumed
= request
.io_buffer
->BytesConsumed();
312 logger_
->LogSocketEventForMessage(
313 channel_id_
, proto::MESSAGE_WRITTEN
, request
.message_namespace
,
314 base::StringPrintf("Bytes: %d", bytes_consumed
));
315 request
.callback
.Run(net::OK
);
// ERROR state handler for the write loop.
//
// |result|: the failing result code; debug-asserted to be negative.
//
// Debug-asserts that a channel error has already been recorded, then returns
// net::ERR_FAILED. No next state is selected here.
320 int CastTransportImpl::DoWriteError(int result
) {
321 VLOG_WITH_CONNECTION(2) << "DoWriteError result=" << result
;
322 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
323 DCHECK_LT(result
, 0);
324 return net::ERR_FAILED
;
// Begins reading from the socket.
//
// A read delegate must already have been installed (debug-asserted). The read
// state machine is started only when it is currently idle (READ_STATE_NONE).
// Debug-asserts that the call is made on a valid thread.
327 void CastTransportImpl::Start() {
328 DCHECK(CalledOnValidThread());
330 DCHECK(delegate_
) << "Read delegate must be set prior to calling Start()";
332 if (read_state_
== READ_STATE_NONE
) {
333 // Initialize and run the read state machine.
334 SetReadState(READ_STATE_READ
);
335 OnReadResult(net::OK
);
// Drives the read state machine.
//
// Called directly by Start() (with net::OK) to begin reading, and bound in
// DoRead() as the socket's read-completion callback, in which case |result|
// is the outcome of that read.
//
// Each pass snapshots |read_state_|, resets it to READ_STATE_NONE, and
// dispatches to the handler for the snapshotted state; the handler is
// expected to select the next state. The loop ends when an operation returns
// net::ERR_IO_PENDING or no next state was selected. On net::ERR_FAILED the
// delegate is notified through OnError().
340 void CastTransportImpl::OnReadResult(int result
) {
341 DCHECK(CalledOnValidThread());
342 // Network operations can either finish synchronously or asynchronously.
343 // This method executes the state machine transitions in a loop so that
344 // read state transitions happen even when network operations finish synchronously.
348 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
349 << ", result=" << rv
<< ")";
// Snapshot and clear the state so that a handler which does not pick a next
// state leaves the machine at NONE, which terminates the loop.
350 ReadState state
= read_state_
;
351 read_state_
= READ_STATE_NONE
;
354 case READ_STATE_READ
:
357 case READ_STATE_READ_COMPLETE
:
358 rv
= DoReadComplete(rv
);
360 case READ_STATE_DO_CALLBACK
:
361 rv
= DoReadCallback();
363 case READ_STATE_ERROR
:
364 rv
= DoReadError(rv
);
// The error handler must not select a follow-up state.
365 DCHECK_EQ(read_state_
, READ_STATE_NONE
);
368 NOTREACHED() << "BUG in read flow. Unknown state: " << state
;
371 } while (rv
!= net::ERR_IO_PENDING
&& read_state_
!= READ_STATE_NONE
);
373 // No state change occurred in do-while loop above. This means state has
374 // transitioned to NONE.
375 if (read_state_
== READ_STATE_NONE
) {
// Logged directly because |read_state_| was assigned without going through
// SetReadState() inside the loop.
376 logger_
->LogSocketReadState(channel_id_
, ReadStateToProto(read_state_
));
379 if (rv
== net::ERR_FAILED
) {
380 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
381 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
382 delegate_
->OnError(error_state_
);
// READ state handler: issues a socket read for as many bytes as the framer
// currently requests.
//
// The next state is set to READ_STATE_READ_COMPLETE before the read is
// issued. OnReadResult() is bound as the socket's completion callback.
// Returns the value returned by the socket's Read() call.
386 int CastTransportImpl::DoRead() {
387 VLOG_WITH_CONNECTION(2) << "DoRead";
388 SetReadState(READ_STATE_READ_COMPLETE
);
390 // Determine how many bytes need to be read.
391 size_t num_bytes_to_read
= framer_
->BytesRequested();
392 DCHECK_GT(num_bytes_to_read
, 0u);
394 // Read up to num_bytes_to_read into |read_buffer_|.
// NOTE(review): base::Unretained(this) means the bound callback does not keep
// this object alive; presumably the socket never outlives the transport --
// confirm against the owner of |socket_|.
395 return socket_
->Read(
396 read_buffer_
.get(), base::checked_cast
<uint32
>(num_bytes_to_read
),
397 base::Bind(&CastTransportImpl::OnReadResult
, base::Unretained(this)));
// READ_COMPLETE state handler: processes the outcome of a socket read.
//
// |result|: outcome of the read; logged as a SOCKET_READ event and, on the
//           success path, handed to the framer as the amount of new data.
//
// On the socket-error path the channel error becomes
// CHANNEL_ERROR_SOCKET_ERROR, the state becomes READ_STATE_ERROR, and a zero
// |result| is reported as net::ERR_FAILED (any other value is returned
// unchanged). Otherwise the framer ingests the new data, with three outcomes:
//   * a complete message and no framing error -> READ_STATE_DO_CALLBACK;
//   * a framing error -> CHANNEL_ERROR_INVALID_MESSAGE and READ_STATE_ERROR;
//   * neither -> READ_STATE_READ, to read more data.
400 int CastTransportImpl::DoReadComplete(int result
) {
401 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result
;
402 logger_
->LogSocketEventWithRv(channel_id_
, proto::SOCKET_READ
, result
);
404 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
405 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR
);
406 SetReadState(READ_STATE_ERROR
);
// Normalize 0 to an error code so that a failure value is always returned.
407 return result
== 0 ? net::ERR_FAILED
: result
;
// No previously parsed message may still be pending delivery.
411 DCHECK(!current_message_
);
412 ChannelError framing_error
;
413 current_message_
= framer_
->Ingest(result
, &message_size
, &framing_error
);
414 if (current_message_
.get() && (framing_error
== CHANNEL_ERROR_NONE
)) {
415 DCHECK_GT(message_size
, static_cast<size_t>(0));
416 logger_
->LogSocketEventForMessage(
417 channel_id_
, proto::MESSAGE_READ
, current_message_
->namespace_(),
418 base::StringPrintf("Message size: %u",
419 static_cast<uint32
>(message_size
)));
420 SetReadState(READ_STATE_DO_CALLBACK
);
421 } else if (framing_error
!= CHANNEL_ERROR_NONE
) {
422 DCHECK(!current_message_
);
423 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE
);
424 SetReadState(READ_STATE_ERROR
);
// No complete message yet and no framing error: keep reading.
426 DCHECK(!current_message_
);
427 SetReadState(READ_STATE_READ
);
// DO_CALLBACK state handler: hands the message held in |current_message_| to
// the delegate.
//
// If the message fails IsCastMessageValid(), the state becomes
// READ_STATE_ERROR, the channel error becomes CHANNEL_ERROR_INVALID_MESSAGE,
// and net::ERR_INVALID_RESPONSE is returned. Otherwise READ_STATE_READ is
// selected as the next state, the delegate receives the message through
// OnMessage(), and |current_message_| is released.
432 int CastTransportImpl::DoReadCallback() {
433 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
434 if (!IsCastMessageValid(*current_message_
)) {
435 SetReadState(READ_STATE_ERROR
);
436 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE
);
437 return net::ERR_INVALID_RESPONSE
;
// The next state is chosen before the delegate is invoked.
439 SetReadState(READ_STATE_READ
);
440 delegate_
->OnMessage(*current_message_
);
// Clear the slot so that the next DoReadComplete() starts with no pending
// message.
441 current_message_
.reset();
// ERROR state handler for the read loop.
//
// |result|: the failing result code; debug-asserted to be zero or negative.
//
// Debug-asserts that a channel error has already been recorded, then returns
// net::ERR_FAILED. No next state is selected here.
445 int CastTransportImpl::DoReadError(int result
) {
446 VLOG_WITH_CONNECTION(2) << "DoReadError";
447 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
448 DCHECK_LE(result
, 0);
449 return net::ERR_FAILED
;
452 } // namespace cast_channel
453 } // namespace core_api
454 } // namespace extensions