2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
#This class is used by the poller and the reactionner to do their work.
#A worker is a process launched by these daemons; it reads Messages from a Queue.
#Workers launch the checks and then send the results back in the Queue self.m (master).
#They can die if they stay idle for too long (param timeout).
from Queue import Empty
from multiprocessing import Process, Queue

import time
def __init__(self, id, s, returns_queue, processes_by_worker, mortal=True, timeout=300, max_plugins_output_length=8192):
    """Set up a worker and the (not yet started) child process running ``self.work``.

    :param id: accepted for interface compatibility but not used; the worker
        id comes from the class-wide counter ``self.__class__.id``.
    :param s: queue handed to ``work`` as its ``s`` argument.
    :param returns_queue: where finished actions are sent back.
    :param processes_by_worker: maximum number of checks held at once.
    :param mortal: when False the worker is never reported as killable.
    :param timeout: idle-time threshold used by ``is_killable``.
    :param max_plugins_output_length: forwarded to ``check_finished``.
    """
    # Draw a unique id from the class-wide counter and advance it.
    self.id = self.__class__.id
    self.__class__.id += 1

    # is_killable() and add_idletime() read these two attributes, so they
    # must exist from the start: honour ``mortal`` and begin with no idle time.
    self._mortal = mortal
    self._idletime = 0
    self._timeout = timeout
    self.processes_by_worker = processes_by_worker
    self._c = Queue()  # Private Control queue for the Worker
    self._process = Process(target=self.work, args=(s, returns_queue, self._c))
    self.returns_queue = returns_queue
    self.max_plugins_output_length = max_plugins_output_length
    # A threading.Thread target was tried earlier and judged "not good in
    # cpython", hence a separate Process.
    self.i_am_dying = False
#Kill the background process
#AND correctly close the queue:
#the queue has a thread, so close it too....
70 self
._process
.terminate()
def join(self, timeout=None):
    """Wait for the worker's child process to exit.

    ``timeout`` is forwarded untouched to ``Process.join`` (None means wait
    without limit). Nothing is returned.
    """
    worker_process = self._process
    worker_process.join(timeout)
80 return self
._process
.is_alive()
def is_killable(self):
    """Tell whether this worker may be reaped for being idle.

    Only a mortal worker whose accumulated idle time is strictly greater than
    its timeout qualifies. A falsy ``_mortal`` is returned as-is.
    """
    if not self._mortal:
        return self._mortal
    return self._idletime > self._timeout
def add_idletime(self, time):
    """Grow the worker's idle counter by ``time`` (units chosen by the caller)."""
    accumulated = self._idletime
    self._idletime = accumulated + time
def send_message(self, msg):
    """Post ``msg`` on this worker's private control queue ``self._c``.

    ``__init__`` passes ``self._c`` to ``work`` as its ``c`` argument, and
    ``work`` polls it, checking for a message whose ``get_type()`` is 'Die'.
    """
    self._c.put(msg)
def set_zombie(self):
    """Turn this worker into a zombie.

    A zombie is immortal: once ``_mortal`` is cleared, ``is_killable`` can no
    longer report the worker as killable.
    """
    self._mortal = False
def get_new_checks(self):
    """Pull new actions from ``self.s`` until this worker is full.

    Reads ``self.s`` without blocking while ``self.checks`` holds fewer than
    ``self.processes_by_worker`` items, appending each message's
    ``get_data()``. If afterwards the worker holds no check at all, its idle
    counter grows by one.
    REF: doc/shinken-action-queues.png (3)
    """
    try:
        while len(self.checks) < self.processes_by_worker:
            msg = self.s.get(block=False)
            self.checks.append(msg.get_data())
    except Empty:
        # A non-blocking get() raises Empty once the queue is drained: this
        # is the normal way out of the loop, not an error.
        pass
    if len(self.checks) == 0:
        self._idletime = self._idletime + 1
122 #Launch checks that are in status
123 #REF: doc/shinken-action-queues.png (4)
124 def launch_new_checks(self
):
126 for chk
in self
.checks
:
127 if chk
.status
== 'queue':
130 # Maybe we got a true big problem in the
132 if r
== 'toomanyopenfiles':
133 # We should die as soon as we return all checks
134 self
.i_am_dying
= True
137 #Check the status of checks
138 #if done, return message finished :)
139 #REF: doc/shinken-action-queues.png (5)
140 def manage_finished_checks(self
):
144 for action
in self
.checks
:
145 if action
.status
== 'launched' and action
.last_poll
< now
- action
.wait_time
:
146 action
.check_finished(self
.max_plugins_output_length
)
147 wait_time
= min(wait_time
, action
.wait_time
)
148 #If action done, we can launch a new one
149 if action
.status
in ('done', 'timeout'):
150 to_del
.append(action
)
151 #We answer to the master
152 #msg = Message(id=self.id, type='Result', data=action)
154 self
.returns_queue
.append(action
)#msg)
155 except IOError , exp
:
156 print "[%d]Exiting: %s" % (self
.id, exp
)
159 self
.wait_time
= wait_time
162 self
.checks
.remove(chk
)
165 time
.sleep(wait_time
)
def check_for_system_time_change(self):
    """Detect a jump of the system clock since the previous call.

    Compares ``time.time()`` with ``self.t_each_loop`` (the timestamp stored
    by the previous call, first initialised in ``work``), then stores the
    current time for the next comparison.

    :return: the difference in seconds when its absolute value exceeds 900,
        otherwise 0.
    """
    now = time.time()
    difference = now - self.t_each_loop

    # Now set the new value for the tick loop
    self.t_each_loop = now

    # Only jumps larger than 15 minutes count as a clock change.
    if abs(difference) > 900:
        return difference
    return 0
183 #id = id of the worker
184 #s = Global Queue Master->Slave
185 #m = Queue Slave->Master
186 #return_queue = queue managed by manager
187 #c = Control Queue for the worker
188 def work(self
, s
, returns_queue
, c
):
191 self
.returns_queue
= returns_queue
193 self
.t_each_loop
= time
.time()
199 # If we are diyin (big problem!) we do not
200 # take new jobs, we just finished the current one
201 if not self
.i_am_dying
:
202 #REF: doc/shinken-action-queues.png (3)
203 self
.get_new_checks()
204 #REF: doc/shinken-action-queues.png (4)
205 self
.launch_new_checks()
206 #REF: doc/shinken-action-queues.png (5)
207 self
.manage_finished_checks()
209 #Now get order from master
211 cmsg
= c
.get(block
=False)
212 if cmsg
.get_type() == 'Die':
213 print "[%d]Dad say we are diing..." % self
.id
218 if self
._mortal
== True and self
._idletime
> 2 * self
._timeout
:
219 print "[%d]Timeout, Arakiri" % self
.id
220 #The master must be dead and we are loonely, we must die
223 # Look if we are dying, and if we finishe all current checks
224 # if so, we really die, our master poller will launch a new
225 # worker because we were too weack to manage our job :(
226 if len(self
.checks
) == 0 and self
.i_am_dying
:
227 print "[%d] I DIE because I cannot do my job as I should (too many open files?)... forgot me please." % self
.id
230 #Manage a possible time change (our avant will be change with the diff)
231 diff
= self
.check_for_system_time_change()
234 timeout
-= time
.time() - begin