2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
7 #This file is part of Shinken.
9 #Shinken is free software: you can redistribute it and/or modify
10 #it under the terms of the GNU Affero General Public License as published by
11 #the Free Software Foundation, either version 3 of the License, or
12 #(at your option) any later version.
14 #Shinken is distributed in the hope that it will be useful,
15 #but WITHOUT ANY WARRANTY; without even the implied warranty of
16 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 #GNU Affero General Public License for more details.
19 #You should have received a copy of the GNU Affero General Public License
20 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 #This class is used to manage modules and call their callbacks
32 from multiprocessing
import Process
, Queue
34 #modulepath = os.path.join(os.path.dirname(imp.find_module("pluginloader")[1]), "modules/")
35 #Thanks http://pytute.blogspot.com/2007/04/python-plugin-system.html
37 from basemodule
import BaseModule
39 class ModulesManager(object):
41 def __init__(self
, modules_type
, modules_path
, modules
):
42 self
.modules_path
= modules_path
43 self
.modules_type
= modules_type
44 self
.modules
= modules
45 self
.allowed_types
= [ plug
.module_type
for plug
in modules
]
46 self
.imported_modules
= []
47 self
.modules_assoc
= []
53 #We get all modules file of our type (end with broker.py for example)
54 modules_files
= [ fname
[:-3] for fname
in os
.listdir(self
.modules_path
)
55 if fname
.endswith(self
.modules_type
+".py") ]
57 #And directories (no .py suffix to strip, but the name still ends with the module type, broker for example)
58 modules_files
.extend([ fname
for fname
in os
.listdir(self
.modules_path
)
59 if fname
.endswith(self
.modules_type
) ])
61 # Now we try to load them
62 if not self
.modules_path
in sys
.path
:
63 sys
.path
.append(self
.modules_path
)
65 del self
.imported_modules
[:]
66 for fname
in modules_files
:
68 print("importing %s" % (fname
))
69 self
.imported_modules
.append(__import__(fname
))
70 except ImportError , exp
:
71 print "Warning :", exp
73 del self
.modules_assoc
[:]
74 for mod_conf
in self
.modules
:
75 module_type
= mod_conf
.module_type
77 for module
in self
.imported_modules
:
78 if module
.properties
['type'] == module_type
:
79 self
.modules_assoc
.append((mod_conf
, module
))
83 #No module is suitable, we Raise a Warning
84 print "Warning : the module type %s for %s was not found in modules!" % (module_type
, mod_conf
.get_name())
87 def try_instance_init(self
, inst
):
88 """ Try to "init" the given module instance.
89 Returns: True on successfull init. False if instance init method raised any Exception. """
92 except Exception as e
:
93 print "Error : the instance %s raised an exception %s, I remove it!" % (inst
.get_name(), str(e
))
94 print "Back trace of this remove :"
95 traceback
.print_exc(file=sys
.stdout
)
99 def clear_instances(self
, insts
=None):
101 insts
= self
.instances
[:]
103 self
.remove_instance(i
)
105 # actually only the arbiter calls this method with start_external=False..
106 def get_instances(self
, start_external
=True):
107 """ Create, init and then returns the list of module instances that the caller needs.
108 By default (start_external=True) also start the execution of the instances that are "external".
109 If an instance can't be created or init'ed then only log is done. That instance is skipped. """
110 self
.clear_instances()
111 for (mod_conf
, module
) in self
.modules_assoc
:
113 mod_conf
.properties
= module
.properties
.copy()
114 inst
= module
.get_instance(mod_conf
)
115 if inst
is None: #None = Bad thing happened :)
117 ## TODO: temporary, for backward compatibility with previous modules:
118 if not isinstance(inst
, BaseModule
):
119 print("Notice: module %s is old module style (not instance of basemodule.BaseModule)" % (mod_conf
.get_name()))
120 inst
.props
= inst
.properties
= mod_conf
.properties
.copy()
121 inst
.is_external
= inst
.props
['external'] = inst
.props
.get('external', False)
122 inst
.phases
= inst
.props
.get('phases', []) ## though a module defined with no phase is quite useless ..
123 inst
.phases
.append(None) ## to permit simpler get_*_ methods
125 self
.instances
.append(inst
)
126 except Exception , exp
:
127 print "Error : the module %s raised an exception %s, I remove it!" % (mod_conf
.get_name(), str(exp
))
128 print "Back trace of this remove :"
129 traceback
.print_exc(file=sys
.stdout
)
131 print "Load", len(self
.instances
), "module instances"
134 for inst
in self
.instances
:
135 if not self
.try_instance_init(inst
):
140 self
.instances
.remove(inst
)
143 self
.__start
_ext
_instances
()
145 return self
.instances
147 def __start_ext_instances(self
):
148 for inst
in self
.instances
:
150 self
.__set
_ext
_inst
_queues
(inst
)
151 print("Starting external process for instance %s" % (inst
.name
))
152 p
= inst
.process
= Process(target
=inst
.main
, args
=())
153 inst
.properties
['process'] = p
## TODO: temporary
155 print("%s is now started ; pid=%d" % (inst
.name
, p
.pid
))
157 def __set_ext_inst_queues(self
, inst
):
158 if isinstance(inst
, BaseModule
):
161 BaseModule
.create_queues__(inst
)
162 ## TODO: temporary until new style module is used by every shinken module:
163 inst
.properties
['to_queue'] = inst
.to_q
164 inst
.properties
['from_queue'] = inst
.from_q
166 # actually only called by arbiter...
167 # TODO: but this actually leads to a double "init" call.. maybe a "uninit" would be needed ?
168 def init_and_start_instances(self
):
169 for inst
in self
.instances
:
171 self
.__start
_ext
_instances
()
173 def close_inst_queues(self
, inst
):
174 """ Release the resources associated with the queues from the given module instance """
175 for q
in (inst
.to_q
, inst
.from_q
):
176 if q
is None: continue
179 inst
.to_q
= inst
.from_q
= None
181 def remove_instance(self
, inst
):
182 # External instances need to be close before (process + queues)
184 inst
.process
.terminate()
185 inst
.process
.join(timeout
=1)
186 self
.close_inst_queues(inst
)
187 # Then do not listen anymore about it
188 self
.instances
.remove(inst
)
191 def check_alive_instances(self
):
194 for inst
in self
.instances
:
195 if inst
.is_external
and not inst
.process
.is_alive():
196 print "Error : the external module %s goes down unexpectly!" % inst
.get_name()
200 self
.remove_instance(inst
)
def get_internal_instances(self, phase=None):
    """ Return the list of instances that are NOT external and that
    list `phase` among their `phases`.
    Instances are returned in the order they appear in self.instances. """
    selected = []
    for candidate in self.instances:
        # External instances are skipped before their phases are looked at
        if candidate.is_external:
            continue
        if phase in candidate.phases:
            selected.append(candidate)
    return selected
