over/over/text.py

710 lines
18 KiB
Python

#! /usr/bin/env python3
# encoding: utf-8
# --------------------------------------------------
# Library imports
import datetime
import fcntl
import math
import re
import struct
import sys
import time
import termios
import tzlocal
# --------------------------------------------------
def lexical_join(words, oxford=False):
    """
    Joins an iterable of words or sentence fragments into a lexical list.

    words   any iterable; every item is converted with str()
    oxford  when True, lists of three or more items get a comma before the final "and"

    Always returns a str, an empty iterable gives "".

    >>> lexical_join(["this", "that", "one of them too"])
    'this, that and one of them too'
    >>> lexical_join(["this", "that", "one of them too"], oxford=True)
    'this, that, and one of them too'
    >>> lexical_join(["this", "that"])
    'this and that'
    >>> lexical_join(["this"])
    'this'
    """
    # convert up front so that every branch below returns a str,
    # including the single-item one
    parts = [str(word) for word in words]

    if not parts:
        return ""

    if len(parts) == 1:
        return parts[0]

    # two items are joined by a bare "and", the Oxford comma only applies to longer lists
    comma = "," if oxford and len(parts) > 2 else ""

    return "%s%s and %s" %(", ".join(parts[:-1]), comma, parts[-1])
# --------------------------------------------------
class Unit:
    """
    An object that represents numbers and units in human-readable form.

    TODO use significant digits instead of rigid order boundaries
    TODO float superclass?
    TODO base_2 prefixes (Ki, Mi, ...)
    """

    # (symbol, power of ten), ordered from the largest multiplier to the smallest;
    # __str__ depends on this order to pick the largest prefix that still fits
    _prefixes = (
        ("Y", 24), ("Z", 21), ("E", 18), ("P", 15), ("T", 12), ("G", 9), ("M", 6), ("k", 3),
        ("h", 2), ("D", 1), ("", 0), ("d", -1), ("c", -2), ("m", -3), ("μ", -6), ("n", -9),
        ("p", -12), ("f", -15), ("a", -18), ("z", -21), ("y", -24)
    )

    def __init__(self,
        value, unit=None, dimension=1,
        use_prefixes="YZEPTGMkmμnpfazy", format="%.2f pU",
        logarithmic=False, log_base=10
    ):
        """
        value         the numerical value of the variable (int or float), stored as a float
        unit          the symbol to use, if any (str or None)
        dimension     the dimensionality of the value (1, 2, 3, ...); prefix exponents are multiplied by it
        use_prefixes  which multiplier prefixes to use
                      - the default "YZEPTGMkmμnpfazy" omits "h", "D" (deca-), "d" and "c"
                      - the empty prefix (no multiplier) is always available
        format        use printf notation for value (e.g. %010.5f), p for prefix and U for unit
        logarithmic   when True, the value is rendered as 10 * log(value) with a fixed "dB" prefix
        log_base      logarithm base value

        note that deca- is correctly rendered as "da", the "D" is used in use_prefixes only

        Raises ValueError if logarithmic is set and value or log_base is negative.
        """

        self.value = float(value)
        self.unit = unit if unit else ""
        self.dimension = dimension
        self.use_prefixes = use_prefixes
        self.format = format
        self.logarithmic = logarithmic
        self.log_base = log_base

        if self.logarithmic and (self.value < 0 or self.log_base < 0):
            raise ValueError("math domain error (negative values can't be represented in dB)")

    def __str__(self):
        """
        Renders the value according to self.format.
        """

        if self.logarithmic:
            prefix = "dB"

            if self.value == 0.0:
                # log(0) is undefined and math.log would raise; a zero level is -inf dB
                value = float("-inf")
            else:
                value = math.log(self.value, self.log_base) * 10
        else:
            if self.value == 0.0:
                exponent = 0
            else:
                # rounding hides float noise such as log(1000, 10) == 2.9999999999999996
                exponent = round(math.log(abs(self.value), 10), 6)

            # pick the largest enabled prefix not exceeding the value's order of magnitude;
            # if none matches, the loop variables keep the last (smallest) entry
            for prefix, mul in self._prefixes:
                if prefix in self.use_prefixes and mul*self.dimension <= exponent:
                    break

            value = self.value / 10**(mul*self.dimension)

        output = self.format %(value)
        output = output.replace("p", prefix if prefix != "D" else "da") # deca- handler
        output = output.replace("U", self.unit)

        return output

    def __repr__(self):
        return "over.text.Unit(%s)" %(self)
