2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 #This is the class of the Arbiter. Its role is to read the configuration,
23 #cut it into parts, and send them to other elements like schedulers,
24 #reactionners or pollers. It is responsible for the high availability part:
26 #if a scheduler dies, it sends its conf to another one if available.
27 #It also reads orders from users (nagios.cmd) and sends them to schedulers.
36 from Queue
import Empty
39 # We try to raise the recursion limit here,
40 # but the resource module is not available on Windows
43 # All the pickle will ask for a lot of recursion, so we must make
44 # sure to set it at a high value. The maximum recursion depth depends
45 # on the Python version and the process limit "stack size".
46 # The factors used were acquired by testing a broad range of installations
47 stacksize_soft
, stacksize_hard
= resource
.getrlimit(3)
48 if sys
.version_info
< (2,6):
49 sys
.setrecursionlimit(int(stacksize_soft
* 0.65 + 1100))
50 elif sys
.version_info
< (3,):
51 sys
.setrecursionlimit(int(stacksize_soft
* 1.9 + 3200))
53 sys
.setrecursionlimit(int(stacksize_soft
* 2.4 + 3200))
56 ## Make sure people are using Python 2.4 or higher (Python 3 is not supported yet)
57 if sys
.version_info
< (2,4):
58 print "Shinken requires as a minimum Python 2.4.x, sorry"
60 elif sys
.version_info
>= (3,):
61 print "Shinken is not yet compatible with Python3k, sorry"
65 #In fact it's useful for installed daemon
66 sys
.path
.insert(0, '.')
69 #Try to load the shinken lib.
70 #Maybe it is not in our python path, so we detect that case and,
71 #if so (it's an untar install), we add .. to the path
73 from shinken
.util
import to_bool
74 if hasattr(sys
.modules
['__main__'], '__file__'):
75 my_path
= os
.path
.abspath(sys
.modules
['__main__'].__file
__)
76 elts
= os
.path
.dirname(my_path
).split(os
.sep
)[:-1]
77 elts
.append('shinken')
78 sys
.path
.append(os
.sep
.join(elts
))
80 if hasattr(sys
.modules
['__main__'], '__file__'):
81 #Now add in the python path the shinken lib
82 #if we launch it in a direct way and
83 #the shinken is not a python lib
84 my_path
= os
.path
.abspath(sys
.modules
['__main__'].__file
__)
85 elts
= os
.path
.dirname(my_path
).split(os
.sep
)[:-1]
86 sys
.path
.append(os
.sep
.join(elts
))
87 elts
.append('shinken')
88 sys
.path
.append(os
.sep
.join(elts
))
93 import shinken
.pyro_wrapper
95 print "Shinken require the Python Pyro module. Please install it."
98 Pyro
= shinken
.pyro_wrapper
.Pyro
101 from shinken
.util
import to_bool
102 #from scheduler import Scheduler
103 from shinken
.config
import Config
104 from shinken
.external_command
import ExternalCommandManager
105 from shinken
.dispatcher
import Dispatcher
106 from shinken
.daemon
import Daemon
107 from shinken
.log
import logger
108 from shinken
.modulesmanager
import ModulesManager
109 from shinken
.brok
import Brok
110 from shinken
.external_command
import ExternalCommand
117 #Interface for the other Arbiters.
118 #They connect to it, and we manage who is the master, the spare, etc.
119 #There is also a function here to receive a new conf from the master
120 class IArbiters(Pyro
.core
.ObjBase
):
121 #we keep arbiter link
122 def __init__(self
, arbiter
):
123 Pyro
.core
.ObjBase
.__init
__(self
)
124 self
.arbiter
= arbiter
125 self
.running_id
= random
.random()
128 #Broker need to void it's broks?
129 def get_running_id(self
):
130 return self
.running_id
133 def have_conf(self
, magic_hash
):
134 #I've got a conf and the good one
135 if self
.arbiter
.have_conf
and self
.arbiter
.conf
.magic_hash
== magic_hash
:
137 else: #No conf or a bad one
141 #The master Arbiter is sending us a new conf. Ok, we take it
142 def put_conf(self
, conf
):
143 self
.arbiter
.conf
= conf
144 print "Get conf:", self
.arbiter
.conf
145 self
.arbiter
.have_conf
= True
146 print "Just after reception"
147 self
.arbiter
.must_run
= False
155 #the master arbiter ask me to do not run!
156 def do_not_run(self
):
157 #If i'm the master, just FUCK YOU!
158 if self
.arbiter
.is_master
:
159 print "Some fucking idiot ask me to do not run. I'm a proud master, so I'm still running"
160 #Else I'm just a spare, so I listen to my master
162 print "Someone ask me to do not run"
163 self
.arbiter
.last_master_speack
= time
.time()
164 self
.arbiter
.must_run
= False
168 class Arbiter(Daemon
):
172 #'workdir' : {'default' : '/usr/local/shinken/var', 'pythonize' : None},
173 #'pidfile' : {'default' : '/usr/local/shinken/var/arbiterd.pid', 'pythonize' : None},
174 #'port' : {'default' : '7768', 'pythonize' : to_int},
175 #'host' : {'default' : '0.0.0.0', 'pythonize' : None},
176 #'user' : {'default' : 'shinken', 'pythonize' : None},
177 #'group' : {'default' : 'shinken', 'pythonize' : None},
178 #'idontcareaboutsecurity' : {'default' : '0', 'pythonize' : to_bool}
182 def __init__(self
, config_files
, is_daemon
, do_replace
, verify_only
, debug
, debug_file
):
183 self
.config_file
= config_files
[0]
184 self
.config_files
= config_files
185 self
.is_daemon
= is_daemon
186 self
.verify_only
= verify_only
187 self
.do_replace
= do_replace
189 self
.debug_file
= debug_file
191 #From daemon to manage signal. Call self.manage_signal if
192 #exists, a dummy function otherwise
193 self
.set_exit_handler()
195 self
.is_master
= False
198 self
.nb_broks_send
= 0
200 #Now tab for external_commands
201 self
.external_commands
= []
203 self
.t_each_loop
= time
.time() # to track system time change
206 #Use for adding things like broks
208 if isinstance(b
, Brok
):
210 elif isinstance(b
, ExternalCommand
):
211 self
.external_commands
.append(b
)
213 logger
.log('Warning : cannot manage object type %s (%s)' % (type(b
), b
))
216 #We must push our broks to the broker
217 #because it's stupid to make a crossing connection,
218 #so we find the broker responsible for our broks.
220 #TODO : find the broker in a better way: here it can be dead,
221 #or not the right one
222 def push_broks_to_broker(self
):
223 for brk
in self
.conf
.brokers
:
224 #Send only if alive of course
225 if brk
.manage_arbiters
and brk
.alive
:
226 is_send
= brk
.push_broks(self
.broks
)
228 #They are gone, we keep none!
232 #We must take external_commands from all brokers
def get_external_commands_from_brokers(self):
    """Collect the pending external commands of every alive broker.

    Each alive broker link in self.conf.brokers is asked for its
    external commands, which are appended, in order, to
    self.external_commands. Brokers that are not alive are skipped.
    """
    for brk in self.conf.brokers:
        # Get only if alive of course.
        # NOTE(review): the guard line itself was not visible in the
        # reviewed text; it is restored from the comment above - confirm
        # whether `manage_arbiters` must be checked as well.
        if not brk.alive:
            continue
        self.external_commands.extend(brk.get_external_commands())
