# shinken/scheduler.py
#!/usr/bin/env python
#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.
import select, time, os

import shinken.pyro_wrapper as pyro

from shinken.external_command import ExternalCommand
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler
from shinken.brok import Brok
from shinken.downtime import Downtime
from shinken.contactdowntime import ContactDowntime
from shinken.comment import Comment
from shinken.log import logger

#from guppy import hpy
class Scheduler:
    def __init__(self, daemon):
        self.daemon = daemon #Pyro daemon for incoming orders/requests
        self.must_run = True #When set to False by us, we die and
                             #the arbiter launches a new Scheduler

        self.waiting_results = [] #Satellites return us results,
        #and so we do not have to wait for them, we put them here and
        #consume them later

        #Every N seconds we call functions like consume, del zombies
        #etc. All of these functions are in recurrent_works with the
        #tick to run them at. So it must be an integer > 0
        #The order is important, so make the key an int.
        #TODO : at load, replace the value with the configured one (like reaper time, etc)
        self.recurrent_works = {
            0 : ('update_downtimes_and_comments', self.update_downtimes_and_comments, 1),
            1 : ('schedule', self.schedule, 1), #just schedule
            2 : ('consume_results', self.consume_results, 1), #incorporate checks and dependencies
            3 : ('get_new_actions', self.get_new_actions, 1), #now get the new actions (checks, notifs) raised
            4 : ('get_new_broks', self.get_new_broks, 1), #and broks
            5 : ('delete_zombie_checks', self.delete_zombie_checks, 1),
            6 : ('delete_zombie_actions', self.delete_zombie_actions, 1),
            #3 : (self.delete_unwanted_notifications, 1),
            7 : ('check_freshness', self.check_freshness, 10),
            8 : ('clean_caches', self.clean_caches, 1),
            9 : ('update_retention_file', self.update_retention_file, 3600),
            10 : ('check_orphaned', self.check_orphaned, 60),
            #For NagVis-like tools : update our status every 10s
            11 : ('get_and_register_update_program_status_brok', self.get_and_register_update_program_status_brok, 10),
            #Check for a system time change. And AFTER getting new checks,
            #so they are changed too.
            12 : ('check_for_system_time_change', self.check_for_system_time_change, 1),
            #launch all internal checks if needed
            13 : ('manage_internal_checks', self.manage_internal_checks, 1),
        }
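
        #A quick worked example of the ticks (illustrative only): the main
        #loop in run() increments 'ticks' about once per second, so an entry
        #with tick 10 like check_freshness runs when ticks % 10 == 0, i.e.
        #roughly every 10s, and update_retention_file (tick 3600) runs hourly.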
        #stats part
        self.nb_checks_send = 0
        self.nb_actions_send = 0
        self.nb_broks_send = 0
        self.nb_check_received = 0

        #Log init
        self.log = logger
        self.log.load_obj(self)

        self.instance_id = 0 # Temporarily set. Will be erased later

        #Our queues
        self.checks = {}
        self.actions = {}
        self.downtimes = {}
        self.contact_downtimes = {}
        self.comments = {}
        self.broks = {}
        self.has_full_broks = False #do we have an initial_broks in the broks queue?
    #Load conf for future use
    def load_conf(self, conf):
        self.program_start = int(time.time())
        self.conf = conf
        self.hostgroups = conf.hostgroups
        self.hostgroups.create_reversed_list()
        self.services = conf.services
        #We need a reversed list for searching in the retention
        #file read
        self.services.create_reversed_list()
        self.services.optimize_service_search(conf.hosts)
        self.hosts = conf.hosts
        #DBG:
        # for h in self.hosts:
        #     print h.get_name(), h.parents
        self.hosts.create_reversed_list()

        self.notificationways = conf.notificationways
        self.contacts = conf.contacts
        self.contacts.create_reversed_list()
        self.contactgroups = conf.contactgroups
        self.contactgroups.create_reversed_list()
        self.servicegroups = conf.servicegroups
        self.servicegroups.create_reversed_list()
        self.timeperiods = conf.timeperiods
        self.timeperiods.create_reversed_list()
        self.commands = conf.commands

        #self.status_file = StatusFile(self) #External status file
        self.instance_id = conf.instance_id #From the Arbiter. Used by the
                                            #Broker to distinguish between
                                            #schedulers
        #self for instance_name
        self.instance_name = conf.instance_name

        #Now we can update our 'ticks' for special calls
        #like the retention one, etc
        self.update_recurrent_works_tick('update_retention_file', self.conf.retention_update_interval)
    #Update the 'tick' for a function call in our
    #recurrent works
    def update_recurrent_works_tick(self, f_name, new_tick):
        for i in self.recurrent_works:
            (name, f, old_tick) = self.recurrent_works[i]
            if name == f_name:
                print "Changing the tick for the function", name, new_tick
                self.recurrent_works[i] = (name, f, new_tick)
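
    #Usage sketch for update_recurrent_works_tick above (illustrative,
    #mirroring the call at the end of load_conf): make the freshness check
    #run every 30 ticks instead of 10 :
    #   self.update_recurrent_works_tick('check_freshness', 30)
    #A new_tick of 0 disables the entry, since run() skips 0-tick works.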
    #Load the modules from our app master
    def load_modules(self, modules_manager, mod_instances):
        self.modules_manager = modules_manager
        self.mod_instances = mod_instances

    #Oh... the Arbiter wants us to die... so it can launch a new Scheduler
    #"What does it have that I don't?"
    def die(self):
        #first update our retention data
        self.update_retention_file(forced=True)
        self.must_run = False

    #Load the external commander
    def load_external_command(self, e):
        self.external_command = e

    #We've got activity in the fifo, we get and run commands
    def run_external_command(self, command):
        print "scheduler resolves command", command
        ext_cmd = ExternalCommand(command)
        self.external_command.resolve_command(ext_cmd)
    #Schedulers have some queues. We can simplify calls by adding
    #elements into the proper queue just by looking at their type
    #Brok -> self.broks
    #Check -> self.checks
    #Notification -> self.actions
    #Downtime -> self.downtimes
    #ContactDowntime -> self.contact_downtimes
    def add(self, elt):
        #For checks and notifs, add is also an update function
        if isinstance(elt, Check):
            self.checks[elt.id] = elt
            #A new check means the host/service changed its next_check,
            #which needs to be refreshed
            b = elt.ref.get_next_schedule_brok()
            self.add(b)
            return
        if isinstance(elt, Brok):
            #For broks, we TAG the brok with our instance_id
            elt.data['instance_id'] = self.instance_id
            self.broks[elt.id] = elt
            return
        if isinstance(elt, Notification):
            self.actions[elt.id] = elt
            #A notification asks for a brok
            if elt.contact != None:
                b = elt.get_initial_status_brok()
                self.add(b)
            return
        if isinstance(elt, EventHandler):
            #print "Add an event Handler", elt.id
            self.actions[elt.id] = elt
            return
        if isinstance(elt, Downtime):
            self.downtimes[elt.id] = elt
            self.add(elt.extra_comment)
            return
        if isinstance(elt, ContactDowntime):
            self.contact_downtimes[elt.id] = elt
            return
        if isinstance(elt, Comment):
            self.comments[elt.id] = elt
            b = elt.ref.get_update_status_brok()
            self.add(b)
            return
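
    #Illustrative add() usage (not executed here): producers just call
    #   self.add(some_check)   #-> self.checks, plus a next_schedule brok
    #   self.add(some_brok)    #-> tagged with our instance_id, queued in self.broks
    #and the isinstance() dispatch above does the routing.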
    #Our queues may explode if no one asks us for elements
    #It's very dangerous : you can crash your server... and that's a bad thing :)
    #So we 'just' keep the last elements : 2 times the max is a good overhead
    def clean_queues(self):
        max_checks = 2 * (len(self.hosts) + len(self.services))
        max_broks = 2 * (len(self.hosts) + len(self.services))
        max_actions = 2 * len(self.contacts) * (len(self.hosts) + len(self.services))

        #For checks, it's not very simple:
        #checks may be referred to by their host/service,
        #so we do not just del them in checks, but also in their service/host
        #We want to del checks with an id of less than id_max - max_checks
        if len(self.checks) > max_checks:
            id_max = self.checks.keys()[-1] #The max id is the last id
                                            #: max() is SO slow!
            to_del_checks = [c for c in self.checks.values() if c.id < id_max - max_checks]
            nb_checks_drops = len(to_del_checks)
            if nb_checks_drops > 0:
                print "I have to del some checks..., sorry", to_del_checks
            for c in to_del_checks:
                i = c.id
                elt = c.ref
                #First remove the link in the host/service
                elt.remove_in_progress_check(c)
                #Then in dependent checks (checks I depend on, or checks
                #that depend on me)
                for dependant_checks in c.depend_on_me:
                    dependant_checks.depend_on.remove(c.id)
                for c_temp in c.depend_on:
                    c_temp.depend_on_me.remove(c)
                del self.checks[i] #Final bye bye ...
        else:
            nb_checks_drops = 0

        #For broks and actions, it's simpler
        if len(self.broks) > max_broks:
            id_max = self.broks.keys()[-1]
            id_to_del_broks = [i for i in self.broks if i < id_max - max_broks]
            nb_broks_drops = len(id_to_del_broks)
            for i in id_to_del_broks:
                del self.broks[i]
        else:
            nb_broks_drops = 0

        if len(self.actions) > max_actions:
            id_max = self.actions.keys()[-1]
            id_to_del_actions = [i for i in self.actions if i < id_max - max_actions]
            nb_actions_drops = len(id_to_del_actions)
            for i in id_to_del_actions:
                #Remember to delete the reference to the notification in the service/host
                if self.actions[i].is_a == 'notification':
                    item = self.actions[i].ref
                    item.remove_in_progress_notification(self.actions[i])
                del self.actions[i]
        else:
            nb_actions_drops = 0

        if nb_checks_drops != 0 or nb_broks_drops != 0 or nb_actions_drops != 0:
            print "WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops)
            logger.log("WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops))
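
    #A worked sizing example for clean_queues above (illustrative): with
    #100 hosts, 400 services and 10 contacts,
    #   max_checks = max_broks = 2 * (100 + 400) = 1000
    #   max_actions = 2 * 10 * (100 + 400) = 10000
    #so only elements whose id trails the newest by more than that are dropped.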
    #For tuning purposes we use caches, but we do not want them to explode,
    #so we clean them
    def clean_caches(self):
        #print "********** Clean caches *********"
        for tp in self.timeperiods:
            tp.clean_cache()

    #Ask an item (host or service) for an update_status brok
    #and add it to our broks queue
    def get_and_register_status_brok(self, item):
        b = item.get_update_status_brok()
        self.add(b)

    #Ask an item (host or service) for a check_result_brok
    #and add it to our broks queue
    def get_and_register_check_result_brok(self, item):
        b = item.get_check_result_brok()
        self.add(b)
    #We do not want this downtime id
    def del_downtime(self, dt_id):
        if dt_id in self.downtimes:
            self.downtimes[dt_id].ref.del_downtime(dt_id)
            del self.downtimes[dt_id]

    #We do not want this contact downtime id
    def del_contact_downtime(self, dt_id):
        if dt_id in self.contact_downtimes:
            self.contact_downtimes[dt_id].ref.del_downtime(dt_id)
            del self.contact_downtimes[dt_id]

    #We do not want this comment id
    def del_comment(self, c_id):
        if c_id in self.comments:
            self.comments[c_id].ref.del_comment(c_id)
            del self.comments[c_id]
    #Called by pollers to get checks
    #Can get checks and actions (notifications and co)
    def get_to_run_checks(self, do_checks=False, do_actions=False, poller_tags=[]):
        res = []
        now = time.time()

        #If the poller wants to do checks
        if do_checks:
            for c in self.checks.values():
                # If the command is untagged and the poller too, or if both are tagged
                # with the same name, go for it
                if (c.poller_tag == None and poller_tags == []) or c.poller_tag in poller_tags:
                    # must be ok to launch, and not an internal one (business rules based)
                    if c.status == 'scheduled' and c.is_launchable(now) and not c.internal:
                        c.status = 'inpoller'
                        # We do not send c itself, because it links (c.ref) to the
                        # host/service, and the poller does not need that. It just
                        # needs a shell with the id, command and default
                        # parameters. That's the goal of copy_shell
                        res.append(c.copy_shell())

        #If a reactionner wants to notify too
        if do_actions:
            for a in self.actions.values():
                if a.status == 'scheduled' and a.is_launchable(now):
                    a.status = 'inpoller'
                    if a.is_a == 'notification' and not a.contact:
                        # This is a "master" notification created by create_notifications.
                        # It will not be sent itself because it has no contact.
                        # We use it to create "child" notifications (for the contacts and
                        # notification_commands) which are executed in the reactionner.
                        item = a.ref
                        childnotifications = []
                        if not item.notification_is_blocked_by_item(a.type, now):
                            # If it is possible to send notifications of this type at the current time, then create
                            # a single notification for each contact of this item.
                            childnotifications = item.scatter_notification(a)
                            for c in childnotifications:
                                c.status = 'inpoller'
                                self.add(c) # this will send a brok
                                new_c = c.copy_shell()
                                res.append(new_c)

                        # If we have a notification_interval then schedule the next notification (problems only)
                        if a.type == 'PROBLEM':
                            # Update the ref notif number after raising the one of the notification
                            if len(childnotifications) != 0:
                                # notif_nb of the master notification was already current_notification_number+1.
                                # If notifications were sent, then the host/service counter will also be incremented
                                item.current_notification_number = a.notif_nb

                            if item.notification_interval != 0 and a.t_to_go != None:
                                # We must continue to send notifications.
                                # Just leave it in the actions list, set it to "scheduled" and it will be found again later
                                #a.t_to_go = a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # Ask the service/host to compute the next notif time. It can be just
                                # a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # or maybe earlier, if an escalation needs to be raised before that
                                a.t_to_go = item.get_next_notification_time(a)

                                a.notif_nb = item.current_notification_number + 1
                                a.status = 'scheduled'
                            else:
                                # Wipe out this master notification. One problem notification is enough.
                                item.remove_in_progress_notification(a)
                                self.actions[a.id].status = 'zombie'
                        else:
                            # Wipe out this master notification. We don't repeat recover/downtime/flap/etc...
                            item.remove_in_progress_notification(a)
                            self.actions[a.id].status = 'zombie'
                    else:
                        # This is for child notifications and eventhandlers
                        new_a = a.copy_shell()
                        res.append(new_a)
        return res
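
    #Illustrative caller view of get_to_run_checks above (these calls come
    #from the satellites, not from this file): a poller tagged 'dmz' would ask
    #   sched.get_to_run_checks(do_checks=True, poller_tags=['dmz'])
    #while a reactionner asks for actions instead :
    #   sched.get_to_run_checks(do_actions=True)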
    #Called by pollers and reactionners to send back results
    def put_results(self, c):
        if c.is_a == 'notification':
            # We will only see child notifications here
            try:
                self.actions[c.id].get_return_from(c)
                item = self.actions[c.id].ref
                item.remove_in_progress_notification(c)
                self.actions[c.id].status = 'zombie'
                item.last_notification = c.check_time
                #If we've got a problem with the notification, raise a Warning log
                if c.exit_status != 0:
                    logger.log("Warning : the notification command '%s' raised an error (exit code=%d) : '%s'" % (c.command, c.exit_status, c.output))
            except KeyError, exp:
                logger.log("Warning : received a notification with an unknown id! %s" % str(exp))
        elif c.is_a == 'check':
            try:
                self.checks[c.id].get_return_from(c)
                self.checks[c.id].status = 'waitconsume'
            except KeyError, exp:
                logger.log("Warning : received a check with an unknown id! %s" % str(exp))
        elif c.is_a == 'eventhandler':
            # It just dies
            try:
                self.actions[c.id].status = 'zombie'
            # Maybe we received the return of an old event handler, so we can forget it
            except KeyError:
                pass
        else:
            logger.log("Error : the received result type is unknown ! %s" % str(c.is_a))
    # Some checks are purely internal, like business-rule based ones;
    # simply ask their ref to manage them when it's time to run
    def manage_internal_checks(self):
        now = time.time()
        for c in self.checks.values():
            # must be ok to launch, and an internal one (business rules based)
            if c.status == 'scheduled' and c.is_launchable(now) and c.internal:
                c.ref.manage_internal_check(c)
                # it managed it, now just ask to consume it
                # like for all checks
                c.status = 'waitconsume'
    #Called by brokers to get broks
    #We give them away and clean them out!
    def get_broks(self):
        res = self.broks
        #They are gone, we keep none!
        self.broks = {}
        # print "returning broks"
        # for b in res:
        #     print b, res[b]
        return res
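
    #Hand-off note (illustrative): the whole dict is returned and replaced
    #by a fresh one, so broks produced after this call accumulate in the
    #new dict and nothing is lost between two broker polls.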
    #Update the retention file, giving all of our data in
    #a dict so readers can pick up what they want
    #For now compression is not used, but it can be added easily:
    #just uncomment :)
    def update_retention_file(self, forced=False):
        #If the update interval is set to 0, we do not want this,
        #unless we are forced (like at stopping time)
        if self.conf.retention_update_interval == 0 and not forced:
            return

        #Do the job for all modules that handle retention
        for inst in self.mod_instances:
            if 'retention' in inst.properties['phases']:
                #Call it with self so they have full access, and a log object
                #so they can easily raise logs
                inst.update_retention_objects(self, logger)

    #Load the retention file and get status from it. It does not get the checks in progress
    #for the moment, just the status and the notifications.
    def retention_load(self):
        #Do this job with modules too
        for inst in self.mod_instances:
            if 'retention' in inst.properties['phases']:
                #give it ourself (full control!) and a log manager object
                b = inst.load_retention_objects(self, logger)
                #Stop at the first module that succeeds in loading the retention
                if b:
                    return
    def check_for_system_time_change(self):
        now = time.time()
        difference = now - self.t_each_loop
        #If we have more than a 15 min time change, we need to compensate
        if abs(difference) > 900:
            self.compensate_system_time_change(difference)

        #Now set the new value for the tick loop
        self.t_each_loop = now
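
    #Example (illustrative): with a loop that normally comes back about
    #every second, t_each_loop trails 'now' by ~1s; an NTP step or manual
    #clock change of an hour makes abs(difference) ~3600s, well past the
    #900s guard, and triggers the compensation below.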
    #If we've got a system time change, we need to compensate for it,
    #so change our values, and all check/notif ones
    def compensate_system_time_change(self, difference):
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
        #We only need to change some values
        self.program_start = max(0, self.program_start + difference)

        #Then we compensate all hosts/services
        for h in self.hosts:
            h.compensate_system_time_change(difference)
        for s in self.services:
            s.compensate_system_time_change(difference)

        #Now all checks
        for c in self.checks.values():
            #Already launched checks should not be touched
            if c.status == 'scheduled':
                t_to_go = c.t_to_go
                ref = c.ref
                new_t = max(0, t_to_go + difference)
                #But it's not so simple, we must match the timeperiod
                new_t = ref.check_period.get_next_valid_time_from_t(new_t)
                #But maybe there is no new value left! Not good :(
                #Mark it as an error, with an error output
                if new_t == None:
                    c.state = 'waitconsume'
                    c.exit_status = 2
                    c.output = '(Error: there is no available check time after time change!)'
                    c.check_time = time.time()
                    c.execution_time = 0
                else:
                    c.t_to_go = new_t
                    ref.next_chk = new_t

        #Now all actions
        for c in self.actions.values():
            #Already launched actions should not be touched
            if c.status == 'scheduled':
                t_to_go = c.t_to_go

                # Event handlers do not have a ref
                ref = getattr(c, 'ref', None)
                new_t = max(0, t_to_go + difference)

                #Notifications should be checked against the notification_period
                if c.is_a == 'notification':
                    #But it's not so simple, we must match the timeperiod
                    new_t = ref.notification_period.get_next_valid_time_from_t(new_t)
                    # And they have a creation_time variable too
                    c.creation_time = c.creation_time + difference

                #But maybe there is no new value left! Not good :(
                #Mark it as an error, with an error output
                if new_t == None:
                    c.state = 'waitconsume'
                    c.exit_status = 2
                    c.output = '(Error: there is no available check time after time change!)'
                    c.check_time = time.time()
                    c.execution_time = 0
                else:
                    c.t_to_go = new_t
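
    #A worked compensation example (illustrative): if the clock jumps
    #forward one hour (difference = 3600), a check scheduled at t_to_go = T
    #moves to T + 3600, then snaps to the next time valid in its
    #check_period; if that period yields no valid time at all, the check is
    #failed in place with exit_status 2.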
    #Fill self.broks with broks of self (process id, and co)
    #and broks of services and hosts (initial status)
    def fill_initial_broks(self):
        #First a Brok to delete everything from my instance_id
        b = Brok('clean_all_my_instance_id', {'instance_id' : self.instance_id})
        self.add(b)

        #then the program status
        b = self.get_program_status_brok()
        self.add(b)

        #We want an initial_status from all these types
        #The order is important, services need hosts...
        initial_status_types = [self.timeperiods, self.commands, \
                                self.contacts, self.contactgroups, \
                                self.hosts, self.hostgroups, \
                                self.services, self.servicegroups]

        for tab in initial_status_types:
            for i in tab:
                b = i.get_initial_status_brok()
                self.add(b)

        #We now have all full broks
        self.has_full_broks = True

        logger.log("[%s] Created initial Broks: %d" % (self.instance_name, len(self.broks)))
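
    #Ordering note for initial_status_types above (illustrative): a broker
    #module resolving a service's host before that host's initial brok
    #exists would fail, hence hosts come before services in the list.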
    #Create a brok with program status info
    def get_and_register_program_status_brok(self):
        b = self.get_program_status_brok()
        self.add(b)

    #Create a brok with program status update info
    def get_and_register_update_program_status_brok(self):
        b = self.get_program_status_brok()
        b.type = 'update_program_status'
        self.add(b)

    #Get a brok with the program status
    #TODO : GET REAL VALUES
    def get_program_status_brok(self):
        now = int(time.time())
        data = {"is_running" : 1,
                "instance_id" : self.instance_id,
                "instance_name": self.instance_name,
                "last_alive" : now,
                "program_start" : self.program_start,
                "pid" : os.getpid(),
                "daemon_mode" : 1,
                "last_command_check" : now,
                "last_log_rotation" : now,
                "notifications_enabled" : self.conf.enable_notifications,
                "active_service_checks_enabled" : self.conf.execute_service_checks,
                "passive_service_checks_enabled" : self.conf.accept_passive_service_checks,
                "active_host_checks_enabled" : self.conf.execute_host_checks,
                "passive_host_checks_enabled" : self.conf.accept_passive_host_checks,
                "event_handlers_enabled" : self.conf.enable_event_handlers,
                "flap_detection_enabled" : self.conf.enable_flap_detection,
                "failure_prediction_enabled" : 0,
                "process_performance_data" : self.conf.process_performance_data,
                "obsess_over_hosts" : self.conf.obsess_over_hosts,
                "obsess_over_services" : self.conf.obsess_over_services,
                "modified_host_attributes" : 0,
                "modified_service_attributes" : 0,
                "global_host_event_handler" : self.conf.global_host_event_handler,
                'global_service_event_handler' : self.conf.global_service_event_handler,
                'command_file' : self.conf.command_file
                }
        b = Brok('program_status', data)
        return b
    #Called every 1sec to consume every result in services or hosts;
    #with these results, they become OK, CRITICAL, UP/DOWN, etc...
    def consume_results(self):
        #All results are in self.waiting_results
        #We need to get them first
        for c in self.waiting_results:
            self.put_results(c)
        self.waiting_results = []

        #Then we consume them
        #print "**********Consume*********"
        for c in self.checks.values():
            if c.status == 'waitconsume':
                item = c.ref
                item.consume_result(c)

        #All 'finished' checks (no more deps) release the checks that depend on them
        for c in self.checks.values():
            if c.status == 'havetoresolvedep':
                for dependant_checks in c.depend_on_me:
                    #Ok, now the dependant will no longer wait for c
                    dependant_checks.depend_on.remove(c.id)
                #REMOVE OLD DEP CHECK -> zombie
                c.status = 'zombie'

        #Now, reintegrate dep checks
        for c in self.checks.values():
            if c.status == 'waitdep' and len(c.depend_on) == 0:
                item = c.ref
                item.consume_result(c)
    #Called every 1sec to delete all checks in a zombie state
    #zombie = not useful anymore
    def delete_zombie_checks(self):
        #print "**********Delete zombies checks****"
        id_to_del = []
        for c in self.checks.values():
            if c.status == 'zombie':
                id_to_del.append(c.id)
        #a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.checks[id] # ZANKUSEN!

    #Called every 1sec to delete all actions in a zombie state
    #zombie = not useful anymore
    def delete_zombie_actions(self):
        #print "**********Delete zombies actions****"
        id_to_del = []
        for a in self.actions.values():
            if a.status == 'zombie':
                id_to_del.append(a.id)
        #a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.actions[id] # ZANKUSEN!
    #Check for downtime starts and stops, and register
    #them if needed
    def update_downtimes_and_comments(self):
        broks = []
        now = time.time()

        #Check maintenance periods
        for elt in [y for y in [x for x in self.hosts] + [x for x in self.services] if y.maintenance_period != None]:
            if not hasattr(elt, 'in_maintenance'):
                setattr(elt, 'in_maintenance', False)
            if not elt.in_maintenance:
                if elt.maintenance_period.is_time_valid(now):
                    start_dt = elt.maintenance_period.get_next_valid_time_from_t(now)
                    end_dt = elt.maintenance_period.get_next_invalid_time_from_t(start_dt + 1) - 1
                    dt = Downtime(elt, start_dt, end_dt, 1, 0, 0, "system", "this downtime was automatically scheduled through a maintenance_period")
                    elt.add_downtime(dt)
                    self.add(dt)
                    self.get_and_register_status_brok(elt)
                    elt.in_maintenance = dt.id
            else:
                if not elt.in_maintenance in self.downtimes:
                    # the maintenance downtime has expired or was manually deleted
                    elt.in_maintenance = False

        # Check the validity of contact downtimes
        for elt in self.contacts:
            for dt in elt.downtimes:
                dt.check_activation()

        #A loop where those downtimes are removed
        #which were marked for deletion (mostly by dt.exit())
        for dt in self.downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        # Same for contact downtimes:
        for dt in self.contact_downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_contact_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        #Downtimes are usually accompanied by a comment.
        #An exiting downtime also invalidates its comment.
        for c in self.comments.values():
            if c.can_be_deleted == True:
                ref = c.ref
                self.del_comment(c.id)
                broks.append(ref.get_update_status_brok())

        #Check start and stop times
        for dt in self.downtimes.values():
            if dt.real_end_time < now:
                #this one has expired
                broks.extend(dt.exit()) # returns downtimestop notifications
            elif now >= dt.start_time and dt.fixed and not dt.is_in_effect:
                #this one has to start now
                broks.extend(dt.enter()) # returns downtimestart notifications
                broks.append(dt.ref.get_update_status_brok())

        for b in broks:
            self.add(b)
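
    #Maintenance flow in short (illustrative): an elt whose
    #maintenance_period covers, say, 22:00-06:00 gets a fixed, system-owned
    #Downtime spanning that window the first time this runs inside it;
    #once that downtime leaves self.downtimes, in_maintenance is reset so
    #the next window can schedule a new one.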
    #Main schedule function for the regular scheduling
    def schedule(self):
        #ask services and hosts for their next check
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                i.schedule()

    #Main actions reaper function : it gets all new checks,
    #notifications and event handlers from hosts and services
    def get_new_actions(self):
        #ask services and hosts for their new actions
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for a in i.actions:
                    self.add(a)
                #We took them all, we can clear the list
                i.actions = []

    #Same as just above, but for broks
    def get_new_broks(self):
        #ask services and hosts for their broks waiting
        #to be eaten
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for b in i.broks:
                    self.add(b)
                #We took them all, we can clear the list
                i.broks = []

    #Raise checks for non-fresh states of services and hosts
    def check_freshness(self):
        #print "********** Check freshness******"
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                c = i.do_check_freshness()
                if c is not None:
                    self.add(c)

    #Check for orphaned checks : checks that never come back,
    #so if inpoller and t_to_go < now - 300s : problem!
    def check_orphaned(self):
        now = int(time.time())
        for c in self.checks.values():
            if c.status == 'inpoller' and c.t_to_go < now - 300:
                logger.log("Warning : the results of check %d never came back. I'm re-enabling it for polling" % c.id)
                c.status = 'scheduled'
        for a in self.actions.values():
            if a.status == 'inpoller' and a.t_to_go < now - 300:
                logger.log("Warning : the results of action %d never came back. I'm re-enabling it for polling" % a.id)
                a.status = 'scheduled'
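
    #Orphan example (illustrative): a check handed to a poller with
    #t_to_go = 12:00:00 that has produced no result by 12:05:01 is flagged
    #here and set back to 'scheduled', so get_to_run_checks hands it out again.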
    #Main function
    def run(self):
        #First we see if we've got info in the retention file
        self.retention_load()

        #Ok, now all is initialised, we can make the initial broks
        self.fill_initial_broks()

        logger.log("[%s] First scheduling launched" % self.instance_name)
        self.schedule()
        logger.log("[%s] First scheduling done" % self.instance_name)

        #Ticks are for recurrent function calls like consume,
        #del zombies etc
        ticks = 0
        timeout = 1.0 #For the select

        gogogo = time.time()
        self.t_each_loop = time.time() #used to track system time changes

        while self.must_run :
            socks = pyro.get_sockets(self.daemon)
            t_begin = time.time()
            #socks.append(self.fifo)
            # 'foreign' event loop
            ins, outs, exs = select.select(socks, [], [], timeout)
            if ins != []:
                for s in socks:
                    if s in ins:
                        pyro.handleRequests(self.daemon, s)
                        t_after = time.time()
                        diff = t_after - t_begin
                        timeout = timeout - diff
                        break # no need to continue with the for loop
            else: #Timeout
                timeout = 1.0
                ticks += 1

                #Do recurrent works like schedule, consume,
                #delete_zombie_checks
                for i in self.recurrent_works:
                    (name, f, nb_ticks) = self.recurrent_works[i]
                    #A 0 in the tick will just disable it
                    if nb_ticks != 0:
                        if ticks % nb_ticks == 0:
                            #print "I run function :", name
                            f()

                #if ticks % 10 == 0:
                #    self.conf.quick_debug()

                #stats
                nb_scheduled = len([c for c in self.checks.values() if c.status == 'scheduled'])
                nb_inpoller = len([c for c in self.checks.values() if c.status == 'inpoller'])
                nb_zombies = len([c for c in self.checks.values() if c.status == 'zombie'])
                nb_notifications = len(self.actions)

                print "Checks:", "total", len(self.checks), "scheduled", nb_scheduled, "inpoller", nb_inpoller, "zombies", nb_zombies, "notifications", nb_notifications

                m = 0.0
                m_nb = 0
                for s in self.services:
                    m += s.latency
                    m_nb += 1
                if m_nb != 0:
                    print "Average latency:", m, m_nb, m / m_nb

                #print "Notifications:", nb_notifications
                now = time.time()
                #for a in self.actions.values():
                #    if a.is_a == 'notification':
                #        print "Notif:", a.id, a.type, a.status, a.ref.get_name(), a.ref.state, a.contact.get_name(), 'level:%d' % a.notif_nb, 'launch in', int(a.t_to_go - now)
                #    else:
                #        print "Event:", a.id, a.status
                print "Nb checks send:", self.nb_checks_send
                self.nb_checks_send = 0
                print "Nb Broks send:", self.nb_broks_send
                self.nb_broks_send = 0

                time_elapsed = now - gogogo
                print "Check average =", int(self.nb_check_received / time_elapsed), "checks/s"

                #for n in self.actions.values():
                #    if n.ref_type == 'service':
                #        print 'Service notification', n
                #    if n.ref_type == 'host':
                #        print 'Host notification', n
                #print "."
                #print "Service still in checking?"
                #for s in self.services:
                #    print s.get_name() + ':' + str(s.in_checking)
                #    for i in s.checks_in_progress:
                #        print i, i.t_to_go
                #for s in self.hosts:
                #    print s.get_name() + ':' + str(s.in_checking) + str(s.checks_in_progress)
                #    for i in s.checks_in_progress:
                #        print i #self.checks[i]

                #for c in self.checks.values():
                #    if c.ref_type == 'host':
                #        print c.id, ":", c.status, 'Depend_on_me:', len(c.depend_on_me), 'depend_on', c.depend_on
                #hp = hpy()
                #print hp.heap()
                #print hp.heapu()

            if timeout < 0:
                timeout = 1.0