# --------------------------------------------------
# single-letter color code -> (terminal escape sequence, color name);
# uppercase letters select the bright variant, "." resets the color
ansi_colors = {
    "B": ("\x1b[1;34m", "bright blue"),
    "C": ("\x1b[1;36m", "bright cyan"),
    "G": ("\x1b[1;32m", "bright green"),
    "K": ("\x1b[1;30m", "bright black"),
    "M": ("\x1b[1;35m", "bright magenta"),
    "R": ("\x1b[1;31m", "bright red"),
    "W": ("\x1b[1;37m", "bright white"),
    "Y": ("\x1b[1;33m", "bright yellow"),
    "b": ("\x1b[0;34m", "blue"),
    "c": ("\x1b[0;36m", "cyan"),
    "g": ("\x1b[0;32m", "green"),
    "k": ("\x1b[0;30m", "black"),
    "m": ("\x1b[0;35m", "magenta"),
    "r": ("\x1b[0;31m", "red"),
    "w": ("\x1b[0;37m", "white"),
    "y": ("\x1b[0;33m", "yellow"),
    ".": ("\x1b[0m", "reset"),
}


def render(text, colors=True):
    """
    Resolves the color tags in text and returns the result as a str.

    A color tag is <x> where x is a key of ansi_colors, <.> resets the color.
    With colors=True every tag becomes its terminal color code, with
    colors=False the tags are dropped. Use <<x> for a literal <x>.

    text is converted with str() first.

    @while coloring text
    """

    emitted = []  # finished pieces of the output
    pending = []  # sliding window over the most recent (up to three) characters

    for char in str(text):
        pending.append(char)

        if len(pending) < 3:
            continue

        if len(pending) == 4:
            # slide the window: its oldest character can no longer start a tag
            emitted.append(pending.pop(0))

        # the window now holds exactly three characters
        is_tag = pending[0] == "<" and pending[2] == ">" and pending[1] in ansi_colors

        if not is_tag:
            continue

        if emitted and emitted[-1] == "<":
            # escaped tag (<<x>): drop this opening "<" so the rest slides into the output as-is
            pending.pop(0)
        else:
            if colors:
                emitted.append(ansi_colors[pending[1]][0])

            pending.clear()

    return "".join(emitted + pending)
# --------------------------------------------------
def rfind(text, C, start, length):
    """
    Finds the last C within the first `length` visible characters of text[start:]
    and returns its index in text. Color tags aren't counted into length.

    If there is no such C, start is returned.
    """

    tail = text[start:]

    # count the Cs that fall into the visible window of the colorless text
    visible = render(tail, False)
    hits = [pos for pos, char in enumerate(visible) if char == C and pos < length]

    if not hits:
        return start

    # the n-th C of the colorless text is the n-th C of the original text
    positions = [pos for pos, char in enumerate(tail) if char == C]

    return start + positions[len(hits) - 1]
def paragraph(text, width=0, indent=0, stamp=None, join=True):
    """
    str text    text to format
    int width   required line length; if 0, current terminal width will be used
    int indent  how many spaces to indent the text with
    str stamp   placed into the first line's indent (whether it fits is your problem)
    bool join   join lines into one string, otherwise return a list of individual lines

    Formats text into an indented paragraph that fits the terminal and returns it.
    Correctly handles colors.

    If the text contains a word too long to fit on a line, wrapping is given up
    and the whole text is returned as a single (still indented) line.

    @while formatting a paragraph
    """

    fit_into_width = (width or get_terminal_size()[1]) - indent
    lines = []
    last = 0 # index of the space the previous line was broken at

    # break into lines
    while True:
        adj_last = last + 1 if last else 0 # skip the space we broke at
        rest = text[adj_last:]

        # measure what is visible: color tags take no room on the screen
        if strlen(rest) < fit_into_width:
            lines.append(rest)
            break

        idx = rfind(text, " ", last, fit_into_width)

        if idx == last:
            # rfind found no space to break at; fall back to a single line
            # that still goes through the indentation and join steps below
            lines = [text]
            break

        lines.append(text[adj_last:idx])
        last = idx

    # indent
    indent_str = " " * indent
    first_prefix = stamp.ljust(indent) if stamp else indent_str
    lines = [(first_prefix if i == 0 else indent_str) + line for i, line in enumerate(lines)]

    # join
    return "\n".join(lines) if join else lines
