rename dtk -> dcd
This commit is contained in:
parent
e7a35d23e5
commit
139381e7fc
7 changed files with 56 additions and 26 deletions
7
dcd/__init__.py
Normal file
7
dcd/__init__.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
#! /usr/bin/env python3
|
||||
# encoding: utf-8
|
||||
|
||||
from . import argv
|
||||
from . import aux
|
||||
from . import cfg
|
||||
from . import dt
|
188
dcd/argv.py
Normal file
188
dcd/argv.py
Normal file
|
@ -0,0 +1,188 @@
|
|||
#! /usr/bin/env python3
|
||||
# encoding: utf-8
|
||||
|
||||
import sys
|
||||
|
||||
from . import aux
|
||||
|
||||
class Action:
|
||||
"""
|
||||
A callable with signature:
|
||||
|
||||
p_args is a list of (name, type, desc) tuples
|
||||
kv_args is a dict with {name: (type, default value, desc), ...}
|
||||
"""
|
||||
|
||||
p_args = []
|
||||
kv_args = {}
|
||||
|
||||
def __call__(self, cmd, **kwargs):
|
||||
"""
|
||||
This is passed the invoking command and all the args.
|
||||
|
||||
Override this.
|
||||
"""
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
class UnknownCommand(Exception):
|
||||
def __init__(self, cmd):
|
||||
super().__init__(" ".join(cmd))
|
||||
|
||||
class BadArg(Exception):
|
||||
def __init__(self, name, T, raw):
|
||||
super().__init__("unable to interpret %s as %s (%s)" %(raw, name, T.__name__))
|
||||
|
||||
class BadKeywordArg(Exception):
|
||||
def __init__(self, name):
|
||||
super().__init__("unknown keyword argument %s" %(name))
|
||||
|
||||
class IncompleteInvocation(Exception):
|
||||
def __init__(self):
|
||||
super().__init__("one or more mandatory arguments are missing")
|
||||
|
||||
class Invocation:
|
||||
"""
|
||||
A simple command line parser.
|
||||
|
||||
Initialized with a (nested) routing dict which maps commands to Actions.
|
||||
|
||||
COMMANDS ARGS [KV_ARGS]
|
||||
|
||||
COMMANDS is one or more words describing the Action to take.
|
||||
|
||||
ARGS is exactly N ordered words (N depends on the selected Action).
|
||||
|
||||
KV_ARGS is zero or more name=value pairs. Their meaning depends on the selected Action.
|
||||
|
||||
It is an error if any token remains after parsing.
|
||||
"""
|
||||
|
||||
def __init__(self, routing):
|
||||
self.routing = routing
|
||||
|
||||
self.action = None
|
||||
self.cmd = []
|
||||
self.args = {} # name: value
|
||||
|
||||
def parse_argv(self, argv=None):
|
||||
"""
|
||||
Parses the supplied argv in order to pre-select an Action to call.
|
||||
|
||||
If no argv is passed, uses sys.argv[1:].
|
||||
"""
|
||||
|
||||
argv = argv or sys.argv[1:]
|
||||
cursor = self.routing
|
||||
consumed = 0
|
||||
|
||||
# decode the command
|
||||
for arg in argv:
|
||||
self.cmd.append(arg)
|
||||
consumed += 1
|
||||
|
||||
if arg in cursor:
|
||||
cursor = cursor[arg]
|
||||
else:
|
||||
raise UnknownCommand(self.cmd)
|
||||
|
||||
if issubclass(type(cursor), Action):
|
||||
self.action = cursor
|
||||
break
|
||||
|
||||
# at this point the expected layout of the remaining args is known
|
||||
|
||||
if not self.action:
|
||||
return
|
||||
|
||||
# pre-populate args
|
||||
for name, *_ in self.action.p_args:
|
||||
self.args[name] = None
|
||||
|
||||
for name, (T, default, _) in self.action.kv_args.items():
|
||||
self.args[name] = T(default)
|
||||
|
||||
# process positional (mandatory) args
|
||||
count_p_args = len(self.action.p_args)
|
||||
raw_p_args = argv[consumed:][:count_p_args]
|
||||
|
||||
if not len(raw_p_args) == count_p_args:
|
||||
raise IncompleteInvocation
|
||||
|
||||
for raw, (name, T, _) in zip(raw_p_args, self.action.p_args):
|
||||
try:
|
||||
self.args[name] = T(raw)
|
||||
except:
|
||||
raise BadArg(name, T, raw)
|
||||
|
||||
consumed += count_p_args
|
||||
|
||||
# process keyword (optional) args
|
||||
raw_kv_args = argv[consumed:]
|
||||
|
||||
for arg in raw_kv_args:
|
||||
if "=" in arg:
|
||||
aname, avalue = arg.split("=", 1)
|
||||
|
||||
if aname in self.action.kv_args:
|
||||
self.args[aname] = self.action.kv_args[aname][0](avalue)
|
||||
else:
|
||||
raise BadKeywordArg(arg)
|
||||
else:
|
||||
raise BadKeywordArg(arg)
|
||||
|
||||
def execute(self, context=None):
|
||||
self.action.context = context
|
||||
self.action(self.cmd, **self.args)
|
||||
|
||||
def help(self, stream=sys.stdout):
|
||||
"""
|
||||
Prints auto-generated usage information.
|
||||
"""
|
||||
|
||||
print("Usage:")
|
||||
|
||||
for cmd, act in aux.flatten_dict(self.routing):
|
||||
synopsis = ["$", sys.argv[0], cmd]
|
||||
args = []
|
||||
|
||||
for name, T, desc in act.p_args:
|
||||
synopsis.append(name)
|
||||
args.append("* %s (%s) = %s" %(name, T.__name__, desc))
|
||||
|
||||
if act.kv_args:
|
||||
synopsis.append("[")
|
||||
|
||||
for name, (T, default, desc) in act.kv_args.items():
|
||||
synopsis.append("%s=%s" %(name, default or "…"))
|
||||
args.append("+ %s (%s) = %s" %(name, T.__name__, desc))
|
||||
|
||||
synopsis.append("]")
|
||||
|
||||
print(" " + " ".join(synopsis))
|
||||
for arg in args:
|
||||
print(" " + arg)
|
||||
|
||||
print()
|
||||
print(" " + act.__doc__.strip())
|
||||
|
||||
print()
|
||||
|
||||
if __name__ == "__main__":
|
||||
class TestActionHello(Action):
|
||||
p_args = [("NAME", str, "description"), ("AGE", int, "another short piece of wisdom")]
|
||||
kv_args = {"speed": (float, 75, "more stuff")}
|
||||
|
||||
def __call__(self, cmd, NAME, AGE, speed):
|
||||
print("Hello %s, age %d." %(NAME, AGE))
|
||||
print("Called by %s with %s" %(cmd, speed))
|
||||
|
||||
routing = {
|
||||
"person": {
|
||||
"hello": TestActionHello()
|
||||
}
|
||||
}
|
||||
|
||||
invo = Invocation(routing)
|
||||
invo.parse_argv(["person", "hello", "Martin", "31", "speed=85"])
|
||||
invo.execute()
|
79
dcd/aux.py
Normal file
79
dcd/aux.py
Normal file
|
@ -0,0 +1,79 @@
|
|||
#! /usr/bin/env python3
|
||||
# encoding: utf-8
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def flatten_dict(root, glue=" ", prefix=[]):
|
||||
lines = []
|
||||
|
||||
for k, v in root.items():
|
||||
new_prefix = prefix + [k]
|
||||
|
||||
if type(v) == dict:
|
||||
lines.extend(flatten_dict(v, glue, new_prefix))
|
||||
else:
|
||||
lines.append((glue.join(new_prefix), v))
|
||||
|
||||
return lines
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
import random
|
||||
|
||||
PASSPHRASE_VOWELS = "aeiuAEU"
|
||||
PASSPHRASE_CONSONANTS = "bcdfghjkmnpqrstvwxyzBCDFGHJKLMNPQRSTVWXYZ23456789"
|
||||
|
||||
def random_bytes(length):
|
||||
return open("/dev/urandom", "rb").read(length)
|
||||
|
||||
def random_syllable():
|
||||
return random.choice(PASSPHRASE_CONSONANTS) + random.choice(PASSPHRASE_VOWELS) + random.choice(PASSPHRASE_CONSONANTS)
|
||||
|
||||
def random_word(syllables=3):
|
||||
parts = []
|
||||
|
||||
for _ in range(syllables):
|
||||
parts.append(random_syllable())
|
||||
|
||||
return "".join(parts)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
import hashlib
|
||||
|
||||
def hash_password(p, hexdigest=True):
|
||||
h = hashlib.blake2b()
|
||||
|
||||
h.update(p.encode("utf-8"))
|
||||
|
||||
return h.hexdigest() if hexdigest else h.digest()
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def raw_to_hex(raw, separator=" "):
|
||||
"""
|
||||
Converts a bytearray (or bytes) into its textual hexadecimal representation.
|
||||
"""
|
||||
|
||||
output = []
|
||||
|
||||
for o in raw:
|
||||
output.append(hex(o)[2:].zfill(2))
|
||||
|
||||
return separator.join(output)
|
||||
|
||||
def hex_to_raw(text, separator=" "):
|
||||
"""
|
||||
Converts a hexadecimal representation of a byte array into a bytearray.
|
||||
"""
|
||||
|
||||
output = []
|
||||
|
||||
text = text.replace(separator, "")
|
||||
|
||||
for i in range(len(text)//2):
|
||||
output.append(int(text[2*i:2*i+2], 16))
|
||||
|
||||
return bytearray(output)
|
||||
|
||||
# ----------------------------------------------------------------
|
100
dcd/cfg.py
Normal file
100
dcd/cfg.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
#! /usr/bin/env python3
|
||||
# encoding: utf-8
|
||||
|
||||
import jsmin
|
||||
import json
|
||||
|
||||
class Config:
|
||||
"""
|
||||
A hierarchical key-value container that can be loaded from JSON or a Python dict.
|
||||
|
||||
Any contained dicts are automatically converted to Config instances as well.
|
||||
|
||||
Supports sequential overwriting - so one can load a default config and then
|
||||
progressively overwrite it with overlays:
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
>>> raw
|
||||
{'a': 1, 'b': {'ba': 2, 'bb': 3, 'bc': {'bca': None}}, 'c': 4}
|
||||
>>> overlay
|
||||
{'b': {'bc': {'bca': 42}}}
|
||||
|
||||
|
||||
|
||||
while stack:
|
||||
current = stack[-1]
|
||||
|
||||
...
|
||||
|
||||
stack.pop()
|
||||
|
||||
for key in overlay.keys():
|
||||
|
||||
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, d, readonly=False):
|
||||
assert type(d) == dict
|
||||
object.__setattr__(self, "raw", d)
|
||||
object.__setattr__(self, "readonly", readonly)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path, readonly=False):
|
||||
with open(path) as f:
|
||||
initial = json.loads(jsmin.jsmin(f.read()))
|
||||
|
||||
return cls(initial, readonly)
|
||||
|
||||
def update(self, d):
|
||||
|
||||
|
||||
def keys(self):
|
||||
return self.raw.keys()
|
||||
|
||||
def values(self):
|
||||
return self.raw.values()
|
||||
|
||||
def items(self):
|
||||
return self.raw.items()
|
||||
|
||||
def __getitem__(self, name):
|
||||
return self.raw[name]
|
||||
|
||||
def __setitem__(self, name, value):
|
||||
if self.readonly:
|
||||
raise AttributeError("config is read-only")
|
||||
|
||||
self.raw[name] = value
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if self.readonly:
|
||||
raise AttributeError("config is read-only")
|
||||
|
||||
self.raw[name] = value
|
||||
|
||||
def __getattr__(self, name):
|
||||
matches = [key for key in self.raw if key.replace("-", "_") == name]
|
||||
|
||||
if matches:
|
||||
assert len(matches) == 1
|
||||
|
||||
value = self.raw[matches[0]]
|
||||
|
||||
if type(value) == dict:
|
||||
return Config(value)
|
||||
else:
|
||||
return value
|
||||
else:
|
||||
raise KeyError(name)
|
||||
|
||||
def __contains__(self, name):
|
||||
return name in self.raw
|
||||
|
||||
def __repr__(self):
|
||||
return "Config(%s)" %(" ".join("%s=%s" %(k, "…" if type(v) == dict else v) for k, v in self.raw.items()))
|
16
dcd/dt.py
Normal file
16
dcd/dt.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
#! /usr/bin/env python3
|
||||
# encoding: utf-8
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
import datetime
|
||||
import pytz
|
||||
|
||||
def now(tz_name="UTC"):
|
||||
t = datetime.datetime.now()
|
||||
tz = pytz.timezone(tz_name)
|
||||
|
||||
return tz.localize(t)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue