*Remove a leftover test file
[shinken.git] / shinken / dispatcher.py
blob2a8ec16b6eaf33216d4cd0df62fd9fc2f9b2cf71
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
#This is the class of the dispatcher. Its role is to dispatch
#configurations to other elements like schedulers, reactionners,
#pollers and brokers. It is responsible for the high availability part:
#if an element dies and the element type has a spare, it sends the
#configuration of the dead element to the spare.
29 from shinken.util import alive_then_spare_then_deads
30 from shinken.log import logger
#Dispatcher Class
class Dispatcher:
    """Dispatch the configuration parts to schedulers and satellites.

    The configuration is split into parts (``conf.confs``). Each part is sent
    to one alive scheduler of its realm (or to a spare scheduler of a higher
    realm), and the satellite configuration given by that scheduler is then
    sent to reactionners, pollers and brokers. The ``check_*`` methods detect
    dead elements and bad dispatches and clear ``dispatch_ok`` so that the
    next ``dispatch()`` call redistributes what was lost.
    """

    def __init__(self, conf, arbiter):
        """Index all elements of conf and mark every configuration part unassigned.

        conf: the whole configuration; its realms, element links, global
            parameters and ``confs`` (the parts to dispatch) are read here.
        arbiter: the link of the arbiter running this dispatcher, used to
            skip ourselves when talking to the other arbiters.
        """
        self.arbiter = arbiter
        #Pointer to the whole conf
        self.conf = conf
        self.realms = conf.realms
        #Direct pointers to the elements we talk to
        self.arbiters = self.conf.arbiterlinks
        self.schedulers = self.conf.schedulerlinks
        self.reactionners = self.conf.reactionners
        self.brokers = self.conf.brokers
        self.pollers = self.conf.pollers
        self.dispatch_queue = {'schedulers' : [], 'reactionners' : [],
                               'brokers' : [], 'pollers' : []}
        self.elements = [] #all elements, schedulers and satellites
        self.satellites = [] #only satellites, not schedulers

        #Nothing is dispatched yet
        for cfg in self.conf.confs.values():
            cfg.is_assigned = False
            cfg.assigned_to = None

        #Schedulers are only elements, satellites are in both lists
        self.elements.extend(self.schedulers)
        for sat_list in (self.reactionners, self.pollers, self.brokers):
            self.elements.extend(sat_list)
            self.satellites.extend(sat_list)

        #Some flags about dispatch need or not
        self.dispatch_ok = False
        self.first_dispatch_done = False

        #Prepare the satellites confs
        for satellite in self.satellites:
            satellite.prepare_for_conf()

        #Some properties must be given to satellites from the global
        #configuration, like the max_plugins_output_length to pollers
        parameters = {'max_plugins_output_length' : self.conf.max_plugins_output_length}
        for poller in self.pollers:
            poller.add_global_conf_parameters(parameters)

        #Now realms will have a cfg pool for satellites
        for r in self.realms:
            r.prepare_for_satellites_conf()


    def check_alive(self):
        """Ping every element and flag those that need a new configuration.

        An element gets ``need_conf = True`` when it is not alive, or when it
        has a ``conf`` attribute that is None (e.g. a spare without conf).
        The other arbiters are pinged too.
        """
        for elt in self.elements:
            elt.ping()

            #Not alive need a new conf, and spares too if they do not
            #already have a conf
            #REF: doc/shinken-scheduler-lost.png (1)
            if not elt.alive or (hasattr(elt, 'conf') and elt.conf is None):
                elt.need_conf = True

        for arb in self.arbiters:
            #If not me...
            if arb != self.arbiter:
                arb.ping()


    def check_dispatch(self):
        """Verify that the current dispatch is still valid.

        Pushes the configuration to the other arbiters (or tells them not to
        run when they already have it). Then clears ``self.dispatch_ok`` when
        a configuration part has no alive scheduler, or when it has too few
        or wrong satellites; the affected assignments are reset so that the
        next ``dispatch()`` redoes them.
        """
        #Check if the other arbiters have a conf
        for arb in self.arbiters:
            #If not me...
            if arb != self.arbiter:
                if not arb.have_conf(self.conf.magic_hash):
                    arb.put_conf(self.conf)
                else:
                    #Ok, it already has the conf. Remind it that it does
                    #not have to run, I'm still alive!
                    arb.do_not_run()

        #Confs not dispatched, or dispatched on a dead scheduler, need a
        #new dispatch; for the latter the association is removed.
        for r in self.realms:
            for cfg_id in r.confs:
                cfg = r.confs[cfg_id]
                sched = cfg.assigned_to
                if sched is None:
                    if self.first_dispatch_done:
                        logger.log("Scheduler configuration %d is unmanaged!!" % cfg_id)
                    self.dispatch_ok = False
                elif not sched.alive:
                    self.dispatch_ok = False #so we ask a new dispatching
                    logger.log("Warning : Scheduler %s had the configuration %d but is dead, I am not happy." % (sched.get_name(), cfg_id))
                    #Reset the part we are looking at: sched.conf may already
                    #have been cleared (or point to another part) if this
                    #scheduler was the assigned_to of several parts
                    cfg.assigned_to = None
                    cfg.is_assigned = False
                    sched.conf = None
                #Else: ok the conf is managed by a living scheduler

        #Maybe satellites are alive, but do not have a cfg that I think
        #they have. It is not good. I ask a redispatch for the cfg_id
        #I think is not correctly dispatched.
        for r in self.realms:
            for cfg_id in r.confs:
                for kind in ['reactionner', 'poller', 'broker']:
                    try:
                        managed_by = r.to_satellites_managed_by[kind][cfg_id]
                    except KeyError:
                        #At the first pass, there is no cfg_id in to_satellites_managed_by
                        continue

                    #We must have the good number of satellites or we are not
                    #happy, so a dispatch is raised every loop a satellite is missing
                    if len(managed_by) < r.get_nb_of_must_have_satellites(kind):
                        logger.log("Warning : Missing satellite %s for configuration %d :" % (kind, cfg_id))
                        #TODO : less violent! Must resend to just who need?
                        #must be caught by satellites that see they already
                        #have the conf (hash) and do nothing
                        self.dispatch_ok = False #so we will redispatch all
                        r.to_satellites_nb_assigned[kind][cfg_id] = 0
                        r.to_satellites_need_dispatch[kind][cfg_id] = True
                        r.to_satellites_managed_by[kind][cfg_id] = []
                        #The managed list was just emptied, nothing left to check
                        continue

                    for satellite in managed_by:
                        #Maybe the sat was marked not alive, but is still in
                        #to_satellites_managed_by: a new dispatch is needed.
                        #Or it is alive but does not manage the conf I think
                        #it does. I ask a full redispatch of this cfg in both cases
                        if self._satellite_lost_cfg(satellite, cfg_id):
                            logger.log('[%s] Warning : The %s %s seems to be down, I must re-dispatch its role to someone else.' % (r.get_name(), kind, satellite.get_name()))
                            self.dispatch_ok = False #so we will redispatch all
                            r.to_satellites_nb_assigned[kind][cfg_id] = 0
                            r.to_satellites_need_dispatch[kind][cfg_id] = True
                            r.to_satellites_managed_by[kind][cfg_id] = []


    def _satellite_lost_cfg(self, satellite, cfg_id):
        """Return True if satellite is dead or no longer manages cfg_id.

        what_i_managed() is asked only once, and only when the satellite is
        reachable. If its answer cannot be searched (TypeError, e.g. it
        returned None), the satellite is marked unreachable and only its
        ``alive`` flag decides.
        """
        #local import: the module-level imports are outside this class
        import sys
        lost_cfg = False
        if satellite.reachable:
            try:
                lost_cfg = cfg_id not in satellite.what_i_managed()
            except TypeError:
                exp = sys.exc_info()[1]
                print("DBG: ERROR: (%s) for satellite %s" % (exp, satellite.__dict__))
                satellite.reachable = False
        return not satellite.alive or lost_cfg


    def check_bad_dispatch(self):
        """Ask elements holding a configuration they should not have to drop it.

        Imagine a master got the conf and the network went down: a spare took
        it (good). But the master was still alive and still runs its conf:
        not good! Elements we consider conf-less but that say they have one
        are asked to idle and wait for a new conf. Satellites are asked to
        drop the configuration ids we did not assign to them.
        """
        for elt in self.elements:
            if hasattr(elt, 'conf'):
                #If the element has a conf, it's a good dispatch.
                #If it is unreachable, it won't answer, so do not ask.
                if elt.conf is None and elt.reachable:
                    if elt.have_conf():
                        logger.log('Warning : The element %s have a conf and should not have one! I ask it to idle now' % elt.get_name())
                        elt.active = False
                        elt.wait_new_conf()
                        #Whether the order is received or not does not
                        #matter: the next loop will resend it

        #I ask satellites which sched ids they manage. If I do not agree,
        #I ask them to remove it
        for satellite in self.satellites:
            kind = satellite.get_my_type()
            if not satellite.reachable:
                continue
            cfg_ids = satellite.what_i_managed()
            #Satellites that do nothing already do what I want :)
            #(also covers an answer of None, which cannot be iterated)
            if not cfg_ids:
                continue

            id_to_delete = []
            for cfg_id in cfg_ids:
                #Search for the realm that has this conf, and check in its
                #to_satellites_managed_by that the satellite is expected
                for r in self.realms:
                    if cfg_id in r.confs:
                        if satellite not in r.to_satellites_managed_by[kind][cfg_id]:
                            id_to_delete.append(cfg_id)

            if len(id_to_delete) == len(cfg_ids):
                #We removed all cfg_id of this satellite: make it idle
                satellite.active = False
                logger.log("I ask %s to wait a new conf" % satellite.get_name())
                satellite.wait_new_conf()
            else:
                #It is not fully idle, just less cfg
                for bad_cfg_id in id_to_delete:
                    logger.log("I ask to remove configuration N%d from %s" % (bad_cfg_id, satellite.get_name()))
                    satellite.remove_from_conf(bad_cfg_id)


    def get_scheduler_ordered_list(self, r):
        """Return the schedulers usable for realm r, in reverse try order.

        The list holds the realm's own schedulers followed by the spare
        schedulers of its higher realms, sorted with
        alive_then_spare_then_deads and then reversed, so that ``pop()``
        yields the preferred scheduler first. The try order is logged.
        """
        #The realm's schedulers first
        scheds = list(r.schedulers)

        #Then the spare scheds of higher realms, so they are used after
        #the spares of the realm itself
        for higher_r in r.higher_realms:
            for s in higher_r.schedulers:
                if s.spare:
                    scheds.append(s)

        #Now we sort the scheds so we take master, then spare, then the dead
        scheds.sort(alive_then_spare_then_deads)
        scheds.reverse() #pop is last, I need first

        #DBG: dump the schedulers in the order they will be tried
        names = [s.get_name() for s in scheds]
        names.reverse()
        logger.log('[%s] Schedulers order : ' % r.get_name() + ''.join(['%s ' % name for name in names]))

        return scheds


    def dispatch(self):
        """Send the unassigned configuration parts and the pending satellite confs.

        Does nothing except recording that a dispatch pass happened when
        ``self.dispatch_ok`` is already True. Otherwise the unassigned parts
        of every realm are sent to schedulers, ``dispatch_ok`` becomes True
        once every part of ``self.conf.confs`` is assigned, and the satellite
        configurations flagged as needing dispatch are sent.

        REF: doc/shinken-conf-dispatching.png (3)
        """
        #Ok, we pass at least one time in dispatch, so now errors are True errors
        self.first_dispatch_done = True

        #If no need to dispatch, do not dispatch :)
        if self.dispatch_ok:
            return

        for r in self.realms:
            self._dispatch_realm_to_schedulers(r)

        #We popped confs to dispatch, so there should be no more conf...
        nb_missed = len([cfg for cfg in self.conf.confs.values() if not cfg.is_assigned])
        if nb_missed > 0:
            logger.log("WARNING : All schedulers configurations are not dispatched, %d are missing" % nb_missed)
        else:
            logger.log("OK, all schedulers configurations are dispatched :)")
            self.dispatch_ok = True

        #Scheds without conf in a dispatch ok are set to no need_conf
        #so they do not raise dispatch where no use
        if self.dispatch_ok:
            for sched in self.schedulers.items.values():
                if sched.conf is None:
                    sched.need_conf = False

        arbiters_cfg = {}
        for arb in self.arbiters:
            arbiters_cfg[arb.id] = arb.give_satellite_cfg()

        #We put the satellites conf with the "new" way so they see only what we want
        for r in self.realms:
            for cfg in r.confs.values():
                cfg_id = cfg.id
                for kind in ['reactionner', 'poller', 'broker']:
                    if r.to_satellites_need_dispatch[kind][cfg_id]:
                        self._dispatch_cfg_to_satellites(r, cfg_id, kind, arbiters_cfg)


    def _dispatch_realm_to_schedulers(self, r):
        """Send every unassigned configuration part of realm r to a scheduler.

        Schedulers are tried in get_scheduler_ordered_list() order, alive
        ones only; a scheduler is consumed by the first part it is offered.
        On success the part is linked to the scheduler and the satellite
        configuration for each kind is prepared from the scheduler; when
        no scheduler is left the part's satellite configuration is cleared.
        """
        logger.log("Dispatching Realm %s" % r.get_name())
        conf_to_dispatch = [cfg for cfg in r.confs.values() if not cfg.is_assigned]
        nb_conf = len(conf_to_dispatch)
        logger.log('[%s] Dispatching %d/%d configurations' % (r.get_name(), nb_conf, len(r.confs)))

        #All schedulers of this realm and upper, in the order we will try them
        scheds = self.get_scheduler_ordered_list(r)

        #Try to send only to alive members
        scheds = [s for s in scheds if s.alive]

        for conf in conf_to_dispatch:
            logger.log('[%s] Dispatching one configuration' % r.get_name())

            #If there are no alive schedulers, not good...
            if len(scheds) == 0:
                logger.log('[%s] but there a no alive schedulers in this realm!' % r.get_name())

            #Loop until the conf is assigned or there are no more schedulers
            while True:
                try:
                    sched = scheds.pop()
                except IndexError:
                    #No more schedulers.. not good. The conf do not need
                    #to be dispatched to satellites
                    cfg_id = conf.id
                    for kind in ['reactionner', 'poller', 'broker']:
                        r.to_satellites[kind][cfg_id] = None
                        r.to_satellites_nb_assigned[kind][cfg_id] = 0
                        r.to_satellites_need_dispatch[kind][cfg_id] = False
                        r.to_satellites_managed_by[kind][cfg_id] = []
                    break

                logger.log('[%s] Trying to send conf %d to scheduler %s' % (r.get_name(), conf.id, sched.get_name()))
                if not sched.need_conf:
                    logger.log('[%s] The scheduler %s do not need conf, sorry' % (r.get_name(), sched.get_name()))
                    continue

                #We tag conf with the instance_name = scheduler_name
                conf.instance_name = sched.scheduler_name
                #REF: doc/shinken-conf-dispatching.png (3)
                #REF: doc/shinken-scheduler-lost.png (2)
                override_conf = sched.get_override_configuration()
                conf_package = (conf, override_conf, sched.modules)
                if not sched.put_conf(conf_package):
                    logger.log('[%s] Warning : Dispatch fault for scheduler %s' % (r.get_name(), sched.get_name()))
                    continue

                logger.log('[%s] Dispatch OK of for conf in scheduler %s' % (r.get_name(), sched.get_name()))
                sched.conf = conf
                sched.need_conf = False
                conf.is_assigned = True
                conf.assigned_to = sched

                #Now we generate the conf for satellites
                cfg_id = conf.id
                for kind in ['reactionner', 'poller', 'broker']:
                    r.to_satellites[kind][cfg_id] = sched.give_satellite_cfg()
                    r.to_satellites_nb_assigned[kind][cfg_id] = 0
                    r.to_satellites_need_dispatch[kind][cfg_id] = True
                    r.to_satellites_managed_by[kind][cfg_id] = []
                #Ok, the conf is dispatched, no more loop for it
                break


    def _dispatch_cfg_to_satellites(self, r, cfg_id, kind, arbiters_cfg):
        """Send the satellite part of configuration cfg_id to satellites of one kind.

        Alive potential satellites of realm r are tried in
        alive_then_spare_then_deads order until as many as
        r.get_nb_of_must_have_satellites(kind) accepted it. The need-dispatch
        flag is cleared only when that number is reached.
        arbiters_cfg is given to satellites that manage arbiters.
        """
        logger.log('[%s] Dispatching %s' % (r.get_name(), kind) + 's')
        cfg_for_satellite_part = r.to_satellites[kind][cfg_id]

        #Copy the potential satellites list so we can sort it
        satellites = list(r.get_potential_satellites_by_type(kind))
        satellites.sort(alive_then_spare_then_deads)
        satellite_string = "[%s] %s satellite order : " % (r.get_name(), kind)
        satellite_string += ''.join(['%s (spare:%s), ' % (sat.get_name(), str(sat.spare)) for sat in satellites])
        logger.log(satellite_string)

        nb_must_have = r.get_nb_of_must_have_satellites(kind)

        #Now we dispatch cfg to every one that can take it
        nb_cfg_sent = 0
        for satellite in satellites:
            if nb_cfg_sent >= nb_must_have:
                #I've got enough satellites, the next ones are spares for me
                break
            #Send only if we can
            if not satellite.alive:
                continue
            logger.log('[%s] Trying to send configuration to %s %s' % (r.get_name(), kind, satellite.get_name()))
            satellite.cfg['schedulers'][cfg_id] = cfg_for_satellite_part
            if satellite.manage_arbiters:
                satellite.cfg['arbiters'] = arbiters_cfg
            #Brokers should have poller/reactionners links too
            if kind == "broker":
                r.fill_broker_with_poller_reactionner_links(satellite)
            if satellite.put_conf(satellite.cfg):
                satellite.active = True
                logger.log('[%s] Dispatch OK of for configuration %s to %s %s' % (r.get_name(), cfg_id, kind, satellite.get_name()))
                nb_cfg_sent += 1
                r.to_satellites_managed_by[kind][cfg_id].append(satellite)

        r.to_satellites_nb_assigned[kind][cfg_id] = nb_cfg_sent
        if nb_cfg_sent == nb_must_have:
            logger.log("[%s] OK, no more %s sent need" % (r.get_name(), kind))
            r.to_satellites_need_dispatch[kind][cfg_id] = False