2 from threading
import Lock
4 from .utils
import supports_terminal_sequences
, write_string
9 'ERASE_LINE': '\033[K',
33 def format_text(text
, f
):
35 @param f String representation of formatting to apply in the form:
36 [style] [light] font_color [on [light] bg_color]
37 E.g. "red", "bold green on light blue"
40 tokens
= f
.strip().split()
44 if tokens
[-1] == 'ON':
45 raise SyntaxError(f
'Empty background format specified in {f!r}')
46 if tokens
[-1] not in _COLORS
:
47 raise SyntaxError(f
'{tokens[-1]} in {f!r} must be a color')
48 bg_color
= f
'4{_COLORS[tokens.pop()]}'
49 if tokens
[-1] == 'LIGHT':
50 bg_color
= f
'0;10{bg_color[1:]}'
52 if tokens
[-1] != 'ON':
53 raise SyntaxError(f
'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
54 bg_color
= f
'\033[{bg_color}m'
59 elif tokens
[-1] not in _COLORS
:
60 raise SyntaxError(f
'{tokens[-1]} in {f!r} must be a color')
62 fg_color
= f
'3{_COLORS[tokens.pop()]}'
63 if tokens
and tokens
[-1] == 'LIGHT':
64 fg_color
= f
'9{fg_color[1:]}'
66 fg_style
= tokens
.pop() if tokens
and tokens
[-1] in _TEXT_STYLES
else 'NORMAL'
67 fg_color
= f
'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
69 raise SyntaxError(f
'Invalid format {" ".join(tokens)!r} in {f!r}')
71 if fg_color
or bg_color
:
72 text
= text
.replace(CONTROL_SEQUENCES
['RESET'], f
'{fg_color}{bg_color}')
73 return f
'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
78 class MultilinePrinterBase
:
79 def __init__(self
, stream
=None, lines
=1):
81 self
.maximum
= lines
- 1
82 self
._HAVE
_FULLCAP
= supports_terminal_sequences(stream
)
87 def __exit__(self
, *args
):
90 def print_at_line(self
, text
, pos
):
96 def _add_line_number(self
, text
, line
):
98 return f
'{line + 1}: {text}'
def write(self, *text):
    """Concatenate the given text fragments and send them to ``self.stream``.

    The fragments are joined with no separator and handed to
    ``write_string`` together with the printer's stream.
    """
    joined = ''.join(text)
    write_string(joined, self.stream)
105 class QuietMultilinePrinter(MultilinePrinterBase
):
class MultilineLogger(MultilinePrinterBase):
    """Printer variant whose ``stream`` is a logger object, not a real stream."""

    def write(self, *text):
        """Join the text fragments and emit them via ``self.stream.debug``."""
        message = ''.join(text)
        self.stream.debug(message)

    def print_at_line(self, text, pos):
        """Emit ``text`` for line ``pos`` after labelling it with ``_add_line_number``."""
        # stream is the logger object, not an actual stream
        labelled = self._add_line_number(text, pos)
        self.write(labelled)
class BreaklineStatusPrinter(MultilinePrinterBase):
    """Printer that writes every status update followed by a line break."""

    def print_at_line(self, text, pos):
        """Write ``text`` for line ``pos``, labelled by ``_add_line_number``, then a newline."""
        labelled = self._add_line_number(text, pos)
        self.write(labelled, '\n')
123 class MultilinePrinter(MultilinePrinterBase
):
def __init__(self, stream=None, lines=1, preserve_output=True):
    """Set up the printer state on top of the base-class initialisation.

    @param stream           Forwarded unchanged to the base class initialiser
    @param lines            Forwarded unchanged to the base class initialiser
    @param preserve_output  Stored as ``self.preserve_output``
    """
    super().__init__(stream, lines)
    self.preserve_output = preserve_output
    # Both trackers start at zero: nothing has been written yet.
    self._lastlength = 0
    self._lastline = 0
    # NOTE(review): presumably serialises cursor movement between threads - confirm against the users of _movelock.
    self._movelock = Lock()
131 @functools.wraps(func
)
132 def wrapper(self
, *args
, **kwargs
):
134 return func(self
, *args
, **kwargs
)
137 def _move_cursor(self
, dest
):
138 current
= min(self
._lastline
, self
.maximum
)
140 distance
= dest
- current
142 yield CONTROL_SEQUENCES
['UP'] * -distance
144 yield CONTROL_SEQUENCES
['DOWN'] * distance
145 self
._lastline
= dest
148 def print_at_line(self
, text
, pos
):
149 if self
._HAVE
_FULLCAP
:
150 self
.write(*self
._move
_cursor
(pos
), CONTROL_SEQUENCES
['ERASE_LINE'], text
)
153 text
= self
._add
_line
_number
(text
, pos
)
155 if self
._lastline
== pos
:
156 # move cursor at the start of progress when writing to same line
158 if self
._lastlength
> textlen
:
159 text
+= ' ' * (self
._lastlength
- textlen
)
160 self
._lastlength
= textlen
162 # otherwise, break the line
164 self
._lastlength
= textlen
165 self
.write(prefix
, text
)
170 # move cursor to the end of the last line, and write line break
171 # so that other to_screen calls can precede
172 text
= self
._move
_cursor
(self
.maximum
) if self
._HAVE
_FULLCAP
else []
173 if self
.preserve_output
:
174 self
.write(*text
, '\n')
177 if self
._HAVE
_FULLCAP
:
179 *text
, CONTROL_SEQUENCES
['ERASE_LINE'],
180 f
'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self
.maximum
)
182 self
.write('\r', ' ' * self
._lastlength
, '\r')