Package netmiko

Expand source code
import sys

__version__ = "4.3.0"
PY_MAJ_VER = 3
PY_MIN_VER = 8
MIN_PYTHON_VER = "3.8"


# Make sure user is using a valid Python version (for Netmiko)
def check_python_version():  # type: ignore
    """Raise ValueError if the running interpreter does not satisfy Netmiko's minimum.

    The major version must equal PY_MAJ_VER exactly and the minor version must be at
    least PY_MIN_VER. Returns None when the interpreter is acceptable.
    """
    # str.format() is used on purpose (instead of f-strings) so that this check itself
    # still parses when someone tries to run Netmiko on a very old Python.
    template = (
        "\n\nNetmiko Version {net_ver} requires Python Version {py_ver} or higher.\n\n"
    )
    error_text = template.format(net_ver=__version__, py_ver=MIN_PYTHON_VER)

    running = sys.version_info
    if running.major != PY_MAJ_VER:
        raise ValueError(error_text)
    if running.minor >= PY_MIN_VER:
        return

    # Right major version, minor too old: decorate the message with a snake. Why not :-)
    snake_emoji = "\U0001F40D"
    error_text = error_text.rstrip() + " {snake}\n\n".format(snake=snake_emoji)
    raise ValueError(error_text)


check_python_version()  # type: ignore


import logging  # noqa


# Logging configuration
# Package-level logger named after this module. A NullHandler (a handler that discards
# every record) is attached so the logger always has at least one handler; any real
# output destination is left to the application importing the library.
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())


from netmiko.ssh_dispatcher import ConnectHandler  # noqa
from netmiko.ssh_dispatcher import TelnetFallback  # noqa
from netmiko.ssh_dispatcher import ConnLogOnly  # noqa
from netmiko.ssh_dispatcher import ConnUnify  # noqa
from netmiko.ssh_dispatcher import ssh_dispatcher  # noqa
from netmiko.ssh_dispatcher import redispatch  # noqa
from netmiko.ssh_dispatcher import platforms  # noqa
from netmiko.ssh_dispatcher import FileTransfer  # noqa
from netmiko.scp_handler import SCPConn  # noqa
from netmiko.cisco.cisco_ios import InLineTransfer  # noqa
from netmiko.exceptions import (  # noqa
    NetmikoTimeoutException,
    NetMikoTimeoutException,
)
from netmiko.exceptions import (  # noqa
    NetmikoAuthenticationException,
    NetMikoAuthenticationException,
)
from netmiko.exceptions import ConfigInvalidException  # noqa
from netmiko.exceptions import ReadException, ReadTimeout  # noqa
from netmiko.exceptions import NetmikoBaseException, ConnectionException  # noqa
from netmiko.ssh_autodetect import SSHDetect  # noqa
from netmiko.base_connection import BaseConnection  # noqa
from netmiko.scp_functions import file_transfer, progress_bar  # noqa

# Alternate naming
Netmiko = ConnectHandler
# "AgnosticHandler" is advertised in __all__ below, so it has to be bound in this module;
# otherwise `from netmiko import *` fails with AttributeError.
# NOTE(review): aliased to ConnectHandler, which is how the package's published API listing
# presents it -- confirm this is the intended target.
AgnosticHandler = ConnectHandler

# Public API exported by `from netmiko import *`. Every name listed here must be bound
# in this module (imported above or defined as an alias).
__all__ = (
    "ConnectHandler",
    "AgnosticHandler",
    "TelnetFallback",
    "ConnLogOnly",
    "ConnUnify",
    "ssh_dispatcher",
    "platforms",
    "SCPConn",
    "FileTransfer",
    "NetmikoBaseException",
    "ConnectionException",
    "NetmikoTimeoutException",
    "NetMikoTimeoutException",
    "ConfigInvalidException",
    "ReadException",
    "ReadTimeout",
    "NetmikoAuthenticationException",
    "NetMikoAuthenticationException",
    "InLineTransfer",
    "redispatch",
    "SSHDetect",
    "BaseConnection",
    "Netmiko",
    "file_transfer",
    "progress_bar",
)

# Cisco cntl-shift-six sequence
CNTL_SHIFT_6 = chr(30)

Sub-modules

netmiko.a10
netmiko.accedian
netmiko.adtran
netmiko.adva

Adva Device Drivers

netmiko.alcatel
netmiko.allied_telesis
netmiko.apresia
netmiko.arista
netmiko.arris
netmiko.aruba
netmiko.audiocode
netmiko.base_connection

Base connection class for netmiko …

netmiko.broadcom
netmiko.brocade
netmiko.calix
netmiko.casa
netmiko.cdot
netmiko.centec
netmiko.channel
netmiko.checkpoint
netmiko.ciena
netmiko.cisco
netmiko.cisco_base_connection

CiscoBaseConnection is netmiko SSH class for Cisco and Cisco-like platforms.

netmiko.citrix
netmiko.cli_tools
netmiko.cloudgenix
netmiko.coriant
netmiko.dell
netmiko.digi
netmiko.dlink
netmiko.eltex
netmiko.endace
netmiko.enterasys
netmiko.ericsson
netmiko.exceptions
netmiko.extreme
netmiko.f5
netmiko.fiberstore
netmiko.flexvnf
netmiko.fortinet
netmiko.hillstone
netmiko.hp
netmiko.huawei
netmiko.ipinfusion
netmiko.juniper
netmiko.keymile
netmiko.linux
netmiko.maipu
netmiko.mellanox
netmiko.mikrotik
netmiko.mrv
netmiko.netapp
netmiko.netgear
netmiko.netmiko_globals
netmiko.no_config
netmiko.no_enable
netmiko.nokia
netmiko.oneaccess
netmiko.ovs
netmiko.paloalto
netmiko.pluribus
netmiko.quanta
netmiko.rad
netmiko.raisecom
netmiko.ruckus
netmiko.ruijie
netmiko.scp_functions

Netmiko SCP operations …

netmiko.scp_handler

Netmiko SCP operations …

netmiko.session_log
netmiko.sixwind
netmiko.snmp_autodetect

This module is used to auto-detect the type of a device in order to automatically create a Netmiko connection …

netmiko.sophos
netmiko.ssh_auth
netmiko.ssh_autodetect

The ssh_autodetect module is used to auto-detect the netmiko device_type to use to further initiate a new SSH connection with a remote host. This …

netmiko.supermicro
netmiko.teldat
netmiko.telnet_proxy
netmiko.terminal_server
netmiko.tplink
netmiko.ubiquiti
netmiko.utilities

Miscellaneous utility functions.

netmiko.vyos
netmiko.watchguard
netmiko.yamaha
netmiko.zte
netmiko.zyxel

Functions

def AgnosticHandler(*args: Any, **kwargs: Any) ‑> BaseConnection

Factory function selects the proper class and creates object based on device_type.

Expand source code
def ConnectHandler(*args: Any, **kwargs: Any) -> "BaseConnection":
    """Build and return the connection object matching the ``device_type`` keyword.

    ``device_type`` must be supplied as a keyword argument (a missing key raises
    KeyError). When it is listed in ``platforms`` the class returned by
    ``ssh_dispatcher`` is instantiated with the caller's arguments; otherwise a
    ValueError listing the supported platforms is raised.
    """
    device_type = kwargs["device_type"]
    if device_type in platforms:
        connection_cls = ssh_dispatcher(device_type)
        return connection_cls(*args, **kwargs)

    # Unsupported platform: show the telnet list only when the caller asked for telnet.
    wants_telnet = device_type is not None and "telnet" in device_type
    supported = telnet_platforms_str if wants_telnet else platforms_str
    raise ValueError(
        "Unsupported 'device_type' "
        "currently supported platforms are: {}".format(supported)
    )
def ConnLogOnly(log_file: str = 'netmiko.log', log_level: Optional[int] = None, log_format: Optional[str] = None, **kwargs: Any) ‑> Optional[BaseConnection]

Dispatcher function that will return either: netmiko_object or None

Excluding errors in the logging configuration, this function should never raise an exception; all errors should be logged.

Expand source code
def ConnLogOnly(
    log_file: str = "netmiko.log",
    log_level: Optional[int] = None,
    log_format: Optional[str] = None,
    **kwargs: Any,
) -> Optional["BaseConnection"]:
    """
    Dispatcher function that will return either: netmiko_object or None

    Excluding errors in the logging configuration, this function should never raise an
    exception; every connection error is logged and None is returned instead.

    :param log_file: File passed to logging.basicConfig as the log destination.

    :param log_level: Logging level to configure (default: logging.ERROR).

    :param log_format: Logging format string
            (default: "%(asctime)s %(levelname)s %(name)s %(message)s").

    :param kwargs: Arguments forwarded to ConnectHandler; `auto_connect` is always
            overridden to False so the connection is opened explicitly here.
    """

    import logging

    if log_level is None:
        log_level = logging.ERROR
    if log_format is None:
        log_format = "%(asctime)s %(levelname)s %(name)s %(message)s"

    logging.basicConfig(filename=log_file, level=log_level, format=log_format)
    logger = logging.getLogger(__name__)

    # Seed the values used in the error messages from the caller's arguments. Without this,
    # an exception raised before the connection object exists would leave these names
    # unbound and the handlers below would raise UnboundLocalError instead of logging.
    hostname = kwargs.get("ip") or kwargs.get("host")
    port = kwargs.get("port")
    device_type = kwargs.get("device_type")

    try:
        kwargs["auto_connect"] = False
        net_connect = ConnectHandler(**kwargs)
        # Prefer the values as resolved by the connection object (e.g. default port).
        hostname = net_connect.host
        port = net_connect.port
        device_type = net_connect.device_type

        net_connect._open()
        msg = f"Netmiko connection succesful to {hostname}:{port}"
        logger.info(msg)
        return net_connect
    except NetmikoAuthenticationException as e:
        msg = (
            f"Authentication failure to: {hostname}:{port} ({device_type})\n\n{str(e)}"
        )
        logger.error(msg)
        return None
    except NetmikoTimeoutException as e:
        # The timeout exception text is inspected to produce a more specific log message.
        if "DNS failure" in str(e):
            msg = f"Device failed due to a DNS failure, hostname {hostname}"
        elif "TCP connection to device failed" in str(e):
            msg = f"Netmiko was unable to reach the provided host and port: {hostname}:{port}"
            msg += f"\n\n{str(e)}"
        else:
            msg = f"An unknown NetmikoTimeoutException occurred:\n\n{str(e)}"
        logger.error(msg)
        return None
    except Exception as e:
        # Deliberate catch-all: this function's contract is to log, never to raise.
        msg = f"An unknown exception occurred during connection:\n\n{str(e)}"
        logger.error(msg)
        return None
def ConnUnify(**kwargs: Any) ‑> BaseConnection
Expand source code
def ConnUnify(
    **kwargs: Any,
) -> "BaseConnection":
    """
    Create and open a Netmiko connection, unifying all failures into ConnectionException.

    :param kwargs: Arguments forwarded to ConnectHandler; `auto_connect` is always
            overridden to False so the connection is opened explicitly here.

    :raises ConnectionException: for any exception raised while creating or opening the
            connection. The original exception is attached as the cause.
    """
    # Seed the failure context from the caller's arguments. Without this, an exception
    # raised before the connection object exists would leave `general_msg` unbound and the
    # handler below would raise UnboundLocalError instead of ConnectionException.
    hostname = kwargs.get("ip") or kwargs.get("host")
    port = kwargs.get("port")
    device_type = kwargs.get("device_type")
    general_msg = f"Connection failure to {hostname}:{port} ({device_type})\n\n"

    try:
        kwargs["auto_connect"] = False
        net_connect = ConnectHandler(**kwargs)
        # Prefer the values as resolved by the connection object (e.g. default port).
        hostname = net_connect.host
        port = net_connect.port
        device_type = net_connect.device_type
        general_msg = f"Connection failure to {hostname}:{port} ({device_type})\n\n"

        net_connect._open()
        return net_connect
    except (NetmikoAuthenticationException, NetmikoTimeoutException) as e:
        msg = general_msg + str(e)
        raise ConnectionException(msg) from e
    except Exception as e:
        msg = f"An unknown exception occurred during connection:\n\n{str(e)}"
        raise ConnectionException(msg) from e
def ConnectHandler(*args: Any, **kwargs: Any) ‑> BaseConnection

Factory function selects the proper class and creates object based on device_type.

Expand source code
def ConnectHandler(*args: Any, **kwargs: Any) -> "BaseConnection":
    """Build and return the connection object matching the ``device_type`` keyword.

    ``device_type`` must be supplied as a keyword argument (a missing key raises
    KeyError). When it is listed in ``platforms`` the class returned by
    ``ssh_dispatcher`` is instantiated with the caller's arguments; otherwise a
    ValueError listing the supported platforms is raised.
    """
    device_type = kwargs["device_type"]
    if device_type in platforms:
        connection_cls = ssh_dispatcher(device_type)
        return connection_cls(*args, **kwargs)

    # Unsupported platform: show the telnet list only when the caller asked for telnet.
    wants_telnet = device_type is not None and "telnet" in device_type
    supported = telnet_platforms_str if wants_telnet else platforms_str
    raise ValueError(
        "Unsupported 'device_type' "
        "currently supported platforms are: {}".format(supported)
    )
def FileTransfer(*args: Any, **kwargs: Any) ‑> BaseFileTransfer

Factory function selects the proper SCP class and creates object based on device_type.

Expand source code
def FileTransfer(*args: Any, **kwargs: Any) -> "BaseFileTransfer":
    """Build and return the SCP transfer object matching the connection's device_type.

    The connection is taken from the first positional argument when one is given,
    otherwise from the ``ssh_conn`` keyword. Its ``device_type`` must be listed in
    ``scp_platforms``; if not, a ValueError listing the supported platforms is raised.
    """
    ssh_conn = args[0] if args else kwargs["ssh_conn"]
    device_type = ssh_conn.device_type
    if device_type in scp_platforms:
        transfer_cls: Type["BaseFileTransfer"] = FILE_TRANSFER_MAP[device_type]
        return transfer_cls(*args, **kwargs)

    raise ValueError(
        "Unsupported SCP device_type: "
        "currently supported platforms are: {}".format(scp_platforms_str)
    )
def Netmiko(*args: Any, **kwargs: Any) ‑> BaseConnection

Factory function selects the proper class and creates object based on device_type.

Expand source code
def ConnectHandler(*args: Any, **kwargs: Any) -> "BaseConnection":
    """Build and return the connection object matching the ``device_type`` keyword.

    ``device_type`` must be supplied as a keyword argument (a missing key raises
    KeyError). When it is listed in ``platforms`` the class returned by
    ``ssh_dispatcher`` is instantiated with the caller's arguments; otherwise a
    ValueError listing the supported platforms is raised.
    """
    device_type = kwargs["device_type"]
    if device_type in platforms:
        connection_cls = ssh_dispatcher(device_type)
        return connection_cls(*args, **kwargs)

    # Unsupported platform: show the telnet list only when the caller asked for telnet.
    wants_telnet = device_type is not None and "telnet" in device_type
    supported = telnet_platforms_str if wants_telnet else platforms_str
    raise ValueError(
        "Unsupported 'device_type' "
        "currently supported platforms are: {}".format(supported)
    )
def file_transfer(ssh_conn: BaseConnection, source_file: str, dest_file: str, file_system: Optional[str] = None, direction: str = 'put', disable_md5: bool = False, inline_transfer: bool = False, overwrite_file: bool = False, socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, verify_file: Optional[bool] = None) ‑> Dict[str, bool]

Use Secure Copy or Inline (IOS-only) to transfer files to/from network devices.

inline_transfer ONLY SUPPORTS TEXT FILES and will not support binary file transfers.

return { 'file_exists': boolean, 'file_transferred': boolean, 'file_verified': boolean, }

Expand source code
def file_transfer(
    ssh_conn: "BaseConnection",
    source_file: str,
    dest_file: str,
    file_system: Optional[str] = None,
    direction: str = "put",
    disable_md5: bool = False,
    inline_transfer: bool = False,
    overwrite_file: bool = False,
    socket_timeout: float = 10.0,
    progress: Optional[Callable[..., Any]] = None,
    progress4: Optional[Callable[..., Any]] = None,
    verify_file: Optional[bool] = None,
) -> Dict[str, bool]:
    """Transfer a file to/from a network device using Secure Copy or Inline (IOS-only).

    inline_transfer ONLY SUPPORTS TEXT FILES and will not support binary file transfers;
    it is rejected with ValueError unless the device_type contains "cisco_ios" or
    "cisco_xe".

    `verify_file` supersedes `disable_md5`; when it is left as None it defaults to
    `not disable_md5`.

    Raises ValueError when verification fails after a transfer, or when the file already
    exists, may not be overwritten and is not verified as matching.

    return {
        'file_exists': boolean,
        'file_transferred': boolean,
        'file_verified': boolean,
    }
    """
    # The three possible successful outcomes.
    sent_and_verified = {
        "file_exists": True,
        "file_transferred": True,
        "file_verified": True,
    }
    sent_not_verified = {
        "file_exists": True,
        "file_transferred": True,
        "file_verified": False,
    }
    already_present_verified = {
        "file_exists": True,
        "file_transferred": False,
        "file_verified": True,
    }

    is_cisco_ios = any(
        tag in ssh_conn.device_type for tag in ("cisco_ios", "cisco_xe")
    )
    if inline_transfer and not is_cisco_ios:
        raise ValueError("Inline Transfer only supported for Cisco IOS/Cisco IOS-XE")

    # Replace disable_md5 argument with verify_file argument across time
    if verify_file is None:
        verify_file = not disable_md5

    scp_args = dict(
        ssh_conn=ssh_conn,
        source_file=source_file,
        dest_file=dest_file,
        direction=direction,
        socket_timeout=socket_timeout,
        progress=progress,
        progress4=progress4,
    )
    if file_system is not None:
        scp_args["file_system"] = file_system

    transfer_factory: Callable[..., "BaseFileTransfer"] = (
        InLineTransfer if inline_transfer else FileTransfer
    )

    with transfer_factory(**scp_args) as scp_transfer:
        if not scp_transfer.check_file_exists():
            # File doesn't exist on the destination: always transfer it.
            verifyspace_and_transferfile(scp_transfer)
            if not verify_file:
                return sent_not_verified
            if scp_transfer.verify_file():
                return sent_and_verified
            raise ValueError("MD5 failure between source and destination files")

        if not overwrite_file:
            # File exists, but you can't overwrite it: only acceptable if it already matches.
            if verify_file and scp_transfer.verify_file():
                return already_present_verified
            msg = "File already exists and overwrite_file is disabled"
            raise ValueError(msg)

        if not verify_file:
            # File exists, you can overwrite it, but MD5 not allowed (transfer file)
            verifyspace_and_transferfile(scp_transfer)
            return sent_not_verified

        if scp_transfer.verify_file():
            # File exists and already matches: nothing to send.
            return already_present_verified

        # File exists, you can overwrite it, MD5 is wrong (transfer file)
        verifyspace_and_transferfile(scp_transfer)
        if scp_transfer.verify_file():
            return sent_and_verified
        raise ValueError("MD5 failure between source and destination files")
def progress_bar(filename: ~AnyStr, size: int, sent: int, peername: Optional[str] = None) ‑> None
Expand source code
def progress_bar(
    filename: AnyStr, size: int, sent: int, peername: Optional[str] = None
) -> None:
    """Clear the terminal and print a text progress bar for a file transfer.

    :param filename: Name of the file being transferred; a bytes value is decoded
            before display.

    :param size: Total size of the transfer. A size of zero or less (for example an
            empty file) is reported as 100% complete instead of dividing by zero.

    :param sent: Amount transferred so far, in the same units as `size`.

    :param peername: Optional remote peer identifier included in the header line.
    """
    max_width = 50
    filename_str = filename.decode() if isinstance(filename, bytes) else filename
    clear_screen = chr(27) + "[2J"
    terminating_char = "|"

    # Percentage done; nothing to send means the transfer is already complete.
    percent_complete = sent / size if size > 0 else 1.0
    percent_str = f"{percent_complete*100:.2f}%"
    hash_count = int(percent_complete * max_width)
    progress = hash_count * ">"

    if peername is None:
        header_msg = f"Transferring file: {filename_str}\n"
    else:
        header_msg = f"Transferring file to {peername}: {filename_str}\n"

    # Pad the bar to max_width so the terminating character stays in a fixed column.
    msg = f"{progress:<{max_width}}{terminating_char:1} ({percent_str})"
    print(clear_screen)
    print(header_msg)
    print(msg)
def redispatch(obj: BaseConnection, device_type: str, session_prep: bool = True)

Dynamically change Netmiko object's class to proper class. Generally used with terminal_server device_type when you need to redispatch after interacting with terminal server.

Expand source code
def redispatch(
    obj: "BaseConnection", device_type: str, session_prep: bool = True
) -> None:
    """Re-class an existing Netmiko object in place for a different device_type.

    Generally used with terminal_server device_type when you need to redispatch after
    interacting with terminal server. The new class is looked up first, so an unknown
    device_type fails before the object is modified. When `session_prep` is true the
    object's `_try_session_preparation()` is called afterwards.
    """
    target_cls = ssh_dispatcher(device_type)
    obj.device_type, obj.__class__ = device_type, target_cls
    if not session_prep:
        return
    obj._try_session_preparation()
def ssh_dispatcher(device_type: str) ‑> Type[BaseConnection]

Select the class to be instantiated based on vendor/platform.

Expand source code
def ssh_dispatcher(device_type: str) -> Type["BaseConnection"]:
    """Return the connection class registered in CLASS_MAPPER for `device_type`.

    An unregistered device_type raises KeyError.
    """
    connection_cls = CLASS_MAPPER[device_type]
    return connection_cls

Classes

class BaseConnection (ip: str = '', host: str = '', username: str = '', password: Optional[str] = None, secret: str = '', port: Optional[int] = None, device_type: str = '', verbose: bool = False, global_delay_factor: float = 1.0, global_cmd_verify: Optional[bool] = None, use_keys: bool = False, key_file: Optional[str] = None, pkey: Optional[paramiko.pkey.PKey] = None, passphrase: Optional[str] = None, disabled_algorithms: Optional[Dict[str, Any]] = None, disable_sha2_fix: bool = False, allow_agent: bool = False, ssh_strict: bool = False, system_host_keys: bool = False, alt_host_keys: bool = False, alt_key_file: str = '', ssh_config_file: Optional[str] = None, conn_timeout: int = 10, auth_timeout: Optional[int] = None, banner_timeout: int = 15, blocking_timeout: int = 20, timeout: int = 100, session_timeout: int = 60, read_timeout_override: Optional[float] = None, keepalive: int = 0, default_enter: Optional[str] = None, response_return: Optional[str] = None, serial_settings: Optional[Dict[str, Any]] = None, fast_cli: bool = True, session_log: Optional[SessionLog] = None, session_log_record_writes: bool = False, session_log_file_mode: str = 'write', allow_auto_change: bool = False, encoding: str = 'utf-8', sock: Optional[socket.socket] = None, sock_telnet: Optional[Dict[str, Any]] = None, auto_connect: bool = True, delay_factor_compat: bool = False, disable_lf_normalization: bool = False)

Defines vendor independent methods.

Otherwise method left as a stub method.

    Initialize attributes for establishing connection to target device.

    :param ip: IP address of target device. Not required if <code>host</code> is
        provided.

    :param host: Hostname of target device. Not required if <code>ip</code> is
            provided.

    :param username: Username to authenticate against target device if
            required.

    :param password: Password to authenticate against target device if
            required.

    :param secret: The enable password if target device requires one.

    :param port: The destination port used to connect to the target
            device.

    :param device_type: Class selection based on device type.

    :param verbose: Enable additional messages to standard output.

    :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).

    :param use_keys: Connect to target device using SSH keys.

    :param key_file: Filename path of the SSH key file to use.

    :param pkey: SSH key object to use.

    :param passphrase: Passphrase to use for encrypted key; password will be used for key
            decryption if not specified.

    :param disabled_algorithms: Dictionary of SSH algorithms to disable. Refer to the Paramiko
            documentation for a description of the expected format.

    :param disable_sha2_fix: Boolean that fixes Paramiko issue with missing server-sig-algs
        <https://github.com/paramiko/paramiko/issues/1961> (default: False)

    :param allow_agent: Enable use of SSH key-agent.

    :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
            means unknown SSH host keys will be accepted).

    :param system_host_keys: Load host keys from the users known_hosts file.

    :param alt_host_keys: If <code>True</code> host keys will be loaded from the file specified in
            alt_key_file.

    :param alt_key_file: SSH host key file to use (if alt_host_keys=True).

    :param ssh_config_file: File name of OpenSSH configuration file.

    :param conn_timeout: TCP connection timeout.

    :param session_timeout: Set a timeout for parallel requests.

    :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.

    :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).

    :param read_timeout_override: Set a timeout that will override the default read_timeout
            of both send_command and send_command_timing. This is useful for 3rd party
            libraries where directly accessing method arguments might be impractical.

    :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
            Currently defaults to 0, for backwards compatibility (it will not attempt
            to keep the connection alive).

    :param default_enter: Character(s) to send to correspond to enter key (default: \n).

    :param response_return: Character(s) to use in normalized return data to represent
            enter key (default: \n)

    :param serial_settings: Dictionary of settings for use with serial port (pySerial).

    :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
            to select smallest of global and specific. Sets default global_delay_factor to .1
            (default: True)

    :param session_log: File path, SessionLog object, or BufferedIOBase subclass object
            to write the session log to.

    :param session_log_record_writes: The session log generally records only channel reads,
            in order to eliminate command duplication caused by command echo. You can enable
            this if you want to record both channel reads and channel writes in the log
            (default: False).

    :param session_log_file_mode: "write" or "append" for session_log file mode
            (default: "write")

    :param allow_auto_change: Allow automatic configuration changes for terminal settings.
            (default: False)

    :param encoding: Encoding to be used when writing bytes to the output channel.
            (default: "utf-8")

    :param sock: An open socket or socket-like object (such as a <code>.Channel</code>) to use for
            communication to the target host (default: None).

    :param sock_telnet: A dictionary of telnet socket parameters (SOCKS proxy). See
            telnet_proxy.py code for details.

    :param global_cmd_verify: Control whether command echo verification is enabled or disabled
            (default: None). Global attribute takes precedence over function <code>cmd_verify</code>
            argument. Value of <code>None</code> indicates to use function <code>cmd_verify</code> argument.

    :param auto_connect: Control whether Netmiko automatically establishes the connection as
            part of the object creation (default: True).

    :param delay_factor_compat: Set send_command and send_command_timing back to using Netmiko
            3.x behavior for delay_factor/global_delay_factor/max_loops. This argument will be
            eliminated in Netmiko 5.x (default: False).

    :param disable_lf_normalization: Disable Netmiko's linefeed normalization behavior
            (default: False)
