Add: set 'None' as the default poller_tag value, so a poller can get untagged AND...
[shinken.git] / shinken / daemon.py
blob cf1c5a08c7350632b8a60c32ed4565d82b93a3f7
1 # -*- coding: utf-8 -*-
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 import os
24 import errno
25 import sys
26 import time
27 import signal
28 import select
29 import random
30 import ConfigParser
32 import shinken.pyro_wrapper as pyro
33 from shinken.pyro_wrapper import InvalidWorkDir, Pyro
35 from shinken.log import logger
36 from shinken.modulesmanager import ModulesManager
37 from shinken.property import StringProp, BoolProp, PathProp
40 if os.name != 'nt':
41 from pwd import getpwnam
42 from grp import getgrnam
########################## DAEMON PART ###############################
# The standard I/O file descriptors are redirected to the null device by
# default: os.devnull when the os module provides it, else "/dev/null".
REDIRECT_TO = os.devnull if hasattr(os, "devnull") else "/dev/null"

# umask installed by Daemon.__init__ (0: no permission bit is masked).
UMASK = 0
VERSION = "0.5"
class InvalidPidDir(Exception):
    """Raised when the daemon's pidfile cannot be opened or created."""
class Interface(Pyro.core.ObjBase, object):
    """ Interface for pyro communications """

    def __init__(self, app):
        """ 'app' is to be set to the owner of this interface. """

        Pyro.core.ObjBase.__init__(self)

        self.app = app
        # Identifies this run of the daemon as "<start time>.<random int>".
        # The random part must be an integer: random.random() lies in [0, 1),
        # so formatting it with "%d" always produced 0 and ids generated in
        # the same second were identical.
        self.running_id = "%d.%d" % (time.time(), random.randint(0, 100000000))

    def ping(self):
        """Liveness check: always answers "pong"."""
        return "pong"

    def get_running_id(self):
        """Return the id generated for this run in __init__."""
        return self.running_id

    def put_conf(self, conf):
        """Hand a new configuration to the owner (stored in app.new_conf)."""
        self.app.new_conf = conf

    def wait_new_conf(self):
        """Drop the owner's current configuration (app.cur_conf = None)."""
        self.app.cur_conf = None

    def have_conf(self):
        """Return True if the owner currently holds a configuration."""
        return self.app.cur_conf is not None
