1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test the npcd broker module
26 import os
, sys
, string
, time
28 from shinken_test
import unittest
, ShinkenTest
30 from shinken
.objects
.module
import Module
32 from shinken
.modules
import npcdmod_broker
33 from shinken
.modules
.npcdmod_broker
import Npcd_broker
# Make the interpreter perform its periodic checks only every 10000
# bytecode instructions.
sys.setcheckinterval(10000)

# Module configuration object passed to Npcd_broker by the test case.
# It has to be instantiated before attributes can be set on it.
modconf = Module()
# NOTE(review): "ncpd" looks like a transposition of "npcd"; left unchanged
# because nothing visible here depends on the exact name.
modconf.module_name = "ncpd"
modconf.module_type = npcdmod_broker.properties['type']
# Work on a copy so the module-level properties dict is not shared.
modconf.properties = npcdmod_broker.properties.copy()
class TestConfig(ShinkenTest):
    """Test case for the npcd broker module.

    The helpers below fake check execution on ``self.sched`` and feed the
    resulting broks to ``self.npcdmod_broker``.
    NOTE(review): ``self.sched`` is presumably prepared by ShinkenTest's
    setUp; confirm against shinken_test.
    """

    def fake_check(self, ref, exit_status, output="OK"):
        """Fake the execution of the pending check of ``ref``.

        Pops the last entry of ``ref.actions``, registers it with the
        scheduler, fills in a result and queues it as a waiting result.

        ref         -- object whose ``actions`` list holds the check
        exit_status -- exit status to record on the check
        output      -- plugin output to record on the check

        NOTE(review): assumes ``ref.actions`` already holds the check to
        fake; nothing in this method schedules one -- confirm against the
        caller.
        """
        now = time.time()
        check = ref.actions.pop()
        self.sched.add(check)  # check is now in sched.checks[]
        # Fill in the result as if the check had really been executed.
        check.check_time = now
        check.output = output
        check.exit_status = exit_status
        check.execution_time = 0.001
        check.status = 'waitconsume'
        self.sched.waiting_results.append(check)

    def scheduler_loop(self, count, reflist):
        """Run ``count`` rounds of faked checks for every entry of ``reflist``.

        count   -- number of rounds to run
        reflist -- iterable of (obj, exit_status, output) triples
        """
        for obj, _exit_status, _output in reflist:
            obj.checks_in_progress = []
        for loop in range(1, count + 1):
            print("processing check %s" % loop)
            for obj, exit_status, output in reflist:
                obj.update_in_checking()
                self.fake_check(obj, exit_status, output)
            self.sched.consume_results()
            # Reset the in-progress lists before the next round.
            for obj, _exit_status, _output in reflist:
                obj.checks_in_progress = []
            self.sched.update_downtimes_and_comments()

    def worker_loop(self):
        """Purge zombie checks/actions and hand every runnable action back
        to the scheduler as a result.

        NOTE(review): no method of this class calls this helper.
        """
        self.sched.delete_zombie_checks()
        self.sched.delete_zombie_actions()
        # The returned checks are not used; the call itself is kept because
        # it may change scheduler state -- TODO confirm.
        self.sched.get_to_run_checks(True, False)
        actions = self.sched.get_to_run_checks(False, True)
        for a in actions:
            # fake return of the action
            self.sched.put_results(a)

    def update_broker(self):
        """Pass every brok held by the scheduler to the npcd module, then
        replace the scheduler's brok container with an empty dict."""
        for brok in self.sched.broks.values():
            self.npcdmod_broker.manage_brok(brok)
        self.sched.broks = {}

    def print_header(self):
        """Print an 80-column '#' banner with the test id centered in it."""
        border = "#" * 80
        blank = "#" + " " * 78 + "#"
        print(border + "\n" + blank)
        print("#" + self.id().center(78) + "#")
        print(blank + "\n" + border + "\n")

    def write_correct_config(self):
        """Write npcd.cfg with the perfdata file, spool dir and spool
        filename settings, one setting per line."""
        with open("npcd.cfg", "w") as cfg:
            cfg.write("perfdata_file = /tmp/pfnerf\n")
            cfg.write("perfdata_spool_dir = /tmp/pnp4shinken/var/perfdata\n")
            cfg.write("perfdata_spool_filename=pferf\n")

    def write_incomplete_config(self):
        """Write npcd.cfg without the perfdata_spool_dir setting, one
        setting per line."""
        with open("npcd.cfg", "w") as cfg:
            cfg.write("perfdata_file = /tmp/pfnerf\n")
            cfg.write("perfdata_spool_filename=pferf\n")

    def test_write_perfdata_file(self):
        """Run faked checks carrying perfdata through the module and check
        that the ./perfdata file exists afterwards."""
        perfdata_path = "./perfdata"
        # Start from a clean state so the assertion below is meaningful.
        if os.path.exists(perfdata_path):
            os.unlink(perfdata_path)

        self.npcdmod_broker = Npcd_broker(modconf, None, './perfdata', '.', 'perfdata-target', 15)
        self.npcdmod_broker.properties['to_queue'] = 0
        self.npcdmod_broker.init()
        self.sched.fill_initial_broks()
        print("got initial broks")

        host = self.sched.hosts.find_by_name("test_host_0")
        host.checks_in_progress = []
        host.act_depend_of = []  # ignore the router
        router = self.sched.hosts.find_by_name("test_router_0")
        router.checks_in_progress = []
        router.act_depend_of = []  # ignore the router
        svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
        svc.checks_in_progress = []
        svc.act_depend_of = []  # no hostchecks on critical checkresults
        self.scheduler_loop(2, [[host, 0, 'UP | value1=1 value2=2'], [router, 0, 'UP | rtt=10'], [svc, 2, 'BAD | value1=0 value2=0']])
        # Hand the broks produced by the faked checks to the module under
        # test; otherwise it never sees any check result.
        self.update_broker()

        self.assertTrue(os.path.exists(perfdata_path))
        if os.path.exists(perfdata_path):
            # Close the module's handle before removing the file.
            self.npcdmod_broker.logfile.close()
            os.unlink(perfdata_path)
if __name__ == '__main__':
    # The entry point is also kept as a string so the run can be wrapped in
    # a profiler, e.g.:
    # cProfile.runctx( command, globals(), locals(), filename="Thruk.profile" )
    command = """unittest.main()"""
    # Actually run the tests when the file is executed as a script.
    unittest.main()