1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
| import codecs
import errno
import fcntl
import io
import os
import pty
import resource
import signal
import struct
import sys
import termios
import time
try:
import builtins # Python 3
except ImportError:
import __builtin__ as builtins # Python 2
# Constants
from pty import (STDIN_FILENO, CHILD)
from .util import which, PtyProcessError
_platform = sys.platform.lower()
# Solaris uses internal __fork_pty(). All others use pty.fork().
_is_solaris = (
_platform.startswith('solaris') or
_platform.startswith('sunos'))
if _is_solaris:
use_native_pty_fork = False
from . import _fork_pty
else:
use_native_pty_fork = True
PY3 = sys.version_info[0] >= 3
if PY3:
def _byte(i):
return bytes([i])
else:
def _byte(i):
return chr(i)
class FileNotFoundError(OSError): pass
class TimeoutError(OSError): pass
_EOF, _INTR = None, None
def _make_eof_intr():
"""Set constants _EOF and _INTR.
This avoids doing potentially costly operations on module load.
"""
global _EOF, _INTR
if (_EOF is not None) and (_INTR is not None):
return
# inherit EOF and INTR definitions from controlling process.
try:
from termios import VEOF, VINTR
fd = None
for name in 'stdin', 'stdout':
stream = getattr(sys, '__%s__' % name, None)
if stream is None or not hasattr(stream, 'fileno'):
continue
try:
fd = stream.fileno()
except ValueError:
continue
if fd is None:
# no fd, raise ValueError to fallback on CEOF, CINTR
raise ValueError("No stream has a fileno")
intr = ord(termios.tcgetattr(fd)[6][VINTR])
eof = ord(termios.tcgetattr(fd)[6][VEOF])
except (ImportError, OSError, IOError, ValueError, termios.error):
# unless the controlling process is also not a terminal,
# such as cron(1), or when stdin and stdout are both closed.
# Fall-back to using CEOF and CINTR. There
try:
from termios import CEOF, CINTR
(intr, eof) = (CINTR, CEOF)
except ImportError:
# ^C, ^D
(intr, eof) = (3, 4)
_INTR = _byte(intr)
_EOF = _byte(eof)
# setecho and setwinsize are pulled out here because on some platforms, we need
# to do this from the child before we exec()
def _setecho(fd, state):
errmsg = 'setecho() may not be called on this platform (it may still be possible to enable/disable echo when spawning the child process)'
try:
attr = termios.tcgetattr(fd)
except termios.error as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
if state:
attr[3] = attr[3] | termios.ECHO
else:
attr[3] = attr[3] & ~termios.ECHO
try:
# I tried TCSADRAIN and TCSAFLUSH, but these were inconsistent and
# blocked on some platforms. TCSADRAIN would probably be ideal.
termios.tcsetattr(fd, termios.TCSANOW, attr)
except IOError as err:
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
def _setwinsize(fd, rows, cols):
# Some very old platforms have a bug that causes the value for
# termios.TIOCSWINSZ to be truncated. There was a hack here to work
# around this, but it caused problems with newer platforms so has been
# removed. For details see https://github.com/pexpect/pexpect/issues/39
TIOCSWINSZ = getattr(termios, 'TIOCSWINSZ', -2146929561)
# Note, assume ws_xpixel and ws_ypixel are zero.
s = struct.pack('HHHH', rows, cols, 0, 0)
fcntl.ioctl(fd, TIOCSWINSZ, s)
class PtyProcess(object):
'''This class represents a process running in a pseudoterminal.
The main constructor is the :meth:`spawn` classmethod.
'''
string_type = bytes
if PY3:
linesep = os.linesep.encode('ascii')
crlf = '\r\n'.encode('ascii')
@staticmethod
def write_to_stdout(b):
try:
return sys.stdout.buffer.write(b)
except AttributeError:
# If stdout has been replaced, it may not have .buffer
return sys.stdout.write(b.decode('ascii', 'replace'))
else:
linesep = os.linesep
crlf = '\r\n'
write_to_stdout = sys.stdout.write
encoding = None
argv = None
env = None
launch_dir = None
def __init__(self, pid, fd):
_make_eof_intr() # Ensure _EOF and _INTR are calculated
self.pid = pid
self.fd = fd
readf = io.open(fd, 'rb', buffering=0)
writef = io.open(fd, 'wb', buffering=0, closefd=False)
self.fileobj = io.BufferedRWPair(readf, writef)
self.terminated = False
self.closed = False
self.exitstatus = None
self.signalstatus = None
# status returned by os.waitpid
self.status = None
self.flag_eof = False
# Used by close() to give kernel time to update process status.
# Time in seconds.
self.delayafterclose = 0.1
# Used by terminate() to give kernel time to update process status.
# Time in seconds.
self.delayafterterminate = 0.1
@classmethod
def spawn(
cls, argv, cwd=None, env=None, echo=True, preexec_fn=None,
dimensions=(24, 80)):
'''Start the given command in a child process in a pseudo terminal.
This does all the fork/exec type of stuff for a pty, and returns an
instance of PtyProcess.
If preexec_fn is supplied, it will be called with no arguments in the
child process before exec-ing the specified command.
It may, for instance, set signal handlers to SIG_DFL or SIG_IGN.
Dimensions of the psuedoterminal used for the subprocess can be
specified as a tuple (rows, cols), or the default (24, 80) will be used.
'''
# Note that it is difficult for this method to fail.
# You cannot detect if the child process cannot start.
# So the only way you can tell if the child process started
# or not is to try to read from the file descriptor. If you get
# EOF immediately then it means that the child is already dead.
# That may not necessarily be bad because you may have spawned a child
# that performs some task; creates no stdout output; and then dies.
if not isinstance(argv, (list, tuple)):
raise TypeError("Expected a list or tuple for argv, got %r" % argv)
# Shallow copy of argv so we can modify it
argv = argv[:]
command = argv[0]
command_with_path = which(command)
if command_with_path is None:
raise FileNotFoundError('The command was not found or was not ' +
'executable: %s.' % command)
command = command_with_path
argv[0] = command
# [issue #119] To prevent the case where exec fails and the user is
# stuck interacting with a python child process instead of whatever
# was expected, we implement the solution from
# http://stackoverflow.com/a/3703179 to pass the exception to the
# parent process
# [issue #119] 1. Before forking, open a pipe in the parent process.
exec_err_pipe_read, exec_err_pipe_write = os.pipe()
if use_native_pty_fork:
pid, fd = pty.fork()
else:
# Use internal fork_pty, for Solaris
pid, fd = _fork_pty.fork_pty()
# Some platforms must call setwinsize() and setecho() from the
# child process, and others from the master process. We do both,
# allowing IOError for either.
if pid == CHILD:
# set window size
try:
_setwinsize(STDIN_FILENO, *dimensions)
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# disable echo if spawn argument echo was unset
if not echo:
try:
_setecho(STDIN_FILENO, False)
except (IOError, termios.error) as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY):
raise
# [issue #119] 3. The child closes the reading end and sets the
# close-on-exec flag for the writing end.
os.close(exec_err_pipe_read)
fcntl.fcntl(exec_err_pipe_write, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
# Do not allow child to inherit open file descriptors from parent,
# with the exception of the exec_err_pipe_write of the pipe
# Impose ceiling on max_fd: AIX bugfix for users with unlimited
# nofiles where resource.RLIMIT_NOFILE is 2^63-1 and os.closerange()
# occasionally raises out of range error
max_fd = min(1048576, resource.getrlimit(resource.RLIMIT_NOFILE)[0])
os.closerange(3, exec_err_pipe_write)
os.closerange(exec_err_pipe_write+1, max_fd)
if cwd is not None:
os.chdir(cwd)
if preexec_fn is not None:
try:
preexec_fn()
except Exception as e:
ename = type(e).__name__
tosend = '{}:0:{}'.format(ename, str(e))
if PY3:
tosend = tosend.encode('utf-8')
os.write(exec_err_pipe_write, tosend)
os.close(exec_err_pipe_write)
os._exit(1)
try:
if env is None:
os.execv(command, argv)
else:
os.execvpe(command, argv, env)
except OSError as err:
# [issue #119] 5. If exec fails, the child writes the error
# code back to the parent using the pipe, then exits.
tosend = 'OSError:{}:{}'.format(err.errno, str(err))
if PY3:
tosend = tosend.encode('utf-8')
os.write(exec_err_pipe_write, tosend)
os.close(exec_err_pipe_write)
os._exit(os.EX_OSERR)
# Parent
inst = cls(pid, fd)
# Set some informational attributes
inst.argv = argv
if env is not None:
inst.env = env
if cwd is not None:
inst.launch_dir = cwd
# [issue #119] 2. After forking, the parent closes the writing end
# of the pipe and reads from the reading end.
os.close(exec_err_pipe_write)
exec_err_data = os.read(exec_err_pipe_read, 4096)
os.close(exec_err_pipe_read)
# [issue #119] 6. The parent reads eof (a zero-length read) if the
# child successfully performed exec, since close-on-exec made
# successful exec close the writing end of the pipe. Or, if exec
# failed, the parent reads the error code and can proceed
# accordingly. Either way, the parent blocks until the child calls
# exec.
if len(exec_err_data) != 0:
try:
errclass, errno_s, errmsg = exec_err_data.split(b':', 2)
exctype = getattr(builtins, errclass.decode('ascii'), Exception)
exception = exctype(errmsg.decode('utf-8', 'replace'))
if exctype is OSError:
exception.errno = int(errno_s)
except:
raise Exception('Subprocess failed, got bad error data: %r'
% exec_err_data)
else:
raise exception
try:
inst.setwinsize(*dimensions)
except IOError as err:
if err.args[0] not in (errno.EINVAL, errno.ENOTTY, errno.ENXIO):
raise
return inst
def __repr__(self):
clsname = type(self).__name__
if self.argv is not None:
args = [repr(self.argv)]
if self.env is not None:
args.append("env=%r" % self.env)
if self.launch_dir is not None:
args.append("cwd=%r" % self.launch_dir)
return "{}.spawn({})".format(clsname, ", ".join(args))
else:
return "{}(pid={}, fd={})".format(clsname, self.pid, self.fd)
@staticmethod
def _coerce_send_string(s):
if not isinstance(s, bytes):
return s.encode('utf-8')
return s
@staticmethod
def _coerce_read_string(s):
return s
def __del__(self):
'''This makes sure that no system resources are left open. Python only
garbage collects Python objects. OS file descriptors are not Python
objects, so they must be handled explicitly. If the child file
descriptor was opened outside of this class (passed to the constructor)
then this does not close it. '''
if not self.closed:
# It is possible for __del__ methods to execute during the
# teardown of the Python VM itself. Thus self.close() may
# trigger an exception because os.close may be None.
try:
self.close()
# which exception, shouldn't we catch explicitly .. ?
except:
pass
def fileno(self):
'''This returns the file descriptor of the pty for the child.
'''
return self.fd
def close(self, force=True):
'''This closes the connection with the child application. Note that
calling close() more than once is valid. This emulates standard Python
behavior with files. Set force to True if you want to make sure that
the child is terminated (SIGKILL is sent if the child ignores SIGHUP
and SIGINT). '''
if not self.closed:
self.flush()
self.fileobj.close() # Closes the file descriptor
# Give kernel time to update process status.
time.sleep(self.delayafterclose)
if self.isalive():
if not self.terminate(force):
raise PtyProcessError('Could not terminate the child.')
self.fd = -1
self.closed = True
#self.pid = None
def flush(self):
'''This does nothing. It is here to support the interface for a
File-like object. '''
pass
def isatty(self):
'''This returns True if the file descriptor is open and connected to a
tty(-like) device, else False.
On SVR4-style platforms implementing streams, such as SunOS and HP-UX,
the child pty may not appear as a terminal device. This means
methods such as setecho(), setwinsize(), getwinsize() may raise an
IOError. '''
return os.isatty(self.fd)
def waitnoecho(self, timeout=None):
'''This waits until the terminal ECHO flag is set False. This returns
True if the echo mode is off. This returns False if the ECHO flag was
not set False before the timeout. This can be used to detect when the
child is waiting for a password. Usually a child application will turn
off echo mode when it is waiting for the user to enter a password. For
example, instead of expecting the "password:" prompt you can wait for
the child to set ECHO off::
p = pexpect.spawn('ssh [email protected]')
p.waitnoecho()
p.sendline(mypassword)
If timeout==None then this method to block until ECHO flag is False.
'''
if timeout is not None:
end_time = time.time() + timeout
while True:
if not self.getecho():
return True
if timeout < 0 and timeout is not None:
return False
if timeout is not None:
timeout = end_time - time.time()
time.sleep(0.1)
def getecho(self):
'''This returns the terminal echo mode. This returns True if echo is
on or False if echo is off. Child applications that are expecting you
to enter a password often set ECHO False. See waitnoecho().
Not supported on platforms where ``isatty()`` returns False. '''
try:
attr = termios.tcgetattr(self.fd)
except termios.error as err:
errmsg = 'getecho() may not be called on this platform'
if err.args[0] == errno.EINVAL:
raise IOError(err.args[0], '%s: %s.' % (err.args[1], errmsg))
raise
self.echo = bool(attr[3] & termios.ECHO)
return self.echo
def setecho(self, state):
'''This sets the terminal echo mode on or off. Note that anything the
child sent before the echo will be lost, so you should be sure that
your input buffer is empty before you call setecho(). For example, the
following will work as expected::
p = pexpect.spawn('cat') # Echo is on by default.
p.sendline('1234') # We expect see this twice from the child...
p.expect(['1234']) # ... once from the tty echo...
p.expect(['1234']) # ... and again from cat itself.
p.setecho(False) # Turn off tty echo
p.sendline('abcd') # We will set this only once (echoed by cat).
p.sendline('wxyz') # We will set this only once (echoed by cat)
p.expect(['abcd'])
p.expect(['wxyz'])
The following WILL NOT WORK because the lines sent before the setecho
will be lost::
p = pexpect.spawn('cat')
p.sendline('1234')
p.setecho(False) # Turn off tty echo
p.sendline('abcd') # We will set this only once (echoed by cat).
p.sendline('wxyz') # We will set this only once (echoed by cat)
p.expect(['1234'])
p.expect(['1234'])
p.expect(['abcd'])
p.expect(['wxyz'])
Not supported on platforms where ``isatty()`` returns False.
'''
_setecho(self.fd, state)
self.echo = state
def read(self, size=1024):
"""Read and return at most ``size`` bytes from the pty.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
terminal was closed.
Unlike Pexpect's ``read_nonblocking`` method, this doesn't try to deal
with the vagaries of EOF on platforms that do strange things, like IRIX
or older Solaris systems. It handles the errno=EIO pattern used on
Linux, and the empty-string return used on BSD platforms and (seemingly)
on recent Solaris.
"""
try:
s = self.fileobj.read1(size)
except (OSError, IOError) as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
self.flag_eof = True
raise EOFError('End Of File (EOF). Exception style platform.')
raise
if s == b'':
# BSD-style EOF (also appears to work on recent Solaris (OpenIndiana))
self.flag_eof = True
raise EOFError('End Of File (EOF). Empty string style platform.')
return s
def readline(self):
"""Read one line from the pseudoterminal, and return it as unicode.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
terminal was closed.
"""
try:
s = self.fileobj.readline()
except (OSError, IOError) as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
self.flag_eof = True
raise EOFError('End Of File (EOF). Exception style platform.')
raise
if s == b'':
# BSD-style EOF (also appears to work on recent Solaris (OpenIndiana))
self.flag_eof = True
raise EOFError('End Of File (EOF). Empty string style platform.')
return s
def _writeb(self, b, flush=True):
n = self.fileobj.write(b)
if flush:
self.fileobj.flush()
return n
def write(self, s, flush=True):
"""Write bytes to the pseudoterminal.
Returns the number of bytes written.
"""
return self._writeb(s, flush=flush)
def sendcontrol(self, char):
'''Helper method that wraps send() with mnemonic access for sending control
character to the child (such as Ctrl-C or Ctrl-D). For example, to send
Ctrl-G (ASCII 7, bell, '\a')::
child.sendcontrol('g')
See also, sendintr() and sendeof().
'''
char = char.lower()
a = ord(char)
if 97 <= a <= 122:
a = a - ord('a') + 1
byte = _byte(a)
return self._writeb(byte), byte
d = {'@': 0, '`': 0,
'[': 27, '{': 27,
'\\': 28, '|': 28,
']': 29, '}': 29,
'^': 30, '~': 30,
'_': 31,
'?': 127}
if char not in d:
return 0, b''
byte = _byte(d[char])
return self._writeb(byte), byte
def sendeof(self):
'''This sends an EOF to the child. This sends a character which causes
the pending parent output buffer to be sent to the waiting child
program without waiting for end-of-line. If it is the first character
of the line, the read() in the user program returns 0, which signifies
end-of-file. This means to work as expected a sendeof() has to be
called at the beginning of a line. This method does not send a newline.
It is the responsibility of the caller to ensure the eof is sent at the
beginning of a line. '''
return self._writeb(_EOF), _EOF
def sendintr(self):
'''This sends a SIGINT to the child. It does not require
the SIGINT to be the first character on a line. '''
return self._writeb(_INTR), _INTR
def eof(self):
'''This returns True if the EOF exception was ever raised.
'''
return self.flag_eof
def terminate(self, force=False):
'''This forces a child process to terminate. It starts nicely with
SIGHUP and SIGINT. If "force" is True then moves onto SIGKILL. This
returns True if the child was terminated. This returns False if the
child could not be terminated. '''
if not self.isalive():
return True
try:
self.kill(signal.SIGHUP)
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
self.kill(signal.SIGCONT)
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
self.kill(signal.SIGINT)
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
if force:
self.kill(signal.SIGKILL)
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
else:
return False
return False
except OSError:
# I think there are kernel timing issues that sometimes cause
# this to happen. I think isalive() reports True, but the
# process is dead to the kernel.
# Make one last attempt to see if the kernel is up to date.
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
else:
return False
def wait(self):
'''This waits until the child exits. This is a blocking call. This will
not read any data from the child, so this will block forever if the
child has unread output and has terminated. In other words, the child
may have printed output then called exit(), but, the child is
technically still alive until its output is read by the parent. '''
if self.isalive():
pid, status = os.waitpid(self.pid, 0)
else:
return self.exitstatus
self.exitstatus = os.WEXITSTATUS(status)
if os.WIFEXITED(status):
self.status = status
self.exitstatus = os.WEXITSTATUS(status)
self.signalstatus = None
self.terminated = True
elif os.WIFSIGNALED(status):
self.status = status
self.exitstatus = None
self.signalstatus = os.WTERMSIG(status)
self.terminated = True
elif os.WIFSTOPPED(status): # pragma: no cover
# You can't call wait() on a child process in the stopped state.
raise PtyProcessError('Called wait() on a stopped child ' +
'process. This is not supported. Is some other ' +
'process attempting job control with our child pid?')
return self.exitstatus
def isalive(self):
'''This tests if the child process is running or not. This is
non-blocking. If the child was terminated then this will read the
exitstatus or signalstatus of the child. This returns True if the child
process appears to be running or False if not. It can take literally
SECONDS for Solaris to return the right status. '''
if self.terminated:
return False
if self.flag_eof:
# This is for Linux, which requires the blocking form
# of waitpid to get the status of a defunct process.
# This is super-lame. The flag_eof would have been set
# in read_nonblocking(), so this should be safe.
waitpid_options = 0
else:
waitpid_options = os.WNOHANG
try:
pid, status = os.waitpid(self.pid, waitpid_options)
except OSError as e:
# No child processes
if e.errno == errno.ECHILD:
raise PtyProcessError('isalive() encountered condition ' +
'where "terminated" is 0, but there was no child ' +
'process. Did someone else call waitpid() ' +
'on our process?')
else:
raise
# I have to do this twice for Solaris.
# I can't even believe that I figured this out...
# If waitpid() returns 0 it means that no child process
# wishes to report, and the value of status is undefined.
if pid == 0:
try:
### os.WNOHANG) # Solaris!
pid, status = os.waitpid(self.pid, waitpid_options)
except OSError as e: # pragma: no cover
# This should never happen...
if e.errno == errno.ECHILD:
raise PtyProcessError('isalive() encountered condition ' +
'that should never happen. There was no child ' +
'process. Did someone else call waitpid() ' +
'on our process?')
else:
raise
# If pid is still 0 after two calls to waitpid() then the process
# really is alive. This seems to work on all platforms, except for
# Irix which seems to require a blocking call on waitpid or select,
# so I let read_nonblocking take care of this situation
# (unfortunately, this requires waiting through the timeout).
if pid == 0:
return True
if pid == 0:
return True
if os.WIFEXITED(status):
self.status = status
self.exitstatus = os.WEXITSTATUS(status)
self.signalstatus = None
self.terminated = True
elif os.WIFSIGNALED(status):
self.status = status
self.exitstatus = None
self.signalstatus = os.WTERMSIG(status)
self.terminated = True
elif os.WIFSTOPPED(status):
raise PtyProcessError('isalive() encountered condition ' +
'where child process is stopped. This is not ' +
'supported. Is some other process attempting ' +
'job control with our child pid?')
return False
def kill(self, sig):
"""Send the given signal to the child application.
In keeping with UNIX tradition it has a misleading name. It does not
necessarily kill the child unless you send the right signal. See the
:mod:`signal` module for constants representing signal numbers.
"""
# Same as os.kill, but the pid is given for you.
if self.isalive():
os.kill(self.pid, sig)
def getwinsize(self):
"""Return the window size of the pseudoterminal as a tuple (rows, cols).
"""
TIOCGWINSZ = getattr(termios, 'TIOCGWINSZ', 1074295912)
s = struct.pack('HHHH', 0, 0, 0, 0)
x = fcntl.ioctl(self.fd, TIOCGWINSZ, s)
return struct.unpack('HHHH', x)[0:2]
def setwinsize(self, rows, cols):
"""Set the terminal window size of the child tty.
This will cause a SIGWINCH signal to be sent to the child. This does not
change the physical window size. It changes the size reported to
TTY-aware applications like vi or curses -- applications that respond to
the SIGWINCH signal.
"""
return _setwinsize(self.fd, rows, cols)
class PtyProcessUnicode(PtyProcess):
"""Unicode wrapper around a process running in a pseudoterminal.
This class exposes a similar interface to :class:`PtyProcess`, but its read
methods return unicode, and its :meth:`write` accepts unicode.
"""
if PY3:
string_type = str
else:
string_type = unicode # analysis:ignore
def __init__(self, pid, fd, encoding='utf-8', codec_errors='strict'):
super(PtyProcessUnicode, self).__init__(pid, fd)
self.encoding = encoding
self.codec_errors = codec_errors
self.decoder = codecs.getincrementaldecoder(encoding)(errors=codec_errors)
def read(self, size=1024):
"""Read at most ``size`` bytes from the pty, return them as unicode.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
terminal was closed.
The size argument still refers to bytes, not unicode code points.
"""
b = super(PtyProcessUnicode, self).read(size)
return self.decoder.decode(b, final=False)
def readline(self):
"""Read one line from the pseudoterminal, and return it as unicode.
Can block if there is nothing to read. Raises :exc:`EOFError` if the
terminal was closed.
"""
b = super(PtyProcessUnicode, self).readline()
return self.decoder.decode(b, final=False)
def write(self, s):
"""Write the unicode string ``s`` to the pseudoterminal.
Returns the number of bytes written.
"""
b = s.encode(self.encoding)
return super(PtyProcessUnicode, self).write(b)
|