#!/usr/bin/python
# Update neopi.py
# [NeoPI.git] / neopi.py
# blob 679e35420c3a24187a46e3a922d942b4fc6f56dc
#
# Name: neopi.py
# Description: Utility to scan a file path for encrypted and obfuscated files
# Authors: Ben Hagen (ben.hagen@neohapsis.com)
#          Scott Behrens (scott.behrens@neohapsis.com)
#
# Date: 11/4/2010
#
# pep-0008 - Is stupid. TABS FO'EVER!
#
# Try catch regular expressions/bad path/bad filename/bad regex/
# Library imports
import csv
import math
import os
import re
import sys
import time
import zlib
from collections import defaultdict
from optparse import OptionParser
class LanguageIC:
    """Calculate the Index of Coincidence (IC) of individual files, as well
    as the aggregate IC over every file processed so far.
    """

    def __init__(self):
        """Initialize the results list as well as the character counters."""
        # Occurrences of each character, accumulated over all files.
        self.char_count = defaultdict(int)
        # Total number of counted characters, accumulated over all files.
        self.total_char_count = 0
        # One {"filename": ..., "value": ...} dict per calculated file.
        self.results = []
        # Aggregate IC; remains "" until calculate_IC() has been called.
        self.ic_total_results = ""

    def calculate_char_count(self, data):
        """Add the character counts of `data` to the aggregate counters.

        Returns 0 when `data` is empty, otherwise None.
        """
        if not data:
            return 0
        for x in range(256):
            char = chr(x)
            charcount = data.count(char)
            self.char_count[char] += charcount
            self.total_char_count += charcount
        return

    def calculate_IC(self):
        """Compute the aggregate IC and store it in self.ic_total_results.

        The result is 0 when fewer than two characters have been counted,
        because the denominator N * (N - 1) is zero in that case.
        """
        total = sum(val * (val - 1) for val in self.char_count.values())
        pairs = self.total_char_count * (self.total_char_count - 1)
        if pairs:
            ic_total = float(total) / pairs
        else:
            ic_total = 0
        self.ic_total_results = ic_total
        return

    def calculate(self, data, filename):
        """Calculate the IC of `data` and append it to self.results.

        The character counts of `data` are also folded into the aggregate
        counters used by calculate_IC().

        Returns the IC as a float. Empty data returns 0 and records nothing.
        Data with fewer than two counted characters yields 0.0 instead of
        raising ZeroDivisionError.
        """
        if not data:
            return 0
        pair_count = 0
        total_char_count = 0

        for x in range(256):
            charcount = data.count(chr(x))
            pair_count += charcount * (charcount - 1)
            total_char_count += charcount

        # N * (N - 1) is zero for N < 2; treat that as "no coincidence".
        denominator = total_char_count * (total_char_count - 1)
        if denominator:
            ic = float(pair_count) / denominator
        else:
            ic = 0.0
        self.results.append({"filename": filename, "value": ic})
        # Fold this file's characters into the aggregate counters.
        self.calculate_char_count(data)
        return ic

    def sort(self):
        """Sort results by ascending IC and attach a "rank" to each entry."""
        self.results.sort(key=lambda item: item["value"])
        self.results = resultsAddRank(self.results)

    def printer(self, count):
        """Print the aggregate IC followed by the `count` lowest-IC files."""
        # Calculate the total IC for the search.
        self.calculate_IC()
        print("\n[[ Average IC for Search ]]")
        print(self.ic_total_results)
        print("\n[[ Top %i lowest IC files ]]" % (count))
        # Slicing clamps to the number of available results.
        for entry in self.results[:max(count, 0)]:
            print(' {0:>7.4f} {1}'.format(entry["value"], entry["filename"]))
        return
