import json
import os
import subprocess

from elrs_helpers import ElrsUploadResult
def process_http_result(output_json_file: str) -> int:
    """Print the HTTP result saved in *output_json_file* and classify it.

    The file is expected to contain a JSON object with a ``status`` string
    and a ``msg`` string. The message is printed with ANSI colours: green
    for status ``'ok'``, a yellow banner for ``'mismatch'`` and red for
    anything else.

    Args:
        output_json_file: Path of the JSON file holding the HTTP response.

    Returns:
        An ``ElrsUploadResult`` value: ``Success`` for status ``'ok'``,
        ``ErrorMismatch`` for ``'mismatch'`` and ``ErrorGeneral`` for every
        other status, including a result file that is missing, unreadable
        or not valid JSON.
    """
    try:
        with open(output_json_file, encoding='utf-8') as f:
            output_json = json.load(f)
    except (OSError, ValueError) as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; report them as a failed upload instead of crashing.
        print(f'UPLOAD ERROR\n\033[31m{err}\033[0m', flush=True)  # red
        return ElrsUploadResult.ErrorGeneral

    if not isinstance(output_json, dict):
        # Valid JSON but not an object: treat as a response without status.
        output_json = {}

    result = output_json.get('status')
    msg = str(output_json.get('msg', ''))

    if result == 'ok':
        # Update complete. Please wait for LED to resume blinking before disconnecting power.
        msg = f'UPLOAD SUCCESS\n\033[32m{msg}\033[0m'  # green
        # 'ok' is the only acceptable result
        retval = ElrsUploadResult.Success
    elif result == 'mismatch':
        # <b>Current target:</b> LONG_ASS_NAME.<br><b>Uploaded image:</b> OTHER_NAME.<br/><br/>Flashing the wrong firmware may lock or damage your device.
        msg = msg.replace('<br>', '\n').replace('<br/>', '\n')  # convert breaks to newline
        msg = msg.replace('<b>', '\033[34m').replace('</b>', '\033[0m')  # bold to blue
        msg = '\033[33mTARGET MISMATCH\033[0m\n' + msg  # yellow warning
        retval = ElrsUploadResult.ErrorMismatch
    else:
        msg = f'UPLOAD ERROR\n\033[31m{msg}\033[0m'  # red
        retval = ElrsUploadResult.ErrorGeneral

    print(msg, flush=True)
    return retval
def do_upload(elrs_bin_target, pio_target, upload_addr, env):
    """Upload a firmware file over HTTP by invoking ``curl``.

    Each address in *upload_addr* is tried in order until one upload is
    reported as successful. The HTTP response is written by curl to a
    ``<firmware-name>-output.json`` file next to the firmware and then
    interpreted by ``process_http_result``.

    Args:
        elrs_bin_target: Path of the firmware file to upload.
        pio_target: Name of the requested upload target. ``'uploadconfirm'``
            switches the request to ``forceupdate?action=confirm`` and
            ``'uploadforce'`` adds a ``force=1`` form field.
        upload_addr: Iterable of host names / addresses to try.
        env: Object with a ``get`` method; when it has a non-``None``
            ``UPLOAD_PORT`` entry, that value replaces *upload_addr*.

    Returns:
        ``ElrsUploadResult.Success`` when an upload succeeded, otherwise the
        result of the last attempt (an ``ElrsUploadResult`` error value or
        the exit code of a failed curl call).
    """
    bin_upload_output = os.path.splitext(elrs_bin_target)[0] + '-output.json'
    # Remove a result file left over from a previous run before curl writes
    # a new one (EAFP avoids the exists()/remove() race).
    try:
        os.remove(bin_upload_output)
    except FileNotFoundError:
        pass

    cmd = ["curl", "--max-time", "60",
           "--retry", "2", "--retry-delay", "1",
           "--header", "X-FileSize: " + str(os.path.getsize(elrs_bin_target)),
           "-o", bin_upload_output]

    # NOTE(review): the default endpoint is not visible in the reviewed
    # source (only the 'uploadconfirm' override is); 'update' is assumed
    # here so that `uri` is always bound -- confirm against the firmware.
    uri = 'update'
    if pio_target == 'uploadconfirm':
        uri = 'forceupdate?action=confirm'
    if pio_target == 'uploadforce':
        cmd += ["-F", "force=1"]
    # NOTE(review): the reviewed source attaches the firmware file for every
    # target, including 'uploadconfirm' -- confirm that this is intended.
    cmd += ["-F", "data=@%s" % elrs_bin_target]

    # An explicit UPLOAD_PORT overrides the list of default addresses.
    upload_port = env.get('UPLOAD_PORT', None)
    if upload_port is not None:
        upload_addr = [upload_port]

    returncode = ElrsUploadResult.ErrorGeneral
    for addr in upload_addr:
        url = "http://%s/%s" % (addr, uri)
        print(" ** UPLOADING TO: %s" % url)
        try:
            subprocess.check_call(cmd + [url])
        except subprocess.CalledProcessError as e:
            # curl itself failed (e.g. host unreachable); keep its exit code
            # and fall through to the next address.
            returncode = e.returncode
        else:
            returncode = process_http_result(bin_upload_output)
        if returncode == ElrsUploadResult.Success:
            break

    if returncode != ElrsUploadResult.Success:
        print("WIFI upload FAILED!")
    return returncode
def on_upload(source, target, env):
    """Locate the built firmware file and upload it via ``do_upload``.

    The firmware is looked up in the directory of ``source[0]``, trying
    ``firmware.elrs``, then ``firmware.bin.gz``, then ``firmware.bin``; the
    first file that exists is uploaded to the default addresses
    ``elrs_tx`` and ``elrs_tx.local``.

    Args:
        source: Sequence whose first element, converted with ``str``, is a
            path inside the build output directory.
        target: Sequence whose first element has a ``name`` attribute; that
            name is passed to ``do_upload`` as the upload target name.
        env: Passed through unchanged to ``do_upload``.

    Returns:
        Whatever ``do_upload`` returns.

    Raises:
        FileNotFoundError: If none of the candidate firmware files exists.
    """
    bin_path = os.path.dirname(str(source[0]))
    upload_addr = ['elrs_tx', 'elrs_tx.local']

    # Candidates in order of preference; the first existing one wins.
    for candidate in ('firmware.elrs', 'firmware.bin.gz', 'firmware.bin'):
        elrs_bin_target = os.path.join(bin_path, candidate)
        if os.path.exists(elrs_bin_target):
            break
    else:
        raise FileNotFoundError("No valid binary found!")

    pio_target = target[0].name
    return do_upload(elrs_bin_target, pio_target, upload_addr, env)