Module netmiko.linux

Expand source code
from netmiko.linux.linux_ssh import LinuxSSH, LinuxFileTransfer

__all__ = ["LinuxSSH", "LinuxFileTransfer"]

Sub-modules

netmiko.linux.linux_ssh

Classes

class LinuxFileTransfer (ssh_conn: BaseConnection, source_file: str, dest_file: str, file_system: Optional[str] = '/var/tmp', direction: str = 'put', **kwargs: Any)

Linux SCP File Transfer driver.

Mostly for testing purposes.

Expand source code
class LinuxFileTransfer(CiscoFileTransfer):
    """
    Linux SCP File Transfer driver.

    Mostly for testing purposes.
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        source_file: str,
        dest_file: str,
        file_system: Optional[str] = "/var/tmp",
        direction: str = "put",
        **kwargs: Any,
    ) -> None:
        """Forward all arguments to the parent file-transfer initializer.

        The only difference from a plain pass-through is the default of
        ``file_system``, which is ``"/var/tmp"`` here.
        """
        super().__init__(
            ssh_conn=ssh_conn,
            source_file=source_file,
            dest_file=dest_file,
            file_system=file_system,
            direction=direction,
            **kwargs,
        )

    def remote_space_available(self, search_pattern: str = "") -> int:
        """Return space available on remote device."""
        return self._remote_space_available_unix(search_pattern=search_pattern)

    def check_file_exists(self, remote_cmd: str = "") -> bool:
        """Check if the dest_file already exists on the file system (return boolean)."""
        return self._check_file_exists_unix(remote_cmd=remote_cmd)

    def remote_file_size(
        self, remote_cmd: str = "", remote_file: Optional[str] = None
    ) -> int:
        """Get the file size of the remote file."""
        return self._remote_file_size_unix(
            remote_cmd=remote_cmd, remote_file=remote_file
        )

    def remote_md5(
        self, base_cmd: str = "md5sum", remote_file: Optional[str] = None
    ) -> str:
        """Run ``base_cmd`` against the remote file and return the parsed hash.

        :param base_cmd: Command placed in front of the remote file path.

        :param remote_file: File name relative to ``self.file_system``. When
            omitted, ``self.dest_file`` is used for a "put" transfer and
            ``self.source_file`` for a "get" transfer.

        :raises ValueError: If ``remote_file`` is omitted and ``self.direction``
            is neither "put" nor "get".
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
            else:
                # Without this guard the command would be built for ".../None".
                raise ValueError("Invalid value for file transfer direction.")
        remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
        # Long read timeout: hashing happens on the remote side.
        dest_md5 = self.ssh_ctl_chan._send_command_str(remote_md5_cmd, read_timeout=300)
        dest_md5 = self.process_md5(dest_md5).strip()
        return dest_md5

    @staticmethod
    def process_md5(md5_output: str, pattern: str = r"^(\S+)\s+") -> str:
        """Extract the hash from ``md5_output`` using the parent implementation.

        The default ``pattern`` captures the leading run of non-whitespace
        characters that is followed by whitespace.
        """
        # Zero-argument super() is unavailable inside a staticmethod.
        return super(LinuxFileTransfer, LinuxFileTransfer).process_md5(
            md5_output, pattern=pattern
        )

    def enable_scp(self, cmd: str = "") -> None:
        """Not implemented for this driver; always raises NotImplementedError."""
        raise NotImplementedError

    def disable_scp(self, cmd: str = "") -> None:
        """Not implemented for this driver; always raises NotImplementedError."""
        raise NotImplementedError

Ancestors

Inherited members

class LinuxSSH (ip: str = '', host: str = '', username: str = '', password: Optional[str] = None, secret: str = '', port: Optional[int] = None, device_type: str = '', verbose: bool = False, global_delay_factor: float = 1.0, global_cmd_verify: Optional[bool] = None, use_keys: bool = False, key_file: Optional[str] = None, pkey: Optional[paramiko.pkey.PKey] = None, passphrase: Optional[str] = None, disabled_algorithms: Optional[Dict[str, Any]] = None, disable_sha2_fix: bool = False, allow_agent: bool = False, ssh_strict: bool = False, system_host_keys: bool = False, alt_host_keys: bool = False, alt_key_file: str = '', ssh_config_file: Optional[str] = None, conn_timeout: int = 10, auth_timeout: Optional[int] = None, banner_timeout: int = 15, blocking_timeout: int = 20, timeout: int = 100, session_timeout: int = 60, read_timeout_override: Optional[float] = None, keepalive: int = 0, default_enter: Optional[str] = None, response_return: Optional[str] = None, serial_settings: Optional[Dict[str, Any]] = None, fast_cli: bool = True, session_log: Optional[SessionLog] = None, session_log_record_writes: bool = False, session_log_file_mode: str = 'write', allow_auto_change: bool = False, encoding: str = 'utf-8', sock: Optional[socket.socket] = None, sock_telnet: Optional[Dict[str, Any]] = None, auto_connect: bool = True, delay_factor_compat: bool = False, disable_lf_normalization: bool = False)

