ozone: evdev: Sync caps lock LED state to evdev
[chromium-blink-merge.git] / tools / telemetry / third_party / pyserial / serial / serialwin32.py
blobdfdd9536a6b896085f3aad9e127dbf16cb013cb4
1 #! python
2 # Python Serial Port Extension for Win32, Linux, BSD, Jython
3 # serial driver for win32
4 # see __init__.py
6 # (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
7 # this is distributed under a free software license, see license.txt
9 # Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
11 import ctypes
12 from serial import win32
14 from serial.serialutil import *
def device(portnum):
    """Return the Windows device name for a zero-based port number.

    Port 0 maps to 'COM1', port 1 to 'COM2', and so on.
    """
    one_based = portnum + 1  # COM ports are numbered starting at 1
    return 'COM%d' % (one_based,)
class Win32Serial(SerialBase):
    """Serial port implementation for Win32 based on ctypes."""

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def __init__(self, *args, **kwargs):
        """Initialize Win32 specific state, then delegate to SerialBase.

        The handle and overlapped structures are set up before calling the
        base class initializer so they exist whatever that initializer does.
        """
        self.hComPort = None
        self._overlappedRead = None
        self._overlappedWrite = None
        self._rtsToggle = False

        # Line states remembered so they can be re-applied on reconfigure.
        self._rtsState = win32.RTS_CONTROL_ENABLE
        self._dtrState = win32.DTR_CONTROL_ENABLE

        SerialBase.__init__(self, *args, **kwargs)

    def open(self):
        """Open port with current settings.

        Raises SerialException if no port is configured, if the port is
        already open, or if the port cannot be opened. Any exception raised
        while configuring the freshly opened port is propagated after the
        handle has been closed again.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self._isOpen:
            raise SerialException("Port is already open.")
        # the "\\.\COMx" format is required for devices other than COM1-COM8
        # not all versions of windows seem to support this properly
        # so that the first few ports are used with the DOS device name
        port = self.portstr
        try:
            if port.upper().startswith('COM') and int(port[3:]) > 8:
                port = '\\\\.\\' + port
        except ValueError:
            # for like COMnotanumber
            pass
        self.hComPort = win32.CreateFile(
            port,
            win32.GENERIC_READ | win32.GENERIC_WRITE,
            0,     # exclusive access
            None,  # no security
            win32.OPEN_EXISTING,
            win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
            0)     # no template file
        if self.hComPort == win32.INVALID_HANDLE_VALUE:
            self.hComPort = None    # 'cause __del__ is called anyway
            raise SerialException("could not open port %r: %r" % (self.portstr, ctypes.WinError()))

        try:
            self._overlappedRead = win32.OVERLAPPED()
            self._overlappedRead.hEvent = win32.CreateEvent(None, 1, 0, None)
            self._overlappedWrite = win32.OVERLAPPED()
            # second argument differs from the read event above (0 instead of 1)
            self._overlappedWrite.hEvent = win32.CreateEvent(None, 0, 0, None)

            # Setup a 4k buffer
            win32.SetupComm(self.hComPort, 4096, 4096)

            # Save original timeout values so _close() can restore them:
            self._orgTimeouts = win32.COMMTIMEOUTS()
            win32.GetCommTimeouts(self.hComPort, ctypes.byref(self._orgTimeouts))

            self._reconfigurePort()

            # Clear buffers:
            # Remove anything that was there
            win32.PurgeComm(self.hComPort,
                            win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
                            win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
        except:
            # Deliberately broad: whatever went wrong, release the handle and
            # re-raise the original exception unchanged.
            try:
                self._close()
            except:
                # ignore any exception when closing the port
                # also to keep original exception that happened when setting up
                pass
            self.hComPort = None
            raise
        else:
            self._isOpen = True

    def _reconfigurePort(self):
        """Set communication parameters on opened port.

        Applies timeouts, the event mask and the DCB (baud rate, byte size,
        parity, stop bits, flow control) to the open handle.

        Raises SerialException if there is no valid handle and ValueError if
        a setting is unsupported or SetCommState rejects the configuration.
        """
        if not self.hComPort:
            raise SerialException("Can only operate on a valid port handle")

        # Set Windows timeout values
        # timeouts is a tuple with the following items:
        # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
        #  ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
        #  WriteTotalTimeoutConstant)
        if self._timeout is None:
            timeouts = (0, 0, 0, 0, 0)
        elif self._timeout == 0:
            timeouts = (win32.MAXDWORD, 0, 0, 0, 0)
        else:
            timeouts = (0, 0, int(self._timeout * 1000), 0, 0)
        if self._timeout != 0 and self._interCharTimeout is not None:
            # replace only the first item (ReadIntervalTimeout)
            timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]

        if self._writeTimeout is None:
            pass
        elif self._writeTimeout == 0:
            timeouts = timeouts[:-2] + (0, win32.MAXDWORD)
        else:
            timeouts = timeouts[:-2] + (0, int(self._writeTimeout * 1000))
        win32.SetCommTimeouts(self.hComPort, ctypes.byref(win32.COMMTIMEOUTS(*timeouts)))

        win32.SetCommMask(self.hComPort, win32.EV_ERR)

        # Setup the connection info.
        # Get state and modify it:
        comDCB = win32.DCB()
        win32.GetCommState(self.hComPort, ctypes.byref(comDCB))
        comDCB.BaudRate = self._baudrate

        if self._bytesize == FIVEBITS:
            comDCB.ByteSize = 5
        elif self._bytesize == SIXBITS:
            comDCB.ByteSize = 6
        elif self._bytesize == SEVENBITS:
            comDCB.ByteSize = 7
        elif self._bytesize == EIGHTBITS:
            comDCB.ByteSize = 8
        else:
            raise ValueError("Unsupported number of data bits: %r" % self._bytesize)

        if self._parity == PARITY_NONE:
            comDCB.Parity = win32.NOPARITY
            comDCB.fParity = 0  # Disable Parity Check
        elif self._parity == PARITY_EVEN:
            comDCB.Parity = win32.EVENPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_ODD:
            comDCB.Parity = win32.ODDPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_MARK:
            comDCB.Parity = win32.MARKPARITY
            comDCB.fParity = 1  # Enable Parity Check
        elif self._parity == PARITY_SPACE:
            comDCB.Parity = win32.SPACEPARITY
            comDCB.fParity = 1  # Enable Parity Check
        else:
            raise ValueError("Unsupported parity mode: %r" % self._parity)

        if self._stopbits == STOPBITS_ONE:
            comDCB.StopBits = win32.ONESTOPBIT
        elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
            comDCB.StopBits = win32.ONE5STOPBITS
        elif self._stopbits == STOPBITS_TWO:
            comDCB.StopBits = win32.TWOSTOPBITS
        else:
            raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)

        comDCB.fBinary = 1  # Enable Binary Transmission
        # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
        # Hardware handshake takes precedence over RTS toggle, which takes
        # precedence over the manually selected RTS state.
        if self._rtscts:
            comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
        elif self._rtsToggle:
            comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
        else:
            comDCB.fRtsControl = self._rtsState
        if self._dsrdtr:
            comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
        else:
            comDCB.fDtrControl = self._dtrState

        if self._rtsToggle:
            comDCB.fOutxCtsFlow = 0
        else:
            comDCB.fOutxCtsFlow = self._rtscts
        comDCB.fOutxDsrFlow = self._dsrdtr
        comDCB.fOutX = self._xonxoff
        comDCB.fInX = self._xonxoff
        comDCB.fNull = 0
        comDCB.fErrorChar = 0
        comDCB.fAbortOnError = 0
        comDCB.XonChar = XON
        comDCB.XoffChar = XOFF

        if not win32.SetCommState(self.hComPort, ctypes.byref(comDCB)):
            raise ValueError("Cannot configure port, some setting was wrong. Original message: %r" % ctypes.WinError())

    def _close(self):
        """internal close port helper

        Restores the timeouts saved by open(), closes the port handle and
        the event handles of both overlapped structures.
        """
        if self.hComPort:
            # Restore original timeout values:
            win32.SetCommTimeouts(self.hComPort, self._orgTimeouts)
            # Close COM-Port:
            win32.CloseHandle(self.hComPort)
            if self._overlappedRead is not None:
                win32.CloseHandle(self._overlappedRead.hEvent)
                self._overlappedRead = None
            if self._overlappedWrite is not None:
                win32.CloseHandle(self._overlappedWrite.hEvent)
                self._overlappedWrite = None
            self.hComPort = None

    def close(self):
        """Close port. Does nothing if the port is not open."""
        if self._isOpen:
            self._close()
            self._isOpen = False

    def makeDeviceName(self, port):
        """Turn a port number into a device name (see device())."""
        return device(port)

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer."""
        flags = win32.DWORD()
        comstat = win32.COMSTAT()
        if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
            raise SerialException('call to ClearCommError failed')
        return comstat.cbInQue

    def read(self, size=1):
        """Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        Returns a bytes object (empty when size <= 0). Raises
        portNotOpenError if the port is not open and SerialException if a
        Win32 call fails.
        """
        if not self.hComPort:
            raise portNotOpenError
        if size > 0:
            win32.ResetEvent(self._overlappedRead.hEvent)
            flags = win32.DWORD()
            comstat = win32.COMSTAT()
            if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
                raise SerialException('call to ClearCommError failed')
            if self.timeout == 0:
                # timeout of zero: only fetch what is already queued
                n = min(comstat.cbInQue, size)
                if n > 0:
                    buf = ctypes.create_string_buffer(n)
                    rc = win32.DWORD()
                    err = win32.ReadFile(self.hComPort, buf, n, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
                    if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
                        raise SerialException("ReadFile failed (%r)" % ctypes.WinError())
                    win32.WaitForSingleObject(self._overlappedRead.hEvent, win32.INFINITE)
                    data = buf.raw[:rc.value]
                else:
                    data = bytes()
            else:
                buf = ctypes.create_string_buffer(size)
                rc = win32.DWORD()
                err = win32.ReadFile(self.hComPort, buf, size, ctypes.byref(rc), ctypes.byref(self._overlappedRead))
                if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
                    raise SerialException("ReadFile failed (%r)" % ctypes.WinError())
                # last argument True: wait for the overlapped read to finish
                win32.GetOverlappedResult(self.hComPort, ctypes.byref(self._overlappedRead), ctypes.byref(rc), True)
                data = buf.raw[:rc.value]
        else:
            data = bytes()
        return bytes(data)

    def write(self, data):
        """Output the given string over the serial port.

        Returns the byte count reported by the write call (0 for empty
        data). Raises portNotOpenError if the port is not open,
        SerialException if WriteFile fails, and writeTimeoutError if a
        waited-for write reports fewer bytes than were given.
        """
        if not self.hComPort:
            raise portNotOpenError
        # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
        data = to_bytes(data)
        if data:
            n = win32.DWORD()
            err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite)
            if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
                raise SerialException("WriteFile failed (%r)" % ctypes.WinError())
            if self._writeTimeout != 0:  # if blocking (None) or w/ write timeout (>0)
                # Wait for the write to complete.
                win32.GetOverlappedResult(self.hComPort, self._overlappedWrite, ctypes.byref(n), True)
                if n.value != len(data):
                    raise writeTimeoutError
            return n.value
        else:
            return 0

    def flush(self):
        """Flush of file like objects. In this case, wait until all data
        is written."""
        # Imported locally, like in sendBreak(): this module has no
        # module-level import of time.
        import time
        while self.outWaiting():
            time.sleep(0.05)
        # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
        # require overlapped IO and its also only possible to set a single mask
        # on the port---

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.hComPort:
            raise portNotOpenError
        win32.PurgeComm(self.hComPort, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)

    def flushOutput(self):
        """Clear output buffer, aborting the current output and
        discarding all that is in the buffer."""
        if not self.hComPort:
            raise portNotOpenError
        win32.PurgeComm(self.hComPort, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given duration."""
        if not self.hComPort:
            raise portNotOpenError
        import time
        win32.SetCommBreak(self.hComPort)
        time.sleep(duration)
        win32.ClearCommBreak(self.hComPort)

    def setBreak(self, level=1):
        """Set break: Controls TXD. When active, no transmitting is possible."""
        if not self.hComPort:
            raise portNotOpenError
        if level:
            win32.SetCommBreak(self.hComPort)
        else:
            win32.ClearCommBreak(self.hComPort)

    def setRTS(self, level=1):
        """Set terminal status line: Request To Send"""
        # remember level for reconfigure
        if level:
            self._rtsState = win32.RTS_CONTROL_ENABLE
        else:
            self._rtsState = win32.RTS_CONTROL_DISABLE
        # also apply now if port is open
        if self.hComPort:
            if level:
                win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
            else:
                win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)

    def setDTR(self, level=1):
        """Set terminal status line: Data Terminal Ready"""
        # remember level for reconfigure
        if level:
            self._dtrState = win32.DTR_CONTROL_ENABLE
        else:
            self._dtrState = win32.DTR_CONTROL_DISABLE
        # also apply now if port is open
        if self.hComPort:
            if level:
                win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
            else:
                win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)

    def _GetCommModemStatus(self):
        """Return the raw modem status value reported for the port handle."""
        stat = win32.DWORD()
        win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
        return stat.value

    def getCTS(self):
        """Read terminal status line: Clear To Send"""
        if not self.hComPort:
            raise portNotOpenError
        return win32.MS_CTS_ON & self._GetCommModemStatus() != 0

    def getDSR(self):
        """Read terminal status line: Data Set Ready"""
        if not self.hComPort:
            raise portNotOpenError
        return win32.MS_DSR_ON & self._GetCommModemStatus() != 0

    def getRI(self):
        """Read terminal status line: Ring Indicator"""
        if not self.hComPort:
            raise portNotOpenError
        return win32.MS_RING_ON & self._GetCommModemStatus() != 0

    def getCD(self):
        """Read terminal status line: Carrier Detect"""
        if not self.hComPort:
            raise portNotOpenError
        return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0

    # - - platform specific - - - -

    def setBufferSize(self, rx_size=4096, tx_size=None):
        """\
        Recommend a buffer size to the driver (device driver can ignore this
        value). Must be called before the port is opened.

        When tx_size is omitted, rx_size is used for both directions.
        """
        # NOTE(review): this passes self.hComPort to SetupComm, which is None
        # until open() has run -- confirm the intended calling order.
        if tx_size is None:
            tx_size = rx_size
        win32.SetupComm(self.hComPort, rx_size, tx_size)

    def setXON(self, level=True):
        """\
        Manually control flow - when software flow control is enabled.
        This will send XON (true) and XOFF (false) to the other device.
        WARNING: this function is not portable to different platforms!
        """
        if not self.hComPort:
            raise portNotOpenError
        if level:
            win32.EscapeCommFunction(self.hComPort, win32.SETXON)
        else:
            win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)

    def outWaiting(self):
        """Return how many characters are in the outgoing buffer."""
        flags = win32.DWORD()
        comstat = win32.COMSTAT()
        if not win32.ClearCommError(self.hComPort, ctypes.byref(flags), ctypes.byref(comstat)):
            raise SerialException('call to ClearCommError failed')
        return comstat.cbOutQue

    # functions useful for RS-485 adapters
    def setRtsToggle(self, rtsToggle):
        """Change RTS toggle control setting.

        The new setting is applied immediately when the port is open.
        """
        self._rtsToggle = rtsToggle
        if self._isOpen:
            self._reconfigurePort()

    def getRtsToggle(self):
        """Get the current RTS toggle control setting."""
        return self._rtsToggle

    rtsToggle = property(getRtsToggle, setRtsToggle, doc="RTS toggle control setting")
# Build the public Serial class from the platform specific implementation
# plus a base that supplies file-like behavior. Python 2.6 and newer ship the
# new I/O library, so io.RawIOBase is used there; otherwise fall back to the
# module's own FileLike emulation.
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    _file_like_base = FileLike
else:
    # io library present
    _file_like_base = io.RawIOBase


class Serial(Win32Serial, _file_like_base):
    pass
# Test function only!
# Manual smoke test: needs a real COM port, prints the port object after
# construction and again after reconfiguring and opening it.
if __name__ == '__main__':
    # imported here because this module has no module-level import of sys
    import sys

    s = Serial(0)
    sys.stdout.write("%s\n" % s)

    s = Serial()
    sys.stdout.write("%s\n" % s)

    s.baudrate = 19200
    # 'bytesize' is the data-bits setting; the previous 'databits' name only
    # created an unused attribute and left the port configuration untouched.
    s.bytesize = 7
    s.close()
    s.port = 0
    s.open()
    sys.stdout.write("%s\n" % s)