2 # Copyright 2015 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """A host test module demonstrating interacting with remote subprocesses."""
15 # Map the testing directory so we can import legion.legion_test.
16 TESTING_DIR
= os
.path
.join(
17 os
.path
.dirname(os
.path
.abspath(__file__
)),
18 '..', '..', '..', '..', 'testing')
19 sys
.path
.append(TESTING_DIR
)
21 from legion
import legion_test_case
22 from legion
import jsonrpclib
25 class ExampleTestController(legion_test_case
.TestCase
):
26 """An example controller using the remote subprocess functions."""
28 # We use python from the command line to mimic ls and sleep due to
29 # compatibility issues across OSes.
30 LS
= ['python', '-c', 'import os; print os.listdir(".")']
31 SLEEP10
= ['python', '-c', 'import time; time.sleep(10)']
32 SLEEP20
= ['python', '-c', 'import time; time.sleep(20)']
36 """Creates the task machine and waits until it connects."""
37 parser
= argparse
.ArgumentParser()
38 parser
.add_argument('--task-hash')
39 parser
.add_argument('--os', default
='Ubuntu-14.04')
40 args
, _
= parser
.parse_known_args()
42 cls
.task
= cls
.CreateTask(
43 isolated_hash
=args
.task_hash
,
44 dimensions
={'os': args
.os
},
46 connection_timeout_secs
=90,
47 verbosity
=logging
.DEBUG
)
49 cls
.task
.WaitForConnection()
51 def testMultipleProcesses(self
):
52 """Tests that processes can be run and controlled simultaneously."""
54 logging
.info('Starting "sleep 10" and "sleep 20"')
55 sleep10
= self
.task
.Process(self
.SLEEP10
)
56 sleep20
= self
.task
.Process(self
.SLEEP20
)
58 logging
.info('Waiting for "sleep 10" to finish and verifying timing')
60 elapsed
= time
.time() - start
61 self
.assertGreaterEqual(elapsed
, 10)
62 self
.assertLess(elapsed
, 11)
64 logging
.info('Waiting for "sleep 20" to finish and verifying timing')
66 elapsed
= time
.time() - start
67 self
.assertGreaterEqual(elapsed
, 20)
72 def testTerminate(self
):
73 """Tests that a process can be correctly terminated."""
76 logging
.info('Starting "sleep 20"')
77 sleep20
= self
.task
.Process(self
.SLEEP20
)
78 logging
.info('Calling Terminate()')
81 logging
.info('Trying to wait for "sleep 20" to complete')
83 except xmlrpclib
.Fault
:
87 logging
.info('Checking to make sure "sleep 20" was actually terminated')
88 self
.assertLess(time
.time() - start
, 20)
91 """Tests that the returned results from a process are correct."""
92 ls
= self
.task
.Process(self
.LS
)
93 logging
.info('Trying to wait for "ls" to complete')
95 self
.assertEqual(ls
.GetReturncode(), 0)
96 self
.assertIn('task.isolate', ls
.ReadStdout())
98 def testProcessOutput(self
):
99 """Tests that a process's output gets logged to a file in the output-dir."""
100 code
= ('import sys\n'
101 'sys.stdout.write("Hello stdout")\n'
102 'sys.stderr.write("Hello stderr")')
103 self
.task
.rpc
.WriteFile('test.py', code
)
104 proc
= self
.task
.Process(['python', 'test.py'])
107 self
.CheckProcessOutput('stdout', proc
.key
, 'Hello stdout')
108 self
.CheckProcessOutput('stderr', proc
.key
, 'Hello stderr')
110 def testCustomKey(self
):
111 """Tests that a custom key passed to a process works correctly."""
112 code
= ('import sys\n'
113 'sys.stdout.write("Hello CustomKey stdout")\n'
114 'sys.stderr.write("Hello CustomKey stderr")')
115 self
.task
.rpc
.WriteFile('test.py', code
)
116 proc
= self
.task
.Process(['python', 'test.py'], key
='CustomKey')
119 self
.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout')
120 self
.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr')
122 def testKeyReuse(self
):
123 """Tests that a key cannot be reused."""
124 self
.task
.Process(self
.LS
, key
='KeyReuse')
125 self
.assertRaises(jsonrpclib
.Fault
, self
.task
.Process
, self
.LS
,
128 def CheckProcessOutput(self
, pipe
, key
, expected
):
129 """Checks that a process' output files are correct."""
130 logging
.info('Reading output file')
131 output_dir
= self
.task
.rpc
.GetOutputDir()
132 path
= self
.task
.rpc
.PathJoin(output_dir
, '%s.%s' % (key
, pipe
))
133 actual
= self
.task
.rpc
.ReadFile(path
)
134 self
.assertEqual(expected
, actual
)
137 if __name__
== '__main__':
138 legion_test_case
.main()