1 # -*- coding: utf-8 -*-
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
32 import shinken
.pyro_wrapper
as pyro
33 from shinken
.pyro_wrapper
import InvalidWorkDir
, Pyro
35 from shinken
.log
import logger
36 from shinken
.modulesmanager
import ModulesManager
37 from shinken
.property import StringProp
, BoolProp
, PathProp
41 from pwd
import getpwnam
42 from grp
import getgrnam
########################## DAEMON PART ###############################
# The standard I/O file descriptors are redirected to /dev/null by default.
# os.devnull is used when the platform exposes it; otherwise fall back to the
# POSIX path.
REDIRECT_TO = os.devnull if hasattr(os, "devnull") else "/dev/null"
class InvalidPidDir(Exception):
    """Raised by the pid-file opening code when the pid file cannot be opened or created."""
57 class Interface(Pyro
.core
.ObjBase
, object):
58 """ Interface for pyro communications """
def __init__(self, app):
    """Initialise the Pyro object and bind it to its owning application.

    :param app: the owner of this interface. The other interface methods
                read and write configuration attributes on it
                (``new_conf``, ``cur_conf``), so it must be stored.
    """
    Pyro.core.ObjBase.__init__(self)

    self.app = app
    # Per-instance identifier. The fractional part must carry real
    # randomness: formatting random.random() (a float in [0, 1)) with "%d"
    # always produced 0, so two interfaces created within the same second
    # ended up with identical ids.
    self.running_id = "%d.%d" % (time.time(), random.randint(0, 999999999))
def get_running_id(self):
    """Return the running identifier that was assigned to this interface at construction."""
    current_id = self.running_id
    return current_id
def put_conf(self, conf):
    """Hand *conf* to the owning application as its pending new configuration.

    The value is stored on ``self.app.new_conf``, the attribute the daemon
    keeps for configurations pushed by its controller.
    """
    owner = self.app
    owner.new_conf = conf
77 def wait_new_conf(self
):
78 self
.app
.cur_conf
= None
81 return self
.app
.cur_conf
is not None
89 'workdir': PathProp(default
='/usr/local/shinken/var'),
90 'host': StringProp(default
='0.0.0.0'),
91 'user': StringProp(default
='shinken'),
92 'group': StringProp(default
='shinken'),
93 'use_ssl': BoolProp(default
='0'),
94 'certs_dir': StringProp(default
='etc/certs'),
95 'ca_cert': StringProp(default
='etc/certs/ca.pem'),
96 'server_cert': StringProp(default
='etc/certs/server.pem'),
97 'use_local_log': BoolProp(default
='0'),
98 'hard_ssl_name_check': BoolProp(default
='0'),
99 'idontcareaboutsecurity': BoolProp(default
='0'),
100 'spare': BoolProp(default
='0')
103 def __init__(self
, name
, config_file
, is_daemon
, do_replace
, debug
, debug_file
):
108 self
.config_file
= config_file
109 self
.is_daemon
= is_daemon
110 self
.do_replace
= do_replace
112 self
.debug_file
= debug_file
113 self
.interrupted
= False
116 self
.program_start
= now
117 self
.t_each_loop
= now
# used to track system time change
119 self
.pyro_daemon
= None
123 self
.log
.load_obj(self
)
125 self
.new_conf
= None # used by controller to push conf
128 self
.modules_manager
= ModulesManager(name
, self
.find_modules_path(), [])
131 self
.set_exit_handler()
133 # At least, lose the local log file if need
135 if self
.modules_manager
:
136 # We save what we can but NOT for the scheduler
137 # because the current sched object is a dummy one
138 # and the old one aleady do it!
139 if not hasattr(self
, 'sched'):
140 self
.hook_point('save_retention')
142 logger
.log('Stopping all modules')
143 self
.modules_manager
.stop_all()
145 self
.pyro_daemon
.shutdown(True)
149 def request_stop(self
):
150 self
.unlink() ## unlink first
def do_loop_turn(self):
    """Hook meant to be overridden by concrete daemons.

    The base implementation always raises NotImplementedError.
    """
    raise NotImplementedError()
