Fixes #7171
[opentx.git] / tools / latency.py
blobd08c251d0d9fb3de836f708aff29203840acca53
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
4 # Copyright (C) OpenTX
6 # Based on code named
7 # th9x - http://code.google.com/p/th9x
8 # er9x - http://code.google.com/p/er9x
9 # gruvin9x - http://code.google.com/p/gruvin9x
11 # License GPLv2: http://www.gnu.org/licenses/gpl-2.0.html
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License version 2 as
15 # published by the Free Software Foundation.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
23 import argparse
24 import csv
25 import os
def build_transitions_array(f, column):
    """Extract the debounced level changes of one CSV column.

    The file is rewound, its first row is skipped as a header, and a
    ``(time, level)`` tuple is recorded for the first data row and for every
    row whose text in ``column`` differs from the previous row.  ``time`` is
    column 0 multiplied by 1000 (presumably seconds converted to
    milliseconds -- confirm against the capture format) and ``level`` is the
    column text converted with ``int``.

    Two consecutive changes closer than 0.002 (in the scaled time unit) are
    treated as a glitch and both are discarded.

    :param f: seekable text file object containing comma separated values
    :param column: index of the column to analyse
    :return: list of ``(time, level)`` tuples
    """
    f.seek(0)
    rows = csv.reader(f, delimiter=',')
    next(rows, None)  # the first row is a header, never data

    changes = []
    previous = None
    for row in rows:
        level = row[column]
        # `level` is always a string, so this also fires on the first row
        if level != previous:
            changes.append((float(row[0]) * 1000, int(level)))
        previous = level

    kept = []
    total = len(changes)
    pos = 0
    while pos < total:
        stamp, level = changes[pos]
        is_glitch = pos + 1 < total and changes[pos + 1][0] - stamp < 0.002
        if is_glitch:
            # drop this change together with the one that undoes it
            pos += 2
        else:
            kept.append((stamp, level))
            pos += 1
    return kept
class Frame:
    """A burst of ``(time, level)`` transitions forming one output frame."""

    def __init__(self):
        self.transitions = []

    def push(self, t, value):
        """Record that the signal switched to ``value`` at time ``t``."""
        self.transitions.append((t, value))

    def start(self):
        """Return the time of the first recorded transition."""
        first = self.transitions[0]
        return first[0]

    def end(self):
        """Return the time of the last recorded transition."""
        final = self.transitions[-1]
        return final[0]

    def is_after(self, t):
        """Tell whether the frame starts at or after time ``t``."""
        return t <= self.start()

    def output(self, time):
        """Return the signal level at ``time``.

        The level is 0 before the first transition, otherwise it is the
        level of the latest transition that is not later than ``time``.
        """
        level = 0
        for stamp, new_level in self.transitions:
            if stamp > time:
                break
            level = new_level
        return level

    @classmethod
    def get_frames(cls, transitions):
        """Group transitions into frames separated by gaps longer than 3.

        The gap is measured in the unit of the transition times, and the
        first gap is measured from time 0.  Transitions seen before the first
        such gap are ignored, and the frame that is still open when the input
        ends is not part of the result.

        :param transitions: iterable of ``(time, level)`` tuples
        :return: list of ``cls`` instances
        """
        frames = []
        frame = None
        previous = 0
        for t, value in transitions:
            gap = t - previous
            previous = t
            if gap > 3:
                # a long idle period closes the open frame and opens a new one
                if frame:
                    frames.append(frame)
                frame = cls()
            if frame is not None:
                frame.push(t, value)
        return frames
