Source code for exec_helpers.subprocess

#    Copyright 2018 - 2023 Aleksei Stepanov aka penguinolog.

#    Copyright 2016 Mirantis, Inc.

#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Python subprocess.Popen wrapper."""

from __future__ import annotations

import concurrent.futures
import copy
import datetime
import errno
import logging
import os
import pathlib
import subprocess  # nosec  # Expected usage
import typing
import warnings

from exec_helpers import api
from exec_helpers import constants
from exec_helpers import exceptions
from exec_helpers import exec_result
from exec_helpers import proc_enums

from . import _log_templates
from . import _subprocess_helpers

if typing.TYPE_CHECKING:
    from types import TracebackType

    from exec_helpers.api import CalledProcessErrorSubClassT
    from exec_helpers.api import CommandT
    from exec_helpers.api import ErrorInfoT
    from exec_helpers.api import ExpectedExitCodesT
    from exec_helpers.api import LogMaskReT
    from exec_helpers.api import OptionalStdinT
    from exec_helpers.api import OptionalTimeoutT

__all__ = ("CwdT", "EnvT", "Subprocess", "SubprocessExecuteAsyncResult")

EnvT = typing.Optional[
    typing.Union[typing.Mapping[bytes, typing.Union[bytes, str]], typing.Mapping[str, typing.Union[bytes, str]]]
]
CwdT = typing.Optional[typing.Union[str, bytes, pathlib.Path]]


# noinspection PyTypeHints
class SubprocessExecuteAsyncResult(api.ExecuteAsyncResult):
    """Override original NamedTuple with proper typing."""

    __slots__ = ()

    @property
    def interface(self) -> subprocess.Popen[bytes]:
        """Override original NamedTuple with proper typing.

        :return: control interface
        :rtype: subprocess.Popen[bytes]
        """
        return super().interface  # type: ignore[no-any-return]

    @property
    def stdin(self) -> typing.IO[bytes] | None:
        """Override original NamedTuple with proper typing.

        :return: STDIN interface
        :rtype: typing.IO[bytes] | None
        """
        warnings.warn(
            "stdin access deprecated: FIFO is often closed on execution and direct access is not expected.",
            DeprecationWarning,
            stacklevel=2,
        )
        return super().stdin

    @property
    def stderr(self) -> typing.IO[bytes] | None:
        """Override original NamedTuple with proper typing.

        :return: STDERR interface
        :rtype: typing.IO[bytes] | None
        """
        return super().stderr

    @property
    def stdout(self) -> typing.IO[bytes] | None:
        """Override original NamedTuple with proper typing.

        :return: STDOUT interface
        :rtype: typing.IO[bytes] | None
        """
        return super().stdout


