2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
24 #This class is used by the poller and the reactionner to do their work.
25 #A worker is a process launched by these processes that reads Messages from a Queue
27 #Workers launch the Checks and then send the results to the Queue self.m (master).
28 #They can die if they stay idle for too long (param timeout).
30 from Queue
import Empty
31 from multiprocessing
import Process
, Queue
46 def __init__(self
, id, s
, returns_queue
, processes_by_worker
, mortal
=True, timeout
=300, max_plugins_output_length
=8192, target
=None):
47 self
.id = self
.__class
__.id
48 self
.__class
__.id += 1
52 self
._timeout
= timeout
53 self
.processes_by_worker
= processes_by_worker
54 self
._c
= Queue() # Private Control queue for the Worker
55 # By default, take our own code
58 self
._process
= Process(target
=target
, args
=(s
, returns_queue
, self
._c
))
59 self
.returns_queue
= returns_queue
60 self
.max_plugins_output_length
= max_plugins_output_length
61 self
.i_am_dying
= False
72 # Kill the background process
73 # AND properly close the queue.
74 # The queue has a thread, so close it too.
76 self
._process
.terminate()
def join(self, timeout=None):
    """Wait on the worker's background process.

    The `timeout` argument is handed unchanged to the `join()` method of
    the process object stored in `self._process`; the default `None` is
    passed through as-is. Nothing is returned.
    """
    background_process = self._process
    background_process.join(timeout)
86 return self
._process
.is_alive()
def is_killable(self):
    """Tell whether this worker may be killed.

    The result is truthy only when the worker is mortal (`self._mortal`)
    and its accumulated idle time (`self._idletime`) is strictly greater
    than its timeout (`self._timeout`).
    """
    # A non-mortal worker is never killable; hand back the flag itself so
    # the idle-time comparison is not evaluated in that case.
    if not self._mortal:
        return self._mortal
    return self._idletime > self._timeout
def add_idletime(self, time):
    """Increase the worker's idle-time counter.

    `time` is the amount added to `self._idletime`. Note that this
    parameter name hides any module-level `time` inside this method.
    """
    idle_so_far = self._idletime
    self._idletime = idle_so_far + time
101 def send_message(self
, msg
):
105 # A zombie is immortal, so it cannot be killed anymore
106 def set_zombie(self
):
110 # Get new checks if less than nb_checks_max
111 # If no new checks got and no check in queue,
113 # REF: doc/shinken-action-queues.png (3)
114 def get_new_checks(self
):
116 while(len(self
.checks
) < self
.processes_by_worker
):
117 #print "I", self.id, "wait for a message"
118 msg
= self
.s
.get(block
=False)
120 self
.checks
.append(msg
.get_data())
121 #print "I", self.id, "I've got a message!"
123 if len(self
.checks
) == 0:
124 self
._idletime
= self
._idletime
+ 1
126 # Maybe the Queue() is not available, if so, just return
127 # get back to work :)
132 #Launch the checks that are in 'queue' status
133 #REF: doc/shinken-action-queues.png (4)
134 def launch_new_checks(self
):
136 for chk
in self
.checks
:
137 if chk
.status
== 'queue':
140 # Maybe we got a true big problem in the
142 if r
== 'toomanyopenfiles':
143 # We should die as soon as we return all checks
144 self
.i_am_dying
= True
147 #Check the status of checks
148 #if done, return message finished :)
149 #REF: doc/shinken-action-queues.png (5)
150 def manage_finished_checks(self
):
154 for action
in self
.checks
:
155 if action
.status
== 'launched' and action
.last_poll
< now
- action
.wait_time
:
156 action
.check_finished(self
.max_plugins_output_length
)
157 wait_time
= min(wait_time
, action
.wait_time
)
158 #If action done, we can launch a new one
159 if action
.status
in ('done', 'timeout'):
160 to_del
.append(action
)
161 #We answer to the master
162 #msg = Message(id=self.id, type='Result', data=action)
164 self
.returns_queue
.append(action
)#msg)
165 except IOError , exp
:
166 print "[%d]Exiting: %s" % (self
.id, exp
)
169 self
.wait_time
= wait_time
172 self
.checks
.remove(chk
)
175 time
.sleep(wait_time
)
178 #Check if our system time change. If so, change our
179 def check_for_system_time_change(self
):
181 difference
= now
- self
.t_each_loop
183 #Now set the new value for the tick loop
184 self
.t_each_loop
= now
186 #Return the diff if needed, or just 0
187 if abs(difference
) > 900:
193 #id = id of the worker
194 #s = Global Queue Master->Slave
195 #m = Queue Slave->Master
196 #return_queue = queue managed by manager
197 #c = Control Queue for the worker
198 def work(self
, s
, returns_queue
, c
):
199 ## restore default signal handler for the workers:
200 signal
.signal(signal
.SIGTERM
, signal
.SIG_DFL
)
203 self
.returns_queue
= returns_queue
205 self
.t_each_loop
= time
.time()
211 # If we are dying (big problem!) we do not
212 # take new jobs, we just finish the current ones
213 if not self
.i_am_dying
:
214 #REF: doc/shinken-action-queues.png (3)
215 self
.get_new_checks()
216 #REF: doc/shinken-action-queues.png (4)
217 self
.launch_new_checks()
218 #REF: doc/shinken-action-queues.png (5)
219 self
.manage_finished_checks()
221 #Now get orders from the master
223 cmsg
= c
.get(block
=False)
224 if cmsg
.get_type() == 'Die':
225 print "[%d]Dad say we are dying..." % self
.id
230 if self
._mortal
== True and self
._idletime
> 2 * self
._timeout
:
231 print "[%d]Timeout, Harakiri" % self
.id
232 #The master must be dead and we are lonely, so we must die
235 # Check whether we are dying and have finished all current checks.
236 # If so, we really die; our master poller will launch a new
237 # worker because we were too weak to manage our job :(
238 if len(self
.checks
) == 0 and self
.i_am_dying
:
239 print "[%d] I DIE because I cannot do my job as I should (too many open files?)... forgot me please." % self
.id
242 # Manage a possible time change (our previous time reference will be adjusted by the diff)
243 diff
= self
.check_for_system_time_change()
246 timeout
-= time
.time() - begin