Remove duplicate SOCKET_WRITE logging event from Cast Transport.
[chromium-blink-merge.git] / extensions / browser / api / cast_channel / cast_transport.cc
blob6022a9f2b882781348c94c0e1aa680bc61cef60f
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "extensions/browser/api/cast_channel/cast_transport.h"
7 #include <string>
9 #include "base/bind.h"
10 #include "base/format_macros.h"
11 #include "base/numerics/safe_conversions.h"
12 #include "base/strings/stringprintf.h"
13 #include "extensions/browser/api/cast_channel/cast_framer.h"
14 #include "extensions/browser/api/cast_channel/cast_message_util.h"
15 #include "extensions/browser/api/cast_channel/logger.h"
16 #include "extensions/browser/api/cast_channel/logger_util.h"
17 #include "extensions/common/api/cast_channel/cast_channel.pb.h"
18 #include "net/base/net_errors.h"
19 #include "net/socket/socket.h"
21 #define VLOG_WITH_CONNECTION(level) \
22 VLOG(level) << "[" << ip_endpoint_.ToString() << ", auth=" << channel_auth_ \
23 << "] "
25 namespace extensions {
26 namespace core_api {
27 namespace cast_channel {
29 CastTransportImpl::CastTransportImpl(net::Socket* socket,
30 int channel_id,
31 const net::IPEndPoint& ip_endpoint,
32 ChannelAuthType channel_auth,
33 scoped_refptr<Logger> logger)
34 : started_(false),
35 socket_(socket),
36 write_state_(WRITE_STATE_NONE),
37 read_state_(READ_STATE_NONE),
38 error_state_(CHANNEL_ERROR_NONE),
39 channel_id_(channel_id),
40 ip_endpoint_(ip_endpoint),
41 channel_auth_(channel_auth),
42 logger_(logger) {
43 DCHECK(socket);
45 // Buffer is reused across messages to minimize unnecessary buffer
46 // [re]allocations.
47 read_buffer_ = new net::GrowableIOBuffer();
48 read_buffer_->SetCapacity(MessageFramer::MessageHeader::max_message_size());
49 framer_.reset(new MessageFramer(read_buffer_));
52 CastTransportImpl::~CastTransportImpl() {
53 DCHECK(CalledOnValidThread());
54 FlushWriteQueue();
57 // static
58 proto::ReadState CastTransportImpl::ReadStateToProto(
59 CastTransportImpl::ReadState state) {
60 switch (state) {
61 case CastTransportImpl::READ_STATE_NONE:
62 return proto::READ_STATE_NONE;
63 case CastTransportImpl::READ_STATE_READ:
64 return proto::READ_STATE_READ;
65 case CastTransportImpl::READ_STATE_READ_COMPLETE:
66 return proto::READ_STATE_READ_COMPLETE;
67 case CastTransportImpl::READ_STATE_DO_CALLBACK:
68 return proto::READ_STATE_DO_CALLBACK;
69 case CastTransportImpl::READ_STATE_ERROR:
70 return proto::READ_STATE_ERROR;
71 default:
72 NOTREACHED();
73 return proto::READ_STATE_NONE;
77 // static
78 proto::WriteState CastTransportImpl::WriteStateToProto(
79 CastTransportImpl::WriteState state) {
80 switch (state) {
81 case CastTransportImpl::WRITE_STATE_NONE:
82 return proto::WRITE_STATE_NONE;
83 case CastTransportImpl::WRITE_STATE_WRITE:
84 return proto::WRITE_STATE_WRITE;
85 case CastTransportImpl::WRITE_STATE_WRITE_COMPLETE:
86 return proto::WRITE_STATE_WRITE_COMPLETE;
87 case CastTransportImpl::WRITE_STATE_DO_CALLBACK:
88 return proto::WRITE_STATE_DO_CALLBACK;
89 case CastTransportImpl::WRITE_STATE_ERROR:
90 return proto::WRITE_STATE_ERROR;
91 default:
92 NOTREACHED();
93 return proto::WRITE_STATE_NONE;
97 // static
98 proto::ErrorState CastTransportImpl::ErrorStateToProto(ChannelError state) {
99 switch (state) {
100 case CHANNEL_ERROR_NONE:
101 return proto::CHANNEL_ERROR_NONE;
102 case CHANNEL_ERROR_CHANNEL_NOT_OPEN:
103 return proto::CHANNEL_ERROR_CHANNEL_NOT_OPEN;
104 case CHANNEL_ERROR_AUTHENTICATION_ERROR:
105 return proto::CHANNEL_ERROR_AUTHENTICATION_ERROR;
106 case CHANNEL_ERROR_CONNECT_ERROR:
107 return proto::CHANNEL_ERROR_CONNECT_ERROR;
108 case CHANNEL_ERROR_SOCKET_ERROR:
109 return proto::CHANNEL_ERROR_SOCKET_ERROR;
110 case CHANNEL_ERROR_TRANSPORT_ERROR:
111 return proto::CHANNEL_ERROR_TRANSPORT_ERROR;
112 case CHANNEL_ERROR_INVALID_MESSAGE:
113 return proto::CHANNEL_ERROR_INVALID_MESSAGE;
114 case CHANNEL_ERROR_INVALID_CHANNEL_ID:
115 return proto::CHANNEL_ERROR_INVALID_CHANNEL_ID;
116 case CHANNEL_ERROR_CONNECT_TIMEOUT:
117 return proto::CHANNEL_ERROR_CONNECT_TIMEOUT;
118 case CHANNEL_ERROR_UNKNOWN:
119 return proto::CHANNEL_ERROR_UNKNOWN;
120 default:
121 NOTREACHED();
122 return proto::CHANNEL_ERROR_NONE;
126 void CastTransportImpl::SetReadDelegate(scoped_ptr<Delegate> delegate) {
127 DCHECK(CalledOnValidThread());
128 DCHECK(delegate);
129 delegate_ = delegate.Pass();
130 if (started_) {
131 delegate_->Start();
135 void CastTransportImpl::FlushWriteQueue() {
136 for (; !write_queue_.empty(); write_queue_.pop()) {
137 net::CompletionCallback& callback = write_queue_.front().callback;
138 callback.Run(net::ERR_FAILED);
139 callback.Reset();
143 void CastTransportImpl::SendMessage(const CastMessage& message,
144 const net::CompletionCallback& callback) {
145 DCHECK(CalledOnValidThread());
146 std::string serialized_message;
147 if (!MessageFramer::Serialize(message, &serialized_message)) {
148 logger_->LogSocketEventForMessage(channel_id_, proto::SEND_MESSAGE_FAILED,
149 message.namespace_(),
150 "Error when serializing message.");
151 callback.Run(net::ERR_FAILED);
152 return;
154 WriteRequest write_request(
155 message.namespace_(), serialized_message, callback);
157 write_queue_.push(write_request);
158 logger_->LogSocketEventForMessage(
159 channel_id_, proto::MESSAGE_ENQUEUED, message.namespace_(),
160 base::StringPrintf("Queue size: %" PRIuS, write_queue_.size()));
161 if (write_state_ == WRITE_STATE_NONE) {
162 SetWriteState(WRITE_STATE_WRITE);
163 OnWriteResult(net::OK);
167 CastTransportImpl::WriteRequest::WriteRequest(
168 const std::string& namespace_,
169 const std::string& payload,
170 const net::CompletionCallback& callback)
171 : message_namespace(namespace_), callback(callback) {
172 VLOG(2) << "WriteRequest size: " << payload.size();
173 io_buffer = new net::DrainableIOBuffer(new net::StringIOBuffer(payload),
174 payload.size());
177 CastTransportImpl::WriteRequest::~WriteRequest() {
180 void CastTransportImpl::SetReadState(ReadState read_state) {
181 if (read_state_ != read_state) {
182 read_state_ = read_state;
183 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
187 void CastTransportImpl::SetWriteState(WriteState write_state) {
188 if (write_state_ != write_state) {
189 write_state_ = write_state;
190 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
194 void CastTransportImpl::SetErrorState(ChannelError error_state) {
195 VLOG_WITH_CONNECTION(2) << "SetErrorState: " << error_state;
196 error_state_ = error_state;
199 void CastTransportImpl::OnWriteResult(int result) {
200 DCHECK(CalledOnValidThread());
201 if (write_queue_.empty()) {
202 SetWriteState(WRITE_STATE_NONE);
203 return;
206 // Network operations can either finish synchronously or asynchronously.
207 // This method executes the state machine transitions in a loop so that
208 // write state transitions happen even when network operations finish
209 // synchronously.
210 int rv = result;
211 do {
212 VLOG_WITH_CONNECTION(2) << "OnWriteResult (state=" << write_state_ << ", "
213 << "result=" << rv << ", "
214 << "queue size=" << write_queue_.size() << ")";
216 WriteState state = write_state_;
217 write_state_ = WRITE_STATE_NONE;
218 switch (state) {
219 case WRITE_STATE_WRITE:
220 rv = DoWrite();
221 break;
222 case WRITE_STATE_WRITE_COMPLETE:
223 rv = DoWriteComplete(rv);
224 break;
225 case WRITE_STATE_DO_CALLBACK:
226 rv = DoWriteCallback();
227 break;
228 case WRITE_STATE_ERROR:
229 rv = DoWriteError(rv);
230 break;
231 default:
232 NOTREACHED() << "BUG in write flow. Unknown state: " << state;
233 break;
235 } while (!write_queue_.empty() && rv != net::ERR_IO_PENDING &&
236 write_state_ != WRITE_STATE_NONE);
238 // No state change occurred in the switch case above. This means the
239 // write operation has completed.
240 if (write_state_ == WRITE_STATE_NONE) {
241 logger_->LogSocketWriteState(channel_id_, WriteStateToProto(write_state_));
244 if (rv == net::ERR_FAILED) {
245 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
246 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
247 delegate_->OnError(error_state_);
250 // If write loop is done because the queue is empty then set write
251 // state to NONE
252 if (write_queue_.empty()) {
253 SetWriteState(WRITE_STATE_NONE);
256 // Write loop is done - flush the remaining write operations if an error
257 // was encountered.
258 if (rv == net::ERR_FAILED) {
259 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
260 FlushWriteQueue();
264 int CastTransportImpl::DoWrite() {
265 DCHECK(!write_queue_.empty());
266 WriteRequest& request = write_queue_.front();
268 VLOG_WITH_CONNECTION(2) << "WriteData byte_count = "
269 << request.io_buffer->size() << " bytes_written "
270 << request.io_buffer->BytesConsumed();
272 SetWriteState(WRITE_STATE_WRITE_COMPLETE);
274 int rv = socket_->Write(
275 request.io_buffer.get(), request.io_buffer->BytesRemaining(),
276 base::Bind(&CastTransportImpl::OnWriteResult, base::Unretained(this)));
277 return rv;
280 int CastTransportImpl::DoWriteComplete(int result) {
281 VLOG_WITH_CONNECTION(2) << "DoWriteComplete result=" << result;
282 DCHECK(!write_queue_.empty());
283 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_WRITE, result);
284 if (result <= 0) { // NOTE that 0 also indicates an error
285 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
286 SetWriteState(WRITE_STATE_ERROR);
287 return result == 0 ? net::ERR_FAILED : result;
290 // Some bytes were successfully written
291 WriteRequest& request = write_queue_.front();
292 scoped_refptr<net::DrainableIOBuffer> io_buffer = request.io_buffer;
293 io_buffer->DidConsume(result);
294 if (io_buffer->BytesRemaining() == 0) { // Message fully sent
295 SetWriteState(WRITE_STATE_DO_CALLBACK);
296 } else {
297 SetWriteState(WRITE_STATE_WRITE);
300 return net::OK;
303 int CastTransportImpl::DoWriteCallback() {
304 VLOG_WITH_CONNECTION(2) << "DoWriteCallback";
305 DCHECK(!write_queue_.empty());
307 SetWriteState(WRITE_STATE_WRITE);
309 WriteRequest& request = write_queue_.front();
310 int bytes_consumed = request.io_buffer->BytesConsumed();
311 logger_->LogSocketEventForMessage(
312 channel_id_, proto::MESSAGE_WRITTEN, request.message_namespace,
313 base::StringPrintf("Bytes: %d", bytes_consumed));
314 request.callback.Run(net::OK);
315 write_queue_.pop();
316 return net::OK;
319 int CastTransportImpl::DoWriteError(int result) {
320 VLOG_WITH_CONNECTION(2) << "DoWriteError result=" << result;
321 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
322 DCHECK_LT(result, 0);
323 return net::ERR_FAILED;
326 void CastTransportImpl::Start() {
327 DCHECK(CalledOnValidThread());
328 DCHECK(!started_);
329 DCHECK(delegate_) << "Read delegate must be set prior to calling Start()";
330 delegate_->Start();
331 if (read_state_ == READ_STATE_NONE) {
332 // Initialize and run the read state machine.
333 SetReadState(READ_STATE_READ);
334 OnReadResult(net::OK);
336 started_ = true;
339 void CastTransportImpl::OnReadResult(int result) {
340 DCHECK(CalledOnValidThread());
341 // Network operations can either finish synchronously or asynchronously.
342 // This method executes the state machine transitions in a loop so that
343 // write state transitions happen even when network operations finish
344 // synchronously.
345 int rv = result;
346 do {
347 VLOG_WITH_CONNECTION(2) << "OnReadResult(state=" << read_state_
348 << ", result=" << rv << ")";
349 ReadState state = read_state_;
350 read_state_ = READ_STATE_NONE;
352 switch (state) {
353 case READ_STATE_READ:
354 rv = DoRead();
355 break;
356 case READ_STATE_READ_COMPLETE:
357 rv = DoReadComplete(rv);
358 break;
359 case READ_STATE_DO_CALLBACK:
360 rv = DoReadCallback();
361 break;
362 case READ_STATE_ERROR:
363 rv = DoReadError(rv);
364 DCHECK_EQ(read_state_, READ_STATE_NONE);
365 break;
366 default:
367 NOTREACHED() << "BUG in read flow. Unknown state: " << state;
368 break;
370 } while (rv != net::ERR_IO_PENDING && read_state_ != READ_STATE_NONE);
372 // No state change occurred in do-while loop above. This means state has
373 // transitioned to NONE.
374 if (read_state_ == READ_STATE_NONE) {
375 logger_->LogSocketReadState(channel_id_, ReadStateToProto(read_state_));
378 if (rv == net::ERR_FAILED) {
379 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
380 VLOG_WITH_CONNECTION(2) << "Sending OnError().";
381 delegate_->OnError(error_state_);
385 int CastTransportImpl::DoRead() {
386 VLOG_WITH_CONNECTION(2) << "DoRead";
387 SetReadState(READ_STATE_READ_COMPLETE);
389 // Determine how many bytes need to be read.
390 size_t num_bytes_to_read = framer_->BytesRequested();
391 DCHECK_GT(num_bytes_to_read, 0u);
393 // Read up to num_bytes_to_read into |current_read_buffer_|.
394 return socket_->Read(
395 read_buffer_.get(), base::checked_cast<uint32>(num_bytes_to_read),
396 base::Bind(&CastTransportImpl::OnReadResult, base::Unretained(this)));
399 int CastTransportImpl::DoReadComplete(int result) {
400 VLOG_WITH_CONNECTION(2) << "DoReadComplete result = " << result;
401 logger_->LogSocketEventWithRv(channel_id_, proto::SOCKET_READ, result);
402 if (result <= 0) {
403 VLOG_WITH_CONNECTION(1) << "Read error, peer closed the socket.";
404 SetErrorState(CHANNEL_ERROR_SOCKET_ERROR);
405 SetReadState(READ_STATE_ERROR);
406 return result == 0 ? net::ERR_FAILED : result;
409 size_t message_size;
410 DCHECK(!current_message_);
411 ChannelError framing_error;
412 current_message_ = framer_->Ingest(result, &message_size, &framing_error);
413 if (current_message_.get() && (framing_error == CHANNEL_ERROR_NONE)) {
414 DCHECK_GT(message_size, static_cast<size_t>(0));
415 logger_->LogSocketEventForMessage(
416 channel_id_, proto::MESSAGE_READ, current_message_->namespace_(),
417 base::StringPrintf("Message size: %u",
418 static_cast<uint32>(message_size)));
419 SetReadState(READ_STATE_DO_CALLBACK);
420 } else if (framing_error != CHANNEL_ERROR_NONE) {
421 DCHECK(!current_message_);
422 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
423 SetReadState(READ_STATE_ERROR);
424 } else {
425 DCHECK(!current_message_);
426 SetReadState(READ_STATE_READ);
428 return net::OK;
431 int CastTransportImpl::DoReadCallback() {
432 VLOG_WITH_CONNECTION(2) << "DoReadCallback";
433 if (!IsCastMessageValid(*current_message_)) {
434 SetReadState(READ_STATE_ERROR);
435 SetErrorState(CHANNEL_ERROR_INVALID_MESSAGE);
436 return net::ERR_INVALID_RESPONSE;
438 SetReadState(READ_STATE_READ);
439 delegate_->OnMessage(*current_message_);
440 current_message_.reset();
441 return net::OK;
444 int CastTransportImpl::DoReadError(int result) {
445 VLOG_WITH_CONNECTION(2) << "DoReadError";
446 DCHECK_NE(CHANNEL_ERROR_NONE, error_state_);
447 DCHECK_LE(result, 0);
448 return net::ERR_FAILED;
451 } // namespace cast_channel
452 } // namespace core_api
453 } // namespace extensions