*Fix a bug in host.parents livestatus representation to make thruk happy
[shinken.git] / shinken / worker.py
blob3a10eb9e951788ddf41105d459f3f3e9a43dbfea
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
#This class is used by the poller and the reactionner to do their work.
#Each worker is a process launched by one of them; it reads Messages from a Queue
#(self.s, the slave side),
#launches the Checks they carry, then sends the results back through returns_queue (master side).
#Workers can die if they stay idle for too long (param timeout).
28 from Queue import Empty
29 from multiprocessing import Process, Queue
31 import time, sys
33 #Worker class
class Worker:
    """One worker process used by the poller and the reactionner.

    The child process (see ``work``) pulls messages from the
    master->slave queue ``s``, launches the actions they carry, polls
    them until they end, and appends finished actions to
    ``returns_queue``. A private control queue (``self._c``) lets the
    parent send orders to the child; ``work`` only reacts to 'Die'.
    """

    # Class-wide counter: each new Worker takes the current value as its id.
    id = 0
    _process = None
    _mortal = None
    _idletime = None
    _timeout = None
    _c = None

    def __init__(self, id, s, returns_queue, processes_by_worker, mortal=True, timeout=300, max_plugins_output_length=8192):
        """Prepare (but do not start) the worker process.

        :param id: ignored; the id comes from the class-wide counter.
        :param s: master->slave queue the child reads messages from.
        :param returns_queue: list-like object (must support ``append``)
            that receives finished actions.
        :param processes_by_worker: max number of actions held at once.
        :param mortal: if true, the child exits once its idle counter
            exceeds ``2 * timeout``.
        :param timeout: idle threshold, see ``is_killable`` and ``work``.
        :param max_plugins_output_length: passed to each action's
            ``check_finished``.
        """
        self.id = self.__class__.id
        self.__class__.id += 1

        self._mortal = mortal
        self._idletime = 0
        self._timeout = timeout
        self.processes_by_worker = processes_by_worker
        self._c = Queue()  # Private control queue for the worker
        # A process rather than a thread: the original authors found
        # threads "not good in cpython" for this workload.
        self._process = Process(target=self.work, args=(s, returns_queue, self._c))
        self.returns_queue = returns_queue
        self.max_plugins_output_length = max_plugins_output_length
        self.i_am_dying = False

    def is_mortal(self):
        """Return the mortal flag (False once ``set_zombie`` was called)."""
        return self._mortal

    def start(self):
        """Start the background process running ``work``."""
        self._process.start()

    def terminate(self):
        """Kill the background process and close the control queue.

        The queue owns a feeder thread, so it is closed and that thread
        is joined as well.
        """
        self._process.terminate()
        self._c.close()
        self._c.join_thread()

    def join(self, timeout=None):
        """Wait for the background process to end (at most ``timeout``)."""
        self._process.join(timeout)

    def is_alive(self):
        """Return True while the background process is running."""
        return self._process.is_alive()

    def is_killable(self):
        """Return True if mortal and idle for more than ``timeout``."""
        return self._mortal and self._idletime > self._timeout

    def add_idletime(self, time):
        """Add ``time`` to the idle counter (``time`` shadows the module here)."""
        self._idletime = self._idletime + time

    def reset_idle(self):
        """Reset the idle counter to 0."""
        self._idletime = 0

    def send_message(self, msg):
        """Put ``msg`` on the private control queue read by ``work``."""
        self._c.put(msg)

    def set_zombie(self):
        """Make the worker immortal: it will no longer be killed for idling."""
        self._mortal = False

    def get_new_checks(self):
        """Fetch new actions until ``processes_by_worker`` are held.

        When the queue is empty and no action is held, count one idle
        tick and sleep for 1 second.
        REF: doc/shinken-action-queues.png (3)
        """
        try:
            while len(self.checks) < self.processes_by_worker:
                msg = self.s.get(block=False)
                if msg is not None:
                    self.checks.append(msg.get_data())
        except Empty:
            if len(self.checks) == 0:
                self._idletime = self._idletime + 1
                time.sleep(1)

    def launch_new_checks(self):
        """Execute every held action whose status is 'queue'.

        If an execution reports 'toomanyopenfiles', the worker flags
        itself as dying: it stops taking new jobs and exits once the
        current ones are returned (see ``work``).
        REF: doc/shinken-action-queues.png (4)
        """
        for chk in self.checks:
            if chk.status == 'queue':
                self._idletime = 0
                r = chk.execute()
                if r == 'toomanyopenfiles':
                    self.i_am_dying = True

    def manage_finished_checks(self):
        """Poll due actions, return finished ones, then nap briefly.

        A 'launched' action is polled (``check_finished``) once more than
        its ``wait_time`` has elapsed since ``last_poll``. Actions whose
        status is 'done' or 'timeout' are appended to ``returns_queue``
        and dropped from ``self.checks``. The worker then sleeps until the
        next poll is due, at most 1 second; the chosen value is stored in
        ``self.wait_time``. Exits the process with status 2 if appending
        to ``returns_queue`` raises IOError.
        REF: doc/shinken-action-queues.png (5)
        """
        still_running = []
        wait_time = 1
        now = time.time()
        for action in self.checks:
            if action.status == 'launched':
                if action.last_poll < now - action.wait_time:
                    action.check_finished(self.max_plugins_output_length)
                    wait_time = min(wait_time, action.wait_time)
                else:
                    # Not due yet: do not oversleep its next poll (e.g. a
                    # check launched just before this pass).
                    remaining = action.last_poll + action.wait_time - now
                    wait_time = min(wait_time, max(remaining, 0))
            if action.status in ('done', 'timeout'):
                # Hand the result back to the master.
                try:
                    self.returns_queue.append(action)
                except IOError as exp:
                    print("[%d]Exiting: %s" % (self.id, exp))
                    sys.exit(2)
            else:
                still_running.append(action)
        self.wait_time = wait_time
        # In-place update: single pass instead of one list.remove per action.
        self.checks[:] = still_running

        time.sleep(max(wait_time, 0))

    def check_for_system_time_change(self):
        """Detect a system clock jump since the previous loop.

        Updates ``self.t_each_loop`` to now and returns the elapsed
        difference if its absolute value exceeds 900 seconds, else 0.
        """
        now = time.time()
        difference = now - self.t_each_loop
        self.t_each_loop = now
        if abs(difference) > 900:
            return difference
        return 0

    def work(self, s, returns_queue, c):
        """Main loop of the worker process.

        :param s: global master->slave queue to read messages from.
        :param returns_queue: where finished actions are appended
            (presumably a manager-backed list; only ``append`` is used).
        :param c: private control queue; a message whose ``get_type()``
            is 'Die' stops the loop.

        The loop also ends when the worker is mortal and has been idle
        for more than ``2 * timeout``, or when it is dying and has no
        action left.
        """
        self.checks = []
        self.returns_queue = returns_queue
        self.s = s
        self.t_each_loop = time.time()
        while True:
            # If we are dying (big problem!) we do not take new jobs,
            # we just finish the current ones.
            if not self.i_am_dying:
                self.get_new_checks()
            self.launch_new_checks()
            self.manage_finished_checks()

            # Now get orders from the master.
            try:
                cmsg = c.get(block=False)
            except Empty:
                cmsg = None
            if cmsg is not None and cmsg.get_type() == 'Die':
                print("[%d]Dad say we are diing..." % self.id)
                break

            if self._mortal and self._idletime > 2 * self._timeout:
                # The master must be dead and we are lonely: we must die.
                print("[%d]Timeout, Arakiri" % self.id)
                break

            # Dying and every current check returned: really die; the
            # master poller will launch a new worker.
            if len(self.checks) == 0 and self.i_am_dying:
                print("[%d] I DIE because I cannot do my job as I should (too many open files?)... forgot me please." % self.id)
                break

            # Keep t_each_loop current; the returned jump is unused here.
            self.check_for_system_time_change()