Merge branch 'master' of ssh://lausser,shinken@shinken.git.sourceforge.net/gitroot...
[shinken.git] / shinken / daemons / arbiterdaemon.py
blob794e2bfe16cd7427be4a80eb724b6503f156d631
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 import sys
24 import os
25 import time
26 import random
27 from Queue import Empty
30 from shinken.objects import Config
31 from shinken.external_command import ExternalCommandManager
32 from shinken.dispatcher import Dispatcher
33 from shinken.daemon import Daemon, Interface
34 from shinken.log import logger
35 from shinken.brok import Brok
36 from shinken.external_command import ExternalCommand
37 from shinken.pyro_wrapper import Pyro
40 # Interface for the other Arbiter
41 # It connects, and together we decide who's the Master and who's the Slave, etc.
42 # It also provides a function to get a new conf from the master
43 class IForArbiter(Interface):
def have_conf(self, magic_hash):
    """Tell whether we already hold the configuration identified by magic_hash.

    Returns True only when a current conf is loaded and its magic_hash
    equals the given one; False when there is no conf or the hashes
    differ.
    """
    current = self.app.cur_conf
    if not current:
        # No conf at all
        return False
    # We have a conf: it is the good one only if the hashes match
    return bool(current.magic_hash == magic_hash)
52 # The master Arbiter is sending us a new conf. Ok, we take it
def put_conf(self, conf):
    """Accept a configuration pushed by the master arbiter.

    The conf is first handed to the parent Interface implementation,
    then the arbiter is flagged as not allowed to run.
    """
    parent = super(IForArbiter, self)
    parent.put_conf(conf)
    self.app.must_run = False
57 # The master arbiter asks me not to run!
def do_not_run(self):
    """Handle a "do not run" order sent by another arbiter.

    A master ignores the order (it only prints a message). A spare
    obeys: it records the time of the order as the last sign of life
    from the master and clears its must_run flag.
    """
    app = self.app
    if app.is_master:
        # A master keeps running whatever it is told
        print("Some f***ing idiot asks me not to run. I'm a proud master, so I decide to run anyway")
        return

    # I'm just a spare, so I listen to my master
    print("Someone asks me not to run")
    app.last_master_speack = time.time()
    app.must_run = False
70 # Here a function called by check_shinken to get daemon status
def get_satellite_status(self, daemon_type, daemon_name):
    """Return the alive/spare state of one daemon, looked up by name.

    daemon_type selects the daemon list (through app.get_daemons) and
    the name attribute to compare, which is "<daemon_type>_name".

    Returns a dict {'alive': ..., 'spare': ...} for the first daemon
    whose name matches and that has both attributes, or None when no
    such daemon is found.
    """
    name_attr = daemon_type + "_name"
    candidates = self.app.get_daemons(daemon_type)
    if not candidates:
        return None

    for link in candidates:
        name_matches = hasattr(link, name_attr) and getattr(link, name_attr) == daemon_name
        if name_matches and hasattr(link, 'alive') and hasattr(link, 'spare'):
            return {'alive': link.alive, 'spare': link.spare}
    return None
81 # Here a function called by check_shinken to get daemons list
def get_satellite_list(self, daemon_type):
    """Return the names of all daemons of the given type.

    The name of each daemon is read from its "<daemon_type>_name"
    attribute.

    Returns the list of names, or None when app.get_daemons gives
    nothing (None or an empty collection) or when any daemon lacks
    its name attribute.
    """
    name_attr = daemon_type + "_name"
    links = self.app.get_daemons(daemon_type)
    if not links:
        return None

    names = []
    for link in links:
        if not hasattr(link, name_attr):
            # One daemon without a name invalidates the whole answer
            return None
        names.append(getattr(link, name_attr))
    return names
97 # Main Arbiter Class
98 class Arbiter(Daemon):
100 #properties = {}
def __init__(self, config_files, is_daemon, do_replace, verify_only, debug, debug_file):
    """Build the arbiter daemon in its initial, not yet configured state.

    config_files is the list of configuration files to read later; its
    first entry is also passed to the Daemon constructor. verify_only
    makes load_config_file exit right after the configuration checks.
    The remaining arguments are forwarded unchanged to Daemon.
    """
    super(Arbiter, self).__init__('arbiter', config_files[0], is_daemon,
                                  do_replace, debug, debug_file)

    self.config_files = config_files
    self.verify_only = verify_only

    # Who we are: unknown until the configuration has been read
    self.is_master = False
    self.me = None

    # Broks waiting to be pushed to a broker, indexed by brok id
    self.broks = {}
    self.nb_broks_send = 0

    # External commands waiting to be resolved, and the FIFO they may come from
    self.external_commands = []
    self.fifo = None

    # Use to know if we must still be alive or not
    self.must_run = True

    self.interface = IForArbiter(self)
    self.conf = Config()
