1 # -*- coding: utf-8 -*-
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
32 import shinken
.pyro_wrapper
as pyro
33 from shinken
.pyro_wrapper
import InvalidWorkDir
, Pyro
35 from shinken
.log
import logger
36 from shinken
.modulesmanager
import ModulesManager
37 from shinken
.property import StringProp
, BoolProp
, PathProp
41 from pwd
import getpwnam
42 from grp
import getgrnam
########################## DAEMON PART ###############################
# Target used when the standard I/O descriptors are redirected: os.devnull
# when the os module provides it, otherwise the literal "/dev/null".
REDIRECT_TO = os.devnull if hasattr(os, "devnull") else "/dev/null"
class InvalidPidDir(Exception):
    """Raised when the daemon's pid file cannot be opened or created."""
class Interface(Pyro.core.ObjBase, object):
    """ Interface for pyro communications """

    def __init__(self, app):
        """ 'app' is to be set to the owner of this interface. """
        Pyro.core.ObjBase.__init__(self)
        # Every other method dereferences self.app, so it has to be kept here.
        self.app = app
        # "<timestamp>.<random int>". random.random() is always < 1, so it is
        # scaled first: under %d the unscaled value would always format as 0.
        self.running_id = "%d.%d" % (time.time(), random.random() * 1000000000)

    def get_running_id(self):
        """Return the identifier generated when this interface was created."""
        return self.running_id

    def put_conf(self, conf):
        """Store *conf* as the owner's ``new_conf`` attribute."""
        self.app.new_conf = conf

    def wait_new_conf(self):
        """Forget the owner's current configuration (``app.cur_conf = None``).

        :returns: always False, because cur_conf has just been cleared. This
            preserves the value the previous implementation returned.
        """
        self.app.cur_conf = None
        return False
# NOTE(review): the `properties = {` opener and the closing brace of this
# class-level mapping are not visible in this copy of the file.
# Each entry maps a [daemon] config key to its property type and a string
# default. parse_config_file runs the default through pythonize() when the
# key is absent from the config file.
# PathProp values are made absolute by relative_paths_to_full.
'workdir': PathProp(default='/usr/local/shinken/var'),
# Address handed to pyro.ShinkenPyroDaemon in setup_pyro_daemon.
'host': StringProp(default='0.0.0.0'),
# Account names resolved by find_uid_from_name / find_gid_from_name.
'user': StringProp(default='shinken'),
'group': StringProp(default='shinken'),
'use_ssl': BoolProp(default='0'),
# SSL material; setup_pyro_daemon reads these (through ssl_conf) and makes
# them absolute with os.path.abspath.
'certs_dir': StringProp(default='etc/certs'),
'ca_cert': StringProp(default='etc/certs/ca.pem'),
'server_cert': StringProp(default='etc/certs/server.pem'),
# Checked by register_local_log.
'use_local_log': BoolProp(default='0'),
# Selects Pyro.config.PYROSSL_POSTCONNCHECK in setup_pyro_daemon.
'hard_ssl_name_check': BoolProp(default='0'),
# Read by change_to_user_group when deciding whether a root user/group is accepted.
'idontcareaboutsecurity': BoolProp(default='0'),
'spare': BoolProp(default='0')
def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):
    """Initialise the daemon's runtime state.

    :param name: daemon name, passed to ModulesManager.
    :param config_file: path of the config file, or None to use defaults
        (see parse_config_file).
    :param is_daemon: stored as self.is_daemon.
    :param do_replace: when false, check_parallel_run refuses to start if a
        valid pid file already exists.
    :param debug: NOTE(review): not stored anywhere in the visible code.
    :param debug_file: file that daemonize opens with O_CREAT|O_WRONLY|O_TRUNC.
    """
    self.config_file = config_file
    self.is_daemon = is_daemon
    self.do_replace = do_replace
    self.debug_file = debug_file
    # Set by manage_signal; polled by wait_for_initial_conf.
    self.interrupted = False
    # NOTE(review): `now` is not assigned in the visible code; a line such as
    # `now = time.time()` appears to have been lost from this copy — confirm.
    self.program_start = now
    self.t_each_loop = now # used to track system time change
    self.pyro_daemon = None
    # NOTE(review): self.log is used here, but no assignment to it is visible.
    self.log.load_obj(self)
    self.new_conf = None # used by controller to push conf
    # find_modules_path resolves a path relative to the current directory,
    # so this must run before any chdir.
    self.modules_manager = ModulesManager(name, self.find_modules_path(), [])
    # Install manage_signal as the termination handler.
    self.set_exit_handler()
