Add: a service without a host will just be dropped, like Nagios does.
[shinken.git] / shinken / dispatcher.py
blobcb703b31e0439b0afb38699d64ac61441eca4452
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 """
24 This is the class of the dispatcher. Its role is to dispatch
25 configurations to other elements like schedulers, reactionners,
26 pollers, receivers and brokers. It is responsible for the high availability part. If an
27 element dies and the element type has a spare, it sends the configuration of the
28 dead element to the spare.
29 """
31 import random
32 import itertools
34 from shinken.util import alive_then_spare_then_deads
35 from shinken.log import logger
37 # Dispatcher Class
class Dispatcher:
    """Dispatch the arbiter configuration to schedulers and satellites.

    The dispatcher keeps track of which scheduler configuration is assigned
    to which scheduler link, and which satellites (reactionners, pollers,
    brokers) manage each configuration. The expected call sequence, as far
    as this class shows, is ``check_alive()``, ``check_dispatch()``,
    ``check_bad_dispatch()`` and then ``dispatch()``.
    """

    # Satellite kinds that get one configuration part per scheduler conf.
    # Receivers are handled apart: they only need one simple conf.
    SATELLITE_KINDS = ('reactionner', 'poller', 'broker')

    def __init__(self, conf, arbiter):
        """Load all elements, flag every configuration as not assigned and
        gather the links in flat lists so the later loops are easier.

        conf    -- the whole configuration (realms, links and split confs)
        arbiter -- the arbiter link that represents the running arbiter
        """
        self.arbiter = arbiter
        # Pointer to the whole conf
        self.conf = conf
        self.realms = conf.realms
        # Direct pointers to the elements that matter to us
        self.arbiters = self.conf.arbiterlinks
        self.schedulers = self.conf.schedulerlinks
        self.reactionners = self.conf.reactionners
        self.brokers = self.conf.brokers
        self.receivers = self.conf.receivers
        self.pollers = self.conf.pollers
        self.dispatch_queue = {'schedulers': [], 'reactionners': [],
                               'brokers': [], 'pollers': [], 'receivers': []}
        self.elements = []    # all elements: schedulers and satellites
        self.satellites = []  # only satellites, no schedulers

        for cfg in self.conf.confs.values():
            cfg.is_assigned = False
            cfg.assigned_to = None

        # Schedulers are elements only, the others go in both lists
        self.elements.extend(self.schedulers)
        for links in (self.reactionners, self.pollers,
                      self.brokers, self.receivers):
            self.elements.extend(links)
            self.satellites.extend(links)

        # Some flags about the need of a dispatch
        self.dispatch_ok = False
        self.first_dispatch_done = False

        # Prepare the satellites confs
        for satellite in self.satellites:
            satellite.prepare_for_conf()

        # Some properties must be given to satellites from the global
        # configuration, like the max_plugins_output_length to pollers
        parameters = {'max_plugins_output_length': self.conf.max_plugins_output_length}
        for poller in self.pollers:
            poller.add_global_conf_parameters(parameters)

        # Now realms will have a cfg pool for satellites
        for r in self.realms:
            r.prepare_for_satellites_conf()

        # Reset need_conf for all schedulers.
        for s in self.schedulers:
            s.need_conf = True
        # Same for receivers
        for rec in self.receivers:
            rec.need_conf = True

    def check_alive(self):
        """Ping every element and every other arbiter.

        An element that is not alive, or that has a ``conf`` attribute set
        to None (a spare without configuration), is flagged ``need_conf``.
        REF: doc/shinken-scheduler-lost.png (1)
        """
        for elt in self.elements:
            elt.ping()
            # Parentheses only make the operator precedence explicit
            if not elt.alive or (hasattr(elt, 'conf') and elt.conf is None):
                elt.need_conf = True

        for arb in self.arbiters:
            # If not me...
            if arb != self.arbiter:
                arb.ping()

    def check_dispatch(self):
        """Check that every dispatched part is still held by a living element.

        The result goes into ``self.dispatch_ok``: it is set to False as
        soon as something must be (re)dispatched. It is never set to True
        here, only ``dispatch()`` does that.
        TODO : finish need conf
        """
        # Check if the other arbiters have the conf
        for arb in self.arbiters:
            # If not me...
            if arb != self.arbiter:
                if not arb.have_conf(self.conf.magic_hash):
                    arb.put_conf(self.conf)
                else:
                    # It already has the conf. Remind it that it does not
                    # have to run, I am still alive!
                    arb.do_not_run()

        # Confs must be dispatched on alive schedulers. A conf that is not
        # dispatched, or dispatched on a dead node, asks for a new dispatch.
        for r in self.realms:
            for cfg_id in r.confs:
                sched = r.confs[cfg_id].assigned_to
                if sched is None:
                    # Before the first dispatch this is the normal state
                    if self.first_dispatch_done:
                        logger.log("Scheduler configuration %d is unmanaged!!" % cfg_id)
                    self.dispatch_ok = False
                elif not sched.alive:
                    self.dispatch_ok = False  # so we ask a new dispatching
                    logger.log("Warning : Scheduler %s had the configuration %d but is dead, I am not happy." % (sched.get_name(), cfg_id))
                    # Remove the association in both directions
                    sched.conf.assigned_to = None
                    sched.conf.is_assigned = False
                    sched.conf = None
                # Else: ok the conf is managed by a living scheduler

        # Satellites may be alive but not hold the cfg we think they hold.
        # In that case ask a redispatch of the cfg_id that looks wrong.
        for r in self.realms:
            for cfg_id in r.confs:
                try:
                    for kind in self.SATELLITE_KINDS:
                        # We must have the right number of satellites, so a
                        # dispatch is raised on every loop one is missing
                        if len(r.to_satellites_managed_by[kind][cfg_id]) < r.get_nb_of_must_have_satellites(kind):
                            logger.log("Warning : Missing satellite %s for configuration %d :" % (kind, cfg_id))

                            # TODO : less violent! Must resent to just who need?
                            # must be catch by satellite who see that it
                            # already have the conf (hash) and do nothing
                            self.dispatch_ok = False  # so we will redispatch all
                            r.to_satellites_need_dispatch[kind][cfg_id] = True
                            r.to_satellites_managed_by[kind][cfg_id] = []

                        # The list is rebound (not emptied in place) in the
                        # loop body, so this iteration stays on the old list
                        for satellite in r.to_satellites_managed_by[kind][cfg_id]:
                            # Either the satellite was marked not alive but is
                            # still registered as manager, or it is alive but
                            # does not manage the conf we think it does. Both
                            # cases ask a full redispatch of this cfg.
                            # DBG: what_i_managed() may give back something
                            # that does not support "in" (TypeError). Such a
                            # satellite is then considered not reachable.
                            try:
                                satellite.reachable and cfg_id not in satellite.what_i_managed()
                            except TypeError as exp:
                                print("DBG: ERROR: (%s) for satellite %s" % (exp, satellite.__dict__))
                                satellite.reachable = False

                            if not satellite.alive or (satellite.reachable and cfg_id not in satellite.what_i_managed()):
                                logger.log('[%s] Warning : The %s %s seems to be down, I must re-dispatch its role to someone else.' % (r.get_name(), kind, satellite.get_name()))
                                self.dispatch_ok = False  # so we will redispatch all
                                r.to_satellites_need_dispatch[kind][cfg_id] = True
                                r.to_satellites_managed_by[kind][cfg_id] = []
                # At the first pass, there is no cfg_id in to_satellites_managed_by
                except KeyError:
                    pass

        # Look for receivers. If they got a conf it is ok, if not they
        # need a simple conf
        for r in self.realms:
            for rec in r.receivers:
                if rec.reachable and not rec.got_conf():
                    self.dispatch_ok = False  # so we will redispatch all
                    rec.need_conf = True

    def check_bad_dispatch(self):
        """Ask elements that hold a conf they should not have to drop it.

        Typical case: a master got the conf, the network went down and a
        spare took the conf over. Then the master comes back, still running
        with its old conf. Such an element is asked to wait for a new conf.
        Satellites are also asked which configuration ids they manage, and
        the ids this dispatcher did not give them are removed.
        """
        for elt in self.elements:
            if hasattr(elt, 'conf'):
                # If the element has a conf on our side it is a good dispatch.
                # If it is not reachable, do not ask: it will not respond.
                if elt.conf is None and elt.reachable:
                    if elt.have_conf():
                        logger.log('Warning : The element %s have a conf and should not have one! I ask it to idle now' % elt.get_name())
                        elt.active = False
                        elt.wait_new_conf()
                        # No need to check if the order was received: if
                        # not, the next loop will send it again

        for satellite in self.satellites:
            kind = satellite.get_my_type()
            if not satellite.reachable:
                continue
            cfg_ids = satellite.what_i_managed()
            # A satellite that manages nothing already does what we want
            if len(cfg_ids) == 0:
                continue

            id_to_delete = []
            for cfg_id in cfg_ids:
                # Search the realm(s) that own this conf
                for r in self.realms:
                    if cfg_id in r.confs:
                        # The satellite must be registered as a manager of
                        # this conf, else the conf is removed from it
                        if satellite not in r.to_satellites_managed_by[kind][cfg_id]:
                            id_to_delete.append(cfg_id)

            if len(id_to_delete) == len(cfg_ids):
                # Everything is removed: make it idle and wait a new conf
                satellite.active = False
                logger.log("I ask %s to wait a new conf" % satellite.get_name())
                satellite.wait_new_conf()
            else:
                # It is not fully idle, it just manages less cfg.
                # Remove each id to delete (and not the last id seen above).
                for id_to_remove in id_to_delete:
                    logger.log("I ask to remove configuration N%d from %s" % (id_to_remove, satellite.get_name()))
                    satellite.remove_from_conf(id_to_remove)

    def get_scheduler_ordered_list(self, r):
        """Return the schedulers usable by realm ``r`` as a list to pop().

        The list holds the schedulers of the realm plus the spare schedulers
        of the higher realms. It is sorted with alive_then_spare_then_deads
        and then reversed, so that pop() gives the best candidate first.
        """
        scheds = list(r.schedulers)

        # The spares of higher realms are added after the schedulers of the
        # realm, so they are used after the spares of the realm
        for higher_r in r.higher_realms:
            scheds.extend([s for s in higher_r.schedulers if s.spare])

        # cmp-style comparator (Python 2 list.sort signature)
        scheds.sort(alive_then_spare_then_deads)

        # DBG: dump the order, best candidate first
        names = [s.get_name() for s in scheds]
        logger.log('[%s] Schedulers order : ' % r.get_name() +
                   ''.join(['%s ' % name for name in names]))

        scheds.reverse()  # pop is last, I need first
        return scheds

    def dispatch(self):
        """Send the configurations that are not dispatched yet.

        Nothing is sent when ``self.dispatch_ok`` is already True. Else the
        scheduler configurations are sent first, then the satellite parts,
        then the receivers configuration.
        REF: doc/shinken-conf-dispatching.png (3)
        """
        # We pass at least one time in dispatch, so now errors are true errors
        self.first_dispatch_done = True

        # If there is no need to dispatch, do not dispatch :)
        if self.dispatch_ok:
            return

        self._dispatch_schedulers()

        # Every conf should be assigned now
        nb_missed = len([cfg for cfg in self.conf.confs.values()
                         if not cfg.is_assigned])
        if nb_missed > 0:
            logger.log("WARNING : All schedulers configurations are not dispatched, %d are missing" % nb_missed)
        else:
            logger.log("OK, all schedulers configurations are dispatched :)")
            self.dispatch_ok = True

        # Schedulers without conf after a good dispatch do not need a conf,
        # so they do not raise a useless dispatch
        if self.dispatch_ok:
            for sched in self.schedulers:
                if sched.conf is None:
                    sched.need_conf = False

        arbiters_cfg = {}
        for arb in self.arbiters:
            arbiters_cfg[arb.id] = arb.give_satellite_cfg()

        self._dispatch_satellites(arbiters_cfg)
        self._dispatch_receivers()

    def _dispatch_schedulers(self):
        """Try to give every unassigned conf of every realm to a scheduler."""
        for r in self.realms:
            logger.log("Dispatching Realm %s" % r.get_name())
            conf_to_dispatch = [cfg for cfg in r.confs.values()
                                if not cfg.is_assigned]
            nb_conf = len(conf_to_dispatch)
            logger.log('[%s] Dispatching %d/%d configurations' % (r.get_name(), nb_conf, len(r.confs)))

            # Schedulers of this realm and upper, in sending order.
            # Only the alive ones are tried.
            scheds = [s for s in self.get_scheduler_ordered_list(r) if s.alive]

            for conf in conf_to_dispatch:
                logger.log('[%s] Dispatching one configuration' % r.get_name())

                # If there are no alive schedulers, not good...
                if len(scheds) == 0:
                    logger.log('[%s] but there a no alive schedulers in this realm!' % r.get_name())

                if not self._send_conf_to_scheduler(r, conf, scheds):
                    # No more schedulers: there is nothing to dispatch to
                    # the satellites for this conf
                    cfg_id = conf.id
                    for kind in self.SATELLITE_KINDS:
                        r.to_satellites[kind][cfg_id] = None
                        r.to_satellites_need_dispatch[kind][cfg_id] = False
                        r.to_satellites_managed_by[kind][cfg_id] = []

    def _send_conf_to_scheduler(self, r, conf, scheds):
        """Pop schedulers from ``scheds`` until one accepts ``conf``.

        Return True when the conf was sent (the satellite parts of realm
        ``r`` are then prepared), False when ``scheds`` ran empty.
        """
        while scheds:
            sched = scheds.pop()
            logger.log('[%s] Trying to send conf %d to scheduler %s' % (r.get_name(), conf.id, sched.get_name()))
            if not sched.need_conf:
                logger.log('[%s] The scheduler %s do not need conf, sorry' % (r.get_name(), sched.get_name()))
                continue

            # We tag conf with the instance_name = scheduler_name
            conf.instance_name = sched.scheduler_name
            # REF: doc/shinken-conf-dispatching.png (3)
            # REF: doc/shinken-scheduler-lost.png (2)
            override_conf = sched.get_override_configuration()
            satellites_for_sched = r.get_satellites_links_for_scheduler()
            print("Want to give a satellites pack for the scheduler %s" % (satellites_for_sched,))
            conf_package = (conf, override_conf, sched.modules, satellites_for_sched)
            print("Try to put the conf %s" % (conf_package,))
            is_sent = sched.put_conf(conf_package)
            if not is_sent:
                logger.log('[%s] Warning : Dispatch fault for scheduler %s' % (r.get_name(), sched.get_name()))
                continue

            logger.log('[%s] Dispatch OK of for conf in scheduler %s' % (r.get_name(), sched.get_name()))
            sched.conf = conf
            sched.need_conf = False
            conf.is_assigned = True
            conf.assigned_to = sched

            # Now we generate the conf for satellites. The cfg is asked
            # once per kind, so every kind gets its own returned object.
            cfg_id = conf.id
            for kind in self.SATELLITE_KINDS:
                r.to_satellites[kind][cfg_id] = sched.give_satellite_cfg()
                r.to_satellites_need_dispatch[kind][cfg_id] = True
                r.to_satellites_managed_by[kind][cfg_id] = []

            # The conf is dispatched, no more loop for it
            return True
        return False

    @staticmethod
    def _order_brokers(satellites, cfg_id):
        """Return the brokers in the order to try them for ``cfg_id``.

        The non spare brokers are rotated by ``cfg_id`` so that each
        scheduler conf starts with another broker (it smooths the load).
        The spares always stay at the end. When there is no non spare
        broker, ``satellites`` is returned untouched.
        """
        nospare = [s for s in satellites if not s.spare]
        if len(nospare) == 0:
            return satellites

        idx = cfg_id % len(nospare)
        spares = [s for s in satellites if s.spare]
        head = nospare[idx:]
        tail = nospare[:idx]  # a real rotation: no duplicate, no loss
        print("No spare %s" % (nospare,))
        print("Spare %s" % (spares,))
        print("Got 1 %s" % (head,))
        print("Got 2 %s" % (tail,))
        return head + tail + spares

    def _dispatch_satellites(self, arbiters_cfg):
        """Send to the satellites the scheduler parts that need a dispatch.

        arbiters_cfg -- dict (arbiter id -> arbiter satellite cfg) given to
                        the satellites that have manage_arbiters set
        """
        # The satellites get their conf the "new" way, so they only see
        # what we want them to see
        for r in self.realms:
            for cfg in r.confs.values():
                cfg_id = cfg.id
                for kind in self.SATELLITE_KINDS:
                    if not r.to_satellites_need_dispatch[kind][cfg_id]:
                        continue

                    logger.log('[%s] Dispatching %s' % (r.get_name(), kind) + 's')
                    cfg_for_satellite_part = r.to_satellites[kind][cfg_id]
                    print("%s DBG: cfg_for_satellite_part %s %s %s" % ("*" * 10, cfg_for_satellite_part, r.get_name(), cfg_id))

                    # Work on a copy of the potential satellites list.
                    # cmp-style comparator (Python 2 list.sort signature)
                    satellites = list(r.get_potential_satellites_by_type(kind))
                    satellites.sort(alive_then_spare_then_deads)
                    print("All broker")
                    for b in satellites:
                        print("%s %s" % (b.get_name(), b.alive))

                    # Only keep alive satellites
                    satellites = [s for s in satellites if s.alive]

                    if kind == "broker":
                        satellites = self._order_brokers(satellites, cfg_id)

                    satellite_string = "[%s] %s satellite order : " % (r.get_name(), kind)
                    satellite_string += ''.join(
                        ['%s (spare:%s), ' % (satellite.get_name(), str(satellite.spare))
                         for satellite in satellites])
                    logger.log(satellite_string)

                    # Now we dispatch the cfg to everyone that asks for it
                    nb_cfg_sent = 0
                    for satellite in satellites:
                        # Send only if we need to, and if we can
                        if nb_cfg_sent < r.get_nb_of_must_have_satellites(kind) and satellite.alive:
                            logger.log('[%s] Trying to send configuration to %s %s' % (r.get_name(), kind, satellite.get_name()))
                            satellite.cfg['schedulers'][cfg_id] = cfg_for_satellite_part
                            if satellite.manage_arbiters:
                                satellite.cfg['arbiters'] = arbiters_cfg

                            # Brokers should have poller/reactionners links too
                            if kind == "broker":
                                r.fill_broker_with_poller_reactionner_links(satellite)

                            is_sent = satellite.put_conf(satellite.cfg)
                            if is_sent:
                                satellite.active = True
                                logger.log('[%s] Dispatch OK of for configuration %s to %s %s' % (r.get_name(), cfg_id, kind, satellite.get_name()))
                                nb_cfg_sent += 1
                                r.to_satellites_managed_by[kind][cfg_id].append(satellite)

                                # A conf_id must be sent to only ONE broker,
                                # so here it is done
                                if kind == "broker":
                                    break

                    # Enough satellites got the conf, the next ones are spares
                    if nb_cfg_sent == r.get_nb_of_must_have_satellites(kind):
                        logger.log("[%s] OK, no more %s sent need" % (r.get_name(), kind))
                        r.to_satellites_need_dispatch[kind][cfg_id] = False

    def _dispatch_receivers(self):
        """Send its conf to every receiver that is flagged need_conf."""
        # Receivers are easier: they need ONE conf in all their life :)
        for r in self.realms:
            for rec in r.receivers:
                if rec.need_conf:
                    logger.log('[%s] Trying to send configuration to receiver %s' % (r.get_name(), rec.get_name()))
                    is_sent = rec.put_conf(rec.cfg)
                    if is_sent:
                        rec.active = True
                        rec.need_conf = False
                        logger.log('[%s] Dispatch OK of for configuration to receiver %s' % (r.get_name(), rec.get_name()))