1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/spy/spy.h"
10 #include "base/compiler_specific.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "base/strings/string_split.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/worker_pool.h"
19 #include "base/time/time.h"
20 #include "mojo/public/cpp/system/core.h"
21 #include "mojo/service_manager/service_manager.h"
22 #include "mojo/spy/common.h"
23 #include "mojo/spy/public/spy.mojom.h"
24 #include "mojo/spy/spy_server_impl.h"
25 #include "mojo/spy/websocket_server.h"
30 mojo::WebSocketServer
* ws_server
= NULL
;
32 const size_t kMessageBufSize
= 2 * 1024;
33 const size_t kHandleBufSize
= 64;
34 const int kDefaultWebSocketPort
= 42424;
36 void CloseHandles(MojoHandle
* handles
, size_t count
) {
37 for (size_t ix
= 0; ix
!= count
; ++count
)
38 MojoClose(handles
[ix
]);
41 // In charge of processing messages that flow over a
42 // single message pipe.
43 class MessageProcessor
:
44 public base::RefCountedThreadSafe
<MessageProcessor
> {
46 MessageProcessor(base::MessageLoopProxy
* control_loop_proxy
)
47 : last_result_(MOJO_RESULT_OK
),
49 control_loop_proxy_(control_loop_proxy
) {
50 message_count_
[0] = 0;
51 message_count_
[1] = 0;
56 void Start(mojo::ScopedMessagePipeHandle client
,
57 mojo::ScopedMessagePipeHandle interceptor
,
59 std::vector
<mojo::MessagePipeHandle
> pipes
;
60 pipes
.push_back(client
.get());
61 pipes
.push_back(interceptor
.get());
62 std::vector
<MojoHandleSignals
> handle_signals
;
63 handle_signals
.push_back(MOJO_HANDLE_SIGNAL_READABLE
);
64 handle_signals
.push_back(MOJO_HANDLE_SIGNAL_READABLE
);
66 scoped_ptr
<char[]> mbuf(new char[kMessageBufSize
]);
67 scoped_ptr
<MojoHandle
[]> hbuf(new MojoHandle
[kHandleBufSize
]);
69 // Main processing loop:
70 // 1- Wait for an endpoint to have a message.
71 // 2- Read the message
73 // 4- Wait until the opposite port is ready for writting
74 // 4- Write the message to opposite port.
77 int r
= WaitMany(pipes
, handle_signals
, MOJO_DEADLINE_INDEFINITE
);
78 if ((r
< 0) || (r
> 1)) {
83 uint32_t bytes_read
= kMessageBufSize
;
84 uint32_t handles_read
= kHandleBufSize
;
86 if (!CheckResult(ReadMessageRaw(pipes
[r
],
87 mbuf
.get(), &bytes_read
,
88 hbuf
.get(), &handles_read
,
89 MOJO_READ_MESSAGE_FLAG_NONE
)))
92 if (!bytes_read
&& !handles_read
)
96 handle_count_
[r
] += handles_read
;
98 // Intercept message pipes which are returned via the ReadMessageRaw
100 for (uint32_t i
= 0; i
< handles_read
; i
++) {
101 // Hack to determine if a handle is a message pipe.
103 // We should have an API which given a handle returns additional
104 // information about the handle which includes its type, etc.
105 if (MojoReadMessage(hbuf
[i
], NULL
, NULL
, NULL
, NULL
,
106 MOJO_READ_MESSAGE_FLAG_NONE
) !=
107 MOJO_RESULT_INVALID_ARGUMENT
) {
108 mojo::ScopedMessagePipeHandle message_pipe_handle
;
109 message_pipe_handle
.reset(mojo::MessagePipeHandle(hbuf
[i
]));
111 mojo::ScopedMessagePipeHandle faux_client
;
112 mojo::ScopedMessagePipeHandle interceptor
;
113 CreateMessagePipe(NULL
, &faux_client
, &interceptor
);
115 base::WorkerPool::PostTask(
117 base::Bind(&MessageProcessor::Start
,
119 base::Passed(&message_pipe_handle
),
120 base::Passed(&interceptor
),
123 hbuf
.get()[i
] = faux_client
.release().value();
128 bytes_transfered_
+= bytes_read
;
130 LogMessageInfo(mbuf
.get(), url
);
132 mojo::MessagePipeHandle write_handle
= (r
== 0) ? pipes
[1] : pipes
[0];
133 if (!CheckResult(Wait(write_handle
,
134 MOJO_HANDLE_SIGNAL_WRITABLE
,
135 MOJO_DEADLINE_INDEFINITE
)))
138 if (!CheckResult(WriteMessageRaw(write_handle
,
139 mbuf
.get(), bytes_read
,
140 hbuf
.get(), handles_read
,
141 MOJO_WRITE_MESSAGE_FLAG_NONE
))) {
142 // On failure we own the handles. For now just close them.
144 CloseHandles(hbuf
.get(), handles_read
);
151 friend class base::RefCountedThreadSafe
<MessageProcessor
>;
152 virtual ~MessageProcessor() {}
154 bool CheckResult(MojoResult mr
) {
155 if (mr
== MOJO_RESULT_OK
)
161 void LogInvalidMessage(const mojo::MojoMessageHeader
& header
) {
162 LOG(ERROR
) << "Invalid message: Number of Fields: "
164 << " Number of bytes: "
170 // Validates the message as per the mojo spec.
171 bool IsValidMessage(const mojo::MojoMessageHeader
& header
) {
172 if (header
.num_fields
== 2) {
173 if (header
.num_bytes
!= sizeof(mojo::MojoMessageHeader
)) {
174 LogInvalidMessage(header
);
177 } else if (header
.num_fields
== 3) {
178 if (header
.num_bytes
!= sizeof(mojo::MojoRequestHeader
)) {
179 LogInvalidMessage(header
);
181 } else if (header
.num_fields
> 3) {
182 if (header
.num_bytes
< sizeof(mojo::MojoRequestHeader
)) {
183 LogInvalidMessage(header
);
187 // These flags should be specified in request or response messages.
188 if (header
.num_fields
< 3 &&
189 ((header
.flags
& mojo::kMessageExpectsResponse
) ||
190 (header
.flags
& mojo::kMessageIsResponse
))) {
191 LOG(ERROR
) << "Invalid request message.";
192 LogInvalidMessage(header
);
195 // These flags are mutually exclusive.
196 if ((header
.flags
& mojo::kMessageExpectsResponse
) &&
197 (header
.flags
& mojo::kMessageIsResponse
)) {
198 LOG(ERROR
) << "Invalid flags combination in request message.";
199 LogInvalidMessage(header
);
205 void LogMessageInfo(void* data
, const GURL
& url
) {
206 mojo::MojoMessageData
* message_data
=
207 reinterpret_cast<mojo::MojoMessageData
*>(data
);
208 if (IsValidMessage(message_data
->header
)) {
209 control_loop_proxy_
->PostTask(
211 base::Bind(&mojo::WebSocketServer::LogMessageInfo
,
212 base::Unretained(ws_server
),
213 message_data
->header
, url
, base::Time::Now()));
217 MojoResult last_result_
;
218 uint32_t bytes_transfered_
;
219 uint32_t message_count_
[2];
220 uint32_t handle_count_
[2];
221 scoped_refptr
<base::MessageLoopProxy
> control_loop_proxy_
;
224 // In charge of intercepting access to the service manager.
225 class SpyInterceptor
: public mojo::ServiceManager::Interceptor
{
227 explicit SpyInterceptor(scoped_refptr
<mojo::SpyServerImpl
> spy_server
,
228 base::MessageLoopProxy
* control_loop_proxy
)
229 : spy_server_(spy_server
),
230 proxy_(base::MessageLoopProxy::current()),
231 control_loop_proxy_(control_loop_proxy
){
235 virtual mojo::ServiceProviderPtr
OnConnectToClient(
236 const GURL
& url
, mojo::ServiceProviderPtr real_client
) OVERRIDE
{
237 if (!MustIntercept(url
))
238 return real_client
.Pass();
240 // You can get an invalid handle if the app (or service) is
241 // created by unconventional means, for example the command line.
243 return real_client
.Pass();
245 mojo::ScopedMessagePipeHandle faux_client
;
246 mojo::ScopedMessagePipeHandle interceptor
;
247 CreateMessagePipe(NULL
, &faux_client
, &interceptor
);
249 scoped_refptr
<MessageProcessor
> processor
= new MessageProcessor(
250 control_loop_proxy_
);
251 mojo::ScopedMessagePipeHandle real_handle
= real_client
.PassMessagePipe();
252 base::WorkerPool::PostTask(
254 base::Bind(&MessageProcessor::Start
,
256 base::Passed(&real_handle
), base::Passed(&interceptor
),
260 mojo::ServiceProviderPtr faux_provider
;
261 faux_provider
.Bind(faux_client
.Pass());
262 return faux_provider
.Pass();
265 bool MustIntercept(const GURL
& url
) {
266 // TODO(cpu): manage who and when to intercept.
269 base::Bind(&mojo::SpyServerImpl::OnIntercept
, spy_server_
, url
));
273 scoped_refptr
<mojo::SpyServerImpl
> spy_server_
;
274 scoped_refptr
<base::MessageLoopProxy
> proxy_
;
275 scoped_refptr
<base::MessageLoopProxy
> control_loop_proxy_
;
278 void StartWebServer(int port
, mojo::ScopedMessagePipeHandle pipe
) {
279 // TODO(cpu) figure out lifetime of the server. See Spy() dtor.
280 ws_server
= new mojo::WebSocketServer(port
, pipe
.Pass());
288 : websocket_port(kDefaultWebSocketPort
) {
292 SpyOptions
ProcessOptions(const std::string
& options
) {
293 SpyOptions spy_options
;
296 base::StringPairs kv_pairs
;
297 base::SplitStringIntoKeyValuePairs(options
, ':', ',', &kv_pairs
);
298 base::StringPairs::iterator it
= kv_pairs
.begin();
299 for (; it
!= kv_pairs
.end(); ++it
) {
300 if (it
->first
== "port") {
302 if (base::StringToInt(it
->second
, &port
))
303 spy_options
.websocket_port
= port
;
313 Spy::Spy(mojo::ServiceManager
* service_manager
, const std::string
& options
) {
314 SpyOptions spy_options
= ProcessOptions(options
);
316 spy_server_
= new SpyServerImpl();
318 // Start the tread what will accept commands from the frontend.
319 control_thread_
.reset(new base::Thread("mojo_spy_control_thread"));
320 base::Thread::Options
thread_options(base::MessageLoop::TYPE_IO
, 0);
321 control_thread_
->StartWithOptions(thread_options
);
322 control_thread_
->message_loop_proxy()->PostTask(
323 FROM_HERE
, base::Bind(&StartWebServer
,
324 spy_options
.websocket_port
,
325 base::Passed(spy_server_
->ServerPipe())));
327 // Start intercepting mojo services.
328 service_manager
->SetInterceptor(new SpyInterceptor(
329 spy_server_
, control_thread_
->message_loop_proxy()));
333 // TODO(cpu): Do not leak the interceptor. Lifetime between the
334 // service_manager and the spy is still unclear hence the leak.