# --------------------------------------------------
def strlen(string):
    """
    Returns how many characters of string remain once its color tags are removed.
    """

    return len(render(string, colors=False))
# --------------------------------------------------
def char_in_str(chars, string):
    """
    Returns True iff at least one item of `chars` is contained in `string`.
    """

    return any(char in string for char in chars)
# --------------------------------------------------
def count_leading(text, char):
    """
    Returns the count of leading `char`s within `text`.

    text may be any iterable; a falsy text (empty, None) gives 0.
    """

    if not text:
        return 0

    # explicit counter instead of the leaked loop index, which is unbound
    # when text is truthy yet yields nothing (e.g. an exhausted iterator)
    count = 0

    for c in text:
        if c != char:
            break

        count += 1

    return count
# --------------------------------------------------
class Output:
    """
    Text UI output renderer.

    Prints messages to its sinks (sys.stderr by default) with optional
    eye candy like colors and timestamps.

    If the output stream is not a tty, colors are disabled.

    Formatting
    ==========
    You can use all formatting characters supported by strftime, as well as:
        <@> - tag
        <n> - name
        <t> - supplied text
        <i> - indentation start marker
    """

    class tag:
        class short:
            info = " "
            debug = " <c>?<.>"
            start = "<W>>>><.>"
            exec = start
            note = " <C>#<.>"
            warn = " <Y>!<.>"
            fail = "<R>!!!<.>"
            done = " <G>*<.>"

        class long:
            info = "INFO"
            debug = "<c>DEBG<.>"
            start = "<W>EXEC<.>"
            exec = start
            note = "<C>NOTE<.>"
            warn = "<Y>WARN<.>"
            fail = "<R>FAIL<.>"
            epic = "<R>EPIC<.>"
            done = "<G>DONE<.>"

    # shorthands for the two tag sets
    ts = tag.short
    tl = tag.long

    def __init__(self, name, format="[%Y-%m-%d %H:%M:%S %Z] <@> -- <n>, <i><t>", colors=True, end=".\n", indent=True, stream=sys.stderr):
        """
        name    replaces <n> in the format
        format  default message format, see the class docstring
        colors  allow colors on stream (they are still disabled if stream is not a tty)
        end     default string appended to every message
        indent  when True, wrapped message lines are aligned under the <i> marker
        stream  the first sink
        """

        self.name = name
        self.format = format
        self.end = end
        self.indent = indent
        self.sinks = []
        self.add_sink(stream, colors and stream.isatty())

    def add_sink(self, stream, colors):
        """
        Registers another stream that every message is written to.
        colors decides whether color tags become escape codes or are dropped for that stream.
        """

        self.sinks.append((stream, colors))

    def write(self, text, tag=None, format=None, end=None):
        """
        Formats text and writes it to all sinks.

        tag, format and end default to the INFO tag, self.format and self.end
        when left at None; empty strings are honored as given.

        @while displaying text
        """

        # compare against None so that an explicit "" is not replaced by the default
        if tag is None:
            tag = self.__class__.tag.long.info

        if format is None:
            format = self.format

        if end is None:
            end = self.end

        text = str(text) # str.replace below needs a str

        output = datetime.datetime.now(tzlocal.get_localzone()).strftime(format)
        output = output.replace("<@>", tag)
        output = output.replace("<n>", self.name)

        if self.indent and "<i>" in format:
            # the marker's visible column is the indent of the wrapped lines;
            # the first line's own indent is cut off as the prefix already occupies that room
            indent_width = render(output, colors=False).index("<i>")
            text = paragraph(text, indent=indent_width)[indent_width:]

        # the marker must never reach the sinks, whether indentation is in use or not
        output = output.replace("<i>", "")
        output = output.replace("<t>", text)
        output += end

        for sink, colors in self.sinks:
            sink.write(render(output, colors))
            sink.flush()

    def flush(self):
        """
        Dummy method for compatibility.
        """

        pass

    def __call__(self, *args, **kwargs):
        self.write(*args, **kwargs)
