4 from random
import randint
7 from json
import JSONEncoder
14 from firmware
import DeviceType
, FirmwareOptions
, RadioType
, MCUType
, TXType
16 import UnifiedConfiguration
18 from binary_flash
import UploadMethod
19 from external
import jmespath
21 class BuzzerMode(Enum
):
25 default
= 'default-tune'
26 custom
= 'custom-tune'
31 class RegulatoryDomain(Enum
):
33 us_433_wide
= 'us_433_wide'
44 def write32(mm
, pos
, val
):
46 mm
[pos
+ 0] = (val
>> 0) & 0xFF
47 mm
[pos
+ 1] = (val
>> 8) & 0xFF
48 mm
[pos
+ 2] = (val
>> 16) & 0xFF
49 mm
[pos
+ 3] = (val
>> 24) & 0xFF
54 val
+= mm
[pos
+ 1] << 8
55 val
+= mm
[pos
+ 2] << 16
56 val
+= mm
[pos
+ 3] << 24
59 def writeString(mm
, pos
, string
, maxlen
):
64 mm
[pos
:pos
+l
] = string
.encode()[0,l
]
def readString(mm, pos, maxlen):
    """Read a NUL-terminated string from a fixed-width field of ``mm``.

    Args:
        mm: Buffer supporting slicing and ``find(sub, start, end)``
            (e.g. ``mmap.mmap``, ``bytes`` or ``bytearray``).
        pos: Offset of the first byte of the field.
        maxlen: Width of the field in bytes.

    Returns:
        A ``(next_pos, value)`` tuple where ``next_pos`` is ``pos + maxlen``
        and ``value`` is the decoded text preceding the first NUL byte.
    """
    field_end = pos + maxlen
    # Restrict the terminator search to this field so a missing NUL cannot
    # pull in bytes that belong to whatever follows it in the buffer.
    terminator = mm.find(b'\x00', pos, field_end)
    if terminator == -1:
        # find() reports "not found" as -1; used directly as a slice bound it
        # would mean "up to the last byte", so treat the whole field as text.
        terminator = field_end
    val = mm[pos:terminator].decode()
    return field_end, val
72 def generateUID(phrase
):
74 int(item
) if item
.isdigit() else -1
75 for item
in phrase
.split(',')
77 if (4 <= len(uid
) <= 6) and all(ele
>= 0 and ele
< 256 for ele
in uid
):
78 # Extend the UID to 6 bytes, as only 4 are needed to bind
79 uid
= [0] * (6 - len(uid
)) + uid
82 uid
= hashlib
.md5(("-DMY_BINDING_PHRASE=\""+phrase
+"\"").encode()).digest()[0:6]
85 def patch_uid(mm
, pos
, args
):
88 mm
[pos
+1:pos
+ 7] = generateUID(args
.phrase
)
def patch_flash_discriminator(mm, pos, args):
    """Store ``args.flash_discriminator`` into ``mm`` at ``pos`` via ``write32``.

    Args:
        mm: Mutable firmware image buffer passed straight through to ``write32``.
        pos: Offset at which the value is written.
        args: Object exposing a ``flash_discriminator`` attribute.

    Returns:
        Whatever ``write32`` returns (presumably the offset just past the
        written word -- confirm against ``write32``).
    """
    discriminator = args.flash_discriminator
    next_pos = write32(mm, pos, discriminator)
    return next_pos