class _SubprocessExecuteContext(api.ExecuteContext, typing.ContextManager[SubprocessExecuteAsyncResult]):
    """Subprocess Execute context."""

    __slots__ = ("__cwd", "__env", "__process")

    def __init__(
        self,
        *,
        command: str,
        stdin: bytes | None = None,
        open_stdout: bool = True,
        open_stderr: bool = True,
        cwd: CwdT = None,
        env: EnvT = None,
        logger: logging.Logger,
        **kwargs: typing.Any,
    ) -> None:
        """Subprocess Execute context.

        :param command: Command for execution (fully formatted)
        :type command: str
        :param stdin: pass STDIN text to the process (fully formatted)
        :type stdin: bytes
        :param open_stdout: open STDOUT stream for read
        :type open_stdout: bool
        :param open_stderr: open STDERR stream for read
        :type open_stderr: bool
        :param cwd: Sets the current directory before the child is executed.
        :type cwd: str | bytes | pathlib.Path | None
        :param env: Defines the environment variables for the new process.
        :type env: Mapping[str | bytes, str | bytes] | None
        :param logger: instance logger
        :type logger: logging.Logger
        :param kwargs: additional parameters for call.
        :type kwargs: typing.Any
        """
        super().__init__(
            command=command,
            stdin=stdin,
            open_stdout=open_stdout,
            open_stderr=open_stderr,
            logger=logger,
            **kwargs,
        )
        self.__cwd = cwd
        self.__env = env
        self.__process: subprocess.Popen[bytes] | None = None

    def __repr__(self) -> str:
        """Debug string.

        :return: reproduce for debug
        :rtype: str
        """
        return (
            f"<Subprocess().open_execute_context("
            f"command={self.command!r}, "
            f"stdin={self.stdin!r}, "
            f"open_stdout={self.open_stdout!r}, "
            f"open_stderr={self.open_stderr!r}, "
            f"cwd={self.__cwd!r}, "
            f"env={self.__env!r}, "
            f"logger={self.logger!r}) "
            f"at {id(self)}>"
        )

    def __enter__(self) -> SubprocessExecuteAsyncResult:
        """Context manager enter.

        :return: raw execution information
        :rtype: SshExecuteAsyncResult
        :raises OSError: stdin write failed/stdin close failed

        Command is executed only in context manager to be sure, that everything will be cleaned up properly.
        """
        started = datetime.datetime.now(tz=datetime.timezone.utc)

        self.__process = subprocess.Popen(
            args=self.command,
            stdout=subprocess.PIPE if self.open_stdout else subprocess.DEVNULL,
            stderr=subprocess.PIPE if self.open_stderr else subprocess.DEVNULL,
            stdin=subprocess.PIPE,
            shell=True,
            cwd=self.__cwd,
            env=self.__env,
            universal_newlines=False,
            **_subprocess_helpers.subprocess_kw,
        )
        process = self.__process.__enter__()

        if self.stdin is not None:
            if process.stdin is None:
                self.logger.warning("STDIN pipe is not set, but STDIN data is available to send.")
            else:
                try:
                    process.stdin.write(self.stdin)
                except BrokenPipeError:
                    self.logger.warning("STDIN Send failed: broken PIPE")
                except OSError as exc:
                    if exc.errno == errno.EINVAL:
                        # bpo-19612, bpo-30418: On Windows, stdin.write() fails
                        # with EINVAL if the child process exited or if the child
                        # process is still running but closed the pipe.
                        self.logger.warning("STDIN Send failed: closed PIPE")
                    else:
                        _subprocess_helpers.kill_proc_tree(process.pid)
                        process.kill()
                        raise
                try:
                    process.stdin.close()
                except BrokenPipeError:
                    self.logger.warning("STDIN Send failed: broken PIPE")
                except OSError as exc:
                    if exc.errno != errno.EINVAL:
                        process.kill()
                        raise

        # noinspection PyArgumentList
        return SubprocessExecuteAsyncResult(
            interface=process,
            stdin=None,
            stderr=process.stderr,
            stdout=process.stdout,
            started=started,
        )

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if self.__process is not None:
            self.__process.__exit__(exc_type, exc_val, exc_tb)
            self.__process = None