class SBusFrame(Frame):
    """Frame whose transitions are decoded as a sequence of serial bytes.

    Byte ``n`` occupies a slot of 0.12 time units starting at
    ``start() + 0.12 * n``; its eight bits are sampled every 0.010 units
    beginning 0.015 units into the slot, and every sampled level is inverted.
    NOTE(review): these figures look like inverted 100 kbaud SBUS with
    12 bit slots per byte and times in milliseconds -- confirm.
    """

    def byte(self, index):
        """Decode byte ``index`` of the frame, least significant bit first."""
        sample_time = self.start() + 0.12 * index + 0.015
        decoded = 0
        for shift in range(8):
            inverted_level = 1 - self.output(sample_time)
            decoded += inverted_level << shift
            sample_time += 0.010
        return decoded

    def is_lost(self):
        """Return bit 0x04 of byte 23 (non-zero when the bit is set)."""
        flags = self.byte(23)
        return flags & 0x04

    def value(self, channel):
        """Return the rounded, scaled value of ``channel`` (0-based).

        Channels are packed as consecutive 11-bit fields starting with
        byte 1.  Each raw field is offset by 0x3E0, multiplied by 5/8, and
        the result is finally scaled by 100/512 and rounded.
        """
        pending_bits = 0
        shift_register = 0
        byte_index = 0
        scaled = None
        for _ in range(channel + 1):
            # pull in bytes until a whole 11-bit field is available
            while pending_bits < 11:
                byte_index += 1
                shift_register |= self.byte(byte_index) << pending_bits
                pending_bits += 8
            scaled = ((shift_register & 0x7FF) - 0x3E0) * 5 / 8
            pending_bits -= 11
            shift_register >>= 11
        return round((scaled * 100) / 512)

    def __str__(self):
        """Format the start time followed by the 25 frame bytes in hex."""
        prefix = "%.03fms " % self.start()
        hex_bytes = ["%02X" % self.byte(position) for position in range(25)]
        return prefix + " ".join(hex_bytes)
class PwmFrame(Frame):
    """Frame holding a single pulse whose width encodes the value."""

    def duration(self):
        """Return the time between the first and the last transition."""
        begin = self.start()
        finish = self.end()
        return finish - begin

    def value(self, channel):
        """Return the rounded, scaled pulse width; ``channel`` is ignored.

        The duration is multiplied by 1000, offset by 1500 and scaled by
        100/512.
        """
        pulse = self.duration() * 1000
        return round((pulse - 1500) * 100 / 512)

    def __str__(self):
        """Format the start time followed by the decoded value."""
        fields = (self.start(), self.value(0))
        return "%.03fms %d" % fields
class LatencyStatistics:
    """Measure the delay between trigger changes and the matching frame.

    :param trigger_transitions: list of ``(time, level)`` tuples of the trigger
    :param frames: frame objects providing ``is_after(t)``, ``value(channel)``
        and ``end()``
    :param channel: channel passed to ``frame.value``
    :param highval: channel value expected after the trigger switched to 1
    :param lowval: channel value expected after any other trigger level
    """

    def __init__(self, trigger_transitions, frames, channel, highval, lowval):
        self.trigger_transitions = trigger_transitions
        self.frames = frames
        self.channel = channel
        self.highval = highval
        self.lowval = lowval

    def iter(self):
        """Yield ``(t0, val, delay)`` for each trigger transition but the first.

        ``delay`` is the end time of the first frame that starts at or after
        ``t0`` and carries the expected channel value, minus ``t0``.
        Transitions for which no such frame exists yield nothing.
        """
        for t0, val in self.trigger_transitions[1:]:
            expected = self.highval if val == 1 else self.lowval
            for frame in self.frames:
                if frame.is_after(t0) and frame.value(self.channel) == expected:
                    yield (t0, val, frame.end() - t0)
                    break

    @staticmethod
    def append_to_line(f, s, index, lines, columns):
        """Write row ``index`` of the export file with ``s`` as its last cell.

        :param f: writable text file
        :param s: text of the new cell
        :param index: row number being written
        :param lines: rows previously present in the file (may be empty)
        :param columns: number of columns previously present (0 for a new file)
        """
        if columns == 0:
            f.write(s)
        elif index < len(lines):
            f.write(lines[index].strip() + ";" + s)
        else:
            # no previous row here: pad the older columns with empty cells
            f.write(";" * columns + s)
        f.write("\n")

    def export(self, path, title, append):
        """Write the measured delays as one ``;`` separated column.

        :param path: file to write
        :param title: header cell of the new column
        :param append: when true and ``path`` exists, add the column to the
            right of the existing content instead of replacing it
        """
        lines = []
        columns = 0
        if append and os.path.exists(path):
            with open(path, 'r') as f:
                lines = f.readlines()
            # an existing but empty file is handled like a new file
            if lines:
                columns = len(lines[0].split(";"))
        with open(path, 'w') as f:
            self.append_to_line(f, title, 0, lines, columns)
            index = 1
            for t0, val, delay in self.iter():
                self.append_to_line(f, str(delay), index, lines, columns)
                index += 1
            # Older columns may hold more rows than this run produced; carry
            # them over with an empty cell rather than dropping them when the
            # file is rewritten.
            while index < len(lines):
                self.append_to_line(f, "", index, lines, columns)
                index += 1

    def print(self):
        """Print count, average, minimum and maximum of the measured delays."""
        mini, maxi = None, None
        count = 0
        total = 0
        for t0, val, delay in self.iter():
            count += 1
            total += delay
            if mini is None or delay < mini[0]:
                mini = (delay, t0)
            if maxi is None or delay > maxi[0]:
                maxi = (delay, t0)

        print("Delay between the switch toggle and the end of the SBUS frame:")
        print(" Count = %d transitions" % count)
        if count == 0:
            # no average or extremes exist; avoid dividing by zero
            print(" No trigger transition could be matched with a frame")
            return
        print(" Average = %.1fms" % (total / count))
        print(" Mini = %.1fms @ %fs" % (mini[0], mini[1] / 1000))
        print(" Maxi = %.1fms @ %fs" % (maxi[0], maxi[1] / 1000))