95 def patch_wifi(mm
, pos
, args
):
100 interval
= args
.auto_wifi
* 1000
101 pos
= write32(mm
, pos
, interval
)
102 pos
= writeString(mm
, pos
, args
.ssid
, 33)
103 pos
= writeString(mm
, pos
, args
.password
, 65)
106 def patch_rx_params(mm
, pos
, args
):
107 pos
= write32(mm
, pos
, args
.rx_baud
if args
.airport_baud
is None else args
.airport_baud
)
109 val
&= ~
1 # unused1 - ex invert_tx
110 if args
.lock_on_first_connection
!= None:
112 val |
= (args
.lock_on_first_connection
<< 1)
113 val
&= ~
4 # unused2 - ex r9mm_mini_sbus
114 if args
.airport_baud
!= None:
116 val |
= 0 if args
.airport_baud
== 0 else 8
120 def patch_tx_params(mm
, pos
, args
, options
):
121 pos
= write32(mm
, pos
, args
.tlm_report
)
122 pos
= write32(mm
, pos
, args
.fan_min_runtime
)
124 val
&= ~
1 # unused1 - ex uart_inverted
125 if args
.unlock_higher_power
!= None:
127 val |
= (args
.unlock_higher_power
<< 1)
128 if args
.airport_baud
!= None:
130 val |
= 0 if args
.airport_baud
== 0 else 4
133 if options
.hasBuzzer
:
134 pos
= patch_buzzer(mm
, pos
, args
)
135 pos
= write32(mm
, pos
, 0 if args
.airport_baud
is None else args
.airport_baud
)
138 def patch_buzzer(mm
, pos
, args
):
139 melody
= args
.buzzer_melody
141 if args
.buzzer_mode
== BuzzerMode
.quiet
:
143 if args
.buzzer_mode
== BuzzerMode
.one
:
145 if args
.buzzer_mode
== BuzzerMode
.beep
:
147 melody
= 'A4 20 B4 20|60|0'
148 if args
.buzzer_mode
== BuzzerMode
.default
:
150 melody
= 'E5 40 E5 40 C5 120 E5 40 G5 22 G4 21|20|0'
151 if args
.buzzer_mode
== BuzzerMode
.custom
:
153 melody
= args
.buzzer_melody
158 if mode
== 2 and melody
:
159 melodyArray
= melodyparser
.parseToArray(melody
)
160 for element
in melodyArray
:
162 print("Melody truncated at 32 tones")
164 mm
[pos
+mpos
+0] = (int(element
[0]) >> 0) & 0xFF
165 mm
[pos
+mpos
+1] = (int(element
[0]) >> 8) & 0xFF
166 mm
[pos
+mpos
+2] = (int(element
[1]) >> 0) & 0xFF
167 mm
[pos
+mpos
+3] = (int(element
[1]) >> 8) & 0xFF
169 # If we are short, then add a terminating 0's
174 pos
+= 32*4 # 32 notes x (2 bytes tone, 2 bytes duration)
def FREQ_HZ_TO_REG_VAL_SX127X(freq):
    """Convert a frequency in Hz to an SX127X register value.

    The frequency is divided by a fixed step of 61.03515625 Hz and the
    quotient is truncated toward zero.

    Args:
        freq: Frequency in Hz.

    Returns:
        The integer register value.
    """
    # 61.03515625 == 32_000_000 / 2**19; presumably the synthesizer step for a
    # 32 MHz crystal -- confirm against the SX127x datasheet.
    step_hz = 61.03515625
    steps = freq / step_hz
    return int(steps)
def FREQ_HZ_TO_REG_VAL_SX1280(freq):
    """Convert a frequency in Hz to an SX1280 register value.

    The frequency is divided by a step of ``52000000.0 / 2**18`` Hz and the
    quotient is truncated toward zero.

    Args:
        freq: Frequency in Hz.

    Returns:
        The integer register value.
    """
    # Presumably a 52 MHz crystal divided by 2**18 -- confirm against the
    # SX1280 datasheet.
    step_hz = 52000000.0 / (1 << 18)
    steps = freq / step_hz
    return int(steps)
