2 # -*- coding: utf-8 -*-
7 # th9x - http://code.google.com/p/th9x
8 # er9x - http://code.google.com/p/er9x
9 # gruvin9x - http://code.google.com/p/gruvin9x
11 # License GPLv2: http://www.gnu.org/licenses/gpl-2.0.html
13 # This program is free software; you can redistribute it and/or modify
14 # it under the terms of the GNU General Public License version 2 as
15 # published by the Free Software Foundation.
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
# build_transitions_array(f, column) -- FRAGMENTARY VIEW.
# The embedded original line numbers jump (28, 30, 33, 37, 38, 42, ...), so
# several statements of this function are not visible here.
#
# What the visible lines establish:
#   * `f` is read through csv.reader with ',' as the delimiter;
#   * rows are walked with enumerate(); a tuple
#     (float(row[0]) * 1000, int(value)) is appended to `transitions`
#     whenever `last` is None or `value` differs from `last`, i.e. only
#     changes of the sampled value are recorded;
#   * a second, index-driven pass over `transitions` appends (t, val) pairs
#     to `debounced`.
#
# NOTE(review): where `value`, `last`, `transitions`, `debounced` and `i` are
# initialised or advanced, how `column` is used, and what is returned are not
# visible in this view -- confirm against the complete file.
# NOTE(review): the `* 1000` presumably converts seconds to milliseconds
# (other code formats these timestamps with "ms") -- TODO confirm.
28 def build_transitions_array(f
, column
):
30 reader
= csv
.reader(f
, delimiter
=',')
33 for i
, row
in enumerate(reader
):
# Only a change of value (or the very first sample) produces an entry.
37 if last
is None or value
!= last
:
38 transitions
.append((float(row
[0]) * 1000, int(value
)))
# Second pass over the recorded transitions, by index.
42 while i
< len(transitions
):
43 t
, val
= transitions
[i
]
# True when a following transition exists and its timestamp is less than
# 0.002 after the current one (same scaled unit as stored above).
# NOTE(review): what happens when this is true is not visible here.
44 if i
< len(transitions
) - 1 and transitions
[i
+1][0] - t
< 0.002:
47 debounced
.append((t
, val
))
def push(self, t, value):
    """Record one more transition on this frame.

    The pair ``(t, value)`` is appended, as a tuple, to the end of
    ``self.transitions``. Nothing is returned.
    """
    sample = (t, value)
    self.transitions.append(sample)
# FRAGMENT: the `def` line owning this statement is not visible in this view.
# The statement returns element 0 of the FIRST entry of self.transitions
# (entries are the (t, value) tuples stored by push()).
# NOTE(review): presumably the body of start(), which other visible code
# calls as self.start() -- confirm against the complete file.
60 return self
.transitions
[0][0]
# FRAGMENT: the `def` line owning this statement is not visible either.
# The statement returns element 0 of the LAST entry of self.transitions.
# NOTE(review): presumably the body of end() -- confirm.
63 return self
.transitions
[-1][0]
def is_after(self, t):
    """Tell whether this frame begins at or after the instant ``t``.

    Returns the result of comparing ``self.start()`` with ``t`` using
    ``>=``: true when the frame start is not earlier than ``t``.
    """
    frame_start = self.start()
    return frame_start >= t
