Text output API was brought in line with ACE and similar libraries: there are
different functions for each log level, all with prinf semantics.

Unit is now fully usable with base-2 (MiB) and base-10 (MB) systems, as well
as arbitrary-base logarithms (dBm).

ProgressBar has been updated to expose more Unit configuration.
This commit is contained in:
Martin Sekera 2021-08-24 09:23:29 +02:00
parent 852cce04b6
commit 8f505849af
5 changed files with 193 additions and 155 deletions

View file

@ -305,7 +305,7 @@ class ConfigFile:
self.options = options
self.path = path
self.ignore_unknown = ignore_unknown
self.print = text.Output("over.app.ConfigFile")
self.log = text.Log("over.app.ConfigFile")
self.seen_hashes = set()
def read_config(self):
@ -356,13 +356,13 @@ class ConfigFile:
config_dir = os.path.dirname(self.path)
if config_dir.strip() and not os.path.exists(config_dir):
os.mkdir(config_dir)
self.print("created config directory <c>%s<.>" %(config_dir))
self.log.info("created config directory <c>%s<.>", config_dir)
# if the file doesn't exist, create it with a boilerplate header
if not os.path.exists(self.path):
with open(self.path, "w") as f:
f.write(header_template %header_args)
self.print("created empty config file <c>%s<.>" %(self.path))
self.log.info("created empty config file <c>%s<.>", self.path)
# add new or otherwise missing options
updated = set()
@ -370,7 +370,7 @@ class ConfigFile:
with open(self.path, "a") as f:
for option in self.options.values():
if option.hash not in self.seen_hashes and option.in_cfg_file:
self.print("adding <W>--<G>%s<.> to config file" %(option.name))
self.log.info("adding <W>--<G>%s<.> to config file", option.name)
f.write(docs.config_file_item %(
option.hash,
@ -415,7 +415,7 @@ class Main:
self.name = name
self.version = version
self.license = license
self.print = text.Output(name)
self.log = text.Log(name)
self.options = OrderedDict()
self.options_by_abbr = OrderedDict()
self.docs = OrderedDict()
@ -444,21 +444,6 @@ class Main:
def __repr__(self):
return 'over.app.Main(name="%s")' %(self.name)
def info(self, *args, **kwargs):
self.print(*args, **kwargs)
def start(self, *args, **kwargs):
self.print(*args, tag=self.print.tl.start, **kwargs)
def done(self, *args, **kwargs):
self.print(*args, tag=self.print.tl.done, **kwargs)
def warn(self, *args, **kwargs):
self.print(*args, tag=self.print.tl.warn, **kwargs)
def fail(self, *args, **kwargs):
self.print(*args, tag=self.print.tl.fail, **kwargs)
def exit(self, rv=0):
"""
Terminates the program and returns `rv`.
@ -526,7 +511,7 @@ class Main:
for target in self.targets:
tokens.append(target)
self.print(" ".join(tokens), format="<t>")
self.log.write(" ".join(tokens), format="<t>")
def activate_debug(self):
raise NotImplementedError("debug is not yet here")
@ -555,23 +540,23 @@ class Main:
@while displaying help
"""
print = text.Output("help", format="<t>", end="\n")
print = text.Log("help", format="<t>", end="\n").write
# App name and version
print("[<W>Application<.>]")
print(" <W>%s<.>-<c>%s<.> licensed under <W>%s<.>" %(self.name, self.version, self.license))
print(" <W>%s<.>-<c>%s<.> licensed under <W>%s<.>", self.name, self.version, self.license)
print(" using over-%s" %(version.str))
# Main features
print("")
print("[<W>over.app.Main features<.>]")
for feature_name in self.features:
print(" %s: <B>%s<.> (%s)" %(" <G>ON<.>" if self.features[feature_name] else "<R>OFF<.>", feature_name, self.default_features[feature_name][1]))
print(" %s: <B>%s<.> (%s)", " <G>ON<.>" if self.features[feature_name] else "<R>OFF<.>", feature_name, self.default_features[feature_name][1])
# App docs
print("")
for chapter, paragraphs in (alternate_docs or self.docs).items():
print("[<W>%s<.>]" %(chapter))
print("[<W>%s<.>]", chapter)
for paragraph in paragraphs:
print(paragraph, format=" <i><t>")
@ -618,10 +603,10 @@ class Main:
else:
colored_current_value = '<M>' + colored_current_value + '<.>'
print(" Current value (%s): %s" %(
print(" Current value (%s): %s",
serialize_source(option.source),
colored_current_value
))
)
# some misc flags
if not option.in_cfg_file:
@ -635,7 +620,7 @@ class Main:
# Current targets, if any
if self.targets:
print("[<W>Targets<.>]")
print(" <M>%s<.>" %(cmd.format_invocation(self.targets)))
print(" <M>%s<.>", cmd.format_invocation(self.targets))
self.exit()
@ -764,7 +749,7 @@ class Main:
If things go south even here, prints the topmost traceback and gives up in a safe manner.
"""
self.print("<R>uncontained exception<.> <Y>%s<.> raised" %(exception_type.__name__), self.print.tl.fail)
self.log.fail("<R>uncontained exception<.> <Y>%s<.> raised", exception_type.__name__)
try:
tb_lines = ["", "---------------- Stack trace ----------------", "In program <W>%s<.>" %(self.invocation)]
@ -824,26 +809,26 @@ class Main:
else:
format = "%s - <i><t>" %((i - 3) * " ")
self.print(line, format=format, end="\n")
self.log.write(line, format=format, end="\n")
self.print("---------------------------------------------", format="<t>", end="\n")
self.log.write("---------------------------------------------", format="<t>", end="\n")
except:
self.print("<R>failed to contain exception<.>", self.print.tl.epic)
self.log.epic("<R>failed to contain exception<.>")
traceback.print_exc()
if self.uncontained_exception_callbacks:
self.print("executing containment callbacks", self.print.tl.exec)
self.log.start("executing containment callbacks")
l = len(self.uncontained_exception_callbacks)
for i, (cb, ctx) in enumerate(self.uncontained_exception_callbacks):
self.print("(%d/%d) %s" %(i+1, l, cb.__name__))
self.log.write("(%d/%d) %s", i+1, l, cb.__name__)
try:
cb(*ctx)
except Exception as e:
self.print("(%d/%d) <r>%s failed<.> (<R>%s<.>)" %(i+1, l, cb.__name__, e.__class__.__name__), self.print.tl.epic, end=":\n")
self.log.epic("(%d/%d) <r>%s failed<.> (<R>%s<.>)", i+1, l, cb.__name__, e.__class__.__name__, end=":\n")
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
del exc_info
self.print("containment callbacks executed", self.print.tl.done)
self.log.done("containment callbacks executed")

View file

@ -7,7 +7,7 @@ import traceback
from . import text
class DeprecationForwarder:
print = text.Output("over.aux.DeprecationForwarder", stream=sys.stderr)
log = text.Log("over.aux.DeprecationForwarder")
def __init__(self, target, old_name, new_name):
self._target = target
@ -16,6 +16,6 @@ class DeprecationForwarder:
def __getattr__(self, name):
caller = traceback.extract_stack()[-2]
self.print("%s is deprecated, please use %s instead (%s:%d)" %(self._old_name, self._new_name, caller[0], caller[1]), self.print.tl.warn)
self.log.warn("%s is deprecated, please use %s instead (%s:%d)", self._old_name, self._new_name, caller[0], caller[1])
return getattr(self._target, name)

View file

@ -30,10 +30,13 @@ def char_in_str(chars, string):
# --------------------------------------------------
def format_invocation(args):
def escape_quote_invocation(args):
escaped = (arg.replace('"', '\\"') for arg in args)
return " ".join(('"%s"' %(a) if char_in_str(' $()[];\\"', a) else a) for a in escaped)
return [('"%s"' %(a) if char_in_str(' $()[];\\"|', a) else a) for a in escaped]
def format_invocation(args):
return " ".join(escape_quote_invocation(args))
# --------------------------------------------------
@ -97,7 +100,8 @@ class Command:
out.append(str(item))
if pretty:
return [('"%s"' %(a) if (not a or char_in_str(" $()[];\\", a)) else a) for a in out]
return escape_quote_invocation(out)
# return [('"%s"' %(a) if (not a or char_in_str(" $()[];\\", a)) else a) for a in out]
else:
return out

View file

@ -15,16 +15,16 @@ import tzlocal
# --------------------------------------------------
def lexical_join(words, oxford=False):
def lexical_join(words, oxford=True):
"""
Joins an iterable of words or sentence fragments into a lexical list:
>>> lexical_join(["this", "that", "one of them too"])
"this, that and one of them too"
>>> lexical_join(["this", "that", "one of them too"], oxford=True)
"this, that, and one of them too"
>>> lexical_join(["this", "that", "one of them too"], oxford=False)
"this, that and one of them too"
>>> lexical_join(["this", "that"])
"this and that"
@ -49,66 +49,80 @@ def lexical_join(words, oxford=False):
class Unit:
"""
A object that represents numbers and units in human-readable form.
TODO use significant digits instead of rigid order boundaries
TODO float superclass?
TODO base_2 prefixes (Ki, Mi, ...)
"""
_prefixes = (
_prefixes_base2 = (
("Yi", 80), ("Zi", 70), ("Ei", 60), ("Pi", 50), ("Ti", 40), ("Gi", 30), ("Mi", 20), ("Ki", 10),
("", 0)
)
_prefixes_base10 = (
("Y", 24), ("Z", 21), ("E", 18), ("P", 15), ("T", 12), ("G", 9), ("M", 6), ("k", 3),
("h", 2), ("D", 1), ("", 0), ("d", -1), ("c", -2), ("m", -3), ("μ", -6), ("n", -9),
("", 0), ("d", -1), ("c", -2), ("m", -3), ("µ", -6), ("n", -9),
("p", -12), ("f", -15), ("a", -18), ("z", -21), ("y", -24)
)
def __init__(self,
value, unit=None, dimension=1,
use_prefixes="YZEPTGMkmμnpfazy", format="%.2f pU",
logarithmic=False, log_base=10
):
def __init__(self, value, unit=None, base=10, dimension=1, format="%.2f pU", overshoot=1.5, only_prefixes=[]):
"""
value the numerical value of the variable (int or float)
unit the symbol to use, if any (str or None)
dimension the dimensionality of the value (1, 2, 3, ...)
use_prefixes which multiplier prefixes to use
- the default "YZEPTGMkmμnpfazy" omits "c" for centi- and "D" for deca-
format use printf notation for value (e.g. %010.5f), p for prefix and U for unit
logarithmic when True, log_10 prefixes are used
log_base logarithm base value
param value value to display [int, float]
param unit base unit (e.g. "m") [str]
param base see Base [int]
param dimension dimensionality of the value (1, 2, 3, ...) [int]
param format format string [str]
param overshoot how much should a lower prefix persist [float]
param restrict_prefixes list of whitelisted prefixes; uses everything if empty [list of str]
note that deca- is correctly rendered as "da", the "D" is used in use_prefixes only
# Base
With linear units, this is either exactly 2 for Base 2 prefixes (Ki, Mi, Gi, ...)
or exactly 10 for Base 10 ones (K, M, G, ... as well as m, µ, n, ...).
Negative bases force logarithmic behavior with the base equal to the absolute
value, so e.g. base=-10 forces log10 decibels.
# Overshoot behavior
Overshoot adjusts the boundary between prefixes, e.g. with overshoot=1.0
a Unit switches from k to M when the value exceeds 1000. With overshoot=1.5,
this happens at 1500.
"""
assert (base in [2, 10]) or base < 0
self.value = float(value)
self.unit = unit if unit else ""
self.unit = unit or ""
self.base = base
self.dimension = dimension
self.use_prefixes = use_prefixes
self.format = format
self.logarithmic = logarithmic
self.log_base = log_base
self.overshoot = overshoot
if self.logarithmic and (self.value < 0 or self.log_base < 0):
all_prefixes = self._prefixes_base2 if base == 2 else self._prefixes_base10
if only_prefixes:
self.prefixes = [(prefix, exp) for prefix, exp in all_prefixes if prefix in only_prefixes]
else:
self.prefixes = all_prefixes
if self.base < 0 and self.value < 0:
raise ValueError("math domain error (negative values can't be represented in dB)")
def __str__(self):
if self.value == 0.0:
e = 0
else:
e = round(math.log(abs(self.value), 10), 6)
if self.logarithmic:
prefix = "dB"
value = math.log(self.value, self.log_base) * 10
else:
for prefix, mul in self._prefixes:
if prefix in self.use_prefixes and mul*self.dimension <= e:
# each branch determines a prefix and an adjusted value
if self.base > 0:
for prefix, exp in self.prefixes:
if exp == 0: # always prefer 1.0
overshoot = 1
else:
overshoot = self.overshoot
if self.base ** (exp * self.dimension) * overshoot <= self.value:
break
value = self.value / 10**(mul*self.dimension)
value = self.value / self.base ** (exp * self.dimension)
else:
prefix = "dB"
value = math.log(self.value, -self.base) * 10
output = self.format %(value)
output = output.replace("p", prefix if prefix != "D" else "da") # deca- handler
output = output.replace("p", prefix)
output = output.replace("U", self.unit)
return output
@ -283,35 +297,21 @@ def count_leading(text, char):
# --------------------------------------------------
class Output:
class Log:
class tag:
class short:
info = " "
debug = " <c>?<.>"
start = "<W>>>><.>"
exec = start
note = " <C>#<.>"
warn = " <Y>!<.>"
fail = "<R>!!!<.>"
done = " <G>*<.>"
class long:
info = "INFO"
debug = "<c>DEBG<.>"
start = "<W>EXEC<.>"
exec = start
note = "<C>NOTE<.>"
warn = "<Y>WARN<.>"
fail = "<R>FAIL<.>"
epic = "<R>EPIC<.>"
done = "<G>DONE<.>"
ts = tag.short
tl = tag.long
info = "INFO"
debug = "<b>DEBG<.>"
begin = "<W>BEGN<.>"
note = "<C>NOTE<.>"
warn = "<Y>WARN<.>"
fail = "<R>FAIL<.>"
epic = "<R>EPIC<.>"
done = "<G>DONE<.>"
"""
Text UI output renderer.
Prints messages to the stdout with optional eye candy
Prints messages to stderr with optional eye candy
like colors and timestamps.
If the output stream is not a tty, colors are disabled.
@ -325,7 +325,7 @@ class Output:
<i> - indentation start marker
"""
def __init__(self, name, format="[%Y-%m-%d %H:%M:%S %Z] <@> -- <n>, <i><t>", colors=True, end=".\n", indent=True, stream=sys.stderr):
def __init__(self, name, format="[%Y-%m-%d %H:%M:%S %Z] <@> -- <n>, <i><t>", colors=True, end="\n", indent=True, stream=sys.stderr):
self.name = name
self.format = format
self.end = end
@ -337,12 +337,12 @@ class Output:
def add_sink(self, stream, colors):
self.sinks.append((stream, colors))
def write(self, text, tag=None, format=None, end=None):
def write(self, fmt, *args, tag=None, format=None, end=None):
"""
@while displaying text
"""
tag = tag or self.__class__.tag.long.info
tag = tag or self.__class__.tag.info
format = format or self.format
end = end or self.end
@ -350,6 +350,8 @@ class Output:
output = output.replace("<@>", tag)
output = output.replace("<n>", self.name)
text = fmt % args
if self.indent and "<i>" in format:
indent_width = render(output, colors=False).index("<i>")
text = paragraph(text, indent=indent_width)[indent_width:]
@ -362,15 +364,29 @@ class Output:
sink.write(render(output, colors))
sink.flush()
def flush(self):
"""
Dummy method for compatibility.
"""
pass
def __call__(self, *args, **kwargs):
def info(self, *args, **kwargs):
self.write(*args, **kwargs)
def debug(self, *args, **kwargs):
self.write(*args, tag=self.tag.debug, **kwargs)
def begin(self, *args, **kwargs):
self.write(*args, tag=self.tag.begin, **kwargs)
def note(self, *args, **kwargs):
self.write(*args, tag=self.tag.note, **kwargs)
def warn(self, *args, **kwargs):
self.write(*args, tag=self.tag.warn, **kwargs)
def fail(self, *args, **kwargs):
self.write(*args, tag=self.tag.fail, **kwargs)
def epic(self, *args, **kwargs):
self.write(*args, tag=self.tag.epic, **kwargs)
def done(self, *args, **kwargs):
self.write(*args, tag=self.tag.done, **kwargs)
# --------------------------------------------------
@ -391,15 +407,14 @@ def get_terminal_size():
# --------------------------------------------------
class _ProgressBarChannel:
print = Output("over.text._ProgressBarChannel")
def __init__(self, unit, top, precision, use_prefixes=True, prefix_base2=False, min_width_raw=0, min_width_rate=0, min_width_time=0, min_width_percent=7):
def __init__(self, unit, top, precision, use_prefixes=True, base=10, only_prefixes=[], min_width_raw=0, min_width_rate=0, min_width_time=0, min_width_percent=7):
self.unit = unit
self.top = top
self.dynamic_top = top is None
self.prefix_base2 = prefix_base2
self.base = base
self.precision = precision
self.use_prefixes = use_prefixes
self.only_prefixes = only_prefixes
self.min_width = {
"raw": min_width_raw,
"rate": min_width_rate,
@ -409,9 +424,6 @@ class _ProgressBarChannel:
self._value = 0
self.set()
if prefix_base2:
self.print("Unit does not yet support base2 prefixes (e.g. Gi, Mi), using decadic (G, M) instead", self.print.tl.warn)
def set(self, value=None):
if value is not None:
@ -423,10 +435,13 @@ class _ProgressBarChannel:
if unit is None:
unit = self.unit
u = Unit(value, unit, format="%.{:d}f pU".format(self.precision))
if not self.use_prefixes or just == "percent":
u._prefixes = (("", 0),) # Unit needs fixin"
u = Unit(
value,
unit,
format="%.{:d}f pU".format(self.precision),
base=self.base,
only_prefixes=[""] if (not self.use_prefixes or just == "percent") else self.only_prefixes
)
s = str(u)
@ -490,20 +505,18 @@ class ProgressBar:
width is the desired size of the progressbar drawing area in characters (columns)
and defaults to terminal width.
space_before_unit enables a space character between a value and its unit.
channels is a dict that for each channel lists its properties:
{
"a": {
"unit": "f",
"prefix_base2": False,
"top": 39600,
"precision": 1
},
"b": {
"unit": "o",
"prefix_base2": True,
"unit": "B",
"base": 2,
"only_prefixes": ["Ki", "Mi"],
"top": measure_file_size_closure("/path/to/file"),
"precision": 0
}
@ -511,7 +524,7 @@ class ProgressBar:
Channel IDs (the "a", "b") are arbitrary lowercase letters ([a-z]).
Properties "unit" and "prefix_base2" are passed to over.core.text.Unit.
Properties "unit", "base", and "only_prefixes" are passed to over.text.Unit.
"top" is the value of the channel that corresponds to 100%. Channels are
allowed to exceed this value. It can either be a number or a callable. If
it"s a callable, it will be called without arguments and shall return a number.
@ -658,28 +671,64 @@ class ProgressBar:
# --------------------------------------------------
if __name__ == "__main__":
o = Output("over.text")
o("Sample info message")
o("Sample debug message", o.tl.debug)
o("Sample action start message", o.tl.start)
o("Sample action success message", o.tl.done)
o("Sample warning message", o.tl.warn)
o("Sample error message", o.tl.fail)
def timestamp_to_hms(t):
"""
Converts a duration in seconds to a [[?h ]?m ]?s representation.
o("Available colors", end=":\n")
>>> timestamp_to_hms(12345)
'3h 25m 45s'
>>> timestamp_to_hms(3600)
'1h 0m 0s'
>>> timestamp_to_hms(1234)
'20m 34s'
>>> timestamp_to_hms(12)
'12s'
"""
hh = t // 3600
mm = (t % 3600) // 60
ss = t % 60
out = []
if hh:
out.append("%dh" %(hh))
if mm or hh:
out.append("%dm" %(mm))
out.append("%ds" %(ss))
return " ".join(out)
# --------------------------------------------------
if __name__ == "__main__":
log = Log("over.text")
log.info("Sample info message")
log.debug("Sample debug message")
log.begin("Sample action start message")
log.done("Sample action success message")
log.warn("Sample warning message")
log.fail("Sample error message")
log.note("Sample note message")
log.write("Available colors", end=":\n")
for abbr, (code, name) in sorted(ansi_colors.items()):
o("%s = <%s>%s<.>" %(abbr, abbr, name))
log.write("%s = <%s>%s<.>", abbr, abbr, name)
o("ProgressBar test")
log.begin("ProgressBar test")
pb = ProgressBar(
"§%a (§ra/§za) [§=a>§ A] §sa [§rb/§zb] (ETA §TA)",
{
"a": {
"unit": "f",
"prefix_base2": False,
"base": 10,
"top": 39600,
"precision": 1,
"min_width_raw": 10,
@ -688,8 +737,8 @@ if __name__ == "__main__":
"min_width_time": 0
},
"b": {
"unit": "o",
"prefix_base2": True,
"unit": "B",
"base": 2,
"top": 3200,
"precision": 0,
"min_width_raw": 10,
@ -707,4 +756,4 @@ if __name__ == "__main__":
pb.render()
time.sleep(0.25)
pb.end(not True)
pb.end(True)

View file

@ -1,8 +1,8 @@
#! /usr/bin/env python3
# encoding: utf-8
major = 2 # VERSION_MAJOR_IDENTIFIER
minor = 3 # VERSION_MINOR_IDENTIFIER
# VERSION_LAST_MM 2.3
major = 3 # VERSION_MAJOR_IDENTIFIER
minor = 0 # VERSION_MINOR_IDENTIFIER
# VERSION_LAST_MM 3.0
patch = 0 # VERSION_PATCH_IDENTIFIER
str = "2.3.0" # VERSION_STRING_IDENTIFIER
str = "3.0.0" # VERSION_STRING_IDENTIFIER