#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken.  If not, see <http://www.gnu.org/licenses/>.
"""
This class is an interface for the reactionner and poller daemons.
A satellite listens on a port for its configuration from the Arbiter;
the configuration given by the Arbiter describes the schedulers the
satellite must take its actions from.
Once launched and configured, the satellite keeps listening to the Arbiter:
if the Arbiter wants it to take a new conf, the satellite forgets its old
schedulers (and the actions in them), takes the new ones and does the (new) job.
"""
import os
import sys
import copy
import cPickle

from multiprocessing import Queue, Manager, active_children

try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    sys.exit("Shinken requires the Python Pyro module. Please install it.")

Pyro = pyro.Pyro
from shinken.message import Message
from shinken.worker import Worker
from shinken.load import Load
from shinken.daemon import Daemon, Interface
from shinken.log import logger
from shinken.brok import Brok
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler
# Interface for Arbiter, our big MASTER
class IForArbiter(Interface):

    # Arbiter asks us to stop managing a scheduler_id;
    # I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.app.schedulers[sched_id]
        except KeyError:
            pass
    # Arbiter asks me which sched_id I manage; if it is not OK with it,
    # it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.app.schedulers.keys()
    # Called by the arbiter if it thinks we are running but we must not (e.g.
    # if I was a spare that took a conf but the master came back, I must die
    # and wait for a new conf)
    # Arbiter : I don't care, hasta la vista baby!
    # Us : ... <- Nothing! We are dead! Don't you follow
    # anything or what?? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "The arbiter wants me to wait for a new conf"
        self.app.schedulers.clear()
        self.app.cur_conf = None
    ### NB: the following methods are only used by the broker:

    def push_broks(self, broks):
        """ Used by the Arbiter to push broks to the broker """
        self.app.add_broks_to_queue(broks.values())

    # The arbiter asks us for the external commands in our queue
    def get_external_commands(self):
        return self.app.get_external_commands()
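
# A hedged sketch (not executed here) of how the arbiter side can reach this
# interface once do_post_daemon_init() has registered it under the
# "ForArbiter" name; the host, port and 'wanted' set are illustrative:
#
#   arb_con = Pyro.core.getProxyForURI("PYROLOC://satellite-host:7771/ForArbiter")
#   for sched_id in arb_con.what_i_managed():
#       if sched_id not in wanted:           # the arbiter's own view
#           arb_con.remove_from_conf(sched_id)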
# Interface for Schedulers
# If we are passive, they connect to this interface to
# push their actions and fetch back the results
class ISchedulers(Interface):

    # A Scheduler sends me actions to do
    def push_actions(self, actions, sched_id):
        #print "A scheduler sent me actions", actions
        self.app.add_actions(actions, sched_id)

    # A scheduler asks us for its returns
    def get_returns(self, sched_id):
        #print "A scheduler asks me for the returns", sched_id
        ret = self.app.get_return_for_passive(sched_id)
        #print "Send back", len(ret), "returns"
        return ret
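
# In passive mode the scheduler initiates everything. A minimal sketch of the
# two calls it would make on this interface, through a Pyro proxy built as
# above (registered under the "Schedulers" name in do_post_daemon_init()):
#
#   sched_con.push_actions(new_checks, our_sched_id)  # hand us work
#   results = sched_con.get_returns(our_sched_id)     # collect finished ones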
# Interface for Brokers
# They connect here and get all broks (data for brokers).
# Data must be ORDERED! (initial status BEFORE update...)
class IBroks(Interface):

    # A broker asks us for our broks
    def get_broks(self):
        # print "We are asked for broks"
        res = self.app.get_broks()
        return res
class BaseSatellite(Daemon):
    def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):

        super(BaseSatellite, self).__init__(name, config_file, is_daemon, do_replace, debug, debug_file)

        # Our schedulers
        self.schedulers = {}

        # Now we create the interfaces
        self.interface = IForArbiter(self)

    # The arbiter can send us a new conf on the pyro_daemon port.
    # We do not want to lose time on it, so it's not a blocking wait.
    # If it sends us a new conf, we reinit the connections of all schedulers
    def watch_for_new_conf(self, timeout):
        self.handleRequests(timeout)

    def do_stop(self):
        print "Stopping all network connections"
        self.pyro_daemon.unregister(self.interface)
        super(BaseSatellite, self).do_stop()
class Satellite(BaseSatellite):
    def __init__(self, name, config_file, is_daemon, do_replace, debug, debug_file):

        super(Satellite, self).__init__(name, config_file, is_daemon, do_replace, debug, debug_file)

        # Keep broks so they can be eaten by a broker
        self.broks = {}

        self.workers = {}   # dict of active workers

        self.nb_actions_in_workers = 0

        # Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)

        self.brok_interface = IBroks(self)
        self.scheduler_interface = ISchedulers(self)

        # Just for having these attributes defined here. explicit > implicit ;)
        self.manager = None
        self.returns_queue = None

        self.worker_modules = {}
    def pynag_con_init(self, id):
        """ Initialize or re-initialize the connection with a scheduler """
        sched = self.schedulers[id]
        # If sched is not active, I do not try to init:
        # it is just useless
        if not sched['active']:
            return

        logger.log("[%s] Init connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sch_con = sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        # Set a short timeout and get the scheduler's running id
        try:
            pyro.set_timeout(sch_con, 5)
            new_run_id = sch_con.get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError), exp:
            logger.log("[%s] Scheduler %s is not initialized or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        # The scheduler has been restarted: it has a new run_id.
        # So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))
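
    # For illustration: the running_id handshake above is what detects a
    # scheduler restart. Say we stored running_id 1285.42 and the scheduler,
    # after a reboot, now answers 1302.07 (made-up values): the ids differ,
    # so every result still parked in sched['wait_homerun'] belongs to checks
    # the new scheduler never sent, and we drop them all.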
    # Manage the action results returned by Workers.
    # We just put them into the sched they are for,
    # and we clean unused properties like sched_id
    def manage_action_return(self, action):
        # Ok, it's a result. We get it, and fill verifs of the good sched_id
        sched_id = action.sched_id
        # Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.status = 'waitforhomerun'
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action
        # Update stats: one less action in the workers
        self.nb_actions_in_workers -= 1
    # Return the checks to their scheduler and clean them up
    # REF: doc/shinken-action-queues.png (6)
    def manage_returns(self):
        # For all schedulers, we check for waitforhomerun and send the results back
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            # If sched is not active, I do not try to return
            if not sched['active']:
                continue
            # Now ret has all the verifs, we can return them
            send_ok = False
            ret = sched['wait_homerun'].values()
            if ret:
                try:
                    con = sched['con']
                    if con is not None: # None = not initialized
                        send_ok = con.put_results(ret)
                # Not connected or sched is gone
                except (Pyro.errors.ProtocolError, KeyError), exp:
                    print exp
                    self.pynag_con_init(sched_id)
                except AttributeError, exp: # the scheduler must not be initialized
                    print exp
                except Exception, exp:
                    print ''.join(Pyro.util.getPyroTraceback(exp))
                    sys.exit(0)

                # We clean ONLY if the send is OK
                if send_ok:
                    sched['wait_homerun'].clear()
                else:
                    self.pynag_con_init(sched_id)
                    logger.log("Sent failed!")
    # Get all the returning actions for a call from a
    # passive scheduler
    def get_return_for_passive(self, sched_id):
        # I do not know this scheduler?
        if sched_id not in self.schedulers:
            print "I do not know about the scheduler", sched_id
            return []

        sched = self.schedulers[sched_id]
        print "Preparing to return", sched['wait_homerun'].values()
        ret = copy.copy(sched['wait_homerun'].values())
        sched['wait_homerun'].clear()
        print "Finally return", ret
        return ret
    # Create and launch a new worker, and put it into self.workers.
    # It can be mortal or not
    def create_and_launch_worker(self, module_name='fork', mortal=True):
        q = self.worker_modules[module_name]['to_q']

        # If we are in the fork module, we do not specify a target
        target = None
        if module_name == 'fork':
            target = None
        else:
            for module in self.modules_manager.instances:
                if module.properties['type'] == module_name:
                    # The worker module provides the function the worker runs
                    target = module.work
        w = Worker(1, q, self.returns_queue, self.processes_by_worker, \
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length, target=target)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new %s Worker : %s" % (self.name, module_name, w.id))
        w.start()
    def do_stop(self):
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
            # queue = w.return_queue
            # self.return_messages.remove(queue)
            except AttributeError: # the worker is already dead
                pass
            except AssertionError: # we are inside a worker
                pass
        logger.log('Stopping all network connections')
        self.pyro_daemon.unregister(self.brok_interface)
        self.pyro_daemon.unregister(self.scheduler_interface)
        super(Satellite, self).do_stop()
    # A simple function to add objects to self,
    # like broks in self.broks, etc.
    # TODO : better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            # For a brok, we TAG it with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
            return
    # Someone asks us for our broks. We send them, and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res
    # Workers are processes, they can die in numerous ways, like:
    # *99.99% : bug in code, sorry :p
    # *0.005% : a mix between a stupid admin (or an admin without coffee)
    #  and a kill command
    # *0.005% : alien attack
    # So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        # Active children make a join with every one, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            # If a worker goes down and we did not ask for it, it's not
            # good: we may believe we still have a worker, and it's not true
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                # AIM ... Press FIRE ... <B>HEAD SHOT!</B>
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)
        # OK, now really del the workers
        for id in w_to_del:
            del self.workers[id]
    # Here we create new workers if the queue load (len of verifs) is too high
    def adjust_worker_number_by_load(self):
        # TODO : get a real value for the load
        wish_worker = 1
        # I want at least min_workers or wish_worker (the biggest), but no more than max_workers
        while len(self.workers) < self.min_workers \
                or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            for mod in self.worker_modules:
                self.create_and_launch_worker(module_name=mod)
        # TODO : if len(workers) > 2*wish, maybe we can kill a worker?
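
    # A worked example of the rule above: with min_workers=4, max_workers=16
    # and the default wish_worker=1, a fresh satellite loops until
    # len(self.workers) reaches 4, starting one worker per worker module on
    # each pass; wish_worker would have to exceed the current worker count
    # for the pool to grow any further, up to the max_workers cap.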
    # Get the Queue() an action should go to, by looking at which
    # module it is for
    def _got_queue_from_action(self, a):
        if hasattr(a, 'module_type'):
            if a.module_type in self.worker_modules:
                if a.module_type != 'fork':
                    print "GOT A SPECIAL QUEUE (%s) for" % a.module_type, a.__dict__,
                return self.worker_modules[a.module_type]['to_q']
            # Nothing found, it's not good at all!
            return None
        # If no module_type, use the standard 'fork' queue
        return self.worker_modules['fork']['to_q']
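
    # For example (illustrative module type): an action carrying
    # a.module_type == 'nrpe_booster' is routed to
    # self.worker_modules['nrpe_booster']['to_q'] if such a worker module is
    # loaded, an action with an unknown module_type gets None (and is then
    # skipped by add_actions), and an action without a module_type attribute
    # falls back to the standard 'fork' queue.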
    # Add a list of actions to our queues
    def add_actions(self, lst, sched_id):
        for a in lst:
            a.sched_id = sched_id
            a.status = 'queue'
            msg = Message(id=0, type='Do', data=a)
            q = self._got_queue_from_action(a)
            if q is not None:
                q.put(msg)
            # Update stats
            self.nb_actions_in_workers += 1
    # We get new actions from the schedulers, we create a Message and we
    # put it in the s queue (from master to slave)
    # REF: doc/shinken-action-queues.png (1)
    def get_new_actions(self):
        # Here is the difference between a
        # poller and a reactionner:
        # a poller will only do checks,
        # a reactionner will only do actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        # We check for new checks in each scheduler and put the results in new_checks
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            # If sched is not active, I do not try to get anything
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None: # None = not initialized
                    pyro.set_timeout(con, 120)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, \
                                         poller_tags=self.poller_tags, worker_name=self.name)
                    print "Ask actions to", sched_id, "got", len(tmp)
                    # We 'tag' them with sched_id and put them into the queue for workers
                    # REF: doc/shinken-action-queues.png (2)
                    self.add_actions(tmp, sched_id)
                    # a.sched_id = sched_id
                    # msg = Message(id=0, type='Do', data=a)
                    # q = self._got_queue_from_action(a)
                    # self.nb_actions_in_workers += 1
                else: # no con? make the connection
                    self.pynag_con_init(sched_id)
            # Ok, con is not known, so we create it
            # Or maybe the connection was lost, we recreate it
            except (KeyError, Pyro.errors.ProtocolError), exp:
                print exp
                self.pynag_con_init(sched_id)
            # The scheduler must not be initialized
            # or the scheduler must not have checks
            except (AttributeError, Pyro.errors.NamingError), exp:
                print exp
            # What the F**k? We do not know what happened,
            # so we try to reinit the connection
            except Pyro.errors.ConnectionClosedError, exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)
    def do_loop_turn(self):
        # Maybe the arbiter asked us to wait for a new conf.
        # If so, we must restart everything...
        if self.cur_conf is None:
            print "Begin wait initial"
            self.wait_for_initial_conf()
            print "End wait initial"
            if not self.new_conf: # we may have been interrupted or so; then just return from this loop turn
                return
            self.setup_new_conf()

        # Now we check if the arbiter speaks to us on the pyro_daemon.
        # If so, we listen to it.
        # When it pushes us a conf, we reinit the connections.
        # Sleep while waiting for a new conf :)
        self.watch_for_new_conf(self.timeout)
        if self.new_conf:
            self.setup_new_conf()

        print " ======================== "

        self.timeout = self.polling_interval

        # Check if zombie workers are among us :)
        # If so : KILL THEM ALL!!!
        self.check_and_del_zombie_workers()

        # Print stats for debug
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            for mod in self.worker_modules:
                # In workers we've got the actions sent to the queue - queue size
                q = self.worker_modules[mod]['to_q']
                print '[%d][%s][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                    (sched_id, sched['name'], mod, len(self.workers), q.qsize(), \
                    self.nb_actions_in_workers - q.qsize(), len(self.returns_queue))

        # Before we return or get new actions, we see how we managed
        # the old ones: are they still in the queue(s)? If so, we
        # must wait more, or at least have more workers
        wait_ratio = self.wait_ratio.get_load()
        total_q = 0
        for mod in self.worker_modules:
            q = self.worker_modules[mod]['to_q']
            total_q += q.qsize()
        if total_q != 0 and wait_ratio < 5*self.polling_interval:
            print "I decide to up wait ratio"
            self.wait_ratio.update_load(wait_ratio * 2)
        else:
            # Come back to self.polling_interval on a normal run; if wait_ratio
            # was > 5*self.polling_interval,
            # this makes it come near 5, because if < 5 it goes up :)
            self.wait_ratio.update_load(self.polling_interval)
        wait_ratio = self.wait_ratio.get_load()
        print "Wait ratio:", wait_ratio

        # We can wait more than 1s if needed,
        # no more than 5s, but no less than 1s
        timeout = self.timeout * wait_ratio
        timeout = max(self.polling_interval, timeout)
        self.timeout = min(5*self.polling_interval, timeout)
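
        # Worked example, assuming polling_interval = 1s: under load
        # wait_ratio doubles (1 -> 2 -> 4 -> 8), so timeout = 1 * 8 = 8s,
        # clamped down to min(5*1, 8) = 5s; once the queues drain, wait_ratio
        # decays back toward 1 and timeout settles at max(1, 1) = 1s.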
        # Maybe we do not have enough workers, we check for it
        # and launch new ones if needed
        self.adjust_worker_number_by_load()

        # Manage all messages we've got in the last timeout
        # for queue in self.return_messages:
        while len(self.returns_queue) != 0:
            self.manage_action_return(self.returns_queue.pop())

        # If we are passive, we do not initiate the check getting
        # and just wait to be asked
        print "Am I passive?", self.passive
        if not self.passive:
            print "I try to get new actions!"
            # Now we can get new actions from the schedulers
            self.get_new_actions()

        # We send all finished checks
        # REF: doc/shinken-action-queues.png (6)
        self.manage_returns()
    def do_post_daemon_init(self):
        """ Do this satellite (poller or reactionner) post-"daemonize" init:
        we must register our interfaces for the 3 possible callers: arbiter, schedulers or brokers. """
        # And we register them
        self.uri2 = self.pyro_daemon.register(self.interface, "ForArbiter")
        self.uri3 = self.pyro_daemon.register(self.brok_interface, "Broks")
        self.uri4 = self.pyro_daemon.register(self.scheduler_interface, "Schedulers")

        # self.s = Queue() # Global Master -> Slave
        # We can open the Queue for fork AFTER
        self.worker_modules['fork'] = {'to_q': Queue()}
        self.manager = Manager()
        self.returns_queue = self.manager.list()
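
    # A sketch of the IPC layout built above: one multiprocessing.Queue per
    # worker module for master -> worker messages, and a single shared
    # Manager().list() for worker -> master returns:
    #
    #   self.worker_modules['fork']['to_q'].put(Message(id=0, type='Do', data=a))
    #   finished = self.returns_queue.pop()   # drained in do_loop_turn()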
    def setup_new_conf(self):
        """ Setup the newly received conf """
        conf = self.new_conf
        print "[%s] Received a configuration %s " % (self.name, conf)
        self.new_conf = None
        self.cur_conf = conf

        # Get our name from the globals
        if 'poller_name' in conf['global']:
            name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            name = conf['global']['reactionner_name']
        else:
            name = 'Unnamed satellite'
        self.name = name

        self.passive = conf['global']['passive']
        print "Is passive?", self.passive
        if self.passive:
            logger.log("[%s] Passive mode enabled." % self.name)

        # Replace the schedulers conf, but keep the pending results
        # (wait_homerun) of the ones we already knew about
        for sched_id in conf['schedulers']:
            already_got = False
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                already_got = True
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks', self.use_ssl)
            print "DBG: scheduler URI:", uri

            self.schedulers[sched_id]['uri'] = uri
            if already_got:
                self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            else:
                self.schedulers[sched_id]['wait_homerun'] = {}
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            # Do not connect if we are a passive satellite
            if not self.passive:
                # And then we connect to it :)
                self.pynag_con_init(sched_id)

        self.max_workers = conf['global']['max_workers']
        self.min_workers = conf['global']['min_workers']
        self.processes_by_worker = conf['global']['processes_by_worker']
        self.polling_interval = conf['global']['polling_interval']
        self.timeout = self.polling_interval
        if 'poller_tags' in conf['global']:
            self.poller_tags = conf['global']['poller_tags']
        else: # a reactionner has no poller_tags
            self.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else: # for a reactionner, we don't really care about it
            self.max_plugins_output_length = 8192
        print "Max output length", self.max_plugins_output_length
        # Set the timezone the arbiter gave us
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))

        # Now manage the modules
        # TODO: check how to better handle this with modules_manager..
        mods = conf['global']['modules']
        for module in mods:
            # If we already got it, bypass
            if not module.module_type in self.worker_modules:
                print "Add module object", module
                self.modules_manager.modules.append(module)
                logger.log("[%s] Got module : %s " % (self.name, module.module_type))
                self.worker_modules[module.module_type] = {'to_q': Queue()}
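
    # For reference, a minimal shape of the conf read above (values are
    # illustrative, only the keys used by this method are shown):
    #
    #   conf = {'global': {'poller_name': 'poller-1', 'passive': False,
    #                      'max_workers': 4, 'min_workers': 1,
    #                      'processes_by_worker': 256, 'polling_interval': 1,
    #                      'use_timezone': 'NOTSET', 'modules': []},
    #           'schedulers': {0: {'name': 'scheduler-1',
    #                              'address': 'localhost', 'port': 7768,
    #                              'active': True}}}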
    def main(self):
        for line in self.get_header():
            logger.log(line)

        self.load_config_file()

        self.do_daemon_init_and_start()

        self.do_post_daemon_init()

        # We wait for the initial conf
        self.wait_for_initial_conf()
        if not self.new_conf: # either we have a big problem, or we were asked to shut down
            return
        self.setup_new_conf()

        # We can load our modules now
        self.modules_manager.set_modules(self.modules_manager.modules)
        self.modules_manager.load_and_init()

        # Allocate Mortal Threads
        for _ in xrange(1, self.min_workers):
            for mod in self.worker_modules:
                self.create_and_launch_worker(module_name=mod)
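
# A hedged sketch of how a concrete satellite builds on this class: the poller
# and reactionner daemons mainly pin the two class flags that
# get_new_actions() reads (class and attribute layout shown as an assumption,
# not a verbatim copy of the real daemons):
#
#   class Poller(Satellite):
#       do_checks = True     # a poller runs checks...
#       do_actions = False   # ...but never notifications/event handlers
#
#   class Reactionner(Satellite):
#       do_checks = False
#       do_actions = True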