over-env/libexec/render.py
Martinez 10345f6e3a organize the repo
remove unused files
create config file
update dotfiles
update ebuild
2015-12-26 16:00:20 +01:00

514 lines
13 KiB
Python
Executable file

#! /usr/bin/env python3
# encoding: utf-8
import datetime
import os
import socket
import subprocess
import sys
COLOR_LOAD_IDLE = 10
COLOR_LOAD_OK = 2
COLOR_LOAD_WARN = 3
COLOR_LOAD_ERROR = 1
COLOR_USER_USER = 10
COLOR_USER_ROOT = 196
COLOR_SESSION_LOCAL = 7
COLOR_SESSION_REMOTE = 202
COLOR_TERM_SCREEN = 27
COLOR_CLOCK = 27
COLOR_CLOCK_DELTA = 11
COLOR_MEMSWAP = 8
COLOR_SPACE_OK = 2
COLOR_SPACE_WARN = 3
COLOR_SPACE_ERROR = 1
COLOR_DIR_RW = 34
COLOR_DIR_RO = 202
COLOR_DIR_NO = 196
COLOR_GIT_CLEAN = 10
COLOR_GIT_DIRTY = 1
COLOR_GIT_MERGE = 196
COLOR_GIT_UNTRACKED = 202
COLOR_GIT_MODIFIED = 1
COLOR_GIT_STAGED = 10
COLOR_PROMPT_OK = 14
COLOR_PROMPT_ERROR = 1
MOUNT_IGNORE_FS = ["iso9660", "tmpfs"]
MOUNT_IGNORE_DIR = ["/dev", "/proc", "/sys"]
GIT_BLACKLIST = ["/var/paludis/repositories"]
def style_color(fg, bg=0):
return "\033[38;5;%dm\033[48;5;%dm" %(fg, bg)
def style_bold():
return "\033[1m"
def style_reset():
return "\033[0m"
def colored_strlen(raw):
"""
Returns string length without ANSI control escapes.
"""
l = 0
reading_escape = False
for c in raw:
if c == "\x1b":
reading_escape = True
continue
if reading_escape and c == "m":
reading_escape = False
continue
if not reading_escape:
l += 1
return l
def si_number(raw):
if raw > 2**40:
raw /= 2**40
si = "Ti"
elif raw > 2**30:
raw /= 2**30
si = "Gi"
elif raw > 2**20:
raw /= 2**20
si = "Mi"
elif raw > 2**10:
raw /= 2**10
si = "ki"
else:
si = ""
return (raw, si)
def space_string(free):
"""
Returns a formatting string suitable for printing out free space: float for >1k, int otherwise.
"""
return "%.2f%s" if free > 1024 else "%d%s"
class Sysload:
def __init__(self):
loadavg = open("/proc/loadavg").read().split()
cores = os.sysconf(os.sysconf_names["SC_NPROCESSORS_ONLN"])
self.load1 = float(loadavg[0]) / cores
self.load5 = float(loadavg[1]) / cores
self.load15 = float(loadavg[2]) / cores
self.tasks_ready, self.tasks_total = [int(x) for x in loadavg[3].split("/")]
class Settings:
def __init__(self, argv):
try:
self.return_value = int(argv[1])
self.term_width = int(argv[2])
self.last_refresh = int(argv[3]) if argv[3] else None
self.load_warn = float(argv[4])
self.load_error = float(argv[5])
self.space_warn = float(argv[6])
self.space_error = float(argv[7])
self.is_root = os.geteuid() == 0
except:
print("Usage: %s return_value $COLUMNS last_refresh load_warn load_error space_warn space_error")
print(" e.g. %s 1 $COLUMNS 25 1.25 2.5 0.15 0.05")
print()
raise
def get_load_color(self, load):
if load > self.load_error:
return COLOR_LOAD_ERROR
elif load > self.load_warn:
return COLOR_LOAD_WARN
elif load > 0.1:
return COLOR_LOAD_OK
else:
return COLOR_LOAD_IDLE
def get_space_color(self, free, total):
free = free / total
if free < self.space_error:
return COLOR_SPACE_ERROR
elif free < self.space_warn:
return COLOR_SPACE_WARN
else:
return COLOR_SPACE_OK
class Part:
def __init__(self):
self.fragments = []
def __str__(self):
return "".join(self.fragments)
def __len__(self):
return colored_strlen(str(self))
class LoginPart(Part):
"""
username@hostname:screen
- username is green for users, red for root
- @ is a literal "@" if this is a local session, "" when remote (ssh)
- hostname is colored according to sysload
- screen is the screen window ID if available
"""
def __init__(self, settings, sysload):
Part.__init__(self)
is_remote = bool(os.getenv("SSH_CLIENT"))
screen = os.getenv("WINDOW")
# username
if settings.is_root:
self.fragments.append(style_color(COLOR_USER_ROOT))
else:
self.fragments.append(style_color(COLOR_USER_USER))
self.fragments.append(style_bold())
user = os.getlogin()
self.fragments.append(user)
self.fragments.append(style_reset())
# sign
if is_remote:
self.fragments.append(style_color(COLOR_SESSION_REMOTE))
else:
self.fragments.append(style_color(COLOR_SESSION_LOCAL))
sign = "" if is_remote else "@"
self.fragments.append(sign)
# hostname
self.fragments.append(style_color(settings.get_load_color(sysload.load1)))
hostname = socket.gethostname()
self.fragments.append(hostname)
# screen window ID
if screen:
self.fragments.append(style_reset())
self.fragments.append(":")
self.fragments.append(style_bold())
self.fragments.append(style_color(COLOR_TERM_SCREEN))
self.fragments.append(screen)
self.fragments.append(style_reset())
def list_homes():
with open("/etc/passwd") as f:
for line in f:
tokens = line.split(":")
if tokens[5] != "/dev/null":
yield (tokens[5], tokens[0])
class Dir:
def __init__(self, path, name=None, slash="/"):
self.path = path
self.slash = slash
self.truncated = False
self.name = os.path.basename(path) if name is None else name
if os.access(path, os.W_OK):
self.color = COLOR_DIR_RW
elif os.access(path, os.X_OK):
self.color = COLOR_DIR_RO
else:
self.color = COLOR_DIR_NO
def __len__(self):
return len(self.name) + len(self.slash)
def truncate(self):
self.truncated = True
if self.name:
self.name = self.name[0] + ""
def __str__(self):
output = [self.slash]
if self.truncated:
output.append(style_bold())
output.append(style_color(self.color))
output.append(self.name)
output.append(style_reset())
return "".join(output)
class PathPart(Part):
"""
/path/to/cwd
Contructs a list of Dirs from the root to CWD. When fit is called, some Dirs may be shortened.
"""
def __init__(self, term_width):
Part.__init__(self)
self.term_width = term_width
self.dirs = []
homes = dict(list_homes())
dirs = os.getcwd().split("/")[1:]
path = ""
for dir in dirs:
path += "/" + dir
if path in homes:
self.dirs = []
if homes[path] == os.getlogin():
self.dirs.append(Dir(path, "", "~"))
else:
self.dirs.append(Dir(path, homes[path], "~"))
else:
self.dirs.append(Dir(path))
@property
def full_length(self):
return sum(len(dir) for dir in self.dirs)
def shrink_fit(self, line):
self.fragments = []
available_space = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1 # account for single-space separators
for dir in self.dirs:
if self.full_length > available_space:
dir.truncate()
self.fragments.append(str(dir))
def command(cmd):
"""
Executes a command, returns stdout, suppresses stderr."
"""
s = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
s.wait()
return s.stdout.read().decode("utf-8")
class GitPart(Part):
"""
↘2 ↗4 ⚠master ?11 ✎6 ✉10
- 2 commits are available for pulling (remote is ahead)
- 4 commits are available for pushing (local is ahead)
- we're merging
- branch name is master
- 11 untracked filed
- 6 modified files
- 10 modified and staged files
Numeric elements are hidden if the value is 0.
"""
def __init__(self):
Part.__init__(self)
branch_name = command("git name-rev --name-only --no-undefined --always HEAD").strip()
if branch_name:
self.fragments.append("")
count_to_pull = command("git log --oneline ..@{u}").count("\n")
count_to_push = command("git log --oneline @{u}..").count("\n")
git_dir = command("git rev-parse --git-dir").strip()
merging = os.path.exists(os.path.join(git_dir, "MERGE_HEAD"))
untracked = command("git ls-files --other --exclude-standard").count("\n")
modified = command("git diff --name-only").count("\n")
staged = command("git diff --name-only --staged").count("\n")
if count_to_pull:
self.fragments.append("%d " %(count_to_pull))
if count_to_push:
self.fragments.append("%d " %(count_to_push))
if merging:
self.fragments.append(style_color(COLOR_GIT_MERGE))
self.fragments.append(style_bold())
self.fragments.append("")
elif modified or staged:
self.fragments.append(style_color(COLOR_GIT_DIRTY))
else:
self.fragments.append(style_color(COLOR_GIT_CLEAN))
self.fragments.append(branch_name)
self.fragments.append(style_reset())
if untracked:
self.fragments.append(style_color(COLOR_GIT_UNTRACKED))
self.fragments.append(" ?%d" %(untracked))
if modified:
self.fragments.append(style_color(COLOR_GIT_MODIFIED))
self.fragments.append("%d" %(modified))
if staged:
self.fragments.append(style_color(COLOR_GIT_STAGED))
self.fragments.append("%d" %(staged))
self.fragments.append(style_reset())
class Padding(Part):
def __init__(self, term_width):
Part.__init__(self)
self.term_width = term_width
def expand_fit(self, line):
length = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1 # account for single-space separators
self.fragments = [" " * length]
class StatsPart(Part):
"""
[ ?Δ1? clock | sysload tasks | m:memory ?s:swap? mountpoint_spaces...
- delta is shown if the last prompt refresh was before midnight, and displays how many midnights elapsed since
- clock shows current time (in HH:MM)
- sysload is current loadavg divided by amount of cores (in 1)
- tasks is the total amount of tasks on the system
- memory shows current free memory (in octets)
- swap shows remaining swap space but only if any is actually used
- mountpoint_spaces list one free space per mountpoint
"""
def __init__(self, settings, sysload):
Part.__init__(self)
self.fragments.append("[ ")
self.fragments.append(style_bold())
# delta
if settings.last_refresh:
then = datetime.datetime.fromtimestamp(settings.last_refresh).replace(hour=0, minute=0, second=0, microsecond=0)
now = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
midnights = (now - then).days
if midnights:
self.fragments.append(style_color(COLOR_CLOCK_DELTA))
self.fragments.append("Δ%d " %(midnights))
# clock
self.fragments.append(datetime.datetime.now().strftime("\033[38;5;%dm%%H:%%M\033[0m | " %(COLOR_CLOCK)))
# sysload
self.fragments.append(style_color(settings.get_load_color(sysload.load1)))
self.fragments.append("%.2f " %(sysload.load1))
self.fragments.append(style_reset())
# tasks
self.fragments.append("%d | " %(sysload.tasks_total))
# memory (and swap, if used)
mem_total = 0
mem_free = 0
swap_total = 0
swap_free = 0
with open("/proc/meminfo") as f:
for line in f:
if line.startswith("MemTotal"):
mem_total = int(line.split()[1]) * 1024
elif line.startswith("MemAvailable"):
mem_free = int(line.split()[1]) * 1024
elif line.startswith("SwapTotal"):
swap_total = int(line.split()[1]) * 1024
elif line.startswith("SwapFree"):
swap_free = int(line.split()[1]) * 1024
self.fragments.append(style_color(COLOR_MEMSWAP))
self.fragments.append("m")
self.fragments.append(style_bold())
self.fragments.append(style_color(settings.get_space_color(mem_free, mem_total)))
mem_free_si = si_number(mem_free)
self.fragments.append(space_string(mem_free) %(mem_free_si[0], mem_free_si[1]))
self.fragments.append(style_reset())
if swap_total - swap_free > 2**20:
self.fragments.append(style_color(COLOR_MEMSWAP))
self.fragments.append(" s")
self.fragments.append(style_bold())
self.fragments.append(style_color(settings.get_space_color(swap_free, swap_total)))
swap_free_si = si_number(swap_free)
self.fragments.append(space_string(swap_free) %(swap_free_si[0], swap_free_si[1]))
self.fragments.append(style_reset())
# mountpoints
names = []
with open("/proc/self/mounts") as f:
for line in f:
_, dir, type, options, *rest = line.split()
# skip non-storage mounts
if type in MOUNT_IGNORE_FS:
continue
if any([dir.startswith(d) for d in MOUNT_IGNORE_DIR]):
continue
if "rw" not in options.split(","):
continue
# /proc/self/mounts uses a literal \\040 string to escape spaces
dir = dir.replace("\\040", " ")
basename = os.path.basename(dir)
if basename:
short_name = " "
for c in basename:
short_name += c
if short_name not in names:
break
else:
short_name = " /"
stat = os.statvfs(dir)
stor_total = stat.f_blocks * stat.f_bsize
stor_free = stat.f_bavail * stat.f_bsize
self.fragments.append(short_name)
self.fragments.append(style_bold())
self.fragments.append(style_color(settings.get_space_color(stor_free, stor_total)))
stor_free_si = si_number(stor_free)
self.fragments.append(space_string(stor_free) %(stor_free_si[0], stor_free_si[1]))
self.fragments.append(style_reset())
if __name__ == "__main__":
settings = Settings(sys.argv)
sysload = Sysload()
lp = LoginPart(settings, sysload)
pp = PathPart(settings.term_width)
gp = GitPart()
pad = Padding(settings.term_width)
sp = StatsPart(settings, sysload)
line = [lp, pp, gp, pad, sp]
pp.shrink_fit(line)
pad.expand_fit(line)
line_str = " ".join(str(part) for part in line)
sys.stderr.write(line_str)
sys.stderr.write("\n")
sys.stderr.flush()