1 import serial
, time
, sys
, re
6 from query_yes_no
import query_yes_no
7 from elrs_helpers
import ElrsUploadResult
13 class PassthroughEnabled(Exception):
16 class PassthroughFailed(Exception):
19 def dbg_print(line
=''):
20 sys
.stdout
.write(line
+ '\n')
24 def _validate_serialrx(rl
, config
, expected
):
26 if type(expected
) == str:
28 rl
.set_delimiters(["# "])
30 rl
.write_str("get %s" % config
)
31 line
= rl
.read_line(1.).strip()
40 def bf_passthrough_init(port
, requestedBaudrate
):
42 dbg_print("======== PASSTHROUGH INIT ========")
43 dbg_print(" Trying to initialize %s @ %s" % (port
, requestedBaudrate
))
45 s
= serial
.Serial(port
=port
, baudrate
=115200,
46 bytesize
=8, parity
='N', stopbits
=1,
47 timeout
=1, xonxoff
=0, rtscts
=0)
49 rl
= SerialHelper
.SerialHelper(s
, 3., ['CCC', "# "])
51 # Send start command '#'
53 start
= rl
.read_line(2.).strip()
54 #dbg_print("BF INIT: '%s'" % start.replace("\r", ""))
56 raise PassthroughEnabled("Passthrough already enabled and bootloader active")
57 elif not start
or not start
.endswith("#"):
58 raise PassthroughEnabled("No CLI available. Already in passthrough mode?, If this fails reboot FC and try again!")
61 if not _validate_serialrx(rl
, "serialrx_provider", ["CRSF", "ELRS"]):
62 serial_check
.append("Serial Receiver Protocol is not set to CRSF! Hint: set serialrx_provider = CRSF")
63 if not _validate_serialrx(rl
, "serialrx_inverted", "OFF"):
64 serial_check
.append("Serial Receiver UART is inverted! Hint: set serialrx_inverted = OFF")
65 if not _validate_serialrx(rl
, "serialrx_halfduplex", ["OFF", "AUTO"]):
66 serial_check
.append("Serial Receiver UART is not in full duplex! Hint: set serialrx_halfduplex = OFF")
67 if _validate_serialrx(rl
, "rx_spi_protocol", "EXPRESSLRS" ) and serial_check
:
68 serial_check
= [ "ExpressLRS SPI RX detected\n\nUpdate via betaflight to flash your RX\nhttps://www.expresslrs.org/2.0/hardware/spi-receivers/" ]
71 error
= "\n\n [ERROR] Invalid serial RX configuration detected:\n"
72 for err
in serial_check
:
73 error
+= " !!! %s !!!\n" % err
74 error
+= "\n Please change the configuration and try again!\n"
75 raise PassthroughFailed(error
)
79 dbg_print("\nAttempting to detect FC UART configuration...")
81 rl
.set_delimiters(["\n"])
83 rl
.write_str("serial")
86 line
= rl
.read_line().strip()
87 #print("FC: '%s'" % line)
88 if not line
or "#" in line
:
91 if line
.startswith("serial"):
93 dbg_print(" '%s'" % line
)
94 config
= re
.search('serial ([0-9]+) ([0-9]+) ', line
)
95 if config
and (int(config
.group(2)) & 64 == 64):
96 dbg_print(" ** Serial RX config detected: '%s'" % line
)
97 SerialRXindex
= config
.group(1)
101 if not SerialRXindex
:
102 raise PassthroughFailed("!!! RX Serial not found !!!!\n Check configuration and try again...")
104 cmd
= "serialpassthrough %s %s" % (SerialRXindex
, requestedBaudrate
, )
106 dbg_print("Enabling serial passthrough...")
107 dbg_print(" CMD: '%s'" % cmd
)
111 dbg_print("======== PASSTHROUGH DONE ========")
114 def reset_to_bootloader(port
, baud
, target
, action
, accept
=None, chip_type
='ESP82') -> int:
115 dbg_print("======== RESET TO BOOTLOADER ========")
116 s
= serial
.Serial(port
=port
, baudrate
=baud
,
117 bytesize
=8, parity
='N', stopbits
=1,
118 timeout
=1, xonxoff
=0, rtscts
=0)
119 rl
= SerialHelper
.SerialHelper(s
, 3.)
121 BootloaderInitSeq
= bootloader
.get_init_seq(chip_type
)
122 dbg_print(" * Using full duplex (CRSF)")
123 #this is the training sequ for the ROM bootloader, we send it here so it doesn't auto-neg to the wrong baudrate by the BootloaderInitSeq that we send to reset ELRS
124 rl
.write(b
'\x07\x07\x12\x20' + 32 * b
'\x55')
126 rl
.write(BootloaderInitSeq
)
128 rx_target
= rl
.read_line().strip().upper()
129 if target
is not None:
130 flash_target
= re
.sub("_VIA_.*", "", target
.upper())
131 ignore_incorrect_target
= action
== "uploadforce"
133 dbg_print("Cannot detect RX target, blindly flashing!")
134 elif ignore_incorrect_target
:
135 dbg_print(f
"Force flashing {flash_target}, detected {rx_target}")
136 elif rx_target
!= flash_target
and rx_target
!= accept
:
137 if query_yes_no("\n\n\nWrong target selected! your RX is '%s', trying to flash '%s', continue? Y/N\n" % (rx_target
, flash_target
)):
138 dbg_print("Ok, flashing anyway!")
140 dbg_print("Wrong target selected your RX is '%s', trying to flash '%s'" % (rx_target
, flash_target
))
141 return ElrsUploadResult
.ErrorMismatch
142 elif flash_target
!= "":
143 dbg_print("Verified RX target '%s'" % (flash_target
))
147 return ElrsUploadResult
.Success
149 def init_passthrough(source
, target
, env
) -> int:
150 env
.AutodetectUploadPort([env
])
152 bf_passthrough_init(env
['UPLOAD_PORT'], env
['UPLOAD_SPEED'])
153 except PassthroughEnabled
as err
:
155 return reset_to_bootloader(env
['UPLOAD_PORT'], env
['UPLOAD_SPEED'], env
['PIOENV'], source
[0])
157 def main(custom_args
= None):
158 parser
= argparse
.ArgumentParser(
159 description
="Initialize BetaFlight passthrough and optionally send a reboot comamnd sequence")
160 parser
.add_argument("-b", "--baud", type=int, default
=420000,
161 help="Baud rate for passthrough communication")
162 parser
.add_argument("-p", "--port", type=str,
163 help="Override serial port autodetection and use PORT")
164 parser
.add_argument("-r", "--target", type=str,
165 help="The target firmware that is going to be uploaded")
166 parser
.add_argument("-nr", "--no-reset", action
="store_false",
167 dest
="reset_to_bl", help="Do not send reset_to_bootloader command sequence")
168 parser
.add_argument("-t", "--type", type=str, default
="ESP82",
169 help="Defines flash target type which is sent to target in reboot command")
170 parser
.add_argument("-a", "--action", type=str, default
="upload",
171 help="Upload action: upload (default), or uploadforce to flash even on target mismatch")
172 parser
.add_argument("--accept", type=str, default
=None,
173 help="Acceptable target to auto-overwrite")
175 args
= parser
.parse_args(custom_args
)
177 if (args
.port
== None):
178 args
.port
= serials_find
.get_serial_port()
180 returncode
= ElrsUploadResult
.Success
182 bf_passthrough_init(args
.port
, args
.baud
)
183 except PassthroughEnabled
as err
:
187 returncode
= reset_to_bootloader(args
.port
, args
.baud
, args
.target
, args
.action
, args
.accept
, args
.type)
191 if __name__
== '__main__':