Module netmiko.snmp_autodetect

This module is used to auto-detect the type of a device in order to automatically create a Netmiko connection.

This avoids hard-coding the 'device_type' when using Netmiko's ConnectHandler factory function.

Example:

from netmiko.snmp_autodetect import SNMPDetect

my_snmp = SNMPDetect(hostname='1.1.1.70', user='pysnmp', auth_key='key1', encrypt_key='key2')
device_type = my_snmp.autodetect()


autodetect() returns None if no match is found.

The SNMPDetect class defaults to SNMPv3.

Note: pysnmp is a required dependency for SNMPDetect but is intentionally not included in Netmiko's requirements, so you may need to install pysnmp separately.
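
Because autodetect() can return None, the result usually needs a check before it is handed to ConnectHandler. The following is a minimal sketch of such a check. The helper name build_connect_kwargs and the credentials are hypothetical and not part of Netmiko; only the device_type, host, username and password keywords are ConnectHandler arguments.

```python
from typing import Dict, Optional


def build_connect_kwargs(
    device_type: Optional[str], host: str, username: str, password: str
) -> Dict[str, str]:
    """Hypothetical helper: turn an autodetect() result into ConnectHandler kwargs."""
    if device_type is None:
        # autodetect() found no match, so there is nothing safe to pass on.
        raise ValueError(f"Could not detect a device_type for {host}")
    return {
        "device_type": device_type,
        "host": host,
        "username": username,
        "password": password,
    }


# "cisco_ios" stands in for a value that autodetect() might return.
kwargs = build_connect_kwargs("cisco_ios", "1.1.1.70", "admin", "secret")
print(kwargs["device_type"])
```

The resulting dictionary could then be passed on as ConnectHandler(**kwargs) once Netmiko is imported.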

Functions

def identify_address_type(entry: str) ‑> List[str]

Return a list containing all IP types found. An empty list means no valid IP address was found.

Parameters

entry : str
Can be an IPv4 address, an IPv6 address or an FQDN.

Returns

list of strings : list
A list containing 'IPv4' and/or 'IPv6', indicating whether entry is a valid IPv4 and/or IPv6 address.
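
To illustrate the return shape, here is a minimal sketch that classifies IP literals with the standard-library ipaddress module. It is not Netmiko's implementation: the function name classify_ip_literal is hypothetical, and name resolution for FQDNs (which is how a single entry could yield both 'IPv4' and 'IPv6') is deliberately left out.

```python
import ipaddress
from typing import List


def classify_ip_literal(entry: str) -> List[str]:
    """Return ['IPv4'] or ['IPv6'] for an IP literal, else an empty list."""
    try:
        parsed = ipaddress.ip_address(entry)
    except ValueError:
        # Not an IP literal (for example an FQDN); this sketch does not resolve names.
        return []
    return ["IPv4"] if parsed.version == 4 else ["IPv6"]


print(classify_ip_literal("192.0.2.10"))           # ['IPv4']
print(classify_ip_literal("2001:db8::1"))          # ['IPv6']
print(classify_ip_literal("router1.example.com"))  # []
```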

Classes

class SNMPDetect (hostname: str, snmp_version: str = 'v3', snmp_port: int = 161, community: Optional[str] = None, user: str = '', auth_key: str = '', encrypt_key: str = '', auth_proto: str = 'sha', encrypt_proto: str = 'aes128')

The SNMPDetect class tries to automatically determine the device type.

Typically this will use the MIB-2 SysDescr and regular expressions.

Parameters

hostname : str
The name or IP address of the device whose type we want to guess
snmp_version : str, optional ('v1', 'v2c' or 'v3')
The SNMP version that is running on the device (default: 'v3')
snmp_port : int, optional
The UDP port on which SNMP is listening (default: 161)
community : str, optional
The SNMP read community when using SNMPv1 or SNMPv2c (default: None)
user : str, optional
The SNMPv3 user for authentication (default: '')
auth_key : str, optional
The SNMPv3 authentication key (default: '')
encrypt_key : str, optional
The SNMPv3 encryption key (default: '')
auth_proto : str, optional ('sha', 'md5')
The SNMPv3 authentication protocol (default: 'sha')
encrypt_proto : str, optional ('des', '3des', 'aes128', 'aes192', 'aes256')
The SNMPv3 encryption protocol (default: 'aes128')
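
The constructor rejects inconsistent argument combinations with ValueError. The sketch below restates those checks as a stand-alone function so they can be tried without pysnmp installed. The function name check_snmp_args and the error texts are hypothetical; the accepted values follow the class signature and the class source in this page.

```python
from typing import Optional

AUTH_PROTOCOLS = ("sha", "md5")
ENCRYPT_PROTOCOLS = ("des", "3des", "aes128", "aes192", "aes256")


def check_snmp_args(
    snmp_version: str = "v3",
    community: Optional[str] = None,
    user: str = "",
    auth_proto: str = "sha",
    encrypt_proto: str = "aes128",
) -> None:
    """Hypothetical stand-alone version of the constructor's argument checks."""
    if snmp_version in ("v1", "v2c"):
        if not community:
            raise ValueError("SNMP v1/v2c requires a community string")
    elif snmp_version == "v3":
        if not user:
            raise ValueError("SNMP v3 requires a user")
    else:
        raise ValueError("snmp_version must be 'v1', 'v2c' or 'v3'")
    # The protocol names are validated regardless of the SNMP version.
    if auth_proto not in AUTH_PROTOCOLS:
        raise ValueError(f"auth_proto must be one of {AUTH_PROTOCOLS}")
    if encrypt_proto not in ENCRYPT_PROTOCOLS:
        raise ValueError(f"encrypt_proto must be one of {ENCRYPT_PROTOCOLS}")


check_snmp_args(snmp_version="v2c", community="public")  # accepted
check_snmp_args(snmp_version="v3", user="pysnmp")        # accepted
```

Calling check_snmp_args(snmp_version="v2c") without a community, or with auth_proto="aes128", raises ValueError in this sketch.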

Attributes

hostname : str
The name or IP address of the device whose type we want to guess
snmp_version : str
The SNMP version that is running on the device
snmp_port : int
The UDP port on which SNMP is listening
community : str
The SNMP read community when using SNMPv1 or SNMPv2c
user : str
The SNMPv3 user for authentication
auth_key : str
The SNMPv3 authentication key
encrypt_key : str
The SNMPv3 encryption key
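auth_proto : str
The SNMPv3 authentication protocol (stored as the matching pysnmp protocol object)
encrypt_proto : str
The SNMPv3 encryption protocol (stored as the matching pysnmp protocol object, under the attribute name encryp_proto)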

Methods

autodetect()
Try to determine the device type.

class SNMPDetect(object):
    """
    The SNMPDetect class tries to automatically determine the device type.

    Typically this will use the MIB-2 SysDescr and regular expressions.

    Parameters
    ----------
    hostname: str
        The name or IP address of the device whose type we want to guess
    snmp_version : str, optional ('v1', 'v2c' or 'v3')
        The SNMP version that is running on the device (default: 'v3')
    snmp_port : int, optional
        The UDP port on which SNMP is listening (default: 161)
    community : str, optional
        The SNMP read community when using SNMPv1 or SNMPv2c (default: None)
    user : str, optional
        The SNMPv3 user for authentication (default: '')
    auth_key : str, optional
        The SNMPv3 authentication key (default: '')
    encrypt_key : str, optional
        The SNMPv3 encryption key (default: '')
    auth_proto : str, optional ('sha', 'md5')
        The SNMPv3 authentication protocol (default: 'sha')
    encrypt_proto : str, optional ('des', '3des', 'aes128', 'aes192', 'aes256')
        The SNMPv3 encryption protocol (default: 'aes128')

    Attributes
    ----------
    hostname: str
        The name or IP address of the device whose type we want to guess
    snmp_version : str
        The SNMP version that is running on the device
    snmp_port : int
        The UDP port on which SNMP is listening
    community : str
        The SNMP read community when using SNMPv1 or SNMPv2c
    user : str
        The SNMPv3 user for authentication
    auth_key : str
        The SNMPv3 authentication key
    encrypt_key : str
        The SNMPv3 encryption key
    auth_proto : str
        The SNMPv3 authentication protocol
    encrypt_proto : str
        The SNMPv3 encryption protocol

    Methods
    -------
    autodetect()
        Try to determine the device type.

    """

    def __init__(
        self,
        hostname: str,
        snmp_version: str = "v3",
        snmp_port: int = 161,
        community: Optional[str] = None,
        user: str = "",
        auth_key: str = "",
        encrypt_key: str = "",
        auth_proto: str = "sha",
        encrypt_proto: str = "aes128",
    ) -> None:
        # Check that the SNMP version is matching predefined type or raise ValueError
        if snmp_version == "v1" or snmp_version == "v2c":
            if not community:
                raise ValueError("SNMP version v1/v2c community must be set.")
        elif snmp_version == "v3":
            if not user:
                raise ValueError("SNMP version v3 user and password must be set")
        else:
            raise ValueError("SNMP version must be set to 'v1', 'v2c' or 'v3'")

        # Check that the SNMPv3 auth & priv parameters match allowed types
        self._snmp_v3_authentication = {
            "sha": cmdgen.usmHMACSHAAuthProtocol,
            "md5": cmdgen.usmHMACMD5AuthProtocol,
        }
        self._snmp_v3_encryption = {
            "des": cmdgen.usmDESPrivProtocol,
            "3des": cmdgen.usm3DESEDEPrivProtocol,
            "aes128": cmdgen.usmAesCfb128Protocol,
            "aes192": cmdgen.usmAesCfb192Protocol,
            "aes256": cmdgen.usmAesCfb256Protocol,
        }
        if auth_proto not in self._snmp_v3_authentication.keys():
            raise ValueError(
                "SNMP V3 'auth_proto' argument must be one of the following: {}".format(
                    self._snmp_v3_authentication.keys()
                )
            )
        if encrypt_proto not in self._snmp_v3_encryption.keys():
            raise ValueError(
                "SNMP V3 'encrypt_proto' argument must be one of the following: {}".format(
                    self._snmp_v3_encryption.keys()
                )
            )

        self.hostname = hostname
        self.snmp_version = snmp_version
        self.snmp_port = snmp_port
        self.community = community
        self.user = user
        self.auth_key = auth_key
        self.encrypt_key = encrypt_key
        self.auth_proto = self._snmp_v3_authentication[auth_proto]
        self.encryp_proto = self._snmp_v3_encryption[encrypt_proto]
        self._response_cache: Dict[str, str] = {}
        self.snmp_target = (self.hostname, self.snmp_port)

        if "IPv6" in identify_address_type(self.hostname):
            self.udp_transport_target = cmdgen.Udp6TransportTarget(
                self.snmp_target, timeout=1.5, retries=2
            )
        else:
            self.udp_transport_target = cmdgen.UdpTransportTarget(
                self.snmp_target, timeout=1.5, retries=2
            )

    async def _run_query(self, creds: object, oid: str) -> str:
        """
        Asynchronous getCmd query to the device.

        Parameters
        ----------
        creds : UsmUserData or CommunityData object
            The authentication credentials.
        oid : str
            The SNMP OID that you want to get.

        Returns
        -------
        string : str
            The string as part of the value from the OID you are trying to retrieve.
        """
        errorIndication, errorStatus, errorIndex, varBinds = await cmdgen.getCmd(
            cmdgen.SnmpEngine(),
            creds,
            self.udp_transport_target,
            cmdgen.ContextData(),
            cmdgen.ObjectType(cmdgen.ObjectIdentity(oid)),
        )

        if not errorIndication and varBinds[0][1]:
            return str(varBinds[0][1])
        return ""

    def _get_snmpv3_asyncwr(self, oid: str) -> str:
        """
        This is an asynchronous wrapper to call code in newer versions of the pysnmp library
        (V6 and later).
        """
        return asyncio.run(
            self._run_query(
                cmdgen.UsmUserData(
                    self.user,
                    self.auth_key,
                    self.encrypt_key,
                    authProtocol=self.auth_proto,
                    privProtocol=self.encryp_proto,
                ),
                oid,
            )
        )

    def _get_snmpv3(self, oid: str) -> str:
        """
        Try to send an SNMP GET operation using SNMPv3 for the specified OID.

        Parameters
        ----------
        oid : str
            The SNMP OID that you want to get.

        Returns
        -------
        string : str
            The string as part of the value from the OID you are trying to retrieve.
        """
        if SNMP_MODE == "legacy":
            cmd_gen = cmdgen.CommandGenerator()

            (error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
                cmdgen.UsmUserData(
                    self.user,
                    self.auth_key,
                    self.encrypt_key,
                    authProtocol=self.auth_proto,
                    privProtocol=self.encryp_proto,
                ),
                self.udp_transport_target,
                oid,
                lookupNames=True,
                lookupValues=True,
            )

            if not error_detected and snmp_data[0][1]:
                return str(snmp_data[0][1])
            return ""
        elif SNMP_MODE == "v6_async":
            return self._get_snmpv3_asyncwr(oid=oid)
        else:
            raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

    def _get_snmpv2c_asyncwr(self, oid: str) -> str:
        """
        This is an asynchronous wrapper to call code in newer versions of the pysnmp library
        (V6 and later).
        """
        return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid))

    def _get_snmpv2c(self, oid: str) -> str:
        """
        Try to send an SNMP GET operation using SNMPv2 for the specified OID.

        Parameters
        ----------
        oid : str
            The SNMP OID that you want to get.

        Returns
        -------
        string : str
            The string as part of the value from the OID you are trying to retrieve.
        """
        if SNMP_MODE == "legacy":
            cmd_gen = cmdgen.CommandGenerator()
            (error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
                cmdgen.CommunityData(self.community),
                self.udp_transport_target,
                oid,
                lookupNames=True,
                lookupValues=True,
            )
            if not error_detected and snmp_data[0][1]:
                return str(snmp_data[0][1])
            return ""

        elif SNMP_MODE == "v6_async":
            return self._get_snmpv2c_asyncwr(oid=oid)
        else:
            raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

    def _get_snmp(self, oid: str) -> str:
        """Wrapper for generic SNMP call."""
        if self.snmp_version in ["v1", "v2c"]:
            return self._get_snmpv2c(oid)
        else:
            return self._get_snmpv3(oid)

    def autodetect(self) -> Optional[str]:
        """
        Try to guess the device_type using SNMP GET based on the SNMP_MAPPER dict. The type which
        is returned is directly matching the name in *netmiko.ssh_dispatcher.CLASS_MAPPER_BASE*
        dict.

        Thus you can use this name to retrieve automatically the right ConnectionClass

        Returns
        -------
        potential_type : str or None
            The name of the matching device_type, or None if no entry matched.
        """
        # Convert SNMP_MAPPER to a list and sort by priority
        snmp_mapper_orig = []
        for k, v in SNMP_MAPPER.items():
            snmp_mapper_orig.append({k: v})
        snmp_mapper_list = sorted(
            snmp_mapper_orig, key=lambda x: list(x.values())[0]["priority"]  # type: ignore
        )
        snmp_mapper_list.reverse()

        for entry in snmp_mapper_list:
            for device_type, v in entry.items():
                oid: str = v["oid"]  # type: ignore
                regex: Pattern = v["expr"]

                # Use cached data if we already queried this OID
                if self._response_cache.get(oid):
                    snmp_response = self._response_cache.get(oid)
                else:
                    snmp_response = self._get_snmp(oid)
                    self._response_cache[oid] = snmp_response

                # See if we had a match
                assert isinstance(snmp_response, str)
                if re.search(regex, snmp_response):
                    assert isinstance(device_type, str)
                    return device_type

        return None
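
The _get_snmpv3_asyncwr and _get_snmpv2c_asyncwr methods above expose an awaitable query through a synchronous call by way of asyncio.run. A minimal sketch of that pattern follows, with a hypothetical fake_query coroutine standing in for the real SNMP GET so that it runs without pysnmp or a device.

```python
import asyncio


async def fake_query(oid: str) -> str:
    """Hypothetical stand-in for an awaitable SNMP GET; returns a canned value."""
    await asyncio.sleep(0)  # yield to the event loop, as real network I/O would
    return f"value-for-{oid}"


def query_sync(oid: str) -> str:
    """Synchronous wrapper: run the coroutine to completion and return its result."""
    return asyncio.run(fake_query(oid))


print(query_sync("1.3.6.1.2.1.1.1.0"))  # value-for-1.3.6.1.2.1.1.1.0
```

One consequence of this design is that asyncio.run raises RuntimeError when called from a thread that already has a running event loop, so a wrapper of this kind is meant for plain synchronous callers.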

Methods

def autodetect(self) ‑> Optional[str]

Try to guess the device_type using SNMP GET requests, based on the SNMP_MAPPER dict. The returned type directly matches a name in the netmiko.ssh_dispatcher.CLASS_MAPPER_BASE dict.

You can therefore use this name to automatically retrieve the right connection class.

Returns

potential_type : str or None
The name of the matching device_type, or None if no entry in SNMP_MAPPER matched.
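
The matching logic can be sketched without any SNMP traffic: sort the mapper entries by descending priority, query each OID at most once, and return the first device type whose regular expression matches. Everything named below (EXAMPLE_MAPPER, its two entries, detect, make_query) is hypothetical; only the entry shape of oid, expr and priority is taken from the class source. One assumed simplification: this sketch caches empty responses too, whereas the source queries again when the cached value is empty.

```python
import re
from typing import Callable, Dict, List, Optional

SYS_DESCR = "1.3.6.1.2.1.1.1.0"  # MIB-2 sysDescr

# Hypothetical mapper: device_type -> {"oid", "expr", "priority"}
EXAMPLE_MAPPER: Dict[str, dict] = {
    "vendor_generic": {"oid": SYS_DESCR, "expr": re.compile(r"ExampleOS"), "priority": 50},
    "vendor_model_x": {"oid": SYS_DESCR, "expr": re.compile(r"ExampleOS.*Model X"), "priority": 99},
}


def detect(query: Callable[[str], str], mapper: Dict[str, dict] = EXAMPLE_MAPPER) -> Optional[str]:
    """Return the highest-priority device type whose regex matches, else None."""
    cache: Dict[str, str] = {}
    ordered = sorted(mapper.items(), key=lambda item: item[1]["priority"], reverse=True)
    for device_type, spec in ordered:
        oid = spec["oid"]
        if oid not in cache:
            # Query each OID at most once, even if several entries share it.
            cache[oid] = query(oid)
        if spec["expr"].search(cache[oid]):
            return device_type
    return None


def make_query(response: str, calls: List[str]) -> Callable[[str], str]:
    """Build a fake SNMP GET that records each call and returns a canned response."""
    def query(oid: str) -> str:
        calls.append(oid)
        return response
    return query


calls: List[str] = []
print(detect(make_query("ExampleOS 1.2", calls)))     # vendor_generic
print(len(calls))                                      # 1 (second entry used the cache)
print(detect(make_query("ExampleOS 1.2 on Model X", [])))  # vendor_model_x
print(detect(make_query("SomethingElse", [])))         # None
```

Checking the more specific pattern first is what the priority field buys: both example patterns match the "Model X" description, and the higher priority decides which one wins.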