Add tlmConfirm to tlm_dl ota packet-structure (#2991)
[ExpressLRS.git] / src / python / BFinitPassthrough.py
blobf4ac6c9233a4a597658217be29b447c94537adcd
1 import serial, time, sys, re
2 import argparse
3 import serials_find
4 import SerialHelper
5 import bootloader
6 from query_yes_no import query_yes_no
7 from elrs_helpers import ElrsUploadResult
10 SCRIPT_DEBUG = False
13 class PassthroughEnabled(Exception):
14 pass
16 class PassthroughFailed(Exception):
17 pass
19 def dbg_print(line=''):
20 sys.stdout.write(line + '\n')
21 sys.stdout.flush()
24 def _validate_serialrx(rl, config, expected):
25 found = False
26 if type(expected) == str:
27 expected = [expected]
28 rl.set_delimiters(["# "])
29 rl.clear()
30 rl.write_str("get %s" % config)
31 line = rl.read_line(1.).strip()
32 for key in expected:
33 key = " = %s" % key
34 if key in line:
35 found = True
36 break
37 return found
40 def bf_passthrough_init(port, requestedBaudrate):
41 sys.stdout.flush()
42 dbg_print("======== PASSTHROUGH INIT ========")
43 dbg_print(" Trying to initialize %s @ %s" % (port, requestedBaudrate))
45 s = serial.Serial(port=port, baudrate=115200,
46 bytesize=8, parity='N', stopbits=1,
47 timeout=1, xonxoff=0, rtscts=0)
49 rl = SerialHelper.SerialHelper(s, 3., ['CCC', "# "])
50 rl.clear()
51 # Send start command '#'
52 rl.write_str("#")
53 start = rl.read_line(2.).strip()
54 #dbg_print("BF INIT: '%s'" % start.replace("\r", ""))
55 if "CCC" in start:
56 raise PassthroughEnabled("Passthrough already enabled and bootloader active")
57 elif not start or not start.endswith("#"):
58 raise PassthroughEnabled("No CLI available. Already in passthrough mode?, If this fails reboot FC and try again!")
60 serial_check = []
61 if not _validate_serialrx(rl, "serialrx_provider", ["CRSF", "ELRS"]):
62 serial_check.append("Serial Receiver Protocol is not set to CRSF! Hint: set serialrx_provider = CRSF")
63 if not _validate_serialrx(rl, "serialrx_inverted", "OFF"):
64 serial_check.append("Serial Receiver UART is inverted! Hint: set serialrx_inverted = OFF")
65 if not _validate_serialrx(rl, "serialrx_halfduplex", ["OFF", "AUTO"]):
66 serial_check.append("Serial Receiver UART is not in full duplex! Hint: set serialrx_halfduplex = OFF")
67 if _validate_serialrx(rl, "rx_spi_protocol", "EXPRESSLRS" ) and serial_check:
68 serial_check = [ "ExpressLRS SPI RX detected\n\nUpdate via betaflight to flash your RX\nhttps://www.expresslrs.org/2.0/hardware/spi-receivers/" ]
70 if serial_check:
71 error = "\n\n [ERROR] Invalid serial RX configuration detected:\n"
72 for err in serial_check:
73 error += " !!! %s !!!\n" % err
74 error += "\n Please change the configuration and try again!\n"
75 raise PassthroughFailed(error)
77 SerialRXindex = ""
79 dbg_print("\nAttempting to detect FC UART configuration...")
81 rl.set_delimiters(["\n"])
82 rl.clear()
83 rl.write_str("serial")
85 while True:
86 line = rl.read_line().strip()
87 #print("FC: '%s'" % line)
88 if not line or "#" in line:
89 break
91 if line.startswith("serial"):
92 if SCRIPT_DEBUG:
93 dbg_print(" '%s'" % line)
94 config = re.search('serial ([0-9]+) ([0-9]+) ', line)
95 if config and (int(config.group(2)) & 64 == 64):
96 dbg_print(" ** Serial RX config detected: '%s'" % line)
97 SerialRXindex = config.group(1)
98 if not SCRIPT_DEBUG:
99 break
101 if not SerialRXindex:
102 raise PassthroughFailed("!!! RX Serial not found !!!!\n Check configuration and try again...")
104 cmd = "serialpassthrough %s %s" % (SerialRXindex, requestedBaudrate, )
106 dbg_print("Enabling serial passthrough...")
107 dbg_print(" CMD: '%s'" % cmd)
108 rl.write_str(cmd)
109 time.sleep(.2)
110 s.close()
111 dbg_print("======== PASSTHROUGH DONE ========")
114 def reset_to_bootloader(port, baud, target, action, accept=None, chip_type='ESP82') -> int:
115 dbg_print("======== RESET TO BOOTLOADER ========")
116 s = serial.Serial(port=port, baudrate=baud,
117 bytesize=8, parity='N', stopbits=1,
118 timeout=1, xonxoff=0, rtscts=0)
119 rl = SerialHelper.SerialHelper(s, 3.)
120 rl.clear()
121 BootloaderInitSeq = bootloader.get_init_seq(chip_type)
122 dbg_print(" * Using full duplex (CRSF)")
123 #this is the training sequ for the ROM bootloader, we send it here so it doesn't auto-neg to the wrong baudrate by the BootloaderInitSeq that we send to reset ELRS
124 rl.write(b'\x07\x07\x12\x20' + 32 * b'\x55')
125 time.sleep(0.2)
126 rl.write(BootloaderInitSeq)
127 s.flush()
128 rx_target = rl.read_line().strip().upper()
129 if target is not None:
130 flash_target = re.sub("_VIA_.*", "", target.upper())
131 ignore_incorrect_target = action == "uploadforce"
132 if rx_target == "":
133 dbg_print("Cannot detect RX target, blindly flashing!")
134 elif ignore_incorrect_target:
135 dbg_print(f"Force flashing {flash_target}, detected {rx_target}")
136 elif rx_target != flash_target and rx_target != accept:
137 if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (rx_target, flash_target)):
138 dbg_print("Ok, flashing anyway!")
139 else:
140 dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (rx_target, flash_target))
141 return ElrsUploadResult.ErrorMismatch
142 elif flash_target != "":
143 dbg_print("Verified RX target '%s'" % (flash_target))
144 time.sleep(.5)
145 s.close()
147 return ElrsUploadResult.Success
149 def init_passthrough(source, target, env) -> int:
150 env.AutodetectUploadPort([env])
151 try:
152 bf_passthrough_init(env['UPLOAD_PORT'], env['UPLOAD_SPEED'])
153 except PassthroughEnabled as err:
154 dbg_print(str(err))
155 return reset_to_bootloader(env['UPLOAD_PORT'], env['UPLOAD_SPEED'], env['PIOENV'], source[0])
157 def main(custom_args = None):
158 parser = argparse.ArgumentParser(
159 description="Initialize BetaFlight passthrough and optionally send a reboot comamnd sequence")
160 parser.add_argument("-b", "--baud", type=int, default=420000,
161 help="Baud rate for passthrough communication")
162 parser.add_argument("-p", "--port", type=str,
163 help="Override serial port autodetection and use PORT")
164 parser.add_argument("-r", "--target", type=str,
165 help="The target firmware that is going to be uploaded")
166 parser.add_argument("-nr", "--no-reset", action="store_false",
167 dest="reset_to_bl", help="Do not send reset_to_bootloader command sequence")
168 parser.add_argument("-t", "--type", type=str, default="ESP82",
169 help="Defines flash target type which is sent to target in reboot command")
170 parser.add_argument("-a", "--action", type=str, default="upload",
171 help="Upload action: upload (default), or uploadforce to flash even on target mismatch")
172 parser.add_argument("--accept", type=str, default=None,
173 help="Acceptable target to auto-overwrite")
175 args = parser.parse_args(custom_args)
177 if (args.port == None):
178 args.port = serials_find.get_serial_port()
180 returncode = ElrsUploadResult.Success
181 try:
182 bf_passthrough_init(args.port, args.baud)
183 except PassthroughEnabled as err:
184 dbg_print(str(err))
186 if args.reset_to_bl:
187 returncode = reset_to_bootloader(args.port, args.baud, args.target, args.action, args.accept, args.type)
189 return returncode
191 if __name__ == '__main__':
192 returncode = main()
193 exit(returncode)