Parse MSP messages from the backpack while TX is in mavlink mode (#2883)
[ExpressLRS.git] / src / python / BFinitPassthrough.py
blobcecf15d8cfb3be87ad1fe3937b74f34d5072f4b1
1 import serial, time, sys, re
2 import argparse
3 import serials_find
4 import SerialHelper
5 import bootloader
6 from query_yes_no import query_yes_no
7 from elrs_helpers import ElrsUploadResult
# When True, bf_passthrough_init echoes every "serial" line reported by the
# flight controller and keeps scanning after the first Serial RX match.
SCRIPT_DEBUG: bool = False
class PassthroughEnabled(Exception):
    """Raised when the CLI handshake suggests passthrough is already active.

    bf_passthrough_init raises this when the reply to the '#' start command
    contains "CCC" or does not end with a '#' prompt.
    """
class PassthroughFailed(Exception):
    """Raised when serial passthrough cannot be set up.

    bf_passthrough_init raises this for an invalid serial RX configuration
    or when no Serial RX UART can be found.
    """
def dbg_print(line=''):
    """Write *line* plus a trailing newline to stdout and flush right away."""
    out = sys.stdout
    text = line + '\n'
    out.write(text)
    out.flush()
24 def _validate_serialrx(rl, config, expected):
25 found = False
26 if type(expected) == str:
27 expected = [expected]
28 rl.set_delimiters(["# "])
29 rl.clear()
30 rl.write_str("get %s" % config)
31 line = rl.read_line(1.).strip()
32 for key in expected:
33 key = " = %s" % key
34 if key in line:
35 found = True
36 break
37 return found
def _collect_serialrx_errors(rl, half_duplex):
    """Return a list of messages describing invalid serial RX CLI settings."""
    problems = []
    expected_provider = "GHST" if half_duplex else ["CRSF", "ELRS"]
    if not _validate_serialrx(rl, "serialrx_provider", expected_provider):
        # Name the protocol that was actually expected for this mode.
        wanted = "GHST" if half_duplex else "CRSF"
        problems.append("Serial Receiver Protocol is not set to %s! Hint: set serialrx_provider = %s" % (wanted, wanted))
    if not _validate_serialrx(rl, "serialrx_inverted", "OFF"):
        problems.append("Serial Receiver UART is inverted! Hint: set serialrx_inverted = OFF")
    if not _validate_serialrx(rl, "serialrx_halfduplex", ["OFF", "AUTO"]):
        problems.append("Serial Receiver UART is not in full duplex! Hint: set serialrx_halfduplex = OFF")
    # The SPI query is always sent; its result only matters when one of the
    # checks above failed, in which case it replaces the other messages.
    if _validate_serialrx(rl, "rx_spi_protocol", "EXPRESSLRS") and problems:
        problems = ["ExpressLRS SPI RX detected\n\nUpdate via betaflight to flash your RX\nhttps://www.expresslrs.org/2.0/hardware/spi-receivers/"]
    return problems


def _find_serialrx_uart(rl):
    """Return the index (str) of the UART flagged as Serial RX, or ""."""
    uart_index = ""
    rl.set_delimiters(["\n"])
    rl.clear()
    rl.write_str("serial")

    while True:
        line = rl.read_line().strip()
        # An empty read or a line containing the prompt ends the listing.
        if not line or "#" in line:
            break

        if not line.startswith("serial"):
            continue
        if SCRIPT_DEBUG:
            dbg_print(" '%s'" % line)
        config = re.search(r'serial ([0-9]+) ([0-9]+) ', line)
        # Bit 64 of the second number marks the port reported as Serial RX.
        if config and (int(config.group(2)) & 64 == 64):
            dbg_print(" ** Serial RX config detected: '%s'" % line)
            uart_index = config.group(1)
            # In debug mode keep going so every serial line is echoed.
            if not SCRIPT_DEBUG:
                break
    return uart_index


def bf_passthrough_init(port, requestedBaudrate, half_duplex=False):
    """Put the flight controller on *port* into serial passthrough mode.

    Opens the port at 115200 baud, enters the CLI with '#', validates the
    serial RX settings, locates the Serial RX UART and finally issues
    ``serialpassthrough <uart> <requestedBaudrate>``.

    Args:
        port: serial port name of the flight controller.
        requestedBaudrate: baud rate placed in the serialpassthrough command.
        half_duplex: when true, GHST is the expected serialrx_provider
            (otherwise CRSF or ELRS); also forwarded to the initial
            ``write_str`` call.

    Raises:
        PassthroughEnabled: the CLI did not answer with a '#' prompt, or the
            reply contained "CCC".
        PassthroughFailed: the serial RX configuration is invalid or no
            Serial RX UART was found.
    """
    sys.stdout.flush()
    dbg_print("======== PASSTHROUGH INIT ========")
    dbg_print(" Trying to initialize %s @ %s" % (port, requestedBaudrate))

    s = serial.Serial(port=port, baudrate=115200,
                      bytesize=8, parity='N', stopbits=1,
                      timeout=1, xonxoff=0, rtscts=0)
    # Always release the port, including on the exception paths: callers
    # reopen the same port right after catching PassthroughEnabled.
    try:
        rl = SerialHelper.SerialHelper(s, 3., ['CCC', "# "])
        rl.clear()
        # Send start command '#'
        rl.write_str("#", half_duplex)
        start = rl.read_line(2.).strip()
        if "CCC" in start:
            raise PassthroughEnabled("Passthrough already enabled and bootloader active")
        if not start or not start.endswith("#"):
            raise PassthroughEnabled("No CLI available. Already in passthrough mode?, If this fails reboot FC and try again!")

        serial_check = _collect_serialrx_errors(rl, half_duplex)
        if serial_check:
            error = "\n\n [ERROR] Invalid serial RX configuration detected:\n"
            error += "".join(" !!! %s !!!\n" % err for err in serial_check)
            error += "\n Please change the configuration and try again!\n"
            raise PassthroughFailed(error)

        dbg_print("\nAttempting to detect FC UART configuration...")
        SerialRXindex = _find_serialrx_uart(rl)
        if not SerialRXindex:
            raise PassthroughFailed("!!! RX Serial not found !!!!\n Check configuration and try again...")

        cmd = "serialpassthrough %s %s" % (SerialRXindex, requestedBaudrate)

        dbg_print("Enabling serial passthrough...")
        dbg_print(" CMD: '%s'" % cmd)
        rl.write_str(cmd)
        time.sleep(.2)
    finally:
        s.close()
    dbg_print("======== PASSTHROUGH DONE ========")
def reset_to_bootloader(port, baud, target, action, accept=None, half_duplex=False, chip_type='ESP82') -> int:
    """Send the reboot-to-bootloader sequence and verify the receiver target.

    Args:
        port: serial port name.
        baud: baud rate used to open the port.
        target: firmware target about to be flashed, or None to skip the
            target check. Any "_VIA_..." suffix is ignored.
        action: "uploadforce" suppresses the mismatch prompt.
        accept: optional additional target name that is accepted without
            prompting (compared case-insensitively).
        half_duplex: selects the 'GHST' init sequence instead of 'CRSF'.
        chip_type: passed to bootloader.get_init_seq.

    Returns:
        ElrsUploadResult.ErrorMismatch if the user declines to flash a
        mismatching target, otherwise ElrsUploadResult.Success.
    """
    dbg_print("======== RESET TO BOOTLOADER ========")
    s = serial.Serial(port=port, baudrate=baud,
                      bytesize=8, parity='N', stopbits=1,
                      timeout=1, xonxoff=0, rtscts=0)
    # try/finally so the port is also released on the mismatch return and
    # on any exception raised while talking to the receiver.
    try:
        rl = SerialHelper.SerialHelper(s, 3.)
        rl.clear()
        if half_duplex:
            BootloaderInitSeq = bootloader.get_init_seq('GHST', chip_type)
            dbg_print(" * Using half duplex (GHST)")
        else:
            BootloaderInitSeq = bootloader.get_init_seq('CRSF', chip_type)
            dbg_print(" * Using full duplex (CRSF)")
            # Training sequence for the ROM bootloader, sent first so that it
            # does not auto-negotiate to the wrong baud rate from the
            # BootloaderInitSeq that is sent to reset ELRS.
            # NOTE(review): assumed to belong to the full-duplex branch only;
            # confirm against the upstream indentation.
            rl.write(b'\x07\x07\x12\x20' + 32 * b'\x55')
            time.sleep(0.2)
        rl.write(BootloaderInitSeq)
        s.flush()
        rx_target = rl.read_line().strip().upper()
        if target is not None:
            flash_target = re.sub("_VIA_.*", "", target.upper())
            # rx_target and flash_target are upper-cased, so normalise the
            # accepted name as well before comparing.
            accepted = accept.upper() if isinstance(accept, str) else accept
            if rx_target == "":
                dbg_print("Cannot detect RX target, blindly flashing!")
            elif action == "uploadforce":
                dbg_print(f"Force flashing {flash_target}, detected {rx_target}")
            elif rx_target != flash_target and rx_target != accepted:
                if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (rx_target, flash_target)):
                    dbg_print("Ok, flashing anyway!")
                else:
                    dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (rx_target, flash_target))
                    return ElrsUploadResult.ErrorMismatch
            elif flash_target != "":
                dbg_print("Verified RX target '%s'" % (flash_target))
        time.sleep(.5)
    finally:
        s.close()

    return ElrsUploadResult.Success