242 #Our links to satellites can raise broks. We must send them
243 def get_broks_from_satellitelinks(self
):
244 tabs
= [self
.conf
.brokers
, self
.conf
.schedulerlinks
, \
245 self
.conf
.pollers
, self
.conf
.reactionners
]
248 new_broks
= s
.get_all_broks()
253 #Our links to satellites can raise broks. We must send them
254 def get_initial_broks_from_satellitelinks(self
):
255 tabs
= [self
.conf
.brokers
, self
.conf
.schedulerlinks
, \
256 self
.conf
.pollers
, self
.conf
.reactionners
]
259 b
= s
.get_initial_status_brok()
263 #Load the external commander
264 def load_external_command(self
, e
):
265 self
.external_command
= e
269 #Check whether the system time changed. If so, compensate for it
def check_for_system_time_change(self):
    """Detect a jump of the system clock since the previous call.

    The current time is compared with self.t_each_loop, the timestamp
    kept from the previous call. If they differ by more than 15 minutes
    (900 seconds) in either direction,
    self.compensate_system_time_change() is called with the difference.
    self.t_each_loop is then refreshed in every case.

    Returns the difference in seconds when a jump was detected, else 0,
    so callers can subtract the result from a timeout.
    """
    # NOTE(review): the assignment of `now` was not visible in the
    # reviewed text; time.time() matches how t_each_loop is initialised.
    now = time.time()
    difference = now - self.t_each_loop
    time_jumped = abs(difference) > 900

    # If we have more than 15 min time change, we need to compensate
    if time_jumped:
        self.compensate_system_time_change(difference)

    # Now set the new value for the tick loop
    self.t_each_loop = now

    # Return the diff if needed, or just 0
    if time_jumped:
        return difference
    return 0
