Parse MSP messages from the backpack while TX is in mavlink mode (#2883)
[ExpressLRS.git] / src / python / binary_configurator.py
blob647f2bd7d0d15f8283df5a73858c51f46b1f2c91
1 #!/usr/bin/python
3 import os
4 from random import randint
5 import argparse
6 import json
7 from json import JSONEncoder
8 import mmap
9 import hashlib
10 from enum import Enum
11 import shutil
13 import firmware
14 from firmware import DeviceType, FirmwareOptions, RadioType, MCUType, TXType
15 import melodyparser
16 import UnifiedConfiguration
17 import binary_flash
18 from binary_flash import UploadMethod
19 from external import jmespath
class BuzzerMode(Enum):
    """Buzzer behaviours selectable with ``--buzzer-mode``.

    Each member's value is the exact string accepted on the command line.
    """

    quiet = 'quiet'
    one = 'one-beep'
    beep = 'beep-tune'
    default = 'default-tune'
    custom = 'custom-tune'

    def __str__(self):
        # Show the raw option string so argparse help/choices read naturally.
        return self.value
class RegulatoryDomain(Enum):
    """Regulatory domains selectable with ``--domain``.

    Each member's value is the exact string accepted on the command line.
    """

    us_433 = 'us_433'
    us_433_wide = 'us_433_wide'
    eu_433 = 'eu_433'
    au_433 = 'au_433'
    in_866 = 'in_866'
    eu_868 = 'eu_868'
    au_915 = 'au_915'
    fcc_915 = 'fcc_915'

    def __str__(self):
        # Show the raw option string so argparse help/choices read naturally.
        return self.value
def write32(mm, pos, val):
    """Store ``val`` as a 32-bit little-endian integer at ``mm[pos:pos+4]``.

    When ``val`` is ``None`` nothing is written, but the returned position
    still skips over the 4-byte field.

    Returns the position immediately after the field.
    """
    if val is not None:
        for byte_index in range(4):
            # Least significant byte first; masking also maps negative
            # values onto their two's-complement byte pattern.
            mm[pos + byte_index] = (val >> (8 * byte_index)) & 0xFF
    return pos + 4
def read32(mm, pos):
    """Read a 32-bit little-endian integer starting at ``mm[pos]``.

    Returns a ``(next_pos, value)`` tuple, where ``next_pos`` is the position
    immediately after the 4-byte field.
    """
    value = sum(mm[pos + byte_index] << (8 * byte_index) for byte_index in range(4))
    return pos + 4, value
def writeString(mm, pos, string, maxlen):
    """Store ``string`` as a NUL-terminated field of ``maxlen`` bytes.

    The string is encoded (UTF-8) and truncated to ``maxlen - 1`` bytes so
    that the terminating NUL always fits inside the field. Bytes of the field
    after the terminator are left untouched. When ``string`` is ``None``
    nothing is written.

    Returns the position immediately after the field (``pos + maxlen``).
    """
    if string is not None:
        # Truncate the *encoded* bytes, not the characters: the field size is
        # expressed in bytes. NOTE: this may cut a multi-byte character.
        encoded = string.encode()[:max(maxlen - 1, 0)]
        end = pos + len(encoded)
        mm[pos:end] = encoded
        mm[end] = 0
    return pos + maxlen
def readString(mm, pos, maxlen):
    """Read a NUL-terminated string from a field of ``maxlen`` bytes.

    The terminator is only searched for inside ``mm[pos:pos+maxlen]``; when
    the field contains no NUL the whole field is decoded.

    Returns a ``(next_pos, text)`` tuple, where ``next_pos`` is the position
    immediately after the field (``pos + maxlen``).
    """
    field_end = pos + maxlen
    terminator = mm.find(b'\x00', pos, field_end)
    if terminator == -1:
        # No terminator inside the field: use the full field rather than
        # letting -1 turn into a "drop the last byte of the buffer" slice.
        terminator = field_end
    return field_end, mm[pos:terminator].decode()
def generateUID(phrase):
    """Derive the 6-byte binding UID from ``phrase``.

    If ``phrase`` is a comma-separated list of 4 to 6 decimal byte values
    (0-255) those bytes are used directly, left-padded with zeros to 6 bytes.
    Otherwise the UID is the first 6 bytes of the MD5 digest of
    ``-DMY_BINDING_PHRASE="<phrase>"``.
    """
    # Anything that is not a plain run of digits becomes -1, which fails the
    # range check below and selects the hashed form.
    parts = [int(token) if token.isdigit() else -1 for token in phrase.split(',')]
    is_literal_uid = 4 <= len(parts) <= 6 and all(0 <= part < 256 for part in parts)
    if is_literal_uid:
        # Extend the UID to 6 bytes, as only 4 are needed to bind
        return bytes([0] * (6 - len(parts)) + parts)
    define = "-DMY_BINDING_PHRASE=\"" + phrase + "\""
    return hashlib.md5(define.encode()).digest()[0:6]
def patch_uid(mm, pos, args):
    """Patch the binding UID field (1 flag byte followed by 6 UID bytes).

    When ``args.phrase`` is set, the flag byte is set to 1 and the UID derived
    from the phrase is stored after it; otherwise the field is left as-is.

    Returns the position immediately after the 7-byte field.
    """
    if args.phrase:
        uid_start = pos + 1
        mm[pos] = 1
        mm[uid_start:uid_start + 6] = generateUID(args.phrase)
    return pos + 7
def patch_flash_discriminator(mm, pos, args):
    """Write ``args.flash_discriminator`` as a 32-bit field at ``pos``.

    Returns the position immediately after the field.
    """
    next_pos = write32(mm, pos, args.flash_discriminator)
    return next_pos
def patch_wifi(mm, pos, args):
    """Patch the WiFi block: auto-start interval, SSID and password.

    The interval is written as -1 when ``args.no_auto_wifi`` is set, as
    ``args.auto_wifi * 1000`` when an interval was given, and is left
    untouched otherwise. The SSID and password occupy 33- and 65-byte
    string fields respectively.

    Returns the position immediately after the block.
    """
    if args.no_auto_wifi:
        interval = -1
    elif args.auto_wifi:
        interval = args.auto_wifi * 1000
    else:
        interval = None

    pos = write32(mm, pos, interval)
    for text, field_size in ((args.ssid, 33), (args.password, 65)):
        pos = writeString(mm, pos, text, field_size)
    return pos
def patch_rx_params(mm, pos, args):
    """Patch the receiver parameters: UART baud rate plus one flags byte.

    The baud field receives ``args.airport_baud`` when it is set, otherwise
    ``args.rx_baud``. In the flags byte, bit 1 carries
    ``lock_on_first_connection`` and bit 3 is set when a non-zero AirPort
    baud rate is configured; bits 0 and 2 are always cleared.

    Returns the position immediately after the flags byte.
    """
    baud = args.rx_baud if args.airport_baud is None else args.airport_baud
    pos = write32(mm, pos, baud)

    flags = mm[pos]
    flags &= ~1     # unused1 - ex invert_tx
    if args.lock_on_first_connection is not None:
        flags &= ~2
        flags |= 2 if args.lock_on_first_connection else 0
    flags &= ~4     # unused2 - ex r9mm_mini_sbus
    if args.airport_baud is not None:
        # Clear the bit with AND-NOT. OR-ing with ~8 would set every other
        # bit and make the value negative, which cannot be stored in a byte.
        flags &= ~8
        flags |= 0 if args.airport_baud == 0 else 8
    mm[pos] = flags
    return pos + 1
def patch_tx_params(mm, pos, args, options):
    """Patch the transmitter parameters.

    Layout written, in order: telemetry report interval (32 bit), fan minimum
    runtime (32 bit), one flags byte, the buzzer block (only when
    ``options.hasBuzzer``) and the AirPort baud rate (32 bit, 0 when AirPort
    is not configured). In the flags byte, bit 1 carries
    ``unlock_higher_power`` and bit 2 is set when a non-zero AirPort baud
    rate is configured; bit 0 is always cleared.

    Returns the position immediately after the last field.
    """
    pos = write32(mm, pos, args.tlm_report)
    pos = write32(mm, pos, args.fan_min_runtime)

    flags = mm[pos]
    flags &= ~1     # unused1 - ex uart_inverted
    if args.unlock_higher_power is not None:
        flags &= ~2
        flags |= 2 if args.unlock_higher_power else 0
    if args.airport_baud is not None:
        # Clear the bit with AND-NOT. OR-ing with ~4 would set every other
        # bit and make the value negative, which cannot be stored in a byte.
        flags &= ~4
        flags |= 0 if args.airport_baud == 0 else 4
    mm[pos] = flags
    pos += 1

    if options.hasBuzzer:
        pos = patch_buzzer(mm, pos, args)
    pos = write32(mm, pos, 0 if args.airport_baud is None else args.airport_baud)
    return pos
def patch_buzzer(mm, pos, args):
    """Patch the buzzer block: one mode byte followed by a 32-note melody.

    ``args.buzzer_mode`` (if given) selects the stored mode byte and, for the
    tune modes, the melody to store. The melody is only written when the
    resulting mode byte is 2 and a melody is available; it is truncated to 32
    notes and zero-padded when shorter.

    Returns the position immediately after the melody area.
    """
    melody_bytes = 32 * 4   # 32 notes x (2 bytes tone, 2 bytes duration)
    melody = args.buzzer_melody
    requested = args.buzzer_mode
    if requested:
        # (mode, stored mode byte, melody to use)
        presets = (
            (BuzzerMode.quiet, 0, melody),
            (BuzzerMode.one, 1, melody),
            (BuzzerMode.beep, 2, 'A4 20 B4 20|60|0'),
            (BuzzerMode.default, 2, 'E5 40 E5 40 C5 120 E5 40 G5 22 G4 21|20|0'),
            (BuzzerMode.custom, 2, args.buzzer_melody),
        )
        for candidate, mode_byte, tune in presets:
            if requested == candidate:
                mm[pos] = mode_byte
                melody = tune

    # The mode already stored in the image applies when none was requested.
    mode = mm[pos]
    notes_start = pos + 1

    if mode == 2 and melody:
        written = 0
        for element in melodyparser.parseToArray(melody):
            if written == melody_bytes:
                print("Melody truncated at 32 tones")
                break
            tone = int(element[0])
            duration = int(element[1])
            note = notes_start + written
            mm[note + 0] = (tone >> 0) & 0xFF
            mm[note + 1] = (tone >> 8) & 0xFF
            mm[note + 2] = (duration >> 0) & 0xFF
            mm[note + 3] = (duration >> 8) & 0xFF
            written += 4
        # If we are short, then add terminating 0's
        for offset in range(written, melody_bytes):
            mm[notes_start + offset] = 0

    return notes_start + melody_bytes
def FREQ_HZ_TO_REG_VAL_SX127X(freq):
    """Convert a frequency in Hz to an SX127X frequency register value.

    The result is ``freq`` divided by the register step, truncated to int.
    """
    # NOTE(review): 61.03515625 Hz looks like the SX127X synthesizer step
    # (32 MHz / 2**19) -- confirm against the datasheet.
    step_hz = 61.03515625
    return int(freq / step_hz)
def FREQ_HZ_TO_REG_VAL_SX1280(freq):
    """Convert a frequency in Hz to an SX1280 frequency register value.

    The result is ``freq`` divided by the register step
    (52 MHz / 2**18), truncated to int.
    """
    step_hz = 52000000.0 / (1 << 18)
    return int(freq / step_hz)
def generate_domain(mm, pos, count, init, step):
    """Write a frequency table for an SX127X regulatory domain.

    Stores ``count`` as a 32-bit value followed by ``count`` 32-bit SX127X
    register values for the frequencies ``init``, ``init + step``, ...

    Returns the position immediately after the table, like the other
    writers in this file, so calls can be chained.
    """
    pos = write32(mm, pos, count)
    freq = init
    for _ in range(count):
        pos = write32(mm, pos, FREQ_HZ_TO_REG_VAL_SX127X(freq))
        freq += step
    return pos
def domain_number(domain):
    """Map a ``RegulatoryDomain`` to the numeric index stored in the firmware.

    Returns ``None`` when ``domain`` is not one of the known members.
    """
    # The position in this tuple is the domain number.
    numbered_domains = (
        RegulatoryDomain.au_915,
        RegulatoryDomain.fcc_915,
        RegulatoryDomain.eu_868,
        RegulatoryDomain.in_866,
        RegulatoryDomain.au_433,
        RegulatoryDomain.eu_433,
        RegulatoryDomain.us_433,
        RegulatoryDomain.us_433_wide,
    )
    for number, candidate in enumerate(numbered_domains):
        if domain == candidate:
            return number
    return None
def patch_firmware(options, mm, pos, args):
    """Apply the command-line configuration to the firmware image.

    STM32 images are patched in place through ``mm`` starting at ``pos``:
    domain byte, UID, flash discriminator, then the TX or RX parameter block.
    All other MCU types are handed to ``patch_unified``.
    """
    if options.mcuType is not MCUType.STM32:
        patch_unified(args, options)
        return

    if options.radioChip is RadioType.SX127X and args.domain:
        mm[pos] = domain_number(args.domain)
    # The domain byte is skipped even when it is not rewritten.
    pos += 1
    pos = patch_uid(mm, pos, args)
    pos = patch_flash_discriminator(mm, pos, args)

    device = options.deviceType
    if device is DeviceType.TX:
        pos = patch_tx_params(mm, pos, args, options)
    elif device is DeviceType.RX:
        pos = patch_rx_params(mm, pos, args)
def patch_unified(args, options):
    """Configure a unified-target firmware from the command-line arguments.

    Builds the JSON options object from every argument that was supplied and
    passes it, together with the target description, to
    ``UnifiedConfiguration.doConfiguration``.
    """
    json_flags = {}
    if args.phrase is not None:
        json_flags['uid'] = list(generateUID(args.phrase))
    if args.ssid is not None:
        json_flags['wifi-ssid'] = args.ssid
    # A password is only meaningful together with an SSID.
    if args.password is not None and args.ssid is not None:
        json_flags['wifi-password'] = args.password
    if args.auto_wifi is not None:
        json_flags['wifi-on-interval'] = args.auto_wifi

    if args.tlm_report is not None:
        json_flags['tlm-interval'] = args.tlm_report
    if args.unlock_higher_power is not None:
        json_flags['unlock-higher-power'] = args.unlock_higher_power
    if args.fan_min_runtime is not None:
        json_flags['fan-runtime'] = args.fan_min_runtime

    if args.airport_baud is not None:
        json_flags['is-airport'] = True
        if options.deviceType is DeviceType.RX:
            json_flags['rcvr-uart-baud'] = args.airport_baud
        else:
            json_flags['airport-uart-baud'] = args.airport_baud
    elif args.rx_baud is not None:
        json_flags['rcvr-uart-baud'] = args.rx_baud

    if args.lock_on_first_connection is not None:
        json_flags['lock-on-first-connection'] = args.lock_on_first_connection

    if args.domain is not None:
        json_flags['domain'] = domain_number(args.domain)

    # Honour --flash-discriminator when the caller provides one; only fall
    # back to a fresh random value when the attribute is missing or None.
    discriminator = getattr(args, 'flash_discriminator', None)
    if discriminator is None:
        discriminator = randint(1, 2**32 - 1)
    json_flags['flash-discriminator'] = discriminator

    if options.radioChip is RadioType.SX1280:
        frequency = '2400'
    elif options.radioChip is RadioType.SX127X:
        frequency = '900'
    else:
        frequency = 'dual'

    UnifiedConfiguration.doConfiguration(
        args.file,
        JSONEncoder().encode(json_flags),
        args.target,
        'tx' if options.deviceType is DeviceType.TX else 'rx',
        frequency,
        '32' if options.mcuType is MCUType.ESP32 and options.deviceType is DeviceType.RX else '',
        options.luaName,
        args.rx_as_tx
    )
def length_check(l, f):
    """Build an argparse ``type`` callable that limits a string's length.

    ``l`` is the maximum number of characters allowed. ``f`` is accepted but
    not used by the check. The returned callable returns its argument
    unchanged, or raises ``argparse.ArgumentTypeError`` when it is too long.
    """
    def validate(value):
        if len(value) <= l:
            return value
        raise argparse.ArgumentTypeError(f'too long, {l} chars max')
    return validate
def ask_for_firmware(args):
    """Resolve the target path and its configuration from ``hardware/targets.json``.

    When ``args.target`` is set it is looked up directly. Otherwise every
    product for the selected module type (``args.tx``) is listed and the user
    is asked to pick one by number.

    Returns a ``(target, config)`` tuple, where ``target`` is the dotted
    ``vendor.type.model`` path and ``config`` is that entry's dictionary.

    Raises ``SystemExit`` when the interactive selection is empty, not a
    number, or outside the listed range.
    """
    moduletype = 'tx' if args.tx else 'rx'
    with open('hardware/targets.json') as f:
        targets = json.load(f)

    if args.target is not None:
        # Quote each path component so names containing special characters
        # are treated literally by jmespath.
        query = '.'.join(f'"{part}"' for part in args.target.split('.'))
        return args.target, jmespath.search(query, targets)

    products = jmespath.search(f'*.["{moduletype}_2400","{moduletype}_900","{moduletype}_dual"][].*[]', targets) or []
    for number, product in enumerate(products, start=1):
        print(f"{number}) {product['product_name']}")
    print('Choose a configuration to flash')
    choice = input()

    try:
        selected = int(choice)
    except ValueError:
        selected = 0
    # Reject empty, non-numeric and out-of-range input explicitly; 0 or a
    # negative number must not silently index from the end of the list.
    if not 1 <= selected <= len(products):
        print(f"Invalid selection '{choice}', expected a number between 1 and {len(products)}")
        raise SystemExit(1)

    config = products[selected - 1]
    target = None
    # Recover the dotted path of the chosen product; the last match wins.
    for v in targets:
        for t in targets[v]:
            if t != 'name':
                for m in targets[v][t]:
                    if targets[v][t][m]['product_name'] == config['product_name']:
                        target = f'{v}.{t}.{m}'
    return target, config
class readable_dir(argparse.Action):
    """argparse action that accepts only an existing, readable directory."""

    def __call__(self, parser, namespace, values, option_string=None):
        candidate = values
        if not os.path.isdir(candidate):
            raise argparse.ArgumentTypeError("readable_dir:{0} is not a valid path".format(candidate))
        if not os.access(candidate, os.R_OK):
            raise argparse.ArgumentTypeError("readable_dir:{0} is not a readable dir".format(candidate))
        setattr(namespace, self.dest, candidate)
class writeable_dir(argparse.Action):
    """argparse action that accepts only an existing, writeable directory."""

    def __call__(self, parser, namespace, values, option_string=None):
        prospective_dir = values
        # Messages carry this action's own name (they previously reported
        # "readable_dir", which pointed users at the wrong option).
        if not os.path.isdir(prospective_dir):
            raise argparse.ArgumentTypeError("writeable_dir:{0} is not a valid path".format(prospective_dir))
        if not os.access(prospective_dir, os.W_OK):
            raise argparse.ArgumentTypeError("writeable_dir:{0} is not a writeable dir".format(prospective_dir))
        setattr(namespace, self.dest, prospective_dir)
class deprecate_action(argparse.Action):
    """argparse action for deprecated options: accept the flag, store nothing.

    The option's attribute is removed from the namespace so deprecated flags
    never appear in the parsed arguments.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        # The flag may be given more than once; only the first occurrence
        # finds the attribute still present.
        if hasattr(namespace, self.dest):
            delattr(namespace, self.dest)
def _build_arg_parser():
    """Create the command-line parser for the binary configurator."""
    parser = argparse.ArgumentParser(description="Configure Binary Firmware")
    # firmware/targets directory
    parser.add_argument('--dir', action=readable_dir, default=None, help='The directory that contains the "hardware" and other firmware directories')
    parser.add_argument('--fdir', action=readable_dir, default=None, help='If specified, then the firmware files are loaded from this directory')
    # Bind phrase
    parser.add_argument('--phrase', type=str, help='Your personal binding phrase')
    parser.add_argument('--flash-discriminator', type=int, default=randint(1,2**32-1), dest='flash_discriminator', help='Force a fixed flash-descriminator instead of random')
    # WiFi Params
    parser.add_argument('--ssid', type=length_check(32, "ssid"), required=False, help='Home network SSID')
    parser.add_argument('--password', type=length_check(64, "password"), required=False, help='Home network password')
    parser.add_argument('--auto-wifi', type=int, help='Interval (in seconds) before WiFi auto starts, if no connection is made')
    parser.add_argument('--no-auto-wifi', action='store_true', help='Disables WiFi auto start if no connection is made')
    # AirPort
    parser.add_argument('--airport-baud', type=int, const=None, nargs='?', action='store', help='If configured as an AirPort device then this is the baud rate to use')
    # RX Params
    parser.add_argument('--rx-baud', type=int, const=420000, nargs='?', action='store', help='The receiver baudrate talking to the flight controller')
    parser.add_argument('--lock-on-first-connection', dest='lock_on_first_connection', action='store_true', help='Lock RF mode on first connection')
    parser.add_argument('--no-lock-on-first-connection', dest='lock_on_first_connection', action='store_false', help='Do not lock RF mode on first connection')
    parser.set_defaults(lock_on_first_connection=None)
    # TX Params
    parser.add_argument('--tlm-report', type=int, const=240, nargs='?', action='store', help='The interval (in milliseconds) between telemetry packets')
    parser.add_argument('--fan-min-runtime', type=int, const=30, nargs='?', action='store', help='The minimum amount of time the fan should run for (in seconds) if it turns on')
    parser.add_argument('--unlock-higher-power', dest='unlock_higher_power', action='store_true', help='DANGER: Unlocks the higher power on modules that do not normally have sufficient cooling e.g. 1W on R9M')
    parser.add_argument('--no-unlock-higher-power', dest='unlock_higher_power', action='store_false', help='Set the max power level at the safe maximum level')
    parser.set_defaults(unlock_higher_power=None)
    # Buzzer
    parser.add_argument('--buzzer-mode', type=BuzzerMode, choices=list(BuzzerMode), default=None, help='Which buzzer mode to use, if there is a buzzer')
    parser.add_argument('--buzzer-melody', type=str, default=None, help='If the mode is "custom", then this is the tune')
    # Regulatory domain
    parser.add_argument('--domain', type=RegulatoryDomain, choices=list(RegulatoryDomain), default=None, help='For SX127X based devices, which regulatory domain is being used')
    # Unified target
    parser.add_argument('--target', type=str, help='Unified target JSON path')
    # Flashing options
    parser.add_argument("--flash", type=UploadMethod, choices=list(UploadMethod), help="Flashing Method")
    parser.add_argument("--erase", action='store_true', default=False, help="Full chip erase before flashing on ESP devices")
    parser.add_argument('--out', action=writeable_dir, default=None)
    parser.add_argument("--port", type=str, help="SerialPort or WiFi address to flash firmware to")
    parser.add_argument("--baud", type=int, default=0, help="Baud rate for serial communication")
    parser.add_argument("--force", action='store_true', default=False, help="Force upload even if target does not match")
    parser.add_argument("--confirm", action='store_true', default=False, help="Confirm upload if a mismatched target was previously uploaded")
    parser.add_argument("--tx", action='store_true', default=False, help="Flash a TX module, RX if not specified")
    parser.add_argument("--lbt", action='store_true', default=False, help="Use LBT firmware, default is FCC (only for 2.4GHz firmware)")
    parser.add_argument('--rx-as-tx', type=TXType, choices=list(TXType), required=False, default=None, help="Flash an RX module with TX firmware, either internal (full-duplex) or external (half-duplex)")
    # Deprecated options, left for backward compatibility
    parser.add_argument('--uart-inverted', action=deprecate_action, nargs=0, help='Deprecated')
    parser.add_argument('--no-uart-inverted', action=deprecate_action, nargs=0, help='Deprecated')

    # Firmware file to patch/configure
    parser.add_argument("file", nargs="?", type=argparse.FileType("r+b"))
    return parser


def main():
    """Configure a firmware binary from the command line and optionally flash it.

    When no firmware file is given, the files for the selected target are
    copied into the current directory first. The image is then patched,
    gzipped for ESP8266 targets, and either uploaded (``--flash``) or copied
    to ``firmware.elrs`` for targets that support the 'stock' upload method.

    Returns the exit status: the result of ``binary_flash.upload`` when
    flashing, otherwise 0.
    """
    args = _build_arg_parser().parse_args()

    if args.dir is not None:
        os.chdir(args.dir)

    # The target is resolved the same way whether or not a file was given.
    stage_firmware = args.file is None
    args.target, config = ask_for_firmware(args)

    if stage_firmware:
        try:
            fw_name = config['firmware']
            if args.rx_as_tx is not None:
                platform = config['platform']
                if platform.startswith('esp32') or platform.startswith('esp8285') and args.rx_as_tx == TXType.internal:
                    fw_name = fw_name.replace('_RX', '_TX')
                else:
                    print("Selected device cannot operate as 'RX-as-TX' of this type.")
                    print("STM32 does not support RX as TX.")
                    print("ESP8285 only supports full-duplex internal RX as TX.")
                    exit(1)
            firmware_dir = '' if args.fdir is None else args.fdir + '/'
            srcdir = firmware_dir + ('LBT/' if args.lbt else 'FCC/') + fw_name
            dst = 'firmware.bin'
            shutil.copy2(srcdir + '/firmware.bin', ".")
            # These companion images only exist for some platforms.
            for optional in ('/bootloader.bin', '/partitions.bin', '/boot_app0.bin'):
                if os.path.exists(srcdir + optional):
                    shutil.copy2(srcdir + optional, ".")
            args.file = open(dst, 'r+b')
        except FileNotFoundError:
            print("Firmware files not found, did you download and unpack them in this directory?")
            exit(1)

    with args.file as f:
        mm = mmap.mmap(f.fileno(), 0)

        pos = firmware.get_hardware(mm)

        platform = config['platform']
        if platform == 'stm32':
            mcu_type = MCUType.STM32
        elif platform.startswith('esp32'):
            mcu_type = MCUType.ESP32
        else:
            mcu_type = MCUType.ESP8266

        if '_900.' in args.target:
            radio_chip = RadioType.SX127X
        elif '_2400.' in args.target:
            radio_chip = RadioType.SX1280
        else:
            radio_chip = RadioType.LR1121

        has_stlink = 'stlink' in config
        options = FirmwareOptions(
            platform != 'stm32',
            'features' in config and 'buzzer' in config['features'],
            mcu_type,
            DeviceType.RX if '.rx_' in args.target else DeviceType.TX,
            radio_chip,
            config['lua_name'] if 'lua_name' in config else '',
            config['stlink']['bootloader'] if has_stlink else '',
            config['stlink']['offset'] if has_stlink else 0,
            config['firmware']
        )
        patch_firmware(options, mm, pos, args)
        # Closed before the file is re-read by name below.
        args.file.close()

        if options.mcuType == MCUType.ESP8266:
            import gzip
            with open(args.file.name, 'rb') as f_in, gzip.open('firmware.bin.gz', 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

        if args.flash:
            args.target = config.get('firmware')
            args.accept = config.get('prior_target_name')
            args.platform = config.get('platform')
            return binary_flash.upload(options, args)
        elif 'upload_methods' in config and 'stock' in config['upload_methods']:
            shutil.copy(args.file.name, 'firmware.elrs')
    return 0
if __name__ == '__main__':
    # Assertion failures are reported as a plain message with exit status 1
    # instead of a traceback.
    try:
        status = main()
    except AssertionError as e:
        print(e)
        status = 1
    exit(status)