Adding Peter Thatcher to the owners file.
[chromium-blink-merge.git] / build / android / pylib / chrome_test_server_spawner.py
blob052c2fde78ea6516ab15d759a492bf957a28e0d1
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A "Test Server Spawner" that handles spawning/killing per-test test servers.

It's used to accept requests from the device to spawn and kill instances of the
chrome test server on the host.
"""
10 # pylint: disable=W0702
12 import BaseHTTPServer
13 import json
14 import logging
15 import os
16 import select
17 import struct
18 import subprocess
19 import sys
20 import threading
21 import time
22 import urlparse
24 from pylib import constants
25 from pylib import ports
27 from pylib.forwarder import Forwarder
# Paths that are needed to import necessary modules when launching a testserver.
# They are appended, colon-separated, to whatever PYTHONPATH value is already
# present in this process's environment (or to an empty string if unset).
os.environ['PYTHONPATH'] = ':'.join([
    os.environ.get('PYTHONPATH', ''),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib', 'src'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver'),
])
# Maps each supported server type to the command-line flag that selects it.
# An empty string means that no type flag is added to the command line.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# The timeout (in seconds) of starting up the Python test server.
TEST_SERVER_STARTUP_TIMEOUT = 10
52 def _WaitUntil(predicate, max_attempts=5):
53 """Blocks until the provided predicate (function) is true.
55 Returns:
56 Whether the provided predicate was satisfied once (before the timeout).
57 """
58 sleep_time_sec = 0.025
59 for _ in xrange(1, max_attempts):
60 if predicate():
61 return True
62 time.sleep(sleep_time_sec)
63 sleep_time_sec = min(1, sleep_time_sec * 2) # Don't wait more than 1 sec.
64 return False
def _CheckPortAvailable(port):
  """Polls until the host reports |port| as available.

  Returns:
    True if ports.IsHostPortAvailable(port) became true within the default
    number of _WaitUntil attempts, False otherwise.
  """
  def _IsFree():
    return ports.IsHostPortAvailable(port)

  return _WaitUntil(_IsFree)
def _CheckPortNotAvailable(port):
  """Polls until the host reports |port| as no longer available.

  Returns:
    True if ports.IsHostPortAvailable(port) became false within the default
    number of _WaitUntil attempts, False otherwise.
  """
  def _IsTaken():
    return not ports.IsHostPortAvailable(port)

  return _WaitUntil(_IsTaken)
def _CheckDevicePortStatus(device, port):
  """Polls until |port| is reported as used on |device|.

  Returns:
    True if ports.IsDevicePortUsed(device, port) became true within the default
    number of _WaitUntil attempts, False otherwise.
  """
  def _IsUsed():
    return ports.IsDevicePortUsed(device, port)

  return _WaitUntil(_IsUsed)
def _GetServerTypeCommandLine(server_type):
  """Looks up the command-line flag registered for |server_type|.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    The string stored in SERVER_TYPES for |server_type|; it may be empty.

  Raises:
    NotImplementedError: |server_type| is not a key of SERVER_TYPES.
    Exception: |server_type| is 'udpecho', which is deliberately rejected.
  """
  is_known = server_type in SERVER_TYPES
  if not is_known:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  if server_type != 'udpecho':
    return SERVER_TYPES[server_type]
  raise Exception('Please do not run UDP echo tests because we do not have '
                  'a UDP forwarder tool.')
class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process."""

  def __init__(self, ready_event, arguments, device, tool):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event that is set once the startup attempt has finished,
          whether it succeeded or not; check |is_ready| for the outcome.
      arguments: dictionary of arguments to run the test server. It must
          contain an integer 'port' entry and a 'server-type' entry.
      device: An instance of DeviceUtils.
      tool: instance of runtime error detection tool.
    """
    threading.Thread.__init__(self)
    self.wait_event = threading.Event()
    self.stop_flag = False
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.device = device
    self.tool = tool
    self.test_server_process = None
    self.is_ready = False
    self.host_port = self.arguments['port']
    assert isinstance(self.host_port, int)
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    # Anonymous pipe in order to get port info from test server.
    self.pipe_in = None
    self.pipe_out = None
    self.process = None
    self.command_line = []

  def _WaitToStartAndGetPortFromTestServer(self):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by self.pipe_out. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    assert self.host_port == 0 and self.pipe_out and self.pipe_in
    (in_fds, _, _) = select.select([self.pipe_in], [], [],
                                   TEST_SERVER_STARTUP_TIMEOUT)
    if not in_fds:
      logging.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    header_size = struct.calcsize('=L')
    header = os.read(self.pipe_in, header_size)
    # A short (or empty) read cannot be unpacked; treat it like a zero length.
    data_length = 0
    if len(header) == header_size:
      (data_length,) = struct.unpack('=L', header)
    if not data_length:
      logging.error('Failed to get length of server data.')
      return False
    port_json = os.read(self.pipe_in, data_length)
    if not port_json:
      logging.error('Failed to get server data.')
      return False
    logging.info('Got port json data: %s', port_json)
    server_data = json.loads(port_json)
    if 'port' in server_data and isinstance(server_data['port'], int):
      self.host_port = server_data['port']
      # The server is up once its port can no longer be bound on the host.
      return _CheckPortNotAvailable(self.host_port)
    logging.error('Failed to get port information from the server data.')
    return False

  def _GenerateCommandLineArguments(self):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py.

    The result is stored in |self.command_line|; the method is a no-op when
    that list is already populated.
    """
    if self.command_line:
      return

    args_copy = dict(self.arguments)

    # Translate the server type.
    server_type = args_copy.pop('server-type')
    type_cmd = _GetServerTypeCommandLine(server_type)
    if type_cmd:
      self.command_line.append(type_cmd)

    # Use a pipe to get the port given by the instance of Python test server
    # if the test does not specify the port.
    assert self.host_port == args_copy['port']
    if self.host_port == 0:
      (self.pipe_in, self.pipe_out) = os.pipe()
      self.command_line.append('--startup-pipe=%d' % self.pipe_out)

    # Pass the remaining arguments as-is.
    for key, values in args_copy.items():
      if not isinstance(values, list):
        values = [values]
      for value in values:
        if value is None:
          self.command_line.append('--%s' % key)
        else:
          self.command_line.append('--%s=%s' % (key, value))

  def _CloseUnnecessaryFDsForTestServerProcess(self):
    """Closes every descriptor below 1024 except the startup pipe's write end.

    Used as the preexec_fn of the test server child process.
    """
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors.
    for fd in xrange(0, 1024):
      if fd != self.pipe_out:
        try:
          os.close(fd)
        except OSError:
          # Most descriptors in the range are simply not open.
          pass

  def _LaunchServerAndForwarder(self):
    """Spawns the test server process and maps its port through the forwarder.

    Updates |self.process|, |self.is_ready| and, on success,
    |self.forwarder_device_port|.
    """
    self._GenerateCommandLineArguments()
    if self.arguments['server-type'] == 'sync':
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools',
                            'testserver', 'sync_testserver.py')
    else:
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools',
                            'testserver', 'testserver.py')
    command = [script] + self.command_line
    logging.info('Running: %s', command)
    # Pass DIR_SOURCE_ROOT as the child's working directory so that relative
    # paths in the arguments are resolved correctly.
    self.process = subprocess.Popen(
        command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess,
        cwd=constants.DIR_SOURCE_ROOT)
    if self.pipe_out:
      self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    else:
      self.is_ready = _CheckPortNotAvailable(self.host_port)
    if not self.is_ready:
      return
    Forwarder.Map([(0, self.host_port)], self.device, self.tool)
    # Check whether the forwarder is ready on the device.
    self.is_ready = False
    device_port = Forwarder.DevicePortForHostPort(self.host_port)
    if device_port and _CheckDevicePortStatus(self.device, device_port):
      self.is_ready = True
      self.forwarder_device_port = device_port

  def _ReleaseResources(self):
    """Kills the server process, unmaps the forwarder and closes the pipe."""
    try:
      if self.process:
        if self.process.poll() is None:
          self.process.kill()
        Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device)
    finally:
      self.process = None
      self.is_ready = False
      if self.pipe_out:
        os.close(self.pipe_in)
        os.close(self.pipe_out)
        self.pipe_in = None
        self.pipe_out = None

  def run(self):
    """Starts the test server, then idles until Stop() requests shutdown.

    |self.ready_event| is always set once the startup attempt is over and
    |self.wait_event| is always set when the thread finishes, even if startup
    or cleanup raised, so that threads blocked on either event are released.
    """
    logging.info('Start running the thread!')
    self.wait_event.clear()
    try:
      try:
        self._LaunchServerAndForwarder()
      except Exception:  # pylint: disable=W0703
        # Thread boundary: report the failure through |is_ready| instead of
        # dying silently while the request handler waits on |ready_event|.
        logging.exception('Failed to start the test server.')
        self.is_ready = False
      finally:
        # Wake up the request handler thread.
        self.ready_event.set()
      # Keep thread running until Stop() gets called. Stop() returns
      # immediately when no process was spawned, so do not wait in that case.
      if self.process:
        _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    finally:
      try:
        self._ReleaseResources()
      finally:
        logging.info('Test-server has died.')
        self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Note that this must be called in another thread. Returns immediately when
    no server process is currently tracked.
    """
    if not self.process:
      return
    self.stop_flag = True
    self.wait_event.wait()
class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request.

  It serves POST /start as well as GET /kill and GET /ping; any other path is
  answered with a 400 response.
  """

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    The request body is parsed as JSON and handed to a new TestServerThread.
    A 200 response carrying the forwarder device port is sent once the server
    is ready, a 500 response otherwise.

    Raises:
      Exception: the content-type is not 'application/json' or the
          content-length header is not an integer.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    content_length = self.headers.getheader('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.device,
        self.server.tool)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    # Block until the server thread reports the outcome of its startup.
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance."""
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    if not self.server.test_server_instance:
      # NOTE(review): no response is written on this path.
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortAvailable(port):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Dispatches POST requests; only /start is supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Dispatches GET requests; /kill and /ping are supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)
class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, device, tool):
    """Creates the HTTP server that handles spawner requests.

    Args:
      test_server_spawner_port: host port the HTTP server binds to.
      device: stored on the server and handed to each TestServerThread.
      tool: stored on the server and handed to each TestServerThread.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
                                            SpawningServerRequestHandler)
    self.server.device = device
    self.server.tool = tool
    self.server.test_server_instance = None
    self.server.build_type = constants.GetBuildType()

  def _Listen(self):
    """Serves requests until the server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner."""
    # The listener runs on a daemon thread.
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.setDaemon(True)
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    self.CleanupState()
    self.server.shutdown()
    # shutdown() only ends the serve_forever() loop; the listening socket has
    # to be closed explicitly to release the spawner port.
    self.server.server_close()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_server_instance:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None