Module netmiko.nokia.nokia_sros_ssh

Source code
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2014 - 2020 Kirk Byers
# Copyright (c) 2014 - 2020 Twin Bridges Technology
# Copyright (c) 2019 - 2020 NOKIA Inc.
# MIT License - See License file at:
#   https://github.com/ktbyers/netmiko/blob/develop/LICENSE

import re
import os
import time

from netmiko import log
from netmiko.base_connection import BaseConnection
from netmiko.scp_handler import BaseFileTransfer


class NokiaSrosSSH(BaseConnection):
    """
    Implement methods for interacting with Nokia SR OS devices.

    Not applicable in Nokia SR OS (disabled):
        - exit_enable_mode()

    Overriden methods to adapt Nokia SR OS behavior (changed):
        - session_preparation()
        - set_base_prompt()
        - config_mode()
        - exit_config_mode()
        - check_config_mode()
        - save_config()
        - commit()
        - strip_prompt()
        - enable()
        - check_enable_mode()
    """

    def session_preparation(self):
        self._test_channel_read()
        self.set_base_prompt()
        # "@" indicates model-driven CLI (vs Classical CLI)
        if "@" in self.base_prompt:
            self._disable_complete_on_space()
            self.set_terminal_width(
                command="environment console width 512", pattern="environment"
            )
            self.disable_paging(command="environment more false")
            # To perform file operations we need to disable paging in classical-CLI also
            self.disable_paging(command="//environment no more")
        else:
            # Classical CLI has no method to set the terminal width nor to disable command
            # complete on space; consequently, cmd_verify needs disabled.
            self.global_cmd_verify = False
            self.disable_paging(command="environment no more", pattern="environment")

        # Clear the read buffer
        time.sleep(0.3 * self.global_delay_factor)
        self.clear_buffer()

    def set_base_prompt(self, *args, **kwargs):
        """Remove the > when navigating into the different config level."""
        cur_base_prompt = super().set_base_prompt(*args, **kwargs)
        match = re.search(r"\*?(.*?)(>.*)*#", cur_base_prompt)
        if match:
            # strip off >... from base_prompt; strip off leading *
            self.base_prompt = match.group(1)
            return self.base_prompt

    def _disable_complete_on_space(self):
        """
        SR-OS tries to auto complete commands when you type a "space" character.

        This is a bad idea for automation as what your program is sending no longer matches
        the command echo from the device, so we disable this behavior.
        """
        delay_factor = self.select_delay_factor(delay_factor=0)
        time.sleep(delay_factor * 0.1)
        command = "environment command-completion space false"
        self.write_channel(self.normalize_cmd(command))
        time.sleep(delay_factor * 0.1)
        return self.read_channel()

    def enable(self, cmd="enable", pattern="ssword", re_flags=re.IGNORECASE):
        """Enable SR OS administrative mode"""
        if "@" not in self.base_prompt:
            cmd = "enable-admin"
        return super().enable(cmd=cmd, pattern=pattern, re_flags=re_flags)

    def check_enable_mode(self, check_string="in admin mode"):
        """Check if in enable mode."""
        cmd = "enable"
        if "@" not in self.base_prompt:
            cmd = "enable-admin"
        self.write_channel(self.normalize_cmd(cmd))
        output = self.read_until_prompt_or_pattern(pattern="ssword")
        if "ssword" in output:
            self.write_channel(self.RETURN)  # send ENTER to pass the password prompt
            self.read_until_prompt()
        return check_string in output

    def exit_enable_mode(self, *args, **kwargs):
        """Nokia SR OS does not have a notion of exiting administrative mode"""
        return ""

    def config_mode(self, config_command="edit-config exclusive", pattern=r"\(ex\)\["):
        """Enable config edit-mode for Nokia SR OS"""
        output = ""
        # Only model-driven CLI supports config-mode
        if "@" in self.base_prompt:
            output += super().config_mode(
                config_command=config_command, pattern=pattern
            )
        return output

    def exit_config_mode(self, *args, **kwargs):
        """Disable config edit-mode for Nokia SR OS"""
        output = self._exit_all()
        # Model-driven CLI
        if "@" in self.base_prompt and "(ex)[" in output:
            # Asterisk indicates changes were made.
            if "*(ex)[" in output:
                log.warning("Uncommitted changes! Discarding changes!")
                output += self._discard()
            cmd = "quit-config"
            self.write_channel(self.normalize_cmd(cmd))
            if self.global_cmd_verify is not False:
                output += self.read_until_pattern(pattern=re.escape(cmd))
            else:
                output += self.read_until_prompt()
        if self.check_config_mode():
            raise ValueError("Failed to exit configuration mode")
        return output

    def check_config_mode(self, check_string=r"(ex)[", pattern=r"@"):
        """Check config mode for Nokia SR OS"""
        if "@" not in self.base_prompt:
            # Classical CLI
            return False
        else:
            # Model-driven CLI look for "exclusive"
            return super().check_config_mode(check_string=check_string, pattern=pattern)

    def save_config(self, *args, **kwargs):
        """Persist configuration to cflash for Nokia SR OS"""
        return self.send_command(command_string="/admin save", expect_string=r"#")

    def send_config_set(self, config_commands=None, exit_config_mode=None, **kwargs):
        """Model driven CLI requires you not exit from configuration mode."""
        if exit_config_mode is None:
            # Set to False if model-driven CLI
            exit_config_mode = False if "@" in self.base_prompt else True
        return super().send_config_set(
            config_commands=config_commands, exit_config_mode=exit_config_mode, **kwargs
        )

    def commit(self, *args, **kwargs):
        """Activate changes from private candidate for Nokia SR OS"""
        output = self._exit_all()
        if "@" in self.base_prompt and "*(ex)[" in output:
            log.info("Apply uncommitted changes!")
            cmd = "commit"
            self.write_channel(self.normalize_cmd(cmd))
            new_output = ""
            if self.global_cmd_verify is not False:
                new_output += self.read_until_pattern(pattern=re.escape(cmd))
            if "@" not in new_output:
                new_output += self.read_until_pattern(r"@")
            output += new_output
        return output

    def _exit_all(self):
        """Return to the 'root' context."""
        output = ""
        exit_cmd = "exit all"
        self.write_channel(self.normalize_cmd(exit_cmd))
        # Make sure you read until you detect the command echo (avoid getting out of sync)
        if self.global_cmd_verify is not False:
            output += self.read_until_pattern(pattern=re.escape(exit_cmd))
        else:
            output += self.read_until_prompt()
        return output

    def _discard(self):
        """Discard changes from private candidate for Nokia SR OS"""
        output = ""
        if "@" in self.base_prompt:
            cmd = "discard"
            self.write_channel(self.normalize_cmd(cmd))
            new_output = ""
            if self.global_cmd_verify is not False:
                new_output += self.read_until_pattern(pattern=re.escape(cmd))
            if "@" not in new_output:
                new_output += self.read_until_prompt()
            output += new_output
        return output

    def strip_prompt(self, *args, **kwargs):
        """Strip prompt from the output."""
        output = super().strip_prompt(*args, **kwargs)
        if "@" in self.base_prompt:
            # Remove context prompt too
            strips = r"[\r\n]*\!?\*?(\((ex|gl|pr|ro)\))?\[\S*\][\r\n]*"
            return re.sub(strips, "", output)
        else:
            return output

    def cleanup(self, command="logout"):
        """Gracefully exit the SSH session."""
        try:
            # The pattern="" forces use of send_command_timing
            if self.check_config_mode(pattern=""):
                self.exit_config_mode()
        except Exception:
            pass
        # Always try to send final 'logout'.
        self._session_log_fin = True
        self.write_channel(command + self.RETURN)