# --------------------------------------------------
def get_terminal_size():
    """
    Returns current terminal's (rows, cols), as seen through sys.stdout.

    If this can't be determined, returns a large enough number to turn it off.
    """

    unknown = 64000

    try:
        # fileno() belongs inside the try: a replaced or closed stdout has no descriptor
        terminal = sys.stdout.fileno()
        packed = fcntl.ioctl(terminal, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0))
    except (OSError, ValueError, AttributeError):
        return (unknown, unknown)

    # the structure also carries the pixel size, which is not part of the result
    rows, cols = struct.unpack("HHHH", packed)[:2]

    # a reported size of 0 means the terminal does not know its size either
    return (rows or unknown, cols or unknown)
# --------------------------------------------------
class _ProgressBarChannel:
    """
    A single value tracked by a ProgressBar, together with the settings used to render it.
    """

    print = Output("over.text._ProgressBarChannel")

    def __init__(self, unit, top, precision, use_prefixes=True, prefix_base2=False, min_width_raw=0, min_width_rate=0, min_width_time=0, min_width_percent=7):
        """
        unit               unit symbol appended to rendered values
        top                the value that corresponds to 100 %: a number, a callable returning
                           a number, or None if unknown (dynamic_top is then set)
        precision          number of decimal places in rendered values
        use_prefixes       when False, values are rendered without multiplier prefixes
        prefix_base2       not supported yet, a warning is issued when set
        min_width_raw      \
        min_width_rate      } rendered strings are right-justified to at least this width
        min_width_time      }
        min_width_percent  /
        """

        self.unit = unit
        self.top = top
        self.dynamic_top = top is None
        self.prefix_base2 = prefix_base2
        self.precision = precision
        self.use_prefixes = use_prefixes
        self.min_width = {
            "raw": min_width_raw,
            "rate": min_width_rate,
            "percent": min_width_percent,
            "time": min_width_time
        }

        self._value = 0
        self.set()

        if prefix_base2:
            self.print("Unit does not yet support base2 prefixes (e.g. Gi, Mi), using decadic (G, M) instead", self.print.tl.warn)

    @property
    def top(self):
        """
        The value corresponding to 100 %. A callable top is called on every access.
        """

        return self._top() if callable(self._top) else self._top

    @top.setter
    def top(self, top):
        self._top = top

    def set(self, value=None):
        """
        Stores value (unless it is None) and recomputes ratio.
        ratio is None while top is unknown or zero.
        """

        if value is not None:
            self._value = value

        top = self.top # read once, it may be a callable
        self.ratio = self._value / top if top else None

    def _render(self, value, unit=None, just=None):
        """
        Renders value with unit (the channel's own by default) using Unit.
        just is a key of self.min_width to right-justify the result to.
        """

        if unit is None:
            unit = self.unit

        u = Unit(value, unit, format="%.{:d}f pU".format(self.precision))

        if not self.use_prefixes or just == "percent":
            u._prefixes = (("", 0),) # Unit needs fixin'

        s = str(u)

        if just:
            s = s.rjust(self.min_width[just])

        return s

    @property
    def value(self):
        return self._render(self._value, just="raw")

    @property
    def inverse(self):
        top = self.top

        if top is None:
            # nothing to subtract from yet
            return "- {:s}".format(self.unit).rjust(self.min_width["raw"])

        return self._render(top - self._value, just="raw")

    def divide(self, amount):
        """
        Divides the amount into amount * ratio, amount * (1 - ratio) and rounds to integers.

        The ratio is clamped to 0..1 so a channel exceeding its top can't overflow
        the amount; an unknown ratio counts as 0.
        """

        ratio = min(max(self.ratio or 0, 0), 1)
        A = round(amount * ratio)

        return (A, amount - A)
