Add : (Denis GERMAIN) check_shinken plugin and interface in the arbiter to get data.
[shinken.git] / shinken / dispatcher.py
blob550f618609a94eeee68540b3c3674c5e8c04cb60
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
#This is the class of the dispatcher. Its role is to dispatch
#configurations to other elements like schedulers, reactionners,
#pollers and brokers. It is responsible for the high availability part. If an
#element dies and the element type has a spare, it sends the conf of the
#dead element to the spare.
import sys

from shinken.util import alive_then_spare_then_deads
from shinken.log import logger
34 #Dispatcher Class
class Dispatcher:
    """Dispatch configurations to schedulers and satellites.

    Each configuration of ``conf.confs`` is sent to a scheduler, then the
    satellite part given by that scheduler (``give_satellite_cfg()``) is sent
    to the reactionners, pollers and brokers of its realm. The dispatcher
    also handles the high availability part: when an element that had a
    configuration is lost, the configuration is reset so that the next
    dispatch gives it to another element.
    """

    # Satellite kinds that receive a part of each scheduler configuration
    SATELLITE_KINDS = ('reactionner', 'poller', 'broker')

    def __init__(self, conf, arbiter):
        """Load all elements and mark every configuration as not assigned.

        :param conf: the whole configuration; its realms, arbiterlinks,
            schedulerlinks, reactionners, brokers, pollers, confs and
            max_plugins_output_length are used
        :param arbiter: the link of the arbiter running this dispatcher; it
            is skipped when pinging/configuring the other arbiters
        """
        self.arbiter = arbiter
        # Pointer to the whole conf
        self.conf = conf
        self.realms = conf.realms
        # Direct pointers to the elements we work on
        self.arbiters = self.conf.arbiterlinks
        self.schedulers = self.conf.schedulerlinks
        self.reactionners = self.conf.reactionners
        self.brokers = self.conf.brokers
        self.pollers = self.conf.pollers
        self.dispatch_queue = {'schedulers': [], 'reactionners': [],
                               'brokers': [], 'pollers': []}
        self.elements = []  # all elements: schedulers and satellites
        self.satellites = []  # only satellites, not schedulers

        for cfg in self.conf.confs.values():
            cfg.is_assigned = False
            cfg.assigned_to = None

        # Schedulers are only elements; satellites go in both lists
        self.elements.extend(self.schedulers)
        for satellites in (self.reactionners, self.pollers, self.brokers):
            self.elements.extend(satellites)
            self.satellites.extend(satellites)

        # Flags telling whether a dispatch is needed
        self.dispatch_ok = False
        self.first_dispatch_done = False

        # Prepare the satellites confs
        for satellite in self.satellites:
            satellite.prepare_for_conf()

        # Some properties of the global configuration must be given to
        # satellites, like max_plugins_output_length to pollers
        parameters = {'max_plugins_output_length': self.conf.max_plugins_output_length}
        for poller in self.pollers:
            poller.add_global_conf_parameters(parameters)

        # Now realms will have a cfg pool for satellites
        for r in self.realms:
            r.prepare_for_satellites_conf()

        # Reset need_conf for all schedulers.
        for s in self.schedulers:
            s.need_conf = True

    def check_alive(self):
        """Ping every element and every other arbiter.

        Elements that are not alive, or that have a ``conf`` attribute set
        to None, get ``need_conf = True``.
        REF: doc/shinken-scheduler-lost.png (1)
        """
        for elt in self.elements:
            elt.ping()
            # Dead elements need a new conf, and so do the ones (spares
            # too) that do not already have a conf
            if not elt.alive or (hasattr(elt, 'conf') and elt.conf is None):
                elt.need_conf = True

        for arb in self.arbiters:
            # Do not ping myself
            if arb != self.arbiter:
                arb.ping()

    def check_dispatch(self):
        """Check that every configuration is managed by living elements.

        Sends the configuration to the other arbiters that do not have it,
        and tells the ones that have it not to run. Then sets
        ``self.dispatch_ok`` to False (asking for a new dispatch) when a
        configuration has no scheduler or a dead one, when a realm has fewer
        satellites of a kind than it must have, or when a satellite is dead
        or is reachable but does not report the configuration as managed.
        """
        # Check if the other arbiters have the conf
        for arb in self.arbiters:
            if arb != self.arbiter:
                if not arb.have_conf(self.conf.magic_hash):
                    arb.put_conf(self.conf)
                else:
                    # It already has the conf: remind it that it does not
                    # have to run, I'm still alive!
                    arb.do_not_run()

        # Confs must be dispatched on alive schedulers. If a conf is not
        # dispatched, or dispatched on a dead scheduler, remove the
        # association and ask for a new dispatch.
        for r in self.realms:
            for cfg_id, cfg in r.confs.items():
                sched = cfg.assigned_to
                if sched is None:
                    if self.first_dispatch_done:
                        logger.log("Scheduler configuration %d is unmanaged!!" % cfg_id)
                    self.dispatch_ok = False
                elif not sched.alive:
                    self.dispatch_ok = False  # so we ask a new dispatching
                    logger.log("Warning : Scheduler %s had the configuration %d but is dead, I am not happy." % (sched.get_name(), cfg_id))
                    # Reset this realm conf itself: sched.conf may be None
                    # or another conf given to this scheduler since.
                    cfg.assigned_to = None
                    cfg.is_assigned = False
                    sched.conf = None
                # Else: the conf is managed by a living scheduler

        # Satellites may be alive but not have the cfg I think they have.
        # Ask a redispatch for every cfg_id that is not correctly dispatched.
        for r in self.realms:
            for cfg_id in r.confs:
                try:
                    for kind in self.SATELLITE_KINDS:
                        # We must have the right number of satellites: a
                        # dispatch is asked on every loop while one is missing
                        if len(r.to_satellites_managed_by[kind][cfg_id]) < r.get_nb_of_must_have_satellites(kind):
                            logger.log("Warning : Missing satellite %s for configuration %d :" % (kind, cfg_id))
                            # TODO: less violent! Resend only to the ones that
                            # need it? Satellites that already have the conf
                            # (same hash) should see it and do nothing.
                            self._ask_satellites_redispatch(r, kind, cfg_id)
                        for satellite in r.to_satellites_managed_by[kind][cfg_id]:
                            # Ask the satellite before checking alive, as
                            # before: the answer may mark it unreachable.
                            lost_cfg = self._satellite_lost_cfg(satellite, cfg_id)
                            # A dead satellite still listed, or a living one
                            # that does not manage the cfg: full redispatch
                            if not satellite.alive or lost_cfg:
                                logger.log('[%s] Warning : The %s %s seems to be down, I must re-dispatch its role to someone else.' % (r.get_name(), kind, satellite.get_name()))
                                self._ask_satellites_redispatch(r, kind, cfg_id)
                # At the first pass, there is no cfg_id in to_satellites_managed_by
                except KeyError:
                    pass

    def _satellite_lost_cfg(self, satellite, cfg_id):
        """Return True if satellite is reachable but does not manage cfg_id.

        ``what_i_managed()`` is called at most once. If its answer does not
        support ``in`` (TypeError, e.g. None), the satellite is marked
        unreachable and False is returned.
        """
        if not satellite.reachable:
            return False
        try:
            return cfg_id not in satellite.what_i_managed()
        except TypeError:
            exp = sys.exc_info()[1]
            print("DBG: ERROR: (%s) for satellite %s" % (exp, satellite.__dict__))
            satellite.reachable = False
            return False

    def _ask_satellites_redispatch(self, r, kind, cfg_id):
        """Forget the kind satellites of cfg_id in realm r and ask a dispatch."""
        self.dispatch_ok = False  # so we will redispatch all
        r.to_satellites_nb_assigned[kind][cfg_id] = 0
        r.to_satellites_need_dispatch[kind][cfg_id] = True
        r.to_satellites_managed_by[kind][cfg_id] = []

    def check_bad_dispatch(self):
        """Ask elements that hold a conf I did not give them to drop it.

        Imagine a master got the conf, then the network went down and a spare
        took it. The master was still alive and still runs the conf: not
        good. Reachable elements whose ``conf`` is None but that answer
        ``have_conf()`` are asked to wait for a new conf. Satellites are asked
        to remove the cfg_ids they manage but that are not recorded for them
        in their realm, or to wait for a new conf when none of their cfg_ids
        are recorded.
        """
        for elt in self.elements:
            # An element with a conf is a good dispatch, and an unreachable
            # one won't answer: only ask reachable elements without conf.
            if hasattr(elt, 'conf') and elt.conf is None and elt.reachable:
                if elt.have_conf():
                    logger.log('Warning : The element %s have a conf and should not have one! I ask it to idle now' % elt.get_name())
                    elt.active = False
                    # Whether the order is received or not does not
                    # matter: the next loop will resend it
                    elt.wait_new_conf()

        # Ask satellites which sched_id they manage. If I do not agree, I
        # ask them to remove it.
        for satellite in self.satellites:
            kind = satellite.get_my_type()
            if not satellite.reachable:
                continue
            cfg_ids = satellite.what_i_managed()
            # Satellites that manage nothing already do what I want. An
            # empty or None answer is skipped (len(None) would raise).
            if not cfg_ids:
                continue

            id_to_delete = []
            for cfg_id in cfg_ids:
                # Find the realm that has the conf, and check that this
                # satellite is one of the ones that manage it there
                for r in self.realms:
                    if cfg_id in r.confs and satellite not in r.to_satellites_managed_by[kind][cfg_id]:
                        id_to_delete.append(cfg_id)

            if len(id_to_delete) == len(cfg_ids):
                # All its cfg_ids are removed: make it idle
                satellite.active = False
                logger.log("I ask %s to wait a new conf" % satellite.get_name())
                satellite.wait_new_conf()
            else:
                # It is not fully idle, it just has fewer cfgs
                for cfg_id in id_to_delete:
                    logger.log("I ask to remove configuration N%d from %s" % (cfg_id, satellite.get_name()))
                    satellite.remove_from_conf(cfg_id)

    def get_scheduler_ordered_list(self, r):
        """Return the schedulers that may get a conf of realm r, for pop().

        The list holds the schedulers of r, then the spare schedulers of its
        higher realms (so they are used after the realm's own spares). It is
        sorted with ``alive_then_spare_then_deads`` and then reversed, so
        that ``pop()`` returns the schedulers in that sorted order.
        """
        scheds = list(r.schedulers)
        for higher_r in r.higher_realms:
            for s in higher_r.schedulers:
                if s.spare:
                    scheds.append(s)

        # Masters first, then spares, then the dead (cmp function)
        scheds.sort(alive_then_spare_then_deads)
        scheds.reverse()  # pop() takes the last item, I need the first

        # DBG: dump the order in which schedulers will be used
        order = ''.join(['%s ' % s.get_name() for s in reversed(scheds)])
        logger.log('[%s] Schedulers order : ' % r.get_name() + order)
        return scheds

    def dispatch(self):
        """Send confs to schedulers, then their satellite parts to satellites.

        Does nothing but setting ``first_dispatch_done`` when
        ``self.dispatch_ok`` is True. Otherwise sends every conf that is not
        assigned to an alive scheduler needing a conf, sets ``dispatch_ok``
        to True when all confs are assigned, and sends the satellite parts
        still flagged in ``to_satellites_need_dispatch``.
        REF: doc/shinken-conf-dispatching.png (3)
        """
        # We pass at least one time in dispatch, so now errors are true errors
        self.first_dispatch_done = True

        # No need to dispatch: do not dispatch :)
        if self.dispatch_ok:
            return

        for r in self.realms:
            self._dispatch_realm_confs(r)

        # We popped the confs to dispatch, so no conf should be left...
        nb_missed = len([cfg for cfg in self.conf.confs.values() if not cfg.is_assigned])
        if nb_missed > 0:
            logger.log("WARNING : All schedulers configurations are not dispatched, %d are missing" % nb_missed)
        else:
            logger.log("OK, all schedulers configurations are dispatched :)")
            self.dispatch_ok = True

        # Schedulers without conf in an OK dispatch no longer need a conf,
        # so they do not raise a dispatch for nothing
        if self.dispatch_ok:
            for sched in self.schedulers:
                if sched.conf is None:
                    sched.need_conf = False

        arbiters_cfg = {}
        for arb in self.arbiters:
            arbiters_cfg[arb.id] = arb.give_satellite_cfg()

        # Send the satellites confs the "new" way, so they see only what we want
        for r in self.realms:
            for cfg in r.confs.values():
                for kind in self.SATELLITE_KINDS:
                    if r.to_satellites_need_dispatch[kind][cfg.id]:
                        self._dispatch_satellites_cfg(r, cfg.id, kind, arbiters_cfg)

    def _dispatch_realm_confs(self, r):
        """Send each conf of realm r that is not assigned to an alive scheduler."""
        logger.log("Dispatching Realm %s" % r.get_name())
        conf_to_dispatch = [cfg for cfg in r.confs.values() if not cfg.is_assigned]
        nb_conf = len(conf_to_dispatch)
        logger.log('[%s] Dispatching %d/%d configurations' % (r.get_name(), nb_conf, len(r.confs)))

        # Schedulers of this realm and spares of the upper ones, in the
        # order the confs are sent to them; only alive ones are tried
        scheds = [s for s in self.get_scheduler_ordered_list(r) if s.alive]

        for conf in conf_to_dispatch:
            logger.log('[%s] Dispatching one configuration' % r.get_name())
            # If there is no alive schedulers, not good...
            if not scheds:
                logger.log('[%s] but there a no alive schedulers in this realm!' % r.get_name())
            self._send_conf_to_a_scheduler(r, conf, scheds)

    def _send_conf_to_a_scheduler(self, r, conf, scheds):
        """Pop schedulers from scheds until one accepts conf.

        Popped schedulers are not put back, even when they refuse the conf.
        If scheds runs out, the satellite parts of conf are reset with no
        dispatch needed.
        REF: doc/shinken-conf-dispatching.png (3)
        REF: doc/shinken-scheduler-lost.png (2)
        """
        while scheds:
            sched = scheds.pop()
            logger.log('[%s] Trying to send conf %d to scheduler %s' % (r.get_name(), conf.id, sched.get_name()))
            if not sched.need_conf:
                logger.log('[%s] The scheduler %s do not need conf, sorry' % (r.get_name(), sched.get_name()))
                continue

            # We tag conf with the instance_name = scheduler_name
            conf.instance_name = sched.scheduler_name
            override_conf = sched.get_override_configuration()
            satellites_for_sched = r.get_satellites_links_for_scheduler()
            print("Want to give a satellites pack for the scheduler %s" % (satellites_for_sched,))
            conf_package = (conf, override_conf, sched.modules, satellites_for_sched)
            print("Try to put the conf %s" % (conf_package,))
            is_sent = sched.put_conf(conf_package)
            if not is_sent:
                logger.log('[%s] Warning : Dispatch fault for scheduler %s' % (r.get_name(), sched.get_name()))
                continue

            logger.log('[%s] Dispatch OK of for conf in scheduler %s' % (r.get_name(), sched.get_name()))
            sched.conf = conf
            sched.need_conf = False
            conf.is_assigned = True
            conf.assigned_to = sched
            # Now we generate the conf for satellites
            self._init_satellites_dispatch(r, conf.id, sched)
            return

        # No more schedulers... not good: the conf does not need a
        # satellite dispatch
        self._init_satellites_dispatch(r, conf.id, None)

    def _init_satellites_dispatch(self, r, cfg_id, sched):
        """Reset the satellite dispatch state of cfg_id in realm r.

        With a scheduler, each kind gets ``sched.give_satellite_cfg()`` and
        is flagged as needing a dispatch; with None, each kind gets no
        satellite cfg and no dispatch is needed.
        """
        for kind in self.SATELLITE_KINDS:
            if sched is None:
                r.to_satellites[kind][cfg_id] = None
                r.to_satellites_need_dispatch[kind][cfg_id] = False
            else:
                r.to_satellites[kind][cfg_id] = sched.give_satellite_cfg()
                r.to_satellites_need_dispatch[kind][cfg_id] = True
            r.to_satellites_nb_assigned[kind][cfg_id] = 0
            r.to_satellites_managed_by[kind][cfg_id] = []

    def _dispatch_satellites_cfg(self, r, cfg_id, kind, arbiters_cfg):
        """Send the kind part of cfg_id to the satellites of realm r.

        Alive potential satellites, sorted with
        ``alive_then_spare_then_deads``, get it until the realm has as many
        as it must have. Accepting satellites are recorded in
        ``to_satellites_managed_by``; ``need_dispatch`` is cleared when the
        required number is reached.
        """
        logger.log('[%s] Dispatching %s' % (r.get_name(), kind) + 's')
        cfg_for_satellite_part = r.to_satellites[kind][cfg_id]
        print("%s DBG: cfg_for_satellite_part %s %s %s" % ("*" * 10, cfg_for_satellite_part, r.get_name(), cfg_id))

        # Sort a copy of the potential satellites list
        satellites = list(r.get_potential_satellites_by_type(kind))
        satellites.sort(alive_then_spare_then_deads)
        order = ''.join(['%s (spare:%s), ' % (s.get_name(), str(s.spare)) for s in satellites])
        logger.log("[%s] %s satellite order : " % (r.get_name(), kind) + order)

        # Now we dispatch cfg to everyone who asks for it
        nb_cfg_sent = 0
        for satellite in satellites:
            # Send only if we need more, and if we can
            if nb_cfg_sent < r.get_nb_of_must_have_satellites(kind) and satellite.alive:
                logger.log('[%s] Trying to send configuration to %s %s' % (r.get_name(), kind, satellite.get_name()))
                satellite.cfg['schedulers'][cfg_id] = cfg_for_satellite_part
                if satellite.manage_arbiters:
                    satellite.cfg['arbiters'] = arbiters_cfg

                # Brokers should have poller/reactionner links too
                if kind == "broker":
                    r.fill_broker_with_poller_reactionner_links(satellite)

                is_sent = satellite.put_conf(satellite.cfg)
                if is_sent:
                    satellite.active = True
                    logger.log('[%s] Dispatch OK of for configuration %s to %s %s' % (r.get_name(), cfg_id, kind, satellite.get_name()))
                    nb_cfg_sent += 1
                    r.to_satellites_managed_by[kind][cfg_id].append(satellite)
            # else: I've got enough satellites, the next ones are spares

        r.to_satellites_nb_assigned[kind][cfg_id] = nb_cfg_sent
        if nb_cfg_sent == r.get_nb_of_must_have_satellites(kind):
            logger.log("[%s] OK, no more %s sent need" % (r.get_name(), kind))
            r.to_satellites_need_dispatch[kind][cfg_id] = False