#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.


#This class is an interface for reactionners and pollers.
#The satellite listens for its configuration from the Arbiter on a port;
#the configuration given by the Arbiter is the set of schedulers this
#satellite will take actions from.
#Once launched and configured, the satellite still listens to the Arbiter:
#if the Arbiter wants it to take a new conf, the satellite forgets its old
#schedulers (and their actions), takes the new ones and does the (new) job.
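
#Overview of the data flow implemented below (the REF markers in the code
#point to doc/shinken-action-queues.png):
#  Arbiter --put_conf()--> satellite --get_new_actions()--> self.s queue
#    --> Workers --> self.returns_queue --manage_action_return()-->
#    wait_homerun --manage_returns()--> schedulers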

from Queue import Empty
from multiprocessing import Queue, Manager, active_children

import os
import sys
import time
import select
import copy
import random
import cPickle

import shinken.pyro_wrapper as pyro
try:
    import Pyro.core
except ImportError:
    print "Shinken requires the Python Pyro module. Please install it."
    sys.exit(1)

from message import Message
from worker import Worker
from load import Load
from daemon import Daemon
from log import logger
from brok import Brok
from check import Check
from notification import Notification
from eventhandler import EventHandler


#Interface for Arbiter, our big MASTER
class IForArbiter(Pyro.core.ObjBase):
    #We keep the app link because we are just here for it
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.schedulers = app.schedulers


    #Function called by the Arbiter to give us our conf.
    #conf must be a dict with:
    #'schedulers' : schedulers dict (by id) with address and port
    #TODO: catch the case where the Arbiter sends something we already have
    #(same id+address+port) -> just do nothing :)
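    #Illustrative sketch of the expected shape (the key names are the ones
    #read below; the values here are made-up examples):
    #conf = {'global': {'poller_name': 'poller-1', 'max_workers': 4,
    #                   'min_workers': 1, 'processes_by_worker': 256,
    #                   'polling_interval': 1, 'use_timezone': 'NOTSET'},
    #        'schedulers': {0: {'name': 'scheduler-1', 'active': True,
    #                           'address': 'localhost', 'port': 7768}}}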
    def put_conf(self, conf):
        self.app.have_conf = True
        self.app.have_new_conf = True
        #Get our name from the globals
        if 'poller_name' in conf['global']:
            self.name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            self.name = conf['global']['reactionner_name']
        else:
            self.name = 'Unnamed satellite'
        self.app.name = self.name

        print "[%s] Sending us a configuration %s" % (self.name, conf)
        #If we already have this scheduler, we keep its pending results
        for sched_id in conf['schedulers']:
            already_got = False
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                already_got = True
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks')

            self.schedulers[sched_id]['uri'] = uri
            if already_got:
                self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            else:
                self.schedulers[sched_id]['wait_homerun'] = {}
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            #And then we connect to it :)
            self.app.pynag_con_init(sched_id)

        self.app.max_workers = conf['global']['max_workers']
        self.app.min_workers = conf['global']['min_workers']
        self.app.processes_by_worker = conf['global']['processes_by_worker']
        self.app.polling_interval = conf['global']['polling_interval']
        if 'poller_tags' in conf['global']:
            self.app.poller_tags = conf['global']['poller_tags']
        else: #a reactionner has no poller_tags
            self.app.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.app.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else: #a reactionner does not really care about it
            self.app.max_plugins_output_length = 8192
        print "Max output length:", self.app.max_plugins_output_length
        #Set the timezone the Arbiter gave us, if any
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone
            time.tzset()

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))


    #The Arbiter asks us to not manage a scheduler_id anymore.
    #I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.schedulers[sched_id]
        except KeyError:
            pass


    #The Arbiter asks me which sched_id I manage. If it is not OK with it,
    #it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.schedulers.keys()
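
    #A minimal sketch of that reconciliation dialogue (the method names are
    #the real ones here; the ids are made up):
    #  Arbiter: what_i_managed()    -> we answer [0, 1]
    #  Arbiter: remove_from_conf(1) -> we now only manage scheduler 0
    #  Arbiter: put_conf(conf)      -> we (re)load what it wants us to run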


    #Used by the Arbiter to know if we are alive
    def ping(self):
        print "We ask us for a ping"
        return True


    #Used by the Arbiter to know if we have a conf or not.
    #It can be useful if we must do nothing but are not
    #aware of it, because the Arbiter can KILL US!
    def have_conf(self):
        return self.app.have_conf


    #Called by the Arbiter if it thinks we are running but we must not (like
    #if I was a spare that took a conf but the master returns: I must die
    #and wait for a new conf).
    #It goes like this:
    #Arbiter: I don't care, hasta la vista baby!
    #Us: ... <- Nothing! We are dead already! You don't follow
    #anything or what? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "The Arbiter wants me to wait for a new conf"
        self.schedulers.clear()
        self.app.have_conf = False


#Interface for Brokers:
#they connect here and get all broks (data for brokers).
#Data must be ORDERED! (initial status BEFORE updates...)
class IBroks(Pyro.core.ObjBase):
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.running_id = random.random()


    #Does the Broker need to void its broks?
    def get_running_id(self):
        return self.running_id
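
    #Sketch of how a broker is expected to use this (not the broker's actual
    #code): poll get_running_id() before get_broks(); if the id differs from
    #the one memorized on the previous poll, this satellite has restarted,
    #so the broks the broker holds for us are stale and should be voided.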


    #A broker asks us for our broks
    def get_broks(self):
        #print "We ask us broks"
        res = self.app.get_broks()
        return res


class Satellite(Daemon):
    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
        #From Daemon, to manage signals. Calls self.manage_signal if
        #it exists, a dummy function otherwise
        self.set_exit_handler()

        self.log = logger
        self.log.load_obj(self)

        #The config reading part
        self.config_file = config_file
        #Read the config file if it exists;
        #if not, default properties are used
        self.parse_config_file()

        if self.config_file is not None:
            #Some paths can be relative. We must have a full path by taking
            #the config file as reference
            self.relative_paths_to_full(os.path.dirname(config_file))

        #Check that another satellite is not running (with the same conf)
        self.check_parallel_run(do_replace)

        #If the admin doesn't care about security, I allow root running
        insane = not self.idontcareaboutsecurity

        #Keep broks so they can be eaten by a broker
        self.broks = {}

        #Try to change the user (not on nt for the moment)
        #TODO: change user on nt
        if os.name != 'nt':
            self.change_user(insane)
        else:
            logger.log("Sorry, you can't change user on this system")

        #Now the daemon part, if needed
        if is_daemon:
            self.create_daemon(do_debug=debug, debug_file=debug_file)

        #Now the specific stuff
        #Bool to know if we have received conf from the arbiter
        self.have_conf = False
        self.have_new_conf = False
        self.s = Queue() #Global Master -> Slave
        #self.m = Queue() #Slave -> Master
        self.manager = Manager()
        self.returns_queue = self.manager.list()

        #Schedulers we manage (filled by IForArbiter.put_conf)
        self.schedulers = {}

        self.workers = {} #dict of active workers

        self.nb_actions_in_workers = 0

        #Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)

        self.t_each_loop = time.time() #used to track system time changes


    #Initialize or re-initialize the connection with a scheduler
    def pynag_con_init(self, id):
        sched = self.schedulers[id]
        #If sched is not active, I do not try to init:
        #it is just useless
        if not sched['active']:
            return

        logger.log("[%s] Init connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        #Set the timeout on the proxy
        #and get the running id
        try:
            pyro.set_timeout(sched['con'], 5)
            new_run_id = sched['con'].get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError), exp:
            logger.log("[%s] Scheduler %s is not initialized or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        #The scheduler has been restarted: it has a new run_id.
        #So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))
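
    #After a successful init, the scheduler entry looks like this (sketch;
    #'Checks' is the Pyro object name used in put_conf, the other values are
    #made-up examples):
    #  self.schedulers[0] = {'name': 'scheduler-1', 'active': True,
    #                        'uri': pyro.create_uri('localhost', 7768, 'Checks'),
    #                        'con': <Pyro proxy>, 'running_id': <its run id>,
    #                        'wait_homerun': {...results to send back...}}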


    #Manage an action return from a Worker:
    #we just put it into the sched it is for
    #and we clean unused properties like sched_id
    def manage_action_return(self, action):
        #OK, it's a result. We get it and fill the verifs of the good sched_id
        sched_id = action.sched_id
        #Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.set_status('waitforhomerun')
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action

        self.nb_actions_in_workers -= 1
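
    #Bookkeeping note: nb_actions_in_workers is incremented in
    #get_new_actions() for each queued action and decremented here, so
    #(nb_actions_in_workers - s.qsize()) in the main loop's stats print is
    #the number of actions currently being processed by the workers.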


    #Return the checks to their scheduler and clean them
    #REF: doc/shinken-action-queues.png (6)
    def manage_returns(self):
        #For all schedulers, we check for waitforhomerun and send back the results
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to return
            if not sched['active']:
                continue
            #Now ret has all verifs, we can return them
            send_ok = False
            ret = sched['wait_homerun'].values()
            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    send_ok = con.put_results(ret)
            #Not connected or sched is gone
            except (Pyro.errors.ProtocolError, KeyError), exp:
                print exp
                self.pynag_con_init(sched_id)
                continue
            except AttributeError, exp: #the scheduler must not be initialized
                print exp
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)

            #We clean ONLY if the send is OK
            if send_ok:
                sched['wait_homerun'].clear()
            else:
                self.pynag_con_init(sched_id)
                logger.log("Send failed!")


    #Used to wait for the conf from the Arbiter.
    #It sends us the conf through our daemon port and sets the
    #have_conf prop if it sends us something
    #(it can also just do a ping)
    def wait_for_initial_conf(self):
        logger.log("Waiting for initial configuration")
        timeout = 1.0
        #The Arbiter did not set our have_conf param yet
        while not self.have_conf:
            socks = pyro.get_sockets(self.daemon)

            avant = time.time()
            ins, outs, exs = select.select(socks, [], [], timeout) # 'foreign' event loop

            #Manage a possible time change (our avant will be shifted by the diff)
            diff = self.check_for_system_time_change()
            avant += diff

            if ins != []:
                for sock in socks:
                    if sock in ins:
                        pyro.handleRequests(self.daemon, sock)
                        diff = time.time() - avant
                        timeout = timeout - diff
                        break # no need to continue with the for loop
            else: #select timed out, just show we are alive
                sys.stdout.write(".")
                sys.stdout.flush()
                timeout = 1.0

            if timeout < 0:
                timeout = 1.0


    #The Arbiter can send us a new conf through the daemon port.
    #We do not want to lose time on it, so it's not a blocking
    #wait (the timeout is timeout_daemon).
    #If it sends us a new conf, we reinit the connections of all schedulers
    def watch_for_new_conf(self, timeout_daemon):
        socks = pyro.get_sockets(self.daemon)

        ins, outs, exs = select.select(socks, [], [], timeout_daemon)
        for sock in socks:
            if sock in ins:
                pyro.handleRequests(self.daemon, sock)

        #have_new_conf is set by put_conf,
        #so another handle will not make a con_init
        if self.have_new_conf:
            for sched_id in self.schedulers:
                print "Got a new conf"
                self.pynag_con_init(sched_id)
            self.have_new_conf = False


    #Check if our system time changed. If so, compensate for it
    def check_for_system_time_change(self):
        now = time.time()
        difference = now - self.t_each_loop
        #If we have more than a 15 min time change, we need to compensate it

        if abs(difference) > 900:
            self.compensate_system_time_change(difference)

        #Now set the new value for the tick loop
        self.t_each_loop = now

        #Return the diff if there is one, or just 0
        if abs(difference) > 900:
            return difference
        else:
            return 0
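
    #Worked example (hypothetical values): if the admin moves the clock from
    #12:00 to 13:00 between two loop turns, difference is about 3600s > 900,
    #so compensate_system_time_change(3600) is called and 3600 is returned;
    #callers then shift their own reference timestamps by that amount.
    #A 10s drift stays under the 900s threshold, so 0 is returned.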


    #If we've got a system time change, we need to compensate it;
    #for now, we do not do anything else in fact.
    def compensate_system_time_change(self, difference):
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
        #We only need to change some values


    #Create and launch a new Worker, and put it into self.workers.
    #It can be mortal or not
    def create_and_launch_worker(self, mortal=True):
        w = Worker(1, self.s, self.returns_queue, self.processes_by_worker, \
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new Worker : %s" % (self.name, w.id))
        self.workers[w.id].start()
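
    #Queue wiring (sketch): each Worker consumes Message objects from the
    #shared self.s queue (master -> slave) and appends finished actions to
    #self.returns_queue (a Manager().list()), which the main loop drains
    #through manage_action_return(). The meaning of the first Worker
    #argument and the exact signature live in worker.py.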


    #Manage the signal function
    #TODO: manage more than just quit
    #Frame is just garbage
    def manage_signal(self, sig, frame):
        logger.log("\nExiting with signal %s" % sig)
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
                #queue = w.return_queue
                #self.return_messages.remove(queue)
            except AttributeError: #an already-dead worker
                pass
            except AssertionError: #in a worker
                pass
        logger.log('Stopping all network connections')
        self.daemon.disconnect(self.interface)
        self.daemon.disconnect(self.brok_interface)
        self.daemon.shutdown(True)
        logger.log("Unlinking pid file")
        try:
            os.unlink(self.pidfile)
        except OSError, exp:
            print "Error in deleting pid file:", exp
        logger.log("Exiting")
        sys.exit(0)


    #A simple function to add objects to self,
    #like broks in self.broks, etc.
    #TODO: better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            #For a brok, we TAG the brok with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
            return


    #Someone asks us for our broks. We send them and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res


    #Workers are processes. They can die in numerous ways, like:
    #* 99.99% : a bug in our code, sorry :p
    #* 0.005% : a mix between a stupid admin (or an admin without coffee)
    #and a kill command
    #* 0.005% : an alien attack
    #So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        #Active children make a join with every one, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            #If a worker goes down and we did not ask it to, it's not
            #good: we may believe we still have a worker, and it's not true.
            #So we del it
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                #AIM ... Press FIRE ... <B>HEAD SHOT!</B>
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)
        #OK, now really del the workers
        for w_id in w_to_del:
            del self.workers[w_id]


    #Here we create new workers if the queue load (len of verifs) is too long
    def adjust_worker_number_by_load(self):
        #TODO: get a real value for the load
        wish_worker = 1
        #I want at least min_workers or wish_worker (the biggest),
        #but no more than max_workers
        while len(self.workers) < self.min_workers \
                or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            self.create_and_launch_worker()
        #TODO: if len(workers) > 2*wish, maybe we can kill a worker?
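
    #Example (hypothetical values): with min_workers=4, max_workers=30 and
    #the placeholder wish_worker=1, the loop simply tops the pool up to 4
    #workers. If wish_worker were computed from the queue load, say 10, the
    #loop would keep launching workers until len(self.workers) reached 10
    #(still capped by max_workers).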


    #We get new actions from the schedulers, we create a Message and we
    #put it in the s queue (from master to slave)
    #REF: doc/shinken-action-queues.png (1)
    def get_new_actions(self):
        #Here is the difference between a
        #poller and a reactionner:
        #a poller will only do checks,
        #a reactionner only does actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        #We check for new checks in each scheduler and queue the results for the workers
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to get anything
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    pyro.set_timeout(con, 120)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, poller_tags=self.poller_tags)
                    print "Ask actions to", sched_id, "got", len(tmp)
                    #We 'tag' them with sched_id and put them into the queue for workers
                    #REF: doc/shinken-action-queues.png (2)
                    for a in tmp:
                        a.sched_id = sched_id
                        a.set_status('queue')
                        msg = Message(id=0, type='Do', data=a)
                        self.s.put(msg)
                        #Update stats
                        self.nb_actions_in_workers += 1
                else: #no con? make the connection
                    self.pynag_con_init(sched_id)
            #OK, the con is not known, so we create it,
            #or maybe the connection was lost: we recreate it
            except (KeyError, Pyro.errors.ProtocolError), exp:
                print exp
                self.pynag_con_init(sched_id)
            #The scheduler must not be initialized,
            #or it must not have any checks for us
            except (AttributeError, Pyro.errors.NamingError), exp:
                print exp
            #What the F**k? We do not know what happened
            except Pyro.errors.ConnectionClosedError, exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)


    #Main function, will loop forever
    def main(self):
        Pyro.config.PYRO_COMPRESSION = 1
        Pyro.config.PYRO_MULTITHREADED = 0
        Pyro.config.PYRO_STORAGE = self.workdir
        logger.log("Using working directory : %s" % os.path.abspath(self.workdir))
        logger.log("Opening port: %s" % self.port)
        self.daemon = pyro.init_daemon(self.host, self.port)

        #Now we create the interfaces
        self.interface = IForArbiter(self)
        self.brok_interface = IBroks(self)

        #And we register them
        self.uri2 = pyro.register(self.daemon, self.interface, "ForArbiter")
        self.uri3 = pyro.register(self.daemon, self.brok_interface, "Broks")

        #We wait for the initial conf
        self.wait_for_initial_conf()

        #Connection init with the schedulers
        for sched_id in self.schedulers:
            self.pynag_con_init(sched_id)
        self.have_new_conf = False

        #Allocate mortal workers
        for i in xrange(1, self.min_workers):
            self.create_and_launch_worker() #create mortal worker

        timeout = self.polling_interval #default 1.0
        while True:
            begin_loop = time.time()

            #Maybe the Arbiter asked us to wait for a new conf.
            #If so, we must restart everything...
            if self.have_conf == False:
                print "Begin wait initial"
                self.wait_for_initial_conf()
                print "End wait initial"
                #for sched_id in self.schedulers:
                #    self.pynag_con_init(sched_id)

            #Now we check if the Arbiter speaks to us on the daemon port.
            #If so, we listen to it.
            #When it pushes us a conf, we reinit the connections.
            #Sleep while waiting for a new conf :)
            self.watch_for_new_conf(timeout)

            #Manage a possible time change (our begin_loop will be shifted by the diff)
            diff = self.check_for_system_time_change()
            begin_loop += diff

            try:
                after = time.time()
                timeout -= after - begin_loop

                if timeout < 0: #time to enter the timeout part
                    #print "Time out", timeout
                    raise Empty

            except Empty, exp: #the timeout part
                print " ======================== "

                timeout = self.polling_interval

                #Check if zombie workers are among us :)
                #If so: KILL THEM ALL!!!
                self.check_and_del_zombie_workers()

                #Print stats for debug
                for sched_id in self.schedulers:
                    sched = self.schedulers[sched_id]
                    #In workers we've got actions sent to the queue - queue size
                    print '[%d][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                        (sched_id, sched['name'], len(self.workers), self.s.qsize(), \
                        self.nb_actions_in_workers - self.s.qsize(), len(self.returns_queue))

                #Before returning or getting new actions, see how we managed
                #the old ones: are they still in the queue(s)? If so, we
                #must wait more, or at least have more workers
                wait_ratio = self.wait_ratio.get_load()
                if self.s.qsize() != 0 and wait_ratio < 5*self.polling_interval:
                    print "I decide to up the wait ratio"
                    self.wait_ratio.update_load(wait_ratio * 2)
                else:
                    #Come back toward self.polling_interval on a normal run;
                    #if wait_ratio was > 5*self.polling_interval,
                    #this brings it near 5, because if < 5, it goes up :)
                    self.wait_ratio.update_load(self.polling_interval)
                wait_ratio = self.wait_ratio.get_load()
                print "Wait ratio:", wait_ratio

                #We can wait more than 1s if needed,
                #no more than 5s, but no less than 1s
                timeout = timeout * wait_ratio
                timeout = max(self.polling_interval, timeout)
                timeout = min(5*self.polling_interval, timeout)
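
                #Worked example (with the default polling_interval of 1.0):
                #if checks pile up and wait_ratio has been doubled to 4,
                #timeout becomes 1.0 * 4 = 4.0s, inside the [1.0, 5.0] clamp;
                #a wait_ratio of 8 would be clamped down to 5.0s, and an idle
                #run with wait_ratio 1 stays at the 1.0s floor.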

                #Maybe we do not have enough workers: we check for it
                #and launch new ones if needed
                self.adjust_worker_number_by_load()

                #Manage all the messages we got in the last timeout
                #for queue in self.return_messages:
                while len(self.returns_queue) != 0:
                    self.manage_action_return(self.returns_queue.pop())

                #Now we can get new actions from the schedulers
                self.get_new_actions()

                #We send all finished checks back
                #REF: doc/shinken-action-queues.png (6)
                self.manage_returns()