You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
610 lines
16 KiB
Python
610 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
# pylint: disable=unsubscriptable-object
|
|
"""Beautiful terminal spinners in Python.
|
|
"""
|
|
from __future__ import absolute_import, unicode_literals
|
|
|
|
import atexit
|
|
import functools
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
import halo.cursor as cursor
|
|
|
|
from log_symbols.symbols import LogSymbols
|
|
from spinners.spinners import Spinners
|
|
|
|
from halo._utils import (
|
|
colored_frame,
|
|
decode_utf_8_text,
|
|
get_environment,
|
|
get_terminal_columns,
|
|
is_supported,
|
|
is_text_type,
|
|
encode_utf_8_text,
|
|
)
|
|
|
|
|
|
class Halo(object):
    """Terminal spinner that animates on a background thread.

    A ``Halo`` instance can be driven explicitly (``start`` / ``stop``), used
    as a context manager, or applied as a function decorator.

    Attributes
    ----------
    CLEAR_LINE : str
        Code to clear the line
    SPINNER_PLACEMENTS : tuple of str
        Values accepted by the ``placement`` property.
    """

    CLEAR_LINE = "\033[K"
    SPINNER_PLACEMENTS = (
        "left",
        "right",
    )

    def __init__(
        self,
        text="",
        color="cyan",
        text_color=None,
        spinner=None,
        animation=None,
        placement="left",
        interval=-1,
        enabled=True,
        stream=sys.stdout,
    ):
        """Constructs the Halo object.

        Parameters
        ----------
        text : str, optional
            Text to display.
        color : str, optional
            Color of the spinner frame.
        text_color : str, optional
            Color of the text.
        spinner : str|dict, optional
            Spinner name, or a dictionary with ``frames`` and ``interval``
            keys describing a custom spinner.
        animation : str, optional
            Animation to apply if text is too large. Can be one of `bounce`,
            `marquee`. Defaults to ellipses.
        placement : str, optional
            Side of the text to place the spinner on. Can be `left` or
            `right`. Defaults to `left`.
        interval : integer, optional
            Interval between each frame of the spinner in milliseconds. A
            non-positive value selects the spinner's own interval.
        enabled : boolean, optional
            Spinner enabled or not.
        stream : io, optional
            Output.

        Raises
        ------
        ValueError
            If ``placement`` is not one of ``SPINNER_PLACEMENTS``.
        """
        self._color = color
        self._animation = animation

        # Order matters: the text setter measures the widest spinner frame,
        # so the spinner must be resolved before the text is assigned.
        self.spinner = spinner
        self.text = text
        self._text_color = text_color

        interval = int(interval)
        self._interval = interval if interval > 0 else self._spinner["interval"]
        self._stream = stream

        self.placement = placement
        self._frame_index = 0
        self._text_index = 0
        self._spinner_thread = None
        self._stop_spinner = None
        self._spinner_id = None
        self.enabled = enabled

        environment = get_environment()

        def clean_up():
            """Handle cell execution"""
            self.stop()

        if environment in ("ipython", "jupyter"):
            from IPython import get_ipython

            ip = get_ipython()
            ip.events.register("post_run_cell", clean_up)
        else:  # default terminal
            atexit.register(clean_up)

    def __enter__(self):
        """Starts the spinner on a separate thread. For use in context managers.

        Returns
        -------
        self
        """
        return self.start()

    def __exit__(self, type, value, traceback):
        """Stops the spinner. For use in context managers."""
        self.stop()

    def __call__(self, f):
        """Allow the Halo object to be used as a regular function decorator.

        Parameters
        ----------
        f : callable
            Function to run while the spinner is active.

        Returns
        -------
        callable
            Wrapper that runs ``f`` inside ``with self:`` and returns its
            result.
        """

        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            with self:
                return f(*args, **kwargs)

        return wrapped

    @property
    def spinner(self):
        """Getter for spinner property.

        Returns
        -------
        dict
            spinner value
        """
        return self._spinner

    @spinner.setter
    def spinner(self, spinner=None):
        """Setter for spinner property.

        Also rewinds the spinner and text frame counters.

        Parameters
        ----------
        spinner : dict, str
            Defines the spinner value with frame and interval
        """
        self._spinner = self._get_spinner(spinner)
        self._frame_index = 0
        self._text_index = 0

    @property
    def text(self):
        """Getter for text property.

        Returns
        -------
        str
            text value, exactly as it was assigned
        """
        return self._text["original"]

    @text.setter
    def text(self, text):
        """Setter for text property.

        Parameters
        ----------
        text : str
            Defines the text value for spinner
        """
        self._text = self._get_text(text)

    @property
    def text_color(self):
        """Getter for text color property.

        Returns
        -------
        str
            text color value
        """
        return self._text_color

    @text_color.setter
    def text_color(self, text_color):
        """Setter for text color property.

        Parameters
        ----------
        text_color : str
            Defines the text color value for spinner
        """
        self._text_color = text_color

    @property
    def color(self):
        """Getter for color property.

        Returns
        -------
        str
            color value
        """
        return self._color

    @color.setter
    def color(self, color):
        """Setter for color property.

        Parameters
        ----------
        color : str
            Defines the color value for spinner
        """
        self._color = color

    @property
    def placement(self):
        """Getter for placement property.

        Returns
        -------
        str
            spinner placement
        """
        return self._placement

    @placement.setter
    def placement(self, placement):
        """Setter for placement property.

        Parameters
        ----------
        placement : str
            Defines the placement of the spinner

        Raises
        ------
        ValueError
            If ``placement`` is not one of ``SPINNER_PLACEMENTS``.
        """
        if placement not in self.SPINNER_PLACEMENTS:
            raise ValueError(
                "Unknown spinner placement '{0}', available are {1}".format(
                    placement, self.SPINNER_PLACEMENTS
                )
            )
        self._placement = placement

    @property
    def spinner_id(self):
        """Getter for spinner id

        Returns
        -------
        str
            Name of the rendering thread while the spinner is running,
            ``None`` otherwise.
        """
        return self._spinner_id

    @property
    def animation(self):
        """Getter for animation property.

        Returns
        -------
        str
            Spinner animation
        """
        return self._animation

    @animation.setter
    def animation(self, animation):
        """Setter for animation property.

        The text frames are rebuilt from the original text so the new
        animation takes effect immediately.

        Parameters
        ----------
        animation : str
            Defines the animation of the spinner
        """
        self._animation = animation
        self._text = self._get_text(self._text["original"])

    def _check_stream(self):
        """Returns whether the stream is open, and if applicable, writable

        Returns
        -------
        bool
            Whether the stream is open
        """
        if self._stream.closed:
            return False

        try:
            # Attribute access kept separate from invocation, to avoid
            # swallowing AttributeErrors from the call which should bubble up.
            check_stream_writable = self._stream.writable
        except AttributeError:
            # Streams without a ``writable`` method are assumed writable.
            return True

        return check_stream_writable()

    def _write(self, s):
        """Write to the stream, if writable

        Parameters
        ----------
        s : str
            Characters to write to the stream
        """
        if self._check_stream():
            self._stream.write(s)

    def _hide_cursor(self):
        """Disable the user's blinking cursor (only on an open tty stream)."""
        if self._check_stream() and self._stream.isatty():
            cursor.hide(stream=self._stream)

    def _show_cursor(self):
        """Re-enable the user's blinking cursor (only on an open tty stream)."""
        if self._check_stream() and self._stream.isatty():
            cursor.show(stream=self._stream)

    def _get_spinner(self, spinner):
        """Extracts spinner value from options and returns value
        containing spinner frames and interval, defaults to 'dots' spinner.

        Parameters
        ----------
        spinner : dict, str
            Contains spinner value or type of spinner to be used

        Returns
        -------
        dict
            Contains frames and interval defining spinner
        """
        # A non-empty mapping is a custom spinner and is used as given.
        # ``isinstance`` also accepts dict subclasses such as OrderedDict.
        if spinner and isinstance(spinner, dict):
            return spinner

        if not is_supported():
            # Named spinners are unavailable here; the 'line' spinner is the
            # fallback regardless of what was requested.
            return Spinners["line"].value

        # Short-circuit so the membership test only ever sees text values;
        # unhashable inputs (e.g. an empty dict) fall through to the default.
        if is_text_type(spinner) and spinner in Spinners.__members__:
            return Spinners[spinner].value

        return Spinners["dots"].value

    def _get_text(self, text):
        """Creates frames based on the selected animation

        Parameters
        ----------
        text : str
            Text to be shown next to the spinner.

        Returns
        -------
        dict
            ``{"original": text, "frames": [...]}`` where ``frames`` holds the
            text variants to cycle through while rendering.
        """
        stripped_text = text.strip()

        # Check which frame of the animation is the widest
        max_spinner_length = max([len(i) for i in self._spinner["frames"]])

        # Subtract to the current terminal size the max spinner length
        # (-1 to leave room for the extra space between spinner and text)
        terminal_width = get_terminal_columns() - max_spinner_length - 1

        frames = self._build_text_frames(
            stripped_text, terminal_width, self._animation
        )
        return {"original": text, "frames": frames}

    @staticmethod
    def _build_text_frames(stripped_text, terminal_width, animation):
        """Builds the list of text frames for a given available width.

        Parameters
        ----------
        stripped_text : str
            Text with surrounding whitespace already removed.
        terminal_width : int
            Number of columns available for the text. Negative values are
            treated as zero.
        animation : str or None
            ``"bounce"`` slides the visible window forth and back. Any other
            truthy value scrolls the text like a marquee. A falsy value
            truncates the text and appends ``" (...)"``.

        Returns
        -------
        list of str
            A single frame when the text fits or is truncated, several
            frames when it is animated.
        """
        width = max(terminal_width, 0)
        text_length = len(stripped_text)

        if width >= text_length:
            return [stripped_text]

        if not animation:
            # Add ellipsis if text is larger than terminal width and no
            # animation was specified. The cut is clamped at zero so a very
            # narrow terminal never turns it into a negative slice bound.
            return [stripped_text[: max(width - 6, 0)] + " (...)"]

        if animation == "bounce":
            # Make the text bounce back and forth
            frames = [
                stripped_text[offset : width + offset]
                for offset in range(text_length - width + 1)
            ]
            frames.extend(list(reversed(frames)))
            return frames

        # Make the text scroll like a marquee. Every non-bounce animation
        # value ends up here, which is what the original always-true
        # ``elif "marquee":`` branch did.
        looped_text = stripped_text + " " + stripped_text[:width]
        return [
            looped_text[offset : width + offset]
            for offset in range(text_length + 1)
        ]

    def clear(self):
        """Clears the line and returns cursor to the start of line.

        Returns
        -------
        self
        """
        self._write("\r")
        self._write(self.CLEAR_LINE)
        return self

    def _render_frame(self):
        """Renders the frame on the line after clearing it."""
        if not self.enabled:
            # Nothing is written and the frame counters are left untouched
            # while disabled, so rendering resumes where it stopped if the
            # spinner is enabled again.
            return

        self.clear()
        frame = self.frame()
        output = "\r{}".format(frame)
        try:
            self._write(output)
        except UnicodeEncodeError:
            # The stream rejected the text; retry with the encoded form.
            self._write(encode_utf_8_text(output))

    def render(self):
        """Runs the render until thread flag is set.

        Returns
        -------
        self
        """
        while not self._stop_spinner.is_set():
            self._render_frame()
            # ``_interval`` is in milliseconds.
            time.sleep(0.001 * self._interval)

        return self

    def frame(self):
        """Builds and returns the frame to be rendered

        Advances the spinner frame counter (and, through ``text_frame``, the
        text frame counter).

        Returns
        -------
        str
            Spinner frame and text frame separated by a space, ordered
            according to the placement.
        """
        frames = self._spinner["frames"]
        frame = frames[self._frame_index]

        if self._color:
            frame = colored_frame(frame, self._color)

        self._frame_index = (self._frame_index + 1) % len(frames)

        text_frame = self.text_frame()
        if self._placement == "right":
            return "{0} {1}".format(text_frame, frame)
        return "{0} {1}".format(frame, text_frame)

    def text_frame(self):
        """Builds and returns the text frame to be rendered

        Returns
        -------
        str
            Current text frame, colored when a text color is set.
        """
        frames = self._text["frames"]

        if len(frames) == 1:
            # Return first frame (can't return original text because at this
            # point it might be ellipsed)
            frame = frames[0]
        else:
            # The text may have been replaced by one with fewer frames since
            # the counter last advanced, so wrap before indexing.
            index = self._text_index % len(frames)
            frame = frames[index]
            self._text_index = (index + 1) % len(frames)

        if self._text_color:
            return colored_frame(frame, self._text_color)

        return frame

    def start(self, text=None):
        """Starts the spinner on a separate thread.

        Does nothing beyond updating the text when the spinner is already
        running, is disabled, or the stream is not usable.

        Parameters
        ----------
        text : None, optional
            Text to be used alongside spinner

        Returns
        -------
        self
        """
        if text is not None:
            self.text = text

        if self._spinner_id is not None:
            return self

        if not (self.enabled and self._check_stream()):
            return self

        self._hide_cursor()

        self._stop_spinner = threading.Event()
        self._spinner_thread = threading.Thread(target=self.render)
        # ``Thread.setDaemon`` is deprecated; the attribute is the supported
        # spelling.
        self._spinner_thread.daemon = True
        # Draw the first frame right away instead of waiting for the thread.
        self._render_frame()
        self._spinner_id = self._spinner_thread.name
        self._spinner_thread.start()

        return self

    def stop(self):
        """Stops the spinner and clears the line.

        Returns
        -------
        self
        """
        if self._spinner_thread and self._spinner_thread.is_alive():
            self._stop_spinner.set()
            self._spinner_thread.join()

        if self.enabled:
            self.clear()

        self._frame_index = 0
        self._spinner_id = None
        self._show_cursor()
        return self

    def succeed(self, text=None):
        """Shows and persists success symbol and text and exits.

        Parameters
        ----------
        text : None, optional
            Text to be shown alongside success symbol.

        Returns
        -------
        self
        """
        return self.stop_and_persist(symbol=LogSymbols.SUCCESS.value, text=text)

    def fail(self, text=None):
        """Shows and persists fail symbol and text and exits.

        Parameters
        ----------
        text : None, optional
            Text to be shown alongside fail symbol.

        Returns
        -------
        self
        """
        return self.stop_and_persist(symbol=LogSymbols.ERROR.value, text=text)

    def warn(self, text=None):
        """Shows and persists warn symbol and text and exits.

        Parameters
        ----------
        text : None, optional
            Text to be shown alongside warn symbol.

        Returns
        -------
        self
        """
        return self.stop_and_persist(symbol=LogSymbols.WARNING.value, text=text)

    def info(self, text=None):
        """Shows and persists info symbol and text and exits.

        Parameters
        ----------
        text : None, optional
            Text to be shown alongside info symbol.

        Returns
        -------
        self
        """
        return self.stop_and_persist(symbol=LogSymbols.INFO.value, text=text)

    def stop_and_persist(self, symbol=" ", text=None):
        """Stops the spinner and persists the final frame to be shown.

        When the spinner is disabled nothing is stopped or written.

        Parameters
        ----------
        symbol : str, optional
            Symbol to be shown in final frame
        text : str, optional
            Text to be shown in final frame. Defaults to the current text.

        Returns
        -------
        self
        """
        if not self.enabled:
            return self

        symbol = decode_utf_8_text(symbol)

        if text is not None:
            text = decode_utf_8_text(text)
        else:
            text = self._text["original"]

        text = text.strip()

        if self._text_color:
            text = colored_frame(text, self._text_color)

        self.stop()

        if self._placement == "right":
            output = "{0} {1}\n".format(text, symbol)
        else:
            output = "{0} {1}\n".format(symbol, text)

        try:
            self._write(output)
        except UnicodeEncodeError:
            # The stream rejected the text; retry with the encoded form.
            self._write(encode_utf_8_text(output))

        return self
|