Add tlmConfirm to tlm_dl ota packet-structure (#2991)
[ExpressLRS.git] / src / python / SerialHelper.py
blobb06dce50a0bdc4e5aa1247e8847bc0f8aec10ddc
1 import time
# Text encoding used for every str <-> bytes conversion in this module.
encoding = "utf-8"


class SerialHelper:
    """Line-oriented convenience wrapper around a serial port object.

    The wrapped ``serial`` object must provide ``in_waiting``, ``read(n)``,
    ``write(data)``, ``flush()`` and ``reset_input_buffer()``.  Received bytes
    are accumulated in ``self.buf`` until one of the configured delimiters
    shows up, at which point the data up to and including that delimiter is
    handed back as text.
    """

    def __init__(self, serial, timeout=2, delimiters=("\n", "CCC")):
        """Wrap *serial* and start with an empty receive buffer.

        Args:
            serial: the port object to read from and write to.
            timeout: default number of seconds ``read_line`` waits for a
                delimiter.
            delimiters: iterable of ``str`` or bytes-like terminators that
                end a "line".
        """
        self.serial = serial
        self.timeout = timeout
        self.clear()
        self.set_delimiters(delimiters)

    def encode(self, data):
        """Return *data* as bytes; ``str`` is encoded, anything else passes through."""
        if isinstance(data, str):
            return data.encode(encoding)
        return data

    def clear(self):
        """Drop pending input on the port and empty the local buffer."""
        self.serial.reset_input_buffer()
        self.buf = bytearray()

    def set_serial(self, serial):
        """Replace the wrapped port object."""
        self.serial = serial

    def set_timeout(self, timeout):
        """Set the default ``read_line`` timeout in seconds."""
        self.timeout = timeout

    def set_delimiters(self, delimiters):
        """Store *delimiters* as a list of bytes objects (``str`` items are encoded)."""
        self.delimiters = [
            d.encode(encoding) if isinstance(d, str) else d
            for d in delimiters
        ]

    def read_line(self, timeout=None):
        """Read until a delimiter is buffered or the timeout elapses.

        Args:
            timeout: seconds to wait; ``None`` or a non-positive value falls
                back to the instance default.

        Returns:
            The decoded text up to and including the matched delimiter.
            An empty string is returned on timeout (the partial buffer is
            discarded) or when the received bytes cannot be decoded.

        Delimiters are tried in their configured order, so an earlier-listed
        delimiter wins even if a later-listed one occurs first in the buffer.
        Bytes following the matched delimiter stay buffered for the next call.
        """
        if timeout is None or timeout <= 0.:
            timeout = self.timeout

        # Same object as self.buf: extending it below keeps the instance
        # buffer up to date while we wait.
        buf = self.buf

        # Monotonic clock so a wall-clock adjustment cannot shorten or
        # stretch the wait.
        start = time.monotonic()
        while (not any(d in buf for d in self.delimiters)
               and (time.monotonic() - start) < timeout):
            # Take at most 2048 bytes of whatever is currently waiting.
            # With nothing waiting this requests zero bytes, so the loop
            # presumably just polls until data arrives or time runs out.
            pending = max(0, min(2048, self.serial.in_waiting))
            data = self.serial.read(pending)
            if not data:
                continue
            buf.extend(data)

        for delimiter in self.delimiters:
            pos = buf.find(delimiter)
            if pos >= 0:
                offset = pos + len(delimiter)
                line = buf[:offset]
                # Keep the remainder for the next read_line() call.
                self.buf = buf[offset:]
                return self.__convert_to_str(line)

        # Timeout without any delimiter: reset buffer and return empty string.
        self.buf = bytearray()
        return ""

    def write(self, data):
        """Send *data* (``str`` or bytes-like) and flush the port."""
        self.serial.write(self.encode(data))
        self.serial.flush()

    def write_str(self, data):
        """Send *data* followed by a CR LF line terminator.

        ``str`` input is handled as before; bytes-like input gets a bytes
        terminator instead of failing on the str/bytes concatenation.
        """
        if isinstance(data, str):
            self.write(data + '\r\n')
        else:
            self.write(data + b'\r\n')

    def __convert_to_str(self, data):
        """Decode *data* to text; undecodable input yields an empty string."""
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            return ""