Source code for exec_helpers.exec_result

#    Copyright 2018 - 2023 Aleksei Stepanov aka penguinolog.

#    Copyright 2016 Mirantis, Inc.

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Execution result."""

from __future__ import annotations

import contextlib
import datetime
import functools
import json
import logging
import threading
import typing

from exec_helpers import exceptions
from exec_helpers import proc_enums

try:
    # noinspection PyPackageRequirements
    import yaml
except ImportError:
    yaml = None  # type: ignore[assignment]
try:
    from ruamel import yaml as ruamel_yaml
except ImportError:
    ruamel_yaml = None  # type: ignore[assignment]
try:
    # noinspection PyPackageRequirements
    import defusedxml.ElementTree
except ImportError:
    defusedxml = None  # pylint: disable=invalid-name
try:
    # noinspection PyPackageRequirements
    import lxml.etree  # nosec
except ImportError:
    lxml = None  # pylint: disable=invalid-name

if typing.TYPE_CHECKING:
    import xml.etree.ElementTree  # nosec  # for typing only
    from collections.abc import Callable
    from collections.abc import Collection
    from collections.abc import Iterable
    from collections.abc import Sequence

    # noinspection PyPackageRequirements
    import logwrap

    from exec_helpers.proc_enums import ExitCodeT

    _T = typing.TypeVar("_T")

__all__ = ("ExecResult", "OptionalStdinT")

LOGGER: logging.Logger = logging.getLogger(__name__)

OptionalStdinT = typing.Union[bytes, str, bytearray, None]


def _handle_deserialize(
    fmt: str,
) -> Callable[[Callable[[ExecResult], _T]], Callable[[ExecResult], _T]]:
    """Decorator fabric for decoder getters.

    :return: real decorator
    """

    def decorator(method: Callable[[ExecResult], _T]) -> Callable[[ExecResult], _T]:
        """Decorator for decoder getter.

        :return: wrapped to try/except getter
        """

        @functools.wraps(method)
        def wrapper(self: ExecResult) -> _T:
            """Getter wrapper.

            :return: getter output
            """
            try:
                return method(self)
            except Exception as exc:
                tmpl: str = f"{{self.cmd}} stdout is not valid {fmt}:\n{{stdout!r}}\n"
                LOGGER.exception(tmpl.format(self=self, stdout=self.stdout_str))

                raise exceptions.DeserializeValueError(tmpl.format(self=self, stdout=self.stdout_brief)).with_traceback(
                    exc.__traceback__
                ) from exc

        return wrapper

    return decorator


def _get_str_from_bin(src: bytearray) -> str:
    """Join data in list to the string.

    :param src: source to process
    :type src: bytearray
    :return: decoded string
    :rtype: str
    """
    return src.rstrip().decode(encoding="utf-8", errors="backslashreplace")


def _get_bytearray_from_array(src: Iterable[bytes]) -> bytearray:
    """Get bytearray from array of bytes blocks.

    :param src: source to process
    :type src: list[bytes]
    :return: bytearray
    :rtype: bytearray
    """
    return bytearray(b"".join(src))


class LinesAccessProxy:
    """Lines access proxy."""

    __slots__ = ("_data",)

    def __init__(self, data: Sequence[bytes]) -> None:
        """Lines access proxy.

        :param data: data to work with.
        :type data: Sequence[bytes]
        """
        self._data: tuple[bytes, ...] = tuple(data)

    # pylint: disable=undefined-variable
    def __getitem__(
        self,
        item: int | slice | Iterable[int | slice | ellipsis],  # noqa: F821
    ) -> str:
        """Access magic.

        :param item: index
        :type item: int | slice | Iterable[int | slice | ellipsis]
        :return: Joined selected lines
        :rtype: str
        :raises TypeError: Unexpected key
        """
        if isinstance(item, int):
            return _get_str_from_bin(_get_bytearray_from_array([self._data[item]]))
        if isinstance(item, slice):
            return _get_str_from_bin(_get_bytearray_from_array(self._data[item]))
        if isinstance(item, tuple):
            buf: list[bytes] = []
            for rule in item:
                if isinstance(rule, int):
                    buf.append(self._data[rule])
                elif isinstance(rule, slice):
                    buf.extend(self._data[rule])
                elif rule is Ellipsis:
                    buf.append(b"...\n")
                else:
                    raise TypeError(f"Unexpected key type: {rule!r} (from {item!r})")
            return _get_str_from_bin(_get_bytearray_from_array(buf))
        raise TypeError(f"Unexpected key type: {item!r}")

    def __len__(self) -> int:  # pragma: no cover
        """Data len.

        :return: strings count
        :rtype: int
        """
        return len(self._data)

    def __str__(self) -> str:  # pragma: no cover
        """Get string for debug purposes.

        :return: string representation for full content
        :rtype: str
        """
        return self[:]

    def __repr__(self) -> str:
        """Repr for debug purposes.

        :return: full representation for debug purposes
        :rtype: str
        """
        return f"{self.__class__.__name__}(data={self._data!r})"


[docs] class ExecResult: """Execution result.""" __slots__ = ( "__cmd", "__exit_code", "__started", "__stderr_lock", "__stdin", "__stdout_lock", "__timestamp", "_stderr", "_stderr_brief", "_stderr_str", "_stdout", "_stdout_brief", "_stdout_str", )
[docs] def __init__( self, cmd: str, stdin: OptionalStdinT = None, stdout: Iterable[bytes] | None = None, stderr: Iterable[bytes] | None = None, exit_code: ExitCodeT = proc_enums.INVALID, *, started: datetime.datetime | None = None, ) -> None: """Command execution result. :param cmd: command :type cmd: str :param stdin: string STDIN :type stdin: bytes | str | bytearray | None :param stdout: binary STDOUT :type stdout: Iterable[bytes] | None :param stderr: binary STDERR :type stderr: Iterable[bytes] | None :param exit_code: Exit code. If integer - try to convert to BASH enum. :type exit_code: int | proc_enums.ExitCodes :param started: Timestamp of command start :type started: datetime.datetime | None """ self.__stdout_lock = threading.RLock() self.__stderr_lock = threading.RLock() self.__cmd: str = cmd if isinstance(stdin, bytes): self.__stdin: str | None = _get_str_from_bin(bytearray(stdin)) elif isinstance(stdin, bytearray): self.__stdin = _get_str_from_bin(stdin) else: self.__stdin = stdin if stdout is not None: self._stdout: tuple[bytes, ...] = tuple(stdout) else: self._stdout = () if stderr is not None: self._stderr: tuple[bytes, ...] = tuple(stderr) else: self._stderr = () self.__exit_code: ExitCodeT = proc_enums.INVALID self.__timestamp: datetime.datetime | None = None self.exit_code = exit_code # noqa: PLE0237 self.__started: datetime.datetime | None = started # By default is none: self._stdout_str: str | None = None self._stderr_str: str | None = None self._stdout_brief: str | None = None self._stderr_brief: str | None = None
@property def stdout_lock(self) -> threading.RLock: """Lock object for thread-safe operation. :return: internal lock for stdout :rtype: threading.RLock .. versionadded:: 2.2.0 """ return self.__stdout_lock @property def stderr_lock(self) -> threading.RLock: """Lock object for thread-safe operation. :return: internal lock for stderr :rtype: threading.RLock .. versionadded:: 2.2.0 """ return self.__stderr_lock @property def timestamp(self) -> datetime.datetime | None: """Timestamp. :return: exit code timestamp :rtype: datetime.datetime | None """ return self.__timestamp
[docs] def set_timestamp(self) -> None: """Set timestamp if empty. This will block future object changes. .. versionadded:: 4.0.0 """ if self.timestamp is None: self.__timestamp = datetime.datetime.now(tz=datetime.timezone.utc)
@classmethod def _get_brief(cls, data: tuple[bytes, ...]) -> str: """Get brief output: 7 lines maximum (3 first + ... + 3 last). :param data: source to process :type data: tuple[bytes, ...] :return: brief from source :rtype: str """ if len(data) <= 7: return _get_str_from_bin(_get_bytearray_from_array(data)) return LinesAccessProxy(data)[:3, ..., -3:] @property def cmd(self) -> str: """Executed command. :return: command string :rtype: str """ return self.__cmd @property def stdin(self) -> str | None: """Stdin input as string. :return: STDIN content if applicable. :rtype: str | None """ return self.__stdin @property def stdout(self) -> tuple[bytes, ...]: """Stdout output as list of binaries. :return: STDOUT as tuple of binary strings :rtype: tuple[bytes, ...] """ return self._stdout @property def stderr(self) -> tuple[bytes, ...]: """Stderr output as list of binaries. :return: STDERR as tuple of binary strings :rtype: tuple[bytes, ...] """ return self._stderr @staticmethod def _poll_stream( src: Iterable[bytes], log: logging.Logger | None = None, verbose: bool = False, ) -> list[bytes]: """Stream poll helper. :param src: source to read from :param log: logger instance, if line per line logging expected :param verbose: use INFO level for logging :return: read result as list of bytes strings :rtype: list[bytes] """ dst: list[bytes] = [] with contextlib.suppress(IOError): for line in src: dst.append(line) if log: log.log( level=logging.INFO if verbose else logging.DEBUG, msg=line.decode("utf-8", errors="backslashreplace").rstrip(), ) return dst
[docs] def read_stdout( self, src: Iterable[bytes] | None = None, log: logging.Logger | None = None, verbose: bool = False, ) -> None: """Read stdout file-like object to stdout. :param src: source :type src: Iterable[bytes] | None :param log: logger :type log: logging.Logger | None :param verbose: use log.info instead of log.debug :type verbose: bool :raises RuntimeError: Exit code is already received .. versionchanged:: 1.2.0 - src can be None """ if not src: return if self.timestamp: raise RuntimeError("Final exit code received.") with self.stdout_lock: self._stdout_str = self._stdout_brief = None self._stdout += tuple(self._poll_stream(src, log, verbose))
[docs] def read_stderr( self, src: Iterable[bytes] | None = None, log: logging.Logger | None = None, verbose: bool = False, ) -> None: """Read stderr file-like object to stdout. :param src: source :type src: Iterable[bytes] | None :param log: logger :type log: logging.Logger | None :param verbose: use log.info instead of log.debug :type verbose: bool :raises RuntimeError: Exit code is already received .. versionchanged:: 1.2.0 - src can be None """ if not src: return if self.timestamp: raise RuntimeError("Final exit code received.") with self.stderr_lock: self._stderr_str = self._stderr_brief = None self._stderr += tuple(self._poll_stream(src, log, verbose))
@property def stdout_bin(self) -> bytearray: """Stdout in binary format. Sometimes logging is used to log binary objects too (example: Session), and for debug purposes we can use this as data source. :return: full STDOUT output as bytearray. :rtype: bytearray """ with self.stdout_lock: return _get_bytearray_from_array(self.stdout) @property def stderr_bin(self) -> bytearray: """Stderr in binary format. :return: full STDERR output as bytearray. :rtype: bytearray """ with self.stderr_lock: return _get_bytearray_from_array(self.stderr) @property def stdout_str(self) -> str: """Stdout output as string. :return: full STDOUT output. :rtype: str """ with self.stdout_lock: if self._stdout_str is None: self._stdout_str = _get_str_from_bin(self.stdout_bin) return self._stdout_str @property def stderr_str(self) -> str: """Stderr output as string. :return: full STDERR output. :rtype: str """ with self.stderr_lock: if self._stderr_str is None: self._stderr_str = _get_str_from_bin(self.stderr_bin) return self._stderr_str @property def stdout_brief(self) -> str: """Brief stdout output (mostly for exceptions). :return: up to 3 first and 3 last lines of output. :rtype: str """ with self.stdout_lock: if self._stdout_brief is None: self._stdout_brief = self._get_brief(self.stdout) return self._stdout_brief @property def stderr_brief(self) -> str: """Brief stderr output (mostly for exceptions). :return: up to 3 first and 3 last lines of output. :rtype: str """ with self.stderr_lock: if self._stderr_brief is None: self._stderr_brief = self._get_brief(self.stderr) return self._stderr_brief @property def stdout_lines(self) -> LinesAccessProxy: """Get lines by indexes. :return: proxy object for lines join by line indexes :rtype: LinesAccessProxy Usage example: .. code-block::python res.stdout_lines[<line_number>, <index_start>:<index_end>, ...] """ return LinesAccessProxy(self.stdout) @property def stderr_lines(self) -> LinesAccessProxy: """Magic to get lines human-friendly way. :return: proxy object for lines join by line indexes :rtype: LinesAccessProxy """ return LinesAccessProxy(self.stderr) @property def exit_code(self) -> ExitCodeT: """Return(exit) code of command. :return: exit code :rtype: int | proc_enums.ExitCodes """ return self.__exit_code @exit_code.setter def exit_code(self, new_val: ExitCodeT) -> None: """Return(exit) code of command. :param new_val: new exit code :type new_val: int | proc_enums.ExitCodes :raises RuntimeError: Exit code is already received :raises TypeError: exit code is not int instance If valid exit code is set - object became read-only. """ if self.timestamp: raise RuntimeError("Exit code is already received.") if not isinstance(new_val, int): raise TypeError(f"Exit code is strictly int, received: {new_val!r}") with self.stdout_lock, self.stderr_lock: self.__exit_code = proc_enums.exit_code_to_enum(new_val) if self.__exit_code != proc_enums.INVALID: self.__timestamp = datetime.datetime.now(tz=datetime.timezone.utc) @property def ok(self) -> bool: """Exit code is EX_OK. :return: Exit code is EX_OK :rtype: bool """ return self.exit_code == 0 # pylint: disable=use-implicit-booleaness-not-comparison-to-zero
[docs] def check_exit_code( self, expected_codes: Iterable[ExitCodeT] = (0,), raise_on_err: bool = True, *, error_info: str | None = None, exception_class: type[exceptions.CalledProcessError] = exceptions.CalledProcessError, logger: logging.Logger = LOGGER, verbose: bool = False, ) -> None: """Check exit code and log/raise for unexpected code. :param error_info: optional additional error information :type error_info: str | None :param raise_on_err: raise `exception_class` in case of error :type raise_on_err: bool :param expected_codes: iterable expected exit codes :type expected_codes: Iterable[int | ExitCodes] :param exception_class: exception class for usage in case of errors (subclass of CalledProcessError) :type exception_class: type[exceptions.CalledProcessError] :param logger: logger instance for error log :type logger: logging.Logger :param verbose: produce verbose log in case of failure :type verbose: bool :raises exceptions.CalledProcessError: unexpected exit code and raise_on_err enabled """ append: str = error_info + "\n" if error_info else "" expected = tuple(frozenset(expected_codes)) if self.exit_code not in expected: message = f"{append}Command {self.cmd!r} returned exit code {self.exit_code!s} while expected {expected!s}" if verbose: message += f"\nSTDOUT:\n{self.stdout_str}\n\nSTDERR:\n{self.stderr_str}" else: message += f"\nBrief STDOUT:\n{self.stdout_brief}\n\nBrief STDERR:\n{self.stderr_brief}" logger.error(msg=message) if raise_on_err: self.raise_for_status(expected_codes=expected, exception_class=exception_class)
[docs] def raise_for_status( self, expected_codes: Collection[ExitCodeT] = (0,), *, exception_class: type[exceptions.CalledProcessError] = exceptions.CalledProcessError, ) -> None: """Requests-like exit code checker. :param expected_codes: iterable expected exit codes :type expected_codes: Iterable[int | ExitCodes] :param exception_class: exception class for usage in case of errors (subclass of CalledProcessError) :type exception_class: type[exceptions.CalledProcessError] :raises exceptions.CalledProcessError: unexpected exit code and raise_on_err enabled """ if self.exit_code in expected_codes: return raise exception_class(self, expected_codes)
@property def started(self) -> datetime.datetime | None: """Timestamp of command start. :return: timestamp from command start, if applicable :rtype: datetime.datetime | None .. versionadded:: 4.0.0 """ return self.__started @property @_handle_deserialize("json") def stdout_json( self, ) -> typing.Any: """JSON from stdout. :return: decoded JSON document :rtype: typing.Any :raises DeserializeValueError: STDOUT can not be deserialized as JSON """ with self.stdout_lock: return json.loads(self.stdout_str) if yaml is not None or ruamel_yaml is not None: @property @_handle_deserialize("yaml") def stdout_yaml(self) -> typing.Any: """YAML from stdout. :return: decoded YAML document :rtype: typing.Any :raises DeserializeValueError: STDOUT can not be deserialized as YAML """ with self.stdout_lock: if yaml is not None: if yaml.__with_libyaml__: # pragma: no cover return yaml.load(self.stdout_str, Loader=yaml.CSafeLoader) # nosec # Safe return yaml.safe_load(self.stdout_str) # pragma: no cover return ruamel_yaml.YAML(typ="safe").load(self.stdout_str) # nosec # Safe if defusedxml is not None: # noinspection PyUnresolvedReferences @property @_handle_deserialize("xml") def stdout_xml(self) -> xml.etree.ElementTree.Element: """XML from stdout. :return: decoded XML document :rtype: xml.etree.ElementTree.Element :raises DeserializeValueError: STDOUT can not be deserialized as XML """ with self.stdout_lock: return defusedxml.ElementTree.fromstring(b"".join(self.stdout)) # type: ignore[no-any-return] if lxml is not None: @property @_handle_deserialize("lxml") def stdout_lxml(self) -> lxml.etree.Element: """XML from stdout using lxml. :return: decoded XML document :rtype: lxml.etree.Element :raises DeserializeValueError: STDOUT can not be deserialized as XML .. note:: Can be insecure. """ with self.stdout_lock: return lxml.etree.fromstring(b"".join(self.stdout)) # nosec[blacklist] def __dir__(self) -> list[str]: """Override dir for IDE and as source for getitem checks. :return: list with public attributes and methods :rtype: list[str] """ content = [ "cmd", "stdout", "stderr", "exit_code", "stdout_bin", "stderr_bin", "stdout_str", "stderr_str", "stdout_brief", "stderr_brief", "stdout_lines", "stderr_lines", "stdout_json", "lock", ] if yaml is not None or ruamel_yaml is not None: content.append("stdout_yaml") if defusedxml is not None: content.append("stdout_xml") if lxml is not None: content.append("stdout_lxml") return content def __getitem__(self, item: str) -> typing.Any: """Dict like get data. :param item: key :type item: str :return: item if attribute exists :rtype: typing.Any :raises IndexError: no attribute exists or not allowed to get (not in dir()) """ if item in dir(self): return getattr(self, item) raise IndexError(f'"{item}" not found in {dir(self)}') def __repr__(self) -> str: """Representation for debugging. :return: full representation for debug purposes :rtype: str """ if self.started: started = f" started={self.started!r}," else: started = "" return ( f"{self.__class__.__name__}(" f"cmd={self.cmd!r}, stdout={self.stdout!r}, stderr={self.stderr!r}, exit_code={self.exit_code!s},{started})" ) def __pretty_repr__( self, log_wrap: logwrap.PrettyRepr, indent: int = 0, no_indent_start: bool = False, ) -> str: """Make human-readable representation of object. :param log_wrap: logwrap instance :type log_wrap: logwrap.PrettyRepr :param indent: start indentation :type indent: int :param no_indent_start: do not indent open bracket and simple parameters :type no_indent_start: bool :return: formatted string :rtype: str """ next_indent = log_wrap.next_indent(indent) started = f"{'':<{next_indent}}started={self.started!r},\n" if self.started else "" stdout = log_wrap.process_element(self.stdout, indent=next_indent, no_indent_start=True) stderr = log_wrap.process_element(self.stderr, indent=next_indent, no_indent_start=True) msg = ( f"{'':<{0 if no_indent_start else indent}}{self.__class__.__name__}(\n" f"{'':<{next_indent}}cmd={self.cmd!r},\n" f"{'':<{next_indent}}stdout={stdout},\n" f"{'':<{next_indent}}stderr={stderr},\n" f"{'':<{next_indent}}exit_code={self.exit_code!s},\n" f"{started}" f"{'':<{0 if no_indent_start else indent}})" ) return msg def __str__(self) -> str: """Representation for logging. :return: string representation with brief information :rtype: str """ if self.started: started = f"\tstarted={self.started.strftime('%Y-%m-%d %H:%M:%S')},\n" if self.timestamp: _spent = (self.timestamp - self.started).seconds spent = f"\tspent={_spent // (60 * 60):02d}:{_spent // 60:02d}:{_spent % 60:02d},\n" else: spent = "" else: started = "" spent = "" return ( f"{self.__class__.__name__}(\n" f"\tcmd={self.cmd!r},\n" f"\tstdout=\n" f"{self.stdout_brief!r},\n" f"\tstderr=\n" f"{self.stderr_brief!r}, \n" f"\texit_code={self.exit_code!s},\n" f"{started}{spent})" ) def __eq__(self, other: object) -> bool: """Comparison. :param other: other ExecResult instance. :type other: typing.Any :return: current object equals other :rtype: bool """ return ( self.__class__ is other.__class__ or issubclass(self.__class__, other.__class__) or issubclass(other.__class__, self.__class__) ) and ( self.cmd == other.cmd # type: ignore[attr-defined] and self.stdin == other.stdin # type: ignore[attr-defined] and self.stdout == other.stdout # type: ignore[attr-defined] and self.stderr == other.stderr # type: ignore[attr-defined] and self.exit_code == other.exit_code # type: ignore[attr-defined] ) def __ne__(self, other: object) -> bool: """Comparison. :param other: other ExecResult instance. :type other: typing.Any :return: current object not equals other :rtype: bool """ return not self.__eq__(other) def __hash__(self) -> int: """Hash for usage as dict key and in sets. :return: calculated hash value :rtype: int """ return hash((self.__class__, self.cmd, self.stdin, self.stdout, self.stderr, self.exit_code))