Base Class for cisco-like behavior.

    Initialize attributes for establishing connection to target device.

    :param ip: IP address of target device. Not required if <code>host</code> is
        provided.

    :param host: Hostname of target device. Not required if <code>ip</code> is
            provided.

    :param username: Username to authenticate against target device if
            required.

    :param password: Password to authenticate against target device if
            required.

    :param secret: The enable password if target device requires one.

    :param port: The destination port used to connect to the target
            device.

    :param device_type: Class selection based on device type.

    :param verbose: Enable additional messages to standard output.

    :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).

    :param use_keys: Connect to target device using SSH keys.

    :param key_file: Filename path of the SSH key file to use.

    :param pkey: SSH key object to use.

    :param passphrase: Passphrase to use for encrypted key; password will be used for key
            decryption if not specified.

    :param disabled_algorithms: Dictionary of SSH algorithms to disable. Refer to the Paramiko
            documentation for a description of the expected format.

    :param disable_sha2_fix: Boolean that fixes Paramiko issue with missing server-sig-algs
        <https://github.com/paramiko/paramiko/issues/1961> (default: False)

    :param allow_agent: Enable use of SSH key-agent.

    :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
            means unknown SSH host keys will be accepted).

    :param system_host_keys: Load host keys from the users known_hosts file.

    :param alt_host_keys: If <code>True</code> host keys will be loaded from the file specified in
            alt_key_file.

    :param alt_key_file: SSH host key file to use (if alt_host_keys=True).

    :param ssh_config_file: File name of OpenSSH configuration file.

    :param conn_timeout: TCP connection timeout.

    :param session_timeout: Set a timeout for parallel requests.

    :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.

    :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).

    :param read_timeout_override: Set a timeout that will override the default read_timeout
            of both send_command and send_command_timing. This is useful for 3rd party
            libraries where directly accessing method arguments might be impractical.

    :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
            Currently defaults to 0, for backwards compatibility (it will not attempt
            to keep the connection alive).

    :param default_enter: Character(s) to send to correspond to enter key (default: \n).

    :param response_return: Character(s) to use in normalized return data to represent
            enter key (default: \n)

    :param serial_settings: Dictionary of settings for use with serial port (pySerial).

    :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
            to select smallest of global and specific. Sets default global_delay_factor to .1
            (default: True)

    :param session_log: File path, SessionLog object, or BufferedIOBase subclass object
            to write the session log to.

    :param session_log_record_writes: The session log generally records only channel reads,
            to avoid duplicating commands that the device echoes back. You can enable this if
            you want to record both channel reads and channel writes in the log (default: False).

    :param session_log_file_mode: "write" or "append" for session_log file mode
            (default: "write")

    :param allow_auto_change: Allow automatic configuration changes for terminal settings.
            (default: False)

    :param encoding: Encoding to be used when writing bytes to the output channel.
            (default: "utf-8")

    :param sock: An open socket or socket-like object (such as a <code>.Channel</code>) to use for
            communication to the target host (default: None).

    :param sock_telnet: A dictionary of telnet socket parameters (SOCKS proxy). See
            telnet_proxy.py code for details.

    :param global_cmd_verify: Control whether command echo verification is enabled or disabled
            (default: None). Global attribute takes precedence over function <code>cmd_verify</code>
            argument. Value of <code>None</code> indicates to use function <code>cmd_verify</code> argument.

    :param auto_connect: Control whether Netmiko automatically establishes the connection as
            part of the object creation (default: True).

    :param delay_factor_compat: Set send_command and send_command_timing back to using Netmiko
            3.x behavior for delay_factor/global_delay_factor/max_loops. This argument will be
            eliminated in Netmiko 5.x (default: False).

    :param disable_lf_normalization: Disable Netmiko's linefeed normalization behavior
            (default: False)