# NOTE(review): the `def` header of the method this fragment belongs to is
# not visible in this copy of the file.
# Finally, close the local log file if needed.
# NOTE(review): the statement the comment above introduces is not visible.
if self.modules_manager:
    logger.log('Stopping all modules')
    self.modules_manager.stop_all()
# NOTE(review): pyro_daemon starts as None in __init__; an
# `if self.pyro_daemon:` guard may have been lost here. As visible, this
# line raises AttributeError if setup_pyro_daemon never ran.
self.pyro_daemon.shutdown(True)
def request_stop(self):
    """Handle a stop request, starting with removal of the pid file.

    NOTE(review): only the unlink call is visible in this copy; the rest of
    the body appears to have been lost — confirm against the full source.
    """
    self.unlink() ## unlink first
def do_loop_turn(self):
    """Execute a single turn of the daemon's loop.

    This base version is only a placeholder that concrete daemons must
    override.

    :raises NotImplementedError: unconditionally.
    """
    raise NotImplementedError
def do_mainloop(self):
    """Run the daemon's main loop.

    NOTE(review): the body of this method is not visible in this copy of the
    file. Presumably it calls do_loop_turn until self.interrupted is set;
    confirm against the full source.
    """
def do_load_modules(self, start_external=True):
    """Ask the modules manager to load its modules, then log which loaded.

    :param start_external: passed unchanged to ``load_and_init``.
    """
    manager = self.modules_manager
    manager.load_and_init(start_external)
    # One entry per module instance now held by the manager.
    loaded_names = [module.get_name() for module in manager.instances]
    self.log.log("I correctly loaded the modules : %s " % (loaded_names,))
# NOTE(review): the `def` header this docstring belonged to is not visible in
# this copy of the file.
""" Dummy method for adding broker to this daemon """
def load_config_file(self):
    """Read the daemon configuration and finish the path and logging setup.

    The steps run in this order:

    1. parse_config_file().
    2. If a config file is set, resolve relative PathProp values against
       the directory that contains it.
    3. Register the local log if requested.

    NOTE(review): this copy lost its indentation. register_local_log is
    assumed to sit inside the ``config_file is not None`` branch — confirm.
    """
    self.parse_config_file()
    config_path = self.config_file
    if config_path is None:
        return
    # Relative paths are interpreted from the config file's own folder.
    self.relative_paths_to_full(os.path.dirname(config_path))
    # Then start to log all in the local file if asked so
    self.register_local_log()
def change_to_workdir(self):
    """Make ``self.workdir`` the current directory of the process.

    :raises InvalidWorkDir: wrapping the OSError from os.chdir when the
        directory does not exist or cannot be entered.
    """
    # Only chdir may fail here; keep the try body minimal.
    try:
        os.chdir(self.workdir)
    except OSError as e:
        raise InvalidWorkDir(e)
    print("Successfully changed to workdir: %s" % (self.workdir))
def unlink(self):
    """Remove the daemon's pid file (``self.pidfile``).

    Failures, such as the file already being gone, are reported on stdout
    rather than raised, so stopping can continue.
    """
    print("Unlinking %s" % self.pidfile)
    try:
        os.unlink(self.pidfile)
    except OSError as e:
        print("Got an error unlinking our pidfile: %s" % (e))
# Start logging into the local log file when the configuration asks for it.
def register_local_log(self):
    """Register ``self.local_log`` with ``self.log`` if ``use_local_log`` is set.

    Objects without a ``use_local_log`` attribute are left untouched.
    If the log file cannot be opened, an error is printed and the process
    exits with status 2, the same failure status change_to_user_group uses.
    """
    # The arbiter doesn't have such an attribute
    if not (hasattr(self, 'use_local_log') and self.use_local_log):
        return
    try:
        self.log.register_local_log(self.local_log)
    except IOError as exp:
        print("Error : opening the log file '%s' failed with '%s'" % (self.local_log, exp))
        sys.exit(2)
    logger.log("Using the local log file '%s'" % self.local_log)
