Module netmiko.scp_handler
Netmiko SCP operations.
Supports file get and file put operations.
SCP requires a separate SSH connection for a control channel.
Expand source code
"""
Netmiko SCP operations.
Supports file get and file put operations.
SCP requires a separate SSH connection for a control channel.
"""
from typing import Callable, Optional, Any, Type
from typing import TYPE_CHECKING
from types import TracebackType
import re
import os
import hashlib
import scp
import sys
if TYPE_CHECKING:
from netmiko.base_connection import BaseConnection
class SCPConn(object):
    """
    Secure copy channel to the remote network device.

    A separate SSH session is opened from the connection parameters of the supplied
    Netmiko connection and wrapped in an ``scp.SCPClient``.

    Must close the SCP connection to get the file to write to the remote filesystem
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
    ) -> None:
        """Store the settings and immediately open the SCP connection.

        Args:
            ssh_conn: existing Netmiko connection whose parameters are reused
            socket_timeout: forwarded to ``scp.SCPClient``
            progress: forwarded to ``scp.SCPClient``
            progress4: forwarded to ``scp.SCPClient``
        """
        self.ssh_ctl_chan = ssh_conn
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4
        self.establish_scp_conn()

    def establish_scp_conn(self) -> None:
        """Open a new SSH session and build the SCP client on top of its transport."""
        connect_kwargs = self.ssh_ctl_chan._connect_params_dict()
        self.scp_conn = self.ssh_ctl_chan._build_ssh_client()
        self.scp_conn.connect(**connect_kwargs)
        transport = self.scp_conn.get_transport()
        self.scp_client = scp.SCPClient(
            transport,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def scp_transfer_file(self, source_file: str, dest_file: str) -> None:
        """Upload ``source_file`` to ``dest_file`` (kept for backwards compatibility)."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def scp_get_file(self, source_file: str, dest_file: str) -> None:
        """Download ``source_file`` to ``dest_file`` using SCP."""
        client = self.scp_client
        client.get(source_file, dest_file)

    def scp_put_file(self, source_file: str, dest_file: str) -> None:
        """Upload ``source_file`` to ``dest_file`` using SCP."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def close(self) -> None:
        """Close the SSH session that carries the SCP channel."""
        session = self.scp_conn
        session.close()
class BaseFileTransfer(object):
    """Class to manage SCP file transfer and associated SSH control channel.

    The constructor records the transfer parameters, resolves the remote file system and
    pre-computes the source file's MD5 (when ``hash_supported``) and size. The SCP
    connection itself is opened by ``establish_scp_conn`` or on entering the context
    manager.
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        source_file: str,
        dest_file: str,
        file_system: Optional[str] = None,
        direction: str = "put",
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
        hash_supported: bool = True,
    ) -> None:
        """Record transfer settings and gather source file metadata.

        Args:
            ssh_conn: Netmiko connection used as the control channel
            source_file: file to copy (local for "put", remote for "get")
            dest_file: name of the copy (remote for "put", local for "get")
            file_system: remote file system; auto-detected for cisco_ios/xe/xr when omitted
            direction: "put" or "get"
            socket_timeout: forwarded to the SCP connection
            progress: forwarded to the SCP connection
            progress4: forwarded to the SCP connection
            hash_supported: when False the source MD5 is not computed (stored as None)

        Raises:
            ValueError: no file system given for a platform without auto-detection, or
                ``direction`` is neither "put" nor "get"
        """
        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        self.dest_file = dest_file
        self.direction = direction
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4

        auto_flag = (
            "cisco_ios" in ssh_conn.device_type
            or "cisco_xe" in ssh_conn.device_type
            or "cisco_xr" in ssh_conn.device_type
        )
        if not file_system:
            if auto_flag:
                self.file_system = self.ssh_ctl_chan._autodetect_fs()
            else:
                raise ValueError("Destination file system not specified")
        else:
            self.file_system = file_system

        if direction == "put":
            self.source_md5 = self.file_md5(source_file) if hash_supported else None
            self.file_size = os.stat(source_file).st_size
        elif direction == "get":
            self.source_md5 = (
                self.remote_md5(remote_file=source_file) if hash_supported else None
            )
            self.file_size = self.remote_file_size(remote_file=source_file)
        else:
            raise ValueError("Invalid direction specified")

    def __enter__(self) -> "BaseFileTransfer":
        """Context manager setup"""
        self.establish_scp_conn()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Context manager cleanup."""
        self.close_scp_chan()

    def establish_scp_conn(self) -> None:
        """Establish SCP connection."""
        self.scp_conn = SCPConn(
            self.ssh_ctl_chan,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def close_scp_chan(self) -> None:
        """Close the SCP connection to the remote network device."""
        self.scp_conn.close()
        del self.scp_conn

    def remote_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> int:
        """Return space available on remote device.

        Runs ``dir <file_system>`` and reads the first group of ``search_pattern``. When the
        matched text mentions "kbytes"/"Kbytes" the number is multiplied by 1000.

        Raises:
            ValueError: ``search_pattern`` is not found in the command output
        """
        remote_cmd = f"dir {self.file_system}"
        remote_output = self.ssh_ctl_chan._send_command_str(remote_cmd)
        match = re.search(search_pattern, remote_output)
        if match:
            if "kbytes" in match.group(0) or "Kbytes" in match.group(0):
                return int(match.group(1)) * 1000
            return int(match.group(1))
        else:
            msg = (
                f"pattern: {search_pattern} not detected in output:\n\n{remote_output}"
            )
            raise ValueError(msg)

    def _remote_space_available_unix(self, search_pattern: str = "") -> int:
        """Return space available on *nix system (BSD/Linux).

        ``search_pattern`` is accepted but not used by this implementation.

        Raises:
            ValueError: the ``df`` output does not have the expected layout
        """
        remote_cmd = f"/bin/df -k {self.file_system}"
        self.ssh_ctl_chan._enter_shell()
        remote_output = self.ssh_ctl_chan._send_command_str(
            remote_cmd, expect_string=r"[\$#]"
        )
        # Leave the shell before parsing so a parsing error cannot strand the session there.
        self.ssh_ctl_chan._return_cli()

        # Try to ensure parsing is correct:
        # Filesystem 1K-blocks Used Avail Capacity Mounted on
        # /dev/bo0s3f 1264808 16376 1147248 1% /cf/var
        remote_output = remote_output.strip()
        parse_error = "Parsing error, unexpected output from {}:\n{}".format(
            remote_cmd, remote_output
        )
        output_lines = remote_output.splitlines()
        # First line is the header; second is the actual file system info
        if len(output_lines) < 2:
            raise ValueError(parse_error)
        header_line = output_lines[0]
        header_fields = header_line.split()
        filesystem_fields = output_lines[1].split()

        if (
            "Filesystem" not in header_line
            or len(header_fields) < 4
            or "Avail" not in header_fields[3]
        ):
            raise ValueError(parse_error)

        if len(filesystem_fields) < 4 or not re.search(
            r"^\d+$", filesystem_fields[3]
        ):
            raise ValueError(parse_error)

        # df -k reports 1K blocks
        return int(filesystem_fields[3]) * 1024

    def local_space_available(self) -> int:
        """Return space available on local filesystem."""
        if sys.platform == "win32":
            import ctypes

            free_bytes = ctypes.c_ulonglong(0)
            ctypes.windll.kernel32.GetDiskFreeSpaceExW(
                ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes)
            )
            return free_bytes.value
        else:
            destination_stats = os.statvfs(".")
            return destination_stats.f_bsize * destination_stats.f_bavail

    def verify_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> bool:
        """Verify sufficient space is available on destination file system (return boolean).

        Raises:
            ValueError: ``self.direction`` is neither "put" nor "get"
        """
        if self.direction == "put":
            space_avail = self.remote_space_available(search_pattern=search_pattern)
        elif self.direction == "get":
            space_avail = self.local_space_available()
        else:
            # Previously fell through and failed on an unbound local variable.
            raise ValueError("Unexpected value for self.direction")
        return space_avail > self.file_size

    def check_file_exists(self, remote_cmd: str = "") -> bool:
        """Check if the dest_file already exists on the file system (return boolean).

        Raises:
            ValueError: unrecognized command output, or unexpected ``self.direction``
        """
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = f"dir {self.file_system}/{self.dest_file}"
            remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
            # Escape the name so regex metacharacters in it are matched literally.
            search_string = r"Directory of .*{0}".format(re.escape(self.dest_file))
            if (
                "Error opening" in remote_out
                or "No such file or directory" in remote_out
                or "Path does not exist" in remote_out
            ):
                return False
            elif re.search(search_string, remote_out, flags=re.DOTALL):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)
        else:
            raise ValueError("Unexpected value for self.direction")

    def _check_file_exists_unix(self, remote_cmd: str = "") -> bool:
        """Check if the dest_file already exists on the file system (return boolean).

        The ``remote_cmd`` argument is overwritten; ``/bin/ls`` is always used.
        """
        if self.direction == "put":
            self.ssh_ctl_chan._enter_shell()
            remote_cmd = f"/bin/ls {self.file_system}/{self.dest_file} 2> /dev/null"
            remote_out = self.ssh_ctl_chan._send_command_str(
                remote_cmd, expect_string=r"[\$#]"
            )
            self.ssh_ctl_chan._return_cli()
            return self.dest_file in remote_out
        elif self.direction == "get":
            return os.path.exists(self.dest_file)
        else:
            raise ValueError("Unexpected value for self.direction")

    def remote_file_size(
        self, remote_cmd: str = "", remote_file: Optional[str] = None
    ) -> int:
        """Get the file size of the remote file.

        Args:
            remote_cmd: command to run; defaults to ``dir <file_system>/<remote_file>``
            remote_file: file to look up; defaults to dest_file ("put") or source_file ("get")

        Raises:
            IOError: the file is reported missing, or the output cannot be parsed
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        if not remote_cmd:
            remote_cmd = f"dir {self.file_system}/{remote_file}"
        remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
        # Strip out "Directory of flash:/filename line
        remote_out = "".join(re.split(r"Directory of .*", remote_out))

        # Report a missing file before parsing, so it is not mistaken for a parse failure.
        if "Error opening" in remote_out or "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        # Match line containing file name
        assert isinstance(remote_file, str)
        escape_file_name = re.escape(os.path.basename(remote_file))
        pattern = r".*({}).*".format(escape_file_name)
        match = re.search(pattern, remote_out)
        if not match:
            raise IOError("Unable to parse 'dir' output in remote_file_size method")
        # Format will be 26 -rw- 6738 Jul 30 2016 19:49:50 -07:00 filename
        return int(match.group(0).split()[2])

    def _remote_file_size_unix(
        self, remote_cmd: str = "", remote_file: Optional[str] = None
    ) -> int:
        """Get the file size of the remote file.

        Raises:
            IOError: the file is reported missing
            ValueError: the ``ls -l`` line for the file is not found
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_file = f"{self.file_system}/{remote_file}"
        if not remote_cmd:
            remote_cmd = f"/bin/ls -l {remote_file}"

        self.ssh_ctl_chan._enter_shell()
        remote_out = self.ssh_ctl_chan._send_command_str(
            remote_cmd, expect_string=r"[\$#]"
        )
        self.ssh_ctl_chan._return_cli()

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        escape_file_name = re.escape(remote_file)
        pattern = r"^.* ({}).*$".format(escape_file_name)
        match = re.search(pattern, remote_out, flags=re.M)
        if match:
            # Format: -rw-r--r-- 1 pyclass wheel 12 Nov 5 19:07 /var/tmp/test3.txt
            line = match.group(0)
            file_size = line.split()[4]
            return int(file_size)

        raise ValueError(
            "Search pattern not found for remote file size during SCP transfer."
        )

    def file_md5(self, file_name: str, add_newline: bool = False) -> str:
        """Compute MD5 hash of file.

        add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in
        the string

        Args:
            file_name: name of file to get md5 digest of
            add_newline: add newline to end of file contents or not
        """
        file_hash = hashlib.md5()
        with open(file_name, "rb") as f:
            for file_contents in iter(lambda: f.read(512), b""):
                file_hash.update(file_contents)
        if add_newline:
            # The newline must be fed to the hash; previously it was computed and discarded.
            file_hash.update(b"\n")
        return file_hash.hexdigest()

    @staticmethod
    def process_md5(md5_output: str, pattern: str = r"=\s+(\S+)") -> str:
        """
        Process the string to retrieve the MD5 hash

        Output from Cisco IOS (ASA is similar)
        .MD5 of flash:file_name Done!
        verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2

        Raises:
            ValueError: ``pattern`` is not found in ``md5_output``
        """
        match = re.search(pattern, md5_output)
        if match:
            return match.group(1)
        else:
            raise ValueError(f"Invalid output from MD5 command: {md5_output}")

    def compare_md5(self) -> bool:
        """Compare md5 of file on network device to md5 of local file."""
        if self.direction == "put":
            remote_md5 = self.remote_md5()
            return self.source_md5 == remote_md5
        elif self.direction == "get":
            local_md5 = self.file_md5(self.dest_file)
            return self.source_md5 == local_md5
        else:
            raise ValueError("Unexpected value for self.direction")

    def remote_md5(
        self, base_cmd: str = "verify /md5", remote_file: Optional[str] = None
    ) -> str:
        """Calculate remote MD5 and returns the hash.

        This command can be CPU intensive on the remote device.
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
        dest_md5 = self.ssh_ctl_chan._send_command_str(remote_md5_cmd, read_timeout=300)
        dest_md5 = self.process_md5(dest_md5)
        return dest_md5

    def transfer_file(self) -> None:
        """SCP transfer file."""
        if self.direction == "put":
            self.put_file()
        elif self.direction == "get":
            self.get_file()
        else:
            raise ValueError("Unexpected value for self.direction in transfer_file")

    def get_file(self) -> None:
        """SCP copy the file from the remote device to local system."""
        source_file = f"{self.file_system}/{self.source_file}"
        self.scp_conn.scp_get_file(source_file, self.dest_file)
        self.scp_conn.close()

    def put_file(self) -> None:
        """SCP copy the file from the local system to the remote device."""
        destination = f"{self.file_system}/{self.dest_file}"
        self.scp_conn.scp_transfer_file(self.source_file, destination)
        # Must close the SCP connection to get the file written (flush)
        self.scp_conn.close()

    def verify_file(self) -> bool:
        """Verify the file has been transferred correctly."""
        return self.compare_md5()

    def enable_scp(self, cmd: str = "ip scp server enable") -> None:
        """
        Enable SCP on remote device.

        Defaults to Cisco IOS command
        """
        self.ssh_ctl_chan.send_config_set(cmd)

    def disable_scp(self, cmd: str = "no ip scp server enable") -> None:
        """
        Disable SCP on remote device.

        Defaults to Cisco IOS command
        """
        self.ssh_ctl_chan.send_config_set(cmd)
