Source code for exec_helpers._ssh_helpers

"""SSH client shared helpers."""

from __future__ import annotations

import functools
import pathlib
import typing

import paramiko

if typing.TYPE_CHECKING:
    from collections.abc import Collection

    # noinspection PyPackageRequirements
    import logwrap

SSHConfigDictLikeT = typing.Dict[str, typing.Union[str, int, bool, typing.Collection[str]]]
SSHConfigsDictT = typing.Dict[str, SSHConfigDictLikeT]


# Parse default SSHConfig if available
SSH_CONFIG_FILE_SYSTEM = pathlib.Path("/etc/ssh/ssh_config")
SSH_CONFIG_FILE_USER = pathlib.Path("~/.ssh/config").expanduser()


@functools.lru_cache(maxsize=128, typed=True)
def _parse_ssh_config_file(file_path: pathlib.Path) -> paramiko.SSHConfig | None:
    """Parse ssh config file.

    :param file_path: file path for parsing
    :type file_path: pathlib.Path
    :return: SSH config if file found and parsed else None
    :rtype: paramiko.SSHConfig | None
    """
    if not file_path.exists():
        return None
    # noinspection PyBroadException
    try:
        config = paramiko.SSHConfig()
        with file_path.open() as f_obj:
            config.parse(f_obj)
    except Exception:
        config = None

    return config