class NokiaSrosFileTransfer(BaseFileTransfer):
    def __init__(
        self, ssh_conn, source_file, dest_file, hash_supported=False, **kwargs
    ):
        super().__init__(
            ssh_conn, source_file, dest_file, hash_supported=hash_supported, **kwargs
        )

    def _file_cmd_prefix(self):
        """
        Allow MD-CLI to execute file operations by using classical CLI.

        Returns "//" if the current prompt is MD-CLI (empty string otherwise).
        """
        return "//" if "@" in self.ssh_ctl_chan.base_prompt else ""

    def remote_space_available(self, search_pattern=r"(\d+)\s+\w+\s+free"):
        """Return space available on remote device."""

        # Sample text for search_pattern.
        # "               3 Dir(s)               961531904 bytes free."
        remote_cmd = self._file_cmd_prefix() + "file dir {}".format(self.file_system)
        remote_output = self.ssh_ctl_chan.send_command(remote_cmd)
        match = re.search(search_pattern, remote_output)
        return int(match.group(1))

    def check_file_exists(self, remote_cmd=""):
        """Check if destination file exists (returns boolean)."""

        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
                    self.file_system, self.dest_file
                )
            dest_file_name = self.dest_file.replace("\\", "/").split("/")[-1]
            remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
            if "File Not Found" in remote_out:
                return False
            elif dest_file_name in remote_out:
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd=None, remote_file=None):
        """Get the file size of the remote file."""

        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        if not remote_cmd:
            remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
                self.file_system, remote_file
            )
        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)

        if "File Not Found" in remote_out:
            raise IOError("Unable to find file on remote system")

        # Parse dir output for filename. Output format is:
        # "10/16/2019  10:00p                6738 {filename}"

        pattern = r"\S+\s+\S+\s+(\d+)\s+{}".format(re.escape(remote_file))
        match = re.search(pattern, remote_out)

        if not match:
            raise ValueError("Filename entry not found in dir output")

        file_size = int(match.group(1))
        return file_size

    def verify_file(self):
        """Verify the file has been transferred correctly based on filesize."""
        if self.direction == "put":
            return os.stat(self.source_file).st_size == self.remote_file_size(
                remote_file=self.dest_file
            )
        elif self.direction == "get":
            return (
                self.remote_file_size(remote_file=self.source_file)
                == os.stat(self.dest_file).st_size
            )

    def file_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def process_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def compare_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def remote_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

