1 #include "thread_state.hpp"
4 #include "arguments.hpp"
5 #include "environment.hpp"
6 #include "dispatch.hpp"
7 #include "on_stack.hpp"
8 #include "object_utils.hpp"
9 #include "thread_phase.hpp"
11 #include "class/array.hpp"
12 #include "class/channel.hpp"
13 #include "class/class.hpp"
14 #include "class/fsevent.hpp"
15 #include "class/io.hpp"
16 #include "class/string.hpp"
17 #include "class/thread.hpp"
19 #include "dtrace/dtrace.h"
21 #include "util/file.hpp"
27 #include <sys/types.h>
35 #include <sys/types.h>
40 using namespace utilities
;
// Opens the console file at `path` (create + truncate, read/write, O_SYNC)
// through IO::open_with_cloexec and then applies the configured
// console_access permissions to it with chmod().
// Problems with either step are reported through logger::error.
43 static int open_file(STATE
, std::string path
) {
// The same permission value is used for both the open() mode and the chmod().
44 int perms
= state
->configuration()->console_access
;
45 int fd
= IO::open_with_cloexec(state
, path
.c_str(),
46 O_CREAT
| O_TRUNC
| O_RDWR
| O_SYNC
, perms
);
49 logger::error("console: unable to open: %s, %s", path
.c_str(), strerror(errno
));
52 // The umask setting will override our permissions for open().
53 if(chmod(path
.c_str(), perms
) < 0) {
54 logger::error("console: unable to set mode: %s, %s", path
.c_str(), strerror(errno
));
// Builds the request-side console thread on top of MachineThread, named
// "rbx.console.request" and created with MachineThread::eSmall.
// Takes the owning Console and the Response that requests are handed to.
60 Request::Request(STATE
, Console
* console
, Response
* response
)
61 : MachineThread(state
, "rbx.console.request", MachineThread::eSmall
)
// Opens the request file (console_->request_path()) with open_file(), logging
// on failure, and registers the resulting descriptor with a newly created
// FSEvent watcher.
//
// NOTE(review): `FSEvent* fsevent_ = ...` below is a local declaration.
// Request::run() and Request::trace_objects() both use an `fsevent_` that
// cannot be this local, so the watcher created here may never be stored where
// they look for it. Confirm whether a plain assignment was intended.
70 void Request::initialize(STATE
) {
71 if((fd_
= open_file(state
, console_
->request_path())) < 0) {
72 logger::error("console request: unable to open file, %s", strerror(errno
));
76 FSEvent
* fsevent_
= FSEvent::create(state
);
77 fsevent_
->watch_file(state
, fd_
, console_
->request_path().c_str());
// Starts the request thread by calling MachineThread::start_thread(state).
82 void Request::start_thread(STATE
) {
85 MachineThread::start_thread(state
);
// Wakes the request thread: calls MachineThread::wakeup(state), then writes a
// single NUL byte to the request file descriptor. A failed write is logged.
88 void Request::wakeup(STATE
) {
89 MachineThread::wakeup(state
);
// Presumably the write generates a file event so that the FSEvent wait in
// run() returns -- TODO confirm.
91 if(write(fd_
, "\0", 1) < 0) {
92 logger::error("console: unable to wake request thread, %s", strerror(errno
));
96 void Request::close_request() {
// Stops the request thread by calling MachineThread::stop_thread(state).
103 void Request::stop_thread(STATE
) {
104 MachineThread::stop_thread(state
);
109 void Request::after_fork_child(STATE
) {
// Reads the pending request out of the request file.
//
// The file is locked exclusively (file::LockGuard with LOCK_EX) for the
// duration. At most 1024 bytes are read from the start of the file and copied
// into a freshly allocated char[bytes+1]; afterwards the file is rewound and
// truncated to zero length. Lock, read, rewind and truncate failures are each
// logged.
//
// NOTE(review): the buffer comes from new[]; presumably it is what this
// function returns and the caller becomes responsible for it -- confirm.
113 char* Request::read_request(STATE
) {
114 file::LockGuard
guard(fd_
, LOCK_EX
);
116 if(guard
.status() != file::eLockSucceeded
) {
117 logger::error("console: unable to lock request file, %s", strerror(errno
));
// Always read from the beginning of the file; the result of this seek is not
// checked.
123 lseek(fd_
, 0, SEEK_SET
);
124 ssize_t bytes
= ::read(fd_
, buf
, 1024);
// One extra byte is allocated beyond the bytes that were read.
128 req
= new char[bytes
+1];
129 memcpy(req
, buf
, bytes
);
131 // TODO diagnostics, metrics
132 // vm()->metrics().console.requests_received++;
133 } else if(bytes
< 0) {
134 logger::error("console: unable to read request, %s", strerror(errno
));
// Empty the file: rewind, then truncate to length 0.
137 if(lseek(fd_
, 0, SEEK_SET
) < 0) {
138 logger::error("console: unable to rewind request file, %s", strerror(errno
));
140 if(ftruncate(fd_
, 0) < 0) {
141 logger::error("console: unable to truncate request file, %s", strerror(errno
));
// Thread body for the request side.
//
// Returns immediately when the request is not enabled. Otherwise loops until
// thread_exit_ is set: waits on fsevent_ for a file event, reads the request
// with read_request(), and passes it to response_->send_request().
// A nil status from the wait is logged as an error.
147 void Request::run(STATE
) {
148 if(!enabled_
) return;
150 while(!thread_exit_
) {
151 Object
* status
= fsevent_
->wait_for_event(state
);
// thread_exit_ is checked again after the wait, before the event is handled.
153 if(thread_exit_
) break;
155 if(status
->nil_p()) {
156 logger::error("console: request: wait for event failed, %s", strerror(errno
));
160 char* request
= read_request(state
);
163 response_
->send_request(state
, request
);
// Invokes the callback `f` with the address of fsevent_ (cast to Object**).
// Presumably this lets the caller visit or update the managed reference held
// by this thread -- TODO confirm against the callers.
168 void Request::trace_objects(STATE
, std::function
<void (STATE
, Object
**)> f
) {
169 f(state
, reinterpret_cast<Object
**>(&fsevent_
));
// Builds the response-side console thread on top of MachineThread, named
// "rbx.console.response" and created with MachineThread::eSmall.
// request_list_ starts out as nullptr.
172 Response::Response(STATE
, Console
* console
)
173 : MachineThread(state
, "rbx.console.response", MachineThread::eSmall
)
178 , request_list_(nullptr)
// inbox_ and outbox_ are read from the @inbox and @outbox instance variables
// of the console's Ruby object and cast to Channel.
183 inbox_
= as
<Channel
>(
184 console_
->ruby_console()->get_ivar(state
, state
->symbol("@inbox")));
185 outbox_
= as
<Channel
>(
186 console_
->ruby_console()->get_ivar(state
, state
->symbol("@outbox")));
// Deletes the request list container and clears the pointer.
// NOTE(review): only the container itself is deleted here; whether queued
// request buffers are released first (see clear_requests()) should be
// confirmed.
189 Response::~Response() {
190 delete request_list_
;
191 request_list_
= NULL
;
// Calls Thread::create() with this thread's thread_state().
194 void Response::initialize(STATE
) {
195 Thread::create(state
, thread_state());
// Prepares and starts the response thread.
//
// Opens the response file (console_->response_path()) with open_file(),
// logging on failure; constructs list_lock_, response_lock_ and
// response_cond_ in place; allocates a new RequestList; then calls
// MachineThread::start_thread(state).
198 void Response::start_thread(STATE
) {
199 if((fd_
= open_file(state
, console_
->response_path())) < 0) {
200 logger::error("console response: unable to open file, %s", strerror(errno
));
// Placement new re-constructs the synchronization members over their existing
// storage. NOTE(review): presumably this resets them to a known state before
// the thread starts -- confirm.
204 new(&list_lock_
) locks::spinlock_mutex
;
205 new(&response_lock_
) std::mutex
;
206 new(&response_cond_
) std::condition_variable
;
208 request_list_
= new RequestList
;
210 MachineThread::start_thread(state
);
// Wakes the response thread: calls MachineThread::wakeup(state), sends an
// empty String to inbox_, and notifies one waiter on response_cond_.
213 void Response::wakeup(STATE
) {
214 MachineThread::wakeup(state
);
216 inbox_
->send(state
, String::create(state
, ""));
218 response_cond_
.notify_one();
221 void Response::close_response() {
// Walks every entry of request_list_ from begin() to end(), then resets
// request_list_ to NULL.
228 void Response::clear_requests() {
230 for(RequestList::const_iterator i
= request_list_
->begin();
231 i
!= request_list_
->end();
237 request_list_
= NULL
;
// Stops the response thread by calling MachineThread::stop_thread(state).
241 void Response::stop_thread(STATE
) {
242 MachineThread::stop_thread(state
);
247 void Response::after_fork_child(STATE
) {
// Queues a request for the response thread.
//
// With list_lock_ held, appends the `request` pointer to request_list_ (the
// const qualifier is cast away, the string is not copied) and notifies one
// waiter on response_cond_.
251 void Response::send_request(STATE
, const char* request
) {
252 std::lock_guard
<locks::spinlock_mutex
> guard(list_lock_
);
254 request_list_
->push_back(const_cast<char*>(request
));
255 response_cond_
.notify_one();
// Replaces the contents of the response file with `size` bytes starting at
// `response`.
//
// The file is locked exclusively (file::LockGuard with LOCK_EX), rewound,
// truncated to zero length, and then written. A failure at any of these steps
// is logged.
258 void Response::write_response(STATE
, const char* response
, intptr_t size
) {
259 file::LockGuard
guard(fd_
, LOCK_EX
);
261 if(guard
.status() != file::eLockSucceeded
) {
262 logger::error("console: unable to lock response file, %s", strerror(errno
));
// Discard any previous response before writing the new one.
266 if(lseek(fd_
, 0, SEEK_SET
) < 0) {
267 logger::error("console: unable to rewind response file, %s", strerror(errno
));
270 if(ftruncate(fd_
, 0) < 0) {
271 logger::error("console: unable to truncate response file, %s", strerror(errno
));
275 if(::write(fd_
, response
, size
) < 0) {
276 logger::error("console: unable to write response, %s", strerror(errno
));
279 // TODO diagnostics, metrics
280 // vm()->metrics().console.responses_sent++;
// Thread body for the response side.
//
// Until thread_exit_ is set, the loop:
//   - takes a queued request from the back of request_list_ under list_lock_;
//   - inside a ManagedPhase, sends the request text to inbox_ as a String;
//   - while pending_requests > 0, polls outbox_ with try_receive() and, when a
//     String comes back, writes its bytes out with write_response();
//   - inside an UnmanagedPhase, waits on response_cond_ using response_lock_.
//
// NOTE(review): send_request() appends with push_back() and this loop removes
// with back()/pop_back(), so queued requests are taken newest-first. Confirm
// that this ordering is intended.
// NOTE(review): response_cond_ is notified by send_request() and wakeup()
// without response_lock_ held, while the wait below uses response_lock_. A
// notification arriving between the checks and the wait could be missed --
// confirm.
283 void Response::run(STATE
) {
284 size_t pending_requests
= 0;
285 char* request
= NULL
;
287 String
* response
= 0;
// inbox_, outbox_ and response are registered with OnStack for the lifetime
// of this function.
288 OnStack
<3> os(state
, inbox_
, outbox_
, response
);
290 while(!thread_exit_
) {
292 std::lock_guard
<locks::spinlock_mutex
> guard(list_lock_
);
294 if(request_list_
->size() > 0) {
295 request
= request_list_
->back();
296 request_list_
->pop_back();
300 if(thread_exit_
) break;
303 ManagedPhase
managed(state
);
307 inbox_
->send(state
, String::create(state
, request
));
312 if(pending_requests
> 0) {
313 if((response
= try_as
<String
>(outbox_
->try_receive(state
)))) {
314 write_response(state
,
315 reinterpret_cast<const char*>(response
->byte_address()),
316 response
->byte_size());
323 UnmanagedPhase
unmanaged(state
);
324 std::unique_lock
<std::mutex
> lock(response_lock_
);
// thread_exit_ is re-checked once the lock is held, before blocking.
326 if(thread_exit_
) break;
328 response_cond_
.wait(lock
);
// Invokes the callback `f` with the addresses of inbox_ and outbox_, in that
// order, each cast to Object**.
333 void Response::trace_objects(STATE
, std::function
<void (STATE
, Object
**)> f
) {
334 f(state
, reinterpret_cast<Object
**>(&inbox_
));
335 f(state
, reinterpret_cast<Object
**>(&outbox_
));
// Builds the console listener thread on top of MachineThread, named
// "rbx.console.listener" and created with MachineThread::eSmall.
338 Listener::Listener(STATE
, Console
* console
)
339 : MachineThread(state
, "rbx.console.listener", MachineThread::eSmall
)
346 Listener::~Listener() {
// Creates the console connection file and starts watching it.
//
// Opens console_->console_path() with O_CREAT | O_TRUNC | O_RDWR | O_CLOEXEC
// using the configured console_access mode, applies the same mode again with
// chmod(), and registers the descriptor with a newly created FSEvent watcher.
// Open and chmod failures are logged.
//
// NOTE(review): `FSEvent* fsevent_ = ...` below is a local declaration.
// Listener::run() and Listener::trace_objects() both use an `fsevent_` that
// cannot be this local, so the watcher created here may never be stored where
// they look for it. Confirm whether a plain assignment was intended.
350 void Listener::initialize(STATE
) {
351 fd_
= ::open(console_
->console_path().c_str(),
352 O_CREAT
| O_TRUNC
| O_RDWR
| O_CLOEXEC
,
353 state
->configuration()->console_access
.value
);
356 logger::error("console: unable to open Console connection file, %s", strerror(errno
));
359 // The umask setting will override our permissions for open().
360 if(chmod(console_
->console_path().c_str(),
361 state
->configuration()->console_access
.value
) < 0)
363 logger::error("console: unable to set mode for Console connection file, %s",
367 FSEvent
* fsevent_
= FSEvent::create(state
);
368 fsevent_
->watch_file(state
, fd_
, console_
->console_path().c_str());
// Starts the listener thread by calling MachineThread::start_thread(state).
371 void Listener::start_thread(STATE
) {
372 MachineThread::start_thread(state
);
// Wakes the listener thread: calls MachineThread::wakeup(state), then writes a
// single NUL byte to the console connection file descriptor. A failed write is
// logged.
375 void Listener::wakeup(STATE
) {
376 MachineThread::wakeup(state
);
// Presumably the write generates a file event so that the FSEvent wait in
// run() returns -- TODO confirm.
378 if(write(fd_
, "\0", 1) < 0) {
379 logger::error("console: unable to wake listener thread, %s", strerror(errno
));
// Probes for a client connection attempt by checking, with stat(), whether
// the request path and the response path each exist as a regular file.
// `req` and `res` hold the two results.
383 bool Listener::connection_initiated() {
386 bool req
= stat(console_
->request_path().c_str(), &st
) == 0 && S_ISREG(st
.st_mode
);
387 bool res
= stat(console_
->response_path().c_str(), &st
) == 0 && S_ISREG(st
.st_mode
);
// Thread body for the listener.
//
// Until thread_exit_ is set, waits on fsevent_ for a file event. A nil status
// from the wait is logged as an error. Events are ignored while the console is
// already connected; otherwise, when connection_initiated() reports true, the
// connection is accepted with console_->accept(state).
392 void Listener::run(STATE
) {
393 while(!thread_exit_
) {
394 Object
* status
= fsevent_
->wait_for_event(state
);
// thread_exit_ is checked again after the wait, before the event is handled.
396 if(thread_exit_
) break;
398 if(status
->nil_p()) {
399 logger::error("console: listener: wait for event failed, %s", strerror(errno
));
403 if(console_
->connected_p()) continue;
405 if(connection_initiated()) {
406 console_
->accept(state
);
// Invokes the callback `f` with the address of fsevent_ (cast to Object**).
411 void Listener::trace_objects(STATE
, std::function
<void (STATE
, Object
**)> f
) {
412 f(state
, reinterpret_cast<Object
**>(&fsevent_
));
416 using namespace console
;
// Sets up the console paths and creates the Listener.
//
// console_path_ is taken from the configuration. The request and response
// paths are built as "<console_path>-<pid>-request" and
// "<console_path>-<pid>-response", where <pid> comes from
// state->environment()->pid(). ruby_console_ starts out as nullptr.
418 Console::Console(STATE
)
422 , ruby_console_(nullptr)
424 console_path_
= state
->configuration()->console_path
.value
;
// Shared "<console_path>-<pid>" prefix for the per-process files.
426 std::ostringstream basename
;
427 basename
<< state
->configuration()->console_path
.value
<< "-"
428 << state
->environment()->pid();
430 request_path_
= basename
.str() + "-request";
431 response_path_
= basename
.str() + "-response";
433 listener_
= new Listener(state
, this);
// Deletes the Listener if one was created.
436 Console::~Console() {
437 if(listener_
) delete listener_
;
// Returns true when a Request exists and reports itself as enabled.
441 bool Console::connected_p() {
442 return request_
&& request_
->enabled_p();
// Starts the console by starting its Listener.
445 void Console::start(STATE
) {
446 listener_
->start(state
);
// Accepts a console connection: instantiates the Ruby-side console object by
// sending `new` to server_class(state), then creates the Response and the
// Request (the Request is given the Response it forwards to).
449 void Console::accept(STATE
) {
450 ruby_console_
= server_class(state
)->send(state
, 0, state
->symbol("new"));
// The Response is created first because the Request constructor takes it.
452 response_
= new Response(state
, this);
453 request_
= new Request(state
, this, response_
);
// Resets the console; ruby_console_ is set to cNil.
456 void Console::reset() {
467 ruby_console_
= cNil
;
470 void Console::after_fork_child(STATE
) {
// Looks up the Ruby class used for the console server: the "Console" constant
// under G(rubinius) (as a Module), then its "Server" constant (as a Class).
474 Class
* Console::server_class(STATE
) {
475 Module
* mod
= as
<Module
>(G(rubinius
)->get_const(state
, "Console"));
476 return as
<Class
>(mod
->get_const(state
, "Server"));
// Invokes the callback `f` with the address of ruby_console_ (cast to
// Object**), then forwards the same callback to the trace_objects() of
// listener_, request_ and response_.
479 void Console::trace_objects(STATE
, std::function
<void (STATE
, Object
**)> f
) {
480 f(state
, reinterpret_cast<Object
**>(&ruby_console_
));
483 listener_
->trace_objects(state
, f
);
487 request_
->trace_objects(state
, f
);
491 response_
->trace_objects(state
, f
);