class Daemon(object):
    """Base class of the Shinken daemons.

    Handles config-file parsing, pidfile management, privilege dropping,
    daemonization (fork/setsid/fork), the Pyro server setup and the
    request-handling helpers.  Subclasses must implement do_loop_turn().
    """

    # Properties read from the [daemon] section of the config file; the ones
    # missing there get their declared default (see parse_config_file).
    properties = {
        'workdir': PathProp(default='/usr/local/shinken/var'),
        'host': StringProp(default='0.0.0.0'),
        'user': StringProp(default='shinken'),
        'group': StringProp(default='shinken'),
        'use_ssl': BoolProp(default='0'),
        'certs_dir': StringProp(default='etc/certs'),
        'ca_cert': StringProp(default='etc/certs/ca.pem'),
        'server_cert': StringProp(default='etc/certs/server.pem'),
        'use_local_log': BoolProp(default='0'),
        'hard_ssl_name_check': BoolProp(default='0'),
        'idontcareaboutsecurity': BoolProp(default='0'),
        'spare': BoolProp(default='0'),
    }

    def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):
        """Set up the daemon state.

        :param name: daemon name, also given to the ModulesManager.
        :param config_file: config file path, or None to use defaults.
        :param is_daemon: if true, do_daemon_init_and_start() daemonizes.
        :param do_replace: if true, an already running instance is killed.
        :param debug: if true, daemonize() sends stdout/stderr to debug_file.
        :param debug_file: file receiving stdout/stderr in debug mode.
        """
        self.check_shm()

        self.name = name
        self.config_file = config_file
        self.is_daemon = is_daemon
        self.do_replace = do_replace
        self.debug = debug
        self.debug_file = debug_file
        self.interrupted = False

        now = time.time()
        self.program_start = now
        self.t_each_loop = now  # used to track system time change

        self.pyro_daemon = None

        # Log init
        self.log = logger
        self.log.load_obj(self)

        self.new_conf = None  # used by controller to push conf
        self.cur_conf = None

        self.modules_manager = ModulesManager(name, self.find_modules_path(), [])

        os.umask(UMASK)
        self.set_exit_handler()

    def do_stop(self):
        """Stop all modules, shut the Pyro daemon down, then call logger.quit()
        (presumably closing the local log file)."""
        if self.modules_manager:
            logger.log('Stopping all modules')
            self.modules_manager.stop_all()
        if self.pyro_daemon:
            self.pyro_daemon.shutdown(True)
        logger.quit()

    def request_stop(self):
        """Remove the pidfile, stop everything and exit with status 0."""
        self.unlink()  # unlink first
        self.do_stop()
        print("Exiting")
        sys.exit(0)

    def do_loop_turn(self):
        """One iteration of the main loop; must be provided by subclasses."""
        raise NotImplementedError()

    def do_mainloop(self):
        """Run do_loop_turn() until a signal sets self.interrupted, then stop."""
        while True:
            self.do_loop_turn()
            if self.interrupted:
                break
        self.request_stop()

    def do_load_modules(self, start_external=True):
        """Load and init the modules, then log the names of the instances."""
        self.modules_manager.load_and_init(start_external)
        self.log.log("I correctly loaded the modules : %s " % ([inst.get_name() for inst in self.modules_manager.instances]))

    def add(self, elt):
        """ Dummy method for adding broker to this daemon """
        pass

    def load_config_file(self):
        """Parse the config file, make its relative paths absolute and
        register the local log if asked so."""
        self.parse_config_file()
        if self.config_file is not None:
            # Some paths can be relatives. We must have a full path by taking
            # the config file by reference
            self.relative_paths_to_full(os.path.dirname(self.config_file))
        # Then start to log all in the local file if asked so
        self.register_local_log()

    def change_to_workdir(self):
        """chdir() to self.workdir; raise InvalidWorkDir on failure."""
        try:
            os.chdir(self.workdir)
        except Exception as e:
            raise InvalidWorkDir(e)
        print("Successfully changed to workdir: %s" % (self.workdir))

    def unlink(self):
        """Remove the pidfile; errors are printed, not raised (best effort)."""
        print("Unlinking %s" % (self.pidfile,))
        try:
            os.unlink(self.pidfile)
        except Exception as e:
            print("Got an error unlinking our pidfile: %s" % (e))

    def register_local_log(self):
        """Start logging into self.local_log when use_local_log is set;
        exit with status 2 if that file cannot be opened."""
        # The arbiter don't have such an attribute
        if hasattr(self, 'use_local_log') and self.use_local_log:
            try:
                self.log.register_local_log(self.local_log)
            except IOError as exp:
                print("Error : opening the log file '%s' failed with '%s'" % (self.local_log, exp))
                sys.exit(2)
            logger.log("Using the local log file '%s'" % self.local_log)

    def check_shm(self):
        """ Only on linux: Check for /dev/shm write access.

        Only the owner read/write bits of /dev/shm are inspected; exits with
        status 2 if either is missing. """
        import stat
        shm_path = '/dev/shm'
        if os.name == 'posix' and os.path.exists(shm_path):
            # We get the access rights, and we check them
            mode = stat.S_IMODE(os.lstat(shm_path)[stat.ST_MODE])
            if not mode & stat.S_IWUSR or not mode & stat.S_IRUSR:
                print("The directory %s is not writable or readable. Please launch as root chmod 777 %s" % (shm_path, shm_path))
                sys.exit(2)

    def __open_pidfile(self):
        """Open (creating it if needed, never truncating) the pidfile into
        self.fpid, positioned at its start.  Raises InvalidPidDir on error."""
        try:
            # 'a+' = create if missing, keep content, read and write.  The
            # former 'arw+' is not a valid mode string (Python 3 rejects it).
            self.fpid = open(self.pidfile, 'a+')
            # Append-mode streams may start at EOF (Python 3 always does):
            # rewind so check_parallel_run() reads the stored pid.
            self.fpid.seek(0)
        except Exception as e:
            raise InvalidPidDir(e)

    def check_parallel_run(self):
        """ Check (in pidfile) if there isn't already a daemon running. If yes and do_replace: kill it.
        Keep in self.fpid the File object to the pidfile. Will be used by writepid.

        Raises SystemExit if a live instance exists and do_replace is false. """
        self.__open_pidfile()
        try:
            pid = int(self.fpid.read())
        except (IOError, OSError, ValueError):
            print("stale pidfile exists (no or invalid or unreadable content). reusing it.")
            return

        # os.kill() with pid 0 or a negative pid signals a whole process group
        # (-1: every process we may signal), and our own pid cannot belong to
        # another instance: all of these mean the pidfile is stale.
        if pid <= 0 or pid == os.getpid():
            print("stale pidfile exists (invalid pid=%d). reusing it." % (pid))
            return

        try:
            os.kill(pid, 0)
        except OverflowError:
            # pid is too long for "kill" : so bad content:
            print("stale pidfile exists: pid=%d is too long" % (pid))
            return
        except OSError as e:
            if e.errno == errno.ESRCH:
                print("stale pidfile exists (pid=%d not exists). reusing it." % (pid))
                return
            raise

        if not self.do_replace:
            raise SystemExit("valid pidfile exists and not forced to replace. Exiting.")

        print("Replacing previous instance  %d" % (pid))
        try:
            os.kill(pid, 3)  # 3 == SIGQUIT on POSIX
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise

        self.fpid.close()
        # TODO: give some time to wait that previous instance finishes ?
        time.sleep(1)
        # we must also reopen the pid file cause the previous instance will
        # normally have deleted it !!
        self.__open_pidfile()

    def write_pid(self, pid=None):
        """Write `pid` (default: our own pid) into the pidfile, then close it."""
        if pid is None:
            pid = os.getpid()
        self.fpid.seek(0)
        self.fpid.truncate()
        self.fpid.write("%d" % (pid))
        self.fpid.close()
        del self.fpid  # no longer needed

    def close_fds(self, skip_close_fds):
        """ Close all the process file descriptors. Skip the descriptors present in the skip_close_fds list """
        # First we manage the file descriptor, because debug file can be
        # relative to pwd
        import resource
        maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if maxfd == resource.RLIM_INFINITY:
            maxfd = 1024

        skip = set(skip_close_fds)
        # Iterate through and close all file descriptors.
        for fd in range(0, maxfd):
            if fd in skip:
                continue
            try:
                os.close(fd)
            except OSError:  # ERROR, fd wasn't open to begin with (ignored)
                pass

    def daemonize(self, skip_close_fds=None):
        """ Go in "daemon" mode: close unused fds, redirect stdout/err, chdir, umask, fork-setsid-fork-writepid

        :param skip_close_fds: iterable of fds that must stay open (e.g. the
            Pyro sockets); the pidfile fd is always kept. """
        if skip_close_fds is None:
            skip_close_fds = tuple()

        print("Redirecting stdout and stderr as necessary..")
        if self.debug:
            # Explicit 0644: the umask was forced to UMASK (0) in __init__, so
            # the os.open() default of 0777 made the file world-writable.
            fdtemp = os.open(self.debug_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o644)
        else:
            fdtemp = os.open(REDIRECT_TO, os.O_RDWR)

        # We close all fd but what we need:
        self.close_fds(tuple(skip_close_fds) + (self.fpid.fileno(), fdtemp))

        os.dup2(fdtemp, 1)  # standard output (1)
        os.dup2(fdtemp, 2)  # standard error (2)
        # fdtemp now lives on as fds 1 and 2: drop the extra descriptor.
        if fdtemp not in (1, 2):
            os.close(fdtemp)

        # Now the fork/setsid/fork..
        try:
            pid = os.fork()
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))
        if pid != 0:
            # In the father ; we check if our child exit correctly
            # it has effectively to write the pid of our futur little child..
            def do_exit(sig, frame):
                print("timeout waiting child while it should have quickly returned ; something weird happened")
                os.kill(pid, 9)
                sys.exit(1)
            # wait the child process to check its return status:
            signal.signal(signal.SIGALRM, do_exit)
            # forking & writing a pid in a file should be rather quick; if it
            # is not, something is already going wrong: wait 3 secs at most.
            signal.alarm(3)
            pid, status = os.waitpid(pid, 0)
            if status != 0:
                # single formatted string: print(a, b) shows a tuple on Python 2
                print("something weird happened with/during second fork : status=%s" % (status))
            os._exit(status != 0)

        # halfway to daemonize..
        os.setsid()
        try:
            pid = os.fork()
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))
        if pid != 0:
            # we are the last step and the real daemon is actually correctly
            # created at least. We have still the last responsibility to write
            # the pid of the daemon itself.
            self.write_pid(pid)
            os._exit(0)

        self.fpid.close()
        del self.fpid
        self.pid = os.getpid()
        print("We are now fully daemonized :) pid=%d" % (self.pid))

    def do_daemon_init_and_start(self, ssl_conf=None):
        """Check for a running instance, start Pyro, drop privileges, chdir,
        then daemonize (keeping the Pyro sockets) or just write the pidfile."""
        self.check_parallel_run()
        self.setup_pyro_daemon(ssl_conf)
        self.change_to_user_group()
        self.change_to_workdir()  # must be done AFTER pyro daemon init
        if self.is_daemon:
            daemon_socket_fds = tuple(sock.fileno() for sock in self.pyro_daemon.get_sockets())
            self.daemonize(skip_close_fds=daemon_socket_fds)
        else:
            self.write_pid()

    def setup_pyro_daemon(self, ssl_conf=None):
        """Configure Pyro (optionally with SSL settings taken from `ssl_conf`,
        default self) and create self.pyro_daemon on self.host:self.port."""
        if ssl_conf is None:
            ssl_conf = self

        # The SSL part
        if ssl_conf.use_ssl:
            Pyro.config.PYROSSL_CERTDIR = os.path.abspath(ssl_conf.certs_dir)
            print("Using ssl certificate directory : %s" % Pyro.config.PYROSSL_CERTDIR)
            Pyro.config.PYROSSL_CA_CERT = os.path.abspath(ssl_conf.ca_cert)
            print("Using ssl ca cert file : %s" % Pyro.config.PYROSSL_CA_CERT)
            Pyro.config.PYROSSL_CERT = os.path.abspath(ssl_conf.server_cert)
            print("Using ssl server cert file : %s" % Pyro.config.PYROSSL_CERT)
            if self.hard_ssl_name_check:
                Pyro.config.PYROSSL_POSTCONNCHECK = 1
            else:
                Pyro.config.PYROSSL_POSTCONNCHECK = 0

        # create the server
        Pyro.config.PYRO_STORAGE = self.workdir
        Pyro.config.PYRO_COMPRESSION = 1
        Pyro.config.PYRO_MULTITHREADED = 0

        self.pyro_daemon = pyro.ShinkenPyroDaemon(self.host, self.port, ssl_conf.use_ssl)

    def get_socks_activity(self, socks, timeout):
        """Return the sockets of `socks` readable within `timeout` seconds;
        [] if select() is interrupted by a signal (EINTR)."""
        try:
            ins, _, _ = select.select(socks, [], [], timeout)
        except select.error as e:
            # args[0] is the errno on both Python 2 (select.error) and 3 (OSError)
            if e.args and e.args[0] == errno.EINTR:
                return []
            raise
        return ins

    def find_modules_path(self):
        """ Find the absolute path of the shinken module directory and returns it. """
        import shinken

        # BEWARE: this way of finding path is good if we still
        # DO NOT HAVE CHANGE PWD!!!
        # Now get the module path. It's in fact the directory modules
        # inside the shinken directory. So let's find it.

        print("modulemanager file %s" % (shinken.modulesmanager.__file__,))
        modulespath = os.path.abspath(shinken.modulesmanager.__file__)
        print("modulemanager absolute file %s" % (modulespath,))
        # modulesmanager lives in the shinken package: go up two levels, then
        # down into shinken/modules
        parent_path = os.path.dirname(os.path.dirname(modulespath))
        modulespath = os.path.join(parent_path, 'shinken', 'modules')
        print("Using modules path : %s" % (modulespath))

        return modulespath

    def find_uid_from_name(self):
        """Return the uid of self.user, or None (after printing) if unknown."""
        try:
            return getpwnam(self.user)[2]
        except KeyError:
            print("Error: the user %s is unknown" % (self.user,))
            return None

    def find_gid_from_name(self):
        """Return the gid of self.group, or None (after printing) if unknown."""
        try:
            return getgrnam(self.group)[2]
        except KeyError:
            print("Error: the group %s is unknown" % (self.group,))
            return None

    def change_to_user_group(self, insane=None):
        """ Change user of the running program. Just insult the admin if he wants root run (it can override).
        If change failed we sys.exit(2)

        :param insane: true to allow running as root; defaults to the
            idontcareaboutsecurity setting. """
        if insane is None:
            # "insane" means "running as root is accepted", so it follows the
            # flag itself.  It used to be its negation: root was accepted by
            # default and refused exactly when idontcareaboutsecurity was set.
            insane = self.idontcareaboutsecurity

        # TODO: change user on nt
        if os.name == 'nt':
            print("Sorry, you can't change user on this system")
            return

        if (self.user == 'root' or self.group == 'root') and not insane:
            print("What's ??? You want the application run under the root account?")
            print("I am not agree with it. If you really want it, put :")
            print("idontcareaboutsecurity=yes")
            print("in the config file")
            print("Exiting")
            sys.exit(2)

        uid = self.find_uid_from_name()
        gid = self.find_gid_from_name()
        if uid is None or gid is None:
            print("Exiting")
            sys.exit(2)
        try:
            # First group, then user :)
            os.setregid(gid, gid)
            os.setreuid(uid, uid)
        except OSError as e:
            print("Error : cannot change user/group to %s/%s (%s [%d])" % (self.user, self.group, e.strerror, e.errno))
            print("Exiting")
            sys.exit(2)

    def parse_config_file(self):
        """ Parse self.config_file and get all properties in it.
        If some properties need a pythonization, we do it.
        Also put default value in the properties if some are missing in the config_file

        Exits with status 2 if the file is missing/empty or lacks a [daemon]
        section. """
        properties = self.__class__.properties
        if self.config_file is not None:
            config = ConfigParser.ConfigParser()
            config.read(self.config_file)
            if not config.sections():
                print("Bad or missing config file : %s " % self.config_file)
                sys.exit(2)
            if not config.has_section('daemon'):
                print("Bad config file : %s has no [daemon] section" % self.config_file)
                sys.exit(2)
            for (key, value) in config.items('daemon'):
                if key in properties:
                    value = properties[key].pythonize(value)
                setattr(self, key, value)
        else:
            print("No config file specified, use defaults parameters")
        # Now fill all defaults where missing parameters
        for prop, entry in properties.items():
            if not hasattr(self, prop):
                value = entry.pythonize(entry.default)
                setattr(self, prop, value)
                print("Using default value : %s %s" % (prop, value))

    def relative_paths_to_full(self, reference_path):
        """Make every relative PathProp property absolute by joining it to
        `reference_path` (the config file directory)."""
        properties = self.__class__.properties
        for prop, entry in properties.items():
            if isinstance(entry, PathProp):
                path = getattr(self, prop)
                if not os.path.isabs(path):
                    path = os.path.join(reference_path, path)
                setattr(self, prop, path)

    def manage_signal(self, sig, frame=None):
        """Signal handler: flag the daemon as interrupted.

        `frame` is optional because the Windows console handler installed by
        set_exit_handler() is called with a single argument."""
        print("I'm process %d and I received signal %s" % (os.getpid(), str(sig)))
        self.interrupted = True

    def set_exit_handler(self):
        """Route SIGTERM/SIGINT (console events on Windows) to manage_signal."""
        func = self.manage_signal
        if os.name == "nt":
            try:
                import win32api
                win32api.SetConsoleCtrlHandler(func, True)
            except ImportError:
                version = ".".join(map(str, sys.version_info[:2]))
                raise Exception("pywin32 not installed for Python " + version)
        else:
            for sig in (signal.SIGTERM, signal.SIGINT):
                signal.signal(sig, func)

    def get_header(self):
        """Return the banner lines (version, copyright, license)."""
        return ["Shinken %s" % VERSION,
                "Copyright (c) 2009-2011 :",
                "Gabes Jean (naparuba@gmail.com)",
                "Gerhard Lausser, Gerhard.Lausser@consol.de",
                "Gregory Starck, g.starck@gmail.com",
                "Hartmut Goebel, h.goebel@goebel-consult.de",
                "License: AGPL"]

    def print_header(self):
        """Print the banner returned by get_header(), one line at a time."""
        for line in self.get_header():
            print(line)

    def handleRequests(self, timeout, suppl_socks=None):
        """ Wait up to timeout to handle the pyro daemon requests.
        If suppl_socks is given it also looks for activity on that list of fd.
        Returns a 3-tuple:
        If timeout: first arg is 0, second is [], third is possible system time change value
        If not timeout (== some fd got activity):
            - first arg is elapsed time since wait,
            - second arg is sublist of suppl_socks that got activity.
            - third arg is possible system time change value, or 0 if no change. """
        before = time.time()
        # Work on copies: never extend the list owned by the Pyro daemon, and
        # only hand the Pyro sockets to Pyro.  Previously suppl_socks were
        # passed to Pyro too and removed from the result, so callers never
        # saw their activity.
        pyro_socks = list(self.pyro_daemon.get_sockets())
        socks = pyro_socks + list(suppl_socks or [])
        ins = self.get_socks_activity(socks, timeout)
        tcdiff = self.check_for_system_time_change()
        before += tcdiff
        if len(ins) == 0:  # trivial case: no fd activity:
            return 0, [], tcdiff
        for sock in pyro_socks:
            if sock in ins:
                self.pyro_daemon.handleRequests(sock)
                ins.remove(sock)
        elapsed = time.time() - before
        if elapsed == 0:  # we have done a few instructions in 0 second exactly !?
            elapsed = 0.01  # but we absolutely need to return != 0 to indicate that we got activity
        return elapsed, ins, tcdiff

    def check_for_system_time_change(self):
        """ Check for a possible system time change and act correspondingly.
        If such a change is detected then we returns the number of seconds that changed. 0 if no time change was detected.
        Time change can be positive or negative:
        positive when we have been sent in the futur and negative if we have been sent in the past. """
        now = time.time()
        difference = now - self.t_each_loop

        # If we have more than 15 min time change, we need to compensate it
        if abs(difference) > 900:
            self.compensate_system_time_change(difference)
        else:
            difference = 0

        self.t_each_loop = now

        return difference

    def compensate_system_time_change(self, difference):
        """ Default action for system time change. Actually a log is done """
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)

    def wait_for_initial_conf(self, timeout=1.0):
        """Serve Pyro requests until a configuration lands in self.new_conf
        (presumably pushed by the arbiter through Interface.put_conf) or the
        daemon is interrupted.  A dot is printed every `timeout` seconds of
        waiting; a mere ping does not end the wait."""
        logger.log("Waiting for initial configuration")
        cur_timeout = timeout
        # Arbiter do not already set our have_conf param
        while not self.new_conf and not self.interrupted:
            elapsed, _, _ = self.handleRequests(cur_timeout)
            if elapsed:
                cur_timeout -= elapsed
                if cur_timeout > 0:
                    continue
                cur_timeout = timeout
            sys.stdout.write(".")
            sys.stdout.flush()