Add ICU message format support
[chromium-blink-merge.git] / testing / legion / examples / subprocess / subprocess_test.py
blob30b4f6ee562f472d7fac39345f40049eab4bfaeb
1 #!/usr/bin/env python
2 # Copyright 2015 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """A host test module demonstrating interacting with remote subprocesses."""
8 import argparse
9 import logging
10 import os
11 import sys
12 import time
13 import xmlrpclib
15 # Map the testing directory so we can import legion.legion_test.
16 TESTING_DIR = os.path.join(
17 os.path.dirname(os.path.abspath(__file__)),
18 '..', '..', '..', '..', 'testing')
19 sys.path.append(TESTING_DIR)
21 from legion import legion_test_case
22 from legion import jsonrpclib
25 class ExampleTestController(legion_test_case.TestCase):
26 """An example controller using the remote subprocess functions."""
28 @classmethod
29 def setUpClass(cls):
30 """Creates the task machine and waits until it connects."""
31 parser = argparse.ArgumentParser()
32 parser.add_argument('--task-hash')
33 parser.add_argument('--os', default='Ubuntu-14.04')
34 args, _ = parser.parse_known_args()
36 cls.task = cls.CreateTask(
37 isolated_hash=args.task_hash,
38 dimensions={'os': args.os},
39 idle_timeout_secs=90,
40 connection_timeout_secs=90,
41 verbosity=logging.DEBUG)
42 cls.task.Create()
43 cls.task.WaitForConnection()
45 def testMultipleProcesses(self):
46 """Tests that processes can be run and controlled simultaneously."""
47 start = time.time()
48 logging.info('Starting "sleep 10" and "sleep 20"')
49 sleep10 = self.task.Process(['sleep', '10'])
50 sleep20 = self.task.Process(['sleep', '20'])
52 logging.info('Waiting for sleep 10 to finish and verifying timing')
53 sleep10.Wait()
54 elapsed = time.time() - start
55 self.assertGreaterEqual(elapsed, 10)
56 self.assertLess(elapsed, 11)
58 logging.info('Waiting for sleep 20 to finish and verifying timing')
59 sleep20.Wait()
60 elapsed = time.time() - start
61 self.assertGreaterEqual(elapsed, 20)
63 sleep10.Delete()
64 sleep20.Delete()
66 def testTerminate(self):
67 """Tests that a process can be correctly terminated."""
68 start = time.time()
70 logging.info('Starting "sleep 20"')
71 sleep20 = self.task.Process(['sleep', '20'])
72 logging.info('Calling Terminate()')
73 sleep20.Terminate()
74 try:
75 logging.info('Trying to wait for sleep 20 to complete')
76 sleep20.Wait()
77 except xmlrpclib.Fault:
78 pass
79 finally:
80 sleep20.Delete()
81 logging.info('Checking to make sure sleep 20 was actually terminated')
82 self.assertLess(time.time() - start, 20)
84 def testLs(self):
85 """Tests that the returned results from a process are correct."""
86 logging.info('Calling "ls"')
87 ls = self.task.Process(['ls'])
88 logging.info('Trying to wait for ls to complete')
89 ls.Wait()
90 logging.info('Checking that ls completed and returned the correct results')
91 self.assertEqual(ls.GetReturncode(), 0)
92 self.assertIn('task.isolate', ls.ReadStdout())
94 def testProcessOutput(self):
95 """Tests that a process's output gets logged to a file in the output-dir."""
96 code = ('import sys\n'
97 'sys.stdout.write("Hello stdout")\n'
98 'sys.stderr.write("Hello stderr")')
99 self.task.rpc.WriteFile('test.py', code)
100 proc = self.task.Process(['python', 'test.py'],)
101 proc.Wait()
103 self.CheckProcessOutput('stdout', proc.key, 'Hello stdout')
104 self.CheckProcessOutput('stderr', proc.key, 'Hello stderr')
106 def testCustomKey(self):
107 """Tests that a custom key passed to a process works correctly."""
108 code = ('import sys\n'
109 'sys.stdout.write("Hello CustomKey stdout")\n'
110 'sys.stderr.write("Hello CustomKey stderr")')
111 self.task.rpc.WriteFile('test.py', code)
112 proc = self.task.Process(['python', 'test.py'], key='CustomKey')
113 proc.Wait()
115 self.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout')
116 self.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr')
118 def testKeyReuse(self):
119 """Tests that a key cannot be reused."""
120 self.task.Process(['ls'], key='KeyReuse')
121 self.assertRaises(jsonrpclib.Fault, self.task.Process, ['ls'],
122 key='KeyReuse')
124 def CheckProcessOutput(self, pipe, key, expected):
125 """Checks that a process' output files are correct."""
126 logging.info('Reading output file')
127 output_dir = self.task.rpc.GetOutputDir()
128 path = self.task.rpc.PathJoin(output_dir, '%s.%s' % (key, pipe))
129 actual = self.task.rpc.ReadFile(path)
130 self.assertEqual(expected, actual)
132 if __name__ == '__main__':
133 legion_test_case.main()