128 # Use for adding things like broks
def add(self, b):
    """Queue an object handed to the arbiter.

    A Brok is stored in self.broks under its id; an ExternalCommand is
    appended to self.external_commands. Any other object is not kept
    and a warning is logged.
    """
    if isinstance(b, Brok):
        self.broks[b.id] = b
        return
    if isinstance(b, ExternalCommand):
        self.external_commands.append(b)
        return
    logger.log('Warning : cannot manage object type %s (%s)' % (type(b), b))
138 # We must push our broks to the broker
139 # because it's stupid to make a crossing connection
140 # so we find the broker responsible for our broks,
141 # and we send them to it
142 # TODO : better find the broker, here it can be dead?
143 # or not the good one?
def push_broks_to_broker(self):
    """Push our pending broks to the brokers that manage arbiters.

    Only brokers with manage_arbiters set and currently alive are
    tried. When a push is reported as successful, the local brok
    store is emptied.
    """
    for broker in self.conf.brokers:
        if not (broker.manage_arbiters and broker.alive):
            # Not in charge of us, or dead: nothing to send
            continue
        if broker.push_broks(self.broks):
            # They are gone, we keep none!
            self.broks.clear()
154 # We must take external_commands from all brokers
def get_external_commands_from_brokers(self):
    """Collect the external commands held by every alive broker.

    The commands are appended, in order, to self.external_commands.
    """
    for broker in self.conf.brokers:
        if not broker.alive:
            # Get only if alive of course
            continue
        self.external_commands.extend(broker.get_external_commands())
164 # We must take external_commands from all receivers
def get_external_commands_from_receivers(self):
    """Collect the external commands held by every alive receiver.

    The commands are appended, in order, to self.external_commands.
    """
    pending = self.external_commands
    for receiver in self.conf.receivers:
        if not receiver.alive:
            # Get only if alive of course
            continue
        pending.extend(receiver.get_external_commands())
174 # Our links to satellites can raise broks. We must send them
def get_broks_from_satellitelinks(self):
    """Reap the broks raised by our links to the satellites.

    Brokers, scheduler links, pollers and reactionners are visited in
    that order; every brok they return is passed to self.add().
    """
    conf = self.conf
    link_groups = (conf.brokers, conf.schedulerlinks,
                   conf.pollers, conf.reactionners)
    for links in link_groups:
        for link in links:
            for brok in link.get_all_broks():
                self.add(brok)
185 # Ask each of our satellite links for its initial status brok and queue it
def get_initial_broks_from_satellitelinks(self):
    """Queue the initial status brok of every satellite link.

    Brokers, scheduler links, pollers and reactionners are visited in
    that order; each link's initial status brok is passed to
    self.add().
    """
    conf = self.conf
    link_groups = (conf.brokers, conf.schedulerlinks,
                   conf.pollers, conf.reactionners)
    for links in link_groups:
        for link in links:
            self.add(link.get_initial_status_brok())
195 # Load the external commander
def load_external_command(self, e):
    """Attach the external command manager to the arbiter.

    The manager is kept as self.external_command, and whatever its
    open() call returns is kept as self.fifo.
    """
    # Keep the manager first, so it stays reachable even if open() fails
    self.external_command = e
    self.fifo = e.open()
def get_daemon_links(self, daemon_type):
    """Return the name of the conf attribute holding daemons of a type.

    Schedulers and arbiters live in "<daemon_type>links"; every other
    type lives in "<daemon_type>s".
    """
    if daemon_type in ('scheduler', 'arbiter'):
        return daemon_type + 'links'
    return daemon_type + 's'