class Entropy:
    """Calculate the Shannon entropy (bits per character) of files."""

    def __init__(self):
        """Instantiate the results list."""
        # One {"filename": ..., "value": ...} dict per calculated file.
        self.results = []

    def calculate(self, data, filename):
        """Calculate the entropy of `data` and append it to self.results.

        Space characters are removed before measuring. The original code
        called data.replace(' ', '') but discarded the result, so the
        stripping never took effect.

        Returns the entropy as a float. Empty data returns 0 and records
        nothing. Data consisting only of spaces is recorded with 0.0.
        """
        if not data:
            return 0
        # str.replace returns a new string; the result must be kept.
        stripped = data.replace(' ', '')
        entropy = 0.0
        if stripped:
            length = float(len(stripped))
            for x in range(256):
                p_x = stripped.count(chr(x)) / length
                if p_x > 0:
                    entropy += - p_x * math.log(p_x, 2)
        self.results.append({"filename": filename, "value": entropy})
        return entropy

    def sort(self):
        """Sort results by descending entropy and attach a "rank" to each."""
        self.results.sort(key=lambda item: item["value"])
        self.results.reverse()
        self.results = resultsAddRank(self.results)

    def printer(self, count):
        """Print the `count` highest-entropy files."""
        print("\n[[ Top %i entropic files for a given search ]]" % (count))
        # Slicing clamps to the number of available results.
        for entry in self.results[:max(count, 0)]:
            print(' {0:>7.4f} {1}'.format(entry["value"], entry["filename"]))
        return
class LongestWord:
    """Determine the length of the longest word in a file."""

    # Words are separated by whitespace or commas. This is the same set of
    # characters as the original "[\s,\n,\r]" class, written as a raw string.
    _WORD_SEPARATORS = re.compile(r"[\s,]")

    def __init__(self):
        """Instantiate the results list."""
        # One {"filename": ..., "value": ...} dict per calculated file.
        self.results = []

    def calculate(self, data, filename):
        """Find the longest word in `data` and append its length to self.results.

        Returns the length of the longest word as an int. Empty data returns
        0 and records nothing. The original returned the tuple ("", 0) here,
        which did not match the int returned on every other path.
        """
        if not data:
            return 0
        # re.split always yields at least one (possibly empty) piece, so
        # max() never sees an empty sequence.
        longest = max(len(word) for word in self._WORD_SEPARATORS.split(data))
        self.results.append({"filename": filename, "value": longest})
        return longest

    def sort(self):
        """Sort results by descending word length and attach a "rank" to each."""
        self.results.sort(key=lambda item: item["value"])
        self.results.reverse()
        self.results = resultsAddRank(self.results)

    def printer(self, count):
        """Print the `count` files containing the longest words."""
        print("\n[[ Top %i longest word files ]]" % (count))
        # Slicing clamps to the number of available results.
        for entry in self.results[:max(count, 0)]:
            print(' {0:>7} {1}'.format(entry["value"], entry["filename"]))
        return
class SignatureNasty:
    """Count occurrences of suspicious ("nasty") expressions in a file."""

    # Lots taken from the wonderful post at http://stackoverflow.com/questions/3115559/exploitable-php-functions
    # Compiled once at class level instead of on every calculate() call.
    _NASTY_REGEX = re.compile(r'(eval\(|base64_decode|python_eval|exec\(|passthru|popen|proc_open|pcntl|assert\(|system\(|shell)', re.I)

    def __init__(self):
        """Instantiate the results list."""
        # One {"filename": ..., "value": ...} dict per calculated file.
        self.results = []

    def calculate(self, data, filename):
        """Count signature matches in `data` and append the count to self.results.

        Matching is case-insensitive.

        Returns the number of matches as an int. Empty data returns 0 and
        records nothing. The original returned the tuple ("", 0) here, which
        did not match the int returned on every other path.
        """
        if not data:
            return 0
        match_count = len(self._NASTY_REGEX.findall(data))
        self.results.append({"filename": filename, "value": match_count})
        return match_count

    def sort(self):
        """Sort results by descending match count and attach a "rank" to each."""
        self.results.sort(key=lambda item: item["value"])
        self.results.reverse()
        self.results = resultsAddRank(self.results)

    def printer(self, count):
        """Print the `count` files with the most signature matches."""
        print("\n[[ Top %i signature match counts ]]" % (count))
        # Slicing clamps to the number of available results.
        for entry in self.results[:max(count, 0)]:
            print(' {0:>7} {1}'.format(entry["value"], entry["filename"]))
        return
