# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
6 """Shards a given test suite and runs the shards in parallel.
8 ShardingSupervisor is called to process the command line options and creates
9 the specified number of worker threads. These threads then run each shard of
10 the test in a separate process and report on the results. When all the shards
11 have been completed, the supervisor reprints any lines indicating a test
12 failure for convenience. If only one shard is to be run, a single subprocess
13 is started for that shard and the output is identical to gtest's output.

import itertools
import optparse
import os
import Queue
import random
import re
import sys
import threading

from stdio_buffer import StdioBuffer
from xml.dom import minidom

BASE_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(BASE_PATH, ".."))

try:
  import find_depot_tools  # pylint: disable=F0401,W0611
  # Fixes a bug in Windows where some shards die upon starting.
  # TODO(charleslee): actually fix this bug
  import subprocess2 as subprocess
except ImportError:
  # Unable to find depot_tools, so just use the standard subprocess module.
  import subprocess

SS_USAGE = "python %prog [options] path/to/test [gtest_args]"
SS_DEFAULT_NUM_CORES = 4
SS_DEFAULT_SHARDS_PER_CORE = 5  # num_shards = cores * SHARDS_PER_CORE
SS_DEFAULT_RUNS_PER_CORE = 1  # num_workers = cores * RUNS_PER_CORE
SS_DEFAULT_RETRY_PERCENT = 5  # --retry-failed ignored if more than 5% fail
SS_DEFAULT_TIMEOUT = 530  # Slightly less than buildbot's default 600 seconds
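
# Example invocation, for illustration only (the script name and test binary
# path below are assumptions, not part of this file):
#   python sharding_supervisor.py --retry-failed out/Debug/base_unittests
# With the defaults above, a 4-core machine runs 4 * 5 = 20 shards on
# 4 * 1 = 4 worker threads.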
49 """Detects the number of cores on the machine.
52 The number of cores on the machine or DEFAULT_NUM_CORES if it could not
56 # Override on some Chromium Valgrind bots.
57 if "CHROME_VALGRIND_NUMCPUS" in os
.environ
:
58 return int(os
.environ
["CHROME_VALGRIND_NUMCPUS"])
60 if hasattr(os
, "sysconf"):
61 if "SC_NPROCESSORS_ONLN" in os
.sysconf_names
:
63 return int(os
.sysconf("SC_NPROCESSORS_ONLN"))
66 return int(os
.popen2("sysctl -n hw.ncpu")[1].read())
68 return int(os
.environ
["NUMBER_OF_PROCESSORS"])
70 return SS_DEFAULT_NUM_CORES


def GetGTestOutput(args):
  """Extracts gtest_output from the args. Returns None if not present."""
  for arg in args:
    if '--gtest_output=' in arg:
      return arg.split('=')[1]
  return None


def AppendToGTestOutput(gtest_args, value):
  """Appends value to the current --gtest_output destination, if any."""
  args = gtest_args[:]
  current_value = GetGTestOutput(args)
  if not current_value:
    return gtest_args
  current_arg = '--gtest_output=' + current_value
  args.remove(current_arg)
  args.append('--gtest_output=' + current_value + value)
  return args
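
# For example (assuming an xml output flag is present), AppendToGTestOutput
# turns '--gtest_output=xml:results.xml' into '--gtest_output=xml:results.xml3'
# for shard 3, so each shard writes its own report that AppendToXML can merge.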


def RemoveGTestOutput(gtest_args):
  """Removes the --gtest_output argument, if present."""
  args = gtest_args[:]
  current_value = GetGTestOutput(args)
  if not current_value:
    return gtest_args
  args.remove('--gtest_output=' + current_value)
  return args


def AppendToXML(final_xml, generic_path, shard):
  """Combines the shard xml file with the final xml file."""
  path = generic_path + str(shard)
  try:
    with open(path) as shard_xml_file:
      shard_xml = minidom.parse(shard_xml_file)
  except IOError:
    # If the shard crashed, gtest will not have generated an xml file.
    return final_xml

  if not final_xml:
    # Our final xml is empty, so prepopulate it with the first one we see.
    return shard_xml

  shard_node = shard_xml.documentElement
  final_node = final_xml.documentElement

  testcases = shard_node.getElementsByTagName('testcase')
  final_testcases = final_node.getElementsByTagName('testcase')

  final_testsuites = final_node.getElementsByTagName('testsuite')
  final_testsuites_by_name = dict(
      (suite.getAttribute('name'), suite) for suite in final_testsuites)

  for testcase in testcases:
    name = testcase.getAttribute('name')
    classname = testcase.getAttribute('classname')
    failures = testcase.getElementsByTagName('failure')
    status = testcase.getAttribute('status')
    elapsed = testcase.getAttribute('time')

    # Don't bother updating the final xml if there is no data.
    if status == 'notrun':
      continue

    # Look in our final xml to see if it's there.
    # There has to be a better way...
    merged_into_final_testcase = False
    for final_testcase in final_testcases:
      final_name = final_testcase.getAttribute('name')
      final_classname = final_testcase.getAttribute('classname')
      if final_name == name and final_classname == classname:
        # We got the same entry.
        final_testcase.setAttribute('status', status)
        final_testcase.setAttribute('time', elapsed)
        for failure in failures:
          final_testcase.appendChild(failure)
        merged_into_final_testcase = True

    # We couldn't find an existing testcase to merge the results into, so we
    # copy the node into the existing test suite.
    if not merged_into_final_testcase:
      testsuite = testcase.parentNode
      final_testsuite = final_testsuites_by_name[testsuite.getAttribute('name')]
      final_testsuite.appendChild(testcase)

  return final_xml


def RunShard(test, total_shards, index, gtest_args, stdout, stderr):
  """Runs a single test shard in a subprocess.

  Returns:
    The Popen object representing the subprocess handle.
  """
  args = [test]
  # If there is a --gtest_output argument, give each shard its own output file.
  test_args = AppendToGTestOutput(gtest_args, str(index))
  args.extend(test_args)
  env = os.environ.copy()
  env["GTEST_TOTAL_SHARDS"] = str(total_shards)
  env["GTEST_SHARD_INDEX"] = str(index)

  # Use a unique log file for each shard.
  # Allows ui_tests to be run in parallel on the same machine.
  env["CHROME_LOG_FILE"] = "chrome_log_%d" % index

  return subprocess.Popen(
      args,
      stdout=stdout,
      stderr=stderr,
      env=env,
      bufsize=0,
      universal_newlines=True)
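
# For reference: gtest itself consumes GTEST_TOTAL_SHARDS and GTEST_SHARD_INDEX
# and, per its documented sharding behavior, runs a test on the shard where
#   test_index % GTEST_TOTAL_SHARDS == GTEST_SHARD_INDEX
# so the supervisor only has to launch one process per shard index.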


class ShardRunner(threading.Thread):
  """Worker thread that manages a single shard at a time.

  Attributes:
    supervisor: The ShardingSupervisor that this worker reports to.
    counter: Called to get the next shard index to run.
    test_start: Regex that detects when a test runs.
    test_ok: Regex that detects a passing test.
    test_fail: Regex that detects a failing test.
    current_test: The name of the currently running test.
  """

  def __init__(self, supervisor, counter, test_start, test_ok, test_fail):
    """Inits ShardRunner and sets the current test to nothing."""
    threading.Thread.__init__(self)
    self.supervisor = supervisor
    self.counter = counter
    self.test_start = test_start
    self.test_ok = test_ok
    self.test_fail = test_fail
    self.current_test = ""

  def ReportFailure(self, description, index, test_name):
    """Assembles and reports a failure line to be printed later."""
    log_line = "%s (%i): %s\n" % (description, index, test_name)
    self.supervisor.LogTestFailure(log_line)
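
  # For example, a test that fails in shard 3 is recorded as the line
  #   "FAILED (3): SomeTestCase.SomeTest\n"
  # RetryFailedTests later parses these lines to build --gtest_filter values
  # for the serial retry pass.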

  def ProcessLine(self, index, line):
    """Checks a shard output line for test status, and reports a failure or
    incomplete test if needed.
    """
    results = self.test_start.search(line)
    if results:
      if self.current_test:
        self.ReportFailure("INCOMPLETE", index, self.current_test)
      self.current_test = results.group(1)
      self.supervisor.IncrementTestCount()
      return

    results = self.test_ok.search(line)
    if results:
      self.current_test = ""
      return

    results = self.test_fail.search(line)
    if results:
      self.ReportFailure("FAILED", index, results.group(1))
      self.current_test = ""
242 """Runs shards and outputs the results.
244 Gets the next shard index from the supervisor, runs it in a subprocess,
245 and collects the output. The output is read character by character in
246 case the shard crashes without an ending newline. Each line is processed
251 index
= self
.counter
.get_nowait()
256 self
.supervisor
.test
, self
.supervisor
.total_shards
, index
,
257 self
.supervisor
.gtest_args
, subprocess
.PIPE
, subprocess
.PIPE
)
258 buf
= StdioBuffer(shard
)
259 # Spawn two threads to collect stdio output
260 stdout_collector_thread
= buf
.handle_pipe(sys
.stdout
, shard
.stdout
)
261 stderr_collector_thread
= buf
.handle_pipe(sys
.stderr
, shard
.stderr
)
263 pipe
, line
= buf
.readline()
264 if pipe
is None and line
is None:
265 shard_running
= False
266 if not line
and not shard_running
:
268 self
.ProcessLine(index
, line
)
269 self
.supervisor
.LogOutputLine(index
, line
, pipe
)
270 stdout_collector_thread
.join()
271 stderr_collector_thread
.join()
272 if self
.current_test
:
273 self
.ReportFailure("INCOMPLETE", index
, self
.current_test
)
274 self
.supervisor
.ShardIndexCompleted(index
)
275 if shard
.returncode
!= 0:
276 self
.supervisor
.LogShardFailure(index
)


class ShardingSupervisor(object):
  """Supervisor object that handles the worker threads.

  Attributes:
    test: Name of the test to shard.
    num_shards_to_run: Total number of shards to split the test into.
    num_runs: Total number of worker threads to create for running shards.
    color: Indicates which coloring mode to use in the output.
    original_order: True if shard output should be printed as it comes.
    prefix: True if each line should indicate the shard index.
    retry_percent: Integer specifying the max percent of tests to retry.
    gtest_args: The options to pass to gtest.
    failed_tests: List of statements from shard output indicating a failure.
    failed_shards: List of shards that contained failing tests.
    shards_completed: List of flags indicating which shards have finished.
    shard_output: Buffer that stores output from each shard as (stdio, line).
    test_counter: Stores the total number of tests run.
    total_slaves: Total number of slaves running this test.
    slave_index: Current slave to run tests for.

  If total_slaves is set, we run only a subset of the tests. This is meant to
  be used when we want to shard across machines as well as across cpus. In
  that case the number of shards to execute will be the same, but they will be
  smaller, as the total number of shards in the test suite will be multiplied
  by total_slaves.

  For example, if you are on a quad core machine, the sharding supervisor by
  default will use 20 shards for the whole suite. However, if you set
  total_slaves to 2, it will split the suite into 40 shards and will only run
  shards [0-19] or shards [20-39] depending on whether you set slave_index to
  0 or 1.
  """

  SHARD_COMPLETED = object()

  def __init__(self, test, num_shards_to_run, num_runs, color, original_order,
               prefix, retry_percent, timeout, total_slaves, slave_index,
               gtest_args):
    """Inits ShardingSupervisor with given options and gtest arguments."""
    self.test = test
    # Number of shards to run locally.
    self.num_shards_to_run = num_shards_to_run
    # Total shards in the test suite running across all slaves.
    self.total_shards = num_shards_to_run * total_slaves
    self.slave_index = slave_index
    self.num_runs = num_runs
    self.color = color
    self.original_order = original_order
    self.prefix = prefix
    self.retry_percent = retry_percent
    self.timeout = timeout
    self.gtest_args = gtest_args
    self.failed_tests = []
    self.failed_shards = []
    self.shards_completed = [False] * self.num_shards_to_run
    self.shard_output = [Queue.Queue() for _ in range(self.num_shards_to_run)]
    self.test_counter = itertools.count()
337 """Runs the test and manages the worker threads.
339 Runs the test and outputs a summary at the end. All the tests in the
340 suite are run by creating (cores * runs_per_core) threads and
341 (cores * shards_per_core) shards. When all the worker threads have
342 finished, the lines saved in failed_tests are printed again. If enabled,
343 and failed tests that do not have FLAKY or FAILS in their names are run
344 again, serially, and the results are printed.
347 1 if some unexpected (not FLAKY or FAILS) tests failed, 0 otherwise.
350 # Regular expressions for parsing GTest logs. Test names look like
351 # SomeTestCase.SomeTest
352 # SomeName/SomeTestCase.SomeTest/1
353 # This regex also matches SomeName.SomeTest/1 and
354 # SomeName/SomeTestCase.SomeTest, which should be harmless.
355 test_name_regex
= r
"((\w+/)?\w+\.\w+(/\d+)?)"
357 # Regex for filtering out ANSI escape codes when using color.
358 ansi_regex
= r
"(?:\x1b\[.*?[a-zA-Z])?"
360 test_start
= re
.compile(
361 ansi_regex
+ r
"\[\s+RUN\s+\] " + ansi_regex
+ test_name_regex
)
362 test_ok
= re
.compile(
363 ansi_regex
+ r
"\[\s+OK\s+\] " + ansi_regex
+ test_name_regex
)
364 test_fail
= re
.compile(
365 ansi_regex
+ r
"\[\s+FAILED\s+\] " + ansi_regex
+ test_name_regex
)
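
    # Illustrative lines these patterns are intended to match (standard gtest
    # output; the timings are examples only):
    #   [ RUN      ] SomeTestCase.SomeTest
    #   [       OK ] SomeTestCase.SomeTest (12 ms)
    #   [  FAILED  ] SomeName/SomeTestCase.SomeTest/1 (7 ms)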

    workers = []
    counter = Queue.Queue()
    start_point = self.num_shards_to_run * self.slave_index
    for i in range(start_point, start_point + self.num_shards_to_run):
      counter.put(i)

    for i in range(self.num_runs):
      worker = ShardRunner(
          self, counter, test_start, test_ok, test_fail)
      worker.start()
      workers.append(worker)
    if self.original_order:
      for worker in workers:
        worker.join()
    else:
      self.WaitForShards()

    # All the shards are done. Merge all the XML files and generate the
    # main one.
    output_arg = GetGTestOutput(self.gtest_args)
    if output_arg:
      xml, xml_path = output_arg.split(':', 1)
      assert xml == 'xml'
      final_xml = None
      for i in range(start_point, start_point + self.num_shards_to_run):
        final_xml = AppendToXML(final_xml, xml_path, i)
      if final_xml:
        with open(xml_path, 'w') as final_file:
          final_xml.writexml(final_file)

    num_failed = len(self.failed_shards)
    if num_failed > 0:
      self.failed_shards.sort()
      self.WriteText(sys.stdout,
                     "\nFAILED SHARDS: %s\n" % str(self.failed_shards),
                     "\x1b[1;5;31m")
    else:
      self.WriteText(sys.stdout, "\nALL SHARDS PASSED!\n", "\x1b[1;5;32m")
    self.PrintSummary(self.failed_tests)
    if self.retry_percent < 0:
      return len(self.failed_shards) > 0

    self.failed_tests = [x for x in self.failed_tests if x.find("FAILS_") < 0]
    self.failed_tests = [x for x in self.failed_tests if x.find("FLAKY_") < 0]
    if not self.failed_tests:
      return 0
    return self.RetryFailedTests()

  def LogTestFailure(self, line):
    """Saves a line in the list of failed tests to be printed at the end."""
    if line not in self.failed_tests:
      self.failed_tests.append(line)

  def LogShardFailure(self, index):
    """Records that a test in the given shard has failed."""
    self.failed_shards.append(index)

  def WaitForShards(self):
    """Prints the output from each shard in consecutive order, waiting for
    the current shard to finish before starting on the next shard.
    """
    try:
      for shard_index in range(self.num_shards_to_run):
        while True:
          try:
            _, line = self.shard_output[shard_index].get(True, self.timeout)
          except Queue.Empty:
            # Shard timed out; record the failure and move on.
            self.LogShardFailure(shard_index)
            # TODO(maruel): Print last test. It'd be simpler to have the
            # processing in the main thread.
            # TODO(maruel): Make sure the worker thread terminates.
            sys.stdout.write('TIMED OUT\n\n')
            self.LogTestFailure(
                'FAILURE: SHARD %d TIMED OUT; %d seconds' % (
                    shard_index, self.timeout))
            break
          if line is self.SHARD_COMPLETED:
            break
          sys.stdout.write(line)
    except:
      sys.stdout.flush()
      print 'CAUGHT EXCEPTION: dumping remaining data:'
      for shard_index in range(self.num_shards_to_run):
        while True:
          try:
            _, line = self.shard_output[shard_index].get(False)
          except Queue.Empty:
            # Shard timed out; record the failure and move on.
            self.LogShardFailure(shard_index)
            break
          if line is self.SHARD_COMPLETED:
            break
          sys.stdout.write(line)
      raise

  def LogOutputLine(self, index, line, pipe=sys.stdout):
    """Either prints the shard output line immediately or saves it in the
    output buffer, depending on the settings. Also optionally adds a prefix.
    Puts a (sys.stdout, line) or (sys.stderr, line) tuple in the output queue.
    """
    # Translate the global shard index into this slave's local queue index.
    array_index = index - (self.num_shards_to_run * self.slave_index)
    if self.prefix:
      line = "%i>%s" % (index, line)
    if self.original_order:
      pipe.write(line)
    else:
      self.shard_output[array_index].put((pipe, line))
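
  # For example, with num_shards_to_run=20 and slave_index=1 this slave owns
  # global shard indices 20-39, so global shard 23 maps to local output
  # queue 3 (23 - 20 * 1).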

  def IncrementTestCount(self):
    """Increments the number of tests run. This is relevant to the
    --retry-percent option.
    """
    self.test_counter.next()

  def ShardIndexCompleted(self, index):
    """Records that a shard has finished so the output from the next shard
    can now be printed.
    """
    array_index = index - (self.num_shards_to_run * self.slave_index)
    self.shard_output[array_index].put((sys.stdout, self.SHARD_COMPLETED))

  def RetryFailedTests(self):
    """Reruns any failed tests serially and prints another summary of the
    results if no more than retry_percent failed.
    """
    num_tests_run = self.test_counter.next()
    # retry_percent is expressed as a percentage in [0, 100].
    if len(self.failed_tests) > self.retry_percent * num_tests_run / 100:
      sys.stdout.write("\nNOT RETRYING FAILED TESTS (too many failed)\n")
      return 1
    self.WriteText(sys.stdout, "\nRETRYING FAILED TESTS:\n", "\x1b[1;5;33m")
    sharded_description = re.compile(r": (?:\d+>)?(.*)")
    gtest_filters = [sharded_description.search(line).group(1)
                     for line in self.failed_tests]
    sys.stdout.write("\nRETRY GTEST FILTERS: %r\n" % gtest_filters)

    failed_retries = []
    for test_filter in gtest_filters:
      args = [self.test, "--gtest_filter=" + test_filter]
      # Don't update the xml output files during retry.
      stripped_gtests_args = RemoveGTestOutput(self.gtest_args)
      args.extend(stripped_gtests_args)
      sys.stdout.write("\nRETRY COMMAND: %r\n" % args)
      rerun = subprocess.Popen(args, stdout=sys.stdout, stderr=sys.stderr)
      rerun.wait()
      if rerun.returncode != 0:
        failed_retries.append(test_filter)

    self.WriteText(sys.stdout, "RETRY RESULTS:\n", "\x1b[1;5;33m")
    self.PrintSummary(failed_retries)
    return len(failed_retries) > 0

  def PrintSummary(self, failed_tests):
    """Prints a summary of the test results.

    If any shards had failing tests, the list is sorted and printed. Then all
    the lines that indicate a test failure are reproduced.
    """
    if failed_tests:
      self.WriteText(sys.stdout, "FAILED TESTS:\n", "\x1b[1;5;31m")
      for line in failed_tests:
        sys.stdout.write(line)
    else:
      self.WriteText(sys.stdout, "ALL TESTS PASSED!\n", "\x1b[1;5;32m")

  def WriteText(self, pipe, text, ansi):
    """Writes the text to the pipe with the ansi escape code, if colored
    output is set, for Unix systems.
    """
    if self.color:
      pipe.write(ansi)
    pipe.write(text)
    if self.color:
      pipe.write("\x1b[m")


def main():
  parser = optparse.OptionParser(usage=SS_USAGE)
  parser.add_option(
      "-n", "--shards_per_core", type="int", default=SS_DEFAULT_SHARDS_PER_CORE,
      help="number of shards to generate per CPU")
  parser.add_option(
      "-r", "--runs_per_core", type="int", default=SS_DEFAULT_RUNS_PER_CORE,
      help="number of shards to run in parallel per CPU")
  parser.add_option(
      "-c", "--color", action="store_true",
      default=sys.platform != "win32" and sys.stdout.isatty(),
      help="force color output, also used by gtest if --gtest_color is not"
      " specified")
  parser.add_option(
      "--no-color", action="store_false", dest="color",
      help="disable color output")
  parser.add_option(
      "-s", "--runshard", type="int", help="single shard index to run")
  parser.add_option(
      "--reorder", action="store_true",
      help="ensure that all output from an earlier shard is printed before"
      " output from a later shard")
  # TODO(charleslee): for backwards compatibility with master.cfg file
  parser.add_option(
      "--original-order", action="store_true",
      help="print shard output in its original jumbled order of execution"
      " (useful for debugging flaky tests)")
  parser.add_option(
      "--prefix", action="store_true",
      help="prefix each line of shard output with 'N>', where N is the shard"
      " index (forced True when --original-order is True)")
  parser.add_option(
      "--random-seed", action="store_true",
      help="shuffle the tests with a random seed value")
  parser.add_option(
      "--retry-failed", action="store_true",
      help="retry tests that did not pass serially")
  parser.add_option(
      "--retry-percent", type="int",
      default=SS_DEFAULT_RETRY_PERCENT,
      help="ignore --retry-failed if more than this percent fail [0, 100]"
      " (default = %i)" % SS_DEFAULT_RETRY_PERCENT)
  parser.add_option(
      "-t", "--timeout", type="int", default=SS_DEFAULT_TIMEOUT,
      help="timeout in seconds to wait for a shard (default=%default s)")
  parser.add_option(
      "--total-slaves", type="int", default=1,
      help="if running a subset, number of slaves sharing the test")
  parser.add_option(
      "--slave-index", type="int", default=0,
      help="if running a subset, index of the slave to run tests for")
  parser.disable_interspersed_args()
  (options, args) = parser.parse_args()

  if not args:
    parser.error("You must specify a path to test!")
  if not os.path.exists(args[0]):
    parser.error("%s does not exist!" % args[0])

  num_cores = DetectNumCores()

  if options.shards_per_core < 1:
    parser.error("You must have at least 1 shard per core!")
  num_shards_to_run = num_cores * options.shards_per_core

  if options.runs_per_core < 1:
    parser.error("You must have at least 1 run per core!")
  num_runs = num_cores * options.runs_per_core

  test = args[0]
  gtest_args = ["--gtest_color=%s" % {
      True: "yes", False: "no"}[options.color]] + args[1:]

  if options.original_order:
    options.prefix = True
  # TODO(charleslee): for backwards compatibility with buildbot's log_parser
  if options.reorder:
    options.original_order = False
    options.prefix = True

  if options.random_seed:
    seed = random.randint(1, 99999)
    gtest_args.extend(["--gtest_shuffle", "--gtest_random_seed=%i" % seed])

  if options.retry_failed:
    if options.retry_percent < 0 or options.retry_percent > 100:
      parser.error("Retry percent must be an integer [0, 100]!")
  else:
    options.retry_percent = -1

  if options.runshard != None:
    # Run a single shard and exit.
    if (options.runshard < 0 or options.runshard >= num_shards_to_run):
      parser.error("Invalid shard number for the given parameters!")
    shard = RunShard(
        test, num_shards_to_run, options.runshard, gtest_args, None, None)
    shard.wait()
    return shard.returncode

  # When running browser_tests, load the test binary into memory before running
  # any tests. This is needed to prevent loading it from disk causing the first
  # run tests to timeout flakily. See: http://crbug.com/124260
  if "browser_tests" in test:
    args = [test]
    args.extend(gtest_args)
    args.append("--warmup")
    result = subprocess.call(args,
                             bufsize=0,
                             universal_newlines=True)
    # If the test fails, don't run anything else.
    if result != 0:
      return result

  # Shard and run the whole test.
  ss = ShardingSupervisor(
      test, num_shards_to_run, num_runs, options.color,
      options.original_order, options.prefix, options.retry_percent,
      options.timeout, options.total_slaves, options.slave_index, gtest_args)
  return ss.ShardTest()


if __name__ == "__main__":
  sys.exit(main())