# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
import re
import sys
import os
from .ansi
import AnsiFore, AnsiBack, AnsiStyle, Style, BEL
from .winterm
import enable_vt_processing, WinTerm, WinColor, WinStyle
from .win32
import windll, winapi_test
# Module-wide WinTerm instance; stays None when the win32 layer (windll) is
# not available, in which case no win32 calls can be issued.
winterm = WinTerm() if windll is not None else None
class StreamWrapper(object):
    '''
    Transparent proxy around a stream (such as stdout). Every attribute
    lookup is forwarded to the wrapped stream, with the exception of
    'write()', which is handed to the Converter instance given at
    construction time.
    '''

    def __init__(self, wrapped, converter):
        # Name-mangled (double-underscore) attributes cannot clash with the
        # names of attributes on the wrapped stream object.
        self.__wrapped, self.__convertor = wrapped, converter

    def __getattr__(self, name):
        # Only reached for names not found on the wrapper itself.
        target = self.__wrapped
        return getattr(target, name)

    def __enter__(self, *args, **kwargs):
        # Special method lookup bypasses __getattr__/__getattribute__, see
        # https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit
        # so the context-manager protocol has to be forwarded by hand.
        enter = self.__wrapped.__enter__
        return enter(*args, **kwargs)

    def __exit__(self, *args, **kwargs):
        leave = self.__wrapped.__exit__
        return leave(*args, **kwargs)

    def __setstate__(self, state):
        # Adopt the given mapping as this instance's attribute dictionary.
        self.__dict__ = state

    def __getstate__(self):
        # The instance attribute dictionary itself is the state.
        return self.__dict__

    def write(self, text):
        '''Hand the text to the converter instead of the wrapped stream.'''
        self.__convertor.write(text)

    def isatty(self):
        '''
        Report whether the wrapped stream is a tty. Under PyCharm
        (PYCHARM_HOSTED set in the environment) the original process
        stdout/stderr are always reported as ttys. A stream without an
        'isatty' attribute is reported as not being a tty.
        '''
        target = self.__wrapped
        if 'PYCHARM_HOSTED' in os.environ and target is not None:
            if target is sys.__stdout__ or target is sys.__stderr__:
                return True
        try:
            query = target.isatty
        except AttributeError:
            return False
        return query()

    @property
    def closed(self):
        '''
        The 'closed' flag of the wrapped stream, or True when it cannot be
        read.
        '''
        target = self.__wrapped
        try:
            flag = target.closed
        except (AttributeError, ValueError):
            # AttributeError: the stream doesn't support being closed.
            # ValueError: the stream has already been detached when atexit
            # runs.
            return True
        return flag
