2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 #This is the class of the dispatcher. It's role is to dispatch
23 #configurations to other elements like schedulers, reactionner,
24 #pollers and brokers. It is responsible for hight avaibility part. If an
25 #element die and the element type have a spare, it send the confi of the
29 from shinken
.util
import alive_then_spare_then_deads
30 from shinken
.log
import logger
34 #Load all elements, set them no assigned
35 #and add them to elements, so loop will be easier :)
36 def __init__(self
, conf
, arbiter
):
37 self
.arbiter
= arbiter
38 #Pointer to the whole conf
40 self
.realms
= conf
.realms
41 #Direct pointer to importants elements for us
42 self
.arbiters
= self
.conf
.arbiterlinks
43 self
.schedulers
= self
.conf
.schedulerlinks
44 self
.reactionners
= self
.conf
.reactionners
45 self
.brokers
= self
.conf
.brokers
46 self
.pollers
= self
.conf
.pollers
47 self
.dispatch_queue
= {'schedulers' : [], 'reactionners' : [],
48 'brokers' : [], 'pollers' : []}
49 self
.elements
= [] #all elements, sched and satellites
50 self
.satellites
= [] #only satellites not schedulers
52 for cfg
in self
.conf
.confs
.values():
53 cfg
.is_assigned
= False
54 cfg
.assigned_to
= None
56 #Add satellites in the good lists
57 self
.elements
.extend(self
.schedulers
)
59 #Others are in 2 lists
60 self
.elements
.extend(self
.reactionners
)
61 self
.satellites
.extend(self
.reactionners
)
62 self
.elements
.extend(self
.pollers
)
63 self
.satellites
.extend(self
.pollers
)
64 self
.elements
.extend(self
.brokers
)
65 self
.satellites
.extend(self
.brokers
)
67 #Some flag about dispatch need or not
68 self
.dispatch_ok
= False
69 self
.first_dispatch_done
= False
72 #Prepare the satellites confs
73 for satellite
in self
.satellites
:
74 satellite
.prepare_for_conf()
75 #print ""*5,satellite.get_name(), "Spare?", satellite.spare, "manage_sub_realms?", satellite.manage_sub_realms
77 #Some properties must be give to satellites from global
78 #configuration, like the max_plugins_output_length to pollers
79 parameters
= {'max_plugins_output_length' : self
.conf
.max_plugins_output_length
}
80 for poller
in self
.pollers
:
81 poller
.add_global_conf_parameters(parameters
)
83 #Now realm will have a cfg pool for satellites
85 r
.prepare_for_satellites_conf()
88 #checks alive elements
89 def check_alive(self
):
90 for elt
in self
.elements
:
92 #print "Element", elt.get_name(), " alive:", elt.alive, "
94 #Not alive need new need_conf
95 #and spare too if they do not have already a conf
96 #REF: doc/shinken-scheduler-lost.png (1)
97 if not elt
.alive
or hasattr(elt
, 'conf') and elt
.conf
== None:
100 for arb
in self
.arbiters
:
102 if arb
!= self
.arbiter
:
104 #print "Arb", arb.get_name(), "alive?", arb.alive, arb.__dict__
107 #Check if all active items are still alive
108 #the result go into self.dispatch_ok
109 #TODO : finish need conf
110 def check_dispatch(self
):
111 #Check if the other arbiter have a conf
112 for arb
in self
.arbiters
:
114 if arb
!= self
.arbiter
:
115 if not arb
.have_conf(self
.conf
.magic_hash
):
116 arb
.put_conf(self
.conf
)
118 #Ok, he already have the conf. I remember him that
119 #he do not have to run, I'm stil alive!
123 #We check for confs to be dispatched on alive scheds. If not dispatch, need dispatch :)
124 #and if dipatch on a failed node, remove the association, and need a new disaptch
125 for r
in self
.realms
:
126 for cfg_id
in r
.confs
:
127 sched
= r
.confs
[cfg_id
].assigned_to
129 if self
.first_dispatch_done
:
130 logger
.log("Scheduler configuration %d is unmanaged!!" % cfg_id
)
131 self
.dispatch_ok
= False
134 self
.dispatch_ok
= False #so we ask a new dispatching
135 logger
.log("Warning : Scheduler %s had the configuration %d but is dead, I am not happy." % (sched
.get_name(), cfg_id
))
136 sched
.conf
.assigned_to
= None
137 sched
.conf
.is_assigned
= False
139 #Else: ok the conf is managed by a living scheduler
141 #Maybe satelite are alive, but do not still have a cfg but
142 #I think so. It is not good. I ask a global redispatch for
143 #the cfg_id I think is not corectly dispatched.
144 for r
in self
.realms
:
145 for cfg_id
in r
.confs
:
147 for kind
in ['reactionner', 'poller', 'broker']:
148 #We must have the good number of satellite or we are not happy
149 #So we are sure to raise a dispatch every loop a satellite is missing
150 if len(r
.to_satellites_managed_by
[kind
][cfg_id
]) < r
.get_nb_of_must_have_satellites(kind
):
151 logger
.log("Warning : Missing satellite %s for configuration %d :" % (kind
, cfg_id
))
153 #TODO : less violent! Must resent to just who need?
154 #must be catch by satellite who see that it already have the conf (hash)
156 self
.dispatch_ok
= False #so we will redispatch all
157 r
.to_satellites_nb_assigned
[kind
][cfg_id
] = 0
158 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
159 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
160 for satellite
in r
.to_satellites_managed_by
[kind
][cfg_id
]:
161 #Maybe the sat was mark not alive, but still in
162 #to_satellites_managed_by that mean that a new dispatch
164 #Or maybe it is alive but I thought that this reactionner manage the conf
165 #but ot doesn't. I ask a full redispatch of these cfg for both cases
168 satellite
.reachable
and cfg_id
not in satellite
.what_i_managed()
169 except TypeError, exp
:
170 print "DBG: ERROR: (%s) for satellite %s" % (exp
, satellite
.__dict
__)
171 satellite
.reachable
= False
173 if not satellite
.alive
or (satellite
.reachable
and cfg_id
not in satellite
.what_i_managed()):
174 logger
.log('[%s] Warning : The %s %s seems to be down, I must re-dispatch its role to someone else.' % (r
.get_name(), kind
, satellite
.get_name()))
175 self
.dispatch_ok
= False #so we will redispatch all
176 r
.to_satellites_nb_assigned
[kind
][cfg_id
] = 0
177 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
178 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
179 #At the first pass, there is no cfg_id in to_satellites_managed_by
184 #Imagine a world where... oups...
185 #Imagine a master got the conf, network down
186 #a spare take it (good :) ). Like the Empire, the master
187 #strike back! It was still alive! (like Elvis). It still got conf
188 #and is running! not good!
189 #Bad dispatch : a link that say have a conf but I do not allow this
190 #so I ask it to wait a new conf and stop kidding.
191 def check_bad_dispatch(self
):
192 for elt
in self
.elements
:
193 if hasattr(elt
, 'conf'):
194 #If element have a conf, I do not care, it's a good dispatch
195 #If die : I do not ask it something, it won't respond..
196 if elt
.conf
== None and elt
.reachable
:
197 #print "Ask", elt.get_name() , 'if it got conf'
199 logger
.log('Warning : The element %s have a conf and should not have one! I ask it to idle now' % elt
.get_name())
202 #I do not care about order not send or not. If not,
203 #The next loop wil resent it
207 #I ask satellite witch sched_id they manage. If I am not agree, I ask
209 for satellite
in self
.satellites
:
210 kind
= satellite
.get_my_type()
211 if satellite
.reachable
:
212 cfg_ids
= satellite
.what_i_managed()
213 #I do nto care about satellites that do nothing, it already
215 if len(cfg_ids
) != 0:
217 for cfg_id
in cfg_ids
:
218 #DBG print kind, ":", satellite.get_name(), "manage cfg id:", cfg_id
219 #Ok, we search for realm that have the conf
220 for r
in self
.realms
:
221 if cfg_id
in r
.confs
:
222 #Ok we've got the realm, we check it's to_satellites_managed_by
223 #to see if reactionner is in. If not, we remove he sched_id for it
224 if not satellite
in r
.to_satellites_managed_by
[kind
][cfg_id
]:
225 id_to_delete
.append(cfg_id
)
226 #Maybe we removed all cfg_id of this reactionner
227 #We can make it idle, no active and wait_new_conf
228 if len(id_to_delete
) == len(cfg_ids
):
229 satellite
.active
= False
230 logger
.log("I ask %s to wait a new conf" % satellite
.get_name())
231 satellite
.wait_new_conf()
232 else:#It is not fully idle, just less cfg
233 for id in id_to_delete
:
234 logger
.log("I ask to remove configuration N%d from %s" %(cfg_id
, satellite
.get_name()))
235 satellite
.remove_from_conf(cfg_id
)
238 #Make a ORDERED list of schedulers so we can
239 #send them conf in this order for a specific realm
240 def get_scheduler_ordered_list(self
, r
):
241 #get scheds, alive and no spare first
243 for s
in r
.schedulers
:
246 #now the spare scheds of higher realms
247 #they are after the sched of realm, so
248 #they will be used after the spare of
250 for higher_r
in r
.higher_realms
:
251 for s
in higher_r
.schedulers
:
255 #Now we sort the scheds so we take master, then spare
256 #the dead, but we do not care about thems
257 scheds
.sort(alive_then_spare_then_deads
)
258 scheds
.reverse() #pop is last, I need first
261 print_sched
= [s
.get_name() for s
in scheds
]
262 print_sched
.reverse()
263 print_string
= '[%s] Schedulers order : ' % r
.get_name()
264 for s
in print_sched
:
265 print_string
+= '%s ' % s
266 logger
.log(print_string
)
273 #REF: doc/shinken-conf-dispatching.png (3)
275 #Ok, we pass at least one time in dispatch, so now errors are True errors
276 self
.first_dispatch_done
= True
278 #Is no need to dispatch, do not dispatch :)
279 if not self
.dispatch_ok
:
280 for r
in self
.realms
:
281 logger
.log("Dispatching Realm %s" % r
.get_name())
282 conf_to_dispatch
= [cfg
for cfg
in r
.confs
.values() if cfg
.is_assigned
==False]
283 nb_conf
= len(conf_to_dispatch
)
284 logger
.log('[%s] Dispatching %d/%d configurations' % (r
.get_name(), nb_conf
, len(r
.confs
)))
286 #Now we get in scheds all scheduler of this realm and upper so
287 #we will send them conf (in this order)
288 scheds
= self
.get_scheduler_ordered_list(r
)
290 #Try to send only for alive members
291 scheds
= [s
for s
in scheds
if s
.alive
]
293 #Now we do the real job
294 every_one_need_conf
= False
295 for conf
in conf_to_dispatch
:
296 logger
.log('[%s] Dispatching one configuration' % r
.get_name())
298 #If there is no alive schedulers, not good...
300 logger
.log('[%s] but there a no alive schedulers in this realm!' % r
.get_name())
302 #we need to loop until the conf is assigned
303 #or when there are no more schedulers available
308 logger
.log('[%s] Trying to send conf %d to scheduler %s' % (r
.get_name(), conf
.id, sched
.get_name()))
310 every_one_need_conf
= True
312 #We tag conf with the instance_name = scheduler_name
313 conf
.instance_name
= sched
.scheduler_name
314 #REF: doc/shinken-conf-dispatching.png (3)
315 #REF: doc/shinken-scheduler-lost.png (2)
316 override_conf
= sched
.get_override_configuration()
317 conf_package
= (conf
, override_conf
, sched
.modules
)
318 is_sent
= sched
.put_conf(conf_package
)
320 logger
.log('[%s] Dispatch OK of for conf in scheduler %s' % (r
.get_name(), sched
.get_name()))
322 sched
.need_conf
= False
323 conf
.is_assigned
= True
324 conf
.assigned_to
= sched
325 #Ok, the conf is dispatch, no more loop for this
329 #Now we generate the conf for satellites:
331 for kind
in ['reactionner', 'poller', 'broker']:
332 r
.to_satellites
[kind
][cfg_id
] = sched
.give_satellite_cfg()
333 r
.to_satellites_nb_assigned
[kind
][cfg_id
] = 0
334 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = True
335 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
337 logger
.log('[%s] Warning : Dispatch fault for scheduler %s' %(r
.get_name(), sched
.get_name()))
339 logger
.log('[%s] The scheduler %s do not need conf, sorry' % (r
.get_name(), sched
.get_name()))
340 except IndexError: #No more schedulers.. not good, no loop
342 #The conf do not need to be dispatch
344 for kind
in ['reactionner', 'poller', 'broker']:
345 r
.to_satellites
[kind
][cfg_id
] = None
346 r
.to_satellites_nb_assigned
[kind
][cfg_id
] = 0
347 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = False
348 r
.to_satellites_managed_by
[kind
][cfg_id
] = []
350 #We pop conf to dispatch, so it must be no more conf...
351 conf_to_dispatch
= [cfg
for cfg
in self
.conf
.confs
.values() if cfg
.is_assigned
==False]
352 nb_missed
= len(conf_to_dispatch
)
354 logger
.log("WARNING : All schedulers configurations are not dispatched, %d are missing" % nb_missed
)
356 logger
.log("OK, all schedulers configurations are dispatched :)")
357 self
.dispatch_ok
= True
359 #Sched without conf in a dispatch ok are set to no need_conf
360 #so they do not raise dispatch where no use
362 for sched
in self
.schedulers
.items
.values():
363 if sched
.conf
== None:
364 #print "Tagging sched", sched.get_name(), "so it do not ask anymore for conf"
365 sched
.need_conf
= False
369 for arb
in self
.arbiters
:
370 arbiters_cfg
[arb
.id] = arb
.give_satellite_cfg()
372 #We put the satellites conf with the "new" way so they see only what we want
373 for r
in self
.realms
:
374 for cfg
in r
.confs
.values():
376 for kind
in ['reactionner', 'poller', 'broker']:
377 if r
.to_satellites_need_dispatch
[kind
][cfg_id
]:
378 logger
.log('[%s] Dispatching %s' % (r
.get_name(),kind
) + 's')
379 cfg_for_satellite_part
= r
.to_satellites
[kind
][cfg_id
]
381 #print "Sched Config part for ", kind+'s',":", cfg_for_satellite_part
382 #make copies of potential_react list for sort
384 for satellite
in r
.get_potential_satellites_by_type(kind
):
385 satellites
.append(satellite
)
386 satellites
.sort(alive_then_spare_then_deads
)
387 satellite_string
= "[%s] %s satellite order : " % (r
.get_name(), kind
)
388 for satellite
in satellites
:
389 satellite_string
+= '%s (spare:%s), ' % (satellite
.get_name(), str(satellite
.spare
))
391 logger
.log(satellite_string
)
393 #Now we dispatch cfg to every one ask for it
395 for satellite
in satellites
:
396 #Send only if we need, and if we can
397 if nb_cfg_sent
< r
.get_nb_of_must_have_satellites(kind
) and satellite
.alive
:
398 logger
.log('[%s] Trying to send configuration to %s %s' %(r
.get_name(), kind
, satellite
.get_name()))
399 #cfg_for_satellite = {'schedulers' : {cfg_id : cfg_for_satellite_part}}
400 satellite
.cfg
['schedulers'][cfg_id
] = cfg_for_satellite_part
401 if satellite
.manage_arbiters
:
402 satellite
.cfg
['arbiters'] = arbiters_cfg
403 #Brokers should have poller/reactionners links too
405 r
.fill_broker_with_poller_reactionner_links(satellite
)
406 #cfg_for_satellite['modules'] = satellite.modules
407 is_sent
= satellite
.put_conf(satellite
.cfg
)#_for_satellite)
409 satellite
.active
= True
410 logger
.log('[%s] Dispatch OK of for configuration %s to %s %s' %(r
.get_name(), cfg_id
, kind
, satellite
.get_name()))
412 r
.to_satellites_managed_by
[kind
][cfg_id
].append(satellite
)
414 # #I've got enouth satellite, the next one are spare for me
415 r
.to_satellites_nb_assigned
[kind
][cfg_id
] = nb_cfg_sent
416 if nb_cfg_sent
== r
.get_nb_of_must_have_satellites(kind
):
417 logger
.log("[%s] OK, no more %s sent need" % (r
.get_name(), kind
))
418 r
.to_satellites_need_dispatch
[kind
][cfg_id
] = False