1 import serial
, time
, sys
, re
6 from query_yes_no
import query_yes_no
7 from elrs_helpers
import ElrsUploadResult
13 class PassthroughEnabled(Exception):
16 class PassthroughFailed(Exception):
19 def dbg_print(line
=''):
20 sys
.stdout
.write(line
+ '\n')
24 def _validate_serialrx(rl
, config
, expected
):
26 if type(expected
) == str:
28 rl
.set_delimiters(["# "])
30 rl
.write_str("get %s" % config
)
31 line
= rl
.read_line(1.).strip()
40 def bf_passthrough_init(port
, requestedBaudrate
, half_duplex
=False):
42 dbg_print("======== PASSTHROUGH INIT ========")
43 dbg_print(" Trying to initialize %s @ %s" % (port
, requestedBaudrate
))
45 s
= serial
.Serial(port
=port
, baudrate
=115200,
46 bytesize
=8, parity
='N', stopbits
=1,
47 timeout
=1, xonxoff
=0, rtscts
=0)
49 rl
= SerialHelper
.SerialHelper(s
, 3., ['CCC', "# "])
51 # Send start command '#'
52 rl
.write_str("#", half_duplex
)
53 start
= rl
.read_line(2.).strip()
54 #dbg_print("BF INIT: '%s'" % start.replace("\r", ""))
56 raise PassthroughEnabled("Passthrough already enabled and bootloader active")
57 elif not start
or not start
.endswith("#"):
58 raise PassthroughEnabled("No CLI available. Already in passthrough mode?, If this fails reboot FC and try again!")
61 if not _validate_serialrx(rl
, "serialrx_provider", [["CRSF", "ELRS"], "GHST"][half_duplex
]):
62 serial_check
.append("Serial Receiver Protocol is not set to CRSF! Hint: set serialrx_provider = CRSF")
63 if not _validate_serialrx(rl
, "serialrx_inverted", "OFF"):
64 serial_check
.append("Serial Receiver UART is inverted! Hint: set serialrx_inverted = OFF")
65 if not _validate_serialrx(rl
, "serialrx_halfduplex", ["OFF", "AUTO"]):
66 serial_check
.append("Serial Receiver UART is not in full duplex! Hint: set serialrx_halfduplex = OFF")
67 if _validate_serialrx(rl
, "rx_spi_protocol", "EXPRESSLRS" ) and serial_check
:
68 serial_check
= [ "ExpressLRS SPI RX detected\n\nUpdate via betaflight to flash your RX\nhttps://www.expresslrs.org/2.0/hardware/spi-receivers/" ]
71 error
= "\n\n [ERROR] Invalid serial RX configuration detected:\n"
72 for err
in serial_check
:
73 error
+= " !!! %s !!!\n" % err
74 error
+= "\n Please change the configuration and try again!\n"
75 raise PassthroughFailed(error
)
79 dbg_print("\nAttempting to detect FC UART configuration...")
81 rl
.set_delimiters(["\n"])
83 rl
.write_str("serial")
86 line
= rl
.read_line().strip()
87 #print("FC: '%s'" % line)
88 if not line
or "#" in line
:
91 if line
.startswith("serial"):
93 dbg_print(" '%s'" % line
)
94 config
= re
.search('serial ([0-9]+) ([0-9]+) ', line
)
95 if config
and (int(config
.group(2)) & 64 == 64):
96 dbg_print(" ** Serial RX config detected: '%s'" % line
)
97 SerialRXindex
= config
.group(1)
101 if not SerialRXindex
:
102 raise PassthroughFailed("!!! RX Serial not found !!!!\n Check configuration and try again...")
104 cmd
= "serialpassthrough %s %s" % (SerialRXindex
, requestedBaudrate
, )
106 dbg_print("Enabling serial passthrough...")
107 dbg_print(" CMD: '%s'" % cmd
)
111 dbg_print("======== PASSTHROUGH DONE ========")
114 def reset_to_bootloader(port
, baud
, target
, action
, accept
=None, half_duplex
=False, chip_type
='ESP82') -> int:
115 dbg_print("======== RESET TO BOOTLOADER ========")
116 s
= serial
.Serial(port
=port
, baudrate
=baud
,
117 bytesize
=8, parity
='N', stopbits
=1,
118 timeout
=1, xonxoff
=0, rtscts
=0)
119 rl
= SerialHelper
.SerialHelper(s
, 3.)
122 BootloaderInitSeq
= bootloader
.get_init_seq('GHST', chip_type
)
123 dbg_print(" * Using half duplex (GHST)")
125 BootloaderInitSeq
= bootloader
.get_init_seq('CRSF', chip_type
)
126 dbg_print(" * Using full duplex (CRSF)")
127 #this is the training sequ for the ROM bootloader, we send it here so it doesn't auto-neg to the wrong baudrate by the BootloaderInitSeq that we send to reset ELRS
128 rl
.write(b
'\x07\x07\x12\x20' + 32 * b
'\x55')
130 rl
.write(BootloaderInitSeq
)
132 rx_target
= rl
.read_line().strip().upper()
133 if target
is not None:
134 flash_target
= re
.sub("_VIA_.*", "", target
.upper())
135 ignore_incorrect_target
= action
== "uploadforce"
137 dbg_print("Cannot detect RX target, blindly flashing!")
138 elif ignore_incorrect_target
:
139 dbg_print(f
"Force flashing {flash_target}, detected {rx_target}")
140 elif rx_target
!= flash_target
and rx_target
!= accept
:
141 if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (rx_target
, flash_target
)):
142 dbg_print("Ok, flashing anyway!")
144 dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (rx_target
, flash_target
))
145 return ElrsUploadResult
.ErrorMismatch
146 elif flash_target
!= "":
147 dbg_print("Verified RX target '%s'" % (flash_target
))
151 return ElrsUploadResult
.Success
153 def init_passthrough(source
, target
, env
) -> int:
154 env
.AutodetectUploadPort([env
])
156 bf_passthrough_init(env
['UPLOAD_PORT'], env
['UPLOAD_SPEED'])
157 except PassthroughEnabled
as err
:
159 return reset_to_bootloader(env
['UPLOAD_PORT'], env
['UPLOAD_SPEED'], env
['PIOENV'], source
[0])
161 def main(custom_args
= None):
162 parser
= argparse
.ArgumentParser(
163 description
="Initialize BetaFlight passthrough and optionally send a reboot comamnd sequence")
164 parser
.add_argument("-b", "--baud", type=int, default
=420000,
165 help="Baud rate for passthrough communication")
166 parser
.add_argument("-p", "--port", type=str,
167 help="Override serial port autodetection and use PORT")
168 parser
.add_argument("-r", "--target", type=str,
169 help="The target firmware that is going to be uploaded")
170 parser
.add_argument("-nr", "--no-reset", action
="store_false",
171 dest
="reset_to_bl", help="Do not send reset_to_bootloader command sequence")
172 parser
.add_argument("-hd", "--half-duplex", action
="store_true",
173 dest
="half_duplex", help="Use half duplex mode")
174 parser
.add_argument("-t", "--type", type=str, default
="ESP82",
175 help="Defines flash target type which is sent to target in reboot command")
176 parser
.add_argument("-a", "--action", type=str, default
="upload",
177 help="Upload action: upload (default), or uploadforce to flash even on target mismatch")
178 parser
.add_argument("--accept", type=str, default
=None,
179 help="Acceptable target to auto-overwrite")
181 args
= parser
.parse_args(custom_args
)
183 if (args
.port
== None):
184 args
.port
= serials_find
.get_serial_port()
186 returncode
= ElrsUploadResult
.Success
188 bf_passthrough_init(args
.port
, args
.baud
)
189 except PassthroughEnabled
as err
:
193 returncode
= reset_to_bootloader(args
.port
, args
.baud
, args
.target
, args
.action
, args
.accept
, args
.half_duplex
, args
.type)
197 if __name__
== '__main__':