Classes

class NokiaSrosFileTransfer (ssh_conn, source_file, dest_file, hash_supported=False, **kwargs)

Class to manage SCP file transfer and associated SSH control channel.

Source code
class NokiaSrosFileTransfer(BaseFileTransfer):
    def __init__(
        self, ssh_conn, source_file, dest_file, hash_supported=False, **kwargs
    ):
        super().__init__(
            ssh_conn, source_file, dest_file, hash_supported=hash_supported, **kwargs
        )

    def _file_cmd_prefix(self):
        """
        Allow MD-CLI to execute file operations by using classical CLI.

        Returns "//" if the current prompt is MD-CLI (empty string otherwise).
        """
        return "//" if "@" in self.ssh_ctl_chan.base_prompt else ""

    def remote_space_available(self, search_pattern=r"(\d+)\s+\w+\s+free"):
        """Return space available on remote device."""

        # Sample text for search_pattern.
        # "               3 Dir(s)               961531904 bytes free."
        remote_cmd = self._file_cmd_prefix() + "file dir {}".format(self.file_system)
        remote_output = self.ssh_ctl_chan.send_command(remote_cmd)
        match = re.search(search_pattern, remote_output)
        return int(match.group(1))

    def check_file_exists(self, remote_cmd=""):
        """Check if destination file exists (returns boolean)."""

        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
                    self.file_system, self.dest_file
                )
            dest_file_name = self.dest_file.replace("\\", "/").split("/")[-1]
            remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
            if "File Not Found" in remote_out:
                return False
            elif dest_file_name in remote_out:
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd=None, remote_file=None):
        """Get the file size of the remote file."""

        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file
        if not remote_cmd:
            remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
                self.file_system, remote_file
            )
        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)

        if "File Not Found" in remote_out:
            raise IOError("Unable to find file on remote system")

        # Parse dir output for filename. Output format is:
        # "10/16/2019  10:00p                6738 {filename}"

        pattern = r"\S+\s+\S+\s+(\d+)\s+{}".format(re.escape(remote_file))
        match = re.search(pattern, remote_out)

        if not match:
            raise ValueError("Filename entry not found in dir output")

        file_size = int(match.group(1))
        return file_size

    def verify_file(self):
        """Verify the file has been transferred correctly based on filesize."""
        if self.direction == "put":
            return os.stat(self.source_file).st_size == self.remote_file_size(
                remote_file=self.dest_file
            )
        elif self.direction == "get":
            return (
                self.remote_file_size(remote_file=self.source_file)
                == os.stat(self.dest_file).st_size
            )

    def file_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def process_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def compare_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

    def remote_md5(self, **kwargs):
        raise AttributeError("SR-OS does not support an MD5-hash operation.")

