Merge branch 'master' of ssh://lausser,shinken@shinken.git.sourceforge.net/gitroot...
[shinken.git] / shinken / worker.py
blob145c75541f6aeebd45f9227eaff9c07cb46d1e7f
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
24 #This class is used by the poller and the reactionner to do their work.
25 #A worker is a process launched by one of these daemons; it reads Messages from a Queue
26 #(self.s) (slave).
27 #It launches the checks and then sends the results back to the master (self.returns_queue).
28 #A worker can die if it stays idle for too long (param timeout).
30 from Queue import Empty
31 from multiprocessing import Process, Queue
33 import time
34 import sys
35 import signal
38 #Worker class
class Worker:
    """Process wrapper that runs checks on behalf of a poller or reactionner.

    A worker owns a background ``Process``. Inside that process, ``work()``
    pulls messages from the shared queue ``s``, executes the checks they
    carry, and appends finished checks to ``returns_queue``. A private
    control queue (``self._c``) lets the parent send orders such as 'Die'.
    A mortal worker stops by itself after being idle for too long.
    """

    # Class-wide counter: each new instance takes the current value as its id.
    id = 0
    _process = None
    _mortal = None
    _idletime = None
    _timeout = None
    _c = None

    def __init__(self, id, s, returns_queue, processes_by_worker, mortal=True,
                 timeout=300, max_plugins_output_length=8192, target=None):
        """Prepare the worker; the process is only started by ``start()``.

        id -- ignored: the worker id always comes from the class counter.
        s -- queue the worker reads its messages (checks to run) from.
        returns_queue -- container the finished checks are appended to.
        processes_by_worker -- maximum number of checks held at the same time.
        mortal -- if true, the worker may be killed / may stop when idle.
        timeout -- idle threshold used by ``is_killable()`` and ``work()``.
        max_plugins_output_length -- passed to each check's ``check_finished``.
        target -- callable run in the process; defaults to ``self.work``.
        """
        self.id = self.__class__.id
        self.__class__.id += 1

        self._mortal = mortal
        self._idletime = 0
        self._timeout = timeout
        self.processes_by_worker = processes_by_worker
        self._c = Queue()  # Private control queue for the worker
        # By default, run our own main loop
        if target is None:
            target = self.work
        self._process = Process(target=target,
                                args=(s, returns_queue, self._c))
        self.returns_queue = returns_queue
        self.max_plugins_output_length = max_plugins_output_length
        self.i_am_dying = False

    def is_mortal(self):
        """Return the mortal flag of this worker."""
        return self._mortal

    def start(self):
        """Start the background process."""
        self._process.start()

    def terminate(self):
        """Kill the background process and cleanly close the control queue.

        The queue has its own thread, so it is closed and joined too.
        """
        self._process.terminate()
        self._c.close()
        self._c.join_thread()

    def join(self, timeout=None):
        """Wait for the background process, at most ``timeout`` if given."""
        self._process.join(timeout)

    def is_alive(self):
        """Return whether the background process is alive."""
        return self._process.is_alive()

    def is_killable(self):
        """Return a true value if the worker is mortal and idle too long."""
        return self._mortal and self._idletime > self._timeout

    def add_idletime(self, time):
        """Add ``time`` to the idle counter.

        NOTE: the parameter shadows the ``time`` module inside this method;
        the name is kept so keyword callers keep working.
        """
        self._idletime = self._idletime + time

    def reset_idle(self):
        """Reset the idle counter to zero."""
        self._idletime = 0

    def send_message(self, msg):
        """Put ``msg`` on the private control queue of the worker."""
        self._c.put(msg)

    def set_zombie(self):
        """Mark the worker as a zombie: it is immortal and cannot be killed."""
        self._mortal = False

    def get_new_checks(self):
        """Fetch messages from ``self.s`` until we hold enough checks.

        Stops as soon as ``processes_by_worker`` checks are held or the
        queue is empty. If the queue is empty and no check is held, the
        idle counter is increased by one and the worker sleeps one second.
        REF: doc/shinken-action-queues.png (3)
        """
        try:
            while len(self.checks) < self.processes_by_worker:
                msg = self.s.get(block=False)
                if msg is not None:
                    self.checks.append(msg.get_data())
        except Empty:
            if not self.checks:
                self._idletime = self._idletime + 1
                time.sleep(1)
        except IOError:
            # Maybe the queue is not available: just return and
            # get back to work :)
            return

    def launch_new_checks(self):
        """Execute every held check whose status is 'queue'.

        Launching a check resets the idle counter. If a launch reports
        'toomanyopenfiles', the worker is flagged as dying: it will stop
        once all its current checks have been returned.
        REF: doc/shinken-action-queues.png (4)
        """
        for chk in self.checks:
            if chk.status == 'queue':
                self._idletime = 0
                r = chk.execute()
                # Maybe we got a really big problem while launching the action
                if r == 'toomanyopenfiles':
                    # We should die as soon as we have returned all checks
                    self.i_am_dying = True

    def manage_finished_checks(self):
        """Poll launched checks and hand the finished ones to the master.

        Checks whose status is 'done' or 'timeout' are appended to
        ``self.returns_queue`` and removed from ``self.checks``. The method
        then sleeps for the smallest ``wait_time`` among the checks polled
        in this pass (at most one second). Exits the process with status 2
        if the returns queue raises IOError.
        REF: doc/shinken-action-queues.png (5)
        """
        to_del = []
        wait_time = 1
        now = time.time()
        for action in self.checks:
            if (action.status == 'launched'
                    and action.last_poll < now - action.wait_time):
                action.check_finished(self.max_plugins_output_length)
                wait_time = min(wait_time, action.wait_time)
            # If the action is done, we can launch a new one
            if action.status in ('done', 'timeout'):
                to_del.append(action)
                # We answer to the master
                try:
                    self.returns_queue.append(action)
                except IOError as exp:
                    print("[%d]Exiting: %s" % (self.id, exp))
                    sys.exit(2)
        self.wait_time = wait_time

        # Remove after the loop so the list is not modified while iterating
        for chk in to_del:
            self.checks.remove(chk)

        # Little sleep
        time.sleep(wait_time)

    def check_for_system_time_change(self):
        """Detect a jump of the system clock between two loop turns.

        Returns the elapsed time since the previous call (or since
        ``work()`` started) when its absolute value is above 900 seconds,
        otherwise 0. The reference time ``self.t_each_loop`` is refreshed
        on every call.
        """
        now = time.time()
        difference = now - self.t_each_loop

        # Now set the new value for the tick loop
        self.t_each_loop = now

        # Return the diff if it is significant, or just 0
        if abs(difference) > 900:
            return difference
        return 0

    def work(self, s, returns_queue, c):
        """Main loop of the worker, run inside the background process.

        s -- global queue master -> slave (checks to run).
        returns_queue -- queue managed by the manager, receives the results.
        c -- control queue of this worker.

        The loop ends when a 'Die' order is received, when a mortal worker
        has been idle for more than twice its timeout, or when a dying
        worker has returned all its checks.
        """
        # Restore the default signal handler for the workers
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        self.checks = []
        self.returns_queue = returns_queue
        self.s = s
        self.t_each_loop = time.time()
        while True:
            # If we are dying (big problem!) we do not take new jobs,
            # we just finish the current ones
            if not self.i_am_dying:
                # REF: doc/shinken-action-queues.png (3)
                self.get_new_checks()
                # REF: doc/shinken-action-queues.png (4)
                self.launch_new_checks()
            # REF: doc/shinken-action-queues.png (5)
            self.manage_finished_checks()

            # Now get orders from the master. This stays best-effort (an
            # empty queue is the normal case), but we no longer swallow
            # KeyboardInterrupt / SystemExit as the former bare except did.
            try:
                cmsg = c.get(block=False)
                if cmsg.get_type() == 'Die':
                    print("[%d]Dad say we are dying..." % self.id)
                    break
            except Exception:
                pass

            if self._mortal and self._idletime > 2 * self._timeout:
                print("[%d]Timeout, Harakiri" % self.id)
                # The master must be dead and we are lonely, we must die
                break

            # Look if we are dying and have finished all current checks.
            # If so, we really die; our master poller will launch a new
            # worker because we were too weak to manage our job :(
            if not self.checks and self.i_am_dying:
                print("[%d] I DIE because I cannot do my job as I should (too many open files?)... forgot me please." % self.id)
                break

            # Keep the reference time of the time-change detection fresh.
            # The returned difference is not used by this loop.
            self.check_for_system_time_change()