Add long running gmail memory benchmark for background tab.
[chromium-blink-merge.git] / tools / telemetry / third_party / pyserial / serial / serialcli.py
blob19169a34d8b81cf7aa53b650ae714270b35c200d
1 #! python
2 # Python Serial Port Extension for Win32, Linux, BSD, Jython and .NET/Mono
3 # serial driver for .NET/Mono (IronPython), .NET >= 2
4 # see __init__.py
6 # (C) 2008 Chris Liechti <cliechti@gmx.net>
7 # this is distributed under a free software license, see license.txt
9 import clr
10 import System
11 import System.IO.Ports
12 from serial.serialutil import *
15 def device(portnum):
16 """Turn a port number into a device name"""
17 return System.IO.Ports.SerialPort.GetPortNames()[portnum]
20 # must invoke function with byte array, make a helper to convert strings
21 # to byte arrays
22 sab = System.Array[System.Byte]
23 def as_byte_array(string):
24 return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython
26 class IronSerial(SerialBase):
27 """Serial port implementation for .NET/Mono."""
29 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
30 9600, 19200, 38400, 57600, 115200)
32 def open(self):
33 """Open port with current settings. This may throw a SerialException
34 if the port cannot be opened."""
35 if self._port is None:
36 raise SerialException("Port must be configured before it can be used.")
37 if self._isOpen:
38 raise SerialException("Port is already open.")
39 try:
40 self._port_handle = System.IO.Ports.SerialPort(self.portstr)
41 except Exception, msg:
42 self._port_handle = None
43 raise SerialException("could not open port %s: %s" % (self.portstr, msg))
45 self._reconfigurePort()
46 self._port_handle.Open()
47 self._isOpen = True
48 if not self._rtscts:
49 self.setRTS(True)
50 self.setDTR(True)
51 self.flushInput()
52 self.flushOutput()
54 def _reconfigurePort(self):
55 """Set communication parameters on opened port."""
56 if not self._port_handle:
57 raise SerialException("Can only operate on a valid port handle")
59 #~ self._port_handle.ReceivedBytesThreshold = 1
61 if self._timeout is None:
62 self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
63 else:
64 self._port_handle.ReadTimeout = int(self._timeout*1000)
66 # if self._timeout != 0 and self._interCharTimeout is not None:
67 # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
69 if self._writeTimeout is None:
70 self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
71 else:
72 self._port_handle.WriteTimeout = int(self._writeTimeout*1000)
75 # Setup the connection info.
76 try:
77 self._port_handle.BaudRate = self._baudrate
78 except IOError, e:
79 # catch errors from illegal baudrate settings
80 raise ValueError(str(e))
82 if self._bytesize == FIVEBITS:
83 self._port_handle.DataBits = 5
84 elif self._bytesize == SIXBITS:
85 self._port_handle.DataBits = 6
86 elif self._bytesize == SEVENBITS:
87 self._port_handle.DataBits = 7
88 elif self._bytesize == EIGHTBITS:
89 self._port_handle.DataBits = 8
90 else:
91 raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
93 if self._parity == PARITY_NONE:
94 self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
95 elif self._parity == PARITY_EVEN:
96 self._port_handle.Parity = System.IO.Ports.Parity.Even
97 elif self._parity == PARITY_ODD:
98 self._port_handle.Parity = System.IO.Ports.Parity.Odd
99 elif self._parity == PARITY_MARK:
100 self._port_handle.Parity = System.IO.Ports.Parity.Mark
101 elif self._parity == PARITY_SPACE:
102 self._port_handle.Parity = System.IO.Ports.Parity.Space
103 else:
104 raise ValueError("Unsupported parity mode: %r" % self._parity)
106 if self._stopbits == STOPBITS_ONE:
107 self._port_handle.StopBits = System.IO.Ports.StopBits.One
108 elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
109 self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
110 elif self._stopbits == STOPBITS_TWO:
111 self._port_handle.StopBits = System.IO.Ports.StopBits.Two
112 else:
113 raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
115 if self._rtscts and self._xonxoff:
116 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
117 elif self._rtscts:
118 self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
119 elif self._xonxoff:
120 self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
121 else:
122 self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k
124 #~ def __del__(self):
125 #~ self.close()
127 def close(self):
128 """Close port"""
129 if self._isOpen:
130 if self._port_handle:
131 try:
132 self._port_handle.Close()
133 except System.IO.Ports.InvalidOperationException:
134 # ignore errors. can happen for unplugged USB serial devices
135 pass
136 self._port_handle = None
137 self._isOpen = False
139 def makeDeviceName(self, port):
140 try:
141 return device(port)
142 except TypeError, e:
143 raise SerialException(str(e))
145 # - - - - - - - - - - - - - - - - - - - - - - - -
147 def inWaiting(self):
148 """Return the number of characters currently in the input buffer."""
149 if not self._port_handle: raise portNotOpenError
150 return self._port_handle.BytesToRead
152 def read(self, size=1):
153 """Read size bytes from the serial port. If a timeout is set it may
154 return less characters as requested. With no timeout it will block
155 until the requested number of bytes is read."""
156 if not self._port_handle: raise portNotOpenError
157 # must use single byte reads as this is the only way to read
158 # without applying encodings
159 data = bytearray()
160 while size:
161 try:
162 data.append(self._port_handle.ReadByte())
163 except System.TimeoutException, e:
164 break
165 else:
166 size -= 1
167 return bytes(data)
169 def write(self, data):
170 """Output the given string over the serial port."""
171 if not self._port_handle: raise portNotOpenError
172 if not isinstance(data, (bytes, bytearray)):
173 raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
174 try:
175 # must call overloaded method with byte array argument
176 # as this is the only one not applying encodings
177 self._port_handle.Write(as_byte_array(data), 0, len(data))
178 except System.TimeoutException, e:
179 raise writeTimeoutError
180 return len(data)
182 def flushInput(self):
183 """Clear input buffer, discarding all that is in the buffer."""
184 if not self._port_handle: raise portNotOpenError
185 self._port_handle.DiscardInBuffer()
187 def flushOutput(self):
188 """Clear output buffer, aborting the current output and
189 discarding all that is in the buffer."""
190 if not self._port_handle: raise portNotOpenError
191 self._port_handle.DiscardOutBuffer()
193 def sendBreak(self, duration=0.25):
194 """Send break condition. Timed, returns to idle state after given duration."""
195 if not self._port_handle: raise portNotOpenError
196 import time
197 self._port_handle.BreakState = True
198 time.sleep(duration)
199 self._port_handle.BreakState = False
201 def setBreak(self, level=True):
202 """Set break: Controls TXD. When active, to transmitting is possible."""
203 if not self._port_handle: raise portNotOpenError
204 self._port_handle.BreakState = bool(level)
206 def setRTS(self, level=True):
207 """Set terminal status line: Request To Send"""
208 if not self._port_handle: raise portNotOpenError
209 self._port_handle.RtsEnable = bool(level)
211 def setDTR(self, level=True):
212 """Set terminal status line: Data Terminal Ready"""
213 if not self._port_handle: raise portNotOpenError
214 self._port_handle.DtrEnable = bool(level)
216 def getCTS(self):
217 """Read terminal status line: Clear To Send"""
218 if not self._port_handle: raise portNotOpenError
219 return self._port_handle.CtsHolding
221 def getDSR(self):
222 """Read terminal status line: Data Set Ready"""
223 if not self._port_handle: raise portNotOpenError
224 return self._port_handle.DsrHolding
226 def getRI(self):
227 """Read terminal status line: Ring Indicator"""
228 if not self._port_handle: raise portNotOpenError
229 #~ return self._port_handle.XXX
230 return False #XXX an error would be better
232 def getCD(self):
233 """Read terminal status line: Carrier Detect"""
234 if not self._port_handle: raise portNotOpenError
235 return self._port_handle.CDHolding
237 # - - platform specific - - - -
238 # none
241 # assemble Serial class with the platform specific implementation and the base
242 # for file-like behavior. for Python 2.6 and newer, that provide the new I/O
243 # library, derive from io.RawIOBase
244 try:
245 import io
246 except ImportError:
247 # classic version with our own file-like emulation
248 class Serial(IronSerial, FileLike):
249 pass
250 else:
251 # io library present
252 class Serial(IronSerial, io.RawIOBase):
253 pass
256 # Nur Testfunktion!!
257 if __name__ == '__main__':
258 import sys
260 s = Serial(0)
261 sys.stdio.write('%s\n' % s)
263 s = Serial()
264 sys.stdio.write('%s\n' % s)
267 s.baudrate = 19200
268 s.databits = 7
269 s.close()
270 s.port = 0
271 s.open()
272 sys.stdio.write('%s\n' % s)