210 def load_config_file(self):
211 print "Loading configuration"
212 # REF: doc/shinken-conf-dispatching.png (1)
213 buf = self.conf.read_config(self.config_files)
214 raw_objects = self.conf.read_config_buf(buf)
216 # First we need to get arbiters and modules first
217 # so we can ask them some objects too
218 self.conf.create_objects_for_type(raw_objects, 'arbiter')
219 self.conf.create_objects_for_type(raw_objects, 'module')
221 self.conf.early_arbiter_linking()
223 # Search wich Arbiterlink I am
224 for arb in self.conf.arbiterlinks:
225 if arb.is_me():
226 arb.need_conf = False
227 self.me = arb
228 print "I am the arbiter :", arb.get_name()
229 self.is_master = not self.me.spare
230 print "Am I the master?", self.is_master
231 # Set myself as alive ;)
232 self.me.alive = True
233 else: #not me
234 arb.need_conf = True
236 if not self.me:
237 sys.exit("Error: I cannot find my own Arbiter object, I bail out. "
238 "To solve it, please change the host_name parameter in "
239 "the object Arbiter in the file shinken-specific.cfg. "
240 "Thanks.")
242 print "My own modules :"
243 for m in self.me.modules:
244 print m
246 # we request the instances without them being *started*
247 # (for these that are concerned ("external" modules):
248 # we will *start* these instances after we have been daemonized (if requested)
249 self.modules_manager.set_modules(self.me.modules)
250 self.do_load_modules(False)
252 # Call modules that manage this read configuration pass
253 self.hook_point('read_configuration')
255 # Now we ask for configuration modules if they
256 # got items for us
257 for inst in self.modules_manager.instances:
258 if 'configuration' in inst.phases:
259 try :
260 r = inst.get_objects()
261 except Exception, exp:
262 print "The instance %s raise an exception %s. I bypass it" % (inst.get_name(), str(exp))
263 continue
265 types_creations = self.conf.types_creations
266 for k in types_creations:
267 (cls, clss, prop) = types_creations[k]
268 if prop in r:
269 for x in r[prop]:
270 # test if raw_objects[k] is already set - if not, add empty array
271 if not k in raw_objects:
272 raw_objects[k] = []
273 # now append the object
274 raw_objects[k].append(x)
275 print "Added %i objects to %s from module %s" % (len(r[prop]), k, inst.get_name())
278 ### Resume standard operations ###
279 self.conf.create_objects(raw_objects)
281 # Maybe conf is already invalid
282 if not self.conf.conf_is_correct:
283 sys.exit("***> One or more problems was encountered while processing the config files...")
285 # Change Nagios2 names to Nagios3 ones
286 self.conf.old_properties_names_to_new()
288 # Create Template links
289 self.conf.linkify_templates()
291 # All inheritances
292 self.conf.apply_inheritance()
294 # Explode between types
295 self.conf.explode()
297 # Create Name reversed list for searching list
298 self.conf.create_reversed_list()
300 # Cleaning Twins objects
301 self.conf.remove_twins()
303 # Implicit inheritance for services
304 self.conf.apply_implicit_inheritance()
306 # Fill default values
307 self.conf.fill_default()
309 # Clean templates
310 self.conf.clean_useless()
312 # Pythonize values
313 self.conf.pythonize()
315 # Linkify objects each others
316 self.conf.linkify()
318 # applying dependancies
319 self.conf.apply_dependancies()
321 # Hacking some global parameter inherited from Nagios to create
322 # on the fly some Broker modules like for status.dat parameters
323 # or nagios.log one if there are no already available
324 self.conf.hack_old_nagios_parameters()
326 # Raise warning about curently unmanaged parameters
327 self.conf.warn_about_unmanaged_parameters()
329 # Exlode global conf parameters into Classes
330 self.conf.explode_global_conf()
332 # set ourown timezone and propagate it to other satellites
333 self.conf.propagate_timezone_option()
335 # Look for business rules, and create teh dep trees
336 self.conf.create_business_rules()
337 # And link them
338 self.conf.create_business_rules_dependencies()
341 # Warn about useless parameters in Shinken
342 self.conf.notice_about_useless_parameters()
344 # Manage all post-conf modules
345 self.hook_point('late_configuration')
347 # Correct conf?
348 self.conf.is_correct()
350 #If the conf is not correct, we must get out now
351 if not self.conf.conf_is_correct:
352 sys.exit("Configuration is incorrect, sorry, I bail out")
354 # REF: doc/shinken-conf-dispatching.png (2)
355 logger.log("Cutting the hosts and services into parts")
356 self.confs = self.conf.cut_into_parts()
358 # The conf can be incorrect here if the cut into parts see errors like
359 # a realm with hosts and not schedulers for it
360 if not self.conf.conf_is_correct:
361 sys.exit("Configuration is incorrect, sorry, I bail out")
363 logger.log('Things look okay - No serious problems were detected during the pre-flight check')
365 # Exit if we are just here for config checking
366 if self.verify_only:
367 sys.exit(0)
369 # Some properties need to be "flatten" (put in strings)
370 # before being send, like realms for hosts for example
371 # BEWARE: after the cutting part, because we stringify some properties
372 self.conf.prepare_for_sending()
374 # Ok, here we must check if we go on or not.
375 # TODO : check OK or not
376 self.pidfile = self.conf.lock_file
377 self.idontcareaboutsecurity = self.conf.idontcareaboutsecurity
378 self.user = self.conf.shinken_user
379 self.group = self.conf.shinken_group
380 self.workdir = os.path.expanduser('~'+self.user)
382 ## We need to set self.host & self.port to be used by do_daemon_init_and_start
383 self.host = self.me.address
384 self.port = self.me.port
386 logger.log("Configuration Loaded")
389 # Main loop function
def main(self):
    """Start the arbiter: load the conf, daemonize, then loop forever.

    Sequence: log the header lines, load and check the configuration,
    initialise the daemon with that configuration, register the
    arbiter interface on the Pyro daemon under the name "ForArbiter"
    (the returned URI is kept in self.uri_arb), start the module
    instances, fire the 'load_retention' hook and enter the main loop.
    """
    # Log will be broks
    for line in self.get_header():
        self.log.log(line)

    self.load_config_file()

    self.do_daemon_init_and_start(self.conf)
    self.uri_arb = self.pyro_daemon.register(self.interface, "ForArbiter")

    # Ok we are now fully daemon (if requested),
    # now we can start our "external" modules (if any)
    self.modules_manager.init_and_start_instances()

    # Ok now we can load the retention data.
    # (A leftover debug print that flooded stdout at every start was
    # removed from here.)
    self.hook_point('load_retention')

    ## And go for the main loop
    self.do_mainloop()
