1 '''This is like pexpect, but it will work with any file descriptor that you
2 pass it. You are responsible for opening and closing the file descriptor.
3 This allows you to use Pexpect with sockets and named pipes (FIFOs).
7 This license is approved by the OSI and FSF as GPL-compatible.
8 http://opensource.org/licenses/isc-license.txt
10 Copyright (c) 2012, Noah Spurrier <noah@noah.org>
11 PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
12 PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
13 COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
14 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
15 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
16 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
17 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
18 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
19 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
20 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
24 from .spawnbase
import SpawnBase
25 from .exceptions
import ExceptionPexpect
, TIMEOUT
26 from .utils
import select_ignore_interrupts
, poll_ignore_interrupts
31 class fdspawn(SpawnBase
):
32 '''This is like pexpect.spawn but allows you to supply your own open file
33 descriptor. For example, you could use it to read through a file looking
34 for patterns, or to control a modem or serial device. '''
36 def __init__ (self
, fd
, args
=None, timeout
=30, maxread
=2000, searchwindowsize
=None,
37 logfile
=None, encoding
=None, codec_errors
='strict', use_poll
=False):
38 '''This takes a file descriptor (an int) or an object that supports the
39 fileno() method (returning an int). All Python file-like objects
42 if type(fd
) != type(0) and hasattr(fd
, 'fileno'):
45 if type(fd
) != type(0):
46 raise ExceptionPexpect('The fd argument is not an int. If this is a command string then maybe you want to use pexpect.spawn.')
48 try: # make sure fd is a valid file descriptor
51 raise ExceptionPexpect('The fd argument is not a valid file descriptor.')
55 SpawnBase
.__init
__(self
, timeout
, maxread
, searchwindowsize
, logfile
,
56 encoding
=encoding
, codec_errors
=codec_errors
)
60 self
.name
= '<file descriptor %d>' % fd
61 self
.use_poll
= use_poll
64 """Close the file descriptor.
66 Calling this method a second time does nothing, but if the file
67 descriptor was closed elsewhere, :class:`OSError` will be raised.
69 if self
.child_fd
== -1:
73 os
.close(self
.child_fd
)
78 '''This checks if the file descriptor is still valid. If :func:`os.fstat`
79 does not raise an exception then we assume it is alive. '''
81 if self
.child_fd
== -1:
84 os
.fstat(self
.child_fd
)
def terminate(self, force=False):  # pragma: no cover
    '''Deprecated and invalid for file descriptors; always raises.

    :param force: Accepted only so the signature stays call-compatible;
        it is never read.
    :raises ExceptionPexpect: Unconditionally, on every call.
    '''
    message = 'This method is not valid for file descriptors.'
    raise ExceptionPexpect(message)
93 # These four methods are left around for backwards compatibility, but not
94 # documented as part of fdpexpect. You're encouraged to use os.write
97 "Write to fd, return number of bytes written"
98 s
= self
._coerce
_send
_string
(s
)
101 b
= self
._encoder
.encode(s
, final
=False)
102 return os
.write(self
.child_fd
, b
)
def sendline(self, s):
    '''Write *s* followed by the line separator to the fd.

    The argument is first passed through ``self._coerce_send_string`` and
    ``self.linesep`` is appended to the result; the combined value is then
    handed to ``self.send``.

    :param s: Data to write.
    :return: Whatever ``self.send`` returns for the combined value.
    '''
    # Coerce before touching linesep so the evaluation order is unchanged.
    line = self._coerce_send_string(s) + self.linesep
    return self.send(line)
110 "Write to fd, return None"
113 def writelines(self
, sequence
):
114 "Call self.write() for each item in sequence"
118 def read_nonblocking(self
, size
=1, timeout
=-1):
120 Read from the file descriptor and return the result as a string.
122 The read_nonblocking method of :class:`SpawnBase` assumes that a call
123 to os.read will not block (timeout parameter is ignored). This is not
124 the case for POSIX file-like objects such as sockets and serial ports.
126 Use :func:`select.select`, timeout is implemented conditionally for
129 :param int size: Read at most *size* bytes.
130 :param int timeout: Wait timeout seconds for file descriptor to be
131 ready to read. When -1 (default), use self.timeout. When 0, poll.
132 :return: String containing the bytes read
134 if os
.name
== 'posix':
136 timeout
= self
.timeout
137 rlist
= [self
.child_fd
]
141 rlist
= poll_ignore_interrupts(rlist
, timeout
)
143 rlist
, wlist
, xlist
= select_ignore_interrupts(
144 rlist
, wlist
, xlist
, timeout
146 if self
.child_fd
not in rlist
:
147 raise TIMEOUT('Timeout exceeded.')
148 return super(fdspawn
, self
).read_nonblocking(size
)