def init_passthrough(source, target, env) -> int:
    """Build-hook entry: enable passthrough, then reset the RX to bootloader.

    Autodetects the upload port on *env*, tries to enable passthrough on
    env['UPLOAD_PORT'] at env['UPLOAD_SPEED'], and returns the result of
    reset_to_bootloader. A PassthroughEnabled error is only printed; any
    other exception propagates.
    """
    env.AutodetectUploadPort([env])
    try:
        upload_port = env['UPLOAD_PORT']
        upload_speed = env['UPLOAD_SPEED']
        bf_passthrough_init(upload_port, upload_speed)
    except PassthroughEnabled as already_enabled:
        dbg_print(str(already_enabled))
    # NOTE(review): source[0] is passed in the `action` position of
    # reset_to_bootloader - confirm that this is what callers intend.
    return reset_to_bootloader(
        env['UPLOAD_PORT'],
        env['UPLOAD_SPEED'],
        env['PIOENV'],
        source[0],
    )
def main(custom_args = None):
    """Command-line entry point.

    Parses the arguments, enables Betaflight serial passthrough and, unless
    --no-reset was given, sends the reset-to-bootloader sequence.

    Args:
        custom_args: optional argument list handed to argparse; None makes
            argparse read sys.argv.

    Returns:
        ElrsUploadResult.Success, or whatever reset_to_bootloader returns
        when the reset step runs.
    """
    parser = argparse.ArgumentParser(
        description="Initialize BetaFlight passthrough and optionally send a reboot command sequence")
    parser.add_argument("-b", "--baud", type=int, default=420000,
        help="Baud rate for passthrough communication")
    parser.add_argument("-p", "--port", type=str,
        help="Override serial port autodetection and use PORT")
    parser.add_argument("-r", "--target", type=str,
        help="The target firmware that is going to be uploaded")
    parser.add_argument("-nr", "--no-reset", action="store_false",
        dest="reset_to_bl", help="Do not send reset_to_bootloader command sequence")
    parser.add_argument("-hd", "--half-duplex", action="store_true",
        dest="half_duplex", help="Use half duplex mode")
    parser.add_argument("-t", "--type", type=str, default="ESP82",
        help="Defines flash target type which is sent to target in reboot command")
    parser.add_argument("-a", "--action", type=str, default="upload",
        help="Upload action: upload (default), or uploadforce to flash even on target mismatch")
    parser.add_argument("--accept", type=str, default=None,
        help="Acceptable target to auto-overwrite")

    args = parser.parse_args(custom_args)

    if args.port is None:
        args.port = serials_find.get_serial_port()

    returncode = ElrsUploadResult.Success
    try:
        # NOTE(review): --half-duplex is not forwarded here, so passthrough
        # init always runs with half_duplex=False - confirm this is intended.
        bf_passthrough_init(args.port, args.baud)
    except PassthroughEnabled as err:
        # Not fatal: the reset step below may still be attempted.
        dbg_print(str(err))

    if args.reset_to_bl:
        returncode = reset_to_bootloader(args.port, args.baud, args.target, args.action, args.accept, args.half_duplex, args.type)

    return returncode
if __name__ == '__main__':
    # sys.exit instead of the bare exit() builtin: exit() is installed by the
    # site module and is not guaranteed to exist in every interpreter setup.
    returncode = main()
    sys.exit(returncode)