""" Only on linux: Check for /dev/shm write access """
# NOTE(review): the `def` header of this body is not visible in this copy.
# The test below is os.name == 'posix', which also matches non-Linux Unixes.
shm_path = '/dev/shm'
if os.name == 'posix' and os.path.exists(shm_path):
    # We get the access rights, and we check them.
    # lstat: the permission bits of the entry itself (symlinks not followed).
    mode = stat.S_IMODE(os.lstat(shm_path)[stat.ST_MODE])
    # `not` binds looser than `&`, so each term reads "owner bit not set".
    # Only the owner's bits are tested, not whether this process can write.
    if not mode & stat.S_IWUSR or not mode & stat.S_IRUSR:
        print("The directory %s is not writable or readable. Please launch as root chmod 777 %s" % (shm_path, shm_path))
def __open_pidfile(self):
    """Open the pid file into ``self.fpid``, creating it if needed.

    The file is opened for reading and appending without truncation. The
    read position is then rewound so a previously stored pid can be read.
    The former mode string 'arw+' is not a valid open() mode and raises
    ValueError on Python 3; 'a+' is the intended read/append/create mode.

    :raises InvalidPidDir: if the pid file cannot be opened or created.
    """
    ## if problem on open or creating file it'll be raised to the caller:
    try:
        self.fpid = open(self.pidfile, 'a+')
        # 'a+' may start positioned at end-of-file; rewind before reading.
        self.fpid.seek(0)
    except IOError as e:
        raise InvalidPidDir(e)
def check_parallel_run(self):
    """ Check (in pidfile) if there isn't already a daemon running. If yes and do_replace: kill it.
    Keep in self.fpid the File object to the pidfile. Will be used by writepid.

    NOTE(review): many lines of this method are missing from this copy:
      - the end of this docstring;
      - the try/except statements around the pid read and the process checks;
      - the return statements of the "stale pidfile" paths.
    The visible statements are kept below in their original order; the
    control flow has to be restored from the full source.
    """
    self.__open_pidfile()
    # The pid file is expected to contain a single decimal pid.
    pid = int(self.fpid.read())
    # Reported when the content cannot be parsed (its except clause is missing).
    print "stale pidfile exists (no or invalid or unreadable content). reusing it."
    except OverflowError, e:
        ## pid is too large for "kill": the content is bad
        print("stale pidfile exists: pid=%d is too long" % (pid))
    # ESRCH ("no such process"): nobody owns the recorded pid, so the file is stale.
    # NOTE(review): the except clause that binds `e` here is not visible.
    if e.errno == errno.ESRCH:
        print("stale pidfile exists (pid=%d not exists). reusing it." % (pid))
    # Presumably reached when the recorded pid belongs to a running process.
    if not self.do_replace:
        raise SystemExit, "valid pidfile exists and not forced to replace. Exiting."
    print "Replacing previous instance ", pid
    # NOTE(review): the kill attempt and the body of this check are not visible.
    if e.errno != errno.ESRCH:
    ## TODO: give some time to wait that previous instance finishes ?
    ## we must also reopen the pid file cause the previous instance will normally have deleted it !!
    self.__open_pidfile()
def write_pid(self, pid=None):
    """Write *pid* into the pid file held in ``self.fpid``, then release it.

    check_parallel_run keeps ``self.fpid`` open for this call.

    :param pid: pid to record; defaults to the current process id.
        Previously the default of None crashed the "%d" formatting.
    """
    if pid is None:
        pid = os.getpid()
    # Rewind and truncate so a stale, longer pid cannot leave trailing digits.
    self.fpid.seek(0)
    self.fpid.truncate()
    self.fpid.write("%d" % (pid))
    # Close explicitly (this also flushes) instead of relying on garbage collection.
    self.fpid.close()
    del self.fpid ## no longer needed
def close_fds(self, skip_close_fds):
    """ Close all the process file descriptors. Skip the descriptors present in the skip_close_fds list """
    # The hard RLIMIT_NOFILE limit bounds the descriptor numbers that may be open.
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if maxfd == resource.RLIM_INFINITY:
        # No finite limit: fall back to a conventional upper bound
        # (assumed value; the original fallback line is not visible).
        maxfd = 1024
    # Close [0, maxfd) except the kept descriptors, one contiguous range at a
    # time. os.closerange ignores descriptors that were not open, which
    # matches the old per-fd "except OSError: pass", and it avoids a Python
    # loop over a possibly huge hard limit.
    keep = sorted(set(fd for fd in skip_close_fds if 0 <= fd < maxfd))
    start = 0
    for fd in keep:
        os.closerange(start, fd)
        start = fd + 1
    os.closerange(start, maxfd)
