2 ## This file is part of the libsigrokdecode project.
4 ## Copyright (C) 2016 Rudolf Reuter <reuterru@arcor.de>
5 ## Copyright (C) 2017 Marcus Comstedt <marcus@mc.pp.se>
6 ## Copyright (C) 2019 Gerhard Sittig <gerhard.sittig@gmx.net>
8 ## This program is free software; you can redistribute it and/or modify
9 ## it under the terms of the GNU General Public License as published by
10 ## the Free Software Foundation; either version 2 of the License, or
11 ## (at your option) any later version.
13 ## This program is distributed in the hope that it will be useful,
14 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
15 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 ## GNU General Public License for more details.
18 ## You should have received a copy of the GNU General Public License
19 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
22 # This file was created from earlier implementations of the 'gpib' and
23 # the 'iec' protocol decoders. It combines the parallel and the serial
24 # transmission variants in a single instance with optional inputs for
25 # maximum code re-use.
28 # - Extend annotations for improved usability.
29 # - Keep talkers' data streams on separate annotation rows? Is this useful
30 # here at the GPIB level, or shall stacked decoders dispatch these? May
31 # depend on how often captures get inspected which involve multiple peers.
32 # - Make serial bit annotations optional? Could slow down interactive
33 # exploration for long captures (see USB).
34 # - Move the inlined Commodore IEC peripherals support to a stacked decoder
35 # when more peripherals get added.
36 # - SCPI over GPIB may "represent somewhat naturally" here already when
37 # text lines are a single run of data at the GPIB layer (each line between
38 # the address spec and either EOI or ATN). So a stacked SCPI decoder may
39 # only become necessary when the text lines' content shall get inspected.
41 import sigrokdecode
as srd
42 from common
.srdhelper
import bitpack
45 OUTPUT_PYTHON format for stacked decoders:
47 General packet format:
48 [<ptype>, <addr>, <pdata>]
50 This is the list of <ptype>s and their respective <pdata> values:
52 Raw bits and bytes at the physical transport level:
53 - 'IEC_BIT': <addr> is not applicable, <pdata> is the transport's bit value.
54 - 'GPIB_RAW': <addr> is not applicable, <pdata> is the transport's
55 byte value. Data bytes are in the 0x00-0xff range, command/address
56 bytes are in the 0x100-0x1ff range.
58 GPIB level byte fields (commands, addresses, pieces of data):
59 - 'COMMAND': <addr> is not applicable, <pdata> is the command's byte value.
60 - 'LISTEN': <addr> is the listener address (0-30), <pdata> is the raw
61 byte value (including the 0x20 offset).
62 - 'TALK': <addr> is the talker address (0-30), <pdata> is the raw byte
63 value (including the 0x40 offset).
64 - 'SECONDARY': <addr> is the secondary address (0-31), <pdata> is the
65 raw byte value (including the 0x60 offset).
66 - 'MSB_SET': <addr> as well as <pdata> are the raw byte value (including
67 the 0x80 offset). This usually does not happen for GPIB bytes with ATN
68 active, but was observed with the IEC bus and Commodore floppy drives,
69 when addressing channels within the device.
70 - 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
71 is the raw data byte (transport layer, ATN inactive).
72 - 'PPOLL': <addr> is not applicable, <pdata> is a list of bit indices
73 (DIO1 to DIO8 order) which responded to the PP request.
75 Extracted payload information (peers and their communicated data):
76 - 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
77 current listeners. These updates for the current "connected peers"
78 are sent when the set of peers changes, i.e. after talkers/listeners
79 got selected or deselected. Of course the data only covers what could
80 be gathered from the input data. Some controllers may not explicitly
81 address themselves, or captures may not include an early setup phase.
82 - 'TALKER_BYTES': <addr> is the talker address (when available), <pdata>
83 is the accumulated byte sequence between addressing a talker and EOI,
84 or the next command/address.
85 - 'TALKER_TEXT': <addr> is the talker address (when available), <pdata>
86 is the accumulated text sequence between addressing a talker and EOI,
87 or the next command/address.
90 class ChannelError(Exception):
93 def _format_ann_texts(fmts
, **args
):
96 return [fmt
.format(**args
) for fmt
in fmts
]
99 # Command codes in the 0x00-0x1f range.
100 0x01: ['Go To Local', 'GTL'],
101 0x04: ['Selected Device Clear', 'SDC'],
102 0x05: ['Parallel Poll Configure', 'PPC'],
103 0x08: ['Global Execute Trigger', 'GET'],
104 0x09: ['Take Control', 'TCT'],
105 0x11: ['Local Lock Out', 'LLO'],
106 0x14: ['Device Clear', 'DCL'],
107 0x15: ['Parallel Poll Unconfigure', 'PPU'],
108 0x18: ['Serial Poll Enable', 'SPE'],
109 0x19: ['Serial Poll Disable', 'SPD'],
110 # Unknown type of command.
111 None: ['Unknown command 0x{cmd:02x}', 'command 0x{cmd:02x}', 'cmd {cmd:02x}', 'C{cmd_ord:c}'],
112 # Special listener/talker "addresses" (deselecting previous peers).
113 0x3f: ['Unlisten', 'UNL'],
114 0x5f: ['Untalk', 'UNT'],
118 # Returns a tuple of booleans (or None when not applicable) whether
119 # the raw GPIB byte is: a command, an un-listen, an un-talk command.
120 if b
in range(0x00, 0x20):
121 return True, None, None
122 if b
in range(0x20, 0x40) and (b
& 0x1f) == 31:
123 return True, True, False
124 if b
in range(0x40, 0x60) and (b
& 0x1f) == 31:
125 return True, False, True
126 return False, None, None
128 def _is_listen_addr(b
):
129 if b
in range(0x20, 0x40):
133 def _is_talk_addr(b
):
134 if b
in range(0x40, 0x60):
138 def _is_secondary_addr(b
):
139 if b
in range(0x60, 0x80):
148 def _get_raw_byte(b
, atn
):
149 # "Decorate" raw byte values for stacked decoders.
150 return b |
0x100 if atn
else b
152 def _get_raw_text(b
, atn
):
153 return ['{leader}{data:02x}'.format(leader
= '/' if atn
else '', data
= b
)]
155 def _get_command_texts(b
):
156 fmts
= _cmd_table
.get(b
, None)
157 known
= fmts
is not None
159 fmts
= _cmd_table
.get(None, None)
162 return known
, _format_ann_texts(fmts
, cmd
= b
, cmd_ord
= ord('0') + b
)
164 def _get_address_texts(b
):
165 laddr
= _is_listen_addr(b
)
166 taddr
= _is_talk_addr(b
)
167 saddr
= _is_secondary_addr(b
)
170 if laddr
is not None:
171 fmts
= ['Listen {addr:d}', 'L {addr:d}', 'L{addr_ord:c}']
173 elif taddr
is not None:
174 fmts
= ['Talk {addr:d}', 'T {addr:d}', 'T{addr_ord:c}']
176 elif saddr
is not None:
177 fmts
= ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
179 elif msb
is not None: # For IEC bus compat.
180 fmts
= ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
182 return _format_ann_texts(fmts
, addr
= addr
, addr_ord
= ord('0') + addr
)
184 def _get_data_text(b
):
185 # TODO Move the table of ASCII control characters to a common location?
186 # TODO Move the "printable with escapes" logic to a common helper?
221 # Yes, exclude 0x7f (DEL) here. It's considered non-printable.
222 if b
in range(0x20, 0x7f) and b
not in ('[', ']'):
223 return '{:s}'.format(chr(b
))
224 elif b
in _control_codes
:
225 return '[{:s}]'.format(_control_codes
[b
])
226 # Use a compact yet readable and unambigous presentation for bytes
227 # which contain non-printables. The format that is used here is
228 # compatible with 93xx EEPROM and UART decoders.
229 return '[{:02x}]'.format(b
)
232 PIN_DIO1
, PIN_DIO2
, PIN_DIO3
, PIN_DIO4
,
233 PIN_DIO5
, PIN_DIO6
, PIN_DIO7
, PIN_DIO8
,
234 PIN_EOI
, PIN_DAV
, PIN_NRFD
, PIN_NDAC
,
235 PIN_IFC
, PIN_SRQ
, PIN_ATN
, PIN_REN
,
241 ANN_RAW_BIT
, ANN_RAW_BYTE
,
242 ANN_CMD
, ANN_LADDR
, ANN_TADDR
, ANN_SADDR
, ANN_DATA
,
246 # TODO Want to provide one annotation class per talker address (0-30)?
254 # TODO Want to provide one binary annotation class per talker address (0-30)?
257 class Decoder(srd
.Decoder
):
261 longname
= 'IEEE-488 GPIB/HPIB/IEC'
262 desc
= 'IEEE-488 General Purpose Interface Bus (GPIB/HPIB or IEC).'
265 outputs
= ['ieee488']
266 tags
= ['PC', 'Retro computing']
268 {'id': 'dio1' , 'name': 'DIO1/DATA',
269 'desc': 'Data I/O bit 1, or serial data'},
271 optional_channels
= (
272 {'id': 'dio2' , 'name': 'DIO2', 'desc': 'Data I/O bit 2'},
273 {'id': 'dio3' , 'name': 'DIO3', 'desc': 'Data I/O bit 3'},
274 {'id': 'dio4' , 'name': 'DIO4', 'desc': 'Data I/O bit 4'},
275 {'id': 'dio5' , 'name': 'DIO5', 'desc': 'Data I/O bit 5'},
276 {'id': 'dio6' , 'name': 'DIO6', 'desc': 'Data I/O bit 6'},
277 {'id': 'dio7' , 'name': 'DIO7', 'desc': 'Data I/O bit 7'},
278 {'id': 'dio8' , 'name': 'DIO8', 'desc': 'Data I/O bit 8'},
279 {'id': 'eoi', 'name': 'EOI', 'desc': 'End or identify'},
280 {'id': 'dav', 'name': 'DAV', 'desc': 'Data valid'},
281 {'id': 'nrfd', 'name': 'NRFD', 'desc': 'Not ready for data'},
282 {'id': 'ndac', 'name': 'NDAC', 'desc': 'Not data accepted'},
283 {'id': 'ifc', 'name': 'IFC', 'desc': 'Interface clear'},
284 {'id': 'srq', 'name': 'SRQ', 'desc': 'Service request'},
285 {'id': 'atn', 'name': 'ATN', 'desc': 'Attention'},
286 {'id': 'ren', 'name': 'REN', 'desc': 'Remote enable'},
287 {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
290 {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals',
291 'default': 'no', 'values': ('no', 'yes')},
292 {'id': 'delim', 'desc': 'Payload data delimiter',
293 'default': 'eol', 'values': ('none', 'eol')},
294 {'id': 'atn_parity', 'desc': 'ATN commands use parity',
295 'default': 'no', 'values': ('no', 'yes')},
301 ('laddr', 'Listener address'),
302 ('taddr', 'Talker address'),
303 ('saddr', 'Secondary address'),
304 ('data', 'Data byte'),
306 ('pp', 'Parallel poll'),
307 ('text', 'Talker text'),
308 ('periph', 'IEC bus peripherals'),
309 ('warning', 'Warning'),
312 ('bits', 'IEC bits', (ANN_RAW_BIT
,)),
313 ('raws', 'Raw bytes', (ANN_RAW_BYTE
,)),
314 ('gpib', 'Commands/data', (ANN_CMD
, ANN_LADDR
, ANN_TADDR
, ANN_SADDR
, ANN_DATA
,)),
315 ('eois', 'EOI', (ANN_EOI
,)),
316 ('polls', 'Polls', (ANN_PP
,)),
317 ('texts', 'Talker texts', (ANN_TEXT
,)),
318 ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH
,)),
319 ('warnings', 'Warnings', (ANN_WARN
,)),
322 ('raw', 'Raw bytes'),
323 ('data', 'Talker bytes'),
333 self
.latch_atn
= None
334 self
.latch_eoi
= None
344 self
.last_talker
= None
345 self
.last_listener
= []
346 self
.last_iec_addr
= None
347 self
.last_iec_sec
= None
350 self
.out_ann
= self
.register(srd
.OUTPUT_ANN
)
351 self
.out_bin
= self
.register(srd
.OUTPUT_BINARY
)
352 self
.out_python
= self
.register(srd
.OUTPUT_PYTHON
)
354 def putg(self
, ss
, es
, data
):
355 self
.put(ss
, es
, self
.out_ann
, data
)
357 def putbin(self
, ss
, es
, data
):
358 self
.put(ss
, es
, self
.out_bin
, data
)
360 def putpy(self
, ss
, es
, ptype
, addr
, pdata
):
361 self
.put(ss
, es
, self
.out_python
, [ptype
, addr
, pdata
])
363 def emit_eoi_ann(self
, ss
, es
):
364 self
.putg(ss
, es
, [ANN_EOI
, ['EOI']])
366 def emit_bin_ann(self
, ss
, es
, ann_cls
, data
):
367 self
.putbin(ss
, es
, [ann_cls
, bytes(data
)])
369 def emit_data_ann(self
, ss
, es
, ann_cls
, data
):
370 self
.putg(ss
, es
, [ann_cls
, data
])
372 def emit_warn_ann(self
, ss
, es
, data
):
373 self
.putg(ss
, es
, [ANN_WARN
, data
])
375 def flush_bytes_text_accu(self
):
376 if self
.accu_bytes
and self
.ss_text
is not None and self
.es_text
is not None:
377 self
.emit_bin_ann(self
.ss_text
, self
.es_text
, BIN_DATA
, bytearray(self
.accu_bytes
))
378 self
.putpy(self
.ss_text
, self
.es_text
, 'TALKER_BYTES', self
.last_talker
, bytearray(self
.accu_bytes
))
380 if self
.accu_text
and self
.ss_text
is not None and self
.es_text
is not None:
381 text
= ''.join(self
.accu_text
)
382 self
.emit_data_ann(self
.ss_text
, self
.es_text
, ANN_TEXT
, [text
])
383 self
.putpy(self
.ss_text
, self
.es_text
, 'TALKER_TEXT', self
.last_talker
, text
)
385 self
.ss_text
= self
.es_text
= None
387 def check_extra_flush(self
, b
):
388 # Optionally flush previously accumulated runs of payload data
389 # according to user specified conditions.
390 if self
.options
['delim'] == 'none':
392 if not self
.accu_bytes
:
395 # This implementation exlusively handles "text lines", but adding
396 # support for more variants here is straight forward.
398 # Search for the first data byte _after_ a user specified text
399 # line termination sequence was seen. The termination sequence's
400 # alphabet may be variable, and the sequence may span multiple
401 # data bytes. We accept either CR or LF, and combine the CR+LF
402 # sequence to strive for maximum length annotations for improved
403 # readability at different zoom levels. It's acceptable that this
404 # implementation would also combine multiple line terminations
406 term_chars
= (10, 13)
407 is_eol
= b
in term_chars
408 had_eol
= self
.accu_bytes
[-1] in term_chars
409 if had_eol
and not is_eol
:
410 self
.flush_bytes_text_accu()
412 def check_pp(self
, dio
= None):
413 # The combination of ATN and EOI means PP (parallel poll). Track
414 # this condition's start and end, and keep grabing the DIO lines'
415 # state as long as the condition is seen, since DAV is not used
416 # in the PP communication.
417 capture_in_pp
= self
.curr_atn
and self
.curr_eoi
418 decoder_in_pp
= self
.ss_pp
is not None
419 if capture_in_pp
and not decoder_in_pp
:
420 # Phase starts. Track its ss. Start collecting DIO state.
421 self
.ss_pp
= self
.samplenum
424 if not capture_in_pp
and decoder_in_pp
:
425 # Phase ends. Void its ss. Process collected DIO state.
426 ss
, es
= self
.ss_pp
, self
.samplenum
427 dio
= self
.dio_pp
or []
428 self
.ss_pp
, self
.dio_pp
= None, None
430 # False positive, caused by low oversampling.
432 # Emit its annotation. Translate bit indices 0..7 for the
433 # DIO1..DIO8 signals to display text. Pass bit indices in
434 # the Python output for upper layers.
436 # TODO The presentation of this information may need more
437 # adjustment. The bit positions need not translate to known
438 # device addresses. Bits need not even belong to a single
439 # device. Participants and their location in the DIO pattern
440 # is configurable. Leave the interpretation to upper layers.
441 bits
= [i
for i
, b
in enumerate(dio
) if b
]
442 bits_text
= ' '.join(['{}'.format(i
+ 1) for i
in bits
])
443 dios
= ['DIO{}'.format(i
+ 1) for i
in bits
]
444 dios_text
= ' '.join(dios
or ['-'])
446 'PPOLL {}'.format(dios_text
),
447 'PP {}'.format(bits_text
),
450 self
.emit_data_ann(ss
, es
, ANN_PP
, text
)
451 self
.putpy(ss
, es
, 'PPOLL', None, bits
)
452 # Cease collecting DIO state.
455 # Keep collecting DIO state for each individual sample in
456 # the PP phase. Logically OR all DIO values that were seen.
457 # This increases robustness for low oversampling captures,
458 # where DIO may no longer be asserted when ATN/EOI deassert,
459 # and DIO was not asserted yet when ATN/EOI start asserting.
462 if len(dio
) > len(self
.dio_pp
):
463 self
.dio_pp
.extend([ 0, ] * (len(dio
) - len(self
.dio_pp
)))
464 for i
, b
in enumerate(dio
):
469 def handle_ifc_change(self
, ifc
):
470 # Track IFC line for parallel input.
471 # Assertion of IFC de-selects all talkers and listeners.
473 self
.last_talker
= None
474 self
.last_listener
= []
475 self
.flush_bytes_text_accu()
477 def handle_eoi_change(self
, eoi
):
478 # Track EOI line for parallel and serial input.
480 self
.ss_eoi
= self
.samplenum
483 self
.es_eoi
= self
.samplenum
484 if self
.ss_eoi
and self
.latch_eoi
:
485 self
.emit_eoi_ann(self
.ss_eoi
, self
.es_eoi
)
486 self
.es_text
= self
.es_eoi
487 self
.flush_bytes_text_accu()
488 self
.ss_eoi
= self
.es_eoi
= None
491 def handle_atn_change(self
, atn
):
492 # Track ATN line for parallel and serial input.
495 self
.flush_bytes_text_accu()
497 def handle_iec_periph(self
, ss
, es
, addr
, sec
, data
):
498 # The annotation is optional.
499 if self
.options
['iec_periph'] != 'yes':
501 # Void internal state.
502 if addr
is None and sec
is None and data
is None:
503 self
.last_iec_addr
= None
504 self
.last_iec_sec
= None
506 # Grab and evaluate new input.
508 # TODO Add more items here. See the "Device numbering" section
509 # of the https://en.wikipedia.org/wiki/Commodore_bus page.
513 _iec_disk_range
= range(8, 16)
515 self
.last_iec_addr
= addr
516 name
= _iec_addr_names
.get(addr
, None)
518 self
.emit_data_ann(ss
, es
, ANN_IEC_PERIPH
, [name
])
519 addr
= self
.last_iec_addr
# Simplify subsequent logic.
521 # BEWARE! The secondary address is a full byte and includes
522 # the 0x60 offset, to also work when the MSB was set.
523 self
.last_iec_sec
= sec
524 subcmd
, channel
= sec
& 0xf0, sec
& 0x0f
525 channel_ord
= ord('0') + channel
526 if addr
is not None and addr
in _iec_disk_range
:
528 0x60: ['Reopen {ch:d}', 'Re {ch:d}', 'R{ch_ord:c}'],
529 0xe0: ['Close {ch:d}', 'Cl {ch:d}', 'C{ch_ord:c}'],
530 0xf0: ['Open {ch:d}', 'Op {ch:d}', 'O{ch_ord:c}'],
533 texts
= _format_ann_texts(subcmd_fmts
, ch
= channel
, ch_ord
= channel_ord
)
534 self
.emit_data_ann(ss
, es
, ANN_IEC_PERIPH
, texts
)
535 sec
= self
.last_iec_sec
# Simplify subsequent logic.
537 if addr
is None or sec
is None:
539 # TODO Process data depending on peripheral type and channel?
541 def handle_data_byte(self
):
542 if not self
.curr_atn
:
543 self
.check_extra_flush(self
.curr_raw
)
545 texts
= _get_raw_text(b
, self
.curr_atn
)
546 self
.emit_data_ann(self
.ss_raw
, self
.es_raw
, ANN_RAW_BYTE
, texts
)
547 self
.emit_bin_ann(self
.ss_raw
, self
.es_raw
, BIN_RAW
, b
.to_bytes(1, byteorder
='big'))
548 self
.putpy(self
.ss_raw
, self
.es_raw
, 'GPIB_RAW', None, _get_raw_byte(b
, self
.curr_atn
))
554 if self
.options
['atn_parity'] == 'yes':
555 par
= 1 if b
& 0x80 else 0
557 ones
= bin(b
).count('1') + par
559 warn_texts
= ['Command parity error', 'parity', 'PAR']
560 self
.emit_warn_ann(self
.ss_raw
, self
.es_raw
, warn_texts
)
561 is_cmd
, is_unl
, is_unt
= _is_command(b
)
562 laddr
= _is_listen_addr(b
)
563 taddr
= _is_talk_addr(b
)
564 saddr
= _is_secondary_addr(b
)
567 known
, texts
= _get_command_texts(b
)
569 warn_texts
= ['Unknown GPIB command', 'unknown', 'UNK']
570 self
.emit_warn_ann(self
.ss_raw
, self
.es_raw
, warn_texts
)
572 py_type
, py_addr
= 'COMMAND', None
574 self
.last_listener
= []
577 self
.last_talker
= None
580 upd_iec
= True, None, None, None
581 elif laddr
is not None:
583 texts
= _get_address_texts(b
)
585 py_type
, py_addr
= 'LISTEN', addr
586 if addr
== self
.last_talker
:
587 self
.last_talker
= None
588 self
.last_listener
.append(addr
)
589 upd_iec
= True, addr
, None, None
591 elif taddr
is not None:
593 texts
= _get_address_texts(b
)
595 py_type
, py_addr
= 'TALK', addr
596 if addr
in self
.last_listener
:
597 self
.last_listener
.remove(addr
)
598 self
.last_talker
= addr
599 upd_iec
= True, addr
, None, None
601 elif saddr
is not None:
603 texts
= _get_address_texts(b
)
605 upd_iec
= True, None, b
, None
606 py_type
, py_addr
= 'SECONDARY', addr
607 elif msb
is not None:
608 # These are not really "secondary addresses", but they
609 # are used by the Commodore IEC bus (floppy channels).
610 texts
= _get_address_texts(b
)
612 upd_iec
= True, None, b
, None
613 py_type
, py_addr
= 'MSB_SET', b
614 if ann_cls
is not None and texts
is not None:
615 self
.emit_data_ann(self
.ss_raw
, self
.es_raw
, ann_cls
, texts
)
617 self
.handle_iec_periph(self
.ss_raw
, self
.es_raw
, upd_iec
[1], upd_iec
[2], upd_iec
[3])
619 self
.putpy(self
.ss_raw
, self
.es_raw
, py_type
, py_addr
, b
)
621 self
.last_listener
.sort()
622 self
.putpy(self
.ss_raw
, self
.es_raw
, 'TALK_LISTEN', self
.last_talker
, self
.last_listener
)
624 self
.accu_bytes
.append(b
)
625 text
= _get_data_text(b
)
626 if not self
.accu_text
:
627 self
.ss_text
= self
.ss_raw
628 self
.accu_text
.append(text
)
629 self
.es_text
= self
.es_raw
630 self
.emit_data_ann(self
.ss_raw
, self
.es_raw
, ANN_DATA
, [text
])
631 self
.handle_iec_periph(self
.ss_raw
, self
.es_raw
, None, None, b
)
632 self
.putpy(self
.ss_raw
, self
.es_raw
, 'DATA_BYTE', self
.last_talker
, b
)
634 def handle_dav_change(self
, dav
, data
):
636 # Data availability starts when the flag goes active.
637 self
.ss_raw
= self
.samplenum
638 self
.curr_raw
= bitpack(data
)
639 self
.latch_atn
= self
.curr_atn
640 self
.latch_eoi
= self
.curr_eoi
642 # Data availability ends when the flag goes inactive. Handle the
643 # previously captured data byte according to associated flags.
644 self
.es_raw
= self
.samplenum
645 self
.handle_data_byte()
646 self
.ss_raw
= self
.es_raw
= None
649 def inject_dav_phase(self
, ss
, es
, data
):
650 # Inspection of serial input has resulted in one raw byte which
651 # spans a given period of time. Pretend we had seen a DAV active
652 # phase, to re-use code for the parallel transmission.
654 self
.curr_raw
= bitpack(data
)
655 self
.latch_atn
= self
.curr_atn
656 self
.latch_eoi
= self
.curr_eoi
658 self
.handle_data_byte()
659 self
.ss_raw
= self
.es_raw
= None
662 def invert_pins(self
, pins
):
663 # All lines (including data bits!) are low active and thus need
664 # to get inverted to receive their logical state (high active,
665 # regular data bit values). Cope with inputs being optional.
666 return [1 - p
if p
in (0, 1) else p
for p
in pins
]
668 def decode_serial(self
, has_clk
, has_data_1
, has_atn
, has_srq
):
669 if not has_clk
or not has_data_1
or not has_atn
:
670 raise ChannelError('IEC bus needs at least ATN and serial CLK and DATA.')
672 # This is a rephrased version of decoders/iec/pd.py:decode().
673 # SRQ was not used there either. Magic numbers were eliminated.
675 STEP_WAIT_READY_TO_SEND
,
676 STEP_WAIT_READY_FOR_DATA
,
677 STEP_PREP_DATA_TEST_EOI
,
678 STEP_CLOCK_DATA_BITS
,
681 [{PIN_ATN
: 'f'}, {PIN_DATA
: 'l', PIN_CLK
: 'h'}],
682 [{PIN_ATN
: 'f'}, {PIN_DATA
: 'h', PIN_CLK
: 'h'}, {PIN_CLK
: 'l'}],
683 [{PIN_ATN
: 'f'}, {PIN_DATA
: 'f'}, {PIN_CLK
: 'l'}],
684 [{PIN_ATN
: 'f'}, {PIN_CLK
: 'e'}],
686 step
= STEP_WAIT_READY_TO_SEND
691 # Sample input pin values. Keep DATA/CLK in verbatim form to
692 # re-use 'iec' decoder logic. Turn ATN to positive logic for
693 # easier processing. The data bits get handled during byte
695 pins
= self
.wait(step_wait_conds
[step
])
696 data
, clk
= pins
[PIN_DATA
], pins
[PIN_CLK
]
697 atn
, = self
.invert_pins([pins
[PIN_ATN
]])
700 # Falling edge on ATN, reset step.
701 step
= STEP_WAIT_READY_TO_SEND
703 if step
== STEP_WAIT_READY_TO_SEND
:
704 # Don't use self.matched[1] here since we might come from
705 # a step with different conds due to the code above.
706 if data
== 0 and clk
== 1:
707 # Rising edge on CLK while DATA is low: Ready to send.
708 step
= STEP_WAIT_READY_FOR_DATA
709 elif step
== STEP_WAIT_READY_FOR_DATA
:
710 if data
== 1 and clk
== 1:
711 # Rising edge on DATA while CLK is high: Ready for data.
712 ss_byte
= self
.samplenum
713 self
.handle_atn_change(atn
)
715 self
.handle_eoi_change(False)
717 step
= STEP_PREP_DATA_TEST_EOI
719 # CLK low again, transfer aborted.
720 step
= STEP_WAIT_READY_TO_SEND
721 elif step
== STEP_PREP_DATA_TEST_EOI
:
722 if data
== 0 and clk
== 1:
723 # DATA goes low while CLK is still high, EOI confirmed.
724 self
.handle_eoi_change(True)
726 step
= STEP_CLOCK_DATA_BITS
727 ss_bit
= self
.samplenum
728 elif step
== STEP_CLOCK_DATA_BITS
:
731 # Rising edge on CLK; latch DATA.
734 # Falling edge on CLK; end of bit.
735 es_bit
= self
.samplenum
736 self
.emit_data_ann(ss_bit
, es_bit
, ANN_RAW_BIT
, ['{:d}'.format(bits
[-1])])
737 self
.putpy(ss_bit
, es_bit
, 'IEC_BIT', None, bits
[-1])
738 ss_bit
= self
.samplenum
740 es_byte
= self
.samplenum
741 self
.inject_dav_phase(ss_byte
, es_byte
, bits
)
743 self
.handle_eoi_change(False)
744 step
= STEP_WAIT_READY_TO_SEND
746 def decode_parallel(self
, has_data_n
, has_dav
, has_atn
, has_eoi
, has_srq
):
748 if False in has_data_n
or not has_dav
or not has_atn
:
749 raise ChannelError('IEEE-488 needs at least ATN and DAV and eight DIO lines.')
750 has_ifc
= self
.has_channel(PIN_IFC
)
752 # Capture data lines at the falling edge of DAV, process their
753 # values at rising DAV edge (when data validity ends). Also make
754 # sure to start inspection when the capture happens to start with
755 # low signal levels, i.e. won't include the initial falling edge.
756 # Scan for ATN/EOI edges as well (including the trick which works
757 # around initial pin state).
759 # Use efficient edge based wait conditions for most activities,
760 # though some phases may require individual inspection of each
761 # sample (think parallel poll in combination with slow sampling).
763 # Map low-active physical transport lines to positive logic here,
764 # to simplify logical inspection/decoding of communicated data,
765 # and to avoid redundancy and inconsistency in later code paths.
767 idx_dav
= len(waitcond
)
768 waitcond
.append({PIN_DAV
: 'l'})
769 idx_atn
= len(waitcond
)
770 waitcond
.append({PIN_ATN
: 'l'})
773 idx_eoi
= len(waitcond
)
774 waitcond
.append({PIN_EOI
: 'l'})
777 idx_ifc
= len(waitcond
)
778 waitcond
.append({PIN_IFC
: 'l'})
780 def add_data_cond(conds
):
782 conds
.append({'skip': 1})
784 def del_data_cond(conds
, idx
):
788 pins
= self
.wait(waitcond
)
789 pins
= self
.invert_pins(pins
)
791 # BEWARE! Order of evaluation does matter. For low samplerate
792 # captures, many edges fall onto the same sample number. So
793 # we process active edges of flags early (before processing
794 # data bits), and inactive edges late (after data got processed).
795 want_pp_check
= False
796 if idx_ifc
is not None and self
.matched
[idx_ifc
] and pins
[PIN_IFC
] == 1:
797 self
.handle_ifc_change(pins
[PIN_IFC
])
798 if idx_eoi
is not None and self
.matched
[idx_eoi
] and pins
[PIN_EOI
] == 1:
799 self
.handle_eoi_change(pins
[PIN_EOI
])
801 if self
.matched
[idx_atn
] and pins
[PIN_ATN
] == 1:
802 self
.handle_atn_change(pins
[PIN_ATN
])
804 if want_pp_check
and not idx_pp_check
:
807 idx_pp_check
= add_data_cond(waitcond
)
808 if self
.matched
[idx_dav
]:
809 self
.handle_dav_change(pins
[PIN_DAV
], pins
[PIN_DIO1
:PIN_DIO8
+ 1])
811 pp
= self
.check_pp(pins
[PIN_DIO1
:PIN_DIO8
+ 1])
812 want_pp_check
= False
813 if self
.matched
[idx_atn
] and pins
[PIN_ATN
] == 0:
814 self
.handle_atn_change(pins
[PIN_ATN
])
816 if idx_eoi
is not None and self
.matched
[idx_eoi
] and pins
[PIN_EOI
] == 0:
817 self
.handle_eoi_change(pins
[PIN_EOI
])
819 if idx_pp_check
is not None and want_pp_check
:
820 pp
= self
.check_pp(pins
[PIN_DIO1
:PIN_DIO8
+ 1])
821 if pp
in ('leave',) and idx_pp_check
is not None:
822 idx_pp_check
= del_data_cond(waitcond
, idx_pp_check
)
823 if idx_ifc
is not None and self
.matched
[idx_ifc
] and pins
[PIN_IFC
] == 0:
824 self
.handle_ifc_change(pins
[PIN_IFC
])
826 waitcond
[idx_dav
][PIN_DAV
] = 'e'
827 waitcond
[idx_atn
][PIN_ATN
] = 'e'
829 waitcond
[idx_eoi
][PIN_EOI
] = 'e'
831 waitcond
[idx_ifc
][PIN_IFC
] = 'e'
834 # The decoder's boilerplate declares some of the input signals as
835 # optional, but only to support both serial and parallel variants.
836 # The CLK signal discriminates the two. For either variant some
837 # of the "optional" signals are not really optional for proper
838 # operation of the decoder. Check these conditions here.
839 has_clk
= self
.has_channel(PIN_CLK
)
840 has_data_1
= self
.has_channel(PIN_DIO1
)
841 has_data_n
= [bool(self
.has_channel(pin
) for pin
in range(PIN_DIO1
, PIN_DIO8
+ 1))]
842 has_dav
= self
.has_channel(PIN_DAV
)
843 has_atn
= self
.has_channel(PIN_ATN
)
844 has_eoi
= self
.has_channel(PIN_EOI
)
845 has_srq
= self
.has_channel(PIN_SRQ
)
847 self
.decode_serial(has_clk
, has_data_1
, has_atn
, has_srq
)
849 self
.decode_parallel(has_data_n
, has_dav
, has_atn
, has_eoi
, has_srq
)