Fix: get back LiveStatus as default.
[shinken.git] / shinken / satellite.py
#!/usr/bin/env python
#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken.  If not, see <http://www.gnu.org/licenses/>.
24 """
25 This class is an interface for reactionner and poller
26 The satallite listen configuration from Arbiter in a port
27 the configuration gived by arbiter is schedulers where actionner will
28 take actions.
30 When already launch and have a conf, actionner still listen to arbiter
31 (one a timeout)
33 if arbiter want it to have a new conf, satellite forgot old schedulers
34 (and actions into) take new ones and do the (new) job.
35 """

from multiprocessing import Queue, Manager, active_children
import os
import copy
import time
import sys
import cPickle

try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    sys.exit("Shinken requires the Python Pyro module. Please install it.")

Pyro = pyro.Pyro

from shinken.message import Message
from shinken.worker import Worker
from shinken.load import Load
from shinken.daemon import Daemon, Interface
from shinken.log import logger
from shinken.brok import Brok
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler


# Interface for the Arbiter, our big MASTER
# It pushes us our conf
class IForArbiter(Interface):

    # The Arbiter asks us to not manage a scheduler_id anymore
    # I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.app.schedulers[sched_id]
        except KeyError:
            pass

    # The Arbiter asks me which sched_id I manage. If it is not OK with it,
    # it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.app.schedulers.keys()

    # Called by the arbiter if it thinks we are running but we must not (like
    # if I was a spare that took a conf but the master came back, I must die
    # and wait for a new conf)
    # Us : No please...
    # Arbiter : I don't care, hasta la vista baby!
    # Us : ... <- Nothing! We are dead! You don't follow
    # anything or what?? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "Arbiter wants me to wait for a new conf"
        self.app.schedulers.clear()
        self.app.cur_conf = None

    ### NB: the following methods are only used by the broker:

    def push_broks(self, broks):
        """ Used by the Arbiter to push broks to the broker """
        self.app.add_broks_to_queue(broks.values())
        return True

    # The arbiter asks us for the external commands in our queue
    def get_external_commands(self):
        return self.app.get_external_commands()


# Interface for the Schedulers
# If we are passive, they connect to this and
# send/get actions
class ISchedulers(Interface):

    # A Scheduler sends me actions to do
    def push_actions(self, actions, sched_id):
        print "A scheduler sent me actions", actions
        self.app.add_actions(actions, sched_id)

    # A scheduler asks us for its returns
    def get_returns(self, sched_id):
        print "A scheduler asks me for the returns", sched_id
        ret = self.app.get_return_for_passive(sched_id)
        print "Sending back", len(ret), "returns"
        return ret


# Interface for the Brokers
# They connect here and get all broks (data for brokers)
# The data must be ORDERED! (initial status BEFORE updates...)
class IBroks(Interface):

    # A broker asks us for our broks
    def get_broks(self):
        # print "We are asked for broks"
        res = self.app.get_broks()
        return res
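

# Illustrative note (added, not part of the original code): the three Interface
# classes above are registered on the Pyro daemon in do_post_daemon_init() under
# the names "ForArbiter", "Broks" and "Schedulers". A remote daemon would reach
# them with something roughly like the hypothetical snippet below (the variable
# names are placeholders; pyro.create_uri and Pyro.core.getProxyForURI are the
# calls already used elsewhere in this file):
#
#     uri = pyro.create_uri(satellite_address, satellite_port, "Schedulers", use_ssl)
#     con = Pyro.core.getProxyForURI(uri)
#     con.push_actions(actions, sched_id)   # what a scheduler would call in passive mode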


class BaseSatellite(Daemon):
    def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):

        super(BaseSatellite, self).__init__(name, config_file, is_daemon, do_replace, debug, debug_file)

        # Our schedulers
        self.schedulers = {}

        # Now we create the interfaces
        self.interface = IForArbiter(self)

    # The arbiter can send us a new conf on the pyro_daemon port.
    # We do not want to lose time over it, so it's not a blocking
    # wait, timeout = 0s
    # If it sends us a new conf, we reinit the connections of all schedulers
    def watch_for_new_conf(self, timeout):
        self.handleRequests(timeout)

    def do_stop(self):
        if self.pyro_daemon:
            print "Stopping all network connections"
            self.pyro_daemon.unregister(self.interface)
        super(BaseSatellite, self).do_stop()


# Our main APP class
class Satellite(BaseSatellite):
    def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):

        super(Satellite, self).__init__(name, config_file, is_daemon, do_replace, debug, debug_file)

        # Keep broks so they can be eaten by a broker
        self.broks = {}

        self.workers = {}  # dict of active workers

        self.nb_actions_in_workers = 0

        # Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)

        self.brok_interface = IBroks(self)
        self.scheduler_interface = ISchedulers(self)

        # Just for having these attributes defined here. explicit > implicit ;)
        self.uri2 = None
        self.uri3 = None
        self.s = None
        self.manager = None
        self.returns_queue = None

        self.worker_modules = {}
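        # Added comment (a sketch, inferred from do_post_daemon_init() and
        # setup_new_conf() below): self.worker_modules maps a module type to its
        # input queue, roughly like
        #     {'fork': {'to_q': Queue()}, '<module_type>': {'to_q': Queue()}}
        # where '<module_type>' stands for any worker module the arbiter sends us.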

    def pynag_con_init(self, id):
        """ Initialize or re-initialize the connection with a scheduler """
        sched = self.schedulers[id]
        # If sched is not active, I do not try to init
        # it is just useless
        if not sched['active']:
            return

        logger.log("[%s] Init of connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sch_con = sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        # Set a short timeout (5s) and get the running id
        try:
            pyro.set_timeout(sch_con, 5)
            new_run_id = sch_con.get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError), exp:
            logger.log("[%s] Scheduler %s is not initialized or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        # The scheduler has been restarted: it has a new run_id.
        # So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))

    # Manage an action return from the Workers
    # We just put them into the sched they are for
    # and we clean unused properties like sched_id
    def manage_action_return(self, action):
        # Ok, it's a result. We get it, and fill the verifs of the good sched_id
        sched_id = action.sched_id
        # Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.status = 'waitforhomerun'
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action
        # We update stats
        self.nb_actions_in_workers -= 1

    # Return the checks to the schedulers and clean them
    # REF: doc/shinken-action-queues.png (6)
    def manage_returns(self):
        total_sent = 0
        # For all schedulers, we check for waitforhomerun and we send back the results
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            # If sched is not active, I do not try to return
            if not sched['active']:
                continue
            # Now ret has all the verifs, we can return them
            send_ok = False
            ret = sched['wait_homerun'].values()
            if ret:
                try:
                    con = sched['con']
                    if con is not None:  # None = not initialized
                        send_ok = con.put_results(ret)
                # Not connected or sched is gone
                except (Pyro.errors.ProtocolError, KeyError), exp:
                    print exp
                    self.pynag_con_init(sched_id)
                    return
                except AttributeError, exp:  # the scheduler must not be initialized
                    print exp
                except Exception, exp:
                    print ''.join(Pyro.util.getPyroTraceback(exp))
                    sys.exit(0)

                # We clean ONLY if the send is OK
                if send_ok:
                    sched['wait_homerun'].clear()
                else:
                    self.pynag_con_init(sched_id)
                    logger.log("Send failed!")

    # Get all returning actions for a call from a
    # scheduler
    def get_return_for_passive(self, sched_id):
        # I do not know this scheduler?
        if sched_id not in self.schedulers:
            print "I do not know about the scheduler", sched_id
            return []

        sched = self.schedulers[sched_id]
        print "Preparing to return", sched['wait_homerun'].values()
        ret = copy.copy(sched['wait_homerun'].values())
        sched['wait_homerun'].clear()
        print "Finally returning", ret
        return ret

    # Create and launch a new worker, and put it into self.workers
    # It can be mortal or not
    def create_and_launch_worker(self, module_name='fork', mortal=True):
        q = self.worker_modules[module_name]['to_q']

        # If we are in the fork module, do not specify a target
        target = None
        if module_name == 'fork':
            target = None
        else:
            for module in self.modules_manager.instances:
                if module.properties['type'] == module_name:
                    target = module.work
            if target is None:
                return
        w = Worker(1, q, self.returns_queue, self.processes_by_worker, \
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length, target=target)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new %s Worker : %s" % (self.name, module_name, w.id))
        w.start()

    def do_stop(self):
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
                # queue = w.return_queue
                # self.return_messages.remove(queue)
            except AttributeError:  # the worker already died
                pass
            except AssertionError:  # we are in a worker
                pass
        if self.pyro_daemon:
            logger.log('Stopping all network connections')
            self.pyro_daemon.unregister(self.brok_interface)
            self.pyro_daemon.unregister(self.scheduler_interface)
        super(Satellite, self).do_stop()

    # A simple function to add objects to self,
    # like broks in self.broks, etc
    # TODO : better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            # For a brok, we TAG the brok with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
            return

    # Someone asks us for our broks. We send them, and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res

    # Workers are processes, they can die in numerous ways,
    # like :
    # *99.99% : a bug in the code, sorry :p
    # *0.005% : a mix between a stupid admin (or an admin without coffee)
    #   and a kill command
    # *0.005% : alien attack
    # So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        # active_children does a join() on every one, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            # If a worker goes down and we did not ask it to, it's not
            # good : we may think we still have a worker and it's not True
            # So we del it
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                # AIM ... Press FIRE ... <B>HEAD SHOT!</B>
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)
        # OK, now really del the workers
        for id in w_to_del:
            del self.workers[id]

    # Here we create new workers if the queue load (len of verifs) is too long
    def adjust_worker_number_by_load(self):
        # TODO : get a real value for the load
        wish_worker = 1
        # I want at least min_workers or wish_worker (the biggest), but not more than max_workers
        while len(self.workers) < self.min_workers \
                or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            for mod in self.worker_modules:
                self.create_and_launch_worker(module_name=mod)
        # TODO : if len(workers) > 2*wish, maybe we can kill a worker?

    # Get the Queue() from an action by looking at which module
    # it wants
    def _got_queue_from_action(self, a):
        if hasattr(a, 'module_type'):
            if a.module_type in self.worker_modules:
                if a.module_type != 'fork':
                    print "GOT A SPECIAL QUEUE (%s) for" % a.module_type, a.__dict__,
                return self.worker_modules[a.module_type]['to_q']
            # Nothing found, it's not good at all!
            return None
        # If none, use the standard 'fork' queue
        return self.worker_modules['fork']['to_q']
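
    # Added illustrative comment: routing examples for _got_queue_from_action
    # (the module type names are hypothetical, shown only to clarify the lookup above):
    #   - an action without a module_type, or with module_type 'fork'
    #     -> self.worker_modules['fork']['to_q'], the default fork workers queue
    #   - an action whose module_type matches a loaded worker module (e.g. 'dummy_poller')
    #     -> that module's own 'to_q' queue
    #   - an action with an unknown module_type -> None, and add_actions() below
    #     will simply not enqueue it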

    # Add a list of actions to our queues
    def add_actions(self, lst, sched_id):
        for a in lst:
            a.sched_id = sched_id
            a.status = 'queue'
            msg = Message(id=0, type='Do', data=a)
            q = self._got_queue_from_action(a)
            if q is not None:
                q.put(msg)
            # Update stats
            self.nb_actions_in_workers += 1

    # We get new actions from the schedulers, we create a Message and we
    # put it in the s queue (from master to slave)
    # REF: doc/shinken-action-queues.png (1)
    def get_new_actions(self):
        # Here are the differences between a
        # poller and a reactionner:
        # a poller will only do checks,
        # a reactionner will do actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        # We check for new checks on each scheduler and put the results in new_checks
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            # If sched is not active, I do not try to get anything
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None:  # None = not initialized
                    pyro.set_timeout(con, 120)
                    # OK, go for it :)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, poller_tags=self.poller_tags)
                    print "Asked actions from", sched_id, "got", len(tmp)
                    # We 'tag' them with sched_id and put them into the queue for workers
                    # REF: doc/shinken-action-queues.png (2)
                    self.add_actions(tmp, sched_id)
                    #for a in tmp:
                    #    a.sched_id = sched_id
                    #    a.status = 'queue'
                    #    msg = Message(id=0, type='Do', data=a)
                    #    q = self._got_queue_from_action(a)
                    #    if q != None:
                    #        q.put(msg)
                    #        # Update stats
                    #        self.nb_actions_in_workers += 1
                else:  # no con? make the connection
                    self.pynag_con_init(sched_id)
            # OK, the con is not known, so we create it
            # Or maybe the connection was lost, we recreate it
            except (KeyError, Pyro.errors.ProtocolError), exp:
                print exp
                self.pynag_con_init(sched_id)
            # The scheduler must not be initialized
            # or the scheduler must not have checks
            except (AttributeError, Pyro.errors.NamingError), exp:
                print exp
            # What the F**k? We do not know what happened,
            # so.. bye bye :)
            except Pyro.errors.ConnectionClosedError, exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)

    def do_loop_turn(self):
        # Maybe the arbiter asked us to wait for a new conf
        # If so, we must restart everything...
        if self.cur_conf is None:
            print "Begin wait initial"
            self.wait_for_initial_conf()
            print "End wait initial"
            if not self.new_conf:  # we may have been interrupted or so; then just return from this loop turn
                return
            self.setup_new_conf()

        # Now we check if the arbiter speaks to us on the pyro_daemon.
        # If so, we listen to it
        # When it pushes us a conf, we reinit the connections
        # Sleep while waiting for a new conf :)
        self.watch_for_new_conf(self.timeout)
        if self.new_conf:
            self.setup_new_conf()

        print " ======================== "

        self.timeout = self.polling_interval

        # Check if zombie workers are among us :)
        # If so : KILL THEM ALL!!!
        self.check_and_del_zombie_workers()

        # Print stats for debug
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            for mod in self.worker_modules:
                # Actions being processed in the workers = actions sent to the queue - queue size
                q = self.worker_modules[mod]['to_q']
                print '[%d][%s][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                    (sched_id, sched['name'], mod, len(self.workers), q.qsize(), \
                    self.nb_actions_in_workers - q.qsize(), len(self.returns_queue))

        # Before returning or getting new actions, see how we managed
        # the old ones : are they still in the queue(s)? If so, we
        # must wait more, or at least have more workers
        wait_ratio = self.wait_ratio.get_load()
        total_q = 0
        for mod in self.worker_modules:
            q = self.worker_modules[mod]['to_q']
            total_q += q.qsize()
        if total_q != 0 and wait_ratio < 5*self.polling_interval:
            print "I decide to raise the wait ratio"
            self.wait_ratio.update_load(wait_ratio * 2)
        else:
            # Come back to self.polling_interval on a normal run; if wait_ratio
            # was > 5*self.polling_interval,
            # this makes it come near 5, because if < 5, it goes up :)
            self.wait_ratio.update_load(self.polling_interval)
        wait_ratio = self.wait_ratio.get_load()
        print "Wait ratio:", wait_ratio

        # We can wait more than 1s if needed,
        # no more than 5s, but no less than 1s
        timeout = self.timeout * wait_ratio
        timeout = max(self.polling_interval, timeout)
        self.timeout = min(5*self.polling_interval, timeout)
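
        # Added worked example (a rough sketch, assuming polling_interval = 1s and
        # that Load.update_load() moves the stored load toward the given target):
        #   - while the worker queues stay non-empty, wait_ratio is pushed toward
        #     2, then 4, then 8..., but the two lines above clamp the timeout to
        #     max(1, ...) and min(5, ...), so it settles around 5s
        #   - as soon as the queues are empty again, wait_ratio is pushed back
        #     toward polling_interval and the timeout returns to about 1s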

        # Maybe we do not have enough workers, we check for it
        # and launch new ones if needed
        self.adjust_worker_number_by_load()

        # Manage all messages we've got in the last timeout
        # for queue in self.return_messages:
        while len(self.returns_queue) != 0:
            self.manage_action_return(self.returns_queue.pop())

        # If we are passive, we do not initiate the check getting
        # and return
        print "Am I passive?", self.passive
        if not self.passive:
            print "I try to get new actions!"
            # Now we can get new actions from the schedulers
            self.get_new_actions()

        # We send back all finished checks
        # REF: doc/shinken-action-queues.png (6)
        self.manage_returns()

    def do_post_daemon_init(self):
        """ Do this satellite (poller or reactionner) post "daemonize" init:
        we must register our interfaces for 3 possible callers: arbiter, schedulers or brokers. """
        # And we register them
        self.uri2 = self.pyro_daemon.register(self.interface, "ForArbiter")
        self.uri3 = self.pyro_daemon.register(self.brok_interface, "Broks")
        self.uri4 = self.pyro_daemon.register(self.scheduler_interface, "Schedulers")

        # self.s = Queue() # Global Master -> Slave
        # We can open the Queue for fork AFTER
        self.worker_modules['fork'] = {'to_q': Queue()}
        self.manager = Manager()
        self.returns_queue = self.manager.list()

    def setup_new_conf(self):
        """ Setup the new received conf """
        conf = self.new_conf
        print "[%s] Received a new configuration %s" % (self.name, conf)
        self.new_conf = None
        self.cur_conf = conf
        # Get our name from the globals
        if 'poller_name' in conf['global']:
            name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            name = conf['global']['reactionner_name']
        else:
            name = 'Unnamed satellite'
        self.name = name

        # For schedulers we already know, keep their pending results (wait_homerun)
        for sched_id in conf['schedulers']:
            already_got = False
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                already_got = True
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks', self.use_ssl)
            print "DBG: scheduler URI:", uri

            self.schedulers[sched_id]['uri'] = uri
            if already_got:
                self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            else:
                self.schedulers[sched_id]['wait_homerun'] = {}
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            # And then we connect to it :)
            self.pynag_con_init(sched_id)

        # Now the limits part
        self.max_workers = conf['global']['max_workers']
        self.min_workers = conf['global']['min_workers']
        self.passive = conf['global']['passive']
        print "Is passive?", self.passive
        self.processes_by_worker = conf['global']['processes_by_worker']
        self.polling_interval = conf['global']['polling_interval']
        self.timeout = self.polling_interval
        if 'poller_tags' in conf['global']:
            self.poller_tags = conf['global']['poller_tags']
        else:  # for a reactionner, poller_tags is []
            self.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else:  # for a reactionner, we don't really care about it
            self.max_plugins_output_length = 8192
        print "Max output length", self.max_plugins_output_length
        # Set the timezone given to us by the arbiter
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone
            time.tzset()

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))

        # Now manage the modules
        # TODO: check how to better handle this with modules_manager..
        mods = conf['global']['modules']
        for module in mods:
            # If we already got it, bypass
            if module.module_type not in self.worker_modules:
                print "Add module object", module
                self.modules_manager.modules.append(module)
                logger.log("[%s] Got module : %s " % (self.name, module.module_type))
                self.worker_modules[module.module_type] = {'to_q': Queue()}

    def main(self):

        for line in self.get_header():
            self.log.log(line)

        self.load_config_file()

        self.do_daemon_init_and_start()

        self.do_post_daemon_init()

        # We wait for the initial conf
        self.wait_for_initial_conf()
        if not self.new_conf:  # we must either have a big problem or have been asked to shut down
            return
        self.setup_new_conf()

        # We can load our modules now
        self.modules_manager.set_modules(self.modules_manager.modules)
        self.modules_manager.load_and_init()

        # Allocate mortal workers
        for _ in xrange(1, self.min_workers):
            for mod in self.worker_modules:
                self.create_and_launch_worker(module_name=mod)

        # Now the main loop
        self.do_mainloop()
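

# Illustrative sketch only (added, not part of the original file): a poller or
# reactionner daemon is expected to subclass Satellite (providing the do_checks /
# do_actions class attributes used in get_new_actions) and to be launched roughly
# like this; the class name and config path below are placeholders:
#
#     class Poller(Satellite):
#         do_checks = True     # pollers only run checks
#         do_actions = False
#
#     Poller('poller', '/etc/shinken/pollerd.ini', is_daemon=True,
#            do_replace=False, debug=False, debug_file=None).main()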