def setup_new_conf(self):
    """Install the conf received from a master arbiter as the current one.

    self.new_conf is consumed (reset to None) and becomes both
    self.cur_conf and self.conf. Then every arbiter link of that conf
    gets a fixed is_me() answer: True for the link whose address and
    port are ours (also stored as self.me), False for all the others.
    """
    received = self.new_conf
    self.new_conf = None
    self.cur_conf = self.conf = received

    my_endpoint = (self.host, self.port)
    for arb in self.conf.arbiterlinks:
        if (arb.address, arb.port) == my_endpoint:
            self.me = arb
            # We now definitively know who we are, just keep it
            arb.is_me = lambda: True
        else:
            # And we know who we are not, just keep it
            arb.is_me = lambda: False
def do_loop_turn(self):
    """Run one turn of the daemon main loop.

    A spare first waits for a conf from the master; if none arrived
    the turn ends here. Otherwise the spare installs that conf and
    blocks in wait_for_master_death(). Then, spare or not, run() is
    called when must_run is set.
    """
    if self.me.spare:
        # As a spare, I wait for the master arbiter to send me the true conf
        self.wait_for_initial_conf()
        if not self.new_conf:
            return
        self.setup_new_conf()
        print("I must wait now")
        self.wait_for_master_death()

    if not self.must_run:
        return
    # Main loop
    self.run()
441 # Get 'objects' from external modules
442 # It can be used for get external commands for example
443 def get_objects_from_from_queues(self):
444 for f in self.modules_manager.get_external_from_queues():
445 print "Groking from module instance %s" % f
446 while True:
447 try:
448 o = f.get(block=False)
449 print "Got object :", o
450 self.add(o)
451 except Empty:
452 break
454 # We wait (block) for arbiter to send us something
def wait_for_master_death(self):
    """Serve requests as a spare until the master stops talking to us.

    Loops on handleRequests() with a one second budget. A new conf
    received meanwhile is installed at once. The time of the last sign
    of life from the master (self.last_master_speack) is shifted by
    any system time change reported by handleRequests, and refreshed
    whenever handleRequests reports a non-zero elapsed time.

    Returns when the daemon is interrupted, or once more than 5
    seconds have passed without a sign of life; in that case must_run
    is set to True first.
    """
    print("Waiting for master death")
    remaining = 1.0
    self.last_master_speack = time.time()

    while not self.interrupted:
        elapsed, _, tcdiff = self.handleRequests(remaining)
        if self.new_conf:
            self.setup_new_conf()
        # If there was a system time change (tcdiff) then we have to
        # adapt last_master_speack
        if tcdiff:
            self.last_master_speack += tcdiff
        if elapsed:
            self.last_master_speack = time.time()
            remaining -= elapsed
            if remaining > 0:
                continue

        # The one second budget is spent: start a new one
        remaining = 1.0
        sys.stdout.write(".")
        sys.stdout.flush()

        # Now check if master is dead or not
        if time.time() - self.last_master_speack > 5:
            print("Master is dead!!!")
            self.must_run = True
            break