class Compression:
    """Calculate the zlib compression ratio of files."""

    def __init__(self):
        """Instantiate the results list."""
        # One {"filename": ..., "value": ...} dict per calculated file.
        self.results = []

    def calculate(self, data, filename):
        """Compress `data` with zlib and append the ratio to self.results.

        The ratio is compressed length divided by original length.

        Returns the ratio as a float. Empty data returns 0 and records
        nothing. The original returned the tuple ("", 0) here, which did not
        match the float returned on every other path.
        """
        if not data:
            return 0
        compressed = zlib.compress(data)
        ratio = float(len(compressed)) / float(len(data))
        self.results.append({"filename": filename, "value": ratio})
        return ratio

    def sort(self):
        """Sort results by descending ratio and attach a "rank" to each."""
        self.results.sort(key=lambda item: item["value"])
        self.results.reverse()
        self.results = resultsAddRank(self.results)

    def printer(self, count):
        """Print the `count` files with the highest compression ratio."""
        print("\n[[ Top %i compression match counts ]]" % (count))
        # Slicing clamps to the number of available results.
        for entry in self.results[:max(count, 0)]:
            print(' {0:>7.4f} {1}'.format(entry["value"], entry["filename"]))
        return
def resultsAddRank(results):
    """Attach a competition-style "rank" to each entry of a sorted result list.

    `results` is a sequence of dicts that each have a "value" key, already in
    the desired order. Entries with the same value as their predecessor share
    its rank; otherwise the rank is the entry's 1-based position (so values
    5, 5, 3 rank as 1, 1, 3).

    The dicts are modified in place, and a new list holding the same dicts is
    returned.
    """
    ranked = []
    rank = 1
    previous_value = None
    for position, entry in enumerate(results, 1):
        # Compare by position rather than by the truthiness of the previous
        # value, so that a previous value of 0 still advances the rank.
        if position > 1 and entry["value"] != previous_value:
            rank = position
        entry["rank"] = rank
        ranked.append(entry)
        previous_value = entry["value"]
    return ranked
class SearchFile:
    """Walk a directory tree and yield the contents of files whose name
    matches a regular expression."""

    def search_file_path(self, args, valid_regex):
        """Yield (data, filename) for each matching file under args[0].

        args        -- sequence whose first element is the start directory
        valid_regex -- compiled regular expression searched against each
                       bare file name (not the full path)

        Only files larger than 60 bytes are considered. `data` is the raw
        file contents read in binary mode, or False when the file could not
        be examined or read; in that case a message is printed. `filename`
        is the path joined from the walk root and the file name.
        """
        for root, dirs, files in os.walk(args[0]):
            for name in files:
                if not valid_regex.search(name):
                    continue
                filename = os.path.join(root, name)
                try:
                    # getsize is inside the try so that an unstat-able entry
                    # (e.g. a broken symlink) is reported, not fatal.
                    if os.path.getsize(filename) <= 60:
                        continue
                    # The with-block closes the handle even if read() fails.
                    with open(filename, 'rb') as handle:
                        data = handle.read()
                except (IOError, OSError, MemoryError):
                    # MemoryError: reading a very large file whole can fail;
                    # the scan should carry on with the next file.
                    data = False
                    print("Could not read file :: %s/%s" % (root, name))
                yield data, filename
