2 # -*- coding: utf-8 -*-
3 #Copyright (c) 2009 Ondřej Súkup
4 #Parallel Python Software: http://www.parallelpython.com
5 # Copyright (c) 2005-2009, Vitalii Vanovschi
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are met:
9 # * Redistributions of source code must retain the above copyright notice,
10 # this list of conditions and the following disclaimer.
11 # * Redistributions in binary form must reproduce the above copyright
12 # notice, this list of conditions and the following disclaimer in the
13 # documentation and/or other materials provided with the distribution.
14 # * Neither the name of the author nor the names of its contributors
15 # may be used to endorse or promote products derived from this software
16 # without specific prior written permission.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
28 # THE POSSIBILITY OF SUCH DAMAGE.
45 copyright
= "Copyright (c) 2009 Ondřej Súkup /n \
46 Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved"
48 # compartibility with Python 2.6
51 sha_new
= hashlib
.sha1
57 class _NetworkServer(Server
):
58 """Network Server Class
61 def __init__(self
, ncpus
="autodetect", interface
="0.0.0.0",
62 broadcast
="255.255.255.255", port
=None, secret
=None,
63 timeout
=None, loglevel
=logging
.WARNING
, restart
=False,
65 Server
.__init
__(self
, ncpus
, secret
=secret
, loglevel
=loglevel
,
66 restart
=restart
, proto
=proto
)
68 self
.bcast
= broadcast
72 self
.port
= self
.default_port
73 self
.timeout
= timeout
75 self
.last_con_time
= time
.time()
76 self
.ncon_lock
= _thread
.allocate_lock()
78 logging
.debug("Strarting network server interface=%s port=%i"
79 % (self
.host
, self
.port
))
80 if self
.timeout
is not None:
81 logging
.debug("ppserver will exit in %i seconds if no "\
82 "connections with clients exist" % (self
.timeout
))
83 _thread
.start_new_thread(self
.check_timeout
, ())
85 def ncon_add(self
, val
):
86 """Keeps track of the number of connections and time of the last one"""
87 self
.ncon_lock
.acquire()
89 self
.last_con_time
= time
.time()
90 self
.ncon_lock
.release()
92 def check_timeout(self
):
93 """Checks if timeout happened and shutdowns server if it did"""
96 idle_time
= time
.time() - self
.last_con_time
97 if idle_time
< self
.timeout
:
98 time
.sleep(self
.timeout
- idle_time
)
100 logging
.debug("exiting ppserver due to timeout (no client"\
101 " connections in last %i sec)", self
.timeout
)
104 time
.sleep(self
.timeout
)
107 """Initiates listenting to incoming connections"""
109 ssocket
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
110 # following allows ppserver to restart faster on the same port
111 ssocket
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
112 ssocket
.bind((self
.host
, self
.port
))
115 logging
.error("Cannot create socket with port " + str(self
.port
)
116 + " (port is already in use)")
120 #accept connections from outside
121 (csocket
, address
) = ssocket
.accept()
122 #now do something with the clientsocket
123 #in this case, we'll pretend this is a threaded server
124 _thread
.start_new_thread(self
.crun
, (csocket
, ))
126 logging
.debug("Closing server socket")
129 def crun(self
, csocket
):
130 """Authenticates client and handles its jobs"""
131 mysocket
= pptransport
.CSocketTransport(csocket
)
133 mysocket
.send(version
)
134 #generate a random string
135 srandom
= "".join([random
.choice(string
.ascii_letters
)
137 mysocket
.send(srandom
)
138 answer
= sha_new(srandom
+self
.secret
).hexdigest()
139 cleintanswer
= mysocket
.receive()
140 if answer
!= cleintanswer
:
141 logging
.warning("Authentification failed, client host=%s, port=%i"
142 % csocket
.getpeername())
143 mysocket
.send("FAILED")
149 ctype
= mysocket
.receive()
150 logging
.debug("Control message received: " + ctype
)
154 #reset time at each new connection
155 self
.get_stats()["local"].time
= 0.0
156 mysocket
.send(str(self
.get_ncpus()))
159 mysocket
.send(str(self
.get_stats()["local"].time
))
162 sfunc
= mysocket
.creceive()
163 sargs
= mysocket
.receive()
164 fun
= self
.insert(sfunc
, sargs
)
166 mysocket
.send(sresult
)
168 #print sys.excepthook(*sys.exc_info())
169 logging
.debug("Closing client socket")
174 """Initiaates auto-discovery mechanism"""
175 discover
= ppauto
.Discover(self
)
176 _thread
.start_new_thread(discover
.run
,
177 ((self
.host
, self
.port
),
178 (self
.bcast
, self
.port
)),
182 def parse_config(file_loc
):
184 Parses a config file in a very forgiving way.
186 # If we don't have configobj installed then let the user know and exit
188 from configobj
import ConfigObj
189 except ImportError as ie
:
190 print("ERROR: You must have configobj installed to use \
191 configuration files. You can still use command line switches.", file=sys
.stderr
)
194 if not os
.access(file_loc
, os
.F_OK
):
195 print("ERROR: Can not access %s." % arg
, file=sys
.stderr
)
198 # Load the configuration file
199 config
= ConfigObj(file_loc
)
200 # try each config item and use the result if it exists. If it doesn't
201 # then simply pass and move along
203 args
['secret'] = config
['general'].get('secret')
208 autodiscovery
= config
['network'].as_bool('autodiscovery')
213 args
['interface'] = config
['network'].get('interface',
219 args
['broadcast'] = config
['network'].get('broadcast')
224 args
['port'] = config
['network'].as_int('port')
229 args
['loglevel'] = config
['general'].as_bool('debug')
234 args
['ncpus'] = config
['general'].as_int('workers')
239 args
['proto'] = config
['general'].as_int('proto')
244 args
['restart'] = config
['general'].as_bool('restart')
249 args
['timeout'] = config
['network'].as_int('timeout')
252 # Return a tuple of the args dict and autodiscovery variable
253 return args
, autodiscovery
258 print("Parallel Python Network Server (pp-" + version
+ ")")
259 print("Usage: ppserver.py [-hdar] [-n proto] [-c config_path]"\
260 " [-i interface] [-b broadcast] [-p port] [-w nworkers]"\
261 " [-s secret] [-t seconds]")
264 print("-h : this help message")
266 print("-a : enable auto-discovery service")
267 print("-r : restart worker process after each"\
269 print("-n proto : protocol number for pickle module")
270 print("-c path : path to config file")
271 print("-i interface : interface to listen")
272 print("-b broadcast : broadcast address for auto-discovery service")
273 print("-p port : port to listen")
274 print("-w nworkers : number of workers to start")
275 print("-s secret : secret for authentication")
276 print("-t seconds : timeout to exit if no connections with "\
279 print("Due to the security concerns always use a non-trivial secret key.")
280 print("Secret key set by -s switch will override secret key assigned by")
281 print("pp_secret variable in .pythonrc.py")
283 print("Please visit http://www.parallelpython.com for extended up-to-date")
284 print("documentation, examples and support forums")
287 if __name__
== "__main__":
289 opts
, args
= getopt
.getopt(sys
.argv
[1:],
290 "hdarn:c:b:i:p:w:s:t:", ["help"])
291 except getopt
.GetoptError
:
296 autodiscovery
= False
298 for opt
, arg
in opts
:
299 if opt
in ("-h", "--help"):
303 args
, autodiscovery
= parse_config(arg
)
305 args
["loglevel"] = logging
.DEBUG
307 args
["interface"] = arg
311 args
["port"] = int(arg
)
313 args
["ncpus"] = int(arg
)
317 args
["restart"] = True
319 args
["broadcast"] = arg
321 args
["proto"] = int(arg
)
323 args
["timeout"] = int(arg
)
325 server
= _NetworkServer(**args
)
329 #have to destroy it here explicitelly otherwise an exception
330 #comes out in Python 2.4
333 # Parallel Python Software: http://www.parallelpython.com