over/over/app.py
2018-12-12 22:40:28 +01:00

832 lines
24 KiB
Python

#! /usr/bin/env python3
# encoding: utf-8
# --------------------------------------------------
# Library imports
from collections import OrderedDict
import enum
import sys
import re
import os
import shlex
import hashlib
import traceback
try:
import xdg.BaseDirectory as xdg_bd
except:
xdg_bd = None
# --------------------------------------------------
# Local imports
from . import callback as callback_module
from . import cmd
from . import docs
from . import text
from . import types
from . import version
# --------------------------------------------------
# Exceptions
class UnknownAbbreviation(Exception):
"""
This abbreviation doesn't map to any known option. If this was a target, place it at the end of the line after a guard (--).
"""
pass
class UnknownOption(Exception):
"""
This option was mentioned in code, the config file or on the command line but is not known.
"""
pass
class ReadingUnsetOption(Exception):
"""
This option has no default, config file or command line value set.
"""
pass
class UnknownFeature(Exception):
"""
This requested feature is not know to this version of over.
"""
pass
class OptionNameUsed(Exception):
"""
This option's name or abbreviation is already used by a previously defined option.
"""
pass
class OptionSyntaxError(Exception):
"""
This Option rejected the value passed to it.
"""
pass
class InternalError(Exception):
"""
A possible bug in over.
"""
pass
class IncompleteArguments(Exception):
@property
def description(self):
option_name, option_count, option_remaining = self.args
received = option_count - option_remaining
return "option --%s takes a %d-word argument but has received %s" %(option_name, option_count, received or "none")
class BoolUsageError(Exception):
@property
def description(self):
option_name = self.args[0]
return "option --%s is not boolean so you can't use --no-%s" %(option_name, option_name)
class NotAList(Exception):
"""
Option defaults must be in a list.
"""
pass
# --------------------------------------------------
class Option_sources(enum.Enum):
none = 0
default = 1
config_file = 2
command_line = 3
class Option:
def __init__(self, name, description, callback, default=Option_sources.none, count=0, overwrite=True, abbr=None, in_cfg_file=True, in_help=True, is_boolean=None):
self.name = name
self.description = description
self.callback = callback
self.default = default
self.count = count
self.overwrite = overwrite
self.abbr = abbr
self.in_cfg_file = in_cfg_file
self.in_help = in_help
self.hash = hashlib.sha1(name.encode("utf-8")).hexdigest()
if is_boolean is None:
self.is_boolean = callback in (bool, callback_module.boolean, callback_module.booleans)
else:
self.is_boolean = is_boolean
# coerce a few Python types to our own
if callback == int:
self.callback = callback_module.integer
self.reset(True) # sets self.source and self._value_lists
def reset(self, default=False):
self.source = Option_sources.none
self._value_list = []
self._raw_value_list = []
if default:
if self.default != Option_sources.none:
default_values = [self.default] if self.overwrite else self.default
for default_value in default_values:
self.set_value(default_value, Option_sources.default)
def set_value(self, raw_value, source):
"""
Adds a value to this option's state. If the `source` is different from `self.source`, resets the state.
@while setting option value
"""
if source != self.source:
self.reset()
self.source = source
if type(raw_value) not in (tuple, list):
raise NotAList(self.name, raw_value)
if self.count > 0: # bools have count = 0 yet they still get one arg
l = len(raw_value)
if l != self.count:
raise IncompleteArguments(self.name, self.count, self.count - l)
try:
value = self.callback(*raw_value)
except ValueError:
raise OptionSyntaxError(self.name, repr(raw_value))
if self.overwrite:
self._value_list = [value]
self._raw_value_list = [raw_value]
else:
self._value_list.append(value)
self._raw_value_list.append(raw_value)
@property
def value(self):
if not self._value_list:
return None
else:
if self.overwrite:
return self._value_list[0]
else:
return self._value_list
# --------------------------------------------------
class OptionRouter:
def __init__(self, options):
self.options = options
def __getattr__(self, requested_name):
"""
@while retrieving an option's value
"""
matches = [name for name in self.options if name.replace("-", "_") == requested_name]
if matches:
if len(matches) == 1:
return self.options[matches[0]].value
else:
raise InternalError("more than one option name matched")
else:
raise UnknownOption(requested_name)
# --------------------------------------------------
def expand_group(token, options):
"""
@while expanding a commandline group
"""
output = []
options_by_abbr = {option.abbr: option for option in options.values() if option.abbr}
group_positive = token[0] is "+"
for abbr in token[1:]:
try:
option = options_by_abbr[abbr]
except KeyError:
raise UnknownAbbreviation(abbr)
if option.is_boolean:
output.append("--%s%s" %("" if group_positive else "no-", option.name))
else:
output.append("--%s" %(option.name))
return output
# --------------------------------------------------
def get_xdg_paths(appname):
"""
Returns absolute directory paths {"data": ..., "config": ...} for application data and config storage.
"""
if xdg_bd:
xdg_data = xdg_bd.xdg_data_home
xdg_config = xdg_bd.xdg_config_home
else:
xdg_data = os.path.expanduser('~/.local/share')
xdg_config = os.path.expanduser('~/.config')
return {
"data": os.path.join(xdg_data, appname),
"config": os.path.join(xdg_config, appname)
}
# --------------------------------------------------
def serialize_callback(f):
return "%s.%s" %(f.__module__, f.__name__)
# --------------------------------------------------
def format_description(description):
colorless = text.render(description, colors=False)
lines = text.paragraph(colorless, 55, 2, join=False)
# comment them out and assemble
return "\n".join("#" + line[1:] for line in lines)
# --------------------------------------------------
def serialize_default(option):
if option.default is Option_sources.none:
return ""
else:
return cmd.format_invocation(str(x) for x in option.default)
# --------------------------------------------------
def serialize_source(source):
if source == Option_sources.none:
return "<R>None<.>"
elif source == Option_sources.default:
return "default"
elif source == Option_sources.config_file:
return "<y>config file<.>"
elif source == Option_sources.command_line:
return "<Y>command line<.>"
# --------------------------------------------------
class ConfigFile:
"""
Config file object. Takes a {name: Option} dictionary and a file path, and:
- opens the file, loads all option values in it into Options
- unknown Options present in the file generate exceptions
- missing options are appended to the file
"""
def __init__(self, options, path, ignore_unknown=False):
"""
"""
self.options = options
self.path = path
self.ignore_unknown = ignore_unknown
self.print = text.Output("over.app.ConfigFile")
self.seen_hashes = set()
def read_config(self):
"""
Reads the config file, updates self.options with values when available, and returns a set of updated options.
@while reading the config file
"""
updated = set()
if os.path.exists(self.path):
# read all options in the file
with open(self.path) as f:
for line in f:
line = line.strip()
if line:
if line[0] == "#":
m = re.findall("# ---- ([0-9a-f]{40}) ----", line)
if m:
self.seen_hashes.add(m[0])
else:
L, R = (t.strip() for t in line.split("="))
try:
option = self.options[L]
except KeyError:
if self.ignore_unknown:
continue
else:
raise UnknownOption(L)
args = shlex.split(R)
option.set_value(args, Option_sources.config_file)
updated.add(option)
return updated
def update_config(self, header_template, header_args, new_options_commented=True):
"""
Creates the config file if necessary, adds new or missing options into it, and returns a set of newly added options.
@while updating the config file with new options
"""
# create the config dir
config_dir = os.path.dirname(self.path)
if config_dir.strip() and not os.path.exists(config_dir):
os.mkdir(config_dir)
self.print("created config directory <c>%s<.>" %(config_dir))
# if the file doesn't exist, create it with a boilerplate header
if not os.path.exists(self.path):
with open(self.path, "w") as f:
f.write(header_template %header_args)
self.print("created empty config file <c>%s<.>" %(self.path))
# add new or otherwise missing options
updated = set()
with open(self.path, "a") as f:
for option in self.options.values():
if option.hash not in self.seen_hashes and option.in_cfg_file:
self.print("adding <W>--<G>%s<.> to config file" %(option.name))
f.write(docs.config_file_item %(
option.hash,
("--{0} or --no-{0}" if (option.is_boolean and option.count == 0) else "--{0}").format(option.name),
serialize_callback(option.callback),
option.count or "no",
"" if option.count == 1 else "s",
"the last instance counts" if option.overwrite else "can be specified multiple times",
format_description(option.description),
"#" if new_options_commented else "",
option.name,
serialize_default(option)
))
updated.add(option)
return updated
def __repr__(self):
return "ConfigFile(%s)" %(self.path)
# --------------------------------------------------
class Main:
default_features = {
"config_file": (False, "Maintains and uses a config file in the XDG config directory (usually ~/.config/appname)."),
"auto_add_help": (True, "Generates the program's help and state view."),
"handle_exceptions": (True, "Displays human-readable stack traces when unhandled exceptions are raised.")
}
"""
Application backbone. Provides:
* A configuration system consisting of
* a given set of (hardcoded) default values
* a configuration file that overrides the defaults
* a command line parser that overrides all of the above
* Help system that generates help text based on the application's configurable options.
* A prettyprinting exception handler.
"""
def __init__(self, name, version=None, license=None, features={}):
self.name = name
self.version = version
self.license = license
self.print = text.Output(name)
self.options = OrderedDict()
self.options_by_abbr = OrderedDict()
self.docs = OrderedDict()
self.cfg = OptionRouter(self.options)
self.targets = []
self.invocation = cmd.format_invocation(sys.argv)
self.features = types.ndict()
self.last_command_line = None
self.using_alternate_config = False
self.uncontained_exception_callbacks = [] # (function, args)
for feature_name in self.default_features:
if feature_name in features:
self.features[feature_name] = features[feature_name]
else:
self.features[feature_name] = self.default_features[feature_name][0]
for feature_name in features:
if feature_name not in self.default_features:
raise UnknownFeature(feature_name)
if self.features.handle_exceptions:
sys.excepthook = self.stack_tracer
def __repr__(self):
return 'over.app.Main(name="%s")' %(self.name)
def exit(self, rv=0):
"""
Terminates the program and returns `rv`.
"""
sys.exit(rv)
def load_alternate_config(self, path):
"""
Resets the option's internal state and loads a different config file.
@while loading alternate config file
"""
if self.using_alternate_config:
return
else:
self.using_alternate_config = True
for option in self.options.values():
option.reset(True)
self.parse(self.last_command_line, path)
def add_option(self, *args, **kwargs):
"""
@while registering a new configuration option
"""
option = Option(*args, **kwargs)
if option.name in self.options:
raise OptionNameUsed("name %s is already in use" %(option.name))
else:
self.options[option.name] = option
if option.abbr:
if option.abbr in self.options_by_abbr:
raise OptionNameUsed("abbreviation %s is already used by --%s" %(option.abbr, self.options_by_abbr[option.abbr].name))
else:
self.options_by_abbr[option.abbr] = option
def add_doc(self, chapter, paragraphs):
self.docs[chapter] = paragraphs
def dump(self):
"""
Prints the application's own configuration state as one long command line.
Skips action verbs.
"""
tokens = [sys.argv[0]] # FIXME breaks encapsulation
for option in self.options.values():
if option.is_boolean and option.count == 0:
if option.value:
tokens.append("<W>--<g>%s<.>" %(option.name))
else:
tokens.append("<R>--no-<g>%s<.>" %(option.name))
elif option.count > 0:
for raw_value in option._raw_value_list:
tokens.append("<W>--<g>%s<.> <M>%s<.>" %(option.name, cmd.format_invocation(str(x) for x in raw_value)))
for target in self.targets:
tokens.append(target)
self.print(" ".join(tokens), format="<t>")
def activate_debug(self):
raise NotImplementedError("debug is not yet here")
def enable_help(self, name="help", abbr="h"):
"""
Map application help to --name and -abbr, and enable library ---help.
@while adding help commandline options
"""
self.add_option(name, "Display the application configuration view.", self.help, abbr=abbr, in_cfg_file=False)
self.add_option("-help", "Display the library help and about views.", lambda: self.help(alternate_docs=docs.over_docs), in_cfg_file=False)
self.add_option("-config", "Path to an alternate config file.", self.load_alternate_config, count=1, in_cfg_file=False)
self.add_option("-dump", "Show a command line that exactly represents the application's state.", self.dump, in_cfg_file=False)
self.add_option("-debug", "Enable extra debug messages (sets <c>over<.>.<c>app<.>.<c>Main<.>.<y>debug<.> = <M>True<.>). Currently does nothing.", self.activate_debug, in_cfg_file=False)
def help(self, alternate_docs=None):
"""
Displays a help text and exits the program:
- application name, version and license name
- application description and other documentation supplied with `Main.add_doc`
- a list of application options with their descriptions and current values
- a list of currently specified targets.
@while displaying help
"""
print = text.Output("help", format="<t>", end="\n")
# App name and version
print("[<W>Application<.>]")
print(" <W>%s<.>-<c>%s<.> licensed under <W>%s<.>" %(self.name, self.version, self.license))
print(" using over-%s" %(version.str))
# Main features
print("")
print("[<W>over.app.Main features<.>]")
for feature_name in self.features:
print(" %s: <B>%s<.> (%s)" %(" <G>ON<.>" if self.features[feature_name] else "<R>OFF<.>", feature_name, self.default_features[feature_name][1]))
# App docs
print("")
for chapter, paragraphs in (alternate_docs or self.docs).items():
print("[<W>%s<.>]" %(chapter))
for paragraph in paragraphs:
print(paragraph, format=" <i><t>")
print("")
# App options
if not alternate_docs:
print("[<W>Options<.>]")
for option in self.options.values():
if option.in_help:
# option name and type
full_names = ["<W>--<g>%s<.>" %(option.name)]
abbr_names = []
if option.abbr:
abbr_names.append("<W>+<G>%s<.>" %(option.abbr))
if option.is_boolean and option.count == 0:
full_names.append("<R>--no-<g>%s<.>" %(option.name))
if option.abbr:
abbr_names.append("<R>-<G>%s<.>" %(option.abbr))
line_parts = [" " + " or ".join(full_names)]
if abbr_names:
line_parts.append(" (%s)" %(" or ".join(abbr_names)))
if not (option.count == 0 and not option.is_boolean): # don't display type on actions
line_parts.append("; type <B>%s<.>" %(serialize_callback(option.callback)))
if not option.overwrite:
line_parts.append(", <C>can be used more than once<.>")
print("".join(line_parts))
# Current value
if option.source != Option_sources.none:
for raw_value in option._raw_value_list:
colored_current_value = cmd.format_invocation(str(x) for x in raw_value)
if colored_current_value and colored_current_value[0] == '"':
colored_current_value = '"<M>' + colored_current_value[1:-1] + '<.>"'
else:
colored_current_value = '<M>' + colored_current_value + '<.>'
print(" Current value (%s): %s" %(
serialize_source(option.source),
colored_current_value
))
# some misc flags
if not option.in_cfg_file:
print(" (<c>not in config file<.>)")
# the desc text
print(option.description, format=" <i><t>")
print("")
# Current targets, if any
if self.targets:
print("[<W>Targets<.>]")
print(" <M>%s<.>" %(cmd.format_invocation(self.targets)))
self.exit()
def parse_config(self, force_path=None):
if self.features.config_file:
config_dir = get_xdg_paths(self.name)["config"]
config_path = force_path or os.path.join(config_dir, "main.cfg")
self.config_file = ConfigFile(self.options, config_path)
self.config_file.read_config()
self.config_file.update_config(docs.config_file_header, (self.name, self.version, version.str))
else:
self.config_file = None
def parse_command_line(self, command_line=None):
"""
Parses arguments on the commandline.
There are 5 types of arguments:
- bool flags (True is --option (+o), False is --no-option (-o))
- action options (passing the option triggers a callback, e.g. --execute)
- options taking a `Option.count` (>=1) arguments (--option ARGUMENT, --option ARG1 ARG2 ARG3 and so on)
- a guard that marks all following arguments as targets (--)
- all other arguments are targets
Additionally, options are either over-writing or cumulative. An overwriting
option, if encountered more than once, takes the last (rightmost) state. E.g.
+A -A +A evaluates to True. A cumulative option would evaluate to a list of
[True, False, True].
@while parsing commandline arguments
"""
command_line = command_line or sys.argv[1:]
self.last_command_line = command_line
# placing it here ensures --help is the last option in the list of options
if self.features.auto_add_help and "help" not in self.options:
self.enable_help()
last_read_option = None
remaining_payload_tokens = 0
temporary_payload = []
guard_passed = False
invocation = [sys.argv[0]]
scheduled_actions = []
for top_token in command_line:
# if it's a group
if not (remaining_payload_tokens or guard_passed) and len(top_token) >= 2 and top_token[0] in "+-" and top_token[1] != "-":
expanded = expand_group(top_token, self.options)
else:
expanded = [top_token]
for token in expanded:
invocation.append(token)
if not (remaining_payload_tokens or guard_passed) and token[:2] == "--":
if token == "--":
guard_passed = True
else:
option_name = token[2:]
option_is_positive = True
if option_name not in self.options:
if option_name[:3] == "no-":
option_name = option_name[3:]
option_is_positive = False
try:
option = self.options[option_name]
except KeyError:
raise UnknownOption(token)
if not option_is_positive and not option.is_boolean:
raise BoolUsageError(option.name)
last_read_option = option
if option.count == 0:
if option.is_boolean:
# single booleans are handled specially because of their --option/--no-option forms
option.set_value([option_is_positive], Option_sources.command_line)
else:
# --action verbs just want their callbacks called
scheduled_actions.append(option.callback)
else:
remaining_payload_tokens = option.count
temporary_payload = []
else:
if remaining_payload_tokens:
temporary_payload.append(token)
remaining_payload_tokens -= 1
if remaining_payload_tokens == 0:
last_read_option.set_value(temporary_payload, Option_sources.command_line)
else:
self.targets.append(token)
if remaining_payload_tokens:
raise IncompleteArguments(last_read_option.name, last_read_option.count, remaining_payload_tokens)
self.invocation = cmd.format_invocation(invocation)
for action in scheduled_actions:
action()
def setup(self, command_line=None, force_config=None, reset=False):
"""
@while determining application configuration
"""
if reset:
for option in self.options.values():
option.reset(True)
self.parse_config(force_config)
self.parse_command_line(command_line)
def stack_tracer(self, exception_type, exception, trace):
"""
@while formatting a traceback
Over Exception handler - prints human readable tracebacks.
If things go south even here, prints the topmost traceback and gives up in a safe manner.
"""
self.print("<R>uncontained exception<.> <Y>%s<.> raised" %(exception_type.__name__), self.print.tl.fail)
try:
tb_lines = ["", "---------------- Stack trace ----------------", "In program <W>%s<.>" %(self.invocation)]
while trace:
frame = trace.tb_frame
trace = trace.tb_next
method_name = frame.f_code.co_name
# if there's a "self" in locals, we can get a hold of the method using getattr
if "self" in frame.f_locals:
method = getattr(frame.f_locals["self"], method_name)
method_display_name = "<c>%s<.>.<y>%s<.>" %(method.__self__.__class__.__name__, method_name)
method_type = "method"
# if it's a global function, it's likely going to be here
elif method_name in frame.f_globals:
method = frame.f_globals[method_name]
method_display_name = "<y>%s<.>" %(method_name)
method_type = "function"
# otherwise give up
else:
method = None
method_display_name = "<y>%s<.>" %(method_name)
if method_name == "<module>":
method_type = "module"
else:
method_type = "<r>unknown callable<.>"
# use a docstring-provided action description if available
if method and method.__doc__ and "@while" in method.__doc__:
action = "while <m>%s<.> " %(re.findall("@while (.+)", method.__doc__)[0].strip())
else:
action = ""
tb_lines.append("%sin %s <y>%s<.> at %s:%d," %(action, method_type, method_display_name, frame.f_code.co_filename, frame.f_lineno))
if hasattr(exception, "description"):
reason = ": <R>%s<.>" %(exception.description)
elif exception.args:
reason = ": <R>%s<.>" %(" ".join(str(a) for a in exception.args))
else:
reason = ""
doc = " [%s]" %(exception.__doc__.strip()) if exception.__doc__ else ""
tb_lines.append("exception <Y>%s<.> was raised%s%s" %(exception_type.__name__, reason, doc))
last_i = len(tb_lines) - 1
for i, line in enumerate(tb_lines):
if i < 3:
format = "<t>"
elif i == last_i:
format = "%s ⇒ <i><t>" %((i - 3) * " ")
else:
format = "%s - <i><t>" %((i - 3) * " ")
self.print(line, format=format, end="\n")
self.print("---------------------------------------------", format="<t>", end="\n")
except:
self.print("<R>failed to contain exception<.>", self.print.tl.epic)
traceback.print_exc()
if self.uncontained_exception_callbacks:
self.print("executing containment callbacks", self.print.tl.exec)
l = len(self.uncontained_exception_callbacks)
for i, (cb, ctx) in enumerate(self.uncontained_exception_callbacks):
self.print("(%d/%d) %s" %(i+1, l, cb.__name__))
try:
cb(*ctx)
except Exception as e:
self.print("(%d/%d) <r>%s failed<.> (<R>%s<.>)" %(i+1, l, cb.__name__, e.__class__.__name__), self.print.tl.epic, end=":\n")
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
del exc_info
self.print("containment callbacks executed", self.print.tl.done)