#! /usr/bin/env python3 # encoding: utf-8 import datetime import os import pwd import socket import subprocess import sys COLOR_LOAD_IDLE = 10 COLOR_LOAD_OK = 2 COLOR_LOAD_WARN = 3 COLOR_LOAD_ERROR = 1 COLOR_USER_USER = 10 COLOR_USER_DAEMON = 32 COLOR_USER_ROOT = 196 COLOR_SESSION_LOCAL = 7 COLOR_SESSION_REMOTE = 202 COLOR_TERM_SCREEN = 27 COLOR_CLOCK = 27 COLOR_CLOCK_DELTA = 11 COLOR_MEMSWAP = 7 COLOR_SPACE_BTRFS = 12 COLOR_SPACE_OK = 2 COLOR_SPACE_WARN = 3 COLOR_SPACE_ERROR = 1 COLOR_DIR_RW = 34 COLOR_DIR_RO = 202 COLOR_DIR_NO = 196 COLOR_GIT_CLEAN = 10 COLOR_GIT_DIRTY = 1 COLOR_GIT_MERGE = 196 COLOR_GIT_UNTRACKED = 202 COLOR_GIT_MODIFIED = 1 COLOR_GIT_STAGED = 10 COLOR_PROMPT_OK = 14 COLOR_PROMPT_ERROR = 1 COLOR_OK = 10 COLOR_ERROR = 196 MOUNT_IGNORE_FS = ["iso9660", "tmpfs", "rootfs"] MOUNT_IGNORE_DIR = ["/dev", "/proc", "/sys"] def get_username(): return pwd.getpwuid(os.geteuid())[0] def style_color(fg): return "\033[38;5;%dm" %(fg) def style_bold(): return "\033[1m" def style_invert(): return "\033[7m" def style_reset(): return "\033[0m" def colored_strlen(raw): """ Returns string length without ANSI control escapes. """ l = 0 reading_escape = False for c in raw: if c == "\x1b": reading_escape = True continue if reading_escape and c == "m": reading_escape = False continue if not reading_escape: l += 1 return l def si_number(raw): if raw > 2**40: raw /= 2**40 si = "Ti" elif raw > 2**30: raw /= 2**30 si = "Gi" elif raw > 2**20: raw /= 2**20 si = "Mi" elif raw > 2**10: raw /= 2**10 si = "ki" else: si = "" return (raw, si) def space_string(free): """ Returns a formatting string suitable for printing out free space: float for >1k, int otherwise. """ return "%.1f%s" if free > 1024 else "%d%s" class Sysload: def __init__(self): loadavg = open("/proc/loadavg").read().split() cores = os.sysconf(os.sysconf_names["SC_NPROCESSORS_ONLN"]) self.load1 = float(loadavg[0]) / cores self.load5 = float(loadavg[1]) / cores self.load15 = float(loadavg[2]) / cores self.tasks_ready, self.tasks_total = [int(x) for x in loadavg[3].split("/")] class Settings: def __init__(self, argv): try: self.term_width = int(argv[1]) self.cwd = argv[2] self.last_refresh_time = int(argv[3]) if argv[3] else None self.load_warn = float(argv[4]) self.load_error = float(argv[5]) self.space_warn = float(argv[6]) self.space_error = float(argv[7]) self.is_root = os.geteuid() == 0 self.is_daemon = os.geteuid() < 1000 except: print("Usage: %s $COLUMNS $PWD last_refresh_time load_warn load_error space_warn space_error") print(" e.g. %s 1 $COLUMNS $PWD 25 1.25 2.5 0.15 0.05") print() raise def get_load_color(self, load): if load > self.load_error: return COLOR_LOAD_ERROR elif load > self.load_warn: return COLOR_LOAD_WARN elif load > 0.1: return COLOR_LOAD_OK else: return COLOR_LOAD_IDLE def get_space_color(self, free, total): free = free / total if free < self.space_error: return COLOR_SPACE_ERROR elif free < self.space_warn: return COLOR_SPACE_WARN else: return COLOR_SPACE_OK class Part: def __init__(self): self.fragments = [] def __str__(self): return "".join(self.fragments) def __len__(self): return colored_strlen(str(self)) class LoginPart(Part): """ username@hostname:screen - username is green for users, red for root - @ is a literal "@" if this is a local session, " ⇄ " when remote (ssh) - hostname is colored according to sysload - screen is the screen window ID if available """ def __init__(self, settings, sysload): Part.__init__(self) is_remote = bool(os.getenv("SSH_CLIENT")) screen = os.getenv("WINDOW") # username if settings.is_root: self.fragments.append(style_color(COLOR_USER_ROOT)) elif settings.is_daemon: self.fragments.append(style_color(COLOR_USER_DAEMON)) else: self.fragments.append(style_color(COLOR_USER_USER)) self.fragments.append(style_bold()) user = get_username() self.fragments.append(user) self.fragments.append(style_reset()) # sign if is_remote: self.fragments.append(style_color(COLOR_SESSION_REMOTE)) else: self.fragments.append(style_color(COLOR_SESSION_LOCAL)) sign = " ⇄ " if is_remote else "@" self.fragments.append(sign) # hostname self.fragments.append(style_color(settings.get_load_color(sysload.load1))) hostname = socket.gethostname() self.fragments.append(hostname) # screen window ID if screen: self.fragments.append(style_reset()) self.fragments.append(":") self.fragments.append(style_bold()) self.fragments.append(style_color(COLOR_TERM_SCREEN)) self.fragments.append(screen) self.fragments.append(style_reset()) def list_homes(): already_listed = set() with open("/etc/passwd") as f: for line in f: tokens = line.split(":") if tokens[5] != "/dev/null": if tokens[5] not in already_listed: already_listed.add(tokens[5]) yield (tokens[5], tokens[0]) class Dir: def __init__(self, path, name=None, slash="/"): self.path = path self.slash = slash self.truncated = False self.void = False self.name = os.path.basename(path) if name is None else name if os.access(path, os.W_OK): self.color = COLOR_DIR_RW elif os.access(path, os.X_OK): self.color = COLOR_DIR_RO else: self.color = COLOR_DIR_NO if os.path.islink(path) and os.path.realpath(path).startswith("/dev/shm/void"): self.name = "«" + self.name + "»" self.void = True def __len__(self): return len(self.name) + len(self.slash) def truncate(self): self.truncated = True if self.name: self.name = self.name[0] + "…" def __str__(self): output = [self.slash] if self.truncated: output.append(style_bold()) if self.void: output.append(style_invert()) output.append(style_color(self.color)) output.append(self.name) output.append(style_reset()) return "".join(output) class PathPart(Part): """ /path/to/cwd Contructs a list of Dirs from the root to CWD. When shrink_fit is called, some Dirs may be shortened. """ def __init__(self, settings, overloaded): Part.__init__(self) self.term_width = settings.term_width self.dirs = [] homes = {} if overloaded else dict(list_homes()) dirs = settings.cwd.split("/")[1:] path = "" for dir in dirs: path += "/" + dir if path in homes: self.dirs = [] if homes[path] == get_username(): self.dirs.append(Dir(path, "", "~")) else: self.dirs.append(Dir(path, homes[path], "~")) else: self.dirs.append(Dir(path)) @property def full_length(self): return sum(len(dir) for dir in self.dirs) def shrink_fit(self, line): self.fragments = [] available_space = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1 # account for single-space separators for dir in self.dirs: if self.full_length > available_space: dir.truncate() self.fragments.append(str(dir)) def command(cmd, timeout=0.5): """ Executes a command, returns stdout, suppresses stderr." """ if type(cmd) == str: cmd = cmd.split() s = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) s.wait(timeout) return s.stdout.read().decode("utf-8") class VirtualEnvPart(Part): """ ⬢ path Displays the current python virtualenv's path iff set. """ def __init__(self): Part.__init__(self) virtualenv = os.getenv("VIRTUAL_ENV") if virtualenv: self.fragments.append("⬢ ") self.fragments.append(style_color(COLOR_OK)) parts = virtualenv.split("/") if parts[-1] == "env" and len(parts) > 1: venv_name = parts[-2] else: venv_name = parts[-1] self.fragments.append(venv_name) self.fragments.append(style_reset()) class VoidPart(Part): """ ∅ name Displays the name(s) of the VOIDs held by the current shell and its ancestors. """ def __init__(self): Part.__init__(self) voids = os.getenv("OVER_PROMPT_VOIDS") if voids: self.fragments.append("∅ ") for void in voids.split(":"): if void: self.fragments.append(style_color(COLOR_OK)) self.fragments.append(void) self.fragments.append(style_reset()) class WineprefixPart(Part): """ ♿ name:[32|64] Displays the current WINEPREFIX (iff set) basename and x86 subarchitecture (32 or 64). The basename is green if the directory exists, red otherwise. """ def __init__(self): Part.__init__(self) wineprefix = os.getenv("WINEPREFIX") arch = os.getenv("WINEARCH") if wineprefix: self.fragments.append("♿ ") self.fragments.append(style_color(COLOR_OK if os.path.isdir(wineprefix) else COLOR_ERROR)) self.fragments.append(os.path.basename(wineprefix)) self.fragments.append(style_reset()) if arch == "win32": self.fragments.append(":32") elif arch: self.fragments.append(":") self.fragments.append(style_color(COLOR_ERROR)) self.fragments.append("?") self.fragments.append(style_reset()) else: self.fragments.append(":64") class GitPart(Part): """ ↘2 ↗4 ⚠M master ?11 ✎6 ✉10 - 2 commits are available for pulling (remote is ahead) - 4 commits are available for pushing (local is ahead) - we're merging - branch name is master - 11 untracked files - 6 modified files - 10 modified and staged files Numeric elements are hidden if the value is 0. """ def __init__(self): Part.__init__(self) branch_name = command("git name-rev --name-only --no-undefined --always HEAD").strip() if branch_name: try: self.fragments.append("↱ ") count_to_pull = command("git log --oneline ..@{u}").count("\n") count_to_push = command("git log --oneline @{u}..").count("\n") git_dir = command("git rev-parse --git-dir").strip() merging = os.path.exists(os.path.join(git_dir, "MERGE_HEAD")) untracked = command("git ls-files --other --exclude-standard", timeout=0.2).count("\n") modified = command("git diff --name-only").count("\n") staged = command("git diff --name-only --staged").count("\n") if count_to_pull: self.fragments.append("↘%d " %(count_to_pull)) if count_to_push: self.fragments.append("↗%d " %(count_to_push)) if merging: self.fragments.append(style_color(COLOR_GIT_MERGE)) self.fragments.append(style_bold()) self.fragments.append("⚠M ") elif modified or staged: self.fragments.append(style_color(COLOR_GIT_DIRTY)) else: self.fragments.append(style_color(COLOR_GIT_CLEAN)) self.fragments.append(branch_name) self.fragments.append(style_reset()) if untracked: self.fragments.append(style_color(COLOR_GIT_UNTRACKED)) self.fragments.append(" ?%d" %(untracked)) if modified: self.fragments.append(style_color(COLOR_GIT_MODIFIED)) self.fragments.append(" ✎%d" %(modified)) if staged: self.fragments.append(style_color(COLOR_GIT_STAGED)) self.fragments.append(" ✉%d" %(staged)) self.fragments.append(style_reset()) except subprocess.TimeoutExpired: self.fragments = [] self.fragments.append(style_color(COLOR_GIT_DIRTY)) self.fragments.append("⚠ git timeout ⚠") self.fragments.append(style_reset()) class Padding(Part): def __init__(self, term_width): Part.__init__(self) self.term_width = term_width def expand_fit(self, line): length = self.term_width - sum(len(part) for part in line if part is not self) - len(line) + 1 # account for single-space separators self.fragments = [" " * length] class StatsPart(Part): """ [ ?Δ1? clock | sysload tasks | m:memory ?s:swap? mountpoint_spaces... - delta is shown if the last prompt refresh was before midnight, and displays how many midnights elapsed since - clock shows current time (in HH:MM) - sysload is current loadavg divided by amount of cores (in 1) - tasks is the total amount of tasks on the system - memory shows current free memory (in octets) - swap shows remaining swap space but only if any is actually used - mountpoint_spaces list one free space per mountpoint """ def __init__(self, settings, sysload): Part.__init__(self) self.fragments.append("[ ") self.fragments.append(style_bold()) # delta if settings.last_refresh_time: then = datetime.datetime.fromtimestamp(settings.last_refresh_time).replace(hour=0, minute=0, second=0, microsecond=0) now = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) midnights = (now - then).days if midnights: self.fragments.append(style_color(COLOR_CLOCK_DELTA)) self.fragments.append("Δ%d " %(midnights)) # clock self.fragments.append(datetime.datetime.now().strftime("\033[38;5;%dm%%H:%%M\033[0m | " %(COLOR_CLOCK))) # sysload self.fragments.append(style_color(settings.get_load_color(sysload.load1))) self.fragments.append("%.1f " %(sysload.load1)) self.fragments.append(style_reset()) # tasks self.fragments.append("%d " %(sysload.tasks_total)) if sysload.load1 >= settings.load_error: self.fragments.append(" | ") self.fragments.append(style_color(COLOR_ERROR)) self.fragments.append(style_bold()) self.fragments.append("⚠ OVERLOAD ⚠ ") self.fragments.append(style_reset()) else: # memory (and swap, if used) mem_total = 0 mem_free = 0 swap_total = 0 swap_free = 0 # MemAvailable is not available on 2**20: self.fragments.append(style_color(COLOR_MEMSWAP)) self.fragments.append(" s") self.fragments.append(style_bold()) self.fragments.append(style_color(settings.get_space_color(swap_free, swap_total))) swap_free_si = si_number(swap_free) self.fragments.append(space_string(swap_free) %(swap_free_si[0], swap_free_si[1])) self.fragments.append(style_reset()) # mountpoints names = [] first_mountpoint = True btrfs_encountered_devices = set() if os.path.isdir("/dev/disk/by-label"): device_to_label = {os.path.join("/dev", os.path.basename(os.readlink(os.path.join("/dev/disk/by-label", x)))): x for x in os.listdir("/dev/disk/by-label")} else: device_to_label = {} with open("/proc/self/mounts") as f: for line in f: device, dir, type, options, *rest = line.split() # skip non-storage mounts if type in MOUNT_IGNORE_FS: continue if any([dir.startswith(d) for d in MOUNT_IGNORE_DIR]): continue if "rw" not in options.split(","): continue # /proc/self/mounts uses a literal \040 string to escape spaces dir = dir.replace("\\040", " ") basename = os.path.basename(dir) if basename: short_name = " " for c in basename: short_name += c if short_name not in names: break else: short_name = " /" # handle btrfs subvolumes if type == "btrfs": if device in btrfs_encountered_devices: continue btrfs_encountered_devices.add(device) btrfs_raw = command(["/sbin/btrfs", "fi", "usage", "-b", dir]) for line in btrfs_raw.split("\n"): if "Device size" in line: stor_total = int(line.split()[-1]) elif "Free" in line: stor_free = int(line.split()[-3]) break else: try: stat = os.statvfs(dir) except PermissionError: continue stor_total = stat.f_blocks * stat.f_bsize stor_free = stat.f_bavail * stat.f_bsize # ignore virtual filesystems if stor_total == 0: continue if first_mountpoint: self.fragments.append(" |") first_mountpoint = False if type == "btrfs": self.fragments.append(style_color(COLOR_SPACE_BTRFS)) self.fragments.append(short_name) if type == "btrfs": self.fragments.append(style_reset()) self.fragments.append(style_bold()) self.fragments.append(style_color(settings.get_space_color(stor_free, stor_total))) stor_free_si = si_number(stor_free) self.fragments.append(space_string(stor_free) %(stor_free_si[0], stor_free_si[1])) self.fragments.append(style_reset()) if __name__ == "__main__": settings = Settings(sys.argv) sysload = Sysload() overloaded = sysload.load1 >= settings.load_error lp = LoginPart(settings, sysload) pp = PathPart(settings, overloaded) gp = "" if overloaded else GitPart() vep = VirtualEnvPart() vop = VoidPart() wp = WineprefixPart() pad = Padding(settings.term_width) sp = StatsPart(settings, sysload) line = [lp, pp, gp, vep, vop, wp, pad, sp] pp.shrink_fit(line) pad.expand_fit(line) line_str = " ".join(str(part) for part in line) sys.stderr.write(line_str) sys.stderr.write("\n") sys.stderr.flush()