2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
27 from Queue
import Empty
30 from shinken
.objects
import Config
31 from shinken
.external_command
import ExternalCommandManager
32 from shinken
.dispatcher
import Dispatcher
33 from shinken
.daemon
import Daemon
, Interface
34 from shinken
.log
import logger
35 from shinken
.brok
import Brok
36 from shinken
.external_command
import ExternalCommand
37 from shinken
.pyro_wrapper
import Pyro
40 # Interface for the other Arbiter
41 # It connects, and together we decide who's the Master and who's the Slave, etc.
# It also provides a function to receive a new conf from the master
43 class IForArbiter(Interface
):
45 def have_conf(self
, magic_hash
):
46 # I've got a conf and a good one
47 if self
.app
.cur_conf
and self
.app
.cur_conf
.magic_hash
== magic_hash
:
49 else: #I've no conf or a bad one
52 # The master Arbiter is sending us a new conf. Ok, we take it
def put_conf(self, conf):
    """Accept a new configuration pushed by the master Arbiter.

    The configuration is handed to the parent ``Interface.put_conf``
    (imported from shinken.daemon; what it does with ``conf`` is not
    visible in this file). Afterwards ``must_run`` is cleared on the
    owning application: that is the flag the arbiter's main loop tests
    on each turn, so clearing it makes the current run loop stop.

    :param conf: the configuration object sent by the master
    """
    super(IForArbiter, self).put_conf(conf)
    # A new conf invalidates the current run; ask the main loop to stop.
    self.app.must_run = False