def daemonize(self, skip_close_fds=None):
    """ Go in "daemon" mode: close unused fds, redirect stdout/err, chdir, umask, fork-setsid-fork-writepid

    :param skip_close_fds: tuple of descriptors to keep open, in addition to
        the pid file and the stdout/stderr redirection target.

    NOTE(review): many lines of this method are missing from this copy:
      - the branch that chooses between the two os.open calls;
      - the try/os.fork()/except blocks and the tests on the fork results;
      - the final pid-file write.
    The visible statements are kept in their original order below.
    """
    if skip_close_fds is None:
        skip_close_fds = tuple()
    print("Redirecting stdout and stderr as necessary..")
    # Either the debug file (created/truncated, write-only) or REDIRECT_TO
    # becomes the stdout/stderr target; the selecting `if` is not visible.
    fdtemp = os.open(self.debug_file, os.O_CREAT | os.O_WRONLY | os.O_TRUNC)
    fdtemp = os.open(REDIRECT_TO, os.O_RDWR)
    ## We close all fd but what we need:
    self.close_fds(skip_close_fds + ( self.fpid.fileno() , fdtemp ))
    os.dup2(fdtemp, 1) # standard output (1)
    os.dup2(fdtemp, 2) # standard error (2)
    # Now the fork/setsid/fork..
    # Raised when a fork fails (its try/except is not visible).
    raise Exception, "%s [%d]" % (e.strerror, e.errno)
    # In the parent: check that the child exits correctly. The child first has
    # to write the pid of its own child (the future daemon).
    def do_exit(sig, frame):
        # NOTE(review): as visible this only prints; an exit call may be missing.
        print("timeout waiting child while it should have quickly returned ; something weird happened")
    # Wait for the child process and check its return status:
    signal.signal(signal.SIGALRM, do_exit)
    signal.alarm(3) # forking & writing a pid in a file should be rather quick..
    # If it is not, something may already be going wrong, so wait at most 3 seconds here.
    pid, status = os.waitpid(pid, 0)
    # NOTE(review): under Python 2 this print statement outputs a tuple.
    print("something weird happened with/during second fork : status=", status)
    # A bool argument: exit code 1 if the child failed, 0 otherwise.
    os._exit(status != 0)
    # halfway to daemonize..
    raise Exception, "%s [%d]" % (e.strerror, e.errno)
    # We are the last step: the real daemon process now exists.
    # It still has to write its own pid.
    self.pid = os.getpid()
    print("We are now fully daemonized :) pid=%d" % (self.pid))
def do_daemon_init_and_start(self, ssl_conf=None):
    """Prepare the process and start it as configured.

    The steps run in this order:

    1. Check for a running instance.
    2. Set up the pyro daemon.
    3. Drop privileges.
    4. chdir to the workdir.

    Then, when ``self.is_daemon`` is set, the process is detached with
    daemonize, keeping the pyro listening sockets open. Otherwise it stays in
    the foreground and records its own pid with write_pid. Before this
    change is_daemon was never consulted, and foreground runs never wrote
    the pid file.

    :param ssl_conf: forwarded to setup_pyro_daemon.
    """
    self.check_parallel_run()
    self.setup_pyro_daemon(ssl_conf)
    self.change_to_user_group()
    self.change_to_workdir() ## must be done AFTER pyro daemon init
    if self.is_daemon:
        # daemonize closes every other descriptor; keep pyro's sockets alive.
        daemon_socket_fds = tuple(sock.fileno() for sock in self.pyro_daemon.get_sockets())
        self.daemonize(skip_close_fds=daemon_socket_fds)
    else:
        self.write_pid()
