2 # Python Serial Port Extension for Win32, Linux, BSD, Jython
3 # serial driver for win32
6 # (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
7 # this is distributed under a free software license, see license.txt
9 # Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
12 from serial
import win32
14 from serial
.serialutil
import *
18 """Turn a port number into a device name"""
19 return 'COM%d' % (portnum
+1) # numbers are transformed to a string
22 class Win32Serial(SerialBase
):
23 """Serial port implementation for Win32 based on ctypes."""
# Baud rate values published by this implementation, in ascending order.
BAUDRATES = (
    50, 75, 110, 134, 150, 200,
    300, 600, 1200, 1800, 2400, 4800,
    9600, 19200, 38400, 57600, 115200,
)
def __init__(self, *args, **kwargs):
    """Set up the Win32-specific state, then run the SerialBase initialiser.

    All positional and keyword arguments are forwarded unchanged to
    SerialBase.__init__.
    """
    # Other methods of this class test ``self.hComPort`` to decide whether
    # the port is open, so the attribute has to exist even before a port has
    # been opened.  It is assigned ahead of the base initialiser in case that
    # already touches the port -- NOTE(review): confirm against SerialBase.
    self.hComPort = None
    self._overlappedRead = None
    self._overlappedWrite = None
    self._rtsToggle = False

    # Line states remembered for the next port (re)configuration.
    self._rtsState = win32.RTS_CONTROL_ENABLE
    self._dtrState = win32.DTR_CONTROL_ENABLE

    SerialBase.__init__(self, *args, **kwargs)
