Add : orphaned test to all tests.
[shinken.git] / test / test_modules_nrpe_poller.py
blob6de4c47dfa4b4eacbdaa91cac8d7e427dbb97dee
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test reading and processing of config files
26 import os
27 from Queue import Empty
28 from multiprocessing import Queue, Manager, active_children
30 from shinken_test import *
31 from shinken.log import logger
32 from shinken.objects.module import Module
33 from shinken.modules import nrpe_poller
34 from shinken.modules.nrpe_poller import get_instance
37 modconf = Module()
38 modconf.module_name = "NrpePoller"
39 modconf.module_type = nrpe_poller.properties['type']
40 modconf.properties = nrpe_poller.properties.copy()
43 class TestNrpePoller(ShinkenTest):
44 #setUp is in shinken_test
45 # def setUp(self):
46 # self.setup_with_file('etc/nagios_module_hack_cmd_poller_tag.cfg')
49 #Change ME :)
50 def test_nrpe_poller(self):
52 mod = nrpe_poller.Nrpe_poller(modconf)
54 sl = get_instance(mod)
55 # Look if we really change our commands
57 print sl.__dict__
58 sl.id = 1
59 sl.i_am_dying = False
61 to_queue = Queue()
62 manager = Manager()
63 from_queue = manager.list()
64 control_queue = Queue()
66 # We prepare a check in the to_queue
67 status = 'queue'
68 command = "$USER1$/check_nrpe -H localhost33 -n -u -t 1 -c check_load3 -a 20"# -a arg1 arg2 arg3"
69 ref = None
70 t_to_to = time.time()
71 c = Check(status, command, ref, t_to_to)
73 msg = Message(id=0, type='Do', data=c)
74 to_queue.put(msg)
76 # The worker will read a message by loop. We want it to
77 # do 2 loops, so we fake a message, adn the Number 2 is a real
78 # exit one
79 msg1 = Message(id=0, type='All is good, continue')
80 msg2 = Message(id=0, type='Die')
82 control_queue.put(msg1)
83 for _ in xrange(1, 2):
84 control_queue.put(msg1)
85 #control_queue.put(msg1)
86 #control_queue.put(msg1)
87 #control_queue.put(msg1)
88 #control_queue.put(msg1)
89 control_queue.put(msg2)
90 sl.work(to_queue, from_queue, control_queue)
92 o = from_queue.pop()
93 print "O", o
95 print o.__dict__
96 self.assert_(o.status == 'done')
97 self.assert_(o.exit_status == 2)
99 to_queue.close()
100 control_queue.close()
105 if __name__ == '__main__':
106 unittest.main()