Ancestors

Methods

def check_file_exists(self, remote_cmd='')

Check if destination file exists (returns boolean).

Source code
def check_file_exists(self, remote_cmd=""):
    """Check if destination file exists (returns boolean)."""

    if self.direction == "put":
        if not remote_cmd:
            remote_cmd = self._file_cmd_prefix() + "file dir {}/{}".format(
                self.file_system, self.dest_file
            )
        dest_file_name = self.dest_file.replace("\\", "/").split("/")[-1]
        remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
        if "File Not Found" in remote_out:
            return False
        elif dest_file_name in remote_out:
            return True
        else:
            raise ValueError("Unexpected output from check_file_exists")
    elif self.direction == "get":
        return os.path.exists(self.dest_file)
def verify_file(self)

Verify the file has been transferred correctly based on filesize.

Source code
def verify_file(self):
    """Verify the file has been transferred correctly based on filesize."""
    if self.direction == "put":
        return os.stat(self.source_file).st_size == self.remote_file_size(
            remote_file=self.dest_file
        )
    elif self.direction == "get":
        return (
            self.remote_file_size(remote_file=self.source_file)
            == os.stat(self.dest_file).st_size
        )

Inherited members

class NokiaSrosSSH (ip='', host='', username='', password=None, secret='', port=None, device_type='', verbose=False, global_delay_factor=1, global_cmd_verify=None, use_keys=False, key_file=None, pkey=None, passphrase=None, allow_agent=False, ssh_strict=False, system_host_keys=False, alt_host_keys=False, alt_key_file='', ssh_config_file=None, conn_timeout=5, auth_timeout=None, banner_timeout=15, blocking_timeout=20, timeout=100, session_timeout=60, keepalive=0, default_enter=None, response_return=None, serial_settings=None, fast_cli=False, session_log=None, session_log_record_writes=False, session_log_file_mode='write', allow_auto_change=False, encoding='ascii', sock=None, auto_connect=True)

Implement methods for interacting with Nokia SR OS devices.

Not applicable in Nokia SR OS (disabled): - exit_enable_mode()

