srd: drop deprecated PyEval_InitThreads() on Python 3.9+
[libsigrokdecode/gsi.git] / decoders / ieee488 / pd.py
blobb0948a685b40bdf08999a98179c57be3264208b8
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Rudolf Reuter <reuterru@arcor.de>
5 ## Copyright (C) 2017 Marcus Comstedt <marcus@mc.pp.se>
6 ## Copyright (C) 2019 Gerhard Sittig <gerhard.sittig@gmx.net>
7 ##
8 ## This program is free software; you can redistribute it and/or modify
9 ## it under the terms of the GNU General Public License as published by
10 ## the Free Software Foundation; either version 2 of the License, or
11 ## (at your option) any later version.
13 ## This program is distributed in the hope that it will be useful,
14 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
15 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 ## GNU General Public License for more details.
18 ## You should have received a copy of the GNU General Public License
19 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
22 # This file was created from earlier implementations of the 'gpib' and
23 # the 'iec' protocol decoders. It combines the parallel and the serial
24 # transmission variants in a single instance with optional inputs for
25 # maximum code re-use.
27 # TODO
28 # - Extend annotations for improved usability.
29 # - Keep talkers' data streams on separate annotation rows? Is this useful
30 # here at the GPIB level, or shall stacked decoders dispatch these? May
31 # depend on how often captures get inspected which involve multiple peers.
32 # - Make serial bit annotations optional? Could slow down interactive
33 # exploration for long captures (see USB).
34 # - Move the inlined Commodore IEC peripherals support to a stacked decoder
35 # when more peripherals get added.
36 # - SCPI over GPIB may "represent somewhat naturally" here already when
37 # text lines are a single run of data at the GPIB layer (each line between
38 # the address spec and either EOI or ATN). So a stacked SCPI decoder may
39 # only become necessary when the text lines' content shall get inspected.
41 import sigrokdecode as srd
42 from common.srdhelper import bitpack
44 '''
45 OUTPUT_PYTHON format for stacked decoders:
47 General packet format:
48 [<ptype>, <addr>, <pdata>]
50 This is the list of <ptype>s and their respective <pdata> values:
52 Raw bits and bytes at the physical transport level:
53 - 'IEC_BIT': <addr> is not applicable, <pdata> is the transport's bit value.
54 - 'GPIB_RAW': <addr> is not applicable, <pdata> is the transport's
55 byte value. Data bytes are in the 0x00-0xff range, command/address
56 bytes are in the 0x100-0x1ff range.
58 GPIB level byte fields (commands, addresses, pieces of data):
59 - 'COMMAND': <addr> is not applicable, <pdata> is the command's byte value.
60 - 'LISTEN': <addr> is the listener address (0-30), <pdata> is the raw
61 byte value (including the 0x20 offset).
62 - 'TALK': <addr> is the talker address (0-30), <pdata> is the raw byte
63 value (including the 0x40 offset).
64 - 'SECONDARY': <addr> is the secondary address (0-31), <pdata> is the
65 raw byte value (including the 0x60 offset).
66 - 'MSB_SET': <addr> as well as <pdata> are the raw byte value (including
67 the 0x80 offset). This usually does not happen for GPIB bytes with ATN
68 active, but was observed with the IEC bus and Commodore floppy drives,
69 when addressing channels within the device.
70 - 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
71 is the raw data byte (transport layer, ATN inactive).
72 - 'PPOLL': <addr> is not applicable, <pdata> is a list of bit indices
73 (DIO1 to DIO8 order) which responded to the PP request.
75 Extracted payload information (peers and their communicated data):
76 - 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
77 current listeners. These updates for the current "connected peers"
78 are sent when the set of peers changes, i.e. after talkers/listeners
79 got selected or deselected. Of course the data only covers what could
80 be gathered from the input data. Some controllers may not explicitly
81 address themselves, or captures may not include an early setup phase.
82 - 'TALKER_BYTES': <addr> is the talker address (when available), <pdata>
83 is the accumulated byte sequence between addressing a talker and EOI,
84 or the next command/address.
85 - 'TALKER_TEXT': <addr> is the talker address (when available), <pdata>
86 is the accumulated text sequence between addressing a talker and EOI,
87 or the next command/address.
88 '''
90 class ChannelError(Exception):
91 pass
93 def _format_ann_texts(fmts, **args):
94 if not fmts:
95 return None
96 return [fmt.format(**args) for fmt in fmts]
98 _cmd_table = {
99 # Command codes in the 0x00-0x1f range.
100 0x01: ['Go To Local', 'GTL'],
101 0x04: ['Selected Device Clear', 'SDC'],
102 0x05: ['Parallel Poll Configure', 'PPC'],
103 0x08: ['Global Execute Trigger', 'GET'],
104 0x09: ['Take Control', 'TCT'],
105 0x11: ['Local Lock Out', 'LLO'],
106 0x14: ['Device Clear', 'DCL'],
107 0x15: ['Parallel Poll Unconfigure', 'PPU'],
108 0x18: ['Serial Poll Enable', 'SPE'],
109 0x19: ['Serial Poll Disable', 'SPD'],
110 # Unknown type of command.
111 None: ['Unknown command 0x{cmd:02x}', 'command 0x{cmd:02x}', 'cmd {cmd:02x}', 'C{cmd_ord:c}'],
112 # Special listener/talker "addresses" (deselecting previous peers).
113 0x3f: ['Unlisten', 'UNL'],
114 0x5f: ['Untalk', 'UNT'],
117 def _is_command(b):
118 # Returns a tuple of booleans (or None when not applicable) whether
119 # the raw GPIB byte is: a command, an un-listen, an un-talk command.
120 if b in range(0x00, 0x20):
121 return True, None, None
122 if b in range(0x20, 0x40) and (b & 0x1f) == 31:
123 return True, True, False
124 if b in range(0x40, 0x60) and (b & 0x1f) == 31:
125 return True, False, True
126 return False, None, None
128 def _is_listen_addr(b):
129 if b in range(0x20, 0x40):
130 return b & 0x1f
131 return None
133 def _is_talk_addr(b):
134 if b in range(0x40, 0x60):
135 return b & 0x1f
136 return None
138 def _is_secondary_addr(b):
139 if b in range(0x60, 0x80):
140 return b & 0x1f
141 return None
143 def _is_msb_set(b):
144 if b & 0x80:
145 return b
146 return None
148 def _get_raw_byte(b, atn):
149 # "Decorate" raw byte values for stacked decoders.
150 return b | 0x100 if atn else b
152 def _get_raw_text(b, atn):
153 return ['{leader}{data:02x}'.format(leader = '/' if atn else '', data = b)]
155 def _get_command_texts(b):
156 fmts = _cmd_table.get(b, None)
157 known = fmts is not None
158 if not fmts:
159 fmts = _cmd_table.get(None, None)
160 if not fmts:
161 return known, None
162 return known, _format_ann_texts(fmts, cmd = b, cmd_ord = ord('0') + b)
164 def _get_address_texts(b):
165 laddr = _is_listen_addr(b)
166 taddr = _is_talk_addr(b)
167 saddr = _is_secondary_addr(b)
168 msb = _is_msb_set(b)
169 fmts = None
170 if laddr is not None:
171 fmts = ['Listen {addr:d}', 'L {addr:d}', 'L{addr_ord:c}']
172 addr = laddr
173 elif taddr is not None:
174 fmts = ['Talk {addr:d}', 'T {addr:d}', 'T{addr_ord:c}']
175 addr = taddr
176 elif saddr is not None:
177 fmts = ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
178 addr = saddr
179 elif msb is not None: # For IEC bus compat.
180 fmts = ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']
181 addr = msb
182 return _format_ann_texts(fmts, addr = addr, addr_ord = ord('0') + addr)
184 def _get_data_text(b):
185 # TODO Move the table of ASCII control characters to a common location?
186 # TODO Move the "printable with escapes" logic to a common helper?
187 _control_codes = {
188 0x00: 'NUL',
189 0x01: 'SOH',
190 0x02: 'STX',
191 0x03: 'ETX',
192 0x04: 'EOT',
193 0x05: 'ENQ',
194 0x06: 'ACK',
195 0x07: 'BEL',
196 0x08: 'BS',
197 0x09: 'TAB',
198 0x0a: 'LF',
199 0x0b: 'VT',
200 0x0c: 'FF',
201 0x0d: 'CR',
202 0x0e: 'SO',
203 0x0f: 'SI',
204 0x10: 'DLE',
205 0x11: 'DC1',
206 0x12: 'DC2',
207 0x13: 'DC3',
208 0x14: 'DC4',
209 0x15: 'NAK',
210 0x16: 'SYN',
211 0x17: 'ETB',
212 0x18: 'CAN',
213 0x19: 'EM',
214 0x1a: 'SUB',
215 0x1b: 'ESC',
216 0x1c: 'FS',
217 0x1d: 'GS',
218 0x1e: 'RS',
219 0x1f: 'US',
221 # Yes, exclude 0x7f (DEL) here. It's considered non-printable.
222 if b in range(0x20, 0x7f) and b not in ('[', ']'):
223 return '{:s}'.format(chr(b))
224 elif b in _control_codes:
225 return '[{:s}]'.format(_control_codes[b])
226 # Use a compact yet readable and unambigous presentation for bytes
227 # which contain non-printables. The format that is used here is
228 # compatible with 93xx EEPROM and UART decoders.
229 return '[{:02x}]'.format(b)
232 PIN_DIO1, PIN_DIO2, PIN_DIO3, PIN_DIO4,
233 PIN_DIO5, PIN_DIO6, PIN_DIO7, PIN_DIO8,
234 PIN_EOI, PIN_DAV, PIN_NRFD, PIN_NDAC,
235 PIN_IFC, PIN_SRQ, PIN_ATN, PIN_REN,
236 PIN_CLK,
237 ) = range(17)
238 PIN_DATA = PIN_DIO1
241 ANN_RAW_BIT, ANN_RAW_BYTE,
242 ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,
243 ANN_EOI,
244 ANN_PP,
245 ANN_TEXT,
246 # TODO Want to provide one annotation class per talker address (0-30)?
247 ANN_IEC_PERIPH,
248 ANN_WARN,
249 ) = range(12)
252 BIN_RAW,
253 BIN_DATA,
254 # TODO Want to provide one binary annotation class per talker address (0-30)?
255 ) = range(2)
257 class Decoder(srd.Decoder):
258 api_version = 3
259 id = 'ieee488'
260 name = 'IEEE-488'
261 longname = 'IEEE-488 GPIB/HPIB/IEC'
262 desc = 'IEEE-488 General Purpose Interface Bus (GPIB/HPIB or IEC).'
263 license = 'gplv2+'
264 inputs = ['logic']
265 outputs = ['ieee488']
266 tags = ['PC', 'Retro computing']
267 channels = (
268 {'id': 'dio1' , 'name': 'DIO1/DATA',
269 'desc': 'Data I/O bit 1, or serial data'},
271 optional_channels = (
272 {'id': 'dio2' , 'name': 'DIO2', 'desc': 'Data I/O bit 2'},
273 {'id': 'dio3' , 'name': 'DIO3', 'desc': 'Data I/O bit 3'},
274 {'id': 'dio4' , 'name': 'DIO4', 'desc': 'Data I/O bit 4'},
275 {'id': 'dio5' , 'name': 'DIO5', 'desc': 'Data I/O bit 5'},
276 {'id': 'dio6' , 'name': 'DIO6', 'desc': 'Data I/O bit 6'},
277 {'id': 'dio7' , 'name': 'DIO7', 'desc': 'Data I/O bit 7'},
278 {'id': 'dio8' , 'name': 'DIO8', 'desc': 'Data I/O bit 8'},
279 {'id': 'eoi', 'name': 'EOI', 'desc': 'End or identify'},
280 {'id': 'dav', 'name': 'DAV', 'desc': 'Data valid'},
281 {'id': 'nrfd', 'name': 'NRFD', 'desc': 'Not ready for data'},
282 {'id': 'ndac', 'name': 'NDAC', 'desc': 'Not data accepted'},
283 {'id': 'ifc', 'name': 'IFC', 'desc': 'Interface clear'},
284 {'id': 'srq', 'name': 'SRQ', 'desc': 'Service request'},
285 {'id': 'atn', 'name': 'ATN', 'desc': 'Attention'},
286 {'id': 'ren', 'name': 'REN', 'desc': 'Remote enable'},
287 {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
289 options = (
290 {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals',
291 'default': 'no', 'values': ('no', 'yes')},
292 {'id': 'delim', 'desc': 'Payload data delimiter',
293 'default': 'eol', 'values': ('none', 'eol')},
294 {'id': 'atn_parity', 'desc': 'ATN commands use parity',
295 'default': 'no', 'values': ('no', 'yes')},
297 annotations = (
298 ('bit', 'IEC bit'),
299 ('raw', 'Raw byte'),
300 ('cmd', 'Command'),
301 ('laddr', 'Listener address'),
302 ('taddr', 'Talker address'),
303 ('saddr', 'Secondary address'),
304 ('data', 'Data byte'),
305 ('eoi', 'EOI'),
306 ('pp', 'Parallel poll'),
307 ('text', 'Talker text'),
308 ('periph', 'IEC bus peripherals'),
309 ('warning', 'Warning'),
311 annotation_rows = (
312 ('bits', 'IEC bits', (ANN_RAW_BIT,)),
313 ('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
314 ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
315 ('eois', 'EOI', (ANN_EOI,)),
316 ('polls', 'Polls', (ANN_PP,)),
317 ('texts', 'Talker texts', (ANN_TEXT,)),
318 ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
319 ('warnings', 'Warnings', (ANN_WARN,)),
321 binary = (
322 ('raw', 'Raw bytes'),
323 ('data', 'Talker bytes'),
326 def __init__(self):
327 self.reset()
329 def reset(self):
330 self.curr_raw = None
331 self.curr_atn = None
332 self.curr_eoi = None
333 self.latch_atn = None
334 self.latch_eoi = None
335 self.accu_bytes = []
336 self.accu_text = []
337 self.ss_raw = None
338 self.es_raw = None
339 self.ss_eoi = None
340 self.es_eoi = None
341 self.ss_text = None
342 self.es_text = None
343 self.ss_pp = None
344 self.last_talker = None
345 self.last_listener = []
346 self.last_iec_addr = None
347 self.last_iec_sec = None
349 def start(self):
350 self.out_ann = self.register(srd.OUTPUT_ANN)
351 self.out_bin = self.register(srd.OUTPUT_BINARY)
352 self.out_python = self.register(srd.OUTPUT_PYTHON)
354 def putg(self, ss, es, data):
355 self.put(ss, es, self.out_ann, data)
357 def putbin(self, ss, es, data):
358 self.put(ss, es, self.out_bin, data)
360 def putpy(self, ss, es, ptype, addr, pdata):
361 self.put(ss, es, self.out_python, [ptype, addr, pdata])
363 def emit_eoi_ann(self, ss, es):
364 self.putg(ss, es, [ANN_EOI, ['EOI']])
366 def emit_bin_ann(self, ss, es, ann_cls, data):
367 self.putbin(ss, es, [ann_cls, bytes(data)])
369 def emit_data_ann(self, ss, es, ann_cls, data):
370 self.putg(ss, es, [ann_cls, data])
372 def emit_warn_ann(self, ss, es, data):
373 self.putg(ss, es, [ANN_WARN, data])
375 def flush_bytes_text_accu(self):
376 if self.accu_bytes and self.ss_text is not None and self.es_text is not None:
377 self.emit_bin_ann(self.ss_text, self.es_text, BIN_DATA, bytearray(self.accu_bytes))
378 self.putpy(self.ss_text, self.es_text, 'TALKER_BYTES', self.last_talker, bytearray(self.accu_bytes))
379 self.accu_bytes = []
380 if self.accu_text and self.ss_text is not None and self.es_text is not None:
381 text = ''.join(self.accu_text)
382 self.emit_data_ann(self.ss_text, self.es_text, ANN_TEXT, [text])
383 self.putpy(self.ss_text, self.es_text, 'TALKER_TEXT', self.last_talker, text)
384 self.accu_text = []
385 self.ss_text = self.es_text = None
387 def check_extra_flush(self, b):
388 # Optionally flush previously accumulated runs of payload data
389 # according to user specified conditions.
390 if self.options['delim'] == 'none':
391 return
392 if not self.accu_bytes:
393 return
395 # This implementation exlusively handles "text lines", but adding
396 # support for more variants here is straight forward.
398 # Search for the first data byte _after_ a user specified text
399 # line termination sequence was seen. The termination sequence's
400 # alphabet may be variable, and the sequence may span multiple
401 # data bytes. We accept either CR or LF, and combine the CR+LF
402 # sequence to strive for maximum length annotations for improved
403 # readability at different zoom levels. It's acceptable that this
404 # implementation would also combine multiple line terminations
405 # like LF+LF.
406 term_chars = (10, 13)
407 is_eol = b in term_chars
408 had_eol = self.accu_bytes[-1] in term_chars
409 if had_eol and not is_eol:
410 self.flush_bytes_text_accu()
412 def check_pp(self, dio = None):
413 # The combination of ATN and EOI means PP (parallel poll). Track
414 # this condition's start and end, and keep grabing the DIO lines'
415 # state as long as the condition is seen, since DAV is not used
416 # in the PP communication.
417 capture_in_pp = self.curr_atn and self.curr_eoi
418 decoder_in_pp = self.ss_pp is not None
419 if capture_in_pp and not decoder_in_pp:
420 # Phase starts. Track its ss. Start collecting DIO state.
421 self.ss_pp = self.samplenum
422 self.dio_pp = []
423 return 'enter'
424 if not capture_in_pp and decoder_in_pp:
425 # Phase ends. Void its ss. Process collected DIO state.
426 ss, es = self.ss_pp, self.samplenum
427 dio = self.dio_pp or []
428 self.ss_pp, self.dio_pp = None, None
429 if ss == es:
430 # False positive, caused by low oversampling.
431 return 'leave'
432 # Emit its annotation. Translate bit indices 0..7 for the
433 # DIO1..DIO8 signals to display text. Pass bit indices in
434 # the Python output for upper layers.
436 # TODO The presentation of this information may need more
437 # adjustment. The bit positions need not translate to known
438 # device addresses. Bits need not even belong to a single
439 # device. Participants and their location in the DIO pattern
440 # is configurable. Leave the interpretation to upper layers.
441 bits = [i for i, b in enumerate(dio) if b]
442 bits_text = ' '.join(['{}'.format(i + 1) for i in bits])
443 dios = ['DIO{}'.format(i + 1) for i in bits]
444 dios_text = ' '.join(dios or ['-'])
445 text = [
446 'PPOLL {}'.format(dios_text),
447 'PP {}'.format(bits_text),
448 'PP',
450 self.emit_data_ann(ss, es, ANN_PP, text)
451 self.putpy(ss, es, 'PPOLL', None, bits)
452 # Cease collecting DIO state.
453 return 'leave'
454 if decoder_in_pp:
455 # Keep collecting DIO state for each individual sample in
456 # the PP phase. Logically OR all DIO values that were seen.
457 # This increases robustness for low oversampling captures,
458 # where DIO may no longer be asserted when ATN/EOI deassert,
459 # and DIO was not asserted yet when ATN/EOI start asserting.
460 if dio is None:
461 dio = []
462 if len(dio) > len(self.dio_pp):
463 self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp)))
464 for i, b in enumerate(dio):
465 self.dio_pp[i] |= b
466 return 'keep'
467 return 'idle'
469 def handle_ifc_change(self, ifc):
470 # Track IFC line for parallel input.
471 # Assertion of IFC de-selects all talkers and listeners.
472 if ifc:
473 self.last_talker = None
474 self.last_listener = []
475 self.flush_bytes_text_accu()
477 def handle_eoi_change(self, eoi):
478 # Track EOI line for parallel and serial input.
479 if eoi:
480 self.ss_eoi = self.samplenum
481 self.curr_eoi = eoi
482 else:
483 self.es_eoi = self.samplenum
484 if self.ss_eoi and self.latch_eoi:
485 self.emit_eoi_ann(self.ss_eoi, self.es_eoi)
486 self.es_text = self.es_eoi
487 self.flush_bytes_text_accu()
488 self.ss_eoi = self.es_eoi = None
489 self.curr_eoi = None
491 def handle_atn_change(self, atn):
492 # Track ATN line for parallel and serial input.
493 self.curr_atn = atn
494 if atn:
495 self.flush_bytes_text_accu()
497 def handle_iec_periph(self, ss, es, addr, sec, data):
498 # The annotation is optional.
499 if self.options['iec_periph'] != 'yes':
500 return
501 # Void internal state.
502 if addr is None and sec is None and data is None:
503 self.last_iec_addr = None
504 self.last_iec_sec = None
505 return
506 # Grab and evaluate new input.
507 _iec_addr_names = {
508 # TODO Add more items here. See the "Device numbering" section
509 # of the https://en.wikipedia.org/wiki/Commodore_bus page.
510 8: 'Disk 0',
511 9: 'Disk 1',
513 _iec_disk_range = range(8, 16)
514 if addr is not None:
515 self.last_iec_addr = addr
516 name = _iec_addr_names.get(addr, None)
517 if name:
518 self.emit_data_ann(ss, es, ANN_IEC_PERIPH, [name])
519 addr = self.last_iec_addr # Simplify subsequent logic.
520 if sec is not None:
521 # BEWARE! The secondary address is a full byte and includes
522 # the 0x60 offset, to also work when the MSB was set.
523 self.last_iec_sec = sec
524 subcmd, channel = sec & 0xf0, sec & 0x0f
525 channel_ord = ord('0') + channel
526 if addr is not None and addr in _iec_disk_range:
527 subcmd_fmts = {
528 0x60: ['Reopen {ch:d}', 'Re {ch:d}', 'R{ch_ord:c}'],
529 0xe0: ['Close {ch:d}', 'Cl {ch:d}', 'C{ch_ord:c}'],
530 0xf0: ['Open {ch:d}', 'Op {ch:d}', 'O{ch_ord:c}'],
531 }.get(subcmd, None)
532 if subcmd_fmts:
533 texts = _format_ann_texts(subcmd_fmts, ch = channel, ch_ord = channel_ord)
534 self.emit_data_ann(ss, es, ANN_IEC_PERIPH, texts)
535 sec = self.last_iec_sec # Simplify subsequent logic.
536 if data is not None:
537 if addr is None or sec is None:
538 return
539 # TODO Process data depending on peripheral type and channel?
541 def handle_data_byte(self):
542 if not self.curr_atn:
543 self.check_extra_flush(self.curr_raw)
544 b = self.curr_raw
545 texts = _get_raw_text(b, self.curr_atn)
546 self.emit_data_ann(self.ss_raw, self.es_raw, ANN_RAW_BYTE, texts)
547 self.emit_bin_ann(self.ss_raw, self.es_raw, BIN_RAW, b.to_bytes(1, byteorder='big'))
548 self.putpy(self.ss_raw, self.es_raw, 'GPIB_RAW', None, _get_raw_byte(b, self.curr_atn))
549 if self.curr_atn:
550 ann_cls = None
551 upd_iec = False,
552 py_type = None
553 py_peers = False
554 if self.options['atn_parity'] == 'yes':
555 par = 1 if b & 0x80 else 0
556 b &= ~0x80
557 ones = bin(b).count('1') + par
558 if ones % 2:
559 warn_texts = ['Command parity error', 'parity', 'PAR']
560 self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
561 is_cmd, is_unl, is_unt = _is_command(b)
562 laddr = _is_listen_addr(b)
563 taddr = _is_talk_addr(b)
564 saddr = _is_secondary_addr(b)
565 msb = _is_msb_set(b)
566 if is_cmd:
567 known, texts = _get_command_texts(b)
568 if not known:
569 warn_texts = ['Unknown GPIB command', 'unknown', 'UNK']
570 self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
571 ann_cls = ANN_CMD
572 py_type, py_addr = 'COMMAND', None
573 if is_unl:
574 self.last_listener = []
575 py_peers = True
576 if is_unt:
577 self.last_talker = None
578 py_peers = True
579 if is_unl or is_unt:
580 upd_iec = True, None, None, None
581 elif laddr is not None:
582 addr = laddr
583 texts = _get_address_texts(b)
584 ann_cls = ANN_LADDR
585 py_type, py_addr = 'LISTEN', addr
586 if addr == self.last_talker:
587 self.last_talker = None
588 self.last_listener.append(addr)
589 upd_iec = True, addr, None, None
590 py_peers = True
591 elif taddr is not None:
592 addr = taddr
593 texts = _get_address_texts(b)
594 ann_cls = ANN_TADDR
595 py_type, py_addr = 'TALK', addr
596 if addr in self.last_listener:
597 self.last_listener.remove(addr)
598 self.last_talker = addr
599 upd_iec = True, addr, None, None
600 py_peers = True
601 elif saddr is not None:
602 addr = saddr
603 texts = _get_address_texts(b)
604 ann_cls = ANN_SADDR
605 upd_iec = True, None, b, None
606 py_type, py_addr = 'SECONDARY', addr
607 elif msb is not None:
608 # These are not really "secondary addresses", but they
609 # are used by the Commodore IEC bus (floppy channels).
610 texts = _get_address_texts(b)
611 ann_cls = ANN_SADDR
612 upd_iec = True, None, b, None
613 py_type, py_addr = 'MSB_SET', b
614 if ann_cls is not None and texts is not None:
615 self.emit_data_ann(self.ss_raw, self.es_raw, ann_cls, texts)
616 if upd_iec[0]:
617 self.handle_iec_periph(self.ss_raw, self.es_raw, upd_iec[1], upd_iec[2], upd_iec[3])
618 if py_type:
619 self.putpy(self.ss_raw, self.es_raw, py_type, py_addr, b)
620 if py_peers:
621 self.last_listener.sort()
622 self.putpy(self.ss_raw, self.es_raw, 'TALK_LISTEN', self.last_talker, self.last_listener)
623 else:
624 self.accu_bytes.append(b)
625 text = _get_data_text(b)
626 if not self.accu_text:
627 self.ss_text = self.ss_raw
628 self.accu_text.append(text)
629 self.es_text = self.es_raw
630 self.emit_data_ann(self.ss_raw, self.es_raw, ANN_DATA, [text])
631 self.handle_iec_periph(self.ss_raw, self.es_raw, None, None, b)
632 self.putpy(self.ss_raw, self.es_raw, 'DATA_BYTE', self.last_talker, b)
634 def handle_dav_change(self, dav, data):
635 if dav:
636 # Data availability starts when the flag goes active.
637 self.ss_raw = self.samplenum
638 self.curr_raw = bitpack(data)
639 self.latch_atn = self.curr_atn
640 self.latch_eoi = self.curr_eoi
641 return
642 # Data availability ends when the flag goes inactive. Handle the
643 # previously captured data byte according to associated flags.
644 self.es_raw = self.samplenum
645 self.handle_data_byte()
646 self.ss_raw = self.es_raw = None
647 self.curr_raw = None
649 def inject_dav_phase(self, ss, es, data):
650 # Inspection of serial input has resulted in one raw byte which
651 # spans a given period of time. Pretend we had seen a DAV active
652 # phase, to re-use code for the parallel transmission.
653 self.ss_raw = ss
654 self.curr_raw = bitpack(data)
655 self.latch_atn = self.curr_atn
656 self.latch_eoi = self.curr_eoi
657 self.es_raw = es
658 self.handle_data_byte()
659 self.ss_raw = self.es_raw = None
660 self.curr_raw = None
662 def invert_pins(self, pins):
663 # All lines (including data bits!) are low active and thus need
664 # to get inverted to receive their logical state (high active,
665 # regular data bit values). Cope with inputs being optional.
666 return [1 - p if p in (0, 1) else p for p in pins]
668 def decode_serial(self, has_clk, has_data_1, has_atn, has_srq):
669 if not has_clk or not has_data_1 or not has_atn:
670 raise ChannelError('IEC bus needs at least ATN and serial CLK and DATA.')
672 # This is a rephrased version of decoders/iec/pd.py:decode().
673 # SRQ was not used there either. Magic numbers were eliminated.
675 STEP_WAIT_READY_TO_SEND,
676 STEP_WAIT_READY_FOR_DATA,
677 STEP_PREP_DATA_TEST_EOI,
678 STEP_CLOCK_DATA_BITS,
679 ) = range(4)
680 step_wait_conds = (
681 [{PIN_ATN: 'f'}, {PIN_DATA: 'l', PIN_CLK: 'h'}],
682 [{PIN_ATN: 'f'}, {PIN_DATA: 'h', PIN_CLK: 'h'}, {PIN_CLK: 'l'}],
683 [{PIN_ATN: 'f'}, {PIN_DATA: 'f'}, {PIN_CLK: 'l'}],
684 [{PIN_ATN: 'f'}, {PIN_CLK: 'e'}],
686 step = STEP_WAIT_READY_TO_SEND
687 bits = []
689 while True:
691 # Sample input pin values. Keep DATA/CLK in verbatim form to
692 # re-use 'iec' decoder logic. Turn ATN to positive logic for
693 # easier processing. The data bits get handled during byte
694 # accumulation.
695 pins = self.wait(step_wait_conds[step])
696 data, clk = pins[PIN_DATA], pins[PIN_CLK]
697 atn, = self.invert_pins([pins[PIN_ATN]])
699 if self.matched[0]:
700 # Falling edge on ATN, reset step.
701 step = STEP_WAIT_READY_TO_SEND
703 if step == STEP_WAIT_READY_TO_SEND:
704 # Don't use self.matched[1] here since we might come from
705 # a step with different conds due to the code above.
706 if data == 0 and clk == 1:
707 # Rising edge on CLK while DATA is low: Ready to send.
708 step = STEP_WAIT_READY_FOR_DATA
709 elif step == STEP_WAIT_READY_FOR_DATA:
710 if data == 1 and clk == 1:
711 # Rising edge on DATA while CLK is high: Ready for data.
712 ss_byte = self.samplenum
713 self.handle_atn_change(atn)
714 if self.curr_eoi:
715 self.handle_eoi_change(False)
716 bits = []
717 step = STEP_PREP_DATA_TEST_EOI
718 elif clk == 0:
719 # CLK low again, transfer aborted.
720 step = STEP_WAIT_READY_TO_SEND
721 elif step == STEP_PREP_DATA_TEST_EOI:
722 if data == 0 and clk == 1:
723 # DATA goes low while CLK is still high, EOI confirmed.
724 self.handle_eoi_change(True)
725 elif clk == 0:
726 step = STEP_CLOCK_DATA_BITS
727 ss_bit = self.samplenum
728 elif step == STEP_CLOCK_DATA_BITS:
729 if self.matched[1]:
730 if clk == 1:
731 # Rising edge on CLK; latch DATA.
732 bits.append(data)
733 elif clk == 0:
734 # Falling edge on CLK; end of bit.
735 es_bit = self.samplenum
736 self.emit_data_ann(ss_bit, es_bit, ANN_RAW_BIT, ['{:d}'.format(bits[-1])])
737 self.putpy(ss_bit, es_bit, 'IEC_BIT', None, bits[-1])
738 ss_bit = self.samplenum
739 if len(bits) == 8:
740 es_byte = self.samplenum
741 self.inject_dav_phase(ss_byte, es_byte, bits)
742 if self.curr_eoi:
743 self.handle_eoi_change(False)
744 step = STEP_WAIT_READY_TO_SEND
746 def decode_parallel(self, has_data_n, has_dav, has_atn, has_eoi, has_srq):
748 if False in has_data_n or not has_dav or not has_atn:
749 raise ChannelError('IEEE-488 needs at least ATN and DAV and eight DIO lines.')
750 has_ifc = self.has_channel(PIN_IFC)
752 # Capture data lines at the falling edge of DAV, process their
753 # values at rising DAV edge (when data validity ends). Also make
754 # sure to start inspection when the capture happens to start with
755 # low signal levels, i.e. won't include the initial falling edge.
756 # Scan for ATN/EOI edges as well (including the trick which works
757 # around initial pin state).
759 # Use efficient edge based wait conditions for most activities,
760 # though some phases may require individual inspection of each
761 # sample (think parallel poll in combination with slow sampling).
763 # Map low-active physical transport lines to positive logic here,
764 # to simplify logical inspection/decoding of communicated data,
765 # and to avoid redundancy and inconsistency in later code paths.
766 waitcond = []
767 idx_dav = len(waitcond)
768 waitcond.append({PIN_DAV: 'l'})
769 idx_atn = len(waitcond)
770 waitcond.append({PIN_ATN: 'l'})
771 idx_eoi = None
772 if has_eoi:
773 idx_eoi = len(waitcond)
774 waitcond.append({PIN_EOI: 'l'})
775 idx_ifc = None
776 if has_ifc:
777 idx_ifc = len(waitcond)
778 waitcond.append({PIN_IFC: 'l'})
779 idx_pp_check = None
780 def add_data_cond(conds):
781 idx = len(conds)
782 conds.append({'skip': 1})
783 return idx
784 def del_data_cond(conds, idx):
785 conds.pop(idx)
786 return None
787 while True:
788 pins = self.wait(waitcond)
789 pins = self.invert_pins(pins)
791 # BEWARE! Order of evaluation does matter. For low samplerate
792 # captures, many edges fall onto the same sample number. So
793 # we process active edges of flags early (before processing
794 # data bits), and inactive edges late (after data got processed).
795 want_pp_check = False
796 if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
797 self.handle_ifc_change(pins[PIN_IFC])
798 if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
799 self.handle_eoi_change(pins[PIN_EOI])
800 want_pp_check = True
801 if self.matched[idx_atn] and pins[PIN_ATN] == 1:
802 self.handle_atn_change(pins[PIN_ATN])
803 want_pp_check = True
804 if want_pp_check and not idx_pp_check:
805 pp = self.check_pp()
806 if pp in ('enter',):
807 idx_pp_check = add_data_cond(waitcond)
808 if self.matched[idx_dav]:
809 self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
810 if idx_pp_check:
811 pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
812 want_pp_check = False
813 if self.matched[idx_atn] and pins[PIN_ATN] == 0:
814 self.handle_atn_change(pins[PIN_ATN])
815 want_pp_check = True
816 if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
817 self.handle_eoi_change(pins[PIN_EOI])
818 want_pp_check = True
819 if idx_pp_check is not None and want_pp_check:
820 pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
821 if pp in ('leave',) and idx_pp_check is not None:
822 idx_pp_check = del_data_cond(waitcond, idx_pp_check)
823 if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
824 self.handle_ifc_change(pins[PIN_IFC])
826 waitcond[idx_dav][PIN_DAV] = 'e'
827 waitcond[idx_atn][PIN_ATN] = 'e'
828 if has_eoi:
829 waitcond[idx_eoi][PIN_EOI] = 'e'
830 if has_ifc:
831 waitcond[idx_ifc][PIN_IFC] = 'e'
833 def decode(self):
834 # The decoder's boilerplate declares some of the input signals as
835 # optional, but only to support both serial and parallel variants.
836 # The CLK signal discriminates the two. For either variant some
837 # of the "optional" signals are not really optional for proper
838 # operation of the decoder. Check these conditions here.
839 has_clk = self.has_channel(PIN_CLK)
840 has_data_1 = self.has_channel(PIN_DIO1)
841 has_data_n = [bool(self.has_channel(pin) for pin in range(PIN_DIO1, PIN_DIO8 + 1))]
842 has_dav = self.has_channel(PIN_DAV)
843 has_atn = self.has_channel(PIN_ATN)
844 has_eoi = self.has_channel(PIN_EOI)
845 has_srq = self.has_channel(PIN_SRQ)
846 if has_clk:
847 self.decode_serial(has_clk, has_data_1, has_atn, has_srq)
848 else:
849 self.decode_parallel(has_data_n, has_dav, has_atn, has_eoi, has_srq)