183 def generate_domain(mm
, pos
, count
, init
, step
):
184 pos
= write32(mm
, pos
, count
)
186 for x
in range(count
):
187 pos
= write32(mm
, pos
, FREQ_HZ_TO_REG_VAL_SX127X(val
))
190 def domain_number(domain
):
191 if domain
== RegulatoryDomain
.au_915
:
193 elif domain
== RegulatoryDomain
.fcc_915
:
195 elif domain
== RegulatoryDomain
.eu_868
:
197 elif domain
== RegulatoryDomain
.in_866
:
199 elif domain
== RegulatoryDomain
.au_433
:
201 elif domain
== RegulatoryDomain
.eu_433
:
203 elif domain
== RegulatoryDomain
.us_433
:
205 elif domain
== RegulatoryDomain
.us_433_wide
:
208 def patch_firmware(options
, mm
, pos
, args
):
209 if options
.mcuType
is MCUType
.STM32
:
210 if options
.radioChip
is RadioType
.SX127X
and args
.domain
:
211 mm
[pos
] = domain_number(args
.domain
)
213 pos
= patch_uid(mm
, pos
, args
)
214 pos
= patch_flash_discriminator(mm
, pos
, args
)
215 if options
.deviceType
is DeviceType
.TX
:
216 pos
= patch_tx_params(mm
, pos
, args
, options
)
217 elif options
.deviceType
is DeviceType
.RX
:
218 pos
= patch_rx_params(mm
, pos
, args
)
220 patch_unified(args
, options
)
222 def patch_unified(args
, options
):
224 if args
.phrase
is not None:
225 json_flags
['uid'] = [x
for x
in generateUID(args
.phrase
)]
226 if args
.ssid
is not None:
227 json_flags
['wifi-ssid'] = args
.ssid
228 if args
.password
is not None and args
.ssid
is not None:
229 json_flags
['wifi-password'] = args
.password
230 if args
.auto_wifi
is not None:
231 json_flags
['wifi-on-interval'] = args
.auto_wifi
233 if args
.tlm_report
is not None:
234 json_flags
['tlm-interval'] = args
.tlm_report
235 if args
.unlock_higher_power
is not None:
236 json_flags
['unlock-higher-power'] = args
.unlock_higher_power
237 if args
.fan_min_runtime
is not None:
238 json_flags
['fan-runtime'] = args
.fan_min_runtime
240 if args
.airport_baud
is not None:
241 json_flags
['is-airport'] = True
242 if options
.deviceType
is DeviceType
.RX
:
243 json_flags
['rcvr-uart-baud'] = args
.airport_baud
245 json_flags
['airport-uart-baud'] = args
.airport_baud
246 elif args
.rx_baud
is not None:
247 json_flags
['rcvr-uart-baud'] = args
.rx_baud
249 if args
.lock_on_first_connection
is not None:
250 json_flags
['lock-on-first-connection'] = args
.lock_on_first_connection
252 if args
.domain
is not None:
253 json_flags
['domain'] = domain_number(args
.domain
)
255 json_flags
['flash-discriminator'] = randint(1,2**32-1)
257 UnifiedConfiguration
.doConfiguration(
259 JSONEncoder().encode(json_flags
),
261 'tx' if options
.deviceType
is DeviceType
.TX
else 'rx',
262 '2400' if options
.radioChip
is RadioType
.SX1280
else '900' if options
.radioChip
is RadioType
.SX127X
else 'dual',
263 '32' if options
.mcuType
is MCUType
.ESP32
and options
.deviceType
is DeviceType
.RX
else '',
268 def length_check(l
, f
):
271 raise argparse
.ArgumentTypeError(f
'too long, {l} chars max')
276 def ask_for_firmware(args
):
277 moduletype
= 'tx' if args
.tx
else 'rx'
278 with
open('hardware/targets.json') as f
:
279 targets
= json
.load(f
)
281 if args
.target
is not None:
283 config
= jmespath
.search('.'.join(map(lambda s
: f
'"{s}"', args
.target
.split('.'))), targets
)
286 for k
in jmespath
.search(f
'*.["{moduletype}_2400","{moduletype}_900","{moduletype}_dual"][].*[]', targets
):
289 print(f
"{i}) {k['product_name']}")
290 print('Choose a configuration to flash')
293 config
= products
[int(choice
)-1]
297 for m
in targets
[v
][t
]:
298 if targets
[v
][t
][m
]['product_name'] == config
['product_name']:
299 target
= f
'{v}.{t}.{m}'
300 return target
, config
302 class readable_dir(argparse
.Action
):
303 def __call__(self
, parser
, namespace
, values
, option_string
=None):
304 prospective_dir
=values
305 if not os
.path
.isdir(prospective_dir
):
306 raise argparse
.ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir
))
307 if os
.access(prospective_dir
, os
.R_OK
):
308 setattr(namespace
,self
.dest
,prospective_dir
)
310 raise argparse
.ArgumentTypeError("readable_dir:{0} is not a readable dir".format(prospective_dir
))
312 class writeable_dir(argparse
.Action
):
313 def __call__(self
, parser
, namespace
, values
, option_string
=None):
314 prospective_dir
=values
315 if not os
.path
.isdir(prospective_dir
):
316 raise argparse
.ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir
))
317 if os
.access(prospective_dir
, os
.W_OK
):
318 setattr(namespace
,self
.dest
,prospective_dir
)
320 raise argparse
.ArgumentTypeError("readable_dir:{0} is not a writeable dir".format(prospective_dir
))
class deprecate_action(argparse.Action):
    """argparse action for deprecated options that are accepted but ignored.

    When the option is seen, its destination attribute is removed from the
    parsed namespace so the deprecated flag leaves no trace in the result.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Remove ``self.dest`` from ``namespace`` if it is still present."""
        # The same deprecated flag may be given more than once; the first
        # occurrence already deleted the attribute, so an unconditional
        # delattr() would raise AttributeError on the second.
        if hasattr(namespace, self.dest):
            delattr(namespace, self.dest)