Overriden methods to adapt Nokia SR OS behavior (changed): - session_preparation() - set_base_prompt() - config_mode() - exit_config_mode() - check_config_mode() - save_config() - commit() - strip_prompt() - enable() - check_enable_mode()

    Initialize attributes for establishing connection to target device.

    :param ip: IP address of target device. Not required if `host` is
        provided.
    :type ip: str

    :param host: Hostname of target device. Not required if `ip` is
            provided.
    :type host: str

    :param username: Username to authenticate against target device if
            required.
    :type username: str

    :param password: Password to authenticate against target device if
            required.
    :type password: str

    :param secret: The enable password if target device requires one.
    :type secret: str

    :param port: The destination port used to connect to the target
            device.
    :type port: int or None

    :param device_type: Class selection based on device type.
    :type device_type: str

    :param verbose: Enable additional messages to standard output.
    :type verbose: bool

    :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
    :type global_delay_factor: int

    :param use_keys: Connect to target device using SSH keys.
    :type use_keys: bool

    :param key_file: Filename path of the SSH key file to use.
    :type key_file: str

    :param pkey: SSH key object to use.
    :type pkey: paramiko.PKey

    :param passphrase: Passphrase to use for encrypted key; password will be used for key
            decryption if not specified.
    :type passphrase: str

    :param allow_agent: Enable use of SSH key-agent.
    :type allow_agent: bool

    :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
            means unknown SSH host keys will be accepted).
    :type ssh_strict: bool

    :param system_host_keys: Load host keys from the users known_hosts file.
    :type system_host_keys: bool
    :param alt_host_keys: If `True` host keys will be loaded from the file specified in
            alt_key_file.
    :type alt_host_keys: bool

    :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
    :type alt_key_file: str

    :param ssh_config_file: File name of OpenSSH configuration file.
    :type ssh_config_file: str

    :param timeout: Connection timeout.
    :type timeout: float

    :param session_timeout: Set a timeout for parallel requests.
    :type session_timeout: float

    :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
    :type auth_timeout: float

    :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
    :type banner_timeout: float

    :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
            Currently defaults to 0, for backwards compatibility (it will not attempt
            to keep the connection alive).
    :type keepalive: int

    :param default_enter: Character(s) to send to correspond to enter key (default:

). :type default_enter: str

    :param response_return: Character(s) to use in normalized return data to represent
            enter key (default:

) :type response_return: str

    :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
            to select smallest of global and specific. Sets default global_delay_factor to .1
            (default: False)
    :type fast_cli: boolean

    :param session_log: File path or BufferedIOBase subclass object to write the session log to.
    :type session_log: str

    :param session_log_record_writes: The session log generally only records channel reads due
            to eliminate command duplication due to command echo. You can enable this if you
            want to record both channel reads and channel writes in the log (default: False).
    :type session_log_record_writes: boolean

    :param session_log_file_mode: "write" or "append" for session_log file mode
            (default: "write")
    :type session_log_file_mode: str

    :param allow_auto_change: Allow automatic configuration changes for terminal settings.
            (default: False)
    :type allow_auto_change: bool

    :param encoding: Encoding to be used when writing bytes to the output channel.
            (default: ascii)
    :type encoding: str

    :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
            communication to the target host (default: None).
    :type sock: socket

    :param global_cmd_verify: Control whether command echo verification is enabled or disabled
            (default: None). Global attribute takes precedence over function `cmd_verify`
            argument. Value of `None` indicates to use function `cmd_verify` argument.
    :type global_cmd_verify: bool|None

    :param auto_connect: Control whether Netmiko automatically establishes the connection as
            part of the object creation (default: True).
    :type auto_connect: bool
