*Add forgotten test case for host_without_cmd
[shinken.git] / shinken / scheduler.py
blobb123849ac6747dceec3a004c703879a77b59f78a
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 import select, time, os
24 import shinken.pyro_wrapper
26 from shinken.external_command import ExternalCommand
27 from shinken.check import Check
28 from shinken.notification import Notification
29 from shinken.eventhandler import EventHandler
30 from shinken.brok import Brok
31 from shinken.downtime import Downtime
32 from shinken.comment import Comment
33 from shinken.log import logger
37 #from guppy import hpy
class Scheduler:
    def __init__(self, daemon):
        """Prepare an empty scheduler bound to *daemon*, the Pyro daemon
        that receives incoming orders/requests."""
        self.daemon = daemon
        # When the arbiter wants to replace us it flips this flag:
        # the main loop exits and a new scheduler is launched.
        self.must_run = True

        # Satellites push their results here; we buffer them instead of
        # waiting, and consume them later.
        self.waiting_results = []

        # Recurrent jobs: index -> (name, function, tick period).
        # Every loop tick (~1s) each job whose period divides the tick
        # counter is run.  Periods must be integers; 0 disables a job.
        # The order (the integer key) is significant.
        # TODO: at load time, override periods from the configuration
        # (reaper time, etc.)
        self.recurrent_works = {
            0: ('update_downtimes_and_comments', self.update_downtimes_and_comments, 1),
            1: ('schedule', self.schedule, 1),  # just schedule
            2: ('consume_results', self.consume_results, 1),  # incorporate checks and dependencies
            3: ('get_new_actions', self.get_new_actions, 1),  # new actions (checks, notifs) raised
            4: ('get_new_broks', self.get_new_broks, 1),  # and broks
            5: ('delete_zombie_checks', self.delete_zombie_checks, 1),
            6: ('delete_zombie_actions', self.delete_zombie_actions, 1),
            7: ('check_freshness', self.check_freshness, 10),
            8: ('clean_caches', self.clean_caches, 1),
            9: ('update_retention_file', self.update_retention_file, 3600),
            10: ('check_orphaned', self.check_orphaned, 60),
            # For NagVis-like tools: update our status every 10s
            11: ('get_and_register_update_program_status_brok', self.get_and_register_update_program_status_brok, 10),
            # Check for system time change AFTER getting new checks,
            # so they are compensated too.
            12: ('check_for_system_time_change', self.check_for_system_time_change, 1),
            # Launch all internal (business-rule based) checks if needed
            13: ('manage_internal_checks', self.manage_internal_checks, 1),
        }

        # Statistics counters
        self.nb_checks_send = 0
        self.nb_actions_send = 0
        self.nb_broks_send = 0
        self.nb_check_received = 0

        # Log init
        self.log = logger
        self.log.load_obj(self)

        self.instance_id = 0  # temporary; the real id comes with the conf

        # Our queues
        self.checks = {}
        self.actions = {}
        self.downtimes = {}
        self.comments = {}
        self.broks = {}
        self.has_full_broks = False  # do we hold an initial_broks set?