484 # Main function
485 def run(self):
486 # Before running, I must be sure who am I
487 # The arbiters change, so we must refound the new self.me
488 for arb in self.conf.arbiterlinks:
489 if arb.is_me():
490 self.me = arb
492 logger.log("Begin to dispatch configurations to satellites")
493 self.dispatcher = Dispatcher(self.conf, self.me)
494 self.dispatcher.check_alive()
495 self.dispatcher.check_dispatch()
496 # REF: doc/shinken-conf-dispatching.png (3)
497 self.dispatcher.dispatch()
499 # Now we can get all initial broks for our satellites
500 self.get_initial_broks_from_satellitelinks()
502 suppl_socks = None
504 # Now create the external commander
505 if os.name != 'nt':
506 e = ExternalCommandManager(self.conf, 'dispatcher')
507 e.load_arbiter(self)
508 # Arbiter need to know about external command to activate it
509 # if necessary
510 self.load_external_command(e)
511 if self.fifo is not None:
512 suppl_socks = [ self.fifo ]
514 print "Run baby, run..."
515 timeout = 1.0
517 while self.must_run and not self.interrupted:
519 elapsed, ins, _ = self.handleRequests(timeout, suppl_socks)
521 # If FIFO, read external command
522 if ins:
523 now = time.time()
524 ext_cmds = self.external_command.get()
525 if ext_cmds:
526 for ext_cmd in ext_cmds:
527 self.external_commands.append(ext_cmd)
528 else:
529 self.fifo = self.external_command.open()
530 if self.fifo is not None:
531 suppl_socks = [ self.fifo ]
532 else:
533 suppl_socks = None
534 elapsed += now - time.time()
536 if elapsed or ins:
537 timeout -= elapsed
538 if timeout > 0: # only continue if we are not over timeout
539 continue
541 # Timeout
542 timeout = 1.0 # reset the timeout value
544 # Call modules that manage a starting tick pass
545 self.hook_point('tick')
547 self.dispatcher.check_alive()
548 self.dispatcher.check_dispatch()
549 # REF: doc/shinken-conf-dispatching.png (3)
550 self.dispatcher.dispatch()
551 self.dispatcher.check_bad_dispatch()
553 # Now get things from our module instances
554 self.get_objects_from_from_queues()
556 # Maybe our satellites links raise new broks. Must reap them
557 self.get_broks_from_satellitelinks()
559 # One broker is responsible for our broks,
560 # we must give him our broks
561 self.push_broks_to_broker()
562 self.get_external_commands_from_brokers()
563 self.get_external_commands_from_receivers()
564 # send_conf_to_schedulers()
566 print "Nb Broks send:", self.nb_broks_send
567 self.nb_broks_send = 0
569 # Now send all external commands to schedulers
570 for ext_cmd in self.external_commands:
571 self.external_command.resolve_command(ext_cmd)
572 # It's send, do not keep them
573 # TODO: check if really send. Queue by scheduler?
574 self.external_commands = []
576 # This function returns the part of the conf where are stored the daemons of
577 # a given daemon type
def get_daemons(self, daemon_type):
    """Return the part of the conf that stores the daemons of a type.

    The attribute name is given by get_daemon_links(): 'schedulerlinks'
    for schedulers, 'arbiterlinks' for arbiters, and 'pollers',
    'brokers', 'reactionners'... for the others.

    Returns that attribute of self.conf, or None when the conf has no
    such attribute.
    """
    # One single place knows how the attribute is named
    links_attr = self.get_daemon_links(daemon_type)
    # If the links cannot be found, we have a problem: answer None
    return getattr(self.conf, links_attr, None)
594 # Helper functions for retention modules
595 # So we give our broks and external commands
def get_retention_data(self):
    """Return what a retention module must save for the arbiter.

    The result is a dict with our pending broks under 'broks' and our
    pending external commands under 'external_commands'. The very
    same objects are returned, not copies.
    """
    return {
        'broks': self.broks,
        'external_commands': self.external_commands,
    }
602 # Get back our data from a retention module
def restore_retention_data(self, data):
    """Merge the data given back by a retention module into our state.

    data must hold the keys 'broks' and 'external_commands'. The saved
    broks are merged into self.broks and the saved commands are added
    at the end of self.external_commands.
    """
    # Read both entries first, so a missing key changes nothing
    saved_broks, saved_commands = data['broks'], data['external_commands']
    self.broks.update(saved_broks)
    self.external_commands.extend(saved_commands)