Expand source code
class LinuxSSH(CiscoSSHConnection):
    """SSH driver for Linux hosts.

    "Enable mode" and "config mode" both mean holding a root prompt here:
    entering them runs a privilege-escalation command (``sudo -s`` by
    default) and leaving them sends ``exit``.
    """

    # Regex character class matching either Linux prompt terminator.
    prompt_pattern = rf"[{re.escape(LINUX_PROMPT_PRI)}{re.escape(LINUX_PROMPT_ALT)}]"

    def session_preparation(self) -> None:
        """Prepare the session after the connection has been established."""
        self.ansi_escape_codes = True
        self._test_channel_read(pattern=self.prompt_pattern)
        self.set_base_prompt()

    def _enter_shell(self) -> str:
        """Already in shell."""
        return ""

    def _return_cli(self) -> str:
        """The shell is the CLI."""
        return ""

    def disable_paging(self, *args: Any, **kwargs: Any) -> str:
        """Linux doesn't have paging by default."""
        return ""

    def find_prompt(
        self, delay_factor: float = 1.0, pattern: Optional[str] = None
    ) -> str:
        """Find the current prompt, defaulting ``pattern`` to ``prompt_pattern``."""
        if pattern is None:
            pattern = self.prompt_pattern
        return super().find_prompt(delay_factor=delay_factor, pattern=pattern)

    def set_base_prompt(
        self,
        pri_prompt_terminator: str = LINUX_PROMPT_PRI,
        alt_prompt_terminator: str = LINUX_PROMPT_ALT,
        delay_factor: float = 1.0,
        pattern: Optional[str] = None,
    ) -> str:
        """Determine base prompt."""
        if pattern is None:
            pattern = self.prompt_pattern
        return super().set_base_prompt(
            pri_prompt_terminator=pri_prompt_terminator,
            alt_prompt_terminator=alt_prompt_terminator,
            delay_factor=delay_factor,
            pattern=pattern,
        )

    def send_config_set(
        self,
        config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None,
        exit_config_mode: bool = True,
        **kwargs: Any,
    ) -> str:
        """Can't exit from root (if root)"""
        if self.username == "root":
            exit_config_mode = False
        return super().send_config_set(
            config_commands=config_commands, exit_config_mode=exit_config_mode, **kwargs
        )

    def check_config_mode(
        self,
        check_string: str = LINUX_PROMPT_ROOT,
        pattern: str = "",
        force_regex: bool = False,
    ) -> bool:
        """Verify root

        ``pattern`` and ``force_regex`` are accepted but not used; the check
        is delegated to ``check_enable_mode``.
        """
        return self.check_enable_mode(check_string=check_string)

    def config_mode(
        self,
        config_command: str = "sudo -s",
        pattern: str = "ssword",
        re_flags: int = re.IGNORECASE,
    ) -> str:
        """Attempt to become root."""
        return self.enable(cmd=config_command, pattern=pattern, re_flags=re_flags)

    def exit_config_mode(self, exit_config: str = "exit", pattern: str = "") -> str:
        """Leave the root shell via ``exit_enable_mode``; ``pattern`` is not used."""
        return self.exit_enable_mode(exit_command=exit_config)

    def check_enable_mode(self, check_string: str = LINUX_PROMPT_ROOT) -> bool:
        """Verify root"""
        return super().check_enable_mode(check_string=check_string)

    def exit_enable_mode(self, exit_command: str = "exit") -> str:
        """Exit enable mode.

        :param exit_command: Command sent to leave the root shell.

        :raises ValueError: If the session is still in enable mode afterwards.
        """
        output = ""
        if self.check_enable_mode():
            self.write_channel(self.normalize_cmd(exit_command))
            # The command is literal text, so escape it before using it as a
            # regex to wait for its echo.
            output += self.read_until_pattern(pattern=re.escape(exit_command))
            output += self.read_until_pattern(pattern=self.prompt_pattern)
            # Nature of prompt might change with the privilege deescalation
            self.set_base_prompt(pattern=self.prompt_pattern)
            if self.check_enable_mode():
                raise ValueError("Failed to exit enable mode.")
        return output

    def enable(
        self,
        cmd: str = "sudo -s",
        pattern: str = "ssword",
        enable_pattern: Optional[str] = None,
        check_state: bool = True,
        re_flags: int = re.IGNORECASE,
    ) -> str:
        """Attempt to become root.

        :param cmd: Privilege-escalation command to send.

        :param pattern: Regex identifying the password prompt.

        :param enable_pattern: Accepted but not used by this implementation.

        :param check_state: When true, return immediately if already in enable mode.

        :param re_flags: Regex flags applied when looking for ``pattern``.

        :raises ValueError: If the root prompt is not reached.
        """
        msg = """

Netmiko failed to elevate privileges.

Please ensure you pass the sudo password into ConnectHandler
using the 'secret' argument and that the user has sudo
permissions.

"""

        output = ""
        if check_state and self.check_enable_mode():
            return output

        self.write_channel(self.normalize_cmd(cmd))

        # Failed "sudo -s" will put "#" in output so have to delineate further
        root_prompt = rf"(?m:{LINUX_PROMPT_ROOT}\s*$)"
        prompt_or_password = rf"({root_prompt}|{pattern})"
        # Use the same flags as the re.search() below; otherwise a password
        # prompt that only matches with re_flags would never end this read.
        output += self.read_until_pattern(pattern=prompt_or_password, re_flags=re_flags)
        if re.search(pattern, output, flags=re_flags):
            self.write_channel(self.normalize_cmd(self.secret))
            try:
                output += self.read_until_pattern(pattern=root_prompt)
            except ReadTimeout as exc:
                raise ValueError(msg) from exc
        # Nature of prompt might change with the privilege escalation
        self.set_base_prompt(pattern=root_prompt)
        if not self.check_enable_mode():
            raise ValueError(msg)
        return output

    def cleanup(self, command: str = "exit") -> None:
        """Try to Gracefully exit the SSH session."""
        return super().cleanup(command=command)

    def save_config(self, *args: Any, **kwargs: Any) -> str:
        """Not Implemented"""
        raise NotImplementedError

Ancestors

Subclasses

Class variables

var prompt_pattern

Methods

def check_config_mode(self, check_string: str = '#', pattern: str = '', force_regex: bool = False) ‑> bool

Verify root

Expand source code
def check_config_mode(
    self,
    check_string: str = LINUX_PROMPT_ROOT,
    pattern: str = "",
    force_regex: bool = False,
) -> bool:
    """Report whether the session is at a root prompt.

    The check is handed to ``check_enable_mode`` with ``check_string``;
    ``pattern`` and ``force_regex`` are accepted but not used here.
    """
    at_root_prompt = self.check_enable_mode(check_string=check_string)
    return at_root_prompt
def check_enable_mode(self, check_string: str = '#') ‑> bool

Verify root

Expand source code
def check_enable_mode(self, check_string: str = LINUX_PROMPT_ROOT) -> bool:
    """Report whether the session is at a root prompt.

    Delegates to the parent implementation, passing ``check_string`` through.
    """
    is_root = super().check_enable_mode(check_string=check_string)
    return is_root
def cleanup(self, command: str = 'exit') ‑> None

Try to Gracefully exit the SSH session.

Expand source code
def cleanup(self, command: str = "exit") -> None:
    """Gracefully leave the SSH session by delegating to the parent cleanup.

    :param command: Command passed to the parent ``cleanup`` (default "exit").
    """
    parent_cleanup = super().cleanup
    return parent_cleanup(command=command)
def config_mode(self, config_command: str = 'sudo -s', pattern: str = 'ssword', re_flags: int = re.IGNORECASE) ‑> str

Attempt to become root.

Expand source code
def config_mode(
    self,
    config_command: str = "sudo -s",
    pattern: str = "ssword",
    re_flags: int = re.IGNORECASE,
) -> str:
    """Try to obtain a root shell.

    This is a thin alias for ``enable``: ``config_command`` is passed as its
    ``cmd`` argument, ``pattern`` and ``re_flags`` are forwarded unchanged,
    and whatever ``enable`` returns is returned.
    """
    escalation_output = self.enable(
        cmd=config_command,
        pattern=pattern,
        re_flags=re_flags,
    )
    return escalation_output
def disable_paging(self, *args: Any, **kwargs: Any) ‑> str

Linux doesn't have paging by default.

Expand source code
def disable_paging(self, *args: Any, **kwargs: Any) -> str:
    """Do nothing: Linux has no paging to turn off by default.

    All arguments are ignored and an empty string is returned.
    """
    no_output = ""
    return no_output
def enable(self, cmd: str = 'sudo -s', pattern: str = 'ssword', enable_pattern: Optional[str] = None, check_state: bool = True, re_flags: int = re.IGNORECASE) ‑> str

Attempt to become root.

