Add test_runner support for new accessibility event
[chromium-blink-merge.git] / remoting / host / native_messaging / native_messaging_reader.cc
blobb2ff66d8582d1bfd6e4479861b404e101bff24ec
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/host/native_messaging/native_messaging_reader.h"
7 #include <string>
9 #include "base/bind.h"
10 #include "base/files/file.h"
11 #include "base/json/json_reader.h"
12 #include "base/location.h"
13 #include "base/sequenced_task_runner.h"
14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h"
16 #include "base/thread_task_runner_handle.h"
17 #include "base/threading/sequenced_worker_pool.h"
18 #include "base/values.h"
20 namespace {
22 // uint32 is specified in the protocol as the type for the message header.
23 typedef uint32 MessageLengthType;
25 const int kMessageHeaderSize = sizeof(MessageLengthType);
27 // Limit the size of received messages, to avoid excessive memory-allocation in
28 // this process, and potential overflow issues when casting to a signed 32-bit
29 // int.
30 const MessageLengthType kMaximumMessageSize = 1024 * 1024;
32 } // namespace
34 namespace remoting {
36 class NativeMessagingReader::Core {
37 public:
38 Core(base::File file,
39 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
40 scoped_refptr<base::SequencedTaskRunner> read_task_runner,
41 base::WeakPtr<NativeMessagingReader> reader_);
42 ~Core();
44 // Reads a message from the Native Messaging client and passes it to
45 // |message_callback_| on the originating thread. Called on the reader thread.
46 void ReadMessage();
48 private:
49 // Notify the reader's EOF callback when an error occurs or EOF is reached.
50 void NotifyEof();
52 base::File read_stream_;
54 base::WeakPtr<NativeMessagingReader> reader_;
56 // Used to post the caller-supplied reader callbacks on the caller thread.
57 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
59 // Used to DCHECK that the reader code executes on the correct thread.
60 scoped_refptr<base::SequencedTaskRunner> read_task_runner_;
62 DISALLOW_COPY_AND_ASSIGN(Core);
65 NativeMessagingReader::Core::Core(
66 base::File file,
67 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
68 scoped_refptr<base::SequencedTaskRunner> read_task_runner,
69 base::WeakPtr<NativeMessagingReader> reader)
70 : read_stream_(file.Pass()),
71 reader_(reader),
72 caller_task_runner_(caller_task_runner),
73 read_task_runner_(read_task_runner) {
76 NativeMessagingReader::Core::~Core() {}
78 void NativeMessagingReader::Core::ReadMessage() {
79 DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
81 // Keep reading messages until the stream is closed or an error occurs.
82 while (true) {
83 MessageLengthType message_length;
84 int read_result = read_stream_.ReadAtCurrentPos(
85 reinterpret_cast<char*>(&message_length), kMessageHeaderSize);
86 if (read_result != kMessageHeaderSize) {
87 // 0 means EOF which is normal and should not be logged as an error.
88 if (read_result != 0) {
89 LOG(ERROR) << "Failed to read message header, read returned "
90 << read_result;
92 NotifyEof();
93 return;
96 if (message_length > kMaximumMessageSize) {
97 LOG(ERROR) << "Message size too large: " << message_length;
98 NotifyEof();
99 return;
102 std::string message_json(message_length, '\0');
103 read_result = read_stream_.ReadAtCurrentPos(string_as_array(&message_json),
104 message_length);
105 if (read_result != static_cast<int>(message_length)) {
106 LOG(ERROR) << "Failed to read message body, read returned "
107 << read_result;
108 NotifyEof();
109 return;
112 scoped_ptr<base::Value> message(base::JSONReader::Read(message_json));
113 if (!message) {
114 LOG(ERROR) << "Failed to parse JSON message: " << message;
115 NotifyEof();
116 return;
119 // Notify callback of new message.
120 caller_task_runner_->PostTask(
121 FROM_HERE, base::Bind(&NativeMessagingReader::InvokeMessageCallback,
122 reader_, base::Passed(&message)));
126 void NativeMessagingReader::Core::NotifyEof() {
127 DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
128 caller_task_runner_->PostTask(
129 FROM_HERE,
130 base::Bind(&NativeMessagingReader::InvokeEofCallback, reader_));
133 NativeMessagingReader::NativeMessagingReader(base::File file)
134 : reader_thread_("Reader"),
135 weak_factory_(this) {
136 reader_thread_.Start();
137 read_task_runner_ = reader_thread_.message_loop_proxy();
138 core_.reset(new Core(file.Pass(), base::ThreadTaskRunnerHandle::Get(),
139 read_task_runner_, weak_factory_.GetWeakPtr()));
142 NativeMessagingReader::~NativeMessagingReader() {
143 read_task_runner_->DeleteSoon(FROM_HERE, core_.release());
146 void NativeMessagingReader::Start(MessageCallback message_callback,
147 base::Closure eof_callback) {
148 message_callback_ = message_callback;
149 eof_callback_ = eof_callback;
151 // base::Unretained is safe since |core_| is only deleted via the
152 // DeleteSoon task which is posted from this class's dtor.
153 read_task_runner_->PostTask(
154 FROM_HERE, base::Bind(&NativeMessagingReader::Core::ReadMessage,
155 base::Unretained(core_.get())));
158 void NativeMessagingReader::InvokeMessageCallback(
159 scoped_ptr<base::Value> message) {
160 message_callback_.Run(message.Pass());
163 void NativeMessagingReader::InvokeEofCallback() {
164 eof_callback_.Run();
167 } // namespace remoting