41 """Open port with current settings. This may throw a SerialException
42 if the port cannot be opened."""
43 if self
._port
is None:
44 raise SerialException("Port must be configured before it can be used.")
46 raise SerialException("Port is already open.")
47 # the "\\.\COMx" format is required for devices other than COM1-COM8
48 # not all versions of windows seem to support this properly
49 # so that the first few ports are used with the DOS device name
52 if port
.upper().startswith('COM') and int(port
[3:]) > 8:
53 port
= '\\\\.\\' + port
55 # for like COMnotanumber
57 self
.hComPort
= win32
.CreateFile(port
,
58 win32
.GENERIC_READ | win32
.GENERIC_WRITE
,
62 win32
.FILE_ATTRIBUTE_NORMAL | win32
.FILE_FLAG_OVERLAPPED
,
64 if self
.hComPort
== win32
.INVALID_HANDLE_VALUE
:
65 self
.hComPort
= None # 'cause __del__ is called anyway
66 raise SerialException("could not open port %r: %r" % (self
.portstr
, ctypes
.WinError()))
69 self
._overlappedRead
= win32
.OVERLAPPED()
70 self
._overlappedRead
.hEvent
= win32
.CreateEvent(None, 1, 0, None)
71 self
._overlappedWrite
= win32
.OVERLAPPED()
72 #~ self._overlappedWrite.hEvent = win32.CreateEvent(None, 1, 0, None)
73 self
._overlappedWrite
.hEvent
= win32
.CreateEvent(None, 0, 0, None)
76 win32
.SetupComm(self
.hComPort
, 4096, 4096)
78 # Save original timeout values:
79 self
._orgTimeouts
= win32
.COMMTIMEOUTS()
80 win32
.GetCommTimeouts(self
.hComPort
, ctypes
.byref(self
._orgTimeouts
))
82 self
._reconfigurePort
()
85 # Remove anything that was there
86 win32
.PurgeComm(self
.hComPort
,
87 win32
.PURGE_TXCLEAR | win32
.PURGE_TXABORT |
88 win32
.PURGE_RXCLEAR | win32
.PURGE_RXABORT
)
93 # ignore any exception when closing the port
94 # also to keep original exception that happened when setting up
102 def _reconfigurePort(self
):
103 """Set communication parameters on opened port."""
104 if not self
.hComPort
:
105 raise SerialException("Can only operate on a valid port handle")
107 # Set Windows timeout values
108 # timeouts is a tuple with the following items:
109 # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
110 # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
111 # WriteTotalTimeoutConstant)
112 if self
._timeout
is None:
113 timeouts
= (0, 0, 0, 0, 0)
114 elif self
._timeout
== 0:
115 timeouts
= (win32
.MAXDWORD
, 0, 0, 0, 0)
117 timeouts
= (0, 0, int(self
._timeout
*1000), 0, 0)
118 if self
._timeout
!= 0 and self
._interCharTimeout
is not None:
119 timeouts
= (int(self
._interCharTimeout
* 1000),) + timeouts
[1:]
121 if self
._writeTimeout
is None:
123 elif self
._writeTimeout
== 0:
124 timeouts
= timeouts
[:-2] + (0, win32
.MAXDWORD
)
126 timeouts
= timeouts
[:-2] + (0, int(self
._writeTimeout
*1000))
127 win32
.SetCommTimeouts(self
.hComPort
, ctypes
.byref(win32
.COMMTIMEOUTS(*timeouts
)))
129 win32
.SetCommMask(self
.hComPort
, win32
.EV_ERR
)
131 # Setup the connection info.
132 # Get state and modify it:
134 win32
.GetCommState(self
.hComPort
, ctypes
.byref(comDCB
))
135 comDCB
.BaudRate
= self
._baudrate
137 if self
._bytesize
== FIVEBITS
:
139 elif self
._bytesize
== SIXBITS
:
141 elif self
._bytesize
== SEVENBITS
:
143 elif self
._bytesize
== EIGHTBITS
:
146 raise ValueError("Unsupported number of data bits: %r" % self
._bytesize
)
148 if self
._parity
== PARITY_NONE
:
149 comDCB
.Parity
= win32
.NOPARITY
150 comDCB
.fParity
= 0 # Disable Parity Check
151 elif self
._parity
== PARITY_EVEN
:
152 comDCB
.Parity
= win32
.EVENPARITY
153 comDCB
.fParity
= 1 # Enable Parity Check
154 elif self
._parity
== PARITY_ODD
:
155 comDCB
.Parity
= win32
.ODDPARITY
156 comDCB
.fParity
= 1 # Enable Parity Check
157 elif self
._parity
== PARITY_MARK
:
158 comDCB
.Parity
= win32
.MARKPARITY
159 comDCB
.fParity
= 1 # Enable Parity Check
160 elif self
._parity
== PARITY_SPACE
:
161 comDCB
.Parity
= win32
.SPACEPARITY
162 comDCB
.fParity
= 1 # Enable Parity Check
164 raise ValueError("Unsupported parity mode: %r" % self
._parity
)
166 if self
._stopbits
== STOPBITS_ONE
:
167 comDCB
.StopBits
= win32
.ONESTOPBIT
168 elif self
._stopbits
== STOPBITS_ONE_POINT_FIVE
:
169 comDCB
.StopBits
= win32
.ONE5STOPBITS
170 elif self
._stopbits
== STOPBITS_TWO
:
171 comDCB
.StopBits
= win32
.TWOSTOPBITS
173 raise ValueError("Unsupported number of stop bits: %r" % self
._stopbits
)
175 comDCB
.fBinary
= 1 # Enable Binary Transmission
176 # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
178 comDCB
.fRtsControl
= win32
.RTS_CONTROL_HANDSHAKE
179 elif self
._rtsToggle
:
180 comDCB
.fRtsControl
= win32
.RTS_CONTROL_TOGGLE
182 comDCB
.fRtsControl
= self
._rtsState
184 comDCB
.fDtrControl
= win32
.DTR_CONTROL_HANDSHAKE
186 comDCB
.fDtrControl
= self
._dtrState
189 comDCB
.fOutxCtsFlow
= 0
191 comDCB
.fOutxCtsFlow
= self
._rtscts
192 comDCB
.fOutxDsrFlow
= self
._dsrdtr
193 comDCB
.fOutX
= self
._xonxoff
194 comDCB
.fInX
= self
._xonxoff
196 comDCB
.fErrorChar
= 0
197 comDCB
.fAbortOnError
= 0
199 comDCB
.XoffChar
= XOFF
201 if not win32
.SetCommState(self
.hComPort
, ctypes
.byref(comDCB
)):
202 raise ValueError("Cannot configure port, some setting was wrong. Original message: %r" % ctypes
.WinError())
204 #~ def __del__(self):
209 """internal close port helper"""
211 # Restore original timeout values:
212 win32
.SetCommTimeouts(self
.hComPort
, self
._orgTimeouts
)
214 win32
.CloseHandle(self
.hComPort
)
215 if self
._overlappedRead
is not None:
216 win32
.CloseHandle(self
._overlappedRead
.hEvent
)
217 self
._overlappedRead
= None
218 if self
._overlappedWrite
is not None:
219 win32
.CloseHandle(self
._overlappedWrite
.hEvent
)
220 self
._overlappedWrite
= None
229 def makeDeviceName(self
, port
):
232 # - - - - - - - - - - - - - - - - - - - - - - - -
235 """Return the number of characters currently in the input buffer."""
236 flags
= win32
.DWORD()
237 comstat
= win32
.COMSTAT()
238 if not win32
.ClearCommError(self
.hComPort
, ctypes
.byref(flags
), ctypes
.byref(comstat
)):
239 raise SerialException('call to ClearCommError failed')
240 return comstat
.cbInQue
242 def read(self
, size
=1):
243 """Read size bytes from the serial port. If a timeout is set it may
244 return less characters as requested. With no timeout it will block
245 until the requested number of bytes is read."""
246 if not self
.hComPort
: raise portNotOpenError
248 win32
.ResetEvent(self
._overlappedRead
.hEvent
)
249 flags
= win32
.DWORD()
250 comstat
= win32
.COMSTAT()
251 if not win32
.ClearCommError(self
.hComPort
, ctypes
.byref(flags
), ctypes
.byref(comstat
)):
252 raise SerialException('call to ClearCommError failed')
253 if self
.timeout
== 0:
254 n
= min(comstat
.cbInQue
, size
)
256 buf
= ctypes
.create_string_buffer(n
)
258 err
= win32
.ReadFile(self
.hComPort
, buf
, n
, ctypes
.byref(rc
), ctypes
.byref(self
._overlappedRead
))
259 if not err
and win32
.GetLastError() != win32
.ERROR_IO_PENDING
:
260 raise SerialException("ReadFile failed (%r)" % ctypes
.WinError())
261 err
= win32
.WaitForSingleObject(self
._overlappedRead
.hEvent
, win32
.INFINITE
)
262 read
= buf
.raw
[:rc
.value
]
266 buf
= ctypes
.create_string_buffer(size
)
268 err
= win32
.ReadFile(self
.hComPort
, buf
, size
, ctypes
.byref(rc
), ctypes
.byref(self
._overlappedRead
))
269 if not err
and win32
.GetLastError() != win32
.ERROR_IO_PENDING
:
270 raise SerialException("ReadFile failed (%r)" % ctypes
.WinError())
271 err
= win32
.GetOverlappedResult(self
.hComPort
, ctypes
.byref(self
._overlappedRead
), ctypes
.byref(rc
), True)
272 read
= buf
.raw
[:rc
.value
]
277 def write(self
, data
):
278 """Output the given string over the serial port."""
279 if not self
.hComPort
: raise portNotOpenError
280 #~ if not isinstance(data, (bytes, bytearray)):
281 #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
282 # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
283 data
= to_bytes(data
)
285 #~ win32event.ResetEvent(self._overlappedWrite.hEvent)
287 err
= win32
.WriteFile(self
.hComPort
, data
, len(data
), ctypes
.byref(n
), self
._overlappedWrite
)
288 if not err
and win32
.GetLastError() != win32
.ERROR_IO_PENDING
:
289 raise SerialException("WriteFile failed (%r)" % ctypes
.WinError())
290 if self
._writeTimeout
!= 0: # if blocking (None) or w/ write timeout (>0)
291 # Wait for the write to complete.
292 #~ win32.WaitForSingleObject(self._overlappedWrite.hEvent, win32.INFINITE)
293 err
= win32
.GetOverlappedResult(self
.hComPort
, self
._overlappedWrite
, ctypes
.byref(n
), True)
294 if n
.value
!= len(data
):
295 raise writeTimeoutError
301 """Flush of file like objects. In this case, wait until all data
303 while self
.outWaiting():
305 # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
306 # require overlapped IO and its also only possible to set a single mask
def flushInput(self):
    """Clear the input buffer, discarding everything it currently holds.

    Raises portNotOpenError when there is no port handle.
    """
    handle = self.hComPort
    if not handle:
        raise portNotOpenError
    rx_purge_flags = win32.PURGE_RXCLEAR | win32.PURGE_RXABORT
    win32.PurgeComm(handle, rx_purge_flags)
def flushOutput(self):
    """Clear the output buffer: abort the current output and discard
    everything that is still queued.

    Raises portNotOpenError when there is no port handle.
    """
    handle = self.hComPort
    if not handle:
        raise portNotOpenError
    tx_purge_flags = win32.PURGE_TXCLEAR | win32.PURGE_TXABORT
    win32.PurgeComm(handle, tx_purge_flags)
def sendBreak(self, duration=0.25):
    """Send break condition. Timed, returns to idle state after given duration.

    duration -- how long, in seconds, the break condition is held.

    Raises portNotOpenError when there is no port handle.
    """
    if not self.hComPort:
        raise portNotOpenError
    import time
    win32.SetCommBreak(self.hComPort)
    try:
        # Hold the break for the requested time; without this wait the
        # break would be set and cleared back-to-back.
        time.sleep(duration)
    finally:
        # Always return the line to idle, even if the wait is interrupted.
        win32.ClearCommBreak(self.hComPort)
def setBreak(self, level=1):
    """Set break: Controls TXD. When active, no transmitting is possible.

    level -- true value activates the break condition, false value ends it.

    Raises portNotOpenError when there is no port handle.
    """
    if not self.hComPort:
        raise portNotOpenError
    # Exactly one of the two calls must run, selected by ``level``.
    if level:
        win32.SetCommBreak(self.hComPort)
    else:
        win32.ClearCommBreak(self.hComPort)
def setRTS(self, level=1):
    """Set terminal status line: Request To Send

    level -- true value asserts RTS, false value clears it.

    The requested state is stored in ``self._rtsState`` whether or not the
    port is open; no exception is raised for a closed port.
    """
    # remember level for reconfigure
    if level:
        self._rtsState = win32.RTS_CONTROL_ENABLE
    else:
        self._rtsState = win32.RTS_CONTROL_DISABLE
    # also apply now if port is open
    if self.hComPort:
        if level:
            win32.EscapeCommFunction(self.hComPort, win32.SETRTS)
        else:
            win32.EscapeCommFunction(self.hComPort, win32.CLRRTS)
def setDTR(self, level=1):
    """Set terminal status line: Data Terminal Ready

    level -- true value asserts DTR, false value clears it.

    The requested state is stored in ``self._dtrState`` whether or not the
    port is open; no exception is raised for a closed port.
    """
    # remember level for reconfigure
    if level:
        self._dtrState = win32.DTR_CONTROL_ENABLE
    else:
        self._dtrState = win32.DTR_CONTROL_DISABLE
    # also apply now if port is open
    if self.hComPort:
        if level:
            win32.EscapeCommFunction(self.hComPort, win32.SETDTR)
        else:
            win32.EscapeCommFunction(self.hComPort, win32.CLRDTR)
def _GetCommModemStatus(self):
    """Query the modem status lines of the port and return them as an integer.

    The status-line getters of this class mask the returned value with the
    win32.MS_*_ON constants.  This helper does not check whether the port is
    open; its callers do.
    """
    # Output argument filled in by the API call.
    # NOTE(review): DWORD chosen to match the other out-parameters in this
    # file and the documented LPDWORD argument -- confirm.
    stat = win32.DWORD()
    win32.GetCommModemStatus(self.hComPort, ctypes.byref(stat))
    return stat.value
370 """Read terminal status line: Clear To Send"""
371 if not self
.hComPort
: raise portNotOpenError
372 return win32
.MS_CTS_ON
& self
._GetCommModemStatus
() != 0
375 """Read terminal status line: Data Set Ready"""
376 if not self
.hComPort
: raise portNotOpenError
377 return win32
.MS_DSR_ON
& self
._GetCommModemStatus
() != 0
380 """Read terminal status line: Ring Indicator"""
381 if not self
.hComPort
: raise portNotOpenError
382 return win32
.MS_RING_ON
& self
._GetCommModemStatus
() != 0
385 """Read terminal status line: Carrier Detect"""
386 if not self
.hComPort
: raise portNotOpenError
387 return win32
.MS_RLSD_ON
& self
._GetCommModemStatus
() != 0
389 # - - platform specific - - - -
def setBufferSize(self, rx_size=4096, tx_size=None):
    """\
    Recommend a buffer size to the driver (device driver can ignore this
    value).

    rx_size -- requested size of the receive buffer.
    tx_size -- requested size of the transmit buffer; when None the value
               of rx_size is used for both.
    """
    # NOTE(review): the text here used to say this must be called before the
    # port is opened, yet the call below passes self.hComPort -- confirm
    # which of the two is intended.
    if tx_size is None:
        tx_size = rx_size
    win32.SetupComm(self.hComPort, rx_size, tx_size)
def setXON(self, level=True):
    """\
    Manually control flow - when software flow control is enabled.
    This will send XON (true) and XOFF (false) to the other device.
    WARNING: this function is not portable to different platforms!

    Raises portNotOpenError when there is no port handle.
    """
    if not self.hComPort:
        raise portNotOpenError
    # Exactly one of the two requests must be issued, selected by ``level``.
    if level:
        win32.EscapeCommFunction(self.hComPort, win32.SETXON)
    else:
        win32.EscapeCommFunction(self.hComPort, win32.SETXOFF)
def outWaiting(self):
    """Return how many characters are in the outgoing buffer.

    Raises SerialException when the ClearCommError call reports failure.
    """
    error_flags = win32.DWORD()
    status = win32.COMSTAT()
    succeeded = win32.ClearCommError(
        self.hComPort,
        ctypes.byref(error_flags),
        ctypes.byref(status),
    )
    if not succeeded:
        raise SerialException('call to ClearCommError failed')
    return status.cbOutQue
419 # functions useful for RS-485 adapters
def setRtsToggle(self, rtsToggle):
    """Store the RTS toggle control setting.

    When the port is flagged as open, the port configuration is applied
    again so the new setting is picked up.
    """
    self._rtsToggle = rtsToggle
    if not self._isOpen:
        return
    self._reconfigurePort()
def getRtsToggle(self):
    """Return the RTS toggle control setting currently stored on the object."""
    current_setting = self._rtsToggle
    return current_setting
# Attribute-style access to the RTS toggle setting via the two accessors above.
rtsToggle = property(
    fget=getRtsToggle,
    fset=setRtsToggle,
    doc="RTS toggle control setting",
)
432 # assemble Serial class with the platform specific implementation and the base
433 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O
434 # library, derive from io.RawIOBase
438 # classic version with our own file-like emulation
439 class Serial(Win32Serial
, FileLike
):
443 class Serial(Win32Serial
, io
.RawIOBase
):
448 if __name__
== '__main__':
450 sys
.stdout
.write("%s\n" % s
)
453 sys
.stdout
.write("%s\n" % s
)
460 sys
.stdout
.write("%s\n" % s
)