def setup_pyro_daemon(self, ssl_conf=None):
    """Set Pyro's global configuration and create ``self.pyro_daemon``.

    :param ssl_conf: object providing certs_dir, ca_cert, server_cert and
        use_ssl.

    NOTE(review): lines are missing from this copy:
      - whatever handled ssl_conf=None (as visible, the attribute access
        below would raise AttributeError on None);
      - possibly a guard around the SSL block;
      - the `else:` that presumably precedes PYROSSL_POSTCONNCHECK=0 (as
        visible, the value is always reset to 0).
    """
    # SSL material, converted to absolute paths.
    Pyro.config.PYROSSL_CERTDIR = os.path.abspath(ssl_conf.certs_dir)
    print "Using ssl certificate directory : %s" % Pyro.config.PYROSSL_CERTDIR
    Pyro.config.PYROSSL_CA_CERT = os.path.abspath(ssl_conf.ca_cert)
    print "Using ssl ca cert file : %s" % Pyro.config.PYROSSL_CA_CERT
    Pyro.config.PYROSSL_CERT = os.path.abspath(ssl_conf.server_cert)
    print"Using ssl server cert file : %s" % Pyro.config.PYROSSL_CERT
    if self.hard_ssl_name_check:
        Pyro.config.PYROSSL_POSTCONNCHECK=1
    Pyro.config.PYROSSL_POSTCONNCHECK=0
    # General Pyro settings: storage in the workdir, and (per Pyro's option
    # names) compression on and multithreading off.
    Pyro.config.PYRO_STORAGE = self.workdir
    Pyro.config.PYRO_COMPRESSION = 1
    Pyro.config.PYRO_MULTITHREADED = 0
    # NOTE(review): self.port is not among the visible default properties;
    # presumably concrete daemons define it.
    self.pyro_daemon = pyro.ShinkenPyroDaemon(self.host, self.port, ssl_conf.use_ssl)
def get_socks_activity(self, socks, timeout):
    """Wait up to *timeout* seconds for read activity on *socks*.

    :param socks: list of sockets or file descriptors to watch for reading.
    :param timeout: maximum wait in seconds.
    :returns: the sub-list of *socks* that are readable. The list is empty
        on timeout, or when select() was interrupted by a signal (EINTR),
        which handleRequests treats as "no activity".
    :raises select.error: for any other select() failure.
    """
    try:
        ins, _, _ = select.select(socks, [], [], timeout)
    except select.error as e:
        # args[0] is the errno on both Python 2 and Python 3.
        errnum = e.args[0]
        if errnum == errno.EINTR:
            return []
        raise
    return ins
def find_modules_path(self):
    """ Find the absolute path of the shinken module directory and returns it. """
    # BEWARE: this must run before the working directory changes. A relative
    # __file__ would otherwise be resolved against the wrong directory.
    # The modules directory is "modules" inside the shinken package directory.
    manager_file = shinken.modulesmanager.__file__
    print("modulemanager file %s" % manager_file)
    modulespath = os.path.abspath(manager_file)
    print("modulemanager absolute file %s" % modulespath)
    # modulesmanager lives in the shinken package: go up to the package's
    # parent, then back down into shinken/modules.
    parent_path = os.path.dirname(os.path.dirname(modulespath))
    modulespath = os.path.join(parent_path, 'shinken', 'modules')
    print("Using modules path : %s" % (modulespath))
    # The caller (ModulesManager construction in __init__) uses this value;
    # previously nothing was returned.
    return modulespath
# Return the uid of a user, looked up by its name.
def find_uid_from_name(self):
    """Return the numeric uid of ``self.user``, or None if the user is unknown.

    The unknown-user case is reported on stdout; change_to_user_group checks
    for the None result.
    """
    try:
        return getpwnam(self.user)[2]
    except KeyError:
        print("Error: the user %s is unknown" % self.user)
        return None
# Return the gid of a group, looked up by its name.
def find_gid_from_name(self):
    """Return the numeric gid of ``self.group``, or None if the group is unknown.

    The unknown-group case is reported on stdout; change_to_user_group
    checks for the None result.
    """
    try:
        return getgrnam(self.group)[2]
    except KeyError:
        print("Error: the group %s is unknown" % self.group)
        return None
