1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
| '''This class extends pexpect.spawn to specialize setting up SSH connections.
This adds methods for login, logout, and expecting the shell prompt.
PEXPECT LICENSE
This license is approved by the OSI and FSF as GPL-compatible.
http://opensource.org/licenses/isc-license.txt
Copyright (c) 2012, Noah Spurrier <[email protected]>
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
'''
from pexpect import ExceptionPexpect, TIMEOUT, EOF, spawn
import time
import os
import sys
import re
__all__ = ['ExceptionPxssh', 'pxssh']
# Exception classes used by this module.
class ExceptionPxssh(ExceptionPexpect):
'''Raised for pxssh exceptions.
'''
if sys.version_info > (3, 0):
from shlex import quote
else:
_find_unsafe = re.compile(r'[^\w@%+=:,./-]').search
def quote(s):
"""Return a shell-escaped version of the string *s*."""
if not s:
return "''"
if _find_unsafe(s) is None:
return s
# use single quotes, and put single quotes into double quotes
# the string $'b is then quoted as '$'"'"'b'
return "'" + s.replace("'", "'\"'\"'") + "'"
class pxssh (spawn):
'''This class extends pexpect.spawn to specialize setting up SSH
connections. This adds methods for login, logout, and expecting the shell
prompt. It does various tricky things to handle many situations in the SSH
login process. For example, if the session is your first login, then pxssh
automatically accepts the remote certificate; or if you have public key
authentication setup then pxssh won't wait for the password prompt.
pxssh uses the shell prompt to synchronize output from the remote host. In
order to make this more robust it sets the shell prompt to something more
unique than just $ or #. This should work on most Borne/Bash or Csh style
shells.
Example that runs a few commands on a remote server and prints the result::
from pexpect import pxssh
import getpass
try:
s = pxssh.pxssh()
hostname = raw_input('hostname: ')
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login(hostname, username, password)
s.sendline('uptime') # run a command
s.prompt() # match the prompt
print(s.before) # print everything before the prompt.
s.sendline('ls -l')
s.prompt()
print(s.before)
s.sendline('df')
s.prompt()
print(s.before)
s.logout()
except pxssh.ExceptionPxssh as e:
print("pxssh failed on login.")
print(e)
Example showing how to specify SSH options::
from pexpect import pxssh
s = pxssh.pxssh(options={
"StrictHostKeyChecking": "no",
"UserKnownHostsFile": "/dev/null"})
...
Note that if you have ssh-agent running while doing development with pxssh
then this can lead to a lot of confusion. Many X display managers (xdm,
gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
dialog box popup asking for a password during development. You should turn
off any key agents during testing. The 'force_password' attribute will turn
off public key authentication. This will only work if the remote SSH server
is configured to allow password logins. Example of using 'force_password'
attribute::
s = pxssh.pxssh()
s.force_password = True
hostname = raw_input('hostname: ')
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login (hostname, username, password)
`debug_command_string` is only for the test suite to confirm that the string
generated for SSH is correct, using this will not allow you to do
anything other than get a string back from `pxssh.pxssh.login()`.
'''
def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None,
logfile=None, cwd=None, env=None, ignore_sighup=True, echo=True,
options={}, encoding=None, codec_errors='strict',
debug_command_string=False):
spawn.__init__(self, None, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, logfile=logfile,
cwd=cwd, env=env, ignore_sighup=ignore_sighup, echo=echo,
encoding=encoding, codec_errors=codec_errors)
self.name = '<pxssh>'
#SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
#slightly different string than the regular expression to match it. This
#is because when you set the prompt the command will echo back, but we
#don't want to match the echoed command. So if we make the set command
#slightly different than the regex we eliminate the problem. To make the
#set command different we add a backslash in front of $. The $ doesn't
#need to be escaped, but it doesn't hurt and serves to make the set
#prompt command different than the regex.
# used to match the command-line prompt
self.UNIQUE_PROMPT = r"\[PEXPECT\][\$\#] "
self.PROMPT = self.UNIQUE_PROMPT
# used to set shell command-line prompt to UNIQUE_PROMPT.
self.PROMPT_SET_SH = r"PS1='[PEXPECT]\$ '"
self.PROMPT_SET_CSH = r"set prompt='[PEXPECT]\$ '"
self.SSH_OPTS = ("-o'RSAAuthentication=no'"
+ " -o 'PubkeyAuthentication=no'")
# Disabling host key checking, makes you vulnerable to MITM attacks.
# + " -o 'StrictHostKeyChecking=no'"
# + " -o 'UserKnownHostsFile /dev/null' ")
# Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
# displaying a GUI password dialog. I have not figured out how to
# disable only SSH_ASKPASS without also disabling X11 forwarding.
# Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
#self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
self.force_password = False
self.debug_command_string = debug_command_string
# User defined SSH options, eg,
# ssh.otions = dict(StrictHostKeyChecking="no",UserKnownHostsFile="/dev/null")
self.options = options
def levenshtein_distance(self, a, b):
'''This calculates the Levenshtein distance between a and b.
'''
n, m = len(a), len(b)
if n > m:
a,b = b,a
n,m = m,n
current = range(n+1)
for i in range(1,m+1):
previous, current = current, [i]+[0]*n
for j in range(1,n+1):
add, delete = previous[j]+1, current[j-1]+1
change = previous[j-1]
if a[j-1] != b[i-1]:
change = change + 1
current[j] = min(add, delete, change)
return current[n]
def try_read_prompt(self, timeout_multiplier):
'''This facilitates using communication timeouts to perform
synchronization as quickly as possible, while supporting high latency
connections with a tunable worst case performance. Fast connections
should be read almost immediately. Worst case performance for this
method is timeout_multiplier * 3 seconds.
'''
# maximum time allowed to read the first response
first_char_timeout = timeout_multiplier * 0.5
# maximum time allowed between subsequent characters
inter_char_timeout = timeout_multiplier * 0.1
# maximum time for reading the entire prompt
total_timeout = timeout_multiplier * 3.0
prompt = self.string_type()
begin = time.time()
expired = 0.0
timeout = first_char_timeout
while expired < total_timeout:
try:
prompt += self.read_nonblocking(size=1, timeout=timeout)
expired = time.time() - begin # updated total time expired
timeout = inter_char_timeout
except TIMEOUT:
break
return prompt
def sync_original_prompt (self, sync_multiplier=1.0):
'''This attempts to find the prompt. Basically, press enter and record
the response; press enter again and record the response; if the two
responses are similar then assume we are at the original prompt.
This can be a slow function. Worst case with the default sync_multiplier
can take 12 seconds. Low latency connections are more likely to fail
with a low sync_multiplier. Best case sync time gets worse with a
high sync multiplier (500 ms with default). '''
# All of these timing pace values are magic.
# I came up with these based on what seemed reliable for
# connecting to a heavily loaded machine I have.
self.sendline()
time.sleep(0.1)
try:
# Clear the buffer before getting the prompt.
self.try_read_prompt(sync_multiplier)
except TIMEOUT:
pass
self.sendline()
x = self.try_read_prompt(sync_multiplier)
self.sendline()
a = self.try_read_prompt(sync_multiplier)
self.sendline()
b = self.try_read_prompt(sync_multiplier)
ld = self.levenshtein_distance(a,b)
len_a = len(a)
if len_a == 0:
return False
if float(ld)/len_a < 0.4:
return True
return False
### TODO: This is getting messy and I'm pretty sure this isn't perfect.
### TODO: I need to draw a flow chart for this.
### TODO: Unit tests for SSH tunnels, remote SSH command exec, disabling original prompt sync
def login (self, server, username, password='', terminal_type='ansi',
original_prompt=r"[#$]", login_timeout=10, port=None,
auto_prompt_reset=True, ssh_key=None, quiet=True,
sync_multiplier=1, check_local_ip=True,
password_regex=r'(?i)(?:password:)|(?:passphrase for key)',
ssh_tunnels={}, spawn_local_ssh=True,
sync_original_prompt=True, ssh_config=None):
'''This logs the user into the given server.
It uses
'original_prompt' to try to find the prompt right after login. When it
finds the prompt it immediately tries to reset the prompt to something
more easily matched. The default 'original_prompt' is very optimistic
and is easily fooled. It's more reliable to try to match the original
prompt as exactly as possible to prevent false matches by server
strings such as the "Message Of The Day". On many systems you can
disable the MOTD on the remote server by creating a zero-length file
called :file:`~/.hushlogin` on the remote server. If a prompt cannot be found
then this will not necessarily cause the login to fail. In the case of
a timeout when looking for the prompt we assume that the original
prompt was so weird that we could not match it, so we use a few tricks
to guess when we have reached the prompt. Then we hope for the best and
blindly try to reset the prompt to something more unique. If that fails
then login() raises an :class:`ExceptionPxssh` exception.
In some situations it is not possible or desirable to reset the
original prompt. In this case, pass ``auto_prompt_reset=False`` to
inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
uses a unique prompt in the :meth:`prompt` method. If the original prompt is
not reset then this will disable the :meth:`prompt` method unless you
manually set the :attr:`PROMPT` attribute.
Set ``password_regex`` if there is a MOTD message with `password` in it.
Changing this is like playing in traffic, don't (p)expect it to match straight
away.
If you require to connect to another SSH server from the your original SSH
connection set ``spawn_local_ssh`` to `False` and this will use your current
session to do so. Setting this option to `False` and not having an active session
will trigger an error.
Set ``ssh_key`` to a file path to an SSH private key to use that SSH key
for the session authentication.
Set ``ssh_key`` to `True` to force passing the current SSH authentication socket
to the desired ``hostname``.
Set ``ssh_config`` to a file path string of an SSH client config file to pass that
file to the client to handle itself. You may set any options you wish in here, however
doing so will require you to post extra information that you may not want to if you
run into issues.
'''
session_regex_array = ["(?i)are you sure you want to continue connecting", original_prompt, password_regex, "(?i)permission denied", "(?i)terminal type", TIMEOUT]
session_init_regex_array = []
session_init_regex_array.extend(session_regex_array)
session_init_regex_array.extend(["(?i)connection closed by remote host", EOF])
ssh_options = ''.join([" -o '%s=%s'" % (o, v) for (o, v) in self.options.items()])
if quiet:
ssh_options = ssh_options + ' -q'
if not check_local_ip:
ssh_options = ssh_options + " -o'NoHostAuthenticationForLocalhost=yes'"
if self.force_password:
ssh_options = ssh_options + ' ' + self.SSH_OPTS
if ssh_config is not None:
if spawn_local_ssh and not os.path.isfile(ssh_config):
raise ExceptionPxssh('SSH config does not exist or is not a file.')
ssh_options = ssh_options + '-F ' + ssh_config
if port is not None:
ssh_options = ssh_options + ' -p %s'%(str(port))
if ssh_key is not None:
# Allow forwarding our SSH key to the current session
if ssh_key==True:
ssh_options = ssh_options + ' -A'
else:
if spawn_local_ssh and not os.path.isfile(ssh_key):
raise ExceptionPxssh('private ssh key does not exist or is not a file.')
ssh_options = ssh_options + ' -i %s' % (ssh_key)
# SSH tunnels, make sure you know what you're putting into the lists
# under each heading. Do not expect these to open 100% of the time,
# The port you're requesting might be bound.
#
# The structure should be like this:
# { 'local': ['2424:localhost:22'], # Local SSH tunnels
# 'remote': ['2525:localhost:22'], # Remote SSH tunnels
# 'dynamic': [8888] } # Dynamic/SOCKS tunnels
if ssh_tunnels!={} and isinstance({},type(ssh_tunnels)):
tunnel_types = {
'local':'L',
'remote':'R',
'dynamic':'D'
}
for tunnel_type in tunnel_types:
cmd_type = tunnel_types[tunnel_type]
if tunnel_type in ssh_tunnels:
tunnels = ssh_tunnels[tunnel_type]
for tunnel in tunnels:
if spawn_local_ssh==False:
tunnel = quote(str(tunnel))
ssh_options = ssh_options + ' -' + cmd_type + ' ' + str(tunnel)
cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
if self.debug_command_string:
return(cmd)
# Are we asking for a local ssh command or to spawn one in another session?
if spawn_local_ssh:
spawn._spawn(self, cmd)
else:
self.sendline(cmd)
# This does not distinguish between a remote server 'password' prompt
# and a local ssh 'passphrase' prompt (for unlocking a private key).
i = self.expect(session_init_regex_array, timeout=login_timeout)
# First phase
if i==0:
# New certificate -- always accept it.
# This is what you get if SSH does not have the remote host's
# public key stored in the 'known_hosts' cache.
self.sendline("yes")
i = self.expect(session_regex_array)
if i==2: # password or passphrase
self.sendline(password)
i = self.expect(session_regex_array)
if i==4:
self.sendline(terminal_type)
i = self.expect(session_regex_array)
if i==7:
self.close()
raise ExceptionPxssh('Could not establish connection to host')
# Second phase
if i==0:
# This is weird. This should not happen twice in a row.
self.close()
raise ExceptionPxssh('Weird error. Got "are you sure" prompt twice.')
elif i==1: # can occur if you have a public key pair set to authenticate.
### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
pass
elif i==2: # password prompt again
# For incorrect passwords, some ssh servers will
# ask for the password again, others return 'denied' right away.
# If we get the password prompt again then this means
# we didn't get the password right the first time.
self.close()
raise ExceptionPxssh('password refused')
elif i==3: # permission denied -- password was bad.
self.close()
raise ExceptionPxssh('permission denied')
elif i==4: # terminal type again? WTF?
self.close()
raise ExceptionPxssh('Weird error. Got "terminal type" prompt twice.')
elif i==5: # Timeout
#This is tricky... I presume that we are at the command-line prompt.
#It may be that the shell prompt was so weird that we couldn't match
#it. Or it may be that we couldn't log in for some other reason. I
#can't be sure, but it's safe to guess that we did login because if
#I presume wrong and we are not logged in then this should be caught
#later when I try to set the shell prompt.
pass
elif i==6: # Connection closed by remote host
self.close()
raise ExceptionPxssh('connection closed')
else: # Unexpected
self.close()
raise ExceptionPxssh('unexpected login response')
if sync_original_prompt:
if not self.sync_original_prompt(sync_multiplier):
self.close()
raise ExceptionPxssh('could not synchronize with original prompt')
# We appear to be in.
# set shell prompt to something unique.
if auto_prompt_reset:
if not self.set_unique_prompt():
self.close()
raise ExceptionPxssh('could not set shell prompt '
'(received: %r, expected: %r).' % (
self.before, self.PROMPT,))
return True
def logout (self):
'''Sends exit to the remote shell.
If there are stopped jobs then this automatically sends exit twice.
'''
self.sendline("exit")
index = self.expect([EOF, "(?i)there are stopped jobs"])
if index==1:
self.sendline("exit")
self.expect(EOF)
self.close()
def prompt(self, timeout=-1):
'''Match the next shell prompt.
This is little more than a short-cut to the :meth:`~pexpect.spawn.expect`
method. Note that if you called :meth:`login` with
``auto_prompt_reset=False``, then before calling :meth:`prompt` you must
set the :attr:`PROMPT` attribute to a regex that it will use for
matching the prompt.
Calling :meth:`prompt` will erase the contents of the :attr:`before`
attribute even if no prompt is ever matched. If timeout is not given or
it is set to -1 then self.timeout is used.
:return: True if the shell prompt was matched, False if the timeout was
reached.
'''
if timeout == -1:
timeout = self.timeout
i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
if i==1:
return False
return True
def set_unique_prompt(self):
'''This sets the remote prompt to something more unique than ``#`` or ``$``.
This makes it easier for the :meth:`prompt` method to match the shell prompt
unambiguously. This method is called automatically by the :meth:`login`
method, but you may want to call it manually if you somehow reset the
shell prompt. For example, if you 'su' to a different user then you
will need to manually reset the prompt. This sends shell commands to
the remote host to set the prompt, so this assumes the remote host is
ready to receive commands.
Alternatively, you may use your own prompt pattern. In this case you
should call :meth:`login` with ``auto_prompt_reset=False``; then set the
:attr:`PROMPT` attribute to a regular expression. After that, the
:meth:`prompt` method will try to match your prompt pattern.
'''
self.sendline("unset PROMPT_COMMAND")
self.sendline(self.PROMPT_SET_SH) # sh-style
i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
if i == 0: # csh-style
self.sendline(self.PROMPT_SET_CSH)
i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
if i == 0:
return False
return True
# vi:ts=4:sw=4:expandtab:ft=python:
|