Fix: fix hot module under Windows test. (At least I hope...)
[shinken.git] / test / test_notifications.py
blob914ae5718cc557f5f66d721993b9897978725b5b
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test service notifications (continuous, delayed,
23 # suppressed by host downtime, and filtered by contact options).
26 import time
28 from shinken_test import unittest, ShinkenTest
class TestConfig(ShinkenTest):
    """Notification tests driven through the ShinkenTest scheduler helpers.

    Every test feeds check results for service "test_ok_0" on host
    "test_host_0" into ``self.scheduler_loop`` and then inspects
    ``svc.current_notification_number``, the pending actions and the
    collected log lines.
    """

    def _get_test_host_and_service(self):
        """Return ``(host, svc)`` for test_host_0/test_ok_0 with a clean state.

        Pending checks are dropped and dependencies are cleared on both
        objects so that each test only sees the results it injects itself.
        """
        host = self.sched.hosts.find_by_name("test_host_0")
        host.checks_in_progress = []
        host.act_depend_of = []  # ignore the router
        svc = self.sched.services.find_srv_by_name_and_hostname(
            "test_host_0", "test_ok_0")
        svc.checks_in_progress = []
        svc.act_depend_of = []  # no hostchecks on critical checkresults
        return host, svc

    def test_continuous_notifications(self):
        """A service staying CRITICAL keeps raising its notification number.

        Also checks that the number stays put while the contact has service
        notifications disabled, and that it is reset to 0 on recovery.
        """
        self.print_header()
        # retry_interval 2
        # critical notification
        # run loop -> another notification
        host, svc = self._get_test_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        print("- 1 x OK -------------------------------------")
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches soft;1
        # there must not be any notification
        #--------------------------------------------------------------
        print("- 1 x BAD get soft -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        # check_notification: not (soft)
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification must have been created
        # notification number must be 1
        #--------------------------------------------------------------
        print("- 1 x BAD get hard -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print(svc.notifications_in_progress)
        for n in svc.notifications_in_progress.values():
            print(n)
        # check_notification: yes (hard)
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        # The first notification has been sent; a follow-up one is presumably
        # already scheduled and waiting for notification_interval to pass,
        # but the current number is still 1.
        self.assertEqual(svc.current_notification_number, 1)
        print("---------------------------------1st round with a hard")
        print("find a way to get the number of the last reaction")
        cnn = svc.current_notification_number
        print("- 5 x BAD repeat -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("cnn and cur %s %s" % (cnn, svc.current_notification_number))
        self.assertTrue(svc.current_notification_number > cnn)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))
        self.assertTrue(svc.current_notification_number > cnn)
        #--------------------------------------------------------------
        # 2 more cycles: the notification number must have grown again
        #--------------------------------------------------------------
        cnn = svc.current_notification_number
        self.scheduler_loop(2, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))
        self.assertTrue(svc.current_notification_number > cnn)
        #--------------------------------------------------------------
        # another cycle would mean a new notification (theoretically)
        # BUT: test_contact filters notifications
        # we do not raise current_notification_number if no mail was sent
        #--------------------------------------------------------------
        now = time.time()
        cmd = "[%lu] DISABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        self.sched.run_external_command(cmd)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # again a normal cycle
        # test_contact receives his mail
        #--------------------------------------------------------------
        now = time.time()
        cmd = "[%lu] ENABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        self.sched.run_external_command(cmd)
        # cnn is deliberately NOT refreshed here: exactly one more
        # notification is expected compared to the disabled cycle.
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))
        self.assertEqual(svc.current_notification_number, cnn + 1)
        #--------------------------------------------------------------
        # now recover. there must be no scheduled/inpoller notification
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)

    def test_continuous_notifications_delayed(self):
        """With first_notification_delay set, the first notification waits.

        The notification number must stay at 0 until the delay computed from
        the last non-OK time has elapsed, then become 1, and go back to 0
        after the recovery.
        """
        self.print_header()
        # retry_interval 2
        # critical notification
        # run loop -> another notification
        host, svc = self._get_test_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001  # and send immediately then
        # Delay in interval_length units (6s if interval_length is 60)
        svc.first_notification_delay = 0.1
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=1)
        #-----------------------------------------------------------------
        # initialize with a good check. there must be no pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # check fails and enters soft state.
        # there must be no notification, only the event handler
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 1, 'BAD']], do_sleep=True, sleep_time=1)
        self.assertEqual(self.count_actions(), 1)
        print(time.time())
        print("%s %s %s %s" % (svc.last_time_warning, svc.last_time_critical,
                               svc.last_time_unknown, svc.last_time_ok))
        # Moment from which the first notification is allowed to go out
        last_time_not_ok = svc.last_time_non_ok_or_up()
        deadline = (last_time_not_ok
                    + svc.first_notification_delay * svc.__class__.interval_length)
        #-----------------------------------------------------------------
        # check fails again and enters hard state.
        # now there is a (scheduled for later) notification and an event handler
        # current_notification_number is still 0, until notifications
        # have actually been sent
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # repeat bad checks during the delay time
        # there is 1 action which is the scheduled notification
        # (the action count is intentionally not asserted inside the loop)
        #-----------------------------------------------------------------
        while deadline > time.time():
            self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
            self.show_and_clear_logs()
            self.show_actions()
            print(deadline - time.time())
        #-----------------------------------------------------------------
        # now the delay period is over and the notification can be sent
        # with the next bad check
        # there are 2 actions: the master notification and its child
        # 1 notification was sent, so current_notification_number is 1
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=1)
        print("Counted actions %s" % (self.count_actions(),))
        # 1 master, 1 child
        self.assertEqual(self.count_actions(), 2)
        self.assertEqual(svc.current_notification_number, 1)
        self.show_actions()
        # master is a zombie and was removed from notifications_in_progress
        self.assertEqual(len(svc.notifications_in_progress), 1)
        self.show_logs()
        self.assertTrue(self.log_match(1, 'SERVICE NOTIFICATION.*;CRITICAL;'))
        self.show_and_clear_logs()
        self.show_actions()
        #-----------------------------------------------------------------
        # relax with a successful check
        # there are 2 actions, one notification and one eventhandler
        # current_notification_number was reset to 0
        #-----------------------------------------------------------------
        self.scheduler_loop(2, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=1)
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;OK;'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;OK;'))
        self.assertTrue(self.log_match(3, 'SERVICE NOTIFICATION.*;OK;'))
        # evt reap 2 loops
        self.assertEqual(svc.current_notification_number, 0)
        self.assertEqual(len(svc.notifications_in_progress), 0)
        self.assertEqual(len(svc.notified_contacts), 0)
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_continuous_notifications_delayed_recovers_fast(self):
        """A recovery before first_notification_delay sends no notification.

        The problem notification is still pending when the service recovers,
        so no recovery notification may be logged and nothing may remain in
        ``notifications_in_progress`` or ``notified_contacts``.
        """
        self.print_header()
        # retry_interval 2
        # critical notification
        # run loop -> another notification
        host, svc = self._get_test_host_and_service()
        svc.first_notification_delay = 5
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        #-----------------------------------------------------------------
        # initialize with a good check. there must be no pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # check fails and enters soft state.
        # there must be no notification, only the event handler
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 1, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(self.count_actions(), 1)
        #-----------------------------------------------------------------
        # check fails again and enters hard state.
        # now there is a (scheduled for later) notification and an event handler
        # current_notification_number is still 0 (will be raised when
        # a notification is actually sent)
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(self.count_actions(), 2)
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # repeat bad checks during the delay time
        # but only one time. we don't want to reach the deadline
        # there is one action: the pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(self.count_actions(), 1)
        #-----------------------------------------------------------------
        # relax with a successful check
        # there is 1 action, the eventhandler.
        # there is a second action: the master recover notification
        # but it becomes a zombie very soon, because it has no effect
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;OK;'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;OK;'))
        self.assertFalse(self.log_match(3, 'SERVICE NOTIFICATION.*;OK;'))
        self.show_actions()
        self.assertEqual(len(svc.notifications_in_progress), 0)
        self.assertEqual(len(svc.notified_contacts), 0)
        self.assertEqual(self.count_actions(), 2)
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_host_in_downtime_or_down_service_critical(self):
        """No CRITICAL service notification while the host is in downtime.

        First verifies the normal SOFT -> HARD -> notification sequence, then
        schedules a fixed host downtime and checks that the same failure only
        logs the host DOWNTIMESTART notification.
        """
        self.print_header()
        # retry_interval 2
        # critical notification
        # run loop -> another notification
        host, svc = self._get_test_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP'], [svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification must have been created
        # notification number must be 1
        #--------------------------------------------------------------
        self.scheduler_loop(2, [[host, 0, 'UP'], [svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_logs()
        self.show_actions()
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;CRITICAL;SOFT'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;CRITICAL;SOFT'))
        self.assertTrue(self.log_match(3, 'SERVICE ALERT.*;CRITICAL;HARD'))
        self.assertTrue(self.log_match(4, 'SERVICE EVENT HANDLER.*;CRITICAL;HARD'))
        self.assertTrue(self.log_match(5, 'SERVICE NOTIFICATION.*;CRITICAL;'))
        self.assertEqual(svc.current_notification_number, 1)
        self.clear_logs()
        self.clear_actions()
        #--------------------------------------------------------------
        # reset host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP'], [svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        duration = 300
        now = time.time()
        # fixed downtime valid for the next 5 minutes
        cmd = "[%lu] SCHEDULE_HOST_DOWNTIME;test_host_0;%d;%d;1;0;%d;lausser;blablub" % (
            now, now, now + duration, duration)
        self.sched.run_external_command(cmd)
        #--------------------------------------------------------------
        # service reaches hard;2
        # no notification
        #--------------------------------------------------------------
        self.scheduler_loop(2, [[host, 0, 'UP'], [svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.assertTrue(self.any_log_match('HOST NOTIFICATION.*;DOWNTIMESTART'))
        self.assertFalse(self.any_log_match('SERVICE NOTIFICATION.*;CRITICAL;'))
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_only_notified_contacts_notifications(self):
        """A contact that filtered out the problem gets no recovery either.

        The 'u' option is removed from the contact's service notification
        options, the service is driven with state 3 results, and the
        notification number must stay at 0 with no 'notify-service' log line
        on recovery.
        """
        self.print_header()
        # retry_interval 2
        # critical notification
        # run loop -> another notification
        host, svc = self._get_test_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001

        # We do not want the contact to get a mail for the state used below,
        # so we remove the 'u' option from its notification ways
        test_contact = self.sched.contacts.find_by_name('test_contact')
        for nw in test_contact.notificationways:
            nw.service_notification_options.remove('u')

        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        print("- 1 x OK -------------------------------------")
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches soft;1
        # there must not be any notification
        #--------------------------------------------------------------
        print("- 1 x BAD get soft -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'UNKNOWN']], do_sleep=True, sleep_time=0.1)
        # check_notification: not (soft)
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        print("Contact we notified %s" % (svc.notified_contacts,))
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification is created, but the contact refuses it
        #--------------------------------------------------------------
        print("- 1 x BAD get hard -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'UNKNOWN']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        print("TOTO2")
        self.show_actions()
        print("notif in progress %s" % (svc.notifications_in_progress,))
        for n in svc.notifications_in_progress.values():
            print("TOTO %s" % (n.__dict__,))
        # check_notification: yes (hard)
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        # The contact refuses our notification, so we are still at 0
        self.assertEqual(svc.current_notification_number, 0)
        print("---------------------------------1st round with a hard")
        print("find a way to get the number of the last reaction")
        cnn = svc.current_notification_number
        print("- 5 x BAD repeat -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("cnn and cur %s %s" % (cnn, svc.current_notification_number))

        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))

        #--------------------------------------------------------------
        # 2 more cycles (values are only printed, not asserted)
        #--------------------------------------------------------------
        cnn = svc.current_notification_number
        self.scheduler_loop(2, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))

        #--------------------------------------------------------------
        # another cycle would mean a new notification (theoretically)
        # BUT: test_contact filters notifications
        # we do not raise current_notification_number if no mail was sent
        #--------------------------------------------------------------
        now = time.time()
        cmd = "[%lu] DISABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        self.sched.run_external_command(cmd)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        self.assertEqual(svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # again a normal cycle with contact notifications re-enabled
        # (the resulting number is only printed, not asserted)
        #--------------------------------------------------------------
        now = time.time()
        cmd = "[%lu] ENABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        self.sched.run_external_command(cmd)
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_actions()
        print("svc.current_notification_number, cnn %s %s"
              % (svc.current_notification_number, cnn))
        #--------------------------------------------------------------
        # now recover. there must be no scheduled/inpoller notification
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)

        print("prout")
        # I do not want a notification of a recovery because
        # the user did not have the notif first!
        self.assertFalse(self.any_log_match('notify-service'))
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
# Run the tests of this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()