[docs] class SSHConfig: """Parsed SSH Config for creation connection.""" __slots__ = ( "__controlmaster", "__controlpath", "__hostname", "__identityfile", "__port", "__proxycommand", "__proxyjump", "__user", )
[docs] def __init__( self, hostname: str, port: str | int | None = None, user: str | None = None, identityfile: Collection[str] | None = None, proxycommand: str | None = None, proxyjump: str | None = None, *, controlpath: str | None = None, controlmaster: str | bool | None = None, ): """SSH Config for creation connection. :param hostname: hostname, which config relates :type hostname: str :param port: remote port :type port: str | int | None :param user: remote user :type user: str | None :param identityfile: connection ssh keys file names :type identityfile: Collection[str] | None :param proxycommand: proxy command for ssh connection :type proxycommand: str | None :type proxyjump: str | None :param proxyjump: proxy host name :param controlpath: shared socket file path for re-using connection by multiple instances :type controlpath: str | None :param controlmaster: re-use connection :type controlmaster: str | bool | None :raises ValueError: Invalid argument provided. .. versionadded:: 6.0.0 """ self.__hostname: str = hostname self.__port: int | None = self._parse_optional_int(port) if isinstance(self.__port, int) and not 0 < self.__port < 65535: raise ValueError(f"port {self.__port} if not in range [1, 65535], which is incorrect.") self.__user: str | None = user self.__identityfile: Collection[str] | None = identityfile if proxycommand and proxyjump: raise ValueError( f"ProxyCommand ({proxycommand}) and ProxyJump ({proxyjump}) is mixed for single connection!" ) self.__proxycommand: str | None = proxycommand self.__proxyjump: str | None = proxyjump self.__controlpath: str | None = controlpath self.__controlmaster: bool | None = self._parse_optional_bool(controlmaster)
def __hash__(self) -> int: # pragma: no cover """Hash for caching possibility. :return: hash for instance :rtype: int """ return hash( ( self.__class__, self.__hostname, self.__port, self.__user, self.__identityfile if self.__identityfile is None else tuple(self.__identityfile), self.__proxycommand, self.__proxyjump, self.__controlpath, self.__controlmaster, ) ) def __repr__(self) -> str: """Debug support. :return: string representation allowing to re-construct object :rtype: str """ return ( f"{self.__class__.__name__}(" f"hostname={self.hostname!r}, " f"port={self.port!r}, " f"user={self.user!r}, " f"identityfile={self.identityfile!r}, " f"proxycommand={self.proxycommand!r}, " f"proxyjump={self.proxyjump!r}, " f"controlpath={self.controlpath!r}, " f"controlmaster={self.controlmaster!r}, " f")" ) def __pretty_repr__( self, log_wrap: logwrap.PrettyRepr, indent: int = 0, no_indent_start: bool = False, ) -> str: """Make human-readable representation of object. :param log_wrap: logwrap instance :type log_wrap: logwrap.PrettyRepr :param indent: start indentation :type indent: int :param no_indent_start: do not indent open bracket and simple parameters :type no_indent_start: bool :return: formatted string :rtype: str """ next_indent = log_wrap.next_indent(indent) msg = ( f"{'':<{0 if no_indent_start else indent}}{self.__class__.__name__}(\n" f"{'':<{next_indent}}hostname={self.hostname!r},\n" f"{'':<{next_indent}}port={self.port!r},\n" f"{'':<{next_indent}}user={self.user!r},\n" f"{'':<{next_indent}}identityfile={self.identityfile!r},\n" f"{'':<{next_indent}}proxycommand={self.proxycommand!r},\n" f"{'':<{next_indent}}proxyjump={self.proxyjump!r},\n" f"{'':<{next_indent}}controlpath={self.controlpath!r},\n" f"{'':<{next_indent}}controlmaster={self.controlmaster!r},\n" f"{'':<{0 if no_indent_start else indent}})" ) return msg @staticmethod def _parse_optional_int(value: str | int | None) -> int | None: """Parse optional integer field in source data. :param value: value to process :type value: str | int | None :return: integer value if applicable :rtype: int | None """ if value is None or isinstance(value, int): return value return int(value) @staticmethod def _parse_optional_bool(value: str | bool | None) -> bool | None: """Parse optional bool field in source data. :param value: value to process :type value: str | bool | None :return: boolean value if applicable :rtype: bool | None """ if value is None or isinstance(value, bool): return value return value.lower() == "yes" @classmethod def from_ssh_config( cls, ssh_config: paramiko.config.SSHConfigDict | SSHConfigDictLikeT, ) -> SSHConfig: """Construct config from Paramiko parsed file. :param ssh_config: paramiko parsed ssh config or it reconstruction as a dict :type ssh_config: paramiko.config.SSHConfigDict | dict[str, str | int | bool | list[str]] :return: SSHConfig with supported values from config :rtype: SSHConfig """ return cls( hostname=ssh_config["hostname"], # type: ignore[arg-type] port=ssh_config.get("port"), # type: ignore[arg-type] user=ssh_config.get("user"), # type: ignore[arg-type] identityfile=ssh_config.get("identityfile"), # type: ignore[arg-type] proxycommand=ssh_config.get("proxycommand"), # type: ignore[arg-type] proxyjump=ssh_config.get("proxyjump"), # type: ignore[arg-type] controlpath=ssh_config.get("controlpath"), # type: ignore[arg-type] controlmaster=ssh_config.get("controlmaster"), # type: ignore[arg-type] ) @property def as_dict(self) -> SSHConfigDictLikeT: """Dictionary for rebuilding config. :return: config as dictionary with only not None values :rtype: dict[str, str | int | bool | list[str]] """ result: SSHConfigDictLikeT = {"hostname": self.hostname} if self.port is not None: result["port"] = self.port if self.user is not None: result["user"] = self.user if self.identityfile is not None: result["identityfile"] = self.identityfile if self.proxycommand is not None: result["proxycommand"] = self.proxycommand if self.proxyjump is not None: result["proxyjump"] = self.proxyjump if self.controlpath is not None: result["controlpath"] = self.controlpath if self.controlmaster is not None: result["controlmaster"] = self.controlmaster return result
[docs] def overridden_by(self, ssh_config: SSHConfig) -> SSHConfig: """Get copy with values overridden by another config. :param ssh_config: Other ssh config :type ssh_config: SSHConfig :return: Composite from 2 configs with priority of second one :rtype: SSHConfig """ cls: type[SSHConfig] = self.__class__ return cls( hostname=ssh_config.hostname, port=ssh_config.port if ssh_config.port is not None else self.port, user=ssh_config.user if ssh_config.user is not None else self.user, identityfile=ssh_config.identityfile if ssh_config.identityfile is not None else self.identityfile, proxycommand=ssh_config.proxycommand if ssh_config.proxycommand is not None else self.proxycommand, proxyjump=ssh_config.proxyjump if ssh_config.proxyjump is not None else self.proxyjump, controlpath=ssh_config.controlpath if ssh_config.controlpath is not None else self.controlpath, controlmaster=ssh_config.controlmaster if ssh_config.controlmaster is not None else self.controlmaster, )
def __eq__( self, other: SSHConfig | SSHConfigDictLikeT | typing.Any, ) -> bool | type(NotImplemented): # type: ignore[valid-type] """Equality check. :return: other equals self :rtype: bool """ if isinstance(other, SSHConfig): return all( getattr(self, attr) == getattr(other, attr) for attr in ( "hostname", "user", "port", "identityfile", "proxycommand", "proxyjump", "controlpath", "controlmaster", ) ) if isinstance(other, dict): return self == self.from_ssh_config(other) return NotImplemented @property def hostname(self) -> str: """Hostname which config relates. :return: remote hostname :rtype: str """ return self.__hostname @property def port(self) -> int | None: """Remote port. :return: propagated remote port for connection :rtype: int | None """ return self.__port @property def user(self) -> str | None: """Remote user. :return: propagated username for connection :rtype: str | None """ return self.__user @property def identityfile(self) -> Collection[str]: """Connection ssh keys file names. :return: list of ssh private keys names :rtype: Collection[str] """ if self.__identityfile is None: return () if isinstance(self.__identityfile, str): return (self.__identityfile,) return tuple(self.__identityfile) @property def proxycommand(self) -> str | None: """Proxy command for ssh connection. :return: command to be executed for socket creation if applicable :rtype: str | None """ return self.__proxycommand @property def proxyjump(self) -> str | None: """Proxy host name. :return: proxy hostname if applicable :rtype: str | None """ return self.__proxyjump @property def controlpath(self) -> str | None: """Shared socket file path for re-using connection by multiple instances. :return: shared socket filesystem path :rtype: str | None """ return self.__controlpath @property def controlmaster(self) -> bool | None: """Re-use connection. :return: connection should be re-used if possible :rtype: bool | None """ return self.__controlmaster
[docs] class HostsSSHConfigs(typing.Dict[str, SSHConfig]): """Specific dictionary for managing SSHConfig records. Instead of creating new record by request just generate default value and return if not exists. """
[docs] def __missing__(self, key: str) -> SSHConfig: """Missing key handling. :param key: nonexistent key :type key: str :return: generated ssh config for host :rtype: SSHConfig :raises KeyError: key is not string .. versionadded:: 6.0.0 """ if isinstance(key, str): return SSHConfig(key) raise KeyError(f"{key} is not available and not allowed.") # pragma: no cover
def _parse_paramiko_ssh_config(conf: paramiko.SSHConfig, host: str) -> HostsSSHConfigs: """Parse Paramiko ssh config for specific host to dictionary. :param conf: Paramiko SSHConfig instance :type conf: paramiko.SSHConfig :param host: hostname to seek in config :type host: str :return: parsed dictionary with proxy jump path, if available :rtype: HostsSSHConfigs """ config = HostsSSHConfigs({host: SSHConfig.from_ssh_config(conf.lookup(host))}) config.setdefault(config[host].hostname, config[host]) # Expand proxy info proxy_jump: str | None = config[host].proxyjump while proxy_jump is not None: config[proxy_jump] = SSHConfig.from_ssh_config(conf.lookup(proxy_jump)) proxy_jump = config[proxy_jump].proxyjump return config def _parse_dict_ssh_config(conf: SSHConfigsDictT, host: str) -> HostsSSHConfigs: """Extract required data from pre-parsed ssh config for specific host to dictionary. :param conf: pre-parsed dictionary :type conf: dict[str, dict[str, str | int | bool | list[str]]] :param host: hostname to seek in config :type host: str :return: parsed dictionary with proxy jump path, if available :rtype: HostsSSHConfigs """ config = HostsSSHConfigs({host: SSHConfig.from_ssh_config(conf.get(host, {"hostname": host}))}) config.setdefault(config[host].hostname, config[host]) # Expand proxy info proxy_jump: str | None = config[host].proxyjump while proxy_jump is not None: config[proxy_jump] = SSHConfig.from_ssh_config(conf.get(proxy_jump, {"hostname": proxy_jump})) proxy_jump = config[proxy_jump].proxyjump return config def parse_ssh_config( ssh_config: str | paramiko.SSHConfig | SSHConfigsDictT | None, host: str, ) -> HostsSSHConfigs: """Parse ssh config to get real connection parameters. :param ssh_config: SSH configuration for connection. Maybe config path, parsed as dict and paramiko parsed. :type ssh_config: str | paramiko.SSHConfig | dict[str, dict[str, str | int | bool | list[str]]] | None :param host: remote hostname :type host: str :return: parsed ssh config if available :rtype: HostsSSHConfigs """ if isinstance(ssh_config, paramiko.SSHConfig): return _parse_paramiko_ssh_config(ssh_config, host) if isinstance(ssh_config, dict): return _parse_dict_ssh_config(ssh_config, host) if isinstance(ssh_config, str): ssh_config_path = pathlib.Path(ssh_config).expanduser() if ssh_config_path.exists(): real_config = paramiko.SSHConfig() with ssh_config_path.open("tr", encoding="utf-8") as f_config: real_config.parse(f_config) return _parse_paramiko_ssh_config(real_config, host) system_ssh_config: paramiko.config.SSHConfig | None = _parse_ssh_config_file(SSH_CONFIG_FILE_SYSTEM) user_ssh_config: paramiko.config.SSHConfig | None = _parse_ssh_config_file(SSH_CONFIG_FILE_USER) if system_ssh_config is not None: config = _parse_paramiko_ssh_config(system_ssh_config, host) else: config = HostsSSHConfigs({host: SSHConfig(host)}) if user_ssh_config is not None: user_config = _parse_paramiko_ssh_config(user_ssh_config, host) for hostname, cfg in user_config.items(): config[hostname] = config[hostname].overridden_by(cfg) return config