def change_to_user_group(self, insane=None):
    """ Change user of the running program. Just insult the admin if he wants root run (it can override).
    If change failed we sys.exit(2)

    :param insane: when true, running as root/root is tolerated. Defaults to
        ``self.idontcareaboutsecurity``. Previously this used the negation,
        so setting idontcareaboutsecurity=yes as advised refused root, while
        the default silently accepted it.
    """
    if insane is None:
        insane = self.idontcareaboutsecurity

    # TODO: change user on nt
    if os.name == 'nt':
        print("Sorry, you can't change user on this system")
        return

    if (self.user == 'root' or self.group == 'root') and not insane:
        print("What's ??? You want the application run under the root account?")
        print("I am not agree with it. If you really want it, put :")
        print("idontcareaboutsecurity=yes")
        print("in the config file")
        sys.exit(2)

    uid = self.find_uid_from_name()
    gid = self.find_gid_from_name()
    if uid is None or gid is None:
        # find_*_from_name have already reported which name is unknown.
        sys.exit(2)

    try:
        if os.geteuid() == 0:
            # Also replace root's supplementary groups; setregid alone would
            # leave them attached to the unprivileged process.
            if hasattr(os, 'initgroups'):
                os.initgroups(self.user, gid)
            else:
                os.setgroups([gid])
        # First group, then user: once the uid is dropped, the gid can no
        # longer be changed.
        os.setregid(gid, gid)
        os.setreuid(uid, uid)
    except OSError as e:
        print("Error : cannot change user/group to %s/%s (%s [%d])" % (self.user, self.group, e.strerror, e.errno))
        sys.exit(2)
def parse_config_file(self):
    """ Parse self.config_file and get all properties in it.
    If some properties need a pythonization, we do it.
    Also put default value in the properties if some are missing in the config_file

    Every key of the [daemon] section becomes an attribute. Keys that are
    declared in the class ``properties`` are converted with the property's
    pythonize(). A missing, unreadable, or section-less config file, or one
    without a [daemon] section, is reported and the process exits with
    status 2.
    """
    properties = self.__class__.properties
    if self.config_file is not None:
        config = ConfigParser.ConfigParser()
        config.read(self.config_file)
        # Public API instead of the private _sections attribute. This also
        # covers files that have sections but no [daemon] one, which used to
        # crash with NoSectionError.
        if not config.has_section('daemon'):
            print("Bad or missing config file : %s " % self.config_file)
            sys.exit(2)
        for (key, value) in config.items('daemon'):
            if key in properties:
                value = properties[key].pythonize(value)
            setattr(self, key, value)
    else:
        print("No config file specified, use defaults parameters")
    # Now fill all defaults where missing parameters
    for prop, entry in properties.items():
        if not hasattr(self, prop):
            value = entry.pythonize(entry.default)
            setattr(self, prop, value)
            print("Using default value : %s %s" % (prop, value))
# Path-typed properties may be given relative to the config file; turn them
# into absolute paths.
def relative_paths_to_full(self, reference_path):
    """Make every PathProp-typed property of this daemon absolute.

    :param reference_path: directory that relative values are joined to
        (load_config_file passes the config file's directory). Values that
        are already absolute are left as they are.
    """
    for prop_name, prop_def in self.__class__.properties.items():
        if not isinstance(prop_def, PathProp):
            continue
        current = getattr(self, prop_name)
        if os.path.isabs(current):
            continue
        setattr(self, prop_name, os.path.join(reference_path, current))
def manage_signal(self, sig, frame):
    """Signal handler: report the received signal and set ``self.interrupted``.

    :param sig: signal number (only printed).
    :param frame: current stack frame, unused.
    """
    message = "I'm process %d and I received signal %s" % (os.getpid(), str(sig))
    print(message)
    self.interrupted = True
