# Replay the output captured from the failed subprocess, one log call per non-empty stream.
# NOTE(review): `error` and `log` are bound outside the visible lines -- presumably the
# CalledProcessError being reported and the active logger callable; confirm against the
# enclosing definition.
for stream_name in ('stdout', 'stderr'):
    stream = getattr(error, stream_name)
    if stream:
        # The captured stream may be bytes or str. Undecodable bytes are replaced instead
        # of raising, so a UnicodeDecodeError cannot mask the subprocess error itself.
        text = stream.decode(errors='replace') if isinstance(stream, bytes) else stream
        log(text, origin=('subprocess', stream_name))
def log_stream(stream_name: str, stream: typing.IO[str]) -> None:
    """Forward every line read from *stream* to ``log``.

    Lines are passed on exactly as the stream yields them, tagged with the
    origin ``('subprocess', stream_name)`` so the receiver can tell the
    streams apart. Iteration ends when the stream is exhausted.

    Args:
        stream_name: Label used as the second element of the log origin.
        stream: Text stream to drain line by line.
    """
    for line in stream:
        log(line, origin=('subprocess', stream_name))
# Run `cmd` with both output pipes captured in text mode and stream them to the logger
# while the process is running.
# `errors='replace'` keeps the reader threads alive on output that is not valid UTF-8: with
# strict decoding a reader would raise inside its future (which `wait` does not surface)
# and stop draining its pipe.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor, subprocess.Popen(
    cmd,
    encoding='utf-8',
    errors='replace',
    env=env,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
) as process:
    log(subprocess.list2cmdline(cmd), origin=('subprocess', 'cmd'))

    # Logging in sub-thread to more-or-less ensure order of stdout and stderr whilst also
    # being able to distinguish between the two.
    concurrent.futures.wait(
        [executor.submit(log_stream, n, getattr(process, n)) for n in ('stdout', 'stderr')]
    )

    # Both pipes are drained at this point; a non-zero exit status is reported the same
    # way `subprocess.run(..., check=True)` would report it.
    code = process.wait()
    if code:
        raise subprocess.CalledProcessError(code, process.args)
if typing.TYPE_CHECKING:
    # Declarations for static type checkers only; at runtime both names are resolved on
    # every access by the module-level ``__getattr__`` in the ``else`` branch.
    log: _Logger
    verbosity: bool

else:

    def __getattr__(name):
        """Resolve the dynamic module attributes ``log`` and ``verbosity``.

        Called by Python when normal module attribute lookup fails (PEP 562).
        ``log`` returns the current value of ``LOGGER`` and ``verbosity`` the
        current value of ``VERBOSITY``, each read via ``.get()`` at access time.

        Raises:
            AttributeError: For any other attribute name.
        """
        if name == 'log':
            return LOGGER.get()
        if name == 'verbosity':
            return VERBOSITY.get()
        raise AttributeError(name)  # pragma: no cover
Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt.
Es wird jedoch weder Vollständigkeit noch Richtigkeit noch Qualität der bereitgestellten
Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.