1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/host/native_messaging/native_messaging_reader.h"
10 #include "base/json/json_reader.h"
11 #include "base/location.h"
12 #include "base/sequenced_task_runner.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/stl_util.h"
15 #include "base/thread_task_runner_handle.h"
16 #include "base/threading/sequenced_worker_pool.h"
17 #include "base/values.h"
18 #include "net/base/file_stream.h"
22 // uint32 is specified in the protocol as the type for the message header.
23 typedef uint32 MessageLengthType
;
25 const int kMessageHeaderSize
= sizeof(MessageLengthType
);
27 // Limit the size of received messages, to avoid excessive memory-allocation in
28 // this process, and potential overflow issues when casting to a signed 32-bit
30 const MessageLengthType kMaximumMessageSize
= 1024 * 1024;
36 class NativeMessagingReader::Core
{
38 Core(base::PlatformFile handle
,
39 scoped_refptr
<base::SingleThreadTaskRunner
> caller_task_runner
,
40 scoped_refptr
<base::SequencedTaskRunner
> read_task_runner
,
41 base::WeakPtr
<NativeMessagingReader
> reader_
);
44 // Reads a message from the Native Messaging client and passes it to
45 // |message_callback_| on the originating thread. Called on the reader thread.
49 // Notify the reader's EOF callback when an error occurs or EOF is reached.
52 net::FileStream read_stream_
;
54 base::WeakPtr
<NativeMessagingReader
> reader_
;
56 // Used to post the caller-supplied reader callbacks on the caller thread.
57 scoped_refptr
<base::SingleThreadTaskRunner
> caller_task_runner_
;
59 // Used to DCHECK that the reader code executes on the correct thread.
60 scoped_refptr
<base::SequencedTaskRunner
> read_task_runner_
;
62 DISALLOW_COPY_AND_ASSIGN(Core
);
65 NativeMessagingReader::Core::Core(
66 base::PlatformFile handle
,
67 scoped_refptr
<base::SingleThreadTaskRunner
> caller_task_runner
,
68 scoped_refptr
<base::SequencedTaskRunner
> read_task_runner
,
69 base::WeakPtr
<NativeMessagingReader
> reader
)
70 : read_stream_(handle
, base::PLATFORM_FILE_READ
, NULL
),
72 caller_task_runner_(caller_task_runner
),
73 read_task_runner_(read_task_runner
) {
76 NativeMessagingReader::Core::~Core() {}
78 void NativeMessagingReader::Core::ReadMessage() {
79 DCHECK(read_task_runner_
->RunsTasksOnCurrentThread());
81 // Keep reading messages until the stream is closed or an error occurs.
83 MessageLengthType message_length
;
84 int read_result
= read_stream_
.ReadUntilComplete(
85 reinterpret_cast<char*>(&message_length
), kMessageHeaderSize
);
86 if (read_result
!= kMessageHeaderSize
) {
87 // 0 means EOF which is normal and should not be logged as an error.
88 if (read_result
!= 0) {
89 LOG(ERROR
) << "Failed to read message header, read returned "
96 if (message_length
> kMaximumMessageSize
) {
97 LOG(ERROR
) << "Message size too large: " << message_length
;
102 std::string
message_json(message_length
, '\0');
103 read_result
= read_stream_
.ReadUntilComplete(string_as_array(&message_json
),
105 if (read_result
!= static_cast<int>(message_length
)) {
106 LOG(ERROR
) << "Failed to read message body, read returned "
112 scoped_ptr
<base::Value
> message(base::JSONReader::Read(message_json
));
114 LOG(ERROR
) << "Failed to parse JSON message: " << message
;
119 // Notify callback of new message.
120 caller_task_runner_
->PostTask(
121 FROM_HERE
, base::Bind(&NativeMessagingReader::InvokeMessageCallback
,
122 reader_
, base::Passed(&message
)));
126 void NativeMessagingReader::Core::NotifyEof() {
127 DCHECK(read_task_runner_
->RunsTasksOnCurrentThread());
128 caller_task_runner_
->PostTask(
130 base::Bind(&NativeMessagingReader::InvokeEofCallback
, reader_
));
133 NativeMessagingReader::NativeMessagingReader(base::PlatformFile handle
)
134 : reader_thread_("Reader"),
135 weak_factory_(this) {
136 reader_thread_
.Start();
137 read_task_runner_
= reader_thread_
.message_loop_proxy();
138 core_
.reset(new Core(handle
, base::ThreadTaskRunnerHandle::Get(),
139 read_task_runner_
, weak_factory_
.GetWeakPtr()));
142 NativeMessagingReader::~NativeMessagingReader() {
143 read_task_runner_
->DeleteSoon(FROM_HERE
, core_
.release());
146 void NativeMessagingReader::Start(MessageCallback message_callback
,
147 base::Closure eof_callback
) {
148 message_callback_
= message_callback
;
149 eof_callback_
= eof_callback
;
151 // base::Unretained is safe since |core_| is only deleted via the
152 // DeleteSoon task which is posted from this class's dtor.
153 read_task_runner_
->PostTask(
154 FROM_HERE
, base::Bind(&NativeMessagingReader::Core::ReadMessage
,
155 base::Unretained(core_
.get())));
158 void NativeMessagingReader::InvokeMessageCallback(
159 scoped_ptr
<base::Value
> message
) {
160 message_callback_
.Run(message
.Pass());
163 void NativeMessagingReader::InvokeEofCallback() {
167 } // namespace remoting