You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
480 lines
16 KiB
Python
480 lines
16 KiB
Python
2 years ago
|
import contextlib
|
||
|
import io
|
||
|
import os
|
||
|
import shlex
|
||
|
import shutil
|
||
|
import sys
|
||
|
import tempfile
|
||
|
import typing as t
|
||
|
from types import TracebackType
|
||
|
|
||
|
from . import formatting
|
||
|
from . import termui
|
||
|
from . import utils
|
||
|
from ._compat import _find_binary_reader
|
||
|
|
||
|
if t.TYPE_CHECKING:
|
||
|
from .core import BaseCommand
|
||
|
|
||
|
|
||
|
class EchoingStdin:
|
||
|
def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
|
||
|
self._input = input
|
||
|
self._output = output
|
||
|
self._paused = False
|
||
|
|
||
|
def __getattr__(self, x: str) -> t.Any:
|
||
|
return getattr(self._input, x)
|
||
|
|
||
|
def _echo(self, rv: bytes) -> bytes:
|
||
|
if not self._paused:
|
||
|
self._output.write(rv)
|
||
|
|
||
|
return rv
|
||
|
|
||
|
def read(self, n: int = -1) -> bytes:
|
||
|
return self._echo(self._input.read(n))
|
||
|
|
||
|
def read1(self, n: int = -1) -> bytes:
|
||
|
return self._echo(self._input.read1(n)) # type: ignore
|
||
|
|
||
|
def readline(self, n: int = -1) -> bytes:
|
||
|
return self._echo(self._input.readline(n))
|
||
|
|
||
|
def readlines(self) -> t.List[bytes]:
|
||
|
return [self._echo(x) for x in self._input.readlines()]
|
||
|
|
||
|
def __iter__(self) -> t.Iterator[bytes]:
|
||
|
return iter(self._echo(x) for x in self._input)
|
||
|
|
||
|
def __repr__(self) -> str:
|
||
|
return repr(self._input)
|
||
|
|
||
|
|
||
|
@contextlib.contextmanager
|
||
|
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
|
||
|
if stream is None:
|
||
|
yield
|
||
|
else:
|
||
|
stream._paused = True
|
||
|
yield
|
||
|
stream._paused = False
|
||
|
|
||
|
|
||
|
class _NamedTextIOWrapper(io.TextIOWrapper):
|
||
|
def __init__(
|
||
|
self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
|
||
|
) -> None:
|
||
|
super().__init__(buffer, **kwargs)
|
||
|
self._name = name
|
||
|
self._mode = mode
|
||
|
|
||
|
@property
|
||
|
def name(self) -> str:
|
||
|
return self._name
|
||
|
|
||
|
@property
|
||
|
def mode(self) -> str:
|
||
|
return self._mode
|
||
|
|
||
|
|
||
|
def make_input_stream(
|
||
|
input: t.Optional[t.Union[str, bytes, t.IO]], charset: str
|
||
|
) -> t.BinaryIO:
|
||
|
# Is already an input stream.
|
||
|
if hasattr(input, "read"):
|
||
|
rv = _find_binary_reader(t.cast(t.IO, input))
|
||
|
|
||
|
if rv is not None:
|
||
|
return rv
|
||
|
|
||
|
raise TypeError("Could not find binary reader for input stream.")
|
||
|
|
||
|
if input is None:
|
||
|
input = b""
|
||
|
elif isinstance(input, str):
|
||
|
input = input.encode(charset)
|
||
|
|
||
|
return io.BytesIO(t.cast(bytes, input))
|
||
|
|
||
|
|
||
|
class Result:
|
||
|
"""Holds the captured result of an invoked CLI script."""
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
runner: "CliRunner",
|
||
|
stdout_bytes: bytes,
|
||
|
stderr_bytes: t.Optional[bytes],
|
||
|
return_value: t.Any,
|
||
|
exit_code: int,
|
||
|
exception: t.Optional[BaseException],
|
||
|
exc_info: t.Optional[
|
||
|
t.Tuple[t.Type[BaseException], BaseException, TracebackType]
|
||
|
] = None,
|
||
|
):
|
||
|
#: The runner that created the result
|
||
|
self.runner = runner
|
||
|
#: The standard output as bytes.
|
||
|
self.stdout_bytes = stdout_bytes
|
||
|
#: The standard error as bytes, or None if not available
|
||
|
self.stderr_bytes = stderr_bytes
|
||
|
#: The value returned from the invoked command.
|
||
|
#:
|
||
|
#: .. versionadded:: 8.0
|
||
|
self.return_value = return_value
|
||
|
#: The exit code as integer.
|
||
|
self.exit_code = exit_code
|
||
|
#: The exception that happened if one did.
|
||
|
self.exception = exception
|
||
|
#: The traceback
|
||
|
self.exc_info = exc_info
|
||
|
|
||
|
@property
|
||
|
def output(self) -> str:
|
||
|
"""The (standard) output as unicode string."""
|
||
|
return self.stdout
|
||
|
|
||
|
@property
|
||
|
def stdout(self) -> str:
|
||
|
"""The standard output as unicode string."""
|
||
|
return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
|
||
|
"\r\n", "\n"
|
||
|
)
|
||
|
|
||
|
@property
|
||
|
def stderr(self) -> str:
|
||
|
"""The standard error as unicode string."""
|
||
|
if self.stderr_bytes is None:
|
||
|
raise ValueError("stderr not separately captured")
|
||
|
return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
|
||
|
"\r\n", "\n"
|
||
|
)
|
||
|
|
||
|
def __repr__(self) -> str:
|
||
|
exc_str = repr(self.exception) if self.exception else "okay"
|
||
|
return f"<{type(self).__name__} {exc_str}>"
|
||
|
|
||
|
|
||
|
class CliRunner:
|
||
|
"""The CLI runner provides functionality to invoke a Click command line
|
||
|
script for unittesting purposes in a isolated environment. This only
|
||
|
works in single-threaded systems without any concurrency as it changes the
|
||
|
global interpreter state.
|
||
|
|
||
|
:param charset: the character set for the input and output data.
|
||
|
:param env: a dictionary with environment variables for overriding.
|
||
|
:param echo_stdin: if this is set to `True`, then reading from stdin writes
|
||
|
to stdout. This is useful for showing examples in
|
||
|
some circumstances. Note that regular prompts
|
||
|
will automatically echo the input.
|
||
|
:param mix_stderr: if this is set to `False`, then stdout and stderr are
|
||
|
preserved as independent streams. This is useful for
|
||
|
Unix-philosophy apps that have predictable stdout and
|
||
|
noisy stderr, such that each may be measured
|
||
|
independently
|
||
|
"""
|
||
|
|
||
|
def __init__(
|
||
|
self,
|
||
|
charset: str = "utf-8",
|
||
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
||
|
echo_stdin: bool = False,
|
||
|
mix_stderr: bool = True,
|
||
|
) -> None:
|
||
|
self.charset = charset
|
||
|
self.env = env or {}
|
||
|
self.echo_stdin = echo_stdin
|
||
|
self.mix_stderr = mix_stderr
|
||
|
|
||
|
def get_default_prog_name(self, cli: "BaseCommand") -> str:
|
||
|
"""Given a command object it will return the default program name
|
||
|
for it. The default is the `name` attribute or ``"root"`` if not
|
||
|
set.
|
||
|
"""
|
||
|
return cli.name or "root"
|
||
|
|
||
|
def make_env(
|
||
|
self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
|
||
|
) -> t.Mapping[str, t.Optional[str]]:
|
||
|
"""Returns the environment overrides for invoking a script."""
|
||
|
rv = dict(self.env)
|
||
|
if overrides:
|
||
|
rv.update(overrides)
|
||
|
return rv
|
||
|
|
||
|
@contextlib.contextmanager
|
||
|
def isolation(
|
||
|
self,
|
||
|
input: t.Optional[t.Union[str, bytes, t.IO]] = None,
|
||
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
||
|
color: bool = False,
|
||
|
) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
|
||
|
"""A context manager that sets up the isolation for invoking of a
|
||
|
command line tool. This sets up stdin with the given input data
|
||
|
and `os.environ` with the overrides from the given dictionary.
|
||
|
This also rebinds some internals in Click to be mocked (like the
|
||
|
prompt functionality).
|
||
|
|
||
|
This is automatically done in the :meth:`invoke` method.
|
||
|
|
||
|
:param input: the input stream to put into sys.stdin.
|
||
|
:param env: the environment overrides as dictionary.
|
||
|
:param color: whether the output should contain color codes. The
|
||
|
application can still override this explicitly.
|
||
|
|
||
|
.. versionchanged:: 8.0
|
||
|
``stderr`` is opened with ``errors="backslashreplace"``
|
||
|
instead of the default ``"strict"``.
|
||
|
|
||
|
.. versionchanged:: 4.0
|
||
|
Added the ``color`` parameter.
|
||
|
"""
|
||
|
bytes_input = make_input_stream(input, self.charset)
|
||
|
echo_input = None
|
||
|
|
||
|
old_stdin = sys.stdin
|
||
|
old_stdout = sys.stdout
|
||
|
old_stderr = sys.stderr
|
||
|
old_forced_width = formatting.FORCED_WIDTH
|
||
|
formatting.FORCED_WIDTH = 80
|
||
|
|
||
|
env = self.make_env(env)
|
||
|
|
||
|
bytes_output = io.BytesIO()
|
||
|
|
||
|
if self.echo_stdin:
|
||
|
bytes_input = echo_input = t.cast(
|
||
|
t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
|
||
|
)
|
||
|
|
||
|
sys.stdin = text_input = _NamedTextIOWrapper(
|
||
|
bytes_input, encoding=self.charset, name="<stdin>", mode="r"
|
||
|
)
|
||
|
|
||
|
if self.echo_stdin:
|
||
|
# Force unbuffered reads, otherwise TextIOWrapper reads a
|
||
|
# large chunk which is echoed early.
|
||
|
text_input._CHUNK_SIZE = 1 # type: ignore
|
||
|
|
||
|
sys.stdout = _NamedTextIOWrapper(
|
||
|
bytes_output, encoding=self.charset, name="<stdout>", mode="w"
|
||
|
)
|
||
|
|
||
|
bytes_error = None
|
||
|
if self.mix_stderr:
|
||
|
sys.stderr = sys.stdout
|
||
|
else:
|
||
|
bytes_error = io.BytesIO()
|
||
|
sys.stderr = _NamedTextIOWrapper(
|
||
|
bytes_error,
|
||
|
encoding=self.charset,
|
||
|
name="<stderr>",
|
||
|
mode="w",
|
||
|
errors="backslashreplace",
|
||
|
)
|
||
|
|
||
|
@_pause_echo(echo_input) # type: ignore
|
||
|
def visible_input(prompt: t.Optional[str] = None) -> str:
|
||
|
sys.stdout.write(prompt or "")
|
||
|
val = text_input.readline().rstrip("\r\n")
|
||
|
sys.stdout.write(f"{val}\n")
|
||
|
sys.stdout.flush()
|
||
|
return val
|
||
|
|
||
|
@_pause_echo(echo_input) # type: ignore
|
||
|
def hidden_input(prompt: t.Optional[str] = None) -> str:
|
||
|
sys.stdout.write(f"{prompt or ''}\n")
|
||
|
sys.stdout.flush()
|
||
|
return text_input.readline().rstrip("\r\n")
|
||
|
|
||
|
@_pause_echo(echo_input) # type: ignore
|
||
|
def _getchar(echo: bool) -> str:
|
||
|
char = sys.stdin.read(1)
|
||
|
|
||
|
if echo:
|
||
|
sys.stdout.write(char)
|
||
|
|
||
|
sys.stdout.flush()
|
||
|
return char
|
||
|
|
||
|
default_color = color
|
||
|
|
||
|
def should_strip_ansi(
|
||
|
stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
|
||
|
) -> bool:
|
||
|
if color is None:
|
||
|
return not default_color
|
||
|
return not color
|
||
|
|
||
|
old_visible_prompt_func = termui.visible_prompt_func
|
||
|
old_hidden_prompt_func = termui.hidden_prompt_func
|
||
|
old__getchar_func = termui._getchar
|
||
|
old_should_strip_ansi = utils.should_strip_ansi # type: ignore
|
||
|
termui.visible_prompt_func = visible_input
|
||
|
termui.hidden_prompt_func = hidden_input
|
||
|
termui._getchar = _getchar
|
||
|
utils.should_strip_ansi = should_strip_ansi # type: ignore
|
||
|
|
||
|
old_env = {}
|
||
|
try:
|
||
|
for key, value in env.items():
|
||
|
old_env[key] = os.environ.get(key)
|
||
|
if value is None:
|
||
|
try:
|
||
|
del os.environ[key]
|
||
|
except Exception:
|
||
|
pass
|
||
|
else:
|
||
|
os.environ[key] = value
|
||
|
yield (bytes_output, bytes_error)
|
||
|
finally:
|
||
|
for key, value in old_env.items():
|
||
|
if value is None:
|
||
|
try:
|
||
|
del os.environ[key]
|
||
|
except Exception:
|
||
|
pass
|
||
|
else:
|
||
|
os.environ[key] = value
|
||
|
sys.stdout = old_stdout
|
||
|
sys.stderr = old_stderr
|
||
|
sys.stdin = old_stdin
|
||
|
termui.visible_prompt_func = old_visible_prompt_func
|
||
|
termui.hidden_prompt_func = old_hidden_prompt_func
|
||
|
termui._getchar = old__getchar_func
|
||
|
utils.should_strip_ansi = old_should_strip_ansi # type: ignore
|
||
|
formatting.FORCED_WIDTH = old_forced_width
|
||
|
|
||
|
def invoke(
|
||
|
self,
|
||
|
cli: "BaseCommand",
|
||
|
args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
|
||
|
input: t.Optional[t.Union[str, bytes, t.IO]] = None,
|
||
|
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
|
||
|
catch_exceptions: bool = True,
|
||
|
color: bool = False,
|
||
|
**extra: t.Any,
|
||
|
) -> Result:
|
||
|
"""Invokes a command in an isolated environment. The arguments are
|
||
|
forwarded directly to the command line script, the `extra` keyword
|
||
|
arguments are passed to the :meth:`~clickpkg.Command.main` function of
|
||
|
the command.
|
||
|
|
||
|
This returns a :class:`Result` object.
|
||
|
|
||
|
:param cli: the command to invoke
|
||
|
:param args: the arguments to invoke. It may be given as an iterable
|
||
|
or a string. When given as string it will be interpreted
|
||
|
as a Unix shell command. More details at
|
||
|
:func:`shlex.split`.
|
||
|
:param input: the input data for `sys.stdin`.
|
||
|
:param env: the environment overrides.
|
||
|
:param catch_exceptions: Whether to catch any other exceptions than
|
||
|
``SystemExit``.
|
||
|
:param extra: the keyword arguments to pass to :meth:`main`.
|
||
|
:param color: whether the output should contain color codes. The
|
||
|
application can still override this explicitly.
|
||
|
|
||
|
.. versionchanged:: 8.0
|
||
|
The result object has the ``return_value`` attribute with
|
||
|
the value returned from the invoked command.
|
||
|
|
||
|
.. versionchanged:: 4.0
|
||
|
Added the ``color`` parameter.
|
||
|
|
||
|
.. versionchanged:: 3.0
|
||
|
Added the ``catch_exceptions`` parameter.
|
||
|
|
||
|
.. versionchanged:: 3.0
|
||
|
The result object has the ``exc_info`` attribute with the
|
||
|
traceback if available.
|
||
|
"""
|
||
|
exc_info = None
|
||
|
with self.isolation(input=input, env=env, color=color) as outstreams:
|
||
|
return_value = None
|
||
|
exception: t.Optional[BaseException] = None
|
||
|
exit_code = 0
|
||
|
|
||
|
if isinstance(args, str):
|
||
|
args = shlex.split(args)
|
||
|
|
||
|
try:
|
||
|
prog_name = extra.pop("prog_name")
|
||
|
except KeyError:
|
||
|
prog_name = self.get_default_prog_name(cli)
|
||
|
|
||
|
try:
|
||
|
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
|
||
|
except SystemExit as e:
|
||
|
exc_info = sys.exc_info()
|
||
|
e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)
|
||
|
|
||
|
if e_code is None:
|
||
|
e_code = 0
|
||
|
|
||
|
if e_code != 0:
|
||
|
exception = e
|
||
|
|
||
|
if not isinstance(e_code, int):
|
||
|
sys.stdout.write(str(e_code))
|
||
|
sys.stdout.write("\n")
|
||
|
e_code = 1
|
||
|
|
||
|
exit_code = e_code
|
||
|
|
||
|
except Exception as e:
|
||
|
if not catch_exceptions:
|
||
|
raise
|
||
|
exception = e
|
||
|
exit_code = 1
|
||
|
exc_info = sys.exc_info()
|
||
|
finally:
|
||
|
sys.stdout.flush()
|
||
|
stdout = outstreams[0].getvalue()
|
||
|
if self.mix_stderr:
|
||
|
stderr = None
|
||
|
else:
|
||
|
stderr = outstreams[1].getvalue() # type: ignore
|
||
|
|
||
|
return Result(
|
||
|
runner=self,
|
||
|
stdout_bytes=stdout,
|
||
|
stderr_bytes=stderr,
|
||
|
return_value=return_value,
|
||
|
exit_code=exit_code,
|
||
|
exception=exception,
|
||
|
exc_info=exc_info, # type: ignore
|
||
|
)
|
||
|
|
||
|
@contextlib.contextmanager
|
||
|
def isolated_filesystem(
|
||
|
self, temp_dir: t.Optional[t.Union[str, os.PathLike]] = None
|
||
|
) -> t.Iterator[str]:
|
||
|
"""A context manager that creates a temporary directory and
|
||
|
changes the current working directory to it. This isolates tests
|
||
|
that affect the contents of the CWD to prevent them from
|
||
|
interfering with each other.
|
||
|
|
||
|
:param temp_dir: Create the temporary directory under this
|
||
|
directory. If given, the created directory is not removed
|
||
|
when exiting.
|
||
|
|
||
|
.. versionchanged:: 8.0
|
||
|
Added the ``temp_dir`` parameter.
|
||
|
"""
|
||
|
cwd = os.getcwd()
|
||
|
dt = tempfile.mkdtemp(dir=temp_dir) # type: ignore[type-var]
|
||
|
os.chdir(dt)
|
||
|
|
||
|
try:
|
||
|
yield t.cast(str, dt)
|
||
|
finally:
|
||
|
os.chdir(cwd)
|
||
|
|
||
|
if temp_dir is None:
|
||
|
try:
|
||
|
shutil.rmtree(dt)
|
||
|
except OSError: # noqa: B014
|
||
|
pass
|