Update README.md
[usbkill.git] / usbkill / usbkill.py
blob1b85d2640d1af63200214b2d69f88e3796e39ffe
1 #!/usr/bin/env python
3 # _ _ _ _ _
4 # | | | | (_) | |
5 # _ _ ___| |__ | | _ _| | |
6 # | | | |/___) _ \| |_/ ) | | |
7 # | |_| |___ | |_) ) _ (| | | |
8 # |____/(___/|____/|_| \_)_|\_)_)
11 # Hephaestos <hephaestos@riseup.net> - 8764 EF6F D5C1 7838 8D10 E061 CF84 9CE5 42D0 B12B
12 # <https://github.com/hephaest0s/usbkill>
14 # This program is free software: you can redistribute it and/or modify
15 # it under the terms of the GNU General Public License as published by
16 # the Free Software Foundation, either version 3 of the License, or
17 # (at your option) any later version.
19 # This program is distributed in the hope that it will be useful,
20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 # GNU General Public License for more details.
24 # You should have received a copy of the GNU General Public License
25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 __version__ = "1.0-rc.4"
29 import re
30 import subprocess
31 import platform
32 import os, sys, signal
33 from time import sleep
34 from datetime import datetime
36 # Get the current platform
37 CURRENT_PLATFORM = platform.system().upper()
39 # Darwin specific library
40 if CURRENT_PLATFORM.startswith("DARWIN"):
41 import plistlib
43 # We compile this function beforehand for efficiency.
44 DEVICE_RE = [ re.compile(".+ID\s(?P<id>\w+:\w+)"), re.compile("0x([0-9a-z]{4})") ]
46 # Set the settings filename here
47 SETTINGS_FILE = '/etc/usbkill.ini'
49 help_message = """
50 usbkill is a simple program with one goal: quickly shutdown the computer when a USB is inserted or removed.
51 Events are logged in /var/log/usbkill/kills.log
52 You can configure a whitelist of USB ids that are acceptable to insert and the remove.
53 The USB id can be found by running the command 'lsusb'.
54 Settings can be changed in /etc/usbkill.ini
55 In order to be able to shutdown the computer, this program needs to run as root.
57 Options:
58 -h --help: Show this help
59 --version: Print usbkill version and exit
60 --cs: Copy program folder usbkill.ini to /etc/usbkill/usbkill.ini
61 --no-shut-down: Execute all the (destructive) commands you defined in usbkill.ini,
62 but don't turn off the computer
63 """
65 class DeviceCountSet(dict):
66 # Warning: this class has behavior you may not expect.
67 # This is because the overloaded __add__ is a mixture of add and overwrite
68 def __init__(self, list):
69 count = dict()
70 for i in list:
71 if type(i) == dict:
72 count[i.keys()[0]] = i.values()[0]
73 elif i in count:
74 count[i] += 1
75 else:
76 count[i] = 1
77 super(DeviceCountSet,self).__init__(count)
79 def __add__(self, other):
80 newdic = dict(self)
81 if type(other) in ['tuple', 'list']:
82 for k in other:
83 newdic[k] = 1
84 else:
85 for k,v in other.items():
86 if k in newdic:
87 if newdic[k] < v:
88 newdic[k] = v
89 else:
90 newdic[k] = v
91 return newdic
93 def log(settings, msg):
94 log_file = settings['log_file']
96 contents = '\n{0} {1}\nCurrent state:\n'.format(str(datetime.now()), msg)
97 with open(log_file, 'a+') as log:
98 log.write(contents)
100 # Log current USB state
101 if CURRENT_PLATFORM.startswith("DARWIN"):
102 os.system("system_profiler SPUSBDataType >> " + log_file)
103 else:
104 os.system("lsusb >> " + log_file)
106 def shred(settings):
107 shredder = settings['remove_file_cmd']
109 # List logs and settings to be removed
110 if settings['melt_usbkill']:
111 settings['folders_to_remove'].append(os.path.dirname(settings['log_file']))
112 settings['folders_to_remove'].append(os.path.dirname(SETTINGS_FILE))
113 usbkill_folder = os.path.dirname(os.path.realpath(__file__))
114 if usbkill_folder.upper().startswith('USB'):
115 settings['folders_to_remove'].append(usbkill_folder)
116 else:
117 settings['files_to_remove'].append(os.path.realpath(__file__))
118 settings['files_to_remove'].append(usbkill_folder + "/usbkill.ini")
120 # Remove files and folders
121 for _file in settings['files_to_remove'] + settings['folders_to_remove']:
122 os.system(shredder + _file )
124 def kill_computer(settings):
125 # Log what is happening:
126 if not settings['melt_usbkill']: # No need to spend time on logging if logs will be removed
127 log(settings, "Detected a USB change. Dumping the list of connected devices and killing the computer...")
129 # Shred as specified in settings
130 shred(settings)
132 # Execute kill commands in order.
133 for command in settings['kill_commands']:
134 os.system(command)
136 if settings['do_sync']:
137 # Sync the filesystem to save recent changes
138 os.system("sync")
139 else:
140 # If syncing is risky because it might take too long, then sleep for 5ms.
141 # This will still allow for syncing in most cases.
142 sleep(0.05)
144 # Wipe ram and/or swap
145 if settings['do_wipe_ram'] and settings['do_wipe_swap']:
146 os.system(settings['wipe_ram_cmd'] + " & " + settings['wipe_swap_cmd'])
147 elif settings['do_wipe_ram']:
148 os.system(settings['wipe_ram_cmd'])
149 elif settings['do_wipe_swap']:
150 os.system(settings['wipe_swap_cmd'])
152 if settings['shut_down']: # (Use argument --no-shut-down to prevent a shutdown.)
153 # Finally poweroff computer immediately
154 if CURRENT_PLATFORM.startswith("DARWIN"):
155 # OS X (Darwin) - Will halt ungracefully, without signaling apps
156 os.system("killall Finder ; killall loginwindow ; halt -q")
157 elif CURRENT_PLATFORM.endswith("BSD"):
158 # BSD-based systems - Will shutdown
159 os.system("shutdown -h now")
160 else:
161 # Linux-based systems - Will shutdown
162 os.system("poweroff -f")
164 # Exit the process to prevent executing twice (or more) all commands
165 sys.exit(0)
167 def lsusb_darwin():
168 # Use OS X system_profiler (native and 60% faster than lsusb port)
169 df = subprocess.check_output("system_profiler SPUSBDataType -xml -detailLevel mini", shell=True)
170 if sys.version_info[0] == 2:
171 df = plistlib.readPlistFromString(df)
172 elif sys.version_info[0] == 3:
173 df = plistlib.loads(df)
175 def check_inside(result, devices):
177 I suspect this function can become more readable.
179 # Do not take devices with Built-in_Device=Yes
180 try:
181 result["Built-in_Device"]
182 except KeyError:
184 # Check if vendor_id/product_id is available for this one
185 try:
186 # Ensure vendor_id and product_id are present
187 assert "vendor_id" in result and "product_id" in result
189 try:
190 vendor_id = DEVICE_RE[1].findall(result["vendor_id"])[0]
191 except IndexError:
192 # Assume this is not an standard vendor_id (probably apple_vendor_id)
193 vendor_id = result["vendor_id"];
195 try:
196 product_id = DEVICE_RE[1].findall(result["product_id"])[0]
197 except IndexError:
198 # Assume this is not an standard product_id (probably apple_vendor_id)
199 product_id = result["product_id"];
201 # Append to the list of devices
202 devices.append(vendor_id + ':' + product_id)
204 except AssertionError: {}
206 # Check if there is items inside
207 try:
208 # Looks like, do the while again
209 for result_deep in result["_items"]:
210 # Check what's inside the _items array
211 check_inside(result_deep, devices)
213 except KeyError: {}
215 # Run the loop
216 devices = []
217 for result in df[0]["_items"]:
218 check_inside(result, devices)
219 return devices
221 def lsusb():
222 # A Python version of the command 'lsusb' that returns a list of connected usbids
223 if CURRENT_PLATFORM.startswith("DARWIN"):
224 # Use OS X system_profiler (native, 60% faster, and doesn't need the lsusb port)
225 return DeviceCountSet(lsusb_darwin())
226 else:
227 # Use lsusb on linux and bsd
228 return DeviceCountSet(DEVICE_RE[0].findall(subprocess.check_output("lsusb", shell=True).decode('utf-8').strip()))
230 def program_present(program):
231 if sys.version_info[0] == 3:
232 # Python3
233 from shutil import which
234 return which(program) != None
236 else:
238 Test if an executable exist in Python2
239 -> http://stackoverflow.com/a/377028
241 def is_exe(fpath):
242 return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
244 fpath, fname = os.path.split(program)
245 if fpath and is_exe(program):
246 return True
247 else:
248 for path in os.environ["PATH"].split(os.pathsep):
249 path = path.strip('"')
250 exe_file = os.path.join(path, program)
251 if is_exe(exe_file):
252 return True
253 return False
255 def load_settings(filename):
256 # Libraries that are only needed in this function:
257 from json import loads as jsonloads
258 if sys.version_info[0] == 3:
259 # Python3
260 import configparser
261 def get_setting(name, gtype=''):
263 configparser: Compatibility layer for Python 2/3
264 Function currently depends on a side effect, which is not necessary.
266 section = config['config']
267 if gtype == 'FLOAT':
268 return section.getfloat(name)
269 elif gtype == 'INT':
270 return section.getint(name)
271 elif gtype == 'BOOL':
272 return section.getboolean(name)
273 return section[name]
274 else:
275 #Python2
276 import ConfigParser as configparser
277 def get_setting(name, gtype=''):
278 if gtype == 'FLOAT':
279 return config.getfloat('config', name)
280 elif gtype == 'INT':
281 return config.getint('config', name)
282 elif gtype == 'BOOL':
283 return config.getboolean('config', name)
284 return config.get('config', name)
286 config = configparser.ConfigParser()
288 # Read all lines of settings file
289 config.read(filename)
291 # Build settings
292 settings = dict({
293 'sleep_time' : get_setting('sleep', 'FLOAT'),
294 'whitelist': DeviceCountSet(jsonloads(get_setting('whitelist').strip())),
295 'log_file': get_setting('log_file'),
296 'melt_usbkill' : get_setting('melt_usbkill', 'BOOL'),
297 'remove_file_cmd' : get_setting('remove_file_cmd') + " ",
298 'files_to_remove' : jsonloads(get_setting('files_to_remove').strip()),
299 'folders_to_remove' : jsonloads(get_setting('folders_to_remove').strip()),
300 'do_sync' : get_setting('do_sync', 'BOOL'),
301 'kill_commands': jsonloads(get_setting('kill_commands').strip())
304 settings['do_wipe_ram'] = False
305 if get_setting('do_wipe_ram', 'BOOL'):
306 settings['do_wipe_ram'] = True
307 settings['wipe_ram_cmd'] = get_setting('wipe_ram_cmd') + " "
309 settings['do_wipe_swap'] = False
310 if get_setting('do_wipe_swap', 'BOOL'):
311 settings['do_wipe_swap'] = True
312 settings['wipe_swap_cmd'] = get_setting('wipe_swap_cmd') + " "
314 return settings
316 def loop(settings):
317 # Main loop that checks every 'sleep_time' seconds if computer should be killed.
318 # Allows only whitelisted usb devices to connect!
319 # Does not allow usb device that was present during program start to disconnect!
320 start_devices = lsusb()
321 acceptable_devices = start_devices + settings['whitelist']
323 # Write to logs that loop is starting:
324 msg = "[INFO] Started patrolling the USB ports every " + str(settings['sleep_time']) + " seconds..."
325 log(settings, msg)
326 print(msg)
328 # Main loop
329 while True:
330 # List the current usb devices
331 current_devices = lsusb()
333 # Check that all current devices are in the set of acceptable devices
334 # and their cardinality is less than or equal to what is allowed
335 for device, count in current_devices.items():
336 if device not in acceptable_devices:
337 # A device with unknown usbid detected
338 kill_computer(settings)
339 if count > acceptable_devices[device]:
340 # Count of a usbid is larger than what is acceptable (too many devices sharing usbid)
341 kill_computer(settings)
343 # Check that all start devices are still present in current devices
344 # and their cardinality still the same
345 for device, count in start_devices.items():
346 if device not in current_devices:
347 # A usbid has disappeared completely
348 kill_computer(settings)
349 if count > current_devices[device]:
350 # Count of a usbid device is lower than at program start (not enough devices for given usbid)
351 kill_computer(settings)
353 sleep(settings['sleep_time'])
355 def startup_checks():
356 # Splash
357 print(" _ _ _ _ _ \n" +
358 " | | | | (_) | | \n" +
359 " _ _ ___| |__ | | _ _| | | \n" +
360 " | | | |/___) _ \| |_/ ) | | | \n" +
361 " | |_| |___ | |_) ) _ (| | | | \n" +
362 " |____/(___/|____/|_| \_)_|\_)_)\n")
364 # Check arguments
365 args = sys.argv[1:]
367 # Check for help
368 if '-h' in args or '--help' in args:
369 sys.exit(help_message)
371 if '--version' in args:
372 print('usbkill', __version__)
373 sys.exit(0)
375 copy_settings = False
376 if '--cs' in args:
377 args.remove('--cs')
378 copy_settings = True
380 shut_down = True
381 if '--no-shut-down' in args:
382 print("[NOTICE] Ready to execute all the (potentially destructive) commands, but NOT shut down the computer.")
383 args.remove('--no-shut-down')
384 shut_down = False
386 # Check all other args
387 if len(args) > 0:
388 sys.exit("\n[ERROR] Argument not understood. Can only understand -h\n")
390 # Check if program is run as root, else exit.
391 # Root is needed to power off the computer.
392 if not os.geteuid() == 0:
393 sys.exit("\n[ERROR] This program needs to run as root.\n")
395 # Warn the user if he does not have FileVault
396 if CURRENT_PLATFORM.startswith("DARWIN"):
397 try:
398 # fdesetup return exit code 0 when true and 1 when false
399 subprocess.check_output(["/usr/bin/fdesetup", "isactive"])
400 except subprocess.CalledProcessError:
401 print("[NOTICE] FileVault is disabled. Sensitive data SHOULD be encrypted.")
403 # On first time use copy usbkill.ini to /etc/usebkill.ini
404 # If dev-mode, always copy and don't remove old settings
405 if not os.path.isfile(SETTINGS_FILE) or copy_settings:
406 sources_path = os.path.dirname(os.path.realpath(__file__))
407 if not os.path.isfile(os.path.join(sources_path, "install/usbkill.ini")):
408 sys.exit("\n[ERROR] You have lost your settings file. Get a new copy of the usbkill.ini and place it in /etc/ or in " + sources_path + "/\n")
409 print("[NOTICE] Copying install/setting.ini to " + SETTINGS_FILE )
410 os.system("cp " + sources_path + "install/usbkill.ini " + SETTINGS_FILE)
412 # Load settings
413 settings = load_settings(SETTINGS_FILE)
414 settings['shut_down'] = shut_down
416 # Make sure no spaces a present in paths to be wiped.
417 for name in settings['folders_to_remove'] + settings['files_to_remove']:
418 if ' ' in name:
419 msg += "[ERROR][WARNING] '" + name + "'as specified in your usbkill.ini contains a space.\n"
420 sys.exit(msg)
422 # Make sure srm is present if it will be used.
423 if settings['melt_usbkill'] or len(settings['folders_to_remove'] + settings['files_to_remove']) > 0:
424 if not program_present('srm'):
425 sys.exit("[ERROR] usbkill configured to destroy data, but srm not installed.\n")
426 if not settings['remove_file_cmd'].startswith('srm'):
427 sys.exit("[ERROR] remove_file_command should start with `srm'. srm should be used for automated data overwrite.\n")
428 # Make sure sdmem is present if it will be used.
429 if settings['do_wipe_ram']:
430 if not program_present('sdmem'):
431 sys.exit("[ERROR] usbkill configured to destroy data, but srm not installed.\n")
432 if not settings['wipe_ram_cmd'].startswith('sdmem'):
433 sys.exit("[ERROR] wipe_ram_cmd should start with `sdmem'. sdmem should be used for automated data overwrite.\n")
434 # Make sure sswap is present if it will be used.
435 if settings['do_wipe_swap']:
436 if not program_present('sswap'):
437 sys.exit("[ERROR] usbkill configured to destroy data, but srm not installed.\n")
438 if not settings['wipe_swap_cmd'].startswith('sswap'):
439 sys.exit("[ERROR] wipe_swap_cmd should start with `sswap'. sswap should be used for automated data overwrite.\n")
441 # Make sure there is a logging folder
442 log_folder = os.path.dirname(settings['log_file'])
443 if not os.path.isdir(log_folder):
444 os.mkdir(log_folder)
446 return settings
448 def go():
449 # Run startup checks and load settings
450 settings = startup_checks()
452 # Define exit handler now that settings are loaded...
453 def exit_handler(signum, frame):
454 print("\n[INFO] Exiting because exit signal was received\n")
455 log(settings, "[INFO] Exiting because exit signal was received")
456 sys.exit(0)
458 # Register handlers for clean exit of program
459 for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, ]:
460 signal.signal(sig, exit_handler)
462 # Start main loop
463 loop(settings)
465 if __name__=="__main__":
466 go()