158 def do_mainloop(self
):
def do_load_modules(self, start_external=True):
    """Load and initialise the modules via the modules manager, then log their names.

    :param start_external: forwarded unchanged to
                           ``self.modules_manager.load_and_init``.
    """
    manager = self.modules_manager
    manager.load_and_init(start_external)
    # Collect the names only after loading, so the freshly created instances are listed.
    loaded_names = [module.get_name() for module in manager.instances]
    self.log.log("I correctly loaded the modules : %s " % (loaded_names,))
171 """ Dummy method for adding broker to this daemon """
# Load the daemon configuration. First parse self.config_file via
# parse_config_file(). When a config file was given, rewrite relative path
# properties against the config file's directory via relative_paths_to_full().
# Then call register_local_log(), which opens the local log file only when
# use_local_log is set.
175 def load_config_file(self
):
176 self
.parse_config_file()
177 if self
.config_file
is not None:
178 # Some paths can be relative. We must make them absolute by using
179 # the config file's directory as the reference
180 self
.relative_paths_to_full(os
.path
.dirname(self
.config_file
))
181 # Then start to log all in the local file if asked so
182 self
.register_local_log()
185 def change_to_workdir(self
):
187 os
.chdir(self
.workdir
)
189 raise InvalidWorkDir(e
)
190 print("Successfully changed to workdir: %s" % (self
.workdir
))
194 print "Unlinking", self
.pidfile
196 os
.unlink(self
.pidfile
)
198 print("Got an error unlinking our pidfile: %s" % (e
))
200 # Look if we need a local log or not
201 def register_local_log(self
):
202 # The arbiter don't have such an attribute
203 if hasattr(self
, 'use_local_log') and self
.use_local_log
:
205 self
.log
.register_local_log(self
.local_log
)
207 print "Error : opening the log file '%s' failed with '%s'" % (self
.local_log
, exp
)
209 logger
.log("Using the local log file '%s'" % self
.local_log
)
213 """ Only on linux: Check for /dev/shm write access """
215 shm_path
= '/dev/shm'
216 if os
.name
== 'posix' and os
.path
.exists(shm_path
):
217 # We get the access rights, and we check them
218 mode
= stat
.S_IMODE(os
.lstat(shm_path
)[stat
.ST_MODE
])
219 if not mode
& stat
.S_IWUSR
or not mode
& stat
.S_IRUSR
:
220 print("The directory %s is not writable or readable. Please launch as root chmod 777 %s" % (shm_path
, shm_path
))
223 def __open_pidfile(self
):
224 ## if problem on open or creating file it'll be raised to the caller:
226 self
.fpid
= open(self
.pidfile
, 'arw+')
228 raise InvalidPidDir(e
)
231 def check_parallel_run(self
):
232 """ Check (in pidfile) if there isn't already a daemon running. If yes and do_replace: kill it.
233 Keep in self.fpid the File object to the pidfile. Will be used by writepid.
235 self
.__open
_pidfile
()
237 pid
= int(self
.fpid
.read())
239 print "stale pidfile exists (no or invalid or unreadable content). reusing it."
244 except OverflowError, e
:
245 ## pid is too long for "kill" : so bad content:
246 print("stale pidfile exists: pid=%d is too long" % (pid
))
249 if e
.errno
== errno
.ESRCH
:
250 print("stale pidfile exists (pid=%d not exists). reusing it." % (pid
))
254 if not self
.do_replace
:
255 raise SystemExit, "valid pidfile exists and not forced to replace. Exiting."
257 print "Replacing previous instance ", pid
261 if e
.errno
!= errno
.ESRCH
:
265 ## TODO: give some time to wait that previous instance finishes ?
267 ## we must also reopen the pid file cause the previous instance will normally have deleted it !!
268 self
.__open
_pidfile
()
271 def write_pid(self
, pid
=None):
276 self
.fpid
.write("%d" % (pid
))
278 del self
.fpid
## no longer needed
281 def close_fds(self
, skip_close_fds
):
282 """ Close all the process file descriptors. Skip the descriptors present in the skip_close_fds list """
283 #First we manage the file descriptor, because debug file can be
286 maxfd
= resource
.getrlimit(resource
.RLIMIT_NOFILE
)[1]
287 if (maxfd
== resource
.RLIM_INFINITY
):
290 # Iterate through and close all file descriptors.
291 for fd
in range(0, maxfd
):
292 if fd
in skip_close_fds
: continue
295 except OSError:# ERROR, fd wasn't open to begin with (ignored)
299 def daemonize(self
, skip_close_fds
=None):
300 """ Go in "daemon" mode: close unused fds, redirect stdout/err, chdir, umask, fork-setsid-fork-writepid """
302 if skip_close_fds
is None:
303 skip_close_fds
= tuple()
305 print("Redirecting stdout and stderr as necessary..")
307 fdtemp
= os
.open(self
.debug_file
, os
.O_CREAT | os
.O_WRONLY | os
.O_TRUNC
)
309 fdtemp
= os
.open(REDIRECT_TO
, os
.O_RDWR
)
311 ## We close all fd but what we need:
312 self
.close_fds(skip_close_fds
+ ( self
.fpid
.fileno() , fdtemp
))
314 os
.dup2(fdtemp
, 1) # standard output (1)
315 os
.dup2(fdtemp
, 2) # standard error (2)
317 # Now the fork/setsid/fork..
321 raise Exception, "%s [%d]" % (e
.strerror
, e
.errno
)
323 # In the father ; we check if our child exit correctly
324 # it has effectively to write the pid of our futur little child..
325 def do_exit(sig
, frame
):
326 print("timeout waiting child while it should have quickly returned ; something weird happened")
329 # wait the child process to check its return status:
330 signal
.signal(signal
.SIGALRM
, do_exit
)
331 signal
.alarm(3) # forking & writing a pid in a file should be rather quick..
332 # if it's not then somewthing wrong can already be on the way so let's wait max 3 secs here.
333 pid
, status
= os
.waitpid(pid
, 0)
335 print("something weird happened with/during second fork : status=", status
)
336 os
._exit
(status
!= 0)
338 # halfway to daemonize..
343 raise Exception, "%s [%d]" % (e
.strerror
, e
.errno
)
345 # we are the last step and the real daemon is actually correctly created at least.
346 # we have still the last responsibility to write the pid of the daemon itself.
352 self
.pid
= os
.getpid()
353 print("We are now fully daemonized :) pid=%d" % (self
.pid
))
356 def do_daemon_init_and_start(self
, ssl_conf
=None):
357 self
.check_parallel_run()
358 self
.setup_pyro_daemon(ssl_conf
)
359 self
.change_to_user_group()
360 self
.change_to_workdir() ## must be done AFTER pyro daemon init
362 daemon_socket_fds
= tuple( sock
.fileno() for sock
in self
.pyro_daemon
.get_sockets() )
363 self
.daemonize(skip_close_fds
=daemon_socket_fds
)
368 def setup_pyro_daemon(self
, ssl_conf
=None):
374 Pyro
.config
.PYROSSL_CERTDIR
= os
.path
.abspath(ssl_conf
.certs_dir
)
375 print "Using ssl certificate directory : %s" % Pyro
.config
.PYROSSL_CERTDIR
376 Pyro
.config
.PYROSSL_CA_CERT
= os
.path
.abspath(ssl_conf
.ca_cert
)
377 print "Using ssl ca cert file : %s" % Pyro
.config
.PYROSSL_CA_CERT
378 Pyro
.config
.PYROSSL_CERT
= os
.path
.abspath(ssl_conf
.server_cert
)
379 print"Using ssl server cert file : %s" % Pyro
.config
.PYROSSL_CERT
380 if self
.hard_ssl_name_check
:
381 Pyro
.config
.PYROSSL_POSTCONNCHECK
=1
383 Pyro
.config
.PYROSSL_POSTCONNCHECK
=0
386 Pyro
.config
.PYRO_STORAGE
= self
.workdir
387 Pyro
.config
.PYRO_COMPRESSION
= 1
388 Pyro
.config
.PYRO_MULTITHREADED
= 0
390 self
.pyro_daemon
= pyro
.ShinkenPyroDaemon(self
.host
, self
.port
, ssl_conf
.use_ssl
)
393 def get_socks_activity(self
, socks
, timeout
):
395 ins
, _
, _
= select
.select(socks
, [], [], timeout
)
396 except select
.error
, e
:
398 if errnum
== errno
.EINTR
:
404 def find_modules_path(self
):
405 """ Find the absolute path of the shinken module directory and returns it. """
408 # BEWARE: this way of finding path is good if we still
409 # DO NOT HAVE CHANGE PWD!!!
410 # Now get the module path. It's in fact the directory modules
411 # inside the shinken directory. So let's find it.
413 print "modulemanager file", shinken
.modulesmanager
.__file
__
414 modulespath
= os
.path
.abspath(shinken
.modulesmanager
.__file
__)
415 print "modulemanager absolute file", modulespath
416 # We got one of the files of
417 parent_path
= os
.path
.dirname(os
.path
.dirname(modulespath
))
418 modulespath
= os
.path
.join(parent_path
, 'shinken', 'modules')
419 print("Using modules path : %s" % (modulespath
))
423 #Just give the uid of a user by looking at its name
424 def find_uid_from_name(self
):
426 return getpwnam(self
.user
)[2]
427 except KeyError , exp
:
428 print "Error: the user", self
.user
, "is unknown"
431 #Just give the gid of a group by looking at its name
432 def find_gid_from_name(self
):
434 return getgrnam(self
.group
)[2]
435 except KeyError , exp
:
436 print "Error: the group", self
.group
, "is unknown"
440 def change_to_user_group(self
, insane
=None):
441 """ Change user of the running program. Just insult the admin if he wants root run (it can override).
442 If change failed we sys.exit(2) """
444 insane
= not self
.idontcareaboutsecurity
446 # TODO: change user on nt
448 print("Sorry, you can't change user on this system")
451 if (self
.user
== 'root' or self
.group
== 'root') and not insane
:
452 print "What's ??? You want the application run under the root account?"
453 print "I am not agree with it. If you really want it, put :"
454 print "idontcareaboutsecurity=yes"
455 print "in the config file"
459 uid
= self
.find_uid_from_name()
460 gid
= self
.find_gid_from_name()
461 if uid
is None or gid
is None:
465 # First group, then user :)
466 os
.setregid(gid
, gid
)
467 os
.setreuid(uid
, uid
)
469 print "Error : cannot change user/group to %s/%s (%s [%d])" % (self
.user
, self
.group
, e
.strerror
, e
.errno
)
474 def parse_config_file(self
):
475 """ Parse self.config_file and get all properties in it.
476 If some properties need a pythonization, we do it.
477 Also put default value in the properties if some are missing in the config_file """
478 properties
= self
.__class
__.properties
479 if self
.config_file
is not None:
480 config
= ConfigParser
.ConfigParser()
481 config
.read(self
.config_file
)
482 if config
._sections
== {}:
483 print "Bad or missing config file : %s " % self
.config_file
485 for (key
, value
) in config
.items('daemon'):
486 if key
in properties
:
487 value
= properties
[key
].pythonize(value
)
488 setattr(self
, key
, value
)
490 print "No config file specified, use defaults parameters"
491 #Now fill all defaults where missing parameters
492 for prop
, entry
in properties
.items():
493 if not hasattr(self
, prop
):
494 value
= entry
.pythonize(entry
.default
)
495 setattr(self
, prop
, value
)
496 print "Using default value :", prop
, value
def relative_paths_to_full(self, reference_path):
    """Make every PathProp-typed property of the class an absolute path.

    Some configured paths can be relative. Each one is joined onto
    *reference_path* (load_config_file passes the config file's directory).
    Values that are already absolute are written back unchanged.

    :param reference_path: directory that relative paths are resolved against.
    """
    # self.__class__ rather than type(self): the class may be old-style (Python 2).
    for prop_name, prop_def in self.__class__.properties.items():
        if not isinstance(prop_def, PathProp):
            continue
        value = getattr(self, prop_name)
        if not os.path.isabs(value):
            value = os.path.join(reference_path, value)
        setattr(self, prop_name, value)
def manage_signal(self, sig, frame):
    """Signal handler: report the received signal and flag the daemon as interrupted.

    set_exit_handler installs it for SIGTERM and SIGINT.

    :param sig: the signal number received.
    :param frame: the interrupted stack frame (unused).
    """
    message = "I'm process %d and I received signal %s" % (os.getpid(), str(sig))
    print(message)
    self.interrupted = True
518 def set_exit_handler(self
):
519 func
= self
.manage_signal
523 win32api
.SetConsoleCtrlHandler(func
, True)
525 version
= ".".join(map(str, sys
.version_info
[:2]))
526 raise Exception("pywin32 not installed for Python " + version
)
528 for sig
in (signal
.SIGTERM
, signal
.SIGINT
):
529 signal
.signal(sig
, func
)
532 def get_header(self
):
533 return ["Shinken %s" % VERSION
,
534 "Copyright (c) 2009-2011 :",
535 "Gabes Jean (naparuba@gmail.com)",
536 "Gerhard Lausser, Gerhard.Lausser@consol.de",
537 "Gregory Starck, g.starck@gmail.com",
538 "Hartmut Goebel, h.goebel@goebel-consult.de",
541 def print_header(self
):
542 for line
in self
.get_header():
545 def handleRequests(self
, timeout
, suppl_socks
=[]):
546 """ Wait up to timeout to handle the pyro daemon requests.
547 If suppl_socks is given it also looks for activity on that list of fd.
549 If timeout: first arg is 0, second is [], third is possible system time change value
550 If not timeout (== some fd got activity):
551 - first arg is elapsed time since wait,
552 - second arg is sublist of suppl_socks that got activity.
553 - third arg is possible system time change value, or 0 if no change. """
555 socks
= self
.pyro_daemon
.get_sockets()
557 socks
.extend(suppl_socks
)
558 ins
= self
.get_socks_activity(socks
, timeout
)
559 tcdiff
= self
.check_for_system_time_change()
561 if len(ins
) == 0: # trivial case: no fd activity:
564 if sock
in ins
and sock
not in suppl_socks
:
565 self
.pyro_daemon
.handleRequests(sock
)
567 elapsed
= time
.time() - before
568 if elapsed
== 0: # we have done a few instructions in 0 second exactly !? quantum computer ?
569 elapsed
= 0.01 # but we absolutely need to return != 0 to indicate that we got activity
570 return elapsed
, ins
, tcdiff
573 def check_for_system_time_change(self
):
574 """ Check for a possible system time change and act correspondingly.
575 If such a change is detected then we returns the number of seconds that changed. 0 if no time change was detected.
576 Time change can be positive or negative:
577 positive when we have been sent in the futur and negative if we have been sent in the past. """
579 difference
= now
- self
.t_each_loop
581 # If we have more than 15 min time change, we need to compensate it
582 if abs(difference
) > 900:
583 self
.compensate_system_time_change(difference
)
587 self
.t_each_loop
= now
def compensate_system_time_change(self, difference):
    """Default reaction to a detected system time change: only log a warning.

    :param difference: seconds the clock jumped; positive when sent into the
                       future, negative when sent into the past.
    """
    warning = 'Warning: A system time change of %s has been detected. Compensating...' % difference
    logger.log(warning)
597 # Used to wait for the conf from the arbiter.
598 # It sends us the conf through our pyro_daemon. It sets the have_conf prop
599 # if it sends us something
600 # (it can just do a ping)
601 def wait_for_initial_conf(self
, timeout
=1.0):
602 logger
.log("Waiting for initial configuration")
603 cur_timeout
= timeout
604 # Arbiter do not already set our have_conf param
605 while not self
.new_conf
and not self
.interrupted
:
606 elapsed
, _
, _
= self
.handleRequests(cur_timeout
)
608 cur_timeout
-= elapsed
611 cur_timeout
= timeout
612 sys
.stdout
.write(".")
616 # We call the function of modules that got the this
618 def hook_point(self
, hook_name
):
620 for inst
in self
.modules_manager
.instances
:
621 full_hook_name
= 'hook_' + hook_name
622 print inst
.get_name(), hasattr(inst
, full_hook_name
), hook_name
623 if hasattr(inst
, full_hook_name
):
624 f
= getattr(inst
, full_hook_name
)
626 print "Calling", full_hook_name
, "of", inst
.get_name()
628 except Exception, exp
:
629 logger
.log('The instance %s raise an exception %s. I kill it' % (inst
.get_name(), str(exp
)))
632 #Now remove mod that raise an exception
633 self
.modules_manager
.clear_instances(to_del
)
636 # Dummy function for daemons. Get all retention data
637 # So a module can save them
638 def get_retention_data(self
):
641 # Save, to get back all data
642 def restore_retention_data(self
, data
):