Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / testing / legion / examples / subprocess / subprocess_test.py
blob: 9bd486ab39bdc59afdf269ebc803bda898685aee
1 #!/usr/bin/env python
2 # Copyright 2015 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """A host test module demonstrating interacting with remote subprocesses."""
8 import argparse
9 import logging
10 import os
11 import sys
12 import time
13 import xmlrpclib
# Put the top-level "testing" directory (four levels above this file's
# directory) on sys.path so that the legion package imported below
# (legion.legion_test_case, legion.jsonrpclib) can be found.
TESTING_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.pardir, os.pardir, os.pardir, os.pardir, 'testing')
sys.path.append(TESTING_DIR)
21 from legion import legion_test_case
22 from legion import jsonrpclib
class ExampleTestController(legion_test_case.TestCase):
  """An example controller using the remote subprocess functions.

  One task machine is created in setUpClass and stored on the class as
  `task`; every test method starts its processes on that shared task through
  `self.task.Process(...)` and then checks timing, return code, stdout or
  the per-process output files.
  """

  # We use python from the command line to mimic ls and sleep due to
  # compatibility issues across OSes.
  # NOTE: the LS snippet uses the Python 2 `print` statement, so the `python`
  # found on the task machine has to be a Python 2 interpreter.
  LS = ['python', '-c', 'import os; print os.listdir(".")']
  SLEEP10 = ['python', '-c', 'import time; time.sleep(10)']
  SLEEP20 = ['python', '-c', 'import time; time.sleep(20)']

  @classmethod
  def setUpClass(cls):
    """Creates the task machine and waits until it connects.

    The isolated hash and the OS dimension of the task come from the
    --task-hash and --os command line flags (--os defaults to
    'Ubuntu-14.04'). Flags this parser does not know are ignored because
    parse_known_args() is used.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--task-hash')
    parser.add_argument('--os', default='Ubuntu-14.04')
    args, _ = parser.parse_known_args()

    cls.task = cls.CreateTask(
        isolated_hash=args.task_hash,
        dimensions={'os': args.os},
        idle_timeout_secs=90,
        connection_timeout_secs=90,
        verbosity=logging.DEBUG)
    cls.task.Create()
    cls.task.WaitForConnection()

  def testMultipleProcesses(self):
    """Tests that processes can be run and controlled simultaneously."""
    start = time.time()
    logging.info('Starting "sleep 10" and "sleep 20"')
    sleep10 = self.task.Process(self.SLEEP10)
    sleep20 = self.task.Process(self.SLEEP20)

    logging.info('Waiting for "sleep 10" to finish and verifying timing')
    sleep10.Wait()
    elapsed = time.time() - start
    self.assertGreaterEqual(elapsed, 10)
    # Waiting on the 10 second process must return well before the 20 second
    # process finishes, i.e. it must not block on the other process.
    self.assertLess(elapsed, 11)

    logging.info('Waiting for "sleep 20" to finish and verifying timing')
    sleep20.Wait()
    elapsed = time.time() - start
    self.assertGreaterEqual(elapsed, 20)

    sleep10.Delete()
    sleep20.Delete()

  def testTerminate(self):
    """Tests that a process can be correctly terminated."""
    start = time.time()

    logging.info('Starting "sleep 20"')
    sleep20 = self.task.Process(self.SLEEP20)
    logging.info('Calling Terminate()')
    sleep20.Terminate()
    try:
      logging.info('Trying to wait for "sleep 20" to complete')
      sleep20.Wait()
    except (xmlrpclib.Fault, jsonrpclib.Fault):
      # A fault from Wait() on the already terminated process is tolerated
      # (presumably the remote side may reject the wait -- confirm against
      # the task implementation). Both fault types are accepted because
      # testKeyReuse in this class expects RPC failures from the same task
      # object to surface as jsonrpclib.Fault, whereas this handler used to
      # catch only xmlrpclib.Fault.
      pass
    finally:
      # Always release the remote process, whether or not Wait() failed.
      sleep20.Delete()
    logging.info('Checking to make sure "sleep 20" was actually terminated')
    # If Terminate() had no effect, Wait() would have blocked for the full
    # 20 seconds.
    self.assertLess(time.time() - start, 20)

  def testLs(self):
    """Tests that the returned results from a process are correct."""
    ls = self.task.Process(self.LS)
    logging.info('Trying to wait for "ls" to complete')
    ls.Wait()
    self.assertEqual(ls.GetReturncode(), 0)
    # The directory listing printed by the process is expected to contain
    # the file 'task.isolate'.
    self.assertIn('task.isolate', ls.ReadStdout())

  def testProcessOutput(self):
    """Tests that a process's output gets logged to a file in the output-dir."""
    code = ('import sys\n'
            'sys.stdout.write("Hello stdout")\n'
            'sys.stderr.write("Hello stderr")')
    # Write the script on the task over RPC, then run it there.
    self.task.rpc.WriteFile('test.py', code)
    proc = self.task.Process(['python', 'test.py'])
    proc.Wait()

    # No key was passed, so the output files are looked up under proc.key.
    self.CheckProcessOutput('stdout', proc.key, 'Hello stdout')
    self.CheckProcessOutput('stderr', proc.key, 'Hello stderr')

  def testCustomKey(self):
    """Tests that a custom key passed to a process works correctly."""
    code = ('import sys\n'
            'sys.stdout.write("Hello CustomKey stdout")\n'
            'sys.stderr.write("Hello CustomKey stderr")')
    self.task.rpc.WriteFile('test.py', code)
    proc = self.task.Process(['python', 'test.py'], key='CustomKey')
    proc.Wait()

    # The output files must be named after the caller-supplied key.
    self.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout')
    self.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr')

  def testKeyReuse(self):
    """Tests that a key cannot be reused."""
    self.task.Process(self.LS, key='KeyReuse')
    # Starting a second process under the same key must raise a Fault.
    self.assertRaises(jsonrpclib.Fault, self.task.Process, self.LS,
                      key='KeyReuse')

  def CheckProcessOutput(self, pipe, key, expected):
    """Checks that a process' output files are correct.

    Reads the file named '<key>.<pipe>' from the task's output directory
    over RPC and asserts that its content equals `expected`.

    Args:
      pipe: Suffix of the output file; callers in this class pass 'stdout'
          or 'stderr'.
      key: The process key that forms the file name prefix.
      expected: The exact content the file is expected to hold.
    """
    logging.info('Reading output file')
    output_dir = self.task.rpc.GetOutputDir()
    # The path is joined on the task side via RPC rather than with the local
    # os.path (presumably because the task may use a different path
    # separator -- confirm).
    path = self.task.rpc.PathJoin(output_dir, '%s.%s' % (key, pipe))
    actual = self.task.rpc.ReadFile(path)
    self.assertEqual(expected, actual)
# Call legion_test_case.main() only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
  legion_test_case.main()