289 #If we've got a system time change, we need to compensate for it.
290 #For now, we do not do anything in fact.
def compensate_system_time_change(self, difference):
    """Report a detected system time change of `difference` seconds.

    Only a warning is sent to the logger; no value is adjusted here.
    """
    message = 'Warning: A system time change of %s has been detected. Compensating...' % difference
    logger.log(message)
300 self
.log
.load_obj(self
)
303 for line
in self
.get_header():
304 self
.log
.log(line
)#, format = 'TOTO %s\n')
306 #Use to know if we must still be alive or not
309 print "Loading configuration"
311 #The config Class must have the USERN macro
312 #There are 256 of them, so we create online
313 Config
.fill_usern_macros()
315 #REF: doc/shinken-conf-dispatching.png (1)
316 buf
= self
.conf
.read_config(self
.config_files
)
318 raw_objects
= self
.conf
.read_config_buf(buf
)
320 #### Loading Arbiter module part ####
322 #first we need to get arbiters and modules
323 #so we can ask them for some objects too
324 self
.conf
.create_objects_for_type(raw_objects
, 'arbiter')
325 self
.conf
.create_objects_for_type(raw_objects
, 'module')
328 self
.conf
.early_arbiter_linking()
330 #Search wich Arbiterlink I am
331 for arb
in self
.conf
.arbiterlinks
:
333 arb
.need_conf
= False
335 print "I am the arbiter :", arb
.get_name()
336 print "Am I the master?", not self
.me
.spare
340 #If None, there will be huge problems. The conf will be invalid
341 #And we will bail out after print all errors
343 print "My own modules :"
344 for m
in self
.me
.modules
:
347 #BEWARE: this way of finding path is good if we still
348 #DO NOT HAVE CHANGE PWD!!!
349 #Now get the module path. It's in fact the directory modules
350 #inside the shinken directory. So let's find it.
351 print "modulemanager file", shinken
.modulesmanager
.__file
__
352 modulespath
= os
.path
.abspath(shinken
.modulesmanager
.__file
__)
353 print "modulemanager absolute file", modulespath
354 #We got one of the files of
355 elts
= os
.path
.dirname(modulespath
).split(os
.sep
)[:-1]
356 elts
.append('shinken')
357 elts
.append('modules')
358 self
.modulespath
= os
.sep
.join(elts
)
359 logger
.log("Using modules path : %s" % os
.sep
.join(elts
))
362 self
.modules_manager
= ModulesManager('arbiter', self
.modulespath
, self
.me
.modules
)
363 self
.modules_manager
.load()
364 self
.mod_instances
= self
.modules_manager
.get_instances()
366 # Now we ask for configuration modules if they
368 for inst
in self
.mod_instances
:
369 if 'configuration' in inst
.properties
['phases']:
371 r
= inst
.get_objects()
372 types_creations
= self
.conf
.types_creations
373 for k
in types_creations
:
374 (cls
, clss
, prop
) = types_creations
[k
]
377 # test if raw_objects[k] is already set - if not, add empty array
378 if not k
in raw_objects
:
380 # now append the object
381 raw_objects
[k
].append(x
)
382 print "Added %i objects to %s from module %s" % (len(r
[prop
]), k
, inst
.get_name())
383 except Exception, exp
:
384 print "The instance %s raise an exception %s. I bypass it" % (inst
.get_name(), str(exp
))
386 ### Resume standard operations ###
387 self
.conf
.create_objects(raw_objects
)
389 #Maybe conf is already invalid
390 if not self
.conf
.conf_is_correct
:
391 print "***> One or more problems was encountered while processing the config files..."
395 #************** Change Nagios2 names to Nagios3 ones ******
396 self
.conf
.old_properties_names_to_new()
398 #print "****************** Create Template links **********"
399 self
.conf
.linkify_templates()
401 #print "****************** Inheritance *******************"
402 self
.conf
.apply_inheritance()
404 #print "****************** Explode ***********************"
407 #print "***************** Create Name reversed list ******"
408 self
.conf
.create_reversed_list()
410 #print "***************** Cleaning Twins *****************"
411 self
.conf
.remove_twins()
413 #print "****************** Implicit inheritance *******************"
414 self
.conf
.apply_implicit_inheritance()
416 #print "****************** Fill default ******************"
417 self
.conf
.fill_default()
419 #print "****************** Clean templates ******************"
420 self
.conf
.clean_useless()
422 #print "****************** Pythonize ******************"
423 self
.conf
.pythonize()
425 #print "****************** Linkify ******************"
428 #print "*************** applying dependancies ************"
429 self
.conf
.apply_dependancies()
431 #Hacking some global parameter inherited from Nagios to create
432 #on the fly some Broker modules like for status.dat parameters
433 #or nagios.log one if there are no already available
434 self
.conf
.hack_old_nagios_parameters()
436 #Raise warning about curently unmanaged parameters
437 self
.conf
.warn_about_unmanaged_parameters()
439 #print "************** Exlode global conf ****************"
440 self
.conf
.explode_global_conf()
442 #set ourown timezone and propagate it to other satellites
443 self
.conf
.propagate_timezone_option()
445 # Look for business rules, and create teh dep trees
446 self
.conf
.create_business_rules()
448 self
.conf
.create_business_rules_dependencies()
451 #************* Print warning about useless parameters in Shinken **************"
452 self
.conf
.notice_about_useless_parameters()
454 #print "****************** Correct ?******************"
455 self
.conf
.is_correct()
457 #If the conf is not correct, we must get out now
458 if not self
.conf
.conf_is_correct
:
459 print "Configuration is incorrect, sorry, I bail out"
465 #from guppy import hpy
471 print "Error : I cannot find my own Arbiter object, I bail out"
472 print "To solve it : please change the host_name parameter in the object Arbiter"
473 print "in the file shinken-specific.cfg. Thanks."
477 #If I am a spare, I must wait a (true) conf from Arbiter Master
478 self
.wait_conf
= self
.me
.spare
481 #for r in self.conf.realms:
482 # print r.get_name(), r.__dict__
486 #REF: doc/shinken-conf-dispatching.png (2)
487 logger
.log("Cutting the hosts and services into parts")
488 self
.confs
= self
.conf
.cut_into_parts()
490 #The conf can be incorrect here if the cut into parts see errors like
491 #a realm with hosts and not schedulers for it
492 if not self
.conf
.conf_is_correct
:
493 print "Configuration is incorrect, sorry, I bail out"
496 logger
.log('Things look okay - No serious problems were detected during the pre-flight check')
498 #Exit if we are just here for config checking
504 #Some properties need to be "flatten" (put in strings)
505 #before being send, like realms for hosts for example
506 #BEWARE: after the cutting part, because we stringify some properties
507 self
.conf
.prepare_for_sending()
510 #Ok, here we must check if we go on or not.
511 #TODO : check OK or not
512 self
.pidfile
= self
.conf
.lock_file
513 self
.idontcareaboutsecurity
= self
.conf
.idontcareaboutsecurity
514 self
.user
= self
.conf
.shinken_user
515 self
.group
= self
.conf
.shinken_group
516 self
.workdir
= os
.path
.expanduser('~'+self
.user
)
518 #If we go, we must go in daemon or not
519 #Check if another Scheduler is not running (with the same conf)
520 self
.check_parallel_run(do_replace
)
522 #If the admin don't care about security, I allow root running
523 insane
= not self
.idontcareaboutsecurity
526 #Try to change the user (not nt for the moment)
527 #TODO: change user on nt
529 self
.change_user(insane
)
531 logger
.log("Warning : you can't change user on this system")
533 #Now the daemon part if need
535 self
.create_daemon(do_debug
=debug
, debug_file
=debug_file
)
537 logger
.log("Opening of the network port")
538 #Now open the daemon port for Broks and other Arbiter sends
539 Pyro
.config
.PYRO_STORAGE
= self
.workdir
540 Pyro
.config
.PYRO_COMPRESSION
= 1
541 Pyro
.config
.PYRO_MULTITHREADED
= 0
542 logger
.log("Using working directory : %s" % os
.path
.abspath(self
.workdir
))
544 self
.poller_daemon
= shinken
.pyro_wrapper
.init_daemon(self
.me
.address
, self
.me
.port
)
546 logger
.log("Listening on %s:%d" % (self
.me
.address
, self
.me
.port
))
548 self
.iarbiters
= IArbiters(self
)
550 self
.uri_arb
= shinken
.pyro_wrapper
.register(self
.poller_daemon
, self
.iarbiters
, "ForArbiter")
552 logger
.log("Configuration Loaded")
556 #If I am a spare, I wait for the master arbiter to send me
559 self
.wait_initial_conf()
560 else:#I'm the master, I've got a conf
561 self
.is_master
= True
562 self
.have_conf
= True
564 #Ok, now It've got a True conf, Now I wait to get too much
567 print "I must wait now"
568 self
.wait_for_master_death()
575 #Get 'objects' from external modules
576 #It can be used for get external commands for example
577 def get_objects_from_from_queues(self
):
578 for f
in self
.modules_manager
.get_external_from_queues():
579 print "Groking from module instance %s" % f
583 o
= f
.get(block
=False)
584 print "Got object :", o
591 #We wait (block) for arbiter to send us conf
592 def wait_initial_conf(self
):
593 self
.have_conf
= False
594 print "Waiting for configuration from master"
596 while not self
.have_conf
:
597 socks
= shinken
.pyro_wrapper
.get_sockets(self
.poller_daemon
)
600 # 'foreign' event loop
601 ins
, outs
, exs
= select
.select(socks
, [], [], timeout
)
603 #Manage a possible time change (our avant will be change with the diff)
604 diff
= self
.check_for_system_time_change()
610 shinken
.pyro_wrapper
.handleRequests(self
.poller_daemon
, s
)
613 timeout
= timeout
- diff
614 break # no need to continue with the for loop
616 sys
.stdout
.write(".")
624 #We wait (block) for arbiter to send us something
625 def wait_for_master_death(self
):
626 print "Waiting for master death"
628 is_master_dead
= False
629 self
.last_master_speack
= time
.time()
630 while not is_master_dead
:
631 socks
= shinken
.pyro_wrapper
.get_sockets(self
.poller_daemon
)
633 # 'foreign' event loop
634 ins
, outs
, exs
= select
.select(socks
, [], [], timeout
)
636 #Manage a possible time change (our avant will be change with the diff)
637 diff
= self
.check_for_system_time_change()
643 shinken
.pyro_wrapper
.handleRequests(self
.poller_daemon
, s
)
644 self
.last_master_speack
= time
.time()
647 timeout
= timeout
- diff
649 sys
.stdout
.write(".")
656 #Now check if master is dead or not
658 if now
- self
.last_master_speack
> 5:
659 print "Master is dead!!!"
661 is_master_dead
= True
664 #Manage signal function
665 #Frame is just garbage
666 def manage_signal(self
, sig
, frame
):
667 print "\nExiting with signal", sig
668 print "Stopping all network connexions"
669 self
.poller_daemon
.shutdown(True)
670 print "Unlinking pid file"
672 os
.unlink(self
.pidfile
)
674 print "Error un deleting pid file:", exp
680 #Before running, I must be sure who am I
681 #The arbiters change, so we must refound the new self.me
682 for arb
in self
.conf
.arbiterlinks
:
686 logger
.log("Begin to dispatch configurations to satellites")
687 self
.dispatcher
= Dispatcher(self
.conf
, self
.me
)
688 self
.dispatcher
.check_alive()
689 self
.dispatcher
.check_dispatch()
690 #REF: doc/shinken-conf-dispatching.png (3)
691 self
.dispatcher
.dispatch()
693 #Now we can get all initial broks for our satellites
694 self
.get_initial_broks_from_satellitelinks()
696 #Now create the external commander
698 e
= ExternalCommandManager(self
.conf
, 'dispatcher')
700 #Scheduler need to know about external command to activate it
702 self
.load_external_command(e
)
706 print "Run baby, run..."
708 while self
.must_run
:
710 daemon_sockets
= shinken
.pyro_wrapper
.get_sockets(self
.poller_daemon
)
711 socks
.extend(daemon_sockets
)
713 if self
.fifo
!= None:
714 socks
.append(self
.fifo
)
715 # 'foreign' event loop
716 ins
, outs
, exs
= select
.select(socks
, [], [], timeout
)
718 #Manage a possible time change (our avant will be change with the diff)
719 diff
= self
.check_for_system_time_change()
725 if s
in daemon_sockets
:
726 shinken
.pyro_wrapper
.handleRequests(self
.poller_daemon
, s
)
729 timeout
= timeout
- diff
730 break # no need to continue with the for loop
731 #If FIFO, read external command
733 ext_cmds
= self
.external_command
.get()
734 for ext_cmd
in ext_cmds
:
735 self
.external_commands
.append(ext_cmd
)
736 self
.fifo
= self
.external_command
.open()
739 self
.dispatcher
.check_alive()
740 self
.dispatcher
.check_dispatch()
741 #REF: doc/shinken-conf-dispatching.png (3)
742 self
.dispatcher
.dispatch()
743 self
.dispatcher
.check_bad_dispatch()
745 #Now get things from our module instances
746 self
.get_objects_from_from_queues()
748 #Maybe our satellites links raise new broks. Must reap them
749 self
.get_broks_from_satellitelinks()
751 #One broker is responsible for our broks,
752 #we must give him our broks
753 self
.push_broks_to_broker()
754 self
.get_external_commands_from_brokers()
755 #send_conf_to_schedulers()
758 print "Nb Broks send:", self
.nb_broks_send
759 #logger.log("Nb Broks send: %d" % self.nb_broks_send)
760 self
.nb_broks_send
= 0
763 #Now send all external commands to schedulers
764 for ext_cmd
in self
.external_commands
:
765 self
.external_command
.resolve_command(ext_cmd
)
766 #It's send, do not keep them
767 #TODO: check if really send. Queue by scheduler?
768 self
.external_commands
= []
775 ################### Process launch part
777 print "Shinken Arbiter Daemon, version %s, from :" % VERSION
778 print " Gabes Jean, naparuba@gmail.com"
779 print " Gerhard Lausser, Gerhard.Lausser@consol.de"
780 print "Usage: %s [options] -c configfile [-c additionnal_config_file]" % name
782 print " -c, --config"
783 print "\tConfig file (your nagios.cfg). Multiple -c can be used, it will be like if all files was just one"
784 print " -d, --daemon"
785 print "\tRun in daemon mode"
786 print " -r, --replace"
787 print "\tReplace previous running scheduler"
789 print "\tPrint detailed help screen"
791 print "\tDebug File. Default : no use (why debug a bug free program? :) )"
797 if __name__
== "__main__":
800 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hvrdc::w", ["help", "verify-config", "replace", "daemon", "config=", "debug=", "easter"])
801 except getopt
.GetoptError
, err
:
802 # print help information and exit:
803 print str(err
) # will print something like "option -a not recognized"
814 if o
in ("-h", "--help"):
817 elif o
in ("-v", "--verify-config"):
819 elif o
in ("-r", "--replace"):
821 elif o
in ("-c", "--config"):
822 config_files
.append(a
)
823 elif o
in ("-d", "--daemon"):
825 elif o
in ("--debug"):
829 print "Sorry, the option", o
, a
, "is unknown"
833 if len(config_files
) == 0:
834 print "Error : config file is need"
838 p
= Arbiter(config_files
, is_daemon
, do_replace
, verify_only
, debug
, debug_file
)
839 #Ok, now we load the config
844 #command = """p.main()"""
845 #cProfile.runctx( command, globals(), locals(), filename="/tmp/Arbiter.profile" )