class ProgressBar:
    """
    A configurable text progressbar.

    Each atom consists of three characters: a literal §, an operator and a channel.

    An operator is one of:
        % - a percentage (right-justified to the channel's min_width_percent)
        r - displays the raw value of the channel
        z - displays the maximum (total) value
        s - displays the rate of change in units/s
        m - displays the rate of change in units/min
        h - displays the rate of change in units/h
        t - displays the elapsed time in s (use with uppercase channel to get an ETA)
        T - displays the elapsed time in hh:mm:ss (use with uppercase channel to get an ETA)
        any other character - a bar consisting of these characters filling the line so that
            at 100% it fills the entirety of the remaining space

    A channel ID as a lowercase letter is the channel's actual value. An uppercase letter
    is that channel's complement. Operations z, s, m and h cannot have a complement.

    Use §§ to display a literal §.

    Examples
    ========
    Format: §%a [§=a>§ A] §tA
    Output: 42 % [============>               ] 27 s

    Format: §%a (§ra/§za) [§-a>§ A] §sa [§rb/§zb] (ETA §TA)
    Output: 53 % (21.0/39.6 kf) [---------------------> ] 27.1 f/s [233/439 Mio] (ETA 00:11:26)
    """

    def __init__(self, format, channels, width=None):
        """
        Initialize the ProgressBar.

        width is the desired size of the progressbar drawing area in characters (columns)
        and defaults to terminal width.

        channels is a dict that for each channel lists its properties:

            {
                "a": {
                    "unit": "f",
                    "prefix_base2": False,
                    "top": 39600,
                    "precision": 1
                },
                "b": {
                    "unit": "o",
                    "prefix_base2": True,
                    "top": 3200,
                    "precision": 0
                }
            }

        Channel IDs (the "a", "b") are arbitrary lowercase letters ([a-z]).
        The properties of each channel are passed to _ProgressBarChannel as keyword arguments.
        "top" is the value of the channel that corresponds to 100%. Channels are
        allowed to exceed this value. With a top of None the channel's top is estimated
        from the progress of the other channels (see set).
        NOTE(review): the original design also lists a callable as a valid top; whether
        that is accepted is decided by _ProgressBarChannel - confirm before relying on it.
        "precision" is the displayed floating point precision. Use 0 to force an integer.

        @while creating a ProgressBar
        """

        self.format = format
        self.channels = {channel_id: _ProgressBarChannel(**conf) for channel_id, conf in channels.items()}
        self.time_start = None
        self.width = width

    def set(self, channel_id, value):
        """
        Sets the channel's value.

        Channels with an unknown top get theirs estimated from this channel's ratio.
        """

        c = self.channels[channel_id]
        c.set(value)

        # a ratio of 0 says nothing about the other channels (and would divide by zero)
        if c.ratio:
            for channel in self.channels.values():
                if channel.dynamic_top:
                    channel.top = channel._value / c.ratio
                    channel.set() # refresh its ratio against the new top

    def render(self, clear=True, stream=sys.stderr):
        """
        Renders the progressbar into a stream. If clear is True, clears the previous content first.

        Atoms whose value can't be determined yet are drawn as "-" placeholders.

        @while drawing a progressbar update
        """

        width = self.width or get_terminal_size()[1]
        raw = self.format
        t = time.time()

        if not self.time_start:
            self.time_start = t

        t_elapsed = t - self.time_start

        # operator -> (displayed time unit, seconds per that unit)
        rates = {"s": ("s", 1), "m": ("min", 60), "h": ("h", 3600)}

        for tag in re.findall("§[%rzsmhtT][a-zA-Z]", raw):
            op = tag[1]
            ch = tag[2]
            channel = self.channels[ch.lower()]
            inverse = ch.isupper()

            if op == "%":
                if channel.ratio is None:
                    s = "- %".rjust(channel.min_width["percent"])
                else:
                    ratio = 1 - channel.ratio if inverse else channel.ratio
                    s = channel._render(ratio * 100, "%", "percent")

            elif op == "r": # display the current value
                s = channel.inverse if inverse else channel.value

            elif op == "z": # display the top value
                if inverse:
                    raise ValueError("inverse of the top value is not defined: {:s}".format(tag))

                if channel.top:
                    s = channel._render(channel.top, just="raw")
                else:
                    s = "- {:s}".format(channel.unit).rjust(channel.min_width["raw"])

            elif op.lower() == "t":
                # the ETA extrapolates the time spent so far and needs both a known top and some progress
                if channel.top and channel._value > 0:
                    t_to_end = t_elapsed * (channel.top - channel._value) / channel._value
                else:
                    t_to_end = None

                s = self._render_time(t_to_end if inverse else t_elapsed, op.isupper(), channel.min_width["time"])

            elif op in "smh":
                if inverse:
                    raise ValueError("the inverse of rates is not defined: {:s}".format(tag))

                t_unit, t_div = rates[op]

                if t_elapsed > 0:
                    rate = t_div * channel._value / t_elapsed
                    s = channel._render(rate, channel.unit + "/" + t_unit, "rate")
                else:
                    s = "- {:s}/{:s}".format(channel.unit, t_unit).rjust(channel.min_width["rate"])

            raw = raw.replace(tag, s, 1)

        # all that remains are bars
        free_space = width - len(re.sub("§[^%rzsmhtT][a-zA-Z]", "", raw))

        for tag in re.findall("§[^%rzsmhtT][a-zA-Z]", raw):
            op = tag[1]
            ch = tag[2]
            channel = self.channels[ch.lower()]

            if channel.ratio is None:
                # progress unknown: nothing done yet, everything remains
                lengths = (0, free_space)
            else:
                lengths = channel.divide(free_space)

            raw = raw.replace(tag, op * lengths[1 if ch.isupper() else 0], 1)

        # render literal §s
        raw = raw.replace("§§", "§")

        # pave over previous render and draw the new one
        if clear:
            stream.write("\r" + " " * width)

        stream.write("\r" + raw)
        stream.flush() # the line is not terminated, make sure it shows up anyway

    def end(self, finish=False, stream=sys.stderr):
        """
        Print a newline (heh :-)

        If finish is True, set all channels to 100% and draw the progressbar first.
        stream should be the stream the progressbar was rendered into.
        """

        if finish:
            for c in self.channels.values():
                c.set(c.top)

            self.render(stream=stream)

        # terminate the line on the stream that holds the bar
        stream.write("\n")
        stream.flush()

    def _render_time(self, seconds, hhmmss=False, just=0):
        """
        Renders seconds either as "N s" or, with hhmmss, as hh:mm:ss, right-justified to just.
        A seconds of None is rendered as a placeholder.
        """

        if hhmmss:
            if seconds is None:
                output = "--:--:--"
            else:
                mm, ss = divmod(int(seconds), 60)
                hh, mm = divmod(mm, 60)
                output = "{:02d}:{:02d}:{:02d}".format(hh, mm, ss)
        else:
            output = "- s" if seconds is None else "{:d} s".format(int(seconds))

        return output.rjust(just)