Source code
class NokiaSrosSSH(BaseConnection):
    """
    Implement methods for interacting with Nokia SR OS devices.

    Not applicable in Nokia SR OS (disabled):
        - exit_enable_mode()

    Overriden methods to adapt Nokia SR OS behavior (changed):
        - session_preparation()
        - set_base_prompt()
        - config_mode()
        - exit_config_mode()
        - check_config_mode()
        - save_config()
        - commit()
        - strip_prompt()
        - enable()
        - check_enable_mode()
    """

    def session_preparation(self):
        self._test_channel_read()
        self.set_base_prompt()
        # "@" indicates model-driven CLI (vs Classical CLI)
        if "@" in self.base_prompt:
            self._disable_complete_on_space()
            self.set_terminal_width(
                command="environment console width 512", pattern="environment"
            )
            self.disable_paging(command="environment more false")
            # To perform file operations we need to disable paging in classical-CLI also
            self.disable_paging(command="//environment no more")
        else:
            # Classical CLI has no method to set the terminal width nor to disable command
            # complete on space; consequently, cmd_verify needs disabled.
            self.global_cmd_verify = False
            self.disable_paging(command="environment no more", pattern="environment")

        # Clear the read buffer
        time.sleep(0.3 * self.global_delay_factor)
        self.clear_buffer()

    def set_base_prompt(self, *args, **kwargs):
        """Remove the > when navigating into the different config level."""
        cur_base_prompt = super().set_base_prompt(*args, **kwargs)
        match = re.search(r"\*?(.*?)(>.*)*#", cur_base_prompt)
        if match:
            # strip off >... from base_prompt; strip off leading *
            self.base_prompt = match.group(1)
            return self.base_prompt

    def _disable_complete_on_space(self):
        """
        SR-OS tries to auto complete commands when you type a "space" character.

        This is a bad idea for automation as what your program is sending no longer matches
        the command echo from the device, so we disable this behavior.
        """
        delay_factor = self.select_delay_factor(delay_factor=0)
        time.sleep(delay_factor * 0.1)
        command = "environment command-completion space false"
        self.write_channel(self.normalize_cmd(command))
        time.sleep(delay_factor * 0.1)
        return self.read_channel()

    def enable(self, cmd="enable", pattern="ssword", re_flags=re.IGNORECASE):
        """Enable SR OS administrative mode"""
        if "@" not in self.base_prompt:
            cmd = "enable-admin"
        return super().enable(cmd=cmd, pattern=pattern, re_flags=re_flags)

    def check_enable_mode(self, check_string="in admin mode"):
        """Check if in enable mode."""
        cmd = "enable"
        if "@" not in self.base_prompt:
            cmd = "enable-admin"
        self.write_channel(self.normalize_cmd(cmd))
        output = self.read_until_prompt_or_pattern(pattern="ssword")
        if "ssword" in output:
            self.write_channel(self.RETURN)  # send ENTER to pass the password prompt
            self.read_until_prompt()
        return check_string in output

    def exit_enable_mode(self, *args, **kwargs):
        """Nokia SR OS does not have a notion of exiting administrative mode"""
        return ""

    def config_mode(self, config_command="edit-config exclusive", pattern=r"\(ex\)\["):
        """Enable config edit-mode for Nokia SR OS"""
        output = ""
        # Only model-driven CLI supports config-mode
        if "@" in self.base_prompt:
            output += super().config_mode(
                config_command=config_command, pattern=pattern
            )
        return output

    def exit_config_mode(self, *args, **kwargs):
        """Disable config edit-mode for Nokia SR OS"""
        output = self._exit_all()
        # Model-driven CLI
        if "@" in self.base_prompt and "(ex)[" in output:
            # Asterisk indicates changes were made.
            if "*(ex)[" in output:
                log.warning("Uncommitted changes! Discarding changes!")
                output += self._discard()
            cmd = "quit-config"
            self.write_channel(self.normalize_cmd(cmd))
            if self.global_cmd_verify is not False:
                output += self.read_until_pattern(pattern=re.escape(cmd))
            else:
                output += self.read_until_prompt()
        if self.check_config_mode():
            raise ValueError("Failed to exit configuration mode")
        return output

    def check_config_mode(self, check_string=r"(ex)[", pattern=r"@"):
        """Check config mode for Nokia SR OS"""
        if "@" not in self.base_prompt:
            # Classical CLI
            return False
        else:
            # Model-driven CLI look for "exclusive"
            return super().check_config_mode(check_string=check_string, pattern=pattern)

    def save_config(self, *args, **kwargs):
        """Persist configuration to cflash for Nokia SR OS"""
        return self.send_command(command_string="/admin save", expect_string=r"#")

    def send_config_set(self, config_commands=None, exit_config_mode=None, **kwargs):
        """Model driven CLI requires you not exit from configuration mode."""
        if exit_config_mode is None:
            # Set to False if model-driven CLI
            exit_config_mode = False if "@" in self.base_prompt else True
        return super().send_config_set(
            config_commands=config_commands, exit_config_mode=exit_config_mode, **kwargs
        )

    def commit(self, *args, **kwargs):
        """Activate changes from private candidate for Nokia SR OS"""
        output = self._exit_all()
        if "@" in self.base_prompt and "*(ex)[" in output:
            log.info("Apply uncommitted changes!")
            cmd = "commit"
            self.write_channel(self.normalize_cmd(cmd))
            new_output = ""
            if self.global_cmd_verify is not False:
                new_output += self.read_until_pattern(pattern=re.escape(cmd))
            if "@" not in new_output:
                new_output += self.read_until_pattern(r"@")
            output += new_output
        return output

    def _exit_all(self):
        """Return to the 'root' context."""
        output = ""
        exit_cmd = "exit all"
        self.write_channel(self.normalize_cmd(exit_cmd))
        # Make sure you read until you detect the command echo (avoid getting out of sync)
        if self.global_cmd_verify is not False:
            output += self.read_until_pattern(pattern=re.escape(exit_cmd))
        else:
            output += self.read_until_prompt()
        return output

    def _discard(self):
        """Discard changes from private candidate for Nokia SR OS"""
        output = ""
        if "@" in self.base_prompt:
            cmd = "discard"
            self.write_channel(self.normalize_cmd(cmd))
            new_output = ""
            if self.global_cmd_verify is not False:
                new_output += self.read_until_pattern(pattern=re.escape(cmd))
            if "@" not in new_output:
                new_output += self.read_until_prompt()
            output += new_output
        return output

    def strip_prompt(self, *args, **kwargs):
        """Strip prompt from the output."""
        output = super().strip_prompt(*args, **kwargs)
        if "@" in self.base_prompt:
            # Remove context prompt too
            strips = r"[\r\n]*\!?\*?(\((ex|gl|pr|ro)\))?\[\S*\][\r\n]*"
            return re.sub(strips, "", output)
        else:
            return output

    def cleanup(self, command="logout"):
        """Gracefully exit the SSH session."""
        try:
            # The pattern="" forces use of send_command_timing
            if self.check_config_mode(pattern=""):
                self.exit_config_mode()
        except Exception:
            pass
        # Always try to send final 'logout'.
        self._session_log_fin = True
        self.write_channel(command + self.RETURN)