Classes
class BaseFileTransfer (ssh_conn: BaseConnection, source_file: str, dest_file: str, file_system: Optional[str] = None, direction: str = 'put', socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, hash_supported: bool = True)
-
Class to manage SCP file transfer and associated SSH control channel.
Expand source code
class BaseFileTransfer(object):
    """Class to manage SCP file transfer and associated SSH control channel.

    The constructor records the transfer parameters, resolves the remote file system and
    pre-computes the source file's MD5 (when ``hash_supported``) and size. The SCP
    connection itself is opened by ``establish_scp_conn`` or on entering the context
    manager.
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        source_file: str,
        dest_file: str,
        file_system: Optional[str] = None,
        direction: str = "put",
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
        hash_supported: bool = True,
    ) -> None:
        """Record transfer settings and gather source file metadata.

        Args:
            ssh_conn: Netmiko connection used as the control channel
            source_file: file to copy (local for "put", remote for "get")
            dest_file: name of the copy (remote for "put", local for "get")
            file_system: remote file system; auto-detected for cisco_ios/xe/xr when omitted
            direction: "put" or "get"
            socket_timeout: forwarded to the SCP connection
            progress: forwarded to the SCP connection
            progress4: forwarded to the SCP connection
            hash_supported: when False the source MD5 is not computed (stored as None)

        Raises:
            ValueError: no file system given for a platform without auto-detection, or
                ``direction`` is neither "put" nor "get"
        """
        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        self.dest_file = dest_file
        self.direction = direction
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4

        auto_flag = (
            "cisco_ios" in ssh_conn.device_type
            or "cisco_xe" in ssh_conn.device_type
            or "cisco_xr" in ssh_conn.device_type
        )
        if not file_system:
            if auto_flag:
                self.file_system = self.ssh_ctl_chan._autodetect_fs()
            else:
                raise ValueError("Destination file system not specified")
        else:
            self.file_system = file_system

        if direction == "put":
            self.source_md5 = self.file_md5(source_file) if hash_supported else None
            self.file_size = os.stat(source_file).st_size
        elif direction == "get":
            self.source_md5 = (
                self.remote_md5(remote_file=source_file) if hash_supported else None
            )
            self.file_size = self.remote_file_size(remote_file=source_file)
        else:
            raise ValueError("Invalid direction specified")

    def __enter__(self) -> "BaseFileTransfer":
        """Context manager setup"""
        self.establish_scp_conn()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Context manager cleanup."""
        self.close_scp_chan()

    def establish_scp_conn(self) -> None:
        """Establish SCP connection."""
        self.scp_conn = SCPConn(
            self.ssh_ctl_chan,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def close_scp_chan(self) -> None:
        """Close the SCP connection to the remote network device."""
        self.scp_conn.close()
        del self.scp_conn

    def remote_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> int:
        """Return space available on remote device.

        Runs ``dir <file_system>`` and reads the first group of ``search_pattern``. When the
        matched text mentions "kbytes"/"Kbytes" the number is multiplied by 1000.

        Raises:
            ValueError: ``search_pattern`` is not found in the command output
        """
        remote_cmd = f"dir {self.file_system}"
        remote_output = self.ssh_ctl_chan._send_command_str(remote_cmd)
        match = re.search(search_pattern, remote_output)
        if match:
            if "kbytes" in match.group(0) or "Kbytes" in match.group(0):
                return int(match.group(1)) * 1000
            return int(match.group(1))
        else:
            msg = (
                f"pattern: {search_pattern} not detected in output:\n\n{remote_output}"
            )
            raise ValueError(msg)

    def _remote_space_available_unix(self, search_pattern: str = "") -> int:
        """Return space available on *nix system (BSD/Linux).

        ``search_pattern`` is accepted but not used by this implementation.

        Raises:
            ValueError: the ``df`` output does not have the expected layout
        """
        remote_cmd = f"/bin/df -k {self.file_system}"
        self.ssh_ctl_chan._enter_shell()
        remote_output = self.ssh_ctl_chan._send_command_str(
            remote_cmd, expect_string=r"[\$#]"
        )
        # Leave the shell before parsing so a parsing error cannot strand the session there.
        self.ssh_ctl_chan._return_cli()

        # Try to ensure parsing is correct:
        # Filesystem 1K-blocks Used Avail Capacity Mounted on
        # /dev/bo0s3f 1264808 16376 1147248 1% /cf/var
        remote_output = remote_output.strip()
        parse_error = "Parsing error, unexpected output from {}:\n{}".format(
            remote_cmd, remote_output
        )
        output_lines = remote_output.splitlines()
        # First line is the header; second is the actual file system info
        if len(output_lines) < 2:
            raise ValueError(parse_error)
        header_line = output_lines[0]
        header_fields = header_line.split()
        filesystem_fields = output_lines[1].split()

        if (
            "Filesystem" not in header_line
            or len(header_fields) < 4
            or "Avail" not in header_fields[3]
        ):
            raise ValueError(parse_error)

        if len(filesystem_fields) < 4 or not re.search(
            r"^\d+$", filesystem_fields[3]
        ):
            raise ValueError(parse_error)

        # df -k reports 1K blocks
        return int(filesystem_fields[3]) * 1024

    def local_space_available(self) -> int:
        """Return space available on local filesystem."""
        if sys.platform == "win32":
            import ctypes

            free_bytes = ctypes.c_ulonglong(0)
            ctypes.windll.kernel32.GetDiskFreeSpaceExW(
                ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes)
            )
            return free_bytes.value
        else:
            destination_stats = os.statvfs(".")
            return destination_stats.f_bsize * destination_stats.f_bavail

    def verify_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> bool:
        """Verify sufficient space is available on destination file system (return boolean).

        Raises:
            ValueError: ``self.direction`` is neither "put" nor "get"
        """
        if self.direction == "put":
            space_avail = self.remote_space_available(search_pattern=search_pattern)
        elif self.direction == "get":
            space_avail = self.local_space_available()
        else:
            # Previously fell through and failed on an unbound local variable.
            raise ValueError("Unexpected value for self.direction")
        return space_avail > self.file_size

    def check_file_exists(self, remote_cmd: str = "") -> bool:
        """Check if the dest_file already exists on the file system (return boolean).

        Raises:
            ValueError: unrecognized command output, or unexpected ``self.direction``
        """
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = f"dir {self.file_system}/{self.dest_file}"
            remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
            # Escape the name so regex metacharacters in it are matched literally.
            search_string = r"Directory of .*{0}".format(re.escape(self.dest_file))
            if (
                "Error opening" in remote_out
                or "No such file or directory" in remote_out
                or "Path does not exist" in remote_out
            ):
                return False
            elif re.search(search_string, remote_out, flags=re.DOTALL):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)
        else:
            raise ValueError("Unexpected value for self.direction")

    def _check_file_exists_unix(self, remote_cmd: str = "") -> bool:
        """Check if the dest_file already exists on the file system (return boolean).

        The ``remote_cmd`` argument is overwritten; ``/bin/ls`` is always used.
        """
        if self.direction == "put":
            self.ssh_ctl_chan._enter_shell()
            remote_cmd = f"/bin/ls {self.file_system}/{self.dest_file} 2> /dev/null"
            remote_out = self.ssh_ctl_chan._send_command_str(
                remote_cmd, expect_string=r"[\$#]"
            )
            self.ssh_ctl_chan._return_cli()
            return self.dest_file in remote_out
        elif self.direction == "get":
            return os.path.exists(self.dest_file)
        else:
            raise ValueError("Unexpected value for self.direction")

    def remote_file_size(
        self, remote_cmd: str = "", remote_file: Optional[str] = None
    ) -> int:
        """Get the file size of the remote file.

        Args:
            remote_cmd: command to run; defaults to ``dir <file_system>/<remote_file>``
            remote_file: file to look up; defaults to dest_file ("put") or source_file ("get")

        Raises:
            IOError: the file is reported missing, or the output cannot be parsed
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        if not remote_cmd:
            remote_cmd = f"dir {self.file_system}/{remote_file}"
        remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
        # Strip out "Directory of flash:/filename line
        remote_out = "".join(re.split(r"Directory of .*", remote_out))

        # Report a missing file before parsing, so it is not mistaken for a parse failure.
        if "Error opening" in remote_out or "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        # Match line containing file name
        assert isinstance(remote_file, str)
        escape_file_name = re.escape(os.path.basename(remote_file))
        pattern = r".*({}).*".format(escape_file_name)
        match = re.search(pattern, remote_out)
        if not match:
            raise IOError("Unable to parse 'dir' output in remote_file_size method")
        # Format will be 26 -rw- 6738 Jul 30 2016 19:49:50 -07:00 filename
        return int(match.group(0).split()[2])

    def _remote_file_size_unix(
        self, remote_cmd: str = "", remote_file: Optional[str] = None
    ) -> int:
        """Get the file size of the remote file.

        Raises:
            IOError: the file is reported missing
            ValueError: the ``ls -l`` line for the file is not found
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_file = f"{self.file_system}/{remote_file}"
        if not remote_cmd:
            remote_cmd = f"/bin/ls -l {remote_file}"

        self.ssh_ctl_chan._enter_shell()
        remote_out = self.ssh_ctl_chan._send_command_str(
            remote_cmd, expect_string=r"[\$#]"
        )
        self.ssh_ctl_chan._return_cli()

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        escape_file_name = re.escape(remote_file)
        pattern = r"^.* ({}).*$".format(escape_file_name)
        match = re.search(pattern, remote_out, flags=re.M)
        if match:
            # Format: -rw-r--r-- 1 pyclass wheel 12 Nov 5 19:07 /var/tmp/test3.txt
            line = match.group(0)
            file_size = line.split()[4]
            return int(file_size)

        raise ValueError(
            "Search pattern not found for remote file size during SCP transfer."
        )

    def file_md5(self, file_name: str, add_newline: bool = False) -> str:
        """Compute MD5 hash of file.

        add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in
        the string

        Args:
            file_name: name of file to get md5 digest of
            add_newline: add newline to end of file contents or not
        """
        file_hash = hashlib.md5()
        with open(file_name, "rb") as f:
            for file_contents in iter(lambda: f.read(512), b""):
                file_hash.update(file_contents)
        if add_newline:
            # The newline must be fed to the hash; previously it was computed and discarded.
            file_hash.update(b"\n")
        return file_hash.hexdigest()

    @staticmethod
    def process_md5(md5_output: str, pattern: str = r"=\s+(\S+)") -> str:
        """
        Process the string to retrieve the MD5 hash

        Output from Cisco IOS (ASA is similar)
        .MD5 of flash:file_name Done!
        verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2

        Raises:
            ValueError: ``pattern`` is not found in ``md5_output``
        """
        match = re.search(pattern, md5_output)
        if match:
            return match.group(1)
        else:
            raise ValueError(f"Invalid output from MD5 command: {md5_output}")

    def compare_md5(self) -> bool:
        """Compare md5 of file on network device to md5 of local file."""
        if self.direction == "put":
            remote_md5 = self.remote_md5()
            return self.source_md5 == remote_md5
        elif self.direction == "get":
            local_md5 = self.file_md5(self.dest_file)
            return self.source_md5 == local_md5
        else:
            raise ValueError("Unexpected value for self.direction")

    def remote_md5(
        self, base_cmd: str = "verify /md5", remote_file: Optional[str] = None
    ) -> str:
        """Calculate remote MD5 and returns the hash.

        This command can be CPU intensive on the remote device.
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
        dest_md5 = self.ssh_ctl_chan._send_command_str(remote_md5_cmd, read_timeout=300)
        dest_md5 = self.process_md5(dest_md5)
        return dest_md5

    def transfer_file(self) -> None:
        """SCP transfer file."""
        if self.direction == "put":
            self.put_file()
        elif self.direction == "get":
            self.get_file()
        else:
            raise ValueError("Unexpected value for self.direction in transfer_file")

    def get_file(self) -> None:
        """SCP copy the file from the remote device to local system."""
        source_file = f"{self.file_system}/{self.source_file}"
        self.scp_conn.scp_get_file(source_file, self.dest_file)
        self.scp_conn.close()

    def put_file(self) -> None:
        """SCP copy the file from the local system to the remote device."""
        destination = f"{self.file_system}/{self.dest_file}"
        self.scp_conn.scp_transfer_file(self.source_file, destination)
        # Must close the SCP connection to get the file written (flush)
        self.scp_conn.close()

    def verify_file(self) -> bool:
        """Verify the file has been transferred correctly."""
        return self.compare_md5()

    def enable_scp(self, cmd: str = "ip scp server enable") -> None:
        """
        Enable SCP on remote device.

        Defaults to Cisco IOS command
        """
        self.ssh_ctl_chan.send_config_set(cmd)

    def disable_scp(self, cmd: str = "no ip scp server enable") -> None:
        """
        Disable SCP on remote device.

        Defaults to Cisco IOS command
        """
        self.ssh_ctl_chan.send_config_set(cmd)
Subclasses
- CienaSaosFileTransfer
- CiscoFileTransfer
- DellOS10FileTransfer
- ExtremeExosFileTransfer
- JuniperFileTransfer
- MikrotikRouterOsFileTransfer
- NokiaSrosFileTransfer
- UbiquitiEdgeRouterFileTransfer
Static methods
def process_md5(md5_output: str, pattern: str = '=\\s+(\\S+)') ‑> str
-
Process the string to retrieve the MD5 hash
Output from Cisco IOS (ASA is similar):
.MD5 of flash:file_name Done!
verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2
Expand source code
@staticmethod
def process_md5(md5_output: str, pattern: str = r"=\s+(\S+)") -> str:
    """
    Extract the MD5 hash from device command output.

    Output from Cisco IOS (ASA is similar)
    .MD5 of flash:file_name Done!
    verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2

    Returns the first capture group of ``pattern``; raises ValueError when ``pattern``
    does not occur in ``md5_output``.
    """
    found = re.search(pattern, md5_output)
    if found is None:
        raise ValueError(f"Invalid output from MD5 command: {md5_output}")
    return found.group(1)
Methods
def check_file_exists(self, remote_cmd: str = '') ‑> bool
-
Check if the dest_file already exists on the file system (return boolean).
Expand source code
def check_file_exists(self, remote_cmd: str = "") -> bool:
    """Check if the dest_file already exists on the file system (return boolean).

    For "put" the remote device is queried (default command
    ``dir <file_system>/<dest_file>``); for "get" the local path is checked.

    Raises:
        ValueError: unrecognized command output, or unexpected ``self.direction``
    """
    if self.direction == "put":
        if not remote_cmd:
            remote_cmd = f"dir {self.file_system}/{self.dest_file}"
        remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
        # Escape the name so regex metacharacters in it are matched literally.
        search_string = r"Directory of .*{0}".format(re.escape(self.dest_file))
        if (
            "Error opening" in remote_out
            or "No such file or directory" in remote_out
            or "Path does not exist" in remote_out
        ):
            return False
        elif re.search(search_string, remote_out, flags=re.DOTALL):
            return True
        else:
            raise ValueError("Unexpected output from check_file_exists")
    elif self.direction == "get":
        return os.path.exists(self.dest_file)
    else:
        raise ValueError("Unexpected value for self.direction")
def close_scp_chan(self) ‑> None
-
Close the SCP connection to the remote network device.
Expand source code
def close_scp_chan(self) -> None:
    """Close the SCP connection and drop the ``scp_conn`` attribute."""
    connection = self.scp_conn
    connection.close()
    del self.scp_conn
def compare_md5(self) ‑> bool
-
Compare md5 of file on network device to md5 of local file.
Expand source code
def compare_md5(self) -> bool:
    """Return True when the stored source MD5 equals the MD5 of the transferred copy.

    For "put" the copy's hash comes from ``remote_md5()``; for "get" it comes from
    ``file_md5(dest_file)``. Any other direction raises ValueError.
    """
    if self.direction == "put":
        copy_md5 = self.remote_md5()
    elif self.direction == "get":
        copy_md5 = self.file_md5(self.dest_file)
    else:
        raise ValueError("Unexpected value for self.direction")
    return self.source_md5 == copy_md5
def disable_scp(self, cmd: str = 'no ip scp server enable') ‑> None
-
Disable SCP on remote device.
Defaults to Cisco IOS command
Expand source code
def disable_scp(self, cmd: str = "no ip scp server enable") -> None:
    """
    Disable SCP on remote device.

    Sends ``cmd`` through the control channel's ``send_config_set``; the default is the
    Cisco IOS command.
    """
    control_channel = self.ssh_ctl_chan
    control_channel.send_config_set(cmd)
def enable_scp(self, cmd: str = 'ip scp server enable') ‑> None
-
Enable SCP on remote device.
Defaults to Cisco IOS command
Expand source code
def enable_scp(self, cmd: str = "ip scp server enable") -> None:
    """
    Enable SCP on remote device.

    Sends ``cmd`` through the control channel's ``send_config_set``; the default is the
    Cisco IOS command.
    """
    control_channel = self.ssh_ctl_chan
    control_channel.send_config_set(cmd)
def establish_scp_conn(self) ‑> None
-
Establish SCP connection.
Expand source code
def establish_scp_conn(self) -> None:
    """Create the ``SCPConn`` for this transfer and store it as ``self.scp_conn``."""
    scp_settings = {
        "socket_timeout": self.socket_timeout,
        "progress": self.progress,
        "progress4": self.progress4,
    }
    self.scp_conn = SCPConn(self.ssh_ctl_chan, **scp_settings)
def file_md5(self, file_name: str, add_newline: bool = False) ‑> str
-
Compute MD5 hash of file.
add_newline supports the Cisco IOS MD5 calculation, which expects a trailing newline in the hashed content.
Args
file_name
- name of file to get md5 digest of
add_newline
- add newline to end of file contents or not
Expand source code
def file_md5(self, file_name: str, add_newline: bool = False) -> str:
    """Compute MD5 hash of file.

    add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in
    the string

    Args:
        file_name: name of file to get md5 digest of
        add_newline: add newline to end of file contents or not

    Returns:
        Hex digest of the file contents (plus a trailing newline when requested).
    """
    file_hash = hashlib.md5()
    with open(file_name, "rb") as f:
        # Read in small chunks so the whole file is never held in memory.
        for file_contents in iter(lambda: f.read(512), b""):
            file_hash.update(file_contents)
    if add_newline:
        # The newline must be fed to the hash; previously it was computed and discarded.
        file_hash.update(b"\n")
    return file_hash.hexdigest()
def get_file(self) ‑> None
-
SCP copy the file from the remote device to local system.
Expand source code
def get_file(self) -> None:
    """Download ``<file_system>/<source_file>`` to ``dest_file``, then close the SCP connection."""
    remote_path = f"{self.file_system}/{self.source_file}"
    connection = self.scp_conn
    connection.scp_get_file(remote_path, self.dest_file)
    connection.close()
def local_space_available(self) ‑> int
-
Return space available on local filesystem.
Expand source code
def local_space_available(self) -> int:
    """Return space available on the local filesystem holding the current directory.

    On Windows the value is obtained from ``GetDiskFreeSpaceExW``; elsewhere it is
    ``f_bsize * f_bavail`` from ``os.statvfs(".")``.
    """
    if sys.platform != "win32":
        fs_stats = os.statvfs(".")
        return fs_stats.f_bsize * fs_stats.f_bavail

    import ctypes

    available = ctypes.c_ulonglong(0)
    ctypes.windll.kernel32.GetDiskFreeSpaceExW(
        ctypes.c_wchar_p("."), None, None, ctypes.pointer(available)
    )
    return available.value
def put_file(self) ‑> None
-
SCP copy the file from the local system to the remote device.
Expand source code
def put_file(self) -> None:
    """Upload ``source_file`` to ``<file_system>/<dest_file>``, then close the SCP connection."""
    remote_path = f"{self.file_system}/{self.dest_file}"
    connection = self.scp_conn
    connection.scp_transfer_file(self.source_file, remote_path)
    # Closing the SCP connection is what gets the file written (flush)
    connection.close()
def remote_file_size(self, remote_cmd: str = '', remote_file: Optional[str] = None) ‑> int
-
Get the file size of the remote file.
Expand source code
def remote_file_size(
    self, remote_cmd: str = "", remote_file: Optional[str] = None
) -> int:
    """Get the file size of the remote file.

    Args:
        remote_cmd: command to run; defaults to ``dir <file_system>/<remote_file>``
        remote_file: file to look up; defaults to dest_file ("put") or source_file ("get")

    Raises:
        IOError: the file is reported missing, or the output cannot be parsed
    """
    if remote_file is None:
        if self.direction == "put":
            remote_file = self.dest_file
        elif self.direction == "get":
            remote_file = self.source_file
    if not remote_cmd:
        remote_cmd = f"dir {self.file_system}/{remote_file}"
    remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd)
    # Strip out "Directory of flash:/filename line
    remote_out = "".join(re.split(r"Directory of .*", remote_out))

    # Report a missing file before parsing, so it is not mistaken for a parse failure.
    if "Error opening" in remote_out or "No such file or directory" in remote_out:
        raise IOError("Unable to find file on remote system")

    # Match line containing file name
    assert isinstance(remote_file, str)
    escape_file_name = re.escape(os.path.basename(remote_file))
    pattern = r".*({}).*".format(escape_file_name)
    match = re.search(pattern, remote_out)
    if not match:
        raise IOError("Unable to parse 'dir' output in remote_file_size method")
    # Format will be 26 -rw- 6738 Jul 30 2016 19:49:50 -07:00 filename
    return int(match.group(0).split()[2])
def remote_md5(self, base_cmd: str = 'verify /md5', remote_file: Optional[str] = None) ‑> str
-
Calculate remote MD5 and returns the hash.
This command can be CPU intensive on the remote device.
Expand source code
def remote_md5(
    self, base_cmd: str = "verify /md5", remote_file: Optional[str] = None
) -> str:
    """Run the device's MD5 command on a remote file and return the parsed hash.

    This command can be CPU intensive on the remote device.

    When ``remote_file`` is omitted, dest_file ("put") or source_file ("get") is used.
    The command output is parsed with ``process_md5``.
    """
    target = remote_file
    if target is None:
        if self.direction == "put":
            target = self.dest_file
        elif self.direction == "get":
            target = self.source_file
    command = f"{base_cmd} {self.file_system}/{target}"
    raw_output = self.ssh_ctl_chan._send_command_str(command, read_timeout=300)
    return self.process_md5(raw_output)
def remote_space_available(self, search_pattern: str = '(\\d+) \\w+ free') ‑> int
-
Return space available on remote device.
Expand source code
def remote_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> int:
    """Return the free space reported by ``dir`` on the remote file system.

    The first capture group of ``search_pattern`` is taken as the amount; it
    is multiplied by 1000 when the matched text mentions kbytes/Kbytes.
    Raises ValueError when the pattern is not found in the output.
    """
    dir_cmd = f"dir {self.file_system}"
    remote_output = self.ssh_ctl_chan._send_command_str(dir_cmd)

    found = re.search(search_pattern, remote_output)
    if not found:
        raise ValueError(
            f"pattern: {search_pattern} not detected in output:\n\n{remote_output}"
        )

    matched_text = found.group(0)
    scale = 1000 if ("kbytes" in matched_text or "Kbytes" in matched_text) else 1
    return int(found.group(1)) * scale
def transfer_file(self) ‑> None
-
SCP transfer file.
Expand source code
def transfer_file(self) -> None:
    """Run the SCP transfer in the configured direction.

    Calls ``put_file`` for "put" and ``get_file`` for "get"; any other
    direction raises ValueError.
    """
    direction = self.direction
    if direction == "put":
        self.put_file()
        return
    if direction == "get":
        self.get_file()
        return
    raise ValueError("Unexpected value for self.direction in transfer_file")
def verify_file(self) ‑> bool
-
Verify the file has been transferred correctly.
Expand source code
def verify_file(self) -> bool:
    """Check the transferred file; returns the result of ``compare_md5``."""
    md5_matches = self.compare_md5()
    return md5_matches
def verify_space_available(self, search_pattern: str = '(\\d+) \\w+ free') ‑> bool
-
Verify sufficient space is available on destination file system (return boolean).
Expand source code
def verify_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> bool:
    """Verify sufficient space is available on destination file system.

    For a "put" transfer the remote free space is checked (using
    ``search_pattern`` to parse it); for a "get" transfer the local free
    space is checked.

    :param search_pattern: Regex passed to ``remote_space_available``.
    :return: True when the available space is strictly greater than
        ``self.file_size``, otherwise False.
    :raises ValueError: If ``self.direction`` is neither "put" nor "get".
    """
    if self.direction == "put":
        space_avail = self.remote_space_available(search_pattern=search_pattern)
    elif self.direction == "get":
        space_avail = self.local_space_available()
    else:
        # Without this branch ``space_avail`` would be unbound below.
        raise ValueError(
            "Unexpected value for self.direction in verify_space_available"
        )
    return space_avail > self.file_size
class SCPConn (ssh_conn: BaseConnection, socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None)
-
Establish a secure copy channel to the remote network device.
The SCP connection must be closed for the file to be written to the remote filesystem.
Expand source code
class SCPConn(object):
    """
    Secure copy channel to the remote network device.

    A second SSH client is built from the control channel's connection
    parameters and wrapped in an ``scp.SCPClient``.

    The SCP connection must be closed to get the file to write to the remote
    filesystem.
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
    ) -> None:
        # Keep the settings, then open the SCP connection right away.
        self.ssh_ctl_chan = ssh_conn
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4
        self.establish_scp_conn()

    def establish_scp_conn(self) -> None:
        """Open the dedicated SSH connection and create the SCP client on it."""
        ctl_chan = self.ssh_ctl_chan
        connect_kwargs = ctl_chan._connect_params_dict()
        ssh_client = ctl_chan._build_ssh_client()
        self.scp_conn = ssh_client
        ssh_client.connect(**connect_kwargs)
        self.scp_client = scp.SCPClient(
            ssh_client.get_transport(),
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def scp_transfer_file(self, source_file: str, dest_file: str) -> None:
        """Upload a file via SCP (kept for backwards compatibility)."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def scp_get_file(self, source_file: str, dest_file: str) -> None:
        """Download ``source_file`` to ``dest_file`` via SCP."""
        client = self.scp_client
        client.get(source_file, dest_file)

    def scp_put_file(self, source_file: str, dest_file: str) -> None:
        """Upload ``source_file`` to ``dest_file`` via SCP."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def close(self) -> None:
        """Close the underlying SSH connection used for SCP."""
        connection = self.scp_conn
        connection.close()
Methods
def close(self) ‑> None
-
Close the SCP connection.
Expand source code
def close(self) -> None:
    """Close the underlying SSH connection used for SCP."""
    connection = self.scp_conn
    connection.close()
def establish_scp_conn(self) ‑> None
-
Establish the secure copy connection.
Expand source code
def establish_scp_conn(self) -> None:
    """Open the dedicated SSH connection and create the SCP client on it."""
    ctl_chan = self.ssh_ctl_chan
    connect_kwargs = ctl_chan._connect_params_dict()
    ssh_client = ctl_chan._build_ssh_client()
    self.scp_conn = ssh_client
    ssh_client.connect(**connect_kwargs)
    self.scp_client = scp.SCPClient(
        ssh_client.get_transport(),
        socket_timeout=self.socket_timeout,
        progress=self.progress,
        progress4=self.progress4,
    )
def scp_get_file(self, source_file: str, dest_file: str) ‑> None
-
Get file using SCP.
Expand source code
def scp_get_file(self, source_file: str, dest_file: str) -> None:
    """Download ``source_file`` to ``dest_file`` via SCP."""
    client = self.scp_client
    client.get(source_file, dest_file)
def scp_put_file(self, source_file: str, dest_file: str) ‑> None
-
Put file using SCP.
Expand source code
def scp_put_file(self, source_file: str, dest_file: str) -> None:
    """Upload ``source_file`` to ``dest_file`` via SCP."""
    client = self.scp_client
    client.put(source_file, dest_file)
def scp_transfer_file(self, source_file: str, dest_file: str) ‑> None
-
Put file using SCP (for backwards compatibility).
Expand source code
def scp_transfer_file(self, source_file: str, dest_file: str) -> None:
    """Upload a file via SCP (kept for backwards compatibility)."""
    client = self.scp_client
    client.put(source_file, dest_file)