327 parser
= argparse
.ArgumentParser(description
="Configure Binary Firmware")
328 # firmware/targets directory
329 parser
.add_argument('--dir', action
=readable_dir
, default
=None, help='The directory that contains the "hardware" and other firmware directories')
330 parser
.add_argument('--fdir', action
=readable_dir
, default
=None, help='If specified, then the firmware files are loaded from this directory')
332 parser
.add_argument('--phrase', type=str, help='Your personal binding phrase')
333 parser
.add_argument('--flash-discriminator', type=int, default
=randint(1,2**32-1), dest
='flash_discriminator', help='Force a fixed flash-descriminator instead of random')
335 parser
.add_argument('--ssid', type=length_check(32, "ssid"), required
=False, help='Home network SSID')
336 parser
.add_argument('--password', type=length_check(64, "password"), required
=False, help='Home network password')
337 parser
.add_argument('--auto-wifi', type=int, help='Interval (in seconds) before WiFi auto starts, if no connection is made')
338 parser
.add_argument('--no-auto-wifi', action
='store_true', help='Disables WiFi auto start if no connection is made')
340 parser
.add_argument('--airport-baud', type=int, const
=None, nargs
='?', action
='store', help='If configured as an AirPort device then this is the baud rate to use')
342 parser
.add_argument('--rx-baud', type=int, const
=420000, nargs
='?', action
='store', help='The receiver baudrate talking to the flight controller')
343 parser
.add_argument('--lock-on-first-connection', dest
='lock_on_first_connection', action
='store_true', help='Lock RF mode on first connection')
344 parser
.add_argument('--no-lock-on-first-connection', dest
='lock_on_first_connection', action
='store_false', help='Do not lock RF mode on first connection')
345 parser
.set_defaults(lock_on_first_connection
=None)
347 parser
.add_argument('--tlm-report', type=int, const
=240, nargs
='?', action
='store', help='The interval (in milliseconds) between telemetry packets')
348 parser
.add_argument('--fan-min-runtime', type=int, const
=30, nargs
='?', action
='store', help='The minimum amount of time the fan should run for (in seconds) if it turns on')
349 parser
.add_argument('--unlock-higher-power', dest
='unlock_higher_power', action
='store_true', help='DANGER: Unlocks the higher power on modules that do not normally have sufficient cooling e.g. 1W on R9M')
350 parser
.add_argument('--no-unlock-higher-power', dest
='unlock_higher_power', action
='store_false', help='Set the max power level at the safe maximum level')
351 parser
.set_defaults(unlock_higher_power
=None)
353 parser
.add_argument('--buzzer-mode', type=BuzzerMode
, choices
=list(BuzzerMode
), default
=None, help='Which buzzer mode to use, if there is a buzzer')
354 parser
.add_argument('--buzzer-melody', type=str, default
=None, help='If the mode is "custom", then this is the tune')
356 parser
.add_argument('--domain', type=RegulatoryDomain
, choices
=list(RegulatoryDomain
), default
=None, help='For SX127X based devices, which regulatory domain is being used')
358 parser
.add_argument('--target', type=str, help='Unified target JSON path')
360 parser
.add_argument("--flash", type=UploadMethod
, choices
=list(UploadMethod
), help="Flashing Method")
361 parser
.add_argument("--erase", action
='store_true', default
=False, help="Full chip erase before flashing on ESP devices")
362 parser
.add_argument('--out', action
=writeable_dir
, default
=None)
363 parser
.add_argument("--port", type=str, help="SerialPort or WiFi address to flash firmware to")
364 parser
.add_argument("--baud", type=int, default
=0, help="Baud rate for serial communication")
365 parser
.add_argument("--force", action
='store_true', default
=False, help="Force upload even if target does not match")
366 parser
.add_argument("--confirm", action
='store_true', default
=False, help="Confirm upload if a mismatched target was previously uploaded")
367 parser
.add_argument("--tx", action
='store_true', default
=False, help="Flash a TX module, RX if not specified")
368 parser
.add_argument("--lbt", action
='store_true', default
=False, help="Use LBT firmware, default is FCC (only for 2.4GHz firmware)")
369 parser
.add_argument('--rx-as-tx', type=TXType
, choices
=list(TXType
), required
=False, default
=None, help="Flash an RX module with TX firmware, either internal (full-duplex) or external (half-duplex)")
370 # Deprecated options, left for backward compatibility
371 parser
.add_argument('--uart-inverted', action
=deprecate_action
, nargs
=0, help='Deprecated')
372 parser
.add_argument('--no-uart-inverted', action
=deprecate_action
, nargs
=0, help='Deprecated')
375 # Firmware file to patch/configure
376 parser
.add_argument("file", nargs
="?", type=argparse
.FileType("r+b"))
378 args
= parser
.parse_args()
380 if args
.dir is not None:
383 if args
.file is None:
384 args
.target
, config
= ask_for_firmware(args
)
386 file = config
['firmware']
387 if args
.rx_as_tx
is not None:
388 if config
['platform'].startswith('esp32') or config
['platform'].startswith('esp8285') and args
.rx_as_tx
== TXType
.internal
:
389 file = file.replace('_RX', '_TX')
391 print("Selected device cannot operate as 'RX-as-TX' of this type.")
392 print("STM32 does not support RX as TX.")
393 print("ESP8285 only supports full-duplex internal RX as TX.")
395 firmware_dir
= '' if args
.fdir
is None else args
.fdir
+ '/'
396 srcdir
= firmware_dir
+ ('LBT/' if args
.lbt
else 'FCC/') + file
398 shutil
.copy2(srcdir
+ '/firmware.bin', ".")
399 if os
.path
.exists(srcdir
+ '/bootloader.bin'): shutil
.copy2(srcdir
+ '/bootloader.bin', ".")
400 if os
.path
.exists(srcdir
+ '/partitions.bin'): shutil
.copy2(srcdir
+ '/partitions.bin', ".")
401 if os
.path
.exists(srcdir
+ '/boot_app0.bin'): shutil
.copy2(srcdir
+ '/boot_app0.bin', ".")
402 args
.file = open(dst
, 'r+b')
403 except FileNotFoundError
:
404 print("Firmware files not found, did you download and unpack them in this directory?")
407 args
.target
, config
= ask_for_firmware(args
)
410 mm
= mmap
.mmap(f
.fileno(), 0)
412 pos
= firmware
.get_hardware(mm
)
413 options
= FirmwareOptions(
414 False if config
['platform'] == 'stm32' else True,
415 True if 'features' in config
and 'buzzer' in config
['features'] else False,
416 MCUType
.STM32
if config
['platform'] == 'stm32' else MCUType
.ESP32
if config
['platform'].startswith('esp32') else MCUType
.ESP8266
,
417 DeviceType
.RX
if '.rx_' in args
.target
else DeviceType
.TX
,
418 RadioType
.SX127X
if '_900.' in args
.target
else RadioType
.SX1280
if '_2400.' in args
.target
else RadioType
.LR1121
,
419 config
['lua_name'] if 'lua_name' in config
else '',
420 config
['stlink']['bootloader'] if 'stlink' in config
else '',
421 config
['stlink']['offset'] if 'stlink' in config
else 0,
424 patch_firmware(options
, mm
, pos
, args
)
427 if options
.mcuType
== MCUType
.ESP8266
:
429 with
open(args
.file.name
, 'rb') as f_in
:
430 with gzip
.open('firmware.bin.gz', 'wb') as f_out
:
431 shutil
.copyfileobj(f_in
, f_out
)
434 args
.target
= config
.get('firmware')
435 args
.accept
= config
.get('prior_target_name')
436 args
.platform
= config
.get('platform')
437 return binary_flash
.upload(options
, args
)
438 elif 'upload_methods' in config
and 'stock' in config
['upload_methods']:
439 shutil
.copy(args
.file.name
, 'firmware.elrs')
442 if __name__
== '__main__':
445 except AssertionError as e
: