Fix : get back LiveStatus as default.
[shinken.git] / shinken / scheduler.py
#!/usr/bin/env python
# Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
import time
import os
import traceback

import shinken.pyro_wrapper as pyro
from shinken.pyro_wrapper import Pyro
from shinken.external_command import ExternalCommand
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler
from shinken.brok import Brok
from shinken.downtime import Downtime
from shinken.contactdowntime import ContactDowntime
from shinken.comment import Comment
from shinken.log import logger
class Scheduler:
    def __init__(self, scheduler_daemon):
        self.sched_daemon = scheduler_daemon
        # When set to False by us, we die and the arbiter launches a new Scheduler
        self.must_run = True

        # Satellites return their results here; instead of waiting for
        # them, we queue them and consume them later
        self.waiting_results = []

        # Every N seconds we call functions like consume, delete zombies,
        # etc. All of these functions live in recurrent_works along with
        # the tick period they run at. Ticks must be integers and > 0.
        # The order is important, so make the key an int.
        # TODO : at load, change value by configuration one (like reaper time, etc)
        self.recurrent_works = {
            0: ('update_downtimes_and_comments', self.update_downtimes_and_comments, 1),
            1: ('schedule', self.schedule, 1),  # just schedule
            2: ('consume_results', self.consume_results, 1),  # incorporate checks and dependencies
            3: ('get_new_actions', self.get_new_actions, 1),  # now get the new actions (checks, notif) raised
            4: ('get_new_broks', self.get_new_broks, 1),  # and broks
            5: ('delete_zombie_checks', self.delete_zombie_checks, 1),
            6: ('delete_zombie_actions', self.delete_zombie_actions, 1),
            # 3 : (self.delete_unwanted_notifications, 1),
            7: ('check_freshness', self.check_freshness, 10),
            8: ('clean_caches', self.clean_caches, 1),
            9: ('update_retention_file', self.update_retention_file, 3600),
            10: ('check_orphaned', self.check_orphaned, 60),
            # For NagVis-like tools : update our status every 10s
            11: ('get_and_register_update_program_status_brok', self.get_and_register_update_program_status_brok, 10),
            # Check for system time change. And AFTER getting new checks,
            # so they are changed too.
            12: ('check_for_system_time_change', self.sched_daemon.check_for_system_time_change, 1),
            # launch all internal checks if needed
            13: ('manage_internal_checks', self.manage_internal_checks, 1),
        }
        # stats part
        self.nb_checks_send = 0
        self.nb_actions_send = 0
        self.nb_broks_send = 0
        self.nb_check_received = 0

        # Log init
        self.log = logger
        self.log.load_obj(self)

        self.instance_id = 0  # Temporary set. Will be erased later

        # Our queues
        self.checks = {}
        self.actions = {}
        self.downtimes = {}
        self.contact_downtimes = {}
        self.comments = {}
        self.broks = {}

        self.has_full_broks = False  # do we have the initial_broks in the broks queue?
    def reset(self):
        self.must_run = True
        del self.waiting_results[:]
        for o in self.checks, self.actions, self.downtimes, self.contact_downtimes, self.comments, self.broks:
            o.clear()
    # Load conf for future use
    def load_conf(self, conf):
        self.program_start = int(time.time())
        self.conf = conf
        self.hostgroups = conf.hostgroups
        self.hostgroups.create_reversed_list()
        self.services = conf.services
        # We need a reversed list for searching in the retention
        # file read
        self.services.create_reversed_list()
        self.services.optimize_service_search(conf.hosts)
        self.hosts = conf.hosts
        # DBG:
        # for h in self.hosts:
        #     print h.get_name(), h.parents
        self.hosts.create_reversed_list()

        self.notificationways = conf.notificationways
        self.contacts = conf.contacts
        self.contacts.create_reversed_list()
        self.contactgroups = conf.contactgroups
        self.contactgroups.create_reversed_list()
        self.servicegroups = conf.servicegroups
        self.servicegroups.create_reversed_list()
        self.timeperiods = conf.timeperiods
        self.timeperiods.create_reversed_list()
        self.commands = conf.commands

        # self.status_file = StatusFile(self)  # External status file
        # From the Arbiter. Used by the Broker to distinguish
        # between schedulers
        self.instance_id = conf.instance_id
        # self for instance_name
        self.instance_name = conf.instance_name

        # Now we can update our 'ticks' for special calls
        # like the retention one, etc
        self.update_recurrent_works_tick('update_retention_file', self.conf.retention_update_interval)
    # Update the 'tick' for a function call in our
    # recurrent work
    def update_recurrent_works_tick(self, f_name, new_tick):
        for i in self.recurrent_works:
            (name, f, old_tick) = self.recurrent_works[i]
            if name == f_name:
                print "Changing the tick for the function", name, new_tick
                self.recurrent_works[i] = (name, f, new_tick)
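
    # For instance (the value here is illustrative), lowering the retention
    # dump from hourly to every 15 minutes would look like:
    #   self.update_recurrent_works_tick('update_retention_file', 900)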
    # Load the pollers from our app master
    def load_satellites(self, pollers, reactionners):
        self.pollers = pollers
        self.reactionners = reactionners

    # Oh... the Arbiter wants us to die, so it can launch a new Scheduler.
    # "But what does he have that I don't?"
    def die(self):
        self.must_run = False

    # Load the external commander
    def load_external_command(self, e):
        self.external_command = e

    # We've got activity in the fifo, so we get and run commands
    def run_external_command(self, command):
        print "scheduler resolves command", command
        ext_cmd = ExternalCommand(command)
        self.external_command.resolve_command(ext_cmd)
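
    # A command here is a raw Nagios-style external command line, e.g.
    # (the timestamp is illustrative):
    #   [1280000000] ENABLE_NOTIFICATIONS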
    def add_Brok(self, brok):
        # We TAG the brok with our instance_id
        brok.data['instance_id'] = self.instance_id
        self.broks[brok.id] = brok

    def add_Notification(self, notif):
        self.actions[notif.id] = notif
        # A notification asks for a brok
        if notif.contact is not None:
            b = notif.get_initial_status_brok()
            self.add(b)

    def add_Check(self, c):
        self.checks[c.id] = c
        # A new check means the host/service changed its next_check,
        # which needs to be refreshed
        b = c.ref.get_next_schedule_brok()
        self.add(b)

    def add_EventHandler(self, action):
        # print "Add an event Handler", elt.id
        self.actions[action.id] = action

    def add_Downtime(self, dt):
        self.downtimes[dt.id] = dt
        if dt.extra_comment:
            self.add_Comment(dt.extra_comment)

    def add_ContactDowntime(self, contact_dt):
        self.contact_downtimes[contact_dt.id] = contact_dt

    def add_Comment(self, comment):
        self.comments[comment.id] = comment
        b = comment.ref.get_update_status_brok()
        self.add(b)
    # Schedulers have some queues. We can simplify calls by adding
    # elements into the proper queue just by looking at their type
    # Brok -> self.broks
    # Check -> self.checks
    # Notification -> self.actions
    # Downtime -> self.downtimes
    # ContactDowntime -> self.contact_downtimes
    def add(self, elt):
        f = self.__add_actions.get(elt.__class__, None)
        if f:
            # print("found action for %s : %s" % (elt.__class__.__name__, f.__name__))
            f(self, elt)

    __add_actions = {
        Check: add_Check,
        Brok: add_Brok,
        Notification: add_Notification,
        EventHandler: add_EventHandler,
        Downtime: add_Downtime,
        ContactDowntime: add_ContactDowntime,
        Comment: add_Comment,
    }
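
    # Thanks to this dispatch table, callers never need to know the queue
    # layout; for example self.add(some_brok) lands in self.broks while
    # self.add(some_check) lands in self.checks.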
    # Our queues may explode if no one asks us for elements.
    # It's very dangerous : you can crash your server... and it's a bad thing :)
    # So we 'just' keep the most recent elements : twice the max is a good overhead
    def clean_queues(self):
        max_checks = 2 * (len(self.hosts) + len(self.services))
        max_broks = 2 * (len(self.hosts) + len(self.services))
        max_actions = 2 * len(self.contacts) * (len(self.hosts) + len(self.services))

        # For checks, it's not very simple:
        # checks may be referenced by their host/service,
        # so we do not just del them from checks, but also from their service/host.
        # We want ids of less than max_id - 2*max_checks
        if len(self.checks) > max_checks:
            id_max = self.checks.keys()[-1]  # The max id is the last id : max() is SO slow!
            to_del_checks = [c for c in self.checks.values() if c.id < id_max - max_checks]
            nb_checks_drops = len(to_del_checks)
            if nb_checks_drops > 0:
                print "I have to del some checks..., sorry", to_del_checks
            for c in to_del_checks:
                i = c.id
                elt = c.ref
                # First remove the link in the host/service
                elt.remove_in_progress_check(c)
                # Then in dependent checks (checks that depend on me,
                # or checks I depend on)
                for dependant_checks in c.depend_on_me:
                    dependant_checks.depend_on.remove(c.id)
                for c_temp in c.depend_on:
                    c_temp.depend_on_me.remove(c)
                del self.checks[i]  # Final bye bye ...
        else:
            nb_checks_drops = 0

        # For broks and actions, it's simpler
        if len(self.broks) > max_broks:
            id_max = self.broks.keys()[-1]
            id_to_del_broks = [i for i in self.broks if i < id_max - max_broks]
            nb_broks_drops = len(id_to_del_broks)
            for i in id_to_del_broks:
                del self.broks[i]
        else:
            nb_broks_drops = 0

        if len(self.actions) > max_actions:
            id_max = self.actions.keys()[-1]
            id_to_del_actions = [i for i in self.actions if i < id_max - max_actions]
            nb_actions_drops = len(id_to_del_actions)
            for i in id_to_del_actions:
                # Remember to delete the reference to the notification in the service/host
                if self.actions[i].is_a == 'notification':
                    item = self.actions[i].ref
                    item.remove_in_progress_notification(self.actions[i])
                del self.actions[i]
        else:
            nb_actions_drops = 0

        if nb_checks_drops != 0 or nb_broks_drops != 0 or nb_actions_drops != 0:
            print "WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops)
            logger.log("WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops))
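
    # Rough sizing example (illustrative numbers): with 100 hosts, 900
    # services and 10 contacts, max_checks = max_broks = 2 * (100 + 900)
    # = 2000 and max_actions = 2 * 10 * (100 + 900) = 20000.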
    # For tuning purposes we use caches, but we do not want them to explode,
    # so we clean them
    def clean_caches(self):
        # print "********** Clean caches *********"
        for tp in self.timeperiods:
            tp.clean_cache()
    # Ask an item (host or service) for an update_status brok
    # and add it to our broks queue
    def get_and_register_status_brok(self, item):
        b = item.get_update_status_brok()
        self.add(b)

    # Ask an item (host or service) for a check_result brok
    # and add it to our broks queue
    def get_and_register_check_result_brok(self, item):
        b = item.get_check_result_brok()
        self.add(b)

    # We do not want this downtime id anymore
    def del_downtime(self, dt_id):
        if dt_id in self.downtimes:
            self.downtimes[dt_id].ref.del_downtime(dt_id)
            del self.downtimes[dt_id]

    # We do not want this contact downtime id anymore
    def del_contact_downtime(self, dt_id):
        if dt_id in self.contact_downtimes:
            self.contact_downtimes[dt_id].ref.del_downtime(dt_id)
            del self.contact_downtimes[dt_id]

    # We do not want this comment id anymore
    def del_comment(self, c_id):
        if c_id in self.comments:
            self.comments[c_id].ref.del_comment(c_id)
            del self.comments[c_id]
    # Called by pollers to get checks.
    # Can return checks and actions (notifications and co)
    def get_to_run_checks(self, do_checks=False, do_actions=False,
                          poller_tags=[]):
        res = []
        now = time.time()

        # If the poller wants to do checks
        if do_checks:
            for c in self.checks.values():
                # If the command is untagged and the poller too, or if both are tagged
                # with the same name, go for it
                # (if do_checks, the call comes from a poller, so poller_tags defaults to [])
                if (c.poller_tag is None and poller_tags == []) or c.poller_tag in poller_tags:
                    # must be ok to launch, and not an internal one (business rules based)
                    if c.status == 'scheduled' and c.is_launchable(now) and not c.internal:
                        c.status = 'inpoller'
                        # We do not send c itself, because it links (c.ref) to
                        # its host/service and the poller does not need that.
                        # It just needs a shell with the id, command and default
                        # parameters. That is the goal of copy_shell
                        res.append(c.copy_shell())

        # If a reactionner wants to notify too
        if do_actions:
            for a in self.actions.values():
                if a.status == 'scheduled' and a.is_launchable(now):
                    a.status = 'inpoller'
                    if a.is_a == 'notification' and not a.contact:
                        # This is a "master" notification created by create_notifications.
                        # It will not be sent itself because it has no contact.
                        # We use it to create "child" notifications (for the contacts and
                        # notification_commands) which are executed in the reactionner.
                        item = a.ref
                        childnotifications = []
                        if not item.notification_is_blocked_by_item(a.type, now):
                            # If it is possible to send notifications of this type at the current time, then create
                            # a single notification for each contact of this item.
                            childnotifications = item.scatter_notification(a)
                            for c in childnotifications:
                                c.status = 'inpoller'
                                self.add(c)  # this will send a brok
                                new_c = c.copy_shell()
                                res.append(new_c)

                        # If we have a notification_interval then schedule the next notification (problems only)
                        if a.type == 'PROBLEM':
                            # Update the ref's notif number after raising the notification's one
                            if len(childnotifications) != 0:
                                # notif_nb of the master notification was already current_notification_number+1.
                                # If notifications were sent, the host/service counter will also be incremented
                                item.current_notification_number = a.notif_nb

                            if item.notification_interval != 0 and a.t_to_go is not None:
                                # We must continue to send notifications.
                                # Just leave it in the actions list, set it to "scheduled" and it will be found again later.
                                # a.t_to_go = a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # Ask the service/host to compute the next notif time. It can be just
                                # a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # or maybe earlier, because an escalation may need to fire sooner
                                a.t_to_go = item.get_next_notification_time(a)

                                a.notif_nb = item.current_notification_number + 1
                                a.status = 'scheduled'
                            else:
                                # Wipe out this master notification. One problem notification is enough.
                                item.remove_in_progress_notification(a)
                                self.actions[a.id].status = 'zombie'

                        else:
                            # Wipe out this master notification. We don't repeat recover/downtime/flap/etc...
                            item.remove_in_progress_notification(a)
                            self.actions[a.id].status = 'zombie'
                    else:
                        # This is for child notifications and eventhandlers
                        new_a = a.copy_shell()
                        res.append(new_a)
        return res
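
    # Sketch of how this is consumed (names and tags are illustrative): a
    # poller would ask over Pyro for checks only, e.g.
    #   checks = sched.get_to_run_checks(do_checks=True, poller_tags=['dmz'])
    # while a reactionner asks for actions only:
    #   actions = sched.get_to_run_checks(do_actions=True)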
    # Called by pollers and reactionners to send back results
    def put_results(self, c):
        if c.is_a == 'notification':
            # We will only see child notifications here
            try:
                self.actions[c.id].get_return_from(c)
                item = self.actions[c.id].ref
                item.remove_in_progress_notification(c)
                self.actions[c.id].status = 'zombie'
                item.last_notification = c.check_time
                # If we've got a problem with the notification, raise a Warning log
                if c.exit_status != 0:
                    logger.log("Warning : the notification command '%s' raised an error (exit code=%d) : '%s'" % (c.command, c.exit_status, c.output))
            except KeyError, exp:
                logger.log("Warning : received a notification with an unknown id! %s" % str(exp))

        elif c.is_a == 'check':
            try:
                self.checks[c.id].get_return_from(c)
                self.checks[c.id].status = 'waitconsume'
            except KeyError, exp:
                logger.log("Warning : received a check with an unknown id! %s" % str(exp))
        elif c.is_a == 'eventhandler':
            # It just dies
            try:
                self.actions[c.id].status = 'zombie'
            # Maybe we received the return of an old event handler, so we can forget it
            except KeyError:
                pass
        else:
            logger.log("Error : the received result type is unknown! %s" % str(c.is_a))
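
    # Check life cycle as seen in this file: 'scheduled' -> 'inpoller'
    # (get_to_run_checks) -> 'waitconsume' (put_results) -> 'zombie'
    # (after consume_results), with 'waitdep' and 'havetoresolvedep'
    # used along the way for dependency resolution.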
    # Get the good links table by kind. If unknown, return None
    def get_links_from_type(self, type):
        t = {'poller': self.pollers, 'reactionner': self.reactionners}
        if type in t:
            return t[type]
        return None

    # Check that we do not try to connect too often to this satellite
    def is_connexion_try_too_close(self, elt):
        now = time.time()
        last_connexion = elt['last_connexion']
        if now - last_connexion < 5:
            return True
        return False
    # Initialise or re-initialise the connection with a poller
    # or a reactionner
    def pynag_con_init(self, id, type='poller'):
        # Get the good links table for looping..
        links = self.get_links_from_type(type)
        if links is None:
            logger.log('DBG: Type unknown for connection! %s' % type)
            return

        # We only want to initiate connections to the passive
        # pollers and reactionners
        passive = links[id]['passive']
        if not passive:
            return

        # If we try to connect too often, we slow down our tests
        if self.is_connexion_try_too_close(links[id]):
            return

        # Ok, we can now update it
        links[id]['last_connexion'] = time.time()

        print "Init connection with", links[id]['uri']

        uri = links[id]['uri']
        links[id]['con'] = Pyro.core.getProxyForURI(uri)
        con = links[id]['con']

        try:
            # initial ping must be quick
            pyro.set_timeout(con, 5)
            con.ping()
        except Pyro.errors.ProtocolError, exp:
            logger.log("[] Connection problem to the %s %s : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except Pyro.errors.NamingError, exp:
            logger.log("[] the %s '%s' is not initialised : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except KeyError, exp:
            logger.log("[] the %s '%s' is not initialised : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            traceback.print_stack()
            return
        except Pyro.errors.CommunicationError, exp:
            logger.log("[] the %s '%s' got a CommunicationError : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return

        logger.log("[] Connection OK to the %s %s" % (type, links[id]['name']))
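
    # On any failure above, links[id]['con'] is reset to None; the push/get
    # helpers below see that and call pynag_con_init() again, so a dead
    # passive satellite is retried (at most every 5s, see
    # is_connexion_try_too_close) without blocking the main loop.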
    # We should push actions to our passive satellites
    def push_actions_to_passives_satellites(self):
        # We loop over our passive pollers
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            print "I will send actions to the poller", p
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                # get actions
                lst = self.get_to_run_checks(True, False, poller_tags)
                try:
                    # use a long timeout for the data exchange
                    pyro.set_timeout(con, 120)
                    print "Sending", len(lst), "actions"
                    con.push_actions(lst, self.instance_id)
                    self.nb_checks_send += len(lst)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the poller %s : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the poller '%s' is not initialised : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.log("[] the poller '%s' is not initialised : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    traceback.print_stack()
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the poller '%s' got a CommunicationError : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection? try to reconnect
                self.pynag_con_init(p['instance_id'], type='poller')
    # We should get returns from our passive satellites
    def get_actions_from_passives_satellites(self):
        # We loop over our passive pollers
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            print "I will get actions from the poller", p
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                try:
                    # use a long timeout for the data exchange
                    pyro.set_timeout(con, 120)
                    results = con.get_returns(self.instance_id)
                    nb_received = len(results)
                    self.nb_check_received += nb_received
                    print "Received %d passive results" % nb_received
                    self.waiting_results.extend(results)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the poller %s : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the poller '%s' is not initialised : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.log("[] the poller '%s' is not initialised : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    traceback.print_stack()
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the poller '%s' got a CommunicationError : %s" % (p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection, try to reinit
                self.pynag_con_init(p['instance_id'], type='poller')
    # Some checks are purely internal, like business-rule-based ones;
    # we simply ask their ref to manage them when they are ok to run
    def manage_internal_checks(self):
        now = time.time()
        for c in self.checks.values():
            # must be ok to launch, and an internal one (business rules based)
            if c.status == 'scheduled' and c.is_launchable(now) and c.internal:
                c.ref.manage_internal_check(c)
                # the ref managed it; now just ask to consume it
                # like any other check
                c.status = 'waitconsume'
    # Called by brokers to get broks.
    # We give them away and clean our queue!
    def get_broks(self):
        res = self.broks
        # They are gone, we keep none!
        self.broks = {}
        # print "returning broks"
        # for b in res:
        #     print b, res[b]
        return res
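
    # The wholesale hand-off is deliberate: the broker is what drains
    # self.broks, so swapping in a fresh dict keeps our memory bounded
    # between broker polls instead of copying broks one by one.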
    # Update the retention data by giving all of our data in
    # a dict to the modules, so a later read can pick up what it wants.
    # For now compression is not used, but it can be added easily :
    # just uncomment :)
    def update_retention_file(self, forced=False):
        # If the update interval is set to 0, we do not want this,
        # unless we are forced (like at stopping time)
        if self.conf.retention_update_interval == 0 and not forced:
            return

        to_del = []

        # Do the job for all modules that do retention
        for inst in self.sched_daemon.modules_manager.instances:
            if 'retention' in inst.phases:
                try:
                    # Pass self so they have full access, and a log object
                    # so they can easily raise logs
                    inst.update_retention_objects(self, logger)
                except Exception, exp:
                    print exp.__dict__
                    logger.log("[%s] Warning : The mod %s raised an exception: %s, I'm killing it" % (self.instance_name, inst.get_name(), str(exp)))
                    logger.log("[%s] Exception type : %s" % (self.instance_name, type(exp)))
                    logger.log("[%s] Traceback: %s" % (self.instance_name, traceback.format_exc()))
                    to_del.append(inst)

        # Now remove the modules that raised an exception
        self.sched_daemon.modules_manager.clear_instances(to_del)
    # Load the retention data and get status from it. It does not restore
    # checks in progress for the moment, just the status and the notifications.
    def retention_load(self):
        to_del = []
        # Do this job with modules too
        for inst in self.sched_daemon.modules_manager.instances:
            if 'retention' in inst.phases:
                try:
                    # give the module ourself (full control!) and a log manager object
                    b = inst.load_retention_objects(self, logger)
                    # Stop at the first module that succeeds in loading the retention
                    if b:
                        return
                except Exception, exp:
                    print exp.__dict__
                    logger.log("[%s] Warning : The mod %s raised an exception: %s, I'm killing it" % (self.instance_name, inst.get_name(), str(exp)))
                    logger.log("[%s] Exception type : %s" % (self.instance_name, type(exp)))
                    logger.log("[%s] Traceback: %s" % (self.instance_name, traceback.format_exc()))
                    to_del.append(inst)

        # Now remove the modules that raised an exception
        self.sched_daemon.modules_manager.clear_instances(to_del)
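
    # A retention module here is any module instance whose phases include
    # 'retention' and which implements update_retention_objects() /
    # load_retention_objects(); load_retention_objects() returning True
    # means "I restored the state", so later modules are skipped.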
    # Fill self.broks with broks about self (process id, and co) and
    # about services and hosts (initial status)
    def fill_initial_broks(self):
        # First a Brok to delete everything from my instance_id
        b = Brok('clean_all_my_instance_id', {'instance_id': self.instance_id})
        self.add(b)

        # first the program status
        b = self.get_program_status_brok()
        self.add(b)

        # We ask an initial_status brok from all these types.
        # The order is important : services need hosts...
        initial_status_types = (self.timeperiods, self.commands,
                                self.contacts, self.contactgroups,
                                self.hosts, self.hostgroups,
                                self.services, self.servicegroups)

        for tab in initial_status_types:
            for i in tab:
                b = i.get_initial_status_brok()
                self.add(b)

        # We now have all full broks
        self.has_full_broks = True

        logger.log("[%s] Created initial Broks: %d" % (self.instance_name, len(self.broks)))
    # Create a brok with program status info
    def get_and_register_program_status_brok(self):
        b = self.get_program_status_brok()
        self.add(b)

    # Create a brok with an update of the program status info
    def get_and_register_update_program_status_brok(self):
        b = self.get_program_status_brok()
        b.type = 'update_program_status'
        self.add(b)
    # Get a brok with the program status
    # TODO : GET REAL VALUES
    def get_program_status_brok(self):
        now = int(time.time())
        data = {"is_running": 1,
                "instance_id": self.instance_id,
                "instance_name": self.instance_name,
                "last_alive": now,
                "program_start": self.program_start,
                "pid": os.getpid(),
                "daemon_mode": 1,
                "last_command_check": now,
                "last_log_rotation": now,
                "notifications_enabled": self.conf.enable_notifications,
                "active_service_checks_enabled": self.conf.execute_service_checks,
                "passive_service_checks_enabled": self.conf.accept_passive_service_checks,
                "active_host_checks_enabled": self.conf.execute_host_checks,
                "passive_host_checks_enabled": self.conf.accept_passive_host_checks,
                "event_handlers_enabled": self.conf.enable_event_handlers,
                "flap_detection_enabled": self.conf.enable_flap_detection,
                "failure_prediction_enabled": 0,
                "process_performance_data": self.conf.process_performance_data,
                "obsess_over_hosts": self.conf.obsess_over_hosts,
                "obsess_over_services": self.conf.obsess_over_services,
                "modified_host_attributes": 0,
                "modified_service_attributes": 0,
                "global_host_event_handler": self.conf.global_host_event_handler,
                'global_service_event_handler': self.conf.global_service_event_handler,
                'command_file': self.conf.command_file
                }
        b = Brok('program_status', data)
        return b
    # Called every 1 sec to consume every result in services or hosts;
    # with these results they become OK, CRITICAL, UP/DOWN, etc...
    def consume_results(self):
        # All results are in self.waiting_results
        # We need to get them first
        for c in self.waiting_results:
            self.put_results(c)
        self.waiting_results = []

        # Then we consume them
        # print "**********Consume*********"
        for c in self.checks.values():
            if c.status == 'waitconsume':
                item = c.ref
                item.consume_result(c)

        # All 'finished' checks (no more deps) release the checks that depend on them
        for c in self.checks.values():
            if c.status == 'havetoresolvedep':
                for dependant_checks in c.depend_on_me:
                    # Ok, now the dependant will no longer wait for c
                    dependant_checks.depend_on.remove(c.id)
                # REMOVE OLD DEP CHECK -> zombie
                c.status = 'zombie'

        # Now, reintegrate dep checks
        for c in self.checks.values():
            if c.status == 'waitdep' and len(c.depend_on) == 0:
                item = c.ref
                item.consume_result(c)
    # Called every 1 sec to delete all checks in a zombie state
    # zombie = not useful anymore
    def delete_zombie_checks(self):
        # print "**********Delete zombies checks****"
        id_to_del = []
        for c in self.checks.values():
            if c.status == 'zombie':
                id_to_del.append(c.id)
        # a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.checks[id]  # ZANKUSEN!

    # Called every 1 sec to delete all actions in a zombie state
    # zombie = not useful anymore
    def delete_zombie_actions(self):
        # print "**********Delete zombies actions****"
        id_to_del = []
        for a in self.actions.values():
            if a.status == 'zombie':
                id_to_del.append(a.id)
        # a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.actions[id]  # ZANKUSEN!
    # Check for downtime starts and stops, and register
    # them if needed
    def update_downtimes_and_comments(self):
        broks = []
        now = time.time()

        # Check maintenance periods
        for elt in [y for y in [x for x in self.hosts] + [x for x in self.services] if y.maintenance_period is not None]:
            if not hasattr(elt, 'in_maintenance'):
                setattr(elt, 'in_maintenance', False)
            if not elt.in_maintenance:
                if elt.maintenance_period.is_time_valid(now):
                    start_dt = elt.maintenance_period.get_next_valid_time_from_t(now)
                    end_dt = elt.maintenance_period.get_next_invalid_time_from_t(start_dt + 1) - 1
                    dt = Downtime(elt, start_dt, end_dt, 1, 0, 0, "system", "this downtime was automatically scheduled through a maintenance_period")
                    elt.add_downtime(dt)
                    self.add(dt)
                    self.get_and_register_status_brok(elt)
                    elt.in_maintenance = dt.id
            else:
                if elt.in_maintenance not in self.downtimes:
                    # the maintenance downtime has expired or was manually deleted
                    elt.in_maintenance = False

        # Check the validity of contact downtimes
        for elt in self.contacts:
            for dt in elt.downtimes:
                dt.check_activation()

        # A loop where those downtimes are removed
        # which were marked for deletion (mostly by dt.exit())
        for dt in self.downtimes.values():
            if dt.can_be_deleted:
                ref = dt.ref
                self.del_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        # Same for contact downtimes:
        for dt in self.contact_downtimes.values():
            if dt.can_be_deleted:
                ref = dt.ref
                self.del_contact_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        # Downtimes are usually accompanied by a comment.
        # An exiting downtime also invalidates its comment.
        for c in self.comments.values():
            if c.can_be_deleted:
                ref = c.ref
                self.del_comment(c.id)
                broks.append(ref.get_update_status_brok())

        # Check start and stop times
        for dt in self.downtimes.values():
            if dt.real_end_time < now:
                # this one has expired
                broks.extend(dt.exit())  # returns downtimestop notifications
            elif now >= dt.start_time and dt.fixed and not dt.is_in_effect:
                # this one has to start now
                broks.extend(dt.enter())  # returns downtimestart notifications
                broks.append(dt.ref.get_update_status_brok())

        for b in broks:
            self.add(b)
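
    # Timing summary: a fixed downtime enters (dt.enter()) once now passes
    # start_time and exits (dt.exit()) once real_end_time is behind us;
    # both transitions hand back notifications that get brokered here.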
    # Main schedule function to run the regular scheduling
    def schedule(self):
        # ask services and hosts for their next check
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                i.schedule()
    # Main actions reaper function : it gets all new checks,
    # notifications and event handlers from hosts and services
    def get_new_actions(self):
        # ask services and hosts for their new actions
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for a in i.actions:
                    self.add(a)
                # We took them all, so we can clear the list
                i.actions = []

    # Same as just above, but for broks
    def get_new_broks(self):
        # ask services and hosts for their broks waiting
        # to be eaten
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for b in i.broks:
                    self.add(b)
                # We took them all, so we can clear the list
                i.broks = []
    # Raise checks for non-fresh states on services and hosts
    def check_freshness(self):
        # print "********** Check freshness ******"
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                c = i.do_check_freshness()
                if c is not None:
                    self.add(c)

    # Check for orphaned checks : checks that never came back.
    # So if inpoller and t_to_go < now - 300s : problem!
    def check_orphaned(self):
        now = int(time.time())
        for c in self.checks.values():
            if c.status == 'inpoller' and c.t_to_go < now - 300:
                logger.log("Warning : the results of check %d never came back. I'm re-enabling it for polling" % c.id)
                c.status = 'scheduled'
        for a in self.actions.values():
            if a.status == 'inpoller' and a.t_to_go < now - 300:
                logger.log("Warning : the results of action %d never came back. I'm re-enabling it for polling" % a.id)
                a.status = 'scheduled'
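
    # This is the safety net for work lost to a dead poller/reactionner :
    # anything stuck 'inpoller' for more than 300s past its planned time
    # simply goes back to 'scheduled' and will be handed out again.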
    # Main function
    def run(self):
        # First we see if we've got info in the retention file
        self.retention_load()

        # Ok, now all is initialised, we can make the initial broks
        self.fill_initial_broks()

        logger.log("[%s] First scheduling launched" % self.instance_name)
        self.schedule()
        logger.log("[%s] First scheduling done" % self.instance_name)

        # Now connect to the passive satellites if needed
        for p_id in self.pollers:
            self.pynag_con_init(p_id, type='poller')

        # Ticks are for recurrent function calls like consume,
        # delete zombies, etc
        ticks = 0
        timeout = 1.0  # For the select

        gogogo = time.time()

        while self.must_run:

            elapsed, _, _ = self.sched_daemon.handleRequests(timeout)
            if elapsed:
                timeout -= elapsed
                if timeout > 0:
                    continue

            # Timeout or time over
            timeout = 1.0
            ticks += 1

            # Do recurrent works like schedule, consume,
            # delete_zombie_checks
            for i in self.recurrent_works:
                (name, f, nb_ticks) = self.recurrent_works[i]
                # A 0 in the tick will just disable it
                if nb_ticks != 0:
                    if ticks % nb_ticks == 0:
                        # print "I run function :", name
                        f()

            # DBG : push actions to passives?
            self.push_actions_to_passives_satellites()
            self.get_actions_from_passives_satellites()

            # if ticks % 10 == 0:
            #     self.conf.quick_debug()

            # stats
            nb_scheduled = len([c for c in self.checks.values() if c.status == 'scheduled'])
            nb_inpoller = len([c for c in self.checks.values() if c.status == 'inpoller'])
            nb_zombies = len([c for c in self.checks.values() if c.status == 'zombie'])
            nb_notifications = len(self.actions)

            print "Checks:", "total", len(self.checks), "scheduled", nb_scheduled, "inpoller", nb_inpoller, "zombies", nb_zombies, "notifications", nb_notifications

            m = 0.0
            m_nb = 0
            for s in self.services:
                m += s.latency
                m_nb += 1
            if m_nb != 0:
                print "Average latency:", m, m_nb, m / m_nb

            # print "Notifications:", nb_notifications
            now = time.time()
            # for a in self.actions.values():
            #     if a.is_a == 'notification':
            #         print "Notif:", a.id, a.type, a.status, a.ref.get_name(), a.ref.state, a.contact.get_name(), 'level:%d' % a.notif_nb, 'launch in', int(a.t_to_go - now)
            #     else:
            #         print "Event:", a.id, a.status
            print "Nb checks send:", self.nb_checks_send
            self.nb_checks_send = 0
            print "Nb Broks send:", self.nb_broks_send
            self.nb_broks_send = 0

            time_elapsed = now - gogogo
            print "Check average =", int(self.nb_check_received / time_elapsed), "checks/s"

            # for n in self.actions.values():
            #     if n.ref_type == 'service':
            #         print 'Service notification', n
            #     if n.ref_type == 'host':
            #         print 'Host notification', n
            # print "."
            # print "Service still in checking?"
            # for s in self.services:
            #     print s.get_name() + ':' + str(s.in_checking)
            #     for i in s.checks_in_progress:
            #         print i, i.t_to_go
            # for s in self.hosts:
            #     print s.get_name() + ':' + str(s.in_checking) + str(s.checks_in_progress)
            #     for i in s.checks_in_progress:
            #         print i  # self.checks[i]

            # for c in self.checks.values():
            #     if c.ref_type == 'host':
            #         print c.id, ":", c.status, 'Depend_on_me:', len(c.depend_on_me), 'depend_on', c.depend_on
            # hp = hpy()
            # print hp.heap()
            # print hp.heapu()

        self.update_retention_file(True)