def get_external_instances(self, phase=None):
    """ Return the list of instances that ARE external and that
    list `phase` among their `phases`.
    Instances are returned in the order they appear in self.instances. """
    def is_wanted(candidate):
        # Phases are only looked at for external instances
        return candidate.is_external and phase in candidate.phases

    return list(filter(is_wanted, self.instances))
def get_external_to_queues(self, phase=None):
    """ Return the `to_q` attribute of every external instance that
    lists `phase` among its `phases`, in self.instances order. """
    queues = []
    for candidate in self.instances:
        # Internal instances are skipped before phases / to_q are touched
        if not candidate.is_external:
            continue
        if phase in candidate.phases:
            queues.append(candidate.to_q)
    return queues
def get_external_from_queues(self, phase=None):
    """ Return the `from_q` attribute of every external instance that
    lists `phase` among its `phases`, in self.instances order. """
    # First narrow down to the matching external instances (lazily),
    # then pull the queue out of each one.
    matching = (candidate for candidate in self.instances
                if candidate.is_external and phase in candidate.phases)
    return [candidate.from_q for candidate in matching]
220 #Ask internal to quit if they can
221 for inst
in self
.get_internal_instances():
222 if hasattr(inst
, 'quit') and callable(inst
.quit
):
224 for inst
in self
.get_external_instances():
225 self
.remove_instance(inst
)