Module netmiko.scp_handler
Netmiko SCP operations.
Supports file get and file put operations.
SCP requires a separate SSH connection for a control channel.
Classes
class BaseFileTransfer (ssh_conn: BaseConnection, source_file: str, dest_file: str, file_system: Optional[str] = None, direction: str = 'put', socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, hash_supported: bool = True)
-
Class to manage SCP file transfer and associated SSH control channel.
Expand source code
class BaseFileTransfer(object): """Class to manage SCP file transfer and associated SSH control channel.""" def __init__( self, ssh_conn: "BaseConnection", source_file: str, dest_file: str, file_system: Optional[str] = None, direction: str = "put", socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, hash_supported: bool = True, ) -> None: self.ssh_ctl_chan = ssh_conn self.source_file = source_file self.dest_file = dest_file self.direction = direction self.socket_timeout = socket_timeout self.progress = progress self.progress4 = progress4 auto_flag = ( "cisco_ios" in ssh_conn.device_type or "cisco_xe" in ssh_conn.device_type or "cisco_xr" in ssh_conn.device_type ) if not file_system: if auto_flag: self.file_system = self.ssh_ctl_chan._autodetect_fs() else: raise ValueError("Destination file system not specified") else: self.file_system = file_system if direction == "put": self.source_md5 = self.file_md5(source_file) if hash_supported else None self.file_size = os.stat(source_file).st_size elif direction == "get": self.source_md5 = ( self.remote_md5(remote_file=source_file) if hash_supported else None ) self.file_size = self.remote_file_size(remote_file=source_file) else: raise ValueError("Invalid direction specified") def __enter__(self) -> "BaseFileTransfer": """Context manager setup""" self.establish_scp_conn() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: """Context manager cleanup.""" self.close_scp_chan() def establish_scp_conn(self) -> None: """Establish SCP connection.""" self.scp_conn = SCPConn( self.ssh_ctl_chan, socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, ) def close_scp_chan(self) -> None: """Close the SCP connection to the remote network device.""" self.scp_conn.close() del self.scp_conn def remote_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> int: """Return space available on remote device.""" remote_cmd = f"dir {self.file_system}" remote_output = self.ssh_ctl_chan._send_command_str(remote_cmd) match = re.search(search_pattern, remote_output) if match: if "kbytes" in match.group(0) or "Kbytes" in match.group(0): return int(match.group(1)) * 1000 return int(match.group(1)) else: msg = ( f"pattern: {search_pattern} not detected in output:\n\n{remote_output}" ) raise ValueError(msg) def _remote_space_available_unix(self, search_pattern: str = "") -> int: """Return space available on *nix system (BSD/Linux).""" self.ssh_ctl_chan._enter_shell() remote_cmd = f"/bin/df -k {self.file_system}" remote_output = self.ssh_ctl_chan._send_command_str( remote_cmd, expect_string=r"[\$#]" ) # Try to ensure parsing is correct: # Filesystem 1K-blocks Used Avail Capacity Mounted on # /dev/bo0s3f 1264808 16376 1147248 1% /cf/var remote_output = remote_output.strip() output_lines = remote_output.splitlines() # First line is the header; second is the actual file system info header_line = output_lines[0] filesystem_line = output_lines[1] if "Filesystem" not in header_line or "Avail" not in header_line.split()[3]: # Filesystem 1K-blocks Used Avail Capacity Mounted on msg = "Parsing error, unexpected output from {}:\n{}".format( remote_cmd, remote_output ) raise ValueError(msg) space_available = filesystem_line.split()[3] if not re.search(r"^\d+$", space_available): msg = "Parsing error, unexpected output from {}:\n{}".format( remote_cmd, remote_output ) raise ValueError(msg) self.ssh_ctl_chan._return_cli() return int(space_available) * 1024 def local_space_available(self) -> int: """Return space available on local filesystem.""" if sys.platform == "win32": import ctypes free_bytes = ctypes.c_ulonglong(0) ctypes.windll.kernel32.GetDiskFreeSpaceExW( ctypes.c_wchar_p("."), None, None, ctypes.pointer(free_bytes) ) return free_bytes.value else: destination_stats = os.statvfs(".") return destination_stats.f_bsize * destination_stats.f_bavail def verify_space_available(self, search_pattern: str = r"(\d+) \w+ free") -> bool: """Verify sufficient space is available on destination file system (return boolean).""" if self.direction == "put": space_avail = self.remote_space_available(search_pattern=search_pattern) elif self.direction == "get": space_avail = self.local_space_available() if space_avail > self.file_size: return True return False def check_file_exists(self, remote_cmd: str = "") -> bool: """Check if the dest_file already exists on the file system (return boolean).""" if self.direction == "put": if not remote_cmd: remote_cmd = f"dir {self.file_system}/{self.dest_file}" remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd) search_string = r"Directory of .*{0}".format(self.dest_file) if ( "Error opening" in remote_out or "No such file or directory" in remote_out or "Path does not exist" in remote_out ): return False elif re.search(search_string, remote_out, flags=re.DOTALL): return True else: raise ValueError("Unexpected output from check_file_exists") elif self.direction == "get": return os.path.exists(self.dest_file) else: raise ValueError("Unexpected value for self.direction") def _check_file_exists_unix(self, remote_cmd: str = "") -> bool: """Check if the dest_file already exists on the file system (return boolean).""" if self.direction == "put": self.ssh_ctl_chan._enter_shell() remote_cmd = f"/bin/ls {self.file_system}/{self.dest_file} 2> /dev/null" remote_out = self.ssh_ctl_chan._send_command_str( remote_cmd, expect_string=r"[\$#]" ) self.ssh_ctl_chan._return_cli() return self.dest_file in remote_out elif self.direction == "get": return os.path.exists(self.dest_file) else: raise ValueError("Unexpected value for self.direction") def remote_file_size( self, remote_cmd: str = "", remote_file: Optional[str] = None ) -> int: """Get the file size of the remote file.""" if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file if not remote_cmd: remote_cmd = f"dir {self.file_system}/{remote_file}" remote_out = self.ssh_ctl_chan._send_command_str(remote_cmd) # Strip out "Directory of flash:/filename line remote_out_lines = re.split(r"Directory of .*", remote_out) remote_out = "".join(remote_out_lines) # Match line containing file name assert isinstance(remote_file, str) remote_file_base = os.path.basename(remote_file) escape_file_name = re.escape(remote_file_base) pattern = r".*({}).*".format(escape_file_name) match = re.search(pattern, remote_out) if match: line = match.group(0) # Format will be 26 -rw- 6738 Jul 30 2016 19:49:50 -07:00 filename file_size = line.split()[2] else: raise IOError("Unable to parse 'dir' output in remote_file_size method") if "Error opening" in remote_out or "No such file or directory" in remote_out: raise IOError("Unable to find file on remote system") else: return int(file_size) def _remote_file_size_unix( self, remote_cmd: str = "", remote_file: Optional[str] = None ) -> int: """Get the file size of the remote file.""" if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file remote_file = f"{self.file_system}/{remote_file}" if not remote_cmd: remote_cmd = f"/bin/ls -l {remote_file}" self.ssh_ctl_chan._enter_shell() remote_out = self.ssh_ctl_chan._send_command_str( remote_cmd, expect_string=r"[\$#]" ) self.ssh_ctl_chan._return_cli() if "No such file or directory" in remote_out: raise IOError("Unable to find file on remote system") escape_file_name = re.escape(remote_file) pattern = r"^.* ({}).*$".format(escape_file_name) match = re.search(pattern, remote_out, flags=re.M) if match: # Format: -rw-r--r-- 1 pyclass wheel 12 Nov 5 19:07 /var/tmp/test3.txt line = match.group(0) file_size = line.split()[4] return int(file_size) raise ValueError( "Search pattern not found for remote file size during SCP transfer." ) def file_md5(self, file_name: str, add_newline: bool = False) -> str: """Compute MD5 hash of file. add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in the string Args: file_name: name of file to get md5 digest of add_newline: add newline to end of file contents or not """ file_hash = hashlib.md5() with open(file_name, "rb") as f: while True: file_contents = f.read(512) if not file_contents: if add_newline: file_contents + b"\n" break file_hash.update(file_contents) return file_hash.hexdigest() @staticmethod def process_md5(md5_output: str, pattern: str = r"=\s+(\S+)") -> str: """ Process the string to retrieve the MD5 hash Output from Cisco IOS (ASA is similar) .MD5 of flash:file_name Done! verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2 """ match = re.search(pattern, md5_output) if match: return match.group(1) else: raise ValueError(f"Invalid output from MD5 command: {md5_output}") def compare_md5(self) -> bool: """Compare md5 of file on network device to md5 of local file.""" if self.direction == "put": remote_md5 = self.remote_md5() return self.source_md5 == remote_md5 elif self.direction == "get": local_md5 = self.file_md5(self.dest_file) return self.source_md5 == local_md5 else: raise ValueError("Unexpected value for self.direction") def remote_md5( self, base_cmd: str = "verify /md5", remote_file: Optional[str] = None ) -> str: """Calculate remote MD5 and returns the hash. This command can be CPU intensive on the remote device. """ if remote_file is None: if self.direction == "put": remote_file = self.dest_file elif self.direction == "get": remote_file = self.source_file remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}" dest_md5 = self.ssh_ctl_chan._send_command_str(remote_md5_cmd, read_timeout=300) dest_md5 = self.process_md5(dest_md5) return dest_md5 def transfer_file(self) -> None: """SCP transfer file.""" if self.direction == "put": self.put_file() elif self.direction == "get": self.get_file() else: raise ValueError("Unexpected value for self.direction in transfer_file") def get_file(self) -> None: """SCP copy the file from the remote device to local system.""" source_file = f"{self.file_system}/{self.source_file}" self.scp_conn.scp_get_file(source_file, self.dest_file) self.scp_conn.close() def put_file(self) -> None: """SCP copy the file from the local system to the remote device.""" destination = f"{self.file_system}/{self.dest_file}" self.scp_conn.scp_transfer_file(self.source_file, destination) # Must close the SCP connection to get the file written (flush) self.scp_conn.close() def verify_file(self) -> bool: """Verify the file has been transferred correctly.""" return self.compare_md5() def enable_scp(self, cmd: str = "ip scp server enable") -> None: """ Enable SCP on remote device. Defaults to Cisco IOS command """ self.ssh_ctl_chan.send_config_set(cmd) def disable_scp(self, cmd: str = "no ip scp server enable") -> None: """ Disable SCP on remote device. Defaults to Cisco IOS command """ self.ssh_ctl_chan.send_config_set(cmd)
Subclasses
- CienaSaosFileTransfer
- CiscoFileTransfer
- DellOS10FileTransfer
- ExtremeExosFileTransfer
- JuniperFileTransfer
- MikrotikRouterOsFileTransfer
- NokiaSrosFileTransfer
- UbiquitiEdgeRouterFileTransfer
Static methods
def process_md5(md5_output: str, pattern: str = '=\\s+(\\S+)') ‑> str
-
Process the string to retrieve the MD5 hash
Output from Cisco IOS (ASA is similar) .MD5 of flash:file_name Done! verify /md5 (flash:file_name) = 410db2a7015eaa42b1fe71f1bf3d59a2
Methods
def check_file_exists(self, remote_cmd: str = '') ‑> bool
-
Check if the dest_file already exists on the file system (return boolean).
def close_scp_chan(self) ‑> None
-
Close the SCP connection to the remote network device.
def compare_md5(self) ‑> bool
-
Compare md5 of file on network device to md5 of local file.
def disable_scp(self, cmd: str = 'no ip scp server enable') ‑> None
-
Disable SCP on remote device.
Defaults to Cisco IOS command
def enable_scp(self, cmd: str = 'ip scp server enable') ‑> None
-
Enable SCP on remote device.
Defaults to Cisco IOS command
def establish_scp_conn(self) ‑> None
-
Establish SCP connection.
def file_md5(self, file_name: str, add_newline: bool = False) ‑> str
-
Compute MD5 hash of file.
add_newline is needed to support Cisco IOS MD5 calculation which expects the newline in the string
Args
file_name
- name of file to get md5 digest of
add_newline
- add newline to end of file contents or not
def get_file(self) ‑> None
-
SCP copy the file from the remote device to local system.
def local_space_available(self) ‑> int
-
Return space available on local filesystem.
def put_file(self) ‑> None
-
SCP copy the file from the local system to the remote device.
def remote_file_size(self, remote_cmd: str = '', remote_file: Optional[str] = None) ‑> int
-
Get the file size of the remote file.
def remote_md5(self, base_cmd: str = 'verify /md5', remote_file: Optional[str] = None) ‑> str
-
Calculate remote MD5 and returns the hash.
This command can be CPU intensive on the remote device.
def remote_space_available(self, search_pattern: str = '(\\d+) \\w+ free') ‑> int
-
Return space available on remote device.
def transfer_file(self) ‑> None
-
SCP transfer file.
def verify_file(self) ‑> bool
-
Verify the file has been transferred correctly.
def verify_space_available(self, search_pattern: str = '(\\d+) \\w+ free') ‑> bool
-
Verify sufficient space is available on destination file system (return boolean).
class SCPConn (ssh_conn: BaseConnection, socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None)
-
Establish a secure copy channel to the remote network device.
Must close the SCP connection to get the file to write to the remote filesystem
Expand source code
class SCPConn(object): """ Establish a secure copy channel to the remote network device. Must close the SCP connection to get the file to write to the remote filesystem """ def __init__( self, ssh_conn: "BaseConnection", socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, ) -> None: self.ssh_ctl_chan = ssh_conn self.socket_timeout = socket_timeout self.progress = progress self.progress4 = progress4 self.establish_scp_conn() def establish_scp_conn(self) -> None: """Establish the secure copy connection.""" ssh_connect_params = self.ssh_ctl_chan._connect_params_dict() self.scp_conn = self.ssh_ctl_chan._build_ssh_client() self.scp_conn.connect(**ssh_connect_params) self.scp_client = scp.SCPClient( self.scp_conn.get_transport(), socket_timeout=self.socket_timeout, progress=self.progress, progress4=self.progress4, ) def scp_transfer_file(self, source_file: str, dest_file: str) -> None: """Put file using SCP (for backwards compatibility).""" self.scp_client.put(source_file, dest_file) def scp_get_file(self, source_file: str, dest_file: str) -> None: """Get file using SCP.""" self.scp_client.get(source_file, dest_file) def scp_put_file(self, source_file: str, dest_file: str) -> None: """Put file using SCP.""" self.scp_client.put(source_file, dest_file) def close(self) -> None: """Close the SCP connection.""" self.scp_conn.close()
Methods
def close(self) ‑> None
-
Close the SCP connection.
def establish_scp_conn(self) ‑> None
-
Establish the secure copy connection.
def scp_get_file(self, source_file: str, dest_file: str) ‑> None
-
Get file using SCP.
def scp_put_file(self, source_file: str, dest_file: str) ‑> None
-
Put file using SCP.
def scp_transfer_file(self, source_file: str, dest_file: str) ‑> None
-
Put file using SCP (for backwards compatibility).