# output(self, time) -- FRAGMENTARY VIEW.
# Only the signature and the header of a loop over the (t, v) entries of
# self.transitions are visible; the loop body and the return statement are
# missing from this view.
# NOTE(review): presumably returns the signal level in effect at `time`
# (SBusFrame.byte computes `1 - self.output(t)`) -- confirm against the
# complete file.
68 def output(self
, time
):
70 for t
, v
in self
.transitions
:
# get_frames(cls, transitions) -- FRAGMENTARY VIEW.
# Visible behaviour: iterates the (t, value) pairs of `transitions`; at some
# point appends `current_frame` to `result`; and, whenever `current_frame`
# is not None, forwards the pair to current_frame.push(t, value).
# NOTE(review): the first parameter is `cls`, so this is presumably declared
# as a @classmethod on a line not visible here (callers use
# SBusFrame.get_frames(...) / PwmFrame.get_frames(...)) -- confirm.
# NOTE(review): the condition that starts a new frame, the initial values of
# `result` / `current_frame`, and the return statement are not visible.
77 def get_frames(cls
, transitions
):
81 for t
, value
in transitions
:
84 result
.append(current_frame
)
# Transitions seen before any frame exists are not pushed anywhere.
86 if current_frame
is not None:
87 current_frame
.push(t
, value
)
# SBusFrame(Frame) -- FRAGMENTARY VIEW: several `def` lines and loop headers
# of this class are missing (the embedded line numbers skip 95-96, 98-101,
# 103, 105-108, 111, 113, 115-116, 118-119).
92 class SBusFrame(Frame
):
# byte(self, index): samples the frame via self.output() at instants derived
# from start() + 0.12 * index + 0.015 and accumulates the INVERTED level
# (1 - output) shifted left by `bit` into `value`.
# NOTE(review): 0.12 presumably is the duration of one serial byte slot and
# 0.015 an offset into the first data bit, in the same unit as start() --
# TODO confirm. The loop over `bit` and the return are not visible.
93 def byte(self
, index
):
94 t
= self
.start() + 0.12 * index
+ 0.015
97 value
+= (1 - self
.output(t
)) << bit
# FRAGMENT: owning `def` not visible. Returns byte 23 masked with 0x04.
# NOTE(review): presumably a "frame lost" flag test (the script prints
# "Frame lost bit") -- confirm.
102 return self
.byte(23) & 0x04
# value(self, channel): walks channels 0..channel, refilling a bit buffer
# from successive bytes until at least 11 bits are available, then takes the
# low 11 bits, subtracts 0x3E0, scales by 5 / 8 and finally returns
# round(value * 100 / 512).
# NOTE(review): initialisation of `bits`, `bits_available`, `byte` and the
# consumption of the 11 bits per channel are not visible here.
104 def value(self
, channel
):
109 for i
in range(channel
+ 1):
110 while bits_available
< 11:
112 bits |
= self
.byte(byte
) << bits_available
114 value
= ((bits
& 0b11111111111) - 0x3E0) * 5 / 8
117 return round((value
* 100) / 512)
# FRAGMENT: owning `def` not visible. Builds a text form: the start time
# formatted with "%.03fms " followed by bytes 0..24 as two-digit hex values
# separated by spaces.
120 return "%.03fms " % self
.start() + " ".join(["%02X" % self
.byte(i
) for i
in range(25)])
# PwmFrame(Frame) -- FRAGMENTARY VIEW: two `def` lines (original lines 124
# and 129/130) are missing from this view.
123 class PwmFrame(Frame
):
# FRAGMENT: owning `def` not visible. Returns end() minus start().
# NOTE(review): presumably the body of duration(), which value() calls below
# -- confirm.
125 return self
.end() - self
.start()
# value(self, channel): maps the frame duration to a rounded figure using
# (duration * 1000 - 1500) * 100 / 512. `channel` is not used by the visible
# expression.
# NOTE(review): presumably a pulse width centred on 1500 -- TODO confirm
# units.
127 def value(self
, channel
):
128 return round((self
.duration() * 1000 - 1500) * 100 / 512)
# FRAGMENT: owning `def` not visible. Formats start() with "%.03fms" and
# value(0) with "%d".
131 return "%.03fms %d" % (self
.start(), self
.value(0))
# LatencyStatistics -- FRAGMENTARY VIEW: several `def` lines, decorators and
# counter statements of this class are missing from this view.
134 class LatencyStatistics
:
# Stores the constructor arguments on the instance. Only the assignments of
# trigger_transitions, channel and highval are visible.
# NOTE(review): self.frames and self.lowval are read further down, so they
# are presumably assigned on the missing lines 137 / 140 -- confirm.
135 def __init__(self
, trigger_transitions
, frames
, channel
, highval
, lowval
):
136 self
.trigger_transitions
= trigger_transitions
138 self
.channel
= channel
139 self
.highval
= highval
# FRAGMENT: owning `def` not visible (presumably iter(), which export() and
# the statistics printer call -- confirm).
# For every trigger transition except the first one, picks the expected
# channel value (highval when val == 1, otherwise lowval), scans self.frames
# for a frame that is_after(t0) and whose value(self.channel) equals the
# expected value, computes delay = frame.end() - t0 and yields
# (t0, val, delay).
# NOTE(review): whether the inner loop stops after the first matching frame
# is not visible here.
143 for t0
, val
in self
.trigger_transitions
[1:]:
144 value
= self
.highval
if val
== 1 else self
.lowval
145 for frame
in self
.frames
:
146 if frame
.is_after(t0
) and frame
.value(self
.channel
) == value
:
147 delay
= frame
.end() - t0
148 yield (t0
, val
, delay
)
# append_to_line(f, s, index, lines, columns): writes `s` to `f` as one more
# ';'-separated cell. When `index` addresses an existing line, that line
# (stripped) is written followed by ";" and `s`; otherwise `columns`
# semicolons are written before `s`.
# NOTE(review): there is no `self` parameter although callers use
# self.append_to_line(...), so a @staticmethod decorator presumably sits on
# a line not visible here; the first branch of the if/elif chain and any
# newline handling are not visible either -- confirm.
152 def append_to_line(f
, s
, index
, lines
, columns
):
155 elif index
< len(lines
):
156 f
.write(lines
[index
].strip() + ";" + s
)
158 f
.write(";" * columns
+ s
)
# export(self, path, title, append): writes the delays to `path`, one per
# line under a `title` header cell. In append mode, when `path` already
# exists, its lines are read first and the column count is taken from the
# first line's ';'-separated cells; the file is then rewritten.
# NOTE(review): lines[0] raises IndexError if the existing file is empty.
# NOTE(review): the defaults of `lines` / `columns` for the non-append case
# and the handling of `index` are on lines not visible here.
161 def export(self
, path
, title
, append
):
164 if append
and os
.path
.exists(path
):
165 with
open(path
, 'r') as f
:
166 lines
= f
.readlines()
167 columns
= len(lines
[0].split(";"))
168 with
open(path
, 'w') as f
:
169 self
.append_to_line(f
, title
, 0, lines
, columns
)
171 for t0
, val
, delay
in self
.iter():
172 self
.append_to_line(f
, str(delay
), index
, lines
, columns
)
# FRAGMENT: owning `def` not visible. Scans self.iter() keeping the smallest
# and largest delay (compared through element 0 of `mini` / `maxi`), then
# prints the count, the average and both extremes.
# NOTE(review): if self.iter() yields nothing, `mini` / `maxi` stay None and
# the prints below would fail on mini[0]; `total / count` would also divide
# by zero if `count` is then 0 -- the updates of `count` / `total` are not
# visible here, confirm and guard.
# NOTE(review): the heading mentions SBUS although the script also feeds PWM
# frames into this class.
176 mini
, maxi
= None, None
179 for t0
, val
, delay
in self
.iter():
182 if mini
is None or delay
< mini
[0]:
184 if maxi
is None or delay
> maxi
[0]:
187 print("Delay between the switch toggle and the end of the SBUS frame:")
188 print(" Count = %d transitions" % count
)
189 print(" Average = %.1fms" % (total
/ count
))
190 print(" Mini = %.1fms @ %fs" % (mini
[0], mini
[1] / 1000))
191 print(" Maxi = %.1fms @ %fs" % (maxi
[0], maxi
[1] / 1000))
# Script entry logic -- FRAGMENTARY VIEW: the enclosing `def` line (if any),
# the if/elif/else conditions selecting SBUS vs PWM, and the statement under
# the __main__ guard are not visible here.
#
# Command line: positional CSV `file` (opened for reading by argparse),
# required --trigger column, optional --pwm / --sbus columns, --channel
# (default 0), --highval (default +100), --lowval (default -100), --export
# path, --title (default "Unknown") and the --append flag.
195 parser
= argparse
.ArgumentParser()
196 parser
.add_argument('file', help='file to parse', type=argparse
.FileType('r'))
197 parser
.add_argument('--trigger', help='column in the CSV file where is the trigger', type=int, required
=True)
198 parser
.add_argument('--pwm', help='column in the CSV file where is the PWM output', type=int)
199 parser
.add_argument('--sbus', help='column in the CSV file where is the SBUS output', type=int)
200 parser
.add_argument('--channel', help='channel to check', type=int, default
=0)
201 parser
.add_argument('--highval', help='value of channel when trigger=HIGH', type=int, default
=+100)
202 parser
.add_argument('--lowval', help='value of channel when trigger=LOW', type=int, default
=-100)
203 parser
.add_argument('--export', help='CSV file to export latency values')
204 parser
.add_argument('--title', help='CSV column title', default
="Unknown")
205 parser
.add_argument('--append', help='export CSV file in append mode', action
='store_true')
206 args
= parser
.parse_args()
# The same open file object is handed to build_transitions_array more than
# once (trigger column here, SBUS or PWM column below).
# NOTE(review): a second pass only sees data if the file is rewound; whether
# build_transitions_array does so is not visible in this view -- confirm.
208 trigger_transitions
= build_transitions_array(args
.file, args
.trigger
)
# SBUS path: decode frames from the SBUS column.
211 sbus_transitions
= build_transitions_array(args
.file, args
.sbus
)
212 frames
= SBusFrame
.get_frames(sbus_transitions
)
# NOTE(review): the loop / condition around this report is not visible.
215 print("Frame lost bit @ %fs" % (frame
.start() / 1000))
# PWM path: decode frames from the PWM column.
217 pwm_transitions
= build_transitions_array(args
.file, args
.pwm
)
218 frames
= PwmFrame
.get_frames(pwm_transitions
)
# Fallback message when neither column was selected.
# NOTE(review): whether the script exits after this message is not visible.
220 print("Either a PWM or SBUS column in CSV must be specified")
223 statistics
= LatencyStatistics(trigger_transitions
, frames
, args
.channel
, args
.highval
, args
.lowval
)
# NOTE(review): args.export has no default, so it is None when the option is
# omitted; the guard around this call (if any) is not visible here.
225 statistics
.export(args
.export
, args
.title
, args
.append
)
# Standard script guard; the guarded statement is not visible in this view.
229 if __name__
== "__main__":