2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
24 This is the class of the dispatcher. Its role is to dispatch
25 configurations to other elements like schedulers, reactionners,
26 pollers, receivers and brokers. It is responsible for the high availability part. If an
27 element dies and the element type has a spare, it sends the conf of the
34 from shinken
.util
import alive_then_spare_then_deads
35 from shinken
.log
import logger
39 # Load all elements, mark them as not assigned
40 # and add them to elements, so looping will be easier :)
41 def __init__(self
, conf
, arbiter
):
42 self
.arbiter
= arbiter
43 # Pointer to the whole conf
45 self
.realms
= conf
.realms
46 # Direct pointers to the elements that are important for us
47 self
.arbiters
= self
.conf
.arbiterlinks
48 self
.schedulers
= self
.conf
.schedulerlinks
49 self
.reactionners
= self
.conf
.reactionners
50 self
.brokers
= self
.conf
.brokers
51 self
.receivers
= self
.conf
.receivers
52 self
.pollers
= self
.conf
.pollers
53 self
.dispatch_queue
= { 'schedulers': [], 'reactionners': [],
54 'brokers': [], 'pollers': [] , 'receivers' : []}
55 self
.elements
= [] #all elements, sched and satellites
56 self
.satellites
= [] #only satellites not schedulers
58 for cfg
in self
.conf
.confs
.values():
59 cfg
.is_assigned
= False
60 cfg
.assigned_to
= None
62 #Add satellites to the right lists
63 self
.elements
.extend(self
.schedulers
)
65 #Others are in 2 lists
66 self
.elements
.extend(self
.reactionners
)
67 self
.satellites
.extend(self
.reactionners
)
68 self
.elements
.extend(self
.pollers
)
69 self
.satellites
.extend(self
.pollers
)
70 self
.elements
.extend(self
.brokers
)
71 self
.satellites
.extend(self
.brokers
)
72 self
.elements
.extend(self
.receivers
)
73 self
.satellites
.extend(self
.receivers
)
75 #Some flag about dispatch need or not
76 self
.dispatch_ok
= False
77 self
.first_dispatch_done
= False
79 #Prepare the satellites confs
80 for satellite
in self
.satellites
:
81 satellite
.prepare_for_conf()
83 #Some properties must be given to satellites from the global
84 #configuration, like the max_plugins_output_length to pollers
85 parameters
= {'max_plugins_output_length' : self
.conf
.max_plugins_output_length
}
86 for poller
in self
.pollers
:
87 poller
.add_global_conf_parameters(parameters
)
89 #Now realm will have a cfg pool for satellites
91 r
.prepare_for_satellites_conf()
93 # Reset need_conf for all schedulers.
94 for s
in self
.schedulers
:
97 for rec
in self
.receivers
:
100 #checks alive elements
101 def check_alive(self
):
102 for elt
in self
.elements
:
104 #print "Element", elt.get_name(), " alive:", elt.alive, "
106 #Elements that are not alive need need_conf set again,
107 #and spares too if they do not already have a conf
108 #REF: doc/shinken-scheduler-lost.png (1)
109 if not elt
.alive
or hasattr(elt
, 'conf') and elt
.conf
is None:
112 for arb
in self
.arbiters
:
114 if arb
!= self
.arbiter
:
116 #print "Arb", arb.get_name(), "alive?", arb.alive, arb.__dict__
119 # Check if all active items are still alive
120 # the result goes into self.dispatch_ok
121 # TODO : finish need conf
122 def check_dispatch(self
):
123 # Check if the other arbiters have a conf
124 for arb
in self
.arbiters
:
126 if arb
!= self
.arbiter
:
127 if not arb
.have_conf(self
.conf
.magic_hash
):
128 arb
.put_conf(self
.conf
)
130 # Ok, it already has the conf. I remind it that
131 # it does not have to run, I'm still alive!
134 # We check that confs are dispatched on alive scheds. If one is not dispatched, it needs a dispatch :)
135 # and if it is dispatched on a failed node, remove the association and ask for a new dispatch
136 for r
in self
.realms
:
137 for cfg_id
in r
.confs
:
138 sched
= r
.confs
[cfg_id
].assigned_to
140 if self
.first_dispatch_done
:
141 logger
.log("Scheduler configuration %d is unmanaged!!" % cfg_id
)
142 self
.dispatch_ok
= False
145 self
.dispatch_ok
= False #so we ask a new dispatching
146 logger
.log("Warning : Scheduler %s had the configuration %d but is dead, I am not happy." % (sched
.get_name(), cfg_id
))
147 sched
.conf
.assigned_to
= None
148 sched
.conf
.is_assigned
= False
150 # Else: ok the conf is managed by a living scheduler
152 # Maybe satellites are alive but no longer have a cfg while
153 # I think they do. That is not good. I ask a global redispatch for
154 # the cfg_id I think is not correctly dispatched.
155 for r
in self
.realms
:
156 for cfg_id
in r
.confs
:
158 for kind
in ( 'reactionner', 'poller', 'broker' ):
159 # We must have the right number of satellites or we are not happy
160 # So we are sure to raise a dispatch on every loop where a satellite is missing
161 if len(r
.to_satellites_managed_by
[kind
][cfg_id
]) < r
.get_nb_of_must_have_satellites(kind
):
162 logger
.log("Warning : Missing satellite %s for configuration %d :" % (kind
, cfg_id
))
164 # TODO : less violent! Resend only to those that need it?
165 # must be caught by the satellite, which sees that it already has the conf (hash)
167 self
.dispatch_ok
= False #so we will redispatch all
168 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
169 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
170 for satellite
in r
.to_satellites_managed_by
[kind
][cfg_id
]:
171 # Maybe the sat was marked not alive, but is still in
172 # to_satellites_managed_by; that means that a new dispatch
174 # Or maybe it is alive but I thought that this reactionner managed the conf
175 # but it doesn't. I ask a full redispatch of these cfg for both cases
178 satellite
.reachable
and cfg_id
not in satellite
.what_i_managed()
179 except TypeError, exp
:
180 print "DBG: ERROR: (%s) for satellite %s" % (exp
, satellite
.__dict
__)
181 satellite
.reachable
= False
183 if not satellite
.alive
or (satellite
.reachable
and cfg_id
not in satellite
.what_i_managed()):
184 logger
.log('[%s] Warning : The %s %s seems to be down, I must re-dispatch its role to someone else.' % (r
.get_name(), kind
, satellite
.get_name()))
185 self
.dispatch_ok
= False #so we will redispatch all
186 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
187 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
188 # At the first pass, there is no cfg_id in to_satellites_managed_by
192 # Look for receivers. If they got conf, it's ok, if not, need a simple
194 for r
in self
.realms
:
195 for rec
in r
.receivers
:
196 # If the receiver does not have a conf, it must get one :)
197 if rec
.reachable
and not rec
.got_conf():
198 self
.dispatch_ok
= False #so we will redispatch all
205 # Imagine a world where... oops...
206 # Imagine a master got the conf, then the network went down and
207 # a spare took it (good :) ). Like the Empire, the master
208 # strikes back! It was still alive! (like Elvis). It still has the conf
209 # and is running! Not good!
210 # Bad dispatch : a link that says it has a conf but I do not allow this
211 # so I ask it to wait for a new conf and stop kidding.
212 def check_bad_dispatch(self
):
213 for elt
in self
.elements
:
214 if hasattr(elt
, 'conf'):
215 # If the element has a conf, I do not care, it's a good dispatch
216 # If it is dead : I do not ask it anything, it won't respond..
217 if elt
.conf
is None and elt
.reachable
:
218 # print "Ask", elt.get_name() , 'if it got conf'
220 logger
.log('Warning : The element %s have a conf and should not have one! I ask it to idle now' % elt
.get_name())
223 # I do not care whether the order was sent or not. If not,
224 # the next loop will resend it
228 # I ask satellites which sched_id they manage. If I do not agree, I ask
230 for satellite
in self
.satellites
:
231 kind
= satellite
.get_my_type()
232 if satellite
.reachable
:
233 cfg_ids
= satellite
.what_i_managed()
234 # I do not care about satellites that do nothing, it already
236 if len(cfg_ids
) != 0:
238 for cfg_id
in cfg_ids
:
239 # DBG print kind, ":", satellite.get_name(), "manage cfg id:", cfg_id
240 # Ok, we search for realm that have the conf
241 for r
in self
.realms
:
242 if cfg_id
in r
.confs
:
243 # Ok we've got the realm, we check its to_satellites_managed_by
244 # to see if the satellite is in. If not, we remove the sched_id for it
245 if not satellite
in r
.to_satellites_managed_by
[kind
][cfg_id
]:
246 id_to_delete
.append(cfg_id
)
247 # Maybe we removed all cfg_id of this satellite
248 # We can make it idle: not active, and wait_new_conf
249 if len(id_to_delete
) == len(cfg_ids
):
250 satellite
.active
= False
251 logger
.log("I ask %s to wait a new conf" % satellite
.get_name())
252 satellite
.wait_new_conf()
253 else:#It is not fully idle, just less cfg
254 for id in id_to_delete
:
255 logger
.log("I ask to remove configuration N%d from %s" %(cfg_id
, satellite
.get_name()))
256 satellite
.remove_from_conf(cfg_id
)
259 # Make an ORDERED list of schedulers so we can
260 # send them conf in this order for a specific realm
261 def get_scheduler_ordered_list(self
, r
):
262 # get scheds, alive and no spare first
264 for s
in r
.schedulers
:
267 # now the spare scheds of higher realms
268 # they are after the sched of realm, so
269 # they will be used after the spare of
271 for higher_r
in r
.higher_realms
:
272 for s
in higher_r
.schedulers
:
276 # Now we sort the scheds so we take master, then spare,
277 # then the dead, but we do not care about them
278 scheds
.sort(alive_then_spare_then_deads
)
279 scheds
.reverse() #pop is last, I need first
282 print_sched
= [s
.get_name() for s
in scheds
]
283 print_sched
.reverse()
284 print_string
= '[%s] Schedulers order : ' % r
.get_name()
285 for s
in print_sched
:
286 print_string
+= '%s ' % s
287 logger
.log(print_string
)
293 # Manage the dispatch
294 # REF: doc/shinken-conf-dispatching.png (3)
296 # Ok, we pass at least one time in dispatch, so now errors are True errors
297 self
.first_dispatch_done
= True
299 # If there is no need to dispatch, do not dispatch :)
300 if not self
.dispatch_ok
:
301 for r
in self
.realms
:
302 logger
.log("Dispatching Realm %s" % r
.get_name())
303 conf_to_dispatch
= [ cfg
for cfg
in r
.confs
.values() if not cfg
.is_assigned
]
304 nb_conf
= len(conf_to_dispatch
)
305 logger
.log('[%s] Dispatching %d/%d configurations' % (r
.get_name(), nb_conf
, len(r
.confs
)))
307 # Now we get in scheds all schedulers of this realm and upper so
308 # we will send them conf (in this order)
309 scheds
= self
.get_scheduler_ordered_list(r
)
311 # Try to send only for alive members
312 scheds
= [ s
for s
in scheds
if s
.alive
]
314 # Now we do the real job
315 # every_one_need_conf = False
316 for conf
in conf_to_dispatch
:
317 logger
.log('[%s] Dispatching one configuration' % r
.get_name())
319 # If there are no alive schedulers, not good...
321 logger
.log('[%s] but there a no alive schedulers in this realm!' % r
.get_name())
323 # we need to loop until the conf is assigned
324 # or when there are no more schedulers available
328 except IndexError: #No more schedulers.. not good, no loop
330 # The conf does not need to be dispatched
332 for kind
in ( 'reactionner', 'poller', 'broker' ):
333 r
.to_satellites
[kind
][cfg_id
] = None
334 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = False
335 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
338 logger
.log('[%s] Trying to send conf %d to scheduler %s' % (r
.get_name(), conf
.id, sched
.get_name()))
339 if not sched
.need_conf
:
340 logger
.log('[%s] The scheduler %s do not need conf, sorry' % (r
.get_name(), sched
.get_name()))
343 #every_one_need_conf = True
345 # We tag conf with the instance_name = scheduler_name
346 conf
.instance_name
= sched
.scheduler_name
347 # REF: doc/shinken-conf-dispatching.png (3)
348 # REF: doc/shinken-scheduler-lost.png (2)
349 override_conf
= sched
.get_override_configuration()
350 satellites_for_sched
= r
.get_satellites_links_for_scheduler()
351 print "Want to give a satellites pack for the scheduler", satellites_for_sched
352 conf_package
= (conf
, override_conf
, sched
.modules
, satellites_for_sched
)
353 print "Try to put the conf", conf_package
354 is_sent
= sched
.put_conf(conf_package
)
356 logger
.log('[%s] Warning : Dispatch fault for scheduler %s' %(r
.get_name(), sched
.get_name()))
359 logger
.log('[%s] Dispatch OK of for conf in scheduler %s' % (r
.get_name(), sched
.get_name()))
361 sched
.need_conf
= False
362 conf
.is_assigned
= True
363 conf
.assigned_to
= sched
365 # Now we generate the conf for satellites:
367 for kind
in ( 'reactionner', 'poller', 'broker' ):
368 r
.to_satellites
[kind
][cfg_id
] = sched
.give_satellite_cfg()
369 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
370 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
372 # Ok, the conf is dispatched, no more loop for this
376 # We popped the confs to dispatch, so there must be no more conf...
377 conf_to_dispatch
= [ cfg
for cfg
in self
.conf
.confs
.values() if not cfg
.is_assigned
]
378 nb_missed
= len(conf_to_dispatch
)
380 logger
.log("WARNING : All schedulers configurations are not dispatched, %d are missing" % nb_missed
)
382 logger
.log("OK, all schedulers configurations are dispatched :)")
383 self
.dispatch_ok
= True
385 # Scheds without conf after a successful dispatch are set to need_conf = False
386 # so they do not raise a useless dispatch
388 for sched
in self
.schedulers
.items
.values():
389 if sched
.conf
is None:
390 # print "Tagging sched", sched.get_name(), "so it do not ask anymore for conf"
391 sched
.need_conf
= False
395 for arb
in self
.arbiters
:
396 arbiters_cfg
[arb
.id] = arb
.give_satellite_cfg()
398 # We put the satellites conf with the "new" way so they see only what we want
399 for r
in self
.realms
:
400 for cfg
in r
.confs
.values():
402 for kind
in ( 'reactionner', 'poller', 'broker' ):
403 if r
.to_satellites_need_dispatch
[kind
][cfg_id
]:
404 logger
.log('[%s] Dispatching %s' % (r
.get_name(),kind
) + 's')
405 cfg_for_satellite_part
= r
.to_satellites
[kind
][cfg_id
]
406 print "*"*10, "DBG: cfg_for_satellite_part", cfg_for_satellite_part
, r
.get_name(), cfg_id
408 # make copies of potential_react list for sort
410 for satellite
in r
.get_potential_satellites_by_type(kind
):
411 satellites
.append(satellite
)
412 satellites
.sort(alive_then_spare_then_deads
)
415 print b
.get_name(), b
.alive
417 # Only keep alive Satellites
418 satellites
= [s
for s
in satellites
if s
.alive
]
420 # If we got a broker, we make the list to pop a new
421 # item first for each scheduler, so it will smooth the load
422 # But the spares must stay at the end ;)
424 nospare
= [s
for s
in satellites
if not s
.spare
]
425 #Should look over the list, not over
426 if len(nospare
) != 0:
427 idx
= cfg_id
% len(nospare
)
428 print "No spare", nospare
429 spares
= [s
for s
in satellites
if s
.spare
]
430 print "Spare", spares
431 print "Got 1", nospare
[idx
:]
432 print "Got 2", nospare
[:-idx
+1]
433 new_satellites
= nospare
[idx
:]
434 new_satellites
.extend(nospare
[:-idx
+1])
435 #print "New satellites", cfg_id, new_satellites
436 #for s in new_satellites:
437 # print "New satellites", cfg_id, s.get_name()
438 satellites
= new_satellites
439 satellites
.extend(spares
)
441 satellite_string
= "[%s] %s satellite order : " % (r
.get_name(), kind
)
442 for satellite
in satellites
:
443 satellite_string
+= '%s (spare:%s), ' % (satellite
.get_name(), str(satellite
.spare
))
445 logger
.log(satellite_string
)
448 # Now we dispatch the cfg to everyone asking for it
450 for satellite
in satellites
:
451 # Send only if we need, and if we can
452 if nb_cfg_sent
< r
.get_nb_of_must_have_satellites(kind
) and satellite
.alive
:
453 logger
.log('[%s] Trying to send configuration to %s %s' %(r
.get_name(), kind
, satellite
.get_name()))
454 satellite
.cfg
['schedulers'][cfg_id
] = cfg_for_satellite_part
455 if satellite
.manage_arbiters
:
456 satellite
.cfg
['arbiters'] = arbiters_cfg
458 # Brokers should have poller/reactionners links too
460 r
.fill_broker_with_poller_reactionner_links(satellite
)
462 is_sent
= satellite
.put_conf(satellite
.cfg
)
464 satellite
.active
= True
465 logger
.log('[%s] Dispatch OK of for configuration %s to %s %s' %(r
.get_name(), cfg_id
, kind
, satellite
.get_name()))
467 r
.to_satellites_managed_by
[kind
][cfg_id
].append(satellite
)
469 # If we got a broker, the conf_id must be sent to only ONE
470 # broker, so here it's done, we are happy.
474 # #I've got enough satellites, the next ones are spares for me
475 if nb_cfg_sent
== r
.get_nb_of_must_have_satellites(kind
):
476 logger
.log("[%s] OK, no more %s sent need" % (r
.get_name(), kind
))
477 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = False
480 # And now we dispatch receivers. It's easier, they need ONE conf
481 # in all their life :)
482 for r
in self
.realms
:
483 for rec
in r
.receivers
:
485 logger
.log('[%s] Trying to send configuration to receiver %s' %(r
.get_name(), rec
.get_name()))
486 is_sent
= rec
.put_conf(rec
.cfg
)
489 rec
.need_conf
= False
490 logger
.log('[%s] Dispatch OK of for configuration to receiver %s' %(r
.get_name(), rec
.get_name()))