def set_exit_handler(self):
    """Install ``self.manage_signal`` as the handler for termination requests.

    On Windows (os.name == "nt") the handler is registered with
    win32api.SetConsoleCtrlHandler, which needs pywin32. Elsewhere it is
    registered for SIGTERM and SIGINT. Previously the branch structure was
    missing, so "pywin32 not installed" was raised unconditionally.

    :raises Exception: on Windows when pywin32 cannot be imported.
    """
    func = self.manage_signal
    if os.name == "nt":
        try:
            import win32api
        except ImportError:
            version = ".".join(map(str, sys.version_info[:2]))
            raise Exception("pywin32 not installed for Python " + version)
        win32api.SetConsoleCtrlHandler(func, True)
    else:
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, func)
def get_header(self):
    """Return the banner lines (version and copyright holders).

    print_header iterates over this list.
    NOTE(review): the closing bracket of this list, and possibly further
    lines, are not visible in this copy of the file.
    """
    return ["Shinken %s" % VERSION,
            "Copyright (c) 2009-2011 :",
            "Gabes Jean (naparuba@gmail.com)",
            "Gerhard Lausser, Gerhard.Lausser@consol.de",
            "Gregory Starck, g.starck@gmail.com",
            "Hartmut Goebel, h.goebel@goebel-consult.de",
def print_header(self):
    """Iterate over the banner lines returned by get_header.

    NOTE(review): the loop body is not visible in this copy; presumably it
    prints each line — confirm against the full source.
    """
    for line in self.get_header():
def handleRequests(self, timeout, suppl_socks=None):
    """ Wait up to timeout to handle the pyro daemon requests.
    If suppl_socks is given it also looks for activity on that list of fd.

    Returns a 3-tuple (elapsed, active, tcdiff):
    If timeout: elapsed is 0, active is [], tcdiff is the possible system time change value
    If not timeout (== some fd got activity):
    - elapsed is the time since the wait started (never 0),
    - active is the sublist of suppl_socks that got activity,
    - tcdiff is the possible system time change value, or 0 if no change.

    NOTE(review): lines are missing from this copy:
      - the assignment of `before`;
      - the guard around socks.extend (as visible, the default
        suppl_socks=None would make extend raise TypeError);
      - the no-activity return;
      - the loop that binds `sock`.
    """
    socks = self.pyro_daemon.get_sockets()
    # NOTE(review): this extends the list returned by get_sockets(); if that
    # is pyro's internal list, it is mutated — confirm.
    socks.extend(suppl_socks)
    ins = self.get_socks_activity(socks, timeout)
    tcdiff = self.check_for_system_time_change()
    if len(ins) == 0: # trivial case: no fd activity:
    self.pyro_daemon.handleRequests(sock)
    elapsed = time.time() - before
    if elapsed == 0: # we have done a few instructions in 0 second exactly !? quantum computer ?
        elapsed = 0.01 # but we absolutely need to return != 0 to indicate that we got activity
    return elapsed, ins, tcdiff
def check_for_system_time_change(self):
    """ Check for a possible system time change and act correspondingly.
    If such a change is detected then we return the number of seconds that changed. 0 if no time change was detected.
    Time change can be positive or negative:
    positive when we have been sent in the future and negative if we have been sent in the past.

    Only jumps larger than 15 minutes (900 s) since the previous call count;
    those are passed to compensate_system_time_change. In every case
    ``self.t_each_loop`` becomes the current time, the reference for the
    next call. Previously `now` was undefined and nothing was returned.
    """
    now = time.time()
    difference = now - self.t_each_loop

    # If we have more than 15 min time change, we need to compensate it
    if abs(difference) > 900:
        self.compensate_system_time_change(difference)
    else:
        # Smaller deltas are ordinary time between loop turns.
        difference = 0

    self.t_each_loop = now
    return difference
def compensate_system_time_change(self, difference):
    """Default reaction to a detected system clock jump: log a warning only.

    :param difference: size of the jump in seconds (signed), as computed by
        check_for_system_time_change.
    """
    warning = ('Warning: A system time change of %s has been detected. '
               'Compensating...') % difference
    logger.log(warning)
# Wait for the arbiter to send the initial configuration.
# The arbiter sends it through our pyro daemon (Interface.put_conf sets the
# owner's new_conf), or it may just ping us.
def wait_for_initial_conf(self, timeout=1.0):
    """Serve pyro requests until ``self.new_conf`` is set or the daemon is interrupted.

    :param timeout: how long each handleRequests wait lasts, in seconds.

    NOTE(review): lines are missing from this copy (the conditions around
    the two cur_timeout updates, and possibly the rest of the loop body past
    the visible end). As visible, cur_timeout is reset on every turn.
    """
    logger.log("Waiting for initial configuration")
    cur_timeout = timeout
    # The arbiter has not set our configuration yet
    while not self.new_conf and not self.interrupted:
        elapsed, _, _ = self.handleRequests(cur_timeout)
        cur_timeout -= elapsed
        cur_timeout = timeout
        # Progress indicator, one dot per turn.
        sys.stdout.write(".")