Ancestors

Methods

def check_config_mode(self, check_string='(ex)[', pattern='@')

Check config mode for Nokia SR OS

Source code
def check_config_mode(self, check_string=r"(ex)[", pattern=r"@"):
    """Check config mode for Nokia SR OS"""
    if "@" not in self.base_prompt:
        # Classical CLI
        return False
    else:
        # Model-driven CLI look for "exclusive"
        return super().check_config_mode(check_string=check_string, pattern=pattern)
def check_enable_mode(self, check_string='in admin mode')

Check if in enable mode.

Source code
def check_enable_mode(self, check_string="in admin mode"):
    """Check if in enable mode."""
    cmd = "enable"
    if "@" not in self.base_prompt:
        cmd = "enable-admin"
    self.write_channel(self.normalize_cmd(cmd))
    output = self.read_until_prompt_or_pattern(pattern="ssword")
    if "ssword" in output:
        self.write_channel(self.RETURN)  # send ENTER to pass the password prompt
        self.read_until_prompt()
    return check_string in output
def cleanup(self, command='logout')

Gracefully exit the SSH session.

Source code
def cleanup(self, command="logout"):
    """Gracefully exit the SSH session."""
    try:
        # The pattern="" forces use of send_command_timing
        if self.check_config_mode(pattern=""):
            self.exit_config_mode()
    except Exception:
        pass
    # Always try to send final 'logout'.
    self._session_log_fin = True
    self.write_channel(command + self.RETURN)
def commit(self, *args, **kwargs)

Activate changes from private candidate for Nokia SR OS

Source code
def commit(self, *args, **kwargs):
    """Activate changes from private candidate for Nokia SR OS"""
    output = self._exit_all()
    if "@" in self.base_prompt and "*(ex)[" in output:
        log.info("Apply uncommitted changes!")
        cmd = "commit"
        self.write_channel(self.normalize_cmd(cmd))
        new_output = ""
        if self.global_cmd_verify is not False:
            new_output += self.read_until_pattern(pattern=re.escape(cmd))
        if "@" not in new_output:
            new_output += self.read_until_pattern(r"@")
        output += new_output
    return output