57 # The master arbiter asks me not to run!
# If I'm the master, I ignore the request and keep running
60 if self
.app
.is_master
:
61 print "Some f***ing idiot asks me not to run. I'm a proud master, so I decide to run anyway"
62 # Else, I'm just a spare, so I listen to my master
64 print "Someone asks me not to run"
65 self
.app
.last_master_speack
= time
.time()
66 self
.app
.must_run
= False
# Function called by check_shinken to get the status of a daemon
71 def get_satellite_status(self
, daemon_type
, daemon_name
):
72 daemon_name_attr
= daemon_type
+"_name"
73 daemons
= self
.app
.get_daemons(daemon_type
)
76 if hasattr(dae
, daemon_name_attr
) and getattr(dae
, daemon_name_attr
) == daemon_name
:
77 if hasattr(dae
, 'alive') and hasattr(dae
, 'spare'):
78 return {'alive' : dae
.alive
, 'spare' : dae
.spare
}
# Function called by check_shinken to get the list of daemons
82 def get_satellite_list(self
, daemon_type
):
84 daemon_name_attr
= daemon_type
+"_name"
85 daemons
= self
.app
.get_daemons(daemon_type
)
88 if hasattr(dae
, daemon_name_attr
):
89 satellite_list
.append(getattr(dae
, daemon_name_attr
))
# If one daemon has no name... ouch!
98 class Arbiter(Daemon
):
102 def __init__(self
, config_files
, is_daemon
, do_replace
, verify_only
, debug
, debug_file
):
104 super(Arbiter
, self
).__init
__('arbiter', config_files
[0], is_daemon
, do_replace
, debug
, debug_file
)
106 self
.config_files
= config_files
108 self
.verify_only
= verify_only
111 self
.is_master
= False
114 self
.nb_broks_send
= 0
116 # Now tab for external_commands
117 self
.external_commands
= []
121 # Use to know if we must still be alive or not
124 self
.interface
= IForArbiter(self
)
128 # Use for adding things like broks
130 if isinstance(b
, Brok
):
132 elif isinstance(b
, ExternalCommand
):
133 self
.external_commands
.append(b
)
135 logger
.log('Warning : cannot manage object type %s (%s)' % (type(b
), b
))
138 # We must push our broks to the broker
# because it's stupid to make a crossing connection
# so we find the broker responsible for our broks,
142 # TODO : better find the broker, here it can be dead?
143 # or not the good one?
144 def push_broks_to_broker(self
):
145 for brk
in self
.conf
.brokers
:
146 # Send only if alive of course
147 if brk
.manage_arbiters
and brk
.alive
:
148 is_send
= brk
.push_broks(self
.broks
)
150 # They are gone, we keep none!
154 # We must take external_commands from all brokers
155 def get_external_commands_from_brokers(self
):
156 for brk
in self
.conf
.brokers
:
157 # Get only if alive of course
159 new_cmds
= brk
.get_external_commands()
160 for new_cmd
in new_cmds
:
161 self
.external_commands
.append(new_cmd
)
# We must take external_commands from all receivers
165 def get_external_commands_from_receivers(self
):
166 for rec
in self
.conf
.receivers
:
167 # Get only if alive of course
169 new_cmds
= rec
.get_external_commands()
170 for new_cmd
in new_cmds
:
171 self
.external_commands
.append(new_cmd
)
174 # Our links to satellites can raise broks. We must send them
175 def get_broks_from_satellitelinks(self
):
176 tabs
= [self
.conf
.brokers
, self
.conf
.schedulerlinks
, \
177 self
.conf
.pollers
, self
.conf
.reactionners
]
180 new_broks
= s
.get_all_broks()
185 # Our links to satellites can raise broks. We must send them
186 def get_initial_broks_from_satellitelinks(self
):
187 tabs
= [self
.conf
.brokers
, self
.conf
.schedulerlinks
, \
188 self
.conf
.pollers
, self
.conf
.reactionners
]
191 b
= s
.get_initial_status_brok()
195 # Load the external commander
196 def load_external_command(self
, e
):
197 self
.external_command
= e
201 def get_daemon_links(self
, daemon_type
):
202 #the attribute name to get those differs for schedulers and arbiters
203 if (daemon_type
== 'scheduler' or daemon_type
== 'arbiter'):
204 daemon_links
= daemon_type
+'links'
206 daemon_links
= daemon_type
+'s'
210 def load_config_file(self
):
211 print "Loading configuration"
212 # REF: doc/shinken-conf-dispatching.png (1)
213 buf
= self
.conf
.read_config(self
.config_files
)
214 raw_objects
= self
.conf
.read_config_buf(buf
)
# First we need to get arbiters and modules
# so we can ask them for some objects too
218 self
.conf
.create_objects_for_type(raw_objects
, 'arbiter')
219 self
.conf
.create_objects_for_type(raw_objects
, 'module')
221 self
.conf
.early_arbiter_linking()
# Search which Arbiterlink I am
224 for arb
in self
.conf
.arbiterlinks
:
226 arb
.need_conf
= False
228 print "I am the arbiter :", arb
.get_name()
229 self
.is_master
= not self
.me
.spare
230 print "Am I the master?", self
.is_master
231 # Set myself as alive ;)
237 sys
.exit("Error: I cannot find my own Arbiter object, I bail out. "
238 "To solve it, please change the host_name parameter in "
239 "the object Arbiter in the file shinken-specific.cfg. "
242 print "My own modules :"
243 for m
in self
.me
.modules
:
246 # we request the instances without them being *started*
247 # (for these that are concerned ("external" modules):
248 # we will *start* these instances after we have been daemonized (if requested)
249 self
.modules_manager
.set_modules(self
.me
.modules
)
250 self
.do_load_modules(False)
252 # Call modules that manage this read configuration pass
253 self
.hook_point('read_configuration')
255 # Now we ask for configuration modules if they
257 for inst
in self
.modules_manager
.instances
:
258 if 'configuration' in inst
.phases
:
260 r
= inst
.get_objects()
261 except Exception, exp
:
262 print "The instance %s raise an exception %s. I bypass it" % (inst
.get_name(), str(exp
))
265 types_creations
= self
.conf
.types_creations
266 for k
in types_creations
:
267 (cls
, clss
, prop
) = types_creations
[k
]
270 # test if raw_objects[k] is already set - if not, add empty array
271 if not k
in raw_objects
:
273 # now append the object
274 raw_objects
[k
].append(x
)
275 print "Added %i objects to %s from module %s" % (len(r
[prop
]), k
, inst
.get_name())
278 ### Resume standard operations ###
279 self
.conf
.create_objects(raw_objects
)
281 # Maybe conf is already invalid
282 if not self
.conf
.conf_is_correct
:
283 sys
.exit("***> One or more problems was encountered while processing the config files...")
285 # Change Nagios2 names to Nagios3 ones
286 self
.conf
.old_properties_names_to_new()
288 # Create Template links
289 self
.conf
.linkify_templates()
292 self
.conf
.apply_inheritance()
294 # Explode between types
297 # Create Name reversed list for searching list
298 self
.conf
.create_reversed_list()
300 # Cleaning Twins objects
301 self
.conf
.remove_twins()
303 # Implicit inheritance for services
304 self
.conf
.apply_implicit_inheritance()
306 # Fill default values
307 self
.conf
.fill_default()
310 self
.conf
.clean_useless()
313 self
.conf
.pythonize()
315 # Linkify objects each others
318 # applying dependancies
319 self
.conf
.apply_dependancies()
321 # Hacking some global parameter inherited from Nagios to create
322 # on the fly some Broker modules like for status.dat parameters
323 # or nagios.log one if there are no already available
324 self
.conf
.hack_old_nagios_parameters()
# Raise warning about currently unmanaged parameters
327 self
.conf
.warn_about_unmanaged_parameters()
# Explode global conf parameters into Classes
330 self
.conf
.explode_global_conf()
# Set our own timezone and propagate it to other satellites
333 self
.conf
.propagate_timezone_option()
# Look for business rules, and create the dep trees
336 self
.conf
.create_business_rules()
338 self
.conf
.create_business_rules_dependencies()
341 # Warn about useless parameters in Shinken
342 self
.conf
.notice_about_useless_parameters()
344 # Manage all post-conf modules
345 self
.hook_point('late_configuration')
348 self
.conf
.is_correct()
350 #If the conf is not correct, we must get out now
351 if not self
.conf
.conf_is_correct
:
352 sys
.exit("Configuration is incorrect, sorry, I bail out")
354 # REF: doc/shinken-conf-dispatching.png (2)
355 logger
.log("Cutting the hosts and services into parts")
356 self
.confs
= self
.conf
.cut_into_parts()
# The conf can be incorrect here if the cut into parts sees errors, like
# a realm with hosts but no schedulers for it
360 if not self
.conf
.conf_is_correct
:
361 sys
.exit("Configuration is incorrect, sorry, I bail out")
363 logger
.log('Things look okay - No serious problems were detected during the pre-flight check')
365 # Exit if we are just here for config checking
# Some properties need to be "flattened" (put in strings)
# before being sent, like realms for hosts for example
# BEWARE: after the cutting part, because we stringify some properties
372 self
.conf
.prepare_for_sending()
374 # Ok, here we must check if we go on or not.
375 # TODO : check OK or not
376 self
.pidfile
= self
.conf
.lock_file
377 self
.idontcareaboutsecurity
= self
.conf
.idontcareaboutsecurity
378 self
.user
= self
.conf
.shinken_user
379 self
.group
= self
.conf
.shinken_group
380 self
.workdir
= os
.path
.expanduser('~'+self
.user
)
382 ## We need to set self.host & self.port to be used by do_daemon_init_and_start
383 self
.host
= self
.me
.address
384 self
.port
= self
.me
.port
386 logger
.log("Configuration Loaded")
392 for line
in self
.get_header():
395 self
.load_config_file()
397 self
.do_daemon_init_and_start(self
.conf
)
398 self
.uri_arb
= self
.pyro_daemon
.register(self
.interface
, "ForArbiter")
400 # ok we are now fully daemon (if requested)
401 # now we can start our "external" modules (if any) :
402 self
.modules_manager
.init_and_start_instances()
404 # Ok now we can load the retention data
406 self
.hook_point('load_retention')
408 ## And go for the main loop
412 def setup_new_conf(self
):
413 """ Setup a new conf received from a Master arbiter. """
418 for arb
in self
.conf
.arbiterlinks
:
419 if (arb
.address
, arb
.port
) == (self
.host
, self
.port
):
421 arb
.is_me
= lambda: True # we now definitively know who we are, just keep it.
423 arb
.is_me
= lambda: False # and we know who we are not, just keep it.
425 def do_loop_turn(self
):
426 # If I am a spare, I wait for the master arbiter to send me
429 self
.wait_for_initial_conf()
430 if not self
.new_conf
:
432 self
.setup_new_conf()
433 print "I must wait now"
434 self
.wait_for_master_death()
441 # Get 'objects' from external modules
# It can be used to get external commands, for example
443 def get_objects_from_from_queues(self
):
444 for f
in self
.modules_manager
.get_external_from_queues():
445 print "Groking from module instance %s" % f
448 o
= f
.get(block
=False)
449 print "Got object :", o
454 # We wait (block) for arbiter to send us something
455 def wait_for_master_death(self
):
456 print "Waiting for master death"
458 self
.last_master_speack
= time
.time()
460 while not self
.interrupted
:
461 elapsed
, _
, tcdiff
= self
.handleRequests(timeout
)
462 # if there was a system Time Change (tcdiff) then we have to adapt last_master_speak:
464 self
.setup_new_conf()
466 self
.last_master_speack
+= tcdiff
468 self
.last_master_speack
= time
.time()
474 sys
.stdout
.write(".")
477 # Now check if master is dead or not
479 if now
- self
.last_master_speack
> 5:
480 print "Master is dead!!!"
# Before running, I must be sure who I am
# The arbiters change, so we must find the new self.me again
488 for arb
in self
.conf
.arbiterlinks
:
492 logger
.log("Begin to dispatch configurations to satellites")
493 self
.dispatcher
= Dispatcher(self
.conf
, self
.me
)
494 self
.dispatcher
.check_alive()
495 self
.dispatcher
.check_dispatch()
496 # REF: doc/shinken-conf-dispatching.png (3)
497 self
.dispatcher
.dispatch()
499 # Now we can get all initial broks for our satellites
500 self
.get_initial_broks_from_satellitelinks()
504 # Now create the external commander
506 e
= ExternalCommandManager(self
.conf
, 'dispatcher')
508 # Arbiter need to know about external command to activate it
510 self
.load_external_command(e
)
511 if self
.fifo
is not None:
512 suppl_socks
= [ self
.fifo
]
514 print "Run baby, run..."
517 while self
.must_run
and not self
.interrupted
:
519 elapsed
, ins
, _
= self
.handleRequests(timeout
, suppl_socks
)
521 # If FIFO, read external command
524 ext_cmds
= self
.external_command
.get()
526 for ext_cmd
in ext_cmds
:
527 self
.external_commands
.append(ext_cmd
)
529 self
.fifo
= self
.external_command
.open()
530 if self
.fifo
is not None:
531 suppl_socks
= [ self
.fifo
]
534 elapsed
+= now
- time
.time()
538 if timeout
> 0: # only continue if we are not over timeout
542 timeout
= 1.0 # reset the timeout value
544 # Call modules that manage a starting tick pass
545 self
.hook_point('tick')
547 self
.dispatcher
.check_alive()
548 self
.dispatcher
.check_dispatch()
549 # REF: doc/shinken-conf-dispatching.png (3)
550 self
.dispatcher
.dispatch()
551 self
.dispatcher
.check_bad_dispatch()
553 # Now get things from our module instances
554 self
.get_objects_from_from_queues()
556 # Maybe our satellites links raise new broks. Must reap them
557 self
.get_broks_from_satellitelinks()
559 # One broker is responsible for our broks,
560 # we must give him our broks
561 self
.push_broks_to_broker()
562 self
.get_external_commands_from_brokers()
563 self
.get_external_commands_from_receivers()
564 # send_conf_to_schedulers()
566 print "Nb Broks send:", self
.nb_broks_send
567 self
.nb_broks_send
= 0
569 # Now send all external commands to schedulers
570 for ext_cmd
in self
.external_commands
:
571 self
.external_command
.resolve_command(ext_cmd
)
# They are sent, do not keep them
# TODO: check if really sent. Queue by scheduler?
574 self
.external_commands
= []
# This function returns the part of the conf where the daemons of
# a given daemon type are stored
578 def get_daemons(self
, daemon_type
):
579 # We get the list of the daemons from their links
580 # 'schedulerlinks' for schedulers, 'arbiterlinks' for arbiters
581 # and 'pollers', 'brokers', 'reactionners' for the other
582 if (daemon_type
== 'scheduler' or daemon_type
== 'arbiter'):
583 daemon_links
= daemon_type
+'links'
585 daemon_links
= daemon_type
+'s'
587 if hasattr(self
.conf
, daemon_links
):
588 return getattr(self
.conf
, daemon_links
);
590 # If the links cannot be found, we have a problem
594 # Helper functions for retention modules
595 # So we give our broks and external commands
596 def get_retention_data(self
):
598 r
['broks'] = self
.broks
599 r
['external_commands'] = self
.external_commands
602 # Get back our data from a retention module
def restore_retention_data(self, data):
    """Merge data handed back by a retention module into our own state.

    ``data`` is indexed with the keys ``'broks'`` and
    ``'external_commands'``. The restored broks are merged into
    ``self.broks`` with ``update()`` and the restored commands are
    appended to ``self.external_commands`` with ``extend()``, so
    anything already held is kept rather than replaced.

    :param data: mapping providing 'broks' and 'external_commands'
    :raises KeyError: if either key is absent from ``data``
    """
    broks = data['broks']
    external_commands = data['external_commands']
    # Merge, do not overwrite: we may already hold fresh broks/commands.
    self.broks.update(broks)
    self.external_commands.extend(external_commands)