Expand source code
class BaseConnection:
    """
    Defines vendor independent methods.

    Otherwise method left as a stub method.
    """

    def __init__(
        self,
        ip: str = "",
        host: str = "",
        username: str = "",
        password: Optional[str] = None,
        secret: str = "",
        port: Optional[int] = None,
        device_type: str = "",
        verbose: bool = False,
        global_delay_factor: float = 1.0,
        global_cmd_verify: Optional[bool] = None,
        use_keys: bool = False,
        key_file: Optional[str] = None,
        pkey: Optional[paramiko.PKey] = None,
        passphrase: Optional[str] = None,
        disabled_algorithms: Optional[Dict[str, Any]] = None,
        disable_sha2_fix: bool = False,
        allow_agent: bool = False,
        ssh_strict: bool = False,
        system_host_keys: bool = False,
        alt_host_keys: bool = False,
        alt_key_file: str = "",
        ssh_config_file: Optional[str] = None,
        #
        # Connect timeouts
        # ssh-connect --> TCP conn (conn_timeout) --> SSH-banner (banner_timeout)
        #       --> Auth response (auth_timeout)
        conn_timeout: int = 10,
        # Timeout to wait for authentication response
        auth_timeout: Optional[int] = None,
        banner_timeout: int = 15,  # Timeout to wait for the banner to be presented
        # Other timeouts
        blocking_timeout: int = 20,  # Read blocking timeout
        timeout: int = 100,  # TCP connect timeout | overloaded to read-loop timeout
        session_timeout: int = 60,  # Used for locking/sharing the connection
        read_timeout_override: Optional[float] = None,
        keepalive: int = 0,
        default_enter: Optional[str] = None,
        response_return: Optional[str] = None,
        serial_settings: Optional[Dict[str, Any]] = None,
        fast_cli: bool = True,
        _legacy_mode: bool = False,
        session_log: Optional[SessionLog] = None,
        session_log_record_writes: bool = False,
        session_log_file_mode: str = "write",
        allow_auto_change: bool = False,
        encoding: str = "utf-8",
        sock: Optional[socket.socket] = None,
        sock_telnet: Optional[Dict[str, Any]] = None,
        auto_connect: bool = True,
        delay_factor_compat: bool = False,
        disable_lf_normalization: bool = False,
    ) -> None:
        """
        Initialize attributes for establishing connection to target device.

        :param ip: IP address of target device. Not required if `host` is
            provided.

        :param host: Hostname of target device. Not required if `ip` is
                provided.

        :param username: Username to authenticate against target device if
                required.

        :param password: Password to authenticate against target device if
                required.

        :param secret: The enable password if target device requires one.

        :param port: The destination port used to connect to the target
                device.

        :param device_type: Class selection based on device type.

        :param verbose: Enable additional messages to standard output.

        :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).

        :param use_keys: Connect to target device using SSH keys.

        :param key_file: Filename path of the SSH key file to use.

        :param pkey: SSH key object to use.

        :param passphrase: Passphrase to use for encrypted key; password will be used for key
                decryption if not specified.

        :param disabled_algorithms: Dictionary of SSH algorithms to disable. Refer to the Paramiko
                documentation for a description of the expected format.

        :param disable_sha2_fix: Boolean that fixes Paramiko issue with missing server-sig-algs
            https://github.com/paramiko/paramiko/issues/1961 (default: False)

        :param allow_agent: Enable use of SSH key-agent.

        :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
                means unknown SSH host keys will be accepted).

        :param system_host_keys: Load host keys from the users known_hosts file.

        :param alt_host_keys: If `True` host keys will be loaded from the file specified in
                alt_key_file.

        :param alt_key_file: SSH host key file to use (if alt_host_keys=True).

        :param ssh_config_file: File name of OpenSSH configuration file.

        :param conn_timeout: TCP connection timeout.

        :param session_timeout: Set a timeout for parallel requests.

        :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.

        :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).

        :param read_timeout_override: Set a timeout that will override the default read_timeout
                of both send_command and send_command_timing. This is useful for 3rd party
                libraries where directly accessing method arguments might be impractical.

        :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
                Currently defaults to 0, for backwards compatibility (it will not attempt
                to keep the connection alive).

        :param default_enter: Character(s) to send to correspond to enter key (default: \n).

        :param response_return: Character(s) to use in normalized return data to represent
                enter key (default: \n)

        :param serial_settings: Dictionary of settings for use with serial port (pySerial).

        :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
                to select smallest of global and specific. Sets default global_delay_factor to .1
                (default: True)

        :param session_log: File path, SessionLog object, or BufferedIOBase subclass object
                to write the session log to.

        :param session_log_record_writes: The session log generally only records channel reads due
                to eliminate command duplication due to command echo. You can enable this if you
                want to record both channel reads and channel writes in the log (default: False).

        :param session_log_file_mode: "write" or "append" for session_log file mode
                (default: "write")

        :param allow_auto_change: Allow automatic configuration changes for terminal settings.
                (default: False)

        :param encoding: Encoding to be used when writing bytes to the output channel.
                (default: "utf-8")

        :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
                communication to the target host (default: None).

        :param sock_telnet: A dictionary of telnet socket parameters (SOCKS proxy). See
                telnet_proxy.py code for details.

        :param global_cmd_verify: Control whether command echo verification is enabled or disabled
                (default: None). Global attribute takes precedence over function `cmd_verify`
                argument. Value of `None` indicates to use function `cmd_verify` argument.

        :param auto_connect: Control whether Netmiko automatically establishes the connection as
                part of the object creation (default: True).

        :param delay_factor_compat: Set send_command and send_command_timing back to using Netmiko
                3.x behavior for delay_factor/global_delay_factor/max_loops. This argument will be
                eliminated in Netmiko 5.x (default: False).

        :param disable_lf_normalization: Disable Netmiko's linefeed normalization behavior
                (default: False)
        """

        self.remote_conn: Union[
            None, telnetlib.Telnet, paramiko.Channel, serial.Serial
        ] = None
        # Does the platform support a configuration mode
        self._config_mode = True
        self._read_buffer = ""
        self.delay_factor_compat = delay_factor_compat

        self.TELNET_RETURN = "\r\n"
        if default_enter is None:
            if "telnet" not in device_type:
                self.RETURN = "\n"
            else:
                self.RETURN = self.TELNET_RETURN
        else:
            self.RETURN = default_enter

        # Line Separator in response lines
        self.RESPONSE_RETURN = "\n" if response_return is None else response_return
        self.disable_lf_normalization = True if disable_lf_normalization else False

        if ip:
            self.host = ip.strip()
        elif host:
            self.host = host.strip()
        if not ip and not host and "serial" not in device_type:
            raise ValueError("Either ip or host must be set")
        if port is None:
            if "telnet" in device_type:
                port = 23
            else:
                port = 22
        self.port = int(port)

        self.username = username
        self.password = password
        self.secret = secret
        self.device_type = device_type
        self.ansi_escape_codes = False
        self.verbose = verbose
        self.auth_timeout = auth_timeout
        self.banner_timeout = banner_timeout
        self.blocking_timeout = blocking_timeout
        self.conn_timeout = conn_timeout
        self.session_timeout = session_timeout
        self.timeout = timeout
        self.read_timeout_override = read_timeout_override
        self.keepalive = keepalive
        self.allow_auto_change = allow_auto_change
        self.encoding = encoding
        self.sock = sock
        self.sock_telnet = sock_telnet
        self.fast_cli = fast_cli
        self._legacy_mode = _legacy_mode
        self.global_delay_factor = global_delay_factor
        self.global_cmd_verify = global_cmd_verify
        if self.fast_cli and self.global_delay_factor == 1:
            self.global_delay_factor = 0.1
        self.session_log = None
        self._session_log_close = False

        # prevent logging secret data
        no_log = {}
        if self.password:
            no_log["password"] = self.password
        if self.secret:
            no_log["secret"] = self.secret
        # Always sanitize username and password
        log.addFilter(SecretsFilter(no_log=no_log))

        # Netmiko will close the session_log if we open the file
        if session_log is not None:
            if isinstance(session_log, str):
                # If session_log is a string, open a file corresponding to string name.
                self.session_log = SessionLog(
                    file_name=session_log,
                    file_mode=session_log_file_mode,
                    no_log=no_log,
                    record_writes=session_log_record_writes,
                )
                self.session_log.open()
            elif isinstance(session_log, io.BufferedIOBase):
                # In-memory buffer or an already open file handle
                self.session_log = SessionLog(
                    buffered_io=session_log,
                    no_log=no_log,
                    record_writes=session_log_record_writes,
                )
            elif isinstance(session_log, SessionLog):
                # SessionLog object
                self.session_log = session_log
                self.session_log.open()
            else:
                raise ValueError(
                    "session_log must be a path to a file, a file handle, "
                    "SessionLog object, or a BufferedIOBase subclass."
                )

        # Default values
        self.serial_settings = {
            "port": "COM1",
            "baudrate": 9600,
            "bytesize": serial.EIGHTBITS,
            "parity": serial.PARITY_NONE,
            "stopbits": serial.STOPBITS_ONE,
        }
        if serial_settings is None:
            serial_settings = {}
        self.serial_settings.update(serial_settings)

        if "serial" in device_type:
            self.host = "serial"
            comm_port = self.serial_settings.pop("port")
            # Get the proper comm port reference if a name was enterred
            comm_port = check_serial_port(comm_port)
            self.serial_settings.update({"port": comm_port})

        # set in set_base_prompt method
        self.base_prompt = ""
        self._session_locker = Lock()

        # determine if telnet or SSH
        if "_telnet" in device_type:
            self.protocol = "telnet"
            self.password = password or ""
        elif "_serial" in device_type:
            self.protocol = "serial"
            self.password = password or ""
        else:
            self.protocol = "ssh"

            self.key_policy: paramiko.client.MissingHostKeyPolicy
            if not ssh_strict:
                self.key_policy = paramiko.AutoAddPolicy()
            else:
                self.key_policy = paramiko.RejectPolicy()

            # Options for SSH host_keys
            self.use_keys = use_keys
            self.key_file = (
                path.abspath(path.expanduser(key_file)) if key_file else None
            )
            if self.use_keys is True:
                self._key_check()
            self.pkey = pkey
            self.passphrase = passphrase
            self.allow_agent = allow_agent
            self.system_host_keys = system_host_keys
            self.alt_host_keys = alt_host_keys
            self.alt_key_file = alt_key_file

            if disabled_algorithms:
                self.disabled_algorithms = disabled_algorithms
            else:
                self.disabled_algorithms = (
                    {"pubkeys": ["rsa-sha2-256", "rsa-sha2-512"]}
                    if disable_sha2_fix
                    else {}
                )

            # For SSH proxy support
            self.ssh_config_file = ssh_config_file

        # Establish the remote connection
        if auto_connect:
            self._open()

    def _open(self) -> None:
        """Run the connection sequence: adjust parameters, connect, then prepare the session.

        Decoupled from __init__ (which only calls this when `auto_connect` is true) so that
        connection creation can be mocked or deferred.
        """
        # Hook executed before any connection attempt is made
        self._modify_connection_params()
        self.establish_connection()
        # Runs session_preparation() and disconnects if it raises
        self._try_session_preparation()

    def __enter__(self) -> "BaseConnection":
        """Enter the context manager and return this connection object.

        No connection work is done here; the method only returns `self` so that it can be
        bound by the `as` clause of a `with` statement.
        """
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Gracefully close connection on Context Manager exit.

        The three exception arguments are accepted but not inspected. The method returns
        None, so an exception raised inside the `with` block is not suppressed.

        :param exc_type: Type of the exception raised in the `with` block (or None)

        :param exc_value: The exception instance raised in the `with` block (or None)

        :param traceback: Traceback associated with the exception (or None)
        """
        self.disconnect()

    def _modify_connection_params(self) -> None:
        """Modify connection parameters prior to SSH connection.

        This implementation intentionally does nothing. `_open()` calls it before
        `establish_connection()`.
        NOTE(review): presumably a hook for subclasses to override -- confirm.
        """
        pass

    def _timeout_exceeded(self, start: float, msg: str = "Timeout exceeded!") -> bool:
        """Raise NetmikoTimeoutException if waiting too much in the serving queue.

        :param start: Initial start time to see if session lock timeout has been exceeded
        :type start: float (from time.time() call i.e. epoch time)

        :param msg: Exception message if timeout was exceeded
        :type msg: str
        """
        if not start:
            # Must provide a comparison time
            return False
        if time.time() - start > self.session_timeout:
            # session_timeout exceeded
            raise NetmikoTimeoutException(msg)
        return False

    def _lock_netmiko_session(self, start: Optional[float] = None) -> bool:
        """Try to acquire the Netmiko session lock. If not available, wait in the queue until
        the channel is available again.

        :param start: Initial start time to measure the session timeout
        :type start: float (from time.time() call i.e. epoch time)
        """
        if not start:
            start = time.time()
        # Wait here until the SSH channel lock is acquired or until session_timeout exceeded
        while not self._session_locker.acquire(False) and not self._timeout_exceeded(
            start, "The netmiko channel is not available!"
        ):
            time.sleep(0.1)
        return True

    def _unlock_netmiko_session(self) -> None:
        """
        Release the channel at the end of the task.
        """
        if self._session_locker.locked():
            self._session_locker.release()

    def _autodetect_fs(self, cmd: str = "", pattern: str = "") -> str:
        """Not implemented in this class; always raises NotImplementedError.

        NOTE(review): presumably overridden by platform-specific subclasses to detect the
        device file system -- confirm against the subclasses.
        """
        raise NotImplementedError

    def _enter_shell(self) -> str:
        """Not implemented in this class; always raises NotImplementedError.

        NOTE(review): presumably overridden by platform-specific subclasses that can drop
        into an underlying shell -- confirm against the subclasses.
        """
        raise NotImplementedError

    def _return_cli(self) -> str:
        """Not implemented in this class; always raises NotImplementedError.

        NOTE(review): presumably the counterpart of `_enter_shell()`, overridden by
        platform-specific subclasses -- confirm against the subclasses.
        """
        raise NotImplementedError

    def _key_check(self) -> bool:
        """Verify key_file exists."""
        msg = f"""
use_keys has been set to True, but specified key_file does not exist:

use_keys: {self.use_keys}
key_file: {self.key_file}
"""
        if self.key_file is None:
            raise ValueError(msg)

        my_key_file = Path(self.key_file)
        if not my_key_file.is_file():
            raise ValueError(msg)
        return True

    @lock_channel
    @log_writes
    def write_channel(self, out_data: str) -> None:
        """Generic method that will write data out the channel.

        The write itself is delegated to `self.channel.write_channel()`.

        :param out_data: data to be written to the channel
        :type out_data: str
        """
        # NOTE(review): lock_channel / log_writes are defined outside this view; presumably
        # they serialize channel access and record writes in the session log -- confirm.
        self.channel.write_channel(out_data)

    def is_alive(self) -> bool:
        """Returns a boolean flag with the state of the connection.

        For telnet, three IAC + NOP sequences are sent on the underlying socket. For every
        other protocol a NULL byte is written to the channel and the Paramiko transport's
        `is_active()` result is returned.

        A failure to send (socket error / EOF) is reported as False rather than raised.

        NOTE(review): serial connections also take the non-telnet path, where the
        `paramiko.Channel` assertion would fail -- confirm whether that is intended.

        :return: True if the connection appears usable, False otherwise
        """
        null = chr(0)
        if self.remote_conn is None:
            log.error("Connection is not initialised, is_alive returns False")
            return False

        if self.protocol == "telnet":
            try:
                # Try sending IAC + NOP (IAC is telnet way of sending command)
                # IAC = Interpret as Command; it comes before the NOP.
                log.debug("Sending IAC + NOP")
                assert isinstance(self.remote_conn, telnetlib.Telnet)
                telnet_socket = self.remote_conn.get_socket()
                # Need to send multiple times to test connection
                for _ in range(3):
                    telnet_socket.sendall(telnetlib.IAC + telnetlib.NOP)
                return True
            except AttributeError:
                # No usable socket object to send on
                return False
            except OSError:
                # A broken socket must yield False (not an exception), matching the SSH path
                log.error("Unable to send", exc_info=True)
                return False

        # SSH
        try:
            # Try sending ASCII null byte to maintain the connection alive
            log.debug("Sending the NULL byte")
            self.write_channel(null)
            assert isinstance(self.remote_conn, paramiko.Channel)
            assert self.remote_conn.transport is not None
            result = self.remote_conn.transport.is_active()
            assert isinstance(result, bool)
            return result
        except (socket.error, EOFError):
            log.error("Unable to send", exc_info=True)
            # If unable to send, we can tell for sure that the connection is unusable
            return False

    @lock_channel
    def read_channel(self) -> str:
        """Read whatever data is currently available on the channel and post-process it.

        Unless linefeed normalization is disabled, the data is passed through
        `normalize_linefeeds()`; ANSI escape codes are stripped when
        `self.ansi_escape_codes` is set. The processed data is written to the debug log and
        the session log. Data previously stored in `self._read_buffer` is prepended to the
        returned string (after logging, so it is not recorded twice) and the buffer is
        cleared.
        """
        new_data = self.channel.read_channel()

        if self.disable_lf_normalization is False:
            read_began = time.time()
            # Data blocks shouldn't end in '\r' (can cause problems with normalize_linefeeds)
            # Only do the extra reads if '\n' exists in the output; this avoids devices
            # that only use \r. Give up on the extra reads after one second.
            while (
                "\n" in new_data
                and new_data.endswith("\r")
                and time.time() - read_began < 1.0
            ):
                time.sleep(0.01)
                new_data += self.channel.read_channel()
            new_data = self.normalize_linefeeds(new_data)

        if self.ansi_escape_codes:
            new_data = self.strip_ansi_escape_codes(new_data)
        log.debug(f"read_channel: {new_data}")
        if self.session_log:
            self.session_log.write(new_data)

        # Prepend previously buffered data only after logging so that the
        # session_log/log do not record the buffered data a second time.
        pending = self._read_buffer
        if not pending:
            return new_data
        self._read_buffer = ""
        return pending + new_data

    def read_until_pattern(
        self,
        pattern: str = "",
        read_timeout: float = 10.0,
        re_flags: int = 0,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read channel until pattern is detected.

        Will return string up to and including pattern.

        Returns ReadTimeout if pattern not detected in read_timeout seconds.

        :param pattern: Regular expression pattern used to identify that reading is done.

        :param read_timeout: maximum time to wait looking for pattern. Will raise ReadTimeout.
            A read_timeout value of 0 will cause the loop to never timeout (i.e. it will keep
            reading indefinitely until pattern is detected.

        :param re_flags: regex flags used in conjunction with pattern (defaults to no flags).

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        """
        if max_loops is not None:
            msg = """\n
Netmiko 4.x has deprecated the use of max_loops with read_until_pattern.
You should convert all uses of max_loops over to read_timeout=x
where x is the total number of seconds to wait before timing out.\n"""
            warnings.warn(msg, DeprecationWarning)

        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        output = ""
        loop_delay = 0.01
        start_time = time.time()
        # if read_timeout == 0 or 0.0 keep reading indefinitely
        while (time.time() - start_time < read_timeout) or (not read_timeout):
            output += self.read_channel()

            if re.search(pattern, output, flags=re_flags):
                if "(" in pattern and "(?:" not in pattern:
                    msg = f"""
Parenthesis found in pattern.

pattern: {pattern}\n

This can be problemtic when used in read_until_pattern().

You should ensure that you use either non-capture groups i.e. '(?:' or that the
parenthesis completely wrap the pattern '(pattern)'"""
                    log.debug(msg)
                results = re.split(pattern, output, maxsplit=1, flags=re_flags)

                # The string matched by pattern must be retained in the output string.
                # re.split will do this if capturing parenthesis are used.
                if len(results) == 2:
                    # no capturing parenthesis, convert and try again.
                    pattern = f"({pattern})"
                    results = re.split(pattern, output, maxsplit=1, flags=re_flags)

                if len(results) != 3:
                    # well, we tried
                    msg = f"""Unable to successfully split output based on pattern:
pattern={pattern}
output={repr(output)}
results={results}
"""
                    raise ReadException(msg)

                # Process such that everything before and including pattern is return.
                # Everything else is retained in the _read_buffer
                output, match_str, buffer = results
                output = output + match_str
                if buffer:
                    self._read_buffer += buffer
                log.debug(f"Pattern found: {pattern} {output}")
                return output
            time.sleep(loop_delay)

        msg = f"""\n\nPattern not detected: {repr(pattern)} in output.

Things you might try to fix this:
1. Adjust the regex pattern to better identify the terminating string. Note, in
many situations the pattern is automatically based on the network device's prompt.
2. Increase the read_timeout to a larger value.

You can also look at the Netmiko session_log or debug log for more information.\n\n"""
        raise ReadTimeout(msg)

    def read_channel_timing(
        self,
        last_read: float = 2.0,
        read_timeout: float = 120.0,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read data on the channel based on timing delays.

        General pattern is keep reading until no new data is read.

        Once no new data is read wait `last_read` amount of time (one last read).
        As long as no new data, then return data.

        Setting `read_timeout` to zero will cause read_channel_timing to never expire based
        on an absolute timeout. It will only complete based on timeout based on there being
        no new data.

        :param last_read: Amount of time to wait before performing one last read (under the
            idea that we should be done reading at this point and there should be no new
            data).

        :param read_timeout: Absolute timer for how long Netmiko should keep reading data on
            the channel (waiting for there to be no new data). Will raise ReadTimeout if this
            timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout
            (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read
            passes).

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        """

        if delay_factor is not None or max_loops is not None:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        # Time to delay in each read loop
        loop_delay = 0.1
        channel_data = ""
        start_time = time.time()

        # Set read_timeout to 0 to never timeout
        while (time.time() - start_time < read_timeout) or (not read_timeout):
            time.sleep(loop_delay)
            new_data = self.read_channel()
            # gather new output
            if new_data:
                channel_data += new_data
            # if we have some output, but nothing new, then do the last read
            elif channel_data != "":
                # Make sure really done (i.e. no new data)
                time.sleep(last_read)
                new_data = self.read_channel()
                if not new_data:
                    break
                else:
                    channel_data += new_data
        else:
            msg = f"""\n
read_channel_timing's absolute timer expired.

The network device was continually outputting data for longer than {read_timeout}
seconds.

If this is expected i.e. the command you are executing is continually emitting
data for a long period of time, then you can set 'read_timeout=x' seconds. If
you want Netmiko to keep reading indefinitely (i.e. to only stop when there is
no new data), then you can set 'read_timeout=0'.

You can look at the Netmiko session_log or debug log for more information.

"""
            raise ReadTimeout(msg)
        return channel_data

    def read_until_prompt(
        self,
        read_timeout: float = 10.0,
        read_entire_line: bool = False,
        re_flags: int = 0,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read channel up to and including self.base_prompt.

        The prompt is regex-escaped and handed to `read_until_pattern()`.

        :param read_timeout: passed through to `read_until_pattern()`

        :param read_entire_line: when True, ".*" is appended to the pattern so that the
            remainder of the prompt line is included in the match

        :param re_flags: passed through to `read_until_pattern()`

        :param max_loops: passed through to `read_until_pattern()`
        """
        line_tail = ".*" if read_entire_line else ""
        prompt_regex = f"{re.escape(self.base_prompt)}{line_tail}"
        return self.read_until_pattern(
            pattern=prompt_regex,
            read_timeout=read_timeout,
            re_flags=re_flags,
            max_loops=max_loops,
        )

    def read_until_prompt_or_pattern(
        self,
        pattern: str = "",
        read_timeout: float = 10.0,
        read_entire_line: bool = False,
        re_flags: int = 0,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read until either self.base_prompt or pattern is detected.

        The regex-escaped prompt and `pattern` are combined into a single non-capturing
        alternation that is handed to `read_until_pattern()`. With an empty `pattern` only
        the prompt is searched for.

        :param pattern: additional regular expression that also terminates the read

        :param read_timeout: passed through to `read_until_pattern()`

        :param read_entire_line: when True, ".*" is appended to the prompt part of the
            pattern (not to `pattern`)

        :param re_flags: passed through to `read_until_pattern()`

        :param max_loops: passed through to `read_until_pattern()`
        """
        prompt_regex = re.escape(self.base_prompt)
        if read_entire_line:
            prompt_regex += ".*"
        search_regex = f"(?:{prompt_regex}|{pattern})" if pattern else prompt_regex
        return self.read_until_pattern(
            pattern=search_regex,
            read_timeout=read_timeout,
            re_flags=re_flags,
            max_loops=max_loops,
        )

    def serial_login(
        self,
        pri_prompt_terminator: str = r"#\s*$",
        alt_prompt_terminator: str = r">\s*$",
        username_pattern: str = r"(?:[Uu]ser:|sername|ogin)",
        pwd_pattern: str = r"assword",
        delay_factor: float = 1.0,
        max_loops: int = 20,
    ) -> str:
        """Log in over a serial connection by reusing the telnet login procedure.

        All arguments are passed positionally and unchanged to `telnet_login()`; see that
        method for their meaning. Note that the default `username_pattern` here differs from
        the `telnet_login()` default.

        :return: whatever `telnet_login()` returns
        """
        return self.telnet_login(
            pri_prompt_terminator,
            alt_prompt_terminator,
            username_pattern,
            pwd_pattern,
            delay_factor,
            max_loops,
        )

    def telnet_login(
        self,
        pri_prompt_terminator: str = r"#\s*$",
        alt_prompt_terminator: str = r">\s*$",
        username_pattern: str = r"(?:user:|username|login|user name)",
        pwd_pattern: str = r"assword",
        delay_factor: float = 1.0,
        max_loops: int = 20,
    ) -> str:
        """Telnet login. Can be username/password or just password.

        :param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

        :param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

        :param username_pattern: Pattern used to identify the username prompt

        :param pwd_pattern: Pattern used to identify the pwd prompt

        :param delay_factor: See __init__: global_delay_factor

        :param max_loops: Controls the wait time in conjunction with the delay_factor
        """
        delay_factor = self.select_delay_factor(delay_factor)

        # Revert telnet_login back to old speeds/delays
        if delay_factor < 1:
            if not self._legacy_mode and self.fast_cli:
                delay_factor = 1

        time.sleep(1 * delay_factor)

        output = ""
        return_msg = ""
        i = 1
        while i <= max_loops:
            try:
                output = self.read_channel()
                return_msg += output

                # Search for username pattern / send username
                if re.search(username_pattern, output, flags=re.I):
                    # Sometimes username/password must be terminated with "\r" and not "\r\n"
                    self.write_channel(self.username + "\r")
                    time.sleep(1 * delay_factor)
                    output = self.read_channel()
                    return_msg += output

                # Search for password pattern / send password
                if re.search(pwd_pattern, output, flags=re.I):
                    # Sometimes username/password must be terminated with "\r" and not "\r\n"
                    assert isinstance(self.password, str)
                    self.write_channel(self.password + "\r")
                    time.sleep(0.5 * delay_factor)
                    output = self.read_channel()
                    return_msg += output
                    if re.search(
                        pri_prompt_terminator, output, flags=re.M
                    ) or re.search(alt_prompt_terminator, output, flags=re.M):
                        return return_msg

                # Check if proper data received
                if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
                    alt_prompt_terminator, output, flags=re.M
                ):
                    return return_msg

                self.write_channel(self.TELNET_RETURN)
                time.sleep(0.5 * delay_factor)
                i += 1
            except EOFError:
                assert self.remote_conn is not None
                self.remote_conn.close()
                msg = f"Login failed: {self.host}"
                raise NetmikoAuthenticationException(msg)

        # Last try to see if we already logged in
        self.write_channel(self.TELNET_RETURN)
        time.sleep(0.5 * delay_factor)
        output = self.read_channel()
        return_msg += output
        if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
            alt_prompt_terminator, output, flags=re.M
        ):
            return return_msg

        msg = f"Login failed: {self.host}"
        assert self.remote_conn is not None
        self.remote_conn.close()
        raise NetmikoAuthenticationException(msg)

    def _try_session_preparation(self, force_data: bool = True) -> None:
        """
        In case of an exception happening during `session_preparation()` Netmiko should
        gracefully clean-up after itself. This might be challenging for library users
        to do since they do not have a reference to the object. This is possibly related
        to threads used in Paramiko.

        :param force_data: When True, write `self.RETURN` to the channel and pause briefly
            before calling `session_preparation()` so that there is data to read.
        """
        try:
            # Netmiko needs there to be data for session_preparation to work.
            if force_data:
                self.write_channel(self.RETURN)
                time.sleep(0.1)
            self.session_preparation()
        except Exception:
            # Close the connection, then re-raise the original exception to the caller
            self.disconnect()
            raise

    def session_preparation(self) -> None:
        """
        Prepare the session after the connection has been established

        This method handles some differences that occur between various devices
        early on in the session.

        In general, it should include:
        self._test_channel_read(pattern=r"some_pattern")
        self.set_base_prompt()
        self.set_terminal_width()
        self.disable_paging()

        This implementation performs exactly those four steps, in that order, calling
        `_test_channel_read()` without a pattern.
        """
        self._test_channel_read()
        self.set_base_prompt()
        self.set_terminal_width()
        self.disable_paging()

    def _use_ssh_config(self, dict_arg: Dict[str, Any]) -> Dict[str, Any]:
        """Update SSH connection parameters based on contents of SSH config file.

        :param dict_arg: Dictionary of SSH connection parameters
        """
        connect_dict = dict_arg.copy()

        # Use SSHConfig to generate source content.
        assert self.ssh_config_file is not None
        full_path = path.abspath(path.expanduser(self.ssh_config_file))
        source: Union[paramiko.config.SSHConfigDict, Dict[str, Any]]
        if path.exists(full_path):
            ssh_config_instance = paramiko.SSHConfig()
            with io.open(full_path, "rt", encoding="utf-8") as f:
                ssh_config_instance.parse(f)
                source = ssh_config_instance.lookup(self.host)
        else:
            source = {}

        # Keys get normalized to lower-case
        proxy: Optional[paramiko.proxy.ProxyCommand]
        if "proxycommand" in source:
            proxy = paramiko.ProxyCommand(source["proxycommand"])
        elif "proxyjump" in source:
            hops = list(reversed(source["proxyjump"].split(",")))
            if len(hops) > 1:
                raise ValueError(
                    "ProxyJump with more than one proxy server is not supported."
                )
            port = source.get("port", self.port)
            host = source.get("hostname", self.host)
            # -F {full_path} forces the continued use of the same SSH config file
            cmd = "ssh -F {} -W {}:{} {}".format(full_path, host, port, hops[0])
            proxy = paramiko.ProxyCommand(cmd)
        else:
            proxy = None

        # Only update 'hostname', 'sock', 'port', and 'username'
        # For 'port' and 'username' only update if using object defaults
        if connect_dict["port"] == 22:
            connect_dict["port"] = int(source.get("port", self.port))
        if connect_dict["username"] == "":
            connect_dict["username"] = source.get("user", self.username)
        if proxy:
            connect_dict["sock"] = proxy
        connect_dict["hostname"] = source.get("hostname", self.host)

        return connect_dict

    def _connect_params_dict(self) -> Dict[str, Any]:
        """Generate dictionary of Paramiko connection parameters."""
        conn_dict = {
            "hostname": self.host,
            "port": self.port,
            "username": self.username,
            "password": self.password,
            "look_for_keys": self.use_keys,
            "allow_agent": self.allow_agent,
            "key_filename": self.key_file,
            "pkey": self.pkey,
            "passphrase": self.passphrase,
            "disabled_algorithms": self.disabled_algorithms,
            "timeout": self.conn_timeout,
            "auth_timeout": self.auth_timeout,
            "banner_timeout": self.banner_timeout,
            "sock": self.sock,
        }

        # Check if using SSH 'config' file mainly for SSH proxy support
        if self.ssh_config_file:
            conn_dict = self._use_ssh_config(conn_dict)
        return conn_dict

    def _sanitize_output(
        self,
        output: str,
        strip_command: bool = False,
        command_string: Optional[str] = None,
        strip_prompt: bool = False,
    ) -> str:
        """Strip out command echo and trailing router prompt."""
        if strip_command and command_string:
            output = self.strip_command(command_string, output)
        if strip_prompt:
            output = self.strip_prompt(output)
        return output

    def establish_connection(self, width: int = 511, height: int = 1000) -> None:
        """Establish SSH connection to the network device

        Timeout will generate a NetmikoTimeoutException
        Authentication failure will generate a NetmikoAuthenticationException

        :param width: Specified width of the VT100 terminal window (default: 511)
        :type width: int

        :param height: Specified height of the VT100 terminal window (default: 1000)
        :type height: int
        """
        self.channel: Channel
        if self.protocol == "telnet":
            if self.sock_telnet:
                self.remote_conn = telnet_proxy.Telnet(
                    self.host,
                    port=self.port,
                    timeout=self.timeout,
                    proxy_dict=self.sock_telnet,
                )
            else:
                self.remote_conn = telnetlib.Telnet(
                    self.host, port=self.port, timeout=self.timeout
                )
            # Migrating communication to channel class
            self.channel = TelnetChannel(conn=self.remote_conn, encoding=self.encoding)
            self.telnet_login()
        elif self.protocol == "serial":
            self.remote_conn = serial.Serial(**self.serial_settings)
            self.channel = SerialChannel(conn=self.remote_conn, encoding=self.encoding)
            self.serial_login()
        elif self.protocol == "ssh":
            ssh_connect_params = self._connect_params_dict()
            self.remote_conn_pre: Optional[paramiko.SSHClient]
            self.remote_conn_pre = self._build_ssh_client()

            # initiate SSH connection
            try:
                self.remote_conn_pre.connect(**ssh_connect_params)
            except socket.error as conn_error:
                self.paramiko_cleanup()
                msg = f"""TCP connection to device failed.

Common causes of this problem are:
1. Incorrect hostname or IP address.
2. Wrong TCP port.
3. Intermediate firewall blocking access.

Device settings: {self.device_type} {self.host}:{self.port}

"""

                # Handle DNS failures separately
                if "Name or service not known" in str(conn_error):
                    msg = (
                        f"DNS failure--the hostname you provided was not resolvable "
                        f"in DNS: {self.host}:{self.port}"
                    )

                msg = msg.lstrip()
                raise NetmikoTimeoutException(msg)
            except paramiko.ssh_exception.AuthenticationException as auth_err:
                self.paramiko_cleanup()
                msg = f"""Authentication to device failed.

Common causes of this problem are:
1. Invalid username and password
2. Incorrect SSH-key file
3. Connecting to the wrong device

Device settings: {self.device_type} {self.host}:{self.port}

"""

                msg += self.RETURN + str(auth_err)
                raise NetmikoAuthenticationException(msg)
            except paramiko.ssh_exception.SSHException as e:
                self.paramiko_cleanup()
                if "No existing session" in str(e):
                    msg = (
                        "Paramiko: 'No existing session' error: "
                        "try increasing 'conn_timeout' to 15 seconds or larger."
                    )
                    raise NetmikoTimeoutException(msg)
                else:
                    msg = f"""
A paramiko SSHException occurred during connection creation:

{str(e)}

"""
                    raise NetmikoTimeoutException(msg)

            if self.verbose:
                print(f"SSH connection established to {self.host}:{self.port}")

            # Use invoke_shell to establish an 'interactive session'
            self.remote_conn = self.remote_conn_pre.invoke_shell(
                term="vt100", width=width, height=height
            )

            self.remote_conn.settimeout(self.blocking_timeout)
            if self.keepalive:
                assert isinstance(self.remote_conn.transport, paramiko.Transport)
                self.remote_conn.transport.set_keepalive(self.keepalive)

            # Migrating communication to channel class
            self.channel = SSHChannel(conn=self.remote_conn, encoding=self.encoding)

            self.special_login_handler()
            if self.verbose:
                print("Interactive SSH session established")

        return None

    def _test_channel_read(self, count: int = 40, pattern: str = "") -> str:
        """Try to read the channel (generally post login) verify you receive data back.

        :param count: the number of times to check the channel for data

        :param pattern: Regular expression pattern used to determine end of channel read
        """

        def _increment_delay(
            main_delay: float, increment: float = 1.1, maximum: int = 8
        ) -> float:
            """Increment sleep time to a maximum value."""
            main_delay = main_delay * increment
            if main_delay >= maximum:
                main_delay = maximum
            return main_delay

        i = 0
        delay_factor = self.select_delay_factor(delay_factor=0)

        if pattern:
            return self.read_until_pattern(pattern=pattern, read_timeout=20)

        main_delay = delay_factor * 0.1
        time.sleep(main_delay * 10)
        new_data = ""
        while i <= count:
            new_data += self.read_channel_timing(read_timeout=20)
            if new_data:
                return new_data

            self.write_channel(self.RETURN)
            main_delay = _increment_delay(main_delay)
            time.sleep(main_delay)
            i += 1

        raise NetmikoTimeoutException("Timed out waiting for data")

    def _build_ssh_client(self) -> paramiko.SSHClient:
        """Create and configure the Paramiko SSHClient used for the SSH connection."""
        client = paramiko.SSHClient()

        # Known host keys give better SSH security: system keys first, then the
        # alternate key file (only when that file actually exists)
        if self.system_host_keys:
            client.load_system_host_keys()
        use_alt_keys = self.alt_host_keys and path.isfile(self.alt_key_file)
        if use_alt_keys:
            client.load_host_keys(self.alt_key_file)

        # Policy for unknown hosts; the default automatically adds untrusted hosts,
        # so make sure that is appropriate for your environment
        client.set_missing_host_key_policy(self.key_policy)
        return client

    def select_delay_factor(self, delay_factor: float) -> float:
        """
        Pick the delay factor to use from the per-call and the global value.

        Normal mode: the greater of delay_factor and self.global_delay_factor.
        fast_cli mode: the lesser of the two, except that a zero delay_factor
        always yields self.global_delay_factor.

        :param delay_factor: See __init__: global_delay_factor
        :type delay_factor: float
        """
        global_factor = self.global_delay_factor
        if not self.fast_cli:
            return delay_factor if delay_factor >= global_factor else global_factor

        prefer_local = bool(delay_factor) and delay_factor <= global_factor
        return delay_factor if prefer_local else global_factor

    def special_login_handler(self, delay_factor: float = 1.0) -> None:
        """Hook for devices like WLC, Extreme ERS that throw up characters prior to login.

        This implementation does nothing.

        :param delay_factor: Not used by this implementation.
        """
        return None

    def disable_paging(
        self,
        command: str = "terminal length 0",
        delay_factor: Optional[float] = None,
        cmd_verify: bool = True,
        pattern: Optional[str] = None,
    ) -> str:
        """Turn off output pagination; the default command is the Cisco CLI one.

        :param command: Device command to disable pagination of output

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param cmd_verify: Verify command echo before proceeding (default: True).

        :param pattern: Pattern to terminate reading of channel; only consulted when
            the command echo is not being verified
        """
        if delay_factor is not None:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        command = self.normalize_cmd(command)
        log.debug("In disable_paging")
        log.debug(f"Command: {command}")
        self.write_channel(command)

        verify_echo = cmd_verify and self.global_cmd_verify is not False
        if verify_echo:
            # Read up to the command echo so later reads do not get out of sync
            echo_pattern = re.escape(command.strip())
            output = self.read_until_pattern(pattern=echo_pattern, read_timeout=20)
        elif pattern:
            output = self.read_until_pattern(pattern=pattern, read_timeout=20)
        else:
            output = self.read_until_prompt()

        log.debug(f"{output}")
        log.debug("Exiting disable_paging")
        return output

    def set_terminal_width(
        self,
        command: str = "",
        delay_factor: Optional[float] = None,
        cmd_verify: bool = False,
        pattern: Optional[str] = None,
    ) -> str:
        """Send the command that sets the terminal width and return what was read back.

        CLI terminals try to automatically adjust the line based on the width of the
        terminal, which distorts output when accessed programmatically. A width of 511
        works on a broad set of devices.

        :param command: Command string to send to the device; an empty string means
            nothing is sent and "" is returned

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param cmd_verify: Read until the command echo is seen (default: False).

        :param pattern: Pattern to terminate reading of channel when the command echo
            is not being verified; without it the read runs until the prompt
        """
        if delay_factor is not None:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        if not command:
            return ""

        width_cmd = self.normalize_cmd(command)
        self.write_channel(width_cmd)

        # cmd_verify is off by default here: the terminal width must be set before
        # command echo verification is used
        if cmd_verify and self.global_cmd_verify is not False:
            return self.read_until_pattern(pattern=re.escape(width_cmd.strip()))
        if pattern:
            return self.read_until_pattern(pattern=pattern)
        return self.read_until_prompt()

    def set_base_prompt(
        self,
        pri_prompt_terminator: str = "#",
        alt_prompt_terminator: str = ">",
        delay_factor: float = 1.0,
        pattern: Optional[str] = None,
    ) -> str:
        """Determine the device prompt, store it in self.base_prompt and return it.

        The base prompt is used as the delimiter for stripping the trailing prompt from
        output. It should be general enough to apply in multiple contexts; for Cisco
        devices it is the router hostname (i.e. the prompt without > or #).

        This will be set on entering user exec or privileged exec on Cisco, but not when
        entering/exiting config mode.

        :param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

        :param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

        :param delay_factor: See __init__: global_delay_factor

        :param pattern: Regular expression pattern to search for in find_prompt() call;
            when None it is built from the non-empty terminators
        """
        if pattern is None:
            escaped_terms = [
                re.escape(term)
                for term in (pri_prompt_terminator, alt_prompt_terminator)
                if term
            ]
            if len(escaped_terms) == 2:
                pattern = rf"({escaped_terms[0]}|{escaped_terms[1]})"
            elif escaped_terms:
                pattern = escaped_terms[0]

        # Only hand a pattern to find_prompt() when there is one to use
        find_kwargs = {"delay_factor": delay_factor}
        if pattern:
            find_kwargs["pattern"] = pattern
        prompt = self.find_prompt(**find_kwargs)

        if prompt[-1] not in (pri_prompt_terminator, alt_prompt_terminator):
            raise ValueError(f"Router prompt not found: {repr(prompt)}")

        # A prompt that is nothing but the terminator is kept as-is; otherwise the
        # trailing terminator is stripped off
        self.base_prompt = prompt if len(prompt) == 1 else prompt[:-1]
        return self.base_prompt

    def find_prompt(
        self, delay_factor: float = 1.0, pattern: Optional[str] = None
    ) -> str:
        """Return the current network device prompt (last line of what is read only).

        Raises ValueError when no prompt text could be read.

        :param delay_factor: See __init__: global_delay_factor
        :type delay_factor: float

        :param pattern: Regular expression pattern to determine whether prompt is valid
        """
        sleep_time = self.select_delay_factor(delay_factor) * 0.25
        self.clear_buffer()
        self.write_channel(self.RETURN)

        if pattern:
            raw_prompt = self.read_until_pattern(pattern=pattern)
        else:
            # Initial read
            time.sleep(sleep_time)
            raw_prompt = self.read_channel().strip()

            # Retry up to 13 times, sending RETURN again and waiting longer each time
            for _ in range(13):
                if raw_prompt:
                    break
                self.write_channel(self.RETURN)
                time.sleep(sleep_time)
                raw_prompt = self.read_channel().strip()
                # Small sleep times are doubled; larger ones grow by one second
                if sleep_time <= 3:
                    sleep_time *= 2
                else:
                    sleep_time += 1

        # If multiple lines in the output take the last line
        prompt = raw_prompt.split(self.RESPONSE_RETURN)[-1].strip()
        self.clear_buffer()
        if not prompt:
            raise ValueError(f"Unable to find prompt: {prompt}")
        log.debug(f"[find_prompt()]: prompt is {prompt}")
        return prompt

    def clear_buffer(
        self,
        backoff: bool = True,
        backoff_max: float = 3.0,
        delay_factor: Optional[float] = None,
    ) -> str:
        """Read and return any data available in the channel.

        At most ten reads are made; reading stops at the first empty read.

        :param backoff: Double the pause between reads each time data is detected

        :param backoff_max: Upper limit for the pause between reads when backoff is used

        :param delay_factor: Multiplier applied to the initial 0.1 pause; defaults to
            self.global_delay_factor
        """
        factor = self.global_delay_factor if delay_factor is None else delay_factor
        pause = 0.1 * factor

        chunks = []
        for _ in range(10):
            time.sleep(pause)
            chunk = self.strip_ansi_escape_codes(self.read_channel())
            chunks.append(chunk)
            if not chunk:
                break
            log.debug("Clear buffer detects data in the channel")
            if backoff:
                # Double the pause each time data is detected, up to backoff_max
                pause *= 2
                if pause >= backoff_max:
                    pause = backoff_max
        return "".join(chunks)

    def command_echo_read(self, cmd: str, read_timeout: float) -> str:
        """Read the channel up to the echo of cmd, dropping anything ahead of that echo.

        :param cmd: Command string whose echo is expected on the channel

        :param read_timeout: Passed to read_until_pattern as its read_timeout
        """
        # Reading up to the command echo avoids getting out of sync with the device
        echoed = self.read_until_pattern(
            pattern=re.escape(cmd), read_timeout=read_timeout
        )

        # Prompts echoed before the command can later mess up the trailing prompt
        # pattern detection, so they are cleared out here
        pieces = echoed.split(cmd)
        if len(pieces) != 2:
            # cmd exists in the output multiple times? Just retain the original output
            return echoed
        # pieces[1] should realistically just be the null string
        return f"{cmd}{pieces[1]}"

    @flush_session_log
    @select_cmd_verify
    def send_command_timing(
        self,
        command_string: str,
        last_read: float = 2.0,
        read_timeout: float = 120.0,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
        strip_prompt: bool = True,
        strip_command: bool = True,
        normalize: bool = True,
        use_textfsm: bool = False,
        textfsm_template: Optional[str] = None,
        use_ttp: bool = False,
        ttp_template: Optional[str] = None,
        use_genie: bool = False,
        cmd_verify: bool = False,
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Run command_string on the channel using a delay-based (timing) read. Generally
        used for show commands.

        :param command_string: The command to be executed on the remote device.

        :param last_read: Time waited after end of data

        :param read_timeout: Absolute timer for how long Netmiko should keep reading data on
            the channel (waiting for there to be no new data). Will raise ReadTimeout if this
            timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout
            (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read
            passes).

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param strip_prompt: Remove the trailing router prompt from the output (default: True).

        :param strip_command: Remove the echo of the command from the output (default: True).

        :param normalize: Ensure the proper enter is sent at end of command (default: True).

        :param use_textfsm: Process command output through TextFSM template (default: False).

        :param textfsm_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_ttp: Process command output through TTP template (default: False).

        :param ttp_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_genie: Process command output through PyATS/Genie parser (default: False).

        :param cmd_verify: Verify command echo before proceeding (default: False).
        """
        deprecated_args_used = not (delay_factor is None and max_loops is None)
        if deprecated_args_used:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        if normalize:
            command_string = self.normalize_cmd(command_string)
        self.write_channel(command_string)

        # Optionally consume the command echo first, then do the timing-based read
        cmd = command_string.strip()
        echo = (
            self.command_echo_read(cmd=cmd, read_timeout=10)
            if cmd and cmd_verify
            else ""
        )
        timed_data = self.read_channel_timing(
            last_read=last_read, read_timeout=read_timeout
        )

        cleaned = self._sanitize_output(
            echo + timed_data,
            strip_command=strip_command,
            command_string=command_string,
            strip_prompt=strip_prompt,
        )
        # Hand the cleaned text to the structured-data converter together with the
        # parser selections made by the caller
        return structured_data_converter(
            command=command_string,
            raw_data=cleaned,
            platform=self.device_type,
            use_textfsm=use_textfsm,
            use_ttp=use_ttp,
            use_genie=use_genie,
            textfsm_template=textfsm_template,
            ttp_template=ttp_template,
        )

    def _send_command_timing_str(self, *args: Any, **kwargs: Any) -> str:
        """Wrapper for `send_command_timing` method that always returns a
        string"""
        output = self.send_command_timing(*args, **kwargs)
        assert isinstance(output, str)
        return output

    def strip_prompt(self, a_string: str) -> str:
        """Drop the last line of the output when it contains self.base_prompt.

        :param a_string: Returned string from device
        :type a_string: str
        """
        # rpartition separates the last line from everything before it; when there is
        # no line break the whole string is the last line and the leading part is ""
        leading, _, last_line = a_string.rpartition(self.RESPONSE_RETURN)
        return leading if self.base_prompt in last_line else a_string

    def _first_line_handler(self, data: str, search_pattern: str) -> Tuple[str, bool]:
        """
        Filter a repainted first line out of the data.

        In certain situations the first line (the echo of the command) gets repainted,
        which causes a false match on the terminating pattern.

        Returns a tuple of (data, first_line_processed) where data is the original data
        potentially with the first line modified, and first_line_processed is a flag
        indicating that the first line has been handled.
        """
        try:
            # The first line is the echo line containing the command; a backspace
            # character in it is what triggers the filtering
            first_line, *remaining = data.split(self.RETURN)
            if BACKSPACE_CHAR in first_line:
                # Remove everything from the search pattern to the end of that line
                first_line = re.sub(
                    search_pattern + r".*$", repl="", string=first_line
                )
                data = self.RETURN.join([first_line, *remaining])
        except IndexError:
            return (data, False)
        return (data, True)

    def _prompt_handler(self, auto_find_prompt: bool) -> str:
        if auto_find_prompt:
            try:
                prompt = self.find_prompt()
            except ValueError:
                prompt = self.base_prompt
        else:
            prompt = self.base_prompt
        return re.escape(prompt.strip())

    @flush_session_log
    @select_cmd_verify
    def send_command(
        self,
        command_string: str,
        expect_string: Optional[str] = None,
        read_timeout: float = 10.0,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
        auto_find_prompt: bool = True,
        strip_prompt: bool = True,
        strip_command: bool = True,
        normalize: bool = True,
        use_textfsm: bool = False,
        textfsm_template: Optional[str] = None,
        use_ttp: bool = False,
        ttp_template: Optional[str] = None,
        use_genie: bool = False,
        cmd_verify: bool = True,
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Execute command_string on the SSH channel using a pattern-based mechanism. Generally
        used for show commands. By default this method will keep waiting to receive data until the
        network device prompt is detected. The current network device prompt will be determined
        automatically.

        The returned value is whatever structured_data_converter produces from the
        sanitized output.

        :param command_string: The command to be executed on the remote device.

        :param expect_string: Regular expression pattern to use for determining end of output.
            If left blank will default to being based on router prompt.

        :param read_timeout: Maximum time to wait looking for pattern. Will raise ReadTimeout
            if timeout is exceeded. Replaced by self.read_timeout_override when that is set,
            and by the calculated Netmiko 3.x equivalent when self.delay_factor_compat is set.

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param auto_find_prompt: Use find_prompt() to override base prompt

        :param strip_prompt: Remove the trailing router prompt from the output (default: True).

        :param strip_command: Remove the echo of the command from the output (default: True).

        :param normalize: Ensure the proper enter is sent at end of command (default: True).

        :param use_textfsm: Process command output through TextFSM template (default: False).

        :param textfsm_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_ttp: Process command output through TTP template (default: False).

        :param ttp_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_genie: Process command output through PyATS/Genie parser (default: False).

        :param cmd_verify: Verify command echo before proceeding (default: True).
        """

        # Time to delay in each read loop
        loop_delay = 0.025

        # A truthy read_timeout_override on the connection replaces the per-call value
        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        if self.delay_factor_compat:
            # For compatibility calculate the old equivalent read_timeout
            # i.e. what it would have been in Netmiko 3.x
            if delay_factor is None:
                tmp_delay_factor = self.global_delay_factor
            else:
                tmp_delay_factor = self.select_delay_factor(delay_factor)
            compat_timeout = calc_old_timeout(
                max_loops=max_loops,
                delay_factor=tmp_delay_factor,
                loop_delay=0.2,
                old_timeout=self.timeout,
            )
            msg = f"""\n
You have chosen to use Netmiko's delay_factor compatibility mode for
send_command. This will revert Netmiko to behave similarly to how it
did in Netmiko 3.x (i.e. to use delay_factor/global_delay_factor and
max_loops).

Using these parameters Netmiko has calculated an effective read_timeout
of {compat_timeout} and will set the read_timeout to this value.

Please convert your code to that new format i.e.:

    net_connect.send_command(cmd, read_timeout={compat_timeout})

And then disable delay_factor_compat.

delay_factor_compat will be removed in Netmiko 5.x.\n"""
            warnings.warn(msg, DeprecationWarning)

            # Override the read_timeout with Netmiko 3.x way :-(
            read_timeout = compat_timeout

        else:
            # No need for two deprecation messages so only display this if not using
            # delay_factor_compat
            if delay_factor is not None or max_loops is not None:
                msg = """\n
Netmiko 4.x has deprecated the use of delay_factor/max_loops with
send_command. You should convert all uses of delay_factor and max_loops
over to read_timeout=x where x is the total number of seconds to wait
before timing out.\n"""
                warnings.warn(msg, DeprecationWarning)

        # Pattern that ends the read: the caller's expression when given, otherwise
        # the escaped prompt returned by _prompt_handler
        if expect_string is not None:
            search_pattern = expect_string
        else:
            search_pattern = self._prompt_handler(auto_find_prompt)

        if normalize:
            command_string = self.normalize_cmd(command_string)

        # Start the clock
        start_time = time.time()
        self.write_channel(command_string)
        new_data = ""

        # Consume the command echo first (when verification is on and there is a
        # command to look for); whatever is read becomes the first chunk of output
        cmd = command_string.strip()
        if cmd and cmd_verify:
            new_data = self.command_echo_read(cmd=cmd, read_timeout=10)

        MAX_CHARS = 2_000_000
        DEQUE_SIZE = 20
        output = ""
        # Check only the past N-reads. This is for the case where the output is
        # very large (i.e. searching a very large string for a pattern a whole bunch of times)
        past_n_reads: Deque[str] = deque(maxlen=DEQUE_SIZE)
        first_line_processed = False

        # Keep reading data until search_pattern is found or until read_timeout
        while time.time() - start_time < read_timeout:
            if new_data:
                output += new_data
                past_n_reads.append(new_data)

                # Case where we haven't processed the first_line yet (there is a potential issue
                # in the first line (in cases where the line is repainted).
                if not first_line_processed:
                    output, first_line_processed = self._first_line_handler(
                        output, search_pattern
                    )
                    # Check if we have already found our pattern
                    if re.search(search_pattern, output):
                        break

                else:
                    if len(output) <= MAX_CHARS:
                        if re.search(search_pattern, output):
                            break
                    else:
                        # Switch to deque mode if output is greater than MAX_CHARS
                        # Check if pattern is in the past n reads
                        if re.search(search_pattern, "".join(past_n_reads)):
                            break

            # Pause briefly, then poll the channel for the next chunk
            time.sleep(loop_delay)
            new_data = self.read_channel()

        else:  # nobreak
            # The loop condition expired without a pattern match
            msg = f"""
Pattern not detected: {repr(search_pattern)} in output.

Things you might try to fix this:
1. Explicitly set your pattern using the expect_string argument.
2. Increase the read_timeout to a larger value.

You can also look at the Netmiko session_log or debug log for more information.

"""
            raise ReadTimeout(msg)

        output = self._sanitize_output(
            output,
            strip_command=strip_command,
            command_string=command_string,
            strip_prompt=strip_prompt,
        )
        # Pass the sanitized output through the structured-data converter with the
        # parser options selected by the caller
        return_val = structured_data_converter(
            command=command_string,
            raw_data=output,
            platform=self.device_type,
            use_textfsm=use_textfsm,
            use_ttp=use_ttp,
            use_genie=use_genie,
            textfsm_template=textfsm_template,
            ttp_template=ttp_template,
        )
        return return_val

    def _send_command_str(self, *args: Any, **kwargs: Any) -> str:
        """Wrapper for `send_command` method that always returns a string"""
        output = self.send_command(*args, **kwargs)
        assert isinstance(output, str)
        return output

    def send_command_expect(
        self, *args: Any, **kwargs: Any
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Previous name of send_command; all arguments are forwarded to it unchanged."""
        result = self.send_command(*args, **kwargs)
        return result

    def _multiline_kwargs(self, **kwargs: Any) -> Dict[str, Any]:
        strip_prompt = kwargs.get("strip_prompt", False)
        kwargs["strip_prompt"] = strip_prompt
        strip_command = kwargs.get("strip_command", False)
        kwargs["strip_command"] = strip_command
        return kwargs

    def send_multiline(
        self,
        commands: Sequence[Union[str, List[str]]],
        multiline: bool = True,
        **kwargs: Any,
    ) -> str:
        """
        Send a series of commands and return their concatenated output.

        commands should either be:

        commands = [[cmd1, expect1], [cmd2, expect2], ...]

        Or

        commands = [cmd1, cmd2, cmd3, ...]

        Any expect_string that is a null-string will use a pattern based on the
        device's prompt (unless an expect_string argument is passed in via kwargs).

        :param commands: List of commands, or list of [command, expect_string] pairs

        :param multiline: When True, strip_prompt and strip_command default to False

        :param kwargs: Remaining keyword arguments, passed along with each command
        """
        if multiline:
            kwargs = self._multiline_kwargs(**kwargs)

        # expect_string from kwargs is the fallback pattern; without one the fallback
        # is derived from the device prompt
        fallback_expect = kwargs.pop("expect_string", None)
        if not fallback_expect:
            fallback_expect = self._prompt_handler(
                kwargs.get("auto_find_prompt", True)
            )

        pieces = []
        if commands and isinstance(commands[0], str):
            # Plain list of commands: each one is sent with the fallback pattern
            for cmd in commands:
                pieces.append(
                    self._send_command_str(
                        str(cmd), expect_string=fallback_expect, **kwargs
                    )
                )
        else:
            # List of lists: first element is the command, second its expect_string
            for cmd_item in commands:
                assert not isinstance(cmd_item, str)
                cmd, expect_string = cmd_item
                pieces.append(
                    self._send_command_str(
                        cmd, expect_string=expect_string or fallback_expect, **kwargs
                    )
                )
        return "".join(pieces)

    def send_multiline_timing(
        self, commands: Sequence[str], multiline: bool = True, **kwargs: Any
    ) -> str:
        """Send each command with the timing-based method and return the combined output.

        :param commands: Commands to send, in order

        :param multiline: When True, strip_prompt and strip_command default to False

        :param kwargs: Remaining keyword arguments, passed along with each command
        """
        if multiline:
            kwargs = self._multiline_kwargs(**kwargs)
        return "".join(
            self._send_command_timing_str(str(cmd), **kwargs) for cmd in commands
        )

    @staticmethod
    def strip_backspaces(output: str) -> str:
        """Return the output with every backspace character (ASCII 0x08) removed.

        :param output: Output obtained from a remote network device.
        :type output: str
        """
        # Splitting on the backspace and re-joining drops every occurrence
        return "".join(output.split("\x08"))

    def strip_command(self, command_string: str, output: str) -> str:
        """
        Remove the echoed command (the first line) from the output.

        Cisco IOS adds backspaces into output for long commands (i.e. for commands that
        line wrap); these are removed before the comparison is made.

        :param command_string: The command string sent to the device
        :type command_string: str

        :param output: The returned output as a result of the command string sent to the device
        :type output: str
        """
        # Line-wrapped commands pick up backspaces; drop them before comparing
        cleaned = output.replace("\x08", "")

        # Juniper can echo the command with an extra space before the line break,
        # hence a prefix comparison against the stripped command
        if not cleaned.startswith(command_string.strip()):
            # command_string isn't there; do nothing
            return cleaned

        # Discard the first line and keep everything after the first line break
        _, _, remainder = cleaned.partition(self.RESPONSE_RETURN)
        return remainder

    def normalize_linefeeds(self, a_string: str) -> str:
        r"""Convert `\r\r\r\n`, `\r\r\n`, `\r\n` and `\n\r` to self.RESPONSE_RETURN.

        When self.RESPONSE_RETURN is `\n`, any remaining `\r` is converted to `\n` too.

        :param a_string: A string that may have non-normalized line feeds
            i.e. output returned from device, or a device prompt
        :type a_string: str
        """
        line_endings = "(\r\r\r\n|\r\r\n|\r\n|\n\r)"
        normalized = re.sub(line_endings, self.RESPONSE_RETURN, a_string)
        if self.RESPONSE_RETURN != "\n":
            return normalized
        # Convert any remaining \r to \n
        return normalized.replace("\r", self.RESPONSE_RETURN)

    def normalize_cmd(self, command: str) -> str:
        """Return the command with trailing whitespace removed and self.RETURN appended.

        :param command: Command that may require line feed to be normalized
        :type command: str
        """
        trimmed = command.rstrip()
        return trimmed + self.RETURN

    def check_enable_mode(self, check_string: str = "") -> bool:
        """Report whether check_string appears in the prompt line read from the device.

        A RETURN is written to the channel and the channel is read until the prompt
        (entire line) before the check is made.

        :param check_string: Identification of privilege mode from device
        :type check_string: str
        """
        self.write_channel(self.RETURN)
        prompt_line = self.read_until_prompt(read_entire_line=True)
        in_enable_mode = check_string in prompt_line
        return in_enable_mode

    def enable(
        self,
        cmd: str = "",
        pattern: str = "ssword",
        enable_pattern: Optional[str] = None,
        check_state: bool = True,
        re_flags: int = re.IGNORECASE,
    ) -> str:
        """Enter enable mode.

        :param cmd: Device command to enter enable mode

        :param pattern: pattern to search for indicating device is waiting for password

        :param enable_pattern: pattern indicating you have entered enable mode

        :param check_state: Determine whether we are already in enable_mode using
                check_enable_mode() before trying to elevate privileges (default: True)

        :param re_flags: Regular expression flags used in conjunction with pattern
                (applied both when reading the channel and when deciding whether to
                send the secret)

        :return: The output read from the channel while entering enable mode; the
                empty string if check_state found the device already in enable mode

        :raises ValueError: If enable mode cannot be confirmed or a read times out
        """
        output = ""
        msg = (
            "Failed to enter enable mode. Please ensure you pass "
            "the 'secret' argument to ConnectHandler."
        )

        # Check if in enable mode already.
        if check_state and self.check_enable_mode():
            return output

        # Send "enable" mode command
        self.write_channel(self.normalize_cmd(cmd))
        try:
            # Read the command echo
            if self.global_cmd_verify is not False:
                output += self.read_until_pattern(pattern=re.escape(cmd.strip()))

            # Search for trailing prompt or password pattern
            output += self.read_until_prompt_or_pattern(
                pattern=pattern, re_flags=re_flags
            )

            # Send the "secret" in response to password pattern. The same re_flags
            # used for the read above must be used here; otherwise a prompt that
            # only matched because of the flags (e.g. "PASSWORD:" with IGNORECASE)
            # would never be answered.
            if re.search(pattern, output, flags=re_flags):
                self.write_channel(self.normalize_cmd(self.secret))
                output += self.read_until_prompt()

            # Search for terminating pattern if defined
            if enable_pattern and not re.search(enable_pattern, output):
                output += self.read_until_pattern(pattern=enable_pattern)
            else:
                if not self.check_enable_mode():
                    raise ValueError(msg)

        except NetmikoTimeoutException:
            raise ValueError(msg)

        return output

    def exit_enable_mode(self, exit_command: str = "") -> str:
        """Leave enable mode if the device is currently in it.

        :param exit_command: Command that exits the session from privileged mode
        :type exit_command: str

        :return: Output read after sending the exit command, or the empty string
            when the device was not in enable mode

        :raises ValueError: If the device is still in enable mode afterwards
        """
        if not self.check_enable_mode():
            # Not in enable mode: nothing is sent to the device
            return ""

        self.write_channel(self.normalize_cmd(exit_command))
        output = "" + self.read_until_prompt()
        if self.check_enable_mode():
            raise ValueError("Failed to exit enable mode.")
        return output

    def check_config_mode(
        self, check_string: str = "", pattern: str = "", force_regex: bool = False
    ) -> bool:
        """Report whether the device prompt indicates configuration mode.

        A bare ``self.RETURN`` is sent and the response is searched for
        ``check_string``.

        :param check_string: Identification of configuration mode from the device
        :type check_string: str

        :param pattern: Pattern to terminate reading of channel
        :type pattern: str

        :param force_regex: Use regular expression pattern to find check_string in output
        :type force_regex: bool

        """
        self.write_channel(self.RETURN)
        # You can encounter an issue here (on router name changes) prefer delay-based solution
        if pattern:
            output = self.read_until_pattern(pattern=pattern)
        else:
            output = self.read_channel_timing(read_timeout=10.0)

        if not force_regex:
            # Plain substring test
            return check_string in output
        return bool(re.search(check_string, output))

    def config_mode(
        self, config_command: str = "", pattern: str = "", re_flags: int = 0
    ) -> str:
        """Enter configuration mode unless the device is already in it.

        :param config_command: Configuration command to send to the device
        :type config_command: str

        :param pattern: Pattern to terminate reading of channel
        :type pattern: str

        :param re_flags: Regular expression flags
        :type re_flags: RegexFlag

        :return: Output read while entering configuration mode, or the empty
            string if the device was already in configuration mode

        :raises ValueError: If configuration mode is not detected afterwards
        """
        if self.check_config_mode():
            # Already in configuration mode; nothing is sent
            return ""

        output = ""
        self.write_channel(self.normalize_cmd(config_command))
        # Make sure you read until you detect the command echo (avoid getting out of sync)
        if self.global_cmd_verify is not False:
            echo_pattern = re.escape(config_command.strip())
            output += self.read_until_pattern(pattern=echo_pattern)
        if not pattern:
            output += self.read_until_prompt(read_entire_line=True)
        else:
            output += self.read_until_pattern(pattern=pattern, re_flags=re_flags)
        if not self.check_config_mode():
            raise ValueError("Failed to enter configuration mode.")
        return output

    def exit_config_mode(self, exit_config: str = "", pattern: str = "") -> str:
        """Leave configuration mode if the device is currently in it.

        :param exit_config: Command to exit configuration mode
        :type exit_config: str

        :param pattern: Pattern to terminate reading of channel
        :type pattern: str

        :return: Output read while exiting configuration mode (empty string when
            the device was not in configuration mode)

        :raises ValueError: If the device is still in configuration mode afterwards
        """
        output = ""
        currently_in_config = self.check_config_mode()
        if currently_in_config:
            self.write_channel(self.normalize_cmd(exit_config))
            # Make sure you read until you detect the command echo (avoid getting out of sync)
            if self.global_cmd_verify is not False:
                echo_pattern = re.escape(exit_config.strip())
                output += self.read_until_pattern(pattern=echo_pattern)
            if not pattern:
                output += self.read_until_prompt(read_entire_line=True)
            else:
                output += self.read_until_pattern(pattern=pattern)
            if self.check_config_mode():
                raise ValueError("Failed to exit configuration mode")
        log.debug(f"exit_config_mode: {output}")
        return output

    def send_config_from_file(
        self, config_file: Union[str, bytes, "PathLike[Any]"], **kwargs: Any
    ) -> str:
        """
        Read configuration commands from a file and send them to the device.

        The file is opened as UTF-8 text; every line becomes one command that is
        handed to send_config_set.

        **kwargs are passed to send_config_set method.

        :param config_file: Path to configuration file to be sent to the device

        :param kwargs: params to be sent to send_config_set method

        :return: Whatever send_config_set returns for the file's commands
        """
        with io.open(config_file, "rt", encoding="utf-8") as cfg_file:
            # Iterating a text file yields the same lines as readlines()
            file_lines = list(cfg_file)
        return self.send_config_set(file_lines, **kwargs)

    @flush_session_log
    def send_config_set(
        self,
        config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None,
        *,
        exit_config_mode: bool = True,
        read_timeout: Optional[float] = None,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
        strip_prompt: bool = False,
        strip_command: bool = False,
        config_mode_command: Optional[str] = None,
        cmd_verify: bool = True,
        enter_config_mode: bool = True,
        error_pattern: str = "",
        terminator: str = r"#",
        bypass_commands: Optional[str] = None,
    ) -> str:
        """
        Send configuration commands down the SSH channel.

        config_commands is an iterable containing all of the configuration commands.
        The commands will be executed one after the other.

        Automatically exits/enters configuration mode.

        :param config_commands: Multiple configuration commands to be sent to the device.
        A single string is treated as one command; None returns the empty string
        without sending anything.

        :param exit_config_mode: Determines whether or not to exit config mode after complete

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param strip_prompt: Determines whether or not to strip the prompt
        (not referenced in this method body)

        :param strip_command: Determines whether or not to strip the command
        (not referenced in this method body)

        :param read_timeout: Absolute timer to send to read_channel_timing. Also adjusts
        read_timeout in read_until_pattern calls. Defaults to 15 when neither it nor the
        deprecated delay_factor/max_loops arguments are given.

        :param config_mode_command: The command to enter into config mode

        :param cmd_verify: Whether or not to verify command echo for each command in config_set.
        Overridden by self.global_cmd_verify when that attribute is not None.

        :param enter_config_mode: Do you enter config mode before sending config commands

        :param error_pattern: Regular expression pattern to detect config errors in the
        output.

        :param terminator: Regular expression pattern to use as an alternate terminator in certain
        situations.

        :param bypass_commands: Regular expression pattern indicating configuration commands
        where cmd_verify is automatically disabled.

        :return: The accumulated channel output after being passed through _sanitize_output

        :raises ValueError: If config_commands is not iterable

        :raises ConfigInvalidException: If error_pattern is set and matches the output
        """

        # A connection-wide cmd_verify setting takes precedence over the argument
        if self.global_cmd_verify is not None:
            cmd_verify = self.global_cmd_verify

        if delay_factor is not None or max_loops is not None:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

            # Calculate an equivalent read_timeout (if using old settings)
            # Eliminate in Netmiko 5.x
            if read_timeout is None:
                max_loops = 150 if max_loops is None else max_loops
                delay_factor = 1.0 if delay_factor is None else delay_factor

                # If delay_factor has been set, then look at global_delay_factor
                delay_factor = self.select_delay_factor(delay_factor)

                read_timeout = calc_old_timeout(
                    max_loops=max_loops, delay_factor=delay_factor, loop_delay=0.1
                )

        # delay_factor is still used below for the short sleep between commands
        # when cmd_verify is disabled
        if delay_factor is None:
            delay_factor = self.select_delay_factor(0)
        else:
            delay_factor = self.select_delay_factor(delay_factor)

        # Apply the default read_timeout; the else branch is a no-op
        if read_timeout is None:
            read_timeout = 15
        else:
            read_timeout = read_timeout

        if config_commands is None:
            return ""
        elif isinstance(config_commands, str):
            # Wrap a lone command so it is not iterated character by character
            config_commands = (config_commands,)

        if not hasattr(config_commands, "__iter__"):
            raise ValueError("Invalid argument passed into send_config_set")

        if bypass_commands is None:
            # Commands where cmd_verify is automatically disabled reg-ex logical-or
            bypass_commands = r"^banner .*$"

        # Set bypass_commands="" to force no-bypass (usually for testing)
        bypass_detected = False
        if bypass_commands:
            # Make a copy of the iterator
            # (one copy is scanned for bypass commands, the other is sent to the device)
            config_commands, config_commands_tmp = itertools.tee(config_commands, 2)
            bypass_detected = any(
                [True for cmd in config_commands_tmp if re.search(bypass_commands, cmd)]
            )
        if bypass_detected:
            cmd_verify = False

        # Send config commands
        output = ""
        if enter_config_mode:
            if config_mode_command:
                output += self.config_mode(config_mode_command)
            else:
                output += self.config_mode()

        # Legacy fast_cli path: write every command first, then gather all output with
        # a single timing-based read (only taken when no error_pattern is set)
        if self.fast_cli and self._legacy_mode and not error_pattern:
            for cmd in config_commands:
                self.write_channel(self.normalize_cmd(cmd))
            # Gather output
            output += self.read_channel_timing(read_timeout=read_timeout)

        # No echo verification: send each command followed by a short sleep
        elif not cmd_verify:
            for cmd in config_commands:
                self.write_channel(self.normalize_cmd(cmd))
                time.sleep(delay_factor * 0.05)

                # Gather the output incrementally due to error_pattern requirements
                if error_pattern:
                    output += self.read_channel_timing(read_timeout=read_timeout)
                    if re.search(error_pattern, output, flags=re.M):
                        msg = f"Invalid input detected at command: {cmd}"
                        raise ConfigInvalidException(msg)

            # Standard output gathering (no error_pattern)
            if not error_pattern:
                output += self.read_channel_timing(read_timeout=read_timeout)

        # Default path: verify the echo of every command before sending the next one
        else:
            for cmd in config_commands:
                self.write_channel(self.normalize_cmd(cmd))

                # Make sure command is echoed
                output += self.read_until_pattern(
                    pattern=re.escape(cmd.strip()), read_timeout=read_timeout
                )

                # Read until next prompt or terminator (#); the .*$ forces read of entire line
                pattern = f"(?:{re.escape(self.base_prompt)}.*$|{terminator}.*$)"
                output += self.read_until_pattern(
                    pattern=pattern, read_timeout=read_timeout, re_flags=re.M
                )

                # error_pattern is searched against all output accumulated so far
                if error_pattern:
                    if re.search(error_pattern, output, flags=re.M):
                        msg = f"Invalid input detected at command: {cmd}"
                        raise ConfigInvalidException(msg)

        if exit_config_mode:
            output += self.exit_config_mode()
        output = self._sanitize_output(output)
        log.debug(f"{output}")
        return output

    def strip_ansi_escape_codes(self, string_buffer: str) -> str:
        r"""
        Remove any ANSI (VT100) ESC codes from the output

        http://en.wikipedia.org/wiki/ANSI_escape_code

        Note: this does not capture ALL possible ANSI Escape Codes only the ones
        I have encountered

        Current codes that are filtered:
        ESC = '\x1b' or chr(27)
        ESC = is the escape character [^ in hex ('\x1b')
        ESC[24;27H   Position cursor
        ESC[?25h     Show the cursor
        ESC[E        Next line (HP does ESC-E)
        ESC[K        Erase line from cursor to the end of line
        ESC[2K       Erase entire line
        ESC[1;24r    Enable scrolling from start to row end
        ESC[?6l      Reset mode screen with options 640 x 200 monochrome (graphics)
        ESC[?7l      Disable line wrapping
        ESC[2J       Code erase display
        ESC[00;32m   Color Green (30 to 37 are different colors)
        ESC[6n       Get cursor position
        ESC[1D       Move cursor position leftward by x characters (1 in this case)
        ESC[9999B    Move cursor down N-lines (very large value is attempt to move to the
                     very bottom of the screen).

        Two codes are replaced rather than removed: ESC-E (next line) becomes a single
        ``self.RETURN`` and ESC[<n>L (insert line) becomes <n> copies of ``self.RETURN``.

        HP ProCurve and Cisco SG300 require this (possible others).

        :param string_buffer: The string to be processed to remove ANSI escape codes
        :type string_buffer: str

        :return: The string with the recognized escape codes removed or replaced
        """  # noqa

        code_position_cursor = chr(27) + r"\[\d+;\d+H"
        code_show_cursor = chr(27) + r"\[\?25h"
        code_next_line = chr(27) + r"E"
        code_erase_line_end = chr(27) + r"\[K"
        code_erase_line = chr(27) + r"\[2K"
        # NOTE(review): same pattern as code_erase_line_end; left unchanged
        code_erase_start_line = chr(27) + r"\[K"
        code_enable_scroll = chr(27) + r"\[\d+;\d+r"
        code_insert_line = chr(27) + r"\[(\d+)L"
        code_carriage_return = chr(27) + r"\[1M"
        code_disable_line_wrapping = chr(27) + r"\[\?7l"
        code_reset_mode_screen_options = chr(27) + r"\[\?\d+l"
        code_reset_graphics_mode = chr(27) + r"\[00m"
        code_erase_display = chr(27) + r"\[2J"
        code_erase_display_0 = chr(27) + r"\[J"
        code_graphics_mode = chr(27) + r"\[\dm"
        code_graphics_mode1 = chr(27) + r"\[\d\d;\d\dm"
        code_graphics_mode2 = chr(27) + r"\[\d\d;\d\d;\d\dm"
        code_graphics_mode3 = chr(27) + r"\[(3|4)\dm"
        code_graphics_mode4 = chr(27) + r"\[(9|10)[0-7]m"
        code_get_cursor_position = chr(27) + r"\[6n"
        code_cursor_position = chr(27) + r"\[m"
        code_attrs_off = chr(27) + r"\[0m"
        code_reverse = chr(27) + r"\[7m"
        code_cursor_left = chr(27) + r"\[\d+D"
        code_cursor_forward = chr(27) + r"\[\d*C"
        code_cursor_up = chr(27) + r"\[\d*A"
        code_cursor_down = chr(27) + r"\[\d*B"
        code_wrap_around = chr(27) + r"\[\?7h"
        code_bracketed_paste_mode = chr(27) + r"\[\?2004h"

        # Codes that are simply deleted. They are applied one after another in this
        # order, so the list (including the repeated code_erase_display entry) is
        # kept exactly as it was.
        code_set = [
            code_position_cursor,
            code_show_cursor,
            code_erase_line,
            code_enable_scroll,
            code_erase_start_line,
            code_carriage_return,
            code_disable_line_wrapping,
            code_erase_line_end,
            code_reset_mode_screen_options,
            code_reset_graphics_mode,
            code_erase_display,
            code_graphics_mode,
            code_graphics_mode1,
            code_graphics_mode2,
            code_graphics_mode3,
            code_graphics_mode4,
            code_get_cursor_position,
            code_cursor_position,
            code_erase_display,
            code_erase_display_0,
            code_attrs_off,
            code_reverse,
            code_cursor_left,
            code_cursor_up,
            code_cursor_down,
            code_cursor_forward,
            code_wrap_around,
            code_bracketed_paste_mode,
        ]

        output = string_buffer
        for ansi_esc_code in code_set:
            output = re.sub(ansi_esc_code, "", output)

        # CODE_NEXT_LINE must substitute with return
        output = re.sub(code_next_line, self.RETURN, output)

        # Aruba and ProCurve switches can use code_insert_line for <enter>.
        # Substitute each insert_line with as many <enter> as that particular code
        # asks for (previously the count of the first match was used for all of them).
        output = re.sub(
            code_insert_line,
            lambda match: int(match.group(1)) * self.RETURN,
            output,
        )

        return output

    def cleanup(self, command: str = "") -> None:
        """Logout of the session on the network device plus any additional cleanup.

        This base implementation does nothing and ignores ``command``.
        """
        return None

    def paramiko_cleanup(self) -> None:
        """Cleanup Paramiko to try to gracefully handle SSH session ending.

        Closes ``self.remote_conn_pre`` when it is set, then deletes the attribute
        from the instance.
        """
        ssh_client = self.remote_conn_pre
        if ssh_client is not None:
            ssh_client.close()
        del self.remote_conn_pre

    def disconnect(self) -> None:
        """Try to gracefully close the session.

        Runs ``cleanup()``, closes the transport that matches ``self.protocol``,
        and finally clears the connection attributes and closes the session log
        (if one is set). Exceptions raised by the first two steps are ignored.
        """
        try:
            self.cleanup()
        except Exception:
            # Keep going on cleanup process even if exceptions
            pass

        try:
            protocol = self.protocol
            if protocol == "ssh":
                self.paramiko_cleanup()
            elif protocol == "telnet":
                assert isinstance(self.remote_conn, telnetlib.Telnet)
                self.remote_conn.close()
            elif protocol == "serial":
                assert isinstance(self.remote_conn, serial.Serial)
                self.remote_conn.close()
        except Exception:
            # There have been race conditions observed on disconnect.
            pass
        finally:
            # Always drop the connection references, whatever happened above
            self.remote_conn_pre = None
            self.remote_conn = None
            if self.session_log:
                self.session_log.close()

    def commit(self) -> str:
        """Commit method for platforms that support this.

        :raises AttributeError: Always, in this base implementation
        """
        unsupported_msg = "Network device does not support 'commit()' method"
        raise AttributeError(unsupported_msg)

    def save_config(
        self, cmd: str = "", confirm: bool = False, confirm_response: str = ""
    ) -> str:
        """Not Implemented

        :raises NotImplementedError: Always, in this base implementation
        """
        raise NotImplementedError()

    def run_ttp(
        self,
        template: Union[str, bytes, "PathLike[Any]"],
        res_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        """
        Run TTP template parsing by using input parameters to collect
        devices output.

        This method forwards this connection, the template and the keyword
        arguments to run_ttp_template and returns its result.

        :param template: template content, OS path to template or reference
            to template within TTP templates collection in
            ttp://path/to/template.txt format

        :param res_kwargs: ``**res_kwargs`` arguments to pass to TTP result method
            (an empty dict is used when None)

        :param kwargs: any other ``**kwargs`` to use for TTP object instantiation

        TTP template must have inputs defined together with below parameters.

        :param method: name of Netmiko connection object method to call, default ``send_command``

        :param kwargs: Netmiko connection object method arguments

        :param commands: list of commands to collect

        Inputs' load could be of one of the supported formats and controlled by input's ``load``
        attribute, supported values - python, yaml or json. For each input output collected
        from device and parsed accordingly.
        """
        result_options = res_kwargs if res_kwargs is not None else {}
        return run_ttp_template(
            connection=self, template=template, res_kwargs=result_options, **kwargs
        )

Subclasses

Static methods

def strip_backspaces(output: str) ‑> str

Strip any backspace characters out of the output.

:param output: Output obtained from a remote network device. :type output: str

Expand source code
@staticmethod
def strip_backspaces(output: str) -> str:
    """Return ``output`` with every backspace character (0x08) removed.

    :param output: Output obtained from a remote network device.
    :type output: str
    """
    return "".join(output.split("\x08"))

Methods

def check_config_mode(self, check_string: str = '', pattern: str = '', force_regex: bool = False) ‑> bool

Checks if the device is in configuration mode or not.

:param check_string: Identification of configuration mode from the device :type check_string: str

:param pattern: Pattern to terminate reading of channel :type pattern: str

:param force_regex: Use regular expression pattern to find check_string in output :type force_regex: bool

Expand source code
def check_config_mode(
    self, check_string: str = "", pattern: str = "", force_regex: bool = False
) -> bool:
    """Report whether the device prompt indicates configuration mode.

    A bare ``self.RETURN`` is sent and the response is searched for
    ``check_string``.

    :param check_string: Identification of configuration mode from the device
    :type check_string: str

    :param pattern: Pattern to terminate reading of channel
    :type pattern: str

    :param force_regex: Use regular expression pattern to find check_string in output
    :type force_regex: bool

    """
    self.write_channel(self.RETURN)
    # You can encounter an issue here (on router name changes) prefer delay-based solution
    if pattern:
        output = self.read_until_pattern(pattern=pattern)
    else:
        output = self.read_channel_timing(read_timeout=10.0)

    if not force_regex:
        # Plain substring test
        return check_string in output
    return bool(re.search(check_string, output))
def check_enable_mode(self, check_string: str = '') ‑> bool

Check if in enable mode. Return boolean.

:param check_string: Identification of privilege mode from device :type check_string: str

Expand source code
def check_enable_mode(self, check_string: str = "") -> bool:
    """Report whether ``check_string`` appears in the current prompt line.

    A bare ``self.RETURN`` is written to the channel and the resulting prompt
    (entire line) is read back.

    :param check_string: Identification of privilege mode from device
    :type check_string: str
    """
    self.write_channel(self.RETURN)
    prompt_line = self.read_until_prompt(read_entire_line=True)
    found = check_string in prompt_line
    return found
def cleanup(self, command: str = '') ‑> None

Logout of the session on the network device plus any additional cleanup.

Expand source code
def cleanup(self, command: str = "") -> None:
    """Logout of the session on the network device plus any additional cleanup.

    This base implementation does nothing and ignores ``command``.
    """
    return None
def clear_buffer(self, backoff: bool = True, backoff_max: float = 3.0, delay_factor: Optional[float] = None) ‑> str

Read any data available in the channel.

Expand source code
def clear_buffer(
    self,
    backoff: bool = True,
    backoff_max: float = 3.0,
    delay_factor: Optional[float] = None,
) -> str:
    """Read any data available in the channel.

    The channel is polled at most 10 times, sleeping before each read, and
    polling stops at the first empty read. ANSI escape codes are stripped from
    every chunk.

    :param backoff: Double the sleep time after each read that returned data

    :param backoff_max: Upper bound for the sleep time when backing off

    :param delay_factor: Multiplier for the initial 0.1 second sleep; defaults to
        ``self.global_delay_factor``

    :return: Everything that was read, concatenated
    """

    if delay_factor is None:
        delay_factor = self.global_delay_factor
    pause = 0.1 * delay_factor

    chunks = []
    attempts = 0
    while attempts < 10:
        attempts += 1
        time.sleep(pause)
        data = self.strip_ansi_escape_codes(self.read_channel())
        chunks.append(data)
        if not data:
            break
        # Double sleep time each time we detect data
        log.debug("Clear buffer detects data in the channel")
        if backoff:
            pause *= 2
            if pause >= backoff_max:
                pause = backoff_max
    return "".join(chunks)
def command_echo_read(self, cmd: str, read_timeout: float) ‑> str
Expand source code
def command_echo_read(self, cmd: str, read_timeout: float) -> str:
    """Read the channel up to the echo of ``cmd`` and drop anything before it.

    :param cmd: The command whose echo is expected on the channel

    :param read_timeout: Timeout passed to read_until_pattern

    :return: ``cmd`` plus whatever followed it when ``cmd`` occurs exactly once in
        the data read; otherwise the data exactly as read
    """
    # Make sure you read until you detect the command echo (avoid getting out of sync)
    echoed = self.read_until_pattern(
        pattern=re.escape(cmd), read_timeout=read_timeout
    )

    # There can be echoed prompts that haven't been cleared before the cmd echo
    # this can later mess up the trailing prompt pattern detection. Clear this out.
    pieces = echoed.split(cmd)
    if len(pieces) != 2:
        # cmd exists in the output multiple times? Just retain the original output
        return echoed
    # pieces[1] should realistically just be the null string
    return f"{cmd}{pieces[1]}"
def commit(self) ‑> str

Commit method for platforms that support this.

Expand source code
def commit(self) -> str:
    """Commit method for platforms that support this.

    :raises AttributeError: Always, in this base implementation
    """
    unsupported_msg = "Network device does not support 'commit()' method"
    raise AttributeError(unsupported_msg)
def config_mode(self, config_command: str = '', pattern: str = '', re_flags: int = 0) ‑> str

Enter into config_mode.

:param config_command: Configuration command to send to the device :type config_command: str

:param pattern: Pattern to terminate reading of channel :type pattern: str

:param re_flags: Regular expression flags :type re_flags: RegexFlag

Expand source code
def config_mode(
    self, config_command: str = "", pattern: str = "", re_flags: int = 0
) -> str:
    """Enter configuration mode unless the device is already in it.

    :param config_command: Configuration command to send to the device
    :type config_command: str

    :param pattern: Pattern to terminate reading of channel
    :type pattern: str

    :param re_flags: Regular expression flags
    :type re_flags: RegexFlag

    :return: Output read while entering configuration mode, or the empty
        string if the device was already in configuration mode

    :raises ValueError: If configuration mode is not detected afterwards
    """
    if self.check_config_mode():
        # Already in configuration mode; nothing is sent
        return ""

    output = ""
    self.write_channel(self.normalize_cmd(config_command))
    # Make sure you read until you detect the command echo (avoid getting out of sync)
    if self.global_cmd_verify is not False:
        echo_pattern = re.escape(config_command.strip())
        output += self.read_until_pattern(pattern=echo_pattern)
    if not pattern:
        output += self.read_until_prompt(read_entire_line=True)
    else:
        output += self.read_until_pattern(pattern=pattern, re_flags=re_flags)
    if not self.check_config_mode():
        raise ValueError("Failed to enter configuration mode.")
    return output
def disable_paging(self, command: str = 'terminal length 0', delay_factor: Optional[float] = None, cmd_verify: bool = True, pattern: Optional[str] = None) ‑> str

Disable paging default to a Cisco CLI method.

:param command: Device command to disable pagination of output

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param cmd_verify: Verify command echo before proceeding (default: True).

:param pattern: Pattern to terminate reading of channel

Expand source code
def disable_paging(
    self,
    command: str = "terminal length 0",
    delay_factor: Optional[float] = None,
    cmd_verify: bool = True,
    pattern: Optional[str] = None,
) -> str:
    """Send the command that turns off output paging (Cisco CLI style by default).

    :param command: Device command to disable pagination of output

    :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :param cmd_verify: Verify command echo before proceeding (default: True).

    :param pattern: Pattern to terminate reading of channel

    :return: The output read from the channel after sending the command
    """
    if delay_factor is not None:
        warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

    command = self.normalize_cmd(command)
    log.debug("In disable_paging")
    log.debug(f"Command: {command}")
    self.write_channel(command)

    # Make sure you read until you detect the command echo (avoid getting out of sync)
    verify_echo = cmd_verify and self.global_cmd_verify is not False
    if verify_echo:
        echo_pattern = re.escape(command.strip())
        output = self.read_until_pattern(pattern=echo_pattern, read_timeout=20)
    elif pattern:
        output = self.read_until_pattern(pattern=pattern, read_timeout=20)
    else:
        output = self.read_until_prompt()

    log.debug(f"{output}")
    log.debug("Exiting disable_paging")
    return output
def disconnect(self) ‑> None

Try to gracefully close the session.

Expand source code
def disconnect(self) -> None:
    """Try to gracefully close the session.

    Runs ``cleanup()``, closes the transport that matches ``self.protocol``,
    and finally clears the connection attributes and closes the session log
    (if one is set). Exceptions raised by the first two steps are ignored.
    """
    try:
        self.cleanup()
    except Exception:
        # Keep going on cleanup process even if exceptions
        pass

    try:
        protocol = self.protocol
        if protocol == "ssh":
            self.paramiko_cleanup()
        elif protocol == "telnet":
            assert isinstance(self.remote_conn, telnetlib.Telnet)
            self.remote_conn.close()
        elif protocol == "serial":
            assert isinstance(self.remote_conn, serial.Serial)
            self.remote_conn.close()
    except Exception:
        # There have been race conditions observed on disconnect.
        pass
    finally:
        # Always drop the connection references, whatever happened above
        self.remote_conn_pre = None
        self.remote_conn = None
        if self.session_log:
            self.session_log.close()
def enable(self, cmd: str = '', pattern: str = 'ssword', enable_pattern: Optional[str] = None, check_state: bool = True, re_flags: int = re.IGNORECASE) ‑> str

Enter enable mode.

:param cmd: Device command to enter enable mode

:param pattern: pattern to search for indicating device is waiting for password

:param enable_pattern: pattern indicating you have entered enable mode

:param check_state: Determine whether we are already in enable_mode using check_enable_mode() before trying to elevate privileges (default: True)

:param re_flags: Regular expression flags used in conjunction with pattern

Expand source code
def enable(
    self,
    cmd: str = "",
    pattern: str = "ssword",
    enable_pattern: Optional[str] = None,
    check_state: bool = True,
    re_flags: int = re.IGNORECASE,
) -> str:
    """Enter enable mode.

    :param cmd: Device command to enter enable mode

    :param pattern: pattern to search for indicating device is waiting for password

    :param enable_pattern: pattern indicating you have entered enable mode

    :param check_state: Determine whether we are already in enable_mode using
            check_enable_mode() before trying to elevate privileges (default: True)

    :param re_flags: Regular expression flags used in conjunction with pattern
            (applied both when reading the channel and when deciding whether to
            send the secret)

    :return: The output read from the channel while entering enable mode; the
            empty string if check_state found the device already in enable mode

    :raises ValueError: If enable mode cannot be confirmed or a read times out
    """
    output = ""
    msg = (
        "Failed to enter enable mode. Please ensure you pass "
        "the 'secret' argument to ConnectHandler."
    )

    # Check if in enable mode already.
    if check_state and self.check_enable_mode():
        return output

    # Send "enable" mode command
    self.write_channel(self.normalize_cmd(cmd))
    try:
        # Read the command echo
        if self.global_cmd_verify is not False:
            output += self.read_until_pattern(pattern=re.escape(cmd.strip()))

        # Search for trailing prompt or password pattern
        output += self.read_until_prompt_or_pattern(
            pattern=pattern, re_flags=re_flags
        )

        # Send the "secret" in response to password pattern. The same re_flags
        # used for the read above must be used here; otherwise a prompt that
        # only matched because of the flags (e.g. "PASSWORD:" with IGNORECASE)
        # would never be answered.
        if re.search(pattern, output, flags=re_flags):
            self.write_channel(self.normalize_cmd(self.secret))
            output += self.read_until_prompt()

        # Search for terminating pattern if defined
        if enable_pattern and not re.search(enable_pattern, output):
            output += self.read_until_pattern(pattern=enable_pattern)
        else:
            if not self.check_enable_mode():
                raise ValueError(msg)

    except NetmikoTimeoutException:
        raise ValueError(msg)

    return output
def establish_connection(self, width: int = 511, height: int = 1000) ‑> None

Establish SSH connection to the network device

A timeout will raise a NetmikoTimeoutException. An authentication failure will raise a NetmikoAuthenticationException.

:param width: Specified width of the VT100 terminal window (default: 511) :type width: int

:param height: Specified height of the VT100 terminal window (default: 1000) :type height: int

Expand source code
    def establish_connection(self, width: int = 511, height: int = 1000) -> None:
        """Establish the connection to the network device (telnet, serial, or SSH).

        The transport is selected by self.protocol. For every protocol the raw
        connection is stored in self.remote_conn and wrapped in a channel object
        stored in self.channel, after which the protocol's login handler is called.

        Timeout will generate a NetmikoTimeoutException
        Authentication failure will generate a NetmikoAuthenticationException

        :param width: Specified width of the VT100 terminal window (default: 511).
            Only used for the SSH interactive shell.
        :type width: int

        :param height: Specified height of the VT100 terminal window (default: 1000).
            Only used for the SSH interactive shell.
        :type height: int
        """
        self.channel: Channel
        if self.protocol == "telnet":
            # Use the proxy-aware Telnet client when a telnet proxy dict was supplied
            if self.sock_telnet:
                self.remote_conn = telnet_proxy.Telnet(
                    self.host,
                    port=self.port,
                    timeout=self.timeout,
                    proxy_dict=self.sock_telnet,
                )
            else:
                self.remote_conn = telnetlib.Telnet(
                    self.host, port=self.port, timeout=self.timeout
                )
            # Migrating communication to channel class
            self.channel = TelnetChannel(conn=self.remote_conn, encoding=self.encoding)
            self.telnet_login()
        elif self.protocol == "serial":
            self.remote_conn = serial.Serial(**self.serial_settings)
            self.channel = SerialChannel(conn=self.remote_conn, encoding=self.encoding)
            self.serial_login()
        elif self.protocol == "ssh":
            ssh_connect_params = self._connect_params_dict()
            self.remote_conn_pre: Optional[paramiko.SSHClient]
            self.remote_conn_pre = self._build_ssh_client()

            # initiate SSH connection
            # Every failure path below first releases the paramiko client via
            # paramiko_cleanup() and then re-raises as a Netmiko exception.
            try:
                self.remote_conn_pre.connect(**ssh_connect_params)
            except socket.error as conn_error:
                self.paramiko_cleanup()
                msg = f"""TCP connection to device failed.

Common causes of this problem are:
1. Incorrect hostname or IP address.
2. Wrong TCP port.
3. Intermediate firewall blocking access.

Device settings: {self.device_type} {self.host}:{self.port}

"""

                # Handle DNS failures separately
                if "Name or service not known" in str(conn_error):
                    msg = (
                        f"DNS failure--the hostname you provided was not resolvable "
                        f"in DNS: {self.host}:{self.port}"
                    )

                msg = msg.lstrip()
                raise NetmikoTimeoutException(msg)
            except paramiko.ssh_exception.AuthenticationException as auth_err:
                self.paramiko_cleanup()
                msg = f"""Authentication to device failed.

Common causes of this problem are:
1. Invalid username and password
2. Incorrect SSH-key file
3. Connecting to the wrong device

Device settings: {self.device_type} {self.host}:{self.port}

"""

                # Append the underlying paramiko error text to the message
                msg += self.RETURN + str(auth_err)
                raise NetmikoAuthenticationException(msg)
            except paramiko.ssh_exception.SSHException as e:
                self.paramiko_cleanup()
                # Both branches raise NetmikoTimeoutException; only the message differs
                if "No existing session" in str(e):
                    msg = (
                        "Paramiko: 'No existing session' error: "
                        "try increasing 'conn_timeout' to 15 seconds or larger."
                    )
                    raise NetmikoTimeoutException(msg)
                else:
                    msg = f"""
A paramiko SSHException occurred during connection creation:

{str(e)}

"""
                    raise NetmikoTimeoutException(msg)

            if self.verbose:
                print(f"SSH connection established to {self.host}:{self.port}")

            # Use invoke_shell to establish an 'interactive session'
            self.remote_conn = self.remote_conn_pre.invoke_shell(
                term="vt100", width=width, height=height
            )

            self.remote_conn.settimeout(self.blocking_timeout)
            if self.keepalive:
                # The assert narrows the transport type before set_keepalive is called
                assert isinstance(self.remote_conn.transport, paramiko.Transport)
                self.remote_conn.transport.set_keepalive(self.keepalive)

            # Migrating communication to channel class
            self.channel = SSHChannel(conn=self.remote_conn, encoding=self.encoding)

            self.special_login_handler()
            if self.verbose:
                print("Interactive SSH session established")

        # NOTE(review): a protocol other than telnet/serial/ssh falls through here
        # without creating a connection or raising -- confirm callers validate it.
        return None
def exit_config_mode(self, exit_config: str = '', pattern: str = '') ‑> str

Exit from configuration mode.

:param exit_config: Command to exit configuration mode :type exit_config: str

:param pattern: Pattern to terminate reading of channel :type pattern: str

Expand source code
def exit_config_mode(self, exit_config: str = "", pattern: str = "") -> str:
    """Leave configuration mode and return the output read while doing so.

    Nothing is sent if check_config_mode() reports the session is not in
    configuration mode; an empty string is returned in that case.

    :param exit_config: Command to exit configuration mode
    :type exit_config: str

    :param pattern: Pattern to terminate reading of channel
    :type pattern: str

    :raises ValueError: if the session is still in configuration mode afterwards
    """
    pieces = []
    if self.check_config_mode():
        self.write_channel(self.normalize_cmd(exit_config))

        # Consume the command echo first so later reads stay in sync
        if self.global_cmd_verify is not False:
            echo_regex = re.escape(exit_config.strip())
            pieces.append(self.read_until_pattern(pattern=echo_regex))

        # Read up to the caller's pattern, or else through the full prompt line
        if pattern:
            pieces.append(self.read_until_pattern(pattern=pattern))
        else:
            pieces.append(self.read_until_prompt(read_entire_line=True))

        if self.check_config_mode():
            raise ValueError("Failed to exit configuration mode")

    output = "".join(pieces)
    log.debug(f"exit_config_mode: {output}")
    return output
def exit_enable_mode(self, exit_command: str = '') ‑> str

Exit enable mode.

:param exit_command: Command that exits the session from privileged mode :type exit_command: str

Expand source code
def exit_enable_mode(self, exit_command: str = "") -> str:
    """Drop out of enable mode and return the output read up to the prompt.

    Returns an empty string without sending anything when check_enable_mode()
    reports the session is not in enable mode.

    :param exit_command: Command that exits the session from privileged mode
    :type exit_command: str

    :raises ValueError: if the session is still in enable mode afterwards
    """
    # Nothing to do when we are not privileged
    if not self.check_enable_mode():
        return ""

    self.write_channel(self.normalize_cmd(exit_command))
    response = "" + self.read_until_prompt()

    if self.check_enable_mode():
        raise ValueError("Failed to exit enable mode.")
    return response
def find_prompt(self, delay_factor: float = 1.0, pattern: Optional[str] = None) ‑> str

Finds the current network device prompt, last line only.

:param delay_factor: See init: global_delay_factor :type delay_factor: int

:param pattern: Regular expression pattern to determine whether prompt is valid

Expand source code
def find_prompt(
    self, delay_factor: float = 1.0, pattern: Optional[str] = None
) -> str:
    """Return the device's current prompt (last line of the response only).

    A return is written to the channel and the response is read. When no
    pattern is given, the return is re-sent with an increasing sleep between
    attempts until some data is read or the attempts are exhausted.

    :param delay_factor: See __init__: global_delay_factor
    :type delay_factor: float

    :param pattern: Regular expression pattern to determine whether prompt is valid

    :raises ValueError: if no prompt text could be read
    """
    delay_factor = self.select_delay_factor(delay_factor)
    pause = delay_factor * 0.25
    self.clear_buffer()
    self.write_channel(self.RETURN)

    if pattern:
        prompt = self.read_until_pattern(pattern=pattern)
    else:
        # Initial read
        time.sleep(pause)
        prompt = self.read_channel().strip()

        # Retry up to 13 more times while nothing has been read
        for _ in range(13):
            if prompt:
                break
            self.write_channel(self.RETURN)
            time.sleep(pause)
            prompt = self.read_channel().strip()
            # Double small sleeps; grow larger ones linearly
            pause = pause * 2 if pause <= 3 else pause + 1

    # If multiple lines in the output take the last line
    last_line = prompt.split(self.RESPONSE_RETURN)[-1].strip()
    self.clear_buffer()
    if not last_line:
        raise ValueError(f"Unable to find prompt: {last_line}")
    log.debug(f"[find_prompt()]: prompt is {last_line}")
    return last_line
def is_alive(self) ‑> bool

Returns a boolean flag with the state of the connection.

Expand source code
def is_alive(self) -> bool:
    """Report whether the underlying connection still appears usable.

    Returns False when no connection object exists. For telnet, IAC + NOP is
    sent three times on the socket; for any other protocol a NULL byte is
    written to the channel and the transport's is_active() result is returned.
    """
    if self.remote_conn is None:
        log.error("Connection is not initialised, is_alive returns False")
        return False

    if self.protocol != "telnet":
        # SSH
        try:
            # Try sending ASCII null byte to maintain the connection alive
            log.debug("Sending the NULL byte")
            self.write_channel(chr(0))
            assert isinstance(self.remote_conn, paramiko.Channel)
            transport = self.remote_conn.transport
            assert transport is not None
            active = transport.is_active()
            assert isinstance(active, bool)
            return active
        except (socket.error, EOFError):
            log.error("Unable to send", exc_info=True)
            # If unable to send, we can tell for sure that the connection is unusable
            return False

    try:
        # Try sending IAC + NOP (IAC is telnet way of sending command)
        # IAC = Interpret as Command; it comes before the NOP.
        log.debug("Sending IAC + NOP")
        assert isinstance(self.remote_conn, telnetlib.Telnet)
        telnet_socket = self.remote_conn.get_socket()
        # Need to send multiple times to test connection
        for _ in range(3):
            telnet_socket.sendall(telnetlib.IAC + telnetlib.NOP)
        return True
    except AttributeError:
        return False
def normalize_cmd(self, command: str) ‑> str

Normalize CLI commands to have a single trailing newline.

:param command: Command that may require line feed to be normalized :type command: str

Expand source code
def normalize_cmd(self, command: str) -> str:
    """Return the command with trailing whitespace replaced by self.RETURN.

    :param command: Command that may require line feed to be normalized
    :type command: str
    """
    return command.rstrip() + self.RETURN
def normalize_linefeeds(self, a_string: str) ‑> str

Convert `\r\r\n`, `\r\n`, `\n\r` to `\n`.

    :param a_string: A string that may have non-normalized line feeds
        i.e. output returned from device, or a device prompt
    :type a_string: str
Expand source code
def normalize_linefeeds(self, a_string: str) -> str:
    r"""Convert `\r\r\r\n`, `\r\r\n`, `\r\n`, `\n\r` to `self.RESPONSE_RETURN`.

    When `self.RESPONSE_RETURN` is `\n`, any remaining lone `\r` is converted
    to `\n` as well.

    :param a_string: A string that may have non-normalized line feeds
        i.e. output returned from device, or a device prompt
    :type a_string: str
    """
    # NOTE: the docstring above is a raw string on purpose. Without the r-prefix
    # the escapes become literal control characters inside the docstring and
    # corrupt the generated documentation.
    newline = re.compile(r"(\r\r\r\n|\r\r\n|\r\n|\n\r)")
    a_string = newline.sub(self.RESPONSE_RETURN, a_string)
    if self.RESPONSE_RETURN == "\n":
        # Convert any remaining \r to \n
        return a_string.replace("\r", self.RESPONSE_RETURN)
    return a_string
def paramiko_cleanup(self) ‑> None

Cleanup Paramiko to try to gracefully handle SSH session ending.

Expand source code
def paramiko_cleanup(self) -> None:
    """Close the pre-connection paramiko client (if any) and drop the attribute."""
    ssh_client = self.remote_conn_pre
    if ssh_client is not None:
        ssh_client.close()
    # The attribute itself is removed from the instance, not just set to None
    del self.remote_conn_pre
def read_channel(self) ‑> str

Generic handler that will read all the data from given channel.

Expand source code
@lock_channel
def read_channel(self) -> str:
    """Read whatever data is currently available on the channel.

    Unless linefeed normalization is disabled, the data is passed through
    normalize_linefeeds(); ANSI escape codes are stripped when enabled. The new
    data is logged and written to the session log, and any text previously
    saved in self._read_buffer is prepended to the returned string.
    """
    data = self.channel.read_channel()

    if self.disable_lf_normalization is False:
        began = time.time()
        # Data blocks shouldn't end in '\r' (can cause problems with normalize_linefeeds)
        # Only do the extra read if '\n' exists in the output
        # this avoids devices that only use \r. Give up after one second.
        while "\n" in data and time.time() - began < 1.0 and data[-1] == "\r":
            time.sleep(0.01)
            data += self.channel.read_channel()
        data = self.normalize_linefeeds(data)

    if self.ansi_escape_codes:
        data = self.strip_ansi_escape_codes(data)
    log.debug(f"read_channel: {data}")
    if self.session_log:
        self.session_log.write(data)

    # Buffered data is prepended only after logging so that the session log
    # and debug log do not record the buffered text a second time.
    pending = self._read_buffer
    if not pending:
        return data
    self._read_buffer = ""
    return pending + data
def read_channel_timing(self, last_read: float = 2.0, read_timeout: float = 120.0, delay_factor: Optional[float] = None, max_loops: Optional[int] = None) ‑> str

Read data on the channel based on timing delays.

General pattern is keep reading until no new data is read.

Once no new data is read wait last_read amount of time (one last read). As long as no new data, then return data.

Setting read_timeout to zero will cause read_channel_timing to never expire based on an absolute timeout. It will only complete based on timeout based on there being no new data.

:param last_read: Amount of time to wait before performing one last read (under the idea that we should be done reading at this point and there should be no new data).

:param read_timeout: Absolute timer for how long Netmiko should keep reading data on the channel (waiting for there to be no new data). Will raise ReadTimeout if this timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read passes).

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

Expand source code
    def read_channel_timing(
        self,
        last_read: float = 2.0,
        read_timeout: float = 120.0,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read data on the channel based on timing delays.

        General pattern is keep reading until no new data is read.

        Once no new data is read wait `last_read` amount of time (one last read).
        As long as no new data, then return data.

        Setting `read_timeout` to zero will cause read_channel_timing to never expire based
        on an absolute timeout. It will only complete based on timeout based on there being
        no new data.

        :param last_read: Amount of time to wait before performing one last read (under the
            idea that we should be done reading at this point and there should be no new
            data).

        :param read_timeout: Absolute timer for how long Netmiko should keep reading data on
            the channel (waiting for there to be no new data). Will raise ReadTimeout if this
            timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout
            (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read
            passes).

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        """

        if delay_factor is not None or max_loops is not None:
            warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        # Time to delay in each read loop
        loop_delay = 0.1
        channel_data = ""
        start_time = time.time()

        # Set read_timeout to 0 to never timeout
        while (time.time() - start_time < read_timeout) or (not read_timeout):
            time.sleep(loop_delay)
            new_data = self.read_channel()
            # gather new output
            if new_data:
                channel_data += new_data
            # if we have some output, but nothing new, then do the last read
            elif channel_data != "":
                # Make sure really done (i.e. no new data)
                time.sleep(last_read)
                new_data = self.read_channel()
                if not new_data:
                    break
                else:
                    channel_data += new_data
        else:
            msg = f"""\n
read_channel_timing's absolute timer expired.

The network device was continually outputting data for longer than {read_timeout}
seconds.

If this is expected i.e. the command you are executing is continually emitting
data for a long period of time, then you can set 'read_timeout=x' seconds. If
you want Netmiko to keep reading indefinitely (i.e. to only stop when there is
no new data), then you can set 'read_timeout=0'.

You can look at the Netmiko session_log or debug log for more information.

"""
            raise ReadTimeout(msg)
        return channel_data
def read_until_pattern(self, pattern: str = '', read_timeout: float = 10.0, re_flags: int = 0, max_loops: Optional[int] = None) ‑> str

Read channel until pattern is detected.

Will return string up to and including pattern.

Raises ReadTimeout if pattern not detected in read_timeout seconds.

:param pattern: Regular expression pattern used to identify that reading is done.

:param read_timeout: maximum time to wait looking for pattern. Will raise ReadTimeout. A read_timeout value of 0 will cause the loop to never timeout (i.e. it will keep reading indefinitely until pattern is detected).

:param re_flags: regex flags used in conjunction with pattern (defaults to no flags).

:param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

Expand source code
    def read_until_pattern(
        self,
        pattern: str = "",
        read_timeout: float = 10.0,
        re_flags: int = 0,
        max_loops: Optional[int] = None,
    ) -> str:
        """Read channel until pattern is detected.

        Will return string up to and including pattern.

        Returns ReadTimeout if pattern not detected in read_timeout seconds.

        :param pattern: Regular expression pattern used to identify that reading is done.

        :param read_timeout: maximum time to wait looking for pattern. Will raise ReadTimeout.
            A read_timeout value of 0 will cause the loop to never timeout (i.e. it will keep
            reading indefinitely until pattern is detected.

        :param re_flags: regex flags used in conjunction with pattern (defaults to no flags).

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        """
        if max_loops is not None:
            msg = """\n
Netmiko 4.x has deprecated the use of max_loops with read_until_pattern.
You should convert all uses of max_loops over to read_timeout=x
where x is the total number of seconds to wait before timing out.\n"""
            warnings.warn(msg, DeprecationWarning)

        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        output = ""
        loop_delay = 0.01
        start_time = time.time()
        # if read_timeout == 0 or 0.0 keep reading indefinitely
        while (time.time() - start_time < read_timeout) or (not read_timeout):
            output += self.read_channel()

            if re.search(pattern, output, flags=re_flags):
                if "(" in pattern and "(?:" not in pattern:
                    msg = f"""
Parenthesis found in pattern.

pattern: {pattern}\n

This can be problemtic when used in read_until_pattern().

You should ensure that you use either non-capture groups i.e. '(?:' or that the
parenthesis completely wrap the pattern '(pattern)'"""
                    log.debug(msg)
                results = re.split(pattern, output, maxsplit=1, flags=re_flags)

                # The string matched by pattern must be retained in the output string.
                # re.split will do this if capturing parenthesis are used.
                if len(results) == 2:
                    # no capturing parenthesis, convert and try again.
                    pattern = f"({pattern})"
                    results = re.split(pattern, output, maxsplit=1, flags=re_flags)

                if len(results) != 3:
                    # well, we tried
                    msg = f"""Unable to successfully split output based on pattern:
pattern={pattern}
output={repr(output)}
results={results}
"""
                    raise ReadException(msg)

                # Process such that everything before and including pattern is return.
                # Everything else is retained in the _read_buffer
                output, match_str, buffer = results
                output = output + match_str
                if buffer:
                    self._read_buffer += buffer
                log.debug(f"Pattern found: {pattern} {output}")
                return output
            time.sleep(loop_delay)

        msg = f"""\n\nPattern not detected: {repr(pattern)} in output.

Things you might try to fix this:
1. Adjust the regex pattern to better identify the terminating string. Note, in
many situations the pattern is automatically based on the network device's prompt.
2. Increase the read_timeout to a larger value.

You can also look at the Netmiko session_log or debug log for more information.\n\n"""
        raise ReadTimeout(msg)
def read_until_prompt(self, read_timeout: float = 10.0, read_entire_line: bool = False, re_flags: int = 0, max_loops: Optional[int] = None) ‑> str

Read channel up to and including self.base_prompt.

Expand source code
def read_until_prompt(
    self,
    read_timeout: float = 10.0,
    read_entire_line: bool = False,
    re_flags: int = 0,
    max_loops: Optional[int] = None,
) -> str:
    """Read channel up to and including self.base_prompt.

    The prompt is regex-escaped before being handed to read_until_pattern();
    with read_entire_line the pattern also consumes the rest of that line.
    """
    escaped_prompt = re.escape(self.base_prompt)
    prompt_regex = f"{escaped_prompt}.*" if read_entire_line else escaped_prompt
    return self.read_until_pattern(
        pattern=prompt_regex,
        read_timeout=read_timeout,
        re_flags=re_flags,
        max_loops=max_loops,
    )
def read_until_prompt_or_pattern(self, pattern: str = '', read_timeout: float = 10.0, read_entire_line: bool = False, re_flags: int = 0, max_loops: Optional[int] = None) ‑> str

Read until either self.base_prompt or pattern is detected.

Expand source code
def read_until_prompt_or_pattern(
    self,
    pattern: str = "",
    read_timeout: float = 10.0,
    read_entire_line: bool = False,
    re_flags: int = 0,
    max_loops: Optional[int] = None,
) -> str:
    """Read until either self.base_prompt or pattern is detected.

    The escaped prompt and the caller's pattern are joined as alternatives in a
    non-capturing group; with an empty pattern only the prompt is searched for.
    """
    escaped_prompt = re.escape(self.base_prompt)
    prompt_regex = f"{escaped_prompt}.*" if read_entire_line else escaped_prompt
    search_regex = rf"(?:{prompt_regex}|{pattern})" if pattern else prompt_regex
    return self.read_until_pattern(
        pattern=search_regex,
        read_timeout=read_timeout,
        re_flags=re_flags,
        max_loops=max_loops,
    )
def run_ttp(self, template: Union[str, bytes, ForwardRef('PathLike[Any]')], res_kwargs: Optional[Dict[str, Any]] = None, **kwargs: Any) ‑> Any

Run TTP template parsing by using input parameters to collect devices output.

:param template: template content, OS path to template or reference to template within TTP templates collection in ttp://path/to/template.txt format

:param res_kwargs: **res_kwargs arguments to pass to TTP result method

:param kwargs: any other **kwargs to use for TTP object instantiation

TTP template must have inputs defined together with below parameters.

:param method: name of Netmiko connection object method to call, default send_command

:param kwargs: Netmiko connection object method arguments

:param commands: list of commands to collect

Inputs' load could be of one of the supported formats and controlled by input's load attribute, supported values - python, yaml or json. For each input output collected from device and parsed accordingly.

Expand source code
def run_ttp(
    self,
    template: Union[str, bytes, "PathLike[Any]"],
    res_kwargs: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Any:
    """
    Collect device output and parse it with a TTP template.

    This delegates to run_ttp_template(), passing this connection object along
    with the supplied arguments.

    :param template: template content, OS path to template or reference
        to template within TTP templates collection in
        ttp://path/to/template.txt format

    :param res_kwargs: ``**res_kwargs`` arguments to pass to TTP result method
        (an empty dict is used when omitted)

    :param kwargs: any other ``**kwargs`` to use for TTP object instantiation

    TTP template must have inputs defined together with below parameters.

    :param method: name of Netmiko connection object method to call, default ``send_command``

    :param kwargs: Netmiko connection object method arguments

    :param commands: list of commands to collect

    Inputs' load could be of one of the supported formats and controlled by input's ``load``
    attribute, supported values - python, yaml or json. For each input output collected
    from device and parsed accordingly.
    """
    result_options = {} if res_kwargs is None else res_kwargs
    return run_ttp_template(
        connection=self, template=template, res_kwargs=result_options, **kwargs
    )
def save_config(self, cmd: str = '', confirm: bool = False, confirm_response: str = '') ‑> str

Not Implemented

Expand source code
def save_config(
    self, cmd: str = "", confirm: bool = False, confirm_response: str = ""
) -> str:
    """Not implemented here; this implementation always raises NotImplementedError."""
    raise NotImplementedError()
def select_delay_factor(self, delay_factor: float) ‑> float

Choose the greater of delay_factor or self.global_delay_factor (default). In fast_cli choose the lesser of delay_factor of self.global_delay_factor.

:param delay_factor: See init: global_delay_factor :type delay_factor: int

Expand source code
def select_delay_factor(self, delay_factor: float) -> float:
    """
    Pick the delay factor to use for an operation.

    Normally the larger of delay_factor and self.global_delay_factor is returned.
    With fast_cli enabled the smaller one is returned, except that a zero (falsy)
    delay_factor yields self.global_delay_factor.

    :param delay_factor: See __init__: global_delay_factor
    :type delay_factor: float
    """
    global_factor = self.global_delay_factor

    if not self.fast_cli:
        return delay_factor if delay_factor >= global_factor else global_factor

    if delay_factor and delay_factor <= global_factor:
        return delay_factor
    return global_factor
def send_command(self, command_string: str, expect_string: Optional[str] = None, read_timeout: float = 10.0, delay_factor: Optional[float] = None, max_loops: Optional[int] = None, auto_find_prompt: bool = True, strip_prompt: bool = True, strip_command: bool = True, normalize: bool = True, use_textfsm: bool = False, textfsm_template: Optional[str] = None, use_ttp: bool = False, ttp_template: Optional[str] = None, use_genie: bool = False, cmd_verify: bool = True) ‑> Union[str, List[Any], Dict[str, Any]]

Execute command_string on the SSH channel using a pattern-based mechanism. Generally used for show commands. By default this method will keep waiting to receive data until the network device prompt is detected. The current network device prompt will be determined automatically.

:param command_string: The command to be executed on the remote device.

:param expect_string: Regular expression pattern to use for determining end of output. If left blank will default to being based on router prompt.

:param read_timeout: Maximum time to wait looking for pattern. Will raise ReadTimeout if timeout is exceeded.

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param auto_find_prompt: Use find_prompt() to override base prompt

:param strip_prompt: Remove the trailing router prompt from the output (default: True).

:param strip_command: Remove the echo of the command from the output (default: True).

:param normalize: Ensure the proper enter is sent at end of command (default: True).

:param use_textfsm: Process command output through TextFSM template (default: False).

:param textfsm_template: Name of template to parse output with; can be fully qualified path, relative path, or name of file in current directory. (default: None).

:param use_ttp: Process command output through TTP template (default: False).

:param ttp_template: Name of template to parse output with; can be fully qualified path, relative path, or name of file in current directory. (default: None).

:param use_genie: Process command output through PyATS/Genie parser (default: False).

:param cmd_verify: Verify command echo before proceeding (default: True).

Expand source code
    @flush_session_log
    @select_cmd_verify
    def send_command(
        self,
        command_string: str,
        expect_string: Optional[str] = None,
        read_timeout: float = 10.0,
        delay_factor: Optional[float] = None,
        max_loops: Optional[int] = None,
        auto_find_prompt: bool = True,
        strip_prompt: bool = True,
        strip_command: bool = True,
        normalize: bool = True,
        use_textfsm: bool = False,
        textfsm_template: Optional[str] = None,
        use_ttp: bool = False,
        ttp_template: Optional[str] = None,
        use_genie: bool = False,
        cmd_verify: bool = True,
    ) -> Union[str, List[Any], Dict[str, Any]]:
        """Execute command_string on the SSH channel using a pattern-based mechanism. Generally
        used for show commands. By default this method will keep waiting to receive data until the
        network device prompt is detected. The current network device prompt will be determined
        automatically.

        :param command_string: The command to be executed on the remote device.

        :param expect_string: Regular expression pattern to use for determining end of output.
            If left as None, the pattern is obtained from self._prompt_handler(auto_find_prompt).

        :param read_timeout: Maximum time (in seconds) to wait looking for pattern. Will raise
            ReadTimeout if timeout is exceeded. Replaced by self.read_timeout_override when that
            attribute is truthy, and by the computed compatibility timeout when
            self.delay_factor_compat is enabled.

        :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
            Only affects the timeout when self.delay_factor_compat is enabled.

        :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
            Only affects the timeout when self.delay_factor_compat is enabled.

        :param auto_find_prompt: Use find_prompt() to override base prompt

        :param strip_prompt: Remove the trailing router prompt from the output (default: True).

        :param strip_command: Remove the echo of the command from the output (default: True).

        :param normalize: Ensure the proper enter is sent at end of command (default: True).

        :param use_textfsm: Process command output through TextFSM template (default: False).

        :param textfsm_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_ttp: Process command output through TTP template (default: False).

        :param ttp_template: Name of template to parse output with; can be fully qualified
            path, relative path, or name of file in current directory. (default: None).

        :param use_genie: Process command output through PyATS/Genie parser (default: False).

        :param cmd_verify: Verify command echo before proceeding (default: True).

        :return: Whatever structured_data_converter returns for the sanitized output.

        :raises ReadTimeout: If the search pattern is not found before read_timeout elapses.
        """

        # Seconds to sleep between successive channel reads in the read loop below
        loop_delay = 0.025

        # A truthy instance-level override replaces the caller-supplied read_timeout
        if self.read_timeout_override:
            read_timeout = self.read_timeout_override

        if self.delay_factor_compat:
            # For compatibility calculate the old equivalent read_timeout
            # i.e. what it would have been in Netmiko 3.x
            if delay_factor is None:
                tmp_delay_factor = self.global_delay_factor
            else:
                tmp_delay_factor = self.select_delay_factor(delay_factor)
            # NOTE(review): loop_delay=0.2 here differs from the 0.025 used by the read loop
            # below; presumably it is the Netmiko 3.x loop delay -- confirm.
            compat_timeout = calc_old_timeout(
                max_loops=max_loops,
                delay_factor=tmp_delay_factor,
                loop_delay=0.2,
                old_timeout=self.timeout,
            )
            msg = f"""\n
You have chosen to use Netmiko's delay_factor compatibility mode for
send_command. This will revert Netmiko to behave similarly to how it
did in Netmiko 3.x (i.e. to use delay_factor/global_delay_factor and
max_loops).

Using these parameters Netmiko has calculated an effective read_timeout
of {compat_timeout} and will set the read_timeout to this value.

Please convert your code to that new format i.e.:

    net_connect.send_command(cmd, read_timeout={compat_timeout})

And then disable delay_factor_compat.

delay_factor_compat will be removed in Netmiko 5.x.\n"""
            warnings.warn(msg, DeprecationWarning)

            # Override the read_timeout with Netmiko 3.x way :-(
            # (this also supersedes read_timeout_override applied above)
            read_timeout = compat_timeout

        else:
            # No need for two deprecation messages so only display this if not using
            # delay_factor_compat
            if delay_factor is not None or max_loops is not None:
                msg = """\n
Netmiko 4.x has deprecated the use of delay_factor/max_loops with
send_command. You should convert all uses of delay_factor and max_loops
over to read_timeout=x where x is the total number of seconds to wait
before timing out.\n"""
                warnings.warn(msg, DeprecationWarning)

        # An explicit expect_string wins; otherwise derive the pattern from the prompt
        if expect_string is not None:
            search_pattern = expect_string
        else:
            search_pattern = self._prompt_handler(auto_find_prompt)

        if normalize:
            command_string = self.normalize_cmd(command_string)

        # Start the clock before writing: time spent reading the command echo below
        # counts against read_timeout.
        start_time = time.time()
        self.write_channel(command_string)
        new_data = ""

        cmd = command_string.strip()
        if cmd and cmd_verify:
            # The echo read uses a fixed 10 second timeout, independent of read_timeout
            new_data = self.command_echo_read(cmd=cmd, read_timeout=10)

        # Above MAX_CHARS of accumulated output, only the most recent reads are searched
        MAX_CHARS = 2_000_000
        # Number of most recent reads retained for that reduced search
        DEQUE_SIZE = 20
        output = ""
        # Check only the past N-reads. This is for the case where the output is
        # very large (i.e. searching a very large string for a pattern a whole bunch of times)
        past_n_reads: Deque[str] = deque(maxlen=DEQUE_SIZE)
        first_line_processed = False

        # Keep reading data until search_pattern is found or until read_timeout
        while time.time() - start_time < read_timeout:
            if new_data:
                output += new_data
                past_n_reads.append(new_data)

                # Case where we haven't processed the first line yet (there is a potential
                # issue in the first line in cases where the line is repainted).
                if not first_line_processed:
                    output, first_line_processed = self._first_line_handler(
                        output, search_pattern
                    )
                    # Check if we have already found our pattern
                    if re.search(search_pattern, output):
                        break

                else:
                    if len(output) <= MAX_CHARS:
                        if re.search(search_pattern, output):
                            break
                    else:
                        # Switch to deque mode if output is greater than MAX_CHARS
                        # Check if pattern is in the past n reads
                        if re.search(search_pattern, "".join(past_n_reads)):
                            break

            time.sleep(loop_delay)
            new_data = self.read_channel()

        else:  # nobreak
            # Reached only when the loop condition expired without a pattern match
            msg = f"""
Pattern not detected: {repr(search_pattern)} in output.

Things you might try to fix this:
1. Explicitly set your pattern using the expect_string argument.
2. Increase the read_timeout to a larger value.

You can also look at the Netmiko session_log or debug log for more information.

"""
            raise ReadTimeout(msg)

        # Post-process the raw output (command echo / trailing prompt stripping)
        output = self._sanitize_output(
            output,
            strip_command=strip_command,
            command_string=command_string,
            strip_prompt=strip_prompt,
        )
        # Hand the cleaned text to the structured-data converter along with the parser flags
        return_val = structured_data_converter(
            command=command_string,
            raw_data=output,
            platform=self.device_type,
            use_textfsm=use_textfsm,
            use_ttp=use_ttp,
            use_genie=use_genie,
            textfsm_template=textfsm_template,
            ttp_template=ttp_template,
        )
        return return_val
def send_command_expect(self, *args: Any, **kwargs: Any) ‑> Union[str, List[Any], Dict[str, Any]]

Support previous name of send_command method.

Expand source code
def send_command_expect(
    self, *args: Any, **kwargs: Any
) -> Union[str, List[Any], Dict[str, Any]]:
    """Legacy alias for send_command.

    Every positional and keyword argument is forwarded unchanged and the
    result of send_command is returned as-is.
    """
    result = self.send_command(*args, **kwargs)
    return result
def send_command_timing(self, command_string: str, last_read: float = 2.0, read_timeout: float = 120.0, delay_factor: Optional[float] = None, max_loops: Optional[int] = None, strip_prompt: bool = True, strip_command: bool = True, normalize: bool = True, use_textfsm: bool = False, textfsm_template: Optional[str] = None, use_ttp: bool = False, ttp_template: Optional[str] = None, use_genie: bool = False, cmd_verify: bool = False) ‑> Union[str, List[Any], Dict[str, Any]]

Execute command_string on the SSH channel using a delay-based mechanism. Generally used for show commands.

:param command_string: The command to be executed on the remote device.

:param last_read: Time waited after end of data

:param read_timeout: Absolute timer for how long Netmiko should keep reading data on the channel (waiting for there to be no new data). Will raise ReadTimeout if this timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read passes).

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param strip_prompt: Remove the trailing router prompt from the output (default: True).

:param strip_command: Remove the echo of the command from the output (default: True).

:param normalize: Ensure the proper enter is sent at end of command (default: True).

:param use_textfsm: Process command output through TextFSM template (default: False).

:param textfsm_template: Name of template to parse output with; can be fully qualified path, relative path, or name of file in current directory. (default: None).

:param use_ttp: Process command output through TTP template (default: False).

:param ttp_template: Name of template to parse output with; can be fully qualified path, relative path, or name of file in current directory. (default: None).

:param use_genie: Process command output through PyATS/Genie parser (default: False).

:param cmd_verify: Verify command echo before proceeding (default: False).

Expand source code
@flush_session_log
@select_cmd_verify
def send_command_timing(
    self,
    command_string: str,
    last_read: float = 2.0,
    read_timeout: float = 120.0,
    delay_factor: Optional[float] = None,
    max_loops: Optional[int] = None,
    strip_prompt: bool = True,
    strip_command: bool = True,
    normalize: bool = True,
    use_textfsm: bool = False,
    textfsm_template: Optional[str] = None,
    use_ttp: bool = False,
    ttp_template: Optional[str] = None,
    use_genie: bool = False,
    cmd_verify: bool = False,
) -> Union[str, List[Any], Dict[str, Any]]:
    """Send command_string down the channel and collect the output using a delay-based
    (timing) read rather than pattern matching. Generally used for show commands.

    :param command_string: The command to be executed on the remote device.

    :param last_read: Time waited after end of data; passed to read_channel_timing.

    :param read_timeout: Absolute timer for how long Netmiko should keep reading data on
        the channel (waiting for there to be no new data). Will raise ReadTimeout if this
        timeout expires. A read_timeout value of 0 will cause the read-loop to never timeout
        (i.e. Netmiko will keep reading indefinitely until there is no new data and last_read
        passes).

    :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        Passing a value only triggers a DeprecationWarning here.

    :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.
        Passing a value only triggers a DeprecationWarning here.

    :param strip_prompt: Remove the trailing router prompt from the output (default: True).

    :param strip_command: Remove the echo of the command from the output (default: True).

    :param normalize: Ensure the proper enter is sent at end of command (default: True).

    :param use_textfsm: Process command output through TextFSM template (default: False).

    :param textfsm_template: Name of template to parse output with; can be fully qualified
        path, relative path, or name of file in current directory. (default: None).

    :param use_ttp: Process command output through TTP template (default: False).

    :param ttp_template: Name of template to parse output with; can be fully qualified
        path, relative path, or name of file in current directory. (default: None).

    :param use_genie: Process command output through PyATS/Genie parser (default: False).

    :param cmd_verify: Verify command echo before proceeding (default: False).

    :return: Whatever structured_data_converter returns for the sanitized output.
    """
    uses_deprecated_args = not (delay_factor is None and max_loops is None)
    if uses_deprecated_args:
        warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

    if normalize:
        command_string = self.normalize_cmd(command_string)
    self.write_channel(command_string)

    # Optionally consume the command echo first (fixed 10 second timeout)
    stripped_cmd = command_string.strip()
    if stripped_cmd and cmd_verify:
        echoed = self.command_echo_read(cmd=stripped_cmd, read_timeout=10)
    else:
        echoed = ""

    # Then read until the channel has been quiet for last_read seconds
    raw_output = echoed + self.read_channel_timing(
        last_read=last_read, read_timeout=read_timeout
    )

    cleaned_output = self._sanitize_output(
        raw_output,
        strip_command=strip_command,
        command_string=command_string,
        strip_prompt=strip_prompt,
    )
    return structured_data_converter(
        command=command_string,
        raw_data=cleaned_output,
        platform=self.device_type,
        use_textfsm=use_textfsm,
        use_ttp=use_ttp,
        use_genie=use_genie,
        textfsm_template=textfsm_template,
        ttp_template=ttp_template,
    )
def send_config_from_file(self, config_file: Union[str, bytes, ForwardRef('PathLike[Any]')], **kwargs: Any) ‑> str

Send configuration commands down the SSH channel from a file.

The file is processed line-by-line and each command is sent down the SSH channel.

**kwargs are passed to send_config_set method.

:param config_file: Path to configuration file to be sent to the device

:param kwargs: params to be sent to send_config_set method

Expand source code
def send_config_from_file(
    self, config_file: Union[str, bytes, "PathLike[Any]"], **kwargs: Any
) -> str:
    """
    Read configuration commands from a file and send them down the SSH channel.

    The file is opened as UTF-8 text and every line (line endings included)
    becomes one entry in the command list handed to send_config_set.

    :param config_file: Path to configuration file to be sent to the device

    :param kwargs: params to be sent to send_config_set method

    :return: The value returned by send_config_set
    """
    with io.open(config_file, mode="rt", encoding="utf-8") as handle:
        config_lines = list(handle)
    return self.send_config_set(config_lines, **kwargs)
def send_config_set(self, config_commands: Union[str, Sequence[str], Iterator[str], TextIO, ForwardRef(None)] = None, *, exit_config_mode: bool = True, read_timeout: Optional[float] = None, delay_factor: Optional[float] = None, max_loops: Optional[int] = None, strip_prompt: bool = False, strip_command: bool = False, config_mode_command: Optional[str] = None, cmd_verify: bool = True, enter_config_mode: bool = True, error_pattern: str = '', terminator: str = '#', bypass_commands: Optional[str] = None) ‑> str

Send configuration commands down the SSH channel.

config_commands is an iterable containing all of the configuration commands. The commands will be executed one after the other.

Automatically exits/enters configuration mode.

:param config_commands: Multiple configuration commands to be sent to the device

:param exit_config_mode: Determines whether or not to exit config mode after complete

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

:param strip_prompt: Determines whether or not to strip the prompt

:param strip_command: Determines whether or not to strip the command

:param read_timeout: Absolute timer to send to read_channel_timing. Also adjusts read_timeout in read_until_pattern calls.

:param config_mode_command: The command to enter into config mode

:param cmd_verify: Whether or not to verify command echo for each command in config_set

:param enter_config_mode: Do you enter config mode before sending config commands

:param error_pattern: Regular expression pattern to detect config errors in the output.

:param terminator: Regular expression pattern to use as an alternate terminator in certain situations.

:param bypass_commands: Regular expression pattern indicating configuration commands where cmd_verify is automatically disabled.

Expand source code
@flush_session_log
def send_config_set(
    self,
    config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None,
    *,
    exit_config_mode: bool = True,
    read_timeout: Optional[float] = None,
    delay_factor: Optional[float] = None,
    max_loops: Optional[int] = None,
    strip_prompt: bool = False,
    strip_command: bool = False,
    config_mode_command: Optional[str] = None,
    cmd_verify: bool = True,
    enter_config_mode: bool = True,
    error_pattern: str = "",
    terminator: str = r"#",
    bypass_commands: Optional[str] = None,
) -> str:
    """
    Send configuration commands down the SSH channel.

    config_commands is an iterable containing all of the configuration commands.
    The commands will be executed one after the other.

    Automatically exits/enters configuration mode.

    :param config_commands: Multiple configuration commands to be sent to the device.
        A single string is treated as one command; None returns "" immediately.

    :param exit_config_mode: Determines whether or not to exit config mode after complete

    :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :param strip_prompt: Determines whether or not to strip the prompt

    :param strip_command: Determines whether or not to strip the command

    :param read_timeout: Absolute timer to send to read_channel_timing. Also adjusts
        read_timeout in read_until_pattern calls. Defaults to 15 when neither it nor the
        deprecated delay_factor/max_loops arguments are supplied.

    :param config_mode_command: The command to enter into config mode

    :param cmd_verify: Whether or not to verify command echo for each command in config_set.
        Overridden by self.global_cmd_verify when that attribute is not None.

    :param enter_config_mode: Do you enter config mode before sending config commands

    :param error_pattern: Regular expression pattern to detect config errors in the
        output.

    :param terminator: Regular expression pattern to use as an alternate terminator in certain
        situations.

    :param bypass_commands: Regular expression pattern indicating configuration commands
        where cmd_verify is automatically disabled.

    :return: The sanitized output accumulated while sending the commands.

    :raises ValueError: If config_commands is not iterable.

    :raises ConfigInvalidException: If error_pattern matches the accumulated output.
    """

    # An instance-level setting takes precedence over the per-call cmd_verify argument
    if self.global_cmd_verify is not None:
        cmd_verify = self.global_cmd_verify

    if delay_factor is not None or max_loops is not None:
        warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

        # Calculate an equivalent read_timeout (if using old settings)
        # Eliminate in Netmiko 5.x
        if read_timeout is None:
            max_loops = 150 if max_loops is None else max_loops
            delay_factor = 1.0 if delay_factor is None else delay_factor

            # If delay_factor has been set, then look at global_delay_factor
            delay_factor = self.select_delay_factor(delay_factor)

            read_timeout = calc_old_timeout(
                max_loops=max_loops, delay_factor=delay_factor, loop_delay=0.1
            )

    # delay_factor is still needed below for the inter-command sleep when cmd_verify is off
    if delay_factor is None:
        delay_factor = self.select_delay_factor(0)
    else:
        delay_factor = self.select_delay_factor(delay_factor)

    if read_timeout is None:
        read_timeout = 15
    else:
        # No-op: an explicitly supplied (or computed) read_timeout is kept as-is
        read_timeout = read_timeout

    if config_commands is None:
        return ""
    elif isinstance(config_commands, str):
        # Wrap a lone string so it is sent as one command, not iterated character by character
        config_commands = (config_commands,)

    if not hasattr(config_commands, "__iter__"):
        raise ValueError("Invalid argument passed into send_config_set")

    if bypass_commands is None:
        # Commands where cmd_verify is automatically disabled reg-ex logical-or
        bypass_commands = r"^banner .*$"

    # Set bypass_commands="" to force no-bypass (usually for testing)
    bypass_detected = False
    if bypass_commands:
        # Make a copy of the iterator so scanning for bypass commands does not
        # consume the commands that still have to be sent
        config_commands, config_commands_tmp = itertools.tee(config_commands, 2)
        bypass_detected = any(
            [True for cmd in config_commands_tmp if re.search(bypass_commands, cmd)]
        )
    if bypass_detected:
        # A single matching command disables echo verification for the whole set
        cmd_verify = False

    # Send config commands
    output = ""
    if enter_config_mode:
        if config_mode_command:
            output += self.config_mode(config_mode_command)
        else:
            output += self.config_mode()

    # Legacy fast path: write every command first, then gather all output in one timed read
    if self.fast_cli and self._legacy_mode and not error_pattern:
        for cmd in config_commands:
            self.write_channel(self.normalize_cmd(cmd))
        # Gather output
        output += self.read_channel_timing(read_timeout=read_timeout)

    # No echo verification: pace the writes with a short sleep instead
    elif not cmd_verify:
        for cmd in config_commands:
            self.write_channel(self.normalize_cmd(cmd))
            time.sleep(delay_factor * 0.05)

            # Gather the output incrementally due to error_pattern requirements
            if error_pattern:
                output += self.read_channel_timing(read_timeout=read_timeout)
                # The search covers all output accumulated so far, not just this command's
                if re.search(error_pattern, output, flags=re.M):
                    msg = f"Invalid input detected at command: {cmd}"
                    raise ConfigInvalidException(msg)

        # Standard output gathering (no error_pattern)
        if not error_pattern:
            output += self.read_channel_timing(read_timeout=read_timeout)

    # Echo verification: after each command wait for its echo, then for the next prompt
    else:
        for cmd in config_commands:
            self.write_channel(self.normalize_cmd(cmd))

            # Make sure command is echoed
            output += self.read_until_pattern(
                pattern=re.escape(cmd.strip()), read_timeout=read_timeout
            )

            # Read until next prompt or terminator (#); the .*$ forces read of entire line
            pattern = f"(?:{re.escape(self.base_prompt)}.*$|{terminator}.*$)"
            output += self.read_until_pattern(
                pattern=pattern, read_timeout=read_timeout, re_flags=re.M
            )

            if error_pattern:
                # The search covers all output accumulated so far, not just this command's
                if re.search(error_pattern, output, flags=re.M):
                    msg = f"Invalid input detected at command: {cmd}"
                    raise ConfigInvalidException(msg)

    if exit_config_mode:
        output += self.exit_config_mode()
    # NOTE(review): strip_prompt / strip_command are accepted but not referenced in this body
    output = self._sanitize_output(output)
    log.debug(f"{output}")
    return output
def send_multiline(self, commands: Sequence[Union[str, List[str]]], multiline: bool = True, **kwargs: Any) ‑> str

commands should either be:

commands = [[cmd1, expect1], [cmd2, expect2], …]

Or

commands = [cmd1, cmd2, cmd3, …]

Any expect_string that is a null-string will use a pattern based on the device's prompt (unless an expect_string argument is passed in via kwargs).

Expand source code
def send_multiline(
    self,
    commands: Sequence[Union[str, List[str]]],
    multiline: bool = True,
    **kwargs: Any,
) -> str:
    """
    Send a series of commands one after another and return the combined output.

    commands should either be a list of [command, expect_string] pairs:

    commands = [[cmd1, expect1], [cmd2, expect2], ...]

    Or a plain list of command strings:

    commands = [cmd1, cmd2, cmd3, ...]

    The form is decided by looking at the first element only.

    Any expect_string that is a null-string falls back to the default pattern:
    the expect_string passed in via kwargs if there is one, otherwise a pattern
    based on the device's prompt.
    """
    if multiline:
        kwargs = self._multiline_kwargs(**kwargs)

    # The default pattern is removed from kwargs because it is passed explicitly below
    fallback_expect = kwargs.pop("expect_string", None)
    if not fallback_expect:
        fallback_expect = self._prompt_handler(kwargs.get("auto_find_prompt", True))

    plain_strings = bool(commands) and isinstance(commands[0], str)

    pieces: List[str] = []
    for entry in commands:
        if plain_strings:
            # Plain command: always use the default pattern (probably the prompt)
            command, expect = str(entry), fallback_expect
        else:
            # [command, expect_string] pair; an empty expect_string means "use the default"
            assert not isinstance(entry, str)
            command, pair_expect = entry
            expect = pair_expect or fallback_expect
        pieces.append(self._send_command_str(command, expect_string=expect, **kwargs))
    return "".join(pieces)
def send_multiline_timing(self, commands: Sequence[str], multiline: bool = True, **kwargs: Any) ‑> str
Expand source code
def send_multiline_timing(
    self, commands: Sequence[str], multiline: bool = True, **kwargs: Any
) -> str:
    """Send each command via the timing-based sender and return the concatenated output.

    :param commands: Commands to send, in order; each one is converted with str().

    :param multiline: When True, kwargs are first passed through self._multiline_kwargs.

    :param kwargs: Extra keyword arguments forwarded to every per-command call.
    """
    call_kwargs = self._multiline_kwargs(**kwargs) if multiline else kwargs
    return "".join(
        self._send_command_timing_str(str(command), **call_kwargs)
        for command in commands
    )
def serial_login(self, pri_prompt_terminator: str = '#\\s*$', alt_prompt_terminator: str = '>\\s*$', username_pattern: str = '(?:[Uu]ser:|sername|ogin)', pwd_pattern: str = 'assword', delay_factor: float = 1.0, max_loops: int = 20) ‑> str
Expand source code
def serial_login(
    self,
    pri_prompt_terminator: str = r"#\s*$",
    alt_prompt_terminator: str = r">\s*$",
    username_pattern: str = r"(?:[Uu]ser:|sername|ogin)",
    pwd_pattern: str = r"assword",
    delay_factor: float = 1.0,
    max_loops: int = 20,
) -> str:
    """Log in over a serial connection by delegating to telnet_login.

    All six arguments are forwarded positionally, in signature order, and the
    result of telnet_login is returned unchanged.
    """
    login_args = (
        pri_prompt_terminator,
        alt_prompt_terminator,
        username_pattern,
        pwd_pattern,
        delay_factor,
        max_loops,
    )
    return self.telnet_login(*login_args)
def session_preparation(self) ‑> None

Prepare the session after the connection has been established

This method handles some differences that occur between various devices early on in the session.

In general, it should include: self._test_channel_read(pattern=r"some_pattern") self.set_base_prompt() self.set_terminal_width() self.disable_paging()

Expand source code
def session_preparation(self) -> None:
    """
    Prepare the session once the connection has been established.

    Devices differ in what they need early in a session, so this is the hook
    where those differences are handled. The default sequence is:

    1. self._test_channel_read()
    2. self.set_base_prompt()
    3. self.set_terminal_width()
    4. self.disable_paging()

    A device-specific version would generally make the same calls, e.g. with
    self._test_channel_read(pattern=r"some_pattern").
    """
    # Check the channel first, before any of the setup calls below
    self._test_channel_read()
    # Establish self.base_prompt
    self.set_base_prompt()
    # Terminal adjustments: width, then paging
    self.set_terminal_width()
    self.disable_paging()
def set_base_prompt(self, pri_prompt_terminator: str = '#', alt_prompt_terminator: str = '>', delay_factor: float = 1.0, pattern: Optional[str] = None) ‑> str

Sets self.base_prompt

Used as delimiter for stripping of trailing prompt in output.

Should be set to something that is general and applies in multiple contexts. For Cisco devices this will be set to router hostname (i.e. prompt without > or #).

This will be set on entering user exec or privileged exec on Cisco, but not when entering/exiting config mode.

:param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

:param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

:param delay_factor: See init: global_delay_factor

:param pattern: Regular expression pattern to search for in find_prompt() call

Expand source code
def set_base_prompt(
    self,
    pri_prompt_terminator: str = "#",
    alt_prompt_terminator: str = ">",
    delay_factor: float = 1.0,
    pattern: Optional[str] = None,
) -> str:
    """Sets self.base_prompt

    Used as delimiter for stripping of trailing prompt in output.

    Should be set to something that is general and applies in multiple contexts. For Cisco
    devices this will be set to router hostname (i.e. prompt without > or #).

    This will be set on entering user exec or privileged exec on Cisco, but not when
    entering/exiting config mode.

    :param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

    :param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

    :param delay_factor: See __init__: global_delay_factor

    :param pattern: Regular expression pattern to search for in find_prompt() call. When
        None, a pattern is built from whichever terminators are non-empty.

    :return: The new value of self.base_prompt

    :raises ValueError: If the prompt returned by find_prompt() is empty or its last
        character is neither of the two terminators.
    """
    if pattern is None:
        # Build the search pattern from the terminators that were actually supplied
        if pri_prompt_terminator and alt_prompt_terminator:
            pri_term = re.escape(pri_prompt_terminator)
            alt_term = re.escape(alt_prompt_terminator)
            pattern = rf"({pri_term}|{alt_term})"
        elif pri_prompt_terminator:
            pattern = re.escape(pri_prompt_terminator)
        elif alt_prompt_terminator:
            pattern = re.escape(alt_prompt_terminator)

    if pattern:
        prompt = self.find_prompt(delay_factor=delay_factor, pattern=pattern)
    else:
        prompt = self.find_prompt(delay_factor=delay_factor)

    # An empty prompt has no last character to inspect; report it with the same
    # ValueError as any other unrecognised prompt rather than failing on prompt[-1].
    if not prompt or prompt[-1] not in (pri_prompt_terminator, alt_prompt_terminator):
        raise ValueError(f"Router prompt not found: {repr(prompt)}")

    # If all we have is the 'terminator' just use that :-(
    if len(prompt) == 1:
        self.base_prompt = prompt
    else:
        # Strip off trailing terminator
        self.base_prompt = prompt[:-1]
    return self.base_prompt
def set_terminal_width(self, command: str = '', delay_factor: Optional[float] = None, cmd_verify: bool = False, pattern: Optional[str] = None) ‑> str

CLI terminals try to automatically adjust the line based on the width of the terminal. This causes the output to get distorted when accessed programmatically.

Set terminal width to 511 which works on a broad set of devices.

:param command: Command string to send to the device

:param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

Expand source code
def set_terminal_width(
    self,
    command: str = "",
    delay_factor: Optional[float] = None,
    cmd_verify: bool = False,
    pattern: Optional[str] = None,
) -> str:
    """Send the command that widens the terminal and return the device's response.

    CLI terminals try to automatically adjust the line based on the width of the terminal.
    This causes the output to get distorted when accessed programmatically.

    Set terminal width to 511 which works on a broad set of devices.

    :param command: Command string to send to the device; when empty nothing is sent
        and an empty string is returned

    :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :param cmd_verify: When True (and self.global_cmd_verify is not False), read until the
        echoed command is seen

    :param pattern: Regular expression to read until when cmd_verify is not applied; if not
        given, read until the prompt

    :return: The output read from the channel after the command was written
    """
    if delay_factor is not None:
        warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

    if not command:
        return ""

    command = self.normalize_cmd(command)
    self.write_channel(command)

    # cmd_verify defaults to False here: terminal width must be set before doing cmd_verify
    if cmd_verify and self.global_cmd_verify is not False:
        return self.read_until_pattern(pattern=re.escape(command.strip()))
    if pattern:
        return self.read_until_pattern(pattern=pattern)
    return self.read_until_prompt()
def special_login_handler(self, delay_factor: float = 1.0) ‑> None

Handler for devices like WLC, Extreme ERS that throw up characters prior to login.

Expand source code
def special_login_handler(self, delay_factor: float = 1.0) -> None:
    """Hook for devices like WLC, Extreme ERS that throw up characters prior to login.

    This implementation intentionally does nothing.

    :param delay_factor: Not used by this implementation
    """
    return None
def strip_ansi_escape_codes(self, string_buffer: str) ‑> str

Remove any ANSI (VT100) ESC codes from the output

http://en.wikipedia.org/wiki/ANSI_escape_code

Note: this does not capture ALL possible ANSI Escape Codes only the ones I have encountered

Current codes that are filtered:

ESC = '\x1b' or chr(27); ESC is the escape character (^[, '\x1b' in hex)
ESC[24;27H Position cursor
ESC[?25h Show the cursor
ESC[E Next line (HP does ESC-E)
ESC[K Erase line from cursor to the end of line
ESC[2K Erase entire line
ESC[1;24r Enable scrolling from start to row end
ESC[?6l Reset mode screen with options 640 x 200 monochrome (graphics)
ESC[?7l Disable line wrapping
ESC[2J Code erase display
ESC[00;32m Color Green (30 to 37 are different colors)
ESC[6n Get cursor position
ESC[1D Move cursor position leftward by x characters (1 in this case)
ESC[9999B Move cursor down N-lines (very large value is attempt to move to the very bottom of the screen).

HP ProCurve and Cisco SG300 require this (possible others).

:param string_buffer: The string to be processed to remove ANSI escape codes :type string_buffer: str

Expand source code
def strip_ansi_escape_codes(self, string_buffer: str) -> str:
    r"""
    Remove any ANSI (VT100) ESC codes from the output

    http://en.wikipedia.org/wiki/ANSI_escape_code

    Note: this does not capture ALL possible ANSI Escape Codes only the ones
    I have encountered

    Current codes that are filtered:
    ESC = '\x1b' or chr(27)
    ESC = is the escape character ^[ in hex ('\x1b')
    ESC[24;27H   Position cursor
    ESC[?25h     Show the cursor
    ESC E        Next line (HP does ESC-E); replaced with self.RETURN
    ESC[K        Erase line from cursor to the end of line
    ESC[1K       Erase line from the start of the line to the cursor
    ESC[2K       Erase entire line
    ESC[1;24r    Enable scrolling from start to row end
    ESC[?6l      Reset mode screen with options 640 x 200 monochrome (graphics)
    ESC[?7l      Disable line wrapping
    ESC[2J       Code erase display
    ESC[00;32m   Color Green (30 to 37 are different colors)
    ESC[6n       Get cursor position
    ESC[1D       Move cursor position leftward by x characters (1 in this case)
    ESC[9999B    Move cursor down N-lines (very large value is attempt to move to the
                 very bottom of the screen).
    ESC[<n>L     Insert line; replaced with <n> copies of self.RETURN

    HP ProCurve and Cisco SG300 require this (possible others).

    :param string_buffer: The string to be processed to remove ANSI escape codes
    :type string_buffer: str

    :return: string_buffer with the recognized escape codes removed (or, for the
        next-line and insert-line codes, replaced by self.RETURN)
    """  # noqa

    code_position_cursor = chr(27) + r"\[\d+;\d+H"
    code_show_cursor = chr(27) + r"\[\?25h"
    code_next_line = chr(27) + r"E"
    code_erase_line_end = chr(27) + r"\[K"
    code_erase_line = chr(27) + r"\[2K"
    # Previously identical to code_erase_line_end, so ESC[1K was never removed
    code_erase_start_line = chr(27) + r"\[1K"
    code_enable_scroll = chr(27) + r"\[\d+;\d+r"
    code_insert_line = chr(27) + r"\[(\d+)L"
    code_carriage_return = chr(27) + r"\[1M"
    code_disable_line_wrapping = chr(27) + r"\[\?7l"
    code_reset_mode_screen_options = chr(27) + r"\[\?\d+l"
    code_reset_graphics_mode = chr(27) + r"\[00m"
    code_erase_display = chr(27) + r"\[2J"
    code_erase_display_0 = chr(27) + r"\[J"
    code_graphics_mode = chr(27) + r"\[\dm"
    code_graphics_mode1 = chr(27) + r"\[\d\d;\d\dm"
    code_graphics_mode2 = chr(27) + r"\[\d\d;\d\d;\d\dm"
    code_graphics_mode3 = chr(27) + r"\[(3|4)\dm"
    code_graphics_mode4 = chr(27) + r"\[(9|10)[0-7]m"
    code_get_cursor_position = chr(27) + r"\[6n"
    code_cursor_position = chr(27) + r"\[m"
    code_attrs_off = chr(27) + r"\[0m"
    code_reverse = chr(27) + r"\[7m"
    code_cursor_left = chr(27) + r"\[\d+D"
    code_cursor_forward = chr(27) + r"\[\d*C"
    code_cursor_up = chr(27) + r"\[\d*A"
    code_cursor_down = chr(27) + r"\[\d*B"
    code_wrap_around = chr(27) + r"\[\?7h"
    code_bracketed_paste_mode = chr(27) + r"\[\?2004h"

    # Codes that are simply deleted; applied one after another in this order
    code_set = [
        code_position_cursor,
        code_show_cursor,
        code_erase_line,
        code_enable_scroll,
        code_erase_start_line,
        code_carriage_return,
        code_disable_line_wrapping,
        code_erase_line_end,
        code_reset_mode_screen_options,
        code_reset_graphics_mode,
        code_erase_display,
        code_graphics_mode,
        code_graphics_mode1,
        code_graphics_mode2,
        code_graphics_mode3,
        code_graphics_mode4,
        code_get_cursor_position,
        code_cursor_position,
        code_erase_display_0,
        code_attrs_off,
        code_reverse,
        code_cursor_left,
        code_cursor_up,
        code_cursor_down,
        code_cursor_forward,
        code_wrap_around,
        code_bracketed_paste_mode,
    ]

    output = string_buffer
    for ansi_esc_code in code_set:
        output = re.sub(ansi_esc_code, "", output)

    # CODE_NEXT_LINE must substitute with return
    output = re.sub(code_next_line, self.RETURN, output)

    # Aruba and ProCurve switches can use code_insert_line for <enter>.
    # Each occurrence is expanded using its OWN line count (not the count of the
    # first occurrence in the buffer).
    output = re.sub(
        code_insert_line,
        lambda match: int(match.group(1)) * self.RETURN,
        output,
    )

    return output
def strip_command(self, command_string: str, output: str) ‑> str

Strip command_string from output string

Cisco IOS adds backspaces into output for long commands (i.e. for commands that line wrap)

:param command_string: The command string sent to the device :type command_string: str

:param output: The returned output as a result of the command string sent to the device :type output: str

Expand source code
def strip_command(self, command_string: str, output: str) -> str:
    """
    Remove the echoed command (the first line) from the output, if it is present.

    Cisco IOS adds backspaces into output for long commands (i.e. for commands that line
    wrap), so backspace characters are always removed from the output first.

    :param command_string: The command string sent to the device
    :type command_string: str

    :param output: The returned output as a result of the command string sent to the device
    :type output: str

    :return: The output without backspaces and, when it started with the command, without
        everything up to and including the first self.RESPONSE_RETURN
    """
    # Line-wrap artifacts: drop every backspace character
    cleaned = output.replace("\x08", "")

    # Juniper has a weird case where the echoed command will be " \n"
    # i.e. there is an extra space there.
    echoed = command_string.strip()
    if not cleaned.startswith(echoed):
        # command_string isn't there; do nothing
        return cleaned

    # Keep only what follows the first line (empty when there is no line separator)
    _first_line, _separator, remainder = cleaned.partition(self.RESPONSE_RETURN)
    return remainder
def strip_prompt(self, a_string: str) ‑> str

Strip the trailing router prompt from the output.

:param a_string: Returned string from device :type a_string: str

Expand source code
def strip_prompt(self, a_string: str) -> str:
    """Drop the final line of the output when that line contains the router prompt.

    :param a_string: Returned string from device
    :type a_string: str

    :return: a_string without its last line if self.base_prompt occurs in that line,
        otherwise a_string unchanged
    """
    # Split once from the right: everything before the last separator, and the last line
    leading, _separator, last_line = a_string.rpartition(self.RESPONSE_RETURN)
    return leading if self.base_prompt in last_line else a_string
def telnet_login(self, pri_prompt_terminator: str = '#\\s*$', alt_prompt_terminator: str = '>\\s*$', username_pattern: str = '(?:user:|username|login|user name)', pwd_pattern: str = 'assword', delay_factor: float = 1.0, max_loops: int = 20) ‑> str

Telnet login. Can be username/password or just password.

:param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

:param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

:param username_pattern: Pattern used to identify the username prompt

:param pwd_pattern: Pattern used to identify the pwd prompt

:param delay_factor: See __init__: global_delay_factor

:param max_loops: Controls the wait time in conjunction with the delay_factor

Expand source code
def telnet_login(
    self,
    pri_prompt_terminator: str = r"#\s*$",
    alt_prompt_terminator: str = r">\s*$",
    username_pattern: str = r"(?:user:|username|login|user name)",
    pwd_pattern: str = r"assword",
    delay_factor: float = 1.0,
    max_loops: int = 20,
) -> str:
    """Telnet login. Can be username/password or just password.

    Repeatedly reads the channel, answering a username prompt and/or a password prompt
    when one is seen, until a device prompt is detected or max_loops iterations have
    been used up. One final attempt to detect the prompt is made after the loop.

    :param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt
        (used as a regular expression, searched in multi-line mode)

    :param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt
        (used as a regular expression, searched in multi-line mode)

    :param username_pattern: Pattern used to identify the username prompt (case-insensitive)

    :param pwd_pattern: Pattern used to identify the pwd prompt (case-insensitive)

    :param delay_factor: See __init__: global_delay_factor

    :param max_loops: Controls the wait time in conjunction with the delay_factor

    :return: All channel output accumulated during the login exchange

    :raises NetmikoAuthenticationException: If reading the channel raises EOFError, or if no
        device prompt is detected after max_loops iterations plus the final attempt
    """
    delay_factor = self.select_delay_factor(delay_factor)

    # Revert telnet_login back to old speeds/delays
    # (a delay_factor below 1 is raised to 1 when fast_cli is on and legacy mode is off)
    if delay_factor < 1:
        if not self._legacy_mode and self.fast_cli:
            delay_factor = 1

    time.sleep(1 * delay_factor)

    # output holds the most recent read; return_msg accumulates everything read so far
    output = ""
    return_msg = ""
    i = 1
    while i <= max_loops:
        try:
            output = self.read_channel()
            return_msg += output

            # Search for username pattern / send username
            if re.search(username_pattern, output, flags=re.I):
                # Sometimes username/password must be terminated with "\r" and not "\r\n"
                self.write_channel(self.username + "\r")
                time.sleep(1 * delay_factor)
                # Re-read so the password check below sees the response to the username
                output = self.read_channel()
                return_msg += output

            # Search for password pattern / send password
            if re.search(pwd_pattern, output, flags=re.I):
                # Sometimes username/password must be terminated with "\r" and not "\r\n"
                assert isinstance(self.password, str)
                self.write_channel(self.password + "\r")
                time.sleep(0.5 * delay_factor)
                output = self.read_channel()
                return_msg += output
                # Prompt seen right after sending the password: login is complete
                if re.search(
                    pri_prompt_terminator, output, flags=re.M
                ) or re.search(alt_prompt_terminator, output, flags=re.M):
                    return return_msg

            # Check if proper data received
            if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
                alt_prompt_terminator, output, flags=re.M
            ):
                return return_msg

            # Nothing conclusive yet: send a return to provoke more output, then retry
            self.write_channel(self.TELNET_RETURN)
            time.sleep(0.5 * delay_factor)
            i += 1
        except EOFError:
            # EOFError while talking to the channel is reported as a login failure
            assert self.remote_conn is not None
            self.remote_conn.close()
            msg = f"Login failed: {self.host}"
            raise NetmikoAuthenticationException(msg)

    # Last try to see if we already logged in
    self.write_channel(self.TELNET_RETURN)
    time.sleep(0.5 * delay_factor)
    output = self.read_channel()
    return_msg += output
    if re.search(pri_prompt_terminator, output, flags=re.M) or re.search(
        alt_prompt_terminator, output, flags=re.M
    ):
        return return_msg

    # No prompt was ever detected: close the connection and report the failure
    msg = f"Login failed: {self.host}"
    assert self.remote_conn is not None
    self.remote_conn.close()
    raise NetmikoAuthenticationException(msg)
def write_channel(self, out_data: str) ‑> None

Generic method that will write data out the channel.

:param out_data: data to be written to the channel :type out_data: str

Expand source code
@lock_channel
@log_writes
def write_channel(self, out_data: str) -> None:
    """Write data out by delegating to the write_channel() method of self.channel.

    :param out_data: data to be written to the channel
    :type out_data: str
    """
    channel = self.channel
    channel.write_channel(out_data)
class ConfigInvalidException (*args, **kwargs)

Exception raised for invalid configuration error.

Expand source code
class ConfigInvalidException(NetmikoBaseException):
    """Raised to signal an invalid-configuration error."""

Ancestors

class ConnectionException (*args, **kwargs)

Generic exception indicating the connection failed.

Expand source code
class ConnectionException(NetmikoBaseException):
    """Generic error signalling that the connection failed."""

Ancestors

class InLineTransfer (ssh_conn: BaseConnection, source_file: str = '', dest_file: str = '', file_system: Optional[str] = None, direction: str = 'put', source_config: Optional[str] = None, socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None, hash_supported: bool = True)

Use TCL on Cisco IOS to directly transfer file.

Expand source code
class InLineTransfer(CiscoIosFileTransfer):
    """Use TCL on Cisco IOS to directly transfer file.

    The file contents are pasted into a tclsh session over the existing control channel.
    Only the "put" direction is supported, and the SCP-related methods of the parent
    class raise NotImplementedError.
    """

    def __init__(
        self,
        ssh_conn: BaseConnection,
        source_file: str = "",
        dest_file: str = "",
        file_system: Optional[str] = None,
        direction: str = "put",
        source_config: Optional[str] = None,
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
        hash_supported: bool = True,
    ) -> None:
        """Validate the arguments and record the transfer parameters.

        NOTE: the parent class initializer is not invoked; every attribute is set here.

        :param ssh_conn: Connection used as the control channel
        :param source_file: Local file whose contents are transferred
        :param dest_file: Name of the file to create on the device (required)
        :param file_system: Remote file system; auto-detected via ssh_conn when not given
        :param direction: Must be "put"
        :param source_config: Text to transfer instead of a file (mutually exclusive with
            source_file)
        :param socket_timeout: Stored on the instance as socket_timeout
        :param progress: Must be None (progress bars are not supported)
        :param progress4: Must be None (progress bars are not supported)
        :param hash_supported: Must not be False

        :raises ValueError: On a missing dest_file, hash_supported=False, both sources
            given, or a direction other than "put"
        :raises NotImplementedError: If progress or progress4 is supplied
        """
        if not dest_file:
            raise ValueError(
                "Destination file must be specified for InlineTransfer operations."
            )
        if hash_supported is False:
            raise ValueError("hash_supported=False is not supported for InLineTransfer")

        if source_file and source_config:
            msg = "Invalid call to InLineTransfer both source_file and source_config specified."
            raise ValueError(msg)
        if direction != "put":
            raise ValueError("Only put operation supported by InLineTransfer.")

        if progress is not None or progress4 is not None:
            raise NotImplementedError(
                "Progress bar is not supported on inline transfers."
            )
        else:
            self.progress = progress
            self.progress4 = progress4

        self.ssh_ctl_chan = ssh_conn
        self.source_file = source_file
        # NOTE(review): when neither source_file nor source_config is given, source_config,
        # source_md5 and file_size are left unset on the instance.
        if source_file:
            self.source_config = None
            self.source_md5 = self.file_md5(source_file)
            self.file_size = os.stat(source_file).st_size
        elif source_config:
            self.source_config = source_config
            self.source_md5 = self.config_md5(source_config)
            self.file_size = len(source_config.encode("UTF-8"))
        self.dest_file = dest_file
        self.direction = direction

        if not file_system:
            self.file_system = self.ssh_ctl_chan._autodetect_fs()
        else:
            self.file_system = file_system

        self.socket_timeout = socket_timeout

    @staticmethod
    def _read_file(file_name: str) -> str:
        """Return the entire contents of file_name, read as UTF-8 text."""
        with io.open(file_name, "rt", encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _tcl_newline_rationalize(tcl_string: str) -> str:
        r"""
        When using put inside a TCL {} section the newline is considered a new TCL
        statement and causes a missing curly-brace message. Convert "\n" to "\r". TCL
        will convert the "\r" to a "\n" i.e. you will see a "\n" inside the file on the
        Cisco IOS device.

        :raises ValueError: If the string contains a curly brace
        """
        NEWLINE = r"\n"
        CARRIAGE_RETURN = r"\r"
        tmp_string = re.sub(NEWLINE, CARRIAGE_RETURN, tcl_string)
        if re.search(r"[{}]", tmp_string):
            msg = "Curly brace detected in string; TCL requires this be escaped."
            raise ValueError(msg)
        return tmp_string

    def __enter__(self) -> "InLineTransfer":
        """Enter tclsh mode on the device and return this object."""
        self._enter_tcl_mode()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Leave tclsh mode; exceptions from the with-block are not suppressed."""
        self._exit_tcl_mode()

    def _enter_tcl_mode(self) -> str:
        """Send "tclsh" and return the output.

        :raises ValueError: If the output contains one of the known failure messages
        """
        TCL_ENTER = "tclsh"
        cmd_failed = ['Translating "tclsh"', "% Unknown command", "% Bad IP address"]
        output = self.ssh_ctl_chan._send_command_str(
            TCL_ENTER,
            expect_string=r"\(tcl\)#",
            strip_prompt=False,
            strip_command=False,
        )
        for pattern in cmd_failed:
            if pattern in output:
                raise ValueError(f"Failed to enter tclsh mode on router: {output}")
        return output

    def _exit_tcl_mode(self) -> str:
        """Send "tclquit" if the device still shows a (tcl) prompt; return what was read."""
        TCL_EXIT = "tclquit"
        # Send a bare return first to find out which prompt the device is showing
        self.ssh_ctl_chan.write_channel("\r")
        time.sleep(1)
        output = self.ssh_ctl_chan.read_channel()
        if "(tcl)" in output:
            self.ssh_ctl_chan.write_channel(TCL_EXIT + "\r")
        time.sleep(1)
        output += self.ssh_ctl_chan.read_channel()
        return output

    def establish_scp_conn(self) -> None:
        """Not applicable to inline transfers."""
        raise NotImplementedError

    def close_scp_chan(self) -> None:
        """Not applicable to inline transfers."""
        raise NotImplementedError

    def local_space_available(self) -> bool:
        """Not applicable to inline transfers."""
        raise NotImplementedError

    def file_md5(self, file_name: str, add_newline: bool = False) -> str:
        """Compute MD5 hash of file.

        :raises ValueError: If add_newline is True (not supported for inline transfers)
        """
        if add_newline is True:
            raise ValueError(
                "add_newline argument is not supported for inline transfers."
            )
        file_contents = self._read_file(file_name)
        file_contents = file_contents + "\n"  # Cisco IOS automatically adds this
        file_contents_bytes = file_contents.encode("UTF-8")
        return hashlib.md5(file_contents_bytes).hexdigest()

    def config_md5(self, source_config: str) -> str:
        """Compute MD5 hash of text."""
        file_contents = source_config + "\n"  # Cisco IOS automatically adds this
        file_contents_bytes = file_contents.encode("UTF-8")
        return hashlib.md5(file_contents_bytes).hexdigest()

    def put_file(self) -> None:
        """Paste the source contents into a TCL "puts" command, then quit tclsh.

        The initial delay and the read timeout scale with self.file_size.
        """
        curlybrace = r"{"
        TCL_FILECMD_ENTER = 'puts [open "{}{}" w+] {}'.format(
            self.file_system, self.dest_file, curlybrace
        )
        TCL_FILECMD_EXIT = "}"

        if self.source_file:
            file_contents = self._read_file(self.source_file)
        elif self.source_config:
            file_contents = self.source_config
        file_contents = self._tcl_newline_rationalize(file_contents)

        # Try to remove any existing data
        self.ssh_ctl_chan.clear_buffer()

        self.ssh_ctl_chan.write_channel(TCL_FILECMD_ENTER)
        time.sleep(0.25)
        self.ssh_ctl_chan.write_channel(file_contents)
        self.ssh_ctl_chan.write_channel(TCL_FILECMD_EXIT + "\r")

        # This operation can be slow (depends on the size of the file).
        # The largest threshold is tested first so that each tier is reachable.
        if self.file_size >= 7500:
            read_timeout = 600
            sleep_time = 25
        elif self.file_size >= 2500:
            read_timeout = 300
            sleep_time = 12
        else:
            read_timeout = 100
            sleep_time = 4

        # Initial delay
        time.sleep(sleep_time)

        # File paste and TCL_FILECMD_exit should be indicated by "router(tcl)#"
        self.ssh_ctl_chan.read_until_pattern(
            pattern=r"\(tcl\).*$", re_flags=re.M, read_timeout=read_timeout
        )

        # The file doesn't write until tclquit
        TCL_EXIT = "tclquit"
        self.ssh_ctl_chan.write_channel(TCL_EXIT + "\r")

        time.sleep(1)
        # Read all data remaining from the TCLSH session.
        # base_prompt is escaped so that it is matched literally inside the pattern.
        base_prompt = re.escape(self.ssh_ctl_chan.base_prompt)
        pattern = rf"tclquit.*{base_prompt}.*$"
        re_flags = re.DOTALL | re.M
        self.ssh_ctl_chan.read_until_pattern(
            pattern=pattern, re_flags=re_flags, read_timeout=read_timeout
        )
        return None

    def get_file(self) -> None:
        """Not supported: only put operations are available."""
        raise NotImplementedError

    def enable_scp(self, cmd: str = "") -> None:
        """Not applicable to inline transfers."""
        raise NotImplementedError

    def disable_scp(self, cmd: str = "") -> None:
        """Not applicable to inline transfers."""
        raise NotImplementedError

Ancestors

Methods

def config_md5(self, source_config: str) ‑> str

Compute MD5 hash of text.

Expand source code
def config_md5(self, source_config: str) -> str:
    """Return the hex MD5 digest of the UTF-8 encoded text plus a trailing newline."""
    digest = hashlib.md5()
    digest.update(source_config.encode("UTF-8"))
    digest.update(b"\n")  # Cisco IOS automatically adds this
    return digest.hexdigest()
def file_md5(self, file_name: str, add_newline: bool = False) ‑> str

Compute MD5 hash of file.

Expand source code
def file_md5(self, file_name: str, add_newline: bool = False) -> str:
    """Return the hex MD5 digest of the file's text plus a trailing newline.

    :param file_name: File to read (via self._read_file)
    :param add_newline: Must not be True; not supported for inline transfers

    :raises ValueError: If add_newline is True
    """
    if add_newline is True:
        raise ValueError(
            "add_newline argument is not supported for inline transfers."
        )
    text = self._read_file(file_name)
    # Cisco IOS automatically adds the trailing newline
    payload = (text + "\n").encode("UTF-8")
    return hashlib.md5(payload).hexdigest()

Inherited members

class NetmikoAuthenticationException (*args, **kwargs)

SSH authentication exception based on Paramiko AuthenticationException.

Expand source code
class NetmikoAuthenticationException(AuthenticationException):
    """SSH authentication failure; subclasses Paramiko's AuthenticationException."""

Ancestors

  • paramiko.ssh_exception.AuthenticationException
  • paramiko.ssh_exception.SSHException
  • builtins.Exception
  • builtins.BaseException
class NetMikoAuthenticationException (*args, **kwargs)

SSH authentication exception based on Paramiko AuthenticationException.

Expand source code
class NetmikoAuthenticationException(AuthenticationException):
    """SSH authentication failure; subclasses Paramiko's AuthenticationException."""

Ancestors

  • paramiko.ssh_exception.AuthenticationException
  • paramiko.ssh_exception.SSHException
  • builtins.Exception
  • builtins.BaseException
class NetmikoBaseException (*args, **kwargs)

General base exception except for exceptions that inherit from Paramiko.

Expand source code
class NetmikoBaseException(Exception):
    """Base class for Netmiko exceptions other than those that inherit from Paramiko."""

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class NetmikoTimeoutException (*args, **kwargs)

SSH session timed out trying to connect to the device.

Expand source code
class NetmikoTimeoutException(SSHException):
    """SSH session timed out trying to connect to the device."""

Ancestors

  • paramiko.ssh_exception.SSHException
  • builtins.Exception
  • builtins.BaseException
class NetMikoTimeoutException (*args, **kwargs)

SSH session timed out trying to connect to the device.

Expand source code
class NetmikoTimeoutException(SSHException):
    """SSH session timed out trying to connect to the device."""

Ancestors

  • paramiko.ssh_exception.SSHException
  • builtins.Exception
  • builtins.BaseException
class ReadException (*args, **kwargs)

General exception indicating an error occurred during a Netmiko read operation.

Expand source code
class ReadException(NetmikoBaseException):
    """General error raised when something goes wrong during a Netmiko read operation."""

Ancestors

Subclasses

class ReadTimeout (*args, **kwargs)

Exception indicating that a Netmiko read operation timed out (a specialization of ReadException).

Expand source code
class ReadTimeout(ReadException):
    """Read error indicating that a Netmiko read operation timed out."""

Ancestors

class SCPConn (ssh_conn: BaseConnection, socket_timeout: float = 10.0, progress: Optional[Callable[..., Any]] = None, progress4: Optional[Callable[..., Any]] = None)

Establish a secure copy channel to the remote network device.

Must close the SCP connection to get the file to write to the remote filesystem

Expand source code
class SCPConn:
    """
    Secure copy (SCP) channel to the remote network device.

    The SCP connection is opened as part of construction. It must be closed to get the
    file to write to the remote filesystem.
    """

    def __init__(
        self,
        ssh_conn: "BaseConnection",
        socket_timeout: float = 10.0,
        progress: Optional[Callable[..., Any]] = None,
        progress4: Optional[Callable[..., Any]] = None,
    ) -> None:
        """Store the settings and immediately establish the SCP connection.

        :param ssh_conn: Existing connection whose parameters are reused for SCP
        :param socket_timeout: Passed to the SCP client as socket_timeout
        :param progress: Passed to the SCP client as progress
        :param progress4: Passed to the SCP client as progress4
        """
        self.ssh_ctl_chan = ssh_conn
        self.socket_timeout = socket_timeout
        self.progress = progress
        self.progress4 = progress4
        self.establish_scp_conn()

    def establish_scp_conn(self) -> None:
        """Open a new SSH client and build an SCP client on its transport."""
        control_channel = self.ssh_ctl_chan
        connect_kwargs = control_channel._connect_params_dict()
        self.scp_conn = control_channel._build_ssh_client()
        self.scp_conn.connect(**connect_kwargs)
        transport = self.scp_conn.get_transport()
        self.scp_client = scp.SCPClient(
            transport,
            socket_timeout=self.socket_timeout,
            progress=self.progress,
            progress4=self.progress4,
        )

    def scp_transfer_file(self, source_file: str, dest_file: str) -> None:
        """Upload source_file to dest_file via SCP (kept for backwards compatibility)."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def scp_get_file(self, source_file: str, dest_file: str) -> None:
        """Download source_file to dest_file via SCP."""
        client = self.scp_client
        client.get(source_file, dest_file)

    def scp_put_file(self, source_file: str, dest_file: str) -> None:
        """Upload source_file to dest_file via SCP."""
        client = self.scp_client
        client.put(source_file, dest_file)

    def close(self) -> None:
        """Close the SSH client that carries the SCP connection."""
        ssh_client = self.scp_conn
        ssh_client.close()

Methods

def close(self) ‑> None

Close the SCP connection.

Expand source code
def close(self) -> None:
    """Close the SSH client that carries the SCP connection."""
    ssh_client = self.scp_conn
    ssh_client.close()
def establish_scp_conn(self) ‑> None

Establish the secure copy connection.

Expand source code
def establish_scp_conn(self) -> None:
    """Open a new SSH client and build an SCP client on its transport."""
    control_channel = self.ssh_ctl_chan
    connect_kwargs = control_channel._connect_params_dict()
    self.scp_conn = control_channel._build_ssh_client()
    self.scp_conn.connect(**connect_kwargs)
    transport = self.scp_conn.get_transport()
    self.scp_client = scp.SCPClient(
        transport,
        socket_timeout=self.socket_timeout,
        progress=self.progress,
        progress4=self.progress4,
    )
def scp_get_file(self, source_file: str, dest_file: str) ‑> None

Get file using SCP.

Expand source code
def scp_get_file(self, source_file: str, dest_file: str) -> None:
    """Download source_file to dest_file via SCP."""
    client = self.scp_client
    client.get(source_file, dest_file)
def scp_put_file(self, source_file: str, dest_file: str) ‑> None

Put file using SCP.

Expand source code
def scp_put_file(self, source_file: str, dest_file: str) -> None:
    """Upload source_file to dest_file via SCP."""
    client = self.scp_client
    client.put(source_file, dest_file)
def scp_transfer_file(self, source_file: str, dest_file: str) ‑> None

Put file using SCP (for backwards compatibility).

Expand source code
def scp_transfer_file(self, source_file: str, dest_file: str) -> None:
    """Upload source_file to dest_file via SCP (kept for backwards compatibility)."""
    client = self.scp_client
    client.put(source_file, dest_file)
class SSHDetect (*args: Any, **kwargs: Any)

The SSHDetect class tries to automatically guess the device type running on the SSH remote end. Be careful that the kwargs 'device_type' must be set to 'autodetect', otherwise it won't work at all.

Parameters

*args : list
The same *args that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.
**kwargs : dict
The same **kwargs that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.

Attributes

connection : TerminalServerSSH
A basic connection to the remote SSH end.
potential_matches : dict
Dict of (device_type, accuracy) that is populated through an interaction with the remote end.

Methods

autodetect() Try to determine the device type.

Constructor of the SSHDetect class

Expand source code
class SSHDetect(object):
    """
    The SSHDetect class tries to automatically guess the device type running on the SSH remote end.
    The 'device_type' keyword argument must be set to 'autodetect'; any other value makes the
    constructor raise ValueError.

    Parameters
    ----------
    *args : list
        The same *args that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.
    **kwargs : dict
        The same **kwargs that you might provide to the netmiko.ssh_dispatcher.ConnectHandler.

    Attributes
    ----------
    connection : netmiko.terminal_server.TerminalServerSSH
        A basic connection to the remote SSH end.
    initial_buffer : str
        Output returned by the initial channel read performed right after login.
    potential_matches: dict
        Dict of (device_type, accuracy) that is populated through an interaction with the
        remote end.

    Methods
    -------
    autodetect()
        Try to determine the device type.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor of the SSHDetect class

        Raises
        ------
        KeyError
            If 'device_type' is missing from kwargs.
        ValueError
            If 'device_type' is anything other than 'autodetect'.
        """
        if kwargs["device_type"] != "autodetect":
            raise ValueError("The connection device_type must be 'autodetect'")
        # Always set cmd_verify to False for autodetect
        kwargs["global_cmd_verify"] = False
        self.connection = ConnectHandler(*args, **kwargs)

        # Add additional sleep to let the login complete.
        time.sleep(3)

        # Call the _test_channel_read() in base to clear initial data
        output = BaseConnection._test_channel_read(self.connection)
        self.initial_buffer = output
        self.potential_matches: Dict[str, int] = {}
        # Maps a command string to the response it produced (see _send_command_wrapper)
        self._results_cache: Dict[str, str] = {}

    def autodetect(self) -> Union[str, None]:
        """
        Try to guess the best 'device_type' based on patterns defined in SSH_MAPPER_BASE

        Every entry of SSH_MAPPER_BASE is tried in order until one reports an accuracy of 99 or
        more; the entry with the highest accuracy seen so far is then selected (the earliest
        entry wins a tie). The connection is always disconnected before this method returns or
        raises.

        Returns
        -------
        best_match : str or None
            The device type that is currently the best to use to interact with the device, or
            None when nothing matched.
        """
        # WLC and IOS XR each need two different auto-detect solutions; the second solution is
        # registered under an internal name that must be reported as the real device_type.
        aliases = {"cisco_wlc_85": "cisco_wlc", "cisco_xr_2": "cisco_xr"}

        try:
            for device_type, autodetect_dict in SSH_MAPPER_BASE:
                # Work on a copy so that popping 'dispatch' does not alter the shared mapper
                tmp_dict = autodetect_dict.copy()
                call_method = tmp_dict.pop("dispatch")
                assert isinstance(call_method, str)
                autodetect_method = getattr(self, call_method)
                accuracy = autodetect_method(**tmp_dict)
                if accuracy:
                    self.potential_matches[device_type] = accuracy
                    if accuracy >= 99:  # Stop the loop as we are sure of our match
                        break

            if not self.potential_matches:
                return None

            # max() returns the first of several equally-scored items, which is the same item a
            # stable descending sort would place first.
            best_match = max(self.potential_matches.items(), key=lambda t: t[1])[0]
            return aliases.get(best_match, best_match)
        finally:
            # Release the SSH session even when a detection step raises
            self.connection.disconnect()

    def _send_command(self, cmd: str = "") -> str:
        """
        Handle reading/writing channel directly. It is also sanitizing the output received.

        Parameters
        ----------
        cmd : str, optional
            The command to send to the remote device (default : "", just send a new line)

        Returns
        -------
        output : str
            The output from the command sent
        """
        self.connection.write_channel(cmd + "\n")
        time.sleep(1)
        output = self.connection.read_channel_timing(last_read=6.0)
        output = self.connection.strip_backspaces(output)
        return output

    def _send_command_wrapper(self, cmd: str) -> str:
        """
        Send command to the remote device with a caching feature to avoid sending the same command
        twice based on the SSH_MAPPER_BASE dict cmd key.

        Parameters
        ----------
        cmd : str
            The command to send to the remote device after checking cache.

        Returns
        -------
        response : str
            The response from the remote device.
        """
        response = self._results_cache.get(cmd)
        # Compare against None (not truthiness) so that an empty response is still a cache hit
        if response is None:
            response = self._send_command(cmd)
            self._results_cache[cmd] = response
        return response

    def _autodetect_remote_version(
        self,
        search_patterns: Optional[List[str]] = None,
        re_flags: int = re.IGNORECASE,
        priority: int = 99,
        **kwargs: Any
    ) -> int:
        """
        Method to try auto-detect the device type, by matching a regular expression on the reported
        remote version of the SSH server.

        Parameters
        ----------
        search_patterns : list
            A list of regular expression to look for in the reported remote SSH version
            (default: None).
        re_flags: re.flags, optional
            Any flags from the python re module to modify the regular expression (default: re.I).
        priority: int, optional
            The confidence the match is right between 0 and 99 (default: 99).
        **kwargs : dict
            Accepted and ignored.

        Returns
        -------
        accuracy : int
            `priority` when one of `search_patterns` matches the remote version; 0 when nothing
            matches, the remote version is empty, or any error occurs.
        """
        invalid_responses = [r"^$"]

        if not search_patterns:
            return 0

        try:
            remote_conn = self.connection.remote_conn
            assert isinstance(remote_conn, paramiko.Channel)
            assert remote_conn.transport is not None
            remote_version = remote_conn.transport.remote_version
            for pattern in invalid_responses:
                match = re.search(pattern, remote_version, flags=re.I)
                if match:
                    return 0
            for pattern in search_patterns:
                match = re.search(pattern, remote_version, flags=re_flags)
                if match:
                    return priority
        except Exception:
            # Detection is best-effort: any failure simply means "no match"
            return 0
        return 0

    def _autodetect_std(
        self,
        cmd: str = "",
        search_patterns: Optional[List[str]] = None,
        re_flags: int = re.IGNORECASE,
        priority: int = 99,
    ) -> int:
        """
        Standard method to try to auto-detect the device type. This method will be called for each
        device_type present in SSH_MAPPER_BASE dict ('dispatch' key). It will attempt to send a
        command and match some regular expression from the output for each entry in SSH_MAPPER_BASE
        ('cmd' and 'search_pattern' keys).

        Parameters
        ----------
        cmd : str
            The command to send to the remote device after checking cache.
        search_patterns : list
            A list of regular expression to look for in the command's output (default: None).
        re_flags: re.flags, optional
            Any flags from the python re module to modify the regular expression (default: re.I).
        priority: int, optional
            The confidence the match is right between 0 and 99 (default: 99).

        Returns
        -------
        accuracy : int
            `priority` when one of `search_patterns` matches the response; 0 when `cmd` or
            `search_patterns` is empty, the response contains a known error message, nothing
            matches, or any error occurs.
        """
        invalid_responses = [
            r"% Invalid input detected",
            r"syntax error, expecting",
            r"Error: Unrecognized command",
            r"%Error",
            r"command not found",
            r"Syntax Error: unexpected argument",
            r"% Unrecognized command found at",
        ]
        if not cmd or not search_patterns:
            return 0
        try:
            # _send_command_wrapper will use already cached results if available
            response = self._send_command_wrapper(cmd)
            # Look for error conditions in output
            for pattern in invalid_responses:
                match = re.search(pattern, response, flags=re.I)
                if match:
                    return 0
            for pattern in search_patterns:
                match = re.search(pattern, response, flags=re_flags)
                if match:
                    return priority
        except Exception:
            # Detection is best-effort: any failure simply means "no match"
            return 0
        return 0

Methods

def autodetect(self) ‑> Optional[str]

Try to guess the best 'device_type' based on patterns defined in SSH_MAPPER_BASE

Returns

best_match : str or None
The device type that is currently the best to use to interact with the device
Expand source code
def autodetect(self) -> Union[str, None]:
    """
    Try to guess the best 'device_type' based on patterns defined in SSH_MAPPER_BASE

    Every entry of SSH_MAPPER_BASE is tried in order until one reports an accuracy of 99 or
    more; the entry with the highest accuracy seen so far is then selected (the earliest entry
    wins a tie). The connection is always disconnected before this method returns or raises.

    Returns
    -------
    best_match : str or None
        The device type that is currently the best to use to interact with the device, or None
        when nothing matched.
    """
    # WLC and IOS XR each need two different auto-detect solutions; the second solution is
    # registered under an internal name that must be reported as the real device_type.
    aliases = {"cisco_wlc_85": "cisco_wlc", "cisco_xr_2": "cisco_xr"}

    try:
        for device_type, autodetect_dict in SSH_MAPPER_BASE:
            # Work on a copy so that popping 'dispatch' does not alter the shared mapper
            tmp_dict = autodetect_dict.copy()
            call_method = tmp_dict.pop("dispatch")
            assert isinstance(call_method, str)
            autodetect_method = getattr(self, call_method)
            accuracy = autodetect_method(**tmp_dict)
            if accuracy:
                self.potential_matches[device_type] = accuracy
                if accuracy >= 99:  # Stop the loop as we are sure of our match
                    break

        if not self.potential_matches:
            return None

        # max() returns the first of several equally-scored items, which is the same item a
        # stable descending sort would place first.
        best_match = max(self.potential_matches.items(), key=lambda t: t[1])[0]
        return aliases.get(best_match, best_match)
    finally:
        # Release the SSH session even when a detection step raises
        self.connection.disconnect()