# --------------------------------------------------
if __name__ == "__main__":
    # demo: one message per tag, the color table and an animated progressbar
    o = Output("over.text")

    samples = (
        ("Sample info message", None),
        ("Sample debug message", o.tl.debug),
        ("Sample action start message", o.tl.start),
        ("Sample action success message", o.tl.done),
        ("Sample warning message", o.tl.warn),
        ("Sample error message", o.tl.fail),
    )

    for message, tag in samples:
        o(message, tag)

    o("Available colors", end=":\n")

    for abbr, (code, name) in sorted(ansi_colors.items()):
        o("%s = <%s>%s<.>" %(abbr, abbr, name))

    o("ProgressBar test")

    pb = ProgressBar(
        "§%a (§ra/§za) [§=a>§ A] §sa [§rb/§zb] (ETA §TA)",
        {
            # min_width_percent is left at its default of 7
            "a": dict(
                unit="f",
                prefix_base2=False,
                top=39600,
                precision=1,
                min_width_raw=10,
                min_width_rate=10,
                min_width_time=0
            ),
            "b": dict(
                unit="o",
                prefix_base2=True,
                top=3200,
                precision=0,
                min_width_raw=10,
                min_width_rate=10,
                min_width_time=10
            )
        }
    )

    ratios = [0, 0.1, 0.25, 0.4, 0.6, 0.8, 0.95, 1.0]

    for ratio in ratios:
        # move both channels to the same fraction of their respective tops
        for channel_id in ("a", "b"):
            pb.set(channel_id, ratio * pb.channels[channel_id].top)

        pb.render()
        time.sleep(0.25)

    pb.end(False)