[docs] class Subprocess(api.ExecHelper): """Subprocess helper with timeouts and lock-free FIFO. For excluding race-conditions we allow to run 1 command simultaneously :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None .. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd .. versionchanged:: 3.1.0 Not singleton anymore. Only lock is shared between all instances. .. versionchanged:: 3.2.0 Logger can be enforced. .. versionchanged:: 4.1.0 support chroot .. versionchanged:: 4.3.0 Lock is not shared anymore: allow parallel call of different instances. """
[docs] def __init__( self, log_mask_re: LogMaskReT = None, ) -> None: """Subprocess helper with timeouts and lock-free FIFO.""" mod_name = "exec_helpers" if self.__module__.startswith("exec_helpers") else self.__module__ super().__init__( logger=logging.getLogger(f"{mod_name}.{self.__class__.__name__}"), log_mask_re=log_mask_re, )
def _exec_command( # type: ignore[override] self, command: str, async_result: SubprocessExecuteAsyncResult, timeout: OptionalTimeoutT, *, verbose: bool = False, log_mask_re: LogMaskReT = None, stdin: OptionalStdinT = None, log_stdout: bool = True, log_stderr: bool = True, **kwargs: typing.Any, ) -> exec_result.ExecResult: """Get exit status from channel with timeout. :param command: Command for execution :type command: str :param async_result: execute_async result :type async_result: SubprocessExecuteAsyncResult :param timeout: Timeout for command execution :type timeout: int | float | None :param verbose: produce verbose log record on command call :type verbose: bool :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param log_stdout: log STDOUT during read :type log_stdout: bool :param log_stderr: log STDERR during read :type log_stderr: bool :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execution result :rtype: ExecResult :raises OSError: exception during process kill (and not regarding to already closed process) :raises ExecHelperNoKillError: Process not dies on SIGTERM & SIGKILL :raises ExecHelperTimeoutError: Timeout exceeded .. versionadded:: 1.2.0 """ def poll_stdout() -> None: """Sync stdout poll.""" result.read_stdout(src=async_result.stdout, log=self.logger if log_stdout else None, verbose=verbose) def poll_stderr() -> None: """Sync stderr poll.""" result.read_stderr(src=async_result.stderr, log=self.logger if log_stderr else None, verbose=verbose) def close_streams() -> None: """Enforce FIFO closure.""" if async_result.stdout is not None and not async_result.stdout.closed: async_result.stdout.close() if async_result.stderr is not None and not async_result.stderr.closed: async_result.stderr.close() # Store command with hidden data cmd_for_log: str = self._mask_command(cmd=command, log_mask_re=log_mask_re) result = exec_result.ExecResult(cmd=cmd_for_log, stdin=stdin, started=async_result.started) with concurrent.futures.ThreadPoolExecutor(thread_name_prefix="exec-helpers_subprocess_poll_") as executor: stdout_future: concurrent.futures.Future[None] = executor.submit(poll_stdout) stderr_future: concurrent.futures.Future[None] = executor.submit(poll_stderr) try: exit_code: int = async_result.interface.wait(timeout=timeout) # Wait real timeout here # Minimal timeout to complete polling concurrent.futures.wait([stdout_future, stderr_future], timeout=0.1) result.exit_code = exit_code except subprocess.TimeoutExpired as exc: # kill -9 for all subprocesses _subprocess_helpers.kill_proc_tree(async_result.interface.pid) exit_signal: int | None = async_result.interface.poll() if exit_signal is None: # pylint: disable=consider-using-assignment-expr raise exceptions.ExecHelperNoKillError( result=result, timeout=timeout, # type: ignore[arg-type] ) from exc result.exit_code = exit_signal else: return result finally: stdout_future.cancel() stderr_future.cancel() _, not_done = concurrent.futures.wait([stdout_future, stderr_future], timeout=1) if not_done and async_result.interface.returncode: self.logger.critical( f"Process {command!s} was closed with exit code {async_result.interface.returncode!s}, " f"but FIFO buffers are still open" ) result.set_timestamp() close_streams() wait_err_msg: str = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout) self.logger.debug(wait_err_msg) raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout) # type: ignore[arg-type] def open_execute_context( self, command: str, *, stdin: OptionalStdinT = None, open_stdout: bool = True, open_stderr: bool = True, chroot_path: str | None = None, cwd: CwdT = None, env: EnvT = None, env_patch: EnvT = None, **kwargs: typing.Any, ) -> _SubprocessExecuteContext: """Get execution context manager. :param command: Command for execution :type command: str | Iterable[str] :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool :param chroot_path: chroot path override :type chroot_path: str | None :param cwd: Sets the current directory before the child is executed. :type cwd: str | bytes | pathlib.Path | None :param env: Defines the environment variables for the new process. :type env: Mapping[str | bytes, str | bytes] | None :param env_patch: Defines the environment variables to ADD for the new process. :type env_patch: Mapping[str | bytes, str | bytes] | None :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execute context :rtype: _SubprocessExecuteContext .. versionadded:: 8.0.0 """ if env_patch is not None: # make mutable copy env = dict(copy.deepcopy(os.environ) if env is None else copy.deepcopy(env)) # type: ignore[arg-type] env.update(env_patch) # type: ignore[arg-type] return _SubprocessExecuteContext( command=f"{self._prepare_command(cmd=command, chroot_path=chroot_path)}\n", stdin=None if stdin is None else self._string_bytes_bytearray_as_bytes(stdin), open_stdout=open_stdout, open_stderr=open_stderr, cwd=cwd, env=env, logger=self.logger, **kwargs, )
[docs] def execute( self, command: CommandT, verbose: bool = False, timeout: OptionalTimeoutT = constants.DEFAULT_TIMEOUT, *, log_mask_re: LogMaskReT = None, stdin: OptionalStdinT = None, open_stdout: bool = True, log_stdout: bool = True, open_stderr: bool = True, log_stderr: bool = True, chroot_path: str | None = None, cwd: CwdT = None, env: EnvT = None, env_patch: EnvT = None, **kwargs: typing.Any, ) -> exec_result.ExecResult: """Execute command and wait for return code. :param command: Command for execution :type command: str | Iterable[str] :param verbose: Produce log.info records for command call and output :type verbose: bool :param timeout: Timeout for command execution. :type timeout: int | float | None :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param log_stdout: log STDOUT during read :type log_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool :param log_stderr: log STDERR during read :type log_stderr: bool :param chroot_path: chroot path override :type chroot_path: str | None :param cwd: Sets the current directory before the child is executed. :type cwd: str | bytes | pathlib.Path | None :param env: Defines the environment variables for the new process. :type env: Mapping[str | bytes, str | bytes] | None :param env_patch: Defines the environment variables to ADD for the new process. :type env_patch: Mapping[str | bytes, str | bytes] | None :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execution result :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour .. versionchanged:: 2.1.0 Allow parallel calls .. versionchanged:: 7.0.0 Allow command as list of arguments. Command will be joined with components escaping. .. versionchanged:: 8.0.0 chroot path exposed. """ return super().execute( command=command, verbose=verbose, timeout=timeout, log_mask_re=log_mask_re, stdin=stdin, open_stdout=open_stdout, log_stdout=log_stdout, open_stderr=open_stderr, log_stderr=log_stderr, chroot_path=chroot_path, cwd=cwd, env=env, env_patch=env_patch, **kwargs, )
[docs] def __call__( self, command: CommandT, verbose: bool = False, timeout: OptionalTimeoutT = constants.DEFAULT_TIMEOUT, *, log_mask_re: LogMaskReT = None, stdin: OptionalStdinT = None, open_stdout: bool = True, log_stdout: bool = True, open_stderr: bool = True, log_stderr: bool = True, chroot_path: str | None = None, cwd: CwdT = None, env: EnvT = None, env_patch: EnvT = None, **kwargs: typing.Any, ) -> exec_result.ExecResult: """Execute command and wait for return code. :param command: Command for execution :type command: str | Iterable[str] :param verbose: Produce log.info records for command call and output :type verbose: bool :param timeout: Timeout for command execution. :type timeout: int | float | None :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param log_stdout: log STDOUT during read :type log_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool :param log_stderr: log STDERR during read :type log_stderr: bool :param chroot_path: chroot path override :type chroot_path: str | None :param cwd: Sets the current directory before the child is executed. :type cwd: str | bytes | pathlib.Path | None :param env: Defines the environment variables for the new process. :type env: Mapping[str | bytes, str | bytes] | None :param env_patch: Defines the environment variables to ADD for the new process. :type env_patch: Mapping[str | bytes, str | bytes] | None :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execution result :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded .. versionchanged:: 1.2.0 default timeout 1 hour .. versionchanged:: 2.1.0 Allow parallel calls """ return super().__call__( command=command, verbose=verbose, timeout=timeout, log_mask_re=log_mask_re, stdin=stdin, open_stdout=open_stdout, log_stdout=log_stdout, open_stderr=open_stderr, log_stderr=log_stderr, chroot_path=chroot_path, cwd=cwd, env=env, env_patch=env_patch, **kwargs, )
[docs] def check_call( self, command: CommandT, verbose: bool = False, timeout: OptionalTimeoutT = constants.DEFAULT_TIMEOUT, error_info: ErrorInfoT = None, expected: ExpectedExitCodesT = (proc_enums.EXPECTED,), raise_on_err: bool = True, *, log_mask_re: LogMaskReT = None, stdin: OptionalStdinT = None, open_stdout: bool = True, log_stdout: bool = True, open_stderr: bool = True, log_stderr: bool = True, cwd: CwdT = None, env: EnvT = None, env_patch: EnvT = None, exception_class: CalledProcessErrorSubClassT = exceptions.CalledProcessError, **kwargs: typing.Any, ) -> exec_result.ExecResult: """Execute command and check for return code. :param command: Command for execution :type command: str | Iterable[str] :param verbose: Produce log.info records for command call and output :type verbose: bool :param timeout: Timeout for command execution. :type timeout: int | float | None :param error_info: Text for error details, if fail happens :type error_info: str | None :param expected: expected return codes (0 by default) :type expected: Iterable[int | proc_enums.ExitCodes] :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: bool :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param log_stdout: log STDOUT during read :type log_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool :param log_stderr: log STDERR during read :type log_stderr: bool :param cwd: Sets the current directory before the child is executed. :type cwd: str | bytes | pathlib.Path | None :param env: Defines the environment variables for the new process. :type env: Mapping[str | bytes, str | bytes] | None :param env_patch: Defines the environment variables to ADD for the new process. :type env_patch: Mapping[str | bytes, str | bytes] | None :param exception_class: Exception class for errors. Subclass of CalledProcessError is mandatory. :type exception_class: type[exceptions.CalledProcessError] :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execution result :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded :raises CalledProcessError: Unexpected exit code .. versionchanged:: 1.2.0 default timeout 1 hour .. versionchanged:: 3.2.0 Exception class can be substituted .. versionchanged:: 3.4.0 Expected is not optional, defaults os dependent """ return super().check_call( command=command, verbose=verbose, timeout=timeout, error_info=error_info, expected=expected, raise_on_err=raise_on_err, log_mask_re=log_mask_re, stdin=stdin, open_stdout=open_stdout, log_stdout=log_stdout, open_stderr=open_stderr, log_stderr=log_stderr, cwd=cwd, env=env, env_patch=env_patch, exception_class=exception_class, **kwargs, )
[docs] def check_stderr( self, command: CommandT, verbose: bool = False, timeout: OptionalTimeoutT = constants.DEFAULT_TIMEOUT, error_info: ErrorInfoT = None, raise_on_err: bool = True, *, expected: ExpectedExitCodesT = (proc_enums.EXPECTED,), log_mask_re: LogMaskReT = None, stdin: OptionalStdinT = None, open_stdout: bool = True, log_stdout: bool = True, open_stderr: bool = True, log_stderr: bool = True, cwd: CwdT = None, env: EnvT = None, env_patch: EnvT = None, exception_class: CalledProcessErrorSubClassT = exceptions.CalledProcessError, **kwargs: typing.Any, ) -> exec_result.ExecResult: """Execute command expecting return code 0 and empty STDERR. :param command: Command for execution :type command: str | Iterable[str] :param verbose: Produce log.info records for command call and output :type verbose: bool :param timeout: Timeout for command execution. :type timeout: int | float | None :param error_info: Text for error details, if fail happens :type error_info: str | None :param raise_on_err: Raise exception on unexpected return code :type raise_on_err: bool :param expected: expected return codes (0 by default) :type expected: Iterable[int | proc_enums.ExitCodes] :param log_mask_re: regex lookup rule to mask command for logger. all MATCHED groups will be replaced by '<*masked*>' :type log_mask_re: str | re.Pattern[str] | None :param stdin: pass STDIN text to the process :type stdin: bytes | str | bytearray | None :param open_stdout: open STDOUT stream for read :type open_stdout: bool :param log_stdout: log STDOUT during read :type log_stdout: bool :param open_stderr: open STDERR stream for read :type open_stderr: bool :param log_stderr: log STDERR during read :type log_stderr: bool :param cwd: Sets the current directory before the child is executed. :type cwd: str | bytes | pathlib.Path | None :param env: Defines the environment variables for the new process. :type env: Mapping[str | bytes, str | bytes] | None :param env_patch: Defines the environment variables to ADD for the new process. :type env_patch: Mapping[str | bytes, str | bytes] | None :param exception_class: Exception class for errors. Subclass of CalledProcessError is mandatory. :type exception_class: type[exceptions.CalledProcessError] :param kwargs: additional parameters for call. :type kwargs: typing.Any :return: Execution result :rtype: ExecResult :raises ExecHelperTimeoutError: Timeout exceeded :raises CalledProcessError: Unexpected exit code or stderr presents .. versionchanged:: 1.2.0 default timeout 1 hour .. versionchanged:: 3.2.0 Exception class can be substituted .. versionchanged:: 3.4.0 Expected is not optional, defaults os dependent """ return super().check_stderr( command=command, verbose=verbose, timeout=timeout, error_info=error_info, raise_on_err=raise_on_err, expected=expected, log_mask_re=log_mask_re, stdin=stdin, open_stdout=open_stdout, log_stdout=log_stdout, open_stderr=open_stderr, log_stderr=log_stderr, cwd=cwd, env=env, env_patch=env_patch, exception_class=exception_class, **kwargs, )