# Copyright (C) 2009-2010 :
#     Gabes Jean, naparuba@gmail.com
#     Gerhard Lausser, Gerhard.Lausser@consol.de
#     Gregory Starck, g.starck@gmail.com
#     Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken. If not, see <http://www.gnu.org/licenses/>.

import time
import traceback

import shinken.pyro_wrapper as pyro
from shinken.pyro_wrapper import Pyro
from shinken.external_command import ExternalCommand
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler
from shinken.brok import Brok
from shinken.downtime import Downtime
from shinken.contactdowntime import ContactDowntime
from shinken.comment import Comment
from shinken.log import logger


class Scheduler:
    def __init__(self, scheduler_daemon):
        self.sched_daemon = scheduler_daemon
        # When set to False by us, we die and the arbiter launches a new Scheduler
        self.must_run = True

        self.waiting_results = []  # satellites return their results to us:
        # rather than waiting for them, we put them here and
        # consume them later

        # Every N seconds we call functions like consume, del zombies
        # etc. All of these functions are in recurrent_works with the
        # tick (in loop turns) they should run at, so it must be an integer > 0.
        # The order is important, so make the key an int.
        # TODO: at load time, replace the value by the configuration one (like the reaper time, etc)
        self.recurrent_works = {
            0: ('update_downtimes_and_comments', self.update_downtimes_and_comments, 1),
            1: ('schedule', self.schedule, 1),  # just schedule
            2: ('consume_results', self.consume_results, 1),  # incorporate checks and dependencies
            3: ('get_new_actions', self.get_new_actions, 1),  # now get the new actions (checks, notif) raised
            4: ('get_new_broks', self.get_new_broks, 1),  # and broks
            5: ('delete_zombie_checks', self.delete_zombie_checks, 1),
            6: ('delete_zombie_actions', self.delete_zombie_actions, 1),
            # 3 : (self.delete_unwanted_notifications, 1),
            7: ('check_freshness', self.check_freshness, 10),
            8: ('clean_caches', self.clean_caches, 1),
            9: ('update_retention_file', self.update_retention_file, 3600),
            10: ('check_orphaned', self.check_orphaned, 60),
            # For NagVis-like tools: update our status every 10s
            11: ('get_and_register_update_program_status_brok', self.get_and_register_update_program_status_brok, 10),
            # Check for system time change. And AFTER getting new checks,
            # so they are changed too.
            12: ('check_for_system_time_change', self.sched_daemon.check_for_system_time_change, 1),
            # launch all internal checks if needed
            13: ('manage_internal_checks', self.manage_internal_checks, 1),
        }

        self.nb_checks_send = 0
        self.nb_actions_send = 0
        self.nb_broks_send = 0
        self.nb_check_received = 0

        # Log init
        self.log = logger
        self.log.load_obj(self)

        self.instance_id = 0  # Temporary set. Will be erased later

        # Our queues
        self.checks = {}
        self.actions = {}
        self.downtimes = {}
        self.comments = {}
        self.broks = {}
        self.contact_downtimes = {}

        self.has_full_broks = False  # do we have an initial_broks in the broks queue?

    # Reset the scheduler state: flush pending results and empty every queue
    def reset(self):
        del self.waiting_results[:]
        for o in self.checks, self.actions, self.downtimes,\
                self.contact_downtimes, self.comments, self.broks:
            o.clear()

    # Load conf for future use
    def load_conf(self, conf):
        self.program_start = int(time.time())
        self.hostgroups = conf.hostgroups
        self.hostgroups.create_reversed_list()
        self.services = conf.services
        # We need reversed lists for searching in the retention file
        self.services.create_reversed_list()
        self.services.optimize_service_search(conf.hosts)
        self.hosts = conf.hosts
        # for h in self.hosts:
        #     print h.get_name(), h.parents
        self.hosts.create_reversed_list()
        self.notificationways = conf.notificationways
        self.contacts = conf.contacts
        self.contacts.create_reversed_list()
        self.contactgroups = conf.contactgroups
        self.contactgroups.create_reversed_list()
        self.servicegroups = conf.servicegroups
        self.servicegroups.create_reversed_list()
        self.timeperiods = conf.timeperiods
        self.timeperiods.create_reversed_list()
        self.commands = conf.commands

        # self.status_file = StatusFile(self)  # External status file
        self.instance_id = conf.instance_id  # From the Arbiter. Used by the
        # Broker to distinguish between schedulers

        # Same for the instance_name
        self.instance_name = conf.instance_name

        # Keep the conf for later use
        self.conf = conf

        # Now we can update our 'ticks' for special calls
        # like the retention one, etc
        self.update_recurrent_works_tick('update_retention_file', self.conf.retention_update_interval)

    # Update the 'tick' of a function call in our recurrent works
    def update_recurrent_works_tick(self, f_name, new_tick):
        for i in self.recurrent_works:
            (name, f, old_tick) = self.recurrent_works[i]
            if name == f_name:
                print "Changing the tick for the function", name, new_tick
                self.recurrent_works[i] = (name, f, new_tick)
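
    # Illustrative use (value hypothetical): load_conf() above calls this to
    # align the retention dump with the configuration; the same call could
    # make the dump run every 15 minutes instead of the default hour:
    #   self.update_recurrent_works_tick('update_retention_file', 900)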

    # Load the pollers from our app master
    def load_satellites(self, pollers, reactionners):
        self.pollers = pollers
        self.reactionners = reactionners

    # Oh... the Arbiter wants us to die... so it can launch a new Scheduler
    # "What does he have that I don't?"
    def die(self):
        self.must_run = False

    # Load the external commander
    def load_external_command(self, e):
        self.external_command = e

    # We've got activity in the fifo, we get and run commands
    def run_external_command(self, command):
        print "scheduler resolves command", command
        ext_cmd = ExternalCommand(command)
        self.external_command.resolve_command(ext_cmd)
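
    # Illustrative input (standard Nagios-style external command syntax,
    # all values hypothetical): the command string resolved above looks like
    #   [1293686041] SCHEDULE_HOST_DOWNTIME;srv-web-1;1293686400;1293690000;1;0;3600;admin;maintenance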

    def add_Brok(self, brok):
        # For broks, we TAG the brok with our instance_id
        brok.data['instance_id'] = self.instance_id
        self.broks[brok.id] = brok

    def add_Notification(self, notif):
        self.actions[notif.id] = notif
        # A notification asks for a brok
        if notif.contact is not None:
            b = notif.get_initial_status_brok()
            self.add(b)

    def add_Check(self, c):
        self.checks[c.id] = c
        # A new check means the host/service changes its next_check
        b = c.ref.get_next_schedule_brok()
        self.add(b)

    def add_EventHandler(self, action):
        # print "Add an event Handler", elt.id
        self.actions[action.id] = action

    def add_Downtime(self, dt):
        self.downtimes[dt.id] = dt
        if dt.extra_comment:
            self.add_Comment(dt.extra_comment)

    def add_ContactDowntime(self, contact_dt):
        self.contact_downtimes[contact_dt.id] = contact_dt

    def add_Comment(self, comment):
        self.comments[comment.id] = comment
        b = comment.ref.get_update_status_brok()
        self.add(b)

    # Schedulers have some queues. We can simplify calls by adding
    # elements into the proper queue just by looking at their type:
    #   Check           -> self.checks
    #   Notification    -> self.actions
    #   Downtime        -> self.downtimes
    #   ContactDowntime -> self.contact_downtimes
    def add(self, elt):
        f = self.__add_actions.get(elt.__class__, None)
        if f is not None:
            # print("found action for %s : %s" % (elt.__class__.__name__, f.__name__))
            f(self, elt)

    __add_actions = {
        Check: add_Check,
        Brok: add_Brok,
        Notification: add_Notification,
        EventHandler: add_EventHandler,
        Downtime: add_Downtime,
        ContactDowntime: add_ContactDowntime,
        Comment: add_Comment,
    }
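
    # Illustrative dispatch: self.add(some_check) looks up Check in
    # __add_actions and calls add_Check(self, some_check), so callers such as
    # get_new_actions() below never have to pick the right queue themselves.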

    # We call the 'hook_<hook_name>' function of every module that defines it
    # TODO: find a way to merge this and the version in daemon.py
    def hook_point(self, hook_name):
        to_del = []
        for inst in self.sched_daemon.modules_manager.instances:
            full_hook_name = 'hook_' + hook_name
            print inst.get_name(), hasattr(inst, full_hook_name), hook_name
            if hasattr(inst, full_hook_name):
                f = getattr(inst, full_hook_name)
                print "Calling", full_hook_name, "of", inst.get_name()
                f(self)
                # except Exception, exp:
                #     logger.log('The instance %s raise an exception %s. I kill it' % (inst.get_name(), str(exp)))
                #     to_del.append(inst)

        # Now remove modules that raised an exception
        self.sched_daemon.modules_manager.clear_instances(to_del)

    # Our queues may explode if no one asks us for elements.
    # It's very dangerous: you can crash your server... and that's a bad thing :)
    # So we 'just' keep the last elements: 2 * the maximum is a good overhead
    def clean_queues(self):
        max_checks = 2 * (len(self.hosts) + len(self.services))
        max_broks = 2 * (len(self.hosts) + len(self.services))
        max_actions = 2 * len(self.contacts) * (len(self.hosts) + len(self.services))

        # For checks, it's not that simple:
        # checks may be referenced by their host/service,
        # so we do not just del them in checks, but also in their service/host.
        # We want ids of less than max_id - 2*max_checks
        if len(self.checks) > max_checks:
            id_max = self.checks.keys()[-1]  # The max id is the last id
            to_del_checks = [c for c in self.checks.values() if c.id < id_max - max_checks]
            nb_checks_drops = len(to_del_checks)
            if nb_checks_drops > 0:
                print "I have to del some checks..., sorry", to_del_checks
            for c in to_del_checks:
                i = c.id
                elt = c.ref
                # First remove the link in host/service
                elt.remove_in_progress_check(c)
                # Then in dependent checks (those I depend on, or that
                # depend on me)
                for dependant_checks in c.depend_on_me:
                    dependant_checks.depend_on.remove(c.id)
                for c_temp in c.depend_on:
                    c_temp.depend_on_me.remove(c)
                del self.checks[i]  # Final bye bye ...
        else:
            nb_checks_drops = 0

        # For broks and actions, it's simpler
        if len(self.broks) > max_broks:
            id_max = self.broks.keys()[-1]
            id_to_del_broks = [i for i in self.broks if i < id_max - max_broks]
            nb_broks_drops = len(id_to_del_broks)
            for i in id_to_del_broks:
                del self.broks[i]
        else:
            nb_broks_drops = 0

        if len(self.actions) > max_actions:
            id_max = self.actions.keys()[-1]
            id_to_del_actions = [i for i in self.actions if i < id_max - max_actions]
            nb_actions_drops = len(id_to_del_actions)
            for i in id_to_del_actions:
                # Remember to delete the reference to the notification in its service/host
                if self.actions[i].is_a == 'notification':
                    item = self.actions[i].ref
                    item.remove_in_progress_notification(self.actions[i])
                del self.actions[i]
        else:
            nb_actions_drops = 0

        if nb_checks_drops != 0 or nb_broks_drops != 0 or nb_actions_drops != 0:
            print "WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops)
            logger.log("WARNING: We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops))
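
    # Illustrative sizing (hypothetical figures): with 100 hosts, 500 services
    # and 10 contacts, max_checks = max_broks = 2 * (100 + 500) = 1200 and
    # max_actions = 2 * 10 * (100 + 500) = 12000, so only elements whose ids
    # lag that far behind the newest one get dropped.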

    # For tuning purposes we use caches, but we do not want them to explode,
    # so we clean them
    def clean_caches(self):
        # print "********** Clean caches *********"
        for tp in self.timeperiods:
            tp.clean_cache()

    # Ask the item (host or service) for an update_status brok
    # and add it to our broks queue
    def get_and_register_status_brok(self, item):
        b = item.get_update_status_brok()
        self.add(b)

    # Ask the item (host or service) for a check_result brok
    # and add it to our broks queue
    def get_and_register_check_result_brok(self, item):
        b = item.get_check_result_brok()
        self.add(b)

    # We do not want this downtime id
    def del_downtime(self, dt_id):
        if dt_id in self.downtimes:
            self.downtimes[dt_id].ref.del_downtime(dt_id)
            del self.downtimes[dt_id]

    # We do not want this contact downtime id
    def del_contact_downtime(self, dt_id):
        if dt_id in self.contact_downtimes:
            self.contact_downtimes[dt_id].ref.del_downtime(dt_id)
            del self.contact_downtimes[dt_id]

    # We do not want this comment id
    def del_comment(self, c_id):
        if c_id in self.comments:
            self.comments[c_id].ref.del_comment(c_id)
            del self.comments[c_id]

    # Called by a poller to get checks.
    # Can get checks and actions (notifications and co)
    def get_to_run_checks(self, do_checks=False, do_actions=False,
                          poller_tags=['None'], reactionner_tags=['None'],
                          worker_name='none'):
        res = []
        now = time.time()

        # If a poller wants to do checks
        if do_checks:
            for c in self.checks.values():
                # If the command is untagged and the poller too, or if both are tagged
                # with the same name, go for it.
                # If do_checks, the call comes from a poller, so poller_tags by default is ['None'];
                # by default poller_tag is 'None' and poller_tags is ['None']
                if c.poller_tag in poller_tags:
                    # must be ok to launch, and not an internal one (business rules based)
                    if c.status == 'scheduled' and c.is_launchable(now) and not c.internal:
                        c.status = 'inpoller'
                        c.worker = worker_name
                        # We do not send c, because it is linked (c.ref) to
                        # its host/service and the poller does not need that. It just
                        # needs a shell with id, command and default
                        # parameters. That's the goal of copy_shell()
                        res.append(c.copy_shell())

        # If a reactionner wants to notify too
        if do_actions:
            for a in self.actions.values():
                # If do_actions, the call comes from a reactionner, so reactionner_tags by default is ['None'];
                # by default reactionner_tag is 'None' and reactionner_tags is ['None'] too.
                # So if it's not the good one, loop to the next :)
                if not a.reactionner_tag in reactionner_tags:
                    continue

                # And now look if it can be launched or not :)
                if a.status == 'scheduled' and a.is_launchable(now):
                    a.status = 'inpoller'
                    a.worker = worker_name
                    if a.is_a == 'notification' and not a.contact:
                        # This is a "master" notification created by create_notifications.
                        # It will not be sent itself because it has no contact.
                        # We use it to create "child" notifications (for the contacts and
                        # notification_commands) which are executed in the reactionner.
                        item = a.ref
                        childnotifications = []
                        if not item.notification_is_blocked_by_item(a.type, now):
                            # If it is possible to send notifications of this type at the current time, then create
                            # a single notification for each contact of this item.
                            childnotifications = item.scatter_notification(a)
                            for c in childnotifications:
                                c.status = 'inpoller'
                                self.add(c)  # this will send a brok
                                new_c = c.copy_shell()
                                res.append(new_c)

                        # If we have a notification_interval, then schedule the next notification (problems only)
                        if a.type == 'PROBLEM':
                            # Update the ref notif number after raising the one of the notification
                            if len(childnotifications) != 0:
                                # notif_nb of the master notification was already current_notification_number+1.
                                # If notifications were sent, then the host/service counter is also incremented
                                item.current_notification_number = a.notif_nb

                            if item.notification_interval != 0 and a.t_to_go is not None:
                                # We must continue to send notifications.
                                # Just leave it in the actions list, set it back to "scheduled" and it will be found again later.
                                # a.t_to_go = a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # Ask the service/host to compute the next notif time. It can be just
                                # a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # or maybe earlier, because an escalation may need to fire before that
                                a.t_to_go = item.get_next_notification_time(a)
                                a.notif_nb = item.current_notification_number + 1
                                a.status = 'scheduled'
                            else:
                                # Wipe out this master notification. One problem notification is enough.
                                item.remove_in_progress_notification(a)
                                self.actions[a.id].status = 'zombie'
                        else:
                            # Wipe out this master notification. We don't repeat recover/downtime/flap/etc...
                            item.remove_in_progress_notification(a)
                            self.actions[a.id].status = 'zombie'
                    else:
                        # This is for child notifications and eventhandlers
                        new_a = a.copy_shell()
                        res.append(new_a)
        return res
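
    # Illustrative call (names hypothetical): a poller tagged 'DMZ' asking for
    # work would be served by something like
    #   checks = sched.get_to_run_checks(do_checks=True, do_actions=False,
    #                                    poller_tags=['None', 'DMZ'],
    #                                    worker_name='poller-dmz-1')
    # and only checks whose poller_tag is 'None' or 'DMZ' are handed out.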

    # Called by pollers and reactionners to send back their results
    def put_results(self, c):
        if c.is_a == 'notification':
            # We will only see child notifications here
            try:
                self.actions[c.id].get_return_from(c)
                item = self.actions[c.id].ref
                item.remove_in_progress_notification(c)
                self.actions[c.id].status = 'zombie'
                item.last_notification = c.check_time
                # If we've got a problem with the notification, raise a Warning log
                if c.exit_status != 0:
                    logger.log("Warning : the notification command '%s' raised an error (exit code=%d) : '%s'" % (c.command, c.exit_status, c.output))
            except KeyError, exp:
                logger.log("Warning : received a notification for an unknown id! %s" % str(exp))
        elif c.is_a == 'check':
            try:
                self.checks[c.id].get_return_from(c)
                self.checks[c.id].status = 'waitconsume'
            except KeyError, exp:
                logger.log("Warning : received a check for an unknown id! %s" % str(exp))
        elif c.is_a == 'eventhandler':
            self.actions[c.id].status = 'zombie'
            # Maybe we received a return of an old event handler, so we can forget it
        else:
            logger.log("Error : the received result type is unknown! %s" % str(c.is_a))
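
    # Sketch of a check's life cycle as seen in this file (statuses taken from
    # the code above): 'scheduled' -> 'inpoller' (get_to_run_checks) ->
    # 'waitconsume' (put_results) -> consumed by consume_results() ->
    # 'zombie' -> removed by delete_zombie_checks().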

    # Get the good tab of links by kind. If unknown, return None
    def get_links_from_type(self, type):
        t = {'poller': self.pollers, 'reactionner': self.reactionners}
        if type in t:
            return t[type]
        return None

    # Check that we do not try to connect too often to this element
    def is_connexion_try_too_close(self, elt):
        now = time.time()
        last_connexion = elt['last_connexion']
        if now - last_connexion < 5:
            return True
        return False

    # Initialise or re-initialise the connection with a poller or a reactionner
    def pynag_con_init(self, id, type='poller'):
        # Get the good links tab for looping..
        links = self.get_links_from_type(type)
        if links is None:
            logger.log('DBG: Type unknown for connection! %s' % type)
            return

        # We only want to initiate connections to the passive
        # pollers and reactionners
        passive = links[id]['passive']
        if not passive:
            return

        # If we try to connect too much, we slow down our tests
        if self.is_connexion_try_too_close(links[id]):
            return

        # Ok, we can now update it
        links[id]['last_connexion'] = time.time()

        print "Init connection with", links[id]['uri']

        uri = links[id]['uri']
        links[id]['con'] = Pyro.core.getProxyForURI(uri)
        con = links[id]['con']

        try:
            # initial ping must be quick
            pyro.set_timeout(con, 5)
            con.ping()
        except Pyro.errors.ProtocolError, exp:
            logger.log("[] Connection problem to the %s %s : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except Pyro.errors.NamingError, exp:
            logger.log("[] the %s '%s' is not initialised : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except KeyError, exp:
            logger.log("[] the %s '%s' is not initialised : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            traceback.print_stack()
            return
        except Pyro.errors.CommunicationError, exp:
            logger.log("[] the %s '%s' got CommunicationError : %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return

        logger.log("[] Connection OK to the %s %s" % (type, links[id]['name']))

    # We should push actions to our passive satellites
    def push_actions_to_passives_satellites(self):
        # We loop over our passive pollers
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            print "I will send actions to the poller", p
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                # Get the checks to push to this poller
                lst = self.get_to_run_checks(True, False, poller_tags, worker_name=p['name'])
                try:
                    # a longer timeout for pushing the data
                    pyro.set_timeout(con, 120)
                    print "Sending", len(lst), "actions"
                    con.push_actions(lst, self.instance_id)
                    self.nb_checks_send += len(lst)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the %s %s : %s" % ('poller', p['name'], str(exp)))
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('poller', p['name'], str(exp)))
                except KeyError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('poller', p['name'], str(exp)))
                    traceback.print_stack()
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the %s '%s' got CommunicationError : %s" % ('poller', p['name'], str(exp)))
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection? try to reconnect
                self.pynag_con_init(p['instance_id'], type='poller')

        # We loop over our passive reactionners
        for p in filter(lambda p: p['passive'], self.reactionners.values()):
            print "I will send actions to the reactionner", p
            con = p['con']
            reactionner_tags = p['reactionner_tags']
            if con is not None:
                # Get the actions to push to this reactionner
                lst = self.get_to_run_checks(False, True, reactionner_tags=reactionner_tags, worker_name=p['name'])
                try:
                    # a longer timeout for pushing the data
                    pyro.set_timeout(con, 120)
                    print "Sending", len(lst), "actions"
                    con.push_actions(lst, self.instance_id)
                    self.nb_checks_send += len(lst)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the %s %s : %s" % ('reactionner', p['name'], str(exp)))
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('reactionner', p['name'], str(exp)))
                except KeyError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('reactionner', p['name'], str(exp)))
                    traceback.print_stack()
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the %s '%s' got CommunicationError : %s" % ('reactionner', p['name'], str(exp)))
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection? try to reconnect
                self.pynag_con_init(p['instance_id'], type='reactionner')
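
    # Note on the passive mode sketched here: for passive pollers/reactionners
    # the scheduler itself opens the Pyro connection, pushes the actions
    # (above) and later pulls the results back (below), instead of waiting for
    # the satellite to come and ask for work.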

    # We should get returns from our passive satellites
    def get_actions_from_passives_satellites(self):
        # We loop over our passive pollers
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            print "I will get actions from the poller", p
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                try:
                    # a longer timeout for getting the data back
                    pyro.set_timeout(con, 120)
                    results = con.get_returns(self.instance_id)
                    nb_received = len(results)
                    self.nb_check_received += nb_received
                    print "Received %d passive results" % nb_received
                    self.waiting_results.extend(results)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the %s %s : %s" % ('poller', p['name'], str(exp)))
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('poller', p['name'], str(exp)))
                except KeyError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('poller', p['name'], str(exp)))
                    traceback.print_stack()
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the %s '%s' got CommunicationError : %s" % ('poller', p['name'], str(exp)))
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection, try to reinit
                self.pynag_con_init(p['instance_id'], type='poller')

        # We loop over our passive reactionners
        for p in filter(lambda p: p['passive'], self.reactionners.values()):
            print "I will get actions from the reactionner", p
            con = p['con']
            reactionner_tags = p['reactionner_tags']
            if con is not None:
                try:
                    # a longer timeout for getting the data back
                    pyro.set_timeout(con, 120)
                    results = con.get_returns(self.instance_id)
                    nb_received = len(results)
                    self.nb_check_received += nb_received
                    print "Received %d passive results" % nb_received
                    self.waiting_results.extend(results)
                except Pyro.errors.ProtocolError, exp:
                    logger.log("[] Connection problem to the %s %s : %s" % ('reactionner', p['name'], str(exp)))
                except Pyro.errors.NamingError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('reactionner', p['name'], str(exp)))
                except KeyError, exp:
                    logger.log("[] the %s '%s' is not initialised : %s" % ('reactionner', p['name'], str(exp)))
                    traceback.print_stack()
                except Pyro.errors.CommunicationError, exp:
                    logger.log("[] the %s '%s' got CommunicationError : %s" % ('reactionner', p['name'], str(exp)))
                # we come back to the normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection, try to reinit
                self.pynag_con_init(p['instance_id'], type='reactionner')

    # Some checks are purely internal, like business-rule-based ones:
    # simply ask their ref to manage them when it's ok to run
    def manage_internal_checks(self):
        now = time.time()
        for c in self.checks.values():
            # must be ok to launch, and an internal one (business rules based)
            if c.status == 'scheduled' and c.is_launchable(now) and c.internal:
                c.ref.manage_internal_check(c)
                # it managed it, now just ask to consume it
                # like for all other checks
                c.status = 'waitconsume'

    # Called by brokers to fetch broks.
    # We give them, and clean them!
    def get_broks(self):
        res = self.broks
        # They are gone, we keep none!
        self.broks = {}
        # print "returning broks"
        return res

    # Update the retention file and give it all of our data in
    # a dict, so the reader can pick up what it wants.
    # For now compression is of no use, but it can be added easily.
    def update_retention_file(self, forced=False):
        # If the update interval is set to 0, we do not want it,
        # unless we are forced to (like at stopping time)
        if self.conf.retention_update_interval == 0 and not forced:
            return

        self.hook_point('save_retention')

    # Load the retention file and get status from it. It does not get all checks in progress
    # for the moment, just the status and the notifications.
    def retention_load(self):
        self.hook_point('load_retention')

    # Helper function for modules: it will give our host and service
    # retention data
    def get_retention_data(self):
        # We build an all_data dict with the useful retention data
        # of our hosts and services
        all_data = {'hosts': {}, 'services': {}}
        for h in self.hosts:
            d = {}
            running_properties = h.__class__.running_properties
            for prop, entry in running_properties.items():
                if entry.retention:
                    d[prop] = getattr(h, prop)
            all_data['hosts'][h.host_name] = d

        # Now the same for services
        for s in self.services:
            d = {}
            running_properties = s.__class__.running_properties
            for prop, entry in running_properties.items():
                if entry.retention:
                    d[prop] = getattr(s, prop)
            all_data['services'][(s.host.host_name, s.service_description)] = d
        return all_data
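
    # Illustrative shape of the returned structure (property names depend on
    # the running_properties actually kept for retention, values hypothetical):
    #   {'hosts':    {'srv-web-1': {'state': 'UP', ...}},
    #    'services': {('srv-web-1', 'HTTP'): {'state': 'OK', ...}}}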

    # Get back our data from a retention module :)
    def restore_retention_data(self, data):
        # Now load the interesting properties in hosts/services.
        # Properties tagged retention=False are not directly loaded.
        # Items will get their status back, but are not in checking, so
        # a new check will be launched like at a normal start (randomly
        # distributed in time)
        ret_hosts = data['hosts']
        for ret_h_name in ret_hosts:
            # We take the dict of values to load
            d = data['hosts'][ret_h_name]
            h = self.hosts.find_by_name(ret_h_name)
            if h is not None:
                running_properties = h.__class__.running_properties
                for prop, entry in running_properties.items():
                    # Maybe the save was not done with this value, so
                    # we just bypass it
                    if prop in d:
                        setattr(h, prop, d[prop])
                for a in h.notifications_in_progress.values():
                    a.ref = h
                    self.add(a)
                h.update_in_checking()
                # And also add downtimes and comments
                for dt in h.downtimes:
                    dt.ref = h
                    if hasattr(dt, 'extra_comment'):
                        dt.extra_comment.ref = h
                    else:
                        dt.extra_comment = None
                if h.acknowledgement is not None:
                    h.acknowledgement.ref = h

        ret_services = data['services']
        for (ret_s_h_name, ret_s_desc) in ret_services:
            # We take the dict of values to load
            d = data['services'][(ret_s_h_name, ret_s_desc)]
            s = self.services.find_srv_by_name_and_hostname(ret_s_h_name, ret_s_desc)
            if s is not None:
                running_properties = s.__class__.running_properties
                for prop, entry in running_properties.items():
                    # Maybe the save was not done with this value, so
                    # we just bypass it
                    if prop in d:
                        setattr(s, prop, d[prop])
                for a in s.notifications_in_progress.values():
                    a.ref = s
                    self.add(a)
                s.update_in_checking()
                # And also add downtimes and comments
                for dt in s.downtimes:
                    dt.ref = s
                    if hasattr(dt, 'extra_comment'):
                        dt.extra_comment.ref = s
                    else:
                        dt.extra_comment = None
                if s.acknowledgement is not None:
                    s.acknowledgement.ref = s

    # Fill self.broks with broks of self (process id, and co)
    # and broks of services and hosts (initial status)
    def fill_initial_broks(self):
        # First a Brok to delete everything from my instance_id
        b = Brok('clean_all_my_instance_id', {'instance_id': self.instance_id})
        self.add(b)

        # then the program status
        b = self.get_program_status_brok()
        self.add(b)

        # We ask all these types for their initial_status broks.
        # The order is important: services need hosts...
        initial_status_types = (self.timeperiods, self.commands,
                                self.contacts, self.contactgroups,
                                self.hosts, self.hostgroups,
                                self.services, self.servicegroups)

        for tab in initial_status_types:
            for i in tab:
                b = i.get_initial_status_brok()
                self.add(b)

        # We now have all full broks
        self.has_full_broks = True

        logger.log("[%s] Created initial Broks: %d" % (self.instance_name, len(self.broks)))

    # Create a brok with program status info
    def get_and_register_program_status_brok(self):
        b = self.get_program_status_brok()
        self.add(b)

    # Create a brok with program status info, but as an update
    def get_and_register_update_program_status_brok(self):
        b = self.get_program_status_brok()
        b.type = 'update_program_status'
        self.add(b)

    # Get a brok with the program status
    # TODO : GET REAL VALUES
    def get_program_status_brok(self):
        now = int(time.time())
        data = {"is_running": 1,
                "instance_id": self.instance_id,
                "instance_name": self.instance_name,
                "program_start": self.program_start,
                "last_command_check": now,
                "last_log_rotation": now,
                "notifications_enabled": self.conf.enable_notifications,
                "active_service_checks_enabled": self.conf.execute_service_checks,
                "passive_service_checks_enabled": self.conf.accept_passive_service_checks,
                "active_host_checks_enabled": self.conf.execute_host_checks,
                "passive_host_checks_enabled": self.conf.accept_passive_host_checks,
                "event_handlers_enabled": self.conf.enable_event_handlers,
                "flap_detection_enabled": self.conf.enable_flap_detection,
                "failure_prediction_enabled": 0,
                "process_performance_data": self.conf.process_performance_data,
                "obsess_over_hosts": self.conf.obsess_over_hosts,
                "obsess_over_services": self.conf.obsess_over_services,
                "modified_host_attributes": 0,
                "modified_service_attributes": 0,
                "global_host_event_handler": self.conf.global_host_event_handler,
                "global_service_event_handler": self.conf.global_service_event_handler,
                "command_file": self.conf.command_file,
                }
        b = Brok('program_status', data)
        return b

    # Called every 1 sec to consume every result in services or hosts;
    # with these results they become OK, CRITICAL, UP/DOWN, etc...
    def consume_results(self):
        # All results are in self.waiting_results
        # We need to get them first
        for c in self.waiting_results:
            self.put_results(c)
        self.waiting_results = []

        # Then we consume them
        # print "**********Consume*********"
        for c in self.checks.values():
            if c.status == 'waitconsume':
                item = c.ref
                item.consume_result(c)

        # All 'finished' checks (no more dep) raise the checks that depend on them
        for c in self.checks.values():
            if c.status == 'havetoresolvedep':
                for dependant_checks in c.depend_on_me:
                    # Ok, now the dependant no longer waits for c
                    dependant_checks.depend_on.remove(c.id)
                # REMOVE OLD DEP CHECK -> zombie
                c.status = 'zombie'

        # Now, reintegrate dep checks
        for c in self.checks.values():
            if c.status == 'waitdep' and len(c.depend_on) == 0:
                item = c.ref
                item.consume_result(c)

    # Called every 1 sec to delete all checks in a zombie state
    # zombie = not useful anymore
    def delete_zombie_checks(self):
        # print "**********Delete zombie checks****"
        id_to_del = []
        for c in self.checks.values():
            if c.status == 'zombie':
                id_to_del.append(c.id)
        # a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.checks[id]  # ZANKUSEN!

    # Called every 1 sec to delete all actions in a zombie state
    # zombie = not useful anymore
    def delete_zombie_actions(self):
        # print "**********Delete zombie actions****"
        id_to_del = []
        for a in self.actions.values():
            if a.status == 'zombie':
                id_to_del.append(a.id)
        # a little pat on the back and off you go, thanks...
        for id in id_to_del:
            del self.actions[id]  # ZANKUSEN!

    # Check for downtime starts and stops, and register
    # the corresponding broks
    def update_downtimes_and_comments(self):
        broks = []
        now = time.time()

        # Check maintenance periods
        for elt in [y for y in [x for x in self.hosts] + [x for x in self.services] if y.maintenance_period is not None]:
            if not hasattr(elt, 'in_maintenance'):
                setattr(elt, 'in_maintenance', False)
            if not elt.in_maintenance:
                if elt.maintenance_period.is_time_valid(now):
                    start_dt = elt.maintenance_period.get_next_valid_time_from_t(now)
                    end_dt = elt.maintenance_period.get_next_invalid_time_from_t(start_dt + 1) - 1
                    dt = Downtime(elt, start_dt, end_dt, 1, 0, 0, "system", "this downtime was automatically scheduled through a maintenance_period")
                    elt.add_downtime(dt)
                    self.add(dt)
                    self.get_and_register_status_brok(elt)
                    elt.in_maintenance = dt.id
            else:
                if not elt.in_maintenance in self.downtimes:
                    # the maintenance downtime has expired or was manually deleted
                    elt.in_maintenance = False

        # Check the validity of contact downtimes
        for elt in self.contacts:
            for dt in elt.downtimes:
                dt.check_activation()

        # A loop where those downtimes are removed
        # which were marked for deletion (mostly by dt.exit())
        for dt in self.downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        # Same for contact downtimes:
        for dt in self.contact_downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_contact_downtime(dt.id)
                broks.append(ref.get_update_status_brok())

        # Downtimes are usually accompanied by a comment.
        # An exiting downtime also invalidates its comment.
        for c in self.comments.values():
            if c.can_be_deleted == True:
                ref = c.ref
                self.del_comment(c.id)
                broks.append(ref.get_update_status_brok())

        # Check start and stop times
        for dt in self.downtimes.values():
            if dt.real_end_time < now:
                # this one has expired
                broks.extend(dt.exit())  # returns downtimestop notifications
            elif now >= dt.start_time and dt.fixed and not dt.is_in_effect:
                # this one has to start now
                broks.extend(dt.enter())  # returns downtimestart notifications
                broks.append(dt.ref.get_update_status_brok())

        for b in broks:
            self.add(b)

    # Main schedule function for the regular scheduling
    def schedule(self):
        # ask services and hosts for their next check
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                i.schedule()

    # Main actions reaper function: it gets all new checks,
    # notifications and event handlers from hosts and services
    def get_new_actions(self):
        # ask services and hosts for their new actions
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for a in i.actions:
                    self.add(a)
                # We took them all, so we can clear the list
                i.actions = []

    # Same as just above, but for broks
    def get_new_broks(self):
        # ask services and hosts for their waiting broks
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for b in i.broks:
                    self.add(b)
                # We took them all, so we can clear the list
                i.broks = []

    # Raise checks for non-fresh states of services and hosts
    def check_freshness(self):
        # print "********** Check freshness ******"
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                c = i.do_check_freshness()
                if c is not None:
                    self.add(c)

    # Check for orphaned checks: checks that never come back.
    # So if inpoller and t_to_go < now - 300s: problem!
    # Warn only once for each "worker"
    def check_orphaned(self):
        worker_names = {}
        now = int(time.time())
        for c in self.checks.values():
            if c.status == 'inpoller' and c.t_to_go < now - 300:
                c.status = 'scheduled'
                if c.worker not in worker_names:
                    worker_names[c.worker] = 1
                    continue
                worker_names[c.worker] += 1
        for a in self.actions.values():
            if a.status == 'inpoller' and a.t_to_go < now - 300:
                a.status = 'scheduled'
                if a.worker not in worker_names:
                    worker_names[a.worker] = 1
                    continue
                worker_names[a.worker] += 1

        for w in worker_names:
            logger.log("Warning : %d actions never came back for the satellite '%s'. I'm reenabling them for polling" % (worker_names[w], w))
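
    # Worked example (hypothetical times): with now = 1,000,000 the threshold
    # is 999,700; a check still 'inpoller' whose t_to_go is 999,650 is
    # considered orphaned, flipped back to 'scheduled', and counted against
    # the worker that never answered.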

    # Our main loop: load the retention, send the initial broks,
    # do the first scheduling, then loop over the recurrent works
    def run(self):
        # Then we see if we've got info in the retention file
        self.retention_load()

        # Ok, now all is initialised, we can make the initial broks
        self.fill_initial_broks()

        logger.log("[%s] First scheduling launched" % self.instance_name)
        self.schedule()
        logger.log("[%s] First scheduling done" % self.instance_name)

        # Now connect to the passive satellites if needed
        for p_id in self.pollers:
            self.pynag_con_init(p_id, type='poller')

        # Ticks are for recurrent function calls like consume_results
        ticks = 0
        timeout = 1.0  # For the select

        gogogo = time.time()

        while self.must_run:
            elapsed, _, _ = self.sched_daemon.handleRequests(timeout)
            if elapsed:
                timeout -= elapsed
                if timeout > 0:  # only a partial timeout, keep waiting
                    continue

            # Timeout or time over
            timeout = 1.0
            ticks += 1

            # Do recurrent works like schedule, consume,
            # delete_zombie_checks
            for i in self.recurrent_works:
                (name, f, nb_ticks) = self.recurrent_works[i]
                # A 0 in the tick will just disable it
                if nb_ticks != 0:
                    if ticks % nb_ticks == 0:
                        # print "I run function :", name
                        f()

            # DBG: push actions to passives?
            self.push_actions_to_passives_satellites()
            self.get_actions_from_passives_satellites()

            # if ticks % 10 == 0:
            #     self.conf.quick_debug()

            nb_scheduled = len([c for c in self.checks.values() if c.status == 'scheduled'])
            nb_inpoller = len([c for c in self.checks.values() if c.status == 'inpoller'])
            nb_zombies = len([c for c in self.checks.values() if c.status == 'zombie'])
            nb_notifications = len(self.actions)

            print "Checks:", "total", len(self.checks), "scheduled", nb_scheduled, "inpoller", nb_inpoller, "zombies", nb_zombies, "notifications", nb_notifications
in self
.services
:
1198 print "Average latency:", m
, m_nb
, m
/ m_nb

            # print "Notifications:", nb_notifications
            # for a in self.actions.values():
            #     if a.is_a == 'notification':
            #         print "Notif:", a.id, a.type, a.status, a.ref.get_name(), a.ref.state, a.contact.get_name(), 'level:%d' % a.notif_nb, 'launch in', int(a.t_to_go - now)
            #     else:
            #         print "Event:", a.id, a.status

            print "Nb checks send:", self.nb_checks_send
            self.nb_checks_send = 0
            print "Nb Broks send:", self.nb_broks_send
            self.nb_broks_send = 0

            now = time.time()
            time_elapsed = now - gogogo
            print "Check average =", int(self.nb_check_received / time_elapsed), "checks/s"

            # for n in self.actions.values():
            #     if n.ref_type == 'service':
            #         print 'Service notification', n
            #     if n.ref_type == 'host':
            #         print 'Host notification', n

            # print "Service still in checking?"
            # for s in self.services:
            #     print s.get_name() + ':' + str(s.in_checking)
            #     for i in s.checks_in_progress:
            #         print i, i.t_to_go
            # for s in self.hosts:
            #     print s.get_name() + ':' + str(s.in_checking) + str(s.checks_in_progress)
            #     for i in s.checks_in_progress:
            #         print i  # self.checks[i]

            # for c in self.checks.values():
            #     if c.ref_type == 'host':
            #         print c.id, ":", c.status, 'Depend_on_me:', len(c.depend_on_me), 'depend_on', c.depend_on

        # WE must save the retention at quit time BY OURSELVES,
        # because our daemon will not be able to do it for us
        self.update_retention_file(True)