def main():
    """Parse the command line, decode the capture and report the latency.

    The trigger column and either the SBUS or the PWM column are read from
    the CSV file, the output column is split into frames, and the statistics
    are printed and optionally exported.

    :raises SystemExit: with an error message (non-zero exit status) when
        neither ``--pwm`` nor ``--sbus`` is given
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('file', help='file to parse', type=argparse.FileType('r'))
    parser.add_argument('--trigger', help='column in the CSV file where is the trigger', type=int, required=True)
    parser.add_argument('--pwm', help='column in the CSV file where is the PWM output', type=int)
    parser.add_argument('--sbus', help='column in the CSV file where is the SBUS output', type=int)
    parser.add_argument('--channel', help='channel to check', type=int, default=0)
    parser.add_argument('--highval', help='value of channel when trigger=HIGH', type=int, default=+100)
    parser.add_argument('--lowval', help='value of channel when trigger=LOW', type=int, default=-100)
    parser.add_argument('--export', help='CSV file to export latency values')
    parser.add_argument('--title', help='CSV column title', default="Unknown")
    parser.add_argument('--append', help='export CSV file in append mode', action='store_true')
    args = parser.parse_args()

    # argparse opened the capture file; close it once all columns are read.
    with args.file:
        if not args.sbus and not args.pwm:
            # Report the usage error before doing any work.  SystemExit with
            # a message writes it to stderr and exits with status 1, unlike
            # the interactive helper exit() which reports success.
            raise SystemExit("Either a PWM or SBUS column in CSV must be specified")

        trigger_transitions = build_transitions_array(args.file, args.trigger)

        if args.sbus:
            sbus_transitions = build_transitions_array(args.file, args.sbus)
            frames = SBusFrame.get_frames(sbus_transitions)
            for frame in frames:
                if frame.is_lost():
                    print("Frame lost bit @ %fs" % (frame.start() / 1000))
        else:
            pwm_transitions = build_transitions_array(args.file, args.pwm)
            frames = PwmFrame.get_frames(pwm_transitions)

    statistics = LatencyStatistics(trigger_transitions, frames, args.channel, args.highval, args.lowval)
    if args.export:
        statistics.export(args.export, args.title, args.append)
    statistics.print()


if __name__ == "__main__":
    main()