class AnsiToWin32(object):
    '''
    Implements a 'write()' method which, on Windows, will strip ANSI character
    sequences from the text, and if outputting to a tty, will convert them into
    win32 function calls.
    '''
    # Control Sequence Introducer: group 1 is the ';'-separated parameter
    # string, group 2 the single-letter command.
    ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?')
    # Operating System Command: group 1 is the payload, group 2 the
    # terminating BEL character.
    ANSI_OSC_RE = re.compile('\001?\033\\]([^\a]*)(\a)\002?')

    def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
        '''
        wrapped:   the stream to write to (normally sys.stdout or sys.stderr)
        convert:   convert ANSI sequences into win32 calls; None means decide
                   automatically from the platform and the stream
        strip:     strip ANSI sequences from the output; None means decide
                   automatically from the platform and the stream
        autoreset: reset colors to defaults after every .write()
        '''
        # The wrapped stream (normally sys.stdout or sys.stderr)
        self.wrapped = wrapped

        # should we reset colors to defaults after every .write()
        self.autoreset = autoreset

        # create the proxy wrapping our output stream
        self.stream = StreamWrapper(wrapped, self)

        on_windows = os.name == 'nt'
        # We test if the WinAPI works, because even if we are on Windows
        # we may be using a terminal that doesn't support the WinAPI
        # (e.g. Cygwin Terminal). In this case it's up to the terminal
        # to support the ANSI codes.
        conversion_supported = on_windows and winapi_test()
        try:
            fd = wrapped.fileno()
        except Exception:
            # best effort: any stream that cannot provide a descriptor is
            # handed on as -1
            fd = -1
        system_has_native_ansi = not on_windows or enable_vt_processing(fd)
        have_tty = not self.stream.closed and self.stream.isatty()
        need_conversion = conversion_supported and not system_has_native_ansi

        # should we strip ANSI sequences from our output?
        if strip is None:
            strip = need_conversion or not have_tty
        self.strip = strip

        # should we convert ANSI sequences into win32 calls?
        if convert is None:
            convert = need_conversion and have_tty
        self.convert = convert

        # dict of ansi codes to win32 functions and parameters
        self.win32_calls = self.get_win32_calls()

        # are we wrapping stderr?
        self.on_stderr = self.wrapped is sys.stderr

    def should_wrap(self):
        '''
        True if this class is actually needed. If false, then the output
        stream will not be affected, nor will win32 calls be issued, so
        wrapping stdout is not actually required. This will generally be
        False on non-Windows platforms, unless optional functionality like
        autoreset has been requested using kwargs to init()
        '''
        return self.convert or self.strip or self.autoreset

    def get_win32_calls(self):
        '''
        Build the lookup table used for the 'm' command: each ANSI code maps
        to a tuple of (winterm function, *positional args). The table is
        empty when conversion is off or no winterm is available.
        '''
        if self.convert and winterm:
            return {
                AnsiStyle.RESET_ALL: (winterm.reset_all, ),
                AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
                AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
                AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
                AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
                AnsiFore.RED: (winterm.fore, WinColor.RED),
                AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
                AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
                AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
                AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
                AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
                AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
                AnsiFore.RESET: (winterm.fore, ),
                AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True),
                AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True),
                AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True),
                AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True),
                AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True),
                AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True),
                AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True),
                AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True),
                AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
                AnsiBack.RED: (winterm.back, WinColor.RED),
                AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
                AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
                AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
                AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
                AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
                AnsiBack.WHITE: (winterm.back, WinColor.GREY),
                AnsiBack.RESET: (winterm.back, ),
                AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True),
                AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True),
                AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True),
                AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True),
                AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True),
                AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True),
                AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True),
                AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True),
            }
        return dict()

    def write(self, text):
        '''
        Write text to the wrapped stream, going through the strip/convert
        path when either is enabled, then reset styles if autoreset is on.
        '''
        if self.strip or self.convert:
            self.write_and_convert(text)
        else:
            self.wrapped.write(text)
            self.wrapped.flush()
        if self.autoreset:
            self.reset_all()

    def reset_all(self):
        '''
        Reset all styles: through win32 when converting, otherwise by
        writing the ANSI reset sequence (unless stripping, or the stream is
        closed).
        '''
        if self.convert:
            self.call_win32('m', (0,))
        elif not self.strip and not self.stream.closed:
            self.wrapped.write(Style.RESET_ALL)

    def write_and_convert(self, text):
        '''
        Write the given text to our wrapped stream, stripping any ANSI
        sequences from the text, and optionally converting them into win32
        calls.
        '''
        cursor = 0
        # OSC sequences are removed up front; only CSI sequences remain to
        # be interleaved with the plain text below.
        text = self.convert_osc(text)
        for match in self.ANSI_CSI_RE.finditer(text):
            start, end = match.span()
            self.write_plain_text(text, cursor, start)
            self.convert_ansi(*match.groups())
            cursor = end
        self.write_plain_text(text, cursor, len(text))

    def write_plain_text(self, text, start, end):
        '''Write and flush text[start:end]; an empty slice writes nothing.'''
        if start < end:
            self.wrapped.write(text[start:end])
            self.wrapped.flush()

    def convert_ansi(self, paramstring, command):
        '''Issue the win32 call for one CSI sequence, if converting.'''
        if self.convert:
            params = self.extract_params(command, paramstring)
            self.call_win32(command, params)

    def extract_params(self, command, paramstring):
        '''
        Parse the ';'-separated parameter string of a CSI sequence into a
        tuple of ints, filling in the defaults for the given command.
        '''
        fields = paramstring.split(';')
        if command in ('H', 'f'):
            # absolute cursor position: an empty field counts as 1, and the
            # result is padded with 1s to at least two values
            params = tuple(int(p) if p else 1 for p in fields)
            params += (1,) * (2 - len(params))
        else:
            # empty fields are dropped
            params = tuple(int(p) for p in fields if p)
            if not params:
                # defaults:
                if command in ('J', 'K', 'm'):
                    params = (0,)
                elif command in ('A', 'B', 'C', 'D'):
                    params = (1,)
        return params

    def call_win32(self, command, params):
        '''
        Dispatch one CSI command with already-parsed params to winterm.
        'm' is resolved through self.win32_calls (unknown codes are
        ignored); every other command needs a winterm instance and is a
        no-op when none is available.
        '''
        if command == 'm':
            for param in params:
                if param in self.win32_calls:
                    func_args = self.win32_calls[param]
                    func, args = func_args[0], func_args[1:]
                    func(*args, on_stderr=self.on_stderr)
            return

        if winterm is None:
            # No win32 layer: nothing to call. This mirrors the 'm' case,
            # whose lookup table is empty when winterm is unavailable.
            return

        if command == 'J':
            winterm.erase_screen(params[0], on_stderr=self.on_stderr)
        elif command == 'K':
            winterm.erase_line(params[0], on_stderr=self.on_stderr)
        elif command in ('H', 'f'):
            # cursor position - absolute
            winterm.set_cursor_position(params, on_stderr=self.on_stderr)
        elif command in ('A', 'B', 'C', 'D'):
            # cursor position - relative
            n = params[0]
            # A - up, B - down, C - forward, D - back
            x, y = {
                'A': (0, -n),
                'B': (0, n),
                'C': (n, 0),
                'D': (-n, 0),
            }[command]
            winterm.cursor_adjust(x, y, on_stderr=self.on_stderr)

    def convert_osc(self, text):
        '''
        Return text with every OSC sequence removed. A sequence of the form
        '<code>;<title>' with code 0 or 2 also sets the window title through
        winterm, when a winterm is available.
        '''
        def handle(match):
            paramstring, command = match.groups()
            if paramstring.count(";") != 1:
                return ''
            if command != BEL:
                return ''
            code, title = paramstring.split(";")
            # 0 - change title and icon (we will only change title)
            # 1 - change icon (we don't support this)
            # 2 - change title
            if code in ('0', '2') and winterm is not None:
                winterm.set_title(title)
            return ''

        # A single substitution pass: removing matches one at a time while
        # iterating would leave later match offsets pointing at the wrong
        # characters.
        return self.ANSI_OSC_RE.sub(handle, text)

    def flush(self):
        '''Flush the wrapped stream.'''
        self.wrapped.flush()