1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
10 #include "base/format_macros.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "extensions/browser/api/cast_channel/cast_framer.h"
14 #include "extensions/browser/api/cast_channel/cast_message_util.h"
15 #include "extensions/browser/api/cast_channel/logger.h"
16 #include "extensions/browser/api/cast_channel/logger_util.h"
17 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
18 #include "net/base/net_errors.h"
19 #include "net/socket/socket.h"
// Logging helper: emits a VLOG message at |level| whose prefix is built from
// this connection's |ip_endpoint_| and |channel_auth_|. It can therefore only
// be used where those two members are in scope.
// NOTE(review): the last visible line ends in a continuation backslash; the
// closing line of the macro is not visible here - confirm against the full
// file.
21 #define VLOG_WITH_CONNECTION(level) \
22 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" << channel_auth_ \
25 namespace extensions
{
27 namespace cast_channel
{
// Constructor. The visible initializers put the write and read state machines
// in their NONE states, clear the channel error, and record the channel id,
// the remote endpoint and the channel auth type. The body then allocates the
// read buffer and the framer that parses messages out of it.
// NOTE(review): |channel_id| is used in the initializer list but its parameter
// declaration is not visible here, and the embedded numbering skips lines
// (e.g. 33 -> 36, 41 -> 45) - confirm the full parameter and initializer lists
// against the complete file.
29 CastTransportImpl::CastTransportImpl(net::Socket
* socket
,
31 const net::IPEndPoint
& ip_endpoint
,
32 ChannelAuthType channel_auth
,
33 scoped_refptr
<Logger
> logger
)
36 write_state_(WRITE_STATE_NONE
),
37 read_state_(READ_STATE_NONE
),
38 error_state_(CHANNEL_ERROR_NONE
),
39 channel_id_(channel_id
),
40 ip_endpoint_(ip_endpoint
),
41 channel_auth_(channel_auth
),
45 // Buffer is reused across messages to minimize unnecessary buffer
// allocations. Its capacity is set once, to the largest message size the
// framer's header format allows.
47 read_buffer_
= new net::GrowableIOBuffer();
48 read_buffer_
->SetCapacity(MessageFramer::MessageHeader::max_message_size());
// The framer is handed the same buffer that socket reads are issued against
// (see DoRead).
49 framer_
.reset(new MessageFramer(read_buffer_
));
// Destructor. DCHECKs CalledOnValidThread() before tearing down.
// NOTE(review): the rest of the body is not visible in this view.
52 CastTransportImpl::~CastTransportImpl() {
53 DCHECK(CalledOnValidThread());
// Translates an internal CastTransportImpl::ReadState value into the
// proto::ReadState enumerator of the same name, for use in logger events.
// Every visible case returns directly.
58 proto::ReadState
CastTransportImpl::ReadStateToProto(
59 CastTransportImpl::ReadState state
) {
61 case CastTransportImpl::READ_STATE_NONE
:
62 return proto::READ_STATE_NONE
;
63 case CastTransportImpl::READ_STATE_READ
:
64 return proto::READ_STATE_READ
;
65 case CastTransportImpl::READ_STATE_READ_COMPLETE
:
66 return proto::READ_STATE_READ_COMPLETE
;
67 case CastTransportImpl::READ_STATE_DO_CALLBACK
:
68 return proto::READ_STATE_DO_CALLBACK
;
69 case CastTransportImpl::READ_STATE_ERROR
:
70 return proto::READ_STATE_ERROR
;
// NOTE(review): presumably the fallback for a value with no case above; the
// lines between the last case and this return are not visible here.
73 return proto::READ_STATE_NONE
;
// Translates an internal CastTransportImpl::WriteState value into the
// proto::WriteState enumerator of the same name, for use in logger events.
// Every visible case returns directly.
78 proto::WriteState
CastTransportImpl::WriteStateToProto(
79 CastTransportImpl::WriteState state
) {
81 case CastTransportImpl::WRITE_STATE_NONE
:
82 return proto::WRITE_STATE_NONE
;
83 case CastTransportImpl::WRITE_STATE_WRITE
:
84 return proto::WRITE_STATE_WRITE
;
85 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE
:
86 return proto::WRITE_STATE_WRITE_COMPLETE
;
87 case CastTransportImpl::WRITE_STATE_DO_CALLBACK
:
88 return proto::WRITE_STATE_DO_CALLBACK
;
89 case CastTransportImpl::WRITE_STATE_ERROR
:
90 return proto::WRITE_STATE_ERROR
;
// NOTE(review): presumably the fallback for a value with no case above; the
// lines between the last case and this return are not visible here.
93 return proto::WRITE_STATE_NONE
;
// Translates a ChannelError value into the proto::ErrorState enumerator of
// the same name, for use in logger events. Every visible case returns
// directly.
98 proto::ErrorState
CastTransportImpl::ErrorStateToProto(ChannelError state
) {
100 case CHANNEL_ERROR_NONE
:
101 return proto::CHANNEL_ERROR_NONE
;
102 case CHANNEL_ERROR_CHANNEL_NOT_OPEN
:
103 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN
;
104 case CHANNEL_ERROR_AUTHENTICATION_ERROR
:
105 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR
;
106 case CHANNEL_ERROR_CONNECT_ERROR
:
107 return proto::CHANNEL_ERROR_CONNECT_ERROR
;
108 case CHANNEL_ERROR_SOCKET_ERROR
:
109 return proto::CHANNEL_ERROR_SOCKET_ERROR
;
110 case CHANNEL_ERROR_TRANSPORT_ERROR
:
111 return proto::CHANNEL_ERROR_TRANSPORT_ERROR
;
112 case CHANNEL_ERROR_INVALID_MESSAGE
:
113 return proto::CHANNEL_ERROR_INVALID_MESSAGE
;
114 case CHANNEL_ERROR_INVALID_CHANNEL_ID
:
115 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID
;
116 case CHANNEL_ERROR_CONNECT_TIMEOUT
:
117 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT
;
118 case CHANNEL_ERROR_UNKNOWN
:
119 return proto::CHANNEL_ERROR_UNKNOWN
;
// NOTE(review): presumably the fallback for a value with no case above; the
// lines between the last case and this return are not visible here.
122 return proto::CHANNEL_ERROR_NONE
;
// Installs the delegate that OnMessage()/OnError() notifications are sent to.
// |delegate| is moved into |delegate_| via Pass(), so the caller's scoped_ptr
// no longer owns it afterwards. DCHECKs CalledOnValidThread().
126 void CastTransportImpl::SetReadDelegate(scoped_ptr
<Delegate
> delegate
) {
127 DCHECK(CalledOnValidThread());
129 delegate_
= delegate
.Pass();
// Drains |write_queue_|: for each pending request, from the front, runs its
// completion callback with net::ERR_FAILED and then pops it.
135 void CastTransportImpl::FlushWriteQueue() {
// The pop happens in the loop's increment step, i.e. after the callback of the
// front element has run.
136 for (; !write_queue_
.empty(); write_queue_
.pop()) {
137 net::CompletionCallback
& callback
= write_queue_
.front().callback
;
138 callback
.Run(net::ERR_FAILED
);
// Serializes |message| and enqueues it for writing.
//   message:  the CastMessage to send.
//   callback: run with net::ERR_FAILED here if serialization fails; otherwise
//             stored in the queued WriteRequest (DoWriteCallback runs it with
//             net::OK, FlushWriteQueue with net::ERR_FAILED).
// If the write state machine is idle (WRITE_STATE_NONE) it is started
// immediately. DCHECKs CalledOnValidThread().
143 void CastTransportImpl::SendMessage(const CastMessage
& message
,
144 const net::CompletionCallback
& callback
) {
145 DCHECK(CalledOnValidThread());
146 std::string serialized_message
;
// Serialization failure: log it against the message namespace and report
// failure to the caller.
// NOTE(review): an early return after the callback is presumably on a line
// that is not visible here - confirm.
147 if (!MessageFramer::Serialize(message
, &serialized_message
)) {
148 logger_
->LogSocketEventForMessage(channel_id_
, proto::SEND_MESSAGE_FAILED
,
149 message
.namespace_(),
150 "Error when serializing message.");
151 callback
.Run(net::ERR_FAILED
);
154 WriteRequest
write_request(
155 message
.namespace_(), serialized_message
, callback
);
157 write_queue_
.push(write_request
);
// PRIuS is the size_t printf specifier macro; the logged value is the queue
// size after the push.
158 logger_
->LogSocketEventForMessage(
159 channel_id_
, proto::MESSAGE_ENQUEUED
, message
.namespace_(),
160 base::StringPrintf("Queue size: %" PRIuS
, write_queue_
.size()));
// Only kick the state machine when no write is in flight; otherwise the
// running loop in OnWriteResult picks the new request up from the queue.
161 if (write_state_
== WRITE_STATE_NONE
) {
162 SetWriteState(WRITE_STATE_WRITE
);
163 OnWriteResult(net::OK
);
// Builds a queued write request.
//   namespace_: message namespace, kept in |message_namespace| for logging.
//   payload:    serialized message bytes to send.
//   callback:   completion callback stored for later invocation.
// The payload is wrapped in a StringIOBuffer inside a DrainableIOBuffer;
// DoWriteComplete uses the drainable buffer's DidConsume()/BytesRemaining() to
// track partially written messages.
167 CastTransportImpl::WriteRequest::WriteRequest(
168 const std::string
& namespace_
,
169 const std::string
& payload
,
170 const net::CompletionCallback
& callback
)
171 : message_namespace(namespace_
), callback(callback
) {
172 VLOG(2) << "WriteRequest size: " << payload
.size();
// NOTE(review): the remaining argument(s) of the DrainableIOBuffer
// constructor are on a line that is not visible here.
173 io_buffer
= new net::DrainableIOBuffer(new net::StringIOBuffer(payload
),
// Destructor for WriteRequest. No body statements are visible in this view.
177 CastTransportImpl::WriteRequest::~WriteRequest() {
// Moves the read state machine to |read_state|. The assignment and the logger
// event happen only when the state actually changes, so repeated calls with
// the current state are silent no-ops.
180 void CastTransportImpl::SetReadState(ReadState read_state
) {
181 if (read_state_
!= read_state
) {
182 read_state_
= read_state
;
183 logger_
->LogSocketReadState(channel_id_
, ReadStateToProto(read_state_
));
// Moves the write state machine to |write_state|. The assignment and the
// logger event happen only when the state actually changes, so repeated calls
// with the current state are silent no-ops.
187 void CastTransportImpl::SetWriteState(WriteState write_state
) {
188 if (write_state_
!= write_state
) {
189 write_state_
= write_state
;
190 logger_
->LogSocketWriteState(channel_id_
, WriteStateToProto(write_state_
));
// Records |error_state| as the channel's current error. The value is VLOG-ed
// and then assigned unconditionally (there is no changed-value check here).
194 void CastTransportImpl::SetErrorState(ChannelError error_state
) {
195 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state
;
196 error_state_
= error_state
;
// Drives the write state machine. Called with net::OK from SendMessage to
// start writing, and bound as the socket write completion callback in
// DoWrite.
//   result: outcome of the previous step (net::OK or a socket write result).
// On net::ERR_FAILED the delegate's OnError() is invoked with |error_state_|.
// NOTE(review): several lines of this function (the declaration of |rv|, the
// switch header, break statements, closing braces) are not visible in this
// view; the comments below describe only what is shown.
199 void CastTransportImpl::OnWriteResult(int result
) {
200 DCHECK(CalledOnValidThread());
// Nothing queued: make sure the machine is marked idle.
201 if (write_queue_
.empty()) {
202 SetWriteState(WRITE_STATE_NONE
);
206 // Network operations can either finish synchronously or asynchronously.
207 // This method executes the state machine transitions in a loop so that
208 // write state transitions happen even when network operations finish
// synchronously.
212 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_
<< ", "
213 << "result=" << rv
<< ", "
214 << "queue size=" << write_queue_
.size() << ")";
// Snapshot the current state and reset |write_state_| to NONE by direct
// assignment (no logging). A handler that wants the loop to continue must
// select the next state itself; see the loop condition below.
216 WriteState state
= write_state_
;
217 write_state_
= WRITE_STATE_NONE
;
219 case WRITE_STATE_WRITE
:
222 case WRITE_STATE_WRITE_COMPLETE
:
223 rv
= DoWriteComplete(rv
);
225 case WRITE_STATE_DO_CALLBACK
:
226 rv
= DoWriteCallback();
228 case WRITE_STATE_ERROR
:
229 rv
= DoWriteError(rv
);
232 NOTREACHED() << "BUG in write flow. Unknown state: " << state
;
// Keep looping while requests remain, the last step did not go asynchronous
// (net::ERR_IO_PENDING), and a handler selected a next state.
235 } while (!write_queue_
.empty() && rv
!= net::ERR_IO_PENDING
&&
236 write_state_
!= WRITE_STATE_NONE
);
238 // No state change occurred in the switch case above. This means the
239 // write operation has completed.
// The reset to NONE above bypassed SetWriteState, so the NONE state is logged
// explicitly here.
240 if (write_state_
== WRITE_STATE_NONE
) {
241 logger_
->LogSocketWriteState(channel_id_
, WriteStateToProto(write_state_
));
// net::ERR_FAILED is what DoWriteError returns; an error state must already
// have been recorded before the delegate is told.
244 if (rv
== net::ERR_FAILED
) {
245 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
246 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
247 delegate_
->OnError(error_state_
);
250 // If write loop is done because the queue is empty then set write
// state to NONE.
252 if (write_queue_
.empty()) {
253 SetWriteState(WRITE_STATE_NONE
);
256 // Write loop is done - flush the remaining write operations if an error
// occurred (rv == net::ERR_FAILED).
// NOTE(review): the flush call itself is not visible in this view.
258 if (rv
== net::ERR_FAILED
) {
259 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
// WRITE_STATE_WRITE handler: issues a socket write for the not-yet-sent
// remainder of the request at the front of |write_queue_|. Requires a
// non-empty queue (DCHECK).
// NOTE(review): the return statement is not visible in this view; presumably
// the socket's result |rv| is returned to the state machine loop.
264 int CastTransportImpl::DoWrite() {
265 DCHECK(!write_queue_
.empty());
266 WriteRequest
& request
= write_queue_
.front();
268 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
269 << request
.io_buffer
->size() << " bytes_written "
270 << request
.io_buffer
->BytesConsumed();
// Select the next state before issuing the write so that the result, whether
// delivered synchronously or via the callback, is handled by DoWriteComplete.
272 SetWriteState(WRITE_STATE_WRITE_COMPLETE
);
// base::Unretained(this) hands the socket a raw pointer to this object.
// NOTE(review): assumes the socket never runs the callback after this object
// is destroyed - confirm the ownership/lifetime relationship.
274 int rv
= socket_
->Write(
275 request
.io_buffer
.get(), request
.io_buffer
->BytesRemaining(),
276 base::Bind(&CastTransportImpl::OnWriteResult
, base::Unretained(this)));
// WRITE_STATE_WRITE_COMPLETE handler: processes the result of a socket write.
//   result: value produced by the socket write; > 0 is the number of bytes
//           written, <= 0 is treated as an error.
// On error, records CHANNEL_ERROR_SOCKET_ERROR, moves to WRITE_STATE_ERROR
// and returns a negative code (0 is mapped to net::ERR_FAILED). Otherwise
// advances the buffer and moves to WRITE_STATE_DO_CALLBACK when the message
// is fully sent, or back to WRITE_STATE_WRITE to send the rest.
// NOTE(review): the success-path return statement is not visible in this view.
280 int CastTransportImpl::DoWriteComplete(int result
) {
281 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result
;
282 DCHECK(!write_queue_
.empty());
283 logger_
->LogSocketEventWithRv(channel_id_
, proto::SOCKET_WRITE
, result
);
284 if (result
<= 0) { // NOTE that 0 also indicates an error
285 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR
);
286 SetWriteState(WRITE_STATE_ERROR
);
// Map 0 to net::ERR_FAILED so the value handed on is always negative, which
// DoWriteError DCHECKs.
287 return result
== 0 ? net::ERR_FAILED
: result
;
290 // Some bytes were successfully written
291 WriteRequest
& request
= write_queue_
.front();
292 scoped_refptr
<net::DrainableIOBuffer
> io_buffer
= request
.io_buffer
;
// Advance the buffer past the bytes the socket accepted.
293 io_buffer
->DidConsume(result
);
294 if (io_buffer
->BytesRemaining() == 0) { // Message fully sent
295 SetWriteState(WRITE_STATE_DO_CALLBACK
);
// Partial write: go around again for the remaining bytes.
297 SetWriteState(WRITE_STATE_WRITE
);
// WRITE_STATE_DO_CALLBACK handler: the request at the front of |write_queue_|
// has been fully written. Logs MESSAGE_WRITTEN with the number of bytes
// consumed from its buffer and runs the request's callback with net::OK.
// Requires a non-empty queue (DCHECK).
// NOTE(review): removal of the request from the queue and the return
// statement are not visible in this view.
303 int CastTransportImpl::DoWriteCallback() {
304 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
305 DCHECK(!write_queue_
.empty());
// Select the next state up front: continue with WRITE for whatever is queued
// next (the loop in OnWriteResult stops by itself when the queue is empty).
307 SetWriteState(WRITE_STATE_WRITE
);
309 WriteRequest
& request
= write_queue_
.front();
310 int bytes_consumed
= request
.io_buffer
->BytesConsumed();
311 logger_
->LogSocketEventForMessage(
312 channel_id_
, proto::MESSAGE_WRITTEN
, request
.message_namespace
,
313 base::StringPrintf("Bytes: %d", bytes_consumed
));
314 request
.callback
.Run(net::OK
);
// WRITE_STATE_ERROR handler. Expects an error state to have been recorded
// already and |result| to be negative (both DCHECKed). Always returns
// net::ERR_FAILED, which OnWriteResult treats as the signal to notify the
// delegate. It selects no next state, so the state machine loop ends.
319 int CastTransportImpl::DoWriteError(int result
) {
320 VLOG_WITH_CONNECTION(2) << "DoWriteError result=" << result
;
321 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
322 DCHECK_LT(result
, 0);
323 return net::ERR_FAILED
;
// Starts reading from the socket. A read delegate must already be set
// (DCHECK). Does nothing if the read state machine is already running, i.e.
// |read_state_| is not READ_STATE_NONE. DCHECKs CalledOnValidThread().
326 void CastTransportImpl::Start() {
327 DCHECK(CalledOnValidThread());
329 DCHECK(delegate_
) << "Read delegate must be set prior to calling Start()";
331 if (read_state_
== READ_STATE_NONE
) {
332 // Initialize and run the read state machine.
333 SetReadState(READ_STATE_READ
);
334 OnReadResult(net::OK
);
// Drives the read state machine. Called with net::OK from Start() and bound
// as the socket read completion callback in DoRead.
//   result: outcome of the previous step (net::OK or a socket read result).
// On net::ERR_FAILED the delegate's OnError() is invoked with |error_state_|.
// NOTE(review): several lines of this function (the declaration of |rv|, the
// switch header, break statements, closing braces) are not visible in this
// view; the comments below describe only what is shown.
339 void CastTransportImpl::OnReadResult(int result
) {
340 DCHECK(CalledOnValidThread());
341 // Network operations can either finish synchronously or asynchronously.
342 // This method executes the state machine transitions in a loop so that
343 // read state transitions happen even when network operations finish
// synchronously.
347 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
348 << ", result=" << rv
<< ")";
// Snapshot the current state and reset |read_state_| to NONE by direct
// assignment (no logging). A handler that wants the loop to continue must
// select the next state itself; see the loop condition below.
349 ReadState state
= read_state_
;
350 read_state_
= READ_STATE_NONE
;
353 case READ_STATE_READ
:
356 case READ_STATE_READ_COMPLETE
:
357 rv
= DoReadComplete(rv
);
359 case READ_STATE_DO_CALLBACK
:
360 rv
= DoReadCallback();
362 case READ_STATE_ERROR
:
363 rv
= DoReadError(rv
);
// The error handler selects no next state, so the loop terminates after it.
364 DCHECK_EQ(read_state_
, READ_STATE_NONE
);
367 NOTREACHED() << "BUG in read flow. Unknown state: " << state
;
// Keep looping while the last step did not go asynchronous
// (net::ERR_IO_PENDING) and a handler selected a next state.
370 } while (rv
!= net::ERR_IO_PENDING
&& read_state_
!= READ_STATE_NONE
);
372 // No state change occurred in do-while loop above. This means state has
373 // transitioned to NONE.
// The reset to NONE above bypassed SetReadState, so the NONE state is logged
// explicitly here.
374 if (read_state_
== READ_STATE_NONE
) {
375 logger_
->LogSocketReadState(channel_id_
, ReadStateToProto(read_state_
));
// net::ERR_FAILED is what DoReadError returns; an error state must already
// have been recorded before the delegate is told.
378 if (rv
== net::ERR_FAILED
) {
379 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
380 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
381 delegate_
->OnError(error_state_
);
// READ_STATE_READ handler: asks the framer how many bytes it needs next and
// issues a socket read for at most that many bytes into |read_buffer_|.
// Returns whatever the socket read returns.
385 int CastTransportImpl::DoRead() {
386 VLOG_WITH_CONNECTION(2) << "DoRead";
// Select the next state before issuing the read so that the result, whether
// delivered synchronously or via the callback, is handled by DoReadComplete.
387 SetReadState(READ_STATE_READ_COMPLETE
);
389 // Determine how many bytes need to be read.
390 size_t num_bytes_to_read
= framer_
->BytesRequested();
391 DCHECK_GT(num_bytes_to_read
, 0u);
393 // Read up to num_bytes_to_read into |read_buffer_|.
// base::Unretained(this) hands the socket a raw pointer to this object.
// NOTE(review): assumes the socket never runs the callback after this object
// is destroyed - confirm the ownership/lifetime relationship.
394 return socket_
->Read(
395 read_buffer_
.get(), base::checked_cast
<uint32
>(num_bytes_to_read
),
396 base::Bind(&CastTransportImpl::OnReadResult
, base::Unretained(this)));
// READ_STATE_READ_COMPLETE handler: processes the result of a socket read and
// feeds the received bytes to the framer.
//   result: value produced by the socket read.
// Outcomes visible below:
//   - socket error: CHANNEL_ERROR_SOCKET_ERROR, READ_STATE_ERROR, negative
//     return value (0 is mapped to net::ERR_FAILED);
//   - complete message parsed: READ_STATE_DO_CALLBACK;
//   - framing error: CHANNEL_ERROR_INVALID_MESSAGE, READ_STATE_ERROR;
//   - otherwise (message still incomplete): READ_STATE_READ.
// NOTE(review): the condition guarding the socket-error branch, the
// declaration of |message_size| and the final return are not visible in this
// view.
399 int CastTransportImpl::DoReadComplete(int result
) {
400 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result
;
401 logger_
->LogSocketEventWithRv(channel_id_
, proto::SOCKET_READ
, result
);
403 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
404 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR
);
405 SetReadState(READ_STATE_ERROR
);
// Map 0 to net::ERR_FAILED so a zero result is not handed on as success.
406 return result
== 0 ? net::ERR_FAILED
: result
;
// No previously parsed message may still be pending delivery.
410 DCHECK(!current_message_
);
411 ChannelError framing_error
;
// Ingest() yields a message only once a complete one is available; it reports
// its size through |message_size| and any framing problem through
// |framing_error|.
412 current_message_
= framer_
->Ingest(result
, &message_size
, &framing_error
);
413 if (current_message_
.get() && (framing_error
== CHANNEL_ERROR_NONE
)) {
414 DCHECK_GT(message_size
, static_cast<size_t>(0));
415 logger_
->LogSocketEventForMessage(
416 channel_id_
, proto::MESSAGE_READ
, current_message_
->namespace_(),
417 base::StringPrintf("Message size: %u",
418 static_cast<uint32
>(message_size
)));
419 SetReadState(READ_STATE_DO_CALLBACK
);
420 } else if (framing_error
!= CHANNEL_ERROR_NONE
) {
421 DCHECK(!current_message_
);
// Whatever specific framing error was reported, the channel error recorded is
// CHANNEL_ERROR_INVALID_MESSAGE.
422 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE
);
423 SetReadState(READ_STATE_ERROR
);
// Neither a message nor an error: more bytes are needed, so read again.
425 DCHECK(!current_message_
);
426 SetReadState(READ_STATE_READ
);
// READ_STATE_DO_CALLBACK handler: validates the fully parsed
// |current_message_| and delivers it to the delegate.
// An invalid message moves the machine to READ_STATE_ERROR with
// CHANNEL_ERROR_INVALID_MESSAGE and returns net::ERR_INVALID_RESPONSE.
// Otherwise the next state is READ_STATE_READ, the delegate's OnMessage() is
// called, and |current_message_| is released.
// NOTE(review): the success-path return statement is not visible in this view.
431 int CastTransportImpl::DoReadCallback() {
432 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
433 if (!IsCastMessageValid(*current_message_
)) {
434 SetReadState(READ_STATE_ERROR
);
435 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE
);
436 return net::ERR_INVALID_RESPONSE
;
// The next state is selected before the delegate is notified.
438 SetReadState(READ_STATE_READ
);
439 delegate_
->OnMessage(*current_message_
);
// Release the delivered message so DoReadComplete's DCHECK(!current_message_)
// holds for the next read.
440 current_message_
.reset();
// READ_STATE_ERROR handler. Expects an error state to have been recorded
// already and |result| to be <= 0 (both DCHECKed). Always returns
// net::ERR_FAILED, which OnReadResult treats as the signal to notify the
// delegate. It selects no next state, so the state machine loop ends.
444 int CastTransportImpl::DoReadError(int result
) {
445 VLOG_WITH_CONNECTION(2) << "DoReadError";
446 DCHECK_NE(CHANNEL_ERROR_NONE
, error_state_
);
447 DCHECK_LE(result
, 0);
448 return net::ERR_FAILED
;
451 } // namespace cast_channel
453 } // namespace extensions