if __name__ == "__main__":
    # Command-line entry point: parse the options, walk the start directory,
    # run every selected test against each readable file, optionally write a
    # CSV report, then print per-test and cumulative rankings.

    timeStart = time.clock()

    # Raw string: the banner contains backslashes that must be printed as-is.
    print(r"""
) ( (
( /( )\ ))\ )
)\()) ( (()/(()/(
((_)\ ))\ ( /(_))(_))
_((_)/((_))\(_))(_))
| \| (_)) ((_) _ \_ _|
| .` / -_) _ \ _/| |
|_|\_\___\___/_| |___| Ver. *.USEGIT
""")

    parser = OptionParser(usage="usage: %prog [options] <start directory> <OPTIONAL: filename regex>",
                          version="%prog 1.0")
    parser.add_option("-c", "--csv",
                      action="store",
                      dest="is_csv",
                      default=False,
                      help="generate CSV outfile",
                      metavar="FILECSV")
    parser.add_option("-a", "--all",
                      action="store_true",
                      dest="is_all",
                      default=False,
                      help="Run all (useful) tests [Entropy, Longest Word, IC, Signature]",)
    parser.add_option("-z", "--zlib",
                      action="store_true",
                      dest="is_zlib",
                      default=False,
                      help="Run compression Test",)
    parser.add_option("-e", "--entropy",
                      action="store_true",
                      dest="is_entropy",
                      default=False,
                      help="Run entropy Test",)
    parser.add_option("-l", "--longestword",
                      action="store_true",
                      dest="is_longest",
                      default=False,
                      help="Run longest word test",)
    parser.add_option("-i", "--ic",
                      action="store_true",
                      dest="is_ic",
                      default=False,
                      help="Run IC test",)
    parser.add_option("-s", "--signature",
                      action="store_true",
                      dest="is_signature",
                      default=False,
                      help="Run signature test",)
    parser.add_option("-A", "--auto",
                      action="store_true",
                      dest="is_auto",
                      default=False,
                      help="Run auto file extension tests",)
    parser.add_option("-u", "--unicode",
                      action="store_true",
                      dest="ignore_unicode",
                      default=False,
                      help="Skip over unicode-y/UTF'y files",)

    (options, args) = parser.parse_args()

    # Error on invalid number of arguments
    if len(args) < 1:
        parser.print_help()
        print("")
        sys.exit()

    # Error on an invalid path
    if not os.path.exists(args[0]):
        parser.error("Invalid path")

    # A user-supplied filename regex is only honoured when --auto is off.
    if len(args) == 2 and not options.is_auto:
        try:
            valid_regex = re.compile(args[1])
        except (re.error, OverflowError, RuntimeError):
            # parser.error() prints the message and exits.
            parser.error("Invalid regular expression")
    else:
        valid_regex = re.compile('.*')
    tests = []

    if options.is_auto:
        valid_regex = re.compile(r'(\.php|\.asp|\.aspx|\.scath|\.bash|\.zsh|\.csh|\.tsch|\.pl|\.py|\.txt|\.cgi|\.cfm|\.htaccess)$')

    if options.is_all:
        tests.append(LanguageIC())
        tests.append(Entropy())
        tests.append(LongestWord())
        tests.append(SignatureNasty())
    else:
        if options.is_entropy:
            tests.append(Entropy())
        if options.is_longest:
            tests.append(LongestWord())
        if options.is_ic:
            tests.append(LanguageIC())
        if options.is_signature:
            tests.append(SignatureNasty())
        if options.is_zlib:
            tests.append(Compression())

    # Instantiate the generator class used for searching, opening, and reading files
    locator = SearchFile()

    # CSV file output rows and header
    csv_array = []
    csv_header = ["filename"]

    # Grab each file and calculate each test against it
    fileCount = 0
    fileIgnoreCount = 0
    for data, filename in locator.search_file_path(args, valid_regex):
        if data:
            # a row for the CSV, starting with the file name
            csv_row = [filename]

            if options.ignore_unicode:
                # Fraction of bytes above the 7-bit ASCII range.
                asciiHighCount = 0
                for character in data:
                    if ord(character) > 127:
                        asciiHighCount = asciiHighCount + 1

                fileAsciiHighRatio = float(asciiHighCount) / float(len(data))

            # fileAsciiHighRatio is only evaluated when ignore_unicode is set.
            if not options.ignore_unicode or fileAsciiHighRatio < .1:
                for test in tests:
                    calculated_value = test.calculate(data, filename)
                    # Make the header row if it hasn't been fully populated, +1 here to account for filename column
                    if len(csv_header) < len(tests) + 1:
                        csv_header.append(test.__class__.__name__)
                    csv_row.append(calculated_value)
                # Counted once per scanned file.
                fileCount = fileCount + 1
                csv_array.append(csv_row)
            else:
                fileIgnoreCount = fileIgnoreCount + 1

    if options.is_csv:
        csv_array.insert(0, csv_header)
        # The with-block flushes and closes the report file.
        with open(options.is_csv, "wb") as csv_file:
            csv.writer(csv_file).writerows(csv_array)

    timeFinish = time.clock()

    # Print some stats
    print("\n[[ Total files scanned: %i ]]" % (fileCount))
    print("[[ Total files ignored: %i ]]" % (fileIgnoreCount))
    print("[[ Scan Time: %f seconds ]]" % (timeFinish - timeStart))

    # Print top rank lists and accumulate each file's rank across all tests
    rank_list = {}
    for test in tests:
        test.sort()
        test.printer(10)
        for entry in test.results:
            rank_list[entry["filename"]] = rank_list.get(entry["filename"], 0) + entry["rank"]

    # Lowest cumulative rank first
    rank_sorted = sorted(rank_list.items(), key=lambda x: x[1])

    print("\n[[ Top cumulative ranked files ]]")
    for ranked_name, ranked_total in rank_sorted[:10]:
        print(' {0:>7} {1}'.format(ranked_total, ranked_name))