Fixed some C/C++ compiler errors due to stricter checks.
[rubinius.git] / machine / console.cpp
blob2c6ed71ffe051c757e0cf7fc3e4740c5e66a8555
1 #include "thread_state.hpp"
2 #include "console.hpp"
4 #include "arguments.hpp"
5 #include "environment.hpp"
6 #include "dispatch.hpp"
7 #include "on_stack.hpp"
8 #include "object_utils.hpp"
9 #include "thread_phase.hpp"
11 #include "class/array.hpp"
12 #include "class/channel.hpp"
13 #include "class/class.hpp"
14 #include "class/fsevent.hpp"
15 #include "class/io.hpp"
16 #include "class/string.hpp"
17 #include "class/thread.hpp"
19 #include "dtrace/dtrace.h"
21 #include "util/file.hpp"
22 #include "logger.hpp"
24 #include <fcntl.h>
25 #include <sys/file.h>
26 #include <sys/stat.h>
27 #include <sys/types.h>
29 #include <stdio.h>
30 #include <libgen.h>
32 // read
33 #include <unistd.h>
34 #include <sys/uio.h>
35 #include <sys/types.h>
37 #include <sstream>
39 namespace rubinius {
40 using namespace utilities;
42 namespace console {
43 static int open_file(STATE, std::string path) {
44 int perms = state->configuration()->console_access;
45 int fd = IO::open_with_cloexec(state, path.c_str(),
46 O_CREAT | O_TRUNC | O_RDWR | O_SYNC, perms);
48 if(fd < 0) {
49 logger::error("console: unable to open: %s, %s", path.c_str(), strerror(errno));
52 // The umask setting will override our permissions for open().
53 if(chmod(path.c_str(), perms) < 0) {
54 logger::error("console: unable to set mode: %s, %s", path.c_str(), strerror(errno));
57 return fd;
60 Request::Request(STATE, Console* console, Response* response)
61 : MachineThread(state, "rbx.console.request", MachineThread::eSmall)
62 , console_(console)
63 , response_(response)
64 , enabled_(false)
65 , fd_(-1)
66 , fsevent_(nullptr)
70 void Request::initialize(STATE) {
71 if((fd_ = open_file(state, console_->request_path())) < 0) {
72 logger::error("console request: unable to open file, %s", strerror(errno));
73 return;
76 FSEvent* fsevent_ = FSEvent::create(state);
77 fsevent_->watch_file(state, fd_, console_->request_path().c_str());
79 enabled_ = true;
82 void Request::start_thread(STATE) {
83 if(!enabled_) return;
85 MachineThread::start_thread(state);
88 void Request::wakeup(STATE) {
89 MachineThread::wakeup(state);
91 if(write(fd_, "\0", 1) < 0) {
92 logger::error("console: unable to wake request thread, %s", strerror(errno));
96 void Request::close_request() {
97 if(fd_ > 0) {
98 close(fd_);
99 fd_ = -1;
103 void Request::stop_thread(STATE) {
104 MachineThread::stop_thread(state);
106 close_request();
109 void Request::after_fork_child(STATE) {
110 close_request();
113 char* Request::read_request(STATE) {
114 file::LockGuard guard(fd_, LOCK_EX);
116 if(guard.status() != file::eLockSucceeded) {
117 logger::error("console: unable to lock request file, %s", strerror(errno));
118 return NULL;
121 char* buf[1024];
123 lseek(fd_, 0, SEEK_SET);
124 ssize_t bytes = ::read(fd_, buf, 1024);
126 char* req = NULL;
127 if(bytes > 0) {
128 req = new char[bytes+1];
129 memcpy(req, buf, bytes);
130 req[bytes] = 0;
131 // TODO diagnostics, metrics
132 // vm()->metrics().console.requests_received++;
133 } else if(bytes < 0) {
134 logger::error("console: unable to read request, %s", strerror(errno));
137 if(lseek(fd_, 0, SEEK_SET) < 0) {
138 logger::error("console: unable to rewind request file, %s", strerror(errno));
140 if(ftruncate(fd_, 0) < 0) {
141 logger::error("console: unable to truncate request file, %s", strerror(errno));
144 return req;
147 void Request::run(STATE) {
148 if(!enabled_) return;
150 while(!thread_exit_) {
151 Object* status = fsevent_->wait_for_event(state);
153 if(thread_exit_) break;
155 if(status->nil_p()) {
156 logger::error("console: request: wait for event failed, %s", strerror(errno));
157 continue;
160 char* request = read_request(state);
162 if(request) {
163 response_->send_request(state, request);
168 void Request::trace_objects(STATE, std::function<void (STATE, Object**)> f) {
169 f(state, reinterpret_cast<Object**>(&fsevent_));
172 Response::Response(STATE, Console* console)
173 : MachineThread(state, "rbx.console.response", MachineThread::eSmall)
174 , console_(console)
175 , inbox_(nullptr)
176 , outbox_(nullptr)
177 , fd_(-1)
178 , request_list_(nullptr)
179 , list_lock_()
180 , response_lock_()
181 , response_cond_()
183 inbox_ = as<Channel>(
184 console_->ruby_console()->get_ivar(state, state->symbol("@inbox")));
185 outbox_ = as<Channel>(
186 console_->ruby_console()->get_ivar(state, state->symbol("@outbox")));
189 Response::~Response() {
190 delete request_list_;
191 request_list_ = NULL;
194 void Response::initialize(STATE) {
195 Thread::create(state, thread_state());
198 void Response::start_thread(STATE) {
199 if((fd_ = open_file(state, console_->response_path())) < 0) {
200 logger::error("console response: unable to open file, %s", strerror(errno));
201 return;
204 new(&list_lock_) locks::spinlock_mutex;
205 new(&response_lock_) std::mutex;
206 new(&response_cond_) std::condition_variable;
208 request_list_ = new RequestList;
210 MachineThread::start_thread(state);
213 void Response::wakeup(STATE) {
214 MachineThread::wakeup(state);
216 inbox_->send(state, String::create(state, ""));
218 response_cond_.notify_one();
221 void Response::close_response() {
222 if(fd_ > 0) {
223 close(fd_);
224 fd_ = -1;
228 void Response::clear_requests() {
229 if(request_list_) {
230 for(RequestList::const_iterator i = request_list_->begin();
231 i != request_list_->end();
232 ++i)
234 delete[] *i;
237 request_list_ = NULL;
241 void Response::stop_thread(STATE) {
242 MachineThread::stop_thread(state);
244 close_response();
247 void Response::after_fork_child(STATE) {
248 close_response();
251 void Response::send_request(STATE, const char* request) {
252 std::lock_guard<locks::spinlock_mutex> guard(list_lock_);
254 request_list_->push_back(const_cast<char*>(request));
255 response_cond_.notify_one();
258 void Response::write_response(STATE, const char* response, intptr_t size) {
259 file::LockGuard guard(fd_, LOCK_EX);
261 if(guard.status() != file::eLockSucceeded) {
262 logger::error("console: unable to lock response file, %s", strerror(errno));
263 return;
266 if(lseek(fd_, 0, SEEK_SET) < 0) {
267 logger::error("console: unable to rewind response file, %s", strerror(errno));
268 return;
270 if(ftruncate(fd_, 0) < 0) {
271 logger::error("console: unable to truncate response file, %s", strerror(errno));
272 return;
275 if(::write(fd_, response, size) < 0) {
276 logger::error("console: unable to write response, %s", strerror(errno));
279 // TODO diagnostics, metrics
280 // vm()->metrics().console.responses_sent++;
283 void Response::run(STATE) {
284 size_t pending_requests = 0;
285 char* request = NULL;
287 String* response = 0;
288 OnStack<3> os(state, inbox_, outbox_, response);
290 while(!thread_exit_) {
292 std::lock_guard<locks::spinlock_mutex> guard(list_lock_);
294 if(request_list_->size() > 0) {
295 request = request_list_->back();
296 request_list_->pop_back();
300 if(thread_exit_) break;
302 if(request) {
303 ManagedPhase managed(state);
305 pending_requests++;
307 inbox_->send(state, String::create(state, request));
309 request = NULL;
312 if(pending_requests > 0) {
313 if((response = try_as<String>(outbox_->try_receive(state)))) {
314 write_response(state,
315 reinterpret_cast<const char*>(response->byte_address()),
316 response->byte_size());
317 pending_requests--;
318 continue;
323 UnmanagedPhase unmanaged(state);
324 std::unique_lock<std::mutex> lock(response_lock_);
326 if(thread_exit_) break;
328 response_cond_.wait(lock);
333 void Response::trace_objects(STATE, std::function<void (STATE, Object**)> f) {
334 f(state, reinterpret_cast<Object**>(&inbox_));
335 f(state, reinterpret_cast<Object**>(&outbox_));
338 Listener::Listener(STATE, Console* console)
339 : MachineThread(state, "rbx.console.listener", MachineThread::eSmall)
340 , console_(console)
341 , fsevent_(nullptr)
342 , fd_(-1)
346 Listener::~Listener() {
347 close(fd_);
350 void Listener::initialize(STATE) {
351 fd_ = ::open(console_->console_path().c_str(),
352 O_CREAT | O_TRUNC | O_RDWR | O_CLOEXEC,
353 state->configuration()->console_access.value);
355 if(fd_ < 0) {
356 logger::error("console: unable to open Console connection file, %s", strerror(errno));
359 // The umask setting will override our permissions for open().
360 if(chmod(console_->console_path().c_str(),
361 state->configuration()->console_access.value) < 0)
363 logger::error("console: unable to set mode for Console connection file, %s",
364 strerror(errno));
367 FSEvent* fsevent_ = FSEvent::create(state);
368 fsevent_->watch_file(state, fd_, console_->console_path().c_str());
371 void Listener::start_thread(STATE) {
372 MachineThread::start_thread(state);
375 void Listener::wakeup(STATE) {
376 MachineThread::wakeup(state);
378 if(write(fd_, "\0", 1) < 0) {
379 logger::error("console: unable to wake listener thread, %s", strerror(errno));
383 bool Listener::connection_initiated() {
384 struct stat st;
386 bool req = stat(console_->request_path().c_str(), &st) == 0 && S_ISREG(st.st_mode);
387 bool res = stat(console_->response_path().c_str(), &st) == 0 && S_ISREG(st.st_mode);
389 return req && res;
392 void Listener::run(STATE) {
393 while(!thread_exit_) {
394 Object* status = fsevent_->wait_for_event(state);
396 if(thread_exit_) break;
398 if(status->nil_p()) {
399 logger::error("console: listener: wait for event failed, %s", strerror(errno));
400 continue;
403 if(console_->connected_p()) continue;
405 if(connection_initiated()) {
406 console_->accept(state);
411 void Listener::trace_objects(STATE, std::function<void (STATE, Object**)> f) {
412 f(state, reinterpret_cast<Object**>(&fsevent_));
416 using namespace console;
418 Console::Console(STATE)
419 : listener_(0)
420 , response_(0)
421 , request_(0)
422 , ruby_console_(nullptr)
424 console_path_ = state->configuration()->console_path.value;
426 std::ostringstream basename;
427 basename << state->configuration()->console_path.value << "-"
428 << state->environment()->pid();
430 request_path_ = basename.str() + "-request";
431 response_path_ = basename.str() + "-response";
433 listener_ = new Listener(state, this);
436 Console::~Console() {
437 if(listener_) delete listener_;
438 reset();
441 bool Console::connected_p() {
442 return request_ && request_->enabled_p();
445 void Console::start(STATE) {
446 listener_->start(state);
449 void Console::accept(STATE) {
450 ruby_console_ = server_class(state)->send(state, 0, state->symbol("new"));
452 response_ = new Response(state, this);
453 request_ = new Request(state, this, response_);
456 void Console::reset() {
457 if(request_) {
458 delete request_;
459 request_ = 0;
462 if(response_) {
463 delete response_;
464 response_ = 0;
467 ruby_console_ = cNil;
470 void Console::after_fork_child(STATE) {
471 reset();
474 Class* Console::server_class(STATE) {
475 Module* mod = as<Module>(G(rubinius)->get_const(state, "Console"));
476 return as<Class>(mod->get_const(state, "Server"));
479 void Console::trace_objects(STATE, std::function<void (STATE, Object**)> f) {
480 f(state, reinterpret_cast<Object**>(&ruby_console_));
482 if(listener_) {
483 listener_->trace_objects(state, f);
486 if(request_) {
487 request_->trace_objects(state, f);
490 if(response_) {
491 response_->trace_objects(state, f);