Expand source code
    def enable(
        self,
        cmd: str = "sudo -s",
        pattern: str = "ssword",
        enable_pattern: Optional[str] = None,
        check_state: bool = True,
        re_flags: int = re.IGNORECASE,
    ) -> str:
        """Attempt to become root."""
        msg = """

Netmiko failed to elevate privileges.

Please ensure you pass the sudo password into ConnectHandler
using the 'secret' argument and that the user has sudo
permissions.

"""

        output = ""
        if check_state and self.check_enable_mode():
            return output

        self.write_channel(self.normalize_cmd(cmd))

        # Failed "sudo -s" will put "#" in output so have to delineate further
        root_prompt = rf"(?m:{LINUX_PROMPT_ROOT}\s*$)"
        prompt_or_password = rf"({root_prompt}|{pattern})"
        output += self.read_until_pattern(pattern=prompt_or_password)
        if re.search(pattern, output, flags=re_flags):
            self.write_channel(self.normalize_cmd(self.secret))
            try:
                output += self.read_until_pattern(pattern=root_prompt)
            except ReadTimeout:
                raise ValueError(msg)
        # Nature of prompt might change with the privilege escalation
        self.set_base_prompt(pattern=root_prompt)
        if not self.check_enable_mode():
            raise ValueError(msg)
        return output
def exit_enable_mode(self, exit_command: str = 'exit') ‑> str

Exit enable mode.

Expand source code
def exit_enable_mode(self, exit_command: str = "exit") -> str:
    """Exit enable mode.

    Does nothing and returns an empty string when the session is not in
    enable mode.

    :param exit_command: Command sent to leave the root shell.

    :raises ValueError: If the session is still in enable mode afterwards.
    """
    output = ""
    if self.check_enable_mode():
        self.write_channel(self.normalize_cmd(exit_command))
        # The command is literal text, so escape it before using it as a
        # regex to wait for its echo.
        output += self.read_until_pattern(pattern=re.escape(exit_command))
        output += self.read_until_pattern(pattern=self.prompt_pattern)
        # Nature of prompt might change with the privilege deescalation
        self.set_base_prompt(pattern=self.prompt_pattern)
        if self.check_enable_mode():
            raise ValueError("Failed to exit enable mode.")
    return output
def save_config(self, *args: Any, **kwargs: Any) ‑> str

Not Implemented

Expand source code
def save_config(self, *args: Any, **kwargs: Any) -> str:
    """Unsupported operation for this driver.

    :raises NotImplementedError: Always, whatever arguments are given.
    """
    raise NotImplementedError()
def send_config_set(self, config_commands: Union[str, Sequence[str], Iterator[str], TextIO, ForwardRef(None)] = None, exit_config_mode: bool = True, **kwargs: Any) ‑> str

Can't exit from root (if root)

Expand source code
def send_config_set(
    self,
    config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None,
    exit_config_mode: bool = True,
    **kwargs: Any,
) -> str:
    """Send configuration commands through the parent implementation.

    When ``self.username`` is "root", ``exit_config_mode`` is forced to
    False, because a session logged in as root cannot exit from root.
    """
    leave_config_mode = False if self.username == "root" else exit_config_mode
    return super().send_config_set(
        config_commands=config_commands,
        exit_config_mode=leave_config_mode,
        **kwargs,
    )
def session_preparation(self) ‑> None

Prepare the session after the connection has been established.

Expand source code
def session_preparation(self) -> None:
    """Get a newly connected session ready for use.

    Sets ``ansi_escape_codes`` to True, reads the channel against
    ``prompt_pattern``, then sets the base prompt.
    """
    self.ansi_escape_codes = True
    prompt_regex = self.prompt_pattern
    self._test_channel_read(pattern=prompt_regex)
    self.set_base_prompt()
def set_base_prompt(self, pri_prompt_terminator: str = '$', alt_prompt_terminator: str = '#', delay_factor: float = 1.0, pattern: Optional[str] = None) ‑> str

Determine base prompt.

Expand source code
def set_base_prompt(
    self,
    pri_prompt_terminator: str = LINUX_PROMPT_PRI,
    alt_prompt_terminator: str = LINUX_PROMPT_ALT,
    delay_factor: float = 1.0,
    pattern: Optional[str] = None,
) -> str:
    """Work out the base prompt via the parent implementation.

    When ``pattern`` is not supplied, the class-level ``prompt_pattern`` is
    used; all other arguments are forwarded unchanged.
    """
    effective_pattern = self.prompt_pattern if pattern is None else pattern
    return super().set_base_prompt(
        pri_prompt_terminator=pri_prompt_terminator,
        alt_prompt_terminator=alt_prompt_terminator,
        delay_factor=delay_factor,
        pattern=effective_pattern,
    )

Inherited members