2 # Copyright 2014 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
"""Mutational ClusterFuzz fuzzer.

A pre-built corpus of ipcdump files has to be uploaded to ClusterFuzz along
with this script. As Chrome is being developed, the corpus will become
out of date and will need to be updated.

This fuzzer picks some ipcdumps from the corpus, concatenates them with
ipc_message_util and mutates the result with ipc_fuzzer_mutate.
"""
# Command-line option handed to the fuzzer binary to select the mutator.
FUZZER_NAME_OPTION = '--fuzzer-name=mutate'

# Base name of the tool used to concatenate ipcdumps; the platform-specific
# file name is derived from it via utils.application_name_for_platform().
IPC_MESSAGE_UTIL_APPLICATION = 'ipc_message_util'

# Number of corpus ipcdumps sampled and merged into a single testcase.
IPCDUMP_MERGE_LIMIT = 50
class MutationalFuzzer:
  """Builds mutated ipcdump testcases out of a corpus of ipcdump files.

  Intended use is ``MutationalFuzzer().main()``: it parses the arguments,
  resolves the helper binaries, loads the corpus and then writes
  ``args.no_of_files`` mutated testcases.
  """

  def parse_arguments(self):
    """Stores the result of utils.parse_arguments() in self.args.

    The other methods read ``input_dir``, ``output_dir`` and ``no_of_files``
    from it.
    """
    self.args = utils.parse_arguments()

  def set_application_paths(self):
    """Records names and full paths of the three IPC helper binaries.

    Each path is the binary name joined onto the directory of the path
    returned by utils.get_application_path().
    """
    chrome_application_path = utils.get_application_path()
    chrome_application_directory = os.path.dirname(chrome_application_path)

    self.ipc_message_util_binary = utils.application_name_for_platform(
        IPC_MESSAGE_UTIL_APPLICATION)
    self.ipc_fuzzer_binary = utils.get_fuzzer_application_name()
    self.ipc_replay_binary = utils.get_replay_application_name()

    self.ipc_message_util_binary_path = os.path.join(
        chrome_application_directory, self.ipc_message_util_binary)
    self.ipc_fuzzer_binary_path = os.path.join(
        chrome_application_directory, self.ipc_fuzzer_binary)
    self.ipc_replay_binary_path = os.path.join(
        chrome_application_directory, self.ipc_replay_binary)

  def set_corpus(self):
    """Fills self.corpus with the paths of all ipcdump files in the corpus.

    The corpus directory is ``args.input_dir`` joined with the value of the
    IPC_CORPUS_DIR environment variable ('default' when unset). Requires
    parse_arguments() to have been called.

    Raises:
      SystemExit: if the corpus directory does not exist or holds no files
        ending in utils.IPCDUMP_EXTENSION.
    """
    # Corpus should be set per job as a fuzzer-specific environment variable.
    corpus = os.getenv('IPC_CORPUS_DIR', 'default')
    corpus_directory = os.path.join(self.args.input_dir, corpus)
    if not os.path.exists(corpus_directory):
      sys.exit('Corpus directory "%s" not found.' % corpus_directory)

    self.corpus = [
        os.path.join(corpus_directory, entry)
        for entry in os.listdir(corpus_directory)
        if entry.endswith(utils.IPCDUMP_EXTENSION)
    ]
    # Fail early with a clear message instead of handing an empty ipcdump
    # list to ipc_message_util later on.
    if not self.corpus:
      sys.exit('Corpus directory "%s" contains no %s files.' %
               (corpus_directory, utils.IPCDUMP_EXTENSION))

  def create_mutated_ipcdump_testcase(self):
    """Writes one mutated ipcdump testcase into args.output_dir.

    Samples up to IPCDUMP_MERGE_LIMIT corpus files, concatenates them into a
    temporary ipcdump, mutates that into the final testcase and creates the
    accompanying flags file. The temporary file is removed even on failure.

    Raises:
      SystemExit: if either helper binary returns a non-zero exit status.
    """
    # random.sample() raises ValueError when asked for more items than the
    # population holds, so cap the sample size for small corpora.
    sample_size = min(IPCDUMP_MERGE_LIMIT, len(self.corpus))
    ipcdumps = ','.join(random.sample(self.corpus, sample_size))
    tmp_ipcdump_testcase = utils.create_temp_file()
    mutated_ipcdump_testcase = (
        utils.random_ipcdump_testcase_path(self.args.output_dir))

    try:
      # Concatenate ipcdumps -> tmp_ipcdump.
      cmd = [
          self.ipc_message_util_binary_path,
          ipcdumps,
          tmp_ipcdump_testcase,
      ]
      if subprocess.call(cmd):
        sys.exit('%s failed.' % self.ipc_message_util_binary)

      # Mutate tmp_ipcdump -> mutated_ipcdump.
      # NOTE(review): confirm the argument order against the fuzzer binary's
      # usage (option, input ipcdump, output ipcdump).
      cmd = [
          self.ipc_fuzzer_binary_path,
          FUZZER_NAME_OPTION,
          tmp_ipcdump_testcase,
          mutated_ipcdump_testcase,
      ]
      if subprocess.call(cmd):
        sys.exit('%s failed.' % self.ipc_fuzzer_binary)

      utils.create_flags_file(
          mutated_ipcdump_testcase, self.ipc_replay_binary_path)
    finally:
      # sys.exit() raises SystemExit, so this also runs on the failure paths
      # and the temporary ipcdump never leaks.
      if os.path.exists(tmp_ipcdump_testcase):
        os.remove(tmp_ipcdump_testcase)

  def main(self):
    """Runs the fuzzer end to end.

    Returns:
      0 on success; failures terminate the process through sys.exit().
    """
    self.parse_arguments()
    self.set_application_paths()
    self.set_corpus()

    # range() rather than xrange() so the script runs on Python 2 and 3.
    for _ in range(self.args.no_of_files):
      self.create_mutated_ipcdump_testcase()

    return 0
# Script entry point: run the fuzzer and hand main()'s result to sys.exit()
# as the process exit status.
if __name__ == "__main__":
  sys.exit(MutationalFuzzer().main())