97 #Load conf for future use
98 def load_conf(self, conf):
99 self.program_start = int(time.time())
100 self.conf = conf
101 self.hostgroups = conf.hostgroups
102 self.hostgroups.create_reversed_list()
103 self.services = conf.services
104 #We need reversed list for search in the retention
105 #file read
106 self.services.create_reversed_list()
107 self.services.optimize_service_search(conf.hosts)
108 self.hosts = conf.hosts
109 #DBG:
110 # for h in self.hosts:
111 # print h.get_name(), h.parents
112 self.hosts.create_reversed_list()
114 self.notificationways = conf.notificationways
115 self.contacts = conf.contacts
116 self.contacts.create_reversed_list()
117 self.contactgroups = conf.contactgroups
118 self.contactgroups.create_reversed_list()
119 self.servicegroups = conf.servicegroups
120 self.servicegroups.create_reversed_list()
121 self.timeperiods = conf.timeperiods
122 self.timeperiods.create_reversed_list()
123 self.commands = conf.commands
125 #self.status_file = StatusFile(self) #External status file
126 self.instance_id = conf.instance_id #From Arbiter. Use for
127 #Broker to disting betweens
128 #schedulers
129 #self for instance_name
130 self.instance_name = conf.instance_name
132 #Now we can updte our 'ticks' for special calls
133 #like the retention one, etc
134 self.update_recurrent_works_tick('update_retention_file', self.conf.retention_update_interval)
137 #Update the 'tick' for a function call in our
138 #recurrent work
139 def update_recurrent_works_tick(self, f_name, new_tick):
140 for i in self.recurrent_works:
141 (name, f, old_tick) = self.recurrent_works[i]
142 if name == f_name:
143 print "Changing the tick for the function", name, new_tick
144 self.recurrent_works[i] = (name, f, new_tick)
147 #Load the modules from our app master
148 def load_modules(self, modules_manager, mod_instances):
149 self.modules_manager = modules_manager
150 self.mod_instances = mod_instances
153 #Oh... Arbiter want us to die... For launch a new Scheduler
154 #"Mais qu'a-t-il de plus que je n'ais pas?"
155 def die(self):
156 #first update our retention data
157 self.update_retention_file(forced=True)
158 self.must_run = False
161 #Load the external commander
162 def load_external_command(self, e):
163 self.external_command = e
166 #We've got activity in the fifo, we get and run commands
167 def run_external_command(self, command):
168 print "scheduler resolves command", command
169 ext_cmd = ExternalCommand(command)
170 self.external_command.resolve_command(ext_cmd)
173 #Schedulers have some queues. We can simplify call by adding
174 #elements into the proper queue just by looking at their type
175 #Brok -> self.broks
176 #Check -> self.checks
177 #Notification -> self.actions
178 #Downtime -> self.downtimes
179 def add(self, elt):
180 #For checks and notif, add is also an update function
181 if isinstance(elt, Check):
182 self.checks[elt.id] = elt
183 #A new check mean the host/service change it's next_check
184 #need to be refresh
185 b = elt.ref.get_next_schedule_brok()
186 self.add(b)
187 return
188 if isinstance(elt, Brok):
189 #For brok, we TAG brok with our instance_id
190 elt.data['instance_id'] = self.instance_id
191 self.broks[elt.id] = elt
192 return
193 if isinstance(elt, Notification):
194 self.actions[elt.id] = elt
195 #A notification ask for a brok
196 if elt.contact != None:
197 b = elt.get_initial_status_brok()
198 self.add(b)
199 return
200 if isinstance(elt, EventHandler):
201 #print "Add an event Handler", elt.id
202 self.actions[elt.id] = elt
203 return
204 if isinstance(elt, Downtime):
205 self.downtimes[elt.id] = elt
206 self.add(elt.extra_comment)
207 return
208 if isinstance(elt, Comment):
209 self.comments[elt.id] = elt
210 b = elt.ref.get_update_status_brok()
211 self.add(b)
212 return
215 #Ours queues may explode if noone ask us for elements
216 #It's very dangerous : you can crash your server... and it's a bad thing :)
217 #So we 'just' keep last elements : 2 of max is a good overhead
218 def clean_queues(self):
219 max_checks = 2 * (len(self.hosts) + len(self.services))
220 max_broks = 2 * (len(self.hosts) + len(self.services))
221 max_actions = 2* len(self.contacts) * (len(self.hosts) + len(self.services))
223 #For checks, it's not very simple:
224 #For checks, they may be referred to their host/service
225 #We do not just del them in checks, but also in their service/host
226 #We want id of less than max_id - 2*max_checks
227 if len(self.checks) > max_checks:
228 id_max = self.checks.keys()[-1] #The max id is the last id
229 #: max is SO slow!
230 to_del_checks = [c for c in self.checks.values() if c.id < id_max - max_checks]
231 nb_checks_drops = len(to_del_checks)
232 if nb_checks_drops > 0:
233 print "I have to del some checks..., sorry", to_del_checks
234 for c in to_del_checks:
235 i = c.id
236 elt = c.ref
237 #First remove the link in host/service
238 elt.remove_in_progress_check(c)
239 #Then in dependant checks (I depend on, or check
240 #depend on me)
241 for dependant_checks in c.depend_on_me:
242 dependant_checks.depend_on.remove(c.id)
243 for c_temp in c.depend_on:
244 c_temp.depen_on_me.remove(c)
245 del self.checks[i] #Final Bye bye ...
246 else:
247 nb_checks_drops = 0
249 #For broks and actions, it's more simple
250 if len(self.broks) > max_broks:
251 id_max = self.broks.keys()[-1]
252 id_to_del_broks = [i for i in self.broks if i < id_max - max_broks]
253 nb_broks_drops = len(id_to_del_broks)
254 for i in id_to_del_broks:
255 del self.broks[i]
256 else:
257 nb_broks_drops = 0
259 if len(self.actions) > max_actions:
260 id_max = self.actions.keys()[-1]
261 id_to_del_actions = [i for i in self.actions if i < id_max - max_actions]
262 nb_actions_drops = len(id_to_del_actions)
263 for i in id_to_del_actions:
264 #Remeber to delete reference of notification in service/host
265 if i.is_a == 'notification':
266 item = self.actions[i].ref
267 item.remove_in_progress_notification(self.actions[i])
268 del self.actions[i]
269 else:
270 nb_actions_drops = 0
272 if nb_checks_drops != 0 or nb_broks_drops != 0 or nb_actions_drops != 0:
273 print "WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops)
274 logger.log("WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops))
277 #For tunning purpose we use caches but we do not whant them to explode
278 #So we clean thems
279 def clean_caches(self):
280 #print "********** Clean caches *********"
281 for tp in self.timeperiods:
282 tp.clean_cache()
285 #Ask item (host or service) a update_status
286 #and add it to our broks queue
287 def get_and_register_status_brok(self, item):
288 b = item.get_update_status_brok()
289 self.add(b)
292 #Ask item (host or service) a check_result_brok
293 #and add it to our broks queue
294 def get_and_register_check_result_brok(self, item):
295 b = item.get_check_result_brok()
296 self.add(b)
299 #We do not want this downtime id
300 def del_downtime(self, dt_id):
301 if dt_id in self.downtimes:
302 self.downtimes[dt_id].ref.del_downtime(dt_id)
303 del self.downtimes[dt_id]
306 #We do not want this comment id
307 def del_comment(self, c_id):
308 if c_id in self.comments:
309 self.comments[c_id].ref.del_comment(c_id)
310 del self.comments[c_id]
313 #Called by poller to get checks
314 #Can get checks and actions (notifications and co)
315 def get_to_run_checks(self, do_checks=False, do_actions=False, poller_tags=[]):
316 res = []
317 now = time.time()
319 #If poller want to do checks
320 if do_checks:
321 for c in self.checks.values():
322 # If the command is untagged, and the poller too, or if both are taggued
323 # with same name, go for it
324 if (c.poller_tag == None and poller_tags == []) or c.poller_tag in poller_tags:
325 # must be ok to launch, and not an internal one (business rules based)
326 if c.status == 'scheduled' and c.is_launchable(now) and not c.internal:
327 c.status = 'inpoller'
328 # We do not send c, because it it link (c.ref) to
329 # host/service and poller do not need it. It just
330 # need a shell with id, command and defaults
331 # parameters. It's the goal of copy_shell
332 res.append(c.copy_shell())
334 #If reactionner want to notify too
335 if do_actions:
336 for a in self.actions.values():
337 if a.status == 'scheduled' and a.is_launchable(now):
338 a.status = 'inpoller'
339 if a.is_a == 'notification' and not a.contact:
340 # This is a "master" notification created by create_notifications. It will not be sent itself because it has no contact.
341 # We use it to create "child" notifications (for the contacts and notification_commands) which are executed in the reactionner.
342 item = a.ref
343 childnotifications = []
344 if not item.notification_is_blocked_by_item(a.type, now):
345 # If it is possible to send notifications of this type at the current time, then create
346 # a single notification for each contact of this item.
347 childnotifications = item.scatter_notification(a)
348 for c in childnotifications:
349 c.status = 'inpoller'
350 self.add(c) # this will send a brok
351 new_c = c.copy_shell()
352 res.append(new_c)
354 # If we have notification_interval then schedule the next notification (problems only)
355 if a.type == 'PROBLEM':
356 if len(childnotifications) != 0:
357 # notif_nb of the master notification was already current_notification_number+1.
358 # If notifications were sent, then host/service-counter will also be incremented
359 item.current_notification_number = a.notif_nb
361 if item.notification_interval != 0 and a.t_to_go != None:
362 # We must continue to send notifications.
363 # Just leave it in the actions list and set it to "scheduled" and it will be found again later
364 # if a.t_to_go == None or item.notification_interval == None:
365 # print "A to go", a, a.t_to_go, item.notification_interval
366 a.t_to_go = a.t_to_go + item.notification_interval * item.__class__.interval_length
367 a.notif_nb = item.current_notification_number + 1
368 a.status = 'scheduled'
369 else:
370 # Wipe out this master notification. One problem notification is enough.
371 item.remove_in_progress_notification(a)
372 self.actions[a.id].status = 'zombie'
373 else:
374 # Wipe out this master notification. We don't repeat recover/downtime/flap/etc...
375 item.remove_in_progress_notification(a)
376 self.actions[a.id].status = 'zombie'
377 else:
378 # This is for child notifications and eventhandlers
379 new_a = a.copy_shell()
380 res.append(new_a)
381 return res
384 #Called by poller and reactionner to send result
385 def put_results(self, c):
386 if c.is_a == 'notification':
387 # We will only see childnotifications here
388 try:
389 self.actions[c.id].get_return_from(c)
390 item = self.actions[c.id].ref
391 item.remove_in_progress_notification(c)
392 self.actions[c.id].status = 'zombie'
393 item.last_notification = c.check_time
394 #If we' ve got a problem with the notification, raise a Warning log
395 if c.exit_status != 0:
396 logger.log("Warning : the notification command '%s' raised an error (exit code=%d) : '%s'" % (c.command, c.exit_status, c.output))
397 except KeyError , exp:
398 logger.log("Warning : received an notification of an unknown id! %s" % str(exp))
400 elif c.is_a == 'check':
401 try:
402 self.checks[c.id].get_return_from(c)
403 self.checks[c.id].status = 'waitconsume'
404 except KeyError , exp:
405 logger.log("Warning : received an check of an unknown id! %s" % str(exp))
406 elif c.is_a == 'eventhandler':
407 # It just die
408 try:
409 self.actions[c.id].status = 'zombie'
410 # Maybe we reveied a return of a old even handler, so we can forget it
411 except KeyError:
412 pass
413 else:
414 logger.log("Error : the received result type in unknown ! %s" % str(c.is_a))
417 # Some checks are purely internal, like business based one
418 # simply ask their ref to manage it when it's ok to run
419 def manage_internal_checks(self):
420 now = time.time()
421 for c in self.checks.values():
422 # must be ok to launch, and not an internal one (business rules based)
423 if c.status == 'scheduled' and c.is_launchable(now) and c.internal:
424 c.ref.manage_internal_check(c)
425 # it manage it, now just ask to consume it
426 # like for all checks
427 c.status = 'waitconsume'
431 #Call by brokers to have broks
432 #We give them, and clean them!
433 def get_broks(self):
434 res = self.broks
435 #They are gone, we keep none!
436 self.broks = {}
437 # print "returning broks"
438 # for b in res:
439 # print b, res[b]
440 return res
443 #Update the retention file and give it all of ours data in
444 #a dict so read can pickup what it wants
445 #For now compression is no use, but it can be add easylly
446 #just uncomment :)
447 def update_retention_file(self, forced=False):
448 #If we set the update to 0, we do not want of this
449 #if we do not forced (like at stopping)
450 if self.conf.retention_update_interval == 0 and not forced:
451 return
453 #Do the job for all modules that do the retention
454 for inst in self.mod_instances:
455 if 'retention' in inst.properties['phases']:
456 #Ask it with self to they have full access, and a log object
457 #so they can easily raise log
458 inst.update_retention_objects(self, logger)
462 #Load the retention file and get status from it. It do not get all checks in progress
463 #for the moment, just the status and the notifications.
464 def retention_load(self):
465 #Do this job with modules too
466 for inst in self.mod_instances:
467 if 'retention' in inst.properties['phases']:
468 #give us ourself (full control!) and a log manager object
469 b = inst.load_retention_objects(self, logger)
470 #Stop at the first module that succeed to load the retention
471 if b:
472 return
475 def check_for_system_time_change(self):
476 now = time.time()
477 difference = now - self.t_each_loop
478 #If we have more than 15 min time change, we need to compensate
480 if abs(difference) > 900:
481 self.compensate_system_time_change(difference)
483 #Now set the new value for the tick loop
484 self.t_each_loop = now
487 #If we've got a system time change, we need to compensate it
488 #So change our value, and all checks/notif ones
489 def compensate_system_time_change(self, difference):
490 logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
491 #We only need to change some value
492 self.program_start = max(0, self.program_start + difference)
494 #Then we compasate all host/services
495 for h in self.hosts:
496 h.compensate_system_time_change(difference)
497 for s in self.services:
498 s.compensate_system_time_change(difference)
500 #Now all checks and actions
501 for c in self.checks.values():
502 #Already launch checks should not be touch
503 if c.status == 'scheduled':
504 t_to_go = c.t_to_go
505 ref = c.ref
506 new_t = max(0, t_to_go + difference)
507 #But it's no so simple, we must match the timeperiod
508 new_t = ref.check_period.get_next_valid_time_from_t(new_t)
509 #But maybe no there is no more new value! Not good :(
510 #Say as error, with error output
511 if new_t == None:
512 c.state = 'waitconsume'
513 c.exit_status = 2
514 c.output = '(Error: there is no available check time after time change!)'
515 c.check_time = time.time()
516 c.execution_time == 0
517 else:
518 c.t_to_go = new_t
519 ref.next_chk = new_t
521 #Now all checks and actions
522 for c in self.actions.values():
523 #Already launch checks should not be touch
524 if c.status == 'scheduled':
525 t_to_go = c.t_to_go
527 # Event handler do not have ref
528 ref = getattr(c, 'ref', None)
529 new_t = max(0, t_to_go + difference)
531 #Notification should be check with notification_period
532 if c.is_a == 'notification':
533 #But it's no so simple, we must match the timeperiod
534 new_t = ref.notification_period.get_next_valid_time_from_t(new_t)
536 #But maybe no there is no more new value! Not good :(
537 #Say as error, with error output
538 if new_t == None:
539 c.state = 'waitconsume'
540 c.exit_status = 2
541 c.output = '(Error: there is no available check time after time change!)'
542 c.check_time = time.time()
543 c.execution_time == 0
544 else:
545 c.t_to_go = new_t
548 #Fill the self.broks with broks of self (process id, and co)
549 #broks of service and hosts (initial status)
550 def fill_initial_broks(self):
551 #First a Brok for delete all from my instance_id
552 b = Brok('clean_all_my_instance_id', {'instance_id' : self.instance_id})
553 self.add(b)
555 #first the program status
556 b = self.get_program_status_brok()
557 self.add(b)
559 #We cant initial_status from all this types
560 #The order is important, service need host...
561 initial_status_types = [self.timeperiods, self.commands, \
562 self.contacts, self.contactgroups, \
563 self.hosts, self.hostgroups, \
564 self.services, self.servicegroups]
566 for tab in initial_status_types:
567 for i in tab:
568 b = i.get_initial_status_brok()
569 self.add(b)
571 #We now have all full broks
572 self.has_full_broks = True
574 logger.log("[%s] Created initial Broks: %d" % (self.instance_name, len(self.broks)))
577 #Crate a brok with program status info
578 def get_and_register_program_status_brok(self):
579 b = self.get_program_status_brok()
580 self.add(b)
583 #Crate a brok with program status info
584 def get_and_register_update_program_status_brok(self):
585 b = self.get_program_status_brok()
586 b.type = 'update_program_status'
587 self.add(b)
590 #Get a brok with program status
591 #TODO : GET REAL VALUES
592 def get_program_status_brok(self):
593 now = int(time.time())
594 data = {"is_running" : 1,
595 "instance_id" : self.instance_id,
596 "instance_name": self.instance_name,
597 "last_alive" : now,
598 "program_start" : self.program_start,
599 "pid" : os.getpid(),
600 "daemon_mode" : 1,
601 "last_command_check" : now,
602 "last_log_rotation" : now,
603 "notifications_enabled" : self.conf.enable_notifications,
604 "active_service_checks_enabled" : self.conf.execute_service_checks,
605 "passive_service_checks_enabled" : self.conf.accept_passive_service_checks,
606 "active_host_checks_enabled" : self.conf.execute_host_checks,
607 "passive_host_checks_enabled" : self.conf.accept_passive_host_checks,
608 "event_handlers_enabled" : self.conf.enable_event_handlers,
609 "flap_detection_enabled" : self.conf.enable_flap_detection,
610 "failure_prediction_enabled" : 0,
611 "process_performance_data" : self.conf.process_performance_data,
612 "obsess_over_hosts" : self.conf.obsess_over_hosts,
613 "obsess_over_services" : self.conf.obsess_over_services,
614 "modified_host_attributes" : 0,
615 "modified_service_attributes" : 0,
616 "global_host_event_handler" : self.conf.global_host_event_handler,
617 'global_service_event_handler' : self.conf.global_service_event_handler,
618 'command_file' : self.conf.command_file
620 b = Brok('program_status', data)
621 return b
625 #Called every 1sec to consume every result in services or hosts
626 #with theses results, they are OK, CRITCAL, UP/DOWN, etc...
627 def consume_results(self):
628 #All results are in self.waiting_results
629 #We need to get thems first
630 for c in self.waiting_results:
631 self.put_results(c)
632 self.waiting_results = []
634 #Then we consume thems
635 #print "**********Consume*********"
636 for c in self.checks.values():
637 if c.status == 'waitconsume':
638 item = c.ref
639 item.consume_result(c)
642 #All 'finished' checks (no more dep) raise checks they depends on
643 for c in self.checks.values():
644 if c.status == 'havetoresolvedep':
645 for dependant_checks in c.depend_on_me:
646 #Ok, now dependant will no more wait c
647 dependant_checks.depend_on.remove(c.id)
648 #REMOVE OLD DEP CHECL -> zombie
649 c.status = 'zombie'
651 #Now, reinteger dep checks
652 for c in self.checks.values():
653 if c.status == 'waitdep' and len(c.depend_on) == 0:
654 item = c.ref
655 item.consume_result(c)
659 #Called every 1sec to delete all checks in a zombie state
660 #zombie = not usefull anymore
661 def delete_zombie_checks(self):
662 #print "**********Delete zombies checks****"
663 id_to_del = []
664 for c in self.checks.values():
665 if c.status == 'zombie':
666 id_to_del.append(c.id)
667 #une petite tape dans le dot et tu t'en vas, merci...
668 for id in id_to_del:
669 del self.checks[id] # ZANKUSEN!
672 #Called every 1sec to delete all actions in a zombie state
673 #zombie = not usefull anymore
674 def delete_zombie_actions(self):
675 #print "**********Delete zombies actions****"
676 id_to_del = []
677 for a in self.actions.values():
678 if a.status == 'zombie':
679 id_to_del.append(a.id)
680 #une petite tape dans le doc et tu t'en vas, merci...
681 for id in id_to_del:
682 del self.actions[id] # ZANKUSEN!
685 #Check for downtimes start and stop, and register
686 #them if need
687 def update_downtimes_and_comments(self):
688 broks = []
689 now = time.time()
691 #Check maintenance periods
692 for elt in [y for y in [x for x in self.hosts] + [x for x in self.services] if y.maintenance_period != None]:
693 if not hasattr(elt, 'in_maintenance'):
694 setattr(elt, 'in_maintenance', False)
695 if not elt.in_maintenance:
696 if elt.maintenance_period.is_time_valid(now):
697 start_dt = elt.maintenance_period.get_next_valid_time_from_t(now)
698 end_dt = elt.maintenance_period.get_next_invalid_time_from_t(start_dt + 1) - 1
699 dt = Downtime(elt, start_dt, end_dt, 1, 0, 0, "system", "this downtime was automatically scheduled through a maintenance_period")
700 elt.add_downtime(dt)
701 self.add(dt)
702 self.get_and_register_status_brok(elt)
703 elt.in_maintenance = dt.id
704 else:
705 if not elt.in_maintenance in self.downtimes:
706 # the maint downtimes has expired or was manually deleted
707 elt.in_maintenance = False
709 #A loop where those downtimes are removed
710 #which were marked for deletion (mostly by dt.exit())
711 for dt in self.downtimes.values():
712 if dt.can_be_deleted == True:
713 ref = dt.ref
714 self.del_downtime(dt.id)
715 broks.append(ref.get_update_status_brok())
717 #Downtimes are usually accompanied by a comment.
718 #An exiting downtime also invalidates it's comment.
719 for c in self.comments.values():
720 if c.can_be_deleted == True:
721 ref = c.ref
722 self.del_comment(c.id)
723 broks.append(ref.get_update_status_brok())
725 #Check start and stop times
726 for dt in self.downtimes.values():
727 if dt.real_end_time < now:
728 #this one has expired
729 broks.extend(dt.exit()) # returns downtimestop notifications
730 elif now >= dt.start_time and dt.fixed and not dt.is_in_effect:
731 #this one has to start now
732 broks.extend(dt.enter()) # returns downtimestart notifications
733 broks.append(dt.ref.get_update_status_brok())
735 for b in broks:
736 self.add(b)
739 #Main schedule function to make the regular scheduling
740 def schedule(self):
741 #ask for service and hosts their next check
742 for type_tab in [self.services, self.hosts]:
743 for i in type_tab:
744 i.schedule()
747 #Main actions reaper function : it get all new checks,
748 #notification and event handler from hosts and services
749 def get_new_actions(self):
750 #ask for service and hosts their next check
751 for type_tab in [self.services, self.hosts]:
752 for i in type_tab:
753 for a in i.actions:
754 self.add(a)
755 #We take all, we can clear it
756 i.actions = []
759 #Same the just upper, but for broks
760 def get_new_broks(self):
761 #ask for service and hosts their broks waiting
762 #be eaten
763 for type_tab in [self.services, self.hosts]:
764 for i in type_tab:
765 for b in i.broks:
766 self.add(b)
767 #We take all, we can clear it
768 i.broks = []
771 #Raise checks for no fresh states for services and hosts
772 def check_freshness(self):
773 #print "********** Check freshnesh******"
774 for type_tab in [self.services, self.hosts]:
775 for i in type_tab:
776 c = i.do_check_freshness()
777 if c is not None:
778 self.add(c)
781 #Check for orphaned checks : checks that never returns back
782 #so if inpoller and t_to_go < now - 300s : pb!
783 def check_orphaned(self):
784 now = int(time.time())
785 for c in self.checks.values():
786 if c.status == 'inpoller' and c.t_to_go < now - 300:
787 logger.log("Warning : the results of check %d never came back. I'm reenable it for polling" % c.id)
788 c.status = 'scheduled'
789 for a in self.actions.values():
790 if a.status == 'inpoller' and a.t_to_go < now - 300:
791 logger.log("Warning : the results of action %d never came back. I'm reenable it for polling" % a.id)
792 a.status = 'scheduled'
798 #Main function
# Main scheduler loop: load retention, send initial broks, do the first
# scheduling pass, then loop on a select() over the Pyro sockets,
# serving requests and running the recurrent jobs once per ~1s tick.
799 def run(self):
800 #Then we see if we've got info in the retention file
801 self.retention_load()
803 #Ok, now all is initilised, we can make the inital broks
804 self.fill_initial_broks()
806 logger.log("[%s] First scheduling launched" % self.instance_name)
807 self.schedule()
808 logger.log("[%s] First scheduling done" % self.instance_name)
809 #Ticks is for recurrent function call like consume
810 #del zombies etc
811 ticks = 0
812 timeout = 1.0 #For the select
814 gogogo = time.time()
815 self.t_each_loop = time.time() #use to track system time change
817 while self.must_run :
818 #Ok, still a difference between 3 and 4 ...
# Pyro 3 and Pyro 4 expose their server sockets differently
819 if shinken.pyro_wrapper.pyro_version == 3:
820 socks = self.daemon.getServerSockets()
821 else:
822 socks = self.daemon.sockets()
823 t_begin = time.time()
824 #socks.append(self.fifo)
825 # 'foreign' event loop
# Wait up to `timeout` seconds for activity on the Pyro sockets;
# on activity, serve it and shrink the timeout by the time spent,
# so recurrent jobs still tick at roughly 1s.
826 ins,outs,exs = select.select(socks,[],[],timeout)
827 if ins != []:
828 for s in socks:
829 if s in ins:
830 #Yes, even here there is a difference :)
831 if shinken.pyro_wrapper.pyro_version == 3:
832 self.daemon.handleRequests()
833 else:
834 self.daemon.handleRequests([s])
835 t_after = time.time()
836 diff = t_after-t_begin
837 timeout = timeout - diff
838 break # no need to continue with the for loop
839 else: #Timeout
840 timeout = 1.0
841 ticks += 1
843 #Do reccurent works like schedule, consume
844 #delete_zombie_checks
845 for i in self.recurrent_works:
846 (name, f, nb_ticks) = self.recurrent_works[i]
847 #A 0 in the tick will just disable it
848 if nb_ticks != 0:
849 if ticks % nb_ticks == 0:
850 #print "I run function :", name
# NOTE(review): the original line that actually invokes f() here
# (circa original line 851) is missing from this extraction --
# confirm against the upstream file before relying on this body.
853 #if ticks % 10 == 0:
854 # self.conf.quick_debug()
856 #stats
# Debug statistics printed once per timeout tick
857 nb_scheduled = len([c for c in self.checks.values() if c.status=='scheduled'])
858 nb_inpoller = len([c for c in self.checks.values() if c.status=='inpoller'])
859 nb_zombies = len([c for c in self.checks.values() if c.status=='zombie'])
860 nb_notifications = len(self.actions)
862 print "Checks:", "total", len(self.checks), "scheduled", nb_scheduled, "inpoller", nb_inpoller, "zombies", nb_zombies, "notifications", nb_notifications
864 m = 0.0
865 m_nb = 0
866 for s in self.services:
867 m += s.latency
868 m_nb += 1
869 if m_nb != 0:
870 print "Average latency:", m, m_nb, m / m_nb
872 #print "Notifications:", nb_notifications
873 now = time.time()
874 #for a in self.actions.values():
875 # if a.is_a == 'notification':
876 # print "Notif:", a.id, a.type, a.status, a.ref.get_name(), a.ref.state, a.contact.get_name(), 'level:%d' % a.notif_nb, 'launch in', int(a.t_to_go - now)
877 # else:
878 # print "Event:", a.id, a.status
879 print "Nb checks send:", self.nb_checks_send
880 self.nb_checks_send = 0
881 print "Nb Broks send:", self.nb_broks_send
882 self.nb_broks_send = 0
884 time_elapsed = now - gogogo
885 print "Check average =", int(self.nb_check_received / time_elapsed), "checks/s"
887 #for n in self.actions.values():
888 # if n.ref_type == 'service':
889 # print 'Service notification', n
890 # if n.ref_type == 'host':
891 # print 'Host notification', n
892 #print "."
893 #print "Service still in checking?"
894 #for s in self.services:
895 # print s.get_name()+':'+str(s.in_checking)
896 # for i in s.checks_in_progress:
897 # print i, i.t_to_go
898 #for s in self.hosts:
899 # print s.get_name()+':'+str(s.in_checking)+str(s.checks_in_progress)
900 # for i in s.checks_in_progress:
901 # print i#self.checks[i]
903 #for c in self.checks.values():
904 # if c.ref_type == 'host':
905 # print c.id, ":", c.status, 'Depend_on_me:', len(c.depend_on_me), 'depend_on', c.depend_on
906 #hp=hpy()
907 #print hp.heap()
908 #print hp.heapu()
# Never let the next select() wait a negative amount of time
911 if timeout < 0:
912 timeout = 1.0