def config_mode(self, config_command='edit-config exclusive', pattern='\\(ex\\)\\[')

Enable config edit-mode for Nokia SR OS

Source code
def config_mode(self, config_command="edit-config exclusive", pattern=r"\(ex\)\["):
    """Enable config edit-mode for Nokia SR OS"""
    output = ""
    # Only model-driven CLI supports config-mode
    if "@" in self.base_prompt:
        output += super().config_mode(
            config_command=config_command, pattern=pattern
        )
    return output
def enable(self, cmd='enable', pattern='ssword', re_flags=)

Enable SR OS administrative mode

Source code
def enable(self, cmd="enable", pattern="ssword", re_flags=re.IGNORECASE):
    """Enable SR OS administrative mode"""
    if "@" not in self.base_prompt:
        cmd = "enable-admin"
    return super().enable(cmd=cmd, pattern=pattern, re_flags=re_flags)
def exit_config_mode(self, *args, **kwargs)

Disable config edit-mode for Nokia SR OS

Source code
def exit_config_mode(self, *args, **kwargs):
    """Disable config edit-mode for Nokia SR OS"""
    output = self._exit_all()
    # Model-driven CLI
    if "@" in self.base_prompt and "(ex)[" in output:
        # Asterisk indicates changes were made.
        if "*(ex)[" in output:
            log.warning("Uncommitted changes! Discarding changes!")
            output += self._discard()
        cmd = "quit-config"
        self.write_channel(self.normalize_cmd(cmd))
        if self.global_cmd_verify is not False:
            output += self.read_until_pattern(pattern=re.escape(cmd))
        else:
            output += self.read_until_prompt()
    if self.check_config_mode():
        raise ValueError("Failed to exit configuration mode")
    return output
def exit_enable_mode(self, *args, **kwargs)

Nokia SR OS does not have a notion of exiting administrative mode

Source code
def exit_enable_mode(self, *args, **kwargs):
    """Nokia SR OS does not have a notion of exiting administrative mode"""
    return ""
def save_config(self, *args, **kwargs)

Persist configuration to cflash for Nokia SR OS

Source code
def save_config(self, *args, **kwargs):
    """Persist configuration to cflash for Nokia SR OS"""
    return self.send_command(command_string="/admin save", expect_string=r"#")
def send_config_set(self, config_commands=None, exit_config_mode=None, **kwargs)

Model driven CLI requires you not exit from configuration mode.

Source code
def send_config_set(self, config_commands=None, exit_config_mode=None, **kwargs):
    """Model driven CLI requires you not exit from configuration mode."""
    if exit_config_mode is None:
        # Set to False if model-driven CLI
        exit_config_mode = False if "@" in self.base_prompt else True
    return super().send_config_set(
        config_commands=config_commands, exit_config_mode=exit_config_mode, **kwargs
    )
def set_base_prompt(self, *args, **kwargs)

Remove the > when navigating into the different config level.

Source code
def set_base_prompt(self, *args, **kwargs):
    """Remove the > when navigating into the different config level."""
    cur_base_prompt = super().set_base_prompt(*args, **kwargs)
    match = re.search(r"\*?(.*?)(>.*)*#", cur_base_prompt)
    if match:
        # strip off >... from base_prompt; strip off leading *
        self.base_prompt = match.group(1)
        return self.base_prompt
def strip_prompt(self, *args, **kwargs)

Strip prompt from the output.

Source code
def strip_prompt(self, *args, **kwargs):
    """Strip prompt from the output."""
    output = super().strip_prompt(*args, **kwargs)
    if "@" in self.base_prompt:
        # Remove context prompt too
        strips = r"[\r\n]*\!?\*?(\((ex|gl|pr|ro)\))?\[\S*\][\r